- 구글에서 대용량 Data에 대한 Batch Processing 및 Aggregation 을 위해 만들어졌으며, 공유되지 않은 여러 Node에서 동시에 병렬 처리 방식으로 Big Data에 대한 처리가 가능한 기능입니다.
- 아직 MongoDB는 다른 RDBMS만큼 오래되지 않아 지원되는 함수가 많이 없는데, 개발자가 Programming으로 함수를 생성하게 해주는 기능입니다.
2. 구조
db.collection.mapReduce( <map>, // Map Function : 분석 대상 Field를 emit 함수를 이용하여 정의 <reduce>, // Recude Function : 분석 및 통계 작업을 수행 { <out>, // 실행 결과를 저장할 collection명 <query>, // 검색 조건 <sort>, // 정렬 조건 <limit>, // 검색 조건 <finalize>, // 처리된 결과를 집계하는 함수 <scope>, <jsMode>, <verbose> } ) |
3. 실습
먼저 order 라는 collection을 생성 한 후 예제 data를 넣습니다.
> db.order.insert( { cust_id : "A2012001" , order_date : new Date("Oct 01, 2012") , status : "A" , price : 250 , items : [ { item_name : "Bunny Boots" , qty : 5 , price : 2.5 } , { item_name : "Sky Pole" , qty : 5 , price : 2.5 } ] } ) WriteResult({ "nInserted" : 1 }) > db.order.insert( { cust_id : "A2012001" , order_date : new Date("Sep 15, 2012") , status : "A" , price : 1125 , items : [ { item_name : "Bunny Boots" , qty : 15 , price : 2.5 } , { item_name : "Sky Pole" , qty : 5 , price : 2.5 } ] } ) WriteResult({ "nInserted" : 1 }) |
이제 cust_id 별로 price를 집계하는 mapReduce 함수를 생성하고, 그 결과를 order_cust_total에 저장합니다.
var map_function = function() { emit ( this.cust_id , this.price ); } var reduce_function = function ( keyCustId, valuesPrices ) { return Array.sum ( valuesPrices ); }; db.order.mapReduce( map_function , reduce_function , { out : "order_cust_total" } ) { "result" : "order_cust_total", "timeMillis" : 38, "counts" : { "input" : 2, "emit" : 2, "reduce" : 1, "output" : 1 }, "ok" : 1 } db.order_cust_total.find() { "_id" : "A2012001", "value" : 1375 } |
다음은 주문 항목의 제품 별 수량 평균을 집계하는 MapReduce 예제입니다.
var map_function = function() { for (var idx = 0; idx < this.items.length; idx++) { var key = this.items[idx].item_name; emit(key); } } var reduce_function = function(keySKU, valuesCountObjects) { reducedValue = { count : 0 , qty : 0 }; for (var idx = 0; idx < valuesCountObjects.length; idx++) { reducedValue.count += valuesCountObjects[idx].count; reducedValue.qty += valuesCountObjects[idx].qty; } return reducedValue; } var finalize_function = function(key, reducedValue) { reducedValue.average = reducedValue.qty / reducedValue.count; return reducedValue; } db.order.mapReduce ( map_function , reduce_function , { out : { merge : "map_reduce_example" } , query : { order_date: { $gt : new Date('01/01/2012') } } , finalize: finalize_function } ) { "result" : "map_reduce_example", "timeMillis" : 4, "counts" : { "input" : 2, "emit" : 4, "reduce" : 2, "output" : 2 }, "ok" : 1 } db.map_reduce_example.find() { "_id" : "Bunny Boots", "value" : { "count" : 2, "qty" : 20, "average" : 10 } } { "_id" : "Sky Pole", "value" : { "count" : 2, "qty" : 10, "average" : 5 } } |
위 작업들을 Orange for Oracle 을 이용하여 RDBMS에서 수행한 결과는 다음과 같습니다.
다음은 사용자별로 접속기록을 Log 로 저장하고 있는 경우에 대하여
사용자별 접속회수, 총 접속 시간, 평균 접속 시간을 분석하는 예제입니다.
먼저 Log 예제 collection을 생성하겠습니다.
db.log.save( { user : "T2013001" , date : ISODate("2013-01-03 15:07:00") , len : 88 } ); db.log.save( { user : "T2013002" , date : ISODate("2013-01-03 15:13:00") , len : 10 } ); db.log.save( { user : "T2013003" , date : ISODate("2013-01-03 16:12:00") , len : 120 } ); db.log.save( { user : "T2013004" , date : ISODate("2013-01-03 17:35:00") , len : 35 } ); db.log.save( { user : "T2013001" , date : ISODate("2013-01-04 12:15:00") , len : 125 } ); db.log.save( { user : "T2013002" , date : ISODate("2013-01-04 14:04:00") , len : 110 } ); db.log.save( { user : "T2013003" , date : ISODate("2013-01-04 18:50:00") , len : 135 } ); db.log.save( { user : "T2013004" , date : ISODate("2013-01-04 16:27:00") , len : 85 } ); |
다음으로 분석하는 MapReduce를 생성하겠습니다.
var map_log = function() { var key = this.user; var value = { user : this.user , total : this.len, count : 1, avg : 0 }; emit ( key , value ) ; } var red_log = function ( key , values ) { var reduced = { user : key , total : 0 , count : 0 , avg : 0 }; values.forEach ( function ( value ) { reduced.total += value.total; reduced.count += value.count; } ); return reduced; } var final_log = function ( key , reduced ) { if ( reduced.count > 0 ) reduced.avg = reduced.total / reduced.count; return reduced; } db.log.mapReduce ( map_log , red_log , { out : { reduce: "session_stat" } , finalize : final_log } ) { "result" : "session_stat", "timeMillis" : 7, "counts" : { "input" : 8, "emit" : 8, "reduce" : 4, "output" : 4 }, "ok" : 1 } db.session_stat.find() { "_id" : "T2013001", "value" : { "user" : "T2013001", "total" : 213, "count" : 2, "avg" : 106.5 } } { "_id" : "T2013002", "value" : { "user" : "T2013002", "total" : 120, "count" : 2, "avg" : 60 } } { "_id" : "T2013003", "value" : { "user" : "T2013003", "total" : 255, "count" : 2, "avg" : 127.5 } } { "_id" : "T2013004", "value" : { "user" : "T2013004", "total" : 120, "count" : 2, "avg" : 60 } } |
위 작업들을 Orange for Oracle 을 이용하여 RDBMS에서 수행한 결과는 다음과 같습니다.
딱 봐도 아시겠지만, SQL보다는 MongoDB를 이용한 방법이 더 불편합니다.
하지만 Big Data들을 빠르게 분석하기 위해서는 배워야하죠. ㅠㅠ
다음 예제는 EMP Table에서 부서별, 직책별 인원 및 급여에 대한 통계값을 구하는 Query 입니다.
Oracle에서 조회하는 SQL문과 MongoDB에서 조회하는 MapReduce를 비교해 보겠습니다.
색깔별로 같은 기능을 하는 곳을 표시해 두었습니다.
Oracle의 SQL문
SELECT DEPTNO, JOB, SUM(SAL) AS MSUM, COUNT(*) AS RECS, AVG(SAL) AS MAVG, MIN(SAL) AS MIN, MAX(CASE WHEN SAL > 1000 THEN SAL END) AS MMAX FROM SCOTT.EMP WHERE (HIREDATE > '1981-01-01' AND HIREDATE < '1983-12-13') AND SAL > 800 GROUP BY DEPTNO, JOB HAVING MIN(SAL) > 0 ORDER BY RECS DESC ; |
MongoDB의 MapReduce
db.runCommand( { mapreduce : "emp" , query : { hiredate : { $gt : '01-01-1981' , $lt : '31-12-1983' } , sal : { $gt : 800 } } , map : function() { emit ( { d1 : this.deptno, d2 : this.job } , { msum : this.sal , recs : 1 , mmin : this.sal , mmax : this.sal > 1000 ? this.sal : 0 } ) ; } , reduce : function ( key , vals ) { var ret = { msum : 0 , recs : 0 , mmin : 0 , mmax : 0 } ; for ( var i = 0 ; i < vals.length ; i++) { ret.msum += vals[i].msum; ret.recs += vals[i].recs; if ( vals[i].mmin < ret.mmin ) ret.mmin = vals[i].mmin; if ( ( vals[i].mmax > 1000 ) && ( vals[i].mmax > ret.mmax ) ) ret.mmax = vals[i].mmax; } return ret ; } , finalize : function ( key, val ) { val.mavg = val.msum / val.recs ; return val ; } , out : "result_emp" , verbose : true } ) { "result" : "result_emp", "timeMillis" : 48, "timing" : { "mapTime" : 0, "emitLoop" : 46, "reduceTime" : 1, "mode" : "mixed", "total" : 48 }, "counts" : { "input" : 13, "emit" : 13, "reduce" : 2, "output" : 9 }, "ok" : 1 } db.result_emp.find( { "value.mmin" : { $gt : 0 } } ).sort( { "value.recs" : 1 } ) { "_id" : { "d1" : 10, "d2" : "CEO" }, "value" : { "msum" : 5000, "recs" : 1, "mmin" : 5000, "mmax" : 5000, "mavg" : 5000 } } { "_id" : { "d1" : 10, "d2" : "CLERK" }, "value" : { "msum" : 1300, "recs" : 1, "mmin" : 1300, "mmax" : 1300, "mavg" : 1300 } } { "_id" : { "d1" : 10, "d2" : "MANAGER" }, "value" : { "msum" : 2450, "recs" : 1, "mmin" : 2450, "mmax" : 2450, "mavg" : 2450 } } { "_id" : { "d1" : 20, "d2" : "CLERK" }, "value" : { "msum" : 1100, "recs" : 1, "mmin" : 1100, "mmax" : 1100, "mavg" : 1100 } } { "_id" : { "d1" : 20, "d2" : "MANAGER" }, "value" : { "msum" : 2975, "recs" : 1, "mmin" : 2975, "mmax" : 2975, "mavg" : 2975 } } { "_id" : { "d1" : 30, "d2" : "CLERK" }, "value" : { "msum" : 950, "recs" : 1, "mmin" : 950, "mmax" : 0, "mavg" : 950 }} { "_id" : { "d1" : 30, "d2" : "MANAGER" }, "value" : { "msum" : 2850, "recs" : 1, "mmin" : 2850, "mmax" : 2850, "mavg" : 2850 } } |
댓글 없음:
댓글 쓰기