欢迎光临
我们一直在努力

记一次logstash 7配置jdbc插件使用实践

环境

  • logstash-7.1.1.rpm;
  • elasticsearch-7.1.1-linux-x86_64.tar.gz;
  • 使用的是mysql数据库;

前期准备

  • 数据库驱动文件:/home/inspur/elasticsearch/lib/mysql-connector-java-8.0.12.jar;
  • SQL文件放置目录:/home/inspur/elasticsearch/sqls/jw_houseproperty_info.sql;

SQL文件内容

select 
`id`,
`reporter_id`,
`ywzh`,
`qlrzjhm`,
`qlrmc`,
`qllx`, 
`update_time`
FROM JW_HOUSEPROPERTY_INFO t 
where ifnull(`update_time`,str_to_date('1970-01-01 00:00:00','%Y-%m-%d %H:%i:%s'))  >= :sql_last_value
and is_delete = 0

logstash配置

input {
  jdbc {
    jdbc_driver_library => "/home/inspur/elk/lib/mysql-connector-java-8.0.12.jar"
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://10.110.1.117:3306/data?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=Hongkong"
    jdbc_user => "data"
    jdbc_password => "Passw0rd"
    schedule => "* * * * *"
    statement_filepath => "/home/inspur/elk/sqls/jw_houseproperty_info.sql"
    use_column_value => true
    tracking_column => "update_time"
    tracking_column_type => "timestamp"
    clean_run => true
    type => "jw_houseproperty_info"
  }
}

filter {
  mutate {
    rename => { "reporter_id" => "reporterId" }
    }
}

output {
   if [type] == 'jw_houseproperty_info' {
       elasticsearch { 
          hosts => ["10.47.0.172:9200"]
          index => "data_archives_jw_houseproperty_info"
          document_id => "%{id}"
       }
   }
}

这里我数据库中update_time字段为datetime类型。

启动

sudo /usr/share/logstash/bin/logstash  -f /etc/logstash/conf.d/ --path.settings=/etc/logstash

修改配置

在使用的过程中,需要根据实际环境不断优化配置(就是我不断排错的过程),目前(2019年8月27日)修改成这样了。

input {
  jdbc {
    jdbc_driver_library => "/home/xzjw/elasticsearch/logstash/mysql-connector-java-8.0.12.jar"
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://10.47.0.172:3306/data?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=Hongkong"
    jdbc_user => "data"
    jdbc_password => "Inspur$777"
    schedule => "* * * * *"
    statement_filepath => "/home/xzjw/elasticsearch/logstash/sqls/jw_houseproperty_info.sql"
    # 以下三行制定根据数据库表的哪个字段进行跟踪数据变化
    use_column_value => true
    tracking_column => "update_time"
    tracking_column_type => "timestamp"
    clean_run => true
    # 以下两项在大数据量时进行分页
    jdbc_paging_enabled => true
    jdbc_page_size => 50000
    type => "jw_houseproperty_info"
  }
}

filter {
  mutate {
    #重命名,将elasticsearch索引下划线的字段改为驼峰命名
    rename => { "reporter_id" => "reporterId" }
    rename => { "id_number" => "idNumber" }
    }
}

output {
      if [type] == 'jw_houseproperty_info' {
       elasticsearch { 
          hosts => ["156.11.99.108:9200"]
          index => "data_archives_jw_houseproperty_info"
          document_id => "%{id}"
       }
   }
}

注意事项

  • tracking_column指定的列需要是数字或日期类型,详见:plugins-inputs-jdbc-tracking_column
  • 如果指定了tracking_column,则查询SQL需要列出此列,即select update_time from ...
  • 数据库时区问题:尽量保持一致,比如jdbc指定时区需增加jdbc_default_timezone => "Asia/Shanghai"
  • 创建elasticsearch索引的时候,尽量保证字段的类型和数据库的类型保持一致,特别日期类型的字段,elasticsearch将类似日期类型数据当成日期类型,如果后面进入的数据不符合日期类型,logstash就会报错,日期转换失败,所以还是建索引的时候指定一下字段类型吧。参考:Elasticsearch常用操作-创建索引
  • 修改logstash配置jvm.options,将-Xms1g-Xmx1g设置为更大值,否则处理大数据量时会报错Java Heap Space

参考文章:

  1. logstash-input-jdbc实现mysql 与elasticsearch实时同步深入详解
  2. 使用Logstash同步mysql到Elasticsearch 6.0.1
赞(0)
未经允许不得转载:Ddmit » 记一次logstash 7配置jdbc插件使用实践

评论 2

  1. #1

    兄弟 你jdk用的哪个版本?

    阿德5年前 (2019-07-18)回复
    • 1.8

      michael5年前 (2019-07-18)回复

登录

找回密码

注册