Redis source code reading 3 - AOF (Append Only File)

Although Redis is an in-memory database, it also supports data persistence. This time I want to take a close look at how file storage, which I had always assumed to be mysterious and complicated, is implemented efficiently in Redis.

Redis provides two persistence-related mechanisms:

  • RDB: stores a complete snapshot of the data at a point in time
  • AOF (Append Only File): stores the commands that modify the data, and can be seen as an incremental log; the storage format is the Redis command protocol (a small example follows below)

I haven't looked at RDB much yet, so let's start with AOF.
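Since the AOF stores commands in the Redis command protocol, a write such as SET key value ends up in the file as a RESP multi-bulk array. A minimal hand-written illustration (each line is terminated by \r\n in the real file):

*3
$3
SET
$3
key
$5
value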

AOF configuration

Configuration file

appendonly no/yes	# AOF on/off switch
appendfilename "appendonly.aof" # AOF file name
appendfsync always/everysec/no # file sync mode (remember how printf may need a flush afterwards? here fsync is called, same idea)
# always: sync to the file at the end of every main-loop iteration
# everysec: flush the buffer to the file once per second
# no: Redis does not sync; flushing to disk is left to the OS, typically when the buffer fills up or a timeout expires
no-appendfsync-on-rewrite no/yes # whether the main process still fsyncs the AOF while a background child is writing to disk; no: keep fsyncing, yes: skip it
# On some Linux systems, when a background process is writing heavily to disk (the process relationships
# behind AOF are described later), the fsync of the AOF in the main loop can block.
# Setting this to yes temporarily skips that fsync,
# which keeps the main loop from blocking, but also increases the risk of data loss.
auto-aof-rewrite-percentage 100 # rewrite percentage: when the ratio of the current AOF size to its size after the last rewrite reaches this value, a rewrite
# is triggered automatically; 0 disables this feature (rewrite is covered in detail later)
auto-aof-rewrite-min-size 64mb # minimum size for an automatic rewrite: even if the ratio above is reached, no rewrite happens while the file is below this size
aof-load-truncated yes/no # what Redis does when it finds the AOF file truncated at startup (caused by an OS crash etc.; a Redis crash alone does not cause this)
# yes: load as much data as possible, log a warning and continue
# no: report an error and refuse to start
aof-use-rdb-preamble no/yes # when rewriting the AOF file, whether to use an RDB dump as its leading part
# no: disabled; kept off by default to guard against future changes to the AOF or RDB file formats
# yes: enabled; after a rewrite the file may be laid out as [RDB head][AOF tail]
aof-rewrite-incremental-fsync yes # while a child process rewrites the AOF file, yes fsyncs the file incrementally every 32MB written, to avoid big latency spikes

Modifying the configuration at runtime

Using config set parameter value, the Redis configuration can be changed at runtime and takes effect immediately (see the example after the list below).
For AOF, the following options can be changed dynamically:

  • appendonly
  • appendfsync
  • aof-load-truncated
  • aof-use-rdb-preamble
  • aof-rewrite-incremental-fsync
  • no-appendfsync-on-rewrite
  • auto-aof-rewrite-percentage
  • auto-aof-rewrite-min-size
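For example, turning AOF on and switching the fsync policy on a running instance looks roughly like this in redis-cli:

127.0.0.1:6379> CONFIG SET appendonly yes
OK
127.0.0.1:6379> CONFIG SET appendfsync everysec
OK
127.0.0.1:6379> CONFIG GET appendfsync
1) "appendfsync"
2) "everysec"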

AOF file write flow

Writing to the buffer server.aof_buf

Call stack

// Command invocation
void call(client *c, int flags) {
    ...
    c->cmd->proc(c); // execute the command
    ...
    if (propagate_flags != PROPAGATE_NONE) // if the command needs to be propagated
        propagate(c->cmd,c->db->id,c->argv,c->argc,propagate_flags); // propagate it
    ...
}

void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc, int flags) {
    if (server.aof_state != AOF_OFF && flags & PROPAGATE_AOF) // if AOF is enabled and the command should go to AOF, append it to server.aof_buf
        feedAppendOnlyFile(cmd,dbid,argv,argc);
    ...
}

Propagation means sending the command currently being executed to other consumers, which includes replaying it on slaves and writing it into the AOF buffer.
From this small amount of code we can see that if the current command needs to be propagated (controlled by several flags) and the AOF switch is on, the command is written to the AOF buffer.
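The propagation targets are encoded as flags; for reference, server.h defines them roughly like this (names taken from the 3.2/4.0-era source, adjust for your version):

#define PROPAGATE_NONE 0
#define PROPAGATE_AOF 1
#define PROPAGATE_REPL 2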

Writing to the buffer

The code that writes to the buffer is as follows:

void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {
    sds buf = sdsempty();
    robj *tmpargv[3];

    // If the target db changed, emit a db switch command (SELECT dbid) first
    if (dictid != server.aof_selected_db) {
        char seldb[64];

        snprintf(seldb,sizeof(seldb),"%d",dictid);
        buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n",
            (unsigned long)strlen(seldb),seldb);
        server.aof_selected_db = dictid;
    }

    // Encode the command into buf, with special handling for EXPIRE/PEXPIRE/EXPIREAT/SETEX/PSETEX
    if (cmd->proc == expireCommand || cmd->proc == pexpireCommand ||
        cmd->proc == expireatCommand) {
        /* Translate EXPIRE/PEXPIRE/EXPIREAT into PEXPIREAT */
        buf = catAppendOnlyExpireAtCommand(buf,cmd,argv[1],argv[2]);
    } else if (cmd->proc == setexCommand || cmd->proc == psetexCommand) {
        /* Translate SETEX/PSETEX to SET and PEXPIREAT */
        tmpargv[0] = createStringObject("SET",3);
        tmpargv[1] = argv[1];
        tmpargv[2] = argv[3];
        buf = catAppendOnlyGenericCommand(buf,3,tmpargv);
        decrRefCount(tmpargv[0]);
        buf = catAppendOnlyExpireAtCommand(buf,cmd,argv[1],argv[2]);
    } else {
        buf = catAppendOnlyGenericCommand(buf,argc,argv);
    }

    // If the AOF switch is on, append to the aof_buf cache; it is flushed to disk before the next event loop iteration
    if (server.aof_state == AOF_ON)
        server.aof_buf = sdscatlen(server.aof_buf,buf,sdslen(buf));

    // If an AOF rewrite is in progress, also append the command to the rewrite buffer
    if (server.aof_child_pid != -1)
        aofRewriteBufferAppend((unsigned char*)buf,sdslen(buf));

    sdsfree(buf);
}
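To make the translation above concrete: assuming SETEX foo 10 bar is executed and the computed absolute expiry in milliseconds happens to be 1700000010000 (a made-up value), the text appended to aof_buf would look like this (line breaks are \r\n):

*3
$3
SET
$3
foo
$3
bar
*3
$9
PEXPIREAT
$3
foo
$13
1700000010000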

Writing to disk: flushAppendOnlyFile

Call stack

void aeMain(aeEventLoop *eventLoop) {	// aeMain from the previous post
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        if (eventLoop->beforesleep != NULL)
            eventLoop->beforesleep(eventLoop);
        ...
    }
}

void beforeSleep(struct aeEventLoop *eventLoop) {
    ...
    flushAppendOnlyFile(0); // write to disk
    ...
}

From the call stack above we can see that the flush function is called before each event loop iteration, which matches the comment in feedAppendOnlyFile.
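beforeSleep is not the only caller, by the way: serverCron also retries the flush when it was postponed, and about once per second when the last write failed. A rough sketch of those call sites (paraphrased from serverCron, not a verbatim copy):

int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
    ...
    /* Retry a flush that was postponed because an fsync was still running. */
    if (server.aof_flush_postponed_start) flushAppendOnlyFile(0);

    /* About once per second, retry after a failed AOF write. */
    run_with_period(1000) {
        if (server.aof_last_write_status == C_ERR) flushAppendOnlyFile(0);
    }
    ...
}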

Write-to-disk logic

void flushAppendOnlyFile(int force) {
    ssize_t nwritten;
    int sync_in_progress = 0;
    mstime_t latency;

    if (sdslen(server.aof_buf) == 0) return;

    if (server.aof_fsync == AOF_FSYNC_EVERYSEC)
        sync_in_progress = bioPendingJobsOfType(BIO_AOF_FSYNC) != 0;

    if (server.aof_fsync == AOF_FSYNC_EVERYSEC && !force) {
        // If the policy is flush-to-disk-once-per-second and an fsync is already running:
        // - if no write has been postponed yet, postpone this one
        // - if it has been postponed for less than 2 seconds, that is still acceptable, postpone again
        // - otherwise fall through and force the write
        // Note that what is being postponed here is the write itself, i.e. write(2)
        /* With this append fsync policy we do background fsyncing.
         * If the fsync is still in progress we can try to delay
         * the write for a couple of seconds. */
        if (sync_in_progress) {
            if (server.aof_flush_postponed_start == 0) {
                /* No previous write postponing, remember that we are
                 * postponing the flush and return. */
                server.aof_flush_postponed_start = server.unixtime;
                return;
            } else if (server.unixtime - server.aof_flush_postponed_start < 2) {
                /* We were already waiting for fsync to finish, but for less
                 * than two seconds this is still ok. Postpone again. */
                return;
            }
            /* Otherwise fall through, and go write since we can't wait
             * over two seconds. */
            server.aof_delayed_fsync++;
            serverLog(LL_NOTICE,"Asynchronous AOF fsync is taking too long (disk is busy?). Writing the AOF buffer without waiting for fsync to complete, this may slow down Redis.");
        }
    }
    /* We want to perform a single write. This should be guaranteed atomic
     * at least if the filesystem we are writing is a real physical one.
     * While this will save us against the server being killed I don't think
     * there is much to do about the whole server stopping for power problems
     * or alike */

    // Write the whole buffer to the file in a single call; on a real physical
    // filesystem this should be atomic, short of the machine losing power
    latencyStartMonitor(latency);
    nwritten = write(server.aof_fd,server.aof_buf,sdslen(server.aof_buf));
    latencyEndMonitor(latency);
    /* We want to capture different events for delayed writes:
     * when the delay happens with a pending fsync, or with a saving child
     * active, and when the above two conditions are missing.
     * We also use an additional event name to save all samples which is
     * useful for graphing / monitoring purposes. */

    // latency event logging
    if (sync_in_progress) {
        latencyAddSampleIfNeeded("aof-write-pending-fsync",latency);
    } else if (server.aof_child_pid != -1 || server.rdb_child_pid != -1) {
        latencyAddSampleIfNeeded("aof-write-active-child",latency);
    } else {
        latencyAddSampleIfNeeded("aof-write-alone",latency);
    }
    latencyAddSampleIfNeeded("aof-write",latency);

    /* We performed the write so reset the postponed flush sentinel to zero. */
    server.aof_flush_postponed_start = 0;

    // write error handling
    if (nwritten != (signed)sdslen(server.aof_buf)) {
        static time_t last_write_error_log = 0;
        int can_log = 0;

        /* Limit logging rate to 1 line per AOF_WRITE_LOG_ERROR_RATE seconds. */
        if ((server.unixtime - last_write_error_log) > AOF_WRITE_LOG_ERROR_RATE) {
            can_log = 1;
            last_write_error_log = server.unixtime;
        }

        /* Log the AOF write error and record the error code. */
        if (nwritten == -1) { // nothing was written at all: log it and record the error
            if (can_log) {
                serverLog(LL_WARNING,"Error writing to the AOF file: %s",
                    strerror(errno));
                server.aof_last_write_errno = errno;
            }
        } else { // only part of the data was written
            if (can_log) {
                serverLog(LL_WARNING,"Short write while writing to "
                                     "the AOF file: (nwritten=%lld, "
                                     "expected=%lld)",
                                     (long long)nwritten,
                                     (long long)sdslen(server.aof_buf));
            }

            // roll back the partial write
            if (ftruncate(server.aof_fd, server.aof_current_size) == -1) {
                // rollback failed: log it and write the remaining part next time;
                // if the server crashes before then, the file may end up truncated
                if (can_log) {
                    serverLog(LL_WARNING, "Could not remove short write "
                               "from the append-only file. Redis may refuse "
                               "to load the AOF the next time it starts. "
                               "ftruncate: %s", strerror(errno));
                }
            } else {
                // rollback succeeded: treat it as if nothing was written
                /* If the ftruncate() succeeded we can set nwritten to
                 * -1 since there is no longer partial data into the AOF. */
                nwritten = -1;
            }
            server.aof_last_write_errno = ENOSPC;
        }

        /* Handle the AOF write error. */
        if (server.aof_fsync == AOF_FSYNC_ALWAYS) { // with the ALWAYS fsync policy, a failed write means we have to exit
            /* We can't recover when the fsync policy is ALWAYS since the
             * reply for the client is already in the output buffers, and we
             * have the contract with the user that on acknowledged write data
             * is synced on disk. */
            serverLog(LL_WARNING,"Can't recover from AOF write error when the AOF fsync policy is 'always'. Exiting...");
            exit(1);
        } else { // otherwise mark the AOF write status as failed, which blocks later write commands
            /* Recover from failed write leaving data into the buffer. However
             * set an error to stop accepting writes as long as the error
             * condition is not cleared. */
            server.aof_last_write_status = C_ERR;

            /* Trim the sds buffer if there was a partial write, and there
             * was no way to undo it with ftruncate(2). */
            if (nwritten > 0) {
                server.aof_current_size += nwritten;
                sdsrange(server.aof_buf,nwritten,-1);
            }
            return; /* We'll try again on the next call... */
        }
    } else { // the write succeeded: restore the last-write status to OK so later write commands are accepted again
        /* Successful write(2). If AOF was in error state, restore the
         * OK state and log the event. */
        if (server.aof_last_write_status == C_ERR) {
            serverLog(LL_WARNING,
                "AOF write error looks solved, Redis can write again.");
            server.aof_last_write_status = C_OK;
        }
    }
    // server.aof_current_size is the current size of the AOF file
    server.aof_current_size += nwritten;

    /* Re-use AOF buffer when it is small enough. The maximum comes from the
     * arena size of 4k minus some overhead (but is otherwise arbitrary). */

    // buffer reuse
    if ((sdslen(server.aof_buf)+sdsavail(server.aof_buf)) < 4000) {
        sdsclear(server.aof_buf);
    } else {
        sdsfree(server.aof_buf);
        server.aof_buf = sdsempty();
    }

    /* Don't fsync if no-appendfsync-on-rewrite is set to yes and there are
     * children doing I/O in the background. */

    // if aof_no_fsync_on_rewrite is set (the no-appendfsync-on-rewrite option mentioned earlier), skip the fsync to avoid blocking
    if (server.aof_no_fsync_on_rewrite &&
        (server.aof_child_pid != -1 || server.rdb_child_pid != -1))
            return;

    /* Perform the fsync if needed. */
    if (server.aof_fsync == AOF_FSYNC_ALWAYS) { // ALWAYS: fsync in the main loop; this blocks the main loop and reduces Redis performance
        /* aof_fsync is defined as fdatasync() for Linux in order to avoid
         * flushing metadata. */
        latencyStartMonitor(latency);
        aof_fsync(server.aof_fd); /* Let's try to get this data on the disk */
        latencyEndMonitor(latency);
        latencyAddSampleIfNeeded("aof-fsync-always",latency);
        server.aof_last_fsync = server.unixtime;
    } else if ((server.aof_fsync == AOF_FSYNC_EVERYSEC &&
                server.unixtime > server.aof_last_fsync)) { // everysec: if no fsync is in progress, schedule a background job to sync the file
        if (!sync_in_progress) aof_background_fsync(server.aof_fd);
        server.aof_last_fsync = server.unixtime;
    }
}
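The background fsync used by the everysec branch is handed off to Redis's bio (background I/O) threads. For reference, aof_background_fsync is just a thin wrapper that queues a bio job (quoted from memory of the 3.x/4.x source, treat it as an approximation):

void aof_background_fsync(int fd) {
    bioCreateBackgroundJob(BIO_AOF_FSYNC,(void*)(long)fd,NULL,NULL);
}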

The error handling in the write and sync code above is well worth learning from. To summarize:

When the write fails with nothing written at all, the error is logged and recorded, and the write status is set to failed, which blocks subsequent write commands until a write succeeds again.
In other words, when a write error occurs that can be recovered from by retrying later, reads stay available because all the data is still in memory, while writes are rejected.
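The rejection itself does not happen in flushAppendOnlyFile; command dispatch checks the recorded status. A simplified sketch of the corresponding check in processCommand (paraphrased, not a verbatim copy of the source):

/* In processCommand(): refuse write commands while the last AOF write failed. */
if (server.aof_last_write_status == C_ERR &&
    server.masterhost == NULL &&
    (c->cmd->flags & CMD_WRITE))
{
    addReplySds(c, sdsnew("-MISCONF Errors writing to the AOF file.\r\n"));
    return C_OK;
}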

When the write fails after writing only part of the data, Redis first tries to roll back the partial write to keep the file consistent (the bytes written are not necessarily a complete command; if the server crashed at that point the tail of the file would be corrupt, whereas rolling back guarantees the file only ever contains complete commands, so even if data is lost there is never half a command on disk).
If the rollback also fails, a log entry is written and the remaining part is written on a later attempt.
If the rollback succeeds, the situation is treated as if nothing had been written.

When the fsync policy is ALWAYS, any write failure makes the process exit, because ALWAYS promises that every acknowledged command has been written to the AOF file; once a write fails that promise cannot be kept, and terminating the process is the only option left.

One more thing to note: under ALWAYS, when a write error occurs, the rollback of the partially written data is still attempted before exiting.

Here too, the point of rolling back is to keep the data as consistent as possible.

Loading the AOF file at startup: loadAppendOnlyFile

TODO

rewrite

TODO

