I'm indexing a lot of data in Elasticsearch (through NEST) from multiple processes each running multiple threads. Part of indexing a document is finding out if we have seen a similar document before. This feature is implemented by generating a hash of a set of fields on the document and checking if we have documents in Elasticsearch with the same hash. Before indexing a document, I make the following query:

var result = elasticClient
    .Count<MyDocument>(c => c
        .Index(indexName)
        .Query(q => q
            .ConstantScore(qs => qs
                .Filter(f => f
                    .Term(field => field.Hash, hash))))
...

This returns a count of existing documents with the specified hash. So far so good. However, if a process indexes two documents with the same hash within the same second, the count check fails, because the first document isn't available for search yet. I'm running with the default refresh interval (1 second). For now I have added a refresh call after indexing each document:

var refreshResponse = client.Refresh(indexName);

This also works, but it doesn't scale when indexing large numbers of documents (indexing becomes slow, as already pointed out here: https://www.elastic.co/guide/en/elasticsearch/reference/current/tune-for-indexing-speed.html).

Any ideas for how to avoid calling Refresh while still being able to perform a uniqueness check? I'm thinking of some sort of local cache, shared between all threads, holding the hashes of documents indexed since the last refresh. I know that this won't work across processes, but that is acceptable for now.

When performing uniqueness checks this way, it might be better to have a write-through cache containing all existing hashes, and you'd first check that cache instead of running a query. But since your setup is multi-threaded and there's no way to have XA transactions between your app, the cache and ES, it's going to be shaky at best... If I understand correctly, you only allow a single document with a given hash to make it into the index, is that right? - Val
I didn't go into much detail about that. Multiple documents with the same hash may be indexed. But when indexing the second document with the same hash, I set a boolean field on document two to false. If the hash has not been seen before, this field is true. - ThomasArdal
Did you consider making the hash value your document id and using update with the upsert option? This way you could use bulk update, which should speed up getting data in. - Rob
I didn't consider that. But wouldn't that allow only one document per hash? As mentioned in my previous comment, I want to allow multiple documents with the same hash. - ThomasArdal
So this issue is not about uniqueness, it's about concurrency. You are searching or getting a count based on the hash, and if there are already documents with the same hash you save the document but with different property values (not part of the hash), correct? - Andrey Borisko

1 Answer

I ended up implementing a write-through cache as suggested by Val. This makes it possible to remove the call to Refresh while still running the count query for every document. It is implemented using a singleton MemoryCache (from System.Runtime.Caching) shared between all threads:

var cache = new MemoryCache("hashes");

When checking for uniqueness, I fall back to the cache if no similar documents are found in Elasticsearch:

var result = elasticClient
    .Count<MyDocument>(c => c
        .Index(indexName)
        .Query(q => q
            .ConstantScore(qs => qs
                .Filter(f => f
                    .Term(field => field.Hash, hash)))));

bool isUnique = false;
if (result.Count == 0)
{
    isUnique = !cache.Contains(hash);
}

If Elasticsearch already has a match, isUnique stays false; the cache is only consulted when the count for the hash is 0.

When a document has been successfully indexed, I store the hash in the cache with an expiration:

var policy = new CacheItemPolicy();
policy.AbsoluteExpiration = DateTimeOffset.UtcNow.AddSeconds(5);
cache.AddOrGetExisting(hash, string.Empty, policy);

The TTL could probably be as low as 1 second, since that is the refresh interval I currently have configured on the index.
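
One caveat with checking the cache and adding to it in two separate steps is that two threads holding the same hash can both see a cache miss before either has stored it. A possible variation, sketched below, uses the return value of AddOrGetExisting (null when the key was not already present) so that checking and recording happen in one call. RecentHashTracker, TryClaim, IsUnique and the countInIndex delegate are hypothetical names for illustration only; countInIndex stands in for the Elasticsearch count query above. Note that this sketch records the hash before indexing rather than after a successful index, which is a different trade-off from the code above.

```csharp
using System;
using System.Runtime.Caching; // built into .NET Framework; a NuGet package on newer .NET

// Hypothetical helper, not part of NEST and not the exact code I run.
public sealed class RecentHashTracker
{
    private readonly MemoryCache _cache = new MemoryCache("hashes");
    private readonly TimeSpan _ttl;

    public RecentHashTracker(TimeSpan ttl)
    {
        _ttl = ttl;
    }

    // Returns true only for the first caller presenting this hash within the TTL.
    // AddOrGetExisting returns null when the key was absent and has now been added.
    public bool TryClaim(string hash)
    {
        var policy = new CacheItemPolicy
        {
            AbsoluteExpiration = DateTimeOffset.UtcNow.Add(_ttl)
        };
        return _cache.AddOrGetExisting(hash, string.Empty, policy) == null;
    }

    // countInIndex is an assumed stand-in for the Elasticsearch count query.
    public bool IsUnique(string hash, Func<string, long> countInIndex)
    {
        // Record the hash first; the count query is skipped when another
        // thread has already claimed the same hash within the TTL.
        bool claimed = TryClaim(hash);
        return claimed && countInIndex(hash) == 0;
    }
}
```

Usage would look something like tracker.IsUnique(hash, h => CountInElasticsearch(h)), where CountInElasticsearch is whatever wraps the count query. Because the hash is recorded before indexing, a failed index request leaves the hash in the cache until it expires, and the TTL has to cover the indexing time plus the refresh interval, so a value somewhat above 1 second seems safer here.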