Java UDF, UDAF, UDTF
概述
Java UDF 为用户提供使用 Java 编写 UDF 的接口,以方便用户使用 Java 语言进行自定义函数的执行。 PALO 支持使用 JAVA 编写 UDF、UDAF 和 UDTF。下文如无特殊说明,使用 UDF 统称所有用户自定义函数。
- Java UDF 是较为常见的自定义标量函数 (Scalar Function),即每输入一行数据,就会有一行对应的结果输出,较为常见的有 ABS,LENGTH 等。值得一提的是对于用户来讲,Hive UDF 是可以直接迁移至 PALO 的。
- Java UDAF 即为自定义的聚合函数 (Aggregate Function),即在输入多行数据进行聚合后,仅输出一行对应的结果,较为常见的有 MIN,MAX,COUNT 等。
- JAVA UDTF 即为自定义的表函数 (Table Function),即每输一行数据,可以产生一行或多行的结果,在 PALO 中需要结合 Lateral View 使用可以达到行转列的效果,较为常见的有 EXPLODE,EXPLODE_SPLIT 等。该功能自 PALO 3.0 版本起开始支持。
类型对应关系
PALO 数据类型 | Java UDF 参数类型 |
---|---|
Bool | Boolean |
TinyInt | Byte |
SmallInt | Short |
Int | Integer |
BigInt | Long |
LargeInt | BigInteger |
Float | Float |
Double | Double |
Date | LocalDate |
Datetime | LocalDateTime |
IPV4/IPV6 | InetAddress |
String | String |
Decimal | BigDecimal |
array<Type> |
ArrayList<Type> (支持嵌套) |
map<Type1,Type2> |
HashMap<Type1,Type2> (支持嵌套) |
struct<Type...> |
ArrayList<Object> (从 3.0.0 版本开始支持) |
提示:
array
、map
、struct
类型可以嵌套其它类型。例如,PALO 中的 array<array<int>>
对应 Java UDF 参数类型为 ArrayList<ArrayList<Integer>>
,其他类型依此类推。
注意:
在创建函数时,请务必使用 string
类型而不是 varchar
,否则可能会导致函数执行失败。
使用限制
- 不支持复杂数据类型(HLL,Bitmap)。
- 当前允许用户自己指定 JVM 最大堆大小,配置项是 be.conf 中的
JAVA_OPTS
的 -Xmx 部分。默认 1024m,如果需要聚合数据,建议调大一些,增加性能,减少内存溢出风险。 - 由于 jvm 加载同名类的问题,不要同时使用多个同名类作为 udf 实现,如果想更新某个同名类的 udf,需要重启 be 重新加载 classpath。
快速上手
本节主要介绍如何开发 Java UDF。在 samples/doris-demo/java-udf-demo/
目录下提供了示例代码,供您参考。
UDF 的使用与普通的函数方式一致,唯一的区别在于,内置函数的作用域是全局的,而 UDF 的作用域是 DB 内部。
所以如果当前链接 session 位于数据库 DB 内部时,直接使用 UDF 名字会在当前 DB 内部查找对应的 UDF。否则用户需要显示的指定 UDF 的数据库名字,例如 dbName.funcName
。
接下来的章节介绍实例,均会在test_table
上做测试,对应建表如下:
1CREATE TABLE `test_table` (
2 id int NULL,
3 d1 double NULL,
4 str string NULL
5) ENGINE=OLAP
6DUPLICATE KEY(`id`)
7DISTRIBUTED BY HASH(`id`) BUCKETS 1
8PROPERTIES (
9"replication_num" = "1");
10
11insert into test_table values (1, 111.11, "a,b,c");
12insert into test_table values (6, 666.66, "d,e");
Java-UDF 实例介绍
使用 Java 代码编写 UDF,UDF 的主入口必须为 evaluate
函数。这一点与 Hive 等其他引擎保持一致。在本示例中,我们编写了 AddOne
UDF 来完成对整型输入进行加一的操作。
-
首先编写对应的 Java 代码,打包生成 JAR 包。
Java1public class AddOne extends UDF { 2 public Integer evaluate(Integer value) { 3 return value == null ? null : value + 1; 4 } 5}
-
在 PALO 中注册创建 Java-UDF 函数。
SQL1CREATE FUNCTION java_udf_add_one(int) RETURNS int PROPERTIES ( 2 "file"="file:///path/to/java-udf-demo-jar-with-dependencies.jar", 3 "symbol"="org.apache.doris.udf.AddOne", 4 "always_nullable"="true", 5 "type"="JAVA_UDF" 6);
-
用户使用 UDF 必须拥有对应数据库的
SELECT
权限。 如果想查看注册成功的对应 UDF 函数,可以使用SHOW FUNCTIONS 命令。SQL1select id,java_udf_add_one(id) from test_table; 2+------+----------------------+ 3| id | java_udf_add_one(id) | 4+------+----------------------+ 5| 1 | 2 | 6| 6 | 7 | 7+------+----------------------+
- 当不再需要 UDF 函数时,可以通过下述命令来删除一个 UDF 函数。
另外,如果定义的 UDF 中需要加载很大的资源文件,或者希望可以定义全局的 static 变量,可以参照文档下方的 static 变量加载方式。
Java-UDAF 实例介绍
在使用 Java 代码编写 UDAF 时,有一些必须实现的函数 (标记 required) 和一个内部类 State,下面将以具体的实例来说明。
- 首先编写对应的 Java UDAF 代码,打包生成 JAR 包。
示例 1: SimpleDemo 将实现一个类似的 sum 的简单函数,输入参数 INT,输出参数是 INT
1package org.apache.doris.udf;
2
3import java.io.DataInputStream;
4import java.io.DataOutputStream;
5import java.io.IOException;
6import java.util.logging.Logger;
7
8public class SimpleDemo {
9
10Logger log = Logger.getLogger("SimpleDemo");
11
12//Need an inner class to store data
13/*required*/
14public static class State {
15 /*some variables if you need */
16 public int sum = 0;
17}
18
19/*required*/
20public State create() {
21 /* here could do some init work if needed */
22 return new State();
23}
24
25/*required*/
26public void destroy(State state) {
27 /* here could do some destroy work if needed */
28}
29
30/*Not Required*/
31public void reset(State state) {
32 /*if you want this udaf function can work with window function.*/
33 /*Must impl this, it will be reset to init state after calculate every window frame*/
34 state.sum = 0;
35}
36
37/*required*/
38//first argument is State, then other types your input
39public void add(State state, Integer val) throws Exception {
40 /* here doing update work when input data*/
41 if (val != null) {
42 state.sum += val;
43 }
44}
45
46/*required*/
47public void serialize(State state, DataOutputStream out) throws IOException {
48 /* serialize some data into buffer */
49 out.writeInt(state.sum);
50}
51
52/*required*/
53public void deserialize(State state, DataInputStream in) throws IOException {
54 /* deserialize get data from buffer before you put */
55 int val = in.readInt();
56 state.sum = val;
57}
58
59/*required*/
60public void merge(State state, State rhs) throws Exception {
61 /* merge data from state */
62 state.sum += rhs.sum;
63}
64
65/*required*/
66//return Type you defined
67public Integer getValue(State state) throws Exception {
68 /* return finally result */
69 return state.sum;
70}
71}
示例 2: MedianUDAF 是一个计算中位数的功能,输入类型为 (DOUBLE, INT), 输出为 DOUBLE
1package org.apache.doris.udf.demo;
2
3import java.io.DataInputStream;
4import java.io.DataOutputStream;
5import java.math.BigDecimal;
6import java.util.Arrays;
7import java.util.logging.Logger;
8
9/*UDAF 计算中位数*/
10public class MedianUDAF {
11Logger log = Logger.getLogger("MedianUDAF");
12
13//状态存储
14public static class State {
15 //返回结果的精度
16 int scale = 0;
17 //是否是某一个 tablet 下的某个聚合条件下的数据第一次执行 add 方法
18 boolean isFirst = true;
19 //数据存储
20 public StringBuilder stringBuilder;
21}
22
23//状态初始化
24public State create() {
25 State state = new State();
26 //根据每个 tablet 下的聚合条件需要聚合的数据量大小,预先初始化,增加性能
27 state.stringBuilder = new StringBuilder(1000);
28 return state;
29}
30
31
32//处理执行单位处理各自 tablet 下的各自聚合条件下的每个数据
33public void add(State state, Double val, int scale) throws IOException {
34 if (val != null && state.isFirst) {
35 state.stringBuilder.append(scale).append(",").append(val).append(",");
36 state.isFirst = false;
37 } else if (val != null) {
38 state.stringBuilder.append(val).append(",");
39 }
40}
41
42//处理数据完需要输出等待聚合
43public void serialize(State state, DataOutputStream out) throws IOException {
44 //目前暂时只提供 DataOutputStream,如果需要序列化对象可以考虑拼接字符串,转换 json,序列化成字节数组等方式
45 //如果要序列化 State 对象,可能需要自己将 State 内部类实现序列化接口
46 //最终都是要通过 DataOutputStream 传输
47 out.writeUTF(state.stringBuilder.toString());
48}
49
50//获取处理数据执行单位输出的数据
51public void deserialize(State state, DataInputStream in) throws IOException {
52 String string = in.readUTF();
53 state.scale = Integer.parseInt(String.valueOf(string.charAt(0)));
54 StringBuilder stringBuilder = new StringBuilder(string.substring(2));
55 state.stringBuilder = stringBuilder;
56}
57
58//聚合执行单位按照聚合条件合并某一个键下数据的处理结果 ,每个键第一次合并时,state1 参数是初始化的实例
59public void merge(State state1, State state2) throws IOException {
60 state1.scale = state2.scale;
61 state1.stringBuilder.append(state2.stringBuilder.toString());
62}
63
64//对每个键合并后的数据进行并输出最终结果
65public Double getValue(State state) throws IOException {
66 String[] strings = state.stringBuilder.toString().split(",");
67 double[] doubles = new double[strings.length + 1];
68 doubles = Arrays.stream(strings).mapToDouble(Double::parseDouble).toArray();
69
70 Arrays.sort(doubles);
71 double n = doubles.length - 1;
72 double index = n * 0.5;
73
74 int low = (int) Math.floor(index);
75 int high = (int) Math.ceil(index);
76
77 double value = low == high ? (doubles[low] + doubles[high]) * 0.5 : doubles[high];
78
79 BigDecimal decimal = new BigDecimal(value);
80 return decimal.setScale(state.scale, BigDecimal.ROUND_HALF_UP).doubleValue();
81}
82
83//每个执行单位执行完都会执行
84public void destroy(State state) {
85}
86
87}
-
在 PALO 中注册创建 Java-UADF 函数。
SQL1CREATE AGGREGATE FUNCTION simple_demo(INT) RETURNS INT PROPERTIES ( 2 "file"="file:///pathTo/java-udaf.jar", 3 "symbol"="org.apache.doris.udf.SimpleDemo", 4 "always_nullable"="true", 5 "type"="JAVA_UDF" 6);
-
使用 Java-UDAF, 可以分组聚合或者聚合全部结果:
SQL1select simple_demo(id) from test_table group by id; 2+-----------------+ 3| simple_demo(id) | 4+-----------------+ 5| 1 | 6| 6 | 7+-----------------+
SQL1select simple_demo(id) from test_table; 2+-----------------+ 3| simple_demo(id) | 4+-----------------+ 5| 7 | 6+-----------------+
Java-UDTF 实例介绍
注意: UDTF 自 PALO 3.0 版本开始支持
最佳实践
Static 变量加载
当前在 PALO 中,执行一个 UDF 函数,例如 select udf(col) from table
, 每一个并发 Instance 会加载一次 udf.jar 包,在该 Instance 结束时卸载掉 udf.jar 包。
所以当 udf.jar 文件中需要加载一个几百 MB 的文件时,会因为并发的原因,使得占据的内存急剧增大,容易 OOM。 或者想使用一个连接池时,这样无法做到仅在 static 区域初始化一次。
这里提供两个解决方案,其中方案二需要 PALO 版本在 branch-3.0 以上才行。
解决方案 1:
可以将资源加载代码拆分开,单独生成一个 JAR 包文件,然后其他包直接引用该资源 JAR 包。
假设已经将代码拆分为了 DictLibrary 和 FunctionUdf 两个文件。
1public class DictLibrary {
2 private static HashMap<String, String> res = new HashMap<>();
3
4 static {
5 // suppose we built this dictionary from a certain local file.
6 res.put("key1", "value1");
7 res.put("key2", "value2");
8 res.put("key3", "value3");
9 res.put("0", "value4");
10 res.put("1", "value5");
11 res.put("2", "value6");
12 }
13
14 public static String evaluate(String key) {
15 if (key == null) {
16 return null;
17 }
18 return res.get(key);
19 }
20}
1public class FunctionUdf {
2 public String evaluate(String key) {
3 String value = DictLibrary.evaluate(key);
4 return value;
5 }
6}
-
单独编译 DictLibrary 文件,使其生成一个独立的 JAR 包,这样可以得到一个资源文件包 DictLibrary.jar:
Shell1javac ./DictLibrary.java 2jar -cf ./DictLibrary.jar ./DictLibrary.class
-
编译 FunctionUdf 文件,需要引用上一步得到的资源包作为库使用,这样打包后可以得到 UDF 的 FunctionUdf.jar 包。
Shell1javac -cp ./DictLibrary.jar ./FunctionUdf.java 2jar -cvf ./FunctionUdf.jar ./FunctionUdf.class
- 由于想让资源 JAR 包被所有的并发引用,所以想让它被 JVM 直接加载,可以将它放到指定路径
be/custom_lib
下面,BE 服务重启之后就可以随着 JVM 的启动加载进来,因此都会随着服务启动而加载,停止而释放。 -
最后利用
CREATE FUNCTION
语句创建一个 UDF 函数,这样每次卸载仅是 FunctionUdf.jar。SQL1CREATE FUNCTION java_udf_dict(string) RETURNS string PROPERTIES ( 2 "file"="file:///pathTo/FunctionUdf.jar", 3 "symbol"="org.apache.doris.udf.FunctionUdf", 4 "always_nullable"="true", 5 "type"="JAVA_UDF" 6);
解决方案 2:
BE 全局缓存 JAR 包,自定义过期淘汰时间,在 create function 时增加两个属性字段,其中 static_load: 用于定义是否使用静态 cache 加载的方式。
expiration_time: 用于定义 JAR 包的过期时间,单位为分钟。
若使用静态 cache 加载方式,则在第一次调用该 UDF 函数时,在初始化之后会将该 UDF 的实例缓存起来,在下次调用该 UDF 时,首先会在 cache 中进行查找,如果没有找到,则会进行相关初始化操作。
并且后台有线程定期检查,如果在配置的过期淘汰时间内,一直没有被调用过,则会从缓存 cache 中清理掉。如果被调用时,则会自动更新缓存时间点。
1public class Print extends UDF {
2 static Integer val = 0;
3 public Integer evaluate() {
4 val = val + 1;
5 return val;
6 }
7}
1CREATE FUNCTION print_12() RETURNS int
2PROPERTIES (
3 "file" = "file:///path/to/java-udf-demo-jar-with-dependencies.jar",
4 "symbol" = "org.apache.doris.udf.Print",
5 "always_nullable"="true",
6 "type" = "JAVA_UDF",
7 "static_load" = "true", // default value is false
8 "expiration_time" = "60" // default value is 360 minutes
9);
可以看到结果是一直在递增的,证明加载的 JAR 包没有被卸载后又加载,导致重新初始化变量为 0.
1mysql [test_query_qa]>select print_12();
2+------------+
3| print_12() |
4+------------+
5| 1 |
6+------------+
71 row in set (0.40 sec)
8
9mysql [test_query_qa]>select print_12();
10+------------+
11| print_12() |
12+------------+
13| 2 |
14+------------+
151 row in set (0.03 sec)
16
17mysql [test_query_qa]>select print_12();
18+------------+
19| print_12() |
20+------------+
21| 3 |
22+------------+
231 row in set (0.04 sec)