C++中的Reactor原理與實現
一、Reactor介紹
reactor設計模式是event-driven architecture的一種實現方式,處理多個客戶端並發的向服務端請求服務的場景。每種服務在服務端可能由多個方法組成。reactor會解耦並發請求的服務並分發給對應的事件處理器來處理。
中心思想是將所有要處理的I/O事件註冊到一個中心I/O多路復用器上,同時主線程/進程阻塞在多路復用器上;一旦有I/O事件到來或是準備就緒(文件描述符或socket可讀、寫),多路復用器返回並將事先註冊的相應I/O事件分發到對應的處理器中。
處理機制為:主程序將事件以及對應事件處理的方法在Reactor上進行註冊, 如果相應的事件發生,Reactor將會主動調用事件註冊的接口,即 回調函數.
二、代碼實現
前提準備:1. 單例模式:單例模式(Singleton Pattern,也稱為單件模式),使用最廣泛的設計模式之一。其意圖是保證一個類(結構體)僅有一個實例,並提供一個訪問它的全局訪問點,該實例被所有程序模塊共享。
2.回調函數:把一段可執行的代碼像參數傳遞那樣傳給其他代碼,而這段代碼會在某個時刻被調用執行,這就叫做回調。
對epoll反應堆中結構體定義
/*fd包含的屬性*/ struct nitem { // fd int fd; //要監聽的文件描述符 int status; //是否在監聽:1->在紅黑樹上(監聽),0->不在(不監聽) int events; //對應的監聽事件, EPOLLIN和EPOLLOUT(不同的事件,走不同的回調函數) void *arg; //指向自己結構體指針 #if 0 NCALLBACK callback; #else NCALLBACK *readcb; // epollin NCALLBACK *writecb; // epollout NCALLBACK *acceptcb; // epollin #endif unsigned char sbuffer[BUFFER_LENGTH]; // int slength; unsigned char rbuffer[BUFFER_LENGTH]; int rlength; }; /*分塊存儲*/ struct itemblock { struct itemblock *next; struct nitem *items; }; /*epoll反應堆中包括通信的fd以及epoll的(epfd)*/ struct reactor { int epfd; struct itemblock *head; };
單例模式,創建reactor的一個實例
/*單例模式*/ struct reactor *instance = NULL; int init_reactor(struct reactor *r) { if (r == NULL) return -1; int epfd = epoll_create(1); //int size r->epfd = epfd; // fd --> item r->head = (struct itemblock*)malloc(sizeof(struct itemblock)); if (r->head == NULL) { close(epfd); return -2; } memset(r->head, 0, sizeof(struct itemblock)); r->head->items = (struct nitem *)malloc(MAX_EPOLL_EVENT * sizeof(struct nitem)); if (r->head->items == NULL) { free(r->head); close(epfd); return -2; } memset(r->head->items, 0, (MAX_EPOLL_EVENT * sizeof(struct nitem))); r->head->next = NULL; return 0; } struct reactor *getInstance(void) { //singleton if (instance == NULL) { instance = (struct reactor *)malloc(sizeof(struct reactor)); if (instance == NULL) return NULL; memset(instance, 0, sizeof(struct reactor)); if (0 > init_reactor(instance)) { free(instance); return NULL; } } return instance; }
事件註冊
/*
 * Register, or re-arm, a callback for fd on the singleton reactor.
 *
 * Typical calls:
 *   nreactor_set_event(listenfd, accept_callback, ACCEPT_CB, NULL);
 *   nreactor_set_event(fd, read_callback, READ_CB, NULL);
 *
 * fd     descriptor to watch; it is also the index into the item table,
 *        so it must lie in [0, MAX_EPOLL_EVENT).
 * cb     callback stored in the slot selected by event.
 * event  READ_CB or ACCEPT_CB (both wait for EPOLLIN) or WRITE_CB (EPOLLOUT).
 * arg    opaque pointer stored alongside the callback.
 *
 * Returns 0 on success, -1 if the reactor is unavailable, fd is out of
 * range, event is not one of the three kinds, or epoll_ctl fails.
 */
int nreactor_set_event(int fd, NCALLBACK cb, int event, void *arg)
{
    struct reactor *r = getInstance();
    if (r == NULL) {
        return -1;
    }

    /* The table has a fixed size; a larger fd would write out of bounds. */
    if (fd < 0 || fd >= MAX_EPOLL_EVENT) {
        return -1;
    }

    struct nitem *item = &r->head->items[fd];
    struct epoll_event ev = {0};

    switch (event) {
    case READ_CB:
        item->readcb = cb;
        ev.events = EPOLLIN;
        break;
    case WRITE_CB:
        item->writecb = cb;
        ev.events = EPOLLOUT;
        break;
    case ACCEPT_CB:
        item->acceptcb = cb;
        ev.events = EPOLLIN;
        break;
    default:
        /* Unknown kind: refuse rather than register an empty event mask. */
        return -1;
    }

    item->fd = fd;
    item->arg = arg;
    ev.data.ptr = item;

    /* Already armed for this kind: the callback was refreshed, epoll is fine. */
    if (item->events == event) {
        return 0;
    }

    if (item->events == NOSET_CB) {
        /* First registration of this fd. */
        if (epoll_ctl(r->epfd, EPOLL_CTL_ADD, fd, &ev) < 0) {
            printf("epoll_ctl EPOLL_CTL_ADD failed, %d\n", errno);
            return -1;
        }
    } else {
        /* fd is already in the epoll set: switch the event of interest. */
        if (epoll_ctl(r->epfd, EPOLL_CTL_MOD, fd, &ev) < 0) {
            printf("epoll_ctl EPOLL_CTL_MOD failed\n");
            return -1;
        }
    }

    item->events = event;
    return 0;
}
回調函數書寫
int write_callback(int fd, int event, void *arg) { struct reactor *R = getInstance(); unsigned char *sbuffer = R->head->items[fd].sbuffer; int length = R->head->items[fd].slength; int ret = send(fd, sbuffer, length, 0); if (ret < length) { nreactor_set_event(fd, write_callback, WRITE_CB, NULL); } else { nreactor_set_event(fd, read_callback, READ_CB, NULL); } return 0; } // 5k qps int read_callback(int fd, int event, void *arg) { struct reactor *R = getInstance(); unsigned char *buffer = R->head->items[fd].rbuffer; #if 0 //ET int idx = 0, ret = 0; while (idx < BUFFER_LENGTH) { ret = recv(fd, buffer+idx, BUFFER_LENGTH-idx, 0); if (ret == -1) { break; } else if (ret > 0) { idx += ret; } else {// == 0 break; } } if (idx == BUFFER_LENGTH && ret != -1) { nreactor_set_event(fd, read_callback, READ_CB, NULL); } else if (ret == 0) { nreactor_set_event //close(fd); } else { nreactor_set_event(fd, write_callback, WRITE_CB, NULL); } #else //LT int ret = recv(fd, buffer, BUFFER_LENGTH, 0); if (ret == 0) { // fin nreactor_del_event(fd, NULL, 0, NULL); close(fd); } else if (ret > 0) { unsigned char *sbuffer = R->head->items[fd].sbuffer; memcpy(sbuffer, buffer, ret); R->head->items[fd].slength = ret; printf("readcb: %s\n", sbuffer); nreactor_set_event(fd, write_callback, WRITE_CB, NULL); } #endif } // web server // ET / LT int accept_callback(int fd, int event, void *arg) { int connfd; struct sockaddr_in client; socklen_t len = sizeof(client); if ((connfd = accept(fd, (struct sockaddr *)&client, &len)) == -1) { printf("accept socket error: %s(errno: %d)\n", strerror(errno), errno); return 0; } nreactor_set_event(connfd, read_callback, READ_CB, NULL); }
監聽描述符變化
// accept --> EPOLL /*epoll_wait監聽0*/ int reactor_loop(int listenfd) { struct reactor *R = getInstance(); struct epoll_event events[POLL_SIZE] = {0}; while (1) { int nready = epoll_wait(R->epfd, events, POLL_SIZE, -1); if (nready == -1) { continue; } int i = 0; for (i = 0;i < nready;i ++) { struct nitem *item = (struct nitem *)events[i].data.ptr; int connfd = item->fd; if (connfd == listenfd) { // item->acceptcb(listenfd, 0, NULL); } else { if (events[i].events & EPOLLIN) { // item->readcb(connfd, 0, NULL); } if (events[i].events & EPOLLOUT) { item->writecb(connfd, 0, NULL); } } } } return 0; }
完整代碼實現
#include <errno.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <sys/types.h>

#define MAXLNE 4096
#define POLL_SIZE 1024        /* max events fetched by one epoll_wait call */
#define BUFFER_LENGTH 1024    /* size of each per-connection buffer */
#define MAX_EPOLL_EVENT 1024  /* number of item slots, i.e. highest fd + 1 */

/* Kinds of callback that can be registered for a descriptor. */
#define NOSET_CB 0
#define READ_CB 1
#define WRITE_CB 2
#define ACCEPT_CB 3

/* Signature shared by all reactor callbacks. */
typedef int NCALLBACK(int fd, int event, void *arg);

/*
 * Per-descriptor state kept by the reactor.
 * One slot exists for every fd value; the fd itself is the table index.
 */
struct nitem {
    int fd;      /* descriptor being monitored */
    int status;  /* monitoring flag: 1 = registered in the epoll set, 0 = not */
    int events;  /* callback kind currently registered (NOSET_CB if none) */
    void *arg;   /* opaque argument recorded by nreactor_set_event */

    NCALLBACK *readcb;    /* invoked on EPOLLIN for a connection */
    NCALLBACK *writecb;   /* invoked on EPOLLOUT for a connection */
    NCALLBACK *acceptcb;  /* invoked on EPOLLIN for the listening socket */

    unsigned char sbuffer[BUFFER_LENGTH];  /* bytes queued for sending */
    int slength;                           /* number of valid bytes in sbuffer */

    unsigned char rbuffer[BUFFER_LENGTH];  /* bytes most recently received */
    int rlength;                           /* number of valid bytes in rbuffer */
};

/* A chunk of nitem slots; chunks are chained through next. */
struct itemblock {
    struct itemblock *next;
    struct nitem *items;
};

/* The reactor: the epoll instance plus the table of per-fd items. */
struct reactor {
    int epfd;
    struct itemblock *head;
};

int init_reactor(struct reactor *r);
int read_callback(int fd, int event, void *arg);
int write_callback(int fd, int event, void *arg);
int accept_callback(int fd, int event, void *arg);

/* Process-wide reactor singleton; stays NULL until getInstance() succeeds. */
struct reactor *instance = NULL;

/*
 * Return the shared reactor, creating it on first use.
 *
 * Returns NULL if creation fails. The global is assigned only after a
 * successful init, so a failed attempt never leaves a dangling pointer.
 */
struct reactor *getInstance(void)
{
    if (instance != NULL) {
        return instance;
    }

    struct reactor *fresh = calloc(1, sizeof *fresh);
    if (fresh == NULL) {
        return NULL;
    }

    if (init_reactor(fresh) < 0) {
        free(fresh);
        return NULL;
    }

    instance = fresh;
    return instance;
}

/*
 * Register, or re-arm, a callback for fd on the singleton reactor.
 *
 * Typical calls:
 *   nreactor_set_event(listenfd, accept_callback, ACCEPT_CB, NULL);
 *   nreactor_set_event(fd, read_callback, READ_CB, NULL);
 *
 * fd     descriptor to watch; must lie in [0, MAX_EPOLL_EVENT).
 * cb     callback stored in the slot selected by event.
 * event  READ_CB or ACCEPT_CB (both wait for EPOLLIN) or WRITE_CB (EPOLLOUT).
 * arg    opaque pointer stored alongside the callback.
 *
 * Returns 0 on success, -1 if the reactor is unavailable, fd is out of
 * range, event is not one of the three kinds, or epoll_ctl fails.
 */
int nreactor_set_event(int fd, NCALLBACK cb, int event, void *arg)
{
    struct reactor *r = getInstance();
    if (r == NULL) {
        return -1;
    }

    /* The table has a fixed size; a larger fd would write out of bounds. */
    if (fd < 0 || fd >= MAX_EPOLL_EVENT) {
        return -1;
    }

    struct nitem *item = &r->head->items[fd];
    struct epoll_event ev = {0};

    switch (event) {
    case READ_CB:
        item->readcb = cb;
        ev.events = EPOLLIN;
        break;
    case WRITE_CB:
        item->writecb = cb;
        ev.events = EPOLLOUT;
        break;
    case ACCEPT_CB:
        item->acceptcb = cb;
        ev.events = EPOLLIN;
        break;
    default:
        /* Unknown kind: refuse rather than register an empty event mask. */
        return -1;
    }

    item->fd = fd;
    item->arg = arg;
    ev.data.ptr = item;

    /* Already armed for this kind: the callback was refreshed, epoll is fine. */
    if (item->events == event) {
        return 0;
    }

    if (item->events == NOSET_CB) {
        /* First registration of this fd. */
        if (epoll_ctl(r->epfd, EPOLL_CTL_ADD, fd, &ev) < 0) {
            printf("epoll_ctl EPOLL_CTL_ADD failed, %d\n", errno);
            return -1;
        }
    } else {
        /* fd is already in the epoll set: switch the event of interest. */
        if (epoll_ctl(r->epfd, EPOLL_CTL_MOD, fd, &ev) < 0) {
            printf("epoll_ctl EPOLL_CTL_MOD failed\n");
            return -1;
        }
    }

    item->events = event;
    return 0;
}

/*
 * Remove fd from the epoll set and mark its item as unregistered.
 *
 * Typical call: nreactor_del_event(fd, NULL, 0, NULL);
 * cb and event are unused. Returns 0, or -1 if the reactor is unavailable
 * or fd is out of range.
 */
int nreactor_del_event(int fd, NCALLBACK cb, int event, void *arg)
{
    (void)cb;
    (void)event;

    struct reactor *r = getInstance();
    if (r == NULL || fd < 0 || fd >= MAX_EPOLL_EVENT) {
        return -1;
    }

    struct epoll_event ev = {0};
    ev.data.ptr = arg;
    epoll_ctl(r->epfd, EPOLL_CTL_DEL, fd, &ev);

    /* Back to NOSET_CB so a reused fd number is added again, not modified. */
    r->head->items[fd].events = NOSET_CB;
    return 0;
}

/* Remove fd from the reactor and close it. */
static void drop_connection(int fd)
{
    nreactor_del_event(fd, NULL, 0, NULL);
    close(fd);
}

/*
 * EPOLLOUT handler: send the bytes queued in the item's sbuffer.
 *
 * A partial send keeps the unsent tail queued and stays in write mode;
 * a complete send switches the connection back to read mode.
 * Returns 0 normally, -1 if the connection had to be dropped.
 * event and arg are unused.
 */
int write_callback(int fd, int event, void *arg)
{
    (void)event;
    (void)arg;

    struct reactor *r = getInstance();
    if (r == NULL || fd < 0 || fd >= MAX_EPOLL_EVENT) {
        return -1;
    }
    struct nitem *item = &r->head->items[fd];

    /* MSG_NOSIGNAL: a peer that already closed must not raise SIGPIPE. */
    ssize_t sent = send(fd, item->sbuffer, (size_t)item->slength, MSG_NOSIGNAL);
    if (sent < 0) {
        if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
            /* Transient condition: keep waiting for writability. */
            return nreactor_set_event(fd, write_callback, WRITE_CB, NULL);
        }
        drop_connection(fd);
        return -1;
    }

    if (sent < item->slength) {
        /* Keep only the unsent tail so the next wakeup does not resend data. */
        size_t remaining = (size_t)(item->slength - sent);
        memmove(item->sbuffer, item->sbuffer + sent, remaining);
        item->slength = (int)remaining;
        if (nreactor_set_event(fd, write_callback, WRITE_CB, NULL) < 0) {
            drop_connection(fd);
            return -1;
        }
        return 0;
    }

    item->slength = 0;
    if (nreactor_set_event(fd, read_callback, READ_CB, NULL) < 0) {
        drop_connection(fd);
        return -1;
    }
    return 0;
}

/*
 * EPOLLIN handler (level-triggered): receive one chunk and queue it to be
 * echoed back by write_callback.
 *
 * Returns 0 normally (including an orderly close by the peer), -1 if the
 * connection was dropped because of an error.
 * event and arg are unused.
 */
int read_callback(int fd, int event, void *arg)
{
    (void)event;
    (void)arg;

    struct reactor *r = getInstance();
    if (r == NULL || fd < 0 || fd >= MAX_EPOLL_EVENT) {
        return -1;
    }
    struct nitem *item = &r->head->items[fd];

    ssize_t got = recv(fd, item->rbuffer, BUFFER_LENGTH, 0);
    if (got < 0) {
        if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
            return 0; /* nothing to read right now; stay registered */
        }
        /* A hard error would otherwise be reported again on every wakeup. */
        drop_connection(fd);
        return -1;
    }

    if (got == 0) {
        /* Peer closed the connection (FIN). */
        drop_connection(fd);
        return 0;
    }

    memcpy(item->sbuffer, item->rbuffer, (size_t)got);
    item->slength = (int)got;

    /* The buffer is not NUL-terminated, so bound the print by its length. */
    printf("readcb: %.*s\n", (int)got, (const char *)item->sbuffer);

    if (nreactor_set_event(fd, write_callback, WRITE_CB, NULL) < 0) {
        drop_connection(fd);
        return -1;
    }
    return 0;
}

/*
 * EPOLLIN handler for the listening socket: accept one connection and
 * register it for reading.
 *
 * Returns 0 on success or when accept() itself fails (the error is
 * printed), -1 if the new connection could not be registered; in that
 * case the new descriptor is closed.
 * event and arg are unused.
 */
int accept_callback(int fd, int event, void *arg)
{
    (void)event;
    (void)arg;

    struct sockaddr_in client;
    socklen_t len = sizeof(client);

    int connfd = accept(fd, (struct sockaddr *)&client, &len);
    if (connfd == -1) {
        printf("accept socket error: %s(errno: %d)\n", strerror(errno), errno);
        return 0;
    }

    /* The item table is indexed by fd; refuse descriptors beyond it. */
    if (connfd >= MAX_EPOLL_EVENT ||
        nreactor_set_event(connfd, read_callback, READ_CB, NULL) < 0) {
        close(connfd);
        return -1;
    }
    return 0;
}

/*
 * Create a TCP socket bound to INADDR_ANY:port and start listening.
 *
 * Returns the listening descriptor, or -1 on failure (the error is
 * printed and the socket, if created, is closed). 0 is not used as the
 * failure value because it is a valid descriptor number.
 */
int init_server(int port)
{
    int listenfd = socket(AF_INET, SOCK_STREAM, 0);
    if (listenfd == -1) {
        printf("create socket error: %s(errno: %d)\n", strerror(errno), errno);
        return -1;
    }

    struct sockaddr_in servaddr;
    memset(&servaddr, 0, sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
    servaddr.sin_port = htons((uint16_t)port);

    if (bind(listenfd, (struct sockaddr *)&servaddr, sizeof(servaddr)) == -1) {
        printf("bind socket error: %s(errno: %d)\n", strerror(errno), errno);
        close(listenfd);
        return -1;
    }

    if (listen(listenfd, 10) == -1) {
        printf("listen socket error: %s(errno: %d)\n", strerror(errno), errno);
        close(listenfd);
        return -1;
    }

    return listenfd;
}

/*
 * Prepare a reactor: create the epoll instance and allocate a zeroed table
 * of MAX_EPOLL_EVENT items indexed by fd.
 *
 * Returns 0 on success, -1 if r is NULL, -2 if the epoll instance or the
 * memory could not be obtained. On failure r is left untouched and nothing
 * is leaked.
 */
int init_reactor(struct reactor *r)
{
    if (r == NULL) {
        return -1;
    }

    /* The size argument is only a hint; it merely has to be positive. */
    int epfd = epoll_create(1);
    if (epfd == -1) {
        return -2;
    }

    struct itemblock *block = calloc(1, sizeof *block);
    if (block == NULL) {
        close(epfd);
        return -2;
    }

    /* Zeroed items start with events == NOSET_CB (0) and NULL callbacks. */
    block->items = calloc(MAX_EPOLL_EVENT, sizeof *block->items);
    if (block->items == NULL) {
        free(block);
        close(epfd);
        return -2;
    }

    /* Publish into r only once every resource has been acquired. */
    r->epfd = epfd;
    r->head = block;
    return 0;
}

/*
 * Event loop: block in epoll_wait and dispatch every ready descriptor to
 * the callback stored in its item.
 *
 * listenfd  the listening socket; readiness on it goes to acceptcb, any
 *           other descriptor goes to readcb (EPOLLIN) and/or writecb
 *           (EPOLLOUT).
 *
 * Does not return during normal operation. Returns -1 if the reactor is
 * unavailable or epoll_wait fails with anything other than EINTR.
 */
int reactor_loop(int listenfd)
{
    struct reactor *r = getInstance();
    if (r == NULL) {
        return -1;
    }

    struct epoll_event events[POLL_SIZE] = {0};

    while (1) {
        int nready = epoll_wait(r->epfd, events, POLL_SIZE, -1);
        if (nready == -1) {
            if (errno == EINTR) {
                continue; /* interrupted by a signal: just wait again */
            }
            /* Any other failure would repeat forever; stop instead of spinning. */
            printf("epoll_wait failed, %d\n", errno);
            return -1;
        }

        for (int i = 0; i < nready; i++) {
            struct nitem *item = (struct nitem *)events[i].data.ptr;
            int connfd = item->fd;

            if (connfd == listenfd) {
                if (item->acceptcb != NULL) {
                    item->acceptcb(listenfd, 0, NULL);
                }
                continue;
            }

            if ((events[i].events & EPOLLIN) && item->readcb != NULL) {
                item->readcb(connfd, 0, NULL);
            }
            if ((events[i].events & EPOLLOUT) && item->writecb != NULL) {
                item->writecb(connfd, 0, NULL);
            }
        }
    }
}

/* Start an echo server on port 9999 driven by the reactor. */
int main(int argc, char **argv)
{
    (void)argc;
    (void)argv;

    int listenfd = init_server(9999);
    if (listenfd < 0) {
        return 1;
    }

    if (nreactor_set_event(listenfd, accept_callback, ACCEPT_CB, NULL) < 0) {
        close(listenfd);
        return 1;
    }

    return reactor_loop(listenfd) < 0 ? 1 : 0;
}
到此這篇關於Reactor原理與實現的文章就介紹到這了,更多相關Reactor原理內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章,希望大家以後多多支持WalkonNet!
推薦閱讀:
- epoll封裝reactor原理剖析示例詳解
- C++基於reactor的服務器百萬並發實現與講解
- 解析Linux高性能網絡IO和Reactor模型
- 如何用C寫一個web服務器之I/O多路復用
- 解析Linux源碼之epoll