RocksDB写链路源码梳理
说明:
- 本流程聚焦核心的写路径,对于事务,Pipeline相关的内容先暂时略过。本分析将从一个典型的DB::Put接口切入
- 为了可读性,解析的顺序可能和实际上的代码顺序稍有不同(我按照实际上的发生时间顺序来介绍逻辑,但由于多线程的缘故,RocksDB的代码顺序并非如此),比如我会先侧重介绍GroupLeader分支,展示WriteGroup是如何构建的,LeaderWriter是如何工作的,再介绍Group构造完毕之后,FollowerWriter所在线程的运行路径(代码顺序正好相反,先写了GroupFollower的处理分支,往下才是GroupLeader的逻辑)。
1 进入WriteImpl之前
1.1 将用户给的key-value构造成一个WriteBatch对象
即使用户没有用batch,最后写入也是以WriteBatch为组织形式的
1.2 从DBImpl::Write进入DBImpl::WriteImpl()
DBImpl::Write有多个重载,但是最后都是进入DBImpl::WriteImpl
2 DBImpl::WriteImpl开头: 一系列检验和check
3 low_pri标志位检查
如果这次写入的WriteOption带了low_pri的标记,则试着让他限速(给high_pri任务让步)。当然,如果开了no_slowdown,这笔写入会直接返回Status::Imcomplete
4 分支1(RocksDB默认配置):parallel_write为true且unordered_write为false
说明:这是实际生产中最常见的配置形式。
4.1 用WriteBatch构造Writer(无锁链表的Node)
RocksDB同LevelDB一样,会将写入的请求队列在一个无锁链表内,RocksDB中,该无锁链表就挂在下图中的DBImpl::write_thread_中,该无锁链表中的单个元素就是一个Writer,具体可往下看Writer的定义。
下图是Writer的定义,其内有两个成员:link_older, link_newer,其实就是链表的前后指针,分别指向自己的prev, 和next, 直接了当地说,Writer这玩意就是无锁链表(写请求队列)的Node,并且一个Writer内部只有一个WriteBatch。另外有一个值得关注的地方就是每个Writer都有一个State成员,这个state_主要标志当前Writer处于什么状态(成为了GroupLeader/自己的写入已经被Leader代写了/写允许去并发写memtable...等等),这个状态实际上协调了了无队列的整个并发过程,是很重要的成员。
4.2 JoinBatchGroup:Writer加入无锁链表
简短说明一下,RocksDB从链表尾部(队尾, newest_writer)插入新来的Writer,从链表头部(队头, oldest_writer)来选取GroupLeader以构造Group。其基本的消费模式是:每次取队头的节点作为一个Leader,在逻辑上从队头向队尾进行打包操作,构造成一个WriteGroup,后续会在WriteGroup内做操作(比如整个WriteGroup内的写入会被凑成一笔WAL,以及WriteGroup内部可以写memtable并发等)。
4.2.1 将Writer插入无锁链表:LinkOne
这一句LinkOne就是入队操作,第一个参数是之前用WriteBatch构造好的writer,第二个参数是无锁链表尾节点指针的指针(进行插入后,链表尾指针会发生改变,所以这里传入了指针的指针)
下面进入LinkOne的逻辑:
- 用链表尾指针(newest_writer)取得链表尾节点(队列尾节点就是之前最后插入的,最新的那个Writer)
- 先看这个队尾节点是否是write_stall_dummy_节点(一个特殊的writer, 标志正在发生WriteStall,一旦发生WriteStall,就将该节点插入写队列中,作为一个标志来阻塞后面的写入)。如果队尾是确实是write_stall_dummy_节点,则直接调stall_cv的wait方法阻塞等待(当然如果开启了no_slodown,直接返回write imcomplited)。 等到stall_cv_.wait()返回之后,需要重新获取一下队尾节点指针,因为队尾在这期间可能发生过变化。
- 假设队尾不是write_stall_dummy_,接着往下就是入队操作:让当前要插入的new-writer的link_older指向原来的队尾,然后将队尾指针指向我们的new-writer。这里是用compare_exchange_weak操作的,如果队尾节点在步骤1-2之间发生过变化,就会更新队尾指针的值来指向正确的队尾节点,然后通过while再来一轮,具体可看代码,这里就不赘述了。
- 返回值:当我们的new-writer成功插入队尾后,返回值为:队尾指针旧值==nullptr, 说人话就是:在插入new-writer之前,这个无锁链表是不是空的。留意这个返回值,会在下一步被用上。
(下面是LinkOne函数的代码)
4.2.2 LinkOne返回值:决定当前线程是否阻塞
从LinkOne返回,得到的返回值是一个bool,其意义是:插入这个new-writer前,writers队列是否是空的,
1 如果是,直接设置这个new-writer的状态为GroupLeader,然后JoinBatchGroup的调用就结束了
2 如果不是,则让当前writer线程进入调用AwaitState来进行等待,这个AwaitState内是一个三级的轻量级等待策略,具体会在8.1中细说。
这个分支可以先简单理解为:当前这个新插入无锁链表的writer其所对应的线程(用户线程)会进入阻塞,直到当前这个writer等于某个期望的状态, 这个位置主要有以下几种期望状态可以解阻塞:
- 被前一个GroupLeader选为了新的GroupLeader(STATE_GROUP_LEADER)
- 已经被自己的GroupLeader完成了写入(parallel_write为false, GroupLeader会线性帮Group内的writer完成写memtable的动作)
- 自己被允许去并发写memtable(STATE_PARALLEL_MEMTABLE_WRITE)
- ... 其他不一一列举,涉及到pipeline特性
线程模型说明:这里说的new-writers线程实际上就是用户线程,RocksDB也好,LevelDB也好,实际上每个writer都有一个用户线程在支撑,读写链路上的线程可以说都是用户线程(就是上层调了DB::Put, DB::Get的线程),只有那些后台任务的线程真正算是RocksDB自己的线程。
4.3 JoinBatchGroup返回
额外说明:每个writer/batch实际上都是由一个用户线程在背后支撑的,或者说我们在读写链路上执行的所有代码,都是在用户线程上,每个writer都有一个用户线程(调用DB::Put, DB::Get的Application Thread)。
从4.2对JoinBatchGroup的解读可以看到,其内部有阻塞逻辑,所以当一个用户线程从JoinBatchGroup返回,其对应的Writer一定是处于以下几种状态之一(4.2.2中也提到了):
- 该writer是一个GroupLeader
- 该writer被通知可以去并发将自己的内容写入memtable
- 该writer所对应的写入内容已经被自己的GroupLeader代写完了,writer直接处于complete状态
4.3.1 LeaderWriter路径:当前Writer是GroupLeader
此时当前写入线程/用户线程对应的writer为一个LeaderWriter, 从JoinBatchGroup返回往下走,第一个关键的地方是:EnterAsBatchGroupLeader,长话短说,这个函数负责在逻辑上将任务队列中的任务从队头往队尾打包(从oldest_writer到newest_writer)
EnterAsBatchGroupLeader内部
- 给write_group初始化:将当前writer赋为这个group的leader, Group size为1
- 准备一个空list, 称作r_list 其作用是:待会在打包WriteGroup的过程中,可能有些writer没法一起打包(比如一个为sync_write, 一个为async_write, 二者一个要求WAL落盘了才返回,一个没有这一要求,自然没法打包一起处理),这些没法打包的writer会先放在r_list,最后会将整个r_list重新append回任务队列中。
- 进入一个while循环来迭代任务队列/无锁链表,从oldest_writer往newest_writer方向迭代
- 每一轮while内部都有一个条件很多的If, 其作用就是判断目前迭代到的writer能否加入WriteGroup,比如:当前writer和leader_writer的写入模式是否不同(sync or async),当前WriteGroup的总大小是否有超过上线等待
- 如果这些if条件都不满足,说明可以加入WriteGroup, 可以看见逻辑比较简单:让WriteGroup的Size增长,然后让WriteGroup的last_writer指针指向这个最新加入的Writer,再让writer的writer_group指针反向指向writeGroup对象,这里并没有实质性地将这些writer从任务队列中移除或者拷贝出来,在物理上他们依旧在任务队列原来的位置,这也是为什么我说WriteGroup是做了一个逻辑上的打包。
- 假如4中的If判断为True, 说明当前writer不能被打包进WriteGroup, 则执行下列逻辑,将该writer先从任务队列中移除出来,加入r_list
- 等到整个迭代结束,WriteGroup打包完毕,将r_list重新加入回任务队列,插入的位置就是当前WriteGroup的前面:
- WriteGroup结构补充:WriteGroup在物理上就是任务队列/无锁链表中的一个子区间,核心是由两枚指针,Leader指向一头(oldest_writer), last_writer指向另外一端(newest_writer), 另外还有size(这个WriterGroup数据的总字数数), running是一个计数,记录还有多少个writer未完成memtable的写入,last_sequence最开始就是获取的RocksDB的全局last_sequence, 最后也要用其做加法来更新全局last_seqence
从EnterAsBatchGroupLeader返回,WriteGroup便已经构造完毕了
接下来先遍历一遍Group内部的writer,做一些统计:Group内部有多少个batch, 一共包含多少条写入请求,总的写入量是多少,pre_release_callback回调函数有多少个
确定一下本次WriteGroup内部全部写入以后,sequence num需要增长多少,默认是每有一对kv写入就增长1
接着,当前LeaderWriter所在的线程会负责写入整个WriteGroup包含的WAL,这里长话短说:整个WriteGroup内部的所有写入都会汇聚成一笔大的WAL来进行写入。具体过程请见8.2。
写完WAL之后,写memtable之前,遍历整个write调用pre_release_callback
接下来会根据用户给出的配置项来看是否能并发写入memtable
本分支的该配置项为true(RocksDB默认也是true), 于是会执行下列分支:
主要有两个部分:
- LaunchparallelMemTableWriters:修改所有follower的状态为STATE_PARALLEL_MEMTABLE_WRITER,来告知他们可以自行去将自己的内容写入memtable(由于每个writer都有一个用户的写线程做支撑,所以这种情况下memtable的写入就是并发的)。
备注:此时所有的followerWriter都还在JoinBatGroup函数中的AwaitState上阻塞着,这里对他们的状态进行修改就会发生解阻塞,这些FollowerWriter所在的用户线程就会从JoinBatchGroup返回,进入4.3.2的逻辑。
其完整逻辑如下:
值得一提的是,当WriteGroup内部的FollowerWriter达到20个以上时,这个修改Follower状态的任务就会被分片,会有部分FollowerWriter被设置为STATE_PARALLEL_MEMTABLE_CALLER状态,这些Caller状态的follower会先解阻塞从JoinBatchGroup返回,然后再各自负责将同一个分片的followerWriter的状态改为STATE_PARALLEL_MEMTABLE_WRITER。这部分逻辑也在JoinBatchGroup返回之后,但由于它是一个中转状态,所以没有专门起个分支来讲他,其大致代码如下:
说白了就是如果WriteGroup内部的FollowerWriter数量大于20,"修改FollowerWriter状态"这个操作本身也会被切片,并发地去做。
相应的,如果Group内FollowerWriter的数量没有超过20,则直接由当前LeaderWriter所在的线程线性迭代一遍,修改所有FollowerWriter的状态为STATE_PARALLEL_MEMTABLE_WRITER。 - 当前的LeaderWriter自行写入自己的memtable内容:这一块不必细说,看8.3即可
修改完FollowerWriter的状态,解除他们的阻塞之后,继续往下: 此时当前WriteGroup所对应的WAL已经写入完毕,当前LeaderWriter的内容已经写入了memtable,其他FollowerWriter的内容也在各自并发地写入memtable。接下来就会来到下图中的逻辑:
这里主要做的是收尾工作,包含:
- 调用CompleteParallelMemTableWriter
首先看其中有阻塞逻辑的,也是作为if条件的 WriteThread::CompleteParallelMemTableWriter函数,他将当前Leaderwriter的指针作为参数接收进去,通过writer取到writer_group对象,而后主要做的事情就是先给当前WriterGroup的running计数减一(因为当前LeaderWriter已经完成了memtable的写入),然后检查其他FollowerWriter是否在自己达到这里之前就完成了memtable的写入(检查running计数在自己做减法前是否为1),如果不是,说明本WriterGroup内部还有其他的FollowerWriters还在并发写入memtable,自己并不是最后一个完成的的writer, 此时就需要调用AwaitState来阻塞等待,直到WriteGroup对象的status变成STATE_COMPLETED。相应地,如果当前LeaderWriter就是本writeGroup内部最后一个完成并发写入memtable的那个,就负责将writeGroup的Status设置为STATE_COMPLETED。
可见:在当前配置下(parallel_write=true, unordered_write=false),同一个WriteGroup内部的Writer允许并发地去写memtable,但是不允许写完了就直接返回,需要等到本WriteGroup内部所有的writer都完成了memtable的写入,才能进行返回,在这一个并发写memtable的环节上,LeaderWriter和FollowerWriter并没有实质上的区别。另外观察返回值:只有最后一个完成的writer会返回true,其他先完成并发写memtable的writer会返回false。
从CompleteParallelMemtableWriter函数返回,说明整个WriterGroup内部的writer都已经完成了WAL以及memtable的写入,此时再次回到其调用的位置,根据CompleteParallelMemtableWriter的返回值可以得知,如果当前的LeaderWriter是Group内最后一个完成memtable写入操作的Writer, 他就会执行下列逻辑(否则的话下面这段逻辑会由另外一个FollowerWriter在另外一个位置来执行,具体可以看4.3.2): - 运行post_memtable回调(如果有的话)
- 发布新的last_sequence
- 调用ExitAsBatchGroupLeader,其最核心的功能就是:从任务队列中指定出新的一个Leader(就是取队头的oldest_writer),将其状态改为GroupLeader, 然后那个新Leader会被唤醒,其所在的线程就会再次执行4.3.1的逻辑。
4.3.2 FollowerWriter路径1:当前Writer被通知可以去并发将自己的内容写入memtable(当前线程拥有的writer属于GroupFollower)
按照代码顺序,这就是第一个进行判断的分支:如果当前从JoinBatchGroup返回之后,发现当前Writer的状态已经被改为了 STATE_PARALLEL_MEMTABLE_WRITER, 说明当前writer不是一个GroupLeader,并且Leader已经写完了其WAL的内容,通知它自己去并发地写入memtable 于是当前的FollowerWriter所在的用户线程会执行WriteBatchInternal::InsertInto来写memtable, 并且实质上当前Group内部的每个Writer都在并发做这件事情(RocksDB写链路上的线程都是用户线程,每个writer实际上都是一个用户线程在驱动)
在这一分支下方,也就是当前writer写完memtable之后还有一段逻辑(备注,这段逻辑和4.3.1最后一段逻辑有一些重复,但是为了不用跳会4.3.1来看,所以这里冗余地重新阐述了4.3.1中已经阐述过的逻辑)
首先看其中有阻塞逻辑的,也是作为if条件的 WriteThread::CompleteParallelMemTableWriter函数,他将当前writer的指针作为参数接收进去,通过writer取到writer_group对象,而后主要做的事情就是先给当前WriterGroup的running计数减一(因为当前writer已经完成了WAL(由LeaderWriter代写)和memtable的写入,然后检查自己是否是最后一个完成并发写入memtable的Writer(检查running计数在自己做减法前是否为1),如果不是,说明本WriterGroup内部还有其他的writers还在并发写入memtable,自己并不是最后一个完成的的writer, 此时就需要调用AwaitState来阻塞等待,直到WriteGroup对象的status编程STATE_COMPLETED。相应地,如果当前writer就是本writeGroup内部最后一个完成并发写入memtable的那个,就负责将writeGroup的Status设置为STATE_COMPLETED。
可见:在当前配置下(parallel_write=true, unordered_write=false),同一个WriteGroup内部的Writer允许并发地去写memtable,但是不允许写完了就直接返回,需要等到本WriteGroup内部所有的writer都完成了memtable的写入,才能进行返回。另外观察返回值:只有最后一个完成的writer会返回true,其他先完成并发写memtable的writer会返回false。
从CompleteParallelMemtableWriter函数返回,说明整个WriterGroup内部的writer都已经完成了WAL以及memtable的写入,此时再次回到其调用的位置,根据CompleteParallelMemtableWriter的返回值可以得知,只有这个WriteGroup中的最后一个writer需要进入这个if分支来做收尾工作,收尾工作具体为:
- 运行post_memtable回调(如果有的话)
- 发布新的last_sequence
- 调用ExitAsBatchGroupFollower, 这个函数实际上调用了ExitAsBatchGroupLeader,这是一个负责收尾的函数, 一般是由LeaderWriter所在的用户线程来调用,但是在这个分支下,最后一个完成了并发写memtable的writer需要承担一下收尾的责任,这个最后一个writer可能恰好就是LeaderWriter也可能只是一个普通Follower,所以这里为了可读性,将ExitAsBatchGroupLeader包了一层,有了ExitAsBatchGroupFollower,其最核心的功能就是:通过修改Writer状态,从任务队列中指定出新的一个Leader(就是取队头的oldest_writer),然后那个新Leader会被唤醒,又去执行Leader的路径(构造WriteGroup,打包写WAL, 通知并发写memtable/自己线性打包写完memtable)。
做完这三步收尾的工作之后,Follower并发写memtable的分支就完毕了。
4.3.2 FollowerWriter路径2:当前Writer的内容被Leader线程代写完了,处于Complete状态(当前线程拥有的writer属于GroupFollower)
如果进入了这个分支,说明parallel_write为false,写WAL, 写memtable, 更新sequence num这几样工作都被LeaderWriter所在的线程线性完成了(几乎和levelDB的模式一样了),这些GroupFollower不需要做太多事情,只是选择好正确的Status进行返回就行了。
5 分支2:parallel_write为true且unordered_write为true
unordered_write为true的话,最大的区别就是:每个follower Writer在并行写memtable的阶段,不需要等待同一个Group内的其他Writer也写入完毕,就可以直接return。
其流程依旧是:由LeaderWrite构造WriteGroup-->Leader Writer合并WriteGroup内部的Batch的rep_,整笔打包写一条WAL-->修改Follower的状态,让其允许解除阻塞去并发写入memtable-->(区别就在这里)每个Follower写入完了memtable就直接返回。
其分支如下:
其中WriteImplWALonly就是由LeaderWriter打包所有writer的Batch,写成一笔WAL,具体可以看8.2的写WAL流程
而UnoderedWriteMemtable如下,写完memtable后没有别的阻塞行为,直接返回:
据RocksDB官方实测,打开unordered_write之后,写QPS能提高30%,但默认情况下是不开的,而且我看pika以及一些同类产品都不开,应该是有别的顾虑,目前考虑是last_sequence更新的时机可能和常规路径不一样,具体地下次再细看一遍。
6 分支3:parallel_write为false
parallel_write为false,即写入memtable不允许并发,所以整个Group内部的writer都会被leader writer所在的用户线程线性地代为写入memtable,这一路径下,写操作几乎就和leveldb一模一样了。
其实现如下:可以看见,直接就是一个for循环,由当前线程(Leader Writer所在的线程)遍历本Write Group内部的所有Writer然后完成memtable的写入
7 调用链细节
7.1 Writers队列的三级等待与并发协调
在前文的路径梳理中提到过JoinBatchGroup,当writer加入写队列时,如果自身的状态位没有改为指定状态的话会陷入阻塞,而这个阻塞是调用了StateAwait来做的:
7.1.2 等待级别一: 200次pause指令进行busy-wait:
该函数上来先做200次的pause指令其实pause指令本身不会引起暂停或者阻塞,他的作用主要是告知核心当前正在busy wait, 可以降低功耗,或者在缓存一致性上可以放松.
7.1.2 等待级别二:yield让出线程等待
如果前面使用pause的busy-wait没有等到当前writer进入目标状态,则使用std::this_thread::yield()来让出当前核心来做一个稍微重一些的等待, 但是最多让出3次cpu(kMaxSlowYieldsWhileSpinning=3), 如果让出3次cpu还没有等到目标状态,则往下走
7.1.3 等待级别三:最重的condition_variable::wait()
如果前面3次yield也没等来目标状态,说明当前这是一个长等待,那就不折腾了,调用BlockingAwaitState, 里面实际上就是cv.wait()
7.2 WAL实际的物化格式
7.2.1 从常规写链路上,由Leader Writer线程执行的WriteWAL进入
7.2.2 准备一个merge_batch, 然后调用MergeBatch方法,将merge_batch指针,以及write_group都传进去,顾名思义,合并这些batch
7.2.3 在MergeBatch中会遍历当前write_group内的所有writer(每个writer内有一个WriteBatch),然后执行WriteBatchInternal::Append方法, src就是每个writer的batch,dst就是merged_batch
7.2.4 进入该WriteBatchInternal::Append方法就能看到,实际上是将src->rep_的内容追加写入到dst->rep_中
7.2.5 那么每个batch都有的rep_内容究竟是什么? 在write_batch.cc顶端给了答案:
首先是整个batch起始的sequence,定长64位,而后是本batch内部有多少记录数,定长32位。再往后就是一个record数组,或者说白点就是多条连续的record,而record的类型有很多种,其中最常见的是我高亮的行:
一个kTypeColumnFalimyValue(cf ID)类型为int32-->key(类型是varstring, 即开头放着长度,后面跟着内容) -->value(类型也是varstring)
7.2.6
这一点在插入memtable的流程中也可以得到印证:在插入memtable时,会从每条Record内解析其kTypeColumFamlyValue的值作为取column_family的索引/ID
7.2.7 至此我们知道了merged_batch实际上就是将多个batch的rep_合并,而每个rep_内部可能包含来自多个cf的数据,所以WAL自然也是多个cf共用
7.2.8 回到WriteToWAL往下,他会使用merged_batch->Content接口获取log_entry, 而这个Contents接口十分简单,将rep_包装成Slice就返回了
接着往下几层的调用栈就可以看见,merged_batch->rep_确实是真正被写入WAL的内容
所以总的来说,每一笔WAL都是一整个WriteGroup内所有WriteBatch的rep_连接起来的结果,WriteBatch::rep_的格式可以在write_batch.cc看到,其内包含了来自不同cf的数据。