
功能介绍
随着数据规模的迅速增长,用户对于数据安全和高可用的需求日渐增强,但由于 Hyperbase 在运维操作上的复杂性,同时 ArgoDB 6.0 新增 Holodesk 行存表支持单条数据插入/更新和高并发点查的场景。 因此,ArgoDB 6.1 将兼容 HBase 2.0(对应 Hyperbase 9.3 版本)的常用接口和方法,对现有业务代码最小化调整的情况下,将部分依赖于 Hyperbase API 的业务迁移替换至 ArgoDB 的 Holodesk 行存表进行实现,实现上层业务用户无感知的数据无缝迁移和业务无中断切换。

使用事项
-
使用 HBase API 访问 ArgoDB 表数据,需要满足以下前提条件:
-
需要添加兼容性插件 Jar 包,并将获取到的 Jar 包导入到用户 Java 项目的依赖中,包含一个 All-in-one Jar 包、 JDBC 驱动包,以及一共 Hyperbse Handle Jar 包。
图 8. JAVA 项目文件 + 依赖 Jar 包由于相关 Jar 包与版本存在依赖关系,为保障数据写入的正确性,请联系星环科技技术支持获取相关Jar 包。
-
此外,您还需要保障客户端与 ArgoDB 集群的下述端口的通信正常:
-
Quark 服务的
hive.server2.thrift.port
参数对应的端口,默认为 10000,具体可通过 Manager 平台查询。 -
TDDMS 服务的
master.rpc_service.master_service_port
参数对应的端口及后面的 3 个端口,默认为 9630~9633。 -
TDDMS 服务的
tabletserver.rpc_service.manage_service_port
参数对应的端口及后面的 3 个端口,默认为 8002~8005。
-
-
-
目前仅支持通过 HBase API 读写 Holodesk 行存表。
-
目前暂不支持通过 HBase API 进行创建表等 DDL 操作,建议使用 JDBC 接口或手动在 ArgoDB 中创建。
-
目前 ArgoDB 支持以 HBase 2.x 版本的常用 API 类和方法,暂不兼容 HBase 0.98 版本。
若您仍需使用 Hyperdrive 存储格式,以及 HBase Shell 命令行交互工具,则请额外安装 Hyperbase。 |
示例代码
接下来,我们将详细介绍一个示例代码,展示如何使用 Hyperbase API 与 ArgoDB 数据库进行交互,实现读写 Holodesk 行存表功能。
package org.apache.hadoop.hbase.example; import io.transwarp.apache.hadoop.conf.Configuration; import io.transwarp.holodesk.sink.ArgoDBConfig; import io.transwarp.sink.hadoop.hive.ql.dblink.DbLinkException; import org.apache.hadoop.hbase.*; import org.apache.hadoop.hbase.client.*; import org.apache.hadoop.hbase.util.Bytes; import java.io.IOException; import java.sql.SQLException; import java.util.Arrays; import java.util.stream.Collectors; public class HBaseTableDemo { // 声明静态配置 private static Configuration conf = null; private static Connection connection = null; //quark的连接地址和端口 public static final String INCEPTOR_URL = "jdbc:inceptor2://172.18.126.78:10000"; public static final String USER_NAME = "hive"; public static final String USER_PASSWD = "123456"; private static final String CREATE_TABLE_ALL_TYPE_SQL="set ngmr.tddms2.lockbased.lightweight.rotransaction=false;" + " create database if not exists test;\n" + " drop table if exists test.table_all_type;\n" + "\n" + " create table if not exists test.table_all_type (\n" + " id string,\n" + " b string,\n" + " c int,\n" + " d bigint,\n" + " e smallint,\n" + " f double,\n" + " g date,\n" + " h timestamp,\n" + " j boolean,\n" + " m float,\n" + " n char(10),\n" + " o varchar(20),\n" + " p time)\n" + " clustered by (id) into 3 buckets\n" + " stored as holodesk tblproperties('holodesk.rowkey'='id','holodesk.storage.format'='row');"; public static final String DROP_TEST_TABLE_ALL_TYPE_SQL= "set ngmr.tddms2.lockbased.lightweight.rotransaction=false;" + "drop table test.table_all_type;"; public static java.sql.Connection getJDBCConnection() throws DbLinkException { ArgoDBConfig argoDBConfig = ArgoDBConfig.builder() .url(INCEPTOR_URL) .user(USER_NAME) .passwd(USER_PASSWD) .build(); return argoDBConfig.getDbLink().getConnection(); } //获取Connection //一个进程通常只有一个Connection,使用Connection创建Admin实例和Table实例都是线程安全的 public static Connection getConnectionInstance(Configuration conf) throws IOException { if(connection == null) { connection = ConnectionFactory.createConnection(conf); } return connection; } // 通过SQL创建表,指定表的列名和类型 public static void createTable(String tableName) throws IOException, DbLinkException, SQLException { java.sql.Connection connection = getJDBCConnection(); connection.prepareStatement(CREATE_TABLE_ALL_TYPE_SQL).execute(); connection.close(); System.out.println(tableName+" created successfully!"); } //插入数据(rowKey rowKey;tableName 表名;family 列族;qualifier 限定名;value 值) public static void addData(String tableName, String rowKey, String familyName, String[] columnName, String[] value) throws IOException { connection = getConnectionInstance(conf); Table table = connection.getTable(TableName.valueOf(tableName));// Table负责跟记录相关的操作如增删改查等// Put put = new Put(Bytes.toBytes(rowKey));// 设置rowkey assert columnName.length == value.length; for(int i = 0; i < columnName.length; i++){ put.addColumn(Bytes.toBytes(familyName), Bytes.toBytes(columnName[i]), Bytes.toBytes(value[i])); } table.put(put); table.close(); System.out.println("Add data successfully! rowKey:"+rowKey +", column:"+familyName+":"+ Arrays.stream(columnName).collect(Collectors.toList()) +", cell:"+Arrays.stream(value).collect(Collectors.toList())); } //遍历查询hbase表(tableName 表名) public static void getResultScan(String tableName) throws IOException { Scan scan = new Scan(); connection = getConnectionInstance(conf); Table table = connection.getTable(TableName.valueOf(tableName)); try (ResultScanner rs = table.getScanner(scan)) { for (Result r : rs) { for (Cell cell : r.listCells()) { System.out.println("row:" + Bytes.toString(CellUtil.cloneRow(cell))); System.out.println("family:" + Bytes.toString(CellUtil.cloneFamily(cell))); System.out.println("qualifier:" + Bytes.toString(CellUtil.cloneQualifier(cell))); System.out.println("value:" + Bytes.toString(CellUtil.cloneValue(cell))); System.out.println("timestamp:" + cell.getTimestamp()); System.out.println("-------------------------------------------"); } } } table.close(); } //查询表中的某一列( public static void getResultByColumn(String tableName, String rowKey, String familyName, String columnName) throws IOException { connection = getConnectionInstance(conf); Table table = connection.getTable(TableName.valueOf(tableName)); Get get = new Get(Bytes.toBytes(rowKey)); get.addColumn(Bytes.toBytes(familyName), Bytes.toBytes(columnName)); // 获取指定列族和列修饰符对应的列 Result result = table.get(get); for (Cell cell : result.listCells()) { System.out.println("row:" + Bytes.toString(CellUtil.cloneRow(cell))); System.out.println("family:" + Bytes.toString(CellUtil.cloneFamily(cell))); System.out.println("qualifier:" + Bytes.toString(CellUtil.cloneQualifier(cell))); System.out.println("value:" + Bytes.toString(CellUtil.cloneValue(cell))); System.out.println("timestamp:" + cell.getTimestamp()); System.out.println("-------------------------------------------"); } table.close(); } //更新表中的某一列(tableName 表名;rowKey rowKey;familyName 列族名;columnName 列名;value 更新后的值) public static void updateTable(String tableName, String rowKey, String familyName, String columnName, String value) throws IOException { connection = getConnectionInstance(conf); Table table = connection.getTable(TableName.valueOf(tableName)); Put put = new Put(Bytes.toBytes(rowKey)); put.addColumn(Bytes.toBytes(familyName), Bytes.toBytes(columnName), Bytes.toBytes(value)); table.put(put); table.close(); System.out.println("update table Success!"); } //删除指定单元格 public static void deleteColumn(String tableName, String rowKey, String familyName, String columnName) throws IOException { connection = getConnectionInstance(conf); Table table = connection.getTable(TableName.valueOf(tableName)); Delete deleteColumn = new Delete(Bytes.toBytes(rowKey)); deleteColumn.addColumns(Bytes.toBytes(familyName), Bytes.toBytes(columnName)); table.delete(deleteColumn); table.close(); System.out.println("rowkey:"+rowKey+",column:"+familyName+":"+columnName+" deleted!"); } //删除指定的行 public static void deleteAllColumn(String tableName, String rowKey) throws IOException { connection = getConnectionInstance(conf); Table table = connection.getTable(TableName.valueOf(tableName)); Delete deleteAll = new Delete(Bytes.toBytes(rowKey)); table.delete(deleteAll); table.close(); System.out.println("rowkey:"+rowKey+" are all deleted!"); } //删除表(tableName 表名) public static void deleteTable(String tableName) throws IOException, DbLinkException, SQLException { java.sql.Connection connection = getJDBCConnection(); connection.prepareStatement(DROP_TEST_TABLE_ALL_TYPE_SQL).execute(); connection.close(); System.out.println(tableName + " is deleted!"); } public static void closeConnection() throws IOException { if(connection != null){ connection.close(); connection = null; } } public static void main(String[] args) throws Exception { conf = HBaseConfiguration.create(); String config_dir = ClassLoader.getSystemClassLoader().getResource(".").getPath(); conf.addResource(config_dir + "hbase-site.xml"); conf.set(HConstants.ARGODB_SINKCLIENT_INCEPTOR_SERVER_URL, INCEPTOR_URL); conf.set(HConstants.ARGODB_SINKCLIENT_INCEPTOR_USERNAME, USER_NAME); conf.set(HConstants.ARGODB_SINKCLIENT_INCEPTOR_PASSWORD, USER_PASSWD); try { String tableName = "test.table_all_type"; //family在这里是没有意义的,只是为了和Hyperbase的API保持一致,任何值都可以。 String[] family = {"f1", "f2"}; String[] rowKey = {"001", "002"}; String[] columnName = {"b", "c", "d", "e", "f", "g", "h", "j", "m", "n", "o", "p"}; String[][] value = { {"string001", "1", "1", "1", "1.1", "2024-09-24", "2024-09-01 20:00:05", "true", "1.5", "char001", "varcha001", "20:22:55"}, {"string002", "2", "2", "2", "2.2", "2024-09-14", "2024-09-01 22:02:05", "true", "2.5", "char002", "varcha002", "22:22:55"} }; // 创建表 createTable(tableName); // 为表插入数据 addData(tableName, rowKey[0], family[0], columnName, value[0]); addData(tableName, rowKey[1], family[1], columnName, value[1]); // 扫描整张表 //getResultScan(tableName); // 更新指定单元格的值 updateTable(tableName, rowKey[0], family[0], columnName[0], "update value"); // 查询刚更新的列的值 getResultByColumn(tableName, rowKey[0], family[0], columnName[0]); // 删除一列 deleteColumn(tableName, rowKey[0], family[0], columnName[1]); // 再次扫描全表 //getResultScan(tableName); // 删除整行数据 deleteAllColumn(tableName, rowKey[0]); // 再次扫描全表 //getResultScan(tableName); //关闭Connection closeConnection(); // 删除表 deleteTable(tableName); } finally { closeConnection(); } } }
复制
类与接口说明
下表中,列出了 ArgoDB 支持的 HBase API 中使用到的主要类和方法,帮助您理解每个组件的作用和它们是如何协同工作来实现数据写入。
类型 | 类名称 | 说明 |
---|---|---|
HBase 配置 |
org.apache.hadoop.hbase.HBaseConfiguration |
用于读取 Hbase 配置文件 hbase-site.xml,以及 ArgoDB 客户端配置文件 hive-default.xml 和 hive-site.xml |
HBase 连接 |
org.apache.hadoop.hbase.client.Connection |
管理与 HBase 集群的通信,负责维持与数据节点的连接,从而进行读写等操作。 |
org.apache.hadoop.hbase.client.ConnectionFactory |
用于管理 Connection 的创建,通过它可以创建与 HBase 集群的连接。是一个不可实例化的类 |
|
org.apache.hadoop.hbase.client.ConnectionImplementation |
封装 Connection 接口的主要实现。 |
|
表名称 |
org.apache.hadoop.hbase.TableName |
用于管理 HBase 中表的名称,避免了直接使用字符串操作表名,保证表名的有效性和唯一性 |
单列基础数据 |
org.apache.hadoop.hbase.Cell |
封装了 HBase 表中的一个数据单元,包含行键、列族、列限定符、时间戳和值。 |
org.apache.hadoop.hbase.KeyVaule |
表示一个键值对,它包含了上述的 Cell 的所有信息,通常用于底层的数据操作。 |
|
DML表实例 |
org.apache.hadoop.hbase.client.Table |
提供了对 HBase 表的读写操作的方法。它是一个通用的表操作接口。 |
org.apache.hadoop.hbase.client.HTable |
HTable 是 Table 接口的一个实现,提供了对 HBase 表的访问。它是线程安全的,通常用于生产环境。 |
|
DML-更新数据 |
org.apache.hadoop.hbase.client.Put |
用于将数据插入到 HBase 表中。 |
org.apache.hadoop.hbase.client.Delete |
用于从 HBase 表中删除数据。 |
|
DML-查询数据 |
org.apache.hadoop.hbase.client.Scanner |
用于在 HBase 表中执行扫描操作,可以用来检索多行数据。 |
org.apache.hadoop.hbase.client.Result |
代表从 HBase 表中检索到的数据。它包含了多个 Cell,可以用来获取扫描或获取操作的结果。 |
类型 | 方法名称 | 说明 |
---|---|---|
HBase 配置 |
HBaseConfiguration.create() |
是用于创建一个 Configuration 对象,该对象包含 HBase 所需的配置信息。 |
HBase 连接 |
ConnectionFactory.createConnection(conf) |
用于根据给定的配置信息(Configuration)创建一个与 HBase 的连接对象。通过这个连接对象,后续可以执行对 HBase 的增删改查操作。 |
表名称 |
TableName.valueOf() |
用于将一个字符串(表名)转换为 TableName 对象。 |
DML-Put |
public void put(final Put put) throws IOException |
用于向数据库中插入数据。 |
public void put(final List<Put> puts) |
用于批量插入多条数据。 |
|
checkAndPut() |
用来检查特定条件是否满足,如果满足,则执行插入操作。 |
|
DML-Get |
public Result get(final Get get) throws IOException |
用于读取数据库中的一行数据。 |
private Result get(Get get, final boolean checkExistenceOnly) |
用于检查一行数据是否存在。如果 checkExistenceOnly 值为 true,该方法仅检查数据是否存在,而不返回具体数据。 |
|
public Result[] get(List<Get> gets) throws IOException |
用于读取数据库中的多条数据。 |
|
public boolean exists(final Get get) |
检查某行数据是否存在。 |
|
public boolean[] exists(List<Get> gets) |
批量检查多行数据是否存在。 |
|
DML-Scan |
public ResultScanner getScanner(Scan scan) |
获取扫描器,用于扫描表中的多行数据,Scan 对象指定扫描的范围。 |
public ResultScanner getScanner(byte [] family) |
根据列族扫描表中的数据。 |
|
public ResultScanner getScanner(byte [] family, byte [] qualifier) |
根据列族和列限定符扫描表中的数据。 |
|
DML-Delete |
public void delete(final Delete delete) |
删除指定行的数据,Delete 对象包含要删除的行的描述。 |
public void delete(final List<Delete> deletes) |
批量删除多行数据,deletes 是包含多个 Delete 对象的列表。 |
|
checkAndDelete() |
条件性地执行 delete 操作,只有在满足某个条件时,才会删除数据。 |
|
DML-Batch |
public void batch(final List<? extends Row> actions, final Object[] results); |
批量执行多个操作(包括 put、get、delete 等),actions 包含一系列的操作,results 用于存放执行的结果。 |
public void batch(final List<? extends Row> actions, final Object[] results, int rpcTimeout); |
批量执行多个操作(包括 put、get、delete 等),actions 包含一系列的操作,results 用于存放执行的结果,rpcTimeout 用于指定远程调用的超时时间。 |
|
public <R> void batchCallback(final List<? extends Row> actions, final Object[] results, final Batch.Callback<R> callback); |
批量执行多个操作,并且可以通过回调函数处理每个操作的结果。 |