【libuv高效编程】libuv学习超详细教程9——libuv async异步句柄解读

libuv系列文章

async handle

async handle可译为异步句柄,它主要是用于提供异步唤醒的功能,比如在用户线程中唤醒主事件循环线程,并且触发对应的回调函数。

从事件循环线程的处理过程可知,它在io循环时会进入阻塞状态,而阻塞的具体时间则通过计算得到,那么在某些情况下,我们想要唤醒事件循环线程,就可以通过ansyc去操作,比如当线程池的线程处理完事件后,执行的结果是需要交给事件循环线程的,这时就需要用到唤醒事件循环线程,当然方法也是很简单,调用一下uv_async_send()函数通知事件循环线程即可。libuv线程池中的线程就是利用这个机制和主事件循环线程通讯。

数据结构

通过 uv_async_t 可以定义一个async handle的实例。

typedef struct uv_async_s uv_async_t;

uv_async_t是属于handle的子类,并且还包含了一个UV_ASYNC_PRIVATE_FIELDS数据类型,里面包括它的回调函数async_cb、队列、以及一个pending成员变量。

结构比较简单,async_cb 保存回调函数指针,queue 作为队列节点插入 loop->async_handles,pending 字段的作用主要是用于保护操作,在唤醒的时候使用,初始化为 0, 在调用唤醒函数的时候会被设置为 1,为什么要这样子做呢,因为async handle是异步句柄,这可能不止一个线程会尝试唤醒事件循环,这一个async handle不能同时被多个线程操作,因此需要进行原子保护,当它为1的时候表示有其他线程在操作这个async handle。

struct uv_async_s {
  UV_HANDLE_FIELDS
  UV_ASYNC_PRIVATE_FIELDS
};


#define UV_ASYNC_PRIVATE_FIELDS                                               \
  uv_async_cb async_cb;                                                       \
  void* queue[2];                                                             \
  int pending;                                                                \

uv_async_cb回调函数:

typedef void (*uv_async_cb)(uv_async_t* handle);

API

async handle相关的API非常简单,就一个初始化uv_async_init(),还有一个异步通知的函数uv_async_send()

uv_async_init()

函数原型:

UV_EXTERN int uv_async_init(uv_loop_t*,
                            uv_async_t* async,
                            uv_async_cb async_cb);

参数:

  • uv_loop_t:传入了事件循环的句柄。
  • async:指定初始化的async handle。
  • async_cb:指定async handle的回调函数。

uv_async_init()初始化函数不同于其他handle的初始化函数,因为它会立即将async handle设置为活跃状态,所以 async handle 没有 start 相关的函数。

其实深入看uv__async_start()源码你就会发现,它实际上也是通过pipe管道进行唤醒的,因为主线程的io循环其实是在观察是否有可读写的io,libuv将管道等都抽象为io观察者了,在io循环中观察管道的读取端,当有数据到来则唤醒,越是深入了解libuv,你就会发现其实就是各个线程、进程间的通信,一种异步的通信,只不过libuv处理的很好,抽象了很多数据结构,并且衍生出了很多子类。

int uv_async_init(uv_loop_t* loop, uv_async_t* handle, uv_async_cb async_cb) {
  int err;

  /* 其实这个函数的内部主要做了以下操作:创建一个管道,并且将管道注册到loop->async_io_watcher,并且start */
  err = uv__async_start(loop);
  if (err)
    return err;

  /* 设置UV_HANDLE_REF标记,并且将async handle 插入loop->handle_queue */
  uv__handle_init(loop, (uv_handle_t*)handle, UV_ASYNC);

  /* 注册回调函数、将 pending 字段初始化为0 */
  handle->async_cb = async_cb;
  handle->pending = 0;

  /* queue 作为队列节点插入 loop->async_handles */
  QUEUE_INSERT_TAIL(&loop->async_handles, &handle->queue);
  uv__handle_start(handle);

  return 0;
}

static int uv__async_start(uv_loop_t* loop) {
  int pipefd[2];
  int err;

  if (loop->async_io_watcher.fd != -1)
    return 0;

/* 创建管道 */
#ifdef __linux__
  err = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
  if (err < 0)
    return UV__ERR(errno);

  pipefd[0] = err;
  pipefd[1] = -1;
#else
  err = uv__make_pipe(pipefd, UV__F_NONBLOCK);
  if (err < 0)
    return err;
#endif

  /* 初始化并启动了 loop->async_io_watcher */
  uv__io_init(&loop->async_io_watcher, uv__async_io, pipefd[0]);
  uv__io_start(loop, &loop->async_io_watcher, POLLIN);
  loop->async_wfd = pipefd[1];

  return 0;
}

uv_async_send()

函数原型:

int uv_async_send(uv_async_t* handle)

参数:

  • handle:要唤醒指定的async handle。

uv_async_send()函数发送消息唤醒事件循环线程并触发回调函数调用,其实我们不难想象出来,它的唤醒就是将消息写入管道中,让io观察者发现管道有数据从而唤醒事件循环线程,并随之处理这个async handle。

其实真正的唤醒操作是uv__async_send()函数,它往管道中写入消息就行了。

int uv_async_send(uv_async_t* handle) {

  if (ACCESS_ONCE(int, handle->pending) != 0)
    return 0;

  /* 将handle->pending设置为1,表示此时有线程在操作这个async handle */
  if (cmpxchgi(&handle->pending, 0, 1) != 0)
    return 0;

  /* 唤醒事件循环线程 */
  uv__async_send(handle->loop);

  /* 操作完成 */
  if (cmpxchgi(&handle->pending, 1, 2) != 1)
    abort();

  return 0;
}

static void uv__async_send(uv_loop_t* loop) {
  const void* buf;
  ssize_t len;
  int fd;
  int r;

  buf = "";
  len = 1;
  fd = loop->async_wfd;

#if defined(__linux__)
  if (fd == -1) {
    static const uint64_t val = 1;
    buf = &val;
    len = sizeof(val);
    fd = loop->async_io_watcher.fd;  /* eventfd */
  }
#endif

  do
    r = write(fd, buf, len);
  while (r == -1 && errno == EINTR);

  if (r == len)
    return;

  if (r == -1)
    if (errno == EAGAIN || errno == EWOULDBLOCK)
      return;

  abort();
}

async的处理过程

前面所介绍的都是初始化与通知的方式,那么在事件循环中怎么去处理async handle呢?

我们注意到uv_async_init()函数可能多次被调用,初始化多个async handle,但是 loop->async_io_watcher 只有一个,那么问题来了,那么多个async handle都共用一个io观察者(假设loop是一个),那么在 loop->async_io_watcher 上有 I/O 事件时,并不知道是哪个 async handle 发送的,因此我们要知道async handle是如何处理这些的。

我们也知道从uv__io_init()函数中已经注册了一个uv__async_io()函数用于处理loop->async_io_watcher 的 I/O 事件,那么我们就看uv__async_io()函数的处理过程即可:

static void uv__async_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
  char buf[1024];
  ssize_t r;
  QUEUE queue;
  QUEUE* q;
  uv_async_t* h;

  assert(w == &loop->async_io_watcher);

  for (;;) {

    /* 不断的读取 w->fd 上的数据到 buf 中直到为空,buf 中的数据无实际用途 */
    r = read(w->fd, buf, sizeof(buf));

    if (r == sizeof(buf))
      continue;

    /* 读取到数据跳出循环 */
    if (r != -1)
      break;

    if (errno == EAGAIN || errno == EWOULDBLOCK)
      break;

    if (errno == EINTR)
      continue;

    abort();
  }

  QUEUE_MOVE(&loop->async_handles, &queue);

  /* 遍历队列 */
  while (!QUEUE_EMPTY(&queue)) {
    q = QUEUE_HEAD(&queue);

    /* 获取async handle */
    h = QUEUE_DATA(q, uv_async_t, queue);

    QUEUE_REMOVE(q);

    /* 重新插入队列 */
    QUEUE_INSERT_TAIL(&loop->async_handles, q);

    if (0 == uv__async_spin(h))
      continue;  /* Not pending. */

    if (h->async_cb == NULL)
      continue;

    /* 调用async 的回调函数 */
    h->async_cb(h);
  }
}

example

接着来一个例子吧,都是非常简单的,创建1个线程,在事件循环中等待async handle。

#include <stdio.h>
#include <unistd.h>

#include <uv.h>

void wake_entry(void *arg) 
{
    sleep(5);

    printf("wake_entry running, wake async!\n");

    uv_async_send((uv_async_t*)arg);

    uv_stop(uv_default_loop());
}

void my_async_cb(uv_async_t* handle)
{
    printf("my async running!\n");
}

int main() 
{
    uv_thread_t wake;
    uv_async_t async;

    uv_async_init(uv_default_loop(), &async, my_async_cb);

    uv_thread_create(&wake, wake_entry, &async);

    uv_run(uv_default_loop(), UV_RUN_DEFAULT);

    uv_thread_join(&wake);

    return 0;
}

参考

libuv官方文档

例程代码获取

libuv-learning-code


 上一篇
【libuv高效编程】libuv学习超详细教程11 —— libuv stream 流句柄解读 【libuv高效编程】libuv学习超详细教程11 —— libuv stream 流句柄解读
libuv系列文章 【libuv高效编程】libuv学习超详细教程1——libuv的编译与安装 【libuv高效编程】libuv学习超详细教程2——libuv框架初窥 【libuv高效编程】libuv学习超详细教程3——libuv事件循
2020-04-24
下一篇 
【libuv高效编程】libuv学习超详细教程8—— signal 信号句柄解读 【libuv高效编程】libuv学习超详细教程8—— signal 信号句柄解读
libuv系列文章 【libuv高效编程】libuv学习超详细教程1——libuv的编译与安装 【libuv高效编程】libuv学习超详细教程2——libuv框架初窥 【libuv高效编程】libuv学习超详细教程3——libuv事件循
2020-04-22
  目录