每日分享 – 大数据Presto(四):Presto自定义函数和JDBC连接

​Presto自定义函数和JDBC连接

一、Presto 自定义函数

我们可以登录Presto客户端,使用命令:show functions 来查询对应的内置函数。我们也可以自己定义函数,自定义的函数包含UDF和UDAF函数。

1、​​​​​​​​​​​​​​UDF函数

自定义UDF函数及使用可以按照下面步骤来实现。

1.1、创建Maven项目,加入如下依赖

<dependency>
    <groupId>com.facebook.presto</groupId>
    <artifactId>presto-spi</artifactId>
    <version>0.259</version>
</dependency>
<dependency>
    <groupId>com.facebook.presto</groupId>
    <artifactId>presto-array</artifactId>
    <version>0.259</version>
</dependency>
<dependency>
    <groupId>io.airlift</groupId>
    <artifactId>stats</artifactId>
    <version>0.163</version>
</dependency>

<build>
  <plugins>

    <plugin>
      <artifactId>maven-assembly-plugin</artifactId>
      <version>2.4</version>
      <configuration>
        <!-- 设置false后是去掉 MySpark-1.0-SNAPSHOT-jar-with-dependencies.jar 后的 “-jar-with-dependencies” -->
        <!--<appendAssemblyId>false</appendAssemblyId>-->
        <descriptorRefs>
          <descriptorRef>jar-with-dependencies</descriptorRef>
        </descriptorRefs>
        <archive>
          <manifest>
            <mainClass>com.lw.java.myflink.Streaming.example.FlinkReadSocketData</mainClass>
          </manifest>
        </archive>
      </configuration>
      <executions>
        <execution>
          <id>make-assembly</id>
          <phase>package</phase>
          <goals>
            <goal>assembly</goal>
          </goals>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>

1.2、创建Presto注册插件类

package com.lansonjy.prestocode;
import com.facebook.presto.spi.Plugin;
import com.google.common.collect.ImmutableSet;

import java.util.Set;
//Presto 注册自定义函数的类,此类需要继承Plugin接口
public class MyFunctionsPlugin implements Plugin {
    @Override
    public Set<Class<?>> getFunctions()
    {
        return ImmutableSet.<Class<?>>builder()
                //注册UDF,这里填写对应的UDF类
                .add(MyUDF.class)
                .build();
    }
}

1.3、创建“MyUDF”类,实现自定义UDF逻辑

这里自定义的UDF函数实现大写字母转换成小写字母。代码如下:

package com.lansonjy.prestocode;

import com.facebook.presto.spi.function.Description;
import com.facebook.presto.spi.function.ScalarFunction;
import com.facebook.presto.spi.function.SqlType;
import com.facebook.presto.spi.type.StandardTypes;
import io.airlift.slice.Slice;
import io.airlift.slice.Slices;

//自定义UDF函数
public class MyUDF {
    //自定义UDF函数使用时的名称
    @ScalarFunction("myudf")
    //函数的描述
    @Description("转换字母大写为小写")
    //指定函数的返回类型,字符串类型必须返回Slice, 使用 Slices.utf8Slice 方法可以方便的将 String 类型转换成 Slice 类型
    @SqlType(StandardTypes.VARCHAR)
    public static Slice lowercase(@SqlType(StandardTypes.VARCHAR) Slice in)
    {
        String argument = in.toStringUtf8();
        return Slices.utf8Slice(argument.toLowerCase());
    }
}

1.4、创建“resources”资源目录

在resouces资源目录中创建“META-INF/services”多级目录,在目录中创建“com.facebook.presto.spi.Plugin”配置文件,Presto将会根据此配置文件找到对应的注册自定义函数类。在此文件中需要指定注册自定义函数的类:

com.lansonjy.prestocode.MyFunctionsPlugin

1.5、将项目打包,上传到Presto集群

将项目打包,上传到每台Presto节点的“$PRESTO_HOME/plugin/udf”目录下,默认udf目录没有,需要手动预先创建。所有Presto节点上传完成后,重启Presto集群。

1.6、使用自定义UDF函数

#登录Presto客户端
./presto --server node3:8080 --catalog mysql --schema presto_db

#查询所有函数
presto:presto_db> show functions;

#使用这个函数查询转换数据
presto:presto_db> select myudf('ABCDEF');
 _col0  
--------
 abcdef 
(1 row)

2、​​​​​​​UDAF函数

UDAF是自定义聚合函数,下面自定义一个UDAF实现计算平均数聚合函数功能,步骤如下:

2.1、在项目中创建“MyUDAF”类

package com.lansonjy.prestocode;

import com.facebook.presto.spi.block.BlockBuilder;
import com.facebook.presto.spi.function.*;
import com.facebook.presto.spi.type.DoubleType;
import com.facebook.presto.spi.type.StandardTypes;

//presto 自定义聚合函数实现-实现平均数计算
//自定义聚合函数使用时的名称
@AggregationFunction("myudaf")
//自定义聚合函数注释
@Description("我的自定义聚合函数,实现计算平均数")
public class MyUDAF {
    //输入数据注释
    @InputFunction
    public static void input(LongAndDoubleState state, @SqlType(StandardTypes.DOUBLE) double value) {
        //针对每条数据,执行 input 函数。这个过程是并行执行的,因此在每个有数据的节点都会执行,最终得到多个累积的状态数据。
        state.setLong(state.getLong() + 1);
        state.setDouble(state.getDouble() + value);
    }

    //聚合数据注释
    @CombineFunction
    public static void combine(LongAndDoubleState state, LongAndDoubleState otherState) {
        //将所有节点的状态数据聚合起来,多次执行,直至所有状态数据被聚合成一个最终状态,也就是 Aggregation 函数的输出结果。
        state.setLong(state.getLong() + otherState.getLong());
        state.setDouble(state.getDouble() + otherState.getDouble());
    }

    //输出数据注释
    @OutputFunction(StandardTypes.DOUBLE)
    public static void output(LongAndDoubleState state, BlockBuilder out) {
        //最终输出结果到一个 BlockBuilder。
        long count = state.getLong();
        if (count == 0) {
            out.appendNull();
        } else {
            double value = state.getDouble();
            DoubleType.DOUBLE.writeDouble(out, value / count);
        }
    }
}

以上类中涉及到了自定义类型LongAndDoubelState接口实现,此接口继承了AccumulatorState接口,对于简单的计算逻辑,只是获取设置值,那么可以定义简单接口来实现,里面只需要实现对应的get,set方法实现即可。对于复杂的计算逻辑需要自定义类实现接口,实现复杂的计算逻辑,代码如下:

package com.lansonjy.prestocode;

import com.facebook.presto.spi.function.AccumulatorState;

public interface LongAndDoubleState extends AccumulatorState {
    long getLong();

    void setLong(long value);

    double getDouble();

    void setDouble(double value);
}

2.2、在“MyFunctionPlugin”中注册UDAF

//Presto 注册自定义函数的类,此类需要继承Plugin接口
public class MyFunctionsPlugin implements Plugin {
    @Override
    public Set<Class<?>> getFunctions()
    {
        return ImmutableSet.<Class<?>>builder()
                //注册UDF,这里填写对应的UDF类
                .add(MyUDF.class)
                //注册UDAF,这里填写对应的UDAF 类
                .add(MyUDAF.class)
                .build();
    }
}

2.3、打包,上传到各个Presto

将项目打包,上传到每台Presto节点的“$PRESTO_HOME/plugin/udf”目录下,默认udf目录没有,需要手动预先创建。所有Presto节点上传完成后,重启Presto集群。

2.4、在presto中执行如下命令

#登录Presto客户端
[root@node3 presto-0.259]# ./presto --server node3:8080 --catalog mysql --schema presto_db

#查看函数
presto:presto_db> show functions;

#执行聚合查询
presto:presto_db> select pkg_name,myudaf(amount) as abc from machine_consume_detail group by pkg_name;

二、Presto JDBC连接

使用JDBC连接Presto需要在项目中导入以下依赖:

<dependency>
    <groupId>io.prestosql</groupId>
    <artifactId>presto-jdbc</artifactId>
    <version>312</version>
</dependency>

JDBC连接代码如下:

public class ReadDataFromPresto {
    public static void main(String[] args) throws ClassNotFoundException, SQLException {
        Connection conn = DriverManager.getConnection("jdbc:presto://node3:8080/mysql/presto_db","root",null);
        Statement stmt = conn.createStatement();
        ResultSet rs = stmt.executeQuery("select pkg_name,sum(amount) as total_amount from machine_consume_detail group by pkg_name");
        while (rs.next()) {
            String pkgName = rs.getString("pkg_name");
            double totalAmount = rs.getDouble("total_amount");
            System.out.println("pkgName = "+pkgName+",totalAmount="+totalAmount);
        }
        rs.close();
        conn.close();
    }
}

正文完