Environment
- logstash-7.1.1.rpm
- elasticsearch-7.1.1-linux-x86_64.tar.gz
- MySQL as the source database
Preparation
- JDBC driver file: /home/inspur/elasticsearch/lib/mysql-connector-java-8.0.12.jar
- SQL file location: /home/inspur/elasticsearch/sqls/jw_houseproperty_info.sql
SQL file contents
select
`id`,
`reporter_id`,
`ywzh`,
`qlrzjhm`,
`qlrmc`,
`qllx`,
`update_time`
FROM JW_HOUSEPROPERTY_INFO t
where ifnull(`update_time`,str_to_date('1970-01-01 00:00:00','%Y-%m-%d %H:%i:%s')) >= :sql_last_value
and is_delete = 0
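For context (this sketch is not part of the original post): the :sql_last_value placeholder is filled in by the Logstash JDBC input before the query runs. Roughly, the effective query looks like this, assuming a timestamp tracking column:
-- Sketch of the query Logstash effectively executes (for illustration only).
-- On the first run, or with clean_run => true, :sql_last_value starts at
-- 1970-01-01 00:00:00 for a timestamp tracking column, so every row is selected.
select `id`, `reporter_id`, `ywzh`, `qlrzjhm`, `qlrmc`, `qllx`, `update_time`
FROM JW_HOUSEPROPERTY_INFO t
where ifnull(`update_time`, str_to_date('1970-01-01 00:00:00','%Y-%m-%d %H:%i:%s'))
      >= '1970-01-01 00:00:00'
  and is_delete = 0
-- On later runs :sql_last_value is the last update_time value Logstash recorded,
-- persisted between runs via the plugin's last_run_metadata_path file.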
Logstash configuration
input {
  jdbc {
    jdbc_driver_library => "/home/inspur/elk/lib/mysql-connector-java-8.0.12.jar"
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://10.110.1.117:3306/data?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=Hongkong"
    jdbc_user => "data"
    jdbc_password => "Passw0rd"
    schedule => "* * * * *"
    statement_filepath => "/home/inspur/elk/sqls/jw_houseproperty_info.sql"
    use_column_value => true
    tracking_column => "update_time"
    tracking_column_type => "timestamp"
    clean_run => true
    type => "jw_houseproperty_info"
  }
}
filter {
  mutate {
    rename => { "reporter_id" => "reporterId" }
  }
}
output {
  if [type] == 'jw_houseproperty_info' {
    elasticsearch {
      hosts => ["10.47.0.172:9200"]
      index => "data_archives_jw_houseproperty_info"
      document_id => "%{id}"
    }
  }
}
Here the update_time column in my database is of type datetime.
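The original post does not show the table definition; as a purely hypothetical sketch, the relevant part might look like the DDL below. Only update_time being DATETIME and is_delete acting as a logical-delete flag are implied by the query above; all other column types are guesses.
-- Hypothetical DDL sketch, not from the original post.
CREATE TABLE JW_HOUSEPROPERTY_INFO (
  id          BIGINT       NOT NULL PRIMARY KEY,
  reporter_id VARCHAR(64),
  ywzh        VARCHAR(64),
  qlrzjhm     VARCHAR(64),
  qlrmc       VARCHAR(128),
  qllx        VARCHAR(32),
  is_delete   TINYINT      DEFAULT 0,
  update_time DATETIME
);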
Startup
sudo /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/ --path.settings=/etc/logstash
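Not in the original post, but before starting it can help to validate the pipeline configuration first; --config.test_and_exit is a standard Logstash flag that parses the config and exits without running it:
sudo /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/ --path.settings=/etc/logstash --config.test_and_exit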
Revised configuration
In actual use the configuration had to be tuned repeatedly for the real environment (in other words, my ongoing troubleshooting); as of 2019-08-27 it looks like this.
input {
  jdbc {
    jdbc_driver_library => "/home/xzjw/elasticsearch/logstash/mysql-connector-java-8.0.12.jar"
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://10.47.0.172:3306/data?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=Hongkong"
    jdbc_user => "data"
    jdbc_password => "Inspur$777"
    schedule => "* * * * *"
    statement_filepath => "/home/xzjw/elasticsearch/logstash/sqls/jw_houseproperty_info.sql"
    # The next three options specify which table column is used to track data changes
    use_column_value => true
    tracking_column => "update_time"
    tracking_column_type => "timestamp"
    clean_run => true
    # The next two options enable paging when the data volume is large
    jdbc_paging_enabled => true
    jdbc_page_size => 50000
    type => "jw_houseproperty_info"
  }
}
filter {
  mutate {
    # Rename the underscore-style fields to camelCase in the Elasticsearch index
    rename => { "reporter_id" => "reporterId" }
    rename => { "id_number" => "idNumber" }
  }
}
output {
  if [type] == 'jw_houseproperty_info' {
    elasticsearch {
      hosts => ["156.11.99.108:9200"]
      index => "data_archives_jw_houseproperty_info"
      document_id => "%{id}"
    }
  }
}
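Not in the original post: once the pipeline has run, a quick way to confirm that data is flowing is to count the documents in the target index (host and index name are taken from the output section above):
curl -s "http://156.11.99.108:9200/data_archives_jw_houseproperty_info/_count?pretty"
A count that grows after each scheduled run (every minute, per the cron schedule) indicates the incremental sync is working.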
Notes
- The column given to tracking_column must be a numeric or date/time type; see plugins-inputs-jdbc-tracking_column for details.
- If tracking_column is specified, the query must select that column, i.e. select update_time from ...
- Database time zones: keep them consistent as far as possible; for example, to set the time zone on the JDBC side, add jdbc_default_timezone => "Asia/Shanghai".
- When creating the Elasticsearch index, keep the field types consistent with the database column types, especially for date fields. Elasticsearch treats date-like data as a date type, and if later documents contain values that do not parse as dates, Logstash fails with a date-conversion error, so it is better to define the field types explicitly when creating the index (see the curl sketch after this list). Reference: Elasticsearch常用操作-创建索引.
- In Logstash's jvm.options, raise -Xms1g and -Xmx1g to larger values, otherwise processing large data volumes fails with a Java Heap Space error.
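A minimal sketch of creating the index with explicit field types before the first sync, as suggested above. The host, index name, and field names follow the configuration and rename filter; the chosen types (keyword, date) and the date format string are assumptions, not from the original post:
curl -X PUT "http://156.11.99.108:9200/data_archives_jw_houseproperty_info" \
  -H 'Content-Type: application/json' -d '
{
  "mappings": {
    "properties": {
      "reporterId":  { "type": "keyword" },
      "ywzh":        { "type": "keyword" },
      "qlrzjhm":     { "type": "keyword" },
      "qlrmc":       { "type": "keyword" },
      "qllx":        { "type": "keyword" },
      "update_time": { "type": "date", "format": "yyyy-MM-dd HH:mm:ss||strict_date_optional_time||epoch_millis" }
    }
  }
}'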