深入理解Okio之旅

JDK的io库由于历史原因设计的比较复杂,有很多装饰类,使用起来需要记忆大量的类,相信你也对此早已诟病不满。Square公司推出的Okio应运而生,它原本是作为Okhttp的io功能库而设计的,也是因为Okhttp而被大家熟知。从知道到会使用,再到理解实现原理后熟练使用,甚至在此基础上二次开发优化,这个认知的过程需要刻意练习,这篇文章就是对Okio的一个总结,Okio虽然代码量不是很多, 但是里面值得学习的地方还是很多。

Source + Sink

简介

Okio定义了自己的一套继承链,Source对应InputStream, Sink对应OutputStream,这样对比就不难理解了,看一下接口的定义

public interface Source extends Closeable {

  long read(Buffer sink, long byteCount) throws IOException;

  Timeout timeout();

  @Override void close() throws IOException;
}


public interface Sink extends Closeable, Flushable {

  void write(Buffer source, long byteCount) throws IOException;

  @Override void flush() throws IOException;

  Timeout timeout();

  @Override void close() throws IOException;
}

接口定义的方法很简洁,
read/write方法,读取和写入数据的接口方法,它们的第一个参数都是Buffer,关于这个类后面会详细介绍,这里我们暂且按照缓冲区理解它。byteCount就是读取或者写入的字节数。
timeout方法,Okio新增的新特性,超时控制
close方法,关闭输入输出流
flush方法,将Buffer缓冲区中的数据写入目标流中。

如何使用

Okio已经帮我们定义了一个门面类,名字就叫Okio,通过它可以生成各种我们需要的对象。
比如Okio.source(inputStream); 将inputStream包装成我们的Source对象,同样的
Okio.sink(outputStream);将outputStream包装成Sink对象。

所以Okio的底层操作的流对象还是Jdk里面定义的InputStream和OutputStream,作为一个轻量级的io框架它不可能跳出Jdk的框架去另外实现一套,它做的只是方便开发者的封装,但是它的封装设计足够优秀,这也是我还在这里跟你们吹牛x的原因,还是不想从大神那里学个一招半式。

看个例子

        File file = new File("/Users/aliouswang/Documents/olympic/JavaArt//text.temp");
        Sink sink = Okio.sink(file);

        Buffer buffer = new Buffer();
        buffer.writeString("Hello okio!", Charset.forName("UTF-8"));
        buffer.writeInt(998);
        buffer.writeByte(1);
        buffer.writeLong(System.currentTimeMillis());
        buffer.writeUtf8("Hello end!");

        sink.write(buffer, buffer.size());

        sink.flush();
        sink.close();

很简单的一个写文件的例子,前面说过Source和Sink的读和写的方法都需要一个Buffer对象,Buffer对象帮我们提供了类似BufferedInputStream和BufferedOutputStream的缓冲区功能(提高读写效率),同时还提供了DataInputStream和DataOutputStream中的大部分功能(比如写int,byte,long等),而且Buffer还提供了写String的方法,更是为我们经常使用的UTF-8编码格式,单独提供读写方法。

有写就有读

        Source source = Okio.source(file);
        buffer.clear();
        source.read(buffer, 1024);

        String string = buffer.readString("Hello okio!".length(), Charset.forName("UTF-8"));
        int intValue = buffer.readInt();
        byte byteValue = buffer.readByte();
        long longValue = buffer.readLong();
        String utf8 = buffer.readUtf8();

        System.out.println("str:" + string + ";\nint:" + intValue + ";\nbyte:" + byteValue + ";" +
                "\nlong:" + longValue + "\nutf8:" + utf8);

        source.close();

    // 打印结果:
    str:Hello okio!;
    int:998;
    byte:1;
    long:1555325659665
    utf8:Hello end!

但是每次都去new一个Buffer对象,是不是很麻烦,你我都能想到的,大神们肯定早就想到了,于是乎有了BufferedSink,BufferedSource。

BufferedSource + BufferedSink

BufferedSource 和 BufferedSink 也都是接口,里面定义的接口方法比较多,篇幅关系,这里只列出BufferedSink的定义,更细节的可以查看源码,源码中对很多方法的注释都举了例子来帮助我们理解,Okio的作者也是用心良苦,生怕我们广大的码农们看不懂,不会用啊!!!

public interface BufferedSink extends Sink, WritableByteChannel {
  Buffer buffer();
  BufferedSink write(ByteString byteString) throws IOException;
  BufferedSink write(byte[] source) throws IOException;
  BufferedSink write(byte[] source, int offset, int byteCount) throws IOException;
  long writeAll(Source source) throws IOException;
  BufferedSink write(Source source, long byteCount) throws IOException;
  BufferedSink writeUtf8(String string) throws IOException;
  BufferedSink writeString(String string, Charset charset) throws IOException;
  BufferedSink writeString(String string, int beginIndex, int endIndex, Charset charset)
      throws IOException;
  BufferedSink writeByte(int b) throws IOException;
  BufferedSink writeShort(int s) throws IOException;
  BufferedSink writeShortLe(int s) throws IOException;
  BufferedSink writeInt(int i) throws IOException;
  BufferedSink writeIntLe(int i) throws IOException;
  BufferedSink writeLong(long v) throws IOException;
  BufferedSink writeLongLe(long v) throws IOException;
  BufferedSink writeDecimalLong(long v) throws IOException;
  BufferedSink writeHexadecimalUnsignedLong(long v) throws IOException;
  @Override void flush() throws IOException;
  BufferedSink emit() throws IOException;
  BufferedSink emitCompleteSegments() throws IOException;
  OutputStream outputStream();
}

可以看到BufferedSink继承于Sink,同时还继承了WritableByteChannel,这个接口是nio接口,所以Okio同样实现了nio的相关功能,这里由于水平有限,关于nio的知识这篇文章不会涉及,有兴趣的同学可以自行查阅资料哦。

BufferedSink定义了Buffer类中定义的全部方法,同时还定义了一个buffer()方法,返回一个Buffer对象,我们大概可以猜想到,这里应该是一个不太标准的代理模式,BufferedSink委托Buffer来干活。

Okio同样提供了Buffer相关的方法方便我们使用。

  public static BufferedSink buffer(Sink sink) {
    return new RealBufferedSink(sink);
  }

  public static BufferedSource buffer(Source source) {
    return new RealBufferedSource(source);
  }

返回的是BufferedSink 和 BufferedSource,Okio的默认实现类是RealBufferedSink和RealBufferedSource,我们可以通过BufferedSource和BufferedSink对上面读写文件的例子进行修改,

        File file = new File("/Users/aliouswang/Documents/java/JavaArt/text.temp");
        Sink sink = Okio.sink(file);
        BufferedSink bufferedSink = Okio.buffer(sink);

        bufferedSink.writeString("Hello okio!", Charset.forName("UTF-8"));
        bufferedSink.writeInt(998);
        bufferedSink.writeByte(1);
        bufferedSink.writeLong(System.currentTimeMillis());
        bufferedSink.writeUtf8("Hello end!");

        bufferedSink.close();

        Source source = Okio.source(file);
        BufferedSource bufferedSource = Okio.buffer(source);

        String string = bufferedSource.readString("Hello okio!".length(), Charset.forName("UTF-8"));
        int intValue = bufferedSource.readInt();
        byte byteValue = bufferedSource.readByte();
        long longValue = bufferedSource.readLong();
        String utf8 = bufferedSource.readUtf8();

        System.out.println("str:" + string + ";\nint:" + intValue + ";\nbyte:" + byteValue + ";" +
                "\nlong:" + longValue + "\nutf8:" + utf8);

        source.close();

可以看到,BufferedSource和BufferedSink能够满足我们对io的日常绝大部分使用场景。

Okio门面类的实现

更一般的,我们会这样去写,链式调用,代码更简洁。

BufferedSource bufferedSource = Okio.buffer(Okio.source(file));
BufferedSink bufferedSink = Okio.buffer(Okio.sink(file));

非常简洁的就能生成BufferedSource和BufferedSink,看一下Okio帮我们做了什么。

  public static Source source(File file) throws FileNotFoundException {
    if (file == null) throw new IllegalArgumentException("file == null");
    return source(new FileInputStream(file));
  }

  public static Source source(InputStream in) {
    // 生成一个默认的Timeout超时对象,默认实现是没有超时deadtime的
    return source(in, new Timeout());
  }

  private static Source source(final InputStream in, final Timeout timeout) {
    if (in == null) throw new IllegalArgumentException("in == null");
    if (timeout == null) throw new IllegalArgumentException("timeout == null");

    return new Source() {
      @Override public long read(Buffer sink, long byteCount) throws IOException {
        if (byteCount < 0) throw new IllegalArgumentException("byteCount < 0: " + byteCount);
        if (byteCount == 0) return 0;
        try {
          // 检查超时,
          timeout.throwIfReached();
          // 从Buffer获取一个可以写入的Segment,这一块只是接下来再具体分析
          Segment tail = sink.writableSegment(1);
          int maxToCopy = (int) Math.min(byteCount, Segment.SIZE - tail.limit);
          // 将最大能copy的字节写入Buffer,
          int bytesRead = in.read(tail.data, tail.limit, maxToCopy);
          if (bytesRead == -1) return -1;
          tail.limit += bytesRead;
          sink.size += bytesRead;
          return bytesRead;
        } catch (AssertionError e) {
          if (isAndroidGetsocknameError(e)) throw new IOException(e);
          throw e;
        }
      }

      @Override public void close() throws IOException {
        in.close();
      }

      @Override public Timeout timeout() {
        return timeout;
      }

      @Override public String toString() {
        return "source(" + in + ")";
      }
    };
  }

通过Okio.source的实现可以看到,在读取的时候,会从传入的InputStream in 对象中读取字节到Buffer sink中,前面我们提到过,RealBufferedSource和RealBufferedSink内部都持有一个Buffer对象,可以猜测,它们持有的buffer对象 会在读写的时候传入。我们进入源码验证一下, 这里我们以readString 方法为例。

  @Override public String readString(long byteCount, Charset charset) throws IOException {
    require(byteCount);
    if (charset == null) throw new IllegalArgumentException("charset == null");
    return buffer.readString(byteCount, charset);
  }

  @Override public void require(long byteCount) throws IOException {
    if (!request(byteCount)) throw new EOFException();
  }

  @Override public boolean request(long byteCount) throws IOException {
    if (byteCount < 0) throw new IllegalArgumentException("byteCount < 0: " + byteCount);
    if (closed) throw new IllegalStateException("closed");
    while (buffer.size < byteCount) {
      if (source.read(buffer, Segment.SIZE) == -1) return false;
    }
    return true;
  }

  @Override public String readString(long byteCount, Charset charset) throws EOFException {
    checkOffsetAndCount(size, 0, byteCount);
    if (charset == null) throw new IllegalArgumentException("charset == null");
    if (byteCount > Integer.MAX_VALUE) {
      throw new IllegalArgumentException("byteCount > Integer.MAX_VALUE: " + byteCount);
    }
    if (byteCount == 0) return "";

    Segment s = head;
    if (s.pos + byteCount > s.limit) {
      // If the string spans multiple segments, delegate to readBytes().
      return new String(readByteArray(byteCount), charset);
    }

    String result = new String(s.data, s.pos, (int) byteCount, charset);
    s.pos += byteCount;
    size -= byteCount;

    if (s.pos == s.limit) {
      head = s.pop();
      SegmentPool.recycle(s);
    }

    return result;
  }

代码比较清晰,先从source中读取要求的bytecount长度的String到buffer中,然后从buffer中读取String 返回。其他的读取方法跟readString大同小异,有兴趣同学可以自行查阅源码。

说了这么久,我们的主角Buffer对象登场了。

Buffer

看一下Buffer类的申明,实现了BufferedSource, BufferedSink, Cloneable, ByteChannel 四个接口。

public final class Buffer implements BufferedSource, BufferedSink, Cloneable, ByteChannel {...}

我们知道Buffer作为缓冲区,肯定底层需要有数据结构来存储暂存的数据,JDK的BuffedInputStream和BufferedOutputStream中是使用字节数组的,而这里Okio的Buffer不是,它使用的是Segment。

public Segment head;
Segment

Segment 是一个双向循环链表,它的内部持有一个byte[] data,默认大小8192(与JDK的BufferedInputStream相同)。

public final class Segment {
  /** The size of all segments in bytes. */
  static final int SIZE = 8192;

  /** 默认共享最小字节数*/
  static final int SHARE_MINIMUM = 1024;

  final byte[] data;

  /** 标识下一个读取字节的位置 */
  int pos;

  /** 标识下一个写入字节的位置 */
  int limit;

  /** 是否与其他Segment共享byte[] */
  boolean shared;

  /** 是否拥有这个byte[], 如果拥有可以写入 */
  boolean owner;

  /** Segment后继 */
  public Segment next;

  /** Segment前驱 */
  Segment prev;
  Segment() {
    this.data = new byte[SIZE];
    this.owner = true;
    this.shared = false;
  }

  ......
}

Sement关键的成员变量都加了注释,Okio为了优化性能,避免频繁的创建和回收对象,使用了对象池模式,设计了SegmentPool类来管理Segment。

SegemntPool
final class SegmentPool {
  /** The maximum number of bytes to pool. */
  // TODO: Is 64 KiB a good maximum size? Do we ever have that many idle segments?
  static final long MAX_SIZE = 64 * 1024; // 64 KiB.

  /** Singly-linked list of segments. */
  static @Nullable Segment next;

  /** Total bytes in this pool. */
  static long byteCount;

  private SegmentPool() {
  }

  static Segment take() {
    synchronized (SegmentPool.class) {
      if (next != null) {
        Segment result = next;
        next = result.next;
        result.next = null;
        byteCount -= Segment.SIZE;
        return result;
      }
    }
    return new Segment(); // Pool is empty. Don't zero-fill while holding a lock.
  }

  static void recycle(Segment segment) {
    if (segment.next != null || segment.prev != null) throw new IllegalArgumentException();
    if (segment.shared) return; // This segment cannot be recycled.
    synchronized (SegmentPool.class) {
      if (byteCount + Segment.SIZE > MAX_SIZE) return; // Pool is full.
      byteCount += Segment.SIZE;
      segment.next = next;
      segment.pos = segment.limit = 0;
      next = segment;
    }
  }
}

SegmentPool代码很简洁,它的最大容量是8个Segment,如果超过调用take方法就会直接新建一个Segment对象,另外recycle回收方法负责回收闲置的Segment,将其加入链表,供其他buffer使用。

有了Segment和SegmentPool的知识,就更容易理解Buffer类的实现了。
比如Okio.source方法新建的Source对象的read方法,获取可以写入的Segment对象,便利Segment链表获取可以写入的Segment,如果head为null则新建一个Segment。

    // 从Buffer获取一个可以写入的Segment,这一块只是接下来再具体分析
  Segment tail = sink.writableSegment(1);

  Segment writableSegment(int minimumCapacity) {
    if (minimumCapacity < 1 || minimumCapacity > Segment.SIZE) throw new IllegalArgumentException();

    if (head == null) {
      head = SegmentPool.take(); // Acquire a first segment.
      return head.next = head.prev = head;
    }

    Segment tail = head.prev;
    if (tail.limit + minimumCapacity > Segment.SIZE || !tail.owner) {
      tail = tail.push(SegmentPool.take()); // Append a new empty segment to fill up.
    }
    return tail;
  }

Buffer的其他方法就不一一分析了,有了我们前面的知识,相信看起来不会太难。接下来我们看一下Okio的另一个类ByteString。

ByteString

我们知道String是的内部是基于char[] 数组来实现的,Okio的ByteString内部是基于byte[] 数组来实现的。跟String类似,ByteString也被设计为不可变的,这样可以保证ByteString是线程安全的。

public class ByteString implements Serializable, Comparable<ByteString> {
  final byte[] data;
  ByteString(byte[] data) {
    this.data = data; // Trusted internal constructor doesn't clone data.
  }
   ......
}

同时ByteString提供了很多方便的工具方法,比如base64,sha1加密等。

  public String base64() {
    return Base64.encode(data);
  }

  /** Returns the 128-bit MD5 hash of this byte string. */
  public ByteString md5() {
    return digest("MD5");
  }

  /** Returns the 160-bit SHA-1 hash of this byte string. */
  public ByteString sha1() {
    return digest("SHA-1");
  }

  /** Returns the 256-bit SHA-256 hash of this byte string. */
  public ByteString sha256() {
    return digest("SHA-256");
  }

  /** Returns the 512-bit SHA-512 hash of this byte string. */
  public ByteString sha512() {
    return digest("SHA-512");
  }

同时ByteString也提供了静态方法,方便与String类型互转。

  /** Returns a new byte string containing the {@code UTF-8} bytes of {@code s}. */
  public static ByteString encodeUtf8(String s) {
    if (s == null) throw new IllegalArgumentException("s == null");
    ByteString byteString = new ByteString(s.getBytes(Util.UTF_8));
    byteString.utf8 = s;
    return byteString;
  }

  /** Returns a new byte string containing the {@code charset}-encoded bytes of {@code s}. */
  public static ByteString encodeString(String s, Charset charset) {
    if (s == null) throw new IllegalArgumentException("s == null");
    if (charset == null) throw new IllegalArgumentException("charset == null");
    return new ByteString(s.getBytes(charset));
  }

  /** Constructs a new {@code String} by decoding the bytes as {@code UTF-8}. */
  public String utf8() {
    String result = utf8;
    // We don't care if we double-allocate in racy code.
    return result != null ? result : (utf8 = new String(data, Util.UTF_8));
  }

  /** Constructs a new {@code String} by decoding the bytes using {@code charset}. */
  public String string(Charset charset) {
    if (charset == null) throw new IllegalArgumentException("charset == null");
    return new String(data, charset);
  }

最后

Okio并不是设计来代替Jdk io的,但是在某些重度io的场景,如果对性能优化追求极致的话,Okio不失是一种选择,关于Okio还有很多细节的知识由于篇幅关系没有涉及,有兴趣的同学可以去看源码中找答案,全文完。

    原文作者:三好码农
    原文地址: https://www.jianshu.com/p/f359f074740d
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞