epoll多路复用监控套接字事件

epoll多路复用监控套接字事件

epoll水平触发模式,监控监听套接字和连接套接字。

// 编译 gcc tepoll.c -lpthread -Wall


#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <fcntl.h>
#include <pthread.h>
#include <sys/errno.h>
#include <sys/epoll.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <arpa/inet.h>
#include <signal.h>

#define MAX_EVENTS 10
#define BUF_MALLOC_SIZE 1*1024*1024
#define WRITE_DATA_CACHE_SIZE 1024


/*
 * 缓存空间控制结构
 */
typedef struct _buf_pend_t {                                                                                                                                                                           
    char* buf;
    int read_offset;
    int write_offset;
    int end_chk_offset;
    int left_write;
    int buf_size;
} buf_pend_t ;


/*
 * 注册给事件监听的数据,用来监听可读事件。
 * 即,当可读事件触发后,将此数据通过struct epoll_event结构的data字段返回到处理函数
 */
typedef struct _my_evnt_data_t {
    int conn_fd ;                 // 客户端来连接时,服务端维持连接打开的socket
    int epoll_fd ;
    buf_pend_t buf_pend ;    // 数据缓存管理
    int send_data_len ;        // 待发送数据长度
    char * send_data ;     // 发送的数据
    void * resource ;        // 其他用于处理数据的资源,注意:临界资源处理互斥处理
} my_evnt_data_t ;


// 绑定的IP地址
const char * bind_host = "0.0.0.0" ;
// 端口
const int bind_port = 6666 ;


/*
 * 创建监听端口
 */
int create_listen() {
    int listen_fd = 0;
    struct sockaddr_in sin;
    int flags = 0;
    struct linger linger_tmp;

    // 1、设置绑定的 网络地址 和 端口
    memset(&sin, 0x00, sizeof(sin));
    sin.sin_family = AF_INET;
    inet_aton(bind_host, &(sin.sin_addr));
    sin.sin_port = htons(bind_port);

    // 2、创建套接字
    if ((listen_fd = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
        printf("create listen failed !!\n");
        return -1;
    }

    // 3、设置socket相关属性
    // 3.1、非阻塞
    flags = fcntl(listen_fd, F_GETFL, 0);
    fcntl(listen_fd, F_SETFL, flags | O_NONBLOCK);

    // 3.2、文件描述符复用
    flags = 1;
    setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &flags, sizeof(flags));

    // 3.3、设置在close后的超时发送时间
    linger_tmp.l_onoff = 1;
    linger_tmp.l_linger = 0;
    setsockopt(listen_fd, SOL_SOCKET, SO_LINGER, &linger_tmp, sizeof(linger_tmp));

    // 4、绑定
    if (bind(listen_fd, (struct sockaddr*)(&sin), sizeof(struct sockaddr)) == -1) {
        printf("bind listen fd failed !!\n");
        return -1;
    }

    // 5、启动监听
    if (listen(listen_fd, 4096) == 1) {
        printf("listen failed !!\n");
        return -1;
    }

    return listen_fd ;
}


/*
 * 设置套接字属性
 */
int set_client_fd_attr(int conn_fd){
    int flags = 0;
    // 1、设置非阻塞
    flags = fcntl(conn_fd, F_GETFL, 0);
    fcntl(conn_fd, F_SETFL, flags | O_NONBLOCK);

    // 2、设置文件描述符复用
    flags = 1;
    setsockopt(conn_fd, SOL_SOCKET, SO_REUSEADDR, &flags, sizeof(flags));

    // 3、设置在close后的超时发送时间
    struct linger linger_tmp;
    linger_tmp.l_onoff = 1;
    linger_tmp.l_linger = 0;
    setsockopt(conn_fd, SOL_SOCKET, SO_LINGER, &linger_tmp, sizeof(linger_tmp));
    
    // 4、设置发送和接收超时时间
    struct timeval tv = {30, 0};
    setsockopt(conn_fd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv));
    setsockopt(conn_fd, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv));

    // 5、设置保活机制,空闲发起时间、发送间隔、无响应尝试次数
    int keepidle = 20, keepintval = 2, keepcnt = 5, keepalive = 1;
    setsockopt(conn_fd, SOL_TCP, TCP_KEEPIDLE, &keepidle, sizeof(keepidle));
    setsockopt(conn_fd, SOL_TCP, TCP_KEEPINTVL, &keepintval, sizeof(keepintval));
    setsockopt(conn_fd, SOL_TCP, TCP_KEEPCNT, &keepcnt, sizeof(keepcnt));
    setsockopt(conn_fd, SOL_SOCKET, SO_KEEPALIVE, &keepalive, sizeof(keepalive));

    return 0 ;
}


/*
 * 关闭客户端套接字
 * 删除空间
 */
void free_ev_data(my_evnt_data_t * ev_data) {
    printf("free_ev_data : conn_fd[%d]\n", ev_data->conn_fd) ;
    if ( ev_data != NULL ) {
        // 1、关闭
        close(ev_data->conn_fd) ;
        // 2、删除接收数据缓存
        if( ev_data->buf_pend.buf ){
            free( ev_data->buf_pend.buf );
        }
        // 3、删除发送数据缓存
        if( ev_data->send_data ){
            free( ev_data->send_data ) ;
        }
        free(ev_data) ;
    }
}


/*
 * 设置事件发生后返回的数据结构
 */
my_evnt_data_t * set_client_deal_data(int conn_fd, int epoll_fd, void * rsc) {
    // 1、申请数据处理结构
    my_evnt_data_t * ev_data = (my_evnt_data_t *)calloc(1, sizeof(my_evnt_data_t)) ;
    if ( ev_data == NULL) {
        printf("ev_data calloc NULL\n");
        return NULL;
    }

    // 2、申请接收数据缓存
    ev_data->buf_pend.buf = (char*)calloc(1,BUF_MALLOC_SIZE);
    if (ev_data->buf_pend.buf == NULL) {
        printf("ev_data->buf_pend.buf calloc NULL\n");
        free_ev_data(ev_data);
        return NULL ;
    }

    ev_data->buf_pend.buf_size = BUF_MALLOC_SIZE ;
    ev_data->buf_pend.left_write = BUF_MALLOC_SIZE ;
    ev_data->conn_fd = conn_fd ;                 // 客户端来连接时,服务端维持连接打开的socket
    ev_data->epoll_fd = epoll_fd ;
    ev_data->resource = rsc ;

    return ev_data ;
}


/*
 * 添加conn_fd事件监听
 */
int add_event_to_epoll(my_evnt_data_t * ev_data, int events) {

    int epoll_fd = ev_data->epoll_fd ;
    int conn_fd = ev_data->conn_fd ; 
    // 初始化监听时间变量
    struct epoll_event one_event;
    memset(&one_event, 0x00, sizeof(struct epoll_event));
    one_event.events = events ;  // 设置监控的事件 出错、挂起、可读 默认水平触发
    one_event.data.ptr = ev_data  ;                     // 把刚申请的用于返回的数据结传给 data.ptr 指针

    // 设置conn_fd事件监听,注册事件发生时返回的数据结构one_event
    if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, conn_fd, &one_event) < 0) {
        fprintf(stderr, "set conn_fd into epoll failed!!\n");
        return -1 ;
    }

    return 0 ;
}


/*
 * 连接处理
 */
int accept_connect(int listen_fd, int epoll_fd, void * rsc) {
    struct sockaddr_in sin;
    int conn_fd = 0;
    int len = sizeof(struct sockaddr);

    int ret = 0;

    // 1、先接收连接
    if ((conn_fd = accept(listen_fd, (struct sockaddr*)(&sin), (socklen_t*)&len)) < 0) {
        fprintf(stderr,"accept failed !! errno:%d errstr:%s\n", errno, strerror(errno));
        return -1;
    }

    // 2、设置套接字属性
    set_client_fd_attr(conn_fd);

    // 3、设置事件发生后返回的数据结构
    my_evnt_data_t * ev_data = set_client_deal_data(conn_fd, epoll_fd, rsc) ;
    if(ev_data == NULL) {
        ret = -1 ;
        goto ERR_END ;
    }

    // 4、加入套接字监听
    ret = add_event_to_epoll(ev_data, EPOLLERR | EPOLLHUP | EPOLLIN) ;
    if(ret != 0 ) {
        ret = -2 ;
        goto ERR_END ;
    }

    // printf("set data.ptr addr [%lu] socket[%d]\n", (unsigned long)ev_data, conn_fd ) ;

    return ret ;

ERR_END:
    if(ev_data) free_ev_data(ev_data);
    return ret;
}


/*
 * 调整缓冲区读、写、剩余的偏移地址
 */
int adjust_buf_pend_offset(buf_pend_t * buf_pend) {
    int unprocess_bytes = 0 ;
    int end_chk_bytes = 0 ;
    if (buf_pend->read_offset == buf_pend->write_offset) {  // 数据全部处理完
        buf_pend->left_write = buf_pend->buf_size;

        buf_pend->read_offset = 0;
        buf_pend->write_offset = 0;
        buf_pend->end_chk_offset = 0;
    } else if (buf_pend->read_offset > 0) {  // 有一部分数据已经处理完,需要调整

        // 挪动位置
        unprocess_bytes = buf_pend->write_offset - buf_pend->read_offset;
        end_chk_bytes = buf_pend->end_chk_offset - buf_pend->read_offset;
        memcpy(buf_pend->buf, buf_pend->buf + buf_pend->read_offset, unprocess_bytes);

        // 重新调整偏移
        buf_pend->write_offset = unprocess_bytes ;
        buf_pend->read_offset = 0;
        buf_pend->end_chk_offset = end_chk_bytes ;

        // 重新计算剩余空间
        buf_pend->left_write = buf_pend->buf_size - unprocess_bytes;
    }
    return 0 ;
}


/*
 * 检测结束符
 */
const char * END_FLAG = "##&&" ;
int check_end_flag(char * buf_start) {

    for(int i=0 ; i < strlen(END_FLAG) ; ++i ) {
        if(buf_start[i] != END_FLAG[i]) return 0 ; 
    }
    return 1 ;
}


/*
 * 读数据的列表
 */
typedef struct _data_node_t {
    char * data ;
    struct _data_node_t * next ;
} data_node_t ;


/*
 * 数据处理
 * 此处仅打印,根据业务需求自行处理
 */
int deal_data_real(char * one_data) {
    static int id = 0 ;
    printf("one_data : [%.5s]==len[%lu] id [%d]\n", one_data, strlen(one_data), ++id ) ;
    return 0 ;
}


/*
 * 设置返回的数据
 */
int set_send_event(my_evnt_data_t * ev_data, int count) {

    int ret = 0 ;
    // 1、初始化可写监听返回的数据
    ev_data->send_data = (char *)calloc(1, WRITE_DATA_CACHE_SIZE+strlen(END_FLAG)+1) ;
    if( ev_data->send_data == NULL ) {
        fprintf(stderr, "no enough memory [ev_data->data]");
        return -1 ;
    }

    snprintf(ev_data->send_data, WRITE_DATA_CACHE_SIZE, "Just got [%d] data", count) ;
    ev_data->send_data_len = strlen(ev_data->send_data) ;
    snprintf(ev_data->send_data+ev_data->send_data_len, strlen(END_FLAG)+2, "%s\n", END_FLAG) ;
    ev_data->send_data_len = strlen(ev_data->send_data) ;

    ret = add_event_to_epoll(ev_data, EPOLLOUT ) ;
    if( ret != 0 ) {
        return -1 ;    
    }
    return 0 ;
}


/*
 * 处理数据
 */
int deal_data( my_evnt_data_t * ev_data, data_node_t * head, void* resp ) {
    int ret = 0 ;
    data_node_t * deal_node = head ;
    int *count = (int*) resp;
    
    // 逐个处理数据
    while( deal_node ) {
        ret = deal_data_real(deal_node->data) ;
        if ( ret != 0 ) break ;
        (*count) ++ ;
        deal_node = deal_node->next ;
    }

    return ret ;
}


/*
 * 识别结束符,把数据捡出来
 */
void * check_out_data_and_deal(void* arg) {
    my_evnt_data_t * ev_data = (my_evnt_data_t*) arg ;
    int ret = 0 ;
    char * one_data = NULL ;

    buf_pend_t * buf_pend = &(ev_data->buf_pend) ;
    char * buf_start = &buf_pend->buf[buf_pend->end_chk_offset] ;
    char * buf_begine =  &buf_pend->buf[buf_pend->read_offset] ;
    char * buf_end = &buf_pend->buf[buf_pend->write_offset] - strlen(END_FLAG) ;

    data_node_t * data_node = NULL, * head = NULL, * tail = NULL, * tmp_node = NULL ;
    while(buf_start <= buf_end ) {
        // 检测结束符
        if( check_end_flag(buf_start) == 1 ) {
            // 读出数据,在处理线程里释放
            data_node = (data_node_t*)calloc(1, sizeof(data_node_t) ) ;
            if(data_node == NULL ) {
                   fprintf(stderr, "no enough space data_node\n") ;
                goto ERR_END ;
            }

            // 加入到列表中
            if( head == NULL ) {
                head = data_node ;
                tail = data_node ;
            } else {
                tail->next = data_node ;
                tail = data_node ;
            }
            // 申请存储数据的空间
            one_data = (char *)calloc(1, buf_start + strlen(END_FLAG) - buf_begine + 1) ;
            if(one_data == NULL ) {
                fprintf(stderr, "no enough space one_data\n") ;
                goto ERR_END ;
            }
            data_node->data = one_data ;
            memcpy(one_data, buf_begine, buf_start + strlen(END_FLAG) - buf_begine ) ;

            // 向后跳过结束符
            buf_start += strlen(END_FLAG) ;
            // 重新计算 读偏移
            buf_pend->read_offset = buf_start - buf_pend->buf ;
            buf_begine =  &buf_pend->buf[buf_pend->read_offset] ;
        } else {
            buf_start ++ ;
        }
    }
    // 调整检测过的数据偏移量
    buf_pend->end_chk_offset = buf_start - buf_pend->buf ;

    int count = 0 ;
    // 处理数据
    if( head != NULL ) {
        ret = deal_data(ev_data, head, &count) ;
        if(ret != 0 ) {
            free_ev_data(ev_data) ;
            goto ERR_END ;
        }
    }

    // 用完了,逐个删除
    tmp_node = head ; 
    while(tmp_node) {
         tmp_node = head->next ;
         if(head->data) free(head->data) ;
         free(head) ;
        head = tmp_node ;
    }

     // 设置返回的数据
    ret = set_send_event(ev_data, count) ;
    if( ret != 0 ) {
        // 设置失败,则关闭conn_fd并清理数据
        free_ev_data(ev_data) ;
        return NULL ;
    }

    return NULL ;

ERR_END :
    // 清理申请的数据
    tmp_node = head ; 
    while(tmp_node != NULL ) {
         tmp_node = head->next ;
         if(head->data) free(head->data) ;
         free(head) ;
        head = tmp_node ;
    }
    return NULL ; 
}


/*
 * 读数据处理
 */
int read_data(my_evnt_data_t * ev_data) {
    // printf("ev_data addr [%lu] socket[%d]\n", (unsigned long) ev_data, ev_data->conn_fd ) ;
    buf_pend_t * buf_pend = &ev_data->buf_pend ;
    int bytes = 0 ;
    volatile int flag = 0 ; 
    while ((bytes = recv(ev_data->conn_fd, (void*)(buf_pend->buf + buf_pend->write_offset), buf_pend->left_write, 0)) > 0) {
        // printf("read : data bytes[%d]\n", bytes) ;
        flag = 1 ; 
        buf_pend->write_offset += bytes;
        buf_pend->left_write -= bytes;
    }
    if ( bytes == 0 ) {
        if( flag == 0 ) {        // 连接关闭的情况
            return -1 ;
        }
    } else if ( bytes < 0 ) {
        if (EINTR != errno && EWOULDBLOCK != errno && EAGAIN != errno) {  // 如果是多个监听端口,errno会相互覆盖,可以直接关闭
            fprintf(stderr, "connect fd [%d] error, will close later! \n", ev_data->conn_fd);
            return -1;
        }
    }
    // 调整偏移量
    adjust_buf_pend_offset(buf_pend) ;
    return 0 ;
}


/*
 * 实际发送数据
 */
int send_data(my_evnt_data_t * ev_data) {
    int ret = 0 ;
    int total_num = 0 ;
    int once_num = 0 ;
    char * send_p = ev_data->send_data ;
    while(1) {
        once_num = send(ev_data->conn_fd, send_p, ev_data->send_data_len - total_num, 0) ; 
        if (once_num < 0) {
            if (EINTR != errno && EWOULDBLOCK != errno && EAGAIN != errno) {  // 如果是多个监听端口,errno会相互覆盖,可以直接关闭
                fprintf(stderr, "connect fd [%d] error, will close later! \n", ev_data->conn_fd);
                return -1;
            }
            goto END ;
        } else {
            total_num += once_num ;
            send_p += once_num ;
        }

        if( total_num >= ev_data->send_data_len ) {
            goto END ;
        }
    }

END:
    free(ev_data->send_data) ;
    ev_data->send_data = NULL ;
    ev_data->send_data_len = 0 ;
    return ret ;
}


/*
 * epoll事件监听服务--接收数据
 */
int epoll_recv_server(int listen_fd, void * rsc) {

    int ret = 0 ;
    int epoll_fd = 0;
    struct epoll_event one_event;
    struct epoll_event event_list[MAX_EVENTS];

    // 1、创建epoll句柄(文件描述符)
    epoll_fd = epoll_create(MAX_EVENTS);

    // 2、把listen_fd 加入到 epoll 的监控
    // 2.1、初始化第一个事件,即监听事件被epoll监控起来。
    memset(&one_event, 0x00, sizeof(struct epoll_event)) ;
    one_event.events = EPOLLIN | EPOLLET ;        // 设置被监听的事件是:可读事件,设置水平触发
    one_event.data.fd = listen_fd;     // 预先设置要返回的数据,当事件发生时,返回的 struct epoll_event 结构的.data.fd为这个值。
    
    // 2.2、把listen_fd监听加入到事件监控,监听事件发生时返回一个数据结构 struct epoll_event
    if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, listen_fd, &one_event) < 0) {
        fprintf(stderr, "set listen_fd into epoll failed!!\n");
        return -1;
    }

    struct epoll_event * event_p ;
    my_evnt_data_t * ev_data ;

    // 设置线程属性,可分离
    pthread_t pthid ;
    pthread_attr_t pth_attr ;
    pthread_attr_init(&pth_attr) ;
    pthread_attr_setdetachstate(&pth_attr, PTHREAD_CREATE_DETACHED) ;
    // 3、开始循环监控事件
    // 3.1、初始化事件列表,清空所有的数据。
    memset(event_list, 0x00, sizeof(struct epoll_event) * MAX_EVENTS);
    while (1) {
        // 3.2、超时等待事件。
        // 监控到有事件发生,则会把注册时(epoll_ctl函数的EPOLL_CTL_ADD参数添加的)的数据结构返回。
        ret = epoll_wait(epoll_fd, event_list, MAX_EVENTS, 50) ;
        // 4、开始对返回值进行分析
        if (ret < 0) {
            // 4.1、报错处理
            if (EINTR == errno) {
                continue;
            }
            printf("epoll failed!!\n");
            return -1;
        } else if (0 == ret) {
            // 4.2、没有事件,超时了。继续下个循环等待监听
            continue;
        }
        // 5、进入下边的代码,则说明发生了监听的事件。遍历所有的事件
        for (int i = 0; i < ret; i++) {
            printf("\n==============================\n") ;
            event_p = &event_list[i] ;
            if (event_p->data.fd == listen_fd) {
                // 5.1、listen_fd的事件,说明有新的连接过来了。进入对事件的处理。
                accept_connect(listen_fd, epoll_fd, rsc);
            } else {
                ev_data = (my_evnt_data_t *)event_p->data.ptr ;
                // 非 listen_fd 的事件处理
                // 5.2、包含可读事件
                if ( event_p->events & EPOLLIN) {
                    printf("epoll get[%lu] read conn_fd[%d]\n", (unsigned long) event_p, ((my_evnt_data_t *)( event_p->data.ptr))->conn_fd) ;
                    // 5.2.1、先删除事件监听
                    if (epoll_ctl(epoll_fd, EPOLL_CTL_DEL, ev_data->conn_fd, event_p ) < 0) {
                        if (EINTR == errno) {
                            continue;
                        }
                        printf("delete conn_fd from epoll failed!!\n");
                    }
                    // 5.2.2、获取数据,并做基本处理,不做业务处理,(只负责数据的存储移动等)
                    if (read_data( ev_data ) != 0) {
                        // 释放申请的内存,关闭conn_fd
                        free_ev_data(ev_data);
                        continue;
                    }
                    // 5.2.3、检测数据完整性并处理
                    if( pthread_create(&pthid, &pth_attr, check_out_data_and_deal, (void*) ev_data) != 0 ) {
                        printf("pthread_create failed!!") ;
                        // 线程创建不成功,释放申请的内存,关闭conn_fd
                        free_ev_data( ev_data );
                        continue ;
                    }
                }
                // 5.3、包含可写事件
                if ( event_p->events & EPOLLOUT) {
                    printf("epoll get[%lu] write conn_fd[%d]\n", (unsigned long) event_p, ev_data->conn_fd) ;
                    // 5.3.1、去掉对conn_fd的事件监听
                    if (epoll_ctl(epoll_fd, EPOLL_CTL_DEL, ev_data->conn_fd, event_p) < 0) {
                        if (EINTR == errno) {
                            continue;
                        }
                        printf("delete conn_fd from epoll failed!!\n");
                    }
                    // 5.3.2、执行数据发送
                    if ( send_data(ev_data) != 0) {
                        // 释放申请的内存,关闭conn_fd
                        free_ev_data( ev_data );
                        continue;
                    }
                    // 5.3.3、继续监听可写事件
                    if( add_event_to_epoll( ev_data, EPOLLERR | EPOLLHUP | EPOLLIN ) != 0 ) {
                        // 释放申请的内存,关闭conn_fd
                        free_ev_data( ev_data );
                        continue;
                    }
                }
                // 5.4、包含挂断 和 错误事件
                if ( event_p->events & (EPOLLHUP | EPOLLERR)) {
                    printf("epoll get[%lu] error conn_fd[%d]\n", (unsigned long) event_p, ((my_evnt_data_t *)( event_p->data.ptr))->conn_fd) ;
                    // 5.4.1、从epoll监听事件集中删除对应文件描述符的监听
                    if (epoll_ctl(epoll_fd, EPOLL_CTL_DEL, ev_data->conn_fd, event_p) < 0) {
                        if (EINTR == errno) {
                            continue;
                        }
                        printf("delete conn_fd from epoll failed!!\n");
                    }
                    // 5.4.2、释放申请的内存,关闭conn_fd
                    free_ev_data( ev_data );
                    continue;
                }
            }
        }
    }
    pthread_attr_destroy(&pth_attr) ;
    close(listen_fd);
    close(epoll_fd);
}



void hand(int signo) {
    printf("signo [%d]\n", signo) ;
}

int main(){

    int ret = 0 ;
    void * rsc = NULL ; 
    int listen_fd = create_listen() ;
    
    signal(SIGPIPE, hand) ;
    ret = epoll_recv_server(listen_fd, rsc) ;
    exit(ret) ;

    return 0 ;
}

发起测试,红色内容为输入。

服务端输出情况

评论

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注