Commit 95cf68a

Merge branch 'backoff'
2 parents 57362aa + 8f73594

File tree

3 files changed: 138 additions, 37 deletions


elasticsearch/helpers/__init__.py

Lines changed: 92 additions & 31 deletions
@@ -2,6 +2,7 @@
 
 import logging
 from operator import methodcaller
+import time
 
 from ..exceptions import ElasticsearchException, TransportError
 from ..compat import map, string_types, Queue
@@ -52,9 +53,10 @@ def _chunk_actions(actions, chunk_size, max_chunk_bytes, serializer):
     Split actions into chunks by number or size, serialize them into strings in
     the process.
     """
-    bulk_actions = []
+    bulk_actions, bulk_data = [], []
     size, action_count = 0, 0
     for action, data in actions:
+        raw_data, raw_action = data, action
         action = serializer.dumps(action)
         cur_size = len(action) + 1
 
@@ -64,20 +66,24 @@ def _chunk_actions(actions, chunk_size, max_chunk_bytes, serializer):
 
         # full chunk, send it and start a new one
         if bulk_actions and (size + cur_size > max_chunk_bytes or action_count == chunk_size):
-            yield bulk_actions
-            bulk_actions = []
+            yield bulk_data, bulk_actions
+            bulk_actions, bulk_data = [], []
             size, action_count = 0, 0
 
         bulk_actions.append(action)
         if data is not None:
             bulk_actions.append(data)
+            bulk_data.append((raw_action, raw_data))
+        else:
+            bulk_data.append((raw_action, ))
+
         size += cur_size
         action_count += 1
 
     if bulk_actions:
-        yield bulk_actions
+        yield bulk_data, bulk_actions
 
-def _process_bulk_chunk(client, bulk_actions, raise_on_exception=True, raise_on_error=True, **kwargs):
+def _process_bulk_chunk(client, bulk_actions, bulk_data, raise_on_exception=True, raise_on_error=True, **kwargs):
     """
     Send a bulk request to elasticsearch and process the output.
     """
@@ -96,22 +102,14 @@ def _process_bulk_chunk(client, bulk_actions, raise_on_exception=True, raise_on_
         err_message = str(e)
         exc_errors = []
 
-        # deserialize the data back, thisis expensive but only run on
-        # errors if raise_on_exception is false, so shouldn't be a real
-        # issue
-        bulk_data = map(client.transport.serializer.loads, bulk_actions)
-        while True:
-            try:
-                # collect all the information about failed actions
-                action = next(bulk_data)
-                op_type, action = action.popitem()
-                info = {"error": err_message, "status": e.status_code, "exception": e}
-                if op_type != 'delete':
-                    info['data'] = next(bulk_data)
-                info.update(action)
-                exc_errors.append({op_type: info})
-            except StopIteration:
-                break
+        for data in bulk_data:
+            # collect all the information about failed actions
+            op_type, action = data[0].copy().popitem()
+            info = {"error": err_message, "status": e.status_code, "exception": e}
+            if op_type != 'delete':
+                info['data'] = data[1]
+            info.update(action)
+            exc_errors.append({op_type: info})
 
         # emulate standard behavior for failed actions
         if raise_on_error:
@@ -122,9 +120,12 @@ def _process_bulk_chunk(client, bulk_actions, raise_on_exception=True, raise_on_
             return
 
     # go through request-reponse pairs and detect failures
-    for op_type, item in map(methodcaller('popitem'), resp['items']):
+    for data, (op_type, item) in zip(bulk_data, map(methodcaller('popitem'), resp['items'])):
         ok = 200 <= item.get('status', 500) < 300
         if not ok and raise_on_error:
+            # include original document source
+            if len(data) > 1:
+                item['data'] = data[1]
             errors.append({op_type: item})
 
         if ok or not errors:
@@ -136,15 +137,22 @@ def _process_bulk_chunk(client, bulk_actions, raise_on_exception=True, raise_on_
         raise BulkIndexError('%i document(s) failed to index.' % len(errors), errors)
 
 def streaming_bulk(client, actions, chunk_size=500, max_chunk_bytes=100 * 1024 * 1024,
-        raise_on_error=True, expand_action_callback=expand_action,
-        raise_on_exception=True, **kwargs):
+                   raise_on_error=True, expand_action_callback=expand_action,
+                   raise_on_exception=True, max_retries=0, initial_backoff=2,
+                   max_backoff=600, yield_ok=True, **kwargs):
+
     """
     Streaming bulk consumes actions from the iterable passed in and yields
     results per action. For non-streaming usecases use
     :func:`~elasticsearch.helpers.bulk` which is a wrapper around streaming
     bulk that returns summary information about the bulk operation once the
     entire input is consumed and sent.
 
+    If you specify ``max_retries`` it will also retry any documents that were
+    rejected with a ``429`` status code. To do this it will wait (**by calling
+    time.sleep which will block**) for ``initial_backoff`` seconds and then,
+    every subsequent rejection for the same chunk, for double the time every
+    time up to ``max_backoff`` seconds.
 
     :arg client: instance of :class:`~elasticsearch.Elasticsearch` to use
     :arg actions: iterable containing the actions to be executed
@@ -157,12 +165,59 @@ def streaming_bulk(client, actions, chunk_size=500, max_chunk_bytes=100 * 1024 *
     :arg expand_action_callback: callback executed on each action passed in,
         should return a tuple containing the action line and the data line
         (`None` if data line should be omitted).
+    :arg max_retries: maximum number of times a document will be retried when
+        ``429`` is received, set to 0 (default) for no retires on ``429``
+    :arg initial_backoff: number of seconds we should wait before the first
+        retry. Any subsequent retries will be powers of ``inittial_backoff *
+        2**retry_number``
+    :arg max_backoff: maximum number of seconds a retry will wait
+    :arg yield_ok: if set to False will skip successful documents in the output
     """
     actions = map(expand_action_callback, actions)
 
-    for bulk_actions in _chunk_actions(actions, chunk_size, max_chunk_bytes, client.transport.serializer):
-        for result in _process_bulk_chunk(client, bulk_actions, raise_on_exception, raise_on_error, **kwargs):
-            yield result
+    for bulk_data, bulk_actions in _chunk_actions(actions, chunk_size,
+                                                  max_chunk_bytes,
+                                                  client.transport.serializer):
+
+        for attempt in range(max_retries + 1):
+            to_retry, to_retry_data = [], []
+            if attempt:
+                time.sleep(min(max_backoff, initial_backoff * 2**(attempt-1)))
+
+            try:
+                for data, (ok, info) in zip(
+                    bulk_data,
+                    _process_bulk_chunk(client, bulk_actions, bulk_data,
+                                        raise_on_exception,
+                                        raise_on_error, **kwargs)
+                ):
+
+                    if not ok:
+                        action, info = info.popitem()
+                        # retry if retries enabled, we get 429, and we are not
+                        # in the last attempt
+                        if max_retries \
+                                and info['status'] == 429 \
+                                and (attempt+1) <= max_retries:
+                            # _process_bulk_chunk expects strings so we need to
+                            # re-serialize the data
+                            to_retry.extend(map(client.transport.serializer.dumps, data))
+                            to_retry_data.append(data)
+                        else:
+                            yield ok, {action: info}
+                    elif yield_ok:
+                        yield ok, info
+
+            except TransportError as e:
+                # suppress 429 errors since we will retry them
+                if not max_retries or e.status_code != 429:
+                    raise
+            else:
+                if not to_retry:
+                    break
+                # retry only subset of documents that didn't succeed
+                bulk_actions, bulk_data = to_retry, to_retry_data
+
 
 def bulk(client, actions, stats_only=False, **kwargs):
     """
@@ -175,9 +230,12 @@ def bulk(client, actions, stats_only=False, **kwargs):
     options like ``stats_only`` only apply when ``raise_on_error`` is set to
     ``False``.
 
+    When errors are being collected original document data is included in the
+    error dictionary which can lead to an extra high memory usage. If you need
+    to process a lot of data and want to ignore/collect errors please consider
+    using the :func:`~elasticsearch.helpers.streaming_bulk` helper which will
+    just return the errors and not store them in memory.
 
-    See :func:`~elasticsearch.helpers.streaming_bulk` for more accepted
-    parameters
 
     :arg client: instance of :class:`~elasticsearch.Elasticsearch` to use
     :arg actions: iterator containing the actions
@@ -186,13 +244,16 @@ def bulk(client, actions, stats_only=False, **kwargs):
 
     Any additional keyword arguments will be passed to
     :func:`~elasticsearch.helpers.streaming_bulk` which is used to execute
-    the operation.
+    the operation, see :func:`~elasticsearch.helpers.streaming_bulk` for more
+    accepted parameters.
     """
     success, failed = 0, 0
 
     # list of errors to be collected is not stats_only
     errors = []
 
+    # make streaming_bulk yield successful results so we can count them
+    kwargs['yield_ok'] = True
     for ok, item in streaming_bulk(client, actions, **kwargs):
         # go through request-reponse pairs and detect failures
         if not ok:
@@ -241,7 +302,7 @@ def _setup_queues(self):
 
     try:
         for result in pool.imap(
-            lambda chunk: list(_process_bulk_chunk(client, chunk, **kwargs)),
+            lambda bulk_chunk: list(_process_bulk_chunk(client, bulk_chunk[1], bulk_chunk[0], **kwargs)),
             _chunk_actions(actions, chunk_size, max_chunk_bytes, client.transport.serializer)
         ):
             for item in result:
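
A minimal usage sketch of the retry parameters added to streaming_bulk above (not part of the commit); the index name 'i', the document fields, and the locally reachable Elasticsearch() instance are placeholders assumed for illustration:

    from elasticsearch import Elasticsearch, helpers

    client = Elasticsearch()  # assumes a locally reachable cluster

    # generator of index actions; index, type, ids and fields are placeholders
    actions = ({'_index': 'i', '_type': 't', '_id': n, 'f': 'v'}
               for n in range(1000))

    # chunks rejected with 429 are retried up to max_retries times; before each
    # retry the helper blocks for initial_backoff * 2 ** (attempt - 1) seconds,
    # capped at max_backoff; yield_ok=False yields only the failed documents
    for ok, info in helpers.streaming_bulk(client, actions,
                                           max_retries=3,
                                           initial_backoff=2,
                                           max_backoff=600,
                                           yield_ok=False):
        if not ok:
            print('failed: %r' % (info,))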

setup.py

Lines changed: 1 addition & 1 deletion
@@ -12,7 +12,7 @@
 f.close()
 
 install_requires = [
-    'urllib3>=1.8, <2.0',
+    'urllib3<1.22,>=1.21.1',
 ]
 tests_require = [
     'requests>=2.0.0, <3.0.0',

test_elasticsearch/test_server/test_helpers.py

Lines changed: 45 additions & 5 deletions
@@ -5,16 +5,17 @@
 
 
 class FailingBulkClient(object):
-    def __init__(self, client, fail_at=1):
+    def __init__(self, client, fail_at=(2, ), fail_with=TransportError(599, "Error!", {})):
         self.client = client
-        self._called = -1
+        self._called = 0
         self._fail_at = fail_at
         self.transport = client.transport
+        self._fail_with = fail_with
 
     def bulk(self, *args, **kwargs):
         self._called += 1
-        if self._called == self._fail_at:
-            raise TransportError(599, "Error!", {})
+        if self._called in self._fail_at:
+            raise self._fail_with
         return self.client.bulk(*args, **kwargs)
 
 class TestStreamingBulk(ElasticsearchTestCase):
@@ -87,7 +88,6 @@ def test_transport_error_can_be_caught(self):
                 '_index': 'i',
                 '_type': 't',
                 '_id': 45,
-
                 'data': {'f': 'v'},
                 'error': "TransportError(599, 'Error!')",
                 'status': 599
9696
results[1][1]
9797
)
9898

99+
def test_rejected_documents_are_retried(self):
100+
failing_client = FailingBulkClient(self.client, fail_with=TransportError(429, 'Rejected!', {}))
101+
docs = [
102+
{'_index': 'i', '_type': 't', '_id': 47, 'f': 'v'},
103+
{'_index': 'i', '_type': 't', '_id': 45, 'f': 'v'},
104+
{'_index': 'i', '_type': 't', '_id': 42, 'f': 'v'},
105+
]
106+
results = list(helpers.streaming_bulk(failing_client, docs,
107+
raise_on_exception=False,
108+
raise_on_error=False,
109+
chunk_size=1, max_retries=1,
110+
initial_backoff=0))
111+
self.assertEquals(3, len(results))
112+
self.assertEquals([True, True, True], [r[0] for r in results])
113+
self.client.indices.refresh(index='i')
114+
res = self.client.search(index='i')
115+
self.assertEquals(3, res['hits']['total'])
116+
self.assertEquals(4, failing_client._called)
117+
118+
def test_rejected_documents_are_retried_at_most_max_retries_times(self):
119+
failing_client = FailingBulkClient(self.client, fail_at=(1, 2, ),
120+
fail_with=TransportError(429, 'Rejected!', {}))
121+
122+
docs = [
123+
{'_index': 'i', '_type': 't', '_id': 47, 'f': 'v'},
124+
{'_index': 'i', '_type': 't', '_id': 45, 'f': 'v'},
125+
{'_index': 'i', '_type': 't', '_id': 42, 'f': 'v'},
126+
]
127+
results = list(helpers.streaming_bulk(failing_client, docs,
128+
raise_on_exception=False,
129+
raise_on_error=False,
130+
chunk_size=1, max_retries=1,
131+
initial_backoff=0))
132+
self.assertEquals(3, len(results))
133+
self.assertEquals([False, True, True], [r[0] for r in results])
134+
self.client.indices.refresh(index='i')
135+
res = self.client.search(index='i')
136+
self.assertEquals(2, res['hits']['total'])
137+
self.assertEquals(4, failing_client._called)
138+
99139

100140
class TestBulk(ElasticsearchTestCase):
101141
def test_bulk_works_with_single_item(self):
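
The tests above use initial_backoff=0 so no actual sleeping happens; for reference, the backoff schedule follows the expression used in streaming_bulk. A small illustrative sketch (not part of the commit), with example numbers:

    # sleep before retry attempt N (N starts at 1 for the first retry) is
    # min(max_backoff, initial_backoff * 2 ** (N - 1)) seconds
    initial_backoff, max_backoff, max_retries = 2, 600, 12
    waits = [min(max_backoff, initial_backoff * 2 ** (n - 1))
             for n in range(1, max_retries + 1)]
    print(waits)  # [2, 4, 8, 16, 32, 64, 128, 256, 512, 600, 600, 600]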
