
I have installed Logstash and Elasticsearch on my Windows machine with the config below (which polls records from the customer table at a 1-minute interval, as configured).

#1 Logstash config file that loads the customer table data and indexes it (logstash-config.conf)

input {
    jdbc {
        jdbc_connection_string => "jdbc:postgresql://localhost:5432/postgres"
        jdbc_driver_library => "C:\\org\\postgresql\\postgresql\\42.2.11\\postgresql-42.2.11.jar"
        jdbc_user => "test"
        jdbc_password => "test"
        jdbc_driver_class => "org.postgresql.Driver"
        schedule => "* * * * *"
        statement => "SELECT * FROM public.customer WHERE id > :sql_last_value"
        tracking_column_type => "numeric"
        use_column_value => true
        tracking_column => "id"
    }
}
output {
    elasticsearch {
        index => "customer_index"
        document_type => "customer"
        document_id => "%{id}"
        hosts => "localhost:9200"
    }

    stdout {
        codec => rubydebug
    }
}
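Before relying on the schedule, the pipeline file can be syntax-checked up front; a minimal sketch using Logstash's --config.test_and_exit flag and the same install path used in step #3 below:

    e:\Software\logstash-7.8.0\bin>logstash -f logstash-config.conf --config.test_and_exit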

#2 Create a table in the DB with some records

create table customer (id integer,name varchar);

select * from customer;
insert into customer values (1,'test1');
insert into customer values (2,'test2');

#3 Start Logstash: e:\Software\logstash-7.8.0\bin>logstash -f logstash-config.conf (the Logstash API then responds at http://localhost:9600/)

#4 Start Elasticsearch: e:\Software\elasticsearch-7.8.0\bin>elasticsearch.bat (indices can then be listed at http://localhost:9200/_cat/indices)
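To also see how many documents the index holds, the verbose form of the cat indices API can be used (a small sketch, assuming the index name customer_index from the config above):

    http://localhost:9200/_cat/indices/customer_index?v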

#5 Hit the GET search API
5.1 Does not return any record: http://localhost:9200/customer_index/_search?q=1

e.g.
    {
    "took": 86,
    "timed_out": false,
    "_shards": {
        "total": 1,
        "successful": 1,
        "skipped": 0,
        "failed": 0
    },
    "hits": {
        "total": {
            "value": 0,
            "relation": "eq"
        },
        "max_score": null,
        "hits": []
    }
}
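One quick sanity check here, a sketch assuming document_id => "%{id}" took effect as configured, is to fetch the document directly by its _id instead of searching:

    http://localhost:9200/customer_index/_doc/1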
5.2 Returns a record:
http://localhost:9200/customer_index/_search?q=2

e.g. 
    {
    "took": 371,
    "timed_out": false,
    "_shards": {
        "total": 1,
        "successful": 1,
        "skipped": 0,
        "failed": 0
    },
    "hits": {
        "total": {
            "value": 1,
            "relation": "eq"
        },
        "max_score": 1.0,
        "hits": [
            {
                "_index": "customer_index",
                "_type": "customer",
                "_id": "%{customer_id}",
                "_score": 1.0,
                "_source": {
                    "name": "test2",
                    "id": 17,
                    "@timestamp": "2020-07-06T06:41:00.343Z",
                    "@version": "1"
                }
            }
        ]
    }
}
**Also, what is customer_id here, and how can I get the whole record? I indexed the whole row (i.e. select * from customer), so I would expect all columns to be returned.**
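To inspect everything that is currently in the index, including the full _source of each document, a plain search without a q parameter can be used (a sketch; by default it returns only the first 10 hits):

    http://localhost:9200/customer_index/_search?pretty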
5.3
It looks like the index contains only the record that was added to it last.
E.g. if I execute in the DB
insert into customer values (2,'test2');
then
http://localhost:9200/customer_index/_search?q=2
no longer returns a record.
5.4 However, this returns a record:
http://localhost:9200/customer_index/_search?q=3
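A simple way to confirm how many documents the index actually holds at this point is the count API (sketch, same index name as above):

    http://localhost:9200/customer_index/_count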

1 Answer

input {
    jdbc {
        jdbc_connection_string => "jdbc:postgresql://localhost:5432/postgres"
        jdbc_driver_library => "C:\\org\\postgresql\\postgresql\\42.2.11\\postgresql-42.2.11.jar"
        jdbc_user => "test"
        jdbc_password => "test"
        jdbc_driver_class => "org.postgresql.Driver"
        schedule => "* * * * *"
        statement => "SELECT * FROM public.customer WHERE id > :sql_last_value"
        tracking_column_type => "numeric"
        use_column_value => true
        tracking_column => "id"
        # new setting: remember the last-seen id between runs
        last_run_metadata_path => "C:\\logstash\\.sch_id_tracker_file"
    }
}

Here last_run_metadata_path points to a file in which the plugin stores the tracking column value from its last run. On the next scheduled run, :sql_last_value is replaced with that stored value, so only records matching id > :sql_last_value are selected, processed, and pushed to Elasticsearch.
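As a rough illustration (the exact on-disk formatting is an assumption), after a run whose highest id was 2 the tracker file just holds that serialized last value, e.g.

    --- 2

and the next scheduled run then effectively executes SELECT * FROM public.customer WHERE id > 2. Deleting the file should reset :sql_last_value, causing the whole table to be re-imported on the next run.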