摘要:緩沖輸入流從被稱為緩沖區(qū)的存儲(chǔ)器區(qū)域讀出數(shù)據(jù)僅當(dāng)緩沖區(qū)是空時(shí),本地輸入才被調(diào)用。同樣,緩沖輸出流,將數(shù)據(jù)寫入到緩存區(qū),只有當(dāng)緩沖區(qū)已滿才調(diào)用本機(jī)輸出。
:https://segmentfault.com/blog...
1.前言前陣子休息天日常在尋找項(xiàng)目里不好的代碼,看到了這樣的一段代碼:
private Result sshSameExec(Session session, String cmd) { if (log.isDebugEnabled()) { log.debug("shell command: {}", cmd); } UserInfo ui = getUserInfo(); session.setUserInfo(ui); int exitStatus = 0; StringBuilder builder = new StringBuilder(); ChannelExec channel; InputStream in; InputStream err; try { session.connect(connectTimeout); channel = (ChannelExec) session.openChannel("exec"); channel.setCommand(cmd); in = channel.getInputStream(); err = channel.getErrStream(); channel.connect(); } catch (Exception e) { throw new CloudRuntimeException(e); } try { long lastRead = Long.MAX_VALUE; byte[] tmp = new byte[1024]; while (true) { while (in.available() > 0 || err.available() > 0) { int i = 0; if (in.available() > 0) { i = in.read(tmp, 0, 1024); } else if (err.available() > 0) { i = err.read(tmp, 0, 1024); } if (i < 0) { break; } lastRead = System.currentTimeMillis(); builder.append(new String(tmp, 0, i)); } if (channel.isClosed()) { if (in.available() > 0) { continue; } exitStatus = channel.getExitStatus(); break; } if (System.currentTimeMillis() - lastRead > exeTimeout) { break; } } } catch (IOException e) { throw new CloudRuntimeException(e); } finally { channel.disconnect(); session.disconnect(); } if (0 != exitStatus) { return Result.createByError(ErrorData.builder() .errorCode(ResultCode.EXECUTE_SSH_FAIL.getCode()) .detail(builder.toString()) .title(ResultCode.EXECUTE_SSH_FAIL.toString()) .build()); } else { return Result.createBySuccess(builder.toString()); } }
簡單解釋一下這段代碼——即通過ssh到一臺(tái)機(jī)器上,然后執(zhí)行一些命令.對命令輸出的東西,開了一個(gè)循環(huán),每一次讀一定的位置,然后以字節(jié)流的形式讀回來.
這段代碼有點(diǎn)丑,于是我聞到了學(xué)習(xí)的味道.
首先是對兩個(gè)Stream的消費(fèi),很顯然,在多核環(huán)境下,我們同時(shí)也只能夠消費(fèi)其中一個(gè)Stream.其次,這代碼太挫了,自己定義一個(gè)tmp,然后1024、1024這樣的去取出來.
在改良之前,我們先來回顧一下JavaIO的接口定義.
2.JavaIO 接口知識(shí)回顧 2.1 低級(jí)抽象接口:InputStream 和 OutputStream這里有同學(xué)可能問了,為啥叫它低抽象接口呢?因?yàn)樗x底層太近了,計(jì)算機(jī)本來就是處理二進(jìn)制的,而這兩個(gè)接口正是用來處理二進(jìn)制數(shù)據(jù)流的.
先簡單看一眼這兩個(gè)接口:
InputStream
** * This abstract class is the superclass of all classes representing * an input stream of bytes. * *Applications that need to define a subclass of
InputStream
* must always provide a method that returns the next byte of input. * * @author Arthur van Hoff * @see java.io.BufferedInputStream * @see java.io.ByteArrayInputStream * @see java.io.DataInputStream * @see java.io.FilterInputStream * @see java.io.InputStream#read() * @see java.io.OutputStream * @see java.io.PushbackInputStream * @since JDK1.0 */ public abstract class InputStream implements Closeable {.....}
OutputStream
/** * This abstract class is the superclass of all classes representing * an output stream of bytes. An output stream accepts output bytes * and sends them to some sink. ** Applications that need to define a subclass of *
OutputStream
must always provide at least a method * that writes one byte of output. * * @author Arthur van Hoff * @see java.io.BufferedOutputStream * @see java.io.ByteArrayOutputStream * @see java.io.DataOutputStream * @see java.io.FilterOutputStream * @see java.io.InputStream * @see java.io.OutputStream#write(int) * @since JDK1.0 */ public abstract class OutputStream implements Closeable, Flushable {...}
我們可以發(fā)現(xiàn),它們都實(shí)現(xiàn)了Closeable的接口.因此大家在使用這些原生類時(shí),要注意在結(jié)束時(shí)調(diào)用Close方法哦.
這兩個(gè)接口的常用實(shí)現(xiàn)類有:
-?FileInputStream和FileOutputStream
DataInputStream和DataOutputStream
?ObjectInputStream和ObjectOutputStream
2.2 高級(jí)抽象接口——Writer和Reader為啥說它是高級(jí)抽象接口呢?我們先來看看它們的注釋:
Writer
/** * Abstract class for writing to character streams. The only methods that a * subclass must implement are write(char[], int, int), flush(), and close(). * Most subclasses, however, will override some of the methods defined here in * order to provide higher efficiency, additional functionality, or both. * * @see Writer * @see BufferedWriter * @see CharArrayWriter * @see FilterWriter * @see OutputStreamWriter * @see FileWriter * @see PipedWriter * @see PrintWriter * @see StringWriter * @see Reader * * @author Mark Reinhold * @since JDK1.1 */ public abstract class Writer implements Appendable, Closeable, Flushable {
Reader
/** * Abstract class for reading character streams. The only methods that a * subclass must implement are read(char[], int, int) and close(). Most * subclasses, however, will override some of the methods defined here in order * to provide higher efficiency, additional functionality, or both. * * * @see BufferedReader * @see LineNumberReader * @see CharArrayReader * @see InputStreamReader * @see FileReader * @see FilterReader * @see PushbackReader * @see PipedReader * @see StringReader * @see Writer * * @author Mark Reinhold * @since JDK1.1 */ public abstract class Reader implements Readable, Closeable {
我們可以看到,這個(gè)抽象類是用來面向character的,也就是字符.字符的抽象等級(jí)必然比字節(jié)高,因?yàn)樽址拷蠈?即人類.
2.3 優(yōu)化輸入和輸出——Buffered如果我們直接使用上述實(shí)現(xiàn)類去打開一個(gè)文件(如FileWriter 、FileReader 、FileInputStream 、FileOutputStream ),對其對象調(diào)用read、write、readLine等,每個(gè)請求都是由基礎(chǔ)OS直接處理的,這會(huì)使一個(gè)程序效率低得多——因?yàn)樗鼈兌紩?huì)引發(fā)磁盤訪問or網(wǎng)絡(luò)請求等.
為了減少這種開銷,Java 平臺(tái)實(shí)現(xiàn)緩沖 I/O 流。緩沖輸入流從被稱為緩沖區(qū)(buffer)的存儲(chǔ)器區(qū)域讀出數(shù)據(jù);僅當(dāng)緩沖區(qū)是空時(shí),本地輸入 API 才被調(diào)用。同樣,緩沖輸出流,將數(shù)據(jù)寫入到緩存區(qū),只有當(dāng)緩沖區(qū)已滿才調(diào)用本機(jī)輸出 API。
用于包裝非緩存流的緩沖流類有4個(gè):BufferedInputStream和BufferedOutputStream·用于創(chuàng)建字節(jié)緩沖字節(jié)流,?BufferedReader和BufferedWriter`用于創(chuàng)建字符緩沖字節(jié)流.
3. 著手優(yōu)化之前,我們提到了這段代碼寫得搓的地方:
首先是對兩個(gè)Stream的消費(fèi),很顯然,在多核環(huán)境下,我們同時(shí)也只能夠消費(fèi)其中一個(gè)Stream.
其次,這代碼太挫了,自己定義一個(gè)tmp,然后1024、1024這樣的去取出來.
故此,我們可以考慮對每個(gè)Stream都進(jìn)行包裝,支持用線程去消費(fèi),其次我們可以用高級(jí)抽象分接口去適配Byte,然后去裝飾成Buffer.
接下來,我們來看一段ZStack里的工具類ShellUtils,為了節(jié)省篇幅,我們僅僅截出它在IDE里的
概覽:
run方法的核心:
public ShellResult run() { StopWatch watch = new StopWatch(); watch.start(); try { if (withSudo) { command = String.format("sudo %s", command); } ProcessBuilder pb = new ProcessBuilder(Arrays.asList("/bin/bash", "-c", command)); if (baseDir == null) { baseDir = System.getProperty("user.home"); } pb.directory(new File(baseDir)); process = pb.start(); if (!suppressTraceLog && logger.isTraceEnabled()) { logger.debug(String.format("exec shell command[%s]", command)); } Writer stdout; int stdoutLog = stdoutLogStrategy(); if (stdoutLog == LOG_TO_FILE) { stdout = new BufferedWriter(new FileWriter(stdoutFile)); } else if (stdoutLog == LOG_TO_SCREEN) { stdout = new BufferedWriter(new OutputStreamWriter(System.out)); } else { stdout = new StringWriter(); } Writer stderr; int stderrLog = stderrLogStrategy(); if (stderrLog == LOG_TO_FILE) { stderr = new BufferedWriter(new FileWriter(stderrFile)); } else if (stderrLog == LOG_TO_SCREEN) { stderr = new BufferedWriter(new OutputStreamWriter(System.err)); } else { stderr = new StringWriter(); } StreamConsumer stdoutConsumer = new StreamConsumer(process.getInputStream(), new PrintWriter(stdout, true), stdoutLog != LOG_TO_FILE); StreamConsumer stderrConsumer = new StreamConsumer(process.getErrorStream(), new PrintWriter(stderr, true), stderrLog != LOG_TO_FILE); stderrConsumer.start(); stdoutConsumer.start(); process.waitFor(); stderrConsumer.join(TimeUnit.SECONDS.toMillis(30)); stdoutConsumer.join(TimeUnit.SECONDS.toMillis(30)); ShellResult ret = new ShellResult(); ret.setCommand(command); ret.setRetCode(process.exitValue()); if (stderrLog == LOG_TO_STRING) { ret.setStderr(stderr.toString()); } else if (stderrLog == LOG_TO_FILE) { stderr.close(); } if (stdoutLog == LOG_TO_STRING) { ret.setStdout(stdout.toString()); } else if (stdoutLog == LOG_TO_FILE) { stdout.close(); } return ret; } catch (Exception e) { StringBuilder sb = new StringBuilder(); sb.append("Shell command failed: "); sb.append(command); throw new ShellException(sb.toString(), e); } finally { if (process != null) { process.destroy(); } watch.stop(); if (!suppressTraceLog && logger.isTraceEnabled()) { logger.trace(String.format("shell command[%s] costs %sms to finish", command, watch.getTime())); } } } }
我們可以看到StreamConsumer這個(gè)類,我們來看一下它的代碼:
private static class StreamConsumer extends Thread { final InputStream in; final PrintWriter out; final boolean flush; StreamConsumer(InputStream in, PrintWriter out, boolean flushEveryWrite) { this.in = in; this.out = out; flush = flushEveryWrite; } @Override public void run() { BufferedReader br = null; try { br = new BufferedReader(new InputStreamReader(in)); String line; while ( (line = br.readLine()) != null) { out.println(line); if (flush) { out.flush(); } } } catch (Exception e) { logger.warn(e.getMessage(), e); } finally { try { if (br != null) { br.close(); } } catch (IOException e) { logger.warn(e.getMessage(), e); } } } }
這段代碼已經(jīng)達(dá)到了我們的理想狀態(tài):線程消費(fèi),高級(jí)抽象.
3.1 使用Kotlin 3.1.1 Kotlin IO閑話不多說,先貼代碼為敬:
import java.io.InputStream import java.io.InputStreamReader class StreamGobbler(private val inputStream: InputStream, private var result: StringBuilder) : Runnable { override fun run() { InputStreamReader(inputStream).buffered().use { it.lines().forEach { r -> result.append(r) } } } }
還是一樣熟悉的配方,我們逐行來解讀:
定義一個(gè)類,并且要求構(gòu)造函數(shù)必須傳入InputStream和一個(gè)StringBuilder.且實(shí)現(xiàn)了Runnable接口,這意味著它可以被線程消費(fèi).
覆寫run方法.我們可以看到InputStream被適配成了InputStreamReader,這意味著它可以輸出字符流了,然后我們使用了Kotlin的接口將其裝飾成了Buffer.
讀每一行buffer,并appned到result這個(gè)StringBuilder里去.
讀完就可以告辭了,close.(use會(huì)將其關(guān)閉)
3.1.2 Kotlin Coroutine先看一下上面的圖,我們都知道內(nèi)核態(tài)線程是由OS調(diào)度的,但當(dāng)一個(gè)線程拿到時(shí)間片時(shí),卻調(diào)到了阻塞IO,那么只能等在那邊,浪費(fèi)時(shí)間.
而協(xié)程則可以解決這個(gè)問題,當(dāng)一個(gè)Jobhang住的時(shí)候,可以去做別的事情,繞開阻塞.更好的利用時(shí)間片.
最后,我們來看一下成品代碼:
override fun sshExecWithCoroutine(session: Session, cmd: String): SimpleResult{ val ui = InnerUserInfo() session.userInfo = ui val exitStatus: Int var channel = ChannelExec() val inputBuilder = StringBuilder() val errorBuilder = StringBuilder() try { session.connect(connectTimeout) channel = session.openChannel("exec") as ChannelExec channel.setCommand(cmd) channel.connect() val inputStream = StreamGobbler(channel.inputStream, inputBuilder) val errStream = StreamGobbler(channel.errStream, errorBuilder) val customJob = GlobalScope.launch { customStream(inputStream, errStream) } while (!customJob.isCompleted) { // wait job be done } exitStatus = channel.exitStatus } catch (e: IOException) { throw java.lang.RuntimeException(e) } finally { if (channel.isConnected) { channel.disconnect() } if (session.isConnected) { session.disconnect() } } return if (0 != exitStatus) { return SimpleResult.createByError(ErrorData.Builder() .errorCode(ResultCode.EXECUTE_SSH_FAIL.value) .detail(errorBuilder.toString()) .title(ResultCode.EXECUTE_SSH_FAIL.toString()) .build()) } else { SimpleResult.createBySuccess(inputBuilder.toString()) } } private suspend fun customStream(inputStream: StreamGobbler, errorStream: StreamGobbler) { val inputDeferred = GlobalScope.async { inputStream.run() } val errorDeferred = GlobalScope.async { errorStream.run() } inputDeferred.join() errorDeferred.join() }
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/75095.html
摘要:使用字節(jié)流寫入文件,如果沒有關(guān)閉字節(jié)流操作,文件依然存在內(nèi)容,說明字節(jié)流是操作文件本身的。字節(jié)流比字符流更好,使用更廣泛。 Java知識(shí)點(diǎn)總結(jié)(JavaIO-字符流) @(Java知識(shí)點(diǎn)總結(jié))[Java, JavaIO] [toc] 在程序中一個(gè)字符等于兩個(gè)字節(jié),那么 Java 提供了 Reader 和 Writer 兩個(gè)專門操作字符流的類。 字符輸出流:Writer 類定義如下: p...
摘要:知識(shí)點(diǎn)總結(jié)異常知識(shí)點(diǎn)總結(jié)異常為什么需要異常機(jī)制不是所有的問題都能在編譯時(shí)被發(fā)現(xiàn),有些問題在程序運(yùn)行時(shí)才會(huì)暴露出來異常機(jī)制是面向?qū)ο蟮奶幚沓绦蛟谶\(yùn)行時(shí)發(fā)生的狀況的手段使用異常機(jī)制不會(huì)打亂原有業(yè)務(wù)邏輯的用塊把可能出異常的代碼保護(hù)起來用一個(gè) Java知識(shí)點(diǎn)總結(jié)(JavaIO-異常) @(Java知識(shí)點(diǎn)總結(jié))[Java, Java異常] [toc] 為什么需要異常機(jī)制 不是所有的問題都能在編譯...
摘要:知識(shí)點(diǎn)總結(jié)內(nèi)存操作流知識(shí)點(diǎn)總結(jié)前面所講的程序中輸入輸出都是從文件中來,當(dāng)然也可以將輸出的位置設(shè)置在內(nèi)存上。將內(nèi)容寫入到內(nèi)存中。 Java知識(shí)點(diǎn)總結(jié)(JavaIO-內(nèi)存操作流) @(Java知識(shí)點(diǎn)總結(jié))[Java, JavaIO] [toc] showImg(https://segmentfault.com/img/bV82tm?w=753&h=275); 前面所講的程序中輸入、輸出都是...
摘要:下面我們使用字節(jié)輸入輸出流來說明這個(gè)問題輸入流一般是由對象如建立的,當(dāng)新建一個(gè)時(shí),對象建立了一個(gè)包含有數(shù)據(jù)的管道其實(shí)就是我們所說的這個(gè)流并將對象存儲(chǔ)的數(shù)據(jù)輸入到管道中,因此管道里的數(shù)據(jù)流就是對象內(nèi)的數(shù)據(jù)。 流的原理: showImg(/img/bVqa89); 一連串有順序的數(shù)據(jù)系列可以看成是一個(gè)流。 輸入輸出流: 數(shù)據(jù)從IO輸入到程序的流是輸入流,數(shù)據(jù)從程序輸出到IO的流是輸出流。 ...
摘要:知識(shí)點(diǎn)總結(jié)類知識(shí)點(diǎn)總結(jié)后提供的輸入數(shù)據(jù)類,此類位于包中,不僅可以完成輸入數(shù)據(jù)的操作,還可以方便地對輸入數(shù)據(jù)進(jìn)行驗(yàn)證。 Java知識(shí)點(diǎn)總結(jié)(JavaIO- Scanner類 ) @(Java知識(shí)點(diǎn)總結(jié))[Java, JavaIO] showImg(https://segmentfault.com/img/bV9dAj?w=838&h=396); JDK 1.5后提供的輸入數(shù)據(jù)類,此類位于...
閱讀 1763·2021-11-24 09:39
閱讀 1691·2021-11-22 15:22
閱讀 1012·2021-09-27 13:36
閱讀 3250·2021-09-24 10:34
閱讀 3340·2021-07-26 23:38
閱讀 2637·2019-08-29 16:44
閱讀 980·2019-08-29 16:39
閱讀 1110·2019-08-29 16:20