需要下载的东西
- ElasticSearch——https://www.elastic.co/cn/downloads/elasticsearch
- Logstash(版本需要和ES对应)——https://www.elastic.co/cn/downloads/logstash
- mysql驱动jar包——https://dev.mysql.com/downloads/connector/j/
ElasticSearch
下载完ElasticSearch后解压,然后点击bin目录下的elasticsearch.bat启动(ES默认端口号为9200,若不能启动请检查端口号是否占用)
Logstash
将下载好的mysql驱动压缩包和Logstash压缩包解压,将mysql的的jar包放入logstash文件夹内
同步mysql数据库中数据到ES中
在logstash/bin中新建logstash-mysql.conf文件(名字任意,可以不同,只要更改后面运行命令即可)
其中写入
input {stdin {}jdbc {# 数据库 数据库名称为ESDB,表名为userjdbc_connection_string => "jdbc:mysql://localhost:3306/ESDB"# 用户名密码jdbc_user => "root"jdbc_password => "root"# jar包的位置jdbc_driver_library => "D:/ElasticSearch/logstash-7.13.3/mysql-connector-java-8.0.25.jar"# mysql的Driverjdbc_driver_class => "com.mysql.jdbc.Driver"jdbc_paging_enabled => "true"jdbc_page_size => "50000"# sql文件位置 没有可以不写#statement_filepath => "config-mysql/user.sql"statement => "select * from user"# 设置定时任务间隔 含义:分、时、天、月、年,全部为*默认含义为每分钟跑一次任务schedule => "* * * * *"#索引的类型type => "user"}
}output {elasticsearch {hosts => "127.0.0.1:9200"# index名index => "user"# 需要关联的数据库中有有一个id字段,对应索引的id号document_id => "%{id}"}stdout {codec => json_lines}
}
然后在bin目录下执行logstash -f logstash-mysql.conf就能自动同步mysql数据了
可能出现的问题
- 数据库中的时间字段与同步到ES中的对应的数据不符(相差8小时)
修改logstash-mysql.conf配置文件
# 在jdbc字段中添加
plugin_timezone => "local"
# 添加filter字段
filter {# 因为时区问题需要修正时间ruby { code => "event.set('timestamp', event.get('@timestamp').time.localtime + 8*60*60)" }ruby {code => "event.set('@timestamp',event.get('timestamp'))"}mutate {remove_field => ["timestamp"]}# 因为时区问题需要修正时间ruby {code => "event.set('create_date', event.get('create_date').time.localtime + 8*60*60)" } # 因为时区问题需要修正时间ruby {code => "event.set('update_date', event.get('update_date').time.localtime + 8*60*60)" }
}# 完整配置信息
input {stdin {}jdbc {# 数据库 数据库名称为elk,表名为book_tablejdbc_connection_string => "jdbc:mysql://localhost:3306/ESDB"# 用户名密码jdbc_user => "root"jdbc_password => "root"# jar包的位置jdbc_driver_library => "D:/ElasticSearch/logstash-7.13.3/mysql-connector-java-8.0.25.jar"# mysql的Driverjdbc_driver_class => "com.mysql.jdbc.Driver"jdbc_paging_enabled => "true"jdbc_page_size => "50000"#statement_filepath => "config-mysql/user.sql"statement => "select * from user"schedule => "* * * * *"#索引的类型type => "user"plugin_timezone => "local"}
}filter {# 因为时区问题需要修正时间ruby { code => "event.set('timestamp', event.get('@timestamp').time.localtime + 8*60*60)" }ruby {code => "event.set('@timestamp',event.get('timestamp'))"}mutate {remove_field => ["timestamp"]}# 因为时区问题需要修正时间ruby {code => "event.set('create_date', event.get('create_date').time.localtime + 8*60*60)" } # 因为时区问题需要修正时间ruby {code => "event.set('update_date', event.get('update_date').time.localtime + 8*60*60)" }
}output {elasticsearch {hosts => "127.0.0.1:9200"# index名index => "user"# 需要关联的数据库中有有一个id字段,对应索引的id号document_id => "%{id}"}stdout {codec => json_lines}
}