Skip to content

Commit 8f73594

Browse files
committed
Tests for retry logic in streaming_bulk
1 parent: ba8772a · commit: 8f73594

File tree

2 files changed: 50 additions and 8 deletions.

elasticsearch/helpers/__init__.py

Lines changed: 5 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -92,7 +92,7 @@ def _process_bulk_chunk(client, bulk_actions, bulk_data, raise_on_exception=True
9292

9393
try:
9494
# send the actual request
95-
resp = client.bulk('\n'.join(map(client.transport.serializer.dumps, bulk_actions)) + '\n', **kwargs)
95+
resp = client.bulk('\n'.join(bulk_actions) + '\n', **kwargs)
9696
except TransportError as e:
9797
# default behavior - just propagate exception
9898
if raise_on_exception:
@@ -104,7 +104,7 @@ def _process_bulk_chunk(client, bulk_actions, bulk_data, raise_on_exception=True
104104

105105
for data in bulk_data:
106106
# collect all the information about failed actions
107-
op_type, action = data[0].popitem()
107+
op_type, action = data[0].copy().popitem()
108108
info = {"error": err_message, "status": e.status_code, "exception": e}
109109
if op_type != 'delete':
110110
info['data'] = data[1]
@@ -199,7 +199,9 @@ def streaming_bulk(client, actions, chunk_size=500, max_chunk_bytes=100 * 1024 *
199199
if max_retries \
200200
and info['status'] == 429 \
201201
and (attempt+1) <= max_retries:
202-
to_retry.extend(data)
202+
# _process_bulk_chunk expects strings so we need to
203+
# re-serialize the data
204+
to_retry.extend(map(client.transport.serializer.dumps, data))
203205
to_retry_data.append(data)
204206
else:
205207
yield ok, {action: info}

test_elasticsearch/test_server/test_helpers.py

Lines changed: 45 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -5,16 +5,17 @@
55

66

77
class FailingBulkClient(object):
8-
def __init__(self, client, fail_at=1):
8+
def __init__(self, client, fail_at=(2, ), fail_with=TransportError(599, "Error!", {})):
99
self.client = client
10-
self._called = -1
10+
self._called = 0
1111
self._fail_at = fail_at
1212
self.transport = client.transport
13+
self._fail_with = fail_with
1314

1415
def bulk(self, *args, **kwargs):
1516
self._called += 1
16-
if self._called == self._fail_at:
17-
raise TransportError(599, "Error!", {})
17+
if self._called in self._fail_at:
18+
raise self._fail_with
1819
return self.client.bulk(*args, **kwargs)
1920

2021
class TestStreamingBulk(ElasticsearchTestCase):
@@ -87,7 +88,6 @@ def test_transport_error_can_becaught(self):
8788
'_index': 'i',
8889
'_type': 't',
8990
'_id': 45,
90-
9191
'data': {'f': 'v'},
9292
'error': "TransportError(599, 'Error!')",
9393
'status': 599
@@ -96,6 +96,46 @@ def test_transport_error_can_becaught(self):
9696
results[1][1]
9797
)
9898

99+
def test_rejected_documents_are_retried(self):
100+
failing_client = FailingBulkClient(self.client, fail_with=TransportError(429, 'Rejected!', {}))
101+
docs = [
102+
{'_index': 'i', '_type': 't', '_id': 47, 'f': 'v'},
103+
{'_index': 'i', '_type': 't', '_id': 45, 'f': 'v'},
104+
{'_index': 'i', '_type': 't', '_id': 42, 'f': 'v'},
105+
]
106+
results = list(helpers.streaming_bulk(failing_client, docs,
107+
raise_on_exception=False,
108+
raise_on_error=False,
109+
chunk_size=1, max_retries=1,
110+
initial_backoff=0))
111+
self.assertEquals(3, len(results))
112+
self.assertEquals([True, True, True], [r[0] for r in results])
113+
self.client.indices.refresh(index='i')
114+
res = self.client.search(index='i')
115+
self.assertEquals(3, res['hits']['total'])
116+
self.assertEquals(4, failing_client._called)
117+
118+
def test_rejected_documents_are_retried_at_most_max_retries_times(self):
119+
failing_client = FailingBulkClient(self.client, fail_at=(1, 2, ),
120+
fail_with=TransportError(429, 'Rejected!', {}))
121+
122+
docs = [
123+
{'_index': 'i', '_type': 't', '_id': 47, 'f': 'v'},
124+
{'_index': 'i', '_type': 't', '_id': 45, 'f': 'v'},
125+
{'_index': 'i', '_type': 't', '_id': 42, 'f': 'v'},
126+
]
127+
results = list(helpers.streaming_bulk(failing_client, docs,
128+
raise_on_exception=False,
129+
raise_on_error=False,
130+
chunk_size=1, max_retries=1,
131+
initial_backoff=0))
132+
self.assertEquals(3, len(results))
133+
self.assertEquals([False, True, True], [r[0] for r in results])
134+
self.client.indices.refresh(index='i')
135+
res = self.client.search(index='i')
136+
self.assertEquals(2, res['hits']['total'])
137+
self.assertEquals(4, failing_client._called)
138+
99139

100140
class TestBulk(ElasticsearchTestCase):
101141
def test_bulk_works_with_single_item(self):

0 commit comments

Comments (0)