资讯专栏INFORMATION COLUMN

MaxCompute Tunnel SDK数据上传利器——BufferedWriter使用指南

nanfeiyan / 1728人阅读

摘要:会尽最大可能容错,保证数据上传上去。多线程上传示例多线程上传时,每个线程只需要打开一个往里面写数据就行了。多个进程共享由于一个的上传状态是通过维护一个实现的,对于多线程程序来讲,通过锁很容易实现资源的分配。

摘要: MaxCompute 的数据上传接口(Tunnel)定义了数据 block 的概念:一个 block 对应一个 http request,多个 block 的上传可以并发而且是原子的,一次同步请求要么成功要么失败,不会污染其他的 block。这种设计对于服务端来讲十分简洁,但是也把记录状态做 fa.

本文用到的

阿里云数加-大数据计算服务MaxCompute产品地址:https://www.aliyun.com/produc...

MaxCompute 的数据上传接口(Tunnel)定义了数据 block 的概念:一个 block 对应一个 http request,多个 block 的上传可以并发而且是原子的,一次同步请求要么成功要么失败,不会污染其他的 block。这种设计对于服务端来讲十分简洁,但是也把记录状态做 failover 的工作交给了客户端。

用户在使用 Tunnel SDK 编程时,需要对 block 这一层的语义进行认知,并且驱动数据上传的整个过程[1],并且自己进行容错,毕竟『网络错误是正常而不是异常』。由于用户文档中并没有强调这一点的重要性,导致很多用户踩了坑,一种常见的出错场景是,当客户端写数据的速度过慢,两次 write 的间隔超时[2],导致整个 block 上传失败。

High Level API

MaxCompute Java SDK 在 0.21.3-public 之后新增了 BufferredWriter 这个更高层的 API,简化了数据上传的过程,并且提供了容错的功能。 BufferedWriter 对用户隐藏了 block 这个概念,从用户角度看,就是在 session 上打开一个 writer 然后往里面写记录即可:

</>复制代码

  1. RecordWriter writer = null;
  2. try {
  3. int i = 0;
  4. writer = uploadSession.openBufferedWriter();
  5. Record product = uploadSession.newRecord();
  6. for (String item : items) {
  7. product.setString("name", item);
  8. product.setBigint("id", i);
  9. writer.write(product);
  10. i += 1;
  11. }
  12. } finally {
  13. if (writer != null) {
  14. writer.close();
  15. }
  16. }
  17. uploadSession.commit();

具体实现时 BufferedWriter 先将记录缓存在客户端的缓冲区中,并在缓冲区填满之后打开一个 http 连接进行上传。BufferedWriter 会尽最大可能容错,保证数据上传上去。

由于屏蔽了底层细节,这个接口可能并不适合数据预划分、断点续传、分批次上传等需要细粒度控制的场景。

多线程上传示例

多线程上传时,每个线程只需要打开一个 writer 往里面写数据就行了。

</>复制代码

  1. class UploadThread extends Thread {
  2. private UploadSession session;
  3. private static int RECORD_COUNT = 1200;
  4. public UploadThread(UploadSession session) {
  5. this.session = session;
  6. }
  7. @Override
  8. public void run() {
  9. RecordWriter writer = up.openBufferedWriter();
  10. Record r = up.newRecord();
  11. for (int i = 0; i < RECORD_COUNT; i++) {
  12. r.setBigint(0, i);
  13. writer.write(r);
  14. }
  15. writer.close();
  16. }
  17. };
  18. public class Example {
  19. public static void main(String args[]) {
  20. // 初始化 MaxCompute 和 tunnel 的代码
  21. TableTunnel.UploadSession uploadSession = tunnel.createUploadSession(projectName, tableName);
  22. UploadThread t1 = new UploadThread(up);
  23. UploadThread t2 = new UploadThread(up);
  24. t1.start();
  25. t2.start();
  26. t1.join();
  27. t2.join();
  28. uploadSession.commit();
  29. }

更多控制

重试策略

由于底层在上传出错时会回避一段固定的时间并进行重试,但如果你的程序不想花太多时间在重试上,或者你的程序位于一个极其恶劣的网络环境中,为此 TunnelBufferedWriter 允许用户配置重试策略。

用户可以选择三种重试回避策略:指数回避(EXPONENTIAL_BACKOFF)、线性时间回避(LINEAR_BACKOFF)、常数时间回避(CONSTANT_BACKOFF)。

例如下面这段代码可以将,write 的重试次数调整为 6,每一次重试之前先分别回避 4s、8s、16s、32s、64s 和 128s(从 4 开始的指数递增的序列)。

</>复制代码

  1. RetryStrategy retry
  2. = new RetryStrategy(6, 4, RetryStrategy.BackoffStrategy.EXPONENTIAL_BACKOFF)
  3. writer = (TunnelBufferedWriter) uploadSession.openBufferedWriter();
  4. writer.setRetryStrategy(retry);

缓冲区控制

如果你的程序对 JVM 的内存有严格的要求,可以通过下面这个接口修改缓冲区占内存的字节数(bytes):

</>复制代码

  1. writer.setBufferSize(1024*1024);

默认配置每一个 Writer 的 BufferSize 是 10 MiB。TunnelBufferedWriter 一次 flush buffer 的操作上传一个 block 的数据[3]。

多个进程共享 Session

由于一个 Session 的上传状态是通过维护一个 block list 实现的,对于多线程程序来讲,通过锁很容易实现资源的分配。但对于两个进程空间里的程序想要复用一个 Session 时,必须通过一种机制对资源进行隔离。

具体地,在 getUploadSession 的时候,必须指定这个共享这个 Session 的进程数目,以及一个用来区分进程的 global id:

</>复制代码

  1. //程序1:这个 session 将被两个 writer 共享,我是其中第 0 个
  2. TableTunnel.UploadSession up
  3. = tunnel.getUploadSession(projectName, tableName, sid, 2, 0);
  4. writer = session.openBufferedWriter();
  5. //程序1:这个 session 将被两个 writer 共享,我是其中第 1 个
  6. TableTunnel.UploadSession up
  7. = tunnel.getUploadSession(projectName, tableName, sid, 2, 1);
  8. writer = session.openBufferedWriter();

Notes

[1] 一次完整的上传流程通常包括以下步骤:

先对数据进行划分
为每个数据块指定 block id,即调用 openRecordWriter(id)
然后用一个或多个线程分别将这些 block 上传上去
并在某个 block 上传失败以后,需要对整个 block 进行重传
在所有 block 都上传以后,向服务端提供上传成功的 blockid list 进行校验,即调用 session.commit([1,2,3,...])
[2] 因为使用长连接,服务端有计时器判断是否客户端是否 alive

[3] block 在服务端有 20000 个的数量上限,如果 BufferSize 设得太小会导致 20000 个 block 很快被用光

[4] Session的有效期为24小时,超过24小时会导致数据上传失败

原文链接

阅读更多干货好文,请关注扫描以下二维码:

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/71017.html

相关文章

  • MaxCompute Studio使用心得系列6——一个工具完成整个Python UDF开发

    摘要:摘要北京云栖大会上阿里云发布了最新的功能,万众期待的功能终于支持啦,我怎么能不一试为快,今天就分享如何通过进行开发。注册函数在脚本中编辑试用好了,一个简单完整的通过开发实践分享完成。 摘要: 2017/12/20 北京云栖大会上阿里云MaxCompute发布了最新的功能Python UDF,万众期待的功能终于支持啦,我怎么能不一试为快,今天就分享如何通过Studio进行Python u...

    张迁 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<