input {
jdbc {
jdbc_driver_library => "/usr/share/logstash/vendor/jar/mysql-connector-java-5.1.42-bin.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://bigfooty2.csv9hrfsbhfo.us-west-2.rds.amazonaws.com:3306/bigfooty"
statement => "select post_id, t.node_id as node, p.thread_id as thread, p.user_id as user, p.message, case when position = 0 then t.title else NULL end title, message_state, p.post_date as date, prefix_id as prefix from xf_post p, xf_thread t where p.thread_id=t.thread_id and post_id > :sql_last_value order by post_id limit 50000 "
jdbc_paging_enabled => "true"
jdbc_page_size => "50000"
last_run_metadata_path => "/usr/share/logstash/last/.logstash_jdbc_last_run"
type => "post"
tracking_column => "post_id"
use_column_value => true
}
}
filter {
if [type] == "post"{
if [message_state] != "visible" {
mutate{
add_field => {"not_visible" => true}
}
}
if "" in [title]{
mutate{
add_field => {"xm_elasticess_title" => "%{title}"}
}
}
mutate{
remove_field => ["message_state"]
add_field => {"discussion_id" => "%{thread}"}
}
mutate{
convert => {"thread" => "integer"}
convert => {"discussion_id" => "integer"}
convert => {"user" => "integer"}
convert => {"date" => "integer"}
convert => {"prefix" => "integer"}
convert => {"node" => "integer"}
}
if [prefix] == 0{
mutate{
remove_field => ["prefix"]
}
}
}
}
output {
if [type] == "post" {
elasticsearch {
hosts => ["10.38.11.11"]
document_id => "%{post_id}"
index => "bigfooty2"
flush_size => 10000
}
}
}