Presto自定义函数和JDBC连接
一、Presto 自定义函数
我们可以登录Presto客户端,使用命令:show functions 来查询对应的内置函数。我们也可以自己定义函数,自定义的函数包含UDF和UDAF函数。
1、UDF函数
自定义UDF函数及使用可以按照下面步骤来实现。
1.1、创建Maven项目,加入如下依赖
<dependency>
<groupId>com.facebook.presto</groupId>
<artifactId>presto-spi</artifactId>
<version>0.259</version>
</dependency>
<dependency>
<groupId>com.facebook.presto</groupId>
<artifactId>presto-array</artifactId>
<version>0.259</version>
</dependency>
<dependency>
<groupId>io.airlift</groupId>
<artifactId>stats</artifactId>
<version>0.163</version>
</dependency>
<build>
<plugins>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.4</version>
<configuration>
<!-- 设置false后是去掉 MySpark-1.0-SNAPSHOT-jar-with-dependencies.jar 后的 “-jar-with-dependencies” -->
<!--<appendAssemblyId>false</appendAssemblyId>-->
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass>com.lw.java.myflink.Streaming.example.FlinkReadSocketData</mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>assembly</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
1.2、创建Presto注册插件类
package com.lansonjy.prestocode;
import com.facebook.presto.spi.Plugin;
import com.google.common.collect.ImmutableSet;
import java.util.Set;
//Presto 注册自定义函数的类,此类需要继承Plugin接口
public class MyFunctionsPlugin implements Plugin {
@Override
public Set<Class<?>> getFunctions()
{
return ImmutableSet.<Class<?>>builder()
//注册UDF,这里填写对应的UDF类
.add(MyUDF.class)
.build();
}
}
1.3、创建“MyUDF”类,实现自定义UDF逻辑
这里自定义的UDF函数实现大写字母转换成小写字母。代码如下:
package com.lansonjy.prestocode;
import com.facebook.presto.spi.function.Description;
import com.facebook.presto.spi.function.ScalarFunction;
import com.facebook.presto.spi.function.SqlType;
import com.facebook.presto.spi.type.StandardTypes;
import io.airlift.slice.Slice;
import io.airlift.slice.Slices;
//自定义UDF函数
public class MyUDF {
//自定义UDF函数使用时的名称
@ScalarFunction("myudf")
//函数的描述
@Description("转换字母大写为小写")
//指定函数的返回类型,字符串类型必须返回Slice, 使用 Slices.utf8Slice 方法可以方便的将 String 类型转换成 Slice 类型
@SqlType(StandardTypes.VARCHAR)
public static Slice lowercase(@SqlType(StandardTypes.VARCHAR) Slice in)
{
String argument = in.toStringUtf8();
return Slices.utf8Slice(argument.toLowerCase());
}
}
1.4、创建“resources”资源目录
在resouces资源目录中创建“META-INF/services”多级目录,在目录中创建“com.facebook.presto.spi.Plugin”配置文件,Presto将会根据此配置文件找到对应的注册自定义函数类。在此文件中需要指定注册自定义函数的类:
com.lansonjy.prestocode.MyFunctionsPlugin
1.5、将项目打包,上传到Presto集群
将项目打包,上传到每台Presto节点的“$PRESTO_HOME/plugin/udf”目录下,默认udf目录没有,需要手动预先创建。所有Presto节点上传完成后,重启Presto集群。
1.6、使用自定义UDF函数
#登录Presto客户端
./presto --server node3:8080 --catalog mysql --schema presto_db
#查询所有函数
presto:presto_db> show functions;
#使用这个函数查询转换数据
presto:presto_db> select myudf('ABCDEF');
_col0
--------
abcdef
(1 row)
2、UDAF函数
UDAF是自定义聚合函数,下面自定义一个UDAF实现计算平均数聚合函数功能,步骤如下:
2.1、在项目中创建“MyUDAF”类
package com.lansonjy.prestocode;
import com.facebook.presto.spi.block.BlockBuilder;
import com.facebook.presto.spi.function.*;
import com.facebook.presto.spi.type.DoubleType;
import com.facebook.presto.spi.type.StandardTypes;
//presto 自定义聚合函数实现-实现平均数计算
//自定义聚合函数使用时的名称
@AggregationFunction("myudaf")
//自定义聚合函数注释
@Description("我的自定义聚合函数,实现计算平均数")
public class MyUDAF {
//输入数据注释
@InputFunction
public static void input(LongAndDoubleState state, @SqlType(StandardTypes.DOUBLE) double value) {
//针对每条数据,执行 input 函数。这个过程是并行执行的,因此在每个有数据的节点都会执行,最终得到多个累积的状态数据。
state.setLong(state.getLong() + 1);
state.setDouble(state.getDouble() + value);
}
//聚合数据注释
@CombineFunction
public static void combine(LongAndDoubleState state, LongAndDoubleState otherState) {
//将所有节点的状态数据聚合起来,多次执行,直至所有状态数据被聚合成一个最终状态,也就是 Aggregation 函数的输出结果。
state.setLong(state.getLong() + otherState.getLong());
state.setDouble(state.getDouble() + otherState.getDouble());
}
//输出数据注释
@OutputFunction(StandardTypes.DOUBLE)
public static void output(LongAndDoubleState state, BlockBuilder out) {
//最终输出结果到一个 BlockBuilder。
long count = state.getLong();
if (count == 0) {
out.appendNull();
} else {
double value = state.getDouble();
DoubleType.DOUBLE.writeDouble(out, value / count);
}
}
}
以上类中涉及到了自定义类型LongAndDoubelState接口实现,此接口继承了AccumulatorState接口,对于简单的计算逻辑,只是获取设置值,那么可以定义简单接口来实现,里面只需要实现对应的get,set方法实现即可。对于复杂的计算逻辑需要自定义类实现接口,实现复杂的计算逻辑,代码如下:
package com.lansonjy.prestocode;
import com.facebook.presto.spi.function.AccumulatorState;
public interface LongAndDoubleState extends AccumulatorState {
long getLong();
void setLong(long value);
double getDouble();
void setDouble(double value);
}
2.2、在“MyFunctionPlugin”中注册UDAF
//Presto 注册自定义函数的类,此类需要继承Plugin接口
public class MyFunctionsPlugin implements Plugin {
@Override
public Set<Class<?>> getFunctions()
{
return ImmutableSet.<Class<?>>builder()
//注册UDF,这里填写对应的UDF类
.add(MyUDF.class)
//注册UDAF,这里填写对应的UDAF 类
.add(MyUDAF.class)
.build();
}
}
2.3、打包,上传到各个Presto
将项目打包,上传到每台Presto节点的“$PRESTO_HOME/plugin/udf”目录下,默认udf目录没有,需要手动预先创建。所有Presto节点上传完成后,重启Presto集群。
2.4、在presto中执行如下命令
#登录Presto客户端
[root@node3 presto-0.259]# ./presto --server node3:8080 --catalog mysql --schema presto_db
#查看函数
presto:presto_db> show functions;
#执行聚合查询
presto:presto_db> select pkg_name,myudaf(amount) as abc from machine_consume_detail group by pkg_name;
二、Presto JDBC连接
使用JDBC连接Presto需要在项目中导入以下依赖:
<dependency>
<groupId>io.prestosql</groupId>
<artifactId>presto-jdbc</artifactId>
<version>312</version>
</dependency>
JDBC连接代码如下:
public class ReadDataFromPresto {
public static void main(String[] args) throws ClassNotFoundException, SQLException {
Connection conn = DriverManager.getConnection("jdbc:presto://node3:8080/mysql/presto_db","root",null);
Statement stmt = conn.createStatement();
ResultSet rs = stmt.executeQuery("select pkg_name,sum(amount) as total_amount from machine_consume_detail group by pkg_name");
while (rs.next()) {
String pkgName = rs.getString("pkg_name");
double totalAmount = rs.getDouble("total_amount");
System.out.println("pkgName = "+pkgName+",totalAmount="+totalAmount);
}
rs.close();
conn.close();
}
}