Citus事务处理源码解读

Citus 事务处理

本文将解密Citus的事务处理,对内核感兴趣的朋友可以阅读,对运维的DBA同样也建议阅读,在阅读完成后会对分布式事务实现有个了解,并且对异常问题的处理也会掌握响应的处理办法

Citus extension

  • citus是采用extension实现,当讲解事务的时候,有必要先了解下citus的基本实现原理。
    extension实现的的深入解读将在后续的文章中发表

extension实现简单解读

extension 需要三个文件
1. 创建数据库对象的sql文件
2. lib库文件
3. 控制文件

lib库控制文件的加载有4种情况
1. 不使用任何PostgreSQL的hook
2. 服务器启动时需要加载hook的库,配置到shared_preload_libraries
3. 与服务器建立连接时加载hook的库,配置到session_preload_libraries,只允许超级用户修改
4. 与服务器建立连接时加载hook的库,配置到local_preload_libraries,只能加载libdir/plugins下的库,普通用户可以修改

当我们安装citus的时候需要配置shared_preload_libraries因为我们需要设置相应的hook
hook是指PostgreSQL用户扩展使用的钩子函数,可以改变原PostgreSQL的行为

Citus 事务实现

本文会将主要过程和行为进行解读,使读者理解其中实现原理。而其中极其细微的每一个变量的作为不会进行繁琐的讲解

Citus 实现方法

事务处理的hook需要设置Xact_callbacks和SubXact_callbacks(子事务)来实现自己的定制功能,对应的主事务的citus的hook函数是CoordinatedTransactionCallback,子事务的hook是CoordinatedSubTransactionCallback。下文中将主要以2阶段提交的实现对commit/rollback进行讲解

pg_dist_transaction的作用

pg_dist_transaction系统表用于记录当前commit的事务
此表仅有一个作用,就是事务处理异常时的再次提交或回滚

test=# \d pg_dist_transaction
       Table "pg_catalog.pg_dist_transaction"
 Column  |  Type   | Collation | Nullable | Default 
---------+---------+-----------+----------+---------
 groupid | integer |           | not null | 
 gid     | text    |           | not null | 
Indexes:
    "pg_dist_transaction_unique_constraint" UNIQUE CONSTRAINT, btree (groupid, gid)
    "pg_dist_transaction_group_index" btree (groupid)

test=# select * from pg_dist_transaction ;
 groupid |        gid        
---------+-------------------
       4 | citus_0_22827_3_5
(1 row)
  • groupid
    组id用于节点标识使用
    为pg_dist_local_group系统表的值,每个citus的节点,不论cn和work都拥有唯一的一个数值。
  • gid
    用于事务的唯一标识,pgname最长为64个字符,此种命名可以满足

gid由5部分组成(citus_0_22827_3_5):
1. citus:特殊标记
2. 0:groupid
3. 22827:pg的packendid(pg_backend_pid())
4. 3:唯一的事务标识
5. 5:递增的唯一标识

Citus 事务处理

我们之前提到Xact_callbacks的回调函数CoordinatedTransactionCallback,Citus的主事务将主要由次函数进行
正向处理通常都是较容易的,按本身逻辑顺序进行就可以,而异常处理时,变要考虑更多的内容,如何高效正确的处理两阶段提交的异常是一个难点

prepare过程

PostgreSQL提交事务时的函数

static void 
CommitTransaction(void)
{
    CallXactCallbacks(is_parallel_worker ? XACT_EVENT_PARALLEL_PRE_COMMIT
                      : XACT_EVENT_PRE_COMMIT);

    CallXactCallbacks(is_parallel_worker ? XACT_EVENT_PARALLEL_COMMIT
                      : XACT_EVENT_COMMIT);
}

CommitTransaction主要由两个行为XACT_EVENT_PRE_COMMIT和XACT_EVENT_COMMIT。通过PRE_COMMIT和COMMIT的两个过程便可以在PostgreSQL的commit过程中完成两阶段提交。

  • prepared过程详解
  1. Prepare由XACT_EVENT_PRE_COMMIT完成
            if (CoordinatedTransactionUses2PC)
            {
                CoordinatedRemoteTransactionsPrepare();
                CurrentCoordinatedTransactionState = COORD_TRANS_PREPARED;

                /*
                 * Make sure we did not have any failures on connections marked as
                 * critical before committing.
                 */
                CheckRemoteTransactionsHealth();
            }

首先通过CoordinatedRemoteTransactionsPrepare对数据进行prepare提交
然后通过CheckRemoteTransactionsHealth进行健康状态检查
当健康检查出现异常时便会触发ERROR引起回滚

CheckRemoteTransactionsHealth健康状态检查,主要是查看连接是否异常和prepare是否都正确提交,本文不会进行详细解读

  1. CoordinatedRemoteTransactionsPrepare讲解
void
CoordinatedRemoteTransactionsPrepare(void)
{
    /* asynchronously send PREPARE */
    dlist_foreach(iter, &InProgressTransactions)
    {
        //对所需要提交的每个节点发送prepare语句,当失败时,便会触发ERROR,引起回滚
        // 异步连接模式,每个语句不需要等待结果
        // 异步连接由PQconnectPoll的API实现
        StartRemoteTransactionPrepare(connection);
        connectionList = lappend(connectionList, connection);
    }

    // 等待所有连接执行完成
    WaitForAllConnections(connectionList, raiseInterrupts);


    /* Wait for result */
    dlist_foreach(iter, &InProgressTransactions)
    {
    //资源清理
        FinishRemoteTransactionPrepare(connection);
    }

prepare过程经过上诉两个步骤便执行完毕。

prepare的关键点是任何一个prepare失败时,都会通过ERROR进行回滚。

commit过程

通过preprare过程章节,我们知道commit由XACT_EVENT_PRE_COMMIT完成。

  • commit过程详解
void
CoordinatedRemoteTransactionsCommit(void)
{
    dlist_foreach(iter, &InProgressTransactions)
    {
    //异步commit提交,任何错误都不会引起ERROR,
    //这是与perpare一个非常大的区别
        StartRemoteTransactionCommit(connection);
        connectionList = lappend(connectionList, connection);
    }
    //等待执行完成
    WaitForAllConnections(connectionList, raiseInterrupts);

    /* wait for the replies to the commands to come in */
    dlist_foreach(iter, &InProgressTransactions)
    {
            //清理资源
        FinishRemoteTransactionCommit(connection);
     }

commit的过程与prepare相似

commit的关键点是任何异常都不会触发回滚

rollback过程

rollback过程与commit过程类似,只不过发送的是rollback的语句,不再进行描述。

事务异常处理

parpare/commit/rollback并没有进行事务异常的处理,因此是非常高效的拥有非常好的性能

事务异常是什么

举例如下:
1. prepare过程中,有部分成功部分失败,事务最终应该是都rollback回滚
2. commit prepare过程中,有部分成功部分失败,事务最终应该都是commit提交

当事务出现异常时就会导致事务在不同节点的不一致状态
千万不可以通过pg_prepared_xact的残留信息去做任何处理,因为无法获知最终状态

事务异常何时处理

异常的事务由Citus Maintenance Daemon进程定期进行最终状态处理,默认是1分钟处理一次

  • 调整清理频率
    citus.recover_2pc_interval=60000 (毫秒) 注意单位是毫秒
    设置该参数可以调整异常清理的频率,

事务异常如何手动处理

select recover_prepared_transactions();

异常处理过程

我们直接在代码里进行标注,这样更容易理解逻辑顺序

  • 简单讲解
    如果work节点的prepare信息,可以在pg_dist_transaction中找到,则再次提交它。
    如果不能在pg_dist_transaction中找到,我们就回滚它。

  • 详细讲解
    这个过程会有些复杂,根据兴趣阅读

static int
RecoverWorkerTransactions(WorkerNode *workerNode)
{
    //  获取work节点上的pg_prepare_xact的信息
    pendingTransactionSet =

     // 获取当前正在运行的事务信息
    activeTransactionNumberSet = 

    //获取pg_dist_transaction的信息
    scanDescriptor = systable_beginscan(pgDistTransaction)

     // 再次获得work节点上的pg_prepare_xact的信息
    recheckTransactionSet = 

    while (逐条获得pg_dist_transaction的信息)
    {
          //这个事务正在运行,还未结束,不做任何处理
        isTransactionInProgress =  activeTransactionNumberSet
        if (isTransactionInProgress)
           {
              continue;
           }

        if (pendingTransactionSet中有此条记录 && recheckTransactionSet中有此条记录)
        {
        我们会再次提交它
                如果失败就下一次在继续
         }

        if (pendingTransactionSet中没有此条记录)
                说明work上的这个记录已经正常的commit了。

        if (recheckTransactionSet中有此条记录, 而pendingTransactionSet中没有)
             说明这个事务刚刚启动,并且在极其微小的时间内,还没有加入到activeTransactionNumberSet中。
             或是这个事务在我们获取 activeTransactionNumberSet开始,在获取pg_dist_transaction前结束,可能正处在commit_pepare的时间事务未完全结束
             这两种情况,我们不做处理,下次再次维护的时候进行处理

      } while结束

       此时pendingTransactionSet里的信息,需要再次提交的已经做完,剩下的便是需要回滚的信息
      while (逐条获取pendingTransactionSet信息)
        {
               如果这个记录正在运行,我们不处理它(因为此时它不会出现在pg_dist_transaction中)
               如果这个记录不是正在运行,我们将回滚这个记录
    }

总结

事务处理非常复杂,也很关键,其中涉及非常多的小细节,本文揭示了Citus在事务处理过程中最关键的部分,后续还会陆续分享其他特征。
Citus的这个事务异常处理设计的非常高效,非常棒,我们可以将其设计模式和理念应用于我们的日常工作中。

文章浏览总量 1,101 次

评论 (3)

  • levi| 2019年8月15日

    事务异常这里没太看懂,主要是不知道pg_prepare_xact这个里存的是什么,还有为什么要第二次获取,你展示的代码里也没有提到scanDescriptor 是怎么用的

  • levi| 2019年8月15日

    我有个大胆的猜测,第一阶段提交在pg_prepare_xact中做记录,第二阶段提交再次检查是否具有提交事务的能力,有能力就在pg_dist_transaction中做记录,正式提交事务,这样就能和你开始的简单解释能对应上了,是这样吗

    • 张 连壮| 2019年8月15日

      第一阶段提交在pg_prepare_xact中做记录,第二阶段的内容在第一阶段里做了,prepare没有失败的,pg_dist_trians就做记录了。

  • 要发表评论,您必须先登录