Bug 1491401 - Many duplicate index entries when logging under stress (bulk queue rejections present)
Status: CLOSED DUPLICATE of bug 1548104
Alias: None
Product: OpenShift Container Platform
Classification: Red Hat
Component: Logging
Version: 3.6.1
Hardware: x86_64
OS: Linux
Priority: unspecified
Severity: high
Target Milestone: ---
Target Release: 3.9.z
Assignee: Rich Megginson
QA Contact: Anping Li
Whiteboard: aos-scalability-39
 
Reported: 2017-09-13 16:58 UTC by Mike Fiedler
Modified: 2018-04-11 20:43 UTC
CC List: 6 users

Last Closed: 2018-04-11 20:43:03 UTC



Description Mike Fiedler 2017-09-13 16:58:22 UTC
Description of problem:

200 fluentd nodes
non-mux configuration - 3 ES nodes, 200 fluentd, 1 kibana, 1 curator, 0 mux
logging v3.6.173.0.32
ES CPU limit is 4
bulk.queueSize is 50 (OOTB setting)

While incrementally pushing the logging traffic higher, I reach a point where the bulk request queues fill up and bulk.rejected rejections start to occur.

In the logging-fluentd logs I see

2017-09-13 15:20:55 +0000 [warn]: temporarily failed to flush the buffer. next_retry=2017-09-13 15:21:12 +0000 error_class="Fluent::ElasticsearchOutput::BulkIndexQueueFull" error="Bulk index queue is full, retrying" plugin_id="object:10cd6a8"
  2017-09-13 15:20:55 +0000 [warn]: suppressed same stacktrace
2017-09-13 15:21:27 +0000 [warn]: retry succeeded. plugin_id="object:10cd6a8"


Which is great - retries are succeeding.  However, when the run is over, some indices contain more than double the expected number of entries.  Count shows a much larger number than expected, while Unique Count shows the correct value.

Example:  200 logging projects/namespaces, each logging 45,000 total messages at an individual rate of 50 messages per second.  Total throughput for the cluster is 10,000 messages per second.

During the run the bulk request queues stay steady at 50 and bulk.rejected rejections are recorded.

After the run completes, just about every index has more than 75,000 entries.  One example is a project that shows a Count in Kibana of 107,561, but when I change it to Unique Count, it shows 45,561.

I am deleting all test project indices before each run.

When logging at a lower rate, these duplicates do not occur.  For example, logging 22,500 messages at 25 messages/second results in 22,500 or 22,501 index entries.

I will try with a larger bulk queue and with the ES CPU limit removed, and report back.

Version-Release number of selected component (if applicable): logging v3.6.173.0.32


How reproducible:  Always at message rates which cause the bulk request queue to overflow


Steps to Reproduce:
1.  200 node cluster with inventory installed using the playbook below (non-mux)
2.  Create 200 projects with ocp-logtest pods (https://github.com/openshift/svt/blob/master/openshift_scalability/content/logtest/ocp_logtest-README.md)
3.  Configure the logtest pods to run 45K messages at 50 messages/second (- INITIAL_FLAGS: "-n 45000 --line-length 1024 --word-length 7 --rate 3000 --fixed-line")
4. Run the pods.   

Actual results:

Inspect the indices in ES with curl or Kibana.  Each project index will contain duplicate messages.


Expected results:

No duplicates (my assumption - are duplicates acceptable? - I would think not where counts are significant)

Additional info:

[oo_first_master]
ip-172-31-24-59

[oo_first_master:vars]
openshift_deployment_type=openshift-enterprise
openshift_release=v3.6.0

openshift_logging_install_logging=true
openshift_logging_master_url=https://ec2-54-186-107-126.us-west-2.compute.amazonaws.com:8443
openshift_logging_master_public_url=https://ec2-54-186-107-126.us-west-2.compute.amazonaws.com:8443
openshift_logging_kibana_hostname=kibana.0907-ihc.qe.rhcloud.com
openshift_logging_namespace=logging
openshift_logging_image_prefix=registry.ops.openshift.com/openshift3/
openshift_logging_image_version=v3.6.173.0.32
openshift_logging_es_cluster_size=3
openshift_logging_es_pvc_dynamic=true
openshift_logging_es_pvc_size=50Gi
openshift_logging_fluentd_use_journal=true
openshift_logging_fluentd_read_from_head=false
openshift_logging_use_mux=false
openshift_logging_use_ops=false

openshift_logging_es_cpu_limit=4000m
openshift_logging_fluentd_cpu_limit=500m
openshift_logging_kibana_cpu_limit=200m
openshift_logging_kibana_proxy_cpu_limit=100m
openshift_logging_es_memory_limit=9Gi
openshift_logging_fluentd_memory_limit=512Mi
openshift_logging_kibana_memory_limit=1Gi
openshift_logging_kibana_proxy_memory_limit=256Mi

Comment 1 Rich Megginson 2017-09-13 17:26:58 UTC
tl;dr we made a tradeoff between losing data and additional complexity, and decided that for now, we did not want to lose data, even at the risk of having duplicates.

When there is a failure with the bulk index operation, that means at least one sub-operation failed.  The plugin will then retry the bulk index operation.  However, when it retries, it retries the entire chunk, including the operations that already succeeded.

We use the "index" operation with no "_id" field: https://github.com/uken/fluent-plugin-elasticsearch/blob/master/lib/fluent/plugin/out_elasticsearch.rb#L43

 config_param :write_operation, :string, :default => "index"

https://github.com/uken/fluent-plugin-elasticsearch/blob/master/lib/fluent/plugin/out_elasticsearch.rb#L252

https://www.elastic.co/guide/en/elasticsearch/guide/current/bulk.html

{ "index": { "_index": "project.namespace....", "_type": "com.redhat....." }}
{ "message": "the message", "@timestamp": ....., .... }

Elasticsearch will create the unique "_id" field for the record.
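
As a rough illustration (a sketch using the elasticsearch Ruby gem directly with placeholder index/type names, not the actual plugin code), retrying the same chunk amounts to this:

  require 'time'
  require 'elasticsearch'

  client = Elasticsearch::Client.new(url: 'http://localhost:9200')

  # One "index" action with no "_id" - Elasticsearch autogenerates the _id.
  bulk_body = [
    { index: { _index: 'project.example', _type: 'com.example.log',
               data: { 'message' => 'the message', '@timestamp' => Time.now.utc.iso8601 } } }
  ]

  client.bulk(body: bulk_body)  # first attempt: document stored under a fresh _id
  client.bulk(body: bulk_body)  # retry of the same chunk: stored again under another fresh _id

So any chunk that is retried after a partial failure duplicates every record in it that had already been indexed.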

There are a few ways we could handle this.

- remove the successful records from the chunk so that the retry logic will not add the data again

This would be quite tricky to get right.  We get the data in the plugin as a msgpack encoded blob, so we would have to figure out how to update it in place to remove those records from the blob.
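
A hypothetical sketch of what that might look like (nothing like this exists in the plugin today; "succeeded" would have to be built from the per-item statuses in the bulk response):

  require 'msgpack'

  # Re-encode the chunk so that only the records whose bulk sub-operations
  # failed are left for the retry.
  def strip_succeeded(chunk_blob, succeeded)   # succeeded: set of item positions
    remaining = ''
    position = 0
    MessagePack::Unpacker.new.feed_each(chunk_blob) do |time_and_record|
      remaining << time_and_record.to_msgpack unless succeeded.include?(position)
      position += 1
    end
    remaining
  end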

- use the "create" write_operation, and generate a unique ID based on the data.  E.g. we would set the "_id" field based on the hash of the record.

We would need to figure out some way to prevent hash collisions.  Also, https://www.elastic.co/guide/en/elasticsearch/guide/2.x/indexing-performance.html#_other

"If you are using your own ID, try to pick an ID that is friendly to Lucene. Examples include zero-padded sequential IDs, UUID-1, and nanotime; these IDs have consistent, sequential patterns that compress well. In contrast, IDs such as UUID-4 are essentially random, which offer poor compression and slow down Lucene."

If we do it this way, we would have to use a hash, or some other uuid that can be derived only from the data in the record (so that we generate the exact same value upon the retry), and is unique (no other record can have the same value).
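
As a rough sketch of that approach (illustrative names only, and ignoring for the moment the Lucene-friendliness guidance quoted above):

  require 'digest'
  require 'json'

  # Derive the _id purely from the record contents, so that a retry of the
  # chunk regenerates exactly the same value for each record.
  def content_id(record)
    Digest::SHA1.hexdigest(record.sort.to_h.to_json)
  end

  record = { 'message' => 'the message', '@timestamp' => '2017-09-13T15:20:55+00:00' }
  action = { 'create' => { '_index' => 'project.example',
                           '_type'  => 'com.example.log',
                           '_id'    => content_id(record) } }

A SHA-1 like this is content-derived, but from Lucene's point of view it is essentially random, so on its own it does not satisfy the "Lucene friendly" guidance quoted above.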

Comment 2 Mike Fiedler 2017-09-13 18:08:59 UTC
1. Removed the ES CPU and memory limits (the CPU limit was hit in the original run, which likely contributed to the bulk queue overflow)
2. Raised the bulk queue to 200

At the message rate described originally (50/second/node x 1024Kb x 200 nodes), no duplicate indices were seen.  No data loss occurred.   Will continue pushing to higher rates.

Comment 3 Mike Fiedler 2017-09-13 18:11:19 UTC
Re: comment 2.  "No duplicate indices" should be "no duplicate messages in indices"

Comment 4 Rich Megginson 2017-09-14 20:09:05 UTC
Another possible solution

- Tell Elasticsearch to fail _all_ operations in the bulk index request if _any_ of the operations fail, i.e. treat the entire bulk index request as a transaction: once one operation fails, roll back the previous ones and return an error

Not sure if Elasticsearch has transactional support, or if there is some other similar semantic

Comment 5 Peter Portante 2017-09-15 04:07:08 UTC
+1 for the use of the "create" write id and generate the _id field.

Comment 6 Rich Megginson 2017-09-15 15:39:04 UTC
(In reply to Peter Portante from comment #5)
> +1 for the use of the "create" write id and generate the _id field.

Could you give us some guidance about how to do that, in a way that will abide by the guidelines given at https://www.elastic.co/guide/en/elasticsearch/guide/2.x/indexing-performance.html#_other ?

This will cause another problem - a retry will attempt to resubmit operations that have already succeeded, and we will have to handle the case where we get an error return of "document already exists".

Comment 7 Rich Megginson 2017-09-20 17:19:45 UTC
What we probably should have done in the first place is to take a look at the canonical Elasticsearch output plugin in logstash.

https://github.com/logstash-plugins/logstash-output-elasticsearch/blob/master/docs/index.asciidoc

Although it doesn't look like they have any interesting, robust bulk index rejection handling:

https://github.com/logstash-plugins/logstash-output-elasticsearch/blob/master/docs/index.asciidoc#retry-policy

they have a "dead letter queue" for records which could not be submitted to Elasticsearch: https://github.com/elastic/logstash/blob/master/docs/static/dead-letter-queues.asciidoc and (image link is broken, use this) https://github.com/elastic/logstash/blob/master/docs/static/images/dead_letter_queue.png

I also don't see if/where/how they recommend setting the "_id" field to avoid duplicates when bulk index retries are submitted.  Are these returned with error 409?

"Note that 409 exceptions are no longer retried. Please set a higher retry_on_conflict value if you experience 409 exceptions. It is more performant for Elasticsearch to retry these exceptions than this plugin."

Comment 8 Lukas Vlcek 2017-09-25 10:22:04 UTC
As for https://bugzilla.redhat.com/show_bug.cgi?id=1491401#c4
This is not supported by Elasticsearch; there is no notion of a transactional bulk request.

> I also don't see if/where/how they recommend setting the "_id" field to avoid duplicates when bulk index retries are submitted.

From bulk docs (https://www.elastic.co/guide/en/elasticsearch/reference/2.4/docs-bulk.html):
'create' will fail if a document with the same index and type exists already, whereas 'index' will add or replace a document as necessary.
If we had an _id field, we could use the 'index' operation and no "document already exists" error would be raised.
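
For example, in the bulk format from comment 1 (the _id value here is only a placeholder):

  { "create": { "_index": "project.namespace....", "_type": "com.redhat.....", "_id": "<derived-from-record>" }}
  { "message": "the message", "@timestamp": ....., .... }

With an explicit _id, 'index' silently replaces the existing document on a retry, while 'create' rejects the retried item with a "document already exists" error.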

Comment 9 Peter Portante 2017-09-25 13:41:31 UTC
(In reply to Lukas Vlcek from comment #8)
> As for https://bugzilla.redhat.com/show_bug.cgi?id=1491401#c4
> This is not supported by Elasticsearch. There is no bulk transaction notion.
> 
> > I also don't see if/where/how they recommend setting the "_id" field to avoid duplicates when bulk index retries are submitted.
> 
> From bulk docs
> (https://www.elastic.co/guide/en/elasticsearch/reference/2.4/docs-bulk.html):
> 'create' will fail if a document with the same index and type exists
> already, whereas 'index' will add or replace a document as necessary.

This seems like it should read, "'create' will fail if a document with the same _id and type exists already, whereas 'index' will add or replace a document as necessary."

Otherwise, a 'create' operation would only ever index one document, if I understand that line correctly.

> If we had an _id field we could use the 'index' operation and no "document
> already exists" should be raised.

Actually, we don't want document replacement to occur gratuitously.  That is very expensive in Lucene, since replacing/deleting a document implies re-writing the index *without* that document.  Lucene indices are immutable.

Comment 10 Rich Megginson 2017-09-25 14:02:39 UTC
Right.  We need

- a way to generate an _id field that is fast, unique, and is Lucene friendly - maybe something like Hash(record) + Random(8) + Timestamp(usec)
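
Sketched out (hypothetical widths and encoding, and presumably generated once per record before it is buffered so that a retry of the chunk reuses the same value):

  require 'digest'
  require 'securerandom'

  # Hash(record) + Random(8) + Timestamp(usec).  Note that the random and
  # timestamp parts mean the value cannot be re-derived from the record
  # contents alone.
  def candidate_id(record)
    Digest::SHA1.hexdigest(record.to_s) +
      SecureRandom.hex(8) +
      (Time.now.to_f * 1_000_000).to_i.to_s
  end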

Comment 11 Lukas Vlcek 2017-09-25 14:11:11 UTC
> Actually, we don't want document replacement to occur gratuitously.  That is very expensive in Lucene, since replacing/deleting a document implies re-writing the index *without* that document.  Lucene indices are immutable.

Correct, Lucene indices are immutable ... up to deletes.

When a document in ES is updated, the old document is marked as deleted in its existing Lucene segment (a bitset flag change in the segment) and the new document is written into a new Lucene segment (this is the replacement). From that point Lucene (and ES as well) filters deleted documents out of the relevant operations at a low level. The old documents are physically removed only when Lucene segments are merged; a document update does not trigger an immediate segment merge.

Mike wrote a blog post about this two years ago (things might have changed since then): https://www.elastic.co/blog/lucenes-handling-of-deleted-documents

I think having a few deleted documents here and there is perfectly OK, unless we are talking about some pathological case (like more than 50% deleted docs per segment). Of course, if we could replay just the single failed doc from a bulk request instead of the whole bulk request, that would be much, much better.

Comment 12 Peter Portante 2017-09-25 14:34:24 UTC
It seems like the case we are worried about is re-issuing a failed bulk request without causing duplicates. Since a bulk request can contain thousands of documents, it seems like a bad idea to consider this method.

Comment 13 Rich Megginson 2017-09-25 15:24:39 UTC
(In reply to Peter Portante from comment #12)
> It seems like the case we are worried about is re-issuing a failed bulk
> request without causing duplicates. In that case, a bulk request can contain
> thousands of documents, so seems like a bad idea to consider this method.

What case should we be worried about?

Comment 14 Peter Portante 2017-09-25 16:02:16 UTC
(In reply to Rich Megginson from comment #13)
> What case should we be worried about?

We need to handle the case where fluentd re-issues a failed bulk request, such that the re-issued bulk request will not result in duplicate documents being created.

Since a bulk request can contain thousands of documents, it seems like a bad idea to consider using the 'index' operation where documents with the same _id field would be updated, since that will involve many Lucene delete operations (which are bad).

Instead, if we consider using a 'create' operation with proper '_id' fields for each record, then duplicates will be reported back to fluentd, where they can be ignored, which is much more performant.
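
A sketch of what that retry handling could look like (hypothetical helper, keyed off the per-item statuses in the bulk response):

  # With "create" plus a deterministic _id, an item that already made it into
  # the index on an earlier attempt comes back with a 409 ("document already
  # exists"); those items can be dropped instead of retried again.
  def items_still_to_retry(bulk_response)
    bulk_response['items'].reject do |item|
      status = item['create']['status']
      status < 300 || status == 409
    end
  end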

Comment 15 Rich Megginson 2017-09-25 16:20:33 UTC
Right - use create with the _id field - that's why we need to figure out how to generate the _id field that is fast, unique, and Lucene friendly - maybe something like Hash(record) + Random(8) + Timestamp(usec)

Comment 16 Peter Portante 2017-11-15 14:20:59 UTC
Doesn't using Random(8) mean we can't recreate the hash?  Shouldn't the hash of the record be the hash of its contents (or some defined sub-set of its contents)?  And shouldn't it be possible for somebody to take the _source record and apply the same algorithm to arrive at the same hash?

Using Random(8) seems to defeat that.

Comment 17 Lukas Vlcek 2017-11-16 08:55:37 UTC
For pragmatic reasons I would prefer repeatability of _id generation, i.e. what Peter is talking about. However, this requires _source to be enabled, and we need to keep in mind that in some cases (or generally in the future) _source may not be enabled in our data, in which case the repeatability benefit is gone.
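
For illustration, the repeatability being discussed amounts to something like this (hypothetical, and only possible when _source is stored):

  require 'digest'
  require 'json'

  # Recompute the _id from the stored _source using the same algorithm that
  # was used at index time, and compare it with the document's actual _id.
  def recomputed_id(source)
    Digest::SHA1.hexdigest(source.sort.to_h.to_json)
  end

  # verification over search hits: recomputed_id(hit['_source']) == hit['_id']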

Comment 19 Jeff Cantrill 2018-04-11 20:43:03 UTC
https://bugzilla.redhat.com/show_bug.cgi?id=1548104

*** This bug has been marked as a duplicate of bug 1548104 ***

