KafkaSpout 浅析

article/2025/9/20 21:21:07

最近在使用storm做一个实时计算的项目,Spout需要从 KAFKA 集群中读取数据,为了提高开发效率,直接使用了Storm提供的KAFKA插件。今天抽空看了一下KafkaSpout的源码,记录下心得体会。

       KafkaSpout基于kafka.javaapi.consumer.SimpleConsumer实现了consumer客户端的功能,包括 partition的分配,消费状态的维护(offset)。同时KafkaSpout使用了storm的可靠API,并实现了spout的ack 和 fail机制。KafkaSpout的基本处理流程如下:

1. 建立zookeeper客户端,在zookeeper zk_root + "/topics/" + _topic + "/partitions" 路径下获取到partition列表 
2. 针对每个partition 到路径Zk_root + "/topics/" + _topic + "/partitions"+"/" + partition_id + "/state"下面获取到leader partition 所在的broker id
3. 到/broker/ids/broker id 路径下获取broker的host 和 port 信息,并保存到Map中Partition_id –-> learder broker
4. 获取spout的任务个数和当前任务的index,然后再根据partition的个数来分配当前spout 所消费的partition列表
5. 针对所消费的每个broker建立一个SimpleConsumer对象用来从kafka上获取数据
6. 提交当前partition的消费信息到zookeeper上面保存

     

下面对几个关键点进行下分析:

一、partition 的分配策略

1. 在KafkaSpout中获取spout的task的个数,也就是consumer的个数,代码如下:

1
int  totalTasks = context.getComponentTasks(context.getThisComponentId()).size();

2. 在KafkaSpout中获取当前spout的 task index,注意,task index和task id是不同的,task id是当前spout在整个topology中的id,而task index是当前spout在组件中的id,取值范围为[0, spout_task_number-1],代码如下:

1
_coordinator =  new  ZkCoordinator(_connections, conf, _spoutConfig, _state, context.getThisTaskIndex(), totalTasks, _uuid);

3. 获取partiton与leader partition所在broker的映射关系,代码的调用顺序如下:

ZkCoordinator:

1
GlobalPartitionInformation brokerInfo = _reader.getBrokerInfo();

DynamicBrokersReader:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
   /**
  * Get all partitions with their current leaders
  */
public  GlobalPartitionInformation getBrokerInfo()  throws  SocketTimeoutException {
   GlobalPartitionInformation globalPartitionInformation =  new  GlobalPartitionInformation();
     try  {
         int  numPartitionsForTopic = getNumPartitions();
         String brokerInfoPath = brokerPath();
         for  ( int  partition =  0 ; partition < numPartitionsForTopic; partition++) {
             int  leader = getLeaderFor(partition);
             String path = brokerInfoPath +  "/"  + leader;
             try  {
                 byte [] brokerData = _curator.getData().forPath(path);
                 Broker hp = getBrokerHost(brokerData);
                 globalPartitionInformation.addPartition(partition, hp);
             catch  (org.apache.zookeeper.KeeperException.NoNodeException e) {
                 LOG.error( "Node {} does not exist " , path);
             }
         }
     catch  (SocketTimeoutException e) {
             throw  e;
     catch  (Exception e) {
         throw  new  RuntimeException(e);
     }
     LOG.info( "Read partition info from zookeeper: "  + globalPartitionInformation);
     return  globalPartitionInformation;
}

4. 获取当前spout消费的partition

KafkaUtils:

复制代码
    public static List<Partition> calculatePartitionsForTask(GlobalPartitionInformation partitionInformation, int totalTasks, int taskIndex) {Preconditions.checkArgument(taskIndex < totalTasks, "task index must be less that total tasks");//获取所有的排序后的partition列表List<Partition> partitions = partitionInformation.getOrderedPartitions();int numPartitions = partitions.size();if (numPartitions < totalTasks) {LOG.warn("there are more tasks than partitions (tasks: " + totalTasks + "; partitions: " + numPartitions + "), some tasks will be idle");}List<Partition> taskPartitions = new ArrayList<Partition>();//此处是核心分配算法,举个例子来说明分配策略//假设spout的并发度是3,当前spout的task index 是 1,总的partition的个数为5,那么当前spout消费的partition id为1,4for (int i = taskIndex; i < numPartitions; i += totalTasks) {Partition taskPartition = partitions.get(i);taskPartitions.add(taskPartition);}logPartitionMapping(totalTasks, taskIndex, taskPartitions);return taskPartitions;}
复制代码

 

二、partition的更新策略

如果出现broker宕机,spout挂掉的情况,那么spout是要重新分配parition的,KafkaSpout并没有监听zookeeper上broker、partition和其他spout的状态,所以当有异常发生的时候KafkaSpout并不知道的,它采用了两种方法来更新partition的分配。

1. 定时更新

根据ZkHosts中的refreshFreqSecs字段来定时更新partition列表,我们可以通过修改配置来更改定时刷新的间隔。每一次调用kafkaspout的nextTuple方法时,都会首先调用ZkCoordinator的getMyManagedPartitions方法来获取当前spout消费的partition列表

复制代码
  public void nextTuple() {List<PartitionManager> managers = _coordinator.getMyManagedPartitions();//getMyManagedPartitions方法中会判断是否已经到了该刷新的时间,如果到了就重新分配partitionpublic List<PartitionManager> getMyManagedPartitions() {if (_lastRefreshTime == null || (System.currentTimeMillis() - _lastRefreshTime) > _refreshFreqMs) {refresh();_lastRefreshTime = System.currentTimeMillis();}return _cachedList;
}
复制代码

2.异常更新

当调用kafkaspout的nextTuple方法出现异常时,强制更新当前spout的partition消费列表

复制代码
    public void nextTuple() {List<PartitionManager> managers = _coordinator.getMyManagedPartitions();for (int i = 0; i < managers.size(); i++) {try {EmitState state = managers.get(_currPartitionIndex).next(_collector);} catch (FailedFetchException e) {_coordinator.refresh();}}
复制代码

 

三、消费状态的维护

1.首先要分析一下当spout启动的时候是怎么获取初始offset的。在每个spout获取到消费的partition列表时,会针对每个partition来创建PartitionManager对象,下面看一下PartitionManager的初始化过程:

复制代码
 public PartitionManager(DynamicPartitionConnections connections, String topologyInstanceId, ZkState state, Map stormConf, SpoutConfig spoutConfig, Partition id) {_partition = id;_connections = connections;_spoutConfig = spoutConfig;_topologyInstanceId = topologyInstanceId;//到连接池里注册partition和partition leader所在的broker host,如果连接池里有该broker的连接,则直接返回该连接、//如果连接池里没有,则建立broker的连接,并返回连接_consumer = connections.register(id.host, id.partition);_state = state;_stormConf = stormConf;numberAcked = numberFailed = 0;String jsonTopologyId = null;Long jsonOffset = null;//获取zookeeper上offset的提交路径String path = committedPath();try {//从提交路径上读取信息,提取记录的该partition的消费offset//如果zookeeper上没有该路径则表示当前topic没有被spout消费过Map<Object, Object> json = _state.readJSON(path);LOG.info("Read partition information from: " + path +  "  --> " + json );if (json != null) {jsonTopologyId = (String) ((Map<Object, Object>) json.get("topology")).get("id");jsonOffset = (Long) json.get("offset");}} catch (Throwable e) {LOG.warn("Error reading and/or parsing at ZkNode: " + path, e);}//从broker上获取当前partition的offset,默认为获取最新的offset,如果用户配置forceFromStart(KafkaConfig),则获取该partition最早的offset,//也就是consume from beginningLong currentOffset = KafkaUtils.getOffset(_consumer, spoutConfig.topic, id.partition, spoutConfig);//情况1: 如果从zookeeper上没有获取topology和消费信息,则直接用从broker上获取到的offsetif (jsonTopologyId == null || jsonOffset == null) { // failed to parse JSON?_committedTo = currentOffset;LOG.info("No partition information found, using configuration to determine offset");//情况2: 获取到的topology id 不一致 或者用户要求从新获取数据的时候,则从kafka上获取offset//可以和情况1 合并,在KafkaUtils.getOffset已经判断过forceFromStart,此处无需再次判断} else if (!topologyInstanceId.equals(jsonTopologyId) && spoutConfig.forceFromStart) {_committedTo = KafkaUtils.getOffset(_consumer, spoutConfig.topic, id.partition, spoutConfig.startOffsetTime);LOG.info("Topology change detected and reset from start forced, using configuration to determine offset");}//情况3: 使用zookeeper上保留的offset进行消费 else {_committedTo = jsonOffset;LOG.info("Read last commit offset from zookeeper: " + _committedTo + "; old topology_id: " + jsonTopologyId + " - new topology_id: " + topologyInstanceId );}//如果上次消费的offset已经过了保质期,则直接消费新数据if (currentOffset - _committedTo > spoutConfig.maxOffsetBehind || _committedTo <= 0) {LOG.info("Last commit offset from zookeeper: " + _committedTo);_committedTo = currentOffset;LOG.info("Commit offset " + _committedTo + " is more than " +spoutConfig.maxOffsetBehind + " behind, resetting to startOffsetTime=" + spoutConfig.startOffsetTime);}LOG.info("Starting Kafka " + _consumer.host() + ":" + id.partition + " from offset " + _committedTo);_emittedToOffset = _committedTo;}
复制代码

2. 然后看一下partition消费offset是怎么保存和维护的

PartitionManager 中的 _emittedToOffset用来保存当前消费的offset,在每一次获取到消息的时候都会更新这个值

复制代码
 private void fill() {if (!had_failed || failed.contains(cur_offset)) {numMessages += 1;_pending.add(cur_offset);_waitingToEmit.add(new MessageAndRealOffset(msg.message(), cur_offset));//更新_emittedToOffset_emittedToOffset = Math.max(msg.nextOffset(), _emittedToOffset);if (had_failed) {failed.remove(cur_offset);}}}_fetchAPIMessageCount.incrBy(numMessages);}}
复制代码

3.提交offset到zookeeper

offset的提交是周期性的,提交的周期可在SpoutConfig中的stateUpdateIntervalMs中来配置。每次调用kafkaspout的nextTuple方法后都会判断是否需要提交offset

    public void nextTuple() {if ((now - _lastUpdateMs) > _spoutConfig.stateUpdateIntervalMs) {commit();}}

 如果需要提交则调用kafkaspout的commit方法,使用轮巡的方式提交每个partition的消费状况

  private void commit() {_lastUpdateMs = System.currentTimeMillis();for (PartitionManager manager : _coordinator.getMyManagedPartitions()) {manager.commit();}
}

 具体的提交是委托PartitionManager来完成的

复制代码
 public void commit() {//获取当前要提交的offset,如果有pending的offset的话,就说明还有一些消息没有完成处理,则提交pending消息的最小的offset//如果没有pending的消息,则提交当前消费的offsetlong lastCompletedOffset = lastCompletedOffset();//用来判断是否有新的offset需要提交if (_committedTo != lastCompletedOffset) {LOG.debug("Writing last completed offset (" + lastCompletedOffset + ") to ZK for " + _partition + " for topology: " + _topologyInstanceId);Map<Object, Object> data = (Map<Object, Object>) ImmutableMap.builder().put("topology", ImmutableMap.of("id", _topologyInstanceId,"name", _stormConf.get(Config.TOPOLOGY_NAME))).put("offset", lastCompletedOffset).put("partition", _partition.partition).put("broker", ImmutableMap.of("host", _partition.host.host,"port", _partition.host.port)).put("topic", _spoutConfig.topic).build();_state.writeJSON(committedPath(), data);_committedTo = lastCompletedOffset;LOG.debug("Wrote last completed offset (" + lastCompletedOffset + ") to ZK for " + _partition + " for topology: " + _topologyInstanceId);} else {LOG.debug("No new offset for " + _partition + " for topology: " + _topologyInstanceId);}
}
复制代码

 

四、kafkaspout ack 和 fail的处理

1. 首先还是说说kafkaspout消息的发送

当调用kafkaspout的nextTuple方法时,kafkaspout委托PartitionManager next方法来发送数据

复制代码
public void nextTuple() {List<PartitionManager> managers = _coordinator.getMyManagedPartitions();for (int i = 0; i < managers.size(); i++) {try {// in case the number of managers decreased_currPartitionIndex = _currPartitionIndex % managers.size();EmitState state = managers.get(_currPartitionIndex).next(_collector);if (state != EmitState.EMITTED_MORE_LEFT) {_currPartitionIndex = (_currPartitionIndex + 1) % managers.size();}
}public EmitState next(SpoutOutputCollector collector) {
//判断等待队列是否为空,如果为空则调用fill方法从broker上取数据进行填充if (_waitingToEmit.isEmpty()) {fill();}while (true) {MessageAndRealOffset toEmit = _waitingToEmit.pollFirst();if (toEmit == null) {return EmitState.NO_EMITTED;}//对kafka的消息进行解码Iterable<List<Object>> tups = KafkaUtils.generateTuples(_spoutConfig, toEmit.msg);if (tups != null) {for (List<Object> tup : tups) {//如果tuple不为null,则发送该tuple,messageID为new KafkaMessageId(_partition, toEmit.offset)//这样在ack 或者 fail的时候才能根据_partition找到相应的PartitionManagercollector.emit(tup, new KafkaMessageId(_partition, toEmit.offset));}break;} else {ack(toEmit.offset);}}if (!_waitingToEmit.isEmpty()) {return EmitState.EMITTED_MORE_LEFT;} else {return EmitState.EMITTED_END;}
}
复制代码

2. 在PartitionManager会维护一个pending 列表,用来保存已经发送但是没有被成功处理的消息,一个failed列表,用来保存已经失败的消息
3. 当一个消息成功处理时会调用spout的ack方法,kafkaspout会根据message id中包含的partition id 来委托相应的PartitionManager来处理

复制代码
    public void ack(Object msgId) {KafkaMessageId id = (KafkaMessageId) msgId;PartitionManager m = _coordinator.getManager(id.partition);if (m != null) {m.ack(id.offset);}}//PartitionManager 接收到ack消息后,会判断pending的最早的一条消息是否已经过质保,如果过质保,则清除队列中所有过保的消息//如果没有过保的消息,则在pending队列中移除当前消息public void ack(Long offset) {if (!_pending.isEmpty() && _pending.first() < offset - _spoutConfig.maxOffsetBehind) {// Too many things pending!_pending.headSet(offset - _spoutConfig.maxOffsetBehind).clear();}_pending.remove(offset);numberAcked++;}
复制代码

4. 当一条消息处理失败时,会调用spout的fail方法,同样,kafkaspout会根据message id中包含的partition id 来委托相应的PartitionManager来处理

复制代码
  public void fail(Object msgId) {KafkaMessageId id = (KafkaMessageId) msgId;PartitionManager m = _coordinator.getManager(id.partition);if (m != null) {m.fail(id.offset);}}//PartitionManager接收到fail消息,会判断失败的消息是否已经过保,如果过保则忽略掉public void fail(Long offset) {if (offset < _emittedToOffset - _spoutConfig.maxOffsetBehind) {LOG.info("Skipping failed tuple at offset=" + offset +" because it's more than maxOffsetBehind=" + _spoutConfig.maxOffsetBehind +" behind _emittedToOffset=" + _emittedToOffset);} //如果在保质期内,则加入failed列表,如果没有成功响应的消息,并且失败的消息个数已经超过保质期个数,则认为没有消息成功,系统有问题,丢异常else {LOG.debug("failing at offset=" + offset + " with _pending.size()=" + _pending.size() + " pending and _emittedToOffset=" + _emittedToOffset);failed.add(offset);numberFailed++;if (numberAcked == 0 && numberFailed > _spoutConfig.maxOffsetBehind) {throw new RuntimeException("Too many tuple failures");}}}//对于failed的消息会进行重发private void fill() {//如果有失败的消息,则获取第一个的offsetfinal boolean had_failed = !failed.isEmpty();if (had_failed) {offset = failed.first();} else {offset = _emittedToOffset;}ByteBufferMessageSet msgs = null;try {msgs = KafkaUtils.fetchMessages(_spoutConfig, _consumer, _partition, offset);} catch (UpdateOffsetException e) {_emittedToOffset = KafkaUtils.getOffset(_consumer, _spoutConfig.topic, _partition.partition, _spoutConfig);LOG.warn("Using new offset: {}", _emittedToOffset);// fetch failed, so don't update the metricsreturn;}if (msgs != null) {int numMessages = 0;for (MessageAndOffset msg : msgs) {final Long cur_offset = msg.offset();if (cur_offset < offset) {// Skip any old offsets.continue;}//如果该消息在failed列表中,则重新发送,并将其从failed列表中删除if (!had_failed || failed.contains(cur_offset)) {numMessages += 1;_pending.add(cur_offset);_waitingToEmit.add(new MessageAndRealOffset(msg.message(), cur_offset));_emittedToOffset = Math.max(msg.nextOffset(), _emittedToOffset);if (had_failed) {failed.remove(cur_offset);}}}_fetchAPIMessageCount.incrBy(numMessages);}}
复制代码

http://chatgpt.dhexx.cn/article/asVL4iYm.shtml

相关文章

storm trident的多数据流,多spout

storm trident的多数据流&#xff0c;多spout (STORM)[storm, kafka] storm可以使用接收多个spout作为数据源&#xff0c;core storm与trident均可以&#xff0c;本文主要介绍trident的用法。 在trident中设置多个spout的基本思路是先建立多个spout&#xff0c;然后分别创建…

storm学习笔记(二)——Storm组件详解之Tuple、Spout

目录 Tuple元组 结构 生命周期 Spout数据源 结构 开发spout组件 Storm的核心概念包括&#xff1a;Stream、Spout、Bolt、Tuple、Task、Worker、Stream Grouping、Topology Stream是被处理的数据&#xff0c;Spout是数据源&#xff0c;Bolt是处理数据的容器&#xff0c;T…

java 纳秒 格式化_Java日期时间API系列35-----Jdk8中java.time包中的新的日期时间API类应用,微秒和纳秒等更精确的时间格式化和解析。...

通过Java日期时间API系列1-----Jdk7及以前的日期时间类中得知,Java8以前除了java.sql.Timestamp扩充纳秒,其他类最大只精确到毫秒;Java8 time包所有相关类都支持纳秒。下面是示意图: 图中的nano 是 一秒钟包含的纳秒值,0到999999999。毫秒,微秒和纳秒都是通过这个值计算得…

c语言计时程序 纳秒,前端Tips#4 - 用 process.hrtime 获取纳秒级的计时精度

视频讲解 文字讲解 如果去测试代码运行的时长&#xff0c;你会选择哪个时间函数&#xff1f; 一般第一时间想到的函数是 Date.now 或 Date.getTime。 1、先讲结论 之所以这么选&#xff0c;是基于 精度 和 时钟同步 两方面考虑的。 2、知识讲解 首先看一下 Date.now 的缺点 返回…

纳秒时代

1978年在英特尔公司的历史中是很不平凡的一年。这一年它满10岁了&#xff0c;员工数首次超过1万人。这一年&#xff0c;它卖掉了竞争激烈的电子表&#xff08;digital watch&#xff09;业务。最重要的是&#xff0c;在这一年6月&#xff0c;它推出了具有跨时代意义的8086芯片。…

linux内核纳秒精度时间,Linux时钟精度:毫秒?微妙?纳秒?

最近被内核时钟精度弄的很是郁闷。具体情况如下&#xff1a; 扫盲&#xff1a;1秒1000毫秒1000000微妙1000000000纳秒 首先&#xff1a;linux有一个很重要的概念——节拍&#xff0c;它的单位是(次/秒)。2.6内核这个值是1000&#xff0c;系统中用一个HZ的宏表征这个值。同时有全…

java 日期 纳秒_java8 ZonedDateTime 日期精度到纳秒

1秒 10E3毫秒 10E6 微妙 10E9 纳秒 使用java8 Instant 内部实际System.currentTimeMillis() 在模型上 可输出纳秒数据 重点是模型 时间戳转日期 public static ZonedDateTime ofInstant(Instant instant, ZoneId zone) { Objects.requireNonNull(instant, "instant&qu…

第九章:NAT(网络地址转换协议)

文章目录 一、NAT1、NAT介绍①公有网络地址②私有网络地址 2、NAT工作原理3、NAT功能 二、NAT的实现方式1、静态转换&#xff08;static Translation&#xff09;实验对比 2、动态转换2.1 ACL&#xff08;访问控制列表&#xff09;2.2 配置动态NAT实验效果 3、端口多路复用3.1 …

【NAT网络地址转换(私网公网地址、静态NAT、动态NAT、NAPT、Easy IP、NAT Server)】-20211215、20211216

目录 一、NAT产生背景 1.产生背景 2.私网地址、公网地址​ 私网IP地址&#xff0c;既可以一定上缓解ip的不足&#xff0c;在私网里&#xff0c;ip地址可以随意使用。 公网地址&#xff0c;在需要访问公网时&#xff0c;运用网络地址转换NAT技术&#xff0c;可以实现。 二…

什么是私网,公网?

我们常说的内网和外网&#xff0c;通常是相对于防火墙而言的&#xff0c;在防火墙内部叫做内网&#xff0c;反之就是外网。 在一定程度上外网等同于公网&#xff0c;内网等同于私网。公网地址 公网地址是指在因特网上直接可达的地址&#xff0c;如果你有一个公网地址&#xff0…

NAT——公私网地址转换

NAT—网络地址转换 NAT NAT又称为网络地址转换&#xff0c;用于实现私有网络和公有网络之间的转换 私有和公有网络地址 公有网络地址是指互联网上全球唯一的IP地址 私有网络地址是指内部网络或者主机的IP地址 IANA&#xff08;互联网数字分配机制&#xff09;规定将下列的IP地…

公网地址和私网地址问题

服务器映射用于将内网服务器的私网地址映射为公网地址&#xff0c;供Internet用户访问。选择“静态映射”类型可以将每一台服务器映射成一个独立的公网IP地址。“服务器负载均衡”类型可以将多台服务器映射成同一个公网地址&#xff0c;Internet用户在访问这个公网地址时&#…

私网地址与Internet地址

一、A、B、C三类地址 可用地址范围备注A类1.0.0.1-126.255.255.254B类128.1.0.1-191.255.255.254C类192.0.1.1-223.255.255.254D类224.0.0.1-239.255.255.254D类为多播地址 说明&#xff1a; 1. 每一个地址都是用网络位主机位组成的。 2. 全0的和全1的网络位和主机位都要去掉…

计算机网络 网络层 私网地址和公网地址及子网划分

公网地址 公有地址分配和管理由Inter NIC&#xff08;Internet Network Information Center 因特网信息中心&#xff09;负责。各级ISP使用的公网地址都需要向Inter NIC提出申请&#xff0c;有Inter NIC统一发放&#xff0c;这样就能确保地址块不冲突。 私网地址&#xff08;不…

为什么百度查到的ip和ipconfig查到的不一样;详解公网Ip和私网ip;详解网络分类ABC;

IP可以分为Public IP 和 Private IP,出现这种规划的原因在于IPv4所能表示的IP太少而电脑太多以至于不够用&#xff0c;然而只有Public IP才能直接连接上网络&#xff0c;所以对于那些公司&#xff0c;学校&#xff0c;政府机构等场所&#xff0c;就可以集中使用私有的IP进行管理…

挑战华为社招:掌握数据库其实很容易

前言 我的一个朋友,开发四年了,没跳过槽,四年时间也不过是从最开始的10K涨到了15K,经常和我吐槽工资低。去年8月份左右开始了他“骑驴找马”的行动,从各种地方找学习资料、刷面试题。值得庆幸的是,他出去找工作时疫情还不严重,异常顺利的面进了蚂蚁,薪资更是翻了几倍。…

javaspringboot面试,挑战华为社招

前言 redis简单来说 就是一个数据库&#xff0c;不过与传统数据库不同的是 redis 的数据是存在内存中的&#xff0c;所以存写速度非常快&#xff0c;因此 redis 被广泛应用于缓存方向。另外&#xff0c;redis 也经常用来做分布式锁。redis 提供了多种数据类型来支持不同的业务…

挑战华为社招:不止面试题,笔记源码统统都有,最强技术实现

前言 说起来开始进行面试是11月倒数第二周&#xff0c;上午9点&#xff0c;我还在去公司的公交上&#xff0c;突然收到蚂蚁的面试电话&#xff0c;其实算不上真正的面试。面试官只是和我聊了下他们在做的事情&#xff08;主要是做双十一这里大促的稳定性保障&#xff0c;偏中间…

挑战华为社招:字节跳动上千道精选面试题还不刷起来

前言 成为优秀的架构师是大部分初中级工程师的阶段性目标。优秀的架构师往往具备七种核心能力&#xff1a;编程能力、调试能力、编译部署能力、性能优化能力、业务架构能力、在线运维能力、项目管理能力和规划能力。 这几种能力之间的关系大概如下图。编程能力、调试能力和编…