Aggregation Pipeline Power++: MongoDB 4.2 파이프라인 쿼리, 업데이트 및 구체화된 뷰 소개 — 김준, 기술담당 상무, MongoDB
파이프라인(PIPELINE)
파이프라인(PIPELINE) ps ax | grep mongod | head -1 *nix command line pipe $match | $group | $sort MongoDB Document Pipeline Input stream {}{}{}{} Result {}{}...
파이프라인 스테이지 DATA PIPELINE
파이프라인 스테이지 DATA PIPELINE Stage 1 Stage 2 Stage 3 Stage 4 {}{}{}{} {}{}{}{} {}{}{}{} {"$stage":{ ... }} Collection View Special stage
{title: "The Great Gatsby", language: "English", subjects: "Long Island"} {title: "The Great Gatsby", language: "English", subjects: "New York"} {title: "The Great Gatsby", language: "English", subjects: "1920s"} {title: "The Great Gatsby", language: "English", subjects: [ "Long Island", "New York", "1920s"] }, {"$match":{"language":"English"}} $match { _id:"Long Island", count: 1 }, $group { _id: "New York", count: 2 }, $unwind { _id: "1920s", count: 1 }, $sort $skip $limit $project {"$unwind":"$subjects"} {"$group":{"_id":"$subjects", "count":{"$sum":1}}} { _id: "Harlem", count: 1 }, { _id:"Long Island", count: 1 }, { _id: "New York", count: 2 }, { _id: "1920s", count: 1 }, {title: "Open City", language: "English", subjects: [ "New York", "Harlem" ] } { title: "The Great Gatsby", language: "English", subjects: [ "Long Island", "New York", "1920s"] }, { title: "War and Peace", language: "Russian", subjects: [ "Russia", "War of 1812", "Napoleon"] }, { title: "Open City", language: "English", subjects: [ "New York", "Harlem" ] }, {title: "Open City", language: "English", subjects: "New York"} {title: "Open City", language: "English", subjects: "Harlem"} { _id: "Harlem", count: 1 }, {"$sort":{"count":-1}} {"$limit":3} {"$project":...}
실행계획 ( Explain )
스트리밍(Streaming) 자원 사용 INPUT STAGE RESULTS STAGE Each document is streamed through in RAM
블럭킹(Blocking) 자원 사용 INPUT STAGE RESULTS STAGE Everything has to be kept in RAM (or spill)
PREVIOUSLY ...
Update 에서 집계 파이프라인 사용
Update db.coll.update( <query>, <update>, <options> )
Update db.coll.update( <query>, <update>, <options> )
Update { $set: { }, $inc: { }, $... } { f1: <value>, f2: <value>, ... }
Update { } OR [ ] [ <aggregation-pipeline> ]
{ _id: 1 } { _id: 1, a: 10 } { _id: 1, a: 100 } --- { _id: 1, a: "10" } db.coll.update({_id:1}, {$inc:{a:1}}, {upsert:true}) { _id: 1, a: 1 } { _id: 1, a: 11 } { _id: 1, a: 101 } { _id: 1, a: 1 } "errmsg" : "Cannot apply to a value of non-numeric type."
{ _id: 1 } { _id: 1, a: 10 } { _id: 1, a: 100 } --- { _id: 1, a: "10" } db.coll.update({_id:1}, [ {$set:{a:{$sum:["$a",1]}}} ], {upsert:true}) { _id: 1, a: 1 } { _id: 1, a: 11 } { _id: 1, a: 101 } { _id: 1, a: 1 } { _id: 1, a: 1 }
{ _id: 1 } { _id: 1, a: 10 } { _id: 1, a: 100 } --- { _id: 1, a: "10" } db.coll.update({_id:1}, [ {$set:{a:{$add:["$a",1]}}} ], {upsert:true}) { _id: 1, a: 1 } { _id: 1, a: 11 } { _id: 1, a: 101 } { _id: 1, a: 1 } "errmsg" : "$add only supports numeric or date types, not string"
{ _id: 1 } { _id: 1, a: 10 } { _id: 1, a: 100 } --- { _id: 1, a: "10" } db.coll.update({_id:1}, [ ], {upsert:true}) { _id:1, a: 21 } { _id: 1, a: 11 } { _id: 1, a: 101 } { _id:1, a: 21 }
{ _id: 1 } { _id: 1, a: 10 } { _id: 1, a: 100 } --- { _id: 1, a: "10" } db.coll.update({_id:1}, [ {$set:{a:{$cond:{ if: then: , else: }} }}], {upsert:true}) { _id:1, a: 21 } { _id: 1, a: 11 } { _id: 1, a: 101 } { _id:1, a: 21 }
{ _id: 1 } { _id: 1, a: 10 } { _id: 1, a: 100 } --- { _id: 1, a: "10" } db.coll.update({_id:1}, [ {$set:{a:{$cond:{ if: {$eq:[{$type:"$a"},"missing"]}, then: , else: }} }}], {upsert:true}) { _id:1, a: 21 } { _id: 1, a: 11 } { _id: 1, a: 101 } { _id:1, a: 21 }
{ _id: 1 } { _id: 1, a: 10 } { _id: 1, a: 100 } --- { _id: 1, a: "10" } db.coll.update({_id:1}, [ {$set:{a:{$cond:{ if: {$eq:[{$type:"$a"},"missing"]}, then: 21, else: {$sum:["$a", 1]} }} }}], {upsert:true}) { _id:1, a: 21 } { _id: 1, a: 11 } { _id: 1, a: 101 } { _id:1, a: 21 }
{ _id: 1 } { _id: 1, a: 10 } { _id: 1, a: 100 } --- { _id: 1, a: "10" } db.coll.update({_id:1}, [ {$set:{a:{$cond:{ if: {$eq:[{$type:"$a"},"missing"]}, then: 21, else: {$sum:["$a", 1]} }} }}], {upsert:true}) { _id:1, a: 21 } { _id: 1, a: 11 } { _id: 1, a: 100 } { _id:1, a: 21 }
{ _id: 1 } { _id: 1, a: 10 } { _id: 1, a: 100 } --- { _id: 1, a: "10" } db.coll.update({_id:1}, [ {$set:{a:{$min:[ 100, {$cond:{ if: {$eq:[{$type:"$a"},"missing"]}, then: 21, else: {$sum:["$a", 1]} }}]} }}], {upsert:true}) { _id:1, a: 21 } { _id: 1, a: 11 } { _id: 1, a: 100 } { _id:1, a: 21 }
{ _id: 1 } { _id: 1, a: 10 } { _id: 1, a: 100 } --- { _id: 1, a: "10" } db.coll.update({_id:1}, [ {$set:{a:{$min:[ 100, {$cond:{ if: {$eq:[{$type:"$a"},"missing"]}, then: 21, else: {$sum:["$a", 1]} }}]} }}], {upsert:true}) { _id:1, a: 21 } { _id: 1, a: 11 } { _id: 1, a: 100 } { _id:1, a: 21 } { _id:1, a: 1 }
{ _id: 1 } { _id: 1, a: 10 } { _id: 1, a: 100 } --- { _id: 1, a: "10" } db.coll.update({_id:1}, [ {$set:{a:{$min:[ 100, {$cond:{ if: {$eq:[{$type:"$a"},"missing"]}, then: 21, else: {$sum:["$a", 1]} }}]}, prev_a: "$a"}}], {upsert:true}) { _id:1, a: 21 } { _id: 1, a: 11 } { _id: 1, a: 100 } { _id:1, a: 21 } { _id:1, a: 1 }
{ _id: 1 } { _id: 1, a: 10 } { _id: 1, a: 100 } --- { _id: 1, a: "10" } db.coll.update({_id:1}, [ {$set:{a:{$min:[ 100, {$cond:{ if: {$eq:[{$type:"$a"},"missing"]}, then: 21, else: {$sum:["$a", 1]} }}]}, prev_a: "$a"}}], {upsert:true}) { _id:1, a: 21 } { _id: 1, a: 11, prev_a: 10 } { _id: 1, a: 100, prev_a: 100 } { _id:1, a: 21 } { _id:1, a: 1, prev_a: "10" }
Set Defaults {_id: 1, a: 5, b: 12} {_id: 2, a: 15, c: "abc"} {_id: 3, b: 99, c: "xyz"} If a or b are missing, set to 0, if c is missing -> "unset"
Set Defaults {_id: 1, a: 5, b: 12} {_id: 2, a: 15, c: "abc"} {_id: 3, b: 99, c: "xyz"} If a or b are missing, set to 0, if c is missing -> "unset" db.coll.update({}, [ {$replaceWith:{ }} ], {multi:true})
Set Defaults {_id: 1, a: 5, b: 12} {_id: 2, a: 15, c: "abc"} {_id: 3, b: 99, c: "xyz"} If a or b are missing, set to 0, if c is missing -> "unset" db.coll.update({}, [ {$replaceWith:{$mergeObjects:[ ]}} ], {multi:true})
Set Defaults {_id: 1, a: 5, b: 12} {_id: 2, a: 15, c: "abc"} {_id: 3, b: 99, c: "xyz"} If a or b are missing, set to 0, if c is missing -> "unset" db.coll.update({}, [ {$replaceWith:{$mergeObjects:[ { a:0, b:0, c:"unset" }, "$$ROOT" ]}} ], {multi:true})
Set Defaults {_id: 1, a: 5, b: 12, c: "unset"} {_id: 2, a: 15, b: 0, c: "abc"} {_id: 3, a: 0, b: 99, c: "xyz"} {_id: 1, a: 5, b: 12} {_id: 2, a: 15, c: "abc"} {_id: 3, b: 99, c: "xyz"} If a or b are missing, set to 0, if c is missing -> "unset" db.coll.update({}, [ {$replaceWith:{$mergeObjects:[ { a:0, b:0, c:"unset" }, "$$ROOT" ]}} ], {multi:true})
Set array element upsert append update { id: 1, d: ISODate("2019-06-04T00:00:00"), h: [ { hour:"11", value: 296 }, { hour:"12", value: 300 } ]} id: X, d:Y, hour:Z, value: VAL db.coll.update({id:X, d:Y}, [ {$set:{h:{$cond:{ if: then: else: }}}}], {upsert:true})
Set array element upsert append update { id: 1, d: ISODate("2019-06-04T00:00:00"), h: [ { hour:"11", value: 296 }, { hour:"12", value: 300 } ]} id: X, d:Y, hour:Z, value: VAL db.coll.update({id:X, d:Y}, [ {$set:{h:{$cond:{ if: {$in:[Z,{$ifNull:["$h.hour",[]]}]}, then:{$map:{ input:"$h", in: {$cond:{ if:{$ne:["$$this.hour",Z]}, then:"$$this", else: {hour: Z, value: {$sum:[ "$$this.value", VAL]}} }} }}, else:{$concatArrays:[{$ifNull:["$h",[]]},[{hour:Z,value:VAL}]]} }}}}], {upsert:true})
Recap: • Update 구문에서 집계 파이프라인을 사용할 수 있습니다. • 기존 문서의 모든 필드를 참조하고 액세스할 수 있습니다.
새로운 $merge 스테이지
MongoDB 4.2 이전 $out coll new_coll db.coll.aggregate( [ { pipeline }, ..., { $out: "new_coll" } ]); new_coll ○ must be unsharded ○ overwrites existing
MongoDB 4.2 이후 $merge db.coll.aggregate( [ {pipeline}, ..., {$merge: { ... }} ]); coll2 ○ can exist ○ same or different 'db' ○ can be sharded coll coll2
$merge 구문 { $merge: { into: <target> } }
$merge 구문 { $merge: { into: <target> } } {$merge: "collection2"}
$merge 구문 { $merge: { into: <target> } } {$merge: {into: {db: "db2", coll: "collection2"}}}
$merge 구문 { $merge: { into: <target> } }
$merge 구문 { $merge: { into: <target>, on: <fields> } } on: "_id" on: [ "_id", "shardkey(s)" ] must be unique
$merge 구문 { $merge: { into: <target>, on: <fields> } }
방안 ( Actions ) source target
방안 ( Actions ) nothing matched: source target
방안 ( Actions ) nothing matched: usually insert source target
방안 ( Actions ) nothing matched: usually insert document matched: source target
방안 ( Actions ) nothing matched: usually insert document matched: overwrite? update? ??? source target
방안 ( Actions ) nothing matched: usually insert document matched: update source target
방안 ( Actions ) nothing matched: usually insert document matched: update (merge) source target
$merge 구문 { $merge: { into: <target>, whenNotMatched: whenMatched: } }
$merge 구문 { $merge: { into: <target>, whenNotMatched:"insert", whenMatched: } }
$merge 구문 { $merge: { into: <target>, whenNotMatched:"insert", whenMatched:"merge" } }
$merge 구문 { $merge: { into: <target>, whenNotMatched:"insert"|"discard"|"fail", whenMatched:"merge" } }
$merge 구문 { $merge: { into: <target>, whenNotMatched:"insert"|"discard"|"fail", whenMatched:"merge"|"replace"|"keepExisting"|"fail"|[...] } }
$merge 구문 { $merge: { into: <target>, whenMatched:[...] } }
$merge 구문 { $merge: { into: <target>, whenMatched:[<custom pipeline>] } }
$merge 예제 { $merge: { into: <target>, whenMatched:[ {$addFields:{ }} ] } }
$merge 예제 { $merge: { into: <target>, whenMatched:[ {$addFields:{ total:{$sum:["$total","$$new.total"]} }} ] } }
$merge 예제 { $merge: { into: <target>, whenMatched:[ {$set:{ total:{$sum:["$total","$$new.total"]} }} ] } }
$merge 예제 { $merge: { into: <target>, whenMatched:[ {$set:{ total:{$sum:["$total","$$new.total"]} }} ] } }
$merge 예제 { $merge: { into: <target>, whenMatched:[ {$set:{ total:{$sum:["$total","$$new.total"]} }} ] } } Incoming Target { _id: "37", total: 64, f1: "x" } { _id: "37", total: 245, f1: "yyy" } Result: { }
$merge 예제 { $merge: { into: <target>, whenMatched:[ {$set:{ total:{$sum:["$total","$$new.total"]} }} ] } } Incoming Target { _id: "37", total: 64, f1: "x" } { _id: "37", total: 245, f1: "yyy" } Result: { _id: "37", total: 309, f1: "yyy" }
$merge 예제 2 { $merge: { into: <target>, whenMatched:[ {$replaceWith:{$mergeObjects:[ "$$new", {total:{$sum:["$$new.total", "$total"]}} ]}} ] } }
$merge 예제 2 { $merge: { into: <target>, whenMatched:[ {$replaceWith:{$mergeObjects:[ "$$new", {total:{$sum:["$$new.total", "$total"]}} ]}} ] } } Incoming Target { _id: "37", total: 64, f1: "x" } { _id: "37", total: 245, f1: "yyy" } Result: { }
$merge 예제 2 { $merge: { into: <target>, whenMatched:[ {$replaceWith:{$mergeObjects:[ "$$new", {total:{$sum:["$$new.total", "$total"]}} ]}} ] } } Incoming Target { _id: "37", total: 64, f1: "x" } { _id: "37", total: 245, f1: "yyy" } Result: { _id: "37", total: 309, f1: "x" }
$merge 구문 { $merge: { into: <target>, whenMatched:[...] } }
$merge 구문 { $merge: { into: <target>, let: { ... }, whenMatched:[ ...] } }
$merge 구문 { $merge: { into: <target>, let: {new: "$$ROOT"}, whenMatched:[ ...] } }
$merge 구문 { $merge: { into: <target>, whenMatched:[ {$set:{ total:{$sum:["$total","$$new.total"]} }} ] } }
$merge 구문 { $merge: { into: <target>, whenMatched:[ {$set:{ total:{$sum:["$total","$$new.total"]} }} ] } } { $merge:{ into:<target>, let:{itotal:"$total"}, whenMatched:[ {$set:{ total:{$sum:["$total","$$itotal"]} }} ] } }
활용 사례 1 APPEND from TEMP collection
temp real data real Using $merge to append loaded and cleansed records into db
aggregate 'temp' and append valid records to 'data' db.temp.aggregate( [ { ... } /* pipeline to massage and cleanse data in temp */, {$merge:{ into: "data", whenMatched: "fail" }} ]);
aggregate 'temp' and append valid records to 'data' db.temp.aggregate( [ { ... } /* pipeline to massage and cleanse data in temp */, {$merge:{ into: "data", whenMatched: "fail" }} ]); Similar to SQL's INSERT INTO T1 SELECT * from T2
활용 사례 2 Maintain Single View
mflix users users mfriendbook users sv Using $merge to populate/update user fields from other services
mflix users users mfriendbook users sv Using $merge to populate/update user fields from other services sv.users { _id: "user253", dob: ISODate(...), f1: "yyy" }
$merge updates fields from mflix.users collection into sv.users collection. Our "_id" field is the unique username. mflix_pipeline = [ {"$project" : { "_id" : "$username", "mflix" : "$$ROOT" }}, {"$merge" : { "into" : { "db": "sv", "coll" : "users" }, "whenNotMatched" : "discard" }} ] (in mflix) sv.users { _id: "user253", dob: ISODate(...), f1: "yyy" }
$merge updates fields from mflix.users collection into sv.users collection. Our "_id" field is the unique username. mflix_pipeline = [ {"$project" : { "_id" : "$username", "mflix" : "$$ROOT" }}, {"$merge" : { "into" : { "db": "sv", "coll" : "users" }, "whenNotMatched" : "discard" }} ] (in mflix) db.users.aggregate(mflix_pipeline) sv.users { _id: "user253", dob: ISODate(...), f1: "yyy", mflix: { ... } }
$merge updates fields from mfriendbook.users collection into sv.users collection. Our "_id" field is the unique username. mfriendbook_pipeline = [ {"$project" : { "_id" : "$username", "mfriendbook" : "$$ROOT" }}, {"$merge" : { "into" : { "db": "sv", "coll" : "users" }, "whenNotMatched" : "discard" }} ] (in mfriendbook) sv.users { _id: "user253", dob: ISODate(...), f1: "yyy", mflix: { ... } }
$merge updates fields from mfriendbook.users collection into sv.users collection. Our "_id" field is the unique username. mfriendbook_pipeline = [ {"$project" : { "_id" : "$username", "mfriendbook" : "$$ROOT" }}, {"$merge" : { "into" : { "db": "sv", "coll" : "users" }, "whenNotMatched" : "discard" }} ] (in mfriendbook) db.users.aggregate(mfriendbook_pipeline) sv.users { _id: "user253", dob: ISODate(...), f1: "yyy", mflix: { ... }, mfriendbook: { ... } }
활용 사례 3 Populate ROLLUPS into summary table
registrations real regsummary real Using $merge to incrementally update periodic rollups in summary
$merge to create/update periodic rollups in summary collection (for all days) db.regsummary.createIndex({event:1, date:1}, {unique: true});
$merge to create/update periodic rollups in summary collection (for all days) db.regsummary.createIndex({event:1, date:1}, {unique: true}); db.registrations.aggregate([ {$match: {event_id: "MDBW19"}}, {$group:{ _id:{$dateToString:{date:"$date",format:"%Y-%m-%d"}}, count: {$sum:1} }}, {$project: {_id:0,event:"MDBW19",date:"$_id",total:"$count"}}, {$merge: { into: "regsummary", on: ["event", "date"] }} ])
$merge to create/update periodic rollups in summary collection (for all days) db.regsummary.createIndex({event:1, date:1}, {unique: true}); db.registrations.aggregate([ {$match: {event_id: "MDBW19"}}, {$group:{ _id:{$dateToString:{date:"$date",format:"%Y-%m-%d"}}, count: {$sum:1} }}, {$project: {_id:0,event:"MDBW19",date:"$_id",total:"$count"}}, {$merge: { into: "regsummary", on: ["event", "date"] }} ]) { "event" : "MDBW19", "date" : "2019-05-19", "total" : 33 } { "event" : "MDBW19", "date" : "2019-05-20", "total" : 15 } { "event" : "MDBW19", "date" : "2019-05-21", "total" : 24 }
$merge to incrementally update periodic rollups in summary collection (for single day) db.registrations.aggregate([ {$match: { event_id: "MDBW19", date:{$gte:ISODate("2019-05-22"),$lt:ISODate("2019-05-23")} }}, {$count: "total"}, {$addFields: {event:"MDBW19", "date":"2019-05-22"}}, {$merge: { into: "regsummary", on: ["event", "date"] }} ])
$merge to incrementally update periodic rollups in summary collection (for single day) db.registrations.aggregate([ {$match: { event_id: "MDBW19", date:{$gte:ISODate("2019-05-22"),$lt:ISODate("2019-05-23")} }}, {$count: "total"}, {$addFields: {event:"MDBW19", "date":"2019-05-22"}}, {$merge: { into: "regsummary", on: ["event", "date"] }} ]) { "event" : "MDBW19", "date" : "2019-05-19", "total" : 33 } { "event" : "MDBW19", "date" : "2019-05-20", "total" : 15 } { "event" : "MDBW19", "date" : "2019-05-21", "total" : 24 } { "event" : "MDBW19", "date" : "2019-05-22", "total" : 34 }
Thank You!
Aggregation Pipeline Power [김준] https://www.research.net/r/AggPipelineSeoul

Aggregation Pipeline Power++: MongoDB 4.2 파이프 라인 쿼리, 업데이트 및 구체화된 뷰 소개 [MongoDB]

  • 1.
    Aggregation Pipeline Power++: MongoDB 4.2 파이프라인 쿼리, 업데이트 및 구체화된 뷰 소개 — 김준, 기술담당 상무, MongoDB
  • 2.
  • 3.
    파이프라인(PIPELINE) ps ax | grep mongod | head -1 *nix command line pipe $match | $group | $sort MongoDB Document Pipeline Input stream {}{}{}{} Result {}{}...
  • 4.
  • 5.
    파이프라인 스테이지 DATA PIPELINE Stage 1 Stage 2 Stage 3 Stage 4 {}{}{}{} {}{}{}{} {}{}{}{} {"$stage":{ ... }} Collection View Special stage
  • 6.
    {title: "The Great Gatsby", language: "English", subjects: "Long Island"} {title: "The Great Gatsby", language: "English", subjects: "New York"} {title: "The Great Gatsby", language: "English", subjects: "1920s"} {title: "The Great Gatsby", language: "English", subjects: [ "Long Island", "New York", "1920s"] }, {"$match":{"language":"English"}} $match { _id:"Long Island", count: 1 }, $group { _id: "New York", count: 2 }, $unwind { _id: "1920s", count: 1 }, $sort $skip $limit $project {"$unwind":"$subjects"} {"$group":{"_id":"$subjects", "count":{"$sum":1}}} { _id: "Harlem", count: 1 }, { _id:"Long Island", count: 1 }, { _id: "New York", count: 2 }, { _id: "1920s", count: 1 }, {title: "Open City", language: "English", subjects: [ "New York", "Harlem" ] } { title: "The Great Gatsby", language: "English", subjects: [ "Long Island", "New York", "1920s"] }, { title: "War and Peace", language: "Russian", subjects: [ "Russia", "War of 1812", "Napoleon"] }, { title: "Open City", language: "English", subjects: [ "New York", "Harlem" ] }, {title: "Open City", language: "English", subjects: "New York"} {title: "Open City", language: "English", subjects: "Harlem"} { _id: "Harlem", count: 1 }, {"$sort":{"count":-1}} {"$limit":3} {"$project":...}
  • 8.
  • 9.
    스트리밍(Streaming) 자원 사용 INPUT STAGE RESULTS STAGE Each document is streamed through in RAM
  • 10.
    블럭킹(Blocking) 자원 사용 INPUT STAGE RESULTS STAGE Everything has to be kept in RAM (or spill)
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
    Update { $set: { }, $inc:{ }, $... } { f1: <value>, f2: <value>, ... }
  • 16.
    Update { } OR [ ] [ <aggregation-pipeline> ]
  • 17.
    { _id: 1} { _id: 1, a: 10 } { _id: 1, a: 100 } --- { _id: 1, a: "10" } db.coll.update({_id:1}, {$inc:{a:1}}, {upsert:true}) { _id: 1, a: 1 } { _id: 1, a: 11 } { _id: 1, a: 101 } { _id: 1, a: 1 } "errmsg" : "Cannot apply to a value of non-numeric type."
  • 18.
    { _id: 1} { _id: 1, a: 10 } { _id: 1, a: 100 } --- { _id: 1, a: "10" } db.coll.update({_id:1}, [ {$set:{a:{$sum:["$a",1]}}} ], {upsert:true}) { _id: 1, a: 1 } { _id: 1, a: 11 } { _id: 1, a: 101 } { _id: 1, a: 1 } { _id: 1, a: 1 }
  • 19.
    { _id: 1} { _id: 1, a: 10 } { _id: 1, a: 100 } --- { _id: 1, a: "10" } db.coll.update({_id:1}, [ {$set:{a:{$add:["$a",1]}}} ], {upsert:true}) { _id: 1, a: 1 } { _id: 1, a: 11 } { _id: 1, a: 101 } { _id: 1, a: 1 } "errmsg" : "$add only supports numeric or date types, not string"
  • 20.
    { _id: 1} { _id: 1, a: 10 } { _id: 1, a: 100 } --- { _id: 1, a: "10" } db.coll.update({_id:1}, [ ], {upsert:true}) { _id:1, a: 21 } { _id: 1, a: 11 } { _id: 1, a: 101 } { _id:1, a: 21 }
  • 21.
    { _id: 1 } { _id: 1, a: 10 } { _id: 1, a: 100 } --- { _id: 1, a: "10" } db.coll.update({_id:1}, [ {$set:{a:{$cond:{ if: then: , else: }} }}], {upsert:true}) { _id:1, a: 21 } { _id: 1, a: 11 } { _id: 1, a: 101 } { _id:1, a: 21 }
  • 22.
    { _id: 1} { _id: 1, a: 10 } { _id: 1, a: 100 } --- { _id: 1, a: "10" } db.coll.update({_id:1}, [ {$set:{a:{$cond:{ if: {$eq:[{$type:"$a"},"missing"]}, then: , else: }} }}], {upsert:true}) { _id:1, a: 21 } { _id: 1, a: 11 } { _id: 1, a: 101 } { _id:1, a: 21 }
  • 23.
    { _id: 1} { _id: 1, a: 10 } { _id: 1, a: 100 } --- { _id: 1, a: "10" } db.coll.update({_id:1}, [ {$set:{a:{$cond:{ if: {$eq:[{$type:"$a"},"missing"]}, then: 21, else: {$sum:["$a", 1]} }} }}], {upsert:true}) { _id:1, a: 21 } { _id: 1, a: 11 } { _id: 1, a: 101 } { _id:1, a: 21 }
  • 24.
    { _id: 1} { _id: 1, a: 10 } { _id: 1, a: 100 } --- { _id: 1, a: "10" } db.coll.update({_id:1}, [ {$set:{a:{$cond:{ if: {$eq:[{$type:"$a"},"missing"]}, then: 21, else: {$sum:["$a", 1]} }} }}], {upsert:true}) { _id:1, a: 21 } { _id: 1, a: 11 } { _id: 1, a: 100 } { _id:1, a: 21 }
  • 25.
    { _id: 1} { _id: 1, a: 10 } { _id: 1, a: 100 } --- { _id: 1, a: "10" } db.coll.update({_id:1}, [ {$set:{a:{$min:[ 100, {$cond:{ if: {$eq:[{$type:"$a"},"missing"]}, then: 21, else: {$sum:["$a", 1]} }}]} }}], {upsert:true}) { _id:1, a: 21 } { _id: 1, a: 11 } { _id: 1, a: 100 } { _id:1, a: 21 }
  • 26.
    { _id: 1} { _id: 1, a: 10 } { _id: 1, a: 100 } --- { _id: 1, a: "10" } db.coll.update({_id:1}, [ {$set:{a:{$min:[ 100, {$cond:{ if: {$eq:[{$type:"$a"},"missing"]}, then: 21, else: {$sum:["$a", 1]} }}]} }}], {upsert:true}) { _id:1, a: 21 } { _id: 1, a: 11 } { _id: 1, a: 100 } { _id:1, a: 21 } { _id:1, a: 1 }
  • 27.
    { _id: 1} { _id: 1, a: 10 } { _id: 1, a: 100 } --- { _id: 1, a: "10" } db.coll.update({_id:1}, [ {$set:{a:{$min:[ 100, {$cond:{ if: {$eq:[{$type:"$a"},"missing"]}, then: 21, else: {$sum:["$a", 1]} }}]}, prev_a: "$a"}}], {upsert:true}) { _id:1, a: 21 } { _id: 1, a: 11 } { _id: 1, a: 100 } { _id:1, a: 21 } { _id:1, a: 1 }
  • 28.
    { _id: 1} { _id: 1, a: 10 } { _id: 1, a: 100 } --- { _id: 1, a: "10" } db.coll.update({_id:1}, [ {$set:{a:{$min:[ 100, {$cond:{ if: {$eq:[{$type:"$a"},"missing"]}, then: 21, else: {$sum:["$a", 1]} }}]}, prev_a: "$a"}}], {upsert:true}) { _id:1, a: 21 } { _id: 1, a: 11, prev_a: 10 } { _id: 1, a: 100, prev_a: 100 } { _id:1, a: 21 } { _id:1, a: 1, prev_a: "10" }
  • 29.
    Set Defaults {_id: 1,a: 5, b: 12} {_id: 2, a: 15, c: "abc"} {_id: 3, b: 99, c: "xyz"} If a or b are missing, set to 0, if c is missing -> "unset"
  • 30.
    Set Defaults {_id: 1,a: 5, b: 12} {_id: 2, a: 15, c: "abc"} {_id: 3, b: 99, c: "xyz"} If a or b are missing, set to 0, if c is missing -> "unset" db.coll.update({}, [ {$replaceWith:{ }} ], {multi:true})
  • 31.
    Set Defaults {_id: 1,a: 5, b: 12} {_id: 2, a: 15, c: "abc"} {_id: 3, b: 99, c: "xyz"} If a or b are missing, set to 0, if c is missing -> "unset" db.coll.update({}, [ {$replaceWith:{$mergeObjects:[ ]}} ], {multi:true})
  • 32.
    Set Defaults {_id: 1,a: 5, b: 12} {_id: 2, a: 15, c: "abc"} {_id: 3, b: 99, c: "xyz"} If a or b are missing, set to 0, if c is missing -> "unset" db.coll.update({}, [ {$replaceWith:{$mergeObjects:[ { a:0, b:0, c:"unset" }, "$$ROOT" ]}} ], {multi:true})
  • 33.
    Set Defaults {_id: 1,a: 5, b: 12, c: "unset"} {_id: 2, a: 15, b: 0, c: "abc"} {_id: 3, a: 0, b: 99, c: "xyz"} {_id: 1, a: 5, b: 12} {_id: 2, a: 15, c: "abc"} {_id: 3, b: 99, c: "xyz"} If a or b are missing, set to 0, if c is missing -> "unset" db.coll.update({}, [ {$replaceWith:{$mergeObjects:[ { a:0, b:0, c:"unset" }, "$$ROOT" ]}} ], {multi:true})
  • 34.
    Set array element upsert append update { id: 1, d: ISODate("2019-06-04T00:00:00"), h: [ { hour:"11", value: 296 }, { hour:"12", value: 300 } ]} id: X, d:Y, hour:Z, value: VAL db.coll.update({id:X, d:Y}, [ {$set:{h:{$cond:{ if: then: else: }}}}], {upsert:true})
  • 35.
    Set array element upsert append update { id: 1, d: ISODate("2019-06-04T00:00:00"), h: [ { hour:"11", value: 296 }, { hour:"12", value: 300 } ]} id: X, d:Y, hour:Z, value: VAL db.coll.update({id:X, d:Y}, [ {$set:{h:{$cond:{ if: {$in:[Z,{$ifNull:["$h.hour",[]]}]}, then:{$map:{ input:"$h", in: {$cond:{ if:{$ne:["$$this.hour",Z]}, then:"$$this", else: {hour: Z, value: {$sum:[ "$$this.value", VAL]}} }} }}, else:{$concatArrays:[{$ifNull:["$h",[]]},[{hour:Z,value:VAL}]]} }}}}], {upsert:true})
  • 36.
    Recap: • Update 구문에서 집계 파이프라인을 사용할 수 있습니다. • 기존 문서의 모든 필드를 참조하고 액세스할 수 있습니다.
  • 37.
  • 38.
    MongoDB 4.2 이전 $out coll new_coll db.coll.aggregate( [ { pipeline }, ..., { $out: "new_coll" } ]); new_coll ○ must be unsharded ○ overwrites existing
  • 39.
    MongoDB 4.2 이후 $merge db.coll.aggregate( [ {pipeline}, ..., {$merge: { ... }} ]); coll2 ○ can exist ○ same or different 'db' ○ can be sharded coll coll2
  • 40.
  • 41.
    $merge 구문 { $merge: { into:<target> } } {$merge: "collection2"}
  • 42.
    $merge 구문 { $merge: { into: <target> } } {$merge: {into: {db: "db2", coll: "collection2"}}}
  • 43.
  • 44.
    $merge 구문 { $merge: { into:<target>, on: <fields> } } on: "_id" on: [ "_id", "shardkey(s)" ] must be unique
  • 45.
    $merge 구문 { $merge: { into:<target>, on: <fields> } }
  • 46.
    방안 ( Actions) source target
  • 47.
    방안 ( Actions) nothing matched: source target
  • 48.
    방안 ( Actions) nothing matched: usually insert source target
  • 49.
    방안 ( Actions) nothing matched: usually insert document matched: source target
  • 50.
    방안 ( Actions) nothing matched: usually insert document matched: overwrite? update? ??? source target
  • 51.
    방안 ( Actions) nothing matched: usually insert document matched: update source target
  • 52.
    방안 ( Actions) nothing matched: usually insert document matched: update (merge) source target
  • 53.
    $merge 구문 { $merge: { into:<target>, whenNotMatched: whenMatched: } }
  • 54.
    $merge 구문 { $merge: { into:<target>, whenNotMatched:"insert", whenMatched: } }
  • 55.
    $merge 구문 { $merge: { into:<target>, whenNotMatched:"insert", whenMatched:"merge" } }
  • 56.
    $merge 구문 { $merge: { into:<target>, whenNotMatched:"insert"|"discard"|"fail", whenMatched:"merge" } }
  • 57.
    $merge 구문 { $merge: { into:<target>, whenNotMatched:"insert"|"discard"|"fail", whenMatched:"merge"|"replace"|"keepExisting"|"fail"|[...] } }
  • 58.
    $merge 구문 { $merge: { into:<target>, whenMatched:[...] } }
  • 59.
    $merge 구문 { $merge: { into:<target>, whenMatched:[<custom pipeline>] } }
  • 60.
    $merge 예제 { $merge: { into:<target>, whenMatched:[ {$addFields:{ }} ] } }
  • 61.
    $merge 예제 { $merge: { into:<target>, whenMatched:[ {$addFields:{ total:{$sum:["$total","$$new.total"]} }} ] } }
  • 62.
    $merge 예제 { $merge: { into:<target>, whenMatched:[ {$set:{ total:{$sum:["$total","$$new.total"]} }} ] } }
  • 63.
    $merge 예제 { $merge: { into:<target>, whenMatched:[ {$set:{ total:{$sum:["$total","$$new.total"]} }} ] } }
  • 64.
    $merge 예제 { $merge: { into:<target>, whenMatched:[ {$set:{ total:{$sum:["$total","$$new.total"]} }} ] } } Incoming Target { _id: "37", total: 64, f1: "x" } { _id: "37", total: 245, f1: "yyy" } Result: { }
  • 65.
    $merge 예제 { $merge: { into:<target>, whenMatched:[ {$set:{ total:{$sum:["$total","$$new.total"]} }} ] } } Incoming Target { _id: "37", total: 64, f1: "x" } { _id: "37", total: 245, f1: "yyy" } Result: { _id: "37", total: 309, f1: "yyy" }
  • 66.
    $merge 예제 2 { $merge:{ into: <target>, whenMatched:[ {$replaceWith:{$mergeObjects:[ "$$new", {total:{$sum:["$$new.total", "$total"]}} ]}} ] } }
  • 67.
    $merge 예제 2 { $merge:{ into: <target>, whenMatched:[ {$replaceWith:{$mergeObjects:[ "$$new", {total:{$sum:["$$new.total", "$total"]}} ]}} ] } } Incoming Target { _id: "37", total: 64, f1: "x" } { _id: "37", total: 245, f1: "yyy" } Result: { }
  • 68.
    $merge 예제 2 { $merge:{ into: <target>, whenMatched:[ {$replaceWith:{$mergeObjects:[ "$$new", {total:{$sum:["$$new.total", "$total"]}} ]}} ] } } Incoming Target { _id: "37", total: 64, f1: "x" } { _id: "37", total: 245, f1: "yyy" } Result: { _id: "37", total: 309, f1: "x" }
  • 69.
    $merge 구문 { $merge: { into:<target>, whenMatched:[...] } }
  • 70.
    $merge 구문 { $merge: { into:<target>, let: { ... }, whenMatched:[ ...] } }
  • 71.
    $merge 구문 { $merge: { into:<target>, let: {new: "$$ROOT"}, whenMatched:[ ...] } }
  • 72.
    $merge 구문 { $merge: { into:<target>, whenMatched:[ {$set:{ total:{$sum:["$total","$$new.total"]} }} ] } }
  • 73.
    $merge 구문 { $merge: { into:<target>, whenMatched:[ {$set:{ total:{$sum:["$total","$$new.total"]} }} ] } } { $merge:{ into:<target>, let:{itotal:"$total"}, whenMatched:[ {$set:{ total:{$sum:["$total","$$itotal"]} }} ] } }
  • 74.
    활용 사례 1 APPEND from TEMP collection
  • 75.
    temp real data real Using $merge to append loaded and cleansed records into db
  • 76.
    aggregate 'temp' and append valid records to 'data' db.temp.aggregate( [ { ... } /* pipeline to massage and cleanse data in temp */, {$merge:{ into: "data", whenMatched: "fail" }} ]);
  • 77.
    aggregate 'temp' and append valid records to 'data' db.temp.aggregate( [ { ... } /* pipeline to massage and cleanse data in temp */, {$merge:{ into: "data", whenMatched: "fail" }} ]); Similar to SQL's INSERT INTO T1 SELECT * from T2
  • 78.
  • 79.
    mflix users users mfriendbook users sv Using $merge to populate/update user fields from other services
  • 80.
    mflix users users mfriendbook users sv Using $merge to populate/update user fields from other services sv.users { _id: "user253", dob: ISODate(...), f1: "yyy" }
  • 81.
    $merge updates fields from mflix.users collection into sv.users collection. Our "_id" field is the unique username. mflix_pipeline = [ {"$project" : { "_id" : "$username", "mflix" : "$$ROOT" }}, {"$merge" : { "into" : { "db": "sv", "coll" : "users" }, "whenNotMatched" : "discard" }} ] (in mflix) sv.users { _id: "user253", dob: ISODate(...), f1: "yyy" }
  • 82.
    $merge updates fields from mflix.users collection into sv.users collection. Our "_id" field is the unique username. mflix_pipeline = [ {"$project" : { "_id" : "$username", "mflix" : "$$ROOT" }}, {"$merge" : { "into" : { "db": "sv", "coll" : "users" }, "whenNotMatched" : "discard" }} ] (in mflix) db.users.aggregate(mflix_pipeline) sv.users { _id: "user253", dob: ISODate(...), f1: "yyy", mflix: { ... } }
  • 83.
    $merge updates fields from mfriendbook.users collection into sv.users collection. Our "_id" field is the unique username. mfriendbook_pipeline = [ {"$project" : { "_id" : "$username", "mfriendbook" : "$$ROOT" }}, {"$merge" : { "into" : { "db": "sv", "coll" : "users" }, "whenNotMatched" : "discard" }} ] (in mfriendbook) sv.users { _id: "user253", dob: ISODate(...), f1: "yyy", mflix: { ... } }
  • 84.
    $merge updates fields from mfriendbook.users collection into sv.users collection. Our "_id" field is the unique username. mfriendbook_pipeline = [ {"$project" : { "_id" : "$username", "mfriendbook" : "$$ROOT" }}, {"$merge" : { "into" : { "db": "sv", "coll" : "users" }, "whenNotMatched" : "discard" }} ] (in mfriendbook) db.users.aggregate(mfriendbook_pipeline) sv.users { _id: "user253", dob: ISODate(...), f1: "yyy", mflix: { ... }, mfriendbook: { ... } }
  • 85.
    활용 사례 3 Populate ROLLUPS into summary table
  • 86.
    registrations real regsummary real Using $merge to incrementally update periodic rollups in summary
  • 87.
    $merge to create/update periodic rollups in summary collection (for all days) db.regsummary.createIndex({event:1, date:1}, {unique: true});
  • 88.
    $merge to create/update periodic rollups in summary collection (for all days) db.regsummary.createIndex({event:1, date:1}, {unique: true}); db.registrations.aggregate([ {$match: {event_id: "MDBW19"}}, {$group:{ _id:{$dateToString:{date:"$date",format:"%Y-%m-%d"}}, count: {$sum:1} }}, {$project: {_id:0,event:"MDBW19",date:"$_id",total:"$count"}}, {$merge: { into: "regsummary", on: ["event", "date"] }} ])
  • 89.
    $merge to create/update periodic rollups in summary collection (for all days) db.regsummary.createIndex({event:1, date:1}, {unique: true}); db.registrations.aggregate([ {$match: {event_id: "MDBW19"}}, {$group:{ _id:{$dateToString:{date:"$date",format:"%Y-%m-%d"}}, count: {$sum:1} }}, {$project: {_id:0,event:"MDBW19",date:"$_id",total:"$count"}}, {$merge: { into: "regsummary", on: ["event", "date"] }} ]) { "event" : "MDBW19", "date" : "2019-05-19", "total" : 33 } { "event" : "MDBW19", "date" : "2019-05-20", "total" : 15 } { "event" : "MDBW19", "date" : "2019-05-21", "total" : 24 }
  • 90.
    $merge to incrementally update periodic rollups in summary collection (for single day) db.registrations.aggregate([ {$match: { event_id: "MDBW19", date:{$gte:ISODate("2019-05-22"),$lt:ISODate("2019-05-23")} }}, {$count: "total"}, {$addFields: {event:"MDBW19", "date":"2019-05-22"}}, {$merge: { into: "regsummary", on: ["event", "date"] }} ])
  • 91.
    $merge to incrementally update periodic rollups in summary collection (for single day) db.registrations.aggregate([ {$match: { event_id: "MDBW19", date:{$gte:ISODate("2019-05-22"),$lt:ISODate("2019-05-23")} }}, {$count: "total"}, {$addFields: {event:"MDBW19", "date":"2019-05-22"}}, {$merge: { into: "regsummary", on: ["event", "date"] }} ]) { "event" : "MDBW19", "date" : "2019-05-19", "total" : 33 } { "event" : "MDBW19", "date" : "2019-05-20", "total" : 15 } { "event" : "MDBW19", "date" : "2019-05-21", "total" : 24 } { "event" : "MDBW19", "date" : "2019-05-22", "total" : 34 }
  • 92.
  • 93.