多进程、共享内存的网络聊天室

    xiaoxiao2021-03-25  13

    好久没写网络聊天室了,去年暑假可以说写了一暑假,最近复习这些,又因为我一直偏向于多线程,就用多进程复习一下。

    下面给出昨天写的基于多进程、共享内存的网络聊天室代码。每个进程负责一个连接,多个进程之间仅共享读,不共享写,因此无需信号量来同步。分配的一段内存中,以数组的方式,分配给每个client一段buffer,每个clilent对应的buffer的索引就是connfd。当一个子进程收到客户端数据后,通过每客户端管道发送自己的pid给主进程,主进程通知除了该子进程的其他进程将该片内存写好的数据转发给其他客户端(sub_proess[pid]=connd)。

    代码如下:

    #include <sys/socket.h> #include <netinet/in.h> #include <arpa/inet.h> #include <assert.h> #include <stdio.h> #include <unistd.h> #include <errno.h> #include <string.h> #include <stdlib.h> #include <sys/epoll.h> #include <signal.h> #include <sys/wait.h> #include <sys/mman.h> #include <sys/stat.h> #include <fcntl.h> const int USER_LIMIT = 3; const int BUFFER_SIZE = 1024; const int FD_LIMIT = 65545; const int MAX_EVENT_NUMBER = 1024; const int PROCESS_LIMIT = 65536; //封装每个客户端连接数据 struct client_data { sockaddr_in address; int connfd; pid_t pid; //负责该客户端子进程的pid int pipefd[2]; //每个子进程pipe }; static const char* shm_name = "/my_shm"; //共享内存的名字 int sig_pipefd[2]; //用来统一事件源 int epollfd; int listenfd; int shmfd; char* share_mem = NULL; //共享内存起始地址 //客户端连接数组,进程用客户连接的编号来索引这个数组,即可取得相关的客户连接数据 client_data* users = NULL; //子进程和客户连接的关系映射表,用子进程的pid来索引这个数组,即可取得该进程处理的客户连接的编号 int* sub_process = 0; int user_count = 0; //客户连接下标,这个名字有点误导,总之user_count>=USER_LIMIT即连接过多 bool stop_child = false; //停止一个子进程,这个是全部变量,每个子进程都有自己拷贝的一份 int setnonblocking(int fd) { int old_option = fcntl(fd, F_GETFL); int new_option = old_option | O_NONBLOCK; fcntl(fd, F_SETFL, new_option); return old_option; } void addfd(int epfd, int fd) { epoll_event event; event.data.fd = fd; event.events = EPOLLIN | EPOLLET; epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &event); setnonblocking(fd); } void sig_handler(int sig) { int save_errno = errno; int msg = sig; send(sig_pipefd[1], (char*)&msg, 1, 0); errno = save_errno; } void addsig(int sig, void(*handler)(int), bool restart = true) { struct sigaction sa; memset(&sa, '\0', sizeof(sa)); sa.sa_handler = handler; if(restart) sa.sa_flags |= SA_RESTART; sigfillset(&sa.sa_mask); assert(sigaction(sig, &sa, NULL) != -1); } void del_resource() { close(sig_pipefd[0]); close(sig_pipefd[1]); close(listenfd); close(epollfd); shm_unlink(shm_name); } //子进程的信号处理函数,停止一个子进程 void child_term_handler(int sig) { stop_child = true; } //子进程运行的函数,参数inx指出该子进程处理的客户连接的编号,users是保存所有客户连接数据的数组,参数share_mem指出共享内存的起始地址 int run_child(int idx, client_data* users, char* share_mem) { epoll_event events[MAX_EVENT_NUMBER]; //每个子进程使用I/O服用同时监听客户连接socket和与父进程通信的pipe描述符 int child_epollfd = epoll_create(5); assert(child_epollfd != -1); int connfd = users[idx].connfd; addfd(child_epollfd, connfd); int pipefd = users[idx].pipefd[1]; addfd(child_epollfd, pipefd); int ret; //子进程需要设置自己的信号处理函数,因为fork会继承父进程信号处理函数 addsig(SIGTERM, child_term_handler, false); while(!stop_child){ int number = epoll_wait(child_epollfd, events, MAX_EVENT_NUMBER, -1); if(number < 0 && errno != EINTR){ printf("epoll failure\n"); break; } for(int i=0; i<number; ++i){ int sockfd = events[i].data.fd; //本子进程负责的客户链接有数据到达 if(sockfd == connfd && (events[i].events & EPOLLIN)){ //清零该客户对应的缓冲区 memset(share_mem+idx*BUFFER_SIZE, '\0', BUFFER_SIZE); //将客户数据读取到对应的读缓存中,该读缓存是共享内存的一段,它开始于idx*BUFFER_SIZE处,长度为BUFFER_SIZE字节,因此每个客户连接是共享的 ret = recv(connfd, share_mem+idx*BUFFER_SIZE, BUFFER_SIZE-1, 0); //留一个字节为'\0'间隔 if(ret < 0){ if(errno != EAGAIN) stop_child = true; } else if(ret == 0) stop_child = true; else //成功读取客户数据后就通知主进程,让主进程吩咐其他进程转发 send(pipefd, (char*)&idx, sizeof(idx), 0); } //主进程通过管道通知本进程需要转发第client个客户端的数据到本进程负责的客户 else if(sockfd == pipefd && (events[i].events & EPOLLIN)){ int client = 0; //接受主进程发来的数据,即客户的编号,用来索引buffer ret = recv(sockfd, (char *)&client, sizeof(client), 0); if(ret < 0){ if(errno != EAGAIN) stop_child = true; } else if(ret == 0) stop_child = true; else //转发给自己的客户 send(connfd, share_mem+client*BUFFER_SIZE, BUFFER_SIZE, 0); } else continue; } } close(connfd); close(pipefd); close(child_epollfd); return 0; } int main(int argc, char** argv) { if( argc <= 2 ) { printf( "usage: %s ip_address port_number\n", basename( argv[0] ) ); return 1; } const char* ip = argv[1]; int port = atoi( argv[2] ); int ret = 0; struct sockaddr_in address; bzero(&address, sizeof(address)); address.sin_family = AF_INET; inet_pton(AF_INET, ip, &address.sin_addr); address.sin_port = htons(port); int listenfd = socket(PF_INET, SOCK_STREAM, 0); assert(listenfd >= 0); int on = 1; ret = setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)); assert(ret != -1); ret = bind(listenfd, (struct sockaddr*)&address, sizeof(address)); assert(ret != -1); ret = listen(listenfd, 5); assert(ret != -1); ///////////////////////////////////////////////////////////// user_count = 0; users = new client_data[USER_LIMIT]; sub_process = new int [PROCESS_LIMIT]; for(int i=0; i<PROCESS_LIMIT; ++i) sub_process[i] = -1; //////////////////////////////////////////////////////////// epoll_event events[MAX_EVENT_NUMBER]; epollfd = epoll_create(5); assert(epollfd != -1); addfd(epollfd, listenfd); //socketpair是全双工的,所以父子进程通信无需向pipe一样需要两个pipe[2] //fork完毕socketpair可以双向通信 ret = socketpair(PF_UNIX, SOCK_STREAM, 0, sig_pipefd); assert(ret != -1); setnonblocking( sig_pipefd[1] ); addfd(epollfd, sig_pipefd[0]); // add all the interesting signals here addsig(SIGCHLD, sig_handler); addsig(SIGTERM, sig_handler); addsig(SIGINT, sig_handler); addsig(SIGPIPE, SIG_IGN); bool stop_server = false; bool terminate = false; /////////////////////////////////////////////////////////////// //创建共享内存,作为所有客户连接的读缓存 shmfd = shm_open(shm_name, O_CREAT | O_RDWR, 0666); assert(shmfd != -1); //清空且resize文件大小为USER_LIMIT*BUFFER_SIZE ret = ftruncate(shmfd, USER_LIMIT*BUFFER_SIZE); assert(ret != -1); //通过上面生成的一定大小的文件来使用mmap映射共享内存 //这是共享内存的一种方式,另外一种使用SystemV的shmat share_mem = (char *)mmap(NULL, USER_LIMIT*BUFFER_SIZE, PROT_WRITE | PROT_READ, MAP_SHARED, shmfd, 0); assert(share_mem != MAP_FAILED); close(shmfd); //close shmfd is ok ////////////////////////////////////////////////////////////// while(!stop_server){ int number = epoll_wait(epollfd, events, MAX_EVENT_NUMBER, -1); if(number < 0 && errno != EINTR){ printf("epoll failure\n"); break; } for(int i=0; i<number; ++i){ int sockfd = events[i].data.fd; if(sockfd == listenfd){ struct sockaddr_in client_address; socklen_t len = sizeof(client_address); int connfd = accept(listenfd, (struct sockaddr*)&client_address, &len); if(connfd < 0){ printf("errno is: %d\n", errno); continue; } if(user_count >= USER_LIMIT){ //limit const char* info = "too many users\n"; printf("%s", info); send(connfd, info, strlen(info), 0); close(connfd); continue; } //保存第user_count个客户连接的数据 users[user_count].address = client_address; users[user_count].connfd = connfd; //在子进程和父进程间建立管道,以传递必要的数据 ret = socketpair(PF_UNIX, SOCK_STREAM, 0, users[user_count].pipefd); assert(ret != -1); pid_t pid = fork(); if(pid < 0){ close(connfd); continue; //!!!!!!! } else if(pid == 0){ //in child close(epollfd); close(listenfd); close(users[user_count].pipefd[0]); //子进程关掉一端,子进程给父进程发数据使用pipefd[1] close(sig_pipefd[0]); close(sig_pipefd[1]); run_child(user_count, users, share_mem); munmap((void*)share_mem, USER_LIMIT*BUFFER_SIZE); exit(0); } else{ close(connfd); close(users[user_count].pipefd[1]); //同理 addfd(epollfd, users[user_count].pipefd[0]); //记录新的客户连接在数组users中的索引值,建立进程pid和索引值的映射关系 users[user_count].pid = pid; sub_process[pid] = user_count; user_count++; } } //handle signal else if(sockfd == sig_pipefd[0] && (events[i].events & EPOLLIN)){ int sig; char signals[1024]; ret = recv(sig_pipefd[0], signals, sizeof(signals), 0); if(ret == -1) continue; else if(ret == 0) continue; else{ for(int i=0; i<ret; ++i){ switch(signals[i]){ case SIGCHLD: //子进程退出,表示有客户端关闭了连接 { pid_t pid; int stat; while((pid = waitpid(-1, &stat, WNOHANG)) > 0){ //用子进程的pid取得被关闭客户连接的编号 int del_user = sub_process[pid]; sub_process[pid] = -1; if(del_user < 0 || del_user > USER_LIMIT) continue; //清除数据 epoll_ctl(epollfd, EPOLL_CTL_DEL, users[del_user].pipefd[0], 0); close(users[del_user].pipefd[0]); //用最后一个user替换该位置 users[del_user] = users[--user_count]; sub_process[users[del_user].pid] = del_user; //修正sub_process对应的值,也就是修正最后一个客户端pid对应的客户编号 } if(terminate && user_count == 0) stop_server = true; break; } case SIGTERM: case SIGINT: //结束服务器程序 { printf("kill all the child new\n"); if(user_count == 0){ stop_server = true; break; } for(int i=0; i<user_count; ++i){ int pid = users[i].pid; kill(pid, SIGTERM); //kill每个子进程 } terminate = true; break; } default: break; } } } } //某个子进程收到数据,向父进程通知 else if(events[i].events & EPOLLIN){ int child = 0; //读取管道数据,收到的数据时child变量记录了哪个客户连接有数据到达 ret =recv(sockfd, (char*)&child, sizeof(child), 0); if(ret == -1) continue; else if(ret == 0) continue; else{ //向除负责第child个客户的子进程之外的子进程发送消息,通知他们有客户数据要写 for(int j=0; j<user_count; ++j){ if(users[j].pipefd[0] != sockfd){ printf("send data to child accross pipe\n"); send(users[j].pipefd[0], (char*)&child, sizeof(child), 0); } } } } } } del_resource(); return 0; }
    转载请注明原文地址: https://ju.6miu.com/read-149770.html

    最新回复(0)