之前一直对定时器这块不熟,今天来练练手。首先我们来看实现定时器的第一种方式,升序链表。
网络编程中应用层的定时器是很有必要的,这可以让服务端主动关闭时间很久的非活跃连接。另外一种解决方案是TCP的keepalive,但它只能检测真正的死连接,即客端主机断电,或者网线被拔掉这种情况。如果客端连接上,但什么都不做,keepalive是毫无办法的,它只能一定时间后不断的向客户端发送心跳包。
定时器通常至少要包含两个成员:一个超时时间(相对时间或绝对时间)和一个任务毁掉函数。有的时候还可能包含回调函数被执行时需要传入的参数,以及是否重启定时器等信息。如果使用双向链表,自然还需要指针成员。
我们将是用time函数作为定时器时间函数,它是相对时间,即从1970年某天到现在的描述,是一个数值,很容易用它和当前时间比较,后文分析。
下面实现了一个简单的升序定时器链表。升序定时器链表将其中定时器按照超时时间做升序排序。
#ifndef LIST_TIMER_H #define LIST_TIMER_H #include <time.h> #include <memory> #include <netinet/in.h> #include <assert.h> static const int BUFFER_SIZE = 64; class util_timer; //前向声明 class client_data { public: sockaddr_in addr_; //客户端地址 int sockfd_; //客户端connfd char buf_[BUFFER_SIZE]; //每个客户端的缓冲区 std::shared_ptr<util_timer> timer_; //每个客户端的定时器 }; class util_timer { public: //default constructor void (*timeout_callback_)(client_data* user_data); //超时回调函数 public: time_t expire_; //任务的超时时间,这里使用绝对时间 client_data* user_data_; //FIXME: how to replace it as a smart pointer. //回调函数处理的客户数据,由电石气的执行者传给回调函数 std::shared_ptr<util_timer> prev_; //指向前一个定时器 std::shared_ptr<util_timer> next_; //指向下一个定时器 }; //定时器链表,它是一个升序、双向链表 class sort_timer_list { public: //default constructor and destructor void add_timer(const std::shared_ptr<util_timer>& timer); void adjust_timer(const std::shared_ptr<util_timer>& timer); void del_timer(const std::shared_ptr<util_timer>& timer); void tick(); private: void add_timer(const std::shared_ptr<util_timer>& timer, const std::shared_ptr<util_timer>& lst_head); private: std::shared_ptr<util_timer> head_; std::shared_ptr<util_timer> tail_; }; //将目标定时器timer添加到链表中 void sort_timer_list::add_timer(const std::shared_ptr<util_timer>& timer) { assert(timer != NULL); if(head_ == NULL){ head_ = tail_ = timer; return; } //如果目标定时器的超时时间小于当前链表中所有定时器的超时时间,则把该定时器插入链表头部,作为链表的头节点。否则就需要调用重载函数add_timer把它插入到链表中合适的位置,以保证链表的升序特性 if(timer->expire_ < head_->expire_){ timer->next_ = head_; head_->prev_ = timer; head_ = timer; } else add_timer(timer, head_); //invoke private add_timer } //一个重载的辅助函数,它被公有地add_timer函数和adjust_timer函数调用,该函数表示将目标定时器timer添加到节点lst_head之后的部分链表中 void sort_timer_list::add_timer(const std::shared_ptr<util_timer>& timer, const std::shared_ptr<util_timer>& lst_head) { std::shared_ptr<util_timer> prev = lst_head; std::shared_ptr<util_timer> tmp = prev->next_; //遍历lst_head节点之后的部分链表,直到找到一个超时时间大于目标定时器的节点,并将目标定时器插入该节点之前 while(tmp != NULL){ if(timer->expire_ < tmp->expire_){ prev->next_ = timer; timer->next_ = tmp; tmp->prev_ = timer; timer->prev_ = prev; break; } prev = tmp; tmp = tmp->next_; } //如果遍历完lst_head节点之后的部分链表,仍未找到超时时间大于目标定时器超时时间的节点,则将目标定时器插入链表尾部,并将它设置为链表的新的尾节点 if(tmp == NULL){ prev->next_ = timer; timer->prev_ = prev; timer->next_ = NULL; tail_ = timer; } } //当某个定时任务发生变化时,调整对应的定时器在链表中的位置。这个函数只考虑被调整的定时器的超时时间延长的情况,即该定时器需要往链表尾部方向移动 void sort_timer_list::adjust_timer(const std::shared_ptr<util_timer>& timer) { assert(timer != NULL); const std::shared_ptr<util_timer>& tmp = timer->next_; //如果被调整的目标定时器处在链表尾部,或者该定时器的超时值仍然小于其下一个定时器的超时值,那就不用调整啦,皆大欢喜 if(tmp == NULL || (timer->expire_ < tmp->expire_)) return ; //如果目标定时器是链表的头结点,则将该定时器从链表中取出并重新插入链表 if(timer == head_){ head_ = head_->next_; head_->prev_ = NULL; timer->next_ = NULL; add_timer(timer, head_); //reinsert timer } else{ //in //如果目标定时器不是链表的头结点,则将该定时器从链表中取出,然后插入其原来所在位置之后的那一部分链表中 timer->prev_->next_ = timer->next_; timer->next_->prev_ = timer->prev_; add_timer(timer, timer->next_); //reinsert } } //将目标定时器timer从链表中删除 void sort_timer_list::del_timer(const std::shared_ptr<util_timer>& timer) { assert(timer != NULL); //下面这个条件成立表示链表中只有一个定时器,即目标定时器 //由于使用shared_ptr,所以无需delete,下同 if(timer == head_ && timer == tail_){ head_ = NULL; tail_ = NULL; return; } //如果链表中至少有两个定时器,且目标定时器是链表的头结点,则将链表的头结点重置为原头结点的下一个节点,然后删除目标定时器 if(timer == head_){ head_ = head_->next_; head_->prev_ = NULL; return; } //如果链表中至少有两个定时器,且目标定时器是链表的尾节点,则将链表的尾节点重置为原尾节点的前一个节点,然后删除目标定时器 if(timer == tail_){ tail_ = tail_->prev_; tail_->next_ = NULL; return; } timer->prev_->next_ = timer->next_; timer->next_->prev_ = timer->prev_; } //SIGALRM信号每次被触发就在其信号处理函数(如果使用同一事件源,则是主函数)中执行一次tick函数,以处理链表上到期的任务 //下文会给出使用SIGALRM测试的代码,当然也可以使用其他方式 void sort_timer_list::tick() { //assert(head_ != NULL); if(head_ == NULL) return ; printf("timer tick\n"); time_t cur = time(NULL); //get time std::shared_ptr<util_timer> tmp = head_; //从头结点开始一次处理每个定时器,直到遇到一个尚未到期的定时器,这就是定时器的核心逻辑!!! while(tmp != NULL){ //因为每个定时器都使用绝对时间作为超时值,所以我们可以把定时器的超时值和系统当前时间,比较以判断定时器是否到期 if(cur < tmp->expire_) //no member time out ! break; //如果到这里,说明到期。调用定时器超时回调函数,以执行定时任务 tmp->timeout_callback_(tmp->user_data_); //callback //执行完定时器中的定时任务之后,就将它从链表中删除,并重置链表头结点 head_ = tmp->next_; if(head_ != NULL) head_->prev_ = NULL; tmp = head_; //go on } } #endif核心函数tick相当于一个心搏函数,它每隔一段固定的时间就执行一次,以检测并处理到期任务。判断定时任务到期的依据是根据定时器expire_值小于当前系统时间从执行效率上来看,添加定时器的时间复杂度是O(n),删除定时器的时间复杂度是O(1)(因为是双向链表),执行定时任务的时间复杂度是O(1)(只需执行链表前部几个超时定时器即可,因为是升序,这时可知后面节点没有超时,无需再遍历)。
下面我们就利用alarm函数周期的触发SIGALRM信号。该信号的信号处理函数利用管道通知主循环执行定时器链表上的定时任务——关闭非活动的链接。 另一种关闭连接的情况是,发生了socket读错误,我们也要删除定时器。
下面看测试代码:
#include <sys/types.h> #include <sys/socket.h> #include <netinet/in.h> #include <arpa/inet.h> #include <assert.h> #include <stdio.h> #include <signal.h> #include <unistd.h> #include <errno.h> #include <string.h> #include <fcntl.h> #include <stdlib.h> #include <sys/epoll.h> #include <pthread.h> #include <vector> #include <memory> #include "list_timer.h" const int FD_LIMIT = 65535; const int MAX_EVENT_NUMBER = 1024; const int TIME_SLOT = 5; static int pipefd[2]; static sort_timer_list timer_list; //使用升序链表来管理定时器 static int epollfd = 0; int setnonblocking(int fd) { int old_option = fcntl(fd, F_GETFL); int new_option = old_option | O_NONBLOCK; fcntl(fd, F_SETFL, new_option); return old_option; } void addfd(int fd) { epoll_event event; event.data.fd = fd; event.events = EPOLLIN | EPOLLET; epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event); setnonblocking(fd); } void sig_handler(int sig) { int save_errno = errno; //考虑可重入 int msg = sig; send(pipefd[1], (char *)&msg, 1, 0); //统一信号时间和I/O事件 errno = save_errno; } void addsig(int sig) { struct sigaction sa; memset(&sa, '\0', sizeof(sa)); sa.sa_handler = sig_handler; sa.sa_flags |= SA_RESTART; sigfillset(&sa.sa_mask); //屏蔽其他信号 assert(sigaction(sig, &sa, NULL) != -1); } void timer_handler() { //定时处理任务,实际上就是调用tick函数 timer_list.tick(); //因为一次alarm调用只会引起一次SIGALRM信号,所以我们要重新定时,以不断触发SIGALRM信号 alarm(TIME_SLOT); } //定时器回调函数,在socket发生读错误的情况下也会执行。 //它删除非活动链接socket上的注册事件,并关闭之 void handle_callback(client_data* user_data) { epoll_ctl(epollfd, EPOLL_CTL_DEL, user_data->sockfd_, 0); assert(user_data != NULL); close(user_data->sockfd_); printf("close fd %d\n", user_data->sockfd_); } int main(int argc, char** argv) { if(argc <= 2){ printf("usage: %s ip_address port_number\n", basename(argv[0])); return -1; } const char* ip = argv[1]; int port = atoi(argv[2]); int listenfd = socket(AF_INET, SOCK_STREAM, 0); assert(listenfd != -1); struct sockaddr_in address; memset(&address, 0, sizeof(address)); address.sin_family = AF_INET; inet_pton(AF_INET, ip, &address.sin_addr); address.sin_port = htons(port); int on = 1; int ret = setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)); assert(ret != -1); ret = bind(listenfd, (struct sockaddr*)&address, sizeof(address)); if(ret == -1){ perror("what"); return -1; } ret = listen(listenfd, 5); assert(ret != -1); epoll_event events[MAX_EVENT_NUMBER]; epollfd = epoll_create(5); assert(epollfd != -1); addfd(listenfd); //使用socketpair,和pipe的区别是pipe是半双工,这个是全双工 ret = socketpair(PF_UNIX, SOCK_STREAM, 0, pipefd); assert(ret != -1); setnonblocking(pipefd[1]); addfd(pipefd[0]); //设置信号处理函数 addsig(SIGALRM); addsig(SIGTERM); std::vector<client_data> users(FD_LIMIT); //定时 alarm(TIME_SLOT); bool timeout = false; bool stop_server = false; while(!stop_server){ int number = epoll_wait(epollfd, events, MAX_EVENT_NUMBER, -1); if(number < 0 && errno != EINTR){ printf("epoll failure\n"); break; } for(int i=0; i<number; ++i){ int sockfd = events[i].data.fd; if(sockfd == listenfd){ //处理新到的客户连接 struct sockaddr_in client_address; socklen_t len = sizeof(client_address); int connfd = accept(listenfd, (struct sockaddr*)&client_address, &len); addfd(connfd); users[connfd].addr_ = client_address; users[connfd].sockfd_ = connfd; //创建定时器,设置其回调函数与超时时间,然后绑定定时器与用户数据,最后再将定时器添加到链表timer_list中 std::shared_ptr<util_timer> timer(new util_timer); timer->user_data_ = &users[connfd]; timer->timeout_callback_ = handle_callback; time_t cur = time(NULL); timer->expire_ = cur + 3 * TIME_SLOT; users[connfd].timer_ = timer; timer_list.add_timer(timer); } //处理信号 else if(sockfd == pipefd[0] && (events[i].events & EPOLLIN)){ int sig; char signals[1024]; ret = recv(pipefd[0], signals, sizeof(signals), 0); if(ret == -1){ //handle the error continue; } else if(ret == 0) continue; else{ for(int i=0; i<ret; ++i){ switch(signals[i]){ case SIGALRM: //用timeout变量标记有定时任务需要处理,但不立即处理定时任务,这是因为定时任务的优先级不是很高,我们优先处理其他更重要的任务 timeout = true; break; //这个break仅仅是break switch case SIGTERM: stop_server = true; } } } } else if(events[i].events & EPOLLIN){ //处理客户链接上接受到的数据 memset(users[sockfd].buf_, '\0', BUFFER_SIZE); ret = recv(sockfd, users[sockfd].buf_, BUFFER_SIZE-1, 0); printf("get %d bytes of client data %s from %d\n", ret, users[sockfd].buf_, sockfd); std::shared_ptr<util_timer>& timer = users[sockfd].timer_; if(ret < 0){ //如果发生读错误,则关闭连接,并移出其对应的定时器 if(errno != EAGAIN){ handle_callback(&users[sockfd]); if(timer != NULL) timer_list.del_timer(timer); } } else if(ret == 0){ //如果对方已关闭连接,则我们也关闭连接,并移除对应的定时器 handle_callback(&users[sockfd]); if(timer != NULL) timer_list.del_timer(timer); } else{ //如果某个客户连接上有数据可读,我们要调整该连接对应的定时器,以延迟该连接被关闭的时间,也就是所谓的,增加寿命 if(timer != NULL){} time_t cur = time(NULL); timer->expire_ = cur + 3 * TIME_SLOT; printf("adjust timer once\n"); timer_list.adjust_timer(timer); } } else{ //do something } } //处理完上面的事情后,最后处理定时事件,因为I/O事件有更高的优先级。当然,这样做将导致定时任务不能精确的按照预期的时间执行 if(timeout){ timer_handler(); timeout = false; } } close(listenfd); close(pipefd[1]); close(pipefd[0]); close(epollfd); return 0; }上述代码已经通过测试,接下来我继续学习time wheel定时器。
