Skip to content
This repository was archived by the owner on Feb 4, 2022. It is now read-only.

Commit 67588f4

Browse files
committed
refactor(apm): correct apm event assumptions for OP_MSG messages
1 parent fbedb91 commit 67588f4

File tree

4 files changed

+47
-26
lines changed

4 files changed

+47
-26
lines changed

lib/connection/msg.js

Lines changed: 23 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
// };
2929

3030
const opcodes = require('../wireprotocol/shared').opcodes;
31+
const databaseNamespace = require('../wireprotocol/shared').databaseNamespace;
3132

3233
// Incrementing request id
3334
let _requestId = 0;
@@ -38,13 +39,15 @@ const OPTS_MORE_TO_COME = 2;
3839
const OPTS_EXHAUST_ALLOWED = 1 >> 16;
3940

4041
class Msg {
41-
constructor(bson, query, options) {
42+
constructor(bson, ns, command, options) {
4243
// Basic options needed to be passed in
43-
if (query == null) throw new Error('query must be specified for query');
44+
if (command == null) throw new Error('query must be specified for query');
4445

4546
// Basic options
4647
this.bson = bson;
47-
this.query = Array.isArray(query) ? query : [query];
48+
this.ns = ns;
49+
this.command = command;
50+
this.command.$db = databaseNamespace(ns);
4851

4952
// Ensure empty options
5053
this.options = options || {};
@@ -91,24 +94,21 @@ class Msg {
9194

9295
let totalLength = header.length;
9396

94-
for (let i = 0; i < this.query.length; ++i) {
95-
const query = this.query[i];
96-
97-
const nameArgumentPair = getValidSegmentListNamePairs(query);
98-
if (nameArgumentPair) {
99-
// TODO: Add support for payload type 1
100-
const argument = nameArgumentPair.argument;
101-
102-
// Add initial type 0 segment with arguments pulled up
103-
const clonedQuery = Object.assign({}, query);
104-
delete clonedQuery[argument];
105-
totalLength += this.makeDocumentSegment(buffers, clonedQuery);
106-
107-
// Create type 1 query
108-
totalLength += this.makeSequenceSegment(buffers, argument, query[argument]);
109-
} else {
110-
totalLength += this.makeDocumentSegment(buffers, query);
111-
}
97+
const command = this.command;
98+
const nameArgumentPair = validSegmentListNamePairs(command);
99+
if (nameArgumentPair) {
100+
// TODO: Add support for payload type 1
101+
const argument = nameArgumentPair.argument;
102+
103+
// Add initial type 0 segment with arguments pulled up
104+
const clonedQuery = Object.assign({}, command);
105+
delete clonedQuery[argument];
106+
totalLength += this.makeDocumentSegment(buffers, clonedQuery);
107+
108+
// Create type 1 query
109+
totalLength += this.makeSequenceSegment(buffers, argument, command[argument]);
110+
} else {
111+
totalLength += this.makeDocumentSegment(buffers, command);
112112
}
113113

114114
writeInt32ListToUint8Buffer(header, [totalLength, this.requestId, 0, opcodes.OP_MSG, flags]);
@@ -183,6 +183,7 @@ class BinMsg {
183183
this.requestId = msgHeader.requestId;
184184
this.responseTo = msgHeader.responseTo;
185185
this.opCode = msgHeader.opCode;
186+
this.fromCompressed = msgHeader.fromCompressed;
186187

187188
// Read response flags
188189
this.responseFlags = msgBody.readInt32LE(0);
@@ -266,7 +267,7 @@ const VALID_NAME_ARGUMENT_MAPS = {
266267
delete: 'deletes'
267268
};
268269

269-
function getValidSegmentListNamePairs(query) {
270+
function validSegmentListNamePairs(query) {
270271
for (let name in VALID_NAME_ARGUMENT_MAPS) {
271272
if (name in query) {
272273
const argument = VALID_NAME_ARGUMENT_MAPS[name];

lib/connection/pool.js

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ const MongoWriteConcernError = require('../error').MongoWriteConcernError;
99
const Logger = require('./logger');
1010
const f = require('util').format;
1111
const Query = require('./commands').Query;
12+
const Msg = require('./msg').Msg;
1213
const CommandResult = require('./command_result');
1314
const MESSAGE_HEADER_SIZE = require('../wireprotocol/shared').MESSAGE_HEADER_SIZE;
1415
const COMPRESSION_DETAILS_SIZE = require('../wireprotocol/shared').COMPRESSION_DETAILS_SIZE;
@@ -1273,7 +1274,8 @@ Pool.prototype.write = function(command, options, cb) {
12731274
// Return whether a command contains an uncompressible command term
12741275
// Will return true if command contains no uncompressible command terms
12751276
function canCompress(command) {
1276-
const commandName = Object.keys(command.query)[0];
1277+
const commandDoc = command instanceof Msg ? command.command : command.query;
1278+
const commandName = Object.keys(commandDoc)[0];
12771279
return uncompressibleCommands.indexOf(commandName) === -1;
12781280
}
12791281

lib/sdam/server_description.js

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,8 @@ const ISMASTER_FIELDS = [
3131
'setVersion',
3232
'electionId',
3333
'primary',
34-
'logicalSessionTimeoutMinutes'
34+
'logicalSessionTimeoutMinutes',
35+
'__nodejs_mock_server__'
3536
];
3637

3738
/**

lib/wireprotocol/command.js

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
'use strict';
22

33
const Query = require('../connection/commands').Query;
4+
const Msg = require('../connection/msg').Msg;
45
const retrieveBSON = require('../connection/utils').retrieveBSON;
56
const MongoError = require('../error').MongoError;
67
const getReadPreference = require('./shared').getReadPreference;
@@ -48,14 +49,30 @@ function command(server, ns, cmd, options, callback) {
4849
// This value is not overridable
4950
commandOptions.slaveOk = readPreference.slaveOk();
5051

52+
const cmdNs = `${databaseNamespace(ns)}.$cmd`;
53+
const message = supportsOpMsg(server)
54+
? new Msg(bson, cmdNs, finalCmd, commandOptions)
55+
: new Query(bson, cmdNs, finalCmd, commandOptions);
56+
5157
try {
52-
const query = new Query(bson, `${databaseNamespace(ns)}.$cmd`, finalCmd, commandOptions);
53-
pool.write(query, commandOptions, callback);
58+
pool.write(message, commandOptions, callback);
5459
} catch (err) {
5560
callback(err);
5661
}
5762
}
5863

64+
function supportsOpMsg(topologyOrServer) {
65+
const description = topologyOrServer.ismaster
66+
? topologyOrServer.ismaster
67+
: topologyOrServer.description;
68+
69+
if (description == null) {
70+
return false;
71+
}
72+
73+
return description.maxWireVersion >= 6 && description.__nodejs_mock_server__ == null;
74+
}
75+
5976
function isTransactionCommand(command) {
6077
return !!(command.commitTransaction || command.abortTransaction);
6178
}

0 commit comments

Comments
 (0)