kafka的安装和使用(详细版)

article/2025/9/23 1:51:43

原创地址: https://www.cnblogs.com/lilixin/p/5775877.html

Kafka安装与使用

下载地址:https://www.apache.org/dyn/closer.cgi?path=/kafka/0.8.1.1/kafka_2.10-0.8.1.1.tgz

安装以及启动kafka

步骤1:安装kafka

$ tar -xzf kafka_2.10-0.8.1.1.tgz
$ cd kafka_2.10-0.8.1.1.tgz 

 

步骤2:配置server.properties

 配置zookeeper(假设您已经安装了zookeeper,如果没有安装,请再网上搜索安装方法)

进入kafka安装工程根目录编辑 

vim config/server.properties  

修改属性 zookeeper.connect=ip:2181,ip2: 2181

 

步骤3:server.properties配置说明

kafka最为重要三个配置依次为:broker.id、log.dir、zookeeper.connect

kafka server端config/server.properties参数说明和解释如下:

(参考配置说明地址:http://blog.csdn.net/lizhitao/article/details/25667831)

 

#实际使用案例 这里211上面的kafka 配置文件
复制代码
broker.id=1
port=9092
host.name=192.168.1.211
num.network.threads=2
num.io.threads=8
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=2
log.retention.hours=168
log.segment.bytes=536870912
log.retention.check.interval.ms=60000
log.cleaner.enable=false
zookeeper.connect=192.168.1.213:2181,192.168.1.216:2181,192.168.1.217:2181
zookeeper.connection.timeout.ms=1000000
#kafka实际使用案例 210服务器kafka配置
broker.id=2
port=9092
host.name=192.168.1.210
num.network.threads=2
num.io.threads=8
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=2
log.retention.hours=168
log.segment.bytes=536870912
log.retention.check.interval.ms=60000
log.cleaner.enable=false
zookeeper.connect=192.168.1.213:2181,192.168.1.216:2181,192.168.1.217:2181
zookeeper.connection.timeout.ms=1000000
复制代码

 

步骤4: 启动kafka

(先启动zookeeper $:  bin/zkServer.sh start config/zookeeper.properties &)

cd kafka-0.8.1

$ bin/kafka-server-start.sh -daemon config/server.properties &

 (实验时,需要启动至少两个broker   bin/kafka-server-start.sh -daemon config/server-1.properties &) 

步骤5:创建topic

$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

步骤6:验证topic是否创建成功

$ bin/kafka-topics.sh --list --zookeeper localhost:2181

localhost为zookeeper地址 

topic描述:

bin/kafka-topics.sh --describe --zookeeper 192.168.1.8:2181 --topic test

 

复制代码
//启动报错Unrecognized VM option '+UseCompressedOops'
查看 bin/kafka-run-class.sh
找到
if [ -z "$KAFKA_JVM_PERFORMANCE_OPTS" ]; thenKAFKA_JVM_PERFORMANCE_OPTS="-server  -XX:+UseCompressedOops -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:+CMSClassUnloadingEnabled -XX:+CMSScavengeBeforeRemark -XX:+DisableExplicitGC -Djava.awt.headless=true"
fi
去掉-XX:+UseCompressedOops
复制代码

 

启动报错 Could not reserve enough space for object heap
原因及解决办法:
查看kafka-server-start.sh配置文件,发现有heap设置信息:KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"    更改这里的内存为256(因为测试机内存总共才1G ,所以报错)

 

 

步骤7:发送消息

发送一些消息验证,在console模式下,启动producer

$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

(此处localhost改为本机ip,否则报错,I don’t  know why)

 消息案例: 

{"price":"100000","userId":14615501351480021,"payType":3,"code":"AFD3B8","payTime":{"time":1457330791333,"minutes":6,"seconds":31,"hours":14,"month":2,"year":116,"timezoneOffset":-480,"day":1,"date":7},"orderId":12222096,"goodsName":"会员"} 

 

 步骤8:启动一个consumer

$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

 


 


 


 


 

删除topic,慎用,只会删除zookeeper中的元数据,消息文件须手动删除

bin/kafka-run-class.sh kafka.admin.DeleteTopicCommand --topic test --zookeeper 192.168.197.170:2181 ,192.168.197.171:2181

    


 


 


 


 

配置kafka集群模式,需要由多个broker组成

 

和单机环境一样,只是需要修改下broker 的配置文件而已。

1、将单机版的kafka 目录复制到其他几台电脑上。

2、修改每台电脑上的kafka 目录下的server.properties 文件。

broker.id=1//这个参数在kafka 的broker 集群中必须唯一,且为正整数。

3、启动每台电脑上的kafka 即可。

 

本机配置伪分布式

 

首先为每个节点编写配置文件:
> cp config/server.properties config/server-1.properties
> cp config/server.properties config/server-2.properties

 

在拷贝出的新文件中添加以下参数:
config/server-1.properties:
broker.id=1
port=9093
log.dir=/tmp/kafka-logs-1

config/server-2.properties:

broker.id=2
port=9094
log.dir=/tmp/kafka-logs-2

 

现在启动另外两个节点:
> bin/kafka-server-start.sh config/server-1.properties &
...
> bin/kafka-server-start.sh config/server-2.properties &
...

 

创建一个拥有3个副本的topic:
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic

 

运行“"describe topics”命令知道每个节点的信息
> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: my-replicated-topic Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
leader:负责处理消息的读和写,leader是从所有节点中随机选择的.
replicas:列出了所有的副本节点,不管节点是否在服务中.
isr:是正在服务中的节点.

 





搭建Kafka开发环境

1 在pom.xml中引入kafka依赖jar包
复制代码
<!-- kafka配置 -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.9.2</artifactId>
<version>${kafka.version}</version>
<exclusions>
<!-- 实际应用中单独引入下面的jar包,不使用kafka带的 -->
<exclusion>
<artifactId>zookeeper</artifactId>
<groupId>org.apache.zookeeper</groupId>
</exclusion>
<exclusion>
<artifactId>zkclient</artifactId>
<groupId>com.101tec</groupId>
</exclusion>
<exclusion>
<artifactId>slf4j-api</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
复制代码

 

2.属性文件 kafka.properties
复制代码
#zookeeper.connect=192.168.1.8:2181,192.168.1.13:2181,192.168.1.16:2181
#zookeeper.connect=zkserver1.vko.cn:2181,zkserver2.vko.cn:2181,zkserver3.vko.cn:2181
zookeeper.connect=192.168.1.179:2181
metadata.broker.list=192.168.1.179:9092
#metadata.broker.list=kafka.server1.vko.cn:9092,kafka.server2.vko.cn:9092#zookeeper.connect.timeout=15000
#zookeeper.session.timeout.ms=15000
#zookeeper.sync.time.ms=20000
#auto.commit.interval.ms=20000
#auto.offset.reset=smallest
#serializer.class=kafka.serializer.StringEncoder
#producer.type=async
#queue.buffering.max.ms=6000group.id=llx
kafka.sellstat.topics=llx
复制代码

 

在spring配置文件中引入此properties文件
复制代码
<!-- 这个是加载给spring 用的.-->  
<bean id="propertyConfigurer" class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
<property name="locations">
<list>
<value>classpath:kafka.properties</value>
</list>
</property>
</bean>
<!-- 这个是用来在代码中注入用的.-->  
<bean id="configProperties" class="org.springframework.beans.factory.config.PropertiesFactoryBean">
<property name="locations">
<list>
<value>classpath:kafka.properties</value>
</list>
</property>
</bean>
复制代码

 

3.定义收信人
复制代码
<!-- 定义收信人 receiver -->
<bean id="testReceiver" class="cn.vko.index.Receiver"><constructor-arg index="0" value="${zookeeper.connect}" /><constructor-arg index="1" value="${group.id}" /><constructor-arg index="2" value="${kafka.sellstat.topics}"/><constructor-arg index="3" ref="testConsumer" />
</bean>
复制代码

 

4. spring中定义一个消息处理器(需要实现vkoConsumer)
<!-- 定义消息处理器 -->
<bean id="testConsumer" class="cn.vko.index.TestConsumer" ></bean>

 

5.消息生产者项目引入producer

<bean id="topProducer" class="top.lilixin.TopProducer"><constructor-arg index="0" value="kafka.server1:9092,kafka.server2:9092" /></bean>

 


http://chatgpt.dhexx.cn/article/HTbuCh1p.shtml

相关文章

kafka-manager 的下载及安装

kafka-manager 的下载及安装 kafka-manager的功能 为了简化开发者和服务工程师维护Kafka集群的工作&#xff0c;yahoo构建了一个叫做Kafka管理器的基于Web工具&#xff0c;叫做 Kafka Manager。 这个管理工具可以很容易地发现分布在集群中的哪些topic分布不均匀&#xff0c;或…

怎样安装Kafka?

1、Kafka是Java开发的应用程序&#xff0c;可以运行在Windows、 MacOS和 Linux等多 种操作系统上。最常见的是将Kafka安装在Linux系统上。 2、在安装Kafka之前&#xff0c;需要先安装Java环境&#xff0c;虽然运行 Zookeeper 和 Kafka 只需要 Java运行时版本&#xff0c;但也可…

Kafka锦集(一):Kafka的介绍 | 下载和安装 | kafka服务无法关闭 | bin/kafka-server-stop.sh无效 | 总结的很详细

前言 从本篇开始&#xff0c;带你一起领略Kafka的世界&#xff0c;下面重点介绍它的下载、安装&#xff0c;带你避坑。 提示&#xff1a;如果你只是想解决“bin/kafka-server-stop.sh无效”的问题&#xff0c;直接下滑到文章尾部的4.2章节进行查看&#xff01; 一、Kafka介绍…

docker 下载kafka

Kafka采用的是订阅-发布的模式&#xff0c;消费者主动的去kafka集群拉取消息&#xff0c;与producer相同的是&#xff0c;消费者在拉取消息的时候也是找leader去拉取。 kafka存在的意义&#xff1a;去耦合、异步、中间件的消息系统 首先安装zookeeper docker search zookeepe…

windows下安装kafka

一.下载 kafka官网下载地址:http://kafka.apache.org/downloads.html,下载二进制的. 二.安装 1.安装zookeeper windows环境下安装zookeeper(单机版) 安装并启动后的界面: 2.安装kafka 我下载的kafka_2.13-2.8.0.tgz,并解压到D:\Tools\kafka_2.13-2.8.0目录下 编辑文件Kaf…

kafka tool下载安装和使用

一、下载安装 下载连接&#xff1a;https://www.kafkatool.com/download.html kafka tool官网介绍 Kafka工具是用于管理和使用Apache Kafka集群的GUI应用程序。 它提供了一种直观的UI&#xff0c;可让用户快速查看Kafka集群中的对象以及集群主题中存储的消息。 它包含面向开发…

[kafka] windows下安装kafka(含安装包)

[kafka] windows下安装kafka&#xff08;含安装包&#xff09; 目录 前言 一、下载kafka安装包 1&#xff09;下载安装包 2&#xff09;解压安装包 二、运行zookeeper 1.运行zookeeper&#xff08;因为kafka必须要和zookeeper一起运行&#xff09; 三、运行kafka 四、使用fafka…

1.Kafka下载安装

原文&#xff1a;kafka下载安装 一、安装jdk 参见&#xff1a;Linux环境下安装jdk1.8&#xff08;安装包版&#xff09; 二、安装kafka kafka安装包 链接&#xff1a;https://pan.baidu.com/s/1hy8XONH75fU-Djb_GBC-GA?pwdnmrs 提取码&#xff1a;nmrs1.解压kafka &…

Kafka在Linux下载安装及部署

前期准备工作&#xff1a; kafka的安装及使用需要用到ZooKeeper&#xff0c;所以需要提前安装搭建好ZooKeeper ZooKeeper在Linux下载安装及部署&#xff1a; Zookeeper在Linux下载安装及部署_学弟不想努力了-CSDN博客_zookeeper下载安装linuxhttps://blog.csdn.net/Eternal_Bl…

Kafka的常用命令(包括:下载安装、后台启动)

一、环境准备 首先JDK要在1.8及以上&#xff1b;然后安装对应版本的zookeeper。 本文以kafka2.7.2为例&#xff0c;关于如何找到kafka对应的zookeeper版本&#xff0c;参考我的这篇文章&#xff1a;如何确定kafka与zookeeper版本的对应关系。 1、安装Zookeeper3.5.9 Zookeep…

Kafka在windows下下载、启动、测试详细教程

目录 下载地址 启动 启动zookeeper 启动kafka 队列操作 创建消息队列名 删除消息队列名 查看所有的队列 测试 生产测试 消费测试 下载地址 Apache KafkaApache Kafka: A Distributed Streaming Platform.https://kafka.apache.org/downloadswindows下kafka3.0版本的…

kafka 下载安装

文章目录 第一章 kafka概述一、定义二、消息队列1、传统消息队列2、消息队列的两种模式&#xff08;1&#xff09;点对点模式&#xff08;2&#xff09;发布/订阅模式 三、kafka基础架构 第二章 Kafka安装一、安装部署1、集群规划 二、集群部署1、下载地址&#xff08;1&#x…

kafka下载与安装教程

Kafka下载安装教程 1.定义2.特性3.使用场景4.1.下载jar包4.2.解压到指定的文件夹4.3.修改配置文件4.4.启动kafka内置的zookeeper4.5.启动kafka服务4.6.创建一个名为 test1 的tiopic的测试主体 kafka4.7.创建生产消息的生产者4.8.创建消息消费者接收消息 1.定义 Kafka传统定义&…

CSRF(跨站请求伪造)原理

什么是CSRF&#xff1f; &#xff08;Cross Site Request Forgery, 跨站域请求伪造&#xff09;是一种网络的攻击方式&#xff0c;它在 2007 年曾被列为互联网 20 大安全隐患之一,也被称为“One Click Attack”或者Session Riding&#xff0c;通常缩写为CSRF或者XSRF&#xff…

Spring Security跨站请求伪造(CSRF)

CSRF&#xff08;Cross Site Request Forgery&#xff09; 跨站点请求伪造。是攻击者欺骗用户的浏览器去访问一个自己曾经认证过的网站。由于浏览器曾经认证过&#xff0c;所以被访问的网站会认为是真正的用户操作而去运行。这利用了 web 中用户身份验证的一个漏洞&#xff1a…

解决Csrf跨站请求伪造

Csrf跨站请求伪造原理&#xff1a; 1. 用户C打开浏览器&#xff0c;访问受信任网站A&#xff0c;输入用户名和密码请求登录网站A&#xff1b;2.在用户信息通过验证后&#xff0c;网站A产生Cookie信息并返回给浏览器&#xff0c;此时用户登录网站A成功&#xff0c;可以正常发送请…

web渗透测试----14、CSRF(跨站请求伪造攻击)

文章目录 一、CSRF概述二、CSRF攻击条件1、相关操作2、基于Cookie的会话处理3、没有不可预测的请求参数 三、CSRF的防御1、验证请求的Referer值2、CSRF Token3、验证码等验证业务过程的方式 四、基于Token的CSRF1、CSRF令牌的验证取决于请求方法2、CSRF令牌的验证取决于令牌是否…

跨站请求伪造漏洞

首先说明一下什么是CSRF(Cross Site Request Forgery)&#xff1f; 跨站请求伪造是指攻击者可以在第三方站点制造HTTP请求并以用户在目标站点的登录态发送到目标站点&#xff0c;而目标站点未校验请求来源使第三方成功伪造请求。 为什么会有CSRF? JS控制浏览器发送请求的时…

DVWA---跨站请求伪造CSRF

CSRF&#xff0c;全称Cross-site request forgery&#xff0c;翻译过来就是跨站请求伪造&#xff0c;是指利用受害者尚未失效的身份认证信息&#xff08;cookie、会话等&#xff09;&#xff0c;诱骗其点击恶意链接或者访问包含攻击代码的页面&#xff0c;在受害人不知情的情况…

CSRF 跨站请求伪造攻击

1.概念 全称是CSRF 跨站请求伪造攻击&#xff0c;攻击者利用用户已登陆的账号&#xff0c;诱导用户访问已构造好的恶意链接或页面&#xff0c;在用户不之情的情况下&#xff0c;做一些违反用户本意的一些违法操作。 同源策略&#xff1a;协议相同&#xff0c;域名相同&#xf…