Commit 2c0f6d8

refactor the helpers a bit for more consistent errors
Now every error (either returned by streaming_bulk or collected by bulk) contains the original data as well
1 parent a16679b commit 2c0f6d8
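
Illustrative sketch of the behaviour described in the commit message (not part of the commit itself): with the default raise_on_error=True, a failed bulk call raises BulkIndexError, and each collected error item now also carries the original document body under a 'data' key. The client address, index name, and document list below are assumptions made for the example.

from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk, BulkIndexError

client = Elasticsearch()  # assumes a node on localhost:9200

docs = [
    {"_index": "test-index", "_type": "doc", "_source": {"answer": 42}},
    {"_index": "test-index", "_type": "doc", "_source": {"answer": "not a number"}},
]

try:
    bulk(client, docs)  # raise_on_error defaults to True
except BulkIndexError as e:
    for error in e.errors:
        op_type, info = error.popitem()  # e.g. ('index', {...})
        # after this commit the error item also includes the original
        # document source under 'data' (when the action carried a body)
        print(op_type, info.get("status"), info.get("data"))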


elasticsearch/helpers/__init__.py

Lines changed: 18 additions & 11 deletions
@@ -120,9 +120,12 @@ def _process_bulk_chunk(client, bulk_actions, bulk_data, raise_on_exception=True
             return
 
     # go through request-reponse pairs and detect failures
-    for op_type, item in map(methodcaller('popitem'), resp['items']):
+    for data, (op_type, item) in zip(bulk_data, map(methodcaller('popitem'), resp['items'])):
         ok = 200 <= item.get('status', 500) < 300
         if not ok and raise_on_error:
+            # include original document source
+            if len(data) > 1:
+                item['data'] = data[1]
             errors.append({op_type: item})
 
         if ok or not errors:
@@ -168,6 +171,7 @@ def streaming_bulk(client, actions, chunk_size=500, max_chunk_bytes=100 * 1024 *
         retry. Any subsequent retries will be powers of ``inittial_backoff *
         2**retry_number``
     :arg max_backoff: maximum number of seconds a retry will wait
+    :arg yield_ok: if set to False will skip successful documents in the output
     """
     actions = map(expand_action_callback, actions)
 
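
A short usage sketch of the new ``yield_ok`` flag (illustrative, not part of the commit; the client and the document generator are assumptions): with yield_ok=False the generator only yields failed documents, so callers that only care about errors can skip the bookkeeping for successes.

from elasticsearch import Elasticsearch
from elasticsearch.helpers import streaming_bulk

client = Elasticsearch()  # assumes a node on localhost:9200
docs = ({"_index": "test-index", "_type": "doc", "_source": {"n": i}} for i in range(1000))

for ok, item in streaming_bulk(client, docs, raise_on_error=False, yield_ok=False):
    # with yield_ok=False only failures are yielded, so ok is always False here
    op_type, info = item.popitem()
    print("failed:", op_type, info.get("status"))
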
@@ -184,20 +188,20 @@ def streaming_bulk(client, actions, chunk_size=500, max_chunk_bytes=100 * 1024 *
                 for data, (ok, info) in zip(
                     bulk_data,
                     _process_bulk_chunk(client, bulk_actions, bulk_data,
-                                        raise_on_exception=raise_on_exception,
-                                        raise_on_error=raise_on_error, **kwargs)
+                                        raise_on_exception,
+                                        raise_on_error, **kwargs)
                 ):
 
                     if not ok:
                         action, info = info.popitem()
                         # retry if retries enabled, we get 429, and we are not
                         # in the last attempt
-                        if max_retries and info['status'] == 429 and (attempt+1) <= max_retries:
+                        if max_retries \
+                                and info['status'] == 429 \
+                                and (attempt+1) <= max_retries:
                             to_retry.extend(data)
                             to_retry_data.append(data)
                         else:
-                            if action != 'delete':
-                                info['data'] = data[1]
                             yield ok, {action: info}
                     elif yield_ok:
                         yield ok, info
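
The hunk above keeps the existing 429-retry behaviour, only reformatting the condition and dropping the per-item ``info['data']`` assignment, so opting into retries still looks like the following hedged sketch; the parameter values, client, and document generator are arbitrary examples.

from elasticsearch import Elasticsearch
from elasticsearch.helpers import streaming_bulk

client = Elasticsearch()  # assumes a node on localhost:9200
docs = ({"_index": "test-index", "_type": "doc", "_source": {"n": i}} for i in range(1000))

for ok, item in streaming_bulk(
    client,
    docs,
    max_retries=3,       # documents rejected with HTTP 429 are retried up to 3 times
    initial_backoff=2,   # wait 2 seconds before the first retry, doubling afterwards
    max_backoff=60,      # never wait longer than 60 seconds between retries
    raise_on_error=False,
):
    if not ok:
        print("gave up on:", item)
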
@@ -210,8 +214,7 @@ def streaming_bulk(client, actions, chunk_size=500, max_chunk_bytes=100 * 1024 *
                 if not to_retry:
                     break
                 # retry only subset of documents that didn't succeed
-                bulk_actions = to_retry
-                bulk_data = to_retry_data
+                bulk_actions, bulk_data = to_retry, to_retry_data
 
 
 def bulk(client, actions, stats_only=False, **kwargs):
@@ -225,9 +228,12 @@ def bulk(client, actions, stats_only=False, **kwargs):
     options like ``stats_only`` only apply when ``raise_on_error`` is set to
     ``False``.
 
+    When errors are being collected original document data is included in the
+    error dictionary which can lead to an extra high memory usage. If you need
+    to process a lot of data and want to ignore/collect errors please consider
+    using the :func:`~elasticsearch.helpers.streaming_bulk` helper which will
+    just return the errors and not store them in memory.
 
-    See :func:`~elasticsearch.helpers.streaming_bulk` for more accepted
-    parameters
 
     :arg client: instance of :class:`~elasticsearch.Elasticsearch` to use
     :arg actions: iterator containing the actions
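
To illustrate the docstring advice added above (illustrative only; the client and document source are assumptions): handling failures as streaming_bulk yields them keeps memory usage flat, whereas bulk would accumulate every error dictionary, now including the original document, in a list.

from elasticsearch import Elasticsearch
from elasticsearch.helpers import streaming_bulk

client = Elasticsearch()  # assumes a node on localhost:9200

def generate_docs():
    # stand-in for a large stream of documents
    for i in range(1000000):
        yield {"_index": "test-index", "_type": "doc", "_source": {"n": i}}

failed = 0
for ok, item in streaming_bulk(client, generate_docs(), raise_on_error=False):
    if not ok:
        failed += 1  # handle or log the error here instead of storing it
print("failed documents:", failed)
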
@@ -236,7 +242,8 @@ def bulk(client, actions, stats_only=False, **kwargs):
 
     Any additional keyword arguments will be passed to
     :func:`~elasticsearch.helpers.streaming_bulk` which is used to execute
-    the operation.
+    the operation, see :func:`~elasticsearch.helpers.streaming_bulk` for more
+    accepted parameters.
     """
     success, failed = 0, 0
 