Flink源码

下面我们以一个具体例子来说怎么做SQL语法拓展,比如我们需要支持语法

USE FUNCTION func_name [ WITH (name = '', version = '') ]

1. 创建maven项目

2. 复制模板文件

从Flink源码flink-table/flink-sql-parser/src/main/codegen文件夹copy到自己项目的src/main目录下

3. 拓展SQL语法

主要分为下面三步

3.1 在 codegen/includes/parserImpls.ftl 定义解析use function 的规则,包含了方法名及对应的规则。

/**

*  USE FUNCTION func_name [ WITH (name = '', version = '') ]
*/

SqlCall SqlUseFunction() :

{

     SqlParserPos pos;

     SqlIdentifier funcName;

     SqlNodeList funcProps = null;

}

{

     <USE>

     {

         // Token位置

         pos = getPos();

     }

     <FUNCTION>

         funcName = CompoundIdentifier()

     [ <WITH> funcProps = TableProperties() ]

     {

         return new SqlUseFunction(pos, funcName, funcProps);

     }

}

3.2 拓展 SqlCall

规则匹配成功返回一个SqlUseFunction节点,作为解析树中的SqlNode。

public class SqlUseFunction extends SqlCall {

    private static final SqlSpecialOperator OPERATOR = new SqlSpecialOperator("USE FUNCTION",

            SqlKind.OTHER_FUNCTION);

    private final SqlIdentifier funcName;

    private final SqlNodeList funcProps;

    /**

     * SqlUseFunction constructor.
     *
     * @param pos sql define location
     * @param funcName function name
     * @param funcProps function property
     * */
    public SqlUseFunction(SqlParserPos pos, SqlIdentifier funcName, SqlNodeList funcProps) {
        super(pos);
        this.funcName = funcName;
        this.funcProps = funcProps;
    }

    @Override

    public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {

        writer.keyword("USE FUNCTION");

        funcName.unparse(writer, leftPrec, rightPrec);

        if (funcProps != null) {

            writer.keyword("WITH");

            SqlWriter.Frame frame = writer.startList("(", ")");

            for (SqlNode c : funcProps) {

                writer.sep(",");

                c.unparse(writer, 0, 0);

            }

            writer.endList(frame);

        }

    }

    @Override

    public SqlOperator getOperator() {

        return OPERATOR;

    }

    @Override

    public List<SqlNode> getOperandList() {

        return ImmutableNullableList.of(funcName, funcProps);

    }

}

3.3 将添加的规则,加入配置文件 codegen/data/Parser.tdd

  • imports增加SqlUseFunction类。
  • statementParserMethods增加定义的规则方法。在Parser中使用新规则。
 imports: [

    "org.apache.flink.sql.extended.SqlUseFunction"

 ]

  # List of methods for parsing custom SQL statements.

  # Return type of method implementation should be 'SqlNode'.

  # Example: SqlShowDatabases(), SqlShowTables().

  statementParserMethods: [

    "SqlUseFunction()"

  ]

  1. 编译undefined执行mvn clean
    compile生成定义的解析类SqlParserImpl.java。重点看生成的SqlUseFunction(),编译成功就可以在sql中使用该语法了。
正文完