ElasticSearch使用Logstash从MySQL中同步数据
项目中的搜索功能需要使用ElasticSearch,现需使用Logstash来导入数据到ElasticSearch中。
安装ElasticSearch
- 下载安装包并解压,亦可用Docker安装
- 进入装目录下的config文件夹中,修改elasticsearch.yml 文件,Docker安装进入容器内进行配置即可
- 修改的主要内容
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| cluster.name: my-es
node.name: node-1
path.data: /usr/local/elasticsearch/data
path.logs: /usr/local/elasticsearch/logs
network.host: 0.0.0.0
http.port: 9200
discovery.zen.ping.unicast.hosts: ["127.0.0.1","10.10.10.34:9200"]
|
安装Logstash
同样也是下载压缩到后解压即可,然后到解压目录执行
查看日志能正常启动说明安装成功
编写同步脚本
同步数据需要使用logstash-input-jdbc
插件,在logstash-6.1.1
以后已经默认支持 logstash-input-jdbc
插件,所以不需要再单独安装了。
在安装目录下新建connector、script文件夹用于分别存放MySQL 的驱动文件和同步脚本
编写脚本
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| input { jdbc { jdbc_connection_string => "jdbc:mysql://host:port/database" jdbc_user => "xxxx" jdbc_password => "xxxx" jdbc_driver_library => "/usr/local/logstash-6.5.4/connector/mysql-connector-java-5.1.45.jar" jdbc_driver_class => "com.mysql.jdbc.Driver" statement =>"select * from blzq_article_v3 where is_show = 1 and update_time >= :sql_last_value and update_time<now()" schedule => "* * * * *" } }
output { elasticsearch { hosts => ["127.0.0.1:9200","127.0.0.1:8200","127.0.0.1:8000"] index => "article" document_type => "article" document_id => "%{id}" } stdout { codec => json_lines } }
|
使用命令 ./bin/logstash -f ./script/mysql.conf
执行导入脚本。
总结
- 使用Logstash同步数据环境的安装相对简单,主要是配置导入脚本
- 上面的导入配置脚本使用的是update_time方式的增量同步,该方式在数据库中物理删除是无法实时更新,可在项目中执行删除MySQL数据的时候同步删除ES中的数据
- 当对实时性和数据一致性有高要求时,可使用MQ进行同步
- 亦可使用ali的canal进行同步数据。项目地址:https://github.com/alibaba/canal