1 基本数据结构
/*
 * Per-event state: an opaque data pointer, a run of single-bit status
 * flags, the "available" counter/flag, the handler callback, and the
 * linkage used by the timer tree and the posted-event queue.
 */
struct ngx_event_s {
    void            *data;

    unsigned         write:1;

    unsigned         accept:1;

    /* used to detect the stale events in kqueue and epoll */
    unsigned         instance:1;

    /*
     * the event was passed or would be passed to a kernel;
     * in aio mode - operation was posted.
     */
    unsigned         active:1;

    unsigned         disabled:1;

    /* the ready event; in aio mode 0 means that no operation can be posted */
    unsigned         ready:1;

    unsigned         oneshot:1;

    /* aio operation is complete */
    unsigned         complete:1;

    unsigned         eof:1;
    unsigned         error:1;

    unsigned         timedout:1;
    unsigned         timer_set:1;

    unsigned         delayed:1;

    unsigned         deferred_accept:1;

    /* the pending eof reported by kqueue, epoll or in aio chain operation */
    unsigned         pending_eof:1;

    unsigned         posted:1;

    unsigned         closed:1;

    /* to test on worker exit */
    unsigned         channel:1;
    unsigned         resolver:1;

    unsigned         cancelable:1;

    unsigned         kq_vnode:1;

    /* the pending errno reported by kqueue */
    int              kq_errno;

    /*
     * kqueue only:
     *   accept:     number of sockets that wait to be accepted
     *   read:       bytes to read when event is ready
     *               or lowat when event is set with NGX_LOWAT_EVENT flag
     *   write:      available space in buffer when event is ready
     *               or lowat when event is set with NGX_LOWAT_EVENT flag
     *
     * epoll with EPOLLRDHUP:
     *   accept:     1 if accept many, 0 otherwise
     *   read:       1 if there can be data to read, 0 otherwise
     *
     * iocp: TODO
     *
     * otherwise:
     *   accept:     1 if accept many, 0 otherwise
     */

    /*
     * The two declarations below are alternatives, not siblings: a full int
     * where a count is stored (kqueue, iocp), a single bit where only 0/1
     * is stored.  Declaring both at once is a duplicate-member error.
     */
#if (NGX_HAVE_KQUEUE) || (NGX_HAVE_IOCP)
    int              available;
#else
    unsigned         available:1;
#endif

    ngx_event_handler_pt  handler;

    ngx_event_ovlp_t ovlp;

    ngx_uint_t       index;

    ngx_log_t       *log;

    ngx_rbtree_node_t   timer;

    /* the posted queue */
    ngx_queue_t      queue;

    /* the threads support */

    /*
     * the event thread context, we store it here
     * if $(CC) does not understand __thread declaration
     * and pthread_getspecific() is too costly
     */

    void            *thr_ctx;

    /* event should not cross cache line in SMP */

    uint32_t         padding[NGX_EVENT_T_PADDING];
};
/*
 * Description of one listening socket: the descriptor and its address,
 * socket-option values, the connection handler, logging, per-connection
 * sizing, links to related objects, and a run of bit-field flags.
 * Members inside #if/#ifdef exist only when that feature macro is set.
 */
struct ngx_listening_s {
    /* descriptor, address and its textual form */
    ngx_socket_t               fd;
    struct sockaddr           *sockaddr;
    socklen_t                  socklen;
    size_t                     addr_text_max_len;
    ngx_str_t                  addr_text;

    /* socket type and option values */
    int                        type;
    int                        backlog;
    int                        rcvbuf;
    int                        sndbuf;
#if (NGX_HAVE_KEEPALIVE_TUNABLE)
    int                        keepidle;
    int                        keepintvl;
    int                        keepcnt;
#endif

    /* callback and its opaque server data */
    ngx_connection_handler_pt  handler;
    void                      *servers;

    /* note: "log" is an embedded object here, "logp" is a pointer */
    ngx_log_t                  log;
    ngx_log_t                 *logp;

    size_t                     pool_size;
    size_t                     post_accept_buffer_size;
    ngx_msec_t                 post_accept_timeout;

    ngx_listening_t           *previous;
    ngx_connection_t          *connection;

    ngx_uint_t                 worker;

    /* single-bit flags */
    unsigned                   open:1;
    unsigned                   remain:1;
    unsigned                   ignore:1;

    unsigned                   bound:1;
    unsigned                   inherited:1;
    unsigned                   nonblocking_accept:1;
    unsigned                   listen:1;
    unsigned                   nonblocking:1;
    unsigned                   shared:1;
    unsigned                   addr_ntop:1;
    unsigned                   wildcard:1;

#if (NGX_HAVE_INET6 && defined IPV6_V6ONLY)
    unsigned                   ipv6only:1;
#endif
#if (NGX_HAVE_REUSEPORT)
    unsigned                   reuseport:1;
    unsigned                   add_reuseport:1;
#endif

    /* two bits wide, unlike the flags above */
    unsigned                   keepalive:2;

#if (NGX_HAVE_DEFERRED_ACCEPT)
    unsigned                   deferred_accept:1;
    unsigned                   delete_deferred:1;
    unsigned                   add_deferred:1;
#ifdef SO_ACCEPTFILTER
    char                      *accept_filter;
#endif
#endif

#if (NGX_HAVE_SETFIB)
    int                        setfib;
#endif

#if (NGX_HAVE_TCP_FASTOPEN)
    int                        fastopen;
#endif
};
/*
 * State of one connection: its read and write events, the socket
 * descriptor, the I/O function pointers, addresses, buffer and queue
 * linkage, counters, and a run of bit-field flags.
 * Members inside #if exist only when that feature macro is set.
 */
struct ngx_connection_s {
    void                  *data;

    /* the two events attached to this connection */
    ngx_event_t           *read;
    ngx_event_t           *write;

    ngx_socket_t           fd;

    /* I/O function pointers */
    ngx_recv_pt            recv;
    ngx_send_pt            send;
    ngx_recv_chain_pt      recv_chain;
    ngx_send_chain_pt      send_chain;

    ngx_listening_t       *listening;

    off_t                  sent;

    ngx_log_t             *log;

    ngx_pool_t            *pool;

    int                    type;

    /* address, its length and its textual form */
    struct sockaddr       *sockaddr;
    socklen_t              socklen;
    ngx_str_t              addr_text;

    ngx_str_t              proxy_protocol_addr;
    in_port_t              proxy_protocol_port;

#if (NGX_SSL)
    ngx_ssl_connection_t  *ssl;
#endif

    struct sockaddr       *local_sockaddr;
    socklen_t              local_socklen;

    ngx_buf_t             *buffer;

    ngx_queue_t            queue;

    ngx_atomic_uint_t      number;

    ngx_uint_t             requests;

    /* multi-bit fields: 8 and 3 bits wide */
    unsigned               buffered:8;

    unsigned               log_error:3;

    /* single-bit flags */
    unsigned               timedout:1;
    unsigned               error:1;
    unsigned               destroyed:1;

    unsigned               idle:1;
    unsigned               reusable:1;
    unsigned               close:1;
    unsigned               shared:1;

    unsigned               sendfile:1;
    unsigned               sndlowat:1;

    /* two-bit fields */
    unsigned               tcp_nodelay:2;
    unsigned               tcp_nopush:2;

    unsigned               need_last_buf:1;

#if (NGX_HAVE_AIO_SENDFILE)
    unsigned               busy_count:2;
#endif

#if (NGX_THREADS)
    ngx_thread_task_t     *sendfile_task;
#endif
};
2 ngx事件的执行过程
2.1 nginx初始化套接口过程
在ngx_init_cycle函数中调用函数ngx_open_listening_sockets,完成创建套接口(socket)、绑定套接口(bind)、监听套接口(listen),这些步骤结束后,在cycle的listening.elts所指向的ngx_listening_t结构体指针数组中存储了套接字的描述符信息,有几个监听服务就会在listening.elts中存储几个ngx_listening_t指针类型的元素。
2.2 ngx_single_process_cycle解析
接着进入ngx_single_process_cycle函数,该函数是处理事件的主要函数。在介绍该函数之前,先看一下ngx_event_core_module模块以及ngx_epoll_module模块的定义:
/*
 * Module descriptor for the event core.  Only two lifecycle hooks are
 * set: ngx_event_module_init (init module) and ngx_event_process_init
 * (init process); every other hook slot is NULL.
 */
ngx_module_t  ngx_event_core_module = {
    NGX_MODULE_V1,
    &ngx_event_core_module_ctx,            /* module context */
    ngx_event_core_commands,               /* module directives */
    NGX_EVENT_MODULE,                      /* module type */
    NULL,                                  /* init master */
    ngx_event_module_init,                 /* init module */
    ngx_event_process_init,                /* init process */
    NULL,                                  /* init thread */
    NULL,                                  /* exit thread */
    NULL,                                  /* exit process */
    NULL,                                  /* exit master */
    NGX_MODULE_V1_PADDING
};
/*
 * Directive table of the epoll module.  Both entries are handled by
 * ngx_conf_set_num_slot and store into a field of ngx_epoll_conf_t
 * selected with offsetof(); ngx_null_command ends the table.
 */
static ngx_command_t  ngx_epoll_commands[] = {

    /* "epoll_events" -> ngx_epoll_conf_t.events */
    { ngx_string("epoll_events"),
      NGX_EVENT_CONF|NGX_CONF_TAKE1,
      ngx_conf_set_num_slot,
      0,
      offsetof(ngx_epoll_conf_t, events),
      NULL },

    /* "worker_aio_requests" -> ngx_epoll_conf_t.aio_requests */
    { ngx_string("worker_aio_requests"),
      NGX_EVENT_CONF|NGX_CONF_TAKE1,
      ngx_conf_set_num_slot,
      0,
      offsetof(ngx_epoll_conf_t, aio_requests),
      NULL },

      ngx_null_command
};
/*
 * Event-module context for epoll: the module name, the two configuration
 * callbacks, and the table of event actions.
 *
 * The same add/del functions are installed twice, so the third and fourth
 * action slots behave exactly like the first and second.
 */
ngx_event_module_t  ngx_epoll_module_ctx = {
    &epoll_name,
    ngx_epoll_create_conf,               /* create configuration */
    ngx_epoll_init_conf,                 /* init configuration */

    {
        ngx_epoll_add_event,             /* add an event */
        ngx_epoll_del_event,             /* delete an event */
        ngx_epoll_add_event,             /* enable an event */
        ngx_epoll_del_event,             /* disable an event */
        ngx_epoll_add_connection,        /* add a connection */
        ngx_epoll_del_connection,        /* delete a connection */

        /*
         * One slot, two alternatives: the notify function when eventfd is
         * available, NULL otherwise.  Listing both at once would add an
         * extra initializer and shift every following slot by one.
         */
#if (NGX_HAVE_EVENTFD)
        ngx_epoll_notify,                /* trigger a notify */
#else
        NULL,                            /* trigger a notify */
#endif

        ngx_epoll_process_events,        /* process the events */
        ngx_epoll_init,                  /* init the events */
        ngx_epoll_done,                  /* done the events */
    }
};
/*
 * Module descriptor for epoll.  It supplies a context and a directive
 * table but installs no lifecycle hooks: all seven hook slots are NULL.
 */
ngx_module_t  ngx_epoll_module = {
    NGX_MODULE_V1,
    &ngx_epoll_module_ctx,               /* module context */
    ngx_epoll_commands,                  /* module directives */
    NGX_EVENT_MODULE,                    /* module type */
    NULL,                                /* init master */
    NULL,                                /* init module */
    NULL,                                /* init process */
    NULL,                                /* init thread */
    NULL,                                /* exit thread */
    NULL,                                /* exit process */
    NULL,                                /* exit master */
    NGX_MODULE_V1_PADDING
};
在ngx_event_core_module模块定义中有两个回调函数,这两个回调函数就负责初始化套接口
ngx_event_module_init, /* init module */:在ngx_init_cycle的ngx_init_modules(cycle)函数中调用
ngx_event_process_init, /* init process */:如果是单进程模式,则在ngx_single_process_cycle(ngx_cycle_t *cycle)函数中调用,如果是多进程模式则在ngx_worker_process_init 中调用。在 ngx_event_process_init中有个重要的功能就是将accept注册到epoll中。
在ngx_single_process_cycle函数的开始位置有一段如下代码,
/*
 * Call the init_process hook of every module in the NULL-terminated
 * cycle->modules array.  A hook returning NGX_ERROR is fatal: the
 * process exits with status 2.
 */
for (i = 0; cycle->modules[i]; i++) {

    /* modules without an init_process hook are skipped */
    if (!cycle->modules[i]->init_process) {
        continue;
    }

    if (cycle->modules[i]->init_process(cycle) == NGX_ERROR) {
        /* fatal */
        exit(2);
    }
}
当其执行到ngx_event_core_module模块时就会调用ngx_event_process_init函数,在该函数的732行开始,就会把之前已经绑定的监听套接口添加到epoll中。并将读事件ngx_event_t类型的结构体中的handler回调函数设置为函数ngx_event_accept,该函数将会在epoll中触发监听事件可读时调用,接受请求,并将请求的事件添加到epoll中。
2.3 ngx_process_events_and_timers函数解析
在该函数中首先调用事件处理函数ngx_epoll_process_events。该函数先调用epoll_wait获得就绪的事件,此时(如果有请求的话)肯定能获得监听事件;随后在该函数的907行执行rev->handler(rev);,对监听事件而言,实际上调用的就是ngx_event_accept函数。详细代码如下:
/*
 * Dispatch every entry of event_list[0 .. events-1].
 *
 * For each entry the connection pointer and the instance bit are
 * recovered from data.ptr, stale entries are dropped, and then the read
 * and write events of the connection are either posted to a queue
 * (NGX_POST_EVENTS set in flags) or have their handler called at once.
 */
for (i = 0; i < events; i++) {
    c = event_list[i].data.ptr;

    /* the lowest bit of the stored pointer carries the instance flag */
    instance = (uintptr_t) c & 1;
    c = (ngx_connection_t *) ((uintptr_t) c & (uintptr_t) ~1);

    rev = c->read;

    if (c->fd == -1 || rev->instance != instance) {

        /*
         * the stale event from a file descriptor
         * that was just closed in this iteration
         */

        ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                       "epoll: stale event %p", c);
        continue;
    }

    revents = event_list[i].events;

    /*
     * "%04XD" is the conversion for revents; without the leading '%'
     * the text "XD" is printed literally and the following "%p" would
     * consume revents instead of the pointer argument.
     */
    ngx_log_debug3(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                   "epoll: fd:%d ev:%04XD d:%p",
                   c->fd, revents, event_list[i].data.ptr);

    if (revents & (EPOLLERR|EPOLLHUP)) {
        ngx_log_debug2(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                       "epoll_wait() error on fd:%d ev:%04XD",
                       c->fd, revents);
    }

    /*
     * Kept compiled out: the mask below omits EPOLLRDHUP, which the read
     * path further down treats as a normal event, so an active check
     * would raise an alert for every event that carries EPOLLRDHUP.
     */
#if 0
    if (revents & ~(EPOLLIN|EPOLLOUT|EPOLLERR|EPOLLHUP)) {
        ngx_log_error(NGX_LOG_ALERT, cycle->log, 0,
                      "strange epoll_wait() events fd:%d ev:%04XD",
                      c->fd, revents);
    }
#endif

    if ((revents & (EPOLLERR|EPOLLHUP))
         && (revents & (EPOLLIN|EPOLLOUT)) == 0)
    {
        /*
         * if the error events were returned without EPOLLIN or EPOLLOUT,
         * then add these flags to handle the events at least in one
         * active handler
         */

        revents |= EPOLLIN|EPOLLOUT;
    }

    if ((revents & EPOLLIN) && rev->active) {

        if (revents & EPOLLRDHUP) {
            rev->pending_eof = 1;
        }

        rev->available = 1;

        rev->ready = 1;

        if (flags & NGX_POST_EVENTS) {
            /*
             * deferred mode: queue the event instead of running it;
             * accept events go to ngx_posted_accept_events, all other
             * read events go to ngx_posted_events
             */
            queue = rev->accept ? &ngx_posted_accept_events
                                : &ngx_posted_events;

            ngx_post_event(rev, queue);

        } else {
            /*
             * immediate mode: run the read handler now, whatever kind
             * of read event this is
             */
            rev->handler(rev);
        }
    }

    wev = c->write;

    if ((revents & EPOLLOUT) && wev->active) {

        /* checked again here, after the read handling above */
        if (c->fd == -1 || wev->instance != instance) {

            /*
             * the stale event from a file descriptor
             * that was just closed in this iteration
             */

            ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                           "epoll: stale event %p", c);
            continue;
        }

        wev->ready = 1;
        wev->complete = 1;

        if (flags & NGX_POST_EVENTS) {
            ngx_post_event(wev, &ngx_posted_events);

        } else {
            wev->handler(wev);
        }
    }
}
处理队列中的事件:当flags中设置了NGX_POST_EVENTS标志时,ngx_epoll_process_events会将accept事件和普通的读写事件分别加入队列ngx_posted_accept_events和ngx_posted_events中。然后在ngx_process_events_and_timers中依次调用如下代码,处理队列中的事件:
/* first drain the queue of posted accept events */
ngx_event_process_posted(cycle, &ngx_posted_accept_events);

/* unlock the accept mutex, if held, before the remaining work */
if (ngx_accept_mutex_held != 0) {
    ngx_shmtx_unlock(&ngx_accept_mutex);
}

/* timers are expired only when delta is non-zero */
if (delta != 0) {
    ngx_event_expire_timers();
}

/* finally drain the queue of the other posted events */
ngx_event_process_posted(cycle, &ngx_posted_events);
说明: 在ngx_epoll_process_events中调用epoll_wait后,既可以得到新建立连接的事件,又可以得到已建立连接上的正常读写事件。它们分别被放入ngx_posted_accept_events和ngx_posted_events队列中,队列中事件的回调函数被延迟执行:先执行新建立连接的事件,随后释放ngx_accept_mutex锁,最后才执行普通的读写事件。这样持有accept锁的时间很短,其他worker进程可以尽快获得锁去接受新连接,这就解决了惊群和负载均衡两个问题。 如果在处理一个事件的过程中产生了另一个事件,并要求这个新产生的事件稍后执行,就可以把这个新事件加入到post队列中。