Logstash数据同步

article/2025/9/11 21:00:10

Logstash 是 Elastic 技术栈中的一个技术,它是一个数据采集引擎,可以从数据库采集数据到 ES 中。可以通过设置 自增 ID 主键 或 更新时间 来控制数据的自动同步:

  • 自增 ID 主键:Logstatsh 会有定时任务,如果发现有主键的值大于先前同步记录的主键值,就会将对应的增量数据同步到 ES 中
  • 更新时间:其实原理与主键类似,不过如果设置使用主键作为依据的话,那么数据库的数据更新就不会被识别从而更新到 ES 中。

一、安装

1. 下载地址:https://www.elastic.co/cn/downloads/past-releases#logstash

  • 注:使用Logstatsh的版本号与elasticsearch版本号需要保持一致

2. 上传并解压

先上传到服务器 /home/software/ 下,然后解压,并将解压后的文件夹移动到 /usr/local/ 下

tar -zxvf logstash-7.6.2.tar.gzmv logstash-7.6.2 /usr/local/

二、配置

1. 首先在 Elasticsearch 中创建一个索引:didiok-items

2. 在 /usr/local/logstash-7.6.2/ 下创建文件夹 sync/

将数据库驱动 mysql-connector-java-5.1.41.jar 包上传到  /usr/local/logstash-7.6.2/sync/ 下,

cd /usr/local/logstash-7.6.2/
mkdir sync
cd sync/

3. 编写数据同步的SQL脚本

 SELECTi.id as id,i.item_name as itemName,i.sell_counts as sellCounts,ii.url as imgUrl,tempSpec.price_discount as price,i.updated_time as updated_timeFROMitems iLEFT JOINitems_img iioni.id = ii.item_idLEFT JOIN(SELECT item_id,MIN(price_discount) as price_discount from items_spec GROUP BY item_id) tempSpeconi.id = tempSpec.item_idWHEREii.is_main = 1andi.updated_time >= :sql_last_value# :sql_last_value 是 logstash 每次同步完成之后保存的的边界值,这里保存的是 updated_time ,用于下次数据同步时,大于等于 updated_time 的数据才会进行同步

将sql脚本 保存到 /usr/local/logstash-7.6.2/sync/didiok-items.sql 文件中 

4.  在 sync/ 下创建 配置文件 logstash-db-sync.conf,内容如下:

input {jdbc {# 设置 MySql/MariaDB 数据库url以及数据库名称jdbc_connection_string => "jdbc:mysql://192.168.1.6:3306/didiok-shop-dev?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true"# 用户名和密码jdbc_user => "root"jdbc_password => "root"# 数据库驱动所在位置,可以是绝对路径或者相对路径jdbc_driver_library => "/usr/local/logstash-7.6.2/sync/mysql-connector-java-5.1.41.jar"# 驱动类名jdbc_driver_class => "com.mysql.jdbc.Driver"# 开启分页jdbc_paging_enabled => "true"# 分页每页数量,可以自定义jdbc_page_size => "10000"# 执行的sql文件路径statement_filepath => "/usr/local/logstash-7.6.2/sync/didiok-items.sql"# 设置定时任务间隔  含义:分、时、天、月、年,全部为*默认含义为每分钟跑一次任务schedule => "* * * * *"# 索引类型type => "_doc"# 是否开启记录上次追踪的结果,也就是上次更新的时间,这个会记录到 last_run_metadata_path 的文件use_column_value => true# 记录上一次追踪的结果值last_run_metadata_path => "/usr/local/logstash-7.6.2/sync/track_time"# 如果 use_column_value 为true, 配置本参数,追踪的 column 名,可以是自增id或者时间tracking_column => "updated_time"# tracking_column 对应字段的类型tracking_column_type => "timestamp"# 是否清除 last_run_metadata_path 的记录,true则每次都从头开始查询所有的数据库记录clean_run => false# 数据库字段名称大写转小写lowercase_column_names => false}
}
output {elasticsearch {# es地址hosts => ["192.168.1.187:9200"]# 同步的索引名index => "didiok-items"# 设置_docID和数据库中的id相同document_id => "%{id}"}# 日志输出stdout {codec => json_lines}}

5. 启动 logstash

cd /usr/local/logstash-7.6.2/bin./logstash -f /usr/local/logstash-7.6.2/sync/logstash-db-sync.conf

如果启动过程中报错如下:

报这个错是虚拟机内存不足,是因为这个Logstash要的内存太大了。这个插件在初始化的时候要了一个g的内存,但是虚拟机没有这么多的内存,所以把这个内存改小就行了。

(!!!但是,这样好像会导致 logstash 启动不了,还是想办法扩大内存吧 ,下面的方法仅供参考)

修改 /usr/local/logstash-7.6.2/config/jvm.options ,这里原来是 1g,修改成 256m:

三、在自定义模板中配置中文分词器

首先在 ES 中创建索引 didiok-items,之后启动 logstash,然后再进行以下操作。

1. 查看Logstash默认模板

请求方式:GET

路径:http://localhost:9200/_template/logstash

2. 将查询出来的模板复制出来,进行修改如下(这里只修改了3处):

{"order": 0,"version": 1,            # 修改1"index_patterns": ["*"], # 修改2"settings": {"index": {"refresh_interval": "5s"}},"mappings": {"_default_": {"dynamic_templates": [{"message_field": {"path_match": "message","match_mapping_type": "string","mapping": {"type": "text","norms": false}}},{"string_fields": {"match": "*","match_mapping_type": "string","mapping": {"type": "text","norms": false,"analyzer": "ik_max_word",  # 修改3 加入中文分词器"fields": {"keyword": {"type": "keyword","ignore_above": 256}}}}}],"properties": {"@timestamp": {"type": "date"},"@version": {"type": "keyword"},"geoip": {"dynamic": true,"properties": {"ip": {"type": "ip"},"location": {"type": "geo_point"},"latitude": {"type": "half_float"},"longitude": {"type": "half_float"}}}}}},"aliases": {}
}

之后将其保存为  /usr/local/logstash-7.6.2/sync/logstash-ik.json

3. 在 /usr/local/logstash-7.6.2/sync/logstash-db-sync.conf 文件中进行修改,加入以下内容:

# 定义模板名称
template_name => "myik"
# 模板所在位置
template => "/usr/local/logstash-7.6.2/sync/logstash-ik.json"
# 重写模板
template_overwrite => true
# 默认为true,false关闭logstash自动管理模板功能,如果自定义模板,则设置为false
manage_template => false

4. 重新运行Logstash进行同步

./logstash -f /usr/local/logstash-7.6.2/sync/logstash-db-sync.conf

中文分词器没有设置成功? 

试试下面的解决方案:

先通过postman请求 http://192.168.1.187:9200/_template/logstash,获取的json放入logstash-ik.json中,然后在 /usr/local/logstash-7.6.2/sync/logstash-db-sync.conf 文件中设置manage_template => true,然后启动logstash,启动后 ES 的 didiok-items 索引是不正确的。

删除索引,并重新创建索引 didiok-items。

然后 postman 调 http://192.168.1.187:9200/_template/myik 拿到的 myik 的 json 重新放入logstash-ik.json中,配置 logstash-db-sync.conf  改为 manage_template => false,

再次启动logstash就能在 didiok-items 的mapping中显示中文分词器了。


http://chatgpt.dhexx.cn/article/EM87ggeL.shtml

相关文章

数据同步-数据库间的双向同步

当业务侧需要MongoDB降配、活动数据迁移时都需要应用切换数据库实例进行发版,发版过程中需最大程度保证新旧数据库数据一致,这就涉及到了一种同步技术-数据双向同步。在同步过程中遇到了一些可能会产生问题或引发思考的点,希望利用这篇文档进…

什么是数据实时同步,为什么数据实时同步很重要

随着云成为前所未有的数据供应渠道,数据准确性、一致性和隐私性的重要性与日俱增。看似轻微的数据错误或故障可能会产生重大负面影响。但是,​对数据进行排序并将其与现有​,然后定期解析数据实时同步,同时保持数据完整性&#xf…

数据同步工具的研究(实时)

数据同步工具的研究(实时同步): FlinkCDC、Canal、Maxwell、Debezium ——2023年01月17日 ——Yahui Di 1. 常用CDC方案比较 2. FlinkCDC FlinkCDC的简介: Flink CDC 连接器是 Apache Flink 的一组源连接器,使用变…

聊聊数据同步

一、简述 数据同步,这是一个很宽泛的概念,在互联网或者传统软件公司,一定会遇到数据同步的场景。数据同步一般会遇到的问题诸如同步时延、数据一致性、性能低、强依赖于中间件、失败后无法补偿等。本文笔者试图简要总结下常见的数据同步场景&…

大数据的数据同步方式

一、全量覆盖 不需要分区,同步时直接覆盖插入。适用于数据不会有任何新增和变化的情况。比如地区、时间、性别等维度数据,不会变更或变更不影响业务,可以只保留最新值 二、仅新增同步 每天新增一个日期分区,同步并存储当天的新…

DataLink 数据同步平台

文章目录 一、数据同步平台概述核心能力工作原理详细流程 二、快速接入部署中间件程序配置创建数据库表启动应用注意事项 三、扩展:四种 CDC 方案比较优劣 一、数据同步平台 在项目开发中,经常需要将数据库数据同步到 ES、Redis 等其他平台,通…

数据同步之全量同步与增量同步

一、什么是数据同步 业务数据是数据仓库的重要数据来源,我们需要每日定时从业务数据库中抽取数据,传输到数据仓库中,之后再对数据进行分析统计。 为保证统计结果的正确性,需要保证数据仓库中的数据与业务数据库是同步的&#xff0…

你了解数据同步吗?

1.写在前面 本篇博客参考《操作系统实战 45 讲》 上篇博客主要介绍的是程序放在什么地方,开发操作系统要了解的最核心的硬件——CPU、MMU、Cache、内存,知道了它们的工作原理。在程序运行中,它们起到了至关重要的作用。 在开发我们自己的操…

数据库同步有哪些方式?【怎么保障目标和源数据一致性】

文章目录 摘要一、几种主流的数据库同步方式二、架构及工作原理三、全量同步和实时增量同步机制四、源和目标五、举例:Oracle 数据实时同步到 Elasticsearch六、目标和源数据一致性七、异构数据类型转换八、总结 摘要 数据库同步有3大难题: 1是如何保障…

数据技术篇之数据同步

第3章 数据同步 1.数据同步基础 直连同步 (1)什么是直连同步?直连同步是指通过定义好的规范接口 API 和基于动态链接库的方式直接连接业务库,如 ODBC/JDBC 等规定了统 一规范的标准接口,不同的数据库基于这套标准接口…

聊聊数据同步方案

文章目录 常用的数据同步方案数据库迁移场景数据同步场景应用代码中同步定时任务同步通过MQ实现同步通过CDC实现实时同步 CDC(change data capture,数据变更抓取)Canal基于日志增量订阅&消费支持的业务工作原理Mysql主备复制实现Canal架构…

大数据之路——数据同步

三、数据技术篇—— 数据同步 3.1 数据同步基础 3.1.1 直连同步3.1.2 数据文件同步3.1.3 数据库日志解析同步 3.2 数据仓库同步方式3.2.1 批量数据同步3.2.2 实时数据同步 3.3 同步遇到的问题3.3.1 分库分表3.3.2 增量全量同步的合并3.3.3 数据漂移的处理 有多种不同应用场景&…

关于数据同步的几种实现

关于数据同步的几种实现 概述 关于数据同步主要有两个层面的同步,一是通过后台程序编码实现数据同步,二是直接作用于数据库,在数据库层面实现数据的同步。通过程序编码实现数据同步,其主要的实现思路很容易理解,即有…

数据同步技术

本次旨在分享数据同步技术的相关知识点,包括数据同步概述、数据同步工具、数据库、数据同步到大数据平台。 首先来介绍一下数据同步的概念: 数据同步是为保持数据源与目的地数据一致性而进行的数据传输、处理的过程。 数据同步的场景: 1、主…

几种常见的数据同步方式

数据仓库的特性之一是集成,即首先把未经过加工处理的、不同来源的、不同形式的数据同步到ODS层,一般情况下,这些ODS层数据包括日志数据和业务DB数据。对于业务DB数据而言(比如存储在MySQL中),将数据采集并导入到数仓中(通常是Hive…

内网穿透frpc ,frps的使用

情况是这样的,公司内网中一个设备接了路由器下发的地址,内网地址是192.168.1.100,可以访问我的台式机,但我的台式机访问192.168.1.100是无法连通的 这种情况下,在我机器上运行frps.exe,frps.ini如下 [com…

Frp内网穿透——frps服务端部署

由于现在IPv4地址的短缺,在国内不可能每个设备都会分配到一个公网IP,因此从公网中访问自己的私有设备向来是一件难事儿。本次带大家了解一下frp内网穿透的服务端教学,让你也能够部署一个内网穿透服务。 frp简介 通俗的说,frp是一…

记一次使用frpc/frps进行内网穿透

1. 前提条件: 有一个公网ip,这里用x.x.x.x代替 2. 配置 【服务器端】 S_NUMBER是一个端口号 #服务端口 bind_port S_NUMBER #监听地址 bind_addr 0.0.0.0 #认证token token xxxx【客户端】(也就是需要被内网穿透的服务器) C_NUMBER是一个端口号 …

内网穿透神器Frps一键安装脚本及设置教程

frps 是一个高性能的反向代理应用,可以帮助您轻松地进行内网穿透,对外网提供服务,支持 tcp, http, https 等协议类型,并且 web 服务支持根据域名进行路由转发。 *因为frps是go语言写的,所以在路由器上使用的时候&#…

frpc和frps 内网穿透越狱插件

内网穿透、frp、frpc、frps https://zhaoboy9692.github.io/repo 越狱源 https://zhaoboy9692.github.io/repo 苦于在ios越狱下没有frp穿透使用 特地开发了的越狱插件 基于最新frp0.48编译 ios14.6测试没问题 有问题及时反馈