基於epoll的多線程網絡服務程序設計
基於epoll的多線程網絡服務程序設計——C語言
採用C語言設計了一個基於epoll的多線程網絡服務程序。每個線程都有一個epoll來捕獲處於這個線程的socket事件。當子線程數量為0,即只有一個線程,則網絡監聽服務與socket消息處理處於同一個epoll。當子線程數量大於0時,主線程監聽socket連接,當有新的連接到來時將其加入到活躍socket數量最少的子線程的epoll中。
server.h
#ifndef EPOLL_C_SERVER_H
#define EPOLL_C_SERVER_H

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <errno.h>
#include <sys/epoll.h>
#include <pthread.h>
#include <netinet/in.h>
#include <sys/types.h>
#include <sys/socket.h>

/* Status codes returned by serverRun(). The negative value is parenthesised
 * so the macro always expands as a single operand. */
#define RESULT_OK 0
#define RESULT_ERROR (-1)

/**************************************************************************
 * Function    : *MSG_HANDLE
 * Input       : socket_fd --> socket file descriptor
 *             : arg       --> opaque void* argument
 * Output      :
 * Return      : 1  message handled, keep waiting for the next message;
 *               0  handling finished, the connection should be closed;
 *               -1 an error occurred
 * Description : message-handling function pointer
 ****************************************************************************/
typedef int (*MSG_HANDLE)(int socket_fd, void *arg);

/* One socket-handling sub thread. */
typedef struct
{
    int epoll_fd;
    pthread_t thd_fd;
    /* Message handler; each sub thread calls it to process incoming data. */
    MSG_HANDLE data_func;
    /* Number of live sockets currently owned by this thread. */
    unsigned int active_conection_cnt;
    /* Mutex protecting updates of the live-socket counter. */
    pthread_mutex_t thd_mutex;
} socket_thd_struct;

/* One network service. */
typedef struct
{
    int epoll_fd;
    unsigned short ip_port;
    /* Message handler; called directly when there is only one thread. */
    MSG_HANDLE data_func;
    /* Number of sub threads; may be 0, in which case listening and socket
     * handling share the same thread. */
    unsigned int socket_pthread_count;
    /* Array of sub-thread descriptors. */
    socket_thd_struct *socket_thd;
} server_struct;

/**************************************************************************
 * Function    : initServerStruct
 * Input       : param_port      --> service port number
 *             : param_thd_count --> number of sub threads that handle
 *                                   connected clients
 *             : param_handle    --> socket data handler function pointer
 * Output      :
 * Return      : the initialised server structure (callers should check the
 *               pointer for NULL before using it)
 * Description : initialise the server structure
 ****************************************************************************/
server_struct *initServerStruct(unsigned short param_port, unsigned int param_thd_count, MSG_HANDLE param_handle);

/**************************************************************************
 * Function    : serverRun
 * Input       : param_server --> server structure
 * Output      :
 * Return      : RESULT_OK(0): success; RESULT_ERROR(-1): failure
 * Description : run the network service and listen on the service port
 ****************************************************************************/
int serverRun(server_struct *param_server);

#endif //EPOLL_C_SERVER_H
server.c
#include "server.h"

/* Maximum number of events fetched by one epoll_wait() call. */
enum { MAX_EPOLL_EVENTS = 5 };

/* Remove a client socket from an epoll set and close it. */
static void closeClient(int epoll_fd, int client_fd)
{
    epoll_ctl(epoll_fd, EPOLL_CTL_DEL, client_fd, NULL);
    close(client_fd);
}

/* Close a client owned by a sub thread and update that thread's live count. */
static void dropWorkerClient(socket_thd_struct *worker, int client_fd)
{
    closeClient(worker->epoll_fd, client_fd);

    pthread_mutex_lock(&worker->thd_mutex);
    if (worker->active_conection_cnt > 0) {
        worker->active_conection_cnt--;
    }
    pthread_mutex_unlock(&worker->thd_mutex);
}

/* Fetch the pending error of a socket (SO_ERROR); errno is unrelated to an
 * EPOLLERR notification, so it must not be used for reporting. */
static int pendingSocketError(int fd)
{
    int so_error = 0;
    socklen_t len = sizeof so_error;

    if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &so_error, &len) != 0) {
        return errno;
    }
    return so_error;
}

/*
 * Thread entry point of a socket-handling sub thread.
 *
 * arg is the socket_thd_struct describing this thread. The thread waits on
 * its own epoll instance and, for every ready client socket, either closes it
 * (error / hang-up) or passes it to the message handler. The handler's return
 * value decides whether the connection stays open (1) or is closed (0 / -1).
 * The thread only terminates if epoll_wait() fails with something other than
 * EINTR.
 */
static void *socketPthreadRun(void *arg)
{
    socket_thd_struct *worker = arg;
    struct epoll_event events[MAX_EPOLL_EVENTS];

    while (1) {
        /* Wait for read/error events. */
        int active_counts = epoll_wait(worker->epoll_fd, events, MAX_EPOLL_EVENTS, -1);
        if (active_counts < 0) {
            if (errno == EINTR) {
                continue;
            }
            /* A permanent failure would otherwise turn this into a busy loop. */
            fprintf(stderr, "sub thread epoll_wait failed:errno(%d)-%s\n", errno, strerror(errno));
            break;
        }
        fprintf(stdout, "active count:%d\n", active_counts);

        for (int index = 0; index < active_counts; index++) {
            int client_fd = events[index].data.fd;
            unsigned int flags = events[index].events;

            if (flags & EPOLLERR) {
                /* Socket error. */
                int so_error = pendingSocketError(client_fd);
                fprintf(stderr, "error happened:errno(%d)-%s\n", so_error, strerror(so_error));
                dropWorkerClient(worker, client_fd);
            } else if (flags & (EPOLLRDHUP | EPOLLHUP)) {
                /* Peer closed the connection. EPOLLHUP is always reported by
                 * epoll, so it has to be handled or the loop would spin. */
                fprintf(stdout, "client close this socket\n");
                dropWorkerClient(worker, client_fd);
            } else if (flags & EPOLLIN) {
                /* Data available: run the message handler. */
                int ret = worker->data_func(client_fd, NULL);
                if (ret == -1) {
                    fprintf(stderr, "client socket exception happened\n");
                    dropWorkerClient(worker, client_fd);
                } else if (ret == 0) {
                    fprintf(stdout, "client close this socket\n");
                    dropWorkerClient(worker, client_fd);
                }
                /* ret == 1: keep the connection and wait for more data. */
            }
        }
    }
    return NULL;
}

/* Prepare one sub-thread descriptor and start its thread.
 * Returns RESULT_OK, or RESULT_ERROR with every partial resource released. */
static int startWorker(socket_thd_struct *worker, MSG_HANDLE handler)
{
    int err = 0;

    worker->data_func = handler;
    worker->active_conection_cnt = 0;

    worker->epoll_fd = epoll_create(256);
    if (worker->epoll_fd < 0) {
        fprintf(stderr, "sub thread epoll create failed:errno(%d)-%s\n", errno, strerror(errno));
        return RESULT_ERROR;
    }

    /* The mutex must exist before the thread that uses it is started. */
    err = pthread_mutex_init(&worker->thd_mutex, NULL);
    if (err != 0) {
        fprintf(stderr, "sub thread mutex init failed:errno(%d)-%s\n", err, strerror(err));
        close(worker->epoll_fd);
        return RESULT_ERROR;
    }

    err = pthread_create(&worker->thd_fd, NULL, socketPthreadRun, worker);
    if (err != 0) {
        fprintf(stderr, "sub thread create failed:errno(%d)-%s\n", err, strerror(err));
        pthread_mutex_destroy(&worker->thd_mutex);
        close(worker->epoll_fd);
        return RESULT_ERROR;
    }
    return RESULT_OK;
}

/*
 * Allocate and initialise a server structure.
 *
 * param_port      : TCP port the service will listen on.
 * param_thd_count : requested number of socket-handling sub threads; 0 means
 *                   listening and client handling share the caller's thread.
 * param_handle    : message handler invoked when a client socket is readable.
 *
 * Returns the new structure, or NULL if the structure, its epoll instance or
 * the sub-thread array cannot be created. If only some sub threads can be
 * started, socket_pthread_count is reduced to the number actually running
 * (possibly 0, which falls back to single-thread mode).
 */
server_struct *initServerStruct(unsigned short param_port, unsigned int param_thd_count, MSG_HANDLE param_handle)
{
    server_struct *serv_st = malloc(sizeof *serv_st);
    if (serv_st == NULL) {
        fprintf(stderr, "allocate server struct failed:errno(%d)-%s\n", errno, strerror(errno));
        return NULL;
    }

    serv_st->ip_port = param_port;
    serv_st->data_func = param_handle;
    serv_st->socket_pthread_count = 0;
    serv_st->socket_thd = NULL;

    serv_st->epoll_fd = epoll_create(256);
    if (serv_st->epoll_fd < 0) {
        fprintf(stderr, "server epoll create failed:errno(%d)-%s\n", errno, strerror(errno));
        free(serv_st);
        return NULL;
    }

    if (param_thd_count == 0) {
        return serv_st;
    }

    fprintf(stdout, "create client socket sub thread\n");
    /* calloc checks the count * size multiplication for overflow. */
    serv_st->socket_thd = calloc(param_thd_count, sizeof *serv_st->socket_thd);
    if (serv_st->socket_thd == NULL) {
        fprintf(stderr, "allocate sub thread structs failed:errno(%d)-%s\n", errno, strerror(errno));
        close(serv_st->epoll_fd);
        free(serv_st);
        return NULL;
    }

    for (unsigned int index = 0; index < param_thd_count; index++) {
        if (startWorker(&serv_st->socket_thd[index], param_handle) != RESULT_OK) {
            fprintf(stderr, "only %u of %u sub threads started\n", index, param_thd_count);
            break;
        }
        /* Only count threads that are really running, so new connections are
         * never handed to a descriptor nobody is serving. */
        serv_st->socket_pthread_count = index + 1;
    }

    if (serv_st->socket_pthread_count == 0) {
        free(serv_st->socket_thd);
        serv_st->socket_thd = NULL;
    }
    return serv_st;
}

/* Return the sub thread that currently owns the fewest live sockets.
 * Requires server->socket_pthread_count > 0. */
static socket_thd_struct *pickLeastLoadedWorker(server_struct *server)
{
    socket_thd_struct *best = &server->socket_thd[0];
    unsigned int best_cnt = 0;

    for (unsigned int index = 0; index < server->socket_pthread_count; index++) {
        socket_thd_struct *candidate = &server->socket_thd[index];
        unsigned int act_cnts = 0;

        pthread_mutex_lock(&candidate->thd_mutex);
        act_cnts = candidate->active_conection_cnt;
        pthread_mutex_unlock(&candidate->thd_mutex);

        if (index == 0 || act_cnts < best_cnt) {
            best = candidate;
            best_cnt = act_cnts;
        }
    }
    return best;
}

/* Accept one pending connection and register it with the epoll instance of
 * the thread that will serve it. */
static void acceptClient(server_struct *server, int server_socket)
{
    struct sockaddr_in client_info;
    socklen_t client_info_len = sizeof client_info;
    struct epoll_event ev;
    const unsigned char *ip = NULL;
    int new_socket = 0;

    fprintf(stdout, "new socket comming\n");
    new_socket = accept(server_socket, (struct sockaddr *)&client_info, &client_info_len);
    if (new_socket < 0) {
        fprintf(stderr, "server accept failed:errno(%d)-%s\n", errno, strerror(errno));
        return;
    }

    ip = (const unsigned char *)&client_info.sin_addr;
    /* sin_port is in network byte order and must be converted for display. */
    fprintf(stdout, "new socket:%d.%d.%d.%d:%d-->connected\n", ip[0], ip[1], ip[2], ip[3], ntohs(client_info.sin_port));

    memset(&ev, 0, sizeof ev);
    ev.data.fd = new_socket;
    ev.events = EPOLLIN | EPOLLERR | EPOLLRDHUP;

    if (server->socket_pthread_count == 0) {
        if (epoll_ctl(server->epoll_fd, EPOLL_CTL_ADD, new_socket, &ev) != 0) {
            fprintf(stderr, "client socket add to epoll error:errno(%d)-%s\n", errno, strerror(errno));
            close(new_socket);
            return;
        }
    } else {
        socket_thd_struct *worker = pickLeastLoadedWorker(server);

        /* Count the connection before the sub thread can see the socket;
         * otherwise an immediate close could decrement the counter first. */
        pthread_mutex_lock(&worker->thd_mutex);
        worker->active_conection_cnt++;
        pthread_mutex_unlock(&worker->thd_mutex);

        if (epoll_ctl(worker->epoll_fd, EPOLL_CTL_ADD, new_socket, &ev) != 0) {
            fprintf(stderr, "client socket add to epoll error:errno(%d)-%s\n", errno, strerror(errno));
            close(new_socket);
            pthread_mutex_lock(&worker->thd_mutex);
            worker->active_conection_cnt--;
            pthread_mutex_unlock(&worker->thd_mutex);
            return;
        }
    }
    fprintf(stdout, "add new client socket to epoll\n");
}

/* Handle an event of a client socket served by the listening thread itself
 * (single-thread mode). */
static void serveClient(server_struct *server, const struct epoll_event *event)
{
    int client_fd = event->data.fd;

    if (event->events & (EPOLLERR | EPOLLRDHUP | EPOLLHUP)) {
        /* Peer closed the connection or the socket failed. */
        fprintf(stdout, "client close this socket\n");
        closeClient(server->epoll_fd, client_fd);
    } else if (event->events & EPOLLIN) {
        /* Data available: run the message handler. */
        int ret = 0;

        fprintf(stdout, "begin recv client data\n");
        ret = server->data_func(client_fd, NULL);
        if (ret == -1) {
            fprintf(stderr, "client socket exception happened\n");
            closeClient(server->epoll_fd, client_fd);
        } else if (ret == 0) {
            fprintf(stdout, "client close this socket\n");
            closeClient(server->epoll_fd, client_fd);
        }
        /* ret == 1: keep the connection and wait for more data. */
    }
}

/*
 * Run the network service: create the listening socket, bind it to
 * param_server->ip_port on all interfaces and dispatch epoll events forever.
 *
 * New connections are served by this thread when socket_pthread_count is 0,
 * otherwise they are handed to the sub thread with the fewest live sockets.
 *
 * The function does not return while the service is healthy. It returns
 * RESULT_ERROR if param_server (or its handler) is NULL, if the listening
 * socket cannot be set up, or if epoll_wait() fails permanently. The
 * listening socket is closed on every return path.
 */
int serverRun(server_struct *param_server)
{
    struct sockaddr_in server_addr;
    struct epoll_event ev;
    struct epoll_event events[MAX_EPOLL_EVENTS];
    int server_socket = -1;

    if (param_server == NULL || param_server->data_func == NULL) {
        fprintf(stderr, "invalid server struct\n");
        return RESULT_ERROR;
    }

    memset(&server_addr, 0, sizeof server_addr);
    server_addr.sin_family = AF_INET;
    server_addr.sin_addr.s_addr = htonl(INADDR_ANY); /* 32-bit address: htonl, not htons */
    server_addr.sin_port = htons(param_server->ip_port);

    server_socket = socket(PF_INET, SOCK_STREAM, 0);
    if (server_socket < 0) {
        fprintf(stderr, "create socket error:errno(%d)-%s\n", errno, strerror(errno));
        return RESULT_ERROR;
    }
    fprintf(stdout, "create server socket ssuccessful\n");

    if (bind(server_socket, (struct sockaddr *)&server_addr, sizeof(server_addr)) != 0) {
        fprintf(stderr, "server bind failed:errno(%d)-%s\n", errno, strerror(errno));
        goto fail;
    }
    fprintf(stdout, "server socket bind successful\n");

    /* The backlog is a queue length, not the port number. */
    if (listen(server_socket, SOMAXCONN) != 0) {
        fprintf(stderr, "server listen failed:errno(%d)-%s\n", errno, strerror(errno));
        goto fail;
    }
    fprintf(stdout, "server socket listen successful\n");

    /* Reuse the epoll instance created by initServerStruct(); creating a
     * second one here would leak the first descriptor. */
    if (param_server->epoll_fd < 0) {
        param_server->epoll_fd = epoll_create(256);
        if (param_server->epoll_fd < 0) {
            fprintf(stderr, "server epoll create failed:errno(%d)-%s\n", errno, strerror(errno));
            goto fail;
        }
    }

    /* Level-triggered on purpose: one accept() is done per notification, so
     * with EPOLLET connections arriving together could be left in the queue. */
    memset(&ev, 0, sizeof ev);
    ev.data.fd = server_socket;
    ev.events = EPOLLIN;
    if (epoll_ctl(param_server->epoll_fd, EPOLL_CTL_ADD, server_socket, &ev) != 0) {
        fprintf(stderr, "server socket add to epoll error:errno(%d)-%s\n", errno, strerror(errno));
        goto fail;
    }
    fprintf(stdout, "server socket add to epoll successful\n");

    while (1) {
        int active_count = epoll_wait(param_server->epoll_fd, events, MAX_EPOLL_EVENTS, -1);
        if (active_count < 0) {
            if (errno == EINTR) {
                continue;
            }
            fprintf(stderr, "server epoll_wait failed:errno(%d)-%s\n", errno, strerror(errno));
            goto fail;
        }
        fprintf(stdout, "active count:%d\n", active_count);

        for (int index = 0; index < active_count; index++) {
            if (events[index].data.fd == server_socket) {
                /* New incoming connection. */
                acceptClient(param_server, server_socket);
            } else {
                serveClient(param_server, &events[index]);
            }
        }
    }

fail:
    /* Closing the socket also removes it from the epoll set. */
    close(server_socket);
    return RESULT_ERROR;
}
以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持WalkonNet。
推薦閱讀:
- 如何用C寫一個web服務器之I/O多路復用
- epoll多路復用的一個實例程序(C實現)
- epoll封裝reactor原理剖析示例詳解
- 使用C語言實現本地socket通訊的方法
- C++基於reactor的服務器百萬並發實現與講解