redis事件深入理解

    2015年11月11日 doc redis 字数:15270

redis事件

在介绍redis事件前,首先补充一个基础知识 事件驱动(IO多路复用技术)

事件驱动

首先介绍下事件驱动,所谓事件驱动,通俗来说就是你点什么电脑就执行什么。 事件驱动程序的基本结构组成如下

  • 事件收集器
    • 事件收集器专门负责收集所有事件,包括来自用户的(如鼠标、键盘事件等)、来自硬件的(如时钟事件等)和来自软件的(如操作系统、应用程序本身等)。
  • 事件发送器
    • 事件发送器负责将收集器收集到的事件分发到目标对象中。
  • 事件处理器
    • 事件处理器做具体的事件响应工作,它往往要到实现阶段才完全确定,因而需要运用虚函数机制(函数名往往取为类似于HandleMsg的一个名字)。

对于框架的使用者来说,他们唯一能够看到的是事件处理器。这也是他们所关心的内容。

IO多路复用可以参考知乎上这篇,讲解的较为详细 比较生动的是这部分

你站在讲台上等,谁解答完谁举手。这时C、D举手,表示他们解答问题完毕,你下去依次检查C、D的答案,然后继续回到讲台上等。此时E、A又举手,然后去处理E和A。。。 这种就是IO复用模型,Linux下的select、poll和epoll就是干这个的。将用户socket对应的fd注册进epoll,然后epoll帮你监听哪些socket上有消息到达,这样就避免了大量的无用操作。此时的socket应该采用非阻塞模式。 http://www.zhihu.com/question/28594409

redis事件类型

redis服务器是一个事件驱动程序。服务器需要处理以下两类事件 1、文件事件 2、时间事件

一、文件事件 文件事件指的是socket文件描述符的读写就绪情况。分为读写两种。

1
2
3
4
5
6
7
8
9
10
11
12
// 文件事件结构体
/* File event structure */
typedef struct aeFileEvent {
    int mask; /* one of AE_(READABLE|WRITABLE) */
 
    // 回调函数指针
    aeFileProc *rfileProc;
    aeFileProc *wfileProc;
 
    // clientData 参数一般是指向 redisClient 的指针
    void *clientData;
} aeFileEvent;

二、时间事件 时间事件分为一次性定时器和周期性定时器。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
typedef struct aeTimeEvent {
    long long id; /* time event identifier. */
    long when_sec; /* seconds */
    long when_ms; /* milliseconds */
 
    // 定时回调函数指针
    aeTimeProc *timeProc;
 
    // 定时事件清理函数,当删除定时事件的时候会被调用
    aeEventFinalizerProc *finalizerProc;
 
    // clientData 参数一般是指向 redisClient 的指针
    void *clientData;
 
    // 定时事件表采用链表来维护
    struct aeTimeEvent *next;
} aeTimeEvent;

redis循环事件处理

此图讲述的较为清晰。 http://static.oschina.net/uploads/space/2013/0915/162951_6XvT_917596.png enter image description here 部分内容引用http://my.oschina.net/u/917596/blog/161077该篇文章。

整体思路如下:

1、 初始化事件循环结构体

创建一个aeCreateEventLoop对象,对象的属性值如下面结构体所示。 整体来说,事件循环结构体维护 I/O 事件表,定时事件表和触发事件表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
typedef struct aeEventLoop {
    int maxfd;   /* highest file descriptor currently registered */
    int setsize; /* max number of file descriptors tracked */
 
    // 记录最大的定时事件 id + 1
    long long timeEventNextId;
 
    // 用于系统时间的矫正
    time_t lastTime;     /* Used to detect system clock skew */
 
    // I/O 事件表
    aeFileEvent *events; /* Registered events */
 
    // 被触发的事件
    aeFiredEvent *fired; /* Fired events */
 
    // 定时事件表
    aeTimeEvent *timeEventHead;
 
    // 事件循环结束标识
    int stop;
 
    // 对于不同的 I/O 多路复用技术,有不同的数据,详见各自实现
    void *apidata; /* This is used for polling API specific data */
 
    // 新的循环前需要执行的操作
    aeBeforeSleepProc *beforesleep;
} aeEventLoop;

2、注册监听套接字的事件

文件 I/O 事件注册主要操作在 aeCreateFileEvent() 中完成。aeCreateFileEvent() 会根据文件描述符(setsize)的数值大小在事件循环结构体的 I/O 事件表中取一个数据空间,利用系统提供的 I/O 多路复用技术监听感兴趣的 I/O 事件,并设置回调函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
        aeFileProc *proc, void *clientData)
{
    if (fd >= eventLoop->setsize) {
        errno = ERANGE;
        return AE_ERR;
    }
    // 在 I/O 事件表中选择一个空间
    aeFileEvent *fe = &eventLoop->events[fd];
 
    // aeApiAddEvent() 只在此函数中调用,对于不同 IO 多路复用实现,会有所不同
    if (aeApiAddEvent(eventLoop, fd, mask) == -1)
        return AE_ERR;
 
    fe->mask |= mask;
 
    // 设置回调函数
    if (mask & AE_READABLE) fe->rfileProc = proc;
    if (mask & AE_WRITABLE) fe->wfileProc = proc;
    fe->clientData = clientData;
    if (fd > eventLoop->maxfd)
        eventLoop->maxfd = fd;
    return AE_OK;
}

3、注册定时事件

定时事件,在事件循环结构体中用链表来维护。定时事件操作在 aeCreateTimeEvent()中完成,主要内容为:分配定时事件结构体,设置触发时间和回调函数,插入到定时事件表中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,aeTimeProc *proc, void *clientData, aeEventFinalizerProc *finalizerProc)
{
/*    自增
    timeEventNextId 会在处理执行定时事件时会用到,用于防止出现死循环。
    如果超过了最大 id,则跳过这个定时事件,为的是避免死循环,即:
    如果事件一执行的时候注册了事件二,事件一执行完毕后事件二得到执行,紧接着如果事件一有得到执行就会成为循环,因此维护了 timeEventNextId 。*/
    long long id = eventLoop->timeEventNextId++;
    aeTimeEvent *te;
 
    // 分配空间
    te = zmalloc(sizeof(*te));
    if (te == NULL) return AE_ERR;
 
    // 填充时间事件结构体
    te->id = id;
 
    // 计算超时时间
    aeAddMillisecondsToNow(milliseconds,&te->when_sec,&te->when_ms);
 
    // proc == serverCorn
    te->timeProc = proc;
    te->finalizerProc = finalizerProc;
    te->clientData = clientData;
 
    // 头插法,将时间事件插入到链表中
    te->next = eventLoop->timeEventHead;
    eventLoop->timeEventHead = te;
    return id;
}

4、事件监听(监听,回调的思想)

1. redis 提供了 TCP 和 UNIX 域套接字两种工作方式,监听这块首先是 创建绑定并监听tcp端口和unix socket
2. 接下来,ininServer为所有的监听套接字注册了读事件,响应函数为 acceptTcpHandler() 或者 acceptUnixHandler()
3. 接收套接字与客户端建立连接后,建立并保存服务端与客户端的连接信息,这些信息保存在一个 struct redisClient 结构体中
4. 为与客户端连接的套接字注册读事件,相应的回调函数为 readQueryFromClient(),readQueryFromClient() 作用是从套接字读取数据,执行相应操作并回复客户端

5、进入事件循环,触发事件

事件循环基于事件循环结构体来进行的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
 
        // 进入事件循环可能会进入睡眠状态。在睡眠之前,执行预设置的函数 aeSetBeforeSleepProc()。
        if (eventLoop->beforesleep != NULL)
            eventLoop->beforesleep(eventLoop);
 
        // AE_ALL_EVENTS 表示处理所有的事件
        aeProcessEvents(eventLoop, AE_ALL_EVENTS);
    }
}
 
// 先处理定时事件,然后处理套接字事件
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
    int processed = 0, numevents;
 
    /* Nothing to do? return ASAP */
    if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;
 
    /* Note that we want call select() even if there are no
     * file events to process as long as we want to process time
     * events, in order to sleep until the next time event is ready
     * to fire. */
    if (eventLoop->maxfd != -1 ||
        ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
 
        int j;
        aeTimeEvent *shortest = NULL;
        // tvp 会在 IO 多路复用的函数调用中用到,表示超时时间
        struct timeval tv, *tvp;
 
        // 得到最短将来会发生的定时事件
        if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
            shortest = aeSearchNearestTimer(eventLoop);
 
        // 计算睡眠的最短时间
        if (shortest) { // 存在定时事件
            long now_sec, now_ms;
 
            /* Calculate the time missing for the nearest
             * timer to fire. */
            // 得到当前时间
            aeGetTime(&now_sec, &now_ms);
            tvp = &tv;
            tvp->tv_sec = shortest->when_sec - now_sec;
            if (shortest->when_ms < now_ms) { // 需要借位
                // 减法中的借位,毫秒向秒借位
                tvp->tv_usec = ((shortest->when_ms+1000) - now_ms)*1000;
                tvp->tv_sec --;
            } else { // 不需要借位,直接减
                tvp->tv_usec = (shortest->when_ms - now_ms)*1000;
            }
 
            // 当前系统时间已经超过定时事件设定的时间
            if (tvp->tv_sec < 0) tvp->tv_sec = 0;
            if (tvp->tv_usec < 0) tvp->tv_usec = 0;
        } else {
            /* If we have to check for events but need to return
             * ASAP because of AE_DONT_WAIT we need to set the timeout
             * to zero */
            // 如果没有定时事件,见机行事
            if (flags & AE_DONT_WAIT) {
                tv.tv_sec = tv.tv_usec = 0;
                tvp = &tv;
            } else {
                /* Otherwise we can block */
                tvp = NULL; /* wait forever */
            }
        }
 
        // 调用 IO 多路复用函数阻塞监听
        numevents = aeApiPoll(eventLoop, tvp);
 
        // 处理已经触发的事件
        for (j = 0; j < numevents; j++) {
            // 找到 I/O 事件表中存储的数据
            aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
            int mask = eventLoop->fired[j].mask;
            int fd = eventLoop->fired[j].fd;
            int rfired = 0;
 
         /* note the fe->mask & mask & ... code: maybe an already processed
             * event removed an element that fired and we still didn't
             * processed, so we check if the event is still valid. */
            // 读事件
            if (fe->mask & mask & AE_READABLE) {
                rfired = 1;
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
            }
            // 写事件
            if (fe->mask & mask & AE_WRITABLE) {
                if (!rfired || fe->wfileProc != fe->rfileProc)
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
            }
            processed++;
        }
    }
 
    // 处理定时事件
    /* Check time events */
    if (flags & AE_TIME_EVENTS)
        processed += processTimeEvents(eventLoop);
 
    return processed; /* return the number of processed file/time events */
}

7、如果监听套接字变为可读,会接收客户端请求,并为对应的套接字注册读事件.如果与客户端连接的套接字变为可读,执行相应的操作