联 系 我 们
售前咨询
售后咨询
微信关注:星环科技服务号
更多联系方式 >
6.5.1 基于 HBase API 读写 Holodesk
更新时间:11/29/2024, 2:47:41 AM
功能介绍

随着数据规模的迅速增长,用户对于数据安全和高可用的需求日渐增强,但由于 Hyperbase 在运维操作上的复杂性,同时 ArgoDB 6.0 新增 Holodesk 行存表支持单条数据插入/更新和高并发点查的场景。 因此,ArgoDB 6.1 将兼容 HBase 2.0(对应 Hyperbase 9.3 版本)的常用接口和方法,对现有业务代码最小化调整的情况下,将部分依赖于 Hyperbase API 的业务迁移替换至 ArgoDB 的 Holodesk 行存表进行实现,实现上层业务用户无感知的数据无缝迁移和业务无中断切换。

hyperbase api argodb
图 7. ArgoDB 兼容 HBase API 实现业务迁移
使用事项
  • 使用 HBase API 访问 ArgoDB 表数据,需要满足以下前提条件:

    • 需要添加兼容性插件 Jar 包,并将获取到的 Jar 包导入到用户 Java 项目的依赖中,包含一个 All-in-one Jar 包、 JDBC 驱动包,以及一共 Hyperbse Handle Jar 包。

      hbase api dependence java packages
      图 8. JAVA 项目文件 + 依赖 Jar 包

      由于相关 Jar 包与版本存在依赖关系,为保障数据写入的正确性,请联系星环科技技术支持获取相关Jar 包。

    • 此外,您还需要保障客户端与 ArgoDB 集群的下述端口的通信正常:

      • Quark 服务的 hive.server2.thrift.port 参数对应的端口,默认为 10000,具体可通过 Manager 平台查询。

      • TDDMS 服务的 master.rpc_service.master_service_port 参数对应的端口及后面的 3 个端口,默认为 9630~9633。

      • TDDMS 服务的 tabletserver.rpc_service.manage_service_port 参数对应的端口及后面的 3 个端口,默认为 8002~8005。

  • 目前仅支持通过 HBase API 读写 Holodesk 行存表

  • 目前暂不支持通过 HBase API 进行创建表等 DDL 操作,建议使用 JDBC 接口或手动在 ArgoDB 中创建。

  • 目前 ArgoDB 支持以 HBase 2.x 版本的常用 API 类和方法,暂不兼容 HBase 0.98 版本。

若您仍需使用 Hyperdrive 存储格式,以及 HBase Shell 命令行交互工具,则请额外安装 Hyperbase。
示例代码

接下来,我们将详细介绍一个示例代码,展示如何使用 Hyperbase API 与 ArgoDB 数据库进行交互,实现读写 Holodesk 行存表功能。

package org.apache.hadoop.hbase.example;

import io.transwarp.apache.hadoop.conf.Configuration;
import io.transwarp.holodesk.sink.ArgoDBConfig;
import io.transwarp.sink.hadoop.hive.ql.dblink.DbLinkException;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.stream.Collectors;

public class HBaseTableDemo {
  // 声明静态配置
  private static Configuration conf = null;
  private static Connection connection = null;

  //quark的连接地址和端口
  public static final String INCEPTOR_URL = "jdbc:inceptor2://172.18.126.78:10000";
  public static final String USER_NAME = "hive";
  public static final String USER_PASSWD = "123456";
  private static final String CREATE_TABLE_ALL_TYPE_SQL="set ngmr.tddms2.lockbased.lightweight.rotransaction=false;" +
      " create database if not exists test;\n" +
      "    drop table if exists test.table_all_type;\n" +
      "\n" +
      "    create table if not exists test.table_all_type (\n" +
      "     id string,\n" +
      "     b string,\n" +
      "     c int,\n" +
      "     d bigint,\n" +
      "     e smallint,\n" +
      "     f double,\n" +
      "     g date,\n" +
      "     h timestamp,\n" +
      "     j boolean,\n" +
      "     m float,\n" +
      "     n char(10),\n" +
      "     o varchar(20),\n" +
      "     p time)\n" +
      "    clustered by (id) into 3 buckets\n" +
      "    stored as holodesk tblproperties('holodesk.rowkey'='id','holodesk.storage.format'='row');";

  public static final String DROP_TEST_TABLE_ALL_TYPE_SQL= "set ngmr.tddms2.lockbased.lightweight.rotransaction=false;" +
      "drop table test.table_all_type;";
  public static java.sql.Connection getJDBCConnection() throws DbLinkException {
    ArgoDBConfig argoDBConfig = ArgoDBConfig.builder()
        .url(INCEPTOR_URL)
        .user(USER_NAME)
        .passwd(USER_PASSWD)
        .build();
    return argoDBConfig.getDbLink().getConnection();
  }

  //获取Connection
  //一个进程通常只有一个Connection,使用Connection创建Admin实例和Table实例都是线程安全的
  public static Connection getConnectionInstance(Configuration conf) throws IOException {
    if(connection == null) {
      connection = ConnectionFactory.createConnection(conf);
    }
    return connection;
  }

  // 通过SQL创建表,指定表的列名和类型
  public static void createTable(String tableName)
      throws IOException, DbLinkException, SQLException {
    java.sql.Connection connection = getJDBCConnection();
    connection.prepareStatement(CREATE_TABLE_ALL_TYPE_SQL).execute();
    connection.close();
    System.out.println(tableName+" created successfully!");
  }

  //插入数据(rowKey rowKey;tableName 表名;family 列族;qualifier 限定名;value 值)
  public static void addData(String tableName, String rowKey, String familyName, String[] columnName, String[] value)
      throws IOException {
    connection = getConnectionInstance(conf);
    Table table = connection.getTable(TableName.valueOf(tableName));// Table负责跟记录相关的操作如增删改查等//
    Put put = new Put(Bytes.toBytes(rowKey));// 设置rowkey
    assert columnName.length == value.length;
    for(int i = 0; i < columnName.length; i++){
      put.addColumn(Bytes.toBytes(familyName), Bytes.toBytes(columnName[i]), Bytes.toBytes(value[i]));
    }
    table.put(put);
    table.close();
    System.out.println("Add data successfully! rowKey:"+rowKey
        +", column:"+familyName+":"+ Arrays.stream(columnName).collect(Collectors.toList())
        +", cell:"+Arrays.stream(value).collect(Collectors.toList()));
  }


  //遍历查询hbase表(tableName 表名)
  public static void getResultScan(String tableName) throws IOException {
    Scan scan = new Scan();
    connection = getConnectionInstance(conf);
    Table table = connection.getTable(TableName.valueOf(tableName));
    try (ResultScanner rs = table.getScanner(scan)) {
      for (Result r : rs) {
        for (Cell cell : r.listCells()) {
          System.out.println("row:" + Bytes.toString(CellUtil.cloneRow(cell)));
          System.out.println("family:" + Bytes.toString(CellUtil.cloneFamily(cell)));
          System.out.println("qualifier:" + Bytes.toString(CellUtil.cloneQualifier(cell)));
          System.out.println("value:" + Bytes.toString(CellUtil.cloneValue(cell)));
          System.out.println("timestamp:" + cell.getTimestamp());
          System.out.println("-------------------------------------------");
        }
      }
    }
    table.close();
  }


  //查询表中的某一列(
  public static void getResultByColumn(String tableName, String rowKey, String familyName, String columnName)
      throws IOException {
    connection = getConnectionInstance(conf);
    Table table = connection.getTable(TableName.valueOf(tableName));
    Get get = new Get(Bytes.toBytes(rowKey));
    get.addColumn(Bytes.toBytes(familyName), Bytes.toBytes(columnName)); // 获取指定列族和列修饰符对应的列
    Result result = table.get(get);
    for (Cell cell : result.listCells()) {
      System.out.println("row:" + Bytes.toString(CellUtil.cloneRow(cell)));
      System.out.println("family:" + Bytes.toString(CellUtil.cloneFamily(cell)));
      System.out.println("qualifier:" + Bytes.toString(CellUtil.cloneQualifier(cell)));
      System.out.println("value:" + Bytes.toString(CellUtil.cloneValue(cell)));
      System.out.println("timestamp:" + cell.getTimestamp());
      System.out.println("-------------------------------------------");
    }
    table.close();
  }

  //更新表中的某一列(tableName 表名;rowKey rowKey;familyName 列族名;columnName 列名;value 更新后的值)
  public static void updateTable(String tableName, String rowKey,
                                 String familyName, String columnName, String value)
      throws IOException {
    connection = getConnectionInstance(conf);
    Table table = connection.getTable(TableName.valueOf(tableName));
    Put put = new Put(Bytes.toBytes(rowKey));
    put.addColumn(Bytes.toBytes(familyName), Bytes.toBytes(columnName),
        Bytes.toBytes(value));
    table.put(put);
    table.close();
    System.out.println("update table Success!");
  }

  //删除指定单元格
  public static void deleteColumn(String tableName, String rowKey,
                                  String familyName, String columnName) throws IOException {
    connection = getConnectionInstance(conf);
    Table table = connection.getTable(TableName.valueOf(tableName));
    Delete deleteColumn = new Delete(Bytes.toBytes(rowKey));
    deleteColumn.addColumns(Bytes.toBytes(familyName),
        Bytes.toBytes(columnName));
    table.delete(deleteColumn);
    table.close();
    System.out.println("rowkey:"+rowKey+",column:"+familyName+":"+columnName+" deleted!");
  }

  //删除指定的行
  public static void deleteAllColumn(String tableName, String rowKey)
      throws IOException {
    connection = getConnectionInstance(conf);
    Table table = connection.getTable(TableName.valueOf(tableName));
    Delete deleteAll = new Delete(Bytes.toBytes(rowKey));
    table.delete(deleteAll);
    table.close();
    System.out.println("rowkey:"+rowKey+" are all deleted!");
  }

  //删除表(tableName 表名)
  public static void deleteTable(String tableName) throws IOException, DbLinkException, SQLException {
    java.sql.Connection connection = getJDBCConnection();
    connection.prepareStatement(DROP_TEST_TABLE_ALL_TYPE_SQL).execute();
    connection.close();
    System.out.println(tableName + " is deleted!");
  }

  public static void closeConnection() throws IOException {
    if(connection != null){
      connection.close();
      connection = null;
    }
  }

  public static void main(String[] args) throws Exception {
    conf = HBaseConfiguration.create();
    String config_dir = ClassLoader.getSystemClassLoader().getResource(".").getPath();
    conf.addResource(config_dir + "hbase-site.xml");
    conf.set(HConstants.ARGODB_SINKCLIENT_INCEPTOR_SERVER_URL, INCEPTOR_URL);
    conf.set(HConstants.ARGODB_SINKCLIENT_INCEPTOR_USERNAME, USER_NAME);
    conf.set(HConstants.ARGODB_SINKCLIENT_INCEPTOR_PASSWORD, USER_PASSWD);

    try {
      String tableName = "test.table_all_type";
      //family在这里是没有意义的,只是为了和Hyperbase的API保持一致,任何值都可以。
      String[] family = {"f1", "f2"};
      String[] rowKey = {"001", "002"};
      String[] columnName = {"b", "c", "d", "e", "f", "g", "h", "j", "m", "n", "o", "p"};
      String[][] value = {
        {"string001", "1", "1", "1", "1.1", "2024-09-24", "2024-09-01 20:00:05", "true", "1.5", "char001", "varcha001", "20:22:55"},
        {"string002", "2", "2", "2", "2.2", "2024-09-14", "2024-09-01 22:02:05", "true", "2.5", "char002", "varcha002", "22:22:55"}
      };

      // 创建表
      createTable(tableName);

      // 为表插入数据
      addData(tableName, rowKey[0], family[0], columnName, value[0]);
      addData(tableName, rowKey[1], family[1], columnName, value[1]);

      // 扫描整张表
      //getResultScan(tableName);
      // 更新指定单元格的值
      updateTable(tableName, rowKey[0], family[0], columnName[0], "update value");
      // 查询刚更新的列的值
      getResultByColumn(tableName, rowKey[0], family[0], columnName[0]);
      // 删除一列
      deleteColumn(tableName, rowKey[0], family[0], columnName[1]);
      // 再次扫描全表
      //getResultScan(tableName);
      // 删除整行数据
      deleteAllColumn(tableName, rowKey[0]);
      // 再次扫描全表
      //getResultScan(tableName);

      //关闭Connection
      closeConnection();
      // 删除表
      deleteTable(tableName);
    } finally {
      closeConnection();
    }
  }
}
复制
类与接口说明

下表中,列出了 ArgoDB 支持的 HBase API 中使用到的主要类和方法,帮助您理解每个组件的作用和它们是如何协同工作来实现数据写入。

类型 类名称 说明

HBase 配置

org.apache.hadoop.hbase.HBaseConfiguration

用于读取 Hbase 配置文件 hbase-site.xml,以及 ArgoDB 客户端配置文件 hive-default.xml 和 hive-site.xml

HBase 连接

org.apache.hadoop.hbase.client.Connection

管理与 HBase 集群的通信,负责维持与数据节点的连接,从而进行读写等操作。

org.apache.hadoop.hbase.client.ConnectionFactory

用于管理 Connection 的创建,通过它可以创建与 HBase 集群的连接。是一个不可实例化的类

org.apache.hadoop.hbase.client.ConnectionImplementation

封装 Connection 接口的主要实现。

表名称

org.apache.hadoop.hbase.TableName

用于管理 HBase 中表的名称,避免了直接使用字符串操作表名,保证表名的有效性和唯一性

单列基础数据

org.apache.hadoop.hbase.Cell

封装了 HBase 表中的一个数据单元,包含行键、列族、列限定符、时间戳和值。

org.apache.hadoop.hbase.KeyVaule

表示一个键值对,它包含了上述的 Cell 的所有信息,通常用于底层的数据操作。

DML表实例

org.apache.hadoop.hbase.client.Table

提供了对 HBase 表的读写操作的方法。它是一个通用的表操作接口。

org.apache.hadoop.hbase.client.HTable

HTable 是 Table 接口的一个实现,提供了对 HBase 表的访问。它是线程安全的,通常用于生产环境。

DML-更新数据

org.apache.hadoop.hbase.client.Put

用于将数据插入到 HBase 表中。

org.apache.hadoop.hbase.client.Delete

用于从 HBase 表中删除数据。

DML-查询数据

org.apache.hadoop.hbase.client.Scanner

用于在 HBase 表中执行扫描操作,可以用来检索多行数据。

org.apache.hadoop.hbase.client.Result

代表从 HBase 表中检索到的数据。它包含了多个 Cell,可以用来获取扫描或获取操作的结果。

类型 方法名称 说明

HBase 配置

HBaseConfiguration.create()

是用于创建一个 Configuration 对象,该对象包含 HBase 所需的配置信息。

HBase 连接

ConnectionFactory.createConnection(conf)

用于根据给定的配置信息(Configuration)创建一个与 HBase 的连接对象。通过这个连接对象,后续可以执行对 HBase 的增删改查操作。

表名称

TableName.valueOf()

用于将一个字符串(表名)转换为 TableName 对象。

DML-Put

public void put(final Put put) throws IOException

用于向数据库中插入数据。

public void put(final List<Put> puts)

用于批量插入多条数据。

checkAndPut()

用来检查特定条件是否满足,如果满足,则执行插入操作。

DML-Get

public Result get(final Get get) throws IOException

用于读取数据库中的一行数据。

private Result get(Get get, final boolean checkExistenceOnly)

用于检查一行数据是否存在。如果 checkExistenceOnly 值为 true,该方法仅检查数据是否存在,而不返回具体数据。

public Result[] get(List<Get> gets) throws IOException

用于读取数据库中的多条数据。

public boolean exists(final Get get)

检查某行数据是否存在。

public boolean[] exists(List<Get> gets)

批量检查多行数据是否存在。

DML-Scan

public ResultScanner getScanner(Scan scan)

获取扫描器,用于扫描表中的多行数据,Scan 对象指定扫描的范围。

public ResultScanner getScanner(byte [] family)

根据列族扫描表中的数据。

public ResultScanner getScanner(byte [] family, byte [] qualifier)

根据列族和列限定符扫描表中的数据。

DML-Delete

public void delete(final Delete delete)

删除指定行的数据,Delete 对象包含要删除的行的描述。

public void delete(final List<Delete> deletes)

批量删除多行数据,deletes 是包含多个 Delete 对象的列表。

checkAndDelete()

条件性地执行 delete 操作,只有在满足某个条件时,才会删除数据。

DML-Batch

public void batch(final List<? extends Row> actions, final Object[] results);

批量执行多个操作(包括 put、get、delete 等),actions 包含一系列的操作,results 用于存放执行的结果。

public void batch(final List<? extends Row> actions, final Object[] results, int rpcTimeout);

批量执行多个操作(包括 put、get、delete 等),actions 包含一系列的操作,results 用于存放执行的结果,rpcTimeout 用于指定远程调用的超时时间。

public <R> void batchCallback(final List<? extends Row> actions, final Object[] results, final Batch.Callback<R> callback);

批量执行多个操作,并且可以通过回调函数处理每个操作的结果。