数据传输工具 —— Kafka Connect

1、什么是 kafka connect?

  Kafka Connect 是一种用于在 kafka 和其他系统之间可扩展、可靠的流式传输数据的工具。它使得能够快速定义将大量数据集合移入和移出 kafka 的连接器变得简单。

  Kafka Connect 可以获取整个数据库或从应用程序服务器收集指标到 kafka 主题,使数据可用于低延迟的流处理。

  导出作业可以将数据从 kafka topic 传输到二次存储和查询系统,或者传递到批处理系统以进行离线分析。


2、功能

  • kafka connector 通用框架,提供统一的集成 API
  • 同时支持分布式模式和单机模式
  • 自动化的 offset 管理,开发人员不必担心错误处理的影响
  • rest 接口,用来查看和管理 kafka connectors


3、概念

Connectors:通过管理任务来处理数据流的高级抽象
Tasks:数据写入 kafka 和从 kafka 读出的实现
Workers:运行 connectors 和 tasks 的进程
Converters:kafka connect 和其他存储系统直接发送和接收数据之间转换数据

  Connector 决定了数据要从哪里复制过来以及数据应该写到哪里去,一个 connector 实例是一个需要负责在 kafka 和其他系统之间复制数据的逻辑作业,connector plugin 是 jar 文件,实现了 kafka 定义的一些接口来完成特定的任务。

  Task 是 kafka connect 数据模型的主角,每一个 connector 都会协调一系列的 task 去执行任务,connector 可以把一项工作分割成许多的 task,然后再把 task 分发到各个 worker 中去执行(分布式模式下),task 不自己保存自己的状态信息,而是交给特定的 kafka 主题去保存(config.storage.topicstatus.storage.topic)。在分布式模式下有一个概念叫做任务再平衡(Task Rebalancing),当一个 connector 第一次提交到集群时,所有的 worker 都会做一个 task rebalancing 从而保证每一个 worker 都运行了差不多数量的工作,而不是所有的工作压力都集中在某个 worker 进程中,而当每个进程挂了之后也会执行 task rebalance。

  Connectors 和 Tasks 都是逻辑工作单位,必须安排在进程中执行,而在 kafka connect 中,这些进程就是 workers,分别有两种 worker:standalone、distributed。生产中 distributed worker 表现很棒,因为它提供了可扩展性以及自动容错的功能,可以用一个 group.id 来启动很多 worker 进程,在有效的 worker 进程中它们会自动地去协调执行 connector 和 task,如果新加或者挂了一个 worker,其他的 worker 会检测到然后再重新分配 connector 和 task。

  Converter 会把 bytes 数据转换为 kafka connect 内部的格式,也可以把 kafka connect 内部存储格式的数据变成 bytes,converter 对 connector 来说是解耦的,所以其他的 connector 都可以重用。例如使用了 avro converter,那么 jdbc connector 可以写 avro 格式的数据到 kafka,同时 hfds connector 也可以从 kafka 中读出 avro 格式的数据。




4、实战

  启动 confluent

cd /app/confluent/bin
./confluent local start

  使用 standalone 模式启动

# 启动 kafka connect
$CONFLUENT_HOME/bin/connect-standalone \
$CONFLUENT_HOME/etc/kafka/connect-standalone.properties \
connector1.properties [connector2.properties]

  在 $CONFLUENT_HOME/etc/kafka 下存在很多配置文件

  其中 connect-standalone.properties 是启动 connect 服务组件自身的配置,内容如下:

# kafka 服务
bootstrap.servers=localhost:9092

# 转换器
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

# 是否启用转换器
key.converter.schemas.enable=true
value.converter.schemas.enable=true

# 偏移量存储文件名
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000

# 插件路径
plugin.path=/usr/share/java,/app/confluent/share/confluent-hub-components

# 默认端口为8083,需要修改端口时启动以下配置
# rest.port=8084


(1)标准 connect

启动一个带 FileSource 的 Connect

  connect-file-source.properties 是一个 source connect 的模板配置,启用该配置就能够从指定文件中复制数据到 kafka 中,其默认的配置如下:

# connect 的名字
name=local-file-source
# 将文件读取到数据流中
connector.class=FileStreamSource
# 工作线程是 1 个
tasks.max=1
# 读取的文件名为 test.txt
file=test.txt
# 复制到的主题为 connect-test
topic=connect-test

  启动 connect

$CONFLUENT_HOME/bin/connect-standalone \
  $CONFLUENT_HOME/etc/kafka/connect-standalone.properties \
  $CONFLUENT_HOME/etc/kafka/connect-file-source.properties

  结果报错 Java 内存不足

  关闭虚拟机,加大内存,重启服务器和 confluent,再次启动 connect,报错 8083 端口已被绑定

  修改 connect-standalone.properties 配置中的端口为 8084 再启动,新的报错:不存在 source 配置文件中的指定的文件,在启动路径下创建文件,日志恢复正常

echo -e "foo\nbar\n" > $CONFLUENT_HOME/test.txt

  可以通过 kafka tools 看到新增了主题 connect-test,写入了3条数据

  往文件中写入数据,会报告又成功提交一次偏移量

# 写数据
/app/confluent# echo -e "foo1\nbar1\n" >> test.txt

# 日志
INFO WorkerSourceTask{id=local-file-source-0} Finished commitOffsets successfully in 1 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:515)
...

  然后可以看到主题中多了3条数据


启动带 FileSource 和 FileSink 的 Connect

  connect-file-sink.properties 是一个 source connect 的模板配置,启用该配置就能够从指定文件中复制数据到 kafka 中,其默认的配置如下:

# connect 的名字
name=local-file-sink
# 从数据流中读取数据到文件中
connector.class=FileStreamSink
# 工作线程是 1 个
tasks.max=1
# 写入的文件是 test.sink.txt
file=test.sink.txt
# 读取数据的主题是 connect-test
topics=connect-test

  启动 connect

$CONFLUENT_HOME/bin/connect-standalone \
  $CONFLUENT_HOME/etc/kafka/connect-standalone.properties \
  $CONFLUENT_HOME/etc/kafka/connect-file-source.properties \
  $CONFLUENT_HOME/etc/kafka/connect-file-sink.properties

  可以看到自动创建了 test.sink.txt 文件

  同时可以看到 consumer 中多了一个 connect-local-file-sink ,偏移量为6(即已将6条数据都 sink 到了文件中)



(2)REST API

  使用 Rest API 必须启动分布式模式,通过 Rest API 可以管理集群中的 connect 服务,默认端口是 8083。

GET /connectors - 返回所有正在运行的connector名。
POST /connectors - 新建一个connector;请求体必须是json格式并且需要包含name字段和config字段,name是connectors的名字,config是json格式,必须包含connector的配置信息。
GET /connectors/{name} - 获取指定connector的信息。
GET /connectors/{name}/config - 获取指定connector的配置信息。

  在分布式模式下,有两种方式来配置 connector,第一种是类似 standalone 模式一样,写好配置文件,然后在启动时指定

$CONFLUENT_HOME/bin/connect-distributed \
  $CONFLUENT_HOME/etc/kafka/connect-distributed.properties \
  $CONFLUENT_HOME/etc/kafka/connect-file-source.properties \
  $CONFLUENT_HOME/etc/kafka/connect-file-sink.properties

  另外一种方式更加灵活,就是直接通过 Rest API 来对 connector 配置进行增删查。

  查看 connectors

  添加 connectors

  查看某个 connector

  这里指定的文件是相对路径,所以要在 $CONFLUENT_HOME/bin 路径下创建一个 test-distributed.txt 文件

cd $CONFLUENT_HOME/bin
echo -e "foo\nbar\n" > test-distributed.txt

  可以看到出现了 connect-distributed 主题

  添加 sink

  从服务器可以看到产生了 sink 文件

  删除 connector

  再次往 test-distributed.txt 文件中追加数据,可以看到 connect-distributed 主题中的数据增加了,source connector 依然在工作,但是 sink connector 已经停止了,所以 test-distributed.sink.txt 文件中数据不再从主题中复制。

【注意】

  如果要在脚本中处理,发起HTTP请求,可以使用 curl 工具,将请求的配置在 json 文件中,如:

curl -d @$CONFLUENT_HOME/connect-file-sink.json \
  -H "Content-Type: application/json" \
  -X POST http://localhost:8083/connectors


创建带有 Convert 的 connector

{
    "name": "local-file-source",
    "config": {
        "connector.class": "FileStreamSource",
        "tasks.max": 1,
        "file": "test-transformation.txt",
        "topic": "connect-transformation",
        "transforms": "MakeMap,InsertSource",
        "transforms.MakeMap.type": "org.apache.kafka.connect.transforms.HoistField$Value",
        "transforms.MakeMap.field": "line",
        "transforms.InsertSource.type": "org.apache.kafka.connect.transforms.InsertField$Value",
        "transforms.InsertSource.static.field": "data_source",
        "transforms.InsertSource.static.value": "test-file-source"
    }
}

  添加 connector(由于跟上述实验 name 一致,所以需要先删除或者换个 name)

  创建 test-transformation.txt 文件,可以看到自动创建了 connect-transformation 主题

  添加 sink

{
    "name": "local-file-sink",
    "config": {
        "connector.class": "FileStreamSink",
        "tasks.max": 1,
        "file": "test-transformation.sink.txt",
        "topics": "connect-transformation"
    }
}

  可以看到 sink 自动生成了 test-transformation.sink.txt 文件,并且内容不是 source 过来的原始数据,而是经过 convertor 处理后的带格式的数据


(3)MySQL Source、ESSink

  演示将数据从 MySQL 复制到 kafka 中,再通过 kafka 将数据下沉到 ElasticSearch。这里 MySQL 是数据源,所以需要支持 MySQL 的 source connector,ES 是目标数据系统,所以需要支持 ES 的 sink connectors,可以从 https://www.confluent.io/hub/ 下载。

MySQL

  MySQL 下载插件搜索关键字 "JDBC",可以看到提供了在线安装的脚本和离线安装的包下载。

  MySQL 环境准备

# 安装 MySQL
sudo apt-get install mysql-server

# 安装 Confluent 插件
confluent-hub install confluentinc/kafka-connect-jdbc:10.4.1

# 将 MySQL 驱动上传到 confluent 目录
# mv mysql.jar /app/confluent/share/confluent-hub-components/confluentinc-kafka-connect-jdbc/lib

【注意】下载下来的 jdbc connector 插件,在处理 mysql 时需要相应的驱动,而插件不带驱动,实际采集数据时会报错,这时需要将驱动 jar 包拷贝到插件库目录中。

  数据准备,创建用户并授权,用该用户创建数据库、表和插入数据

grant all on *.* to hyh@'localhost' identified by 'hyh';
create database studentsDB;
use studentsDB;
create table students (rollno int primary key auto_increment, name varchar(30), marks varchar(30));
insert into students (name, marks) values ('James', 35);

  创建 source 配置文件(connect-mysql-source.properties),内容如下:

name=test-source-mysql-jdbc-autoincrement
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:mysql://127.0.0.1:3306/studentsDB?user=hyh&password=hyh
mode=incrementing
# 表中的自增列字段
incrementing.column.name=rollno
# 表会被采集到的 topic 名前缀,比如表名叫 students,对应的 topic 就为 test-mysql-jdbc-students
topic.prefix=test-mysql-jdbc-

  启动 mysql source connector

$CONFLUENT_HOME/bin/connect-standalone \
  $CONFLUENT_HOME/etc/kafka/connect-standalone.properties \
  $CONFLUENT_HOME/etc/kafka/connect-mysql-source.properties

  可以看到启动之后,开启了 JDBC source task,然后执行了查询的 SQL,最后提交和刷新的偏移量

  与此同时,可以看到 kafka 中新增了一个 topic test-mysql-jdbc-students

  里面有一条数据,如果此时往表中再插入两条数据,可以看到数据变成了3条



ElasticSearch

  ES 下载插件搜索关键字 "ElasticSearch" ,可以看到有 ElasticSearch Sink Connector、ElasticSearch Source Connector,注意有些插件是支持 source、sink,有些是分开两个插件。

  ES 环境准备

tar -zxvf elasticsearch-7.6.0-linux-x86_64.tar.gz -C /app
mv /app/elasticsearch-7.6.0 /app/elasticsearch

# 配置环境变量
export ES_HOME=/app/elasticsearch
export PATH=${ES_HOME}/bin:$PATH

# 安装 Confluent 插件
confluent-hub install confluentinc/kafka-connect-elasticsearch:13.0.0

  启动 ES

cd /app/elasticsearch
.bin/elasticsearch

  报错不能以root用户启动

  创建用户用户组es,并修改 es 安装目录所属用户和组

chown -R es:es elasticsearch/

  再次启动看到以下日志即正常

  配置 sink 配置文件(connect-es-sink.properties),内容如下:

name=test-sink-elastic
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
connection.url=http://localhost:9200
topics=test-mysql-jdbc-students 
key.ignore=true
type.name=kafka-connect

  启动 ES sink connector

$CONFLUENT_HOME/bin/connect-standalone \
  $CONFLUENT_HOME/etc/kafka/connect-standalone.properties \
  $CONFLUENT_HOME/etc/kafka/connect-mysql-source.properties \
  $CONFLUENT_HOME/etc/kafka/connect-es-sink.properties

  访问 es 9092 端口查询数据,可以查到有三条数据

# 查询命令
curl -H "Content-Type: application/json" -X GET http://localhost:9200/test-mysql-jdbc-students/_search

# 查到的结果
{
    "took": 121,
    "timed_out": false,
    "_shards": {
        "total": 1,
        "successful": 1,
        "skipped": 0,
        "failed": 0
    },
    "hits": {
        "total": {
            "value": 3,
            "relation": "eq"
        },
        "max_score": 1,
        "hits": [
            {
                "_index": "test-mysql-jdbc-students",
                "_type": "_doc",
                "_id": "test-mysql-jdbc-students+0+0",
                "_score": 1,
                "_source": {
                    "rollno": 1,
                    "name": "James",
                    "marks": "35"
                }
            },
            {
                "_index": "test-mysql-jdbc-students",
                "_type": "_doc",
                "_id": "test-mysql-jdbc-students+0+1",
                "_score": 1,
                "_source": {
                    "rollno": 2,
                    "name": "James2",
                    "marks": "36"
                }
            },
            {
                "_index": "test-mysql-jdbc-students",
                "_type": "_doc",
                "_id": "test-mysql-jdbc-students+0+2",
                "_score": 1,
                "_source": {
                    "rollno": 3,
                    "name": "James3",
                    "marks": "37"
                }
            }
        ]
    }
}

  往数据库插入一条新的数据

insert into students (name, marks) values ('James4', 38);

  可以看到 es 侧接收到了这条数据

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 228,936评论 6 535
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 98,744评论 3 421
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 176,879评论 0 381
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 63,181评论 1 315
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 71,935评论 6 410
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 55,325评论 1 324
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 43,384评论 3 443
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 42,534评论 0 289
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 49,084评论 1 335
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 40,892评论 3 356
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 43,067评论 1 371
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 38,623评论 5 362
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 44,322评论 3 347
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 34,735评论 0 27
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 35,990评论 1 289
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 51,800评论 3 395
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 48,084评论 2 375

推荐阅读更多精彩内容