Skip to content

Commit fe53ab5

Browse files
committed
Fix sniffing for 5.0
Fixes elastic#477
1 parent 594f194 commit fe53ab5

File tree

2 files changed

+35
-70
lines changed

2 files changed

+35
-70
lines changed

elasticsearch/transport.py

Lines changed: 6 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,8 @@ def get_host_info(node_info, host):
2222
:arg node_info: node information from `/_cluster/nodes`
2323
:arg host: connection information (host, port) extracted from the node info
2424
"""
25-
attrs = node_info.get('attributes', {})
26-
2725
# ignore master only nodes
28-
if (attrs.get('data', 'true') == 'false' and
29-
attrs.get('client', 'false') == 'false' and
30-
attrs.get('master', 'true') == 'true'):
26+
if node_info.get('roles', []) == ['master']:
3127
return None
3228
return host
3329

@@ -203,7 +199,8 @@ def _get_sniff_data(self, initial=False):
203199
for c in chain(self.connection_pool.connections, self.seed_connections):
204200
try:
205201
# use small timeout for the sniffing request, should be a fast api call
206-
_, headers, node_info = c.perform_request('GET', '/_nodes/_all/clear',
202+
_, headers, node_info = c.perform_request(
203+
'GET', '/_nodes/_all/http',
207204
timeout=self.sniff_timeout if not initial else None)
208205
node_info = self.deserializer.loads(node_info, headers.get('content-type'))
209206
break
@@ -219,21 +216,15 @@ def _get_sniff_data(self, initial=False):
219216
return list(node_info['nodes'].values())
220217

221218
def _get_host_info(self, host_info):
222-
address_key = self.connection_class.transport_schema + '_address'
223219
host = {}
224-
address = host_info.get(address_key, '')
225-
if '/' in address:
226-
host['host'], address = address.split('/', 1)
220+
address = host_info.get('http', {}).get('publish_address')
227221

228222
# malformed address
229223
if ':' not in address:
230224
return None
231225

232-
ip, port = address.rsplit(':', 1)
233-
234-
# use the ip if not overridden by publish_host
235-
host.setdefault('host', ip)
236-
host['port'] = int(port)
226+
host['host'], host['port'] = address.rsplit(':', 1)
227+
host['port'] = int(host['port'])
237228

238229
return self.host_info_callback(host_info, host)
239230

test_elasticsearch/test_transport.py

Lines changed: 29 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -24,52 +24,41 @@ def perform_request(self, *args, **kwargs):
2424
return self.status, self.headers, self.data
2525

2626
CLUSTER_NODES = '''{
27-
"ok" : true,
28-
"cluster_name" : "super_cluster",
29-
"nodes" : {
30-
"wE_6OGBNSjGksbONNncIbg" : {
31-
"name" : "Nightwind",
32-
"transport_address" : "127.0.0.1:9300",
33-
"hostname" : "wind",
34-
"version" : "0.20.4",
35-
"http_address" : "1.1.1.1:123",
36-
"thrift_address" : "1.1.1.1:9500"
37-
}
27+
"_nodes" : {
28+
"total" : 1,
29+
"successful" : 1,
30+
"failed" : 0
31+
},
32+
"cluster_name" : "elasticsearch",
33+
"nodes" : {
34+
"SRZpKFZdQguhhvifmN6UVA" : {
35+
"name" : "SRZpKFZ",
36+
"transport_address" : "127.0.0.1:9300",
37+
"host" : "127.0.0.1",
38+
"ip" : "127.0.0.1",
39+
"version" : "5.0.0",
40+
"build_hash" : "253032b",
41+
"roles" : [ "master", "data", "ingest" ],
42+
"http" : {
43+
"bound_address" : [ "[fe80::1]:9200", "[::1]:9200", "127.0.0.1:9200" ],
44+
"publish_address" : "1.1.1.1:123",
45+
"max_content_length_in_bytes" : 104857600
46+
}
3847
}
48+
}
3949
}'''
4050

41-
CLUSTER_NODE_PUBLISH_HOST = '''{
42-
"ok" : true,
43-
"cluster_name" : "super_cluster",
44-
"nodes" : {
45-
"wE_6OGBNSjGksbONNncIbg" : {
46-
"name": "Thunderbird",
47-
"transport_address": "obsidian/192.168.1.60:9300",
48-
"host": "192.168.1.60",
49-
"ip": "192.168.1.60",
50-
"version": "2.1.0",
51-
"build": "72cd1f1",
52-
"http_address": "obsidian/192.168.1.60:9200",
53-
"attributes": {
54-
"testattr": "test"
55-
}
56-
}
57-
}
58-
}'''
59-
60-
6151
class TestHostsInfoCallback(TestCase):
6252
def test_master_only_nodes_are_ignored(self):
6353
nodes = [
64-
{'attributes': {'data': 'false', 'client': 'true'}},
65-
{'attributes': {'data': 'false'}},
66-
{'attributes': {'data': 'false', 'master': 'true'}},
67-
{'attributes': {'data': 'false', 'master': 'false'}},
68-
{'attributes': {}},
54+
{'roles': [ "master"]},
55+
{'roles': [ "master", "data", "ingest"]},
56+
{'roles': [ "data", "ingest"]},
57+
{'roles': [ ]},
6958
{}
7059
]
71-
chosen = [ i for i, node_info in enumerate(nodes) if get_host_info(node_info, i) is not None]
72-
self.assertEquals([0, 3, 4, 5], chosen)
60+
chosen = [i for i, node_info in enumerate(nodes) if get_host_info(node_info, i) is not None]
61+
self.assertEquals([1, 2, 3, 4], chosen)
7362

7463

7564
class TestTransport(TestCase):
@@ -175,34 +164,19 @@ def test_sniff_will_use_seed_connections(self):
175164
self.assertEquals(1, len(t.connection_pool.connections))
176165
self.assertEquals('http://1.1.1.1:123', t.get_connection().host)
177166

178-
def test_sniff_will_pick_up_published_host(self):
179-
t = Transport([{'data': CLUSTER_NODE_PUBLISH_HOST}], connection_class=DummyConnection)
180-
t.sniff_hosts()
181-
182-
self.assertEquals(1, len(t.connection_pool.connections))
183-
self.assertEquals('http://obsidian:9200', t.get_connection().host)
184-
185-
186-
def test_sniff_on_start_fetches_and_uses_nodes_list_for_its_schema(self):
187-
class DummyThriftConnection(DummyConnection):
188-
transport_schema = 'thrift'
189-
t = Transport([{'data': CLUSTER_NODES}], connection_class=DummyThriftConnection, sniff_on_start=True)
190-
self.assertEquals(1, len(t.connection_pool.connections))
191-
self.assertEquals('thrift://1.1.1.1:9500', t.get_connection().host)
192-
193167
def test_sniff_on_start_fetches_and_uses_nodes_list(self):
194168
t = Transport([{'data': CLUSTER_NODES}], connection_class=DummyConnection, sniff_on_start=True)
195169
self.assertEquals(1, len(t.connection_pool.connections))
196170
self.assertEquals('http://1.1.1.1:123', t.get_connection().host)
197171

198172
def test_sniff_on_start_ignores_sniff_timeout(self):
199173
t = Transport([{'data': CLUSTER_NODES}], connection_class=DummyConnection, sniff_on_start=True, sniff_timeout=12)
200-
self.assertEquals((('GET', '/_nodes/_all/clear'), {'timeout': None}), t.seed_connections[0].calls[0])
174+
self.assertEquals((('GET', '/_nodes/_all/http'), {'timeout': None}), t.seed_connections[0].calls[0])
201175

202176
def test_sniff_uses_sniff_timeout(self):
203177
t = Transport([{'data': CLUSTER_NODES}], connection_class=DummyConnection, sniff_timeout=42)
204178
t.sniff_hosts()
205-
self.assertEquals((('GET', '/_nodes/_all/clear'), {'timeout': 42}), t.seed_connections[0].calls[0])
179+
self.assertEquals((('GET', '/_nodes/_all/http'), {'timeout': 42}), t.seed_connections[0].calls[0])
206180

207181

208182
def test_sniff_reuses_connection_instances_if_possible(self):

0 commit comments

Comments
 (0)