redis源码阅读2-线程模型和event-loop

这一篇将从lldb入手,分析redis的线程模型以及上一篇中提到的ae事件循环, 最好参照源码看本文以加深理解

用lldb查看线程

先看一下redis在启动之后,什么也不做的情况下,线程情况

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
(lldb) thread list
Process 11155 stopped
* thread #1: tid = 0x15e78, 0x00007fffa43cae2e libsystem_kernel.dylib`kevent + 10, queue = 'com.apple.main-thread', stop reason = one-shot breakpoint 2
thread #2: tid = 0x15f33, 0x00007fffa43c9c8a libsystem_kernel.dylib`__psynch_cvwait + 10
thread #3: tid = 0x15f34, 0x00007fffa43c9c8a libsystem_kernel.dylib`__psynch_cvwait + 10
thread #4: tid = 0x15f35, 0x00007fffa43c9c8a libsystem_kernel.dylib`__psynch_cvwait + 10
(lldb) bt
* thread #1: tid = 0x15e78, 0x00007fffa43cae2e libsystem_kernel.dylib`kevent + 10, queue = 'com.apple.main-thread', stop reason = one-shot breakpoint 2
* frame #0: 0x00007fffa43cae2e libsystem_kernel.dylib`kevent + 10
frame #1: 0x0000000100004be2 redis-server`aeProcessEvents [inlined] aeApiPoll(eventLoop=0x0000000100328990) + 59 at ae_kqueue.c:115 [opt]
frame #2: 0x0000000100004ba7 redis-server`aeProcessEvents(eventLoop=<unavailable>, flags=<unavailable>) + 311 at ae.c:400 [opt]
frame #3: 0x0000000100004fdb redis-server`aeMain(eventLoop=<unavailable>) + 43 at ae.c:455 [opt]
frame #4: 0x000000010000fb0a redis-server`main(argc=<unavailable>, argv=<unavailable>) + 1642 at server.c:3791 [opt]
frame #5: 0x00007fffa429b255 libdyld.dylib`start + 1
frame #6: 0x00007fffa429b255 libdyld.dylib`start + 1
(lldb) thread select 2
* thread #2: tid = 0x15f33, 0x00007fffa43c9c8a libsystem_kernel.dylib`__psynch_cvwait + 10
frame #0: 0x00007fffa43c9c8a libsystem_kernel.dylib`__psynch_cvwait + 10
libsystem_kernel.dylib`__psynch_cvwait:
-> 0x7fffa43c9c8a <+10>: jae 0x7fffa43c9c94 ; <+20>
0x7fffa43c9c8c <+12>: movq %rax, %rdi
0x7fffa43c9c8f <+15>: jmp 0x7fffa43c2d6f ; cerror_nocancel
0x7fffa43c9c94 <+20>: retq
(lldb) bt
* thread #2: tid = 0x15f33, 0x00007fffa43c9c8a libsystem_kernel.dylib`__psynch_cvwait + 10
* frame #0: 0x00007fffa43c9c8a libsystem_kernel.dylib`__psynch_cvwait + 10
frame #1: 0x00007fffa44b297a libsystem_pthread.dylib`_pthread_cond_wait + 712
frame #2: 0x000000010006890c redis-server`bioProcessBackgroundJobs(arg=<unavailable>) + 348 at bio.c:176 [opt]
frame #3: 0x00007fffa44b1abb libsystem_pthread.dylib`_pthread_body + 180
frame #4: 0x00007fffa44b1a07 libsystem_pthread.dylib`_pthread_start + 286
frame #5: 0x00007fffa44b1231 libsystem_pthread.dylib`thread_start + 13
(lldb) thread select 3
* thread #3: tid = 0x15f34, 0x00007fffa43c9c8a libsystem_kernel.dylib`__psynch_cvwait + 10
frame #0: 0x00007fffa43c9c8a libsystem_kernel.dylib`__psynch_cvwait + 10
libsystem_kernel.dylib`__psynch_cvwait:
-> 0x7fffa43c9c8a <+10>: jae 0x7fffa43c9c94 ; <+20>
0x7fffa43c9c8c <+12>: movq %rax, %rdi
0x7fffa43c9c8f <+15>: jmp 0x7fffa43c2d6f ; cerror_nocancel
0x7fffa43c9c94 <+20>: retq
(lldb) bt
* thread #3: tid = 0x15f34, 0x00007fffa43c9c8a libsystem_kernel.dylib`__psynch_cvwait + 10
* frame #0: 0x00007fffa43c9c8a libsystem_kernel.dylib`__psynch_cvwait + 10
frame #1: 0x00007fffa44b297a libsystem_pthread.dylib`_pthread_cond_wait + 712
frame #2: 0x000000010006890c redis-server`bioProcessBackgroundJobs(arg=<unavailable>) + 348 at bio.c:176 [opt]
frame #3: 0x00007fffa44b1abb libsystem_pthread.dylib`_pthread_body + 180
frame #4: 0x00007fffa44b1a07 libsystem_pthread.dylib`_pthread_start + 286
frame #5: 0x00007fffa44b1231 libsystem_pthread.dylib`thread_start + 13
(lldb) thread select 4
* thread #4: tid = 0x15f35, 0x00007fffa43c9c8a libsystem_kernel.dylib`__psynch_cvwait + 10
frame #0: 0x00007fffa43c9c8a libsystem_kernel.dylib`__psynch_cvwait + 10
libsystem_kernel.dylib`__psynch_cvwait:
-> 0x7fffa43c9c8a <+10>: jae 0x7fffa43c9c94 ; <+20>
0x7fffa43c9c8c <+12>: movq %rax, %rdi
0x7fffa43c9c8f <+15>: jmp 0x7fffa43c2d6f ; cerror_nocancel
0x7fffa43c9c94 <+20>: retq
(lldb) bt
* thread #4: tid = 0x15f35, 0x00007fffa43c9c8a libsystem_kernel.dylib`__psynch_cvwait + 10
* frame #0: 0x00007fffa43c9c8a libsystem_kernel.dylib`__psynch_cvwait + 10
frame #1: 0x00007fffa44b297a libsystem_pthread.dylib`_pthread_cond_wait + 712
frame #2: 0x000000010006890c redis-server`bioProcessBackgroundJobs(arg=<unavailable>) + 348 at bio.c:176 [opt]
frame #3: 0x00007fffa44b1abb libsystem_pthread.dylib`_pthread_body + 180
frame #4: 0x00007fffa44b1a07 libsystem_pthread.dylib`_pthread_start + 286
frame #5: 0x00007fffa44b1231 libsystem_pthread.dylib`thread_start + 13
(lldb)

注意到thread list为列出所有的线程数。这里一共有四个线程。分为两组

  • thread 1: 通过bt可以看到thread 1就是aeMain执行的线程,也是我们的主线程
  • thread 2-4: 这三个线程可以看到是执行的bioProcessBackgroundJobs函数,通过名字可知,为执行后台任务
    以上,就目前的了解,就是redis线程的全部

后台任务

即thread 2-4线程所做的事情, 这一块相对比较简单,可以先粗略的看一下,后边有机会再进一步详细的研究

首先,在void initServer(void) 最后,调用void bioInit(void) 方法,创建的三个(BIO_NUM_OPS)后台线程, 其回调函数就是上面的bioProcessBackgroundJobs

三个后台任务对应的工作,简单的可以通过其类型定义而知

1
2
3
4
5
/* Background job opcodes */
#define BIO_CLOSE_FILE 0 /* Deferred close(2) syscall. */
#define BIO_AOF_FSYNC 1 /* Deferred AOF fsync. */
#define BIO_LAZY_FREE 2 /* Deferred objects freeing. */
#define BIO_NUM_OPS 3

而对于三种类型的job处理如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
if (type == BIO_CLOSE_FILE) {
close((long)job->arg1);
} else if (type == BIO_AOF_FSYNC) {
aof_fsync((long)job->arg1);
} else if (type == BIO_LAZY_FREE) {
/* What we free changes depending on what arguments are set:
* arg1 -> free the object at pointer.
* arg2 & arg3 -> free two dictionaries (a Redis DB).
* only arg3 -> free the skiplist. */

if (job->arg1)
lazyfreeFreeObjectFromBioThread(job->arg1);
else if (job->arg2 && job->arg3)
lazyfreeFreeDatabaseFromBioThread(job->arg2,job->arg3);
else if (job->arg3)
lazyfreeFreeSlotsMapFromBioThread(job->arg3);
} else {
serverPanic("Wrong job type in bioProcessBackgroundJobs().");
}

从而得知, 后台任务做以下三件事情:

  • BIO_CLOSE_FILE 关闭文件
  • BIO_AOP_FSYNC 同步AOF相关的东西,由于AOF还没看,所以先做个TODO
  • BIO_LAZY_FREE 做内存异步回收, 包括一个redis对象,或者两个dict,再或者一个skiplist

至于以上三个操作为什么要做成异步,暂时还不得而知,作为TODO

另外这里还可以学习的一点就是关于pthread_cond, pthread_lock的使用,有兴趣的可自行研究

aeMain

整个redis, aeMain线程可以说是处理了几乎所有的事情,这个自然是一个重点研究对象。
还没搞明白为什么名字要叫ae,感觉event-loop更易于理解一些,所以后文除代码意外,都使用el代指ae

el做了什么

上一篇中,已经看到了aeMain函数的样子, 很简单,从而也可以得到一个不严谨的对于event-loop的定义:
当程序没有停止的时候,就一直执行事件

这里loop也就是上面所说的一直执行了,那么事件指什么呢?还是从源码来看。

1
aeProcessEvents(eventLoop, AE_ALL_EVENTS);

这里,传入了一个AE_ALL_EVENTS, 那么根据这个AE_ALL_EVENTS宏定义,可以找到

1
2
3
4
#define AE_FILE_EVENTS 1
#define AE_TIME_EVENTS 2
#define AE_ALL_EVENTS (AE_FILE_EVENTS|AE_TIME_EVENTS)
#define AE_DONT_WAIT 4

可以推断,这里执行的事件,包括了文件事件和定时器事件,在阅读aeProcessEvents之后,发现存在如下代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
   numevents = aeApiPoll(eventLoop, tvp);
for (j = 0; j < numevents; j++) {
aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
int mask = eventLoop->fired[j].mask;
int fd = eventLoop->fired[j].fd;
int rfired = 0;

/* note the fe->mask & mask & ... code: maybe an already processed
* event removed an element that fired and we still didn't
* processed, so we check if the event is still valid. */

if (fe->mask & mask & AE_READABLE) {
rfired = 1;
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
}
if (fe->mask & mask & AE_WRITABLE) {
if (!rfired || fe->wfileProc != fe->rfileProc)
fe->wfileProc(eventLoop,fd,fe->clientData,mask);
}
processed++;
}

仔细阅读会发现,这里的代码结构和epoll的调用非常相似,其实,aeApiPoll就是对不同的不同的多路复用io(select, epoll, ev_port, kqueue) 的封装。而其中的fe->rfileProc(eventLoop,fd,fe->clientData,mask);fe->wfileProc(eventLoop,fd,fe->clientData,mask);也正是对文件的处理, 所以文件(socket也是文件而且这里主要是socket读写)事件的处理已经被我们找到。但是,定时器事件在哪呢?

其实,这里利用了aeApiPoll的第二个参数,可以看到tvp的类型为struct timeval,我们知道在epoll_wait的时候,可以传入一个等待时间, 这里aeApiPoll同理,传入一个等待时间,只不过这个等待时间是通过一些状态计算出来的, 若在可以等待的情况下,等待时间到了,自然就返回了,也就是到了定时器执行的时机。那么这个等待时间计算规则如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
shortest = aeSearchNearestTimer(eventLoop);
if (shortest) {
long now_sec, now_ms;

aeGetTime(&now_sec, &now_ms);
tvp = &tv;

/* How many milliseconds we need to wait for the next
* time event to fire? */

long long ms =
(shortest->when_sec - now_sec)*1000 +
shortest->when_ms - now_ms;

if (ms > 0) {
tvp->tv_sec = ms/1000;
tvp->tv_usec = (ms % 1000)*1000;
} else {
tvp->tv_sec = 0;
tvp->tv_usec = 0;
}
} else {
/* If we have to check for events but need to return
* ASAP because of AE_DONT_WAIT we need to set the timeout
* to zero */

if (flags & AE_DONT_WAIT) {
tv.tv_sec = tv.tv_usec = 0;
tvp = &tv;
} else {
/* Otherwise we can block */
tvp = NULL; /* wait forever */
}
}

即:

1
2
3
4
5
6
若可以等待,则找出最近的将要执行的定时器
若存在这样一个定时器
计算出从现在到将要执行的时间所需等待的毫秒数
若不存在满足条件的定时器
则一直等待
若不能等待,将等待时间设为0ms

另外:在文件事件执行完成之后,才执行定时器事件

1
2
3
/* Check time events */
if (flags & AE_TIME_EVENTS)
processed += processTimeEvents(eventLoop);

aeFileEvent及文件事件的注册

文件事件的结构如下:

1
2
3
4
5
6
7
/* File event structure */
typedef struct aeFileEvent {
int mask; /* one of AE_(READABLE|WRITABLE) */
aeFileProc *rfileProc;
aeFileProc *wfileProc;
void *clientData;
} aeFileEvent;

  • mask 表示文件为可读还是可写

其中,包含两个回调函数,分别为读写(与上面的文件事件的处理对应), 原型如下

1
typedef void aeFileProc(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask);

其注册函数如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
aeFileProc *proc, void *clientData)

{

if (fd >= eventLoop->setsize) {
errno = ERANGE;
return AE_ERR;
}
aeFileEvent *fe = &eventLoop->events[fd];

if (aeApiAddEvent(eventLoop, fd, mask) == -1)
return AE_ERR;
fe->mask |= mask;
if (mask & AE_READABLE) fe->rfileProc = proc;
if (mask & AE_WRITABLE) fe->wfileProc = proc;
fe->clientData = clientData;
if (fd > eventLoop->maxfd)
eventLoop->maxfd = fd;
return AE_OK;
}

可以看到aeEventLoop中,存在一个events数组,以文件描述符fd为下标,保存的是aeFileEvent数据。这样,当某个文件有读写事件时,根据fd即可找到其对应的事件的回调函数

在redis初始化时,在void initServer(void)函数中注册了如下三个事件

  • aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) 服务主定时任务,1ms执行一次,回调函数为serverCron
  • aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE, acceptTcpHandler,NULL) tcp socket读事件
  • aeCreateFileEvent(server.el,server.sofd,AE_READABLE, acceptUnixHandler,NULL) unix socket 读事件

后两个socket相关的事件是否注册,是由配置决定的(server.ipfd_count server.sofd)

由于服务启动时,不会有任何的写事件发生,即没有客户端连接,所以不会发送任何数据,所以只注册AE_READABLE, 这个事件对应的mask会在el中动态的修改

其他地方也存在事件的注册,例如cluster集群互相建立连接时,客户端连接redis服务时。这些在后边涉及到时再进一步说明

下一篇,待定

文章目录
  1. 1. 用lldb查看线程
  2. 2. 后台任务
  3. 3. aeMain
    1. 3.1. el做了什么
    2. 3.2. aeFileEvent及文件事件的注册
,