1. 什么是流处理?
首先,让我们说一下什么是数据流(也称为事件流)?它是无边界数据集的抽象说法,无边界意味着无限且不断增长,因为随着时间的推移,新数据会不断地到来。
除了无边界的特性之外,事件流模型还有其它几个属性:
1.1 事件流是有序的
这在交易事件里是最容易理解的,先在账户里存钱然后消费与先消费再还钱是非常不同的,后者将产生透支费用,而前者不能透支。这是事件流和数据库表之间的不同点之一:表中的记录始终被视为无序的(SQL的“order by”子句不是关系模型的一部分)。
1.2 事件流的数据是不可变的
事件一旦发生,就永远无法修改。例如取消一个交易事务,这个记录本身是不会被删除的,相反,会向流写入一个附加事件,记录之前事务的取消。这是事件流和数据库表之间的另一个不同点:我们可以删除或更新表中的数据。
1.3 事件流是可重放的
对于大多数业务应用程序来说,能够重放几个月前(甚至几年前)发生的原始事件流是至关重要的,这是为了分析修正错误或执行审计所必须的。
2. 流处理的相关概念
流处理和任何类型的数据处理非常相似,都是接收数据,对数据执行某些操作,例如转换、聚合等,然后把结果保存在某处。但是,有一些关键概念是流处理所特有的:
2.1 时间
时间可能是流处理中最重要的概念,因为大多数流式应用程序都在一定的时间内执行操作。例如,计算连续5分钟的股票价格平均值。流处理系统通常参考以下时间概念:
2.1.1 消息创建时间
这是事件被创建的时间。例如,进行测量的时间、出售商品的时间、用户在网站浏览页面的时间等等。在0.10.0及更高版本中,Kafka会在生产者创建消息时自动添加当前的时间。
2.1.2 消息保存时间
这是事件被保存在Kafka broker的时间。在0.10.0及更高版本中,Kafka broker可以配置自动为接收到的消息添加当前的时间。
2.1.3 消息被处理时间
这是流处理应用程序接收事件的时间,这时间可以是事件发生的几毫秒、几小时或者几天之后。
2.2 状态
通常我们是把状态保存在流处理应用程序本地的变量中,例如一个用于保存移动计数的简单哈希表。然而,在流处理应用程序中,这不是一个管理状态的可靠方法,因为当流处理应用程序停止时,状态将丢失。流处理有以下2种类型的状态:
2.2.1 本地或内部状态
只能由流处理应用程序的特定实例访问的状态,该状态通常是使用应用程序内运行的内存数据库来维护和管理。使用本地状态的优点是性能非常快,缺点是受限于可用的内存大小。因此,流处理的许多设计模式都是把数据分到多个子流来处理。
2.2.2 外部状态
使用外部数据存储维护的状态,例如是类似Cassandra的NoSQL数据库。使用外部状态的优点是几乎没有大小的限制,并且可以从应用程序的多个实例或甚至从不同的应用程序访问它;缺点是由于引入了外部组件导致额外的延时和复杂性。
2.3 流-表二元性(Stream-Table Duality)
数据流包含从一开始到现在的完整历史,它表示了过去和当前。它是一系列的事件,其中每个事件都会引起一个变化。而表包含的是某一时刻的状态,是许多变化的结果。从某种意义上可以说流和表是一个硬币的两面,世界总是在变化,有时我们会对引起这些变化的事件感兴趣,而有时我们却对当前的状态感兴趣。我们将这两者之间来回转换的内在关系称为“流-表二元性”。
2.4 时间窗口
大多数流的操作都是在一定的时间内执行 - 移动的平均值、本周最畅销商品等等。例如,当计算移动平均值时,我们需要知道:
- 时间窗口的大小:我们需要计算每5分钟的平均值?还是每15分钟?还是每天?越大的时间窗口数据变化反应会越滞后,例如价格上涨需要更长时间才能注意到。
- 时间窗口移动的频率(间隔):五分钟的平均值可以每分钟,每秒,或每次有新事件时更新。当间隔等于时间窗口的大小时被称为翻滚窗口(tumbling window);当时间窗口在每条记录上移动时被称为滑动窗口(sliding window)。
- 时间窗口保持可更新的时间:例如我们五分钟移动平均值是计算00:00-00:05时间窗口的平均值。现在一小时后,我们得到了更多的事件,但它们的事件时间显示为00:02。我们是否应该更新00:00-00:05期间的平均值?还是忽略它们?理想情况下,我们需要定义一个特定的时间段,在此期间接收到的事件将被添加到各自的时间窗口中。
时间窗口可以和时钟对齐,例如每分钟移动的一个5分钟窗口,第一个时间段是00:00-00:05,第二个是00:01-00:06。或者可以简单地从应用程序启动时开始计时,例如03:17-03:22。滑动窗口是永远不会和时钟对齐,因为只要有新记录就会移动。有关这两种时间窗口之间的区别,请参见下图:
3. 流处理设计模式
3.1 单事件处理
流处理的最基本模式是单独处理每个事件,这也称为map/filter模式,因为它通常用于从流中过滤不必要的事件或转换每个事件。术语“map”是基于map/reduce模式而来,在map阶段转换事件,reduce阶段执行聚合。在此模式中,流处理应用程序从流中消费事件、修改每个事件,然后将事件写入到另一个流中。例如,一个应用程序从流中读取日志消息并将ERROR事件写入一个高优先级的流中,其余事件写入到一个低优先级的流中。另一个例子是把读取到的事件格式由JSON转换为Avro。这种模式可以通过一个简单的生产者和一个消费者处理,如下图:
3.2 利用本地状态处理
大多数流处理应用程序都用于聚合操作,特别是时间窗口的聚合。例如,查找出每天股票交易的最低和最高价格并计算其移动的平均值。此类聚合操作需要维护流的状态,例如需要保存当前时刻最低和最高的价格并和流中的每个新价格比较,然后更新它们,所有这些都可以使用本地状态来实现。我们使用Kafka的分区器来保证具有相同股票代码的事件都会被写入到相同的分区,然后,应用程序的每个消费者将从各自负责的分区读取相应的事件,这意味着应用程序的每个消费者都可以维护相应分区的股票子集的状态。如下图所示:
当应用程序使用本地状态时,流处理应用程序会变得非常复杂。以下几个问题必须要注意:
- 内存使用情况:本地状态使用的内存大小必须小于应用程序实例可用的内存大小。
- 持久化:当应用程序实例停止时,我们需要确保状态不会丢失,并且当实例再次启动或由其它实例替换时可以恢复之前的状态。Kafka Streams在这方面处理地非常好-本地状态会被保存在嵌入的RocksDB的内存中,而且还会持久化到磁盘以便在重启后能够快速恢复。但同时也会把本地状态所有的改变都发送到Kafka的一个topic中。如果流的其中一个节点故障,本地状态不会丢失,因为可以容易地从Kafka的topic中重新读取并创建它。
- 负载再均衡:分区有时会被重新分配给不同的消费者。当发生这种情况时,丢失分区的消费者必须保存最后的状态,而重新分配到的消费者必须知道恢复正确的状态。
3.3 多阶段处理/重新分区
如果你需要根据所有事件来计算结果,例如统计每天交易前十的股票,显然使用本地状态不足以实现,因为所有前十的股票可能位于不同的分区中。我们需要两个阶段来实现:首先分别对每个股票代码计算每日收益/损失,我们可以在每个实例中使用本地状态来实现。然后把结果都写到一个只有单分区的新topic里,使用1个消费者来计算前十的股票,如下图所示。但有时候也需要更多的阶段来处理结果。
3.4 连接外部系统处理:流-表连接(Stream-Table Join)
有时候流处理需要与外部系统整合,例如根据保存在数据库的一组规则来验证事务的合法性,或者使用用户画像来丰富用户的点击事件,如下图所示:
但这种模式的问题是与外部系统额外的交互增加了明显的延时,通常在5-15毫秒之间。在许多情况下,这是不可行的。流处理系统通常每秒可处理100K-500K个事件,但数据库一般每秒只能处理10K个事件。为了提高性能,我们需要在流处理应用中缓存数据库的信息。但是管理这些缓存是极具挑战性的,例如怎样防止缓存中的信息变得陈旧?我们可以使用事件流来捕捉数据库表所有的更改,然后实时更新缓存。捕捉数据库更改输出到一个事件流称为CDC(change data capture),Kafka Connect工具有多个connectors可以实现CDC把数据库表转换为事件流。这允许你保存多一份数据库表的副本,并且只要有更改事件就会收到通知,以便相应地更新副本的数据,如下图所示:
我们称这为流-表连接是因为其中的一个流表示本地缓存表的更改。
3.5 流连接(Streaming Join)
有时候你想连接两个事件流而不是一个流和一个表。当使用一个流来表示一个表,你可以忽略流中的大多数历史记录,因为你只关心当前的记录。但是当连接两个流时,你连接的可以是所有记录,例如尝试将一个流中的事件与另一个流中具有相同key并发生在同一时间窗口的事件进行匹配。这就是为什么流连接也被称为窗口连接(windowed-join)的原因。
例如,假设有一个保存用户查询行为的流和一个对查询结果点击行为的流。我们想把查询和点击结果相匹配,以便知道哪个查询结果是最多人点击的。显然,我们可以根据查询条件来匹配查询结果,但只限定在特定的时间窗口内。我们假设查询结果的点击是在用户输入查询条件后几秒,所以我们在每个流上选取一个几秒钟的移动窗口,并匹配每个窗口的结果,如下图所示:
查询和点击的流都是基于相同keys来分区的,这些keys同时也是连接keys。这样,来自user_id:42的所有点击事件都会在点击topic的分区5中,而user_id:42的所有查询事件都会在查询topic的分区5中。然后Kafka Streams确保这两个topics的分区5都会分配给同一个任务,因此,该任务可以读取user_id:42的所有相关事件。它是使用其嵌入的RocksDB缓存来实现这两个topics的窗口连接。
3.6 乱序事件(Out-of-Sequence Events)
处理在错误时间到达流的事件不仅是流处理的挑战,而且也是传统ETL系统的挑战。在IoT(物联网)场景中,无序事件经常发生并且预期会发生。例如,一个移动设备丢失WiFi信号几个小时,然后在重新连接时发送前几个小时的事件。流应用程序需要能够处理这些场景,这通常意味着应用程序必须执行以下操作:
- 识别乱序事件,这需要应用程序检查事件的时间并与当前时间比较。
- 定义一个时间段,在此期间将尝试排解乱序的事件,例如一个延时三小时的事件可以重新被处理,但超过此时间的将会被丢弃。
- 具有排解乱序事件的功能,这是流应用和批处理任务的主要区别。如果有一个批处理任务,并且在任务完成后还接收到一些事件,我们通常可以重新执行之前的任务。但一个流处理却不能这样做,因为它是一个持续的处理,在任何时刻都需要处理新旧的事件。
- 能够更新结果,例如把流处理的结果写入数据库。
包括Google的Dataflow和Kafka Streams在内的多个流处理框架内置了对事件时间概念的支持,能够处理事件时间比当前处理时间旧或新的事件。这通常通过在本地状态下维护多个可用于更新的聚合窗口来完成,而且开发人员能够配置保持这些窗口的时间长短。当然,保持聚合窗口可用于更新的时间越长,维护本地状态所需的内存就越多。
Kafka Streams的API始终把聚合结果写入到一个结果topic中。那些通常是精炼的topics,这意味着只保存每个key对应的最新值。如果由于一个乱序事件需要更新聚合结果,Kafka Streams将简单地为相应的聚合窗口写入一个新的结果(覆盖之前的)。
3.7 重新处理
这个模式一般有两种用例:
- 我们有一个优化版本的流处理应用程序。我们希望在与旧版本相同的事件流上运行新版本的应用程序,生成新的事件流结果,但不会替换旧版本的结果,而是比较它们,客户端会使用新的结果。
- 现有的流处理应用程序有bugs。我们修复了bugs之后希望重新处理事件流并重新计算结果。
第一个用例很容易实现,因为Kafka会长时间把整个事件流存储在一个可伸缩存储中。这意味着拥有分别生成两个结果流的两个版本的流处理应用程序只需要以下步骤:
- 将新版本的应用程序作为一个新的消费者组来使用。
- 配置新版本的应用程序从topic的第一个offset开始处理所有事件。
- 当新版本的应用程序处理的进度赶上时,让它继续处理并将客户端切换到新的结果流。
第二个用例的实现具有挑战性,它需要“重置”现在应用程序从头开始重新处理,重置本地状态(因此我们不会混淆两个版本的结果),以及很可能清除之前的输出流。虽然Kafka Streams有一个重置流处理应用程序状态的工具,但建议只要有足够的容量来运行两个版本的应用程序并生成两个结果流,就尝试使用第一种方法。因为第一种方法更安全,它允许在多个版本之间来回切换和比较结果,并且在清理过程中不会有丢失数据或引起错误的风险。
4. Kafka Streams例子
Kafka有两类流APIs,low-level Processor API和high-level Streams DSL。后续的例子将会使用后者,DSL允许你在流处理应用程序中定义一系列的转换。转换可以是简单的filter或者是复杂的流连接。low-level的API允许你创建自己的转换,但这通常比较少见。使用DSL API的应用程序始终以StreamBuilder来创建处理的拓扑开始-应用于流中事件转换的有向无环图(DAG,Directed Acyclic Graph)。然后从拓扑中创建KafkaStreams的执行对象,启动KafkaStreams对象将启动多个线程,每个线程会把处理的拓扑应用到流中的事件。当关闭KafkaStreams对象时,处理会结束。
4.1 Word Count
创建流处理应用程序首先要做的是配置Kafka Streams,它有很多配置属性,但这里不做详细介绍。另外,你也可以通过向Properties对象添加生产者或消费者配置来配置嵌套在Kafka Streams中的生产者和消费者:
public class WordCountExample {public static void main(String[] args) {Properties props = new Properties();// 必须要有一个唯一的IDprops.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");// 使用默认的序列化和反序列化类props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
创建处理的拓扑:
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> source = builder.stream("TextLinesTopic");
KStream<String, Long> counts = source// 转换为多个word.flatMapValues(textLines -> Arrays.asList(textLines.toLowerCase().split("\\W+")))// 转换为word-word键值对.map((key, word) -> new KeyValue<>(word, word))// 过滤the.filter((key, word) -> !word.equals("the"))// 按相同word分组.groupByKey()// 计算每个word的数量.count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store")).toStream();
// 把计算结果写到另外一个topic中,这里指定序列化和反序列化类
counts.to("WordsWithCountsTopic", Produced.<String, Long>with(Serdes.String(), Serdes.Long()));
启动流应用程序:
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
4.2 股市统计
本例会读取一系列股票市场的交易事件,包括股票代码,要价和要价数量。为了简单起见,本例忽略出价,也不会在数据中包含timestamp,而会依赖于Kafka生产者生成的事件时间。然后,我们将创建以下一些统计信息的输出流,所有统计数据将每秒更新一次。
- 每五秒钟的最佳(最低)要价
- 每五秒钟的交易数量
- 每五秒钟的平均要价
配置Kafka Streams和上例类似:
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stockstat-application");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, TradeSerde.class.getName());
不同的是这里使用了自定义的反序列化类TradeSerde,它使用Google的Gson库来生成JSON的序列化与反序列化器,封装在WrapperSerde类里:
static public final class TradeSerde extends WrapperSerde<Trade> {public TradeSerde() {super(new JsonSerializer<Trade>(), new JsonDeserializer<Trade>(Trade.class));}
}
创建TradeStats类用于统计计算:
public class TradeStats {String type;String ticker;// tracking count and sum so we can later calculate avg priceint countTrades;double sumPrice;double minPrice;double avgPrice;public TradeStats add(Trade trade) {if (trade.type == null || trade.ticker == null)throw new IllegalArgumentException("Invalid trade to aggregate: " + trade.toString());if (this.type == null)this.type = trade.type;if (this.ticker == null)this.ticker = trade.ticker;if (!this.type.equals(trade.type) || !this.ticker.equals(trade.ticker))throw new IllegalArgumentException("Aggregating stats for trade type " + this.type + " and ticker "+ this.ticker + " but recieved trade of type " + trade.type + " and ticker " + trade.ticker);if (countTrades == 0)this.minPrice = trade.price;this.countTrades = this.countTrades + 1;this.sumPrice = this.sumPrice + trade.price;this.minPrice = this.minPrice < trade.price ? this.minPrice : trade.price;return this;}public TradeStats computeAvgPrice() {this.avgPrice = this.sumPrice / this.countTrades;return this;}}
然后就可以创建处理的拓扑:
StreamsBuilder builder = new StreamsBuilder();
KStream<String, Trade> source = builder.stream("stocks");
KStream<Windowed<String>, TradeStats> stats = source// 按股票代码分组.groupByKey()// 创建5秒的时间窗口,每1秒移动一次.windowedBy(TimeWindows.of(5000).advanceBy(1000))// 对时间窗口内的Trade对象执行聚合,这里是简单的add到TradeStats对象里进行统计数量、最低价.<TradeStats>aggregate(() -> new TradeStats(), (k, v, tradestats) -> tradestats.add(v),Materialized.<String, TradeStats, WindowStore<Bytes, byte[]>>as("trade-aggregates").withValueSerde(new TradeStatsSerde())).toStream()// 计算平均价格并返回包含统计信息的TradeStats对象.mapValues((trade) -> trade.computeAvgPrice());
// 写到另外一个topic中
stats.to("stockstats-output", Produced.keySerde(WindowedSerdes.timeWindowedSerdeFrom(String.class)));
最后启动流应用程序:
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
完整代码可以参见:https://github.com/gwenshap/kafka-streams-stockstats
4.3 丰富网站的点击事件
本例会展示如何通过流连接来丰富网站的点击事件。我们将生成一个模拟用户点击的流、一个用于更新用户档案数据库的流和网页搜索的流。然后,我们将连接这三个流来获得每个用户行为的360度视图。例如,用户搜索了什么?点击了什么搜索结果?是否在用户档案中更新了“感兴趣的事物”?通过连接这些类型可以为分析提供丰富的数据,产品的推荐通常是基于这些信息。例如,用户搜索自行车,点击品牌“Trek”的搜索结果链接,并对旅行感兴趣,因此我们可以将Trek、头盔和适合自行车旅行相关的广告推送给该用户。
以下是创建处理的拓扑:
StreamsBuilder builder = new StreamsBuilder();
// 分别创建二个流,一个是点击流,另外一个是搜索流
KStream<Integer, PageView> views = builder.stream("clicks.pages.views",Consumed.with(Serdes.Integer(), new PageViewSerde()));
KStream<Integer, Search> searches = builder.stream("clicks.search",Consumed.with(Serdes.Integer(), new SearchSerde()));
// 创建缓存的用户档案表(这个是通过一个流来更新)
KTable<Integer, UserProfile> profiles = builder.table("clicks.user.profile",Materialized.<Integer, UserProfile, KeyValueStore<Bytes, byte[]>>as("profile-store"));// 先连接点击流和用户档案
KStream<Integer, UserActivity> viewsWithProfile = views.leftJoin(profiles, (view, profile) -> {if (profile != null)return new UserActivity(profile.getUserID(), profile.getUserName(), profile.getZipcode(),profile.getInterests(), "", view.getPage());elsereturn new UserActivity(-1, "", "", null, "", view.getPage());
});// 用上面连接的结果再连接最后一个流
KStream<Integer, UserActivity> userActivityKStream = viewsWithProfile.leftJoin(searches,(userActivity, search) -> {if (search != null)return userActivity.updateSearch(search.getSearchTerms());elsereturn userActivity.updateSearch("");// 仅仅关联点击搜索后1秒的相关点击事件}, JoinWindows.of(1000), Joined.with(Serdes.Integer(), new UserActivitySerde(), new SearchSerde()));
完整代码可以参见:https://github.com/gwenshap/kafka-clickstream-enrich
5. Kafka Streams架构概述
上面的例子讲述了如何使用Kafka Streams API实现一些众所周知的流处理设计模式。但是为了更好地理解Kafka Streams实际是怎样工作和扩展,我们需要深入了解API背后的一些设计原则。
5.1 创建拓扑
每个流应用程序都实现并执行至少一个拓扑。拓扑(在其它流处理框架中也称为有向无环图DAG,Directed Acyclic Graph)是一系列的操作和转换,每个事件从输入流动到输出。下图是上面Word Count例子的拓扑图:
即使是简单的处理流程也有拓扑图,它是由多个处理器组成,也就是拓扑图中的椭圆形节点。大多数处理器实现数据过滤、映射、聚合等操作。还有源处理器,它从一个topic读取数据并将其传递,和接收处理器,它从之前的处理器读取数据并发到另外一个topic。拓扑图始终从一个或多个源处理器开始,并以一个或多个接收处理器结束。
5.2 扩展拓扑
Kafka Streams通过在一个应用的实例中允许运行多个线程和在应用的分布实例之间支持负载均衡来进行扩展。Streams引擎通过切分任务来并行化一个拓扑的执行,任务的数量由Streams引擎决定,并取决于应用程序处理的topics的分区数。每个任务负责一部分分区,对每个读取的事件,任务将在最终将结果写入接收处理器之前按顺序执行适用于该分区的所有处理步骤。那些任务是Kafka Streams并行处理的基本单元,因为每个任务都可以独立于其它任务执行。如下图所示:
应用程序的开发人员可以选择每个应用程序实例将执行的线程数。如果有多个线程可用,每个线程会执行应用程序创建任务的子集。如果应用程序的多个实例运行在多个服务器,则每个服务器的每个线程会执行不同的任务。如果想提高处理性能,就启动更多的线程;如果服务器资源不足,就增加服务器,这就是流应用程序扩展的方式。Kafka会自动协调工作,它将为每个任务分配相应的分区,并且每个任务都是独立处理事件和维护自己的本地状态。如下图所示:
有时候处理步骤可能需要来自多个分区的结果,这会在任务之间创建依赖关系。例如,如果我们连接两个流,就像上述点击事件的例子,我们需要每个流中的数据才能得出结果。Kafka Streams通过分配所有需要的分区给同一个任务,以便该任务可以读取所有相关分区的事件并执行连接操作。这就是为什么Kafka Streams目前要求参与连接操作的所有topics具有相同数量的分区并根据连接key进行分区。
任务之间依赖关系的另一个例子是当我们的应用程序需要重新分区时。例如在上述点击事件的例子中,所有事件都是使用用户ID作为key值,但如果我们想按每个页面或者按zip code生成统计信息,我们需要按页面或者zip code重新分区然后执行聚合。如果任务1处理分区1的数据,然后到达需要重新分区的处理器(groupBy操作),则需要重新移动数据。与其它流处理器框架不同的是,Kafka Streams通过使用新的keys和分区把事件写入一个新的topic。然后另一组的任务从这个新的topic读取事件并继续处理,这个重新分区的步骤把原来的拓扑结构分为两个有各自任务的子拓扑。第二组的任务依赖于第一组,因为它是处理来自第一个子拓扑的结果。但是,第一组和第二组的任务仍然可以独立和并行地运行,因为第一组的任务以它自己的速率把数据写入topic,而第二组的任务只是从这个topic读取事件并处理,它们之间没有通讯也没有共享资源,不需要运行在相同的线程或服务器上。这是Kafka其中的一个优点,也就是减少管道不同部分的依赖关系。如下图所示:
5.3 处理故障
Kafka是高可用的,因此我们保存在Kafka的数据也是高可用的。如果应用程序故障并需要重启,可以从Kafka的流中查找其故障前提交的最后offset并继续处理。但请注意,如果是本地状态存储丢失,例如需要替换服务器,流应用程序总是可以从存储在Kafka的更改日志重新创建本地状态。
Kafka Streams还利用其消费者协调器来为任务提供高可用性。如果一个任务失败但流应用程序的线程或其它实例还处于活动状态,其中一个可用的线程会重新启动该任务。这与消费者组通过把分区分配给剩余消费者之一来处理组内其中一个消费者的故障类似。
END O(∩_∩)O