Citus事务处理源码解读
Citus 事务处理
本文将解密Citus的事务处理,对内核感兴趣的朋友可以阅读,对运维的DBA同样也建议阅读,在阅读完成后会对分布式事务实现有个了解,并且对异常问题的处理也会掌握响应的处理办法
Citus extension
- citus是采用extension实现,当讲解事务的时候,有必要先了解下citus的基本实现原理。
extension实现的的深入解读将在后续的文章中发表
extension实现简单解读
extension 需要三个文件
1. 创建数据库对象的sql文件
2. lib库文件
3. 控制文件
lib库控制文件的加载有4种情况
1. 不使用任何PostgreSQL的hook
2. 服务器启动时需要加载hook的库,配置到shared_preload_libraries
3. 与服务器建立连接时加载hook的库,配置到session_preload_libraries,只允许超级用户修改
4. 与服务器建立连接时加载hook的库,配置到local_preload_libraries,只能加载libdir/plugins下的库,普通用户可以修改
当我们安装citus的时候需要配置shared_preload_libraries因为我们需要设置相应的hook
hook是指PostgreSQL用户扩展使用的钩子函数,可以改变原PostgreSQL的行为
Citus 事务实现
本文会将主要过程和行为进行解读,使读者理解其中实现原理。而其中极其细微的每一个变量的作为不会进行繁琐的讲解
Citus 实现方法
事务处理的hook需要设置Xact_callbacks和SubXact_callbacks(子事务)来实现自己的定制功能,对应的主事务的citus的hook函数是CoordinatedTransactionCallback,子事务的hook是CoordinatedSubTransactionCallback。下文中将主要以2阶段提交的实现对commit/rollback进行讲解
pg_dist_transaction的作用
pg_dist_transaction系统表用于记录当前commit的事务
此表仅有一个作用,就是事务处理异常时的再次提交或回滚
test=# \d pg_dist_transaction
Table "pg_catalog.pg_dist_transaction"
Column | Type | Collation | Nullable | Default
---------+---------+-----------+----------+---------
groupid | integer | | not null |
gid | text | | not null |
Indexes:
"pg_dist_transaction_unique_constraint" UNIQUE CONSTRAINT, btree (groupid, gid)
"pg_dist_transaction_group_index" btree (groupid)
test=# select * from pg_dist_transaction ;
groupid | gid
---------+-------------------
4 | citus_0_22827_3_5
(1 row)
- groupid
组id用于节点标识使用
为pg_dist_local_group系统表的值,每个citus的节点,不论cn和work都拥有唯一的一个数值。 - gid
用于事务的唯一标识,pgname最长为64个字符,此种命名可以满足
gid由5部分组成(citus_0_22827_3_5):
1. citus:特殊标记
2. 0:groupid
3. 22827:pg的packendid(pg_backend_pid())
4. 3:唯一的事务标识
5. 5:递增的唯一标识
Citus 事务处理
我们之前提到Xact_callbacks的回调函数CoordinatedTransactionCallback,Citus的主事务将主要由次函数进行
正向处理通常都是较容易的,按本身逻辑顺序进行就可以,而异常处理时,变要考虑更多的内容,如何高效正确的处理两阶段提交的异常是一个难点
prepare过程
PostgreSQL提交事务时的函数
static void
CommitTransaction(void)
{
CallXactCallbacks(is_parallel_worker ? XACT_EVENT_PARALLEL_PRE_COMMIT
: XACT_EVENT_PRE_COMMIT);
CallXactCallbacks(is_parallel_worker ? XACT_EVENT_PARALLEL_COMMIT
: XACT_EVENT_COMMIT);
}
CommitTransaction主要由两个行为XACT_EVENT_PRE_COMMIT和XACT_EVENT_COMMIT。通过PRE_COMMIT和COMMIT的两个过程便可以在PostgreSQL的commit过程中完成两阶段提交。
- prepared过程详解
- Prepare由XACT_EVENT_PRE_COMMIT完成
if (CoordinatedTransactionUses2PC)
{
CoordinatedRemoteTransactionsPrepare();
CurrentCoordinatedTransactionState = COORD_TRANS_PREPARED;
/*
* Make sure we did not have any failures on connections marked as
* critical before committing.
*/
CheckRemoteTransactionsHealth();
}
首先通过CoordinatedRemoteTransactionsPrepare对数据进行prepare提交
然后通过CheckRemoteTransactionsHealth进行健康状态检查
当健康检查出现异常时便会触发ERROR引起回滚
CheckRemoteTransactionsHealth健康状态检查,主要是查看连接是否异常和prepare是否都正确提交,本文不会进行详细解读
- CoordinatedRemoteTransactionsPrepare讲解
void
CoordinatedRemoteTransactionsPrepare(void)
{
/* asynchronously send PREPARE */
dlist_foreach(iter, &InProgressTransactions)
{
//对所需要提交的每个节点发送prepare语句,当失败时,便会触发ERROR,引起回滚
// 异步连接模式,每个语句不需要等待结果
// 异步连接由PQconnectPoll的API实现
StartRemoteTransactionPrepare(connection);
connectionList = lappend(connectionList, connection);
}
// 等待所有连接执行完成
WaitForAllConnections(connectionList, raiseInterrupts);
/* Wait for result */
dlist_foreach(iter, &InProgressTransactions)
{
//资源清理
FinishRemoteTransactionPrepare(connection);
}
prepare过程经过上诉两个步骤便执行完毕。
prepare的关键点是任何一个prepare失败时,都会通过ERROR进行回滚。
commit过程
通过preprare过程章节,我们知道commit由XACT_EVENT_PRE_COMMIT完成。
- commit过程详解
void
CoordinatedRemoteTransactionsCommit(void)
{
dlist_foreach(iter, &InProgressTransactions)
{
//异步commit提交,任何错误都不会引起ERROR,
//这是与perpare一个非常大的区别
StartRemoteTransactionCommit(connection);
connectionList = lappend(connectionList, connection);
}
//等待执行完成
WaitForAllConnections(connectionList, raiseInterrupts);
/* wait for the replies to the commands to come in */
dlist_foreach(iter, &InProgressTransactions)
{
//清理资源
FinishRemoteTransactionCommit(connection);
}
commit的过程与prepare相似
commit的关键点是任何异常都不会触发回滚
rollback过程
rollback过程与commit过程类似,只不过发送的是rollback的语句,不再进行描述。
事务异常处理
parpare/commit/rollback并没有进行事务异常的处理,因此是非常高效的拥有非常好的性能
事务异常是什么
举例如下:
1. prepare过程中,有部分成功部分失败,事务最终应该是都rollback回滚
2. commit prepare过程中,有部分成功部分失败,事务最终应该都是commit提交
当事务出现异常时就会导致事务在不同节点的不一致状态
千万不可以通过pg_prepared_xact的残留信息去做任何处理,因为无法获知最终状态
事务异常何时处理
异常的事务由Citus Maintenance Daemon进程定期进行最终状态处理,默认是1分钟处理一次
- 调整清理频率
citus.recover_2pc_interval=60000 (毫秒) 注意单位是毫秒
设置该参数可以调整异常清理的频率,
事务异常如何手动处理
select recover_prepared_transactions();
异常处理过程
我们直接在代码里进行标注,这样更容易理解逻辑顺序
- 简单讲解
如果work节点的prepare信息,可以在pg_dist_transaction中找到,则再次提交它。
如果不能在pg_dist_transaction中找到,我们就回滚它。 -
详细讲解
这个过程会有些复杂,根据兴趣阅读
static int
RecoverWorkerTransactions(WorkerNode *workerNode)
{
// 获取work节点上的pg_prepare_xact的信息
pendingTransactionSet =
// 获取当前正在运行的事务信息
activeTransactionNumberSet =
//获取pg_dist_transaction的信息
scanDescriptor = systable_beginscan(pgDistTransaction)
// 再次获得work节点上的pg_prepare_xact的信息
recheckTransactionSet =
while (逐条获得pg_dist_transaction的信息)
{
//这个事务正在运行,还未结束,不做任何处理
isTransactionInProgress = activeTransactionNumberSet
if (isTransactionInProgress)
{
continue;
}
if (pendingTransactionSet中有此条记录 && recheckTransactionSet中有此条记录)
{
我们会再次提交它
如果失败就下一次在继续
}
if (pendingTransactionSet中没有此条记录)
说明work上的这个记录已经正常的commit了。
if (recheckTransactionSet中有此条记录, 而pendingTransactionSet中没有)
说明这个事务刚刚启动,并且在极其微小的时间内,还没有加入到activeTransactionNumberSet中。
或是这个事务在我们获取 activeTransactionNumberSet开始,在获取pg_dist_transaction前结束,可能正处在commit_pepare的时间事务未完全结束
这两种情况,我们不做处理,下次再次维护的时候进行处理
} while结束
此时pendingTransactionSet里的信息,需要再次提交的已经做完,剩下的便是需要回滚的信息
while (逐条获取pendingTransactionSet信息)
{
如果这个记录正在运行,我们不处理它(因为此时它不会出现在pg_dist_transaction中)
如果这个记录不是正在运行,我们将回滚这个记录
}
总结
事务处理非常复杂,也很关键,其中涉及非常多的小细节,本文揭示了Citus在事务处理过程中最关键的部分,后续还会陆续分享其他特征。
Citus的这个事务异常处理设计的非常高效,非常棒,我们可以将其设计模式和理念应用于我们的日常工作中。
[CitusDB中国]站主,PostgreSQL粉丝,现从事Citus研发工作
愿Citus在中国发展的越来越好
评论 (3)
levi| 2019年8月15日
事务异常这里没太看懂,主要是不知道pg_prepare_xact这个里存的是什么,还有为什么要第二次获取,你展示的代码里也没有提到scanDescriptor 是怎么用的
levi| 2019年8月15日
我有个大胆的猜测,第一阶段提交在pg_prepare_xact中做记录,第二阶段提交再次检查是否具有提交事务的能力,有能力就在pg_dist_transaction中做记录,正式提交事务,这样就能和你开始的简单解释能对应上了,是这样吗
张 连壮| 2019年8月15日
第一阶段提交在pg_prepare_xact中做记录,第二阶段的内容在第一阶段里做了,prepare没有失败的,pg_dist_trians就做记录了。