69
votes

I need to store some messages in Elasticsearch from my Python program. This is how I currently store a message:

d = {"message": "this is message"}
for index_nr in range(1, 5):
    ElasticSearchAPI.addToIndex(index_nr, d)
    print(d)

That means if I have 10 messages, I have to repeat my code 10 times. So I want to put the messages in a script file or batch file instead. I've checked the Elasticsearch Guide, and the Bulk API looks usable for this. The format should be something like below:

{ "index" : { "_index" : "test", "_type" : "type1", "_id" : "1" } }
{ "field1" : "value1" }
{ "delete" : { "_index" : "test", "_type" : "type1", "_id" : "2" } }
{ "create" : { "_index" : "test", "_type" : "type1", "_id" : "3" } }
{ "field1" : "value3" }
{ "update" : {"_id" : "1", "_type" : "type1", "_index" : "index1"} }
{ "doc" : {"field2" : "value2"} }

What I did is:

{"index":{"_index":"test1","_type":"message","_id":"1"}}
{"message":"it is red"}
{"index":{"_index":"test2","_type":"message","_id":"2"}}
{"message":"it is green"}

I also used the curl tool to store the doc:

$ curl -s -XPOST localhost:9200/_bulk --data-binary @message.json

Now I want to use my Python code to store the file in Elasticsearch.

4
Have a look at some python clients like pyes : github.com/aparo/pyes OR the elasticsearch official client github.com/elasticsearch/elasticsearch-py - mconlin
Thank you so much. I checked some clients and tried pyelasticsearch, and I have already managed to bulk index with it. In pyelasticsearch the documents live inside the code. Is it possible to keep the doc file that I want to bulk index outside the program? - chengji18

4 Answers

133
votes
from datetime import datetime

from elasticsearch import Elasticsearch
from elasticsearch import helpers

es = Elasticsearch()

actions = [
  {
    "_index": "tickets-index",
    "_type": "tickets",
    "_id": j,
    "_source": {
        "any":"data" + str(j),
        "timestamp": datetime.now()}
  }
  for j in range(0, 10)
]

helpers.bulk(es, actions)

47
votes

Although @justinachen's code helped me get started with py-elasticsearch, after looking at the source code I can offer a simple improvement:

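from datetime import datetime

from elasticsearch import Elasticsearch, helpers

es = Elasticsearch()
j = 0
actions = []
# Build one action per document (ids 0 through 10 inclusive)
while j <= 10:
    action = {
        "_index": "tickets-index",
        "_type": "tickets",
        "_id": j,
        "_source": {
            "any": "data" + str(j),
            "timestamp": datetime.now()
        }
    }
    actions.append(action)
    j += 1

helpers.bulk(es, actions)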

helpers.bulk() already does the segmentation for you. By segmentation I mean the chunks sent to the server with each request. If you want to reduce the number of documents per chunk (the default is 500), do: helpers.bulk(es, actions, chunk_size=100)
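
To make the chunking concrete, here is a rough sketch of what splitting actions into chunks amounts to. The `chunked` helper is hypothetical and only illustrates the idea; it is not the library's implementation, which also limits chunks by byte size.

```python
from itertools import islice


def chunked(actions, chunk_size=500):
    """Yield lists of at most chunk_size actions (hypothetical helper)."""
    it = iter(actions)
    while True:
        chunk = list(islice(it, chunk_size))
        if not chunk:
            return
        yield chunk


# 1050 dummy actions are split into chunks of 500, 500 and 50.
actions = (
    {"_index": "tickets-index", "_type": "tickets", "_id": j,
     "_source": {"any": "data" + str(j)}}
    for j in range(1050)
)
sizes = [len(chunk) for chunk in chunked(actions)]
print(sizes)  # [500, 500, 50]
```

With the real client, passing chunk_size to helpers.bulk() should have the equivalent effect on how many documents go into each request.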

Some handy info to get started:

helpers.bulk() is just a wrapper around helpers.streaming_bulk() that consumes the results for you and returns a summary, which makes it handy. Both accept any iterable of actions, including a list.

helpers.streaming_bulk() is built on top of Elasticsearch.bulk(), so you do not need to worry about which one to choose.

So in most cases, helpers.bulk() should be all you need.
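
When you do need per-document results, helpers.streaming_bulk() can be consumed directly, since it yields one (ok, item) pair per action. Below is a minimal sketch under that assumption; `summarize` and `index_and_report` are hypothetical names, and the demo at the bottom uses made-up result pairs so it runs without a cluster.

```python
def summarize(results):
    """Count successes and collect failed items from (ok, item) pairs."""
    succeeded, failed = 0, []
    for ok, item in results:
        if ok:
            succeeded += 1
        else:
            failed.append(item)
    return succeeded, failed


def index_and_report(es, actions):
    """Send actions with streaming_bulk; needs a reachable cluster."""
    from elasticsearch import helpers  # imported lazily: needs the client library

    # raise_on_error=False makes failed documents come back as (False, item)
    # pairs instead of raising an exception.
    return summarize(helpers.streaming_bulk(es, actions, raise_on_error=False))


# Made-up results in the (ok, item) shape, for illustration only.
fake = [
    (True, {"index": {"_id": "1", "status": 201}}),
    (False, {"index": {"_id": "2", "status": 400}}),
]
succeeded, failed = summarize(fake)
print(succeeded, failed)
```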

39
votes

(The other approaches mentioned in this thread build a Python list for the ES update, which is not a good solution today, especially when you need to add millions of documents to ES.)

A better approach is to use Python generators: you can process gigabytes of data without running out of memory or compromising much on speed.

Below is an example snippet from a practical use case: adding data from an nginx log file to ES for analysis.

def decode_nginx_log(_nginx_fd):
    for each_line in _nginx_fd:
        # Filter out the below from each log line
        remote_addr = ...
        timestamp   = ...
        ...

        # Index for elasticsearch. Typically timestamp.
        idx = ...

        es_fields_keys = ('remote_addr', 'timestamp', 'url', 'status')
        es_fields_vals = (remote_addr, timestamp, url, status)

        # We return a dict holding values from each line
        es_nginx_d = dict(zip(es_fields_keys, es_fields_vals))

        # Return the row on each iteration
        yield idx, es_nginx_d   # <- Note the usage of 'yield'

def es_add_bulk(nginx_file):
    # The nginx file can be gzip or just text. Open it appropriately.
    ...

    es = Elasticsearch(hosts = [{'host': 'localhost', 'port': 9200}])

    # NOTE the (...) round brackets. This is for a generator.
    k = ({
            "_index": "nginx",
            "_type" : "logs",
            "_id"   : idx,
            "_source": es_nginx_d,
         } for idx, es_nginx_d in decode_nginx_log(_nginx_fd))

    helpers.bulk(es, k)

# Now, just run it.
es_add_bulk('./nginx.1.log.gz')

This skeleton demonstrates the usage of generators. You can use this even on a bare machine if you need to. And you can go on expanding on this to tailor to your needs quickly.
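
The same generator pattern can be applied to the bulk-format file from the question, so the documents stay outside the program. This is only a sketch: `read_bulk_file` is a hypothetical helper, it assumes a well-formed file where every operation except delete is followed by a body line, and the `_type` field is only accepted by older Elasticsearch versions.

```python
import io
import json


def read_bulk_file(lines):
    """Lazily turn bulk-format lines into action dicts for helpers.bulk()."""
    lines = (line for line in lines if line.strip())  # skip blank lines
    for header in lines:
        # Each header line holds exactly one key: the operation type.
        (op_type, meta), = json.loads(header).items()
        action = dict(meta, _op_type=op_type)
        if op_type != "delete":
            body = json.loads(next(lines))
            if op_type == "update":
                action.update(body)       # e.g. {"doc": {...}}
            else:
                action["_source"] = body  # index / create
        yield action


# In-memory stand-in for the message.json file from the question.
sample = io.StringIO(
    '{"index":{"_index":"test1","_type":"message","_id":"1"}}\n'
    '{"message":"it is red"}\n'
    '{"index":{"_index":"test2","_type":"message","_id":"2"}}\n'
    '{"message":"it is green"}\n'
)
actions = list(read_bulk_file(sample))
print(actions[0])

# With a real file and client (assumed names):
#     with open("message.json") as fd:
#         helpers.bulk(es, read_bulk_file(fd))
```

Because the file is read line by line, only one action is held in memory at a time when the generator is passed straight to helpers.bulk().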

See the Python Elasticsearch client documentation for reference.

9
votes

There are two options which I can think of at the moment:

1. Define index name and document type with each entity:

es_client = Elasticsearch()

body = []
for entry in entries:
    body.append({'index': {'_index': index, '_type': 'doc', '_id': entry['id']}})
    body.append(entry)

response = es_client.bulk(body=body)

2. Provide the default index and document type with the method:

es_client = Elasticsearch()

body = []
for entry in entries:
    body.append({'index': {'_id': entry['id']}})
    body.append(entry)

response = es_client.bulk(index='my_index', doc_type='doc', body=body)

Works with:

ES version: 6.4.0

ES python lib: 6.3.1
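
For reference, the list passed as body in both options ends up as newline-delimited JSON on the wire, the same format the curl example in the question posts. The sketch below builds that text by hand to show the shape; `to_ndjson` is a hypothetical helper, and the `_type` default of 'doc' simply mirrors the 6.x setup above.

```python
import json


def to_ndjson(entries, index, doc_type="doc"):
    """Build the newline-delimited body the _bulk endpoint expects."""
    lines = []
    for entry in entries:
        # Action line first, then the document itself.
        meta = {"_index": index, "_type": doc_type, "_id": entry["id"]}
        lines.append(json.dumps({"index": meta}))
        lines.append(json.dumps(entry))
    # The bulk body must be terminated by a newline.
    return "\n".join(lines) + "\n"


entries = [
    {"id": 1, "message": "it is red"},
    {"id": 2, "message": "it is green"},
]
body = to_ndjson(entries, "my_index")
print(body)
```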