Commit 8106c0e

Don't raise sniffing errors when retrying a request
1 parent 4186925 commit 8106c0e

4 files changed: +82 -30 lines changed

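In short: when sniffing on connection failure is enabled, `mark_dead()` also triggers a sniff of the cluster, and that sniff can itself raise a `TransportError`. Previously the sniffing error escaped `perform_request()` and cut the remaining retries short; the diffs below wrap `mark_dead()` in a `try`/`except TransportError` so the original request is still retried. A minimal sketch of the pattern (illustrative names only, not the elasticsearch-py implementation):

# Minimal sketch of the retry pattern this commit adopts (illustrative names,
# not library code): cleanup that may itself fail is kept best-effort so it
# cannot cut the retry loop short.
class TransportError(Exception):
    """Stand-in for elasticsearch.exceptions.TransportError."""


def perform_with_retries(send, mark_dead, max_retries=3):
    for attempt in range(max_retries + 1):
        try:
            return send(attempt)
        except ConnectionError as e:
            try:
                # best effort: marking the node dead may also sniff the
                # cluster, which can raise a TransportError of its own
                mark_dead()
            except TransportError:
                pass  # don't let a failed sniff pre-empt the retry
            if attempt == max_retries:
                raise e  # out of retries: surface the original error


def flaky_send(attempt):
    if attempt == 0:
        raise ConnectionError("node refused the connection")
    return "ok"


def failing_mark_dead():
    raise TransportError("sniffing failed as well")


assert perform_with_retries(flaky_send, failing_mark_dead) == "ok"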

elasticsearch/_async/transport.py

Lines changed: 12 additions & 6 deletions
@@ -316,20 +316,26 @@ async def perform_request(self, method, url, headers=None, params=None, body=None):
                 retry = True

                 if retry:
-                    # only mark as dead if we are retrying
-                    self.mark_dead(connection)
+                    try:
+                        # only mark as dead if we are retrying
+                        self.mark_dead(connection)
+                    except TransportError:
+                        # If sniffing on failure, it could fail too. Catch the
+                        # exception not to interrupt the retries.
+                        pass
                     # raise exception on last retry
                     if attempt == self.max_retries:
-                        raise
+                        raise e
                 else:
-                    raise
+                    raise e

             else:
+                # connection didn't fail, confirm it's live status
+                self.connection_pool.mark_live(connection)
+
                 if method == "HEAD":
                     return 200 <= status < 300

-                # connection didn't fail, confirm it's live status
-                self.connection_pool.mark_live(connection)
                 if data:
                     data = self.deserializer.loads(data, headers.get("content-type"))
                 return data
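A side note on the `raise` → `raise e` change above: once a nested `try`/`except` sits between catching the connection error and re-raising it, `raise e` makes explicit that the original failure is propagated. (On Python 2, a bare `raise` after the nested handler would re-raise the most recently handled exception, i.e. the sniffing error, instead.) A small self-contained illustration with stand-in exception types:

# Illustration of the explicit `raise e` spelling (stand-in exception types,
# not library code): after the nested handler swallows the sniffing error,
# `raise e` unambiguously re-raises the original connection failure.
def retry_exhausted():
    try:
        raise ConnectionError("original request failure")
    except ConnectionError as e:
        try:
            raise RuntimeError("sniffing failed too")  # stand-in for a failing mark_dead()
        except RuntimeError:
            pass
        raise e  # explicitly propagate the original failure


try:
    retry_exhausted()
except Exception as exc:
    print(type(exc).__name__, "-", exc)  # ConnectionError - original request failure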

elasticsearch/transport.py

Lines changed: 9 additions & 4 deletions
@@ -378,13 +378,18 @@ def perform_request(self, method, url, headers=None, params=None, body=None):
                 retry = True

                 if retry:
-                    # only mark as dead if we are retrying
-                    self.mark_dead(connection)
+                    try:
+                        # only mark as dead if we are retrying
+                        self.mark_dead(connection)
+                    except TransportError:
+                        # If sniffing on failure, it could fail too. Catch the
+                        # exception not to interrupt the retries.
+                        pass
                     # raise exception on last retry
                     if attempt == self.max_retries:
-                        raise
+                        raise e
                 else:
-                    raise
+                    raise e

             else:
                 # connection didn't fail, confirm it's live status
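For context, a hedged usage sketch of the client configuration this fix matters for (the node addresses are placeholders; `sniff_on_connection_fail`, `max_retries` and `retry_on_timeout` are existing client options forwarded to the transport):

from elasticsearch import Elasticsearch

# Placeholder node addresses; any reachable cluster would do.
es = Elasticsearch(
    ["http://node1:9200", "http://node2:9200"],
    sniff_on_connection_fail=True,  # a failed node triggers a sniff via mark_dead()
    max_retries=3,
    retry_on_timeout=True,
)

# With this change, if node1 refuses the connection and the triggered sniff
# also fails, the request is retried against node2 instead of surfacing the
# sniffing TransportError to the caller.
print(es.info())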

test_elasticsearch/test_async/test_transport.py

Lines changed: 32 additions & 11 deletions
@@ -18,13 +18,14 @@

 from __future__ import unicode_literals
 import asyncio
+import json
 from mock import patch
 import pytest

 from elasticsearch import AsyncTransport
 from elasticsearch.connection import Connection
 from elasticsearch.connection_pool import DummyConnectionPool
-from elasticsearch.exceptions import ConnectionError
+from elasticsearch.exceptions import ConnectionError, TransportError


 pytestmark = pytest.mark.asyncio

@@ -273,16 +274,17 @@ async def test_failed_connection_will_be_marked_as_dead(self):
         assert 0 == len(t.connection_pool.connections)

     async def test_resurrected_connection_will_be_marked_as_live_on_success(self):
-        t = AsyncTransport([{}, {}], connection_class=DummyConnection)
-        await t._async_call()
-        con1 = t.connection_pool.get_connection()
-        con2 = t.connection_pool.get_connection()
-        t.connection_pool.mark_dead(con1)
-        t.connection_pool.mark_dead(con2)
-
-        await t.perform_request("GET", "/")
-        assert 1 == len(t.connection_pool.connections)
-        assert 1 == len(t.connection_pool.dead_count)
+        for method in ("GET", "HEAD"):
+            t = AsyncTransport([{}, {}], connection_class=DummyConnection)
+            await t._async_call()
+            con1 = t.connection_pool.get_connection()
+            con2 = t.connection_pool.get_connection()
+            t.connection_pool.mark_dead(con1)
+            t.connection_pool.mark_dead(con2)
+
+            await t.perform_request(method, "/")
+            assert 1 == len(t.connection_pool.connections)
+            assert 1 == len(t.connection_pool.dead_count)

     async def test_sniff_will_use_seed_connections(self):
         t = AsyncTransport([{"data": CLUSTER_NODES}], connection_class=DummyConnection)

@@ -368,6 +370,25 @@ async def test_sniff_on_fail_triggers_sniffing_on_fail(self):
         assert 1 == len(t.connection_pool.connections)
         assert "http://1.1.1.1:123" == t.get_connection().host

+    @patch("elasticsearch._async.transport.AsyncTransport.sniff_hosts")
+    async def test_sniff_on_fail_failing_does_not_prevent_retires(self, sniff_hosts):
+        sniff_hosts.side_effect = [TransportError("sniff failed")]
+        t = AsyncTransport(
+            [{"exception": ConnectionError("abandon ship")}, {"data": CLUSTER_NODES}],
+            connection_class=DummyConnection,
+            sniff_on_connection_fail=True,
+            max_retries=3,
+            randomize_hosts=False,
+        )
+        await t._async_init()
+
+        conn_err, conn_data = t.connection_pool.connections
+        response = await t.perform_request("GET", "/")
+        assert json.loads(CLUSTER_NODES) == response
+        assert 1 == sniff_hosts.call_count
+        assert 1 == len(conn_err.calls)
+        assert 1 == len(conn_data.calls)
+
     async def test_sniff_after_n_seconds(self, event_loop):
         t = AsyncTransport(
             [{"data": CLUSTER_NODES}],

test_elasticsearch/test_transport.py

Lines changed: 29 additions & 9 deletions
@@ -17,13 +17,14 @@
 # under the License.

 from __future__ import unicode_literals
+import json
 import time
 from mock import patch

 from elasticsearch.transport import Transport, get_host_info
 from elasticsearch.connection import Connection
 from elasticsearch.connection_pool import DummyConnectionPool
-from elasticsearch.exceptions import ConnectionError
+from elasticsearch.exceptions import ConnectionError, TransportError

 from .test_cases import TestCase

@@ -254,15 +255,16 @@ def test_failed_connection_will_be_marked_as_dead(self):
         self.assertEqual(0, len(t.connection_pool.connections))

     def test_resurrected_connection_will_be_marked_as_live_on_success(self):
-        t = Transport([{}, {}], connection_class=DummyConnection)
-        con1 = t.connection_pool.get_connection()
-        con2 = t.connection_pool.get_connection()
-        t.connection_pool.mark_dead(con1)
-        t.connection_pool.mark_dead(con2)
+        for method in ("GET", "HEAD"):
+            t = Transport([{}, {}], connection_class=DummyConnection)
+            con1 = t.connection_pool.get_connection()
+            con2 = t.connection_pool.get_connection()
+            t.connection_pool.mark_dead(con1)
+            t.connection_pool.mark_dead(con2)

-        t.perform_request("GET", "/")
-        self.assertEqual(1, len(t.connection_pool.connections))
-        self.assertEqual(1, len(t.connection_pool.dead_count))
+            t.perform_request(method, "/")
+            self.assertEqual(1, len(t.connection_pool.connections))
+            self.assertEqual(1, len(t.connection_pool.dead_count))

     def test_sniff_will_use_seed_connections(self):
         t = Transport([{"data": CLUSTER_NODES}], connection_class=DummyConnection)

@@ -330,6 +332,24 @@ def test_sniff_on_fail_triggers_sniffing_on_fail(self):
         self.assertEqual(1, len(t.connection_pool.connections))
         self.assertEqual("http://1.1.1.1:123", t.get_connection().host)

+    @patch("elasticsearch.transport.Transport.sniff_hosts")
+    def test_sniff_on_fail_failing_does_not_prevent_retires(self, sniff_hosts):
+        sniff_hosts.side_effect = [TransportError("sniff failed")]
+        t = Transport(
+            [{"exception": ConnectionError("abandon ship")}, {"data": CLUSTER_NODES}],
+            connection_class=DummyConnection,
+            sniff_on_connection_fail=True,
+            max_retries=3,
+            randomize_hosts=False,
+        )
+
+        conn_err, conn_data = t.connection_pool.connections
+        response = t.perform_request("GET", "/")
+        self.assertEqual(json.loads(CLUSTER_NODES), response)
+        self.assertEqual(1, sniff_hosts.call_count)
+        self.assertEqual(1, len(conn_err.calls))
+        self.assertEqual(1, len(conn_data.calls))
+
     def test_sniff_after_n_seconds(self):
         t = Transport(
             [{"data": CLUSTER_NODES}],
