大概是最完全的Okio源码解析文章

自从Google官方将OkHttp作为底层的网络请求之后,作为OkHttp底层IO操作的Okio也是走进开发者的视野,这个甚至是取代了java的原生IO库的存在到底有什么特殊的本领呢?
这篇文章主要是对Okio的实现做一个详尽的解析,当然由于笔者分析中可能有纰漏的地方,也烦请指出,Okio的代码比较精巧,核心的代码大约5000行,对文章不尽兴的也可以直接通读源码,这样就能理解的更清晰。
全文较长,这里先放出整体的一个目录图

  • 从Sample开始
  • Sink和Source及其实现
  • Okio中的超时机制
  • Segment和SegmentPool解析
  • 不可变的ByteString
  • 最核心的Buffer解析
  • 后记

那我们先看看Okio到底有什么好用的地方。

从Sample开始

为了展现Okio强大的能力,这里先举几个例子看看Okio是怎么处理IO操作的

读写文件

Okio中特有的两个类Source,Sink代表的就是传统的输入流,和输出流

  Source source = null;
  BufferedSource bSource = null;
  File file = new File(filename);
  //读文件
  source = Okio.source(file);
  //通过source拿到 bufferedSource
  bSource = Okio.buffer(source);
  String read = bSource.readString(Charset.forName("utf-8"));

读文件的步骤就是首先拿到一个输入流,Okio中封装了许多的输入流统一使用方法重载的source方法转换成一个source,然后使用buffer方法包装成BufferedSource,这个里面提供了流的各种操作,读String,读字节数组,读字byte,short等等,甚至是16进制的数,这里直接读出文件的String内容,十分的简单。

 private static void create_writer() {
        String filename = "create.txt";
        boolean isCreate = false;
        Sink sink;
        BufferedSink bSink = null;
        try {
            //判断文件是否存在,不存在,则新建!
            File file = new File(filename);
            if (!file.exists()) {
                isCreate = file.createNewFile();
            } else {
                isCreate = true;
            }
            //写入操作
            if (isCreate) {
                sink = Okio.sink(file);
                bSink = Okio.buffer(sink);
                bSink.writeUtf8("1");
                bSink.writeUtf8("\n");
                bSink.writeUtf8("this is new file!");
                bSink.writeUtf8("\n");
                bSink.writeString("我是每二条", Charset.forName("utf-8"));
                bSink.flush();
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                if (null != bSink) {
                    bSink.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

写文件的操作使用的是Sink,同样的将一个file输出流包装成一个Sink,再通过Okio的buffer方法赋予操作流的各种方法,最后写入操作也是十分的简单。

png decode
private static final ByteString PNG_HEADER = ByteString.decodeHex("89504e470d0a1a0a");

public void decodePng(InputStream in) throws IOException {
  BufferedSource pngSource = Okio.buffer(Okio.source(in));

  ByteString header = pngSource.readByteString(PNG_HEADER.size());
  if (!header.equals(PNG_HEADER)) {
    throw new IOException("Not a PNG.");
  }
}

这个是Okio官方提供了一个Png图片的解码的例子,我们知道一般判断一个文件的格式就是依靠前面的校验码,比如class文件中前面的16进制代码就是以 cafebabe 开头,同样的常规的png,jpg,gif之类的都可以通过前面的魔数来进行判断文件类型,这里就以一个图片输入流转换成一个BufferedSource,并且通过 readByteString 方法拿到一个字节串 ByteString 这样就能验证这个文件是不是一个png的图片,同样的方法也能用在其他文件的校验上。
Okio除了这些外还有很多额外的功能,而且官方也提供了许多包括对于zip文件的处理,各种MD5,SHA-1.SHA256,Base64之类编码的处理,如果需要额外的一些操作,也可以自己实现Sink,Source对应的方法。
看完了例子,就来看看Okio真正的实现吧

Sink和Source及其实现

Okio中最重要的两个概念当属Sink,Source,先看看这两个类的继承图

《大概是最完全的Okio源码解析文章》 Sink Source UML

Sink代表的输出流,Source代表的是输入流,这两个基本都是对称的,所以就只用一个来进行分析了

public interface Sink extends Closeable,Flushable {

    @Override
    void flush() throws IOException;

    @Override
    void close() throws IOException;

    Timeout timeout();

    void write(Buffer source,long byteCount) throws IOException;
}

Sink中只包括了一些最简单的方法,以及一个timeout超时,这个后面会讲到。真正庞大的写的方法实际上都是由继承这个接口的另一个接口中的方法,从上面的UML图中可以看到整个继承链

《大概是最完全的Okio源码解析文章》 BufferedSink

里面包含大量的写的接口方法,这个BufferedSink依然只是一个接口,实现这个接口的类就是
RealBufferedSink

    public final Buffer buffer = new Buffer();
    public final Sink sink;

    public RealBufferedSink(Sink sink){
        if (sink == null)
            throw new NullPointerException("sink == null");
        this.sink = sink;
    }

    @Override
    public Buffer buffer() {
        return buffer;
    }

RealBufferedSink类中有两个主要参数,一个是新建的Buffer对象,一个是Sink的对象。
虽然这个类叫RealBufferedSink,但是实际上这个只是一个保存Buffer对象的一个代理实现,真正的实现都是在Buffer中实现的,可以看看这个类的几个例子

  @Override public BufferedSink write(byte[] source) throws IOException {
    if (closed) throw new IllegalStateException("closed");
    buffer.write(source);
    return emitCompleteSegments();
  }

  @Override public BufferedSink write(byte[] source, int offset, int byteCount) throws IOException {
    if (closed) throw new IllegalStateException("closed");
    buffer.write(source, offset, byteCount);
    return emitCompleteSegments();
  }

可以看到这个实现了BufferedSink接口的两个方法实际上都是调用了buffer的对应方法,对应的RealBufferedSource也是同样的调用buffer中的read方法,关于Buffer这个类会在下面详述,刚才我们看到Sink接口中有一个Timeout的类,这个就是Okio所实现的超时机制,保证了IO操作的稳定性。

Okio中的超时机制

Okio的超时机制让IO不会因为异常阻塞在某个未知的错误上,Okio的基础超时机制是采用的同步超时
以输出流为例,当我们用下面的方法包装流时

    public static Sink sink(OutputStream out){
        return sink(out,new Timeout());
    }

实际上调用了一个两个参数的sink方法,第二个参数就是同步超时

    private static Sink sink(final OutputStream out , final Timeout timeout){
        if (out == null) throw new IllegalArgumentException("out == null");
        if (timeout == null) throw new IllegalArgumentException("timeout == null");
        return new Sink() {
            ....
            @Override
            public void write(Buffer source, long byteCount) throws IOException {
                Util.checkOffsetAndCount(source.size,0,byteCount);
                while (byteCount > 0 ){
                    timeout.throwIfReached();
                    Segment head = source.head;
                    int toCopy = (int) Math.min(byteCount , head.limit - head.pos);
                    out.write(head.data,head.pos,toCopy);
                    byteCount -= toCopy;
                    source.size += toCopy;
                    head.pos += toCopy;
                    ...
                }
            }
        };
    }

可以看到write方法中实际上有一个while循环,在每个开始写的时候就调用了 timeout.throwIfReached() 方法,这个方法里面去判断的时间是否超时,这很明显是一个同步超时机制,按序执行,同样的Source也是一样的操作

    public void throwIfReached() throws InterruptedIOException {
        if (Thread.interrupted()){
            throw new InterruptedIOException("thread interrupted");
        }
        if (hasDeadline && deadlineNanoTime - System.nanoTime() < 0){
            throw new InterruptedIOException("deadline reached");
        }
    }

但是当我们看Okio对于socket的封装时

  public static Sink sink(Socket socket) throws IOException {
    if (socket == null) throw new IllegalArgumentException("socket == null");
    AsyncTimeout timeout = timeout(socket);
    Sink sink = sink(socket.getOutputStream(), timeout);
    return timeout.sink(sink);
  }

这里出现了一个 AsyncTimeout 的类,这个实际上是继承于Timeout所实现的一个异步超时类,这个异步类比同步要复杂的多,它使用了一个WatchDog线程在后台进行监听超时,这里的WatchDog并不是linux中的那个,只是一个继承于Thread的一个类,里面的run方法执行的就是核心的超时判断,之所以在socket写时采取异步超时,这完全是由socket自身的性质决定的,socket经常会阻塞自己,导致下面的事情执行不了。
AsyncTimeout继承于Timeout类,可以覆写里面的timeout方法,这个方法会在watchdog的线程中调用,所以不能执行长时间的操作,否则就会引发其他的超时,下面详细分析这个类

    //不要一次写超过64k的数据否则可能会在慢连接中导致超时
    private static final int TIMEOUT_WRITE_SIZE = 64 * 1024;

    private static AsyncTimeout head;
    private boolean inQueue;
    private AsyncTimeout next;
    private long timeoutAt;

首先就是一个最大的写的值,定义为64K,刚好和一个Buffer的大小是一样的,官方解释是因为如果连续写超过这个数的字节,那么及其容易导致超时,所以为了限制这个操作,直接给出了一次能写的最大数。
下面两个参数一个head,next很明显表明这是一个单链表,timeoutAt则是超时的时间。
使用者在操作之前首先要调用enter方法,这样相当于注册了这个超时监听,然后配对的实现exit方法,这个exit有一个返回值会表明超时是否触发,请注意这个timeout是异步的,可能会在exit后才调用

    public final void enter() {
        if (inQueue)
            throw new IllegalArgumentException("unbalanced enter/exit");
        long timeoutNanos = timeoutNanos();
        boolean hasDeadline = hasDeadline();
        if (timeoutNanos == 0 && !hasDeadline) {
            return;  // 没有超时的设置
        }
        inQueue = true;
        scheduleTimeout(this, timeoutNanos, hasDeadline);
    }

这里只是做了判断以及设置inQueue的状态,真正的是调用 scheduleTimeout 方法来加入到链表中

        ...
        long remainingNanos = node.remainingNanos(now);
        for (AsyncTimeout prev = head ; true ; prev = prev.next){
            //如果下一个为null或者剩余时间比下一个短 就插入node
            if (prev.next == null || remainingNanos < prev.next.remainingNanos(now)){
                node.next = prev.next;
                prev.next = node;
                if (prev == node){
                    AsyncTimeout.class.notify();
                }
                break;
            }
        }

上面可以看出这个链表实际上是按照剩余的超时时间来进行排序的,快到超时的节点排在表头,依次往后递增。
我们以一个read的代码来看整个超时的绑定过程

            @Override
            public long read(Buffer sink, long byteCount) throws IOException {
                boolean throwOnTimeout = false;
                enter();
                try {
                    long result = source.read(sink,byteCount);
                    throwOnTimeout = true;
                    return result;
                }catch (IOException e){
                    throw exit(e);
                }finally {
                    exit(throwOnTimeout);
                }
            }

首先调用enter方法,然后来做读的操作,这里可以看到不仅在catch上而且在finally中也做了操作,这样异常和正常的情况都考虑到了,在exit中调用了真正的exit方法,exit中会去判断这个异步超时的对象是否在链表中

    final void exit(boolean throwOnTimeout) throws IOException {
        boolean timeOut =  exit();
        if (timeOut && throwOnTimeout)
            throw newTimeoutException(null);
    }

    public final boolean exit(){
        if (!inQueue)
            return false;
        inQueue = false;
        return cancelScheduledTimeout(this);
    }

回到前面所说的WatchDog,内部的run方法是一个while(true)的一个循环,

             while (true) {
                try {
                    AsyncTimeout timeout;
                    synchronized (AsyncTimeout.class) {
                        timeout = awaitTimeout();
                        //没有找到一个node来interrupt 继续
                        if (timeout == null){
                            continue;
                        }
                    ...
                    timeout.timedOut();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

这里锁住了内部的awaitTimeout操作,这个await正是判断是否超时的真正地方

    static AsyncTimeout awaitTimeout() throws InterruptedException {
        //拿到下一个节点
        AsyncTimeout node = head.next;
        //如果queue为空,等待直到有node进队,或者触发IDLE_TIMEOUT_MILLS
        if (node == null) {
            long startNanos = System.nanoTime();
            AsyncTimeout.class.wait(IDLE_TIMEOUT_MILLS);
            return head.next == null && (System.nanoTime() - startNanos) >= IDLE_TIMEOUT_NANOS ? head : null;
        }
        long waitNanos = node.remainingNanos(System.nanoTime());
        //这个head依然还没有超时,继续等待
        if (waitNanos > 0) {
            //这里比较奇怪,但是是wait API的需求需要两个参数
            long waitMills = waitNanos / 1000000L;
            waitNanos -= (waitMills * 1000000L);
            AsyncTimeout.class.wait(waitMills, (int) waitNanos);
            return null;
        }
        head.next = node.next;
        node.next = null;
        return node;
    }

代码中有些注释已经写得比较清晰了,主要就是通过这个 remainingNanos 来判断预定的超时时间减去当前时间是否大于0,如果比0大就说明还没超时,于是wait剩余的时间,然后表示并没有超时,如果小于0,就会把这个从链表中移除,根据前面exit方法中的判断就能触发整个超时的方法,异步超时这一部分代码比较复杂,涉及到许多wait,链表,加锁,需要详细阅读源码才能理解深刻,不清楚的可以详细看看。

Segment和SegmentPool解析

Segment字面翻译就是片段,Okio将数据也就是Buffer分割成一块块的片段,同时segment拥有前置节点和后置节点,构成一个双向循环链表,就像下面这个图的方式。

《大概是最完全的Okio源码解析文章》 Okio的Buffer和segment的关系

这样采取分片使用链表连接,片中使用数组存储,兼具读的连续性,以及写的可插入性,对比单一使用链表或者数组,是一种折中的方案,读写更快,而且有个好处根据需求改动分片的大小来权衡读写的业务操作,另外,segment也有一些内置的优化操作,综合这些Okio才能大放异彩,后面在Buffer解析会讲解什么时候形成的双向循环链表

    static final int SIZE = 8192;
    static final int SHARE_MINIMUM = 1024;
    final byte[] data;
    int pos;
    int limit;
    boolean shared;
    boolean owner;
    Segment pre;
    Segment next;

SIZE就是一个segment的最大字节数,其中还有一个SHARE_MINIMUM,这个涉及到segment优化中的另一个技巧,共享内存,然后data就是保存的字节数组,pos,limit就是开始和结束点的index,shared和owner用来设置状态判断是否可写,一个有共享内存的segment是不能写入的,pre,next就是前置后置节点。

Segment方法分析

既然是双向循环链表,其中也会有一些操作的方法,比如

    public Segment pop(){
        Segment result = next != this ? next : null;
        pre.next = next;
        next.pre = pre;
        next = null;
        pre = null;
        return result;
    }

pop方法移除了自己,首先将自己的前后两个节点连接起来,然后将自己的前后引用置空,这样就脱离了整个双向链表,然后返回next

    public Segment push(Segment segment){
        segment.pre = this;
        segment.next = next;
        next.pre = segment;
        next = segment;
        return segment;
    }

push方法就是在当前和next引用中间插入一个segment进来,并且返回插入的segment,这两个都是寻常的双向链表的操作,我们再来看看如何写入数据

    public void writeTo(Segment sink , int byteCount){
        if (!sink.owner)
            throw new IllegalArgumentException();
        if (sink.limit + byteCount > SIZE){  //limit和需要写的字节总和大于SIZE
            if (sink.shared)  //共享无法写
                throw new IllegalArgumentException();
            if (sink.limit + byteCount - sink.pos > SIZE){  //如果减去头依然比SIZE大 那么就无法写抛异常
                throw new IllegalArgumentException();
            }
            //否则我们需要先移动要写的文件地址  然后置limit pos的地址
            System.arraycopy(sink.data,sink.pos,sink.data,0,sink.limit - sink.pos);
            sink.limit = sink.limit - sink.pos;
            sink.pos = 0;
        }
        //开始尾部写入 写完置limit地址
        System.arraycopy(data,pos,sink.data,sink.limit,byteCount);
        sink.limit = sink.limit + byteCount;
        pos = pos + byteCount; //当前索引后移
    }

owner和Shared这两个状态目前看来是完全相反的,赋值都是同步赋值的,这里有点不明白存在两个参数的意义,现在的功能主要是用来判断如果是共享就无法写,以免污染数据,会抛出异常。当然,如果要写的字节大小加上原来的字节数大于单个segment的最大值也是会抛出异常,也存在一种情况就是虽然尾节点索引和写入字节大小加起来超过,但是由于前面的pos索引可能因为read方法取出数据,pos索引后移这样导致可以容纳数据,这时就先执行移动操作,使用系统的 System.arraycopy 方法来移动到pos为0的状态,更改pos和limit索引后再在尾部写入byteCount数的数据,写完之后实际上原segment读了byteCount的数据,所以pos需要后移这么多。过程十分的清晰,比较好理解。
除了写入数据之外,segment还有一个优化的技巧,因为每个segment的片段size是固定的,为了防止经过长时间的使用后,每个segment中的数据千疮百孔,可能十分短的数据却占据了一整个segment,所以有了一个压缩机制

    public void compact(){
        if (pre == this)
            throw new IllegalStateException();
        if (!pre.owner) // pre不可写 
            return;
        int byteCount = limit - pos;
        int availableByteCount = SIZE - pre.limit + (pre.shared ? 0 : pre.pos);  //前一个的剩余大小
        if (byteCount > availableByteCount)
            return;
        writeTo(pre,byteCount);   //将数据写入到前一个的片段中
        pop();  // 从双向链表中移除当前
        SegmentPool.recycle(this);   //加入到对象池中
    }

照例如果前面是共享的那么不可写,也就不能压缩了,然后判断前一个的剩余大小是否比当前的大,有足够的空间来容纳数据,调用前面的 writeTo 方法来写数据,写完后移除当前segment,然后通过 SegmentPool 来回收。
另一个技巧就是共享机制,为了减少数据复制带来的性能开销,segment存在一个共享机制

    public Segment split(int byteCount){
        if (byteCount <= 0 || byteCount > limit - pos )
            throw new IllegalArgumentException();
        Segment prefix;
        if (byteCount >= SHARE_MINIMUM){  //如果byteCount大于最小的共享要求大小
            prefix = new Segment(this); //this这个构造函数会
        }else {
            prefix = SegmentPool.take();
            System.arraycopy(data,pos,prefix,0,byteCount);
        }
        prefix.limit = prefix.pos + byteCount;
        pos = pos + byteCount;
        pre.push(prefix);
        return  prefix;
    }

这个方法实际上经过了很多次的改变,在回顾Okio的1.6的版本时,发现有一个重要的差异就是多了一个 SHARE_MINIMUM 参数,同时也多了一个注释,为了防止一个很小的片段就进行共享,我们知道共享之后为了防止数据污染就无法写了,如果存在大片的共享小片段,实际上是很浪费资源的,所以通过这个对比可以看出这个最小数的意义,而且这个方法在1.6的版本中检索实际上只有一个地方使用了这个方法,就是Buffer中的write方法,为了效率在移动大数据的时候直接移动整个segment而不是data,这样在写数据上能达到很高的效率,具体write的细节会在Buffer一章中详细描述。
再回头看刚才的 compact 中出现的 SegmentPool ,这个实际上是一个segment的对象池

    static final long MAX_SIZE = 64 * 1024;
    static Segment next;
    static long byteCount;

同样的有一个池子的上限,也就是64k,相当于8个segment,next这个节点可以看出这个 SegmentPool 是按照单链表的方式进行存储的,byteCount则是目前已有的大小。

SegmentPool方法分析

SegmentPool的方法十分的少,一个取,一个回收,十分简洁。

    /**
     * take方法用来取数据
     * 如果池子为空就创建一个空对象 owner true | share false
     * next是链表的头 就是一个简单的取表头的操作
     * @return
     */
    static Segment take(){
        synchronized (SegmentPool.class){
            if (next != null){
                Segment result = next;
                next = result.next;
                result.next = null;
                byteCount = byteCount - Segment.SIZE;
                return result;
            }
        }
        return new Segment();
    }

为了防止多线程同时操作造成数据的错乱,这里加了锁,这里的next命名虽然是next,但是实际上是整个对象池的头,但是next为空,表示池子为空,直接返回一个空对象,否则从里面拿出next,并将next的下一个节点赋为next,置一下状态,这个方法就结束了

    /**
     * 如果当前要回收的segment有前后引用或者是共享的 那么就回收失败
     * 如果加入后的大小超过了最大大小 也会失败
     * 然后将新回收的next指向表头 也就是加到的链表的头 并且将回收的segment置为next也就是head
     * @param segment
     */
    static void recycle(Segment segment){
        if (segment.next != null || segment.pre != null)
            throw new IllegalArgumentException();
        if (segment.shared)
            return;
        synchronized (SegmentPool.class){
            if (byteCount + Segment.SIZE > MAX_SIZE){
                return;
            }
            byteCount += Segment.SIZE;
            segment.next = next;
            segment.pos = segment.limit = 0;
            next = segment;
        }
    }

如果要回收的segment有前后引用或者是共享的,就不能被回收,所以要回收前先将引用置空,同样这里也加了锁,以免那个同时回收超过池子最大的大小,然后就是将回收的插到表头的操作。
所以SegmentPool无论是回收和取对象都是在表头操作。

不可变的ByteString

我们都知道String是一个不可以改变的一个对象,那可能有人问了谁说不能改变了,明明还能做分割添加的操作,那这里就不详述了,有兴趣的可以看 Java中的String为什么是不可变的? — String源码分析 这篇文章,同样的ByteString也是一个不可变的对象,当然,java语言可没有不可变标记关键字,如果想要实现一个不可变的对象,还需要一些操作。
Effective Java一书中有一条给了不可变对象需要遵循的几条原则:

  • 不要提供任何会修改对象状态的方法
  • 保证类不会被扩展
  • 使所有的域都是final的
  • 使所有的域都是private的
  • 确保对于任何可变组件的互斥访问

不可变的对象有许多的好处,首先本质是线程安全的,不要求同步处理,也就是没有锁之类的性能问题,而且可以被自由的共享内部信息,当然坏处就是需要创建大量的类的对象。

    byte[] data;
    transient String utf8;

ByteString不仅是不可变的,同时在内部有两个filed,分别是byte数据,以及String的数据,这样能够让这个类在Byte和String转换上基本没有开销,同样的也需要保存两份引用,这是明显的空间换时间的方式,为了性能Okio做了很多的事情。
但是这个String前面有 transient 关键字标记,也就是说不会进入序列化和反序列化,所有我们看到两个方法中并没有utf8这个属性。

  private void readObject(ObjectInputStream in) throws IOException {
    int dataLength = in.readInt();
    ByteString byteString = ByteString.read(in, dataLength);
    try {
      Field field = ByteString.class.getDeclaredField("data");
      field.setAccessible(true);
      field.set(this, byteString.data);
    } catch (NoSuchFieldException e) {
      throw new AssertionError();
    } catch (IllegalAccessException e) {
      throw new AssertionError();
    }
  }

  private void writeObject(ObjectOutputStream out) throws IOException {
    out.writeInt(data.length);
    out.write(data);
  }

除此之外, ByteString 内置了不少操作,方便使用

《大概是最完全的Okio源码解析文章》 方法截选

最核心的Buffer解析

前面讲到Buffer这个类实际上就是整个读和写的核心,包括 RealBufferedSourceRealBufferedSink 实际上都只是一个代理,里面的操作全部都是通过Buffer来完成的

public class Buffer implements BufferedSource, BufferedSink, Cloneable {
long size;
Segment head;

整个Buffer持有了一个Segment的引用,通过这个引用能拿到整个链表中所有的数据。
Buffer一共实现了三个接口,读,写,以及clone。
先从最简单的clone说起,clone是一种对象生成的方式,是除了常规的new·关键字以及反序列化之外的一种方式,主要分为深拷贝和浅拷贝两种,Buffer采用的是深拷贝的方式

    public Buffer clone(){
        Buffer result = new Buffer();
        if (size == 0){
            return result;
        }
        result.head = new Segment(head);
        result.head.pre = result.head.next = result.head;
        for (Segment s = head.next ; s != head ; s = s.next){
            result.head.pre.push(new Segment(s));  //这里选择的pre上push一个segment 
        }
        result.size = size;
        return result;
    }

对应实现的clone方法,如果整个Buffer的size为null,也就是没有数据,那么就返回一个新建的Buffer对象,如果不为空就是遍历所有的segment并且都创建一个对应的Segment,这样clone出来的对象就是一个全新的毫无关系的对象。
前面分析segment的时候有讲到是一个双向循环链表,但是segment自身构造的时候却没有形成闭环,其实就是在Buffer中产生的

result.head.pre = result.head.next = result.head;

clone的过程中创建了一个双向循环链表,另外一个地方就是

    Segment writableSegment(int minimumCapacity) {
        if (minimumCapacity < 1 || minimumCapacity > Segment.SIZE)
            throw new IllegalArgumentException();
        if (head == null) {
            head = SegmentPool.take();
            return head.next = head.pre = head;
        }
        //head 不为null 的情形
        Segment tail = head.pre;
        //如果tail会导致大于Segment的上限 或是owner为false 也就是不可写
        if (tail.limit + minimumCapacity > Segment.SIZE || !tail.owner) {
            tail = tail.push(SegmentPool.take());  //在tail的后面插入一个空的segment
        }
        return tail;
    }

除了clone接口外,同时还有两个接口BufferedSink,BufferedSource。Buffer实现了这两个接口的所有方法,所有既然读也有写的方法,举几个例子

    @Override
    public Buffer writeShort(int s) throws IOException {
        Segment tail = writableSegment(2);

        byte[] data = tail.data;
        int limit  = tail.limit;
        data[limit++] = (byte) ((s >>> 8) & 0xff);
        data[limit++] = (byte) (s & 0xFF);
        tail.limit = limit;
        size += 2;
        return this;
    }

writeShort用来给Buffer中写入一个short的数据,首先通过writableSegment拿到一个能够有2个字节空间的segment,tail中的data就是字节数组,limit则是数据的尾部索引,写数据就是在尾部继续往后写,直接设置在data通过limit自增后的index,然后重置尾部索引,并且buffer的size大小加2。

    @Override
    public short readShort() throws IOException {
        if (size < 2)
            throw new IllegalArgumentException("size < 2");
        Segment segment = head;
        int pos = segment.pos;
        int limit = segment.limit;
        //如果short被segment分隔开  通过readByte来一个个字节读
        if (limit - pos < 2) {
            int s = (readByte() & 0xFF) << 8 | (readByte() & 0xFF);
            return (short) s;
        }
        byte[] data = segment.data;  //与readByte类似 只不过一次读两个字节再组合起来
        int s = (data[pos++] & 0xFF) << 8 | (data[pos++] & 0xFF);  //pos自增2
        size -= 2;
        if (pos == limit) {
            head = segment.pop();
            SegmentPool.recycle(segment);
        } else {
            segment.pos = pos;
        }
        return (short) s;
    }

读的方法相对于写的方法就复杂一些,因为buffer是分块的,读数据的过程就有可能是跨segment的,比如前面一个字节,下一个segment一个字节,这种情况就转化为readbyte,读两个字节后合成一个short对象,对于连续的读可以直接通过pos索引自增达到目的,读完后Buffer的size减2。
并且会有当前的segment会出现读完后数据为null的情况,此时头部索引pos和尾部索引limit就重合了,通过pop方法可以把这个segment分离出来,并且将下一个segment设置为Buffer的head,然后将分离出来的segment回收到对象池中。
鉴于篇幅原因就暂时只举出Buffer中比较有代表性的读写方式,看完这两个其他都是类似的。

后记

以上就是整个Okio核心实现的分析,篇幅比较长,能够坚持看到这里的都是值得敬佩的,Okio的整个源码干货满满,而且架构清晰,如果有时间可以通读一遍,更能理解上述文章中的分析。
最后总结一下Okio这个库的精髓,第一就是快,Okio采取了空间换时间的方式比如Segment和ByteString之类的存在来让IO操作尽可能不成为整个系统的瓶颈,虽然采取这种方式但是在内存上也是极致的优化,使用的片段共享以及整体的读写共享来加快大字节数组的读写,第二就是稳定,Okio提供了超时机制,不仅在IO操作上加上超时的判定,包括close,flush之类的方法中都有超时机制,这让上层不会错过一个可能导致系统崩溃的超时异常,第三就是方便,Sink,Source两个包装了写和读,区别于传统的IO各种不同的输入输出流,这里只有一种而且支持socket,十分的方便。当然Okio还有很多其他的好处,易于扩展,代码量小易于阅读,我想这就是许多上层库选择Okio来作为IO操作的原因。

okio sample来源
https://github.com/ut2014/Okio_1.9

    原文作者:sheepm
    原文地址: https://www.jianshu.com/p/f033a64539a1
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞