A look at the ScalarFunction in Flink Table
Published: 2019-06-19


This post takes a look at the ScalarFunction of Flink's Table API.

Example

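```java
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.ScalarFunction;

public class HashCode extends ScalarFunction {

    private int factor = 0;

    @Override
    public void open(FunctionContext context) throws Exception {
        // access "hashcode_factor" parameter
        // "12" would be the default value if parameter does not exist
        factor = Integer.parseInt(context.getJobParameter("hashcode_factor", "12"));
    }

    public int eval(String s) {
        return s.hashCode() * factor;
    }
}

// The statements below belong in the job's main method;
// myTable / "MyTable" is an existing table in tableEnv.
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// set job parameter
Configuration conf = new Configuration();
conf.setString("hashcode_factor", "31");
env.getConfig().setGlobalJobParameters(conf);

// register the function
tableEnv.registerFunction("hashCode", new HashCode());

// use the function in Java Table API
myTable.select("string, string.hashCode(), hashCode(string)");

// use the function in SQL
tableEnv.sqlQuery("SELECT string, HASHCODE(string) FROM MyTable");
```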
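  • HashCode extends ScalarFunction and defines an eval method. Its open method reads the job parameter hashcode_factor, falling back to "12" when the parameter is absent. The driver code sets the parameter to "31" through the global job parameters.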
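You can try the open/eval split in HashCode without a Flink cluster. The plain-Java sketch below is a model, not Flink code. `FakeFunctionContext` is a hypothetical stand-in for FunctionContext: it models only `getJobParameter(key, defaultValue)`, backed by a Map. `HashCodeSketch` copies the UDF logic without extending ScalarFunction.

```java
import java.util.Map;

// Hypothetical stand-in for Flink's FunctionContext: only getJobParameter(key, default) is modelled.
final class FakeFunctionContext {
    private final Map<String, String> jobParameters;

    FakeFunctionContext(Map<String, String> jobParameters) {
        this.jobParameters = jobParameters;
    }

    String getJobParameter(String key, String defaultValue) {
        return jobParameters.getOrDefault(key, defaultValue);
    }
}

// Same logic as the HashCode UDF, minus the Flink base class.
final class HashCodeSketch {
    private int factor = 0;

    // In Flink, open() runs before the function's eval() is used.
    void open(FakeFunctionContext context) {
        factor = Integer.parseInt(context.getJobParameter("hashcode_factor", "12"));
    }

    public int eval(String s) {
        return s.hashCode() * factor;
    }
}

final class HashCodeSketchDemo {
    public static void main(String[] args) {
        HashCodeSketch configured = new HashCodeSketch();
        configured.open(new FakeFunctionContext(Map.of("hashcode_factor", "31")));
        System.out.println(configured.eval("abc")); // 96354 * 31 = 2986974

        HashCodeSketch defaulted = new HashCodeSketch();
        defaulted.open(new FakeFunctionContext(Map.of()));
        System.out.println(defaulted.eval("abc")); // 96354 * 12 = 1156248
    }
}
```

With hashcode_factor set to "31", the result is 31 times "abc".hashCode(). Without the parameter, the default factor of 12 applies.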

ScalarFunction

flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/functions/ScalarFunction.scala

```scala
abstract class ScalarFunction extends UserDefinedFunction {

  /**
    * Creates a call to a [[ScalarFunction]] in Scala Table API.
    *
    * @param params actual parameters of function
    * @return [[Expression]] in form of a [[ScalarFunctionCall]]
    */
  final def apply(params: Expression*): Expression = {
    ScalarFunctionCall(this, params)
  }

  // ----------------------------------------------------------------------------------------------

  /**
    * Returns the result type of the evaluation method with a given signature.
    *
    * This method needs to be overridden in case Flink's type extraction facilities are not
    * sufficient to extract the [[TypeInformation]] based on the return type of the evaluation
    * method. Flink's type extraction facilities can handle basic types or
    * simple POJOs but might be wrong for more complex, custom, or composite types.
    *
    * @param signature signature of the method the return type needs to be determined
    * @return [[TypeInformation]] of result type or null if Flink should determine the type
    */
  def getResultType(signature: Array[Class[_]]): TypeInformation[_] = null

  /**
    * Returns [[TypeInformation]] about the operands of the evaluation method with a given
    * signature.
    *
    * In order to perform operand type inference in SQL (especially when NULL is used) it might be
    * necessary to determine the parameter [[TypeInformation]] of an evaluation method.
    * By default Flink's type extraction facilities are used for this but might be wrong for
    * more complex, custom, or composite types.
    *
    * @param signature signature of the method the operand types need to be determined
    * @return [[TypeInformation]] of operand types
    */
  def getParameterTypes(signature: Array[Class[_]]): Array[TypeInformation[_]] = {
    signature.map { c =>
      try {
        TypeExtractor.getForClass(c)
      } catch {
        case ite: InvalidTypesException =>
          throw new ValidationException(
            s"Parameter types of scalar function '${this.getClass.getCanonicalName}' cannot be " +
            s"automatically determined. Please provide type information manually.")
      }
    }
  }
}
```
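  • ScalarFunction extends UserDefinedFunction and defines the apply, getResultType and getParameterTypes methods. A ScalarFunction maps zero, one or more scalar values to a new scalar value. The user implements that mapping in one or more public eval methods. You only need to override getResultType and getParameterTypes when Flink's type extraction cannot determine the result or parameter types; by default, getResultType returns null, which leaves the decision to Flink. It is also generally recommended to declare parameters and result types as primitive types. For example, DATE/TIME values can be represented as int and TIMESTAMP values as long.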
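The mapping lives in ordinary public eval methods rather than in an overridden interface method, so each call is dispatched by matching the argument types against the available eval signatures. The sketch below models that idea with plain Java reflection. `AddDays` and `EvalInvoker` are hypothetical names. The matching rule (the first public eval whose parameter count and types accept the arguments) is a simplification; Flink's actual signature matching is more elaborate. Following the primitive-type recommendation, the DATE argument is passed as an int, assumed here to be the number of days since 1970-01-01.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.time.LocalDate;
import java.util.Arrays;
import java.util.Map;

// Hypothetical UDF with two overloaded eval methods (no Flink dependency).
// The DATE argument is an int holding days since 1970-01-01.
class AddDays {
    public int eval(int epochDay, int days) {
        return epochDay + days;
    }

    public String eval(String s) {
        return s.toUpperCase();
    }
}

final class EvalInvoker {
    // Boxed counterparts so primitive parameters can be matched against boxed arguments.
    private static final Map<Class<?>, Class<?>> BOXED = Map.of(
            int.class, Integer.class, long.class, Long.class,
            double.class, Double.class, boolean.class, Boolean.class);

    // Simplified resolution: invoke the first method named "eval" whose parameter
    // count and types accept the given (non-null) arguments.
    static Object call(Object udf, Object... args) {
        for (Method m : udf.getClass().getMethods()) { // getMethods() returns public methods only
            Class<?>[] params = m.getParameterTypes();
            if (!m.getName().equals("eval") || params.length != args.length) continue;
            boolean matches = true;
            for (int i = 0; i < params.length; i++) {
                if (!BOXED.getOrDefault(params[i], params[i]).isInstance(args[i])) {
                    matches = false;
                    break;
                }
            }
            if (matches) {
                try {
                    return m.invoke(udf, args);
                } catch (IllegalAccessException | InvocationTargetException e) {
                    throw new IllegalStateException(e);
                }
            }
        }
        throw new IllegalArgumentException("no eval method accepts " + Arrays.toString(args));
    }

    public static void main(String[] args) {
        int date = (int) LocalDate.of(2019, 6, 19).toEpochDay();
        int next = (Integer) call(new AddDays(), date, 1);
        System.out.println(LocalDate.ofEpochDay(next)); // 2019-06-20
        System.out.println(call(new AddDays(), "abc")); // ABC
    }
}
```

Representing the date as a plain int keeps the eval signature primitive, so the call needs no conversion object beyond boxing for the reflective invoke.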

CRowProcessRunner

flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/runtime/CRowProcessRunner.scala

```scala
class CRowProcessRunner(
    name: String,
    code: String,
    @transient var returnType: TypeInformation[CRow])
  extends ProcessFunction[CRow, CRow]
  with ResultTypeQueryable[CRow]
  with Compiler[ProcessFunction[Row, Row]]
  with Logging {

  private var function: ProcessFunction[Row, Row] = _
  private var cRowWrapper: CRowWrappingCollector = _

  override def open(parameters: Configuration): Unit = {
    LOG.debug(s"Compiling ProcessFunction: $name \n\n Code:\n$code")
    val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, code)
    LOG.debug("Instantiating ProcessFunction.")
    function = clazz.newInstance()
    FunctionUtils.setFunctionRuntimeContext(function, getRuntimeContext)
    FunctionUtils.openFunction(function, parameters)

    this.cRowWrapper = new CRowWrappingCollector()
  }

  override def processElement(
      in: CRow,
      ctx: ProcessFunction[CRow, CRow]#Context,
      out: Collector[CRow])
    : Unit = {

    cRowWrapper.out = out
    cRowWrapper.setChange(in.change)
    function.processElement(
      in.row,
      ctx.asInstanceOf[ProcessFunction[Row, Row]#Context],
      cRowWrapper)
  }

  override def getProducedType: TypeInformation[CRow] = returnType

  override def close(): Unit = {
    FunctionUtils.closeFunction(function)
  }
}
```
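  • CRowProcessRunner.processElement first sets the output collector and the change flag on its CRowWrappingCollector. It then calls function.processElement, which in turn calls the eval method of the user-defined ScalarFunction. Here function is a ProcessFunction[Row, Row] that open() compiles from the code string passed to the CRowProcessRunner constructor. DataStreamCalc's translateToPlan method generates that code when it creates the CRowProcessRunner.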
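The delegation in CRowProcessRunner fits in a few lines of plain Java. `CRowSketch`, `WrappingCollectorSketch` and `RunnerSketch` below are hypothetical simplifications. A CRow is reduced to an Object[] row plus a boolean change flag (assumed: true means accumulate, false means retract). A lambda takes the place of the ProcessFunction[Row, Row] that Flink compiles at runtime from generated code.

```java
import java.util.function.BiConsumer;
import java.util.function.Consumer;

// Simplified stand-in for Flink's CRow: a row plus a change flag
// (true = accumulate message, false = retract message). Hypothetical, no Flink dependency.
final class CRowSketch {
    final Object[] row;
    final boolean change;

    CRowSketch(Object[] row, boolean change) {
        this.row = row;
        this.change = change;
    }
}

// Models the idea behind CRowWrappingCollector: every row the inner function
// emits is re-wrapped with the change flag of the input currently being processed.
final class WrappingCollectorSketch implements Consumer<Object[]> {
    Consumer<CRowSketch> out;
    boolean change;

    @Override
    public void accept(Object[] row) {
        out.accept(new CRowSketch(row, change));
    }
}

final class RunnerSketch {
    // Stands in for the ProcessFunction<Row, Row> that Flink compiles from generated code in open().
    private final BiConsumer<Object[], Consumer<Object[]>> function;
    private final WrappingCollectorSketch wrapper = new WrappingCollectorSketch();

    RunnerSketch(BiConsumer<Object[], Consumer<Object[]>> function) {
        this.function = function;
    }

    // Mirrors CRowProcessRunner.processElement: set output and change flag, then delegate.
    void processElement(CRowSketch in, Consumer<CRowSketch> out) {
        wrapper.out = out;
        wrapper.change = in.change;
        function.accept(in.row, wrapper);
    }
}
```

The wrapper lets the inner function work on plain rows only. Each output row still inherits the accumulate/retract flag of the input that produced it.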

ProcessFunction

flink-streaming-java_2.11-1.7.1-sources.jar!/org/apache/flink/streaming/api/functions/ProcessFunction.java

```java
@PublicEvolving
public abstract class ProcessFunction<I, O> extends AbstractRichFunction {

    private static final long serialVersionUID = 1L;

    /**
     * Process one element from the input stream.
     *
     * <p>This function can output zero or more elements using the {@link Collector} parameter
     * and also update internal state or set timers using the {@link Context} parameter.
     *
     * @param value The input value.
     * @param ctx A {@link Context} that allows querying the timestamp of the element and getting
     *            a {@link TimerService} for registering timers and querying the time. The
     *            context is only valid during the invocation of this method, do not store it.
     * @param out The collector for returning result values.
     *
     * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
     *                   to fail and may trigger recovery.
     */
    public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;

    /**
     * Called when a timer set using {@link TimerService} fires.
     *
     * @param timestamp The timestamp of the firing timer.
     * @param ctx An {@link OnTimerContext} that allows querying the timestamp of the firing timer,
     *            querying the {@link TimeDomain} of the firing timer and getting a
     *            {@link TimerService} for registering timers and querying the time.
     *            The context is only valid during the invocation of this method, do not store it.
     * @param out The collector for returning result values.
     *
     * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
     *                   to fail and may trigger recovery.
     */
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {}

    /**
     * Information available in an invocation of {@link #processElement(Object, Context, Collector)}
     * or {@link #onTimer(long, OnTimerContext, Collector)}.
     */
    public abstract class Context {

        /**
         * Timestamp of the element currently being processed or timestamp of a firing timer.
         *
         * <p>This might be {@code null}, for example if the time characteristic of your program
         * is set to {@link org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime}.
         */
        public abstract Long timestamp();

        /**
         * A {@link TimerService} for querying time and registering timers.
         */
        public abstract TimerService timerService();

        /**
         * Emits a record to the side output identified by the {@link OutputTag}.
         *
         * @param outputTag the {@code OutputTag} that identifies the side output to emit to.
         * @param value The record to emit.
         */
        public abstract <X> void output(OutputTag<X> outputTag, X value);
    }

    /**
     * Information available in an invocation of {@link #onTimer(long, OnTimerContext, Collector)}.
     */
    public abstract class OnTimerContext extends Context {
        /**
         * The {@link TimeDomain} of the firing timer.
         */
        public abstract TimeDomain timeDomain();
    }
}
```

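  • ProcessFunction extends AbstractRichFunction and declares the abstract method processElement, which may emit zero or more elements through its Collector. onTimer has an empty default implementation.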
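A trimmed-down analogue illustrates the processElement contract: zero or more outputs per input, emitted through a collector. `CollectorSketch` and `ProcessFunctionSketch` below are hypothetical and leave out Context, timers and state. `SplitWords` is an invented example function, not part of Flink.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, trimmed-down analogue of Flink's Collector and ProcessFunction<I, O>
// (no Context, timers or state).
interface CollectorSketch<O> {
    void collect(O record);
}

abstract class ProcessFunctionSketch<I, O> {
    // May emit zero, one or many records per input element.
    abstract void processElement(I value, CollectorSketch<O> out);
}

// Example: split a line into words, emitting nothing for a blank line.
final class SplitWords extends ProcessFunctionSketch<String, String> {
    @Override
    void processElement(String value, CollectorSketch<String> out) {
        for (String word : value.trim().split("\\s+")) {
            if (!word.isEmpty()) {
                out.collect(word);
            }
        }
    }
}

final class SplitWordsDemo {
    public static void main(String[] args) {
        List<String> results = new ArrayList<>();
        SplitWords fn = new SplitWords();
        fn.processElement("hello flink table", results::add);
        fn.processElement("   ", results::add);
        System.out.println(results); // [hello, flink, table]
    }
}
```

The function never returns a value. The collector is the only output channel, which lets a single input produce no record at all or several records.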

DataStreamCalc

flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala

```scala
class DataStreamCalc(
    cluster: RelOptCluster,
    traitSet: RelTraitSet,
    input: RelNode,
    inputSchema: RowSchema,
    schema: RowSchema,
    calcProgram: RexProgram,
    ruleDescription: String)
  extends Calc(cluster, traitSet, input, calcProgram)
  with CommonCalc
  with DataStreamRel {

  //......

  override def translateToPlan(
      tableEnv: StreamTableEnvironment,
      queryConfig: StreamQueryConfig): DataStream[CRow] = {

    val config = tableEnv.getConfig

    val inputDataStream =
      getInput.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, queryConfig)

    // materialize time attributes in condition
    val condition = if (calcProgram.getCondition != null) {
      val materializedCondition = RelTimeIndicatorConverter.convertExpression(
        calcProgram.expandLocalRef(calcProgram.getCondition),
        inputSchema.relDataType,
        cluster.getRexBuilder)
      Some(materializedCondition)
    } else {
      None
    }

    // filter out time attributes
    val projection = calcProgram.getProjectList.asScala
      .map(calcProgram.expandLocalRef)

    val generator = new FunctionCodeGenerator(config, false, inputSchema.typeInfo)

    val genFunction = generateFunction(
      generator,
      ruleDescription,
      inputSchema,
      schema,
      projection,
      condition,
      config,
      classOf[ProcessFunction[CRow, CRow]])

    val inputParallelism = inputDataStream.getParallelism

    val processFunc = new CRowProcessRunner(
      genFunction.name,
      genFunction.code,
      CRowTypeInfo(schema.typeInfo))

    inputDataStream
      .process(processFunc)
      .name(calcOpName(calcProgram, getExpressionString))
      // keep parallelism to ensure order of accumulate and retract messages
      .setParallelism(inputParallelism)
  }
}
```
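  • DataStreamCalc.translateToPlan calls CommonCalc's generateFunction method to produce genFunction. It then creates a CRowProcessRunner from genFunction.name, genFunction.code and a CRowTypeInfo. The generated code extends ProcessFunction and implements processElement, which calls the eval method of the user-defined ScalarFunction. The resulting operator keeps the parallelism of its input to preserve the order of accumulate and retract messages.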
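Conceptually, the generated processElement of a Calc applies the condition and then the projection list, calling the UDF's eval where the query uses it. The hand-written Java below approximates that control flow for a hypothetical query `SELECT string, HASHCODE(string) FROM MyTable WHERE string IS NOT NULL`. The WHERE clause, the class names and the fixed factor of 31 are assumptions for illustration. Flink's real generated code differs in detail.

```java
import java.util.function.Consumer;

// Hypothetical, hand-written approximation of a generated Calc function for
//   SELECT string, HASHCODE(string) FROM MyTable WHERE string IS NOT NULL
// Flink generates Java source for this (the `code` string) and compiles it at runtime;
// only the control flow is reproduced here.
final class CalcSketch {
    // Stand-in for the HashCode UDF; the factor is fixed to 31 instead of read in open().
    static final class HashCodeUdf {
        private final int factor;

        HashCodeUdf(int factor) {
            this.factor = factor;
        }

        public int eval(String s) {
            return s.hashCode() * factor;
        }
    }

    // In this sketch the UDF instance is held as a field of the function class.
    private final HashCodeUdf hashCodeUdf = new HashCodeUdf(31);

    void processElement(Object[] in, Consumer<Object[]> out) {
        String string = (String) in[0];
        // Condition: if it is not satisfied, nothing is emitted for this row.
        if (string == null) {
            return;
        }
        // Projection list: one output field per expression; here the UDF call is a direct eval() call.
        out.accept(new Object[] {string, hashCodeUdf.eval(string)});
    }
}
```

A Calc therefore emits either zero rows (the condition fails) or exactly one projected row per input, which is why a ProcessFunction with a collector fits it.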

Summary

  • ScalarFunction extends UserDefinedFunction and defines the apply, getResultType and getParameterTypes methods. It maps zero, one or more scalar values to a new scalar value, and the user implements the mapping in public eval methods. Primitive types are recommended for parameters and results, for example int for DATE/TIME and long for TIMESTAMP.
  • CRowProcessRunner.processElement calls function.processElement, which in turn calls the user-defined ScalarFunction's eval method. Here function is a ProcessFunction compiled from the code passed to the CRowProcessRunner constructor. DataStreamCalc.translateToPlan generates that code when it creates the CRowProcessRunner. ProcessFunction extends AbstractRichFunction and declares the abstract method processElement.
  • DataStreamCalc.translateToPlan calls CommonCalc's generateFunction to produce genFunction, then builds a CRowProcessRunner from genFunction.name, genFunction.code and a CRowTypeInfo. The generated code extends ProcessFunction and implements processElement, which calls the user-defined ScalarFunction's eval method.


Reposted from: http://nkupx.baihongyu.com/
