联 系 我 们
售前咨询
售后咨询
微信关注:星环科技服务号
更多联系方式 >
9.8 Spark 访问 Holodesk
更新时间:6/25/2025, 10:29:20 AM

功能概述

Holodesk 是 Inceptor 主要的表数据存储格式,采用向量化存储引擎,拥有超高的性能和效率,可以高效地处理大规模数据。Spark 与 Holodesk 集成后,可以高效地进行数据读取操作,充分利用 Holodesk 能力显著提升批处理和数据分析的性能。

目前,随着大数据技术的逐渐发展成熟,企业面临愈渐复杂多样的数据处理场景,部分基于 Spark 已有数据分析生态系统。为充分利用用户已有资源,Inceptor 提供 Apache Spark 连接器(Spark Connector),帮助企业轻松集成 Inceptor,充分对接用户已有 Spark 周边生态,充分利用 Holodesk 能力显著提升批处理和数据分析的性能。

表 12. 能力边界
类别 边界说明

版本支持

  • Inceptor 9.3.3 及以上版本

  • Spark 2.4 或 3.x 版本

功能支持

  • 支持列裁剪、分区列裁剪

  • 支持过滤下推和高可用功能

表类型支持

Holodesk 列存表

字段类型支持

TINYINT、SMALLINT、INT1、BIGINT、FLOAT、DOUBLE、DECIMAL、STRING、CHAR、VARCHAR、VARCHAR2、DATE、TIMESTAMP

其中,CHAR、VARCHAR、VARCHAR2 类型会当做 STRING 类型处理,并通过 log error 打印告警。

准备工作

部署流程

  1. 为保障服务兼容性,我们需要额外部署 Quark Server 和 Metastore 服务用于 Spark 程序专用,然后开启兼容性开关。

    1. 在 Manager 平台的集群管理页面,单击添加服务

    2. 选择安装 Quark 服务并单击下一步

    3. 角色分配页面,根据下述描述安装服务。

      install metastore
      图 80. 添加 Quark Metastore
      • 在不同的管理节点上部署 一个 Quark Server、两个 Quark Metastore,,无需 Nucleon Executor。

      • 右侧的依赖关系中,依赖已有的 TDDMS 1,选择与 Quark 1 共享元数据服务。

    4. 单击下一步,单击 Quark 服务的自定义参数标签页,然后单击添加自定义参数,输入配置项为 quark.metastore.spark.read.holo.enable,值为 true,配置文件为 hive-site.xml,然后单击确定

      enable spark rw holodesk
      图 81. 允许读取 Holodesk

      确保 Quark 的 hive.metastore.port(默认9083)不与现有的 Quark 服务冲突,在基础参数标签页进行修改。

    5. 单击下一步并跟随后续流程完整部署流程,等待服务启动成功。

      为优化资源使用,可在 Manager 的集群管理页面停止新装的 Quark Server 服务,仅保留 Metastore 服务运行。

  2. 登录集群管理节点,检查容器镜像中 YARN 的 JDK 版本为 1.8,若不是则进行切换:

    1. 修改 yarn 服务的镜像,切换容器内的 JDK 版本为 1.8(默认是 jdk1.7)

      # 获取 yarn image
      docker images | grep yarn
      
      # 运行对应容器,命令中 -id 后需修改为本地实际 image
      docker run -id linu-4-29:5000/transwarp/yarn:transwarp-6.2.1-final bash
      复制
    2. 获取容器 ID 并进入容器

      docker ps -a | grep yarn | grep bash
      docker exec -it {容器id} bash
      复制
    3. 更换 JDK 版本,路径修改为本地实际路径

      mv /usr/java/jdk1.7.0_71 /usr/java/jdk1.7.0_71-bak
      mv /usr/java/jdk1.8.0_25 /usr/java/jdk1.7.0_71
      复制
    4. 验证 jdk 切换为 1.8 版本后,退出容器,更换镜像,

      docker tag linu-4-29:5000/transwarp/yarn:transwarp-6.2.1-final linu-4-29:5000/transwarp/yarn:transwarp-6.2.1-final-bak
      docker commit {容器id} linu-4-29:5000/transwarp/yarn:transwarp-6.2.1-final
      docker push linu-4-29:5000/transwarp/yarn:transwarp-6.2.1-final
      复制
  3. 在 Manager 平台下载 TDH Client 压缩包,传输至部署 Spark 业务代码的机器。

  4. 根据实际环境情况,修改 spark-env.sh 文件,示例如下:

    # 添加如下配置,并修改为本地实际环境
    SPARK_WORKER_MEMORY=8G
    SPARK_WORKER_CORES=4
    SPARK_EXECUTOR_MEMORY=4G
    SPARK_CONF_DIR=/usr/spark3.1/spark-3.1.3-bin-hadoop2.7/conf
    SPARK_MASTER_HOST=openeular2203-93
    SPARK_MASTER_WEBUI_PORT=8091
    
    HADOOP_USER_NAME=hive
    HADOOP_CONF_DIR=/etc/hdfs1/conf
    SPARK_HOME=/usr/spark3.1/spark-3.1.3-bin-hadoop2.7
    JRE_HOME=${JAVA_HOME}/jre
    复制
  5. 复制 Inceptor 集群的配置文件至 Spark 配置目录,以便 Spark 程序调用。

    # 复制 Inceptor hive-site.xml配置,用于读取 Inceptor 的配置信息
    cp /etc/hdfs1/conf/hive-site.xml $SPARK_HOME/conf/
    
    # 复制 yarn yarn-site.xml & core-site.xml 配置,用于读取 Yarn 的配置信息
    cp /etc/yarn1/conf/yarn-site.xml $SPARK_HOME/conf/
    cp /etc/yarn1/conf/core-site.xml $SPARK_HOME/conf/
    
    # 复制 HDFS hdfs-site.xm l配置,用于读取 HDFS 的配置信息
    cp /etc/hdfs1/conf/hdfs-site.xml $SPARK_HOME/conf/
    复制

    $SPARK_HOME 需替换为 Spark 安装目录,此外,您还需要将 hdfs-site.xml 中的 dfs.client.failover.proxy.provider.nameservice 的值改为 org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider,然后将 yarn-site.xmlhive-site.xmlGuardian 相关配置删除。

  6. 获取 spark3-connector 包(可联系星环科技获取),解压并上传 Spark jar 包至 HDFS,例如 /tmp/spark3-yarn/jars 目录。

  7. 修改 Spark 配置文件 spark-default-conf,配置说明如下:

    # 基础设置
    # 将 spark on yarn 依赖 jar 包设置为我们之前上传的路径
    spark.yarn.jars hdfs://nameservice1/tmp/spark3-yarn/jars/*
    
    # 指定 Quark Server 的 IP 地址或 hostname
    spark.argodb.server.addr = <hostname>
    
    # 指定 Quark Server 的服务端口
    spark.argodb.server.port = <port>
    
    # 安全认证配置
    # Inceptor 的安全配置
    spark.argodb.auth.type : none(default) [guardiantoken / ldap / none  / kerberos ]
    
    # Guardian Token 认证(推荐)
    spark.guardian.access.token  <guardianToken>
    
    # Kerberos 认证,当前需要将 keytab 文件需要上传到每个节点的/etc/yarn1/conf 文件夹下
    
    spark.argodb.krb.principal <kerberos principal name>
    spark.argodb.krb.keytab <keytab>
    spark.argodb.krb.user <kerberos user name>
    spark.argodb.krb.conf /etc/krb5.conf(default)
    
    # LDAP 认证
    spark.argodb.ldap.user <ldap user name>
    spark.argodb.ldap.password <ldap password>
    
    # 其他设置
    spark.eventLog.enabled true
    spark.eventLog.dir hdfs://nameservice1/tmp/sparklog
    spark.yarn.historyServer.address=kylin122105:18080
    spark.history.ui.port=18080
    spark.history.kerberos.enabled=true
    spark.history.kerberos.principal=yarn/kylin122105@TDH
    spark.history.kerberos.keytab=/etc/yarn1/conf/yarn.keytab
    spark.executorEnv.JAVA_HOME=/usr/java/latest
    spark.executorEnv.JAVA_HOME=/usr/java/latest
    
    # 默认配置
    # 数据库方言设置
    spark.holodesk.dialect ORACLE
    # 是否开启列裁剪功能
    spark.holodesk.column.pruner.enabled true
    # 是否开启 Filter下推
    spark.holodesk.filter.pushdown.enabled true
    # 是否开启分区裁剪功能
    spark.holodesk.partition.prune.enabled true
    # 是否支持本地读模式,默认关闭
    spark.holodesk.local.scan.enable false
    复制
    • 如需实现高可用连接,则直接通过 beeline 连接串设置即可,例如 spark.argodb.server.jdbc.url jdbc:hive2://tw-node2:10300,tw-node3:10051/default;escape=false;guardianToken=4X1U7**qoqH-TDH

    • 关于 Guardian Token 的获取方式,见 Guardian 用户手册中的获取访问服务的凭据章节。

  8. 完成认证并新建日志目录并赋权。

    hdfs dfs -mkdir /tmp/sparklog
    hdfs dfs -chmod -R 777 /tmp/sparklog
    复制
  9. 将配置好的 Spark 目录复制到 TDH Client 目录下。

额外配置(可选)

  • 开启 Spark on Yarn shuffle 服务支持。

    hdfs dfs -mkdir /user/spark/yarn/
    hdfs dfs -put $SPARK_HOME/yarn/spark-3.1.3-yarn-shuffle.jar /user/spark/yarn/
    hdfs dfs -chown yarn:yarn /user/spark/yarn
    复制
  • 通过 Manager 为 HDFS 服务添加下述自定义配置,实现 Spark 程序下的透明存储加密

    dfs.encryption.key.provider.urihadoop.security.key.provider.path 的值设置为 KMS 服务的地址,例如 kms://http@demo-spark-1;demo-spark-1:16000/kms

  • 通过 Manager 为 Yarn 服务添加下述自定义配置,开启 Spark 的 Shuffle 服务。

    参数 取值

    yarn.nodemanager.aux-services.spark_shuffle.remote-classpath

    /user/spark/yarn/spark-3.1.3-yarn-shuffle.jar

    yarn.nodemanager.aux-services.spark_shuffle.class

    org.apache.spark.network.yarn.YarnShuffleService

    yarn.nodemanager.aux-services

    mapreduce_shuffle,spark_shuffle

    spark.shuffle.service.port

    7337

参数调优(可选)

设置 spark_default.conf 配置文件,开启 Shuffle 和动态分配资源功能,优化 Spark 运行性能。

# 启⽤ External shuffle Service 服务
spark.shuffle.service.enabled true

# 设置 Shuffle Service服务端⼝,必须和 yarn-site 中的⼀致
spark.shuffle.service.port 7337

# 开启动态资源分配
spark.dynamicAllocation.enabled true

# 设置每个 Application 最⼩分配的 Executor 数
spark.dynamicAllocation.minExecutors 2

# 设置每个 Application 最⼤并发分配的 Executor数
spark.dynamicAllocation.maxExecutors 1000
复制

访问示例

当您完成以上部署操作,实现 Spark 对接 Inceptor 的配置后,您可以通过如下两种方式访问 Inceptor:

启动 spark-shell

  • 启动 Spark 单机模式可直接执行 spark-shell 脚本:

    # 进入 Spark 安装路径
    cd /home/spark-2.4.5-bin-hadoop2.7
    
    # 启动 spark-shell 单机版
    ./bin/spark-shell
    复制
  • 若您需要启动 Spark on Yarn 集群模式,则需要配置以下参数:

    # 进入 Spark 安装路径
    cd /home/spark-2.4.5-bin-hadoop2.7
    
    # 启动 spark-shell 集群版
    ./bin/spark-shell --master yarn --conf spark.ui.port=12335 --driver-memory 8g --executor-memory 32g --num-executors 4 --executor-cores 24 --principal hive/tx-node1@TDH --keytab /etc/argodbcomputing1/conf/argodb_computing.keytab
    复制
    • --conf spark.ui.port=12335:定义 spark-web ui 端口

    • --driver-memory 8g:定义 Spark driver内存

    • --executor-memory 32g:定义 Spark executor 的内存

    • --num-executors 4:定义 Spark executor的数量

    • --executor-cores 24:定义 每个 executor 分配的 cpu 核数

    • --principal hive/tx-node1@TDH:定义 集群开启 kerberos 后 principal(可以使用/etc/argodbcomputing*/conf/hive-site.xml 文件内 hive.server2.authentication.kerberos.principal参数值)

    • --keytab /etc/argodbcomputing1/conf/argodb_computing.keytab:集群开启kerberos后,argodbcomputing的keytab文件

通过 SparkSQL 读写 Inceptor

  • (1) 创建 Dataframe

val df1 = spark.read.format("io.transwarp.holodesk.sparkreader").option("dbtablename", "default.holodesk_all_type").load()
复制
  • (2) 过滤和选择

    # filter
    df1.where("tbool = true ").show()
    df1.where("tbyte = -128 ").show()
    df1.where("tsmallint = 32767 ").show()
    df1.where("tint < 0 ").show()
    df1.where("tbigint >=0 ").show()
    df1.where("tdate<CAST('2015-03-31' AS date)").show()
    df1.where("ttimestamp < cast('2014-01-01 00:00:00' as timestamp)").show()
    # select
    df1.where("tint < 0 ").select("tbool","tbigint").show()
    复制
  • (3) 创建视图,使用 spark.sql() 方式执行

    import org.apache.spark.sql.SparkSession
    val sparkSession = SparkSession.builder().appName("Spark SQL basic example").config("spark.sql.catalogImplementation", "hive").getOrCreate()
    val df1 = spark.read.format("io.transwarp.holodesk.sparkreader").option("dbtablename", "default.holodesk_all_type").load()
    df1.createOrReplaceTempView(" tv_defaul_holodesk_all_type")
    sparkSession.sql("select * from tv_defaul_holodesk_all_type").show()
    复制