GO编程DTM(二)
二阶段消息例子
本文将介绍一个完整的二阶段消息例子,让读者对二阶段消息型事务有一个准确的了解业务场景
跨行转账是典型的分布式事务场景,在这里,A需要跨行转账给B,假设需求场景是:只有转出A可能失败,转入B是能够最终成功的二阶段消息
二阶段消息是dtm首创的事务模式,用于替换本地事务表和事务消息这两种现有的方案。它能够保证本地事务的提交和全局事务提交是原子的,适合解决不需要回滚的分布式事务场景。下面我们来看看二阶段消息,如何解决这个业务场景的问题。核心业务
首先我们创建账户余额表:CREATETABLEdtmbusi。useraccount(idint(11)AUTOINCREMENTPRIMARYKEY,useridint(11)notNULLUNIQUE,balancedecimal(10,2)NOTNULLDEFAULT0。00,tradingbalancedecimal(10,2)NOTNULLDEFAULT0。00,createtimedatetimeDEFAULTnow(),updatetimedatetimeDEFAULTnow());
然后编写核心业务代码,调整用户的账户余额funcSagaAdjustBalance(dbdtmcli。DB,uidint,amountint,resultstring)error{,err:dtmimp。DBExec(db,updatedtmbusi。useraccountsetbalancebalance?whereuserid?,amount,uid)returnerr}
再来编写具体的处理函数app。POST(BusiAPISagaBTransIn,dtmutil。WrapHandler2(func(cgin。Context)interface{}{barrier:MustBarrierFromGin(c)returnbarrier。Call(txGet(),func(txsql。Tx)error{returnSagaAdjustBalance(tx,TransInUID,reqFrom(c)。Amount,)})}))
这些处理函数的核心逻辑都是是调整余额。这里面的barrier。Call主要是用于处理幂等,保证重复调用不会多次调整余额,详情参见异常与子事务屏障二阶段消息事务
到此各个子事务的处理函数已经OK了,然后是开启二阶段消息事务,进行分支调用msg:dtmcli。NewMsg(DtmServer,shortuuid。New())。Add(busi。BusiSagaBTransIn,TransReq{Amount:30})err:msg。DoAndSubmitDB(busi。BusiQueryPreparedB,dbGet(),func(txsql。Tx)error{returnbusi。SagaAdjustBalance(tx,busi。TransOutUID,req。Amount)})
这段代码中,会保证DoAndSubmitDB中的业务提交和全局事务提交是原子的,保证了TransOut和TransIn的同时成功,或同时失败。其中DoAndSubmitDB中的第一个参数为回查URL,他的代码如下:app。GET(BusiAPIQueryPreparedB,dtmutil。WrapHandler2(func(cgin。Context)interface{}{bb:MustBarrierFromGin(c)returnbb。QueryPrepared(dbGet())}))
至此,一个完整的二阶段消息分布式事务编写完成。运行
如果您想要完整运行一个成功的示例,步骤如下:运行dtmgitclonehttps:github。comdtmlabsdtmcddtmgorunmain。go运行例子gitclonehttps:github。comdtmlabsdtmexamplescddtmexamplesgorunmain。gohttpmsgdoAndCommit如何保证原子性
二阶段消息如何保证本地事务和全局事务要么都成功,要么都失败呢?假定本地事务提交完成后,提交全局事务前,进程crash会如何?下面时序图很好的讲解了二阶段消息是如何处理这个问题的:
图中的回查处理逻辑,dtm已经做了自动处理,用户只需要粘贴上述的代码即可SAGA例子
本文将介绍一个完整的SAGA例子,让读者对SAGA型事务有一个准确的了解业务场景
跨行转账是典型的分布式事务场景,在这里,A需要跨行转账给B,假设需求场景是:转出A和转入B都有可能成功和失败,需要最终转入转出都成功,或者都失败SAGA
Saga是这一篇数据库论文SAGAS提到的一个分布式事务方案。其核心思想是将长事务拆分为多个本地短事务,由Saga事务协调器协调,如果各个本地事务成功完成那就正常完成,如果某个步骤失败,则根据相反顺序一次调用补偿操作。核心业务
对于我们要进行的银行转账的例子,我们将在正向操作中,进行转入转出,在补偿操作中,做相反的调整。
首先我们创建账户余额表:CREATETABLEdtmbusi。useraccount(idint(11)AUTOINCREMENTPRIMARYKEY,useridint(11)notNULLUNIQUE,balancedecimal(10,2)NOTNULLDEFAULT0。00,tradingbalancedecimal(10,2)NOTNULLDEFAULT0。00,createtimedatetimeDEFAULTnow(),updatetimedatetimeDEFAULTnow());
然后编写核心业务代码,调整用户的账户余额funcSagaAdjustBalance(dbdtmcli。DB,uidint,amountint,resultstring)error{,err:dtmimp。DBExec(db,updatedtmbusi。useraccountsetbalancebalance?whereuserid?,amount,uid)returnerr}
再来编写具体的正向操作补偿操作的处理函数app。POST(BusiAPISagaBTransIn,dtmutil。WrapHandler2(func(cgin。Context)interface{}{barrier:MustBarrierFromGin(c)returnbarrier。Call(txGet(),func(txsql。Tx)error{returnSagaAdjustBalance(tx,TransInUID,reqFrom(c)。Amount,)})}))app。POST(BusiAPISagaBTransInCom,dtmutil。WrapHandler2(func(cgin。Context)interface{}{barrier:MustBarrierFromGin(c)returnbarrier。Call(txGet(),func(txsql。Tx)error{returnSagaAdjustBalance(tx,TransInUID,reqFrom(c)。Amount,)})}))app。POST(BusiAPISagaBTransOut,dtmutil。WrapHandler2(func(cgin。Context)interface{}{barrier:MustBarrierFromGin(c)returnbarrier。Call(txGet(),func(txsql。Tx)error{returnSagaAdjustBalance(tx,TransOutUID,reqFrom(c)。Amount,)})}))app。POST(BusiAPISagaBTransOutCom,dtmutil。WrapHandler2(func(cgin。Context)interface{}{barrier:MustBarrierFromGin(c)returnbarrier。Call(txGet(),func(txsql。Tx)error{returnSagaAdjustBalance(tx,TransOutUID,reqFrom(c)。Amount,)})}))
这些处理函数的核心逻辑都是是调整余额,对于这里面的barrier。Call作用,后面会详细解释SAGA事务
到此各个子事务的处理函数已经OK了,然后是开启SAGA事务,进行分支调用req:gin。H{amount:30}微服务的载荷DtmServer为DTM服务的地址saga:dtmcli。NewSaga(DtmServer,shortuuid。New())。添加一个TransOut的子事务,正向操作为url:qsBusiTransOut,逆向操作为url:qsBusiTransOutComAdd(qsBusiSagaBTransOut,qsBusiSagaBTransOutCom,req)。添加一个TransIn的子事务,正向操作为url:qsBusiTransOut,逆向操作为url:qsBusiTransInComAdd(qsBusiSagaBTransIn,qsBusiSagaBTransInCom,req)提交saga事务,dtm会完成所有的子事务回滚所有的子事务err:saga。Submit()
至此,一个完整的SAGA分布式事务编写完成。运行
如果您想要完整运行一个成功的示例,步骤如下:运行dtmgitclonehttps:github。comdtmlabsdtmcddtmgorunmain。go运行例子gitclonehttps:github。comdtmlabsdtmexamplescddtmexamplesgorunmain。gohttpsagabarrier
时序图如下:
处理网络异常
假设提交给dtm的事务中,调用转入操作时,出现短暂的故障怎么办?dtm会重试未完成的操作,此时就会要求全局事务中的各个子事务是幂等的。dtm框架首创子事务屏障技术,提供BranchBarrier工具类,可以帮助用户简单的处理幂等。它提供了一个函数Call,保证这个函数内部的业务,会被最多调用一次:func(bbBranchBarrier)Call(txsql。Tx,busiCallBarrierBusiFunc)error
该BranchBarrier不仅能够自动处理幂等,还能够自动处理空补偿、悬挂的问题,详情可以参考异常与子事务屏障处理回滚
假如银行将金额准备转入用户2时,发现用户2的账户异常,返回失败,会怎么样?我们调整处理函数,让转入操作返回失败app。POST(BusiAPISagaBTransIn,dtmutil。WrapHandler2(func(cgin。Context)interface{}{returndtmcli。ErrFailure}))
我们给出事务失败交互的时序图
这里有一点,TransIn的正向操作什么都没有做,就返回了失败,此时调用TransIn的补偿操作,会不会导致反向调整出错了呢?
不用担心,前面的子事务屏障技术,能够保证TransIn的错误如果发生在提交之前,则补偿为空操作;TransIn的错误如果发生在提交之后,则补偿操作会将数据提交一次。
您可以将返回错误的TransIn改成:app。POST(BusiAPISagaBTransIn,dtmutil。WrapHandler2(func(cgin。Context)interface{}{barrier:MustBarrierFromGin(c)barrier。Call(txGet(),func(txsql。Tx)error{returnSagaAdjustBalance(tx,TransInUID,reqFrom(c)。Amount,)})returndtmcli。ErrFailure}))
最后的结果余额依旧会是对的,详情可以参考异常与子事务屏障TCC例子
本文将介绍一个完整的TCC例子,让读者对TCC型事务有一个准确的了解业务场景
跨行转账是典型的分布式事务场景,在这里,A需要跨行转账给B,假设需求场景是:转出A和转入B都有可能成功和失败,需要最终转入转出都成功,或者都失败。
同时这里还有一个要求,假如发生回滚,SAGA模式下会发生A发现自己的余额被扣减了,但是收款方B迟迟没有收到余额,那么会对A造成很大的困扰。业务上面希望不要出现这种情况TCC组成
TCC分为3个阶段Try阶段:尝试执行,完成所有业务检查(一致性),预留必须业务资源(准隔离性)Confirm阶段:如果所有分支的Try都成功了,则走到Confirm阶段。Confirm真正执行业务,不作任何业务检查,只使用Try阶段预留的业务资源Cancel阶段:如果所有分支的Try有一个失败了,则走到Cancel阶段。Cancel释放Try阶段预留的业务资源。
如果我们要进行一个类似于银行跨行转账的业务,转出(TransOut)和转入(TransIn)分别在不同的微服务里,一个成功完成的TCC事务典型的时序图如下:
核心业务
首先我们创建账户余额表,其中tradingbalance表示被冻结的金额:createtableifnotexistsdtmbusi。useraccount(idint(11)PRIMARYKEYAUTOINCREMENT,useridint(11)UNIQUE,balanceDECIMAL(10,2)notnulldefault0,tradingbalanceDECIMAL(10,2)notnulldefault0,createtimedatetimeDEFAULTnow(),updatetimedatetimeDEFAULTnow(),key(createtime),key(updatetime));
我们先编写核心代码,冻结解冻资金操作,会检查约束balancetradingbalance0,如果约束不成立,执行失败functccAdjustTrading(dbdtmcli。DB,uidint,amountint)error{affected,err:dtmimp。DBExec(db,updatedtmbusi。useraccountsettradingbalancetradingbalance?whereuserid?andtradingbalance?balance0,amount,uid,amount)iferrnilaffected0{returnfmt。Errorf(updateerror,maybebalancenotenough)}returnerr}functccAdjustBalance(dbdtmcli。DB,uidint,amountint)error{affected,err:dtmimp。DBExec(db,updatedtmbusi。useraccountsettradingbalancetradingbalance?,balancebalance?whereuserid?,amount,amount,uid)iferrnilaffected0{returnfmt。Errorf(updateuseraccount0rows)}returnerr}
下面我们来编写具体的TryConfirmCancel的处理函数app。POST(BusiAPITccBTransOutTry,dtmutil。WrapHandler2(func(cgin。Context)interface{}{bb:MustBarrierFromGin(c)returnbb。Call(txGet(),func(txsql。Tx)error{returntccAdjustTrading(tx,TransOutUID,req。Amount)})}))app。POST(BusiAPITccBTransOutConfirm,dtmutil。WrapHandler2(func(cgin。Context)interface{}{bb:MustBarrierFromGin(c)returnbb。Call(txGet(),func(txsql。Tx)error{returntccAdjustBalance(tx,TransOutUID,reqFrom(c)。Amount)})}))app。POST(BusiAPITccBTransOutCancel,dtmutil。WrapHandler2(func(cgin。Context)interface{}{bb:MustBarrierFromGin(c)returnbb。Call(txGet(),func(txsql。Tx)error{returntccAdjustTrading(tx,TransOutUID,req。Amount)})}))app。POST(BusiAPITccBTransInTry,dtmutil。WrapHandler2(func(cgin。Context)interface{}{bb:MustBarrierFromGin(c)returnbb。Call(txGet(),func(txsql。Tx)error{returntccAdjustTrading(tx,TransInUID,req。Amount)})}))app。POST(BusiAPITccBTransOutConfirm,dtmutil。WrapHandler2(func(cgin。Context)interface{}{bb:MustBarrierFromGin(c)returnbb。Call(txGet(),func(txsql。Tx)error{returntccAdjustBalance(tx,TransInUID,reqFrom(c)。Amount)})}))app。POST(BusiAPITccBTransInCancel,dtmutil。WrapHandler2(func(cgin。Context)interface{}{bb:MustBarrierFromGin(c)returnbb。Call(txGet(),func(txsql。Tx)error{returntccAdjustTrading(tx,TransInUID,req。Amount)})}))
到此各个子事务的处理函数已经OK了,这些处理函数的核心逻辑都是冻结和调整余额,对于这里面的bb。Call作用,后面会详细解释TCC事务
然后是开启TCC事务,进行分支调用TccGlobalTransaction会开启一个全局事务,err:dtmcli。TccGlobalTransaction(DtmServer,func(tccdtmcli。Tcc)(rerrerror){CallBranch会将事务分支的ConfirmCancel注册到全局事务上,然后直接调用Tryres1,rerr:tcc。CallBranch(TransReq{Amount:30},hostapiTccBTransOutTry,hostapiTccBTransOutConfirm,hostapiTccBTransOutCanceliferr!nil{returnresp,err}returntcc。CallBranch(TransReq{Amount:30},hostapiTccBTransInTry,hostapiTccBTransInConfirm,hostapiTccBTransInCancel)})
至此,一个完整的TCC分布式事务编写完成。运行
如果您想要完整运行一个成功的示例,步骤如下:运行dtmgitclonehttps:github。comdtmlabsdtmcddtmgorunmain。go运行例子gitclonehttps:github。comdtmlabsdtmexamplescddtmexamplesgorunmain。gohttptccbarrier处理网络异常
假设提交给dtm的事务中,这些步骤中,出现短暂的故障怎么办?dtm会重试未完成的操作,此时就会要求全局事务中的各个子事务是幂等的。dtm框架首创子事务屏障技术,提供BranchBarrier工具类,可以帮助用户简单的处理幂等。它提供了一个函数Call,保证这个函数内部的业务,会被最多调用一次:func(bbBranchBarrier)Call(txsql。Tx,busiCallBarrierBusiFunc)error
该BranchBarrier不仅能够自动处理幂等,还能够自动处理空补偿、悬挂的问题,详情可以参考异常与子事务屏障TCC的回滚
假如银行将金额准备转入用户2时,发现用户2的账户异常,返回失败,会怎么样?我们修改代码,模拟这种情况:app。POST(BusiAPITccBTransInTry,dtmutil。WrapHandler2(func(cgin。Context)interface{}{returndtmcli。ErrFailure}))
这是事务失败交互的时序图
这个跟成功的TCC差别就在于,当某个子事务返回失败后,后续就回滚全局事务,调用各个子事务的Cancel操作,保证全局事务全部回滚。
这里有一点,TransInTry的正向操作什么都没有做,就返回了失败,此时调用TransInCancel补偿操作,会不会导致反向调整出错了呢?
不用担心,前面的子事务屏障技术,能够保证TransInTry的错误如果发生在提交之前,则补偿为空操作;TransInTry的错误如果发生在提交之后,则补偿操作会将数据提交一次。
您可以将TccBTransInTry改成app。POST(BusiAPITccBTransInTry,dtmutil。WrapHandler2(func(cgin。Context)interface{}{bb:MustBarrierFromGin(c)bb。Call(txGet(),func(txsql。Tx)error{returntccAdjustTrading(tx,TransInUID,req。Amount)})returndtmcli。ErrFailure}))
最后的结果余额依旧会是对的,详情可以参考异常与子事务屏障