联 系 我 们
售前咨询
售后咨询
微信关注:星环科技服务号
更多联系方式 >
6.3.2 基于 Slipstream 实时写入数据
更新时间:7/16/2024, 3:53:57 AM

ArgoDB 可以借助 Slipstream 流计算引擎,提供高吞吐、低延迟的数据实时写入能力

kafka slipstream  holo
图 5.1.1:实时写入流程

Transwarp Slipstream 是星环科技自主研发的企业级、高性能实时流计算引擎,支撑百万级高吞吐、毫秒级低延时业务需求。Slipstream 支持事件驱动和微批处理两种模式,支持exactly-once语义、复杂事件处理(CEP)、规则引擎等功能,支持SQL编程与开发。Slipstream帮助用户快速开发实时数据仓库、实时报表分析、实时智能推荐、实时欺诈检测与风险控制等应用。

本节主要包含以下内容:

实时写入功能操作

管理 Topic

Kafka/Event Store的相关脚本文件存放在在TDH-Client文件夹下。运行创建或查看topic脚本之前,需要执行source ./init.sh 初始化环境变量
创建topic
./kafka-topics.sh  --zookeeper <hostname:port> --create --topic <topic_name> --partitions 3 --replication-factor 1

查看Topic
./kafka-topics.sh  --zookeeper <hostname:port> --describe --topic <topic_name>
复制

管理STREAM

需要在slipstream服务中操作
非安全模式创建Stream
CREATE stream stream_name(column1 datatype1, column2 datatype2, coulumn3 datatype3, ......)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' -- [1]
TBLPROPERTIES("topic"="topic_name",
"kafka.zookeeper"="hostname1:port1", -- [2]
"kafka.broker.list"="hostname2:port2");  --[3]
复制
  • [1]:指定分隔符

  • [2]:hostname1为zookeeper节点的主机名或IP地址,port1为Zookeeper的参数zookeeper.client.port的值

  • [3]:hostname2为brocker指定节点的主机名或IP地址,port2为broker的端口号

安全模式创建Stream:
CREATE stream stream_name(column1 datatype1, column2 datatype2, coulumn3 datatype3, ......)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' -- [1]
TBLPROPERTIES("topic"="topic_name",
"kafka.zookeeper"="hostname1:port1", --[2]
"kafka.broker.list"="hostname2:port2",  --[3]
"transwarp.consumer.security.protocol"="SASL_PLAINTEXT",
"transwarp.consumer.sasl.kerberos.service.name"="kafka", --[4]
"transwarp.consumer.sasl.jaas.config"="com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab=\"/etc/slipstream1/conf/kafka.keytab\"
principal=\"kafka@TDH\""
);
复制
  • [1]:指定分隔符

  • [2]:hostname1为zookeeper节点的主机名或IP地址,port1为Zookeeper的参数zookeeper.client.port的值

  • [3]:hostname2为brocker指定节点的主机名或IP地址,port2为broker的端口号

  • [4]:kafka超级用户

  • 可以通过Klist命令查询

  • 安全模式下的配置与操作方法请查看下文的"流式实时写入示例"

查看当前数据库的 stream
SHOW streams;
复制
查看其他数据库中的所有 stream
SHOW streams from <database_name>;
复制
删除 Stream
DROP stream <stream_name>;
复制

管理 STREAM JOB

需要在 slipstream 服务中操作
创建 Streamjob
CREATE STREAMJOB <streamjob_name> AS("insert into table_name Select * from stream_name")
JOBPROPERTIES("morphling.hdfs.flush.interval.ms"="1000"); --[1]
复制
  • [1]:用于设置入库间隔,单位 ms。间隔越小延迟越短,开销越大

启停 Streamjob
启动Streamjob
START STREAMJOB <streamjob_name>;

暂停Streamjob
STOP STREAMJOB <streamjob_name>;
复制
查询Streamjob
--查询当前数据库中的所有Streamjob
SHOW STREAMJOBS;

--查询当前数据库中所有处于运行中的Streamjob
LIST STREAMJOBS;
复制
删除Streamjob
DROP STREAMJOB <streamjob_name>;
--若此StreamJob处于正在运行的状态时,则需要先停止任务才能对其进行删除。
复制

创建Producer发送数据

非安全模式下,创建producer
./kafka-console-producer.sh --broker-list hostname:port --topic topic_name
--hostname和port是producer节点指定的主机名或IP地址。
复制
安全模式下创建Producer
当TDH-client开启安全时使用Kafka, 需要进入kafka文件夹对configure文件进行手动配置,或者新建配置文件security-client.properties并放置于kafka的bin路径下
在文件中添加下列语句:
security.protocol=SASL_PLAINTEXT
sasl.mechanism=GSSAPI
sasl.kerberos.service.name=kafka

./kafka-console-producer.sh --broker-list hostname:9092 --topic topic_name --producer.config security-client.properties
然后发送数据,数据通过对应的topic以及运行的Streamjob,实现将producer发送的数据实时的写入目标表中。

--安全模式下,运行脚本文件时,需指定配置项
1)kafka-console-producer.sh 指定配置项。例如--producer.config../config/security-client.properties
2)kafka-console-consuemr.sh 指定配置项。例如 --consumer.config ../config/security-client.properties
3)其他脚本如kafka-topic.sh、kafka-consumer-groups.sh 指定配置项。例如 --command-config ../config/security-client.properties
复制
实时写入示例
如何查看并更改服务的安全模式

第一步:通过 Manager 确定服务的安全模式信息

进入 Manager 中,选择服务 kafka,进入安全界面,显示”当前服务已开启Kerberos“,即代表该服务为安全模式。

slipstream holo 1
图 5.1.2:Kafka 开启 Kerberos 安全

查询该服务的安全认证模式,当前为LDAP认证模式。

slipstream holo 2
图 5.1.3:Kafka 安全认证模式
非安全模式下实时写入示例

第一步:进入 TDH-Client\kafka\bin文件夹

cd TDH-Client\kafka\bin
复制

第二步:使用Kafka/Event Store 脚本创建一个 Topic

./kafka-topics.sh --zookeeper localhost:2181 --create --topic topic_orders --partitions 3 --replication-factor 1
复制
  • kafka-topics.sh 用于创建 Topic

第三步:连接Slipstream并创建Stream

CREATE stream stream_slipstream_to_argodb_orders(
trans_id int,
acc_num int,
trans_time date,
trans_type string,
stock_id string,
price decimal,
amount int)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
TBLPROPERTIES(
"topic"="topic_orders",
"kafka.zookeeper"="localhost:2181",
"kafka.broker.list"="localhost:9092"
);
复制

第四步:通过Slipstream 创建Stream Job

CREATE STREAMJOB streamjob_slipstream_to_argodb_orders AS (
"insert into database_argodb.orders select trans_id , acc_num ,trans_time ,trans_type ,stock_id , price ,amount  from stream_slipstream_to_argodb_orders")
JOBPROPERTIES("morphling.hdfs.flush.interval.ms"="1000");
复制
  • morphling.hdfs.flush.interval.ms入库间隔参数,单位 ms。间隔越小延迟越低。

第五步:通过Slipstream启动Stream job

START streamjob streamjob_slipstream_to_argodb_orders;
复制

第六步:使用Kafka/Event Store 脚本,启动 Kafka/Event Store console producer 发送数据

./kafka-console-producer.sh --broker-list localhost:9092 --topic topic_orders
>943197522,6513065,2014-01-05,b,AA7105670,12.13,200
>929634984,3912384,2014-02-05,b,UA1467891,11.11,300
复制

第七步:通过ArgoDB Quark查看数据入库结果

select trans_id , acc_num ,trans_time ,trans_type ,stock_id , price ,amount from orders;
复制
安全模式下实时写入案例

第一步:进入 TDH-Client\kafka\bin文件夹

cd TDH-Client\kafka\bin
复制

第二步:使用Kafka/Event Store 脚本创建一个 Topic

./kafka-topics.sh  --zookeeper hostname:port --create --topic topic_name --partitions 3 --replication-factor 1
-- hostname的值为ZooKeeper Server节点的主机名或IP地址
-- port为服务zookeeper的端口
复制

第三步:在 ArgoDB Quark 中创建表

Create table table_name(column1 datatype1, column2 datatype2, coulumn3 datatype3, ......) STORED AS HOLODESK;
复制

第四步:连接 Slipstream

beeline -u "jdbc:transwarp2://hostname:port/database_name" -n admin -p admin
复制
  • [1]:port为其参数hive.server2.thrift.port 的值,默认为10010,hostname为Slipstream节点的主机名或IP地址:

第五步:进行 Kafka 安全配置

为了在安全模式下成功创建 Stream 并连接上 kafka 中的 topic,需要上传 kafka.keytab 文件至 Slipstream 的每一个节点上。

此操作需要进入 Guardian Server 中进行下载,并将下载的 kafka.keytab 文件放在 Slipstream 的每一个节点的相同路径下。为支持 Slipstream 访问该文件,推荐的文件路径为:/etc/transwarp/conf/kafka.keytab

具体操作步骤如下:

  1. 进入集群的manager界面,点击右上角的全局服务,打开 Guardian 服务。

    slipstream holo 4
    图 5.1.4:Manager 中打开 Guardian 服务
  2. 在角色中任意选择 Guardian 节点并点击进入,打开 Guardian 网站。由于 ArgoDB 集群的 Guardian 默认采用 Guardian Federation 提供的 OAuth 2.0 认证机制,首次登录时会自动跳转到 Guardian Federation 登录页面(需要在浏览器所在机器中配置好节点 IP 与主机名的映射,也可以手动将主机名更改成 IP 地址)

    slipstream holo 5
    图 5.1.5:登录 Guardian Server
  3. 登录之后,点击上侧菜单栏中的【租户】,然后点击右上角的【下载keytab】, 在弹出框中输入用户 kafka,点击确定进行下载。

    slipstream holo 6
    图 5.1.6:下载 kafka.keytab 文件
  4. 下载之后需要将该 keytab 文件上传至集群的每一个节点中,为保证 Slipstream 能够成功读取该文件,推荐将其路径为:

    /etc/transwarp/conf/kafka.keytab
    复制
  5. 之后在文件路径下运行命令,查看 principal 参数的具体值:

    klist -k kafka.keytab
    复制

第六步:通过 Slipstream 建立 stream

CREATE stream stream_name(column1 datatype1, column2 datatype2, coulumn3 datatype3, ......)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' --数据输入时用”,“作为分隔符
TBLPROPERTIES("topic"="topic_name", "kafka.zookeeper"="hostname1:port1",
"kafka.broker.list"="hostname2:9092",
"transwarp.consumer.security.protocol"="SASL_PLAINTEXT",
"transwarp.consumer.sasl.kerberos.service.name"="kafka", --kafka为超级用户
"transwarp.consumer.sasl.jaas.config"="com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab=\"/etc/slipstream1/conf/kafka.keytab\" principal=\"kafka@TDH\""  --Principle参数的值
);
复制

第七步:通过 Slipstream 创建并启动 Stream Job

  1. 创建 Stream Job 将 stream 中的全部数据实时写入表格中:

    CREATE STREAMJOB streamjob_name AS
    
    ("insert into table_name
    
    Select * from stream_name")
    
    JOBPROPERTIES("morphling.hdfs.flush.interval.ms"="1000"); --[1]
    复制
    • [1]:morphling.hdfs.flush.interval.ms入库间隔参数,单位 ms。间隔越小延迟越低。

  2. 启动Stream Job,数据开始实时入库:

    START streamjob streamjob_name;
    复制
  3. 列出正在运行的 StreamJob,检查 job 是否启动成功:

    LIST STREAMJOBS;
    复制
  4. 需要关闭 job 时,运行 Stop streamjob 停止:

    STOP streamjob streamjob_name;
    复制

第八步:使用 Kafka/Event Store 脚本,启动 Kafka/Event Store console producer 发送数据

  1. 当 Kafka 开启安全时 ,需要进入 kafka 客户端对 configure 文件进行手动配置,或者新建配置文件 security-client.properties,并在其中添加下列语句:

    security.protocol=SASL_PLAINTEXT
    sasl.mechanism=GSSAPI
    sasl.kerberos.service.name=kafka
    复制
  2. 然后按下列规则运行 bin 中的操作文件,其中 ../config 表示文件 security-client.properties 被放置的路径:

    kafka-console-producer.sh 使用 --producer.config ../config/security-client.properties 进行指定;
    
    kafka-console-consuemr.sh 使用 --consumer.config ../config/security-client.properties 进行指定;
    
    其他脚本如kafka-topic.sh、kafka-consumer-groups.sh 一般都使用 --command-config ../config/security-client.properties 进行指定。
    复制

例如:此时 security-client.properties 放在 bin 中,hostname 是 kafka 节点的主机名或 IP 地址:

./kafka-console-producer.sh --broker-list hostname:9092 --topic topic_name --producer.config security-client.properties

# 手动输入数据,回车提交:

>200001,beige azure white purple navajo,Manufacturer#5,Brand#51,ECONOMY BURNISHED COPPER,31,JUMBO BAG,1101,uickly regular accoun

>200002,beige azure white purple navajo,Manufacturer#5,Brand#51,ECONOMY BURNISHED COPPER,33,JUMBO BAG,1101,wwooowww
复制