Kafka应用场景

article/2025/10/7 21:41:40

在学习一门新技术之前,我们需要先去了解一下这门技术的具体应用场景,使用它能够做什么,能够达到什么目的,学习kafka的初衷是用作消息队列;但是还可以使用Kafka Stream进行一些实时的流计算,多用于大数据处理;也可以做日志收集汇总、网站活动跟踪等任务。

消息队列

kafka可以很好的替代一些传统的消息系统,kafka具有更好的吞吐量,内置的分区使kafka具有更好的容错和伸缩性,这些特性使它可以替代传统的消息系统,成为大型消息处理应用的首选方案。

场景:异步、解耦、削峰填谷

  1. 生成订单:给不同的产品业务线分配同一个topic的不同partition,用户下单后根据订单类型发送到对应的partition
  2. 消息通知:用户登录后计算积分
  • 消息生产者

    public static void main(String[] args) throws Exception {Properties prop = new Properties();prop.put("bootstrap.servers", "127.0.0.1:9092");prop.put("acks", "all");prop.put("retries", "0");// 缓冲区大小prop.put("batch.size", "10");prop.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");KafkaProducer<String, String> producer = new KafkaProducer<>(prop);for (int i = 0; i < 101; i++) {ProducerRecord<String, String> record = new ProducerRecord<>("my_topics", "value_" + i);// 阻塞到消息发送完成producer.send(record).get();}// 刷新缓冲区,发送到分区,并清空缓冲区// producer.flush();// 关闭生产者,会阻塞到缓冲区内的数据发送完producer.close();// producer.close(Duration.ofMillis(1000));
    }
    

    生产者发送消息是先将消息放到缓冲区,当缓冲区存满之后会自动flush,或者手动调用flush()方法

  • 消息消费者

    public static void main(String[] args) {Properties properties = new Properties();properties.put("bootstrap.servers", "127.0.0.1:9092");properties.put("group.id", "cc_consumer");properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);// 指定topicconsumer.subscribe(Arrays.asList("my_topics"));// 指定topic的partition// TopicPartition partition0 = new TopicPartition("my_topics", 10);// consumer.assign(Arrays.asList(partition0));try {while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> record : records) {System.out.println(record.toString());}}} finally {consumer.close(Duration.ofMillis(2000));}
    }
    

流计算

[todo]

日志收集

应用程序的日志可以通过log4j收集日志信息,并将日志直接打到kafka中:客户端—>应用—>kafka

SpringBoot中默认使用的是logback,所以要在引入SpringBoot的jar包时排除掉logback的jar包

日志消息发送有同步和异步两种方式,由KafkaAppender中的syncSend属性决定,默认为true(同步)

> <Kafka name="KAFKA-LOGGER" topic="cc_log_test" syncSend="false">
>
  • pom.xml
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId><exclusions><exclusion><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-logging</artifactId></exclusion></exclusions>
</dependency>
<!-- springboot 1.3.x之前版本是log4j,之后版本都是log4j2 -->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-log4j2</artifactId>
</dependency>
  • log4j2.xml
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="off"><Properties></Properties><Appenders><Console name="STDOUT" target="SYSTEM_OUT"><PatternLayout pattern="%d %p %c{1.} %t %m%n"/></Console><!--kafka topic--><Kafka name="KAFKA-LOGGER" topic="my_topics"><!--JsonLayout:日志格式为json,方便在ES中处理--><JsonLayout/><!--kafka server的ip:port--><Property name="bootstrap.servers">127.0.0.1:9092</Property><Property name="retries">3</Property><Property name="linger.ms">1000</Property><Property name="buffer.memory">10485760</Property></Kafka><Async name="ASYNC-KAFKA-LOGGER"><AppenderRef ref="KAFKA-LOGGER"/><LinkedTransferQueue/></Async></Appenders><Loggers><!--日志级别大于info都会被记录到Kafka--><Logger name="cc.kevinlu.springbootkafka.controller.MessageController" level="info"additivity="false"><AppenderRef ref="KAFKA-LOGGER"/></Logger><!-- Root表示所有Logger用Root中的Appender打印日志  --><Root level="info"><AppenderRef ref="STDOUT"/></Root></Loggers>
</Configuration>
  • code
@GetMapping("/log")
public String sendLog() {for (int i = 0; i < 10; i++) {log.info("kafka log i = " + i);}return "success";
}
  • consumer视图

image-20200419032218971

网站活动跟踪

  1. 前端Nodejs控制

    Node接入kafka需要使用kafka-node库,下面是网上的例子

    var kafka = require('kafka-node'),Producer = kafka.Producer,client = new kafka.KafkaClient({kafkaHost: 'localhost:9092'});
    /*** 定义生产类* partitionerType 定义* 0:默认模式 只产生数据在第一个分区* 1:随机分配,在分区个数内,随机产生消息到各分区* 2:循环分配,在分区个数内,按顺序循环产生消息到各分区
    */   
    var producerOption = {requireAcks: 1,ackTimeoutMs: 100,partitionerType: 0 //默认为第一个分区
    };
    var producer = new Producer(client,producerOption);
    /*** TOPIC的创建需要在命令行进行创建,以便指定分区个数以及备份个数* PS:kafka-node的创建topic不行,不能创建分区* 产生消息,如果不指定partition* 则根据 partitionerType 的值来指定发送数据到哪个分区* 我们创建的topic-test-one只有一个分区,所以只能产生数据到第1个分区(下标0),否则不会生产数据*/
    function getPayloads(){return [{topic:"topic-test-one",messages:JSON.stringify({"name":"jack","age":"120"}),partition:0}];
    }producer.on("ready",function(){setInterval(function(){producer.send(getPayloads(),function(err,data){if(!err){console.log("send message complete!data:"+JSON.stringify(data),new Date());}});},1000);
    });producer.on('error', function (err) {console.log("send message error!\r\n"+err);})
    
  2. 后端日志控制

    后端也可以使用log4j的日志系统来完成,拦截所有需要监控的api请求,使用log4j输出日志到kafka队列中,和上述日志收集方法相同。若同一个应用中需要通过日志输出到kafka的多个topic中,可以使用log4j的Marker标记来区分,配置如下:

    <?xml version="1.0" encoding="UTF-8"?>
    <Configuration status="off"><Properties></Properties><Appenders><Console name="STDOUT" target="SYSTEM_OUT"><PatternLayout pattern="%d %p %c{1.} %t %m%n"/></Console><!-- 日志收集 --><Kafka name="KAFKA-LOGGER" topic="cc_log_test" syncSend="false"><JsonLayout/><Property name="bootstrap.servers">127.0.0.1:9092</Property><Property name="retries">3</Property><Property name="linger.ms">1000</Property><Property name="buffer.memory">10485760</Property><Filters><!-- 通过Marker过滤消息 --><MarkerFilter marker="Kafka" onMatch="ACCEPT" onMismatch="DENY"/></Filters></Kafka><!-- 轨迹跟踪 --><Kafka name="KAFKA-TRACK-LOGGER" topic="cc_test1" syncSend="false"><JsonLayout/><Property name="bootstrap.servers">127.0.0.1:9092</Property><Property name="retries">3</Property><Property name="linger.ms">1000</Property><Property name="buffer.memory">10485760</Property><Filters><!-- 通过Marker过滤消息 --><MarkerFilter marker="Track" onMatch="ACCEPT" onMismatch="DENY"/></Filters></Kafka><Async name="ASYNC-KAFKA-LOGGER"><AppenderRef ref="KAFKA-LOGGER"/><AppenderRef ref="KAFKA-TRACK-LOGGER"/><LinkedTransferQueue/></Async></Appenders><Loggers><Logger name="cc.kevinlu.springbootkafka.controller" level="info"additivity="false"><AppenderRef ref="KAFKA-LOGGER"/><AppenderRef ref="KAFKA-TRACK-LOGGER"/></Logger><Root level="info"><AppenderRef ref="STDOUT"/></Root></Loggers>
    </Configuration>
    
    private final static Marker KAFKA_MARKER       = MarkerManager.getMarker("Kafka");
    private final static Marker KAFKA_TRACK_MARKER = MarkerManager.getMarker("Track");@GetMapping("/log")
    public String sendLog() {// 轨迹跟踪log.info(KAFKA_TRACK_MARKER, "send async message!");for (int i = 0; i < 10; i++) {// 日志收集log.info(KAFKA_MARKER, "kafka log i = {}", i);}return "success";
    }
    
  3. 前端+后端组合

    后端提供API供前端传递轨迹,后端接收到请求之后将消息同步到kafka中。


http://chatgpt.dhexx.cn/article/812lKers.shtml

相关文章

Kafka基本概念与应用场景

一、Kafka的定义 Apache Kafka是一种分布式的、基于发布/订阅的消息系统&#xff0c;由Scala语言编写而成。它具备快速、可扩展、可持久化的特点。Kafka最初由LinkedIn开发&#xff0c;并于2011年初开源&#xff0c; 2012年10月从Apache孵化器毕业&#xff0c;成为Apache基金会…

kafka使用场景与设计原理

目录 1 kafka的介绍 2 架构 2.1 工作流程 2.2 副本原理 2.3 分区和主题的关系 2.4 生产者 2.4.1 为什么分区-可以水平扩展 2.4.2 分区策略 2.5 消费者 2.5.1 消费方式 2.5.2 分区分配策略 2.6 数据可靠性保证 2.6.1 副本数据同步策略 2.6.2 ACK 应答机制 2.6.3 …

kafka学习(六):kafka应用场景

消息队列中间件是分布式系统中重要的组件&#xff0c;主要解决应用解耦&#xff0c;异步消息&#xff0c;流量削锋等问题&#xff0c;实现高性能&#xff0c;高可用&#xff0c;可伸缩和最终一致性架构。目前使用较多的消息队列有ActiveMQ&#xff0c;RabbitMQ&#xff0c;Zero…

kafka使用场景

kafka基本介绍 kafka是使用scala语言和java语言编写的一套高可用的消息队列&#xff0c;广泛应用在后端开发里&#xff0c;是后端开发里的一个重要中间件。 kafka的使用场景 1、异步处理 下图为一个订单状态在后端各个模块之间的处理流程&#xff0c;后一个流程必须要等到前…

kafka的应用场景

关于消息队列的使用 一、消息队列概述消息队列中间件是分布式系统中重要的组件&#xff0c;主要解决应用解耦&#xff0c;异步消息&#xff0c;流量削锋等问题&#xff0c;实现高性能&#xff0c;高可用&#xff0c;可伸缩和最终一致性架构。目前使用较多的消息队列有ActiveMQ…

解开Kafka神秘的面纱(一):kafka架构与应用场景

文章目录 一、前言二、Kafka简介2.1 Kafka简介2.2 基于分布式的Kafka 三、Kafka架构3.1 消息生产与消费3.1.1 消息生产与消费模型3.1.2 Kafka消费单元是消费者组3.1.3 Kafka只消费Partition主分区的消息3.1.4 消费者组中的每个消费者的offset3.1.5 小结 3.2 Partition备份与选主…

Metricbeat使用与入门-1 收集系统指标数据到ES中

Metricbeat由模块和指标集组成。Metricbeat 模块定义了从特定服务&#xff08;例如Redis&#xff0c;MySQL等&#xff09;收集数据的基本逻辑。 系统环境&#xff1a;CentOS 7.4 ES版本&#xff1a;7.6.1 Metricbeat版本&#xff1a;7.6.1 1 安装 Metricbeat版本&#xff1a;7…

Beats:Beats 入门教程 (二)

这篇文章是 “Beats 入门教程 &#xff08;一&#xff09;”的续篇。在上一篇文章&#xff0c;我们主要讲述了 Beats 的一些理论方面的知识。在这篇文章中&#xff0c;我们将具体展示如何使用 Filebeat 及 Metriceat 把数据导入到我们的 Elasticsearch 并对他们进行分析。 安装…

MetricBeat + Elasticsearch + Kibana 实现监控指标可视化

1、Elasticsearch 监控指标可视化概述 之前的推文 Elasticsearch 磁盘使用率超过警戒水位线&#xff0c;怎么办&#xff1f;有读者留言&#xff1a;“配合监控系统”。 是的&#xff0c;监控系统就像我们的车载监控&#xff0c;平时可能用不到&#xff0c;一用到的时候就是“大…

关于 Kubernetes中集群统一日志管理方案(Elasticsearch+Filebeat+Kibana+Metricbeat)搭建的一些笔记

写在前面 学习K8s&#xff0c;所以整理分享给小伙伴这里要说明的是&#xff1a;这一套方案太吃硬件了&#xff0c;需要高配的本才能跑起来我的16G运存,集群用三个虚机部署的&#xff0c;工作节点都是3核5G的配置折腾了两天没有跑起来&#xff0c;后来放弃了&#xff0c;查了下&…

metricbeat实现容器监控

Metricbeat是elastic下的项目&#xff0c;在5.1及之后的版本中支持对Docker的监控&#xff0c;需与EK配合使用能在界面上显示&#xff0c;也可直接将数据导入kafka中。 -1.安装 使用版本: elasticsearch-5.2.0-1.noarch&#xff08;用于输出显示&#xff09; kibana-5.2.0-…

Centos 7.9 安装 ELK8.1.0+MetricBeat

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 目录 环境 一、前期准备&#xff1a; 1.下载ELKMetircBeat rpm包 2.CentOS 设置 二、安装Elasticsearch 1.安装rpm 2.配置Elasticsearch 修改配置档 开防火墙 设…

Storm Metric

storm从0.9.0开始&#xff0c;增加了指标统计框架&#xff0c;用来收集应用程序的特定指标&#xff0c;并将其输出到外部系统。 本文中采用的监听类是LoggingMetricsConsumer&#xff0c;统计指标值将输出到metric.log日志文件中。 当然也可以自定义监听类&#xff0c;只需要实…

Beats:如何启动 Metricbeat 中的 MySQL 模块 - query Metricset

在我做之前的教程 “Observability&#xff1a;Elastic Metrics 应用介绍”&#xff0c;我发现当我尝试启动 MySQL 模块中的 query metricset 会出现错误。之后我发现官方文档也缺少相应的资料。在今天的文章中&#xff0c;我将介绍如上启动这个 metricset。在使用这个 metrics…

Metricbeat源码分析

0X00 版本信息 Golang&#xff1a;1.16.8 Metricbeat&#xff1a;7.14 0X01 Metricbeat介绍 Metricbeat quick start: installation and configuration | Metricbeat Reference [7.14] | Elastichttps://www.elastic.co/guide/en/beats/metricbeat/7.14/metricbeat-install…

Elk-Metricbeat配置Tomcat的日志分析 (Metricbeat-part3)

1, 安装软件 Metricbeat安装 请参考之前的文档链接&#xff1a; Metricbeat 8.4.0 linux 安装(Metricbeat-part1)_yangkei的博客-CSDN博客Metricbeat 能够以一种轻量型的方式&#xff0c;输送各种系统和服务统计数据&#xff0c;从 CPU 到内存&#xff0c;从 Redis 到 Nginx…

Metricbeat config file metricbeat.yml must be owned by the user identifier (uid=0) or root

Linux 上修改呢metricbeat.yml的权限&#xff0c;启动的时候报错。查了下解决方案 记录下 https://www.elastic.co/guide/en/beats/libbeat/5.3/config-file-permissions.html#config-file-permissions 简而言之就是所有者必须是root&#xff0c;然后权限必须是0644 sudo c…

Elk-Metricbeat配置Nginx的日志分析 (Metricbeat-part2)

1 情况说明&#xff1a; Metricbeat的基本安装部分可以参考&#xff1a; Metricbeat 8.4.0 linux 安装(Metricbeat-part1)_yangkei的博客-CSDN博客 下面来聊聊如何通过elkmetricbeat来监控Nginx日志。 借用网上以为大师的图就是这样子 Metricbeat 采集 Nginx 指标_叶康铭的…

metricbeat收集elasticsearch、kibana监控数据

一、kibana 1、下载metricbeat并部署到kibana所在的服务器 2、禁用 Kibana 监控指标的默认集合&#xff0c;在kibana.yml文件中增加如下配置&#xff1a; monitoring.kibana.collection.enabled: false3、从控制台或命令行&#xff0c;在生产集群上设置xpack.monitoring.col…

ELK日志采集平台(四)---轻量级采集工具metricbeat

目录 一、安装metricbeat 二、与kibana数据可视化联用 logstash是负责采集数据的&#xff0c;是入口&#xff0c;流向为logstash-> ES->kibana&#xff0c;但是它的资源消耗很大&#xff0c;有时候没那么多内存给他占用&#xff0c;同时有些定制的采集指标logstash无法…