 from __future__ import unicode_literals
 
 import logging
-from itertools import islice
 from operator import methodcaller
 
 from ..exceptions import ElasticsearchException, TransportError
@@ -40,22 +39,35 @@ def expand_action(data):
 
     return action, data.get('_source', data)
 
-def _chunk_actions(actions, chunk_size, serializer):
-    while True:
-        bulk_actions = []
-        for action, data in islice(actions, chunk_size):
-            bulk_actions.append(serializer.dumps(action))
-            if data is not None:
-                bulk_actions.append(serializer.dumps(data))
-
-        if not bulk_actions:
-            return
-
+def _chunk_actions(actions, chunk_size, max_chunk_bytes, serializer):
+    bulk_actions = []
+    size, action_count = 0, 0
+    for action, data in actions:
+        action = serializer.dumps(action)
+        cur_size = len(action) + 1
+
+        if data is not None:
+            data = serializer.dumps(data)
+            cur_size += len(data) + 1
+
+        # full chunk, send it and start a new one
+        if bulk_actions and (size + cur_size > max_chunk_bytes or action_count == chunk_size):
+            yield bulk_actions
+            bulk_actions = []
+            size, action_count = 0, 0
+
+        bulk_actions.append(action)
+        if data is not None:
+            bulk_actions.append(data)
+        size += cur_size
+        action_count += 1
+
+    if bulk_actions:
         yield bulk_actions
 
-def streaming_bulk(client, actions, chunk_size=500, raise_on_error=True,
-        expand_action_callback=expand_action, raise_on_exception=True,
-        **kwargs):
+def streaming_bulk(client, actions, chunk_size=500, max_chunk_bytes=100 * 1024 * 1024,
+        raise_on_error=True, expand_action_callback=expand_action,
+        raise_on_exception=True, **kwargs):
     """
     Streaming bulk consumes actions from the iterable passed in and yields
     results per action. For non-streaming use cases use
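The rewritten `_chunk_actions` above is a single-pass generator: it serializes each action as it arrives, tracks the running byte count (plus one byte per line for the newline separator), and flushes the current chunk as soon as the next action would overflow `max_chunk_bytes` or the chunk already holds `chunk_size` actions. A rough sketch of driving the generator directly, assuming a toy JSON serializer and a made-up action stream (neither is part of this commit):

```python
import json

from elasticsearch.helpers import _chunk_actions  # private helper


class ToySerializer(object):
    # stand-in for the client's serializer; only dumps() is needed here
    def dumps(self, data):
        return json.dumps(data)


# ten (action, data) pairs, each roughly 85 bytes once serialized
actions = (({'index': {'_id': i}}, {'field': 'x' * 50}) for i in range(10))

for chunk in _chunk_actions(actions, chunk_size=4, max_chunk_bytes=200,
                            serializer=ToySerializer()):
    # each chunk is a list of already-serialized lines; here the 200-byte
    # budget fills after two actions, so every chunk has four lines
    print(len(chunk))
```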
@@ -101,6 +113,7 @@ def streaming_bulk(client, actions, chunk_size=500, raise_on_error=True,
     :arg client: instance of :class:`~elasticsearch.Elasticsearch` to use
     :arg actions: iterable containing the actions to be executed
     :arg chunk_size: number of docs in one chunk sent to es (default: 500)
+    :arg max_chunk_bytes: the maximum size of the request in bytes (default: 100MB)
     :arg raise_on_error: raise ``BulkIndexError`` containing errors (as `.errors`)
         from the execution of the last chunk when some occur. By default we raise.
     :arg raise_on_exception: if ``False`` then don't propagate exceptions from
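From the caller's side the new argument is simply an upper bound on how large any single bulk request may grow; large documents now split a chunk on whichever limit is hit first. A hedged usage sketch; the default host, index name, and document shape are placeholders, not part of this commit:

```python
from elasticsearch import Elasticsearch
from elasticsearch.helpers import streaming_bulk

client = Elasticsearch()

def generate_actions():
    for i in range(100000):
        # plain dicts are expanded by expand_action into action + source
        yield {'_index': 'my-index', '_type': 'doc', '_id': i, 'value': i}

for ok, result in streaming_bulk(client, generate_actions(),
                                 chunk_size=500,
                                 max_chunk_bytes=10 * 1024 * 1024,
                                 raise_on_error=False):
    if not ok:
        # with raise_on_error=False, failures are reported per action
        # instead of raising BulkIndexError at the end of the chunk
        print('failed: %r' % (result,))
```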
@@ -115,7 +128,7 @@ def streaming_bulk(client, actions, chunk_size=500, raise_on_error=True,
     # if raise on error is set, we need to collect errors per chunk before raising them
     errors = []
 
-    for bulk_actions in _chunk_actions(actions, chunk_size, serializer):
+    for bulk_actions in _chunk_actions(actions, chunk_size, max_chunk_bytes, serializer):
         try:
             # send the actual request
             resp = client.bulk('\n'.join(bulk_actions) + '\n', **kwargs)
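For reference, the join above produces the newline-delimited body the bulk endpoint expects, including the mandatory trailing newline that `_chunk_actions` budgets one byte per line for. A tiny illustration with made-up lines:

```python
# two serialized lines, as _chunk_actions would yield them (values made up)
bulk_actions = [
    '{"index": {"_index": "my-index", "_id": "1"}}',
    '{"field": "value"}',
]

body = '\n'.join(bulk_actions) + '\n'
assert body.endswith('\n')  # the bulk API rejects a body without it
```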