redis事件
在介绍redis事件前,首先补充一个基础知识 事件驱动(IO多路复用技术)
事件驱动
首先介绍下事件驱动,所谓事件驱动,通俗来说就是你点什么电脑就执行什么。 事件驱动程序的基本结构组成如下
- 事件收集器
- 事件收集器专门负责收集所有事件,包括来自用户的(如鼠标、键盘事件等)、来自硬件的(如时钟事件等)和来自软件的(如操作系统、应用程序本身等)。
- 事件发送器
- 事件发送器负责将收集器收集到的事件分发到目标对象中。
- 事件处理器
- 事件处理器做具体的事件响应工作,它往往要到实现阶段才完全确定,因而需要运用虚函数机制(函数名往往取为类似于HandleMsg的一个名字)。
对于框架的使用者来说,他们唯一能够看到的是事件处理器。这也是他们所关心的内容。
IO多路复用可以参考知乎上这篇,讲解的较为详细 比较生动的是这部分
你站在讲台上等,谁解答完谁举手。这时C、D举手,表示他们解答问题完毕,你下去依次检查C、D的答案,然后继续回到讲台上等。此时E、A又举手,然后去处理E和A。。。 这种就是IO复用模型,Linux下的select、poll和epoll就是干这个的。将用户socket对应的fd注册进epoll,然后epoll帮你监听哪些socket上有消息到达,这样就避免了大量的无用操作。此时的socket应该采用非阻塞模式。 http://www.zhihu.com/question/28594409
redis事件类型
redis服务器是一个事件驱动程序。服务器需要处理以下两类事件 1、文件事件 2、时间事件
一、文件事件 文件事件指的是socket文件描述符的读写就绪情况。分为读写两种。
1
2
3
4
5
6
7
8
9
10
11
12
// 文件事件结构体
/* File event structure */
typedef struct aeFileEvent {
int mask; /* one of AE_(READABLE|WRITABLE) */
// 回调函数指针
aeFileProc *rfileProc;
aeFileProc *wfileProc;
// clientData 参数一般是指向 redisClient 的指针
void *clientData;
} aeFileEvent;
二、时间事件 时间事件分为一次性定时器和周期性定时器。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
typedef struct aeTimeEvent {
long long id; /* time event identifier. */
long when_sec; /* seconds */
long when_ms; /* milliseconds */
// 定时回调函数指针
aeTimeProc *timeProc;
// 定时事件清理函数,当删除定时事件的时候会被调用
aeEventFinalizerProc *finalizerProc;
// clientData 参数一般是指向 redisClient 的指针
void *clientData;
// 定时事件表采用链表来维护
struct aeTimeEvent *next;
} aeTimeEvent;
redis循环事件处理
此图讲述的较为清晰。 http://static.oschina.net/uploads/space/2013/0915/162951_6XvT_917596.png 部分内容引用http://my.oschina.net/u/917596/blog/161077该篇文章。
整体思路如下:
1、 初始化事件循环结构体
创建一个aeCreateEventLoop对象,对象的属性值如下面结构体所示。 整体来说,事件循环结构体维护 I/O 事件表,定时事件表和触发事件表
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
typedef struct aeEventLoop {
int maxfd; /* highest file descriptor currently registered */
int setsize; /* max number of file descriptors tracked */
// 记录最大的定时事件 id + 1
long long timeEventNextId;
// 用于系统时间的矫正
time_t lastTime; /* Used to detect system clock skew */
// I/O 事件表
aeFileEvent *events; /* Registered events */
// 被触发的事件
aeFiredEvent *fired; /* Fired events */
// 定时事件表
aeTimeEvent *timeEventHead;
// 事件循环结束标识
int stop;
// 对于不同的 I/O 多路复用技术,有不同的数据,详见各自实现
void *apidata; /* This is used for polling API specific data */
// 新的循环前需要执行的操作
aeBeforeSleepProc *beforesleep;
} aeEventLoop;
2、注册监听套接字的事件
文件 I/O 事件注册主要操作在 aeCreateFileEvent() 中完成。aeCreateFileEvent() 会根据文件描述符(setsize)的数值大小在事件循环结构体的 I/O 事件表中取一个数据空间,利用系统提供的 I/O 多路复用技术监听感兴趣的 I/O 事件,并设置回调函数。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
aeFileProc *proc, void *clientData)
{
if (fd >= eventLoop->setsize) {
errno = ERANGE;
return AE_ERR;
}
// 在 I/O 事件表中选择一个空间
aeFileEvent *fe = &eventLoop->events[fd];
// aeApiAddEvent() 只在此函数中调用,对于不同 IO 多路复用实现,会有所不同
if (aeApiAddEvent(eventLoop, fd, mask) == -1)
return AE_ERR;
fe->mask |= mask;
// 设置回调函数
if (mask & AE_READABLE) fe->rfileProc = proc;
if (mask & AE_WRITABLE) fe->wfileProc = proc;
fe->clientData = clientData;
if (fd > eventLoop->maxfd)
eventLoop->maxfd = fd;
return AE_OK;
}
3、注册定时事件
定时事件,在事件循环结构体中用链表来维护。定时事件操作在 aeCreateTimeEvent()中完成,主要内容为:分配定时事件结构体,设置触发时间和回调函数,插入到定时事件表中。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,aeTimeProc *proc, void *clientData, aeEventFinalizerProc *finalizerProc)
{
/* 自增
timeEventNextId 会在处理执行定时事件时会用到,用于防止出现死循环。
如果超过了最大 id,则跳过这个定时事件,为的是避免死循环,即:
如果事件一执行的时候注册了事件二,事件一执行完毕后事件二得到执行,紧接着如果事件一有得到执行就会成为循环,因此维护了 timeEventNextId 。*/
long long id = eventLoop->timeEventNextId++;
aeTimeEvent *te;
// 分配空间
te = zmalloc(sizeof(*te));
if (te == NULL) return AE_ERR;
// 填充时间事件结构体
te->id = id;
// 计算超时时间
aeAddMillisecondsToNow(milliseconds,&te->when_sec,&te->when_ms);
// proc == serverCorn
te->timeProc = proc;
te->finalizerProc = finalizerProc;
te->clientData = clientData;
// 头插法,将时间事件插入到链表中
te->next = eventLoop->timeEventHead;
eventLoop->timeEventHead = te;
return id;
}
4、事件监听(监听,回调的思想)
1. redis 提供了 TCP 和 UNIX 域套接字两种工作方式,监听这块首先是 创建绑定并监听tcp端口和unix socket
2. 接下来,ininServer为所有的监听套接字注册了读事件,响应函数为 acceptTcpHandler() 或者 acceptUnixHandler()
3. 接收套接字与客户端建立连接后,建立并保存服务端与客户端的连接信息,这些信息保存在一个 struct redisClient 结构体中
4. 为与客户端连接的套接字注册读事件,相应的回调函数为 readQueryFromClient(),readQueryFromClient() 作用是从套接字读取数据,执行相应操作并回复客户端
5、进入事件循环,触发事件
事件循环基于事件循环结构体来进行的。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
while (!eventLoop->stop) {
// 进入事件循环可能会进入睡眠状态。在睡眠之前,执行预设置的函数 aeSetBeforeSleepProc()。
if (eventLoop->beforesleep != NULL)
eventLoop->beforesleep(eventLoop);
// AE_ALL_EVENTS 表示处理所有的事件
aeProcessEvents(eventLoop, AE_ALL_EVENTS);
}
}
// 先处理定时事件,然后处理套接字事件
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
int processed = 0, numevents;
/* Nothing to do? return ASAP */
if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;
/* Note that we want call select() even if there are no
* file events to process as long as we want to process time
* events, in order to sleep until the next time event is ready
* to fire. */
if (eventLoop->maxfd != -1 ||
((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
int j;
aeTimeEvent *shortest = NULL;
// tvp 会在 IO 多路复用的函数调用中用到,表示超时时间
struct timeval tv, *tvp;
// 得到最短将来会发生的定时事件
if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
shortest = aeSearchNearestTimer(eventLoop);
// 计算睡眠的最短时间
if (shortest) { // 存在定时事件
long now_sec, now_ms;
/* Calculate the time missing for the nearest
* timer to fire. */
// 得到当前时间
aeGetTime(&now_sec, &now_ms);
tvp = &tv;
tvp->tv_sec = shortest->when_sec - now_sec;
if (shortest->when_ms < now_ms) { // 需要借位
// 减法中的借位,毫秒向秒借位
tvp->tv_usec = ((shortest->when_ms+1000) - now_ms)*1000;
tvp->tv_sec --;
} else { // 不需要借位,直接减
tvp->tv_usec = (shortest->when_ms - now_ms)*1000;
}
// 当前系统时间已经超过定时事件设定的时间
if (tvp->tv_sec < 0) tvp->tv_sec = 0;
if (tvp->tv_usec < 0) tvp->tv_usec = 0;
} else {
/* If we have to check for events but need to return
* ASAP because of AE_DONT_WAIT we need to set the timeout
* to zero */
// 如果没有定时事件,见机行事
if (flags & AE_DONT_WAIT) {
tv.tv_sec = tv.tv_usec = 0;
tvp = &tv;
} else {
/* Otherwise we can block */
tvp = NULL; /* wait forever */
}
}
// 调用 IO 多路复用函数阻塞监听
numevents = aeApiPoll(eventLoop, tvp);
// 处理已经触发的事件
for (j = 0; j < numevents; j++) {
// 找到 I/O 事件表中存储的数据
aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
int mask = eventLoop->fired[j].mask;
int fd = eventLoop->fired[j].fd;
int rfired = 0;
/* note the fe->mask & mask & ... code: maybe an already processed
* event removed an element that fired and we still didn't
* processed, so we check if the event is still valid. */
// 读事件
if (fe->mask & mask & AE_READABLE) {
rfired = 1;
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
}
// 写事件
if (fe->mask & mask & AE_WRITABLE) {
if (!rfired || fe->wfileProc != fe->rfileProc)
fe->wfileProc(eventLoop,fd,fe->clientData,mask);
}
processed++;
}
}
// 处理定时事件
/* Check time events */
if (flags & AE_TIME_EVENTS)
processed += processTimeEvents(eventLoop);
return processed; /* return the number of processed file/time events */
}
7、如果监听套接字变为可读,会接收客户端请求,并为对应的套接字注册读事件.如果与客户端连接的套接字变为可读,执行相应的操作