페이지

2015년 8월 5일 수요일

MongoDB Study #07 추출과 분석 : MapReduce

1. MapReduce 란 ?

  - 구글에서 대용량 Data에 대한 Batch Processing 및 Aggregation 을 위해 만들어졌으며, 공유되지 않은 여러 Node에서 동시에 병렬 처리 방식으로 Big Data에 대한 처리가 가능한 기능입니다.
  - 아직 MongoDB는 다른 RDBMS만큼 오래되지 않아 지원되는 함수가 많이 없는데, 개발자가 Programming으로 함수를 생성하게 해주는 기능입니다.

2. 구조

db.collection.mapReduce(
    <map>,          // Map Function : 분석 대상 Field를 emit 함수를 이용하여 정의
    <reduce>,      // Recude Function : 분석 및 통계 작업을 수행
    {
        <out>,      // 실행 결과를 저장할 collection명
        <query>,   // 검색 조건
        <sort>,     // 정렬 조건
        <limit>,     // 검색 조건
        <finalize>, // 처리된 결과를 집계하는 함수
        <scope>,
        <jsMode>,
        <verbose> } ) 

3. 실습

먼저 order 라는 collection을 생성 한 후 예제 data를 넣습니다.

> db.order.insert( { cust_id : "A2012001" , order_date : new Date("Oct 01, 2012") , status : "A" , price : 250 , items : [ { item_name : "Bunny Boots" , qty : 5 , price : 2.5 } , { item_name : "Sky Pole" , qty : 5 , price : 2.5 } ] } )
WriteResult({ "nInserted" : 1 })
> db.order.insert( { cust_id : "A2012001" , order_date : new Date("Sep 15, 2012") , status : "A" , price : 1125 , items : [ { item_name : "Bunny Boots" , qty : 15 , price : 2.5 } , { item_name : "Sky Pole" , qty : 5 , price : 2.5 } ] } )
WriteResult({ "nInserted" : 1 })

이제 cust_id 별로 price를 집계하는 mapReduce 함수를 생성하고, 그 결과를 order_cust_total에 저장합니다.

var map_function = function() { emit ( this.cust_id , this.price ); }
var reduce_function = function ( keyCustId, valuesPrices ) { return Array.sum ( valuesPrices ); };
db.order.mapReduce( map_function , reduce_function , { out : "order_cust_total" } )
{
        "result" : "order_cust_total",
        "timeMillis" : 38,
        "counts" : {
                "input" : 2,
                "emit" : 2,
                "reduce" : 1,
                "output" : 1
        },
        "ok" : 1
}

db.order_cust_total.find()
{ "_id" : "A2012001", "value" : 1375 }

다음은 주문 항목의 제품 별 수량 평균을 집계하는 MapReduce 예제입니다.

 var map_function = function() {
     for (var idx = 0; idx < this.items.length; idx++) {
         var key = this.items[idx].item_name;
         emit(key);
     }
 }


var reduce_function = function(keySKU, valuesCountObjects) {
     reducedValue = { count : 0 , qty : 0 };
     for (var idx = 0; idx < valuesCountObjects.length; idx++) {
         reducedValue.count += valuesCountObjects[idx].count;
         reducedValue.qty += valuesCountObjects[idx].qty;
     }
     return reducedValue;
 }


var finalize_function = function(key, reducedValue) {
     reducedValue.average = reducedValue.qty / reducedValue.count;
     return reducedValue;
 }


db.order.mapReduce ( map_function , reduce_function , { out : { merge : "map_reduce_example" } , query : { order_date: { $gt : new Date('01/01/2012') } } , finalize: finalize_function } )
{
        "result" : "map_reduce_example",
        "timeMillis" : 4,
        "counts" : {
                "input" : 2,
                "emit" : 4,
                "reduce" : 2,
                "output" : 2
        },
        "ok" : 1
}

db.map_reduce_example.find()
{ "_id" : "Bunny Boots", "value" : { "count" : 2, "qty" : 20, "average" : 10 } }
{ "_id" : "Sky Pole", "value" : { "count" : 2, "qty" : 10, "average" : 5 } }

위 작업들을 Orange for Oracle 을 이용하여 RDBMS에서 수행한 결과는 다음과 같습니다.


다음은 사용자별로 접속기록을 Log 로 저장하고 있는 경우에 대하여
사용자별 접속회수, 총 접속 시간, 평균 접속 시간을 분석하는 예제입니다.

먼저 Log 예제 collection을 생성하겠습니다.

 db.log.save( { user : "T2013001" , date : ISODate("2013-01-03 15:07:00") , len : 88  }  );
 db.log.save( { user : "T2013002" , date : ISODate("2013-01-03 15:13:00") , len : 10  }  );
 db.log.save( { user : "T2013003" , date : ISODate("2013-01-03 16:12:00") , len : 120 } );
 db.log.save( { user : "T2013004" , date : ISODate("2013-01-03 17:35:00") , len : 35 } );
 db.log.save( { user : "T2013001" , date : ISODate("2013-01-04 12:15:00") , len : 125 } );
 db.log.save( { user : "T2013002" , date : ISODate("2013-01-04 14:04:00") , len : 110 } );

 db.log.save( { user : "T2013003" , date : ISODate("2013-01-04 18:50:00") , len : 135 } );
 db.log.save( { user : "T2013004" , date : ISODate("2013-01-04 16:27:00") , len : 85 } );

다음으로 분석하는 MapReduce를 생성하겠습니다.

 var map_log = function() {
    var key = this.user;
    var value = { user : this.user , total : this.len, count : 1, avg : 0 };
    emit ( key , value ) ;
}

var red_log = function ( key , values ) {
    var reduced = { user : key , total : 0 , count : 0 , avg : 0 };
    values.forEach ( function ( value ) {
        reduced.total += value.total;
        reduced.count += value.count;
    } );
    return reduced;
}

var final_log = function ( key , reduced ) {
    if ( reduced.count > 0 )
        reduced.avg = reduced.total / reduced.count;
    return reduced;
}

db.log.mapReduce ( map_log , red_log , { out : { reduce: "session_stat" } , finalize : final_log } )
{
        "result" : "session_stat",
        "timeMillis" : 7,
        "counts" : {
                "input" : 8,
                "emit" : 8,
                "reduce" : 4,
                "output" : 4
        },
        "ok" : 1
}


db.session_stat.find()
{ "_id" : "T2013001", "value" : { "user" : "T2013001", "total" : 213, "count" : 2, "avg" : 106.5 } }
{ "_id" : "T2013002", "value" : { "user" : "T2013002", "total" : 120, "count" : 2, "avg" : 60 } }
{ "_id" : "T2013003", "value" : { "user" : "T2013003", "total" : 255, "count" : 2, "avg" : 127.5 } }
{ "_id" : "T2013004", "value" : { "user" : "T2013004", "total" : 120, "count" : 2, "avg" : 60 } }

위 작업들을 Orange for Oracle 을 이용하여 RDBMS에서 수행한 결과는 다음과 같습니다.



딱 봐도 아시겠지만, SQL보다는 MongoDB를 이용한 방법이 더 불편합니다.
하지만 Big Data들을 빠르게 분석하기 위해서는 배워야하죠. ㅠㅠ

다음 예제는 EMP Table에서 부서별, 직책별 인원 및 급여에 대한 통계값을 구하는 Query 입니다.
Oracle에서 조회하는 SQL문과 MongoDB에서 조회하는 MapReduce를 비교해 보겠습니다.
색깔별로 같은 기능을 하는 곳을 표시해 두었습니다.

Oracle의 SQL문
SELECT DEPTNO, JOB,
      ​        SUM(SAL) AS MSUM,
      ​        COUNT(*) AS RECS,
      ​        AVG(SAL) AS MAVG,
      ​        MIN(SAL) AS MIN,
      ​        MAX(CASE WHEN SAL > 1000 THEN SAL END) AS MMAX
  FROM SCOTT.EMP
 WHERE (HIREDATE > '1981-01-01' AND HIREDATE < '1983-12-13')
  ​    AND SAL > 800
 GROUP BY DEPTNO, JOB
HAVING MIN(SAL) > 0
ORDER BY RECS DESC ;

MongoDB의 MapReduce
db.runCommand( {
mapreduce : "emp" ,
query : {
    hiredate : { $gt : '01-01-1981' , $lt : '31-12-1983' } ,
    sal : { $gt : 800 }
} ,
map : function() {
    emit (
        { d1 : this.deptno, d2 : this.job } ,
        { msum : this.sal , recs : 1 , mmin : this.sal , mmax : this.sal > 1000 ? this.sal : 0 }
    ) ;
} ,
reduce : function ( key , vals ) {
    var ret = { msum : 0 , recs : 0 , mmin : 0 , mmax : 0 } ;
    for ( var i = 0 ; i < vals.length ; i++) {
        ret.msum += vals[i].msum;
        ret.recs += vals[i].recs;
        if ( vals[i].mmin < ret.mmin ) ret.mmin = vals[i].mmin;
        if ( ( vals[i].mmax > 1000 ) && ( vals[i].mmax > ret.mmax ) )
            ret.mmax = vals[i].mmax;
    }
    return ret ;
} ,
finalize : function ( key, val ) {
    val.mavg = val.msum / val.recs ;
    return val ;
} ,
out : "result_emp" ,
verbose : true
} )
{
        "result" : "result_emp",
        "timeMillis" : 48,
        "timing" : {
                "mapTime" : 0,
                "emitLoop" : 46,
                "reduceTime" : 1,
                "mode" : "mixed",
                "total" : 48
        },
        "counts" : {
                "input" : 13,
                "emit" : 13,
                "reduce" : 2,
                "output" : 9
        },
        "ok" : 1
}

db.result_emp.find( { "value.mmin" : { $gt : 0 } } ).sort( { "value.recs" : 1 } )
{ "_id" : { "d1" : 10, "d2" : "CEO" }, "value" : { "msum" : 5000, "recs" : 1, "mmin" : 5000, "mmax" : 5000, "mavg" : 5000 } }
{ "_id" : { "d1" : 10, "d2" : "CLERK" }, "value" : { "msum" : 1300, "recs" : 1, "mmin" : 1300, "mmax" : 1300, "mavg" : 1300 } }
{ "_id" : { "d1" : 10, "d2" : "MANAGER" }, "value" : { "msum" : 2450, "recs" : 1, "mmin" : 2450, "mmax" : 2450, "mavg" : 2450 } }
{ "_id" : { "d1" : 20, "d2" : "CLERK" }, "value" : { "msum" : 1100, "recs" : 1, "mmin" : 1100, "mmax" : 1100, "mavg" : 1100 } }
{ "_id" : { "d1" : 20, "d2" : "MANAGER" }, "value" : { "msum" : 2975, "recs" : 1, "mmin" : 2975, "mmax" : 2975, "mavg" : 2975 } }
{ "_id" : { "d1" : 30, "d2" : "CLERK" }, "value" : { "msum" : 950, "recs" : 1, "mmin" : 950, "mmax" : 0, "mavg" : 950 }}
{ "_id" : { "d1" : 30, "d2" : "MANAGER" }, "value" : { "msum" : 2850, "recs" : 1, "mmin" : 2850, "mmax" : 2850, "mavg" : 2850 } }

댓글 없음:

댓글 쓰기