Kafka Loader用户手册
Last updated
Last updated
Kafka是一款目前十分流行的“发布-订阅”消息中间件系统; 通过Kafka, 系统可以向用户提供一整套实时的、可容错的分布式管道服务。TigerGraph的Kafka加载器能够让用户非常便捷地与现有的Kafka集群整合,从而加快数据的实时分析。同时,用户还能够通过使用一系列Kafka生态系统中的插件轻松实现架构扩展。
Kafka加载器会读取并处理Kafka集群中的数据,并将结果导入TigerGraph系统中。
简单来说,Kafka加载器的工作方式为: 用户通过GSQL命令告诉TigerGraph系统要做什么,然后TigerGraph系统从外部的Kafka集群中将数据导入它的RestPP服务器中。下图简要地展示了Kafka加载器的基本架构:
Kafka集群已经正确配置并正常工作。
用户需要准备以下两个Kafka集群的配置文件,并将文件复制到TigerGraph系统规定的目录中:
Kafka配置文件: 这个文件包含了外部Kafka服务器(broker)的域名和端口号。TigerGraph系统通过该配置文找到并连接Kafka集群。 具体配置步骤请参考 第一步: 定义数据源.
Kafka话题(topic)与分区(Partition)配置文件: 该文件包含了Kafka的话题和分区列表, 并包含加载数据的起始偏移量。 具体配置步骤请参考 第二步: 创建加载作业.
使用Kafka加载器的三个主要步骤为:
Kafka加载器所使用的GSQL语法与现有的GSQL加载语法相同。
CREATE DATA_SOURCE
在创建Kafka数据源的加载作业前,用户首先要定义Kafka的服务器地址。 用户可以通过CREATE DATA_SOURCE语句定义一个data_source变量:
Kafka数据源配置文件
在数据源创建之后,用户需要使用SET命令配置针对该数据源的配置文件:
在执行过程中,该SET命令会读取并检查该配置文件。如果没有问题,则会将该配置文件整合到TigerGraph的字典中。 数据源配置文件为JSON格式对象,包含了Kafka服务器的全局配置信息,即数据源的IP地址和端口号。 下面为一个非常简单的kafka.conf文件的例子:
“broker”参数是必须的,而"kafka_config"参数(提供一些额外的配置信息,详情请参考Kafka的官方文档)则为可选值。 用户可以通过一系列键值对的形式定义"kafka_config"参数。 例如:
用户也可以将CREATE DATA_SOURCE语句和SET语句合二为一:
如果用户的TigerGraph为集群架构,则上述配置文件必须保存在m1节点(即同时拥有GSQL server和GSQL client的节点)上,且必须为JSON格式。 如果配置文件中使用了相对路径, 则相对路径的起始点必须是GSQL客户端的工作目录。
每次更改过配置文件的内容后,用户都必须执行"SET data_source_name"命令来更新字典中的数据源信息。
Kafka加载器支持TigerGraph的多图模式。 在多图模式的情况下, 数据源可以是私有的,也可以是全局的。
全局数据源只能由超级管理员用户创建, 且必须由超级管理员分配到每个图中。
普通管理员只能创建仅适用于本图的私有数据源。 该数据源不能分配给其他的图使用。
下面的例子列出了几种允许的DATA_SOURCE操作的示例:
1.一名超级管理员可以创建全局数据源,并且不将其分配给任何图:
2. 一名超级管理员可以向(从)一张或多张图中赋予(收回)一个数据源:
3. 一名普通管理员可以为他管理的图创建私有数据源:
在上面的例子中,私有数据源kl只能被test_graph访问。
一个data_source变量可以由拥有权限的用户删除; 例如,全局数据源只能被超级管理员删除, 而私有数据源则即可以被该图的普通管理员删除,也可以被超级管理员删除。 DROP语句的语法格式如下:
下面列出的创建或删除数据源命令均为合法命令:
拥有权限的用户可以使用SHOW DATA_SOURCE语句读取所有现有的数据源信息:
Kafka加载器使用与标准GSQL加载作业相同的 创建加载作业语法。 用户需要使用DEFINE FILENAME语句声明一个FILENAME变量, 用于让加载器找到Kafka的配置文件。
同时, 用户也可以选择在RUN LOADING JOB语句中使用USING方法来设定一个新的文件地址。 RUN语句中的文件地址将覆盖之前在CREATE LOADING JOB中配置的文件地址。
下例展示了用于Kafka加载器的DEFINE FILENAME的语法。 在该语句中, $DATA_SOURCE_NAME参数表示Kafka的数据源名称以及一条指向配置文件的路径地址, 该配置文件包含了kafka集群的话题及分区信息。
示例: 加载一个Kafka数据源 k1 , 话题及分区信息配置文件地址为: "~/topic_partition1.conf":
Kafka Topic-Partition 配置文件
Kafka Topic-Partition配置文件用于告诉TigerGraph如何阅读Kafka中的数据。 与之前的数据源配置文件类似,该话题-分区配置文件也是以JSON格式保存的。 下面是一个例子:
"topic"参数值是必须的。 "partition_list"参数值则是可选的, 该可选参数可用于声明需要读取的话题及分区,以及开始点的偏移量。 如果"partition_list"值为空,则表示该话题下的所有分区都会被加载。 默认的起始点偏移量为 “-1”, 表示加载作业从最新的消息开始(例如从该话题的末尾开始加载)。 如果用户希望从话题的起始行开始加载, 则"start_offset"的参数值应该设为“-2”。
用户可以通过修改Kafka Topic-Partition文件的"default_start_offset"参数值来修改默认的偏移量。 例如:
除了直接配置参数文件地址之外, 用户也可以选择直接将Topic-Partition 配置信息编辑成字符串, 例如:
Kafka加载器使用与标准GSQL执行加载作业相同的执行加载作业语法。 用户可以向每个文件名变量输入一个形似“DATA_SOURCE Var:topic_partition configure”一样的字符串, 用于覆盖加载作业中早先定义的文件名。 在下面的例子中, f1使用的是在CREATE LOADING JOB中赋予的值, 而f3和f4则使用在RUN语句中赋予的值。
每次执行RUN LOADING JOB命令只能导入同一种类型的数据源。 也就是说,你不能在一次加载作业中同时导入源自Kafka集群的数据和源自普通数据文件中的数据。
一次加载作业中的所有文件名变量必须指向同一个DATA_SOURCE变量。
Kafka加载器有两种加载模式: 流模式(Streaming Mode)和EOF模式。 默认模式为流模式。 在流模式中, 加载器的加载动作只有在作业停止后才会停止。 而在EOF模式中, 加载器在完成读取所有的当前的Kafka消息后便会停止。
如果需要将模式修改为EOF模式,需要在RUN LOADING JOB语句后添加一个参数:
管理Kafka的加载作业与管理普通加载作业的方法相同。三个主要的管理命令为:
SHOW LOADING STATUS
ABORT LOADING JOB
RESUME LOADING JOB
例如: SHOW LOADING STATUS命令的语法如下:
如果想单独读取某个作业的情况, 用户需要在RUN LOADING JOB语句中添加job_id参数。 对于针对单独作业的SHOW命令来说, 下面的信息会显示出来:
对于每一个分区, 当前已加载的偏移量
平均加载速度
加载数据量
加载持续时间
详情请参考: 监控与管理加载作业。
下面将演示一个通过Kafka加载器导入数据的案例: