资讯专栏INFORMATION COLUMN

从一段代码谈起——浅谈JavaIO接口

pkwenda / 1860人阅读

摘要:缓冲输入流从被称为缓冲区的存储器区域读出数据仅当缓冲区是空时,本地输入才被调用。同样,缓冲输出流,将数据写入到缓存区,只有当缓冲区已满才调用本机输出。

:https://segmentfault.com/blog...

1.前言

前阵子休息天日常在寻找项目里不好的代码,看到了这样的一段代码:

</>复制代码

  1. private Result sshSameExec(Session session, String cmd) {
  2. if (log.isDebugEnabled()) {
  3. log.debug("shell command: {}", cmd);
  4. }
  5. UserInfo ui = getUserInfo();
  6. session.setUserInfo(ui);
  7. int exitStatus = 0;
  8. StringBuilder builder = new StringBuilder();
  9. ChannelExec channel;
  10. InputStream in;
  11. InputStream err;
  12. try {
  13. session.connect(connectTimeout);
  14. channel = (ChannelExec) session.openChannel("exec");
  15. channel.setCommand(cmd);
  16. in = channel.getInputStream();
  17. err = channel.getErrStream();
  18. channel.connect();
  19. } catch (Exception e) {
  20. throw new CloudRuntimeException(e);
  21. }
  22. try {
  23. long lastRead = Long.MAX_VALUE;
  24. byte[] tmp = new byte[1024];
  25. while (true) {
  26. while (in.available() > 0 || err.available() > 0) {
  27. int i = 0;
  28. if (in.available() > 0) {
  29. i = in.read(tmp, 0, 1024);
  30. } else if (err.available() > 0) {
  31. i = err.read(tmp, 0, 1024);
  32. }
  33. if (i < 0) {
  34. break;
  35. }
  36. lastRead = System.currentTimeMillis();
  37. builder.append(new String(tmp, 0, i));
  38. }
  39. if (channel.isClosed()) {
  40. if (in.available() > 0) {
  41. continue;
  42. }
  43. exitStatus = channel.getExitStatus();
  44. break;
  45. }
  46. if (System.currentTimeMillis() - lastRead > exeTimeout) {
  47. break;
  48. }
  49. }
  50. } catch (IOException e) {
  51. throw new CloudRuntimeException(e);
  52. } finally {
  53. channel.disconnect();
  54. session.disconnect();
  55. }
  56. if (0 != exitStatus) {
  57. return Result.createByError(ErrorData.builder()
  58. .errorCode(ResultCode.EXECUTE_SSH_FAIL.getCode())
  59. .detail(builder.toString())
  60. .title(ResultCode.EXECUTE_SSH_FAIL.toString())
  61. .build());
  62. } else {
  63. return Result.createBySuccess(builder.toString());
  64. }
  65. }

简单解释一下这段代码——即通过ssh到一台机器上,然后执行一些命令.对命令输出的东西,开了一个循环,每一次读一定的位置,然后以字节流的形式读回来.

这段代码有点丑,于是我闻到了学习的味道.

首先是对两个Stream的消费,很显然,在多核环境下,我们同时也只能够消费其中一个Stream.其次,这代码太挫了,自己定义一个tmp,然后1024、1024这样的去取出来.

在改良之前,我们先来回顾一下JavaIO的接口定义.

2.JavaIO 接口知识回顾 2.1 低级抽象接口:InputStream 和 OutputStream

这里有同学可能问了,为啥叫它低抽象接口呢?因为它离底层太近了,计算机本来就是处理二进制的,而这两个接口正是用来处理二进制数据流的.

先简单看一眼这两个接口:

InputStream

</>复制代码

  1. **
  2. * This abstract class is the superclass of all classes representing
  3. * an input stream of bytes.
  4. *
  5. *

    Applications that need to define a subclass of InputStream

  6. * must always provide a method that returns the next byte of input.
  7. *
  8. * @author Arthur van Hoff
  9. * @see java.io.BufferedInputStream
  10. * @see java.io.ByteArrayInputStream
  11. * @see java.io.DataInputStream
  12. * @see java.io.FilterInputStream
  13. * @see java.io.InputStream#read()
  14. * @see java.io.OutputStream
  15. * @see java.io.PushbackInputStream
  16. * @since JDK1.0
  17. */
  18. public abstract class InputStream implements Closeable {.....}

OutputStream

</>复制代码

  1. /**
  2. * This abstract class is the superclass of all classes representing
  3. * an output stream of bytes. An output stream accepts output bytes
  4. * and sends them to some sink.
  5. *

  6. * Applications that need to define a subclass of
  7. * OutputStream must always provide at least a method
  8. * that writes one byte of output.
  9. *
  10. * @author Arthur van Hoff
  11. * @see java.io.BufferedOutputStream
  12. * @see java.io.ByteArrayOutputStream
  13. * @see java.io.DataOutputStream
  14. * @see java.io.FilterOutputStream
  15. * @see java.io.InputStream
  16. * @see java.io.OutputStream#write(int)
  17. * @since JDK1.0
  18. */
  19. public abstract class OutputStream implements Closeable, Flushable {...}

我们可以发现,它们都实现了Closeable的接口.因此大家在使用这些原生类时,要注意在结束时调用Close方法哦.

这两个接口的常用实现类有:
FileInputStreamFileOutputStream

DataInputStreamDataOutputStream

 ObjectInputStreamObjectOutputStream

2.2 高级抽象接口——Writer和Reader

为啥说它是高级抽象接口呢?我们先来看看它们的注释:

Writer

</>复制代码

  1. /**
  2. * Abstract class for writing to character streams. The only methods that a
  3. * subclass must implement are write(char[], int, int), flush(), and close().
  4. * Most subclasses, however, will override some of the methods defined here in
  5. * order to provide higher efficiency, additional functionality, or both.
  6. *
  7. * @see Writer
  8. * @see BufferedWriter
  9. * @see CharArrayWriter
  10. * @see FilterWriter
  11. * @see OutputStreamWriter
  12. * @see FileWriter
  13. * @see PipedWriter
  14. * @see PrintWriter
  15. * @see StringWriter
  16. * @see Reader
  17. *
  18. * @author Mark Reinhold
  19. * @since JDK1.1
  20. */
  21. public abstract class Writer implements Appendable, Closeable, Flushable {

Reader

</>复制代码

  1. /**
  2. * Abstract class for reading character streams. The only methods that a
  3. * subclass must implement are read(char[], int, int) and close(). Most
  4. * subclasses, however, will override some of the methods defined here in order
  5. * to provide higher efficiency, additional functionality, or both.
  6. *
  7. *
  8. * @see BufferedReader
  9. * @see LineNumberReader
  10. * @see CharArrayReader
  11. * @see InputStreamReader
  12. * @see FileReader
  13. * @see FilterReader
  14. * @see PushbackReader
  15. * @see PipedReader
  16. * @see StringReader
  17. * @see Writer
  18. *
  19. * @author Mark Reinhold
  20. * @since JDK1.1
  21. */
  22. public abstract class Reader implements Readable, Closeable {

我们可以看到,这个抽象类是用来面向character的,也就是字符.字符的抽象等级必然比字节高,因为字符靠近上层,即人类.

2.3 优化输入和输出——Buffered

如果我们直接使用上述实现类去打开一个文件(如FileWriter FileReader FileInputStream FileOutputStream ),对其对象调用readwritereadLine等,每个请求都是由基础OS直接处理的,这会使一个程序效率低得多——因为它们都会引发磁盘访问or网络请求等.

为了减少这种开销,Java 平台实现缓冲 I/O 流。缓冲输入流从被称为缓冲区(buffer)的存储器区域读出数据;仅当缓冲区是空时,本地输入 API 才被调用。同样,缓冲输出流,将数据写入到缓存区,只有当缓冲区已满才调用本机输出 API。

用于包装非缓存流的缓冲流类有4个:BufferedInputStreamBufferedOutputStream·用于创建字节缓冲字节流, BufferedReaderBufferedWriter`用于创建字符缓冲字节流.

3. 着手优化

之前,我们提到了这段代码写得搓的地方:

首先是对两个Stream的消费,很显然,在多核环境下,我们同时也只能够消费其中一个Stream.

其次,这代码太挫了,自己定义一个tmp,然后1024、1024这样的去取出来.

故此,我们可以考虑对每个Stream都进行包装,支持用线程去消费,其次我们可以用高级抽象分接口去适配Byte,然后去装饰成Buffer.

接下来,我们来看一段ZStack里的工具类ShellUtils,为了节省篇幅,我们仅仅截出它在IDE里的
概览:

run方法的核心:

</>复制代码

  1. public ShellResult run() {
  2. StopWatch watch = new StopWatch();
  3. watch.start();
  4. try {
  5. if (withSudo) {
  6. command = String.format("sudo %s", command);
  7. }
  8. ProcessBuilder pb = new ProcessBuilder(Arrays.asList("/bin/bash", "-c", command));
  9. if (baseDir == null) {
  10. baseDir = System.getProperty("user.home");
  11. }
  12. pb.directory(new File(baseDir));
  13. process = pb.start();
  14. if (!suppressTraceLog && logger.isTraceEnabled()) {
  15. logger.debug(String.format("exec shell command[%s]", command));
  16. }
  17. Writer stdout;
  18. int stdoutLog = stdoutLogStrategy();
  19. if (stdoutLog == LOG_TO_FILE) {
  20. stdout = new BufferedWriter(new FileWriter(stdoutFile));
  21. } else if (stdoutLog == LOG_TO_SCREEN) {
  22. stdout = new BufferedWriter(new OutputStreamWriter(System.out));
  23. } else {
  24. stdout = new StringWriter();
  25. }
  26. Writer stderr;
  27. int stderrLog = stderrLogStrategy();
  28. if (stderrLog == LOG_TO_FILE) {
  29. stderr = new BufferedWriter(new FileWriter(stderrFile));
  30. } else if (stderrLog == LOG_TO_SCREEN) {
  31. stderr = new BufferedWriter(new OutputStreamWriter(System.err));
  32. } else {
  33. stderr = new StringWriter();
  34. }
  35. StreamConsumer stdoutConsumer = new StreamConsumer(process.getInputStream(), new PrintWriter(stdout, true), stdoutLog != LOG_TO_FILE);
  36. StreamConsumer stderrConsumer = new StreamConsumer(process.getErrorStream(), new PrintWriter(stderr, true), stderrLog != LOG_TO_FILE);
  37. stderrConsumer.start();
  38. stdoutConsumer.start();
  39. process.waitFor();
  40. stderrConsumer.join(TimeUnit.SECONDS.toMillis(30));
  41. stdoutConsumer.join(TimeUnit.SECONDS.toMillis(30));
  42. ShellResult ret = new ShellResult();
  43. ret.setCommand(command);
  44. ret.setRetCode(process.exitValue());
  45. if (stderrLog == LOG_TO_STRING) {
  46. ret.setStderr(stderr.toString());
  47. } else if (stderrLog == LOG_TO_FILE) {
  48. stderr.close();
  49. }
  50. if (stdoutLog == LOG_TO_STRING) {
  51. ret.setStdout(stdout.toString());
  52. } else if (stdoutLog == LOG_TO_FILE) {
  53. stdout.close();
  54. }
  55. return ret;
  56. } catch (Exception e) {
  57. StringBuilder sb = new StringBuilder();
  58. sb.append("Shell command failed:
  59. ");
  60. sb.append(command);
  61. throw new ShellException(sb.toString(), e);
  62. } finally {
  63. if (process != null) {
  64. process.destroy();
  65. }
  66. watch.stop();
  67. if (!suppressTraceLog && logger.isTraceEnabled()) {
  68. logger.trace(String.format("shell command[%s] costs %sms to finish", command, watch.getTime()));
  69. }
  70. }
  71. }
  72. }

我们可以看到StreamConsumer这个类,我们来看一下它的代码:

</>复制代码

  1. private static class StreamConsumer extends Thread {
  2. final InputStream in;
  3. final PrintWriter out;
  4. final boolean flush;
  5. StreamConsumer(InputStream in, PrintWriter out, boolean flushEveryWrite) {
  6. this.in = in;
  7. this.out = out;
  8. flush = flushEveryWrite;
  9. }
  10. @Override
  11. public void run() {
  12. BufferedReader br = null;
  13. try {
  14. br = new BufferedReader(new InputStreamReader(in));
  15. String line;
  16. while ( (line = br.readLine()) != null) {
  17. out.println(line);
  18. if (flush) {
  19. out.flush();
  20. }
  21. }
  22. } catch (Exception e) {
  23. logger.warn(e.getMessage(), e);
  24. } finally {
  25. try {
  26. if (br != null) {
  27. br.close();
  28. }
  29. } catch (IOException e) {
  30. logger.warn(e.getMessage(), e);
  31. }
  32. }
  33. }
  34. }

这段代码已经达到了我们的理想状态:线程消费,高级抽象.

3.1 使用Kotlin 3.1.1 Kotlin IO

闲话不多说,先贴代码为敬:

</>复制代码

  1. import java.io.InputStream
  2. import java.io.InputStreamReader
  3. class StreamGobbler(private val inputStream: InputStream, private var result: StringBuilder) : Runnable {
  4. override fun run() {
  5. InputStreamReader(inputStream).buffered().use {
  6. it.lines().forEach { r -> result.append(r) }
  7. }
  8. }
  9. }

还是一样熟悉的配方,我们逐行来解读:

定义一个类,并且要求构造函数必须传入InputStream和一个StringBuilder.且实现了Runnable接口,这意味着它可以被线程消费.

覆写run方法.我们可以看到InputStream被适配成了InputStreamReader,这意味着它可以输出字符流了,然后我们使用了Kotlin的接口将其装饰成了Buffer.

读每一行buffer,并appned到result这个StringBuilder里去.

读完就可以告辞了,close.(use会将其关闭)

3.1.2 Kotlin Coroutine

先看一下上面的图,我们都知道内核态线程是由OS调度的,但当一个线程拿到时间片时,却调到了阻塞IO,那么只能等在那边,浪费时间.

而协程则可以解决这个问题,当一个Jobhang住的时候,可以去做别的事情,绕开阻塞.更好的利用时间片.

最后,我们来看一下成品代码:

</>复制代码

  1. override fun sshExecWithCoroutine(session: Session, cmd: String): SimpleResult {
  2. val ui = InnerUserInfo()
  3. session.userInfo = ui
  4. val exitStatus: Int
  5. var channel = ChannelExec()
  6. val inputBuilder = StringBuilder()
  7. val errorBuilder = StringBuilder()
  8. try {
  9. session.connect(connectTimeout)
  10. channel = session.openChannel("exec") as ChannelExec
  11. channel.setCommand(cmd)
  12. channel.connect()
  13. val inputStream = StreamGobbler(channel.inputStream, inputBuilder)
  14. val errStream = StreamGobbler(channel.errStream, errorBuilder)
  15. val customJob = GlobalScope.launch {
  16. customStream(inputStream, errStream)
  17. }
  18. while (!customJob.isCompleted) {
  19. // wait job be done
  20. }
  21. exitStatus = channel.exitStatus
  22. } catch (e: IOException) {
  23. throw java.lang.RuntimeException(e)
  24. } finally {
  25. if (channel.isConnected) {
  26. channel.disconnect()
  27. }
  28. if (session.isConnected) {
  29. session.disconnect()
  30. }
  31. }
  32. return if (0 != exitStatus) {
  33. return SimpleResult.createByError(ErrorData.Builder()
  34. .errorCode(ResultCode.EXECUTE_SSH_FAIL.value)
  35. .detail(errorBuilder.toString())
  36. .title(ResultCode.EXECUTE_SSH_FAIL.toString())
  37. .build())
  38. } else {
  39. SimpleResult.createBySuccess(inputBuilder.toString())
  40. }
  41. }
  42. private suspend fun customStream(inputStream: StreamGobbler, errorStream: StreamGobbler) {
  43. val inputDeferred = GlobalScope.async {
  44. inputStream.run()
  45. }
  46. val errorDeferred = GlobalScope.async {
  47. errorStream.run()
  48. }
  49. inputDeferred.join()
  50. errorDeferred.join()
  51. }

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/75095.html

相关文章

  • Java知识点总结(JavaIO-字符流)

    摘要:使用字节流写入文件,如果没有关闭字节流操作,文件依然存在内容,说明字节流是操作文件本身的。字节流比字符流更好,使用更广泛。 Java知识点总结(JavaIO-字符流) @(Java知识点总结)[Java, JavaIO] [toc] 在程序中一个字符等于两个字节,那么 Java 提供了 Reader 和 Writer 两个专门操作字符流的类。 字符输出流:Writer 类定义如下: p...

    thekingisalwaysluc 评论0 收藏0
  • Java知识点总结(JavaIO-异常)

    摘要:知识点总结异常知识点总结异常为什么需要异常机制不是所有的问题都能在编译时被发现,有些问题在程序运行时才会暴露出来异常机制是面向对象的处理程序在运行时发生的状况的手段使用异常机制不会打乱原有业务逻辑的用块把可能出异常的代码保护起来用一个 Java知识点总结(JavaIO-异常) @(Java知识点总结)[Java, Java异常] [toc] 为什么需要异常机制 不是所有的问题都能在编译...

    Near_Li 评论0 收藏0
  • Java知识点总结(JavaIO-内存操作流)

    摘要:知识点总结内存操作流知识点总结前面所讲的程序中输入输出都是从文件中来,当然也可以将输出的位置设置在内存上。将内容写入到内存中。 Java知识点总结(JavaIO-内存操作流) @(Java知识点总结)[Java, JavaIO] [toc] showImg(https://segmentfault.com/img/bV82tm?w=753&h=275); 前面所讲的程序中输入、输出都是...

    Half 评论0 收藏0
  • JavaIOJavaIO输入输出流

    摘要:下面我们使用字节输入输出流来说明这个问题输入流一般是由对象如建立的,当新建一个时,对象建立了一个包含有数据的管道其实就是我们所说的这个流并将对象存储的数据输入到管道中,因此管道里的数据流就是对象内的数据。 流的原理: showImg(/img/bVqa89); 一连串有顺序的数据系列可以看成是一个流。 输入输出流: 数据从IO输入到程序的流是输入流,数据从程序输出到IO的流是输出流。 ...

    CloudwiseAPM 评论0 收藏0
  • Java知识点总结(JavaIO- Scanner类 )

    摘要:知识点总结类知识点总结后提供的输入数据类,此类位于包中,不仅可以完成输入数据的操作,还可以方便地对输入数据进行验证。 Java知识点总结(JavaIO- Scanner类 ) @(Java知识点总结)[Java, JavaIO] showImg(https://segmentfault.com/img/bV9dAj?w=838&h=396); JDK 1.5后提供的输入数据类,此类位于...

    CarlBenjamin 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<