时态表

TemporalTable

关于时态表的介绍可以看看flink中文社区的这篇文章Flink SQL 如何实现数据流的 Join?还有该篇博文Flink Table & SQL 时态表Temporal Table
append表(追加表)关联时态表数据,进行流join操作(时态表可以减少时态表中保存的状态)

import java.util.Properties

import com.alibaba.fastjson.{JSON, JSONObject}
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.common.time.Time
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.table.api.scala.StreamTableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.table.functions.TemporalTableFunction

object FlinkTemporalTable {
  def main(args: Array[String]): Unit = {
    // 获取流处理执行坏境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    // 通过流处理执行引擎构建表执行引擎
    val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)

    val props = new Properties()
    props.setProperty("bootstrap.servers", "xxx.xxx.xxx.xxx:9092")
    props.setProperty("auto.offset.reset", "latest") // 设置消费起点 earliest,latest
    props.setProperty("group.id", "local_consumer")

    val foTopic = "order"
    val fcdv2Topic = "deal"

    // 获取到原始数据
    val foOriginalStream: DataStream[String] = env.addSource(new FlinkKafkaConsumer[String](foTopic, new SimpleStringSchema(), props))
    // {"order_id":"order_01","order_no":"0001","date_create":"1584497993601","date_update":"1584497993601"}
    val foStream: DataStream[(String, String, Long, Long)] = foOriginalStream
      // 解析原始数据(json格式)
      .map { json =>
        val jsonObj: JSONObject = JSON.parseObject(json)
        (jsonObj.getString("order_id"), jsonObj.getString("order_no"), jsonObj.getLongValue("date_create"), jsonObj.getLongValue("date_update"))
      }
      .assignAscendingTimestamps(tp => tp._4) // 设置时间水位线

    // {"car_deal_id":"car_01","sale_order_no":"0001","car_attribute_id":"A0001","vin":"a1000","date_create":"1584497993601","date_update":"1584497993601"}
    val fcdv2OriginalStream: DataStream[String] = env.addSource(new FlinkKafkaConsumer[String](fcdv2Topic, new SimpleStringSchema(), props))
    val fcdv2Stream: DataStream[(String, String, String, String, Long, Long)] = fcdv2OriginalStream
      .map { json =>
        val jsonObj: JSONObject = JSON.parseObject(json)
        (jsonObj.getString("car_deal_id"), jsonObj.getString("sale_order_no"), jsonObj.getString("car_attribute_id"), jsonObj.getString("vin"), jsonObj.getLongValue("date_create"), jsonObj.getLongValue("date_update"))
      }
      .assignAscendingTimestamps(tp => tp._6) // 设置时间水位线

    // 注册成表
    tableEnv.registerDataStream("fo", foStream, 'order_id, 'order_no, 'date_create, 'date_update, 'foRowtime.rowtime)
    tableEnv.registerDataStream("fcdv2", fcdv2Stream, 'car_deal_id, 'sale_order_no, 'car_attribute_id, 'vin, 'date_create, 'date_update, 'fcdv2Rowtime.rowtime)

    // 设置Temporal Table的时间属性和主键
    val fcdv2TemporalFunction: TemporalTableFunction = tableEnv.scan("fcdv2").createTemporalTableFunction("fcdv2Rowtime", "sale_order_no");
    //注册TableFunction
    tableEnv.registerFunction("FCDV2_TEMPORAL_FUNCTION", fcdv2TemporalFunction)

    // 运行SQL
    val sql =
      """
        |SELECT fo.`order_no`            AS `order_no`
        |     , fo.`date_create`         AS `fo_date_create`
        |     , fo.`date_update`         AS `fo_date_update`
        |     , fcdv2.`car_attribute_id` AS `fcdv2_car_attribute_id`
        |     , fcdv2.`vin`              AS `fcdv2_vin`
        |     , fcdv2.`date_create`      AS `fcdv2_date_create`
        |     , fcdv2.`date_update`      AS `fcdv2_date_update`
        |FROM fo
        |    , LATERAL TABLE(FCDV2_TEMPORAL_FUNCTION(fo.foRowtime)) as fcdv2
        |WHERE fo.order_no = fcdv2.sale_order_no
        |""".stripMargin


    val table = tableEnv.sqlQuery(sql)

    tableEnv.getConfig.setIdleStateRetentionTime(Time.minutes(1L), Time.minutes(7L)) // 结合使用状态清理
    tableEnv.toAppendStream[(String, Long, Long, String, String, Long, Long)](table) // 添加流
      .print()

    //6、开始执行
    tableEnv.execute(FlinkTemporalTable.getClass.getSimpleName)
  }
}

input

-- order
{"order_id":"order_01","order_no":"0001","date_create":"1584497993601","date_update":"1584497993601"}
{"order_id":"order_02","order_no":"0002","date_create":"1584497994602","date_update":"1584497994602"}
{"order_id":"order_03","order_no":"0003","date_create":"1584497995603","date_update":"1584497995603"}
{"order_id":"order_04","order_no":"0004","date_create":"1584497997604","date_update":"1584497997604"}
{"order_id":"order_05","order_no":"0005","date_create":"1584497998605","date_update":"1584497998605"}
{"order_id":"order_06","order_no":"0006","date_create":"1584497998606","date_update":"1584497998606"}
{"order_id":"order_07","order_no":"0007","date_create":"1584497998607","date_update":"1584497999899"}

-- deal
{"car_deal_id":"car_01","sale_order_no":"0001","car_attribute_id":"A0001","vin":"a1000","date_create":"1584497993601","date_update":"1584497993601"}
{"car_deal_id":"car_02","sale_order_no":"0002","car_attribute_id":"A0002","vin":"b1010","date_create":"1584497994602","date_update":"1584497994602"}
{"car_deal_id":"car_03","sale_order_no":"0003","car_attribute_id":"A0003","vin":"c1011","date_create":"1584497995603","date_update":"1584497995603"}
{"car_deal_id":"car_04","sale_order_no":"0004","car_attribute_id":"A0004","vin":"d1110","date_create":"1584497997604","date_update":"1584497997604"}
{"car_deal_id":"car_05","sale_order_no":"0005","car_attribute_id":"A0005","vin":"e1111","date_create":"1584497998605","date_update":"1584497998605"}
{"car_deal_id":"car_05","sale_order_no":"0005","car_attribute_id":"A0005","vin":"f000","date_create":"1584497998605","date_update":"1584497999788"}
{"car_deal_id":"car_04","sale_order_no":"0004","car_attribute_id":"A0004","vin":"d0001","date_create":"1584497997604","date_update":"1584497999799"}

output

注意:此处使用的时间语义为event_time,也就是wartmark来驱动表的join。且append表只能够join上时间戳小于等于自己的时态表数据

temporal_table.png
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 229,460评论 6 538
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 99,067评论 3 423
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 177,467评论 0 382
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 63,468评论 1 316
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 72,184评论 6 410
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 55,582评论 1 325
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 43,616评论 3 444
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 42,794评论 0 289
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 49,343评论 1 335
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 41,096评论 3 356
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 43,291评论 1 371
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 38,863评论 5 362
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 44,513评论 3 348
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 34,941评论 0 28
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 36,190评论 1 291
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 52,026评论 3 396
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 48,253评论 2 375

推荐阅读更多精彩内容