【libuv高效编程】libuv学习超详细教程10 —— libuv stream 流句柄解读

libuv系列文章

stream handle

stream handle可以被译为流句柄,它在 libuv 中是一个抽象的数据类型,为 libuv 提供了全双工的通信方式,可以说它只是一个父类,通过它派生出 uv_tcp_t、uv_pipe_t、uv_tty_t 这 3 个子类,在这些handle中,都使用了stream handle 的成员变量及处理方法。

数据结构

通过 uv_stream_t 可以定义一个 stream handle的实例。

typedef struct uv_stream_s uv_stream_t;
struct uv_stream_s {
  UV_HANDLE_FIELDS
  UV_STREAM_FIELDS
};

#define UV_STREAM_FIELDS                                                      \
  /* 等待写的字节数 */                                                        \
  size_t write_queue_size;                                                    \
  /* 分配内存的函数 */                                                        \
  uv_alloc_cb alloc_cb;                                                       \
  /* 读取完成时候执行的回调函数 */                                            \
  uv_read_cb read_cb;                                                         \
  /* private */                                                               \
  UV_STREAM_PRIVATE_FIELDS

其实 stream handle 是属于handle的子类,因此它的数据结构中包含了handle的成员变量,还包含它自身的一个成员变量 UV_STREAM_FIELDS ,它分为公有字段与私有字段,公有字段只有 write_queue_size、 alloc_cb 、 read_cb,私有字段就是 UV_STREAM_PRIVATE_FIELDS ,它是分为Windows平台与linux平台的,此处以linux为例。

#define UV_STREAM_PRIVATE_FIELDS                                              \
  uv_connect_t *connect_req;                                                  \
  uv_shutdown_t *shutdown_req;                                                \
  uv__io_t io_watcher;                                                        \
  void* write_queue[2];                                                       \
  void* write_completed_queue[2];                                             \
  uv_connection_cb connection_cb;                                             \
  int delayed_error;                                                          \
  int accepted_fd;                                                            \
  void* queued_fds;                                                           \
  UV_STREAM_PRIVATE_PLATFORM_FIELDS                                           \
  • connect_req:其实 uv_connect_t 是一个请求,从前面的文章我们也知道,在libuv存在 handlerequest,很明显connect_req就是一个请求,它的作用就是请求建立连接,比如类似建立tcp连接。

  • shutdown_req:uv_shutdown_t也是一个请求,它的作用与uv_connect_t刚好相反,关闭一个连接。

  • io_watcher:抽象出来的io观察者

  • write_queue:写数据队列。

  • write_completed_queue:完成的写数据队列。

  • connection_cb:有新连接时的回调函数。

  • delayed_error:延时的错误代码。

  • accepted_fd:接受连接的描述符 fd

  • queued_fds:fd 队列,可能有多个fd在排队。

  • UV_STREAM_PRIVATE_PLATFORM_FIELDS:目前为空。

其实现在不太了解无所谓,就先看下去,我在写文章的时候其实也没有完全理解透彻。

总结一下它的框架示意图,如下:

libuv007

内部API

stream handle其实并未提供用户的API接口,但提供了内部的API接口,供子类使用,比如在创建一个tcp的时候,就会通过uv__stream_init()函数去初始化一个 stream handle,又比如在读写流操作的时候肯定是通过stream handle去操作的,因此它又需要实现内部的读写操作接口,相关的函数如下:

void uv__stream_init(uv_loop_t* loop,
                     uv_stream_t* stream,
                     uv_handle_type type);

static void uv__stream_connect(uv_stream_t*);
static void uv__write(uv_stream_t* stream);
static void uv__read(uv_stream_t* stream);
static void uv__stream_io(uv_loop_t* loop, uv__io_t* w, unsigned int events);
static void uv__write_callbacks(uv_stream_t* stream);
static size_t uv__write_req_size(uv_write_t* req);

uv__stream_init()

初始化一个stream handle,设置

函数原型:

void uv__stream_init(uv_loop_t* loop,
                     uv_stream_t* stream,
                     uv_handle_type type);

参数:

  • loop:传入了事件循环的句柄。
  • stream:指定初始化的stream handle
  • type:指定stream handle的类型,注意看它的类型参数是handle类型的,而handle类型有很多,但是对与这个stream handle来说可选的值基本上只有UV_TCP、UV_TTY、UV_PIPE

源码的实现:

void uv__stream_init(uv_loop_t* loop,
                     uv_stream_t* stream,
                     uv_handle_type type) {
  int err;

  uv__handle_init(loop, (uv_handle_t*)stream, type);
  stream->read_cb = NULL;
  stream->alloc_cb = NULL;
  stream->close_cb = NULL;
  stream->connection_cb = NULL;
  stream->connect_req = NULL;
  stream->shutdown_req = NULL;
  stream->accepted_fd = -1;
  stream->queued_fds = NULL;
  stream->delayed_error = 0;
  QUEUE_INIT(&stream->write_queue);
  QUEUE_INIT(&stream->write_completed_queue);
  stream->write_queue_size = 0;

  if (loop->emfile_fd == -1) {
    err = uv__open_cloexec("/dev/null", O_RDONLY);
    if (err < 0)
        /* In the rare case that "/dev/null" isn't mounted open "/"
         * instead.
         */
        err = uv__open_cloexec("/", O_RDONLY);
    if (err >= 0)
      loop->emfile_fd = err;
  }

#if defined(__APPLE__)
  stream->select = NULL;
#endif /* defined(__APPLE_) */

  uv__io_init(&stream->io_watcher, uv__stream_io, -1);
}

说说处理的逻辑:

  1. 调用uv__handle_init()函数将stream handle初始化,主要设置loop类型、以及UV_HANDLE_REF标记。

  2. 初始化stream handle中的成员变量。

  3. 初始化write_queuewrite_completed_queue队列,可能有人有疑问了,为啥要写队列还要 写完成 两个队列,因为啊libuv是为了实现异步,写操作为了实现异步非阻塞,你不能直接写,你得通过写队列去操作,它会首先将数据丢到队列中,下层 io 观察者触发可写事件时才去写入,当写完了就告诉你。

  4. 最后调用uv__io_init()函数去初始化io观察者,并设置stream的回调处理函数uv__stream_io(),这个处理回调函数后续慢慢讲解吧,先来看看stream handle 的读写操作。

uv__read()

io观察者发现stream handle有可读事件时,uv__read()函数会被调用,其实是被uv__stream_io()函数调用,因为io观察者发现了底层有数据可读。所以该函数是用于从底层读取数据,这也是stream handle的读取操作。

uv__read()函数是通过 read() 函数从底层文件描述符读取数据,读取的数据写入由 stream->alloc_cb 分配到内存块中,并在完成读取后由 stream->read_cb 回调函数传递到用户。因为数据已经由底层准备好,直接读取即可,效率非常高,是不需要等待的。而当底层没有数据的情况时,read() 系统调用也会阻塞,而是直接返回,因为文件描述符工作在非阻塞模式下,即使底层还没有数据,它也不会阻塞的,而真正阻塞的地方是在io循环中。

简单看看函数源码吧:

static void uv__read(uv_stream_t* stream) {
  uv_buf_t buf;
  ssize_t nread;
  struct msghdr msg;
  char cmsg_space[CMSG_SPACE(UV__CMSG_FD_SIZE)];
  int count;
  int err;
  int is_ipc;

  stream->flags &= ~UV_HANDLE_READ_PARTIAL;

  /* Prevent loop starvation when the data comes in as fast as (or faster than)
   * we can read it. XXX Need to rearm fd if we switch to edge-triggered I/O.
   */
  count = 32;

  /* 看看是不是管道,IPC通讯机制 */
  is_ipc = stream->type == UV_NAMED_PIPE && ((uv_pipe_t*) stream)->ipc;

  /* XXX: Maybe instead of having UV_HANDLE_READING we just test if
   * tcp->read_cb is NULL or not?
   */
  while (stream->read_cb
      && (stream->flags & UV_HANDLE_READING)
      && (count-- > 0)) {
    assert(stream->alloc_cb != NULL);

    buf = uv_buf_init(NULL, 0);

    /* 分配内存 */
    stream->alloc_cb((uv_handle_t*)stream, 64 * 1024, &buf);

    if (buf.base == NULL || buf.len == 0) {

      /* 如果内存分配失败或者buffer的长度是0则无法读取 */
      stream->read_cb(stream, UV_ENOBUFS, &buf);
      return;
    }

    /* 断言,保证有内存空间与stream handle的文件描述符是存在的 */
    assert(buf.base != NULL);
    assert(uv__stream_fd(stream) >= 0);

    /* 如果不是pipe */
    if (!is_ipc) {
      do {
        /* 通过read()去读取底层数据 */
        nread = read(uv__stream_fd(stream), buf.base, buf.len);
      }
      /* 直到读取完毕 */
      while (nread < 0 && errno == EINTR);
    } else {
      /* ipc 需要使用 recvmsg() 函数去读取 */
      msg.msg_flags = 0;
      msg.msg_iov = (struct iovec*) &buf;
      msg.msg_iovlen = 1;
      msg.msg_name = NULL;
      msg.msg_namelen = 0;
      /* Set up to receive a descriptor even if one isn't in the message */
      msg.msg_controllen = sizeof(cmsg_space);
      msg.msg_control = cmsg_space;

      do {
        /* 读取ipc机制的数据 */
        nread = uv__recvmsg(uv__stream_fd(stream), &msg, 0);
      }
      while (nread < 0 && errno == EINTR);
    }

    /* 读取数据错误 */
    if (nread < 0) {
      /* Error */
      if (errno == EAGAIN || errno == EWOULDBLOCK) {

        /* 开始下一次的等待. */
        if (stream->flags & UV_HANDLE_READING) {

          /* 重新设置io观察者活跃 */
          uv__io_start(stream->loop, &stream->io_watcher, POLLIN);
          uv__stream_osx_interrupt_select(stream);
        }
        stream->read_cb(stream, 0, &buf);
#if defined(__CYGWIN__) || defined(__MSYS__)
      } else if (errno == ECONNRESET && stream->type == UV_NAMED_PIPE) {
        uv__stream_eof(stream, &buf);
        return;
#endif
      } else {
        /* 错误,用户应该调用uv_close()关闭 */
        stream->read_cb(stream, UV__ERR(errno), &buf);
        if (stream->flags & UV_HANDLE_READING) {
          stream->flags &= ~UV_HANDLE_READING;
          uv__io_stop(stream->loop, &stream->io_watcher, POLLIN);
          if (!uv__io_active(&stream->io_watcher, POLLOUT))
            uv__handle_stop(stream);
          uv__stream_osx_interrupt_select(stream);
        }
      }
      return;
    } else if (nread == 0) {
      uv__stream_eof(stream, &buf);
      return;
    } else {
      /* 成功读取到数据 */
      ssize_t buflen = buf.len;

      /* ipc就这样子读取 */
      if (is_ipc) {
        err = uv__stream_recv_cmsg(stream, &msg);
        if (err != 0) {
          stream->read_cb(stream, err, &buf);
          return;
        }
      }

#if defined(__MVS__)
      if (is_ipc && msg.msg_controllen > 0) {
        uv_buf_t blankbuf;
        int nread;
        struct iovec *old;

        blankbuf.base = 0;
        blankbuf.len = 0;
        old = msg.msg_iov;
        msg.msg_iov = (struct iovec*) &blankbuf;
        nread = 0;
        do {
          nread = uv__recvmsg(uv__stream_fd(stream), &msg, 0);
          err = uv__stream_recv_cmsg(stream, &msg);
          if (err != 0) {
            stream->read_cb(stream, err, &buf);
            msg.msg_iov = old;
            return;
          }
        } while (nread == 0 && msg.msg_controllen > 0);
        msg.msg_iov = old;
      }
#endif

      /* 通过回调函数告诉应用层读取完成 */
      stream->read_cb(stream, nread, &buf);

      /* 如果没有填满缓冲区,则返回没有更多数据可读取。 */
      if (nread < buflen) {
        stream->flags |= UV_HANDLE_READ_PARTIAL;
        return;
      }
    }
  }
}

uv__write()

同理地,当 io 观察者发现要写入数据的时候,它也会去将数据写入到底层,函数 uv__write() 会被调用,那什么时候才是可写呢,回顾 stream handle 的成员变量,它有两个队列,当 stream->write_queue 队列存在数据时,表示可以写入,如果队列为空则表示没有数据可以写。

libuv的异步处理都是差不多的,都是通过io观察者去发现是否有可读可写,写数据的过程大致如下:用户将数据丢到写队列中就直接返回了,io观察者发现队列有数据,stream handle 的处理 uv__stream_io()函数被调用,开始写入操作,这个写入的操作是依赖系统的函数接口的,比如write()等,等写完了就通知用户即可。

源码的实现:

static void uv__write(uv_stream_t* stream) {
  struct iovec* iov;
  QUEUE* q;
  uv_write_t* req;
  int iovmax;
  int iovcnt;
  ssize_t n;
  int err;

start:

  /* 健壮性的处理,断言,确保存在stream handle的fd、队列存在等 */
  assert(uv__stream_fd(stream) >= 0);

  if (QUEUE_EMPTY(&stream->write_queue))
    return;

  q = QUEUE_HEAD(&stream->write_queue);
  req = QUEUE_DATA(q, uv_write_t, queue);
  assert(req->handle == stream);

  /* 转换为iovec。我们必须拥有自己的uv_buf_t而不是iovec,因为Windows的WSABUF不是iovec。 */
  assert(sizeof(uv_buf_t) == sizeof(struct iovec));
  iov = (struct iovec*) &(req->bufs[req->write_index]);
  iovcnt = req->nbufs - req->write_index;

  iovmax = uv__getiovmax();

  /* 限制iov计数以避免来自writev()的EINVAL */
  if (iovcnt > iovmax)
    iovcnt = iovmax;

  if (req->send_handle) {
    int fd_to_send;
    struct msghdr msg;
    struct cmsghdr *cmsg;
    union {
      char data[64];
      struct cmsghdr alias;
    } scratch;

    if (uv__is_closing(req->send_handle)) {
      err = UV_EBADF;
      goto error;
    }

    fd_to_send = uv__handle_fd((uv_handle_t*) req->send_handle);

    memset(&scratch, 0, sizeof(scratch));

    assert(fd_to_send >= 0);

    msg.msg_name = NULL;
    msg.msg_namelen = 0;
    msg.msg_iov = iov;
    msg.msg_iovlen = iovcnt;
    msg.msg_flags = 0;

    msg.msg_control = &scratch.alias;
    msg.msg_controllen = CMSG_SPACE(sizeof(fd_to_send));

    cmsg = CMSG_FIRSTHDR(&msg);
    cmsg->cmsg_level = SOL_SOCKET;
    cmsg->cmsg_type = SCM_RIGHTS;
    cmsg->cmsg_len = CMSG_LEN(sizeof(fd_to_send));

    /* silence aliasing warning */
    {
      void* pv = CMSG_DATA(cmsg);
      int* pi = pv;
      *pi = fd_to_send;
    }

    do
      n = sendmsg(uv__stream_fd(stream), &msg, 0);
    while (n == -1 && RETRY_ON_WRITE_ERROR(errno));

    /* Ensure the handle isn't sent again in case this is a partial write. */
    if (n >= 0)
      req->send_handle = NULL;
  } else {
    do
      /* 写操作 */
      n = uv__writev(uv__stream_fd(stream), iov, iovcnt);
    while (n == -1 && RETRY_ON_WRITE_ERROR(errno));
  }

  if (n == -1 && !IS_TRANSIENT_WRITE_ERROR(errno, req->send_handle)) {
    err = UV__ERR(errno);
    goto error;
  }

  if (n >= 0 && uv__write_req_update(stream, req, n)) {
    uv__write_req_finish(req);
    return;  /* TODO(bnoordhuis) Start trying to write the next request. */
  }

  /* If this is a blocking stream, try again. */
  if (stream->flags & UV_HANDLE_BLOCKING_WRITES)
    goto start;

  /* 重新启动io观察者. */
  uv__io_start(stream->loop, &stream->io_watcher, POLLOUT);

  /* Notify select() thread about state change */
  uv__stream_osx_interrupt_select(stream);

  return;

error:
  req->error = err;
  uv__write_req_finish(req);
  uv__io_stop(stream->loop, &stream->io_watcher, POLLOUT);
  if (!uv__io_active(&stream->io_watcher, POLLIN))
    uv__handle_stop(stream);
  uv__stream_osx_interrupt_select(stream);
}

uv__stream_io()

uv__stream_io() 函数是 stream handle 的事件处理函数,它在uv__io_init()函数就被注册了,在调用 uv__stream_io() 函数时,传递了事件循环对象、io 观察者对象、事件类型等信息。

我们来看看stream handle是如何处理可读写事件的:

  1. 通过container_of()函数获取 stream handle 的实例,其实是计算出来的。

  2. 如果 stream->connect_req存在,说明 该 stream handle 需要进行连接,于是调用 uv__stream_connect() 函数请求建立连接。

  3. 满足可读取数据的条件,调用uv__read()函数进行数据读取

  4. 如果满足流结束条件 调用 uv__stream_eof() 进行相关处理。

  5. 如果满足可写条件,调用 uv__write() 函数去写入数据,当然,数据会被放在 stream->write_queue 队列中。

  6. 在写完数据后,调用 uv__write_callbacks() 函数去清除队列的数据,并通知应用层已经写完了。

static void uv__stream_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
  uv_stream_t* stream;

  /* 获取 stream handle 的实例 */
  stream = container_of(w, uv_stream_t, io_watcher);

  /* 断言,判断是否满足类型 */
  assert(stream->type == UV_TCP ||
         stream->type == UV_NAMED_PIPE ||
         stream->type == UV_TTY);
  assert(!(stream->flags & UV_HANDLE_CLOSING));

  if (stream->connect_req) {
    /* 如果需要建立连接,则请求建立连接 */
    uv__stream_connect(stream);
    return;
  }

  /* 断言 */
  assert(uv__stream_fd(stream) >= 0);

  /* 满足读数据条件,进行数据读取 */
  if (events & (POLLIN | POLLERR | POLLHUP))
    uv__read(stream);

  /* read_cb 可能会关闭 stream,此处判断一下是否需要关闭fd */
  if (uv__stream_fd(stream) == -1)
    return;  /* read_cb closed stream. */

  /* 如果满足流结束条件 调用 uv__stream_eof() 进行相关处理。 */
  if ((events & POLLHUP) &&
      (stream->flags & UV_HANDLE_READING) &&
      (stream->flags & UV_HANDLE_READ_PARTIAL) &&
      !(stream->flags & UV_HANDLE_READ_EOF)) {
    uv_buf_t buf = { NULL, 0 };
    uv__stream_eof(stream, &buf);
  }

  if (uv__stream_fd(stream) == -1)
    return;  /* read_cb closed stream. */

  /* 如果有数据要写入,则调用uv__write()去写数据,写完了调用uv__write_callbacks()函数 */
  if (events & (POLLOUT | POLLERR | POLLHUP)) {
    uv__write(stream);
    uv__write_callbacks(stream);

    /* Write queue drained. */
    if (QUEUE_EMPTY(&stream->write_queue))
      uv__drain(stream);
  }
}

uv__write_callbacks()

清理 stream->write_completed_queue 已完成写请求的队列,清理空间,并调用回调函数。

static void uv__write_callbacks(uv_stream_t* stream) {
  uv_write_t* req;
  QUEUE* q;
  QUEUE pq;

  if (QUEUE_EMPTY(&stream->write_completed_queue))
    return;

  /* 从写完成队列获取队列 */
  QUEUE_MOVE(&stream->write_completed_queue, &pq);

  /* 队列不为空 */
  while (!QUEUE_EMPTY(&pq)) {
    /* 获取队列头部节点 */
    q = QUEUE_HEAD(&pq);
    req = QUEUE_DATA(q, uv_write_t, queue);
    QUEUE_REMOVE(q);
    uv__req_unregister(stream->loop, req);

    /* 清除并释放内存 */
    if (req->bufs != NULL) {
      stream->write_queue_size -= uv__write_req_size(req);
      if (req->bufs != req->bufsml)
        uv__free(req->bufs);
      req->bufs = NULL;
    }

    /* 通知应用层 */
    if (req->cb)
      req->cb(req, req->error);
  }
}

外部API

内容较多,在下一章讲解吧。

参考

libuv官方文档

例程代码获取

libuv-learning-code


 上一篇
闲谈:我为什么热爱开源。 闲谈:我为什么热爱开源。
hello!公众号的兄弟姐妹们大家好,我们今天来聊一聊我热爱的事情——开源。 回想几年前,我刚学C语言的时候,觉得这东西很好玩,也很有兴趣,居然能让我操控电脑,虽然能做的东西不对,也就打印点字符在屏幕上,但这确实却让我喜欢上了它,也许以前没
2020-05-17
下一篇 
【libuv高效编程】libuv学习超详细教程11 —— libuv stream 流句柄解读 【libuv高效编程】libuv学习超详细教程11 —— libuv stream 流句柄解读
libuv系列文章 【libuv高效编程】libuv学习超详细教程1——libuv的编译与安装 【libuv高效编程】libuv学习超详细教程2——libuv框架初窥 【libuv高效编程】libuv学习超详细教程3——libuv事件循
2020-04-24
  目录