Skip to content

Commit 4362428

Browse files
committed
Fixed auth reconnect of replicaset members as well as Mongos #929
1 parent 3003ea4 commit 4362428

File tree

6 files changed

+429
-194
lines changed

6 files changed

+429
-194
lines changed

lib/mongodb/connection/mongos.js

Lines changed: 91 additions & 100 deletions
Original file line numberDiff line numberDiff line change
@@ -167,14 +167,14 @@ Mongos.prototype.connect = function(db, options, callback) {
167167
// Create a new server object
168168
var newServer = new Server(downServer.host, downServer.port, options);
169169
// Setup the connection function
170-
var connectFunction = function(_db, _server, _options) {
170+
var connectFunction = function(_db, _server, _options, _callback) {
171171
return function() {
172172
// Attempt to connect
173173
_server.connect(_db, _options, function(err, result) {
174174
numberOfServersLeft = numberOfServersLeft - 1;
175175

176176
if(err) {
177-
self.downServers.push(_server);
177+
return _callback(err, _server);
178178
} else {
179179
// Set the new server settings
180180
_server._callBackStore = self._callBackStore;
@@ -184,13 +184,11 @@ Mongos.prototype.connect = function(db, options, callback) {
184184
_server.on("timeout", errorOrCloseHandler(_server));
185185
_server.on("error", errorOrCloseHandler(_server));
186186

187-
// Add the server to the list of available servers
188-
self.servers.push(_server);
189-
190187
// Get a read connection
191188
var _connection = _server.checkoutReader();
192189
// Get the start time
193190
var startTime = new Date().getTime();
191+
194192
// Execute ping command to mark each server with the expected times
195193
self.db.command({ping:1}
196194
, {failFast:true, connection:_connection}, function(err, result) {
@@ -202,114 +200,107 @@ Mongos.prototype.connect = function(db, options, callback) {
202200
self.servers.sort(function(a, b) {
203201
return a.runtimeStats['pingMs'] > b.runtimeStats['pingMs'];
204202
});
205-
});
206-
}
207203

208-
if(numberOfServersLeft == 0) {
209-
self._haInProgress = false;
204+
// Callback
205+
return _callback(null, _server);
206+
});
210207
}
211208
});
212209
}
213210
}
214211

215212
// Attempt to connect to the database
216-
connectFunction(self.db, newServer, options)();
213+
connectFunction(self.db, newServer, options, function(err, _server) {
214+
// If we have an error
215+
if(err) {
216+
self.downServers.push(_server);
217+
}
218+
219+
// Connection function
220+
var connectionFunction = function(_auth, _connection, _callback) {
221+
var pending = _auth.length();
222+
223+
for(var j = 0; j < pending; j++) {
224+
// Get the auth object
225+
var _auth = _auth.get(j);
226+
// Unpack the parameter
227+
var username = _auth.username;
228+
var password = _auth.password;
229+
var options = {
230+
authMechanism: _auth.authMechanism
231+
, authSource: _auth.authdb
232+
, connection: _connection
233+
};
234+
235+
// Hold any error
236+
var _error = null;
237+
// Authenticate against the credentials
238+
self.db.authenticate(username, password, options, function(err, result) {
239+
_error = err != null ? err : _error;
240+
// Adjust the pending authentication
241+
pending = pending - 1;
242+
// Finished up
243+
if(pending == 0) _callback(_error ? _error : null, _error ? false : true);
244+
});
245+
}
246+
}
247+
248+
// Run auths against the connections
249+
if(self.auth.length() > 0) {
250+
var connections = _server.allRawConnections();
251+
var pendingAuthConn = connections.length;
252+
253+
// No connections we are done
254+
if(connections.length == 0) {
255+
// Set ha done
256+
if(numberOfServersLeft == 0) {
257+
self._haInProgress = false;
258+
}
259+
}
260+
261+
// Final error object
262+
var finalError = null;
263+
// Go over all the connections
264+
for(var j = 0; j < connections.length; j++) {
265+
266+
// Execute against all the connections
267+
connectionFunction(self.auth, connections[j], function(err, result) {
268+
// Pending authentication
269+
pendingAuthConn = pendingAuthConn - 1 ;
270+
271+
// Save error if any
272+
finalError = err ? err : finalError;
273+
274+
// If we are done let's finish up
275+
if(pendingAuthConn == 0) {
276+
// Set ha done
277+
if(numberOfServersLeft == 0) {
278+
self._haInProgress = false;
279+
}
280+
281+
if(finalError) {
282+
return self.downServers.push(_server);
283+
}
284+
285+
// Push to list of valid server
286+
self.servers.push(_server);
287+
}
288+
});
289+
}
290+
} else {
291+
self.servers.push(_server);
292+
// Set ha done
293+
if(numberOfServersLeft == 0) {
294+
self._haInProgress = false;
295+
}
296+
}
297+
})();
217298
}
218299
} else {
219300
self._haInProgress = false;
220301
}
221302
}
222303

223-
224-
// console.log("========================================= mongos check function")
225-
// // If we have down servers let's attempt a reconnect
226-
// if(self.downServers.length > 0) {
227-
// var numberOfServersLeft = self.downServers.length;
228-
// // Attempt to reconnect
229-
// for(var i = 0; i < self.downServers.length; i++) {
230-
// var downServer = self.downServers.pop();
231-
232-
// // Configuration
233-
// var options = {
234-
// slaveOk: true,
235-
// poolSize: 1,
236-
// socketOptions: { connectTimeoutMS: self._connectTimeoutMS },
237-
// returnIsMasterResults: true
238-
// }
239-
240-
// // Attemp to reconnect
241-
// downServer.connect(self.db, options, function(_server) {
242-
// // Return a function to check for the values
243-
// return function(err, result) {
244-
// console.log("========================================= mongos check function 0")
245-
// // Adjust the number of servers left
246-
// numberOfServersLeft = numberOfServersLeft - 1;
247-
248-
// if(err != null) {
249-
// console.log("========================================= mongos check function 1")
250-
// self.downServers.push(_server);
251-
// } else {
252-
// console.log("========================================= mongos check function 2")
253-
// // Add server event handlers
254-
// _server.on("close", errorOrCloseHandler(_server));
255-
// _server.on("error", errorOrCloseHandler(_server));
256-
// _server.on("timeout", errorOrCloseHandler(_server));
257-
// // Add to list of servers
258-
// self.servers.push(_server);
259-
// }
260-
261-
// if(numberOfServersLeft <= 0) {
262-
// // Perfom another ha
263-
// self._replicasetTimeoutId = setTimeout(self.mongosCheckFunction, self.mongosStatusCheckInterval);
264-
// }
265-
// }
266-
// }(downServer));
267-
// }
268-
// } else if(self.servers.length > 0) {
269-
// var numberOfServersLeft = self.servers.length;
270-
// var _s = new Date().getTime()
271-
272-
// // Else let's perform a ping command
273-
// for(var i = 0; i < self.servers.length; i++) {
274-
// var executePing = function(_server) {
275-
// // Get a read connection
276-
// var _connection = _server.checkoutReader();
277-
// // Execute ping command
278-
// self.db.command({ping:1}, {failFast:true, connection:_connection}, function(err, result) {
279-
// var pingTime = new Date().getTime() - _s;
280-
// // If no server set set the first one, otherwise check
281-
// // the lowest ping time and assign the server if it's got a lower ping time
282-
// if(self.lowestPingTimeServer == null) {
283-
// self.lowestPingTimeServer = _server;
284-
// self.lowestPingTime = pingTime;
285-
// console.log("=========================================== ping 0")
286-
// self._currentMongos = _server;
287-
// } else if(self.lowestPingTime > pingTime
288-
// && (_server.host != self.lowestPingTimeServer.host || _server.port != self.lowestPingTimeServer.port)) {
289-
// self.lowestPingTimeServer = _server;
290-
// self.lowestPingTime = pingTime;
291-
// console.log("=========================================== ping 1")
292-
// self._currentMongos = _server;
293-
// }
294-
295-
// // Number of servers left
296-
// numberOfServersLeft = numberOfServersLeft - 1;
297-
// // All active mongos's pinged
298-
// if(numberOfServersLeft == 0) {
299-
// // Perfom another ha
300-
// self._replicasetTimeoutId = setTimeout(self.mongosCheckFunction, self.mongosStatusCheckInterval);
301-
// }
302-
// })
303-
// }
304-
305-
// // Execute the function
306-
// executePing(self.servers[i]);
307-
// }
308-
// } else {
309-
// self._replicasetTimeoutId = setTimeout(self.mongosCheckFunction, self.mongosStatusCheckInterval);
310-
// }
311-
// }
312-
313304
// Connect all the server instances
314305
for(var i = 0; i < this.servers.length; i++) {
315306
// Get the connection

lib/mongodb/connection/repl_set.js

Lines changed: 56 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -447,14 +447,16 @@ ReplSet.prototype._enableHA = function () {
447447
self._haServer = null;
448448
return;
449449
}
450+
450451
// If error let's set perform another check
451452
if(err) {
452453
// Force new server selection
453454
self._haServer = null;
454455
return check();
455456
}
457+
456458
// Validate the replicaset
457-
self._validateReplicaset(res, db.auths, function() {
459+
self._validateReplicaset(res, self.auth, function() {
458460
check();
459461
});
460462
});
@@ -473,7 +475,7 @@ ReplSet.prototype._enableHA = function () {
473475
/**
474476
* @ignore
475477
*/
476-
ReplSet.prototype._validateReplicaset = function(result, auths, cb) {
478+
ReplSet.prototype._validateReplicaset = function(result, auth, cb) {
477479
var self = this;
478480
var res = result.documents[0];
479481

@@ -507,25 +509,25 @@ ReplSet.prototype._validateReplicaset = function(result, auths, cb) {
507509
}
508510
}
509511

510-
connectTo(hosts, auths, self, cb);
512+
connectTo(hosts, auth, self, cb);
511513
}
512514

513515
/**
514516
* Create connections to all `hosts` firing `cb` after
515517
* connections are attempted for all `hosts`.
516518
*
517519
* @param {Array} hosts
518-
* @param {Array} [auths]
520+
* @param {AuthStore} [auth]
519521
* @param {ReplSet} replset
520522
* @param {Function} cb
521523
* @ignore
522524
*/
523-
function connectTo(hosts, auths, replset, cb) {
525+
function connectTo(hosts, auth, replset, cb) {
524526
var pending = hosts.length;
525527
if (!pending) return cb();
526528

527529
for(var i = 0; i < hosts.length; ++i) {
528-
connectToHost(hosts[i], auths, replset, handle);
530+
connectToHost(hosts[i], auth, replset, handle);
529531
}
530532

531533
function handle () {
@@ -539,12 +541,12 @@ function connectTo(hosts, auths, replset, cb) {
539541
* for the given `replset` firing `cb` when finished.
540542
*
541543
* @param {String} host
542-
* @param {Array} auths
544+
* @param {AuthStore} auth
543545
* @param {ReplSet} replset
544546
* @param {Function} cb
545547
* @ignore
546548
*/
547-
function connectToHost(host, auths, replset, cb) {
549+
function connectToHost(host, auth, replset, cb) {
548550
var server = createServer(host, replset);
549551

550552
var options = {
@@ -579,34 +581,60 @@ function connectToHost(host, auths, replset, cb) {
579581
}
580582

581583
// authenticate if necessary
582-
if(!(Array.isArray(auths) && auths.length > 0)) {
584+
if(auth.length() == 0) {
583585
return complete();
584586
}
585587

586-
var pending = auths.length;
587-
588+
var pending = auth.length();
588589
var connections = server.allRawConnections();
589590
var pendingAuthConn = connections.length;
590-
for(var x = 0; x <connections.length; x++) {
591-
var connection = connections[x];
592-
var authDone = false;
593-
for(var i = 0; i < auths.length; i++) {
594-
var auth = auths[i];
595-
var options = { authdb: auth.authdb, connection: connection };
596-
var username = auth.username;
597-
var password = auth.password;
598-
replset.db.authenticate(username, password, options, function() {
599-
--pending;
600-
if(0 === pending) {
601-
authDone = true;
602-
--pendingAuthConn;
603-
if(0 === pendingAuthConn) {
604-
return complete();
605-
}
606-
}
591+
592+
// Connection function
593+
var connectionFunction = function(_auth, _connection, _callback) {
594+
var pending = _auth.length();
595+
596+
for(var j = 0; j < pending; j++) {
597+
// Get the auth object
598+
var _auth = _auth.get(j);
599+
// Unpack the parameter
600+
var username = _auth.username;
601+
var password = _auth.password;
602+
var options = {
603+
authMechanism: _auth.authMechanism
604+
, authSource: _auth.authdb
605+
, connection: _connection
606+
};
607+
608+
// Hold any error
609+
var _error = null;
610+
// Authenticate against the credentials
611+
replset.db.authenticate(username, password, options, function(err, result) {
612+
_error = err != null ? err : _error;
613+
// Adjust the pending authentication
614+
pending = pending - 1;
615+
// Finished up
616+
if(pending == 0) _callback(_error ? _error : null, _error ? false : true);
607617
});
608618
}
609619
}
620+
621+
// Final error object
622+
var finalError = null;
623+
// Iterate over all the connections
624+
for(var i = 0; i < connections.length; i++) {
625+
connectionFunction(auth, connections[i], function(err, result) {
626+
// Pending authentication
627+
pendingAuthConn = pendingAuthConn - 1 ;
628+
629+
// Save error if any
630+
finalError = err ? err : finalError;
631+
632+
// If we are done let's finish up
633+
if(pendingAuthConn == 0) {
634+
complete();
635+
}
636+
});
637+
}
610638
});
611639
}
612640

0 commit comments

Comments
 (0)