本文共 6929 字,大约阅读时间需要 23 分钟。
本篇文章主要会介绍下表格存储的Java SDK提供的异步接口,如何使用以及应用场景。
为什么需要异步?
异步提供了一个non-blocking, event-driven的编程模型,能够将系统不同层级的模块进行层次化的解耦,能够利用多核并行执行任务,提高性能。
现如今,一个大型的系统,系统级调优的最关键一步,就是异步化。异步化最常改造的是远程RPC或者数据库访问部分,表格存储作为一个底层数据库产品,需要提供异步接口来适应这个潮流。
在表格存储内部,我们也有一些使用异步来优化系统的例子,就拿Java SDK来说,可以看下以下两篇文章:
1.
2.
如何使用?
异步接口的使用和同步接口没有太大区别,使用同样的请求参数,唯一的不同在于返回结果的处理上。同步接口会同步的返回调用结果,而异步接口会返回Future类型的结果,或者直接通过Callback来通知结果。
Future的使用
private static void listTableWithFuture(OTSClientAsync client) { // 通过Future同步的等待结果返回。 try { OTSFuturefuture = client.listTable(); ListTableResult result = future.get(); // 同步的等待 System.out.println("\nList table by listTableWithFuture:"); for (String tableName : result.getTableNames()) { System.out.println(tableName); } } catch (OTSException e) { e.printStackTrace(); } catch (ClientException e) { e.printStackTrace(); } // 通过Future,间歇性的等待结果返回。 try { OTSFuture future = client.listTable(); while (!future.isDone()) { System.out.println("Waiting for result of list table."); Thread.sleep(10); // 每隔10ms检查结果是否返回 } ListTableResult result = future.get(); System.out.println("\nList table by listTableWithFuture:"); for (String tableName : result.getTableNames()) { System.out.println(tableName); } } catch (OTSException e) { e.printStackTrace(); } catch (ClientException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } }
Callback的使用
private static void listTableWithCallback(OTSClientAsync asyncClient) { final AtomicBoolean isDone = new AtomicBoolean(false); OTSCallbackcallback = new OTSCallback () { @Override public void onCompleted(OTSContext otsContext) { isDone.set(true); System.out.println("\nList table by listTableWithCallback:"); for (String tableName : otsContext.getOTSResult().getTableNames()) { System.out.println(tableName); } } @Override public void onFailed(OTSContext otsContext, OTSException ex) { isDone.set(true); ex.printStackTrace(); } @Override public void onFailed(OTSContext otsContext, ClientException ex) { isDone.set(true); ex.printStackTrace(); } }; asyncClient.listTable(callback); // 将callback扔给SDK,SDK在完成请求接到响应后,会自动调用callback // 等待callback被调用,一般的业务处理逻辑下,不需要这一步等待。 while (!isDone.get()) { try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } } }
案例一:如何突破BatchWriteRow的行数限制,一次性导入N行数据
private static void batchWriteRow(OTSClientAsync asyncClient, String tableName) { // BatchWriteRow的行数限制是100行,使用异步接口,实现一次批量导入1000行。 List> futures = new ArrayList >(); int count = 10; // 一次性发出10个请求,每个请求写100行数据 for (int i = 0; i < count; i++) { BatchWriteRowRequest request = new BatchWriteRowRequest(); for (int j = 0; j < 100; j++) { RowPutChange rowChange = new RowPutChange(tableName); RowPrimaryKey primaryKey = new RowPrimaryKey(); primaryKey.addPrimaryKeyColumn(COLUMN_GID_NAME, PrimaryKeyValue.fromLong(i * 100 + j)); primaryKey.addPrimaryKeyColumn(COLUMN_UID_NAME, PrimaryKeyValue.fromLong(j)); rowChange.setPrimaryKey(primaryKey); rowChange.addAttributeColumn(COLUMN_NAME_NAME, ColumnValue.fromString("name" + j)); rowChange.addAttributeColumn(COLUMN_AGE_NAME, ColumnValue.fromLong(j)); request.addRowChange(rowChange); } OTSFuture result = asyncClient.batchWriteRow(request); futures.add(result); } // 等待结果返回 List results = new ArrayList (); for (OTSFuture future : futures) { try { BatchWriteRowResult result = future.get(); // 同步等待结果返回 results.add(result); } catch (OTSException e) { e.printStackTrace(); } catch (ClientException e) { e.printStackTrace(); } } // 统计返回结果 int totalSucceedRows = 0; int totalFailedRows = 0; for (BatchWriteRowResult result : results) { totalSucceedRows += result.getSucceedRowsOfPut().size(); totalFailedRows += result.getFailedRowsOfPut().size(); } System.out.println("Total succeed rows: " + totalSucceedRows); System.out.println("Total failed rows: " + totalFailedRows); }
案例二:如何实现batch getRange
private static void batchGetRange(OTSClientAsync asyncClient, String tableName) { // 一次性查询多个范围的数据,设置10个任务,每个任务查询100条数据。 // 每个范围查询的时候设置limit为10,100条数据需要10次请求才能全部查完。 int count = 10; OTSFuture[] futures = new OTSFuture[count]; for (int i = 0; i < count; i++) { futures[i] = sendGetRangeRequest(asyncClient, tableName, i * 100, i * 100 + 100); } // 检查是否所有范围查询均已做完,若未做完,则继续发送查询请求 List allRows = new ArrayList
(); while (true) { boolean completed = true; for (int i = 0; i < futures.length; i++) { OTSFuture
future = futures[i]; if (future == null) { continue; } if (future.isDone()) { GetRangeResult result = future.get(); allRows.addAll(result.getRows()); if (result.getNextStartPrimaryKey() != null) { // 该范围还未查询完毕,需要从nextStart开始继续往下读。 long nextStart = result.getNextStartPrimaryKey().getPrimaryKey().get(COLUMN_GID_NAME).asLong(); long rangeEnd = i * 100 + 100; futures[i] = sendGetRangeRequest(asyncClient, tableName, nextStart, rangeEnd); completed = false; } else { futures[i] = null; // 若某个范围查询完毕,则将对应future设置为null } } else { completed = false; } } if (completed) { break; } else { try { Thread.sleep(10); // 避免busy wait,每次循环完毕后等待一小段时间 } catch (InterruptedException e) { e.printStackTrace(); } } } // 所有数据全部读出 System.out.println("Total rows scanned: " + allRows.size()); }
示例代码可从下载。
转载地址:http://uhabl.baihongyu.com/