简介
ELK由Elasticsearch、Logstash、Kibana三个优秀的开源项目组成,取首字母简写命名为ELK,主要提供对数据的加载、处理、查询等功能。其中:
Elasticsearch:对数据进行搜集、分析、存储。
Logstash:对数据的搜集、分析、过滤。
Kibana:为 Logstash 和 ElasticSearch 提供的数据分析友好的 Web 界面。
一、安装
1、Elasticsearch安装
1.1、到官网下载Elasticsearch安装。下载地址:Download Elasticsearch | Elastic
1.2、上传安装包到服务器。
1.3、解压安装包,如(注意版本区别): tar -zxvf elasticsearch-6.8.6.tar.gz
1.4、添加启动用户
注意:Elasticsearch不能使用root用户进行启动。
groupadd elastic
useradd -g elastic elastic
1.5、为启动用户添加权限
chmod 755 elasticsearch-6.8.6chown -R elastic:elastic elasticsearch-6.8.6usermod -g root elastic编辑 /etc/sudoers文件 ,如:vim /etc/sudoers添加: elastic ALL=(ALL) ALL去掉:%wheel ALL=(ALL) ALL
1.6、设置elasticsearch参数
cd elasticsearch-6.8.6/
a)、设置内存:vim config/jvm.options
#根据自己机器的配置设置内存大小
-Xms2g
-Xmx2g
b)、设置内核参数:sysctl -w vm.max_map_count=262144
vim /etc/sysctl.conf
增加以下内容:
fs.file-max = 65536
vm.max_map_count = 262144
执行更改:sysctl -pc)、修改ES配置:vim config/elasticsearch.yml修改内容:
network.host: IP地址
http.port: 端口
cluster.name: 集群名称
设置节点的名称,节点名称
1.7、启动elasticsearch
#切换 elastic 用户
su elastic
#启动elastic,-d表示后台启动
./bin/elasticsearch -d
1.8、校验elasticsearch是否启动
ps -ef |grep elastic ,如出现以下进程则表示成功

2、安装Kibana
2.1、下载Kibana,地址:Download Kibana Free | Get Started Now | Elastic
2.2、上传服务器,并解压
tar -zxvf kibana-6.8.6-linux-x86_64.tar.gz
2.3、服务参数设置
vim config/kibana.yml
server.port: 端口
server.host: IP
elasticsearch.url: 连接elasticsearch服务地址
2.4、启动Kibana
bin/kibana
如下图所示

二、使用
1、使用kibana 对elasticsearch进行增删改查操作
1.1、打开Kibana,进入Dev Tools 界面

1.2、创建索引
PUT /test_index
{"settings": {"number_of_shards": 3, "number_of_replicas": 1 },"mappings": { "_doc": { "properties": {"id": {"type": "long"},"key": {"type": "text"},"value": {"type": "text" }}}}
}
1.3、新增数据
POST /test_index/_doc/1/_create
{"id": 1,"key": "名称","value": "不甘于平凡的溃败"
}
1.4、查询数据
GET /test_index/_search

1.5、修改数据
PUT /test_index/_doc/1
{"id": 1,"key": "名称2","value": "不甘于平凡的溃败2"}
1.6、删除数据
DELETE /test_index/_doc/1
2、使用Logstash 自动同步Mysql数据到 elasticsearch中
2.1、下载Logstash,地址:Download Logstash Free | Get Started Now | Elastic
2.2、上传服务包,并解压
unzip logstash-6.8.0.zip
2.3、创建mysql表
CREATE TABLE `t_test` (`id` int(11) NOT NULL AUTO_INCREMENT,`key` varchar(255) DEFAULT NULL,`value` varchar(255) DEFAULT NULL
);INSERT INTO `bsf_dev`.`t_test`(`id`, `key`, `value`) VALUES (1, '性别', '男');
INSERT INTO `bsf_dev`.`t_test`(`id`, `key`, `value`) VALUES (2, '年龄', '18');
INSERT INTO `bsf_dev`.`t_test`(`id`, `key`, `value`) VALUES (3, '职业', '程序员');
如:

2.4、修改group_concat函数长度
#SET GLOBAL group_concat_max_len = 4294967295;
#SET SESSION group_concat_max_len = 4294967295;
2.5、准备数据库驱动包
将mysql-connector-java-8.0.11.jar 放到bin 目录下
2.6、准备文件
#创建last_run_metadata_path 文件
创建 statement 文件夹,文件夹里面新建 doc_statement 文件#准备 sql 文件
创建sql文件夹,新建doc.sql 文件,内容如下:
Select * from t_test#准备任务文件
在config 下新建 mysql2es.yml 文件,内容如下:input {stdin {}jdbc {# 名称,可以用于输出类型选择type => "doc_jdbc"# jdbc sql server 驱动jdbc_driver_library => "mysql-connector-java-8.0.11.jar"# 驱动名jdbc_driver_class => "com.mysql.cj.jdbc.Driver"# jdbc 连接字符串jdbc_connection_string => "jdbc:mysql://?useUnicode=true&characterEncoding=utf8&useSSL=false&serverTimezone=GMT%2B8"# 数据库用户名jdbc_user => ""# 数据库密码jdbc_password => ""# 调度时间,分 时 天 月 年,默认为1分钟执行一次schedule => "*/45 * * * *"# 是否记录上次执行结果, 如果为真,将会把上次执行到的 tracking_column 字段的值记录下来,保存到 last_run_metadata_path 指定的文件中record_last_run => false# 是否需要记录某个 column 的值,如果 record_last_run 为真,可以自定义我们需要 track 的 column 名称,此时该参数就要为 true. 否则默认 track 的是 timestamp 的值use_column_value => false# 如果 use_column_value 为真,需配置此参数. track 的数据库 column 名,该 column 必须是递增的.比如:ID.tracking_column => id#指定文件,来记录上次执行到的 tracking_column 字段的值#比如上次数据库有 10000 条记录,查询完后该文件中就会有数字 10000 这样的记录,下次执行 SQL 查询可以从 10001 条处开始.#我们只需要在 SQL 语句中 WHERE MY_ID > :last_sql_value 即可. 其中 :last_sql_value 取得就是该文件中的值(10000)last_run_metadata_path => ""# 是否清除 last_run_metadata_path 的记录,如果为真那么每次都相当于从头开始查询所有的数据库记录clean_run => true# 是否将 column 名称转小写lowercase_column_names => false# 存放需要执行的 SQL 语句的文件位置statement_filepath => "/sql/doc.sql"# 开启分页jdbc_paging_enabled => "true"# 每页条数jdbc_page_size => "100"}}filter {# 默认使用的是Unix时间,我们需要将logstash同步数据到ES时间+8小时# 数据库中的时间字段也可以这样设置# ruby {# code => "event.set('@timestamp',event.get('@timestamp').time.localtime + 8*60*60)"# }}output {if [type]=="doc_jdbc" {elasticsearch {# index 给一个文档建立索引# delete 通过id值删除一个文档(这个action需要指定一个id值)# create 插入一条文档信息,如果这条文档信息在索引中已经存在,那么本次插入工作失败# update 通过id值更新一个文档。更新有个特殊的案例upsert,如果被更新的文档还不存在,那么就会用到upsertaction => "index"#ESIP地址与端口hosts => ""#ES索引名称index => "test_index"#ES类型名称document_type => "_doc"#选用哪个字段作为ES的主键document_id => "%{id}"}}# 这里输出调试,正式运行时可以注释掉stdout {codec => json_lines}}
如:

2.7、执行同步操作
./bin/logstash -f ./config/mysql2es.conf
2.8、检查数据是否导入成功

3、SpringBoot整合Elasticsearch
3.1、导入Maven依赖
<!-- es依赖 --><dependency><groupId>org.elasticsearch</groupId><artifactId>elasticsearch</artifactId><version>${elasticsearch}</version></dependency><dependency><groupId>org.elasticsearch.client</groupId><artifactId>elasticsearch-rest-client</artifactId><version>${elasticsearch}</version></dependency><dependency><groupId>org.elasticsearch.client</groupId><artifactId>elasticsearch-rest-high-level-client</artifactId><version>${elasticsearch}</version><exclusions><exclusion><groupId>org.elasticsearch.client</groupId><artifactId>elasticsearch-rest-client</artifactId></exclusion><exclusion><groupId>org.elasticsearch</groupId><artifactId>elasticsearch</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.elasticsearch.client</groupId><artifactId>transport</artifactId><version>${elasticsearch}</version></dependency>
3.2、在application文件添加ES配置信息
elasticsearch.host=IP地址
elasticsearch.port=端口
elasticsearch.clustername=集群名
3.3、添加配置类
import lombok.Data;
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.net.InetAddress;@Configuration
@ConfigurationProperties(prefix = "elasticsearch")
@Data
public class ElasticsearchConfig {private static final Logger log = LoggerFactory.getLogger(ElasticsearchConfig.class);private String host;private int port;private String clustername;private int queryport;@Beanpublic RestHighLevelClient restHighLevelClient() {RestHighLevelClient client = new RestHighLevelClient(RestClient.builder(new HttpHost(host, queryport, "http")));return client;}@Beanpublic TransportClient transportClient() {TransportClient transportClient = null;try {log.info("Elasticsearch TransportClient create Starting...");// 配置信息Settings esSetting = Settings.builder().put("cluster.name", clustername).build();transportClient = new PreBuiltTransportClient(esSetting);//单节点配置TransportAddress transportAddress = new TransportAddress(InetAddress.getByName(host), Integer.valueOf(port));transportClient.addTransportAddresses(transportAddress);log.info("Elasticsearch TransportClient create Start completed.");log.info("Elastic HostName:{}", host);log.info("Elastic Nodes:{}", clustername);} catch (Exception e) {log.error("Elasticsearch TransportClient create error!!!", e);}return transportClient;}
}
3.4、代码实现业务操作
//新增、修改数据JSONObject jsonObject = JSONObject.parseObject(JSONObject.toJSONString(data));IndexResponse response = transportClient.prepareIndex(Constant.ES_INDEX, Constant.ES_TYPE, String.valueOf(data.getId())).setSource(jsonObject).get();//删除数据DeleteRequest request = new DeleteRequest(Constant.ES_INDEX, Constant.ES_TYPE, dataId);DeleteResponse eleteResponse = restHighLevelClient.delete(request, RequestOptions.DEFAULT);//查询数据// 1. 创建批量搜索请求,并绑定索引
SearchRequest searchRequest = new SearchRequest(Constant.ES_INDEX);
// 2. 构建搜索条件
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
BoolQueryBuilder boolQueryBuilder = new BoolQueryBuilder();
boolQueryBuilder.must(QueryBuilders.termQuery("key", "年龄"));
boolQueryBuilder.should(QueryBuilders.matchQuery("value", "18"));
// 最小匹配
boolQueryBuilder.minimumShouldMatch(1);
sourceBuilder.query(boolQueryBuilder);
// 设置分页参数
int from = (page.getPageNo() - 1) * page.getPageSize();
int size = page.getPageSize();
sourceBuilder.from(from);
sourceBuilder.size(size);
// 3. 将查询条件放入搜索请求request中
searchRequest.source(sourceBuilder);// 4. 发起查询请求获取数据
SearchResponse response = response = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
到此,ELK入门应用到此结束,更多精彩技术分享请浏览本人博客:不甘于平凡的溃败的博客_CSDN博客-java,数据库,IDEA领域博主


















