libev监听IO事件

libev监听IO事件

介绍

libev是一个高性能的事件循环库,用于管理异步事件。它通过给事件注册回调函数的方式实现并发,方便开发高效的事件驱动程序。支持 epoll、poll、select、kqueue、port等多种实现机制。

libev可以处理以下事件:

  • I/O 事件 (ev_io):监控文件描述符上的读写事件。这类事件常用于网络编程,监听socket连接的可读或可写状态。
  • 定时器事件 (ev_timer):在指定的时间间隔后触发一次。可以用来实现延时操作或周期性任务。
  • 周期性事件 (ev_periodic):在特定的时间点触发,可以用来处理更加复杂的定时任务,比如每小时或每天的某个固定时间点触发。
  • 信号事件 (ev_signal):当进程接收到特定的信号时触发。例如,可以监听SIGINT(通常是Ctrl+C产生的中断信号)或SIGTERM(请求终止进程的信号)。
  • 孩子进程事件 (ev_child):当子进程的状态发生变化时触发,比如子进程退出或停止。这对于管理子进程非常有用。
  • 闲置事件 (ev_idle):当事件循环没有其他待处理的事件时触发。这类事件适合用来执行低优先级的任务。
  • 嵌入式事件 (ev_embed):用于嵌入另一个事件循环。这种事件类型可以用来将一个事件循环嵌入到另一个事件循环中,实现更复杂的事件处理逻辑。
  • 统计事件 (ev_stat):监控文件系统中某个文件的状态变化,比如文件大小的变化。这可以用来监控文件是否存在或其大小是否有改变。

此次给出的示例给出的是 I/O 事件 的应用,以后有时间了把其他事件的应用示例也都测试给出。

示例

本次示例是一个回显服务器,客户端给服务端发送什么数据,服务端按照原样返回给客户端,示例的完整过程是:

1、打开一个socket监听,绑定在 0.0.0.0:3344 端口。

2、把监听套接字给 loop 监听,等待 connect 事件发生。对于监听套接字来说就是增加 EV_READ 事件监听,也就是使用 ev_io_init 方法给监听套接字增加 EV_READ 事件的回调函数。

3、在第二步的回调函数里,通过accept方法,获取客户端套接字,对此套接字添加 EV_READ 事件监听,这样在客户端发送数据时,就会被触发。

4、客户端套接字发生 EV_READ 事件时,调用回调函数,通过 read 获取到客户端发送过来的数据。

5、对客户端套接字添加 EV_WRITE 事件,增加回调函数,把需要发送的数据通过 ev_io->data 字段传递给回调函数。

6、添加完 EV_WRITE 事件监听后,在客户端套接字可写的时候(实际上大部分时间,客户端套接字都是可写的)就会调用回调函数,在回调函数里,把数据发送出去。

代码tevio.c

// gcc -Wall -o test tevio.c -lev

#include <ev.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>

/***********************
 * 写事件的回调函数
 ***********************/
void write_cb(EV_P_ ev_io *w, int revents) {
    int fd = w->fd;   // 客户端连接fd
    char * buffer = (char *)w->data ;

    if (revents & EV_WRITE) {  // 写事件
        ssize_t nwritten = write(fd, buffer, strlen(buffer));
        if (nwritten < 0) { // 写报错
            perror("write");
            close(fd);
        } else {
            printf("send data: %s", buffer);
        }
        // 停止监听
        ev_io_stop(EV_A_ w);
        // 销毁 data 指向的空间
        free(buffer) ;
        // 销毁 ev_io 指向的空间
        free(w) ;
    }
}

/***********************
 * 读事件的回调函数
 ***********************/
void read_cb(EV_P_ ev_io *w, int revents) {
    int fd = w->fd;
    char buffer[24];
    ssize_t nread;

    if (revents & EV_READ) {
        nread = read(fd, buffer, sizeof(buffer) - 1);
        if (nread > 0) {
            buffer[nread] = '\0';
            printf("recv data: %s", buffer);

            // 创建新的 ev_io 结构 watcher 设置写事件 监听
            // 当客户端套接字可写时,就会立即 发送 数据
            // 实际上,客户端套接字经常处于可写状态
            ev_io *write_watcher = (ev_io *)malloc(sizeof(ev_io));
            // 把数据给到 ev_io 写监听事件
            write_watcher->data = strdup(buffer) ;
            // 初始化写事件监听
            ev_io_init(write_watcher, write_cb, fd, EV_WRITE);
            ev_io_start(EV_A_ write_watcher);

        } else if (nread == 0) { // 断开连接
            printf("Client disconnected\n");
            // 停止事件监听
            ev_io_stop(EV_A_ w);
            // 关闭套接字
            close(fd);
            // 释放申请的 ev_io 句柄空间
            free(w);
        } else {         // 读数据 出错
            perror("read");
            // 停止事件监听
            ev_io_stop(EV_A_ w);
            // 关闭套接字
            close(fd);
            // 释放申请的 ev_io 句柄空间
            free(w);
        }
    }
}

/***************************************
 * 新连接的回调函数
 * EV_P_ 的定义
 * #define EV_P_ struct ev_loop *loop,
 * EV_A_ 的定义
 * #define EV_A_ struct ev_loop *loop,
 ****************************************/
void accept_cb(EV_P_ ev_io *w, int revents) {

    int listener_fd = w->fd;
    struct sockaddr_in client_addr;
    socklen_t client_len = sizeof(client_addr);
    int client_fd;

    // 事件分析
    if (revents & EV_READ) {
        // 接收连接
        client_fd = accept(listener_fd, (struct sockaddr *)&client_addr, &client_len);
        if (client_fd < 0) {
            perror("accept");
            return;
        }
        printf("New connection from %s:%d\n", inet_ntoa(client_addr.sin_addr), ntohs(client_addr.sin_port));

        // 对连接的客户端套接字 client_fd 增加读事件监听
        ev_io *client_watcher = (ev_io *)malloc(sizeof(ev_io));
        // 初始化读事件监听
        ev_io_init(client_watcher, read_cb, client_fd, EV_READ);
        // 启动事件监听
        ev_io_start(EV_A_ client_watcher);
    }
}


/************************************
 * 开启监听
 * 返回值:
 *            -1 : 创建失败
 *          >0 : 监听套接字 fd
 *************************************/
int creat_listener() {

    // 创建监听套接字
    int listener_fd = socket(AF_INET, SOCK_STREAM, 0);
    if (listener_fd < 0) {
        perror("socket");
        return -1 ;
    }

    // 创建监听地址
    struct sockaddr_in server_addr;
    memset(&server_addr, 0, sizeof(server_addr));
    server_addr.sin_family = AF_INET;
    server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
    server_addr.sin_port = htons(3344);

    // 绑定监听地址和端口
    if (bind(listener_fd, (struct sockaddr *)&server_addr, sizeof(server_addr)) < 0) {
        perror("bind");
        close(listener_fd);
        return -1 ;
    }

    // 启动监听
    if (listen(listener_fd, SOMAXCONN) < 0) {
        perror("listen");
        close(listener_fd);
        return -1 ;
    }
    // 返回监听文件描述符
    return listener_fd ;
}


// 主程序
int main() {

     // 开启监听
    int listener_fd = creat_listener() ;
    if ( listener_fd < 0 ) return -1 ;

    // 初始化 loop, 指定使用 EPOLL 模式监听事件
    struct ev_loop *loop = ev_default_loop(EVBACKEND_EPOLL);

    // 创建 ev_io watcher 监听连接请求
    ev_io accept_watcher;

    // 初始化连接事件监听
    ev_io_init(&accept_watcher, accept_cb, listener_fd, EV_READ);
    // 启动监听连接事件
    ev_io_start(loop, &accept_watcher);

    // 运行事件循环
    printf("Server started, listening on port 3344\n");

    // 启动事件监听 loop 
    ev_run(loop, 0);

    // 清理 监听事件
    ev_io_stop(loop, &accept_watcher);
    // 关闭监听套接字
    close(listener_fd);

    return 0;
}

编译测试

评论

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注