Skip to content

Commit 2181611

Browse files
0bsearchfxdgear
authored andcommitted
Scan refactor (elastic#924)
* Adds scan test for exception & data yielded * Adds scan test for fast route & initial search error * Refactores scan implementation; better scroll test * Adds tests clear_scroll & logger
1 parent cf1e946 commit 2181611

File tree

2 files changed

+142
-20
lines changed

2 files changed

+142
-20
lines changed

elasticsearch/helpers/actions.py

Lines changed: 10 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -427,33 +427,19 @@ def scan(
427427
if not preserve_order:
428428
query = query.copy() if query else {}
429429
query["sort"] = "_doc"
430+
430431
# initial search
431432
resp = client.search(
432433
body=query, scroll=scroll, size=size, request_timeout=request_timeout, **kwargs
433434
)
434-
435435
scroll_id = resp.get("_scroll_id")
436-
if scroll_id is None:
437-
return
438436

439437
try:
440-
first_run = True
441-
while True:
442-
# if we didn't set search_type to scan initial search contains data
443-
if first_run:
444-
first_run = False
445-
else:
446-
resp = client.scroll(
447-
scroll_id,
448-
scroll=scroll,
449-
request_timeout=request_timeout,
450-
**scroll_kwargs
451-
)
452-
438+
while scroll_id and resp['hits']['hits']:
453439
for hit in resp["hits"]["hits"]:
454440
yield hit
455441

456-
# check if we have any errrors
442+
# check if we have any errors
457443
if resp["_shards"]["successful"] < resp["_shards"]["total"]:
458444
logger.warning(
459445
"Scroll request has only succeeded on %d shards out of %d.",
@@ -467,10 +453,14 @@ def scan(
467453
% (resp["_shards"]["successful"], resp["_shards"]["total"]),
468454
)
469455

456+
resp = client.scroll(
457+
scroll_id,
458+
scroll=scroll,
459+
request_timeout=request_timeout,
460+
**scroll_kwargs
461+
)
470462
scroll_id = resp.get("_scroll_id")
471-
# end of scroll
472-
if scroll_id is None or not resp["hits"]["hits"]:
473-
break
463+
474464
finally:
475465
if scroll_id and clear_scroll:
476466
client.clear_scroll(body={"scroll_id": [scroll_id]}, ignore=(404,))

test_elasticsearch/test_server/test_helpers.py

Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
1+
from mock import patch
2+
13
from elasticsearch import helpers, TransportError
4+
from elasticsearch.helpers import ScanError
25

36
from . import ElasticsearchTestCase
47
from ..test_cases import SkipTest
@@ -305,6 +308,24 @@ def test_errors_are_collected_properly(self):
305308

306309

307310
class TestScan(ElasticsearchTestCase):
311+
mock_scroll_responses = [
312+
{
313+
'_scroll_id': 'dummy_id',
314+
'_shards': {'successful': 4, 'total': 5},
315+
'hits': {'hits': [{'scroll_data': 42}]},
316+
},
317+
{
318+
'_scroll_id': 'dummy_id',
319+
'_shards': {'successful': 4, 'total': 5},
320+
'hits': {'hits': []},
321+
},
322+
]
323+
324+
@classmethod
325+
def tearDownClass(cls):
326+
cls.client.transport.perform_request('DELETE', '/_search/scroll/_all')
327+
super(TestScan, cls).tearDownClass()
328+
308329
def test_order_can_be_preserved(self):
309330
bulk = []
310331
for x in range(100):
@@ -338,6 +359,117 @@ def test_all_documents_are_read(self):
338359
self.assertEquals(set(map(str, range(100))), set(d["_id"] for d in docs))
339360
self.assertEquals(set(range(100)), set(d["_source"]["answer"] for d in docs))
340361

362+
def test_scroll_error(self):
363+
bulk = []
364+
for x in range(4):
365+
bulk.append({"index": {"_index": "test_index", "_type": "_doc"}})
366+
bulk.append({"value": x})
367+
self.client.bulk(bulk, refresh=True)
368+
369+
with patch.object(self.client, 'scroll') as scroll_mock:
370+
scroll_mock.side_effect = self.mock_scroll_responses
371+
data = list(helpers.scan(
372+
self.client,
373+
index='test_index',
374+
size=2,
375+
raise_on_error=False,
376+
clear_scroll=False
377+
))
378+
self.assertEqual(len(data), 3)
379+
self.assertEqual(data[-1], {'scroll_data': 42})
380+
381+
scroll_mock.side_effect = self.mock_scroll_responses
382+
with self.assertRaises(ScanError):
383+
data = list(helpers.scan(
384+
self.client,
385+
index='test_index',
386+
size=2,
387+
raise_on_error=True,
388+
clear_scroll=False
389+
))
390+
self.assertEqual(len(data), 3)
391+
self.assertEqual(data[-1], {'scroll_data': 42})
392+
393+
def test_initial_search_error(self):
394+
with patch.object(self, 'client') as client_mock:
395+
client_mock.search.return_value = {
396+
'_scroll_id': 'dummy_id',
397+
'_shards': {'successful': 4, 'total': 5},
398+
'hits': {'hits': [{'search_data': 1}]},
399+
}
400+
client_mock.scroll.side_effect = self.mock_scroll_responses
401+
402+
data = list(helpers.scan(self.client, index='test_index', size=2, raise_on_error=False))
403+
self.assertEqual(data, [{'search_data': 1}, {'scroll_data': 42}])
404+
405+
client_mock.scroll.side_effect = self.mock_scroll_responses
406+
with self.assertRaises(ScanError):
407+
data = list(
408+
helpers.scan(self.client, index='test_index', size=2, raise_on_error=True)
409+
)
410+
self.assertEqual(data, [{'search_data': 1}])
411+
client_mock.scroll.assert_not_called()
412+
413+
def test_no_scroll_id_fast_route(self):
414+
with patch.object(self, 'client') as client_mock:
415+
client_mock.search.return_value = {'no': '_scroll_id'}
416+
data = list(helpers.scan(self.client, index='test_index'))
417+
418+
self.assertEqual(data, [])
419+
client_mock.scroll.assert_not_called()
420+
client_mock.clear_scroll.assert_not_called()
421+
422+
@patch('elasticsearch.helpers.actions.logger')
423+
def test_logger(self, logger_mock):
424+
bulk = []
425+
for x in range(4):
426+
bulk.append({'index': {'_index': 'test_index', '_type': '_doc'}})
427+
bulk.append({'value': x})
428+
self.client.bulk(bulk, refresh=True)
429+
430+
with patch.object(self.client, 'scroll') as scroll_mock:
431+
scroll_mock.side_effect = self.mock_scroll_responses
432+
list(helpers.scan(
433+
self.client,
434+
index='test_index',
435+
size=2,
436+
raise_on_error=False,
437+
clear_scroll=False
438+
))
439+
logger_mock.warning.assert_called()
440+
441+
scroll_mock.side_effect = self.mock_scroll_responses
442+
try:
443+
list(helpers.scan(
444+
self.client,
445+
index='test_index',
446+
size=2,
447+
raise_on_error=True,
448+
clear_scroll=False
449+
))
450+
except ScanError:
451+
pass
452+
logger_mock.warning.assert_called()
453+
454+
def test_clear_scroll(self):
455+
bulk = []
456+
for x in range(4):
457+
bulk.append({'index': {'_index': 'test_index', '_type': '_doc'}})
458+
bulk.append({'value': x})
459+
self.client.bulk(bulk, refresh=True)
460+
461+
with patch.object(self.client, 'clear_scroll', wraps=self.client.clear_scroll) as spy:
462+
list(helpers.scan(self.client, index='test_index', size=2))
463+
spy.assert_called_once()
464+
465+
spy.reset_mock()
466+
list(helpers.scan(self.client, index='test_index', size=2, clear_scroll=True))
467+
spy.assert_called_once()
468+
469+
spy.reset_mock()
470+
list(helpers.scan(self.client, index='test_index', size=2, clear_scroll=False))
471+
spy.assert_not_called()
472+
341473

342474
class TestReindex(ElasticsearchTestCase):
343475
def setUp(self):

0 commit comments

Comments
 (0)