libevent中提供了一个Hello-world.c 的例子,从这个例子可以学习libevent是如何使用bufferevent的。 这个例子在Sample中 1 这个例子之前讲解过,这次主要看下bufferevent的使用。

第一步找到main函数

main函数

int main(){
   //...
listener = evconnlistener_new_bind(base, listener_cb, (void *)base,
        LEV_OPT_REUSEABLE|LEV_OPT_CLOSE_ON_FREE, -1,
        (struct sockaddr*)&sin,
        sizeof(sin));

 //...

    event_base_dispatch(base);

    evconnlistener_free(listener);
    event_free(signal_event);
    event_base_free(base);

    printf("done\n");
    return 0;


}

main函数中调用evconnlistener_new_bind()创建了一个evconnlistener 类型的listener,然后拍发消息,之后释放各种资源。

第二步在evconnlistener_new_bind()中调用evconnlistener_new()完成listener属性设置。

这个函数里对evconnlistener_event中base进行回调函数的绑定和参数设置,通过event_assign将evconnlistener_event的istener设置读事件的回调函数,并且通过evconnlistener_enable让读回调函数触发,也就是触发listener_read_cb。这里evconnlister_enable调用的也是结构体注册的enable具体看代码吧,调用的是r = lev->ops->enable(lev);等同于调用event_listener_enable,该函数内部完成event_add。

struct evconnlistener_event {
    struct evconnlistener base;
    struct event listener;
};


struct evconnlistener *
evconnlistener_new(struct event_base *base,
    evconnlistener_cb cb, void *ptr, unsigned flags, int backlog,
    evutil_socket_t fd)
{
    struct evconnlistener_event *lev;//开辟evconnlistener_event大小区域
    lev = mm_calloc(1, sizeof(struct evconnlistener_event));
    if (!lev)
        return NULL;
    //lev -> base 表示  evconnlistener 
    //evconnlistener     evconnlistener_ops 基本回调参数和回调函数结构体赋值
    lev->base.ops = &evconnlistener_event_ops;
    //evconnlistener_cb 设置为listener_cb
    lev->base.cb = cb;
    //ptr表示event_base 指针
    lev->base.user_data = ptr;
    lev->base.flags = flags;
    lev->base.refcnt = 1;//  lev   is evconnlistener_event       
    //lev->listener is event
    //为lev->listener设置读回调函数和读关注事件,仅进行设置并没加入event队列
    event_assign(&lev->listener, base, fd, EV_READ|EV_PERSIST,
        listener_read_cb, lev);
    //实际调用了event_add将事件加入event队列
    evconnlistener_enable(&lev->base);

    return &lev->base;
}

第三步listener_read_cb内部调用accept生成新的socket处理连接,调用listener_cb

新的socket作为参数传递给evconnlistener_event中base的回调函数listener_cb

static void
listener_read_cb(evutil_socket_t fd, short what, void *p)
{
    struct evconnlistener *lev = p;
    int err;
    evconnlistener_cb cb;
    evconnlistener_errorcb errorcb;
    void *user_data;
    LOCK(lev);
    while (1) {
        //...//cb 就 是  listener_cb
        cb = lev->cb;
        user_data = lev->user_data;
        UNLOCK(lev);
        //触发了listener_cb

        //完成了eventbuffer注册写和事件函数  
        cb(lev, new_fd, (struct sockaddr*)&ss, (int)socklen,
            user_data);

        LOCK(lev);
        if (lev->refcnt == 1) {
            int freed = listener_decref_and_unlock(lev);
            EVUTIL_ASSERT(freed);
            return;
        }
        --lev->refcnt;
    }
  //...
}

第四步listener_cb 调用bufferevent_socket_new 生成bufferevent,

然后bufferevent_setcb设置读写水位触发的回调函数,bufferevent_enable将bufferevent的写事件加入监听,即开始检测写事件。关闭读事件,并且向outbuf中写入MSG bufferevent_socket_new内部绑定bufferevent的读写事件回调函数,读事件为bufev->ev_read,绑定了bufferevent_readcb回调函数, 写事件为bufev->ev_write, 绑定了bufferevent_writecb回调函数。这两个回调函数和bufferevent的readcb和writecb是不一样的,这两个函数在对应的读写事件激活时才触发。 而readcb和writecb是基于水位线达到阈值才会触发。做好区分。bufferevent_socket_new内部还对bufev->output添加了对调函数bufferevent_socket_outbuf_cb, bufferevent_socket_outbuf_cb内部检测是否开启写事件,以及是否可写,如果可写,同样将写事件加入监听队列,也就是调用了event_add。 bufferevent_socket_new内部解释完毕了。bufferevent_setcb设置的是读写水位达到阈值后的回调函数, bufferevent_enable内部也是调用了event_add,将读事件加入监听队列。 bufferevent_enable内部调用bufev->be_ops->enable(bufev, impl_events),等同于be_socket_enable,另外bufferevent_write函数内部调用evbuffer_add, evbuffer_add内部调用了evbuffer_invoke_callbacks,就会调用绑定在output buffer上的回调函数bufferevent_socket_outbuf_cb。

static void
listener_cb(struct evconnlistener *listener, evutil_socket_t fd,
    struct sockaddr *sa, int socklen, void *user_data)
{
    struct event_base *base = user_data;
    struct bufferevent *bev;

    bev = bufferevent_socket_new(base, fd, BEV_OPT_CLOSE_ON_FREE);
    if (!bev) {
        fprintf(stderr, "Error constructing bufferevent!");
        event_base_loopbreak(base);
        return;
    }
    //设置写回调和事件回调
    bufferevent_setcb(bev, NULL, conn_writecb, conn_eventcb, NULL);
    bufferevent_enable(bev, EV_WRITE);
    bufferevent_disable(bev, EV_READ);
    //将要发送的内容写入evbuffer结构
    bufferevent_write(bev, MESSAGE, strlen(MESSAGE));
}
``` cpp

``` cpp
struct bufferevent *
bufferevent_socket_new(struct event_base *base, evutil_socket_t fd,
    int options)
{
    struct bufferevent_private *bufev_p;
    struct bufferevent *bufev;

  //...//设置bufferevent中   ev_read(event类型)回调函数
    event_assign(&bufev->ev_read, bufev->ev_base, fd,
        EV_READ|EV_PERSIST, bufferevent_readcb, bufev);
    //设置bufferevent中   ev_write(event类型)回调函数
    event_assign(&bufev->ev_write, bufev->ev_base, fd,
        EV_WRITE|EV_PERSIST, bufferevent_writecb, bufev);

    //为bufev->output(evbuffer类型)设置回调函数,插入bufferevent->output的callback队列
    //bufferevent_socket_outbuf_cb回调函数内部将ev_write事件加入事件队列
    evbuffer_add_cb(bufev->output, bufferevent_socket_outbuf_cb, bufev);

    evbuffer_freeze(bufev->input, 0);
    evbuffer_freeze(bufev->output, 1);
    //...

    return bufev;
}
static int
be_socket_enable(struct bufferevent *bufev, short event)
{
    if (event & EV_READ) {
        if (be_socket_add(&bufev->ev_read,&bufev->timeout_read) == -1)
            return -1;
    }
    if (event & EV_WRITE) {
        if (be_socket_add(&bufev->ev_write,&bufev->timeout_write) == -1)
            return -1;
    }
    return 0;
}

第五步 bufferevent的output中写入MSG, 并且之前也已经将EV_WRITE事件加入监听,所以内核检测到socket可写,会通知bufferevent的ev_write,调用绑定在ev_write上的函数bufferevent_writecb。

这是bufferevent内部的写操作,我们可以详细看一下。之前也有讲过bufferevent会将接收到的数据放到inputbuffer中,将outputbuffer中的数据发送。所以之前讲过的接口bufferevent_write让我们将要发送的数据放到output中,bufferevent_read可以从input中读出bufferevent接收到的数据。

static void
bufferevent_writecb(evutil_socket_t fd, short event, void *arg)
{
    struct bufferevent *bufev = arg;
    struct bufferevent_private *bufev_p =
        EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
    int res = 0;
    short what = BEV_EVENT_WRITING;
    int connected = 0;
    ev_ssize_t atmost = -1;
    //对 bufferevent加锁,支持多线程安全模式
    _bufferevent_incref_and_lock(bufev);
    //检测是否带有超时事件
    if (event == EV_TIMEOUT) {
        /* Note that we only check for event==EV_TIMEOUT. If
         * event==EV_TIMEOUT|EV_WRITE, we can safely ignore the
         * timeout, since a read has occurred */
        what |= BEV_EVENT_TIMEOUT;
        goto error;
    }
    //判断是否是连接事件
    if (bufev_p->connecting) {
        int c = evutil_socket_finished_connecting(fd);
        /* we need to fake the error if the connection was refused
         * immediately - usually connection to localhost on BSD */
        if (bufev_p->connection_refused) {
          bufev_p->connection_refused = 0;
          c = -1;
        }

        if (c == 0)
            goto done;

        bufev_p->connecting = 0;
        //连接失败删除该事件
        if (c < 0) {
            event_del(&bufev->ev_write);
            event_del(&bufev->ev_read);
            _bufferevent_run_eventcb(bufev, BEV_EVENT_ERROR);
            goto done;
        } else {
            connected = 1;
            //windows情况下直接运行事件回调函数,然后go done
#ifdef WIN32
            if (BEV_IS_ASYNC(bufev)) {
                event_del(&bufev->ev_write);
                bufferevent_async_set_connected(bufev);
                _bufferevent_run_eventcb(bufev,
                        BEV_EVENT_CONNECTED);
                goto done;
            }
#endif
    //linux 下 运行事件回调函数
            _bufferevent_run_eventcb(bufev,
                    BEV_EVENT_CONNECTED);
        //检测是否可写,不可写删除该事件
            if (!(bufev->enabled & EV_WRITE) ||
                bufev_p->write_suspended) {
                event_del(&bufev->ev_write);
                goto done;
            }
        }
    }
    //计算bufferevent能写的最大数量
    atmost = _bufferevent_get_write_max(bufev_p);
    //写事件挂起了,跳过。
    if (bufev_p->write_suspended)
        goto done;

    //output非空
    if (evbuffer_get_length(bufev->output)) {
        //将output的头打开,从头部发送
        evbuffer_unfreeze(bufev->output, 1);
        //bufferevent调用写操作,将outbuffer中的内容发送出去
        res = evbuffer_write_atmost(bufev->output, fd, atmost);
        //将output的头部关闭
        evbuffer_freeze(bufev->output, 1);
        if (res == -1) {
            int err = evutil_socket_geterror(fd);
            if (EVUTIL_ERR_RW_RETRIABLE(err))
                goto reschedule;
            what |= BEV_EVENT_ERROR;
        } else if (res == 0) {
            /* eof case
               XXXX Actually, a 0 on write doesn't indicate
               an EOF. An ECONNRESET might be more typical.
             */
             //写完了
            what |= BEV_EVENT_EOF;
        }
        if (res <= 0)
            goto error;
        //bufferevent减少发送的大小,留下未发送的,下次再发送,因为是PERSIST|WRITE
        //所以会在下次检测到可写时候继续写
        _bufferevent_decrement_write_buckets(bufev_p, res);
    }

    //计算是否将outbuf中的内容发送完,发完了就删除写事件
    if (evbuffer_get_length(bufev->output) == 0) {
        event_del(&bufev->ev_write);
    }

    /*
     * Invoke the user callback if our buffer is drained or below the
     * low watermark.
     */
     //将buffer中的内容发完,或者低于low 水位,那么调用用户注册的写回调函数
    //之前注册在bufev->writecb中的回调函数
    if ((res || !connected) &&
        evbuffer_get_length(bufev->output) <= bufev->wm_write.low) {
        _bufferevent_run_writecb(bufev);
    }

    goto done;

 reschedule:
    if (evbuffer_get_length(bufev->output) == 0) {
        event_del(&bufev->ev_write);
    }
    goto done;

 error:
    bufferevent_disable(bufev, EV_WRITE);
    _bufferevent_run_eventcb(bufev, what);

 done:
    _bufferevent_decref_and_unlock(bufev);
}

第六步:这个函数内部每次尽可能多的发送数据,

没有发送完就下次轮询继续发送,直到水位低于或等于写数据的低水位,那么就会触发bufferevent低水位写回调函数。也就是conn_writecb,

在conn_writecb内部检测output buffer中数据为空,就释放该bufferevent。

static void
conn_writecb(struct bufferevent *bev, void *user_data)
{
    struct evbuffer *output = bufferevent_get_output(bev);
    if (evbuffer_get_length(output) == 0) {
        printf("flushed answer\n");
        bufferevent_free(bev);
    }
}

这就是整体流程,bufferevent内部的流畅看懂即可,我们只需要使用libevent提供的接口即可。

results matching ""

    No results matching ""