Skip to content
This repository was archived by the owner on Feb 4, 2022. It is now read-only.

Commit aef9083

Browse files
committed
refactor(read-preference): defensively resolve read pref in command
1 parent d7070c7 commit aef9083

File tree

5 files changed

+88
-16
lines changed

5 files changed

+88
-16
lines changed

lib/connection/apm.js

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,9 @@ const extractReply = (command, reply) => {
147147
};
148148
}
149149

150+
// in the event of a `noResponse` command, just return
151+
if (reply === null) return reply;
152+
150153
return reply.result;
151154
};
152155

lib/connection/pool.js

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1117,8 +1117,6 @@ function _execute(self) {
11171117
var connection = null;
11181118
const connections = self.availableConnections.filter(conn => conn.workItems.length === 0);
11191119

1120-
// console.log({ totalConnections, connectionsLength: connections.length });
1121-
11221120
// No connection found that has no work on it, just pick one for pipelining
11231121
if (connections.length === 0) {
11241122
connection =

lib/sdam/server.js

Lines changed: 34 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,35 @@ const monitorServer = require('./monitoring').monitorServer;
1313
const MongoParseError = require('../error').MongoParseError;
1414
const MongoNetworkError = require('../error').MongoNetworkError;
1515
const collationNotSupported = require('../utils').collationNotSupported;
16+
const debugOptions = require('../connection/utils').debugOptions;
17+
18+
// Used for filtering out fields for logging
19+
const DEBUG_FIELDS = [
20+
'reconnect',
21+
'reconnectTries',
22+
'reconnectInterval',
23+
'emitError',
24+
'cursorFactory',
25+
'host',
26+
'port',
27+
'size',
28+
'keepAlive',
29+
'keepAliveInitialDelay',
30+
'noDelay',
31+
'connectionTimeout',
32+
'checkServerIdentity',
33+
'socketTimeout',
34+
'ssl',
35+
'ca',
36+
'crl',
37+
'cert',
38+
'key',
39+
'rejectUnauthorized',
40+
'promoteLongs',
41+
'promoteValues',
42+
'promoteBuffers',
43+
'servername'
44+
];
1645

1746
const STATE_DISCONNECTED = 0;
1847
const STATE_CONNECTING = 1;
@@ -67,8 +96,6 @@ class Server extends EventEmitter {
6796

6897
/**
6998
* Initiate server connect
70-
*
71-
* @param {Array} [options.auth] Array of auth options to apply on connect
7299
*/
73100
connect(options) {
74101
options = options || {};
@@ -189,7 +216,11 @@ class Server extends EventEmitter {
189216
// Debug log
190217
if (this.s.logger.isDebug()) {
191218
this.s.logger.debug(
192-
`executing command [${JSON.stringify({ ns, cmd, options })}] against ${this.name}`
219+
`executing command [${JSON.stringify({
220+
ns,
221+
cmd,
222+
options: debugOptions(DEBUG_FIELDS, options)
223+
})}] against ${this.name}`
193224
);
194225
}
195226

lib/sdam/topology.js

Lines changed: 36 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,8 @@ class Topology extends EventEmitter {
137137
// Active client sessions
138138
sessions: [],
139139
// Promise library
140-
promiseLibrary: options.promiseLibrary || Promise
140+
promiseLibrary: options.promiseLibrary || Promise,
141+
credentials: options.credentialss
141142
};
142143

143144
// amend options for server instance creation
@@ -198,7 +199,7 @@ class Topology extends EventEmitter {
198199

199200
// otherwise, wait for a server to properly connect based on user provided read preference,
200201
// or primary.
201-
const readPreference = options.readPreference || ReadPreference.primary;
202+
const readPreference = resolveReadPreference(options);
202203
this.selectServer(readPreferenceServerSelector(readPreference), (err, server) => {
203204
if (err) {
204205
if (typeof callback === 'function') {
@@ -391,6 +392,15 @@ class Topology extends EventEmitter {
391392
);
392393
}
393394

395+
auth(credentials, callback) {
396+
if (typeof credentials === 'function') (callback = credentials), (credentials = null);
397+
if (typeof callback === 'function') callback(null, true);
398+
}
399+
400+
logout(callback) {
401+
if (typeof callback === 'function') callback(null, true);
402+
}
403+
394404
// Basic operation support. Eventually this should be moved into command construction
395405
// during the command refactor.
396406

@@ -463,7 +473,10 @@ class Topology extends EventEmitter {
463473
(callback = options), (options = {}), (options = options || {});
464474
}
465475

466-
const readPreference = options.readPreference ? options.readPreference : ReadPreference.primary;
476+
// TODO: type resolution should happen elsewhere
477+
const readPreference = resolveReadPreference(options);
478+
options = Object.assign(options, { readPreference });
479+
467480
this.selectServer(readPreferenceServerSelector(readPreference), (err, server) => {
468481
if (err) {
469482
callback(err, null);
@@ -686,6 +699,11 @@ function selectServers(topology, selector, timeout, start, callback) {
686699
// successful iteration, clear the check timer
687700
clearTimeout(iterationTimer);
688701

702+
if (topology.description.error) {
703+
callback(topology.description.error, null);
704+
return;
705+
}
706+
689707
// topology description has changed due to monitoring, reattempt server selection
690708
selectServers(topology, selector, timeout, start, callback);
691709
};
@@ -779,8 +797,6 @@ function serverErrorEventHandler(server, topology) {
779797
new monitoring.ServerClosedEvent(topology.s.id, server.description.address)
780798
);
781799

782-
topology.emit('error', err);
783-
784800
if (err instanceof MongoParseError) {
785801
resetServerState(server, err, { clearPool: true });
786802
return;
@@ -843,7 +859,7 @@ function executeWriteOperation(args, options, callback) {
843859
}
844860

845861
/**
846-
* Resets the internal state of this server to `Unknown`.
862+
* Resets the internal state of this server to `Unknown` by simulating an empty ismaster
847863
*
848864
* @private
849865
* @param {Server} server
@@ -857,7 +873,7 @@ function resetServerState(server, error, options) {
857873
function resetState() {
858874
server.emit(
859875
'descriptionReceived',
860-
new ServerDescription(server.description.address, null, error)
876+
new ServerDescription(server.description.address, null, { error })
861877
);
862878
}
863879

@@ -869,6 +885,19 @@ function resetServerState(server, error, options) {
869885
resetState();
870886
}
871887

888+
function resolveReadPreference(options) {
889+
let readPreference = options.readPreference || new ReadPreference('primary');
890+
if (typeof readPreference === 'string') {
891+
readPreference = new ReadPreference(readPreference);
892+
}
893+
894+
if (!(readPreference instanceof ReadPreference)) {
895+
throw new MongoError('read preference must be a ReadPreference instance');
896+
}
897+
898+
return readPreference;
899+
}
900+
872901
/**
873902
* A server opening SDAM monitoring event
874903
*

lib/sdam/topology_description.js

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ const ReadPreference = require('../topologies/read_preference');
77
const MIN_SUPPORTED_SERVER_VERSION = '2.6';
88
const MIN_SUPPORTED_WIRE_VERSION = 2;
99
const MAX_SUPPORTED_WIRE_VERSION = 5;
10-
const DEFAULT_HEARTBEAT_FREQUENCY_MS = 10000;
1110

1211
// An enumeration of topology types we know about
1312
const TopologyType = {
@@ -29,7 +28,15 @@ class TopologyDescription {
2928
* @param {number} maxSetVersion
3029
* @param {ObjectId} maxElectionId
3130
*/
32-
constructor(topologyType, serverDescriptions, setName, maxSetVersion, maxElectionId, options) {
31+
constructor(
32+
topologyType,
33+
serverDescriptions,
34+
setName,
35+
maxSetVersion,
36+
maxElectionId,
37+
options,
38+
error
39+
) {
3340
options = options || {};
3441

3542
// TODO: consider assigning all these values to a temporary value `s` which
@@ -47,6 +54,7 @@ class TopologyDescription {
4754
this.heartbeatFrequencyMS = options.heartbeatFrequencyMS || 0;
4855
this.localThresholdMS = options.localThresholdMS || 0;
4956
this.options = options;
57+
this.error = error;
5058

5159
// determine server compatibility
5260
for (const serverDescription of this.servers.values()) {
@@ -109,6 +117,7 @@ class TopologyDescription {
109117
let setName = this.setName;
110118
let maxSetVersion = this.maxSetVersion;
111119
let maxElectionId = this.maxElectionId;
120+
let error = serverDescription.error || null;
112121

113122
const serverType = serverDescription.type;
114123
let serverDescriptions = new Map(this.servers);
@@ -124,7 +133,8 @@ class TopologyDescription {
124133
setName,
125134
maxSetVersion,
126135
maxElectionId,
127-
this.options
136+
this.options,
137+
error
128138
);
129139
}
130140

@@ -204,7 +214,8 @@ class TopologyDescription {
204214
setName,
205215
maxSetVersion,
206216
maxElectionId,
207-
this.options
217+
this.options,
218+
error
208219
);
209220
}
210221

0 commit comments

Comments
 (0)