@@ -46,7 +46,6 @@ def bulk_index(client, docs, chunk_size=500, stats_only=False, raise_on_error=Fa
4646 while True :
4747 chunk = islice (docs , chunk_size )
4848 bulk_actions = []
49- ndocs = 0
5049 for d in chunk :
5150 action = {'index' : {}}
5251 for key in ('_index' , '_parent' , '_percolate' , '_routing' ,
@@ -56,25 +55,25 @@ def bulk_index(client, docs, chunk_size=500, stats_only=False, raise_on_error=Fa
5655
5756 bulk_actions .append (action )
5857 bulk_actions .append (d .get ('_source' , d ))
59- ndocs += 1
6058
6159 if not bulk_actions :
6260 return success , failed if stats_only else errors
6361
62+ # send the actual request
6463 resp = client .bulk (bulk_actions , ** kwargs )
6564
65+ # go through request-reponse pairs and detect failures
6666 for req , item in zip (bulk_actions [::2 ], resp ['items' ]):
6767 ok = item ['index' if '_id' in req ['index' ] else 'create' ].get ('ok' )
6868 if not ok :
69- if stats_only :
70- failed += 1
71- else :
69+ if not stats_only :
7270 errors .append (item )
71+ failed += 1
7372 else :
7473 success += 1
7574
76- if ( failed or errors ) and raise_on_error :
77- raise BulkIndexError (failed , errors )
75+ if failed and raise_on_error :
76+ raise BulkIndexError ('%i document(s) failed to index.' % failed , errors )
7877
7978def scan (client , query = None , scroll = '5m' , ** kwargs ):
8079 """
0 commit comments