摘要:服务器通过套接字与客户端进行连接,服务器与客户端的通信会产生相应的文件事件。两类事件可以根据时间事件处理器中时间事件处理函数绑定的具体函数返回结果来区分。如果返回值为,则表示这是个一次性的时间事件,否则表明是个周期任务。
每个 CS 模式程序,尤其是高并发的网络服务端程序都有自己的网络异步事件处理库,Redis不例外。Redis 基于 Reactor 模型 封装了自己的事件驱动模型库。你可能会跟我有一样的疑问,为什么作者不使用已有的成熟的相关库,比如 Libevent 或 Libev?作者是这样跟别人讨论的,感兴趣的可以了解下。
下面从源码入手介绍下 Redis 中封装的 ae 库。
Redis 的 I/O 多路复用函数库对常见的 select/epoll/evport/kqueue 等进行了封装,提高了易用性。每一个 I/O 多路复用函数库在 Redis 源码中都多带带成一个个文件,因此你可以找到 ae_epoll.c 等文件,它们对外提供统一的 API,这样做有一个好处是,底层库可以互换。
Redis 在底层用哪个 I/O 多路复用函数库是在编译时决定的,源码中定义了如下的规则,
#ifdef HAVE_EVPORT #include "ae_evport.c" #else #ifdef HAVE_EPOLL #include "ae_epoll.c" #else #ifdef HAVE_KQUEUE #include "ae_kqueue.c" #else #include "ae_select.c" #endif #endif #endif
该规则保证在编译时会自动选择系统中性能最高的 I/O 多路复用函数库作为其 I/O 多路复用程序的底层实现。绝大多数的 Redis 服务都跑在 linux 服务器上,因此用的是 epoll。
大家都知道,epoll 常用函数主要有以下 3 个:
#includeint epoll_create(int size); int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event); int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
与之相对,在 ae_epoll.c 文件中对这三个函数进行了相应的封装(主要是配合 aeEventLoop)
// 获得 epfd 以及为 aeEventLoop 结构体创建 aeApiState static int aeApiCreate(aeEventLoop *eventLoop) { aeApiState *state = zmalloc(sizeof(aeApiState)); if (!state) return -1; state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize); if (!state->events) { zfree(state); return -1; } state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */ if (state->epfd == -1) { // 出错释放 zfree(state->events); zfree(state); return -1; } eventLoop->apidata = state; return 0; }
// 为 fpfd 添加监控 fd 感兴趣的事件 static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) { aeApiState *state = eventLoop->apidata; struct epoll_event ee = {0}; /* avoid valgrind warning */ /* If the fd was already monitored for some event, we need a MOD * operation. Otherwise we need an ADD operation. */ int op = eventLoop->events[fd].mask == AE_NONE ? EPOLL_CTL_ADD : EPOLL_CTL_MOD; ee.events = 0; mask |= eventLoop->events[fd].mask; /* Merge old events */ if (mask & AE_READABLE) ee.events |= EPOLLIN; if (mask & AE_WRITABLE) ee.events |= EPOLLOUT; ee.data.fd = fd; if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1; return 0; }
// epoll_wait static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) { aeApiState *state = eventLoop->apidata; // 这里的 aeApiState 为 aeApiCreate 时创建的 int retval, numevents = 0; retval = epoll_wait(state->epfd,state->events,eventLoop->setsize, tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1); if (retval > 0) { int j; numevents = retval; // 遍历产生的事件,加入 eventLoop->fired 数组 for (j = 0; j < numevents; j++) { int mask = 0; struct epoll_event *e = state->events+j; // 根据事件不同设置不同的 mask if (e->events & EPOLLIN) mask |= AE_READABLE; if (e->events & EPOLLOUT) mask |= AE_WRITABLE; if (e->events & EPOLLERR) mask |= AE_WRITABLE; if (e->events & EPOLLHUP) mask |= AE_WRITABLE; eventLoop->fired[j].fd = e->data.fd; eventLoop->fired[j].mask = mask; } } return numevents; }
看起来很简单,感兴趣的小伙伴可以看看 select 等其他底层库的封装。
Redis 中的事件Redis 是事件驱动的程序,服务器需要处理文件事件(file event)和时间事件(time event),其中各结构体关系如下:
可以看到事件处理的核心是 aeEventLoop,它的作用是负责保存待处理文件事件和时间事件的结构体。
对于事件的处理,可以用下的图概况说明,
那么,下面先介绍 Redis 中的两种事件。
文件事件是 Redis 服务器对套接字操作的抽象。
服务器通过套接字与客户端进行连接,服务器与客户端的通信会产生相应的文件事件。
Redis 中使用 I/O 多路复用程序同时监听多个 socket,然后 socket 中做的不同操作关联不同的事件处理器,即采用不用的处理逻辑。
当客户端 connect server 时,即有新的连接到来,此时在服务器 listen socket fd 上产生 ae.h/AE_READABLE 事件,该事件由 acceptTcpHandler 函数进行处理。
当客户端有数据写到 socket 时,client fd 上产生ae.h/AE_READABLE 事件,该事件由 readQueryFromClient 函数进行处理。
当 sever 给 client 回复时,client fd 产生 ae.h/AE_WRITABLE 事件,该事件由 sendReplyToClient 函数进行处理。
而对于所有的文件事件,在事件处理器 aeProcessEvents 中都有如下处理逻辑:
// 阻塞等待,返回就绪文件事件的个数 numevents = aeApiPoll(eventLoop, tvp); for (j = 0; j < numevents; j++) { aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd]; int mask = eventLoop->fired[j].mask; int fd = eventLoop->fired[j].fd; int rfired = 0; if (fe->mask & mask & AE_READABLE) { // read 处理 rfired = 1; fe->rfileProc(eventLoop,fd,fe->clientData,mask); } if (fe->mask & mask & AE_WRITABLE) { // write 处理 if (!rfired || fe->wfileProc != fe->rfileProc) fe->wfileProc(eventLoop,fd,fe->clientData,mask); } processed++; // 处理的事件数 + 1 }事件处理与调度
包含对文件事件和时间事件的处理,其实都收敛到 aeProcessEvents 这个函数。
在网上找了一个图可以很好的说明这个过程
在时间处理器中有这样一个逻辑:
// 找出最近要发生的那个时间事件 shortest = aeSearchNearestTimer(eventLoop); ... ... // 阻塞等待,返回就绪文件事件的个数 numevents = aeApiPoll(eventLoop, tvp);
将最近要发生的时间事件的时间作为 aeApiPoll 函数的最大阻塞时间,这既可以避免服务器对时间事件进行频繁的轮询(忙等待),也可以确保该函数不会阻塞过长时间。
注意:
事件可能是并发产生的,但是到了时间处理器这里都变成串行处理了;
时间事件并不一定是按照预设的时间点发生,会有偏差。
时间事件时服务器对一些定时或者周期任务的抽象。
分为两类,定时事件和周期事件。两类事件可以根据时间事件处理器 processTimeEvents 中时间事件处理函数 timeProc 绑定的具体函数返回结果来区分。如果返回值为 AE_NOMORE,则表示这是个一次性的时间事件,否则表明是个周期任务。
对此,在事件处理器 aeProcessEvents 如下处理逻辑:
if (flags & AE_TIME_EVENTS) // 处理时间事件 processed += processTimeEvents(eventLoop);
进到 processTimeEvents里去看,会遍历每一个时间事件,然后执行以下逻辑:
aeGetTime(&now_sec, &now_ms); if (now_sec > te->when_sec || (now_sec == te->when_sec && now_ms >= te->when_ms)) // 时间事件过时,需要执行 { int retval; id = te->id; retval = te->timeProc(eventLoop, id, te->clientData); processed++; if (retval != AE_NOMORE) { // 周期任务 aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms); } else { // 一次性时间事件,标记成 AE_DELETED_EVENT_ID,从 epfd 的监控中删掉 te->id = AE_DELETED_EVENT_ID; } }
从时间事件的结构体也可以看出,Redis 中将时间事件串成一个无序链表,说无序是因为该链表对各个时间事件的发生顺序是乱的。
在目前版本中,正常模式下的 Redis 服务器只使用了 serverCron 一个时间事件,而在 benchmark 模式下,服务器也值使用了两个时间事件。在这种情况下,服务器几乎是将无序链表退化成一个指针来使用,因此不影响时间事件执行的性能。Redis 服务流程
将 Redis 源码主流程简化,其实就是下面这样,
int main(int argc, char **argv) { ... initServer(); ... aeSetBeforeSleepProc(server.el,beforeSleep); aeMain(server.el); // 陷入循环,等待外部事件发生 aeDeleteEventLoop(server.el); return 0; }
而在 aeMain 函数中则是一个大循环,该循环一直到 eventLoop->stop 被标记成非 0 才会停止,如下,
void aeMain(aeEventLoop *eventLoop) { eventLoop->stop = 0; while (!eventLoop->stop) { if (eventLoop->beforesleep != NULL) eventLoop->beforesleep(eventLoop); aeProcessEvents(eventLoop, AE_ALL_EVENTS); // 事件(文件|时间)处理函数 } }
在初始化 initServer 函数中会创建一个 aeEventLoop 结构体,如下,
server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);
maxclients 为配置文件中配置的可接受最大连接数。
参考【1】《Redis 设计与实现》
【2】https://draveness.me/redis-ev...
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/36789.html
摘要:考虑到的定时并不是那么准确无论是还是,所以本来打算自己维护这个定时器队列。之前我用对象存了一本字典,约十二万多的词条,原文件大概也就五六兆,用的原生对象一存居然有五六百兆的内存占用所以打算这个定时器队列用来写。,表示列表相关指令事件支持。 原文链接:http://xcoder.in/2015/06/05/scheduled-task-using-redis/ 好久没写博文了,...
摘要:事件分派器会根据每个当前产生的事件,来选择对应的事件处理器来处理。核心是基于非阻塞的多路复用机制单线程避免了多线程上下文切换的开销。 1.memcached和redis有什么区别? (1)Redis支持服务器端的数据操作 redis和memcached相比,redis拥有更多的 数据结构并且支持更丰富的数据操作 ,通常在memcached里面,你需要将数据拿到客户端来进行类型的修改然后在se...
摘要:这个文件事件处理器是单线程的,所以叫做单线程模型,采用多路复用机制同时监听多个,根据上的事件来选择对应的事件处理器处理这个事件。 为什么使用缓存 优点: 高性能 高并发 MySQL天然对高并发不好,MySQL单机支撑2000qps也开始容易报警,可以使用缓存,让数据查询从缓存中拿出数据 缺点: 缓存的数据和数据库的数据不一致 缓存雪崩 缓存穿透 缓存并发竞争 redis 和m...
摘要:这个文件事件处理器是单线程的,所以叫做单线程模型,采用多路复用机制同时监听多个,根据上的事件来选择对应的事件处理器处理这个事件。 为什么使用缓存 优点: 高性能 高并发 MySQL天然对高并发不好,MySQL单机支撑2000qps也开始容易报警,可以使用缓存,让数据查询从缓存中拿出数据 缺点: 缓存的数据和数据库的数据不一致 缓存雪崩 缓存穿透 缓存并发竞争 redis 和m...
摘要:这个文件事件处理器是单线程的,所以叫做单线程模型,采用多路复用机制同时监听多个,根据上的事件来选择对应的事件处理器处理这个事件。 为什么使用缓存 优点: 高性能 高并发 MySQL天然对高并发不好,MySQL单机支撑2000qps也开始容易报警,可以使用缓存,让数据查询从缓存中拿出数据 缺点: 缓存的数据和数据库的数据不一致 缓存雪崩 缓存穿透 缓存并发竞争 redis 和m...
阅读 3080·2023-04-25 22:43
阅读 2369·2021-09-23 11:20
阅读 3384·2021-09-06 15:15
阅读 1140·2019-08-30 15:54
阅读 3339·2019-08-30 14:20
阅读 2726·2019-08-29 17:16
阅读 2943·2019-08-29 15:28
阅读 3285·2019-08-29 11:08