
在数据分析和处理的快速演变中,企业正逐渐依赖于多样化的数据处理工具来满足其业务需求。Apache Flink作为一个广泛采用的流处理框架,已经在许多企业中建立了坚实的地位,用于构建实时的数据流分析和应用。
对于已经拥有成熟 Flink 系统的企业来说,完全迁移至 ArgoDB来 实现所有数据处理任务会带来较大的成本。为此,我们提供 flink-argodb-connector 插件,通过支持 Flink 直接访问 ArgoDB 实现流式数据直接写入 TDDMS 功能。
功能说明
通过 flink-argodb-connector 配置 ArgoDB 连接和认证信息,支持 Flink 通过 Datastream API 直接流式写入 ArgoDB 存储层。同样,也支持使用 Flink SQL 命令读写 ArgoDB 数据库。

功能边界
-
支持 Flink 对接 ArgoDB 通过 Datastream API 实现流式写入、同时也支持通过 FlinkSQL 命令进行批量写入
-
支持 Flink 查询 ArgoDB 中的表数据,但暂不支持流式读取。
-
支持 Flink 动态分区插入 Holodesk 表,但暂不支持根据分区列自动创建分区的功能,需提前在 ArgoDB 中创建分区
-
支持的数据格式:TINYINT, SMALLINT, INT, BIGINT, FLOAT, DOUBLE, DECIMAL(30, 17), STRING, VARCHAR(11), BOOLEAN, TIMESTAMP, DATE
环境条件
-
JDK 版本需高于 jdk-1.8.161,如果低于该版本,会导致 Flink 任务 Checkpoint 卡住
-
目前仅 ArgoDB 5.2 及以上版本支持对接 Flink 功能
-
支持 Flink 1.13、1.14、1.15、1.16、1.17、1.19 版本通过 Flink Connector 访问 ArgoDB,但查询功能目前仅支持 flink 1.17
-
Flink 任务必须开 checkpoint
对接方式
下面向您介绍使用 Flink 对接 ArgoDB 的具体步骤:
-
首先确保您已经成功安装了 Flink 和 jdk-1.8.161,可进入 Flink 官网自行下载
-
联系星环技术人员获取对应版本的 flink-argodb-connector 驱动包。包含一个 common 包和 Flink 对应版本的 Jar 包,如果您使用的 Flink 为 1.13 或 1.14 版本,则还需额外下载对应的 JDBC 驱动包:
图 11. ArgoDB 6.0 提供的 connector 驱动包 -
将 connetor 驱动包拷贝至 Flink 安装路径的 lib 目录下
cp argodb-flink-1.15-8.37.3.jar flink-1.15.4/lib cp argodb-flink-common-8.37.3.jar flink-1.15.4/lib
复制 -
配置 Flink 配置文件 flink-conf.yml
# 修改slot数: 并行度不够任务是起不来的 taskmanager.numberOfTaskSlots: 20 # checkpoint配置默认关闭, 需要配置以驱动数据入Argo execution.checkpointing.interval: 5000 state.checkpoints.num-retained: 20 execution.checkpointing.mode: EXACTLY_ONCE execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION state.backend: filesystem state.checkpoints.dir: file:///tmp/chk state.savepoints.dir: file:///tmp/chk # 远程 Debug 排查问题 env.java.opts.jobmanager: "-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5006" env.java.opts.taskmanager: "-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005" # webui火焰图 rest.flamegraph.enabled: true # 远程访问webui rest.bind-address: 0.0.0.0
复制 -
修改 Flink 配置文件 log4j.properties:
inceptor.root.logger=DEBUG,RFA log4j.rootLogger=DEBUG,RFA log4j.appender.console=io.transwarp.org.apache.log4j.ConsoleAppender log4j.appender.console.target=System.err log4j.appender.console.layout=io.transwarp.org.apache.log4j.PatternLayout log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c: %m%n log4j.appender.RFA=io.transwarp.org.apache.log4j.RollingFileAppender log4j.appender.RFA.File=${log.file} log4j.appender.RFA.MaxFileSize=10MB log4j.appender.RFA.MaxBackupIndex=1024 log4j.appender.RFA.layout=io.transwarp.org.apache.log4j.PatternLayout log4j.appender.RFA.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c: %m%n
复制 -
修改以上配置后需重启 Flink。
-
您可以选择启动 Flink standalone 模式,进行 Flink 功能的简单测试:
# 启动 Flink 集群: ./bin/start-cluster.sh # 停止 Flink 集群: ./bin/stop-cluster.sh # 进入 Flink SQL 客户端 ./bin/sql-client.sh
复制 -
启动 Flink on Yarn 集群模式,并执行一个流处理示例:
# 需要在 TDH-Client 目录下先执行 source init.sh cd /root/TDH-Client/ source ./init.sh y y # 添加 HADOOP 环境变量 export HADOOP_CLASSPATH=`hadoop classpath` export HADOOP_CONF_DIR=/etc/yarn1/conf/ # 提交一个 Flink 应用到集群 ./bin/flink run-application -t yarn-application \ -D taskmanager.numberOfTaskSlots=2 \ -D execution.checkpointing.interval=1000 \ -D state.backend=filesystem \ -D state.checkpoint-storage=filesystem \ -D state.checkpoints.dir=hdfs://nameservice1/user/root/flink-checkpoints/1 \ examples/streaming/StateMachineExample.jar --kafka-topic test --brokers stream32:9092
复制-
-D taskmanager.numberOfTaskSlots=2: 设置 Flink 每个 TaskManager 的任务槽(TaskSlot)数量。任务槽是 Flink 中的资源分配单位。
-
-D execution.checkpointing.interval=1000: 设置执行检查点的间隔时间,单位是毫秒。
-
-D state.backend=filesystem: 指定了状态后端为文件系统。状态后端负责管理应用程序的状态。
-
-D state.checkpoint-storage=filesystem: 指定了检查点的存储方式为文件系统。
-
-D state.checkpoints.dir=hdfs://nameservice1/user/root/flink-checkpoints/1: 这个参数设置了检查点存储的目录。这里指定的是 HDFS 的路径,nameservice1 是 HDFS 的名称服务,/user/root/flink-checkpoints/1 是存储检查点的具体路径。
-
examples/streaming/StateMachineExample.jar: 这是 Flink 提供的示例应用程序的 JAR 文件路径。StateMachineExample 是 Flink 的官方示例之一,用于演示状态机的概念。
-
--kafka-topic test --brokers stream32:9092: --kafka-topic 指定了 Kafka 的主题名称,这里是 test。--brokers 指定了 Kafka 的 broker 地址,这里是 stream32:9092。
-
参数说明
下面向您介绍 Flink 对接 ArgoDB 时的参数说明,具体参数的使用请参考读写示例章节
datastream API 参数 | Flink SQL 参数 | 说明 | 取值示例 |
---|---|---|---|
url |
metastore.url |
配置 Quark metastore 的连接, 用来获取 Quark 中的 table 信息 |
jdbc:hive2://localhost:10000/default |
masterGroup |
master.group |
TDDMS MasterGroup 连接地址, 推荐写 hostname |
tdh01:9630,tdh02:9630,tdh03:9630 |
tableName |
table.name |
在 Quark 中创建表时定义的表名,需要带上数据库名, 库名和表名用点分割 |
default.flink_holo |
tmpDirectory |
dir |
Flink中写 ArgoDB 的临时文件地址,会在 flush 后删除 |
/tmp |
useExternalAddress |
use.external.address |
如果 Flink 客户端节点不支持配置 host,则需要将此参数设置成 true |
true |
ldapUser |
ldap.user |
LDAP 认证时的 username |
user |
ldapPassword |
ldap.password |
LDAP 认证时的 passwrod |
passwd |
None |
sink.parallelism |
ArgoDB Sink 任务并行度 |
1 |
读写示例
通过 API 写入 ArgoDB
下面向您介绍通过对接 datastream API 实现流式数据写入 ArgoDB 的代码示例:
import io.transwarp.connector.argodb.ArgoDBSinkConfig; import io.transwarp.connector.argodb.serde.ArgoDBRecordSerializationSchema; import io.transwarp.connector.argodb.serde.ArgoDBSimpleStringSerializer; import io.transwarp.connector.argodb.streaming.HolodeskSinkLevelWriterFunction; import io.transwarp.connector.argodb.table.ArgoDBSink; import io.transwarp.connector.argodb.table.ArgodbSinkBuilder; import org.apache.commons.lang3.StringUtils; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.connector.kafka.source.KafkaSource; import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.CheckpointConfig; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; public class ArgoDBSinkExample { public static void main(String[] args) { String metastoreUrl = System.getProperty("metastore.url"); String tableName = System.getProperty("table.name"); String shivaMasterGroup = System.getProperty("shiva.mastergroup"); String kafkaTopic = System.getProperty("kafka.topic"); String kafkaServer = System.getProperty("kafka.bootstrap.server"); String kafkaGroupName = System.getProperty("kafka.group.name"); String para = System.getProperty("sink.parallelism"); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 每 1000ms 开始一次 checkpoint env.enableCheckpointing(10000); // 高级选项: // 设置模式为精确一次 (这是默认值) env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); // 确认 checkpoints 之间的时间会进行 500 ms env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500); // Checkpoint 必须在一分钟内完成,否则就会被抛弃 env.getCheckpointConfig().setCheckpointTimeout(60000); // 允许两个连续的 checkpoint 错误 env.getCheckpointConfig().setTolerableCheckpointFailureNumber(2); // 同一时间只允许一个 checkpoint 进行 env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); // 使用 externalized checkpoints,这样 checkpoint 在作业取消后仍就会被保留 env.getCheckpointConfig().setExternalizedCheckpointCleanup( CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); // 开启实验性的 unaligned checkpoints env.getCheckpointConfig().enableUnalignedCheckpoints(); env.getCheckpointConfig().setCheckpointStorage("file:///tmp/flink115/checkpoints"); // 设置检查点存储路径 KafkaSource<String> source = KafkaSource.<String>builder() .setBootstrapServers(kafkaServer) .setTopics(kafkaTopic) .setGroupId(kafkaGroupName) .setStartingOffsets(OffsetsInitializer.earliest()) .setValueOnlyDeserializer(new SimpleStringSchema()) .build(); env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source").setParallelism(30); DataStream<String[]> dataStream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source").setParallelism(30) .map( new RichMapFunction<String, String[]>() { @Override public String[] map(String s) throws Exception { return StringUtils.split(s, ",", -1); } } ).setParallelism(30); ArgoDBSinkConfig flinkArgoDBSinkConfig = ArgoDBSinkConfig.builder() .masterGroup(shivaMasterGroup) .tableName(tableName) .tmpDirectory("/tmp") .url(metastoreUrl) .enableShiva2(true) .useExternalAddress(false) .build(); ArgoDBSimpleStringSerializer stringRecordSerializationSchema = new ArgoDBSimpleStringSerializer(); HolodeskSinkLevelWriterFunction<String[]> argoSink = new HolodeskSinkLevelWriterFunction<>(flinkArgoDBSinkConfig, stringRecordSerializationSchema); dataStream.addSink(argoSink).setParallelism(Integer.parseInt(para)); try { env.execute("argodb-sink"); } catch (Exception e) { e.printStackTrace(); } } }
复制
通过 Flink SQL 读写
通过 Flink SQL 流式写入 ArgoDB
-
Flink 中创建源表,并关联 kafka topic
CREATE TABLE KafkaTable4 ( `user` string, message string, num decimal(22,2) ) WITH ( 'connector' = 'kafka', 'topic' = 'flink_input_rowkey555', 'properties.bootstrap.servers' = 'localhost:9092', 'properties.group.id' = 'newgroupid510', 'scan.startup.mode' = 'earliest-offset', 'format' = 'csv' );
复制若 Kafka 服务开启安全模式,请参考如下示例创建源表
CREATE TABLE KafkaTableA ( id int, name int ) WITH ( 'connector' = 'kafka', 'topic' = 'flink11', 'properties.bootstrap.servers' = 'gxb13:9092,gxb14:9092,gxb15:9092', 'properties.group.id' = 'newgroupid510', 'scan.startup.mode' = 'earliest-offset', 'format' = 'csv', 'properties.security.protocol' = 'SASL_PLAINTEXT', 'properties.sasl.mechanism' = 'GSSAPI', 'properties.sasl.jaas.config' = 'com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true useTicketCache=false keyTab="/etc/yarn1/conf/kafka.keytab" principal="kafka@TDH";', 'properties.sasl.kerberos.service.name'= 'kafka' );
复制 -
Flink 中创建 Holodesk 目标表视图
create table flink_input_rowkey4 ( `user` string, message string, num decimal(22,2) ) WITH ( 'connector' = 'argodb', 'master.group' = '172.18.120.32:29630,172.18.120.33:29630,172.18.120.34:29630', 'table.name' = 'default.flink_input_rowkey_col3', 'shiva2.enable' = 'true', 'compression' = 'snappy', 'use.external.address' = 'true', 'metastore.url' = 'jdbc:hive2://172.18.120.33:10010/default' );
复制 -
流入库任务
insert into flink_input_rowkey4 select * from KafkaTable4;
复制
通过 Flink SQL 批量写入 ArgoDB
-
在 Flink 中创建 Holodesk 目标表视图
set sql-client.execution.result-mode=TABLEAU; create table flink_input_rowkey4 ( `user` string, message string, num decimal(22,2) ) WITH ( 'connector' = 'argodb', 'master.group' = '172.18.120.32:29630,172.18.120.33:29630,172.18.120.34:29630', 'table.name' = 'default.flink_input_rowkey_col3', 'shiva2.enable' = 'true', 'compression' = 'snappy', 'use.external.address' = 'true', 'sink.parallelism' = '5', 'metastore.url' = 'jdbc:hive2://172.18.120.33:10010/default' );
复制 -
Flink 执行批入库任务
insert into flink_input_rowkey4 valus ('1','1',11.11);
复制
通过 Flink SQL 查询 ArgoDB
目前只支持 Flink 1.17 版本通过 SQL 命令的方式查询 ArgoDB 表数据 |
-
首先连接 ArgoDB 的 Quark 服务,创建 Holdesk 表
create table table_output_holo_bucket ( c0 tinyint, c1 smallint, c2 int, c3 bigint, c4 float, c5 double, c6 decimal(30, 17), c7 string, c8 varchar(11), c9 boolean, c10 timestamp ) stored as holodesk;
复制 -
然后在 Flink 1.17 中创建该 Holodesk 表视图,并查询
SET sql-client.execution.result-mode=TABLEAU; SET 'parallelism.default' = '2'; -- 创建视图 create table flink_input_rowkey4 ( c0 tinyint, c1 smallint, c2 int, c3 bigint, c4 float, c5 double,c6 decimal(30, 17), c7 string, c8 varchar(11),c9 boolean, c10 timestamp ) WITH ( 'connector' = 'argodb', 'master.group' = '172.18.120.33:39630,172.18.120.32:39630,172.18.120.31:39630', 'table.name' = 'default.table_output_holo_bucket', 'shiva2.enable' = 'true', 'metastore.url' = 'jdbc:hive2://172.18.120.32:10000/default' ); -- 查询视图关联的 Holodesk 表 select c0,c1,c2, c4, c5, c6, c7,c8, c9, c10 from flink_input_rowkey4 limit 2
复制