Flink两阶段提交

article/2025/9/14 13:03:13

文章目录

  • Flink两阶段提交
    • 1.EXACTLY_ONCE语义
    • 2.Kafka的幂等性和事务
      • 2.1 幂等性
      • 2.2 事务
    • 3.两阶段提交协议
    • 4.TwoPhaseCommitSinkFunction
    • 参考文献

Flink两阶段提交

1.EXACTLY_ONCE语义

EXACTLY_ONCE语义简称EOS,指的是每条输入消息只会影响最终结果一次,注意这里是影响一次,而非处理一次,Flink一直宣称自己支持EOS,实际上主要是对于Flink应用内部来说的,对于外部系统(端到端)则有比较强的限制

  • 外部系统写入支持幂等性
  • 外部系统支持以事务的方式写入

Flink在1.4.0版本引入了TwoPhaseCommitSinkFunction接口,并在Kafka Producer的connector中实现了它,支持了对外部Kafka Sink的EXACTLY_ONCE语义。

2.Kafka的幂等性和事务

Kafka在0.11版本之前只能保证At-Least-Once或At-Most-Once语义,从0.11版本开始,引入了幂等发送和事务,从而开始保证EXACTLY_ONCE语义,下面来看看Kafka中幂等发送和事务的原理:

2.1 幂等性

在未引入幂等性时,Kafka正常发送和重试发送消息流程图如下:
在这里插入图片描述
在这里插入图片描述
为了实现Producer的幂等语义,Kafka引入了Producer ID(即PID)和Sequence Number。每个新的Producer在初始化的时候会被分配一个唯一的PID,该PID对用户完全透明而不会暴露给用户。

Producer发送每条消息<Topic, Partition>对于Sequence Number会从0开始单调递增,broker端会为每个<PID, Topic, Partition>维护一个序号,每次commit一条消息此序号加一,对于接收的每条消息,如果其序号比Broker维护的序号(即最后一次Commit的消息的序号)大1以上,则Broker会接受它,否则将其丢弃:

  • 序号比Broker维护的序号大1以上,说明存在乱序。
  • 序号比Broker维护的序号小,说明此消息以及被保存,为重复数据。

有了幂等性,Kafka正常发送和重试发送消息流程图如下:
在这里插入图片描述
在这里插入图片描述
幂等性机制仅解决了单分区上的数据重复和乱序问题,对于跨session和所有分区的重复和乱序问题不能得到解决。于是需要引入事务。

2.2 事务

事务是指所有的操作作为一个原子,要么都成功,要么都失败,而不会出现部分成功或部分失败的可能。举个例子,比如小明给小王转账1000元,那首先小明的账户会减去1000,然后小王的账户会增加1000,这两个操作就必须作为一个事务,否则就会出现只减不增或只增不减的问题,因此要么都失败,表示此次转账失败。要么都成功,表示此次转账成功。分布式下为了保证事务,一般采用两阶段提交协议。

为了解决跨session和所有分区不能EXACTLY-ONCE问题,Kafka从0.11开始引入了事务。

为了支持事务,Kafka引入了Transacation Coordinator来协调整个事务的进行,并可将事务持久化到内部topic里,类似于offset和group的保存。

用户为应用提供一个全局的Transacation ID,应用重启后Transacation ID不会改变。为了保证新的Producer启动后,旧的具有相同Transaction ID的Producer即失效,每次Producer通过Transaction ID拿到PID的同时,还会获取一个单调递增的epoch。由于旧的Producer的epoch比新Producer的epoch小,Kafka可以很容易识别出该Producer是老的Producer并拒绝其请求。有了Transaction ID后,Kafka可保证:

  • 跨Session的数据幂等发送。当具有相同Transaction ID的新的Producer实例被创建且工作时,旧的Producer停止工作。
  • 跨Session的事务恢复。如果某个应用实例宕机,新的实例可以保证任何未完成的旧的事务要么Commit要么Abort,使得新实例从一个正常状态开始工作。

KPI-98对Kafka事务原理进行了详细介绍,完整的流程图如下:
在这里插入图片描述

  1. Producer向任意一个brokers发送 FindCoordinatorRequest请求来获取Transaction Coordinator的地址

  2. 找到Transaction Coordinator后,具有幂等特性的Producer必须发起InitPidRequest请求以获取PID。

    2.1 当设置了TransactionalId
    如果开启了事务特性,设置了TransactionalId,则TransactionalId会和InitPidRequest请求一起传递,并且在步骤2a中将TransactionalId和对应的PID持久化到事务日志中,这使我们能够将TransactionalId的相同PID返回给producer的未来实例,从而使恢复或中止先前未完成的事务成为可能。除了返回PID外,InitPidRequest还会执行如下任务:

    • 增加该PID对应的epoch。具有相同PID但epoch小于该epoch的其它Producer(如果有)新开启的事务将被拒绝。
    • 恢复(Commit或Abort)之前的Producer未完成的事务(如果有)。
      注意:InitPidRequest的处理过程是同步阻塞的。一旦该调用正确返回,Producer即可开始新的事务。

    2.2 当没有设置TransactionalId
    如果事务特性未开启,InitPidRequest可发送至任意Broker,并且会得到一个全新的唯一的PID。该Producer将只能使用幂等特性以及单一Session内的事务特性,而不能使用跨Session的事务特性。

  3. 调用beginTransaction()方法开启一个事务,Producer本地会记录已经开启了事务,但Transaction Coordinator只有在Producer发送第一条消息后才认为事务已经开启。

  4. Consume-Transform-Produce
    这一阶段,包含了整个事务的数据处理过程,并且包含了多种请求。

    4.1 AddPartitionsToTxnRequest
    一个Producer可能会给多个<Topic, Partition>发送数据,给一个新的<Topic, Partition>发送数据前,它需要先向Transaction Coordinator发送AddPartitionsToTxnRequest。Transaction Coordinator会将该<Transaction, Topic, Partition>存于Transaction Log内,并将其状态置为BEGIN,如上图中步骤4.1所示。有了该信息后,我们才可以在后续步骤中为每个Topic, Partition>设置COMMIT或者ABORT标记(如上图中步骤5.2所示)。另外,如果该<Topic, Partition>为该事务中第一个<Topic, Partition>,Transaction Coordinator还会启动对该事务的计时(每个事务都有自己的超时时间)。

    4.2 ProduceRequest
    Producer通过一个或多个ProduceRequest发送一系列消息。除了应用数据外,该请求还包含了PID,epoch,和Sequence Number。该过程如上图中步骤4.2所示。

    4.3 AddOffsetCommitsToTxnRequest
    为了提供事务性,Producer新增了sendOffsetsToTransaction方法,该方法将多组消息的发送和消费放入同一批处理内。

    该方法先判断在当前事务中该方法是否已经被调用并传入了相同的Group ID。若是,直接跳到下一步;若不是,则向Transaction Coordinator发送AddOffsetsToTxnRequests请求,Transaction Coordinator将对应的所有<Topic, Partition>存于Transaction Log中,并将其状态记为BEGIN,如上图中步骤4.3所示。该方法会阻塞直到收到响应。TxnOffsetCommitRequest作为sendOffsetsToTransaction方法的一部分,在处理完AddOffsetsToTxnRequest后,Producer也会发送TxnOffsetCommit请求给Consumer Coordinator从而将本事务包含的与读操作相关的各<Topic, Partition>的Offset持久化到内部的__consumer_offsets中,如上图步骤4.3a

    4.4 TxnOffsetCommitRequest
    作为sendOffsetsToTransaction方法的一部分,在处理完AddOffsetsToTxnRequest后,Producer也会发送TxnOffsetCommit请求给Consumer Coordinator从而将本事务包含的与读操作相关的各<Topic, Partition>的Offset持久化到内部的__consumer_offsets中,如上图步骤4.4所示。在此过程中,Consumer Coordinator会通过PID和对应的epoch来验证是否应该允许该Producer的该请求。
    这里需要注意:

    • 写入__consumer_offsets的Offset信息在当前事务Commit前对外是不可见的。也即在当前事务被Commit前,可认为该Offset尚未Commit,也即对应的消息尚未被完成处理。
    • Consumer Coordinator并不会立即更新缓存中相应<Topic, Partition>的Offset,因为此时这些更新操作尚未被COMMIT或ABORT。
  5. 提交或回滚事务
    一旦上述数据写入操作完成,应用程序必须调用KafkaProducer的commitTransaction方法或者abortTransaction方法以结束当前事务。

    5.1 EndTxnRequest
    commitTransaction方法使得Producer写入的数据对下游Consumer可见。abortTransaction方法通过Transaction Marker将Producer写入的数据标记为Aborted状态。下游的Consumer如果将isolation.level设置为READ_COMMITTED,则它读到被Abort的消息后直接将其丢弃而不会返回给客户程序,也即被Abort的消息对应用程序不可见。

    无论是Commit还是Abort,Producer都会发送EndTxnRequest请求给Transaction Coordinator,并通过标志位标识是应该Commit还是Abort。

    收到该请求后,Transaction Coordinator会进行如下操作

    • 将PREPARE_COMMIT或PREPARE_ABORT消息写入Transaction Log,如上图中步骤5.1所示
    • 通过WriteTxnMarker请求以Transaction Marker的形式将COMMIT或ABORT信息写入用户数据日志以及Offset Log中,如上图中步骤5.2所示
    • 最后将COMMIT或ABORT信息写入Transaction Log中,如上图中步骤5.3所示。

    上述第二步是实现将一组读操作与写操作作为一个事务处理的关键。因为Producer写入的数据Topic以及记录Comsumer Offset的Topic会被写入相同的Transactin Marker,所以这一组读操作与写操作要么全部COMMIT要么全部ABORT。

    5.2 WriteTxnMarkerRequest
    上面提到的WriteTxnMarkerRequest由Transaction Coordinator发送给当前事务涉及到的每个<Topic, Partition>的Leader。收到该请求后,对应的Leader会将对应的COMMIT(PID)或者ABORT(PID)控制信息写入日志,如上图中步骤5.2所示。

    该控制消息向Broker以及Consumer表明对应PID的消息被Commit了还是被Abort了。

    这里要注意,如果事务也涉及到__consumer_offsets,即该事务中有消费数据的操作且将该消费的Offset存于__consumer_offsets中,Transaction Coordinator也需要向该内部Topic的各Partition的Leader发送WriteTxnMarkerRequest从而写入COMMIT(PID)或COMMIT(PID)控制信息。

    5.2 写入最终的COMMIT或ABORT消息
    写完所有的Transaction Marker后,Transaction Coordinator会将最终的COMMIT或ABORT消息写入Transaction Log中以标明该事务结束,如上图中步骤5.3所示。

    此时,Transaction Log中大多数关于该事务的消息全部可以移除。当然,由于Kafka内数据是Append Only的,不可直接更新和删除,这里说的移除只是将其标记为null从而在Log Compact时不再保留。

    我们只需要保留已完成事务的PID和时间戳,因此最终可以为生产者删除TransactionalId-> PID映射。

3.两阶段提交协议

两阶段提交指的是一种协议,经常用来实现分布式事务,可以简单理解为预提交+实际提交,一般分为协调器Coordinator(以下简称C)和若干事务参与者Participant(以下简称P)两种角色。
在这里插入图片描述

  1. C先将prepare请求写入本地日志,然后发送一个prepare的请求给P
  2. P收到prepare请求后,开始执行事务,如果执行成功返回一个Yes或OK状态给C,否则返回No,并将状态存到本地日志。
  3. C收到P返回的状态,如果每个P的状态都是Yes,则开始执行事务Commit操作,发Commit请求给每个P,P收到Commit请求后各自执行Commit事务操作。如果至少一个P的状态为No,则会执行Abort操作,发Abort请求给每个P,P收到Abort请求后各自执行Abort事务操作。
    注:C或P把发送或接收到的消息先写到日志里,主要是为了故障后恢复用,类似WAL
    在这里插入图片描述
    在这里插入图片描述

4.TwoPhaseCommitSinkFunction

Flink在1.4.0版本引入了TwoPhaseCommitSinkFunction接口,封装了两阶段提交逻辑,并在Kafka Sink connector中实现了TwoPhaseCommitSinkFunction,依赖Kafka版本为0.11+,TwoPhaseCommitSinkFunction具体实现如下:
在这里插入图片描述

//TwoPhaseCommitSinkFunctionpublic void initializeState(FunctionInitializationContext context) throws Exception {// 当我们通过提交中事务恢复状态时,我们并不知道事务已经提交了,还是在checkpoint执行完成(在master端)和在此处通知checkpoint完成之间失败了。// 通常情况下是已经提交了,因为checkpoint在master执行完成和在此处通知checkpoint完成之间的窗口非常小// 如果在第一次checkpoint完成之前失败,则可能没有任何事务,或者扩容的情况下,会有一部分新的task没有事务// 如果发生缩容事件,或者由于“notifycheckpointcomplete()”方法中讨论的原因,我们可以检查多个事务。state = context.getOperatorStateStore().getListState(stateDescriptor);boolean recoveredUserContext = false;// 从上一次恢复if (context.isRestored()) {for (State<TXN, CONTEXT> operatorState : state.get()) {userContext = operatorState.getContext();List<TransactionHolder<TXN>> recoveredTransactions = operatorState.getPendingCommitTransactions();for (TransactionHolder<TXN> recoveredTransaction : recoveredTransactions) {recoverAndCommitInternal(recoveredTransaction);}recoverAndAbort(operatorState.getPendingTransaction().handle);if (userContext.isPresent()) {finishRecoveringContext();recoveredUserContext = true;}}}if (!recoveredUserContext) {userContext = initializeUserContext();}this.pendingCommitTransactions.clear();//创建生产者事务,并返回句柄currentTransactionHolder = beginTransactionInternal();}public void snapshotState(FunctionSnapshotContext context) throws Exception {long checkpointId = context.getCheckpointId();//预提交,如果语义为EXACTLY_ONCE,执行flush操作preCommit(currentTransactionHolder.handle);//pendingCommitTransactions插入当次检查点对应的currentTransactionHolder,包含事务生产者的实例(对于EXACTLY_ONCE模式)pendingCommitTransactions.put(checkpointId, currentTransactionHolder);//这里又初始化了一次包含事务生产者的实例(对于EXACTLY_ONCE模式),并赋给currentTransactionHoldercurrentTransactionHolder = beginTransactionInternal();//清空statestate.clear();//state.add(new State<>(this.currentTransactionHolder,new ArrayList<>(pendingCommitTransactions.values()),userContext));}public final void notifyCheckpointComplete(long checkpointId) throws Exception {// 可能出现以下几种情况// (1) 从最近的检查点触发并完成的事务恰好只有一个。 这应该很常见,在这种情况下只需提交该事务即可。// (2) 由于上一次checkpoint被跳过导致这里有多个正在进行的事务,这是一种罕见的情况,但可能在以下情况下发生://     - 上一次checkpoint未能持久化metadata(存储系统临时中断)但可以保留一个连续的检查点(此处通知的检查点)//     - 其他task未能在上一次checkpoint持久化他们的状态,但未触发失败,因为他们可以保持其状态并将其成功保存在连续的检查点中(此处通知的检查点)//    在这两种情况下,前一个检查点都不会达到提交状态,但此检查点总是希望包含前一个检查点,并覆盖自上一个成功检查点以来的所有更改。因此,我们需要提交所有待提交的事务。// (3) 多个事务处于待提交状态,但检查点完成通知与最新的不相关。这是可能的,因为通知消息可能会延迟(在极端情况下,直到触发下一个检查点之后到达)并且可能会有并发的重叠检查点(新的检查点在上一个完全完成之前启动)。// ==> 永远不会有我们这里没有待提交事务的情况//待提交的事务版本和事务句柄Iterator<Map.Entry<Long, TransactionHolder<TXN>>> pendingTransactionIterator = pendingCommitTransactions.entrySet().iterator();Throwable firstError = null;while (pendingTransactionIterator.hasNext()) {Map.Entry<Long, TransactionHolder<TXN>> entry = pendingTransactionIterator.next();Long pendingTransactionCheckpointId = entry.getKey();TransactionHolder<TXN> pendingTransaction = entry.getValue();if (pendingTransactionCheckpointId > checkpointId) {continue;}try {//提交事务(最终调用commitTransaction)commit(pendingTransaction.handle);} catch (Throwable t) {//}pendingTransactionIterator.remove();}}
//FlinkKafkaProducer011public void initializeState(FunctionInitializationContext context) throws Exception {//如果检查点未开启,语义设置为NONEif (semantic != Semantic.NONE && !((StreamingRuntimeContext) this.getRuntimeContext()).isCheckpointingEnabled()) {semantic = Semantic.NONE;}nextTransactionalIdHintState = context.getOperatorStateStore().getUnionListState(NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR);//初始化事务ID生成器transactionalIdsGenerator = new TransactionalIdsGenerator(getRuntimeContext().getTaskName() + "-" + ((StreamingRuntimeContext) getRuntimeContext()).getOperatorUniqueID(),getRuntimeContext().getIndexOfThisSubtask(),getRuntimeContext().getNumberOfParallelSubtasks(),kafkaProducersPoolSize,SAFE_SCALE_DOWN_FACTOR);if (semantic != Semantic.EXACTLY_ONCE) {nextTransactionalIdHint = null;} else {//如果为EXACTLY_ONCE语义,初始化nextTransactionalIdHint(初始化lastParallelism和nextFreeTransactionalId为0),后面用来生成多个事务IDArrayList<NextTransactionalIdHint> transactionalIdHints = Lists.newArrayList(nextTransactionalIdHintState.get());if (transactionalIdHints.size() > 1) {} else if (transactionalIdHints.size() == 0) {nextTransactionalIdHint = new NextTransactionalIdHint(0, 0);abortTransactions(transactionalIdsGenerator.generateIdsToAbort());} else {nextTransactionalIdHint = transactionalIdHints.get(0);}}//调用父类的initializeState方法super.initializeState(context);}public void snapshotState(FunctionSnapshotContext context) throws Exception {//调用父类的snapshotState方法super.snapshotState(context);//清空nextTransactionalIdHintStatenextTransactionalIdHintState.clear();//避免每个task上做同样的操作,这里只在第一个task上完成nextFreeTransactionalId的初始化if (getRuntimeContext().getIndexOfThisSubtask() == 0 && semantic == Semantic.EXACTLY_ONCE) {long nextFreeTransactionalId = nextTransactionalIdHint.nextFreeTransactionalId;if (getRuntimeContext().getNumberOfParallelSubtasks() > nextTransactionalIdHint.lastParallelism) {nextFreeTransactionalId += getRuntimeContext().getNumberOfParallelSubtasks() * kafkaProducersPoolSize;}nextTransactionalIdHintState.add(new NextTransactionalIdHint(getRuntimeContext().getNumberOfParallelSubtasks(),nextFreeTransactionalId));}}

Flink Kafka Sink执行两阶段提交的流程图大致如下:
在这里插入图片描述
假设一种场景,从Kafka Source拉取数据,经过一次窗口聚合,最后将数据发送到Kafka Sink,如下图:
在这里插入图片描述

  1. JobManager向Source发送Barrier,开始进入pre-Commit阶段,当只有内部状态时,pre-commit阶段无需执行额外的操作,仅仅是写入一些已定义的状态变量即可。当chckpoint成功时Flink负责提交这些写入,否则就终止取消掉它们。
  2. 当Source收到Barrier后,将自身的状态进行保存,后端可以根据配置进行选择,这里的状态是指消费的每个分区对应的offset。然后将Barrier发送给下一个Operator。
    在这里插入图片描述
  3. 当Window这个Operator收到Barrier之后,对自己的状态进行保存,这里的状态是指聚合的结果(sum或count的结果),然后将Barrier发送给Sink。Sink收到后也对自己的状态进行保存,之后会进行一次预提交。
    在这里插入图片描述
  4. 预提交成功后,JobManager通知每个Operator,这一轮检查点已经完成,这个时候,Kafka Sink会向Kafka进行真正的事务Commit。
    在这里插入图片描述

以上便是两阶段的完整流程,提交过程中如果失败有以下两种情况

  1. Pre-commit失败,将恢复到最近一次CheckPoint位置
  2. 一旦pre-commit完成,必须要确保commit也要成功

因此,所有opeartor必须对checkpoint最终结果达成共识:即所有operator都必须认定数据提交要么成功执行,要么被终止然后回滚。

参考文献

https://cwiki.apache.org/confluence/display/KAFKA/KIP-98±+Exactly+Once+Delivery+and+Transactional+Messaging#KIP-98-ExactlyOnceDeliveryandTransactionalMessaging-1.Findingatransactioncoordinator–theFindCoordinatorRequest
https://cloud.tencent.com/developer/article/1149669
https://segmentfault.com/a/1190000019329884
https://www.cnblogs.com/felixzh/p/10184762.html


http://chatgpt.dhexx.cn/article/TLbPtGay.shtml

相关文章

2PC(两阶段提交)方案

XA方案 2PC的传统方案是在数据库层面实现的&#xff0c;如Oracle、MySQL都支持2PC协议&#xff0c;为了统一标准减少行业内不必要的对接成本&#xff0c;需要制定标准化的处理模型及接口标准&#xff0c;国际开放标准组织Open Group定义了分布式事务处理模型DTP&#xff08;Dis…

简述计算机学科的三个过程,三阶段提交

三阶段提交&#xff0c;也叫三阶段提交协议&#xff0c;是在计算机网络及数据库的范畴下&#xff0c;使得一个分布式系统内的所有节点能够执行事务的提交的一种分布式算法。三阶段提交是为解决两阶段提交协议的缺点而设计的。 中文名 三阶段提交 外文名 Three-phase commit 阶 …

什么是Mysql 两阶段提交?为什么必须有“两阶段提交”呢? 两阶段提交的原理? 解决 bin log 与 redo log 的一致性问题?

文章目录 什么是两阶段提交为什么必须有“两阶段提交”呢&#xff1f; 两阶段提交的原理&#xff1f; 解决 bin log 与 redo log 的一致性问题&#xff1f;在没有两阶段提交的恢复情况下&#xff1a;有了两阶段两阶段提交后的恢复情况&#xff1a; 什么是两阶段提交 当有数据修…

分布式事务 - 两阶段提交与三阶段提交

在分布式系统中&#xff0c;著有CAP理论&#xff0c;该理论由加州大学伯克利分校的Eric Brewer教授提出&#xff0c;该理论阐述了在一个分布式系统中不可能同时满足一致性&#xff08;Consistency&#xff09;、可用性&#xff08;Availability&#xff09;&#xff0c;以及分区…

mysql的两阶段提交理解

一个事务的执行方式&#xff08;一般存在显式开启和隐式开启&#xff0c;在未申明的情况下一般是隐式事务&#xff09;&#xff1a;redo log存在两阶段提交&#xff0c;prepare和commit.当写完redo log 时,开始写 bin log ,当bin log写完,会在这次事务的最后加上commit(xid)&am…

分布式事务(两阶段提交、三阶段提交)

假如没有分布式事务 在一系列微服务系统当中&#xff0c;假如不存在分布式事务&#xff0c;会发生什么呢&#xff1f;让我们以互联网中常用的交易业务为例子&#xff1a; 上图中包含了库存和订单两个独立的微服务&#xff0c;每个微服务维护了自己的数据库。在交易系统的业务逻…

两阶段提交(2 Phase Commit) 在 PostgreSQL 和 RocksDB 中的实现

文章目录 前言用法PostgreSQLRocksDB 实现PostgreSQL 2PCRocksDB 2PCWRITE_COMMITTEDWRITE_PREPARED解决 snapshot-read 问题解决 rollback 问题 WRITE_UNPREPARED 总结 前言 本节中提到的代码实现是基于 PG&#xff1a;REL_15_STABLE 和 Rocksdb: master-fcd816d534 代码介绍的…

分布式事务(2)两阶段提交

分布式事务(1)基本概念 分布式事务(2)两阶段提交 分布式事务(3)TCC 分布式事务(4)可靠消息最终一致性 分布式事务(5)最大努力通知 分布式事务【2】2PC两阶段提交 1.什么是2PC&#xff08;两阶段提交&#xff09;2.解决方案2.1 XA方案2.2 Seata方案 3.seata实现2PC事务4.2PC解决…

MYSQL之两阶段提交和组提交(数据一致性)

我们在MySQL 的日志中详细的介绍了undo log、redo log、binlog这三个日志和所用到的一些缓存知识&#xff0c;那么下面我们分析一下更新语句执行过程&#xff0c;它们是怎么变化的呢&#xff1f;下面我们直接给答案吧。假如我们修改一行主键索引&#xff08;id&#xff09;为1的…

MySQL的两阶段提交(数据一致性)

MySQL的两阶段提交 两阶段提交过程为什么要写redo log&#xff0c;不写redo log的话&#xff0c;根本就不会出现“两阶段提交”的麻烦事啊&#xff1f;为什么要写两次redo log&#xff0c;写一次不行吗&#xff1f;在两阶段提交的情况下&#xff0c;是怎么实现崩溃恢复的呢&…

分布式事务之两阶段提交

两阶段提交协议 两阶段提交协议把分布式事务分为两个阶段&#xff0c;一个是准备阶段&#xff0c;另一个是提交阶段&#xff1b;准备阶段和提交阶段都是由事务管理器发起的&#xff1b;我们可以将事务管理器称为协调者&#xff0c;将资源管理器称为参与者。 流程 准备阶段&a…

两阶段提交和三阶段提交的理解

两阶段提交和三阶段提交的理解 本文主要通过梳理流程说明2PC的缺点&#xff0c;并总结3PC的优点。 一、2PC流程及致命问题 如图所示&#xff0c;2PC的流程简要&#xff1a; 1.协调者 参与者都是INIT状态&#xff0c;参与者等待消息。 2.协调者进入prepare状态发送vote_requ…

两阶段提交和三阶段提交的区别

1.分布式理论&#xff1a;一致性协议&#xff1a;2PC 1.1 什么是2PC2PC,即两阶段提交协议的简写&#xff0c;是整个事务过程分为两个阶段&#xff0c;准备阶段&#xff08;Prepare phase&#xff09;和提交阶段&#xff08;Commit phase&#xff09;两个阶段过程&#xff1a; …

分布式两阶段提交和三阶段提交

随着大型网站的各种高并发访问、海量数据处理等场景越来越多&#xff0c;如何实现网站的高可用、易伸缩、可扩展、安全等目标就显得越来越重要。 为了解决这样一系列问题&#xff0c;大型网站的架构也在不断发展。提高大型网站的高可用架构&#xff0c;不得不提的就是分布式。本…

MySQL (十四) 两阶段提交

一. 两阶段提交 1.1 利用 binlog 和redolog 做到两阶段提交 从上图中看出&#xff1a;最后提交事务的三个步骤&#xff1a; 写入redo log &#xff0c;处于prepare状态写binlog修改redo log 状态变为commit 由于redo log 的提交分为prepare 和commit两个阶段&#xff0c;所以我…

mysql之两阶段提交

什么是两阶段提交 当有数据修改时&#xff0c;会先将修改redo log cache和binlog cache然后在刷入到磁盘形成redo log file&#xff0c;当redo log file全都刷入到磁盘时&#xff08;prepare 状态&#xff09;和提交成功后才能将binlog cache刷入磁盘&#xff0c;当binlog全部刷…

分布式事务:两阶段提交与三阶段提交

两阶段提交与三阶段提交 分布式事务二阶段提交请求阶段提交阶段举例故障分析柜员侧出现故障或拒绝Proposal第一阶段经理侧出现故障第二阶段经理侧出现故障 二阶段提交存在的问题 三阶段提交canCommit阶段preCommit阶段doCommit阶段特点 参考 分布式事务 满足ACID&#xff08;原…

正确理解二阶段提交(Two-Phase Commit)

文章目录 明确问题草稿方案原子提交协议希望实现的2个特性正确的二段提交协议&#xff08;Two-Phase Commit&#xff09;二阶段提交协议如何满足安全性&#xff08;Safety&#xff09;二阶段提交协议如何满足存活性( Liveness) 如何应对超时 超时终止协议 如何应对宕机重启 二阶…

两阶段提交协议(two phase commit protocol,2PC)详解

一、协议概述 两阶段提交协议&#xff08;two phase commit protocol&#xff0c;2PC&#xff09;可以保证数据的强一致性&#xff0c;许多分布式关系型数据管理系统采用此协议来完成分布式事务。它是协调所有分布式原子事务参与者&#xff0c;并决定提交或取消&#xff08;回…

简单谈谈MySQL的两阶段提交

一、简单回顾三种日志 在讲解两阶段提交之前&#xff0c;需要对MySQL中的三种日志即binlog、redo log与undo log有一定的了解。 在这三种日志中&#xff0c;很多同学会把binlog与redo log混淆&#xff0c;下面使用一张表格来简单对比下两者的区别。 当然&#xff0c;如果对bi…