Dynamically Loading External Resources with a Spark GenericUDF

Preface

The dynamic loading of external resources described in article [1] actually requires a Spark job restart to take effect. Inspired by article [2], we can add a constant column to the data whose value names the external resource, and pass that column as a UDF argument (a UDF cannot take inputs that are not data columns, so this is a detour around the limitation). Combined with the approach of article [1], this lets one and the same UDF dynamically load different resources. This post demonstrates the idea by extending GenericUDF: the UDF reads strings stored in a Redis cluster, builds trie dictionaries from them, and performs word-package matching.
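
In the DataFrame API, the constant-column trick looks roughly like the sketch below. This is an illustration, not code from the original post; it assumes the keyword_udf function registered later in this post and a DataFrame with an array<string> column named fwords:

import static org.apache.spark.sql.functions.callUDF;
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.lit;

import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

public class ConstantColumnDemo {
    // lit(packageName) adds a constant column, so the same UDF can load a
    // different word package per query without re-registration or a job restart
    public static Dataset<Row> filterByPackage(Dataset<Row> df, String packageName) {
        Column matched = callUDF("keyword_udf", col("fwords"), lit(packageName));
        return df.filter(matched);
    }
}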

Since a GenericUDF cannot be registered via spark.udf().register(...) [3], we adopt the approach of article [4]: create the UDF in SparkSQL or Hive first, and then call it.
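
Driven from Java, that registration flow is, as a minimal sketch (assuming a Hive-enabled SparkSession and the jar path used later in this post):

import org.apache.spark.sql.SparkSession;

public class RegisterUdfDemo {
    public static void main(String[] args) {
        // Hive support is needed for CREATE TEMPORARY FUNCTION
        SparkSession spark = SparkSession.builder()
                .appName("RegisterGenericUdf")
                .enableHiveSupport()
                .getOrCreate();
        // ship the jar to the executors, then register the GenericUDF by class name
        spark.sql("ADD JAR file:///search/work/bigdata/liuzhixuan/sparkudf/jars/sparkudf-1.0-SNAPSHOT-jar-with-dependencies.jar");
        spark.sql("CREATE TEMPORARY FUNCTION keyword_udf AS 'com.sogo.sparkudf.udf.KeyWordKeyFilterUdf'");
        spark.sql("SHOW USER FUNCTIONS").show(10, false);
        spark.stop();
    }
}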

The difference between UDF and GenericUDF

For the difference between UDF and GenericUDF, see article [5]:

There are two ways to develop a custom UDF: extend org.apache.hadoop.hive.ql.exec.UDF, or extend org.apache.hadoop.hive.ql.udf.generic.GenericUDF.
For simple data types (String, Integer, and so on) a plain UDF is enough; for complex types (Array, Map, Struct, and so on) use GenericUDF. In addition, GenericUDF can run initialization before the function starts and cleanup after it finishes.
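
To make the contrast concrete, here is a minimal, hypothetical skeleton of each style (the class names and method bodies are illustrative, not from the referenced articles):

import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

// Plain UDF: reflection-based, one evaluate() per simple-type signature
class UpperUdf extends UDF {
    public String evaluate(String input) {
        return input == null ? null : input.toUpperCase();
    }
}

// GenericUDF: argument checking and one-time setup live in initialize(),
// per-row work lives in evaluate()
class UpperGenericUdf extends GenericUDF {
    @Override
    public ObjectInspector initialize(ObjectInspector[] args) throws UDFArgumentException {
        // one-time validation / resource setup goes here
        return PrimitiveObjectInspectorFactory.javaStringObjectInspector;
    }

    @Override
    public Object evaluate(DeferredObject[] args) throws HiveException {
        Object value = args[0].get();
        return value == null ? null : value.toString().toUpperCase();
    }

    @Override
    public String getDisplayString(String[] children) {
        return "upper_generic(" + String.join(", ", children) + ")";
    }
}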

A GenericUDF demo can be found in article [6], and article [7] explains the ObjectInspector machinery behind GenericUDF in detail.

Preparation

Data structure of the external resource

KeyWordSetEntity.java

The name field serves two purposes: 1. in external storage, name uniquely identifies the resource (like a primary key in MySQL, or a key in Redis); 2. it is the value carried by the constant column in the UDF later.

The keyWordSet field is the external resource itself. The list structure allows several word packages; each KeyWordPackage holds both "keywords" and "stopwords".

package com.sogo.sparkudf.entity;

import com.alibaba.fastjson.JSONObject;
import lombok.Getter;
import lombok.Setter;

import java.io.Serializable;
import java.util.*;

/**
 * @Created by IntelliJ IDEA.
 * @author: liuzhixuan
 * @Date: 2020/8/31
 * @Time: 10:44
 * @des:
 */
@Setter
@Getter
public class KeyWordSetEntity implements Serializable {
    // name
    private String name;
    // key word set
    private List<KeyWordPackage> keyWordSet;
    // constructor
    public KeyWordSetEntity() {}
    // constructor
    public KeyWordSetEntity(String name, List<KeyWordPackage> keyWordSet) {
        this.name = name;
        this.keyWordSet = keyWordSet;
    }
    // constructor
    public KeyWordSetEntity(String name, KeyWordPackage... keyWordPackages) {
        this.name = name;
        keyWordSet = new ArrayList<>();
        Collections.addAll(keyWordSet, keyWordPackages);
    }

    @Getter
    @Setter
    public static class KeyWordPackage implements Serializable {
        private Set<String> keywords;
        private Set<String> stopwords;

        public KeyWordPackage() {}

        public KeyWordPackage(Set<String> keywords, Set<String> stopwords) {
            this.keywords = keywords;
            this.stopwords = stopwords;
        }
    }

    public static Set<String> generateKeyWordSet(String keyword, String separator) {
        Set<String> resSet = new HashSet<>();
        String[] fields = keyword.split(separator, -1);
        Collections.addAll(resSet, fields);
        return resSet;
    }

    public static KeyWordSetEntity generateTestData() {
        Set<String> keywords1 = generateKeyWordSet("小米手机,小米10", ",");
        Set<String> stopwords1 = generateKeyWordSet("大米", ",");
        Set<String> keywords2 = generateKeyWordSet("雷军,武汉大学", ",");
        Set<String> stopwords2 = generateKeyWordSet("联想", ",");

        KeyWordPackage keyWordPackage1 = new KeyWordPackage(keywords1, stopwords1);
        KeyWordPackage keyWordPackage2 = new KeyWordPackage(keywords2, stopwords2);

        String name = "xiaomi_udf";
        return new KeyWordSetEntity(name, keyWordPackage1, keyWordPackage2);
    }

    public static KeyWordSetEntity generateTestData2() {
        Set<String> keywords1 = generateKeyWordSet("华为手机,华为荣耀", ",");
        Set<String> stopwords1 = generateKeyWordSet("小米手机", ",");
        Set<String> keywords2 = generateKeyWordSet("华为P40", ",");
        Set<String> stopwords2 = generateKeyWordSet("联想", ",");

        KeyWordPackage keyWordPackage1 = new KeyWordPackage(keywords1, stopwords1);
        KeyWordPackage keyWordPackage2 = new KeyWordPackage(keywords2, stopwords2);

        String name = "huawei_udf";
        return new KeyWordSetEntity(name, keyWordPackage1, keyWordPackage2);
    }

    public static void main(String[] args) {
        KeyWordSetEntity keyWordSetEntity = generateTestData2();
        String jsonString = JSONObject.toJSONString(keyWordSetEntity, true);
        System.out.println("jsonString;" + jsonString);
    }
}
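
Running this main method prints, roughly, the JSON that will later be stored in Redis. The exact field order and indentation depend on fastjson's defaults, so treat the layout below as an assumption:

{
    "keyWordSet":[
        {
            "keywords":["华为手机","华为荣耀"],
            "stopwords":["小米手机"]
        },
        {
            "keywords":["华为P40"],
            "stopwords":["联想"]
        }
    ],
    "name":"huawei_udf"
}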

The trie data structure

WordTrieEntity.java

This class builds the trie dictionaries and performs the keyword matching.

package com.sogo.sparkudf.entity;

import com.alibaba.fastjson.JSON;
import lombok.Getter;
import lombok.Setter;
import org.ahocorasick.trie.Trie;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.collection.Seq;
import scala.collection.mutable.ArraySeq;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

/**
 * @Created by IntelliJ IDEA.
 * @author: liuzhixuan
 * @Date: 2020/8/26
 * @Time: 23:08
 * @des:
 */
@Setter
@Getter
public class WordTrieEntity implements Serializable {
    // LOGGER
    private static final Logger LOGGER = LoggerFactory.getLogger(WordTrieEntity.class);
    // transient: not serialized
    private transient Trie keywordsTrie;
    // transient: not serialized
    private transient Trie stopwordsTrie;

    public WordTrieEntity(Trie keywordsTrie, Trie stopwordsTrie) {
        this.keywordsTrie = keywordsTrie;
        this.stopwordsTrie = stopwordsTrie;
    }

    public static List<WordTrieEntity> generateKeywordTrieList(String jsonString) {
        // get key word
        KeyWordSetEntity keyWordSetEntity = JSON.parseObject(jsonString, KeyWordSetEntity.class);
        List<WordTrieEntity> keywordsTrieList = new ArrayList<>();
        for (KeyWordSetEntity.KeyWordPackage keyWordPackage: keyWordSetEntity.getKeyWordSet()) {
            Trie keywordsTrie = buildTrie(keyWordPackage.getKeywords());
            Trie stopwordsTrie = buildTrie(keyWordPackage.getStopwords());
            keywordsTrieList.add(new WordTrieEntity(keywordsTrie, stopwordsTrie));
        }
        System.out.println("[DEBUG]I am initialized in WordTrieEntity");
        return keywordsTrieList;
    }

    private static Trie buildTrie(Set<String> stringSet) {
        return Trie.builder().addKeywords(stringSet).build();
    }

    public static Boolean contains(Seq<String> stringSeq, List<WordTrieEntity> wordTrieList) {
        // nothing to filter
        if (null == wordTrieList || wordTrieList.isEmpty()) {
            return true;
        }
        for (WordTrieEntity wordTrie : wordTrieList) {
            // AND relation between word packages: every package must match
            if (Boolean.FALSE.equals(contains(wordTrie, stringSeq))) {
                return false;
            }
        }
        return true;
    }

    public static Boolean contains(List<String> stringSeq, List<WordTrieEntity> wordTrieList) {
        // nothing to filter
        if (null == wordTrieList || wordTrieList.isEmpty()) {
            return true;
        }
        for (WordTrieEntity wordTrie : wordTrieList) {
            // AND relation between word packages: every package must match
            if (Boolean.FALSE.equals(contains(wordTrie, stringSeq))) {
                return false;
            }
        }
        return true;
    }

    private static Boolean contains(WordTrieEntity wordTrie, Seq<String> stringSeq) {
        // OR relation within a word package: a single matching query is enough
        for (int i = 0; i < stringSeq.size(); i++) {
            if (Boolean.TRUE.equals(contains(wordTrie, stringSeq.apply(i)))) {
                return true;
            }
        }
        // none of the queries matched
        return false;
    }

    private static Boolean contains(WordTrieEntity wordTrie, List<String> stringSeq) {
        // OR relation within a word package: a single matching query is enough
        for (int i = 0; i < stringSeq.size(); i++) {
            if (Boolean.TRUE.equals(contains(wordTrie, stringSeq.get(i)))) {
                return true;
            }
        }
        // none of the queries matched
        return false;
    }

    private static Boolean contains(WordTrieEntity wordTrie, String query) {
        // stopwords veto the match
        if (null != wordTrie.getStopwordsTrie() && wordTrie.getStopwordsTrie().containsMatch(query)) {
            return false;
        }
        // match against the keyword trie
        if (null == wordTrie.getKeywordsTrie()) {
            LOGGER.error("keyword is null");
        }
        return null != wordTrie.getKeywordsTrie() && wordTrie.getKeywordsTrie().containsMatch(query);
    }

    private static Seq<String> list2Seq(List<String> list) {
        Seq<String> stringSeq = new ArraySeq<>(list.size());
        for (int i = 0; i < list.size(); i ++) {
            ((ArraySeq<String>) stringSeq).update(i, list.get(i));
        }
        return stringSeq;
    }
}
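
A quick local sketch of the matching semantics (AND across word packages, OR within a package), using the xiaomi_udf test data defined earlier; the class name is illustrative:

import com.alibaba.fastjson.JSON;
import com.sogo.sparkudf.entity.KeyWordSetEntity;
import com.sogo.sparkudf.entity.WordTrieEntity;

import java.util.Arrays;
import java.util.List;

public class WordTrieDemo {
    public static void main(String[] args) {
        // serialize the test data, then rebuild the tries from the JSON string
        String json = JSON.toJSONString(KeyWordSetEntity.generateTestData());
        List<WordTrieEntity> tries = WordTrieEntity.generateKeywordTrieList(json);
        // both packages hit ("小米手机" and "雷军") -> true
        System.out.println(WordTrieEntity.contains(Arrays.asList("小米手机", "雷军"), tries));
        // only the first package hits -> false, because packages are ANDed
        System.out.println(WordTrieEntity.contains(Arrays.asList("小米手机"), tries));
    }
}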

A singleton for the Redis cluster

RedisClusterConnector.java

This stores the external resource (the word packages) used to build the tries. The singleton pattern ensures the Redis client is initialized only once. (Strictly speaking, double-checked locking also calls for declaring the static jedisCluster field volatile; the original code is kept as-is below.)

package com.sogo.sparkudf.connnector;

import com.alibaba.fastjson.JSON;
import com.sogo.sparkudf.entity.KeyWordSetEntity;
import lombok.Getter;
import lombok.Setter;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.JedisCluster;

import java.io.Serializable;
import java.util.HashSet;
import java.util.Set;

/**
 * @Created by IntelliJ IDEA.
 * @author: liuzhixuan
 * @Date: 2020/8/20
 * @Time: 22:25
 * @des:
 */
@Setter
@Getter
public class RedisClusterConnector implements Serializable {
    // LOGGER
    private static final Logger LOGGER = LoggerFactory.getLogger(RedisClusterConnector.class);
    // jedis cluster instance
    private static transient JedisCluster jedisCluster;

    public static JedisCluster getRedisCluster() {
        if (null == jedisCluster) {
            synchronized (RedisClusterConnector.class) {
                if (null == jedisCluster) {
                    LOGGER.warn("[JUST_FOR_DEBUG] Test how many times the function is called");
                    // jedis pool config
                    GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
                    poolConfig.setMaxIdle(50);
                    poolConfig.setMinIdle(0);
                    poolConfig.setMaxTotal(50);
                    // host and port
                    Set<HostAndPort> nodes = new HashSet<>();
                    nodes.add(new HostAndPort("10.162.48.186", 6001));
                    nodes.add(new HostAndPort("10.162.48.186", 6020));
                    nodes.add(new HostAndPort("10.162.48.225", 6001));
                    nodes.add(new HostAndPort("10.162.48.225", 6020));
                    // init
                    int connectionTimeout = 10000;
                    int soTimeout = 5000;
                    int maxAttempts = 2;
                    jedisCluster = new JedisCluster(nodes, connectionTimeout, soTimeout, maxAttempts, poolConfig);
                }
            }
        }
        return jedisCluster;
    }

    public static void close() {
        if (null != jedisCluster) {
            synchronized (RedisClusterConnector.class) {
                if (null != jedisCluster) {
                    jedisCluster.close();
                }
            }
        }
    }

    public static void main(String[] args) {

        JedisCluster jedisCluster = RedisClusterConnector.getRedisCluster();
        KeyWordSetEntity keyWordSetEntity = KeyWordSetEntity.generateTestData();
        String udfName = keyWordSetEntity.getName();
        String value = JSON.toJSONString(keyWordSetEntity);
        jedisCluster.set("keyword_package_" + udfName, value);
        String s = jedisCluster.get("keyword_package_" + udfName);
        System.out.println("value:" + s);
    }
}
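
Nothing in the UDF ever closes this connection. One hedged option (an addition of mine, not part of the original code) is to register a JVM shutdown hook on the executor so the pool is released when the container exits:

// Illustrative: release the shared JedisCluster when the executor JVM shuts down
Runtime.getRuntime().addShutdownHook(new Thread(RedisClusterConnector::close));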

Extending GenericUDF

KeyWordKeyFilterUdf.java

package com.sogo.sparkudf.udf;

import com.sogo.sparkudf.connnector.RedisClusterConnector;
import com.sogo.sparkudf.entity.WordTrieEntity;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hive.ql.exec.*;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.*;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.JedisCluster;

import java.util.*;

/**
 * @Created by IntelliJ IDEA.
 * @author: liuzhixuan
 * @Date: 2020/8/29
 * @Time: 10:58
 * @des: reference from
 * https://www.jianshu.com/p/ca9dce6b5c37
 * https://www.jianshu.com/p/ba0e54579cc4
 */

/**
 * The @Description annotation is optional and documents the function. The _FUNC_
 * string in value is replaced by the actual function name when DESCRIBE FUNCTION
 * is run. It has three attributes:
 *
 * name: the function name in Hive.
 * value: describes the function's parameters.
 * extended: extra notes, such as an example; printed by DESCRIBE FUNCTION EXTENDED name.
 * Source: https://www.jianshu.com/p/ca9dce6b5c37
 */
@Description(
        name = "keyword_match_udf",
        value = "_FUNC_(queries, 'keyword_package') - from the input string"
                + "returns true if queries contain keyword_package",
        extended = "Example:\n"
                + " > SELECT _FUNC_(queries, 'keyword_package') FROM src;"
)
public class KeyWordKeyFilterUdf extends GenericUDF {
    // LOGGER
    private static final Logger LOGGER = LoggerFactory.getLogger(KeyWordKeyFilterUdf.class);
    // prefix of redis key
    private static final String PREFIX_REDIS_KEY = "keyword_package_";
    // key word trie
    private static Map<String, List<WordTrieEntity>> dictTrie = new HashMap<>();
    // object inspector: String
    private static final ObjectInspector valueOI = PrimitiveObjectInspectorFactory
            .getPrimitiveJavaObjectInspector(PrimitiveObjectInspector.PrimitiveCategory.STRING);
    // string object inspector: for parse string
    private static final StringObjectInspector stringOI = (StringObjectInspector) valueOI;
    // object inspector: list<string>
    private static final ListObjectInspector arrayOI = ObjectInspectorFactory.getStandardListObjectInspector(valueOI);
    // redis cluster client
    private static JedisCluster jedisCluster;

    /**
     * Additionally setup GenericUDF with MapredContext before initializing.
     * This is only called in runtime of MapRedTask.
     *
     * @param context context
     */
    @Override
    public void configure(MapredContext context) {
        /*
        org.apache.hadoop.mapred.JobConf jobConf = context.getJobConf();
        String user = jobConf.getUser();
        String jobName = jobConf.getJobName();
        String queueName = jobConf.getQueueName();
        int numMapTasks = jobConf.getNumMapTasks();
        int numReduceTasks = jobConf.getNumReduceTasks();
        int maxMapAttempts = jobConf.getMaxMapAttempts();
        int maxReduceAttempts = jobConf.getMaxReduceAttempts();
        LOGGER.warn("[JUST_FOR_DEBUG] user:{}, jobName:{}, queueName:{}, " +
                "numMapTasks:{}, numReduceTasks:{}, maxMapAttempts:{}, maxReduceAttempts:{}",
                user, jobName, queueName, numMapTasks, numReduceTasks, maxMapAttempts, maxReduceAttempts);
        */
    }

    // This method is called once, before evaluate(). It receives an array of
    // ObjectInspectors and must verify the argument count and types.
    // Note: it runs again for every SQL statement that uses the UDF.
    @Override
    public ObjectInspector initialize(ObjectInspector[] objectInspectors) throws UDFArgumentException {
        // two arguments are expected: 1. the query list, 2. the word-package name
        if (objectInspectors.length < 2) {
            throw new UDFArgumentLengthException("param invalid: 1->query word, 2->type");
        }
        // ObjectInspector.Category.LIST
        if (!objectInspectors[0].getCategory().equals(arrayOI.getCategory())) {
            throw new UDFArgumentTypeException(0, "[" + arrayOI.getTypeName() + "] type is needed, " +
                    "but [" + objectInspectors[0].getTypeName() + "] type is found");
        }
        // ObjectInspector.Category.PRIMITIVE
        if (!objectInspectors[1].getCategory().equals(valueOI.getCategory())) {
            throw new UDFArgumentTypeException(1, "[" + valueOI.getTypeName() + "] type is needed, " +
                    "but [" + objectInspectors[1].getTypeName() + "] is found");
        }
        // init jedis cluster
        jedisCluster = RedisClusterConnector.getRedisCluster();

        // getUdfName returns the fully-qualified class name; getFuncName returns the
        // last segment of it, keeping only the characters after the 10th
        LOGGER.warn("[JUST_FOR_DEBUG] getUdfName:[{}], getFuncName:[{}]", getUdfName(), getFuncName());

        return PrimitiveObjectInspectorFactory
                .getPrimitiveJavaObjectInspector(PrimitiveObjectInspector.PrimitiveCategory.BOOLEAN);
        // to return a Java List<String> instead, the return type would be:
        //ObjectInspector returnOi = PrimitiveObjectInspectorFactory
        // .getPrimitiveJavaObjectInspector(PrimitiveObjectInspector.PrimitiveCategory.STRING);
        //return ObjectInspectorFactory.getStandardListObjectInspector(returnOi);
    }

    // The counterpart of UDF's evaluate(): processes the actual arguments and returns the result.
    @Override
    public Object evaluate(DeferredObject[] deferredObjects) throws HiveException {
        // read the first argument passed to the UDF
        Object queries = deferredObjects[0].get();
        // read the second argument passed to the UDF
        Object keywordPackageTag = deferredObjects[1].get();
        if (null == queries || null == keywordPackageTag) {
            return false;
        }
        // parse the String argument, option 1: via StringObjectInspector
        String keywordPackageName = stringOI.getPrimitiveJavaObject(keywordPackageTag);
        // parse the String argument, option 2: toString()
        // String udfName = deferredObjects[1].get().toString();

        // parse the List<String> argument, option 1: all at once
        List<String> queryList = (List<String>) arrayOI.getList(queries);
        /*
        // parse the List<String> argument, option 2: element by element
        int listLength = arrayOI.getListLength(queries);
        List<String> queryList = new ArrayList<>(listLength);
        for (int i = 0; i < listLength; i ++) {
            String s = arrayOI.getListElement(queries, i).toString();
            queryList.add(s);
        }
        */
        List<WordTrieEntity> dict = generateFromRedis(keywordPackageName);
        return WordTrieEntity.contains(queryList, dict);
    }

    // Descriptive text shown when the user runs EXPLAIN on the SQL statement.
    @Override
    public String getDisplayString(String[] strings) {
        if (null == strings || strings.length == 0) {
            return "null or empty";
        } else {
            return String.join(";", strings);
        }
    }

    /**
     * Reads the word package from Redis and builds the trie dictionaries here.
     * @param keywordPackageName name of the word package (suffix of the Redis key)
     * @return one WordTrieEntity per KeyWordPackage
     */
    private static List<WordTrieEntity> generateFromRedis(String keywordPackageName) {
        if (null == dictTrie || dictTrie.isEmpty() || !dictTrie.containsKey(keywordPackageName)) {
            synchronized (KeyWordKeyFilterUdf.class) {
                if (null == dictTrie || dictTrie.isEmpty()) {
                    dictTrie = new HashMap<>();
                }
                if (!dictTrie.containsKey(keywordPackageName)) {
                    LOGGER.warn("[JUST_FOR_DEBUG] Build dict trie by keyword: [{}]", keywordPackageName);
                    String redisKey = PREFIX_REDIS_KEY + keywordPackageName;
                    if (null == jedisCluster) {
                        jedisCluster = RedisClusterConnector.getRedisCluster();
                        LOGGER.error("Something happened to jedis cluster, reconnect to it");
                    }
                    String value = jedisCluster.get(redisKey);
                    if (StringUtils.isEmpty(value)) {
                        LOGGER.error("Cannot load keyword from redis by key:[{}]", redisKey);
                        return new ArrayList<>();
                    }
                    List<WordTrieEntity> wordTrieEntityList = WordTrieEntity.generateKeywordTrieList(value);
                    dictTrie.put(keywordPackageName, wordTrieEntityList);
                }
            }
        }
        return dictTrie.get(keywordPackageName);
    }

    public static void main(String[] args) {
//        WordKeyFilterUdf wordKeyFilterUdf = new WordKeyFilterUdf();
//        Set<String> dict = WordKeyFilterUdf.generateMap("sogo_dict_udf");
//        System.out.println("trueOrFalse:" + dict.contains("sogo"));
//        System.out.println("trueOrFalse:" + dict.contains("sogo1"));
//        dict = WordKeyFilterUdf.generateMap("xiaomi_dict_udf");
//        System.out.println("trueOrFalse:" + dict.contains("miui"));
//        System.out.println("trueOrFalse:" + dict.contains("sougou"));
//
//        List<WordTrieEntity> wordTrieEntityList = WordKeyFilterUdf.generateFromRedis("xiaomi_udf", "");
//        Seq<String> stringSeq = new ArraySeq<>(1);
//        String query = "小米10周年,雷军";
//        stringSeq.update(0, query);
//        System.out.println("trueOrFalse:" + WordTrieEntity.contains(stringSeq, wordTrieEntityList));
        StringObjectInspector stringObjectInspector = (StringObjectInspector) valueOI;
        System.out.println("stringObjectInspector:" + stringObjectInspector.getTypeName());
        System.out.println("valueOI:" + valueOI.getTypeName());
    }
}
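
As a hedged local smoke test (it assumes the Redis cluster above is reachable and that the xiaomi_udf package was written by RedisClusterConnector.main; DeferredJavaObject is Hive's helper for wrapping plain Java values):

import com.sogo.sparkudf.udf.KeyWordKeyFilterUdf;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

import java.util.Arrays;

public class KeyWordUdfSmokeTest {
    public static void main(String[] args) throws Exception {
        GenericUDF udf = new KeyWordKeyFilterUdf();
        // declare the argument types: array<string>, string
        ObjectInspector queriesOI = ObjectInspectorFactory.getStandardListObjectInspector(
                PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        ObjectInspector nameOI = PrimitiveObjectInspectorFactory.javaStringObjectInspector;
        udf.initialize(new ObjectInspector[]{queriesOI, nameOI});
        // wrap plain Java values the way Hive would hand them to evaluate()
        Object matched = udf.evaluate(new GenericUDF.DeferredObject[]{
                new GenericUDF.DeferredJavaObject(Arrays.asList("小米10周年", "雷军")),
                new GenericUDF.DeferredJavaObject("xiaomi_udf")
        });
        System.out.println("matched: " + matched);
    }
}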

Registering the UDF

Packaging

After packaging the code above, the resulting jar is named (dependencies bundled, for example via the Maven assembly plugin, as the jar-with-dependencies suffix suggests):

sparkudf-1.0-SNAPSHOT-jar-with-dependencies.jar

Copying to the dev machine

Its path on the dev machine is:

/search/work/bigdata/liuzhixuan/sparkudf/jars/sparkudf-1.0-SNAPSHOT-jar-with-dependencies.jar

Registration

HiveSQL/SparkSQL

After entering the Hive or SparkSQL CLI, run:

ADD JAR file:///search/work/bigdata/liuzhixuan/sparkudf/jars/sparkudf-1.0-SNAPSHOT-jar-with-dependencies.jar;
CREATE TEMPORARY FUNCTION keyword_udf AS 'com.sogo.sparkudf.udf.KeyWordKeyFilterUdf';
show functions;

PySpark

After entering the PySpark shell, run:

spark.sql("ADD JAR file:///search/work/bigdata/liuzhixuan/sparkudf/jars/sparkudf-1.0-SNAPSHOT-jar-with-dependencies.jar")
spark.sql("CREATE OR REPLACE TEMPORARY FUNCTION keyword_udf  AS 'com.sogo.sparkudf.udf.KeyWordKeyFilterUdf'")
spark.sql("show user functions").show(10,0)

Testing

Take the tests in PySpark as an example:

Test data

testDs.show(4)

DataFrame[imei: string, fwords: array<string>]
+--------------------+--------------------+
|                imei|              fwords|
+--------------------+--------------------+
|00003AC86C0E62825...|         [鬼谷子狼道等十本书]|
|00005FD5EA9B96624...|[哄女友, 后来,, 火焰切割...|
|00006231671F8272E...|                [欧尚]|
|00007A428750D7C19...|              [公仔迷你]|
+--------------------+--------------------+

Keyword matching

Test 1: verify that the UDF runs correctly

testDs.registerTempTable('testDs')
xiaomi = spark.sql("select imei,fwords from testDs where keyword_udf(fwords, 'xiaomi_udf')!=0")
xiaomi.show(10,0)

Result:

One log line is printed per SQL statement executed, which matches expectations:

20/09/01 15:44:12 WARN KeyWordKeyFilterUdf: [DEBUG] getUdfName:[com.sogo.sparkudf.udf.KeyWordKeyFilterUdf], getFuncName:[filterudf]

The filtered results are as expected:

+----------------------------------------+--------+                             
|imei                                    |fword   |
+----------------------------------------+--------+
|82C455E4845CA8C50ABFCCDAE80FFB1D4F444135|小米手机的   |
|82C455E4845CA8C50ABFCCDAE80FFB1D4F444135|小米主题    |
|08B687D554A238008EA117049A87776C4E6A6730|小米      |
|08B687D554A238008EA117049A87776C4E6A6730|小米书包    |
|08B687D554A238008EA117049A87776C4E6A6730|小米10    |
|08B687D554A238008EA117049A87776C4E6A6730|小米10至尊宝 |
|08B687D554A238008EA117049A87776C4E6A6730|小米10至尊  |
|08B687D554A238008EA117049A87776C4E6A6730|小米旅行箱青春版|
|08B687D554A238008EA117049A87776C4E6A6730|如何进入小米  |
|08B687D554A238008EA117049A87776C4E6A6730|小米      |
+----------------------------------------+--------+

Test 2: verify dynamic loading of word packages

Building on test 1, run the Huawei word package directly:

huawei = spark.sql("select imei,fwords from testDs where keyword_udf(fwords, 'huawei_udf')!=0")
huawei.show(10,0)

The output is as expected:

20/09/01 16:02:41 WARN KeyWordKeyFilterUdf: [DEBUG] getUdfName:[com.sogo.sparkudf.udf.KeyWordKeyFilterUdf], getFuncName:[filterudf]

+----------------------------------------+-------------------+                  
|imei                                    |fword              |
+----------------------------------------+-------------------+
|52686FA528D898ECE0F30EDF43A2E4B94D444D33|华为P40pro           |
|52686FA528D898ECE0F30EDF43A2E4B94D444D33|华为手机               |
|FD1417C3439BE6D55A2FC8D3722EA80A4D6A5532|华为手机屏幕最左侧有色线       |
|FD1417C3439BE6D55A2FC8D3722EA80A4D6A5532|华为P40Pro侧边色线       |
|FD1417C3439BE6D55A2FC8D3722EA80A4D6A5532|华为                 |
|FD1417C3439BE6D55A2FC8D3722EA80A4D6A5532|华为手机屏幕最左侧有色线       |
|C137437C5F70B8B9F2F6CAF0271880AB4E6A4134|华为P40开发者xuanxiang选项|
|C137437C5F70B8B9F2F6CAF0271880AB4E6A4134|华为P40kaifazhe开发者   |
|C137437C5F70B8B9F2F6CAF0271880AB4E6A4134|电脑怎么用华为手机怎么共享网络    |
|C137437C5F70B8B9F2F6CAF0271880AB4E6A4134|huawei华为           |
+----------------------------------------+-------------------+

Wrap-up

To let a single UDF dynamically load different word packages (the set of packages can grow without limit), we work around the restriction that a UDF cannot take non-data-column inputs by introducing a constant column, and thereby achieve dynamic loading of word packages. Of course, expired word packages should also be evicted to save resources; note that the dictTrie cache above never refreshes an entry once it is loaded.

References

[1] Spark UDF加载外部资源. https://cloud.tencent.com/developer/article/1688828

[2] 流水账:使用GenericUDF为Hive编写扩展函数. http://zuojie.github.io/2014/03/14/%E6%B5%81%E6%B0%B4%E8%B4%A6-%E4%BD%BF%E7%94%A8GenericUDF%E4%B8%BAHive%E7%BC%96%E5%86%99%E6%8F%92%E4%BB%B6%E5%87%BD%E6%95%B0.html

[3] In Spark SQL, how do you register and use a generic UDF? https://stackoverflow.com/questions/36915090/in-spark-sql-how-do-you-register-and-use-a-generic-udf

[4] Spark UDF实现demo. https://cloud.tencent.com/developer/article/1672068

[5] Hive udf、UDF、GenericUDF. https://blog.csdn.net/wangshuminjava/article/details/79663998

[6] Hive - UDF & GenericUDF. https://www.jianshu.com/p/ca9dce6b5c37

[7] Hive之ObjectInspector接口解析笔记. https://blog.csdn.net/weixin_39469127/article/details/89739285
