Spark GenericUDF: Dynamically Loading External Resources
Preface
The dynamic loading of external resources described in article [1] only takes effect after the Spark job is restarted. Inspired by article [2], we can add a constant column to the data that names the external resource and pass it as a UDF argument (a UDF cannot accept inputs that are not data columns, so this is a workaround), and combine this with the approach of article [1] so that one and the same UDF can dynamically load different resources. This post illustrates the idea by extending GenericUDF to read strings stored in a Redis cluster, build dictionary tries from them, and perform keyword-package matching.
Since a GenericUDF cannot be registered via spark.udf().register(…) [3], we follow article [4]: create the UDF function in SparkSQL or Hive, then invoke it.
Differences Between UDF and GenericUDF
The differences between UDF and GenericUDF are summarized in article [5]:
There are two ways to develop a custom UDF function: extend org.apache.hadoop.hive.ql.exec.UDF, or extend org.apache.hadoop.hive.ql.udf.generic.GenericUDF.
UDF suffices for simple data types (String, Integer, etc.); GenericUDF handles complex types (Array, Map, Struct, etc.) as well, and additionally allows initialization before the function starts processing and cleanup after it finishes.
Article [6] gives a GenericUDF demo, and article [7] explains the ObjectInspector machinery behind GenericUDF in detail. A minimal sketch of both styles follows.
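As an illustration of the two extension points (our own sketch, not code from the cited articles; the class names and bodies are hypothetical):
import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
// Old-style UDF: Hive resolves evaluate() by reflection; fine for simple types.
class UpperCaseUdf extends UDF {
public String evaluate(String input) {
return null == input ? null : input.toUpperCase();
}
}
// GenericUDF: argument types are negotiated once in initialize(), which is also
// the natural place for one-time setup; rows then flow through evaluate().
class UpperCaseGenericUdf extends GenericUDF {
@Override
public ObjectInspector initialize(ObjectInspector[] args) throws UDFArgumentException {
if (args.length != 1) {
throw new UDFArgumentException("exactly one argument expected");
}
return PrimitiveObjectInspectorFactory.javaStringObjectInspector;
}
@Override
public Object evaluate(DeferredObject[] args) throws HiveException {
Object value = args[0].get();
return null == value ? null : value.toString().toUpperCase();
}
@Override
public String getDisplayString(String[] children) {
return "upper_case(" + String.join(", ", children) + ")";
}
}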
Preparation
Data Structure of the External Resource
KeyWordSetEntity.java
The name field serves two purposes: 1. in the external store it uniquely identifies the resource (e.g. a MySQL primary key or a Redis key); 2. it supplies the value of the constant column passed to the UDF later.
The keyWordSet field is the external resource itself. The list structure allows multiple keyword packages; each KeyWordPackage holds both "keywords" and "stop words".
package com.sogo.sparkudf.entity;
import com.alibaba.fastjson.JSONObject;
import lombok.Getter;
import lombok.Setter;
import java.io.Serializable;
import java.util.*;
/**
* @Created by IntelliJ IDEA.
* @author: liuzhixuan
* @Date: 2020/8/31
* @Time: 10:44
* @des:
*/
@Setter
@Getter
public class KeyWordSetEntity implements Serializable {
// name
private String name;
// key word set
private List<KeyWordPackage> keyWordSet;
// constructor
public KeyWordSetEntity() {}
// constructor
public KeyWordSetEntity(String name, List<KeyWordPackage> keyWordSet) {
this.name = name;
this.keyWordSet = keyWordSet;
}
// constructor
public KeyWordSetEntity(String name, KeyWordPackage... keyWordPackages) {
this.name = name;
keyWordSet = new ArrayList<>();
Collections.addAll(keyWordSet, keyWordPackages);
}
@Getter
@Setter
public static class KeyWordPackage implements Serializable {
private Set<String> keywords;
private Set<String> stopwords;
public KeyWordPackage() {}
public KeyWordPackage(Set<String> keywords, Set<String> stopwords) {
this.keywords = keywords;
this.stopwords = stopwords;
}
}
public static Set<String> generateKeyWordSet(String keyword, String separator) {
Set<String> resSet = new HashSet<>();
String[] fields = keyword.split(separator, -1);
Collections.addAll(resSet, fields);
return resSet;
}
public static KeyWordSetEntity generateTestData() {
Set<String> keywords1 = generateKeyWordSet("小米手机,小米10", ",");
Set<String> stopwords1 = generateKeyWordSet("大米", ",");
Set<String> keywords2 = generateKeyWordSet("雷军,武汉大学", ",");
Set<String> stopwords2 = generateKeyWordSet("联想", ",");
KeyWordPackage keyWordPackage1 = new KeyWordPackage(keywords1, stopwords1);
KeyWordPackage keyWordPackage2 = new KeyWordPackage(keywords2, stopwords2);
String name = "xiaomi_udf";
return new KeyWordSetEntity(name, keyWordPackage1, keyWordPackage2);
}
public static KeyWordSetEntity generateTestData2() {
Set<String> keywords1 = generateKeyWordSet("华为手机,华为荣耀", ",");
Set<String> stopwords1 = generateKeyWordSet("小米手机", ",");
Set<String> keywords2 = generateKeyWordSet("华为P40", ",");
Set<String> stopwords2 = generateKeyWordSet("联想", ",");
KeyWordPackage keyWordPackage1 = new KeyWordPackage(keywords1, stopwords1);
KeyWordPackage keyWordPackage2 = new KeyWordPackage(keywords2, stopwords2);
String name = "huawei_udf";
return new KeyWordSetEntity(name, keyWordPackage1, keyWordPackage2);
}
public static void main(String[] args) {
KeyWordSetEntity keyWordSetEntity = generateTestData2();
String jsonString = JSONObject.toJSONString(keyWordSetEntity, true);
System.out.println("jsonString;" + jsonString);
}
}
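For reference, running the main() above prints a JSON string of roughly the following shape (approximate: HashSet iteration order, and hence element order, may vary):
{
    "keyWordSet": [
        {"keywords": ["华为手机", "华为荣耀"], "stopwords": ["小米手机"]},
        {"keywords": ["华为P40"], "stopwords": ["联想"]}
    ],
    "name": "huawei_udf"
}
This is the string that RedisClusterConnector (below) writes to Redis and that WordTrieEntity later parses back into tries.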
The Trie Data Type
WordTrieEntity.java
Builds the dictionary tries and performs the keyword matching; a usage sketch follows the class.
package com.sogo.sparkudf.entity;
import com.alibaba.fastjson.JSON;
import lombok.Getter;
import lombok.Setter;
import org.ahocorasick.trie.Trie;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.collection.Seq;
import scala.collection.mutable.ArraySeq;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
/**
* @Created by IntelliJ IDEA.
* @author: liuzhixuan
* @Date: 2020/8/26
* @Time: 23:08
* @des:
*/
@Setter
@Getter
public class WordTrieEntity implements Serializable {
// LOGGER
private static final Logger LOGGER = LoggerFactory.getLogger(WordTrieEntity.class);
// not serialized; rebuilt where needed
private transient Trie keywordsTrie;
// not serialized
private transient Trie stopwordsTrie;
public WordTrieEntity(Trie keywordsTrie, Trie stopwordsTrie) {
this.keywordsTrie = keywordsTrie;
this.stopwordsTrie = stopwordsTrie;
}
public static List<WordTrieEntity> generateKeywordTrieList(String jsonString) {
// get key word
KeyWordSetEntity keyWordSetEntity = JSON.parseObject(jsonString, KeyWordSetEntity.class);
List<WordTrieEntity> keywordsTrieList = new ArrayList<>();
for (KeyWordSetEntity.KeyWordPackage keyWordPackage: keyWordSetEntity.getKeyWordSet()) {
Trie keywordsTrie = buildTrie(keyWordPackage.getKeywords());
Trie stopwordsTrie = buildTrie(keyWordPackage.getStopwords());
keywordsTrieList.add(new WordTrieEntity(keywordsTrie, stopwordsTrie));
}
System.out.println("[DEBUG]I am initialized in WordTrieEntity");
return keywordsTrieList;
}
private static Trie buildTrie(Set<String> stringSet) {
return Trie.builder().addKeywords(stringSet).build();
}
public static Boolean contains(Seq<String> stringSeq, List<WordTrieEntity> wordTrieList) {
// nothing to filter
if (null == wordTrieList || wordTrieList.isEmpty()) {
return true;
}
for (WordTrieEntity wordTrie : wordTrieList) {
// packages are ANDed together
if (Boolean.FALSE.equals(contains(wordTrie, stringSeq))) {
return false;
}
}
return true;
}
public static Boolean contains(List<String> stringSeq, List<WordTrieEntity> wordTrieList) {
// nothing to filter
if (null == wordTrieList || wordTrieList.isEmpty()) {
return true;
}
for (WordTrieEntity wordTrie : wordTrieList) {
// packages are ANDed together
if (Boolean.FALSE.equals(contains(wordTrie, stringSeq))) {
return false;
}
}
return true;
}
private static Boolean contains(WordTrieEntity wordTrie, Seq<String> stringSeq) {
// a single matching query suffices
for (int i = 0; i < stringSeq.size(); i++) {
// queries are ORed within a package
if (Boolean.TRUE.equals(contains(wordTrie, stringSeq.apply(i)))) {
return true;
}
}
// no query matched: return false
return false;
}
private static Boolean contains(WordTrieEntity wordTrie, List<String> stringSeq) {
// a single matching query suffices
for (int i = 0; i < stringSeq.size(); i++) {
// queries are ORed within a package
if (Boolean.TRUE.equals(contains(wordTrie, stringSeq.get(i)))) {
return true;
}
}
// no query matched: return false
return false;
}
private static Boolean contains(WordTrieEntity wordTrie, String query) {
// stop words veto the match
if (null != wordTrie.getStopwordsTrie() && wordTrie.getStopwordsTrie().containsMatch(query)) {
return false;
}
// match against the keywords
if (null == wordTrie.getKeywordsTrie()) {
LOGGER.error("keyword is null");
}
return null != wordTrie.getKeywordsTrie() && wordTrie.getKeywordsTrie().containsMatch(query);
}
private static Seq<String> list2Seq(List<String> list) {
Seq<String> stringSeq = new ArraySeq<>(list.size());
for (int i = 0; i < list.size(); i ++) {
((ArraySeq<String>) stringSeq).update(i, list.get(i));
}
return stringSeq;
}
}
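A quick local check of the matching semantics (a hypothetical demo class, assumed to live in the same package com.sogo.sparkudf.entity; no Redis involved, the JSON comes from generateTestData()):
package com.sogo.sparkudf.entity;
import com.alibaba.fastjson.JSON;
import java.util.Arrays;
import java.util.List;
public class WordTrieEntityDemo {
public static void main(String[] args) {
String json = JSON.toJSONString(KeyWordSetEntity.generateTestData());
List<WordTrieEntity> tries = WordTrieEntity.generateKeywordTrieList(json);
// package 1: 小米手机/小米10 minus 大米; package 2: 雷军/武汉大学 minus 联想
List<String> hit = Arrays.asList("小米10真香", "雷军发布会");
List<String> miss = Arrays.asList("小米10真香");
System.out.println(WordTrieEntity.contains(hit, tries));  // true: both packages matched
System.out.println(WordTrieEntity.contains(miss, tries)); // false: package 2 unmatched
}
}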
A Redis Cluster Singleton
RedisClusterConnector.java
Stores the external resource (the keyword packages) from which the tries are built. The singleton pattern ensures the Redis client is initialized only once per JVM.
package com.sogo.sparkudf.connnector;
import com.alibaba.fastjson.JSON;
import com.sogo.sparkudf.entity.KeyWordSetEntity;
import lombok.Getter;
import lombok.Setter;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.JedisCluster;
import java.io.Serializable;
import java.util.HashSet;
import java.util.Set;
/**
* @Created by IntelliJ IDEA.
* @author: liuzhixuan
* @Date: 2020/8/20
* @Time: 22:25
* @des:
*/
@Setter
@Getter
public class RedisClusterConnector implements Serializable {
// LOGGER
private static final Logger LOGGER = LoggerFactory.getLogger(RedisClusterConnector.class);
// jedis cluster instance (static fields are never serialized, so transient would be redundant)
private static JedisCluster jedisCluster;
public static JedisCluster getRedisCluster() {
if (null == jedisCluster) {
synchronized (RedisClusterConnector.class) {
if (null == jedisCluster) {
LOGGER.warn("[JUST_FOR_DEBUG] Test how many times the function is called");
// jedis pool config
GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
poolConfig.setMaxIdle(50);
poolConfig.setMinIdle(0);
poolConfig.setMaxTotal(50);
// host and port
Set<HostAndPort> nodes = new HashSet<>();
nodes.add(new HostAndPort("10.162.48.186", 6001));
nodes.add(new HostAndPort("10.162.48.186", 6020));
nodes.add(new HostAndPort("10.162.48.225", 6001));
nodes.add(new HostAndPort("10.162.48.225", 6020));
// init
int connectionTimeout = 10000;
int soTimeout = 5000;
int maxAttempts = 2;
jedisCluster = new JedisCluster(nodes, connectionTimeout, soTimeout, maxAttempts, poolConfig);
}
}
}
return jedisCluster;
}
public static void close() {
if (null != jedisCluster) {
synchronized (RedisClusterConnector.class) {
if (null != jedisCluster) {
jedisCluster.close();
}
}
}
}
public static void main(String[] args) {
JedisCluster jedisCluster = RedisClusterConnector.getRedisCluster();
KeyWordSetEntity keyWordSetEntity = KeyWordSetEntity.generateTestData();
String udfName = keyWordSetEntity.getName();
String value = JSON.toJSONString(keyWordSetEntity);
jedisCluster.set("keyword_package_" + udfName, value);
String s = jedisCluster.get("keyword_package_" + udfName);
System.out.println("value:" + s);
}
}
Extending GenericUDF
KeyWordKeyFilterUdf.java
package com.sogo.sparkudf.udf;
import com.sogo.sparkudf.connnector.RedisClusterConnector;
import com.sogo.sparkudf.entity.WordTrieEntity;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hive.ql.exec.*;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.*;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.JedisCluster;
import java.util.*;
/**
* @Created by IntelliJ IDEA.
* @author: liuzhixuan
* @Date: 2020/8/29
* @Time: 10:58
* @des: reference from
* https://www.jianshu.com/p/ca9dce6b5c37
* https://www.jianshu.com/p/ba0e54579cc4
*/
/**
* The @Description annotation is optional and documents the function. The _FUNC_
* string in its attributes is replaced with the actual function name when the
* DESCRIBE FUNCTION command is used. It has three attributes:
*
* name: the function name in Hive.
* value: describes the function's parameters.
* extended: extra notes, e.g. an example; printed by DESCRIBE FUNCTION EXTENDED name.
* Source: https://www.jianshu.com/p/ca9dce6b5c37
*/
@Description(
name = "keyword_match_udf",
value = "_FUNC_(queries, 'keyword_package') - from the input string"
+ "returns true if queries contain keyword_package",
extended = "Example:\n"
+ " > SELECT _FUNC_(queries, 'keyword_package') FROM src;"
)
public class KeyWordKeyFilterUdf extends GenericUDF {
// LOGGER
private static final Logger LOGGER = LoggerFactory.getLogger(KeyWordKeyFilterUdf.class);
// prefix of redis key
private static final String PREFIX_REDIS_KEY = "keyword_package_";
// key word trie
private static Map<String, List<WordTrieEntity>> dictTrie = new HashMap<>();
// object inspector: String
private static final ObjectInspector valueOI = PrimitiveObjectInspectorFactory
.getPrimitiveJavaObjectInspector(PrimitiveObjectInspector.PrimitiveCategory.STRING);
// string object inspector: for parse string
private static final StringObjectInspector stringOI = (StringObjectInspector) valueOI;
// object inspector: list<string>
private static final ListObjectInspector arrayOI = ObjectInspectorFactory.getStandardListObjectInspector(valueOI);
// redis cluster client
private static JedisCluster jedisCluster;
/**
* Additionally setup GenericUDF with MapredContext before initializing.
* This is only called in runtime of MapRedTask.
*
* @param context context
*/
@Override
public void configure(MapredContext context) {
/*
org.apache.hadoop.mapred.JobConf jobConf = context.getJobConf();
String user = jobConf.getUser();
String jobName = jobConf.getJobName();
String queueName = jobConf.getQueueName();
int numMapTasks = jobConf.getNumMapTasks();
int numReduceTasks = jobConf.getNumReduceTasks();
int maxMapAttempts = jobConf.getMaxMapAttempts();
int maxReduceAttempts = jobConf.getMaxReduceAttempts();
LOGGER.warn("[JUST_FOR_DEBUG] user:{}, jobName:{}, queueName:{}, " +
"numMapTasks:{}, numReduceTasks:{}, maxMapAttempts:{}, maxReduceAttempts:{}",
user, jobName, queueName, numMapTasks, numReduceTasks, maxMapAttempts, maxReduceAttempts);
*/
}
// initialize() is called exactly once, before any call to evaluate(). It receives an
// array of ObjectInspectors and checks that the argument count and types are correct.
// Note: it runs again each time a SQL statement using the UDF is executed.
@Override
public ObjectInspector initialize(ObjectInspector[] objectInspectors) throws UDFArgumentException {
// expects two arguments: 1. the queries column, 2. the keyword-package name
if (objectInspectors.length < 2) {
throw new UDFArgumentLengthException("param invalid: 1->query word, 2->type");
}
// ObjectInspector.Category.LIST
if (!objectInspectors[0].getCategory().equals(arrayOI.getCategory())) {
throw new UDFArgumentTypeException(0, "[" + arrayOI.getTypeName() + "] type is needed, " +
"but [" + objectInspectors[0].getTypeName() + "] type is found");
}
// ObjectInspector.Category.PRIMITIVE
if (!objectInspectors[1].getCategory().equals(valueOI.getCategory())) {
throw new UDFArgumentTypeException(1, "[" + valueOI.getTypeName() + "] type is needed, " +
"but [" + objectInspectors[1].getTypeName() + "] is found");
}
// init jedis cluster
jedisCluster = RedisClusterConnector.getRedisCluster();
// getUdfName() returns the fully-qualified class name; getFuncName() returns the simple class name keeping only the characters after the 10th
LOGGER.warn("[JUST_FOR_DEBUG] getUdfName:[{}], getFuncName:[{}]", getUdfName(), getFuncName());
return PrimitiveObjectInspectorFactory
.getPrimitiveJavaObjectInspector(PrimitiveObjectInspector.PrimitiveCategory.BOOLEAN);
// to declare a Java list as the return type instead:
//ObjectInspector returnOi = PrimitiveObjectInspectorFactory
// .getPrimitiveJavaObjectInspector(PrimitiveObjectInspector.PrimitiveCategory.STRING);
//return ObjectInspectorFactory.getStandardListObjectInspector(returnOi);
}
// Like UDF's evaluate(): processes the actual arguments and returns the final result.
@Override
public Object evaluate(DeferredObject[] deferredObjects) throws HiveException {
// first argument passed to the UDF: the queries
Object queries = deferredObjects[0].get();
// second argument passed to the UDF: the keyword-package name
Object keywordPackageTag = deferredObjects[1].get();
if (null == queries || null == keywordPackageTag) {
return false;
}
// parse the String argument (option 1): via StringObjectInspector
String keywordPackageName = stringOI.getPrimitiveJavaObject(keywordPackageTag);
// parse the String argument (option 2): plain toString()
// String udfName = deferredObjects[1].get().toString();
// parse the List<String> argument (option 1): in a single call
List<String> queryList = (List<String>) arrayOI.getList(queries);
/*
// parse the List<String> argument (option 2): element by element
int listLength = arrayOI.getListLength(queries);
List<String> queryList = new ArrayList<>(listLength);
for (int i = 0; i < listLength; i ++) {
String s = arrayOI.getListElement(queries, i).toString();
queryList.add(s);
}
*/
List<WordTrieEntity> dict = generateFromRedis(keywordPackageName);
return WordTrieEntity.contains(queryList, dict);
}
// Introductory text shown when the user runs EXPLAIN on the SQL statement.
@Override
public String getDisplayString(String[] strings) {
if (null == strings || strings.length == 0) {
return "null or empty";
} else {
return String.join(";", strings);
}
}
/**
* Reads the keyword package from Redis and builds the dictionary tries here.
* @param keywordPackageName name of the keyword package (suffix of the Redis key)
* @return one WordTrieEntity per keyword package in the set
*/
private static List<WordTrieEntity> generateFromRedis(String keywordPackageName) {
if (null == dictTrie || dictTrie.isEmpty() || !dictTrie.containsKey(keywordPackageName)) {
synchronized (KeyWordKeyFilterUdf.class) {
if (null == dictTrie || dictTrie.isEmpty()) {
dictTrie = new HashMap<>();
}
if (!dictTrie.containsKey(keywordPackageName)) {
LOGGER.warn("[JUST_FOR_DEBUG] Build dict trie by keyword: [{}]", keywordPackageName);
String redisKey = PREFIX_REDIS_KEY + keywordPackageName;
if (null == jedisCluster) {
jedisCluster = RedisClusterConnector.getRedisCluster();
LOGGER.error("Something happened to jedis cluster, reconnect to it");
}
String value = jedisCluster.get(redisKey);
if (StringUtils.isEmpty(value)) {
LOGGER.error("Cannot load keyword from redis by key:[{}]", redisKey);
return new ArrayList<>();
}
List<WordTrieEntity> wordTrieEntityList = WordTrieEntity.generateKeywordTrieList(value);
dictTrie.put(keywordPackageName, wordTrieEntityList);
}
}
}
return dictTrie.get(keywordPackageName);
}
public static void main(String[] args) {
// WordKeyFilterUdf wordKeyFilterUdf = new WordKeyFilterUdf();
// Set<String> dict = WordKeyFilterUdf.generateMap("sogo_dict_udf");
// System.out.println("trueOrFalse:" + dict.contains("sogo"));
// System.out.println("trueOrFalse:" + dict.contains("sogo1"));
// dict = WordKeyFilterUdf.generateMap("xiaomi_dict_udf");
// System.out.println("trueOrFalse:" + dict.contains("miui"));
// System.out.println("trueOrFalse:" + dict.contains("sougou"));
//
// List<WordTrieEntity> wordTrieEntityList = WordKeyFilterUdf.generateFromRedis("xiaomi_udf", "");
// Seq<String> stringSeq = new ArraySeq<>(1);
// String query = "小米10周年,雷军";
// stringSeq.update(0, query);
// System.out.println("trueOrFalse:" + WordTrieEntity.contains(stringSeq, wordTrieEntityList));
StringObjectInspector stringObjectInspector = (StringObjectInspector) valueOI;
System.out.println("stringObjectInspector:" + stringObjectInspector.getTypeName());
System.out.println("valueOI:" + valueOI.getTypeName());
}
}
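Before registering, the UDF can also be exercised locally by driving initialize() and evaluate() directly (a hypothetical test driver, assuming the Redis cluster above is reachable and the xiaomi_udf package was written by RedisClusterConnector.main()):
package com.sogo.sparkudf.udf;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF.DeferredJavaObject;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF.DeferredObject;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import java.util.Arrays;
public class KeyWordKeyFilterUdfLocalTest {
public static void main(String[] args) throws Exception {
KeyWordKeyFilterUdf udf = new KeyWordKeyFilterUdf();
// declare the argument types, as Hive would during query planning
ObjectInspector queriesOI = ObjectInspectorFactory.getStandardListObjectInspector(
PrimitiveObjectInspectorFactory.javaStringObjectInspector);
ObjectInspector nameOI = PrimitiveObjectInspectorFactory.javaStringObjectInspector;
udf.initialize(new ObjectInspector[]{queriesOI, nameOI});
// one "row": the queries column plus the constant package-name column
DeferredObject[] row = new DeferredObject[]{
new DeferredJavaObject(Arrays.asList("小米10真香", "雷军发布会")),
new DeferredJavaObject("xiaomi_udf")
};
System.out.println(udf.evaluate(row)); // true, given the test data above
}
}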
Registering the UDF
Packaging
The packaged jar file is named:
sparkudf-1.0-SNAPSHOT-jar-with-dependencies.jar
Copy to the Dev Machine
Its path on the dev machine:
/search/work/bigdata/liuzhixuan/sparkudf/jars/sparkudf-1.0-SNAPSHOT-jar-with-dependencies.jar
Registration
HiveSQL/SparkSQL
In a Hive or SparkSQL session, run:
ADD JAR file:///search/work/bigdata/liuzhixuan/sparkudf/jars/sparkudf-1.0-SNAPSHOT-jar-with-dependencies.jar;
CREATE TEMPORARY FUNCTION keyword_udf AS 'com.sogo.sparkudf.udf.KeyWordKeyFilterUdf';
show functions;
PySpark
In a PySpark session, run:
spark.sql("ADD JAR file:///search/work/bigdata/liuzhixuan/sparkudf/jars/sparkudf-1.0-SNAPSHOT-jar-with-dependencies.jar")
spark.sql("CREATE OR REPLACE TEMPORARY FUNCTION keyword_udf AS 'com.sogo.sparkudf.udf.KeyWordKeyFilterUdf'")
spark.sql("show user functions").show(10,0)
Testing
Taking PySpark as the example:
Test Data
testDs.show(4)
DataFrame[imei: string, fwords: array<string>]
+--------------------+--------------------+
| imei| fwords|
+--------------------+--------------------+
|00003AC86C0E62825...| [鬼谷子狼道等十本书]|
|00005FD5EA9B96624...|[哄女友, 后来, 古, 火焰切割...|
|00006231671F8272E...| [欧尚]|
|00007A428750D7C19...| [公仔迷你]|
+--------------------+--------------------+
Keyword Matching
Test 1: verify that the UDF runs correctly
testDs.registerTempTable('testDs')
xiaomi = spark.sql("select imei,fwords from testDs where keyword_udf(fwords, 'xiaomi_udf')!=0")
xiaomi.show(10,0)
Result: one log line is printed per executed SQL statement, as expected:
20/09/01 15:44:12 WARN KeyWordKeyFilterUdf: [DEBUG] getUdfName:[com.sogo.sparkudf.udf.KeyWordKeyFilterUdf], getFuncName:[filterudf]
The filtered rows are as expected:
+----------------------------------------+--------+
|imei |fword |
+----------------------------------------+--------+
|82C455E4845CA8C50ABFCCDAE80FFB1D4F444135|小米手机的 |
|82C455E4845CA8C50ABFCCDAE80FFB1D4F444135|小米主题 |
|08B687D554A238008EA117049A87776C4E6A6730|小米 |
|08B687D554A238008EA117049A87776C4E6A6730|小米书包 |
|08B687D554A238008EA117049A87776C4E6A6730|小米10 |
|08B687D554A238008EA117049A87776C4E6A6730|小米10至尊宝 |
|08B687D554A238008EA117049A87776C4E6A6730|小米10至尊 |
|08B687D554A238008EA117049A87776C4E6A6730|小米旅行箱青春版|
|08B687D554A238008EA117049A87776C4E6A6730|如何进入小米 |
|08B687D554A238008EA117049A87776C4E6A6730|小米 |
+----------------------------------------+--------+
Test 2: verify dynamic loading of keyword packages
Building directly on test 1, query with the Huawei package (no re-registration, no restart):
huawei = spark.sql("select imei,fwords from testDs where keyword_udf(fwords, 'huawei_udf')!=0")
huawei.show(10,0)
The output is as expected:
20/09/01 16:02:41 WARN KeyWordKeyFilterUdf: [DEBUG] getUdfName:[com.sogo.sparkudf.udf.KeyWordKeyFilterUdf], getFuncName:[filterudf]
+----------------------------------------+-------------------+
|imei |fword |
+----------------------------------------+-------------------+
|52686FA528D898ECE0F30EDF43A2E4B94D444D33|华为P40pro |
|52686FA528D898ECE0F30EDF43A2E4B94D444D33|华为手机 |
|FD1417C3439BE6D55A2FC8D3722EA80A4D6A5532|华为手机屏幕最左侧有色线 |
|FD1417C3439BE6D55A2FC8D3722EA80A4D6A5532|华为P40Pro侧边色线 |
|FD1417C3439BE6D55A2FC8D3722EA80A4D6A5532|华为 |
|FD1417C3439BE6D55A2FC8D3722EA80A4D6A5532|华为手机屏幕最左侧有色线 |
|C137437C5F70B8B9F2F6CAF0271880AB4E6A4134|华为P40开发者xuanxiang选项|
|C137437C5F70B8B9F2F6CAF0271880AB4E6A4134|华为P40kaifazhe开发者 |
|C137437C5F70B8B9F2F6CAF0271880AB4E6A4134|电脑怎么用华为手机怎么共享网络 |
|C137437C5F70B8B9F2F6CAF0271880AB4E6A4134|huawei华为 |
+----------------------------------------+-------------------+
Summary
To let a single UDF dynamically load different keyword packages (which can be extended without limit), we pass the package name through a constant column, working around the restriction that a UDF cannot take non-column inputs; this achieves dynamic package loading with one UDF. Of course, expired keyword packages should also be deleted to reclaim resources; a cleanup sketch follows.
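A minimal cleanup sketch (a hypothetical helper class; del and expire are standard JedisCluster methods):
package com.sogo.sparkudf.connnector;
import redis.clients.jedis.JedisCluster;
// Hypothetical cleanup helper; the key prefix matches KeyWordKeyFilterUdf.
public class KeywordPackageCleaner {
private static final String PREFIX_REDIS_KEY = "keyword_package_";
// remove a retired package immediately
public static void drop(String packageName) {
JedisCluster jedis = RedisClusterConnector.getRedisCluster();
jedis.del(PREFIX_REDIS_KEY + packageName);
}
// or let Redis expire it automatically after ttlSeconds
public static void expireAfter(String packageName, int ttlSeconds) {
JedisCluster jedis = RedisClusterConnector.getRedisCluster();
jedis.expire(PREFIX_REDIS_KEY + packageName, ttlSeconds);
}
}
Note that KeyWordKeyFilterUdf also caches the built tries in its static dictTrie map and never evicts entries, so long-running executors keep serving a deleted package from memory until they are restarted.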
References
[1] Spark UDF加载外部资源. https://cloud.tencent.com/developer/article/1688828
[4] Spark UDF实现demo. https://cloud.tencent.com/developer/article/1672068
[5] Hive udf、UDF、GenericUDF. https://blog.csdn.net/wangshuminjava/article/details/79663998
[6] Hive-UDF&GenericUDF. https://www.jianshu.com/p/ca9dce6b5c37
[7] Hive之ObjectInspector接口解析笔记. https://blog.csdn.net/weixin_39469127/article/details/89739285