epoll水平触发模式,监控监听套接字和连接套接字。
// 编译 gcc tepoll.c -lpthread -Wall
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <fcntl.h>
#include <pthread.h>
#include <sys/errno.h>
#include <sys/epoll.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <arpa/inet.h>
#include <signal.h>
#define MAX_EVENTS 10
#define BUF_MALLOC_SIZE 1*1024*1024
#define WRITE_DATA_CACHE_SIZE 1024
/*
* 缓存空间控制结构
*/
typedef struct _buf_pend_t {
char* buf;
int read_offset;
int write_offset;
int end_chk_offset;
int left_write;
int buf_size;
} buf_pend_t ;
/*
* 注册给事件监听的数据,用来监听可读事件。
* 即,当可读事件触发后,将此数据通过struct epoll_event结构的data字段返回到处理函数
*/
typedef struct _my_evnt_data_t {
int conn_fd ; // 客户端来连接时,服务端维持连接打开的socket
int epoll_fd ;
buf_pend_t buf_pend ; // 数据缓存管理
int send_data_len ; // 待发送数据长度
char * send_data ; // 发送的数据
void * resource ; // 其他用于处理数据的资源,注意:临界资源处理互斥处理
} my_evnt_data_t ;
// 绑定的IP地址
const char * bind_host = "0.0.0.0" ;
// 端口
const int bind_port = 6666 ;
/*
* 创建监听端口
*/
int create_listen() {
int listen_fd = 0;
struct sockaddr_in sin;
int flags = 0;
struct linger linger_tmp;
// 1、设置绑定的 网络地址 和 端口
memset(&sin, 0x00, sizeof(sin));
sin.sin_family = AF_INET;
inet_aton(bind_host, &(sin.sin_addr));
sin.sin_port = htons(bind_port);
// 2、创建套接字
if ((listen_fd = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
printf("create listen failed !!\n");
return -1;
}
// 3、设置socket相关属性
// 3.1、非阻塞
flags = fcntl(listen_fd, F_GETFL, 0);
fcntl(listen_fd, F_SETFL, flags | O_NONBLOCK);
// 3.2、文件描述符复用
flags = 1;
setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &flags, sizeof(flags));
// 3.3、设置在close后的超时发送时间
linger_tmp.l_onoff = 1;
linger_tmp.l_linger = 0;
setsockopt(listen_fd, SOL_SOCKET, SO_LINGER, &linger_tmp, sizeof(linger_tmp));
// 4、绑定
if (bind(listen_fd, (struct sockaddr*)(&sin), sizeof(struct sockaddr)) == -1) {
printf("bind listen fd failed !!\n");
return -1;
}
// 5、启动监听
if (listen(listen_fd, 4096) == 1) {
printf("listen failed !!\n");
return -1;
}
return listen_fd ;
}
/*
* 设置套接字属性
*/
int set_client_fd_attr(int conn_fd){
int flags = 0;
// 1、设置非阻塞
flags = fcntl(conn_fd, F_GETFL, 0);
fcntl(conn_fd, F_SETFL, flags | O_NONBLOCK);
// 2、设置文件描述符复用
flags = 1;
setsockopt(conn_fd, SOL_SOCKET, SO_REUSEADDR, &flags, sizeof(flags));
// 3、设置在close后的超时发送时间
struct linger linger_tmp;
linger_tmp.l_onoff = 1;
linger_tmp.l_linger = 0;
setsockopt(conn_fd, SOL_SOCKET, SO_LINGER, &linger_tmp, sizeof(linger_tmp));
// 4、设置发送和接收超时时间
struct timeval tv = {30, 0};
setsockopt(conn_fd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv));
setsockopt(conn_fd, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv));
// 5、设置保活机制,空闲发起时间、发送间隔、无响应尝试次数
int keepidle = 20, keepintval = 2, keepcnt = 5, keepalive = 1;
setsockopt(conn_fd, SOL_TCP, TCP_KEEPIDLE, &keepidle, sizeof(keepidle));
setsockopt(conn_fd, SOL_TCP, TCP_KEEPINTVL, &keepintval, sizeof(keepintval));
setsockopt(conn_fd, SOL_TCP, TCP_KEEPCNT, &keepcnt, sizeof(keepcnt));
setsockopt(conn_fd, SOL_SOCKET, SO_KEEPALIVE, &keepalive, sizeof(keepalive));
return 0 ;
}
/*
* 关闭客户端套接字
* 删除空间
*/
void free_ev_data(my_evnt_data_t * ev_data) {
printf("free_ev_data : conn_fd[%d]\n", ev_data->conn_fd) ;
if ( ev_data != NULL ) {
// 1、关闭
close(ev_data->conn_fd) ;
// 2、删除接收数据缓存
if( ev_data->buf_pend.buf ){
free( ev_data->buf_pend.buf );
}
// 3、删除发送数据缓存
if( ev_data->send_data ){
free( ev_data->send_data ) ;
}
free(ev_data) ;
}
}
/*
* 设置事件发生后返回的数据结构
*/
my_evnt_data_t * set_client_deal_data(int conn_fd, int epoll_fd, void * rsc) {
// 1、申请数据处理结构
my_evnt_data_t * ev_data = (my_evnt_data_t *)calloc(1, sizeof(my_evnt_data_t)) ;
if ( ev_data == NULL) {
printf("ev_data calloc NULL\n");
return NULL;
}
// 2、申请接收数据缓存
ev_data->buf_pend.buf = (char*)calloc(1,BUF_MALLOC_SIZE);
if (ev_data->buf_pend.buf == NULL) {
printf("ev_data->buf_pend.buf calloc NULL\n");
free_ev_data(ev_data);
return NULL ;
}
ev_data->buf_pend.buf_size = BUF_MALLOC_SIZE ;
ev_data->buf_pend.left_write = BUF_MALLOC_SIZE ;
ev_data->conn_fd = conn_fd ; // 客户端来连接时,服务端维持连接打开的socket
ev_data->epoll_fd = epoll_fd ;
ev_data->resource = rsc ;
return ev_data ;
}
/*
* 添加conn_fd事件监听
*/
int add_event_to_epoll(my_evnt_data_t * ev_data, int events) {
int epoll_fd = ev_data->epoll_fd ;
int conn_fd = ev_data->conn_fd ;
// 初始化监听时间变量
struct epoll_event one_event;
memset(&one_event, 0x00, sizeof(struct epoll_event));
one_event.events = events ; // 设置监控的事件 出错、挂起、可读 默认水平触发
one_event.data.ptr = ev_data ; // 把刚申请的用于返回的数据结传给 data.ptr 指针
// 设置conn_fd事件监听,注册事件发生时返回的数据结构one_event
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, conn_fd, &one_event) < 0) {
fprintf(stderr, "set conn_fd into epoll failed!!\n");
return -1 ;
}
return 0 ;
}
/*
* 连接处理
*/
int accept_connect(int listen_fd, int epoll_fd, void * rsc) {
struct sockaddr_in sin;
int conn_fd = 0;
int len = sizeof(struct sockaddr);
int ret = 0;
// 1、先接收连接
if ((conn_fd = accept(listen_fd, (struct sockaddr*)(&sin), (socklen_t*)&len)) < 0) {
fprintf(stderr,"accept failed !! errno:%d errstr:%s\n", errno, strerror(errno));
return -1;
}
// 2、设置套接字属性
set_client_fd_attr(conn_fd);
// 3、设置事件发生后返回的数据结构
my_evnt_data_t * ev_data = set_client_deal_data(conn_fd, epoll_fd, rsc) ;
if(ev_data == NULL) {
ret = -1 ;
goto ERR_END ;
}
// 4、加入套接字监听
ret = add_event_to_epoll(ev_data, EPOLLERR | EPOLLHUP | EPOLLIN) ;
if(ret != 0 ) {
ret = -2 ;
goto ERR_END ;
}
// printf("set data.ptr addr [%lu] socket[%d]\n", (unsigned long)ev_data, conn_fd ) ;
return ret ;
ERR_END:
if(ev_data) free_ev_data(ev_data);
return ret;
}
/*
* 调整缓冲区读、写、剩余的偏移地址
*/
int adjust_buf_pend_offset(buf_pend_t * buf_pend) {
int unprocess_bytes = 0 ;
int end_chk_bytes = 0 ;
if (buf_pend->read_offset == buf_pend->write_offset) { // 数据全部处理完
buf_pend->left_write = buf_pend->buf_size;
buf_pend->read_offset = 0;
buf_pend->write_offset = 0;
buf_pend->end_chk_offset = 0;
} else if (buf_pend->read_offset > 0) { // 有一部分数据已经处理完,需要调整
// 挪动位置
unprocess_bytes = buf_pend->write_offset - buf_pend->read_offset;
end_chk_bytes = buf_pend->end_chk_offset - buf_pend->read_offset;
memcpy(buf_pend->buf, buf_pend->buf + buf_pend->read_offset, unprocess_bytes);
// 重新调整偏移
buf_pend->write_offset = unprocess_bytes ;
buf_pend->read_offset = 0;
buf_pend->end_chk_offset = end_chk_bytes ;
// 重新计算剩余空间
buf_pend->left_write = buf_pend->buf_size - unprocess_bytes;
}
return 0 ;
}
/*
* 检测结束符
*/
const char * END_FLAG = "##&&" ;
int check_end_flag(char * buf_start) {
for(int i=0 ; i < strlen(END_FLAG) ; ++i ) {
if(buf_start[i] != END_FLAG[i]) return 0 ;
}
return 1 ;
}
/*
* 读数据的列表
*/
typedef struct _data_node_t {
char * data ;
struct _data_node_t * next ;
} data_node_t ;
/*
* 数据处理
* 此处仅打印,根据业务需求自行处理
*/
int deal_data_real(char * one_data) {
static int id = 0 ;
printf("one_data : [%.5s]==len[%lu] id [%d]\n", one_data, strlen(one_data), ++id ) ;
return 0 ;
}
/*
* 设置返回的数据
*/
int set_send_event(my_evnt_data_t * ev_data, int count) {
int ret = 0 ;
// 1、初始化可写监听返回的数据
ev_data->send_data = (char *)calloc(1, WRITE_DATA_CACHE_SIZE+strlen(END_FLAG)+1) ;
if( ev_data->send_data == NULL ) {
fprintf(stderr, "no enough memory [ev_data->data]");
return -1 ;
}
snprintf(ev_data->send_data, WRITE_DATA_CACHE_SIZE, "Just got [%d] data", count) ;
ev_data->send_data_len = strlen(ev_data->send_data) ;
snprintf(ev_data->send_data+ev_data->send_data_len, strlen(END_FLAG)+2, "%s\n", END_FLAG) ;
ev_data->send_data_len = strlen(ev_data->send_data) ;
ret = add_event_to_epoll(ev_data, EPOLLOUT ) ;
if( ret != 0 ) {
return -1 ;
}
return 0 ;
}
/*
* 处理数据
*/
int deal_data( my_evnt_data_t * ev_data, data_node_t * head, void* resp ) {
int ret = 0 ;
data_node_t * deal_node = head ;
int *count = (int*) resp;
// 逐个处理数据
while( deal_node ) {
ret = deal_data_real(deal_node->data) ;
if ( ret != 0 ) break ;
(*count) ++ ;
deal_node = deal_node->next ;
}
return ret ;
}
/*
* 识别结束符,把数据捡出来
*/
void * check_out_data_and_deal(void* arg) {
my_evnt_data_t * ev_data = (my_evnt_data_t*) arg ;
int ret = 0 ;
char * one_data = NULL ;
buf_pend_t * buf_pend = &(ev_data->buf_pend) ;
char * buf_start = &buf_pend->buf[buf_pend->end_chk_offset] ;
char * buf_begine = &buf_pend->buf[buf_pend->read_offset] ;
char * buf_end = &buf_pend->buf[buf_pend->write_offset] - strlen(END_FLAG) ;
data_node_t * data_node = NULL, * head = NULL, * tail = NULL, * tmp_node = NULL ;
while(buf_start <= buf_end ) {
// 检测结束符
if( check_end_flag(buf_start) == 1 ) {
// 读出数据,在处理线程里释放
data_node = (data_node_t*)calloc(1, sizeof(data_node_t) ) ;
if(data_node == NULL ) {
fprintf(stderr, "no enough space data_node\n") ;
goto ERR_END ;
}
// 加入到列表中
if( head == NULL ) {
head = data_node ;
tail = data_node ;
} else {
tail->next = data_node ;
tail = data_node ;
}
// 申请存储数据的空间
one_data = (char *)calloc(1, buf_start + strlen(END_FLAG) - buf_begine + 1) ;
if(one_data == NULL ) {
fprintf(stderr, "no enough space one_data\n") ;
goto ERR_END ;
}
data_node->data = one_data ;
memcpy(one_data, buf_begine, buf_start + strlen(END_FLAG) - buf_begine ) ;
// 向后跳过结束符
buf_start += strlen(END_FLAG) ;
// 重新计算 读偏移
buf_pend->read_offset = buf_start - buf_pend->buf ;
buf_begine = &buf_pend->buf[buf_pend->read_offset] ;
} else {
buf_start ++ ;
}
}
// 调整检测过的数据偏移量
buf_pend->end_chk_offset = buf_start - buf_pend->buf ;
int count = 0 ;
// 处理数据
if( head != NULL ) {
ret = deal_data(ev_data, head, &count) ;
if(ret != 0 ) {
free_ev_data(ev_data) ;
goto ERR_END ;
}
}
// 用完了,逐个删除
tmp_node = head ;
while(tmp_node) {
tmp_node = head->next ;
if(head->data) free(head->data) ;
free(head) ;
head = tmp_node ;
}
// 设置返回的数据
ret = set_send_event(ev_data, count) ;
if( ret != 0 ) {
// 设置失败,则关闭conn_fd并清理数据
free_ev_data(ev_data) ;
return NULL ;
}
return NULL ;
ERR_END :
// 清理申请的数据
tmp_node = head ;
while(tmp_node != NULL ) {
tmp_node = head->next ;
if(head->data) free(head->data) ;
free(head) ;
head = tmp_node ;
}
return NULL ;
}
/*
* 读数据处理
*/
int read_data(my_evnt_data_t * ev_data) {
// printf("ev_data addr [%lu] socket[%d]\n", (unsigned long) ev_data, ev_data->conn_fd ) ;
buf_pend_t * buf_pend = &ev_data->buf_pend ;
int bytes = 0 ;
volatile int flag = 0 ;
while ((bytes = recv(ev_data->conn_fd, (void*)(buf_pend->buf + buf_pend->write_offset), buf_pend->left_write, 0)) > 0) {
// printf("read : data bytes[%d]\n", bytes) ;
flag = 1 ;
buf_pend->write_offset += bytes;
buf_pend->left_write -= bytes;
}
if ( bytes == 0 ) {
if( flag == 0 ) { // 连接关闭的情况
return -1 ;
}
} else if ( bytes < 0 ) {
if (EINTR != errno && EWOULDBLOCK != errno && EAGAIN != errno) { // 如果是多个监听端口,errno会相互覆盖,可以直接关闭
fprintf(stderr, "connect fd [%d] error, will close later! \n", ev_data->conn_fd);
return -1;
}
}
// 调整偏移量
adjust_buf_pend_offset(buf_pend) ;
return 0 ;
}
/*
* 实际发送数据
*/
int send_data(my_evnt_data_t * ev_data) {
int ret = 0 ;
int total_num = 0 ;
int once_num = 0 ;
char * send_p = ev_data->send_data ;
while(1) {
once_num = send(ev_data->conn_fd, send_p, ev_data->send_data_len - total_num, 0) ;
if (once_num < 0) {
if (EINTR != errno && EWOULDBLOCK != errno && EAGAIN != errno) { // 如果是多个监听端口,errno会相互覆盖,可以直接关闭
fprintf(stderr, "connect fd [%d] error, will close later! \n", ev_data->conn_fd);
return -1;
}
goto END ;
} else {
total_num += once_num ;
send_p += once_num ;
}
if( total_num >= ev_data->send_data_len ) {
goto END ;
}
}
END:
free(ev_data->send_data) ;
ev_data->send_data = NULL ;
ev_data->send_data_len = 0 ;
return ret ;
}
/*
* epoll事件监听服务--接收数据
*/
int epoll_recv_server(int listen_fd, void * rsc) {
int ret = 0 ;
int epoll_fd = 0;
struct epoll_event one_event;
struct epoll_event event_list[MAX_EVENTS];
// 1、创建epoll句柄(文件描述符)
epoll_fd = epoll_create(MAX_EVENTS);
// 2、把listen_fd 加入到 epoll 的监控
// 2.1、初始化第一个事件,即监听事件被epoll监控起来。
memset(&one_event, 0x00, sizeof(struct epoll_event)) ;
one_event.events = EPOLLIN | EPOLLET ; // 设置被监听的事件是:可读事件,设置水平触发
one_event.data.fd = listen_fd; // 预先设置要返回的数据,当事件发生时,返回的 struct epoll_event 结构的.data.fd为这个值。
// 2.2、把listen_fd监听加入到事件监控,监听事件发生时返回一个数据结构 struct epoll_event
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, listen_fd, &one_event) < 0) {
fprintf(stderr, "set listen_fd into epoll failed!!\n");
return -1;
}
struct epoll_event * event_p ;
my_evnt_data_t * ev_data ;
// 设置线程属性,可分离
pthread_t pthid ;
pthread_attr_t pth_attr ;
pthread_attr_init(&pth_attr) ;
pthread_attr_setdetachstate(&pth_attr, PTHREAD_CREATE_DETACHED) ;
// 3、开始循环监控事件
// 3.1、初始化事件列表,清空所有的数据。
memset(event_list, 0x00, sizeof(struct epoll_event) * MAX_EVENTS);
while (1) {
// 3.2、超时等待事件。
// 监控到有事件发生,则会把注册时(epoll_ctl函数的EPOLL_CTL_ADD参数添加的)的数据结构返回。
ret = epoll_wait(epoll_fd, event_list, MAX_EVENTS, 50) ;
// 4、开始对返回值进行分析
if (ret < 0) {
// 4.1、报错处理
if (EINTR == errno) {
continue;
}
printf("epoll failed!!\n");
return -1;
} else if (0 == ret) {
// 4.2、没有事件,超时了。继续下个循环等待监听
continue;
}
// 5、进入下边的代码,则说明发生了监听的事件。遍历所有的事件
for (int i = 0; i < ret; i++) {
printf("\n==============================\n") ;
event_p = &event_list[i] ;
if (event_p->data.fd == listen_fd) {
// 5.1、listen_fd的事件,说明有新的连接过来了。进入对事件的处理。
accept_connect(listen_fd, epoll_fd, rsc);
} else {
ev_data = (my_evnt_data_t *)event_p->data.ptr ;
// 非 listen_fd 的事件处理
// 5.2、包含可读事件
if ( event_p->events & EPOLLIN) {
printf("epoll get[%lu] read conn_fd[%d]\n", (unsigned long) event_p, ((my_evnt_data_t *)( event_p->data.ptr))->conn_fd) ;
// 5.2.1、先删除事件监听
if (epoll_ctl(epoll_fd, EPOLL_CTL_DEL, ev_data->conn_fd, event_p ) < 0) {
if (EINTR == errno) {
continue;
}
printf("delete conn_fd from epoll failed!!\n");
}
// 5.2.2、获取数据,并做基本处理,不做业务处理,(只负责数据的存储移动等)
if (read_data( ev_data ) != 0) {
// 释放申请的内存,关闭conn_fd
free_ev_data(ev_data);
continue;
}
// 5.2.3、检测数据完整性并处理
if( pthread_create(&pthid, &pth_attr, check_out_data_and_deal, (void*) ev_data) != 0 ) {
printf("pthread_create failed!!") ;
// 线程创建不成功,释放申请的内存,关闭conn_fd
free_ev_data( ev_data );
continue ;
}
}
// 5.3、包含可写事件
if ( event_p->events & EPOLLOUT) {
printf("epoll get[%lu] write conn_fd[%d]\n", (unsigned long) event_p, ev_data->conn_fd) ;
// 5.3.1、去掉对conn_fd的事件监听
if (epoll_ctl(epoll_fd, EPOLL_CTL_DEL, ev_data->conn_fd, event_p) < 0) {
if (EINTR == errno) {
continue;
}
printf("delete conn_fd from epoll failed!!\n");
}
// 5.3.2、执行数据发送
if ( send_data(ev_data) != 0) {
// 释放申请的内存,关闭conn_fd
free_ev_data( ev_data );
continue;
}
// 5.3.3、继续监听可写事件
if( add_event_to_epoll( ev_data, EPOLLERR | EPOLLHUP | EPOLLIN ) != 0 ) {
// 释放申请的内存,关闭conn_fd
free_ev_data( ev_data );
continue;
}
}
// 5.4、包含挂断 和 错误事件
if ( event_p->events & (EPOLLHUP | EPOLLERR)) {
printf("epoll get[%lu] error conn_fd[%d]\n", (unsigned long) event_p, ((my_evnt_data_t *)( event_p->data.ptr))->conn_fd) ;
// 5.4.1、从epoll监听事件集中删除对应文件描述符的监听
if (epoll_ctl(epoll_fd, EPOLL_CTL_DEL, ev_data->conn_fd, event_p) < 0) {
if (EINTR == errno) {
continue;
}
printf("delete conn_fd from epoll failed!!\n");
}
// 5.4.2、释放申请的内存,关闭conn_fd
free_ev_data( ev_data );
continue;
}
}
}
}
pthread_attr_destroy(&pth_attr) ;
close(listen_fd);
close(epoll_fd);
}
void hand(int signo) {
printf("signo [%d]\n", signo) ;
}
int main(){
int ret = 0 ;
void * rsc = NULL ;
int listen_fd = create_listen() ;
signal(SIGPIPE, hand) ;
ret = epoll_recv_server(listen_fd, rsc) ;
exit(ret) ;
return 0 ;
}
发起测试,红色内容为输入。

服务端输出情况

发表回复