Skip to content

Commit c4bbe73

Browse files
committed
discard dodgy thrift connections that went into a bad state, made PoolingConnection thread-safe
1 parent 4c64968 commit c4bbe73

File tree

2 files changed

+8
-16
lines changed

2 files changed

+8
-16
lines changed
Lines changed: 5 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,14 @@
1+
from Queue import Queue
12
from .base import Connection
23

4+
35
class PoolingConnection(Connection):
46
def __init__(self, *args, **kwargs):
5-
self._max_pool_size = kwargs.pop('max_connection_pool_size', 50)
6-
self._free_connections = []
7-
self._in_use_connections = set()
7+
self._free_connections = Queue()
88
super(PoolingConnection, self).__init__(*args, **kwargs)
99

1010
def _get_connection(self):
11-
try:
12-
con = self._free_connections.pop()
13-
except IndexError:
14-
con = self._make_connection()
15-
16-
self._in_use_connections.add(con)
17-
return con
11+
return self._make_connection() if self._free_connections.empty() else self._free_connections.get()
1812

1913
def _release_connection(self, con):
20-
self._in_use_connections.remove(con)
21-
self._free_connections.append(con)
22-
14+
self._free_connections.put(con)

elasticsearch/connection/thrift.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -65,9 +65,9 @@ def perform_request(self, method, url, params=None, body=None, timeout=None, ign
6565
except (TException, SocketTimeout) as e:
6666
self.log_request_fail(method, url, body, time.time() - start, exception=e)
6767
raise ConnectionError('N/A', str(e), e)
68-
finally:
69-
if tclient:
70-
self._release_connection(tclient)
68+
69+
if tclient:
70+
self._release_connection(tclient)
7171

7272
if not (200 <= response.status < 300) and response.status not in ignore:
7373
self.log_request_fail(method, url, body, duration, response.status)

0 commit comments

Comments
 (0)