联 系 我 们
售前咨询
售后咨询
微信关注:星环科技服务号
更多联系方式 >
6.5.3 基于 Flink 读写 ArgoDB
更新时间:11/6/2024, 11:06:52 AM

在数据分析和处理的快速演变中,企业正逐渐依赖于多样化的数据处理工具来满足其业务需求。Apache Flink作为一个广泛采用的流处理框架,已经在许多企业中建立了坚实的地位,用于构建实时的数据流分析和应用。

对于已经拥有成熟 Flink 系统的企业来说,完全迁移至 ArgoDB来 实现所有数据处理任务会带来较大的成本。为此,我们提供 flink-argodb-connector 插件,通过支持 Flink 直接访问 ArgoDB 实现流式数据直接写入 TDDMS 功能。

功能说明

通过 flink-argodb-connector 配置 ArgoDB 连接和认证信息,支持 Flink 通过 Datastream API 直接流式写入 ArgoDB 存储层。同样,也支持使用 Flink SQL 命令读写 ArgoDB 数据库。

flink insert argodb

功能边界

  • 支持 Flink 对接 ArgoDB 通过 Datastream API 实现流式写入、同时也支持通过 FlinkSQL 命令进行批量写入

  • 支持 Flink 查询 ArgoDB 中的表数据,但暂不支持流式读取。

  • 支持 Flink 动态分区插入 Holodesk 表,但暂不支持根据分区列自动创建分区的功能,需提前在 ArgoDB 中创建分区

  • 支持的数据格式:TINYINT, SMALLINT, INT, BIGINT, FLOAT, DOUBLE, DECIMAL(30, 17), STRING, VARCHAR(11), BOOLEAN, TIMESTAMP, DATE

环境条件

  • JDK 版本需高于 jdk-1.8.161,如果低于该版本,会导致 Flink 任务 Checkpoint 卡住

  • 目前仅 ArgoDB 5.2 及以上版本支持对接 Flink 功能

  • 支持 Flink 1.13、1.14、1.15、1.16、1.17、1.19 版本通过 Flink Connector 访问 ArgoDB,但查询功能目前仅支持 flink 1.17

  • Flink 任务必须开 checkpoint

对接方式

下面向您介绍使用 Flink 对接 ArgoDB 的具体步骤:

  1. 首先确保您已经成功安装了 Flink 和 jdk-1.8.161,可进入 Flink 官网自行下载

  2. 联系星环技术人员获取对应版本的 flink-argodb-connector 驱动包。包含一个 common 包和 Flink 对应版本的 Jar 包,如果您使用的 Flink 为 1.13 或 1.14 版本,则还需额外下载对应的 JDBC 驱动包:

    flink argodb connector jars
    图 11. ArgoDB 6.0 提供的 connector 驱动包
  3. 将 connetor 驱动包拷贝至 Flink 安装路径的 lib 目录下

    cp argodb-flink-1.15-8.37.3.jar  flink-1.15.4/lib
    cp argodb-flink-common-8.37.3.jar  flink-1.15.4/lib
    复制
  4. 配置 Flink 配置文件 flink-conf.yml

    # 修改slot数: 并行度不够任务是起不来的
    taskmanager.numberOfTaskSlots: 20
    
    
    # checkpoint配置默认关闭, 需要配置以驱动数据入Argo
    execution.checkpointing.interval: 5000
    state.checkpoints.num-retained: 20
    execution.checkpointing.mode: EXACTLY_ONCE
    execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
    state.backend: filesystem
    state.checkpoints.dir: file:///tmp/chk
    state.savepoints.dir: file:///tmp/chk
    
    # 远程 Debug 排查问题
    env.java.opts.jobmanager: "-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5006"
    env.java.opts.taskmanager: "-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005"
    
    
    # webui火焰图
    rest.flamegraph.enabled: true
    
    # 远程访问webui
    rest.bind-address: 0.0.0.0
    复制
  5. 修改 Flink 配置文件 log4j.properties:

    inceptor.root.logger=DEBUG,RFA
    log4j.rootLogger=DEBUG,RFA
    log4j.appender.console=io.transwarp.org.apache.log4j.ConsoleAppender
    log4j.appender.console.target=System.err
    log4j.appender.console.layout=io.transwarp.org.apache.log4j.PatternLayout
    log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c: %m%n
    log4j.appender.RFA=io.transwarp.org.apache.log4j.RollingFileAppender
    log4j.appender.RFA.File=${log.file}
    log4j.appender.RFA.MaxFileSize=10MB
    log4j.appender.RFA.MaxBackupIndex=1024
    log4j.appender.RFA.layout=io.transwarp.org.apache.log4j.PatternLayout
    log4j.appender.RFA.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c: %m%n
    复制
  6. 修改以上配置后需重启 Flink。

  7. 您可以选择启动 Flink standalone 模式,进行 Flink 功能的简单测试:

    # 启动 Flink 集群:
    ./bin/start-cluster.sh
    # 停止 Flink 集群:
    ./bin/stop-cluster.sh
    # 进入 Flink SQL 客户端
    ./bin/sql-client.sh
    复制
  8. 启动 Flink on Yarn 集群模式,并执行一个流处理示例:

    # 需要在 TDH-Client 目录下先执行 source init.sh
    cd /root/TDH-Client/
    source ./init.sh y y
    
    # 添加 HADOOP 环境变量
    export HADOOP_CLASSPATH=`hadoop classpath`
    export HADOOP_CONF_DIR=/etc/yarn1/conf/
    
    # 提交一个 Flink 应用到集群
    ./bin/flink run-application -t yarn-application  \
    -D taskmanager.numberOfTaskSlots=2 \
    -D execution.checkpointing.interval=1000 \
    -D state.backend=filesystem \
    -D state.checkpoint-storage=filesystem \
    -D state.checkpoints.dir=hdfs://nameservice1/user/root/flink-checkpoints/1 \
    examples/streaming/StateMachineExample.jar --kafka-topic test --brokers stream32:9092
    复制
    • -D taskmanager.numberOfTaskSlots=2: 设置 Flink 每个 TaskManager 的任务槽(TaskSlot)数量。任务槽是 Flink 中的资源分配单位。

    • -D execution.checkpointing.interval=1000: 设置执行检查点的间隔时间,单位是毫秒。

    • -D state.backend=filesystem: 指定了状态后端为文件系统。状态后端负责管理应用程序的状态。

    • -D state.checkpoint-storage=filesystem: 指定了检查点的存储方式为文件系统。

    • -D state.checkpoints.dir=hdfs://nameservice1/user/root/flink-checkpoints/1: 这个参数设置了检查点存储的目录。这里指定的是 HDFS 的路径,nameservice1 是 HDFS 的名称服务,/user/root/flink-checkpoints/1 是存储检查点的具体路径。

    • examples/streaming/StateMachineExample.jar: 这是 Flink 提供的示例应用程序的 JAR 文件路径。StateMachineExample 是 Flink 的官方示例之一,用于演示状态机的概念。

    • --kafka-topic test --brokers stream32:9092: --kafka-topic 指定了 Kafka 的主题名称,这里是 test。--brokers 指定了 Kafka 的 broker 地址,这里是 stream32:9092。

参数说明

下面向您介绍 Flink 对接 ArgoDB 时的参数说明,具体参数的使用请参考读写示例章节

datastream API 参数 Flink SQL 参数 说明 取值示例

url

metastore.url

配置 Quark metastore 的连接, 用来获取 Quark 中的 table 信息

jdbc:hive2://localhost:10000/default

masterGroup

master.group

TDDMS MasterGroup 连接地址, 推荐写 hostname

tdh01:9630,tdh02:9630,tdh03:9630

tableName

table.name

在 Quark 中创建表时定义的表名,需要带上数据库名, 库名和表名用点分割

default.flink_holo

tmpDirectory

dir

Flink中写 ArgoDB 的临时文件地址,会在 flush 后删除

/tmp

useExternalAddress

use.external.address

如果 Flink 客户端节点不支持配置 host,则需要将此参数设置成 true

true

ldapUser

ldap.user

LDAP 认证时的 username

user

ldapPassword

ldap.password

LDAP 认证时的 passwrod

passwd

None

sink.parallelism

ArgoDB Sink 任务并行度

1

通过 API 写入 ArgoDB

下面向您介绍通过对接 datastream API 实现流式数据写入 ArgoDB 的代码示例:

import io.transwarp.connector.argodb.ArgoDBSinkConfig;
import io.transwarp.connector.argodb.serde.ArgoDBRecordSerializationSchema;
import io.transwarp.connector.argodb.serde.ArgoDBSimpleStringSerializer;
import io.transwarp.connector.argodb.streaming.HolodeskSinkLevelWriterFunction;
import io.transwarp.connector.argodb.table.ArgoDBSink;
import io.transwarp.connector.argodb.table.ArgodbSinkBuilder;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;


public class ArgoDBSinkExample {

    public static void main(String[] args) {

        String metastoreUrl = System.getProperty("metastore.url");
        String tableName = System.getProperty("table.name");
        String shivaMasterGroup = System.getProperty("shiva.mastergroup");
        String kafkaTopic = System.getProperty("kafka.topic");
        String kafkaServer = System.getProperty("kafka.bootstrap.server");
        String kafkaGroupName = System.getProperty("kafka.group.name");
        String para = System.getProperty("sink.parallelism");


        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 每 1000ms 开始一次 checkpoint
        env.enableCheckpointing(10000);

// 高级选项:

// 设置模式为精确一次 (这是默认值)
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

// 确认 checkpoints 之间的时间会进行 500 ms
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);

// Checkpoint 必须在一分钟内完成,否则就会被抛弃
        env.getCheckpointConfig().setCheckpointTimeout(60000);

// 允许两个连续的 checkpoint 错误
        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(2);

// 同一时间只允许一个 checkpoint 进行
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

// 使用 externalized checkpoints,这样 checkpoint 在作业取消后仍就会被保留
        env.getCheckpointConfig().setExternalizedCheckpointCleanup(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

// 开启实验性的 unaligned checkpoints
        env.getCheckpointConfig().enableUnalignedCheckpoints();


        env.getCheckpointConfig().setCheckpointStorage("file:///tmp/flink115/checkpoints");  // 设置检查点存储路径

        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers(kafkaServer)
                .setTopics(kafkaTopic)
                .setGroupId(kafkaGroupName)
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source").setParallelism(30);

        DataStream<String[]> dataStream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source").setParallelism(30)
                .map(
                        new RichMapFunction<String, String[]>() {
                            @Override
                            public String[] map(String s) throws Exception {
                                return StringUtils.split(s, ",", -1);
                            }
                        }
                ).setParallelism(30);


        ArgoDBSinkConfig flinkArgoDBSinkConfig = ArgoDBSinkConfig.builder()
                .masterGroup(shivaMasterGroup)
                .tableName(tableName)
                .tmpDirectory("/tmp")
                .url(metastoreUrl)
                .enableShiva2(true)
                .useExternalAddress(false)
                .build();
        ArgoDBSimpleStringSerializer stringRecordSerializationSchema = new ArgoDBSimpleStringSerializer();

        HolodeskSinkLevelWriterFunction<String[]> argoSink = new HolodeskSinkLevelWriterFunction<>(flinkArgoDBSinkConfig, stringRecordSerializationSchema);

        dataStream.addSink(argoSink).setParallelism(Integer.parseInt(para));

        try {
            env.execute("argodb-sink");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
复制

通过 Flink SQL 流式写入 ArgoDB

  1. Flink 中创建源表,并关联 kafka topic

    CREATE TABLE KafkaTable4 (
       `user` string,
        message string,
        num decimal(22,2)
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'flink_input_rowkey555',
      'properties.bootstrap.servers' = 'localhost:9092',
      'properties.group.id' = 'newgroupid510',
      'scan.startup.mode' = 'earliest-offset',
      'format' = 'csv'
    );
    复制

    若 Kafka 服务开启安全模式,请参考如下示例创建源表

    CREATE TABLE KafkaTableA (
       id int,
       name int
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'flink11',
      'properties.bootstrap.servers' = 'gxb13:9092,gxb14:9092,gxb15:9092',
      'properties.group.id' = 'newgroupid510',
      'scan.startup.mode' = 'earliest-offset',
      'format' = 'csv',
      'properties.security.protocol' = 'SASL_PLAINTEXT',
      'properties.sasl.mechanism' = 'GSSAPI',
      'properties.sasl.jaas.config' = 'com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true useTicketCache=false keyTab="/etc/yarn1/conf/kafka.keytab" principal="kafka@TDH";',
      'properties.sasl.kerberos.service.name'= 'kafka'
    );
    复制
  2. Flink 中创建 Holodesk 目标表视图

    create table flink_input_rowkey4 (
        `user` string,
        message string,
        num decimal(22,2)
    ) WITH (
        'connector' = 'argodb',
        'master.group' = '172.18.120.32:29630,172.18.120.33:29630,172.18.120.34:29630',
        'table.name' = 'default.flink_input_rowkey_col3',
        'shiva2.enable' = 'true',
        'compression' = 'snappy',
        'use.external.address' = 'true',
        'metastore.url' = 'jdbc:hive2://172.18.120.33:10010/default'
    );
    复制
  3. 流入库任务

    insert into flink_input_rowkey4 select * from KafkaTable4;
    复制

通过 Flink SQL 批量写入 ArgoDB

  1. 在 Flink 中创建 Holodesk 目标表视图

    set sql-client.execution.result-mode=TABLEAU;
    
    create table flink_input_rowkey4 (
    `user` string,
    message string,
    num decimal(22,2)
    ) WITH (
    'connector' = 'argodb',
    'master.group' = '172.18.120.32:29630,172.18.120.33:29630,172.18.120.34:29630',
    'table.name' = 'default.flink_input_rowkey_col3',
    'shiva2.enable' = 'true',
    'compression' = 'snappy',
    'use.external.address' = 'true',
    'sink.parallelism' = '5',
    'metastore.url' = 'jdbc:hive2://172.18.120.33:10010/default'
    );
    复制
  2. Flink 执行批入库任务

    insert into flink_input_rowkey4 valus ('1','1',11.11);
    复制

通过 Flink SQL 查询 ArgoDB

目前只支持 Flink 1.17 版本通过 SQL 命令的方式查询 ArgoDB 表数据
  1. 首先连接 ArgoDB 的 Quark 服务,创建 Holdesk 表

    create table table_output_holo_bucket
    (
        c0  tinyint,
        c1  smallint,
        c2  int,
        c3  bigint,
        c4  float,
        c5  double,
        c6  decimal(30, 17),
        c7  string,
        c8  varchar(11),
        c9  boolean,
        c10 timestamp
    ) stored as holodesk;
    复制
  2. 然后在 Flink 1.17 中创建该 Holodesk 表视图,并查询

    SET sql-client.execution.result-mode=TABLEAU;
    SET 'parallelism.default' = '2';
    
    -- 创建视图
    create table flink_input_rowkey4
    (
        c0  tinyint, c1  smallint,  c2  int, c3  bigint, c4  float, c5  double,c6  decimal(30, 17), c7  string, c8  varchar(11),c9  boolean, c10 timestamp
    )
    WITH (
          'connector' = 'argodb',
          'master.group' = '172.18.120.33:39630,172.18.120.32:39630,172.18.120.31:39630',
          'table.name' = 'default.table_output_holo_bucket',
          'shiva2.enable' = 'true',
          'metastore.url' = 'jdbc:hive2://172.18.120.32:10000/default'
          );
    
    -- 查询视图关联的 Holodesk 表
    select c0,c1,c2, c4, c5, c6, c7,c8, c9, c10
    from flink_input_rowkey4 limit 2
    复制