
ArgoDB 可以借助 Slipstream 流计算引擎,提供高吞吐、低延迟的数据实时写入能力

Transwarp Slipstream 是星环科技自主研发的企业级、高性能实时流计算引擎,支撑百万级高吞吐、毫秒级低延时业务需求。Slipstream 支持事件驱动和微批处理两种模式,支持exactly-once语义、复杂事件处理(CEP)、规则引擎等功能,支持SQL编程与开发。Slipstream帮助用户快速开发实时数据仓库、实时报表分析、实时智能推荐、实时欺诈检测与风险控制等应用。
本节主要包含以下内容:
-
使用 Kafka\Event Store、Slipstream 实现实时写入的相关功能操作
-
创建 Topic 如何在数据源 Kafka/Event Store 中创建topic
-
创建 Stream 如何在 Slipstream 中创建stream
-
创建 Streamjob 如何在 Slipstream 中创建、查看、启动以及停止sreamjob
-
创建Producer并发送数据 如何在数据源Kafka/Event Store中创建producer并发送数据
-
-
实时写入示例
-
非安全模式下实时写入示例 非安全模式下,流式实时写入示例
-
安全模式下实时写入示例 安全模式下,流式实时写入示例
-
实时写入功能操作
管理 Topic
Kafka/Event Store的相关脚本文件存放在在TDH-Client文件夹下。运行创建或查看topic脚本之前,需要执行source ./init.sh 初始化环境变量 |
创建topic ./kafka-topics.sh --zookeeper <hostname:port> --create --topic <topic_name> --partitions 3 --replication-factor 1 查看Topic ./kafka-topics.sh --zookeeper <hostname:port> --describe --topic <topic_name>
复制
管理STREAM
需要在slipstream服务中操作 |
CREATE stream stream_name(column1 datatype1, column2 datatype2, coulumn3 datatype3, ......) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' -- [1] TBLPROPERTIES("topic"="topic_name", "kafka.zookeeper"="hostname1:port1", -- [2] "kafka.broker.list"="hostname2:port2"); --[3]
复制
-
[1]:指定分隔符
-
[2]:hostname1为zookeeper节点的主机名或IP地址,port1为Zookeeper的参数zookeeper.client.port的值
-
[3]:hostname2为brocker指定节点的主机名或IP地址,port2为broker的端口号
CREATE stream stream_name(column1 datatype1, column2 datatype2, coulumn3 datatype3, ......) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' -- [1] TBLPROPERTIES("topic"="topic_name", "kafka.zookeeper"="hostname1:port1", --[2] "kafka.broker.list"="hostname2:port2", --[3] "transwarp.consumer.security.protocol"="SASL_PLAINTEXT", "transwarp.consumer.sasl.kerberos.service.name"="kafka", --[4] "transwarp.consumer.sasl.jaas.config"="com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab=\"/etc/slipstream1/conf/kafka.keytab\" principal=\"kafka@TDH\"" );
复制
-
[1]:指定分隔符
-
[2]:hostname1为zookeeper节点的主机名或IP地址,port1为Zookeeper的参数zookeeper.client.port的值
-
[3]:hostname2为brocker指定节点的主机名或IP地址,port2为broker的端口号
-
[4]:kafka超级用户
-
可以通过Klist命令查询
-
安全模式下的配置与操作方法请查看下文的"流式实时写入示例"
SHOW streams;复制
SHOW streams from <database_name>;复制
DROP stream <stream_name>;复制
管理 STREAM JOB
需要在 slipstream 服务中操作 |
CREATE STREAMJOB <streamjob_name> AS("insert into table_name Select * from stream_name") JOBPROPERTIES("morphling.hdfs.flush.interval.ms"="1000"); --[1]
复制
-
[1]:用于设置入库间隔,单位 ms。间隔越小延迟越短,开销越大
启动Streamjob START STREAMJOB <streamjob_name>; 暂停Streamjob STOP STREAMJOB <streamjob_name>;复制
--查询当前数据库中的所有Streamjob SHOW STREAMJOBS; --查询当前数据库中所有处于运行中的Streamjob LIST STREAMJOBS;复制
DROP STREAMJOB <streamjob_name>; --若此StreamJob处于正在运行的状态时,则需要先停止任务才能对其进行删除。复制
创建Producer发送数据
./kafka-console-producer.sh --broker-list hostname:port --topic topic_name --hostname和port是producer节点指定的主机名或IP地址。复制
当TDH-client开启安全时使用Kafka, 需要进入kafka文件夹对configure文件进行手动配置,或者新建配置文件security-client.properties并放置于kafka的bin路径下 在文件中添加下列语句: security.protocol=SASL_PLAINTEXT sasl.mechanism=GSSAPI sasl.kerberos.service.name=kafka ./kafka-console-producer.sh --broker-list hostname:9092 --topic topic_name --producer.config security-client.properties 然后发送数据,数据通过对应的topic以及运行的Streamjob,实现将producer发送的数据实时的写入目标表中。 --安全模式下,运行脚本文件时,需指定配置项 1)kafka-console-producer.sh 指定配置项。例如--producer.config../config/security-client.properties 2)kafka-console-consuemr.sh 指定配置项。例如 --consumer.config ../config/security-client.properties 3)其他脚本如kafka-topic.sh、kafka-consumer-groups.sh 指定配置项。例如 --command-config ../config/security-client.properties复制
实时写入示例
第一步:通过 Manager 确定服务的安全模式信息
进入 Manager 中,选择服务 kafka,进入安全界面,显示”当前服务已开启Kerberos“,即代表该服务为安全模式。

查询该服务的安全认证模式,当前为LDAP认证模式。

非安全模式下实时写入示例
第一步:进入 TDH-Client\kafka\bin文件夹
cd TDH-Client\kafka\bin复制
第二步:使用Kafka/Event Store 脚本创建一个 Topic
./kafka-topics.sh --zookeeper localhost:2181 --create --topic topic_orders --partitions 3 --replication-factor 1
复制
-
kafka-topics.sh 用于创建 Topic
第三步:连接Slipstream并创建Stream
CREATE stream stream_slipstream_to_argodb_orders( trans_id int, acc_num int, trans_time date, trans_type string, stock_id string, price decimal, amount int) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' TBLPROPERTIES( "topic"="topic_orders", "kafka.zookeeper"="localhost:2181", "kafka.broker.list"="localhost:9092" );
复制
-
连接 Slipstream 方式,请参考 Slipstream 使用手册
第四步:通过Slipstream 创建Stream Job
CREATE STREAMJOB streamjob_slipstream_to_argodb_orders AS ( "insert into database_argodb.orders select trans_id , acc_num ,trans_time ,trans_type ,stock_id , price ,amount from stream_slipstream_to_argodb_orders") JOBPROPERTIES("morphling.hdfs.flush.interval.ms"="1000");
复制
-
morphling.hdfs.flush.interval.ms入库间隔参数,单位 ms。间隔越小延迟越低。
第五步:通过Slipstream启动Stream job
START streamjob streamjob_slipstream_to_argodb_orders;
复制
第六步:使用Kafka/Event Store 脚本,启动 Kafka/Event Store console producer 发送数据
./kafka-console-producer.sh --broker-list localhost:9092 --topic topic_orders >943197522,6513065,2014-01-05,b,AA7105670,12.13,200 >929634984,3912384,2014-02-05,b,UA1467891,11.11,300
复制
第七步:通过ArgoDB Quark查看数据入库结果
select trans_id , acc_num ,trans_time ,trans_type ,stock_id , price ,amount from orders;
复制
安全模式下实时写入案例
第一步:进入 TDH-Client\kafka\bin文件夹
cd TDH-Client\kafka\bin复制
第二步:使用Kafka/Event Store 脚本创建一个 Topic
./kafka-topics.sh --zookeeper hostname:port --create --topic topic_name --partitions 3 --replication-factor 1 -- hostname的值为ZooKeeper Server节点的主机名或IP地址 -- port为服务zookeeper的端口复制
第三步:在 ArgoDB Quark 中创建表
Create table table_name(column1 datatype1, column2 datatype2, coulumn3 datatype3, ......) STORED AS HOLODESK;复制
第四步:连接 Slipstream
beeline -u "jdbc:transwarp2://hostname:port/database_name" -n admin -p admin复制
-
[1]:port为其参数hive.server2.thrift.port 的值,默认为10010,hostname为Slipstream节点的主机名或IP地址:
第五步:进行 Kafka 安全配置
为了在安全模式下成功创建 Stream 并连接上 kafka 中的 topic,需要上传 kafka.keytab 文件至 Slipstream 的每一个节点上。
此操作需要进入 Guardian Server 中进行下载,并将下载的 kafka.keytab 文件放在 Slipstream 的每一个节点的相同路径下。为支持 Slipstream 访问该文件,推荐的文件路径为:/etc/transwarp/conf/kafka.keytab
。
具体操作步骤如下:
-
进入集群的manager界面,点击右上角的全局服务,打开 Guardian 服务。
图 5.1.4:Manager 中打开 Guardian 服务 -
在角色中任意选择 Guardian 节点并点击进入,打开 Guardian 网站。由于 ArgoDB 集群的 Guardian 默认采用 Guardian Federation 提供的 OAuth 2.0 认证机制,首次登录时会自动跳转到 Guardian Federation 登录页面(需要在浏览器所在机器中配置好节点 IP 与主机名的映射,也可以手动将主机名更改成 IP 地址)
图 5.1.5:登录 Guardian Server -
登录之后,点击上侧菜单栏中的【租户】,然后点击右上角的【下载keytab】, 在弹出框中输入用户 kafka,点击确定进行下载。
图 5.1.6:下载 kafka.keytab 文件 -
下载之后需要将该 keytab 文件上传至集群的每一个节点中,为保证 Slipstream 能够成功读取该文件,推荐将其路径为:
/etc/transwarp/conf/kafka.keytab
复制 -
之后在文件路径下运行命令,查看 principal 参数的具体值:
klist -k kafka.keytab
复制
第六步:通过 Slipstream 建立 stream
CREATE stream stream_name(column1 datatype1, column2 datatype2, coulumn3 datatype3, ......) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' --数据输入时用”,“作为分隔符 TBLPROPERTIES("topic"="topic_name", "kafka.zookeeper"="hostname1:port1", "kafka.broker.list"="hostname2:9092", "transwarp.consumer.security.protocol"="SASL_PLAINTEXT", "transwarp.consumer.sasl.kerberos.service.name"="kafka", --kafka为超级用户 "transwarp.consumer.sasl.jaas.config"="com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab=\"/etc/slipstream1/conf/kafka.keytab\" principal=\"kafka@TDH\"" --Principle参数的值 );
复制
第七步:通过 Slipstream 创建并启动 Stream Job
-
创建 Stream Job 将 stream 中的全部数据实时写入表格中:
CREATE STREAMJOB streamjob_name AS ("insert into table_name Select * from stream_name") JOBPROPERTIES("morphling.hdfs.flush.interval.ms"="1000"); --[1]
复制-
[1]:morphling.hdfs.flush.interval.ms入库间隔参数,单位 ms。间隔越小延迟越低。
-
-
启动Stream Job,数据开始实时入库:
START streamjob streamjob_name;
复制 -
列出正在运行的 StreamJob,检查 job 是否启动成功:
LIST STREAMJOBS;
复制 -
需要关闭 job 时,运行 Stop streamjob 停止:
STOP streamjob streamjob_name;
复制
第八步:使用 Kafka/Event Store 脚本,启动 Kafka/Event Store console producer 发送数据
-
当 Kafka 开启安全时 ,需要进入 kafka 客户端对 configure 文件进行手动配置,或者新建配置文件 security-client.properties,并在其中添加下列语句:
security.protocol=SASL_PLAINTEXT sasl.mechanism=GSSAPI sasl.kerberos.service.name=kafka
复制 -
然后按下列规则运行 bin 中的操作文件,其中
../config
表示文件 security-client.properties 被放置的路径:kafka-console-producer.sh 使用 --producer.config ../config/security-client.properties 进行指定; kafka-console-consuemr.sh 使用 --consumer.config ../config/security-client.properties 进行指定; 其他脚本如kafka-topic.sh、kafka-consumer-groups.sh 一般都使用 --command-config ../config/security-client.properties 进行指定。
复制
例如:此时 security-client.properties 放在 bin 中,hostname 是 kafka 节点的主机名或 IP 地址:
./kafka-console-producer.sh --broker-list hostname:9092 --topic topic_name --producer.config security-client.properties # 手动输入数据,回车提交: >200001,beige azure white purple navajo,Manufacturer#5,Brand#51,ECONOMY BURNISHED COPPER,31,JUMBO BAG,1101,uickly regular accoun >200002,beige azure white purple navajo,Manufacturer#5,Brand#51,ECONOMY BURNISHED COPPER,33,JUMBO BAG,1101,wwooowww
复制