Post List

2016년 1월 2일 토요일

C++ Concurrency in Action : study ch.04 Synchronizing Concurrent Operations

Cover
Facebook C++ Korea의 Concurrency In Action 관련 스터디 모임에서의 내용을 정리했습니다.

Chapter 04, Synchronizing Concurrent Operations

4장에서는 thread 간의 연산의 동기화 방법에 대해서 소개하고 있습니다.
여기서의 동기화는 사용자가 원하는대로 순서를 제어하는 것이라고 생각하면 됩니다.
아래 순서는 책의 챕터와는 상관없이 정리하는 것이니 참고하시기 바랍니다.
예제 코드 또한 책에 있지 않은 예제 코드를 사용한 것도 있습니다.
이 책의 저자분은 쉬운 것도 어렵게 표현하는 놀라운 제주가 있으신듯해서요.
저 같은 일반인이 이해하기가 많이 어려워서 다른 책들의 예제를 참조하여 만들었습니다.

다룰 내용

  1. thread들의 순서를 제어하는 방법
    • condition_variable (조건변수) : thread 간의 이벤트 전달
    • future<T>, promise<T>, packaged_task<T> : 비동기 실행 및 thread 간의 data 전달
    • 시간제한(timeout)을 이용하여 thread 순서제어
  2. chrono
  3. Programming skill로의 접근
    • Funtional Programming
    • Communicating Sequential Process

4.1 thread 순서 제어 방법

thread 동기화라는 말은 thread의 순서를 사용자가 원하는대로 제어할 경우에 사용하는 용어입니다. hardware 적인 I/O 작업 같이 많은 시간이 걸리는 작업을 별도 thread로 진행을 시킬 경우 해당 작업이 끝난 이후에 이루어 져야 할 작업의 경우에는 해당 작업이 끝날때까지 기다려야 합니다. 이런 경우에 thread 간의 작업 순서를 직접 제어해 줘야 합니다. 이번 장에서는 thread 동기화 방법 3가지를 소개하고 있습니다. 각각에 대해서 대표적인 예제와 함께 소개해 드리겠습니다.

4.1.1 condition_variable (조건변수)

  • 해더파일 : <condition_variable>
C++ 표준 라이브러리에는 2개의 조건변수 std::condition_variable , std::condition_variable_any를 제공하고 있습니다.std::condition_variable는 std::mutex 객체를 이용하여 사용해야만 합니다. 후자는 좀 더 유연하게 사용이 가능합니다. 유연성이 크게 필요하지 않을 경우에는 std::condition_variable와 std::mutex를 사용하는 것이 좋습니다.
위의 3가지 함수가 기본적인 사용방법이며, wait()에 다른 조건을 주는 것이 가능합니다.
간단한 예제 하나를 보시면 사용하는 것은 다 이해가 되시리라 봅니다.
그래도 치시면 실행이 되며 그 결과를 확인 할 수 있습니다.
#include <condition_variable>
#include <iostream>
#include <thread>
#include <chrono>

using namespace std::literals;

void main()
{
    std::cout << "Main Start" << std::endl;

    std::mutex M;
    std::condition_variable CV;

    std::mutex MS;
    std::string sResult = "";

    std::thread T1([&]() {
        std::cout << "T1 Start" << std::endl;

        std::cout << "T1 Processing" << std::endl;
        std::this_thread::sleep_for(100ms);
        {
            std::lock_guard<std::mutex> LG(MS);
            sResult += "Welcome ";
        }

        CV.notify_one();
        std::cout << "T1 End" << std::endl;
    });

    std::thread T2([&]() {
        std::cout << "T2 Start" << std::endl;

        std::unique_lock<std::mutex> UL(M);
        CV.wait(UL);

        std::cout << "T2 Processing" << std::endl;
        std::this_thread::sleep_for(100ms);
        {
            std::lock_guard<std::mutex> LG(MS);
            sResult += "to the ";
        }


        CV.notify_one();
        std::cout << "T2 End" << std::endl;
    });

    std::thread T3([&]() {
        std::cout << "T3 Start" << std::endl;

        std::unique_lock<std::mutex> UL(M);
        CV.wait(UL);

        std::cout << "T3 Processing" << std::endl;
        std::this_thread::sleep_for(100ms);
        {
            std::lock_guard<std::mutex> LG(MS);
            sResult += "Condition Variable ";
        }

        CV.notify_one();
        std::cout << "T3 End" << std::endl;
    });

    T1.join();
    T2.join();
    T3.join();

    std::cout << sResult.c_str() << std::endl;

    std::cout << "Main End" << std::endl;
}
위 예제에서 CV 관련 부분을 주석처리해서도 실행해 보시고, wait_all()로도 바꿔보시고 등의 변화를 주시면서 여러번 실행해보면 thread 간의 동기화 결과를 확인 할 수 있습니다.
책을 읽다보면 Spurious Wakeup이라는 용어가 나옵니다.
시스템이 대기중인 thread에 다른 thread가 notify()하지 않았음에도 활성화 될 수 있다
는 뜻인데,
  • side effect가 없는지 잘 검사해야 하며,
  • wait()를 loop 등으로 감싸는 등의 다른 조치를 해야 한다는 내용도 있습니다.
http://en.cppreference.com/w/cpp/thread/condition_variable/wait 에 보면 wait(lock, predicate) 형식으로 사용이 가능하다고 나와 있습니다. 관심 있으신 분들은 참고하시면 됩니다.

4.1.2 future, promise, packaged_task 를 이용한 비동기 실행 및 thread 간의 data 전달

  • 해더파일 : <future>
위에서 소개한 std::condition_variable은 thread간의 순서는 제어가 가능하지만, 값을 전달하는 역할을 하지 못합니다.
future , promise, async를 이용하면 thread 간의 비동기 제어 및 값을 전달 할 수 있습니다.
std::condition_variable는 wait()와 notify()를 여러 thread에서 여러번 할 수 있는 반면 future는 1회성으로 한번의 호출과 그 호출에 대한 값을 전달 받는 것으로만 활용이 가능합니다. 우리가 하는 작업 대부분은 이런 1회성 호출만으로도 충분한 경우가 많습니다. 대부분의 경우 std::condition_variable보다는 future이 더 유용한 경우가 많습니다.
thread에게 작업을 시키면 해당 thread는 비동기로 작업을 진행하고 있다가 future를 통해서 그 전달값이 필요하다고 하는 시점에서 해당 thread의 작업이 끝난지를 검사하여 끝났다면 값을 얻어서 다음을 진행하고, 그렇지 않았다면 thread의 해당 작업이 끝나서 값을 전달할때까지 기다립니다.

4.1.2.1 future 와 async 활용 방법

일단 간단한 사용 예제를 보겠습니다.
#include <future>
#include <iostream>

int sum(int nStart, int nEnd)
{
    int nSum = 0;
    for (int i = nStart; i <= nEnd; i++)
    {
        nSum += i;
    }
    return nSum;
}

void main()
{
    std::future<int> the_result = std::async(&sum, 1, 10);
    std::cout << "sum 1 ~ 10 : " << the_result.get() << std::endl;
}
아래와 같이 Lambda를 이용해서도 구현이 가능합니다.
#include <future>
#include <iostream>

void main()
{
    std::future<int> sumUsingLambda = std::async([](int nStart, int nEnd) {
        int nSum = 0;
        for (int i = nStart; i <= nEnd; i++)
        {
            nSum += i;
        }
        return nSum;
    }, 1, 10);
    std::cout << "sum 1 ~ 10 : " << sumUsingLambda.get() << std::endl;
}
.get()을 이용하여 결과값을 비동기로 가져 오는 것을 확인 할 수 있습니다.
std::aync()의 인자로는 첫번째는 실행할 함수의 포인터, 함수객체, Lambda를 전달하며, 다음 부터는 해당 함수로 전달될 인자들을 열거하면 됩니다. 인자가 없는 경우에는 첫번째 인자로 함수만 전달하면 됩니다.
만약 .get()으로 받아 올 값이 없는 경우에는 .wait() 함수를 이용하여 해당 함수의 실행 종료를 기다릴 수 있습니다.
#include <future>
#include <iostream>

void main()
{
    std::future<void> noRet = std::async([](int nStart, int nEnd) {
        int nSum = 0;
        for (int i = nStart; i <= nEnd; i++)
        {
            nSum += i;
        }
        std::cout << "sum 1 ~ 10 : " << nSum << std::endl;
    }, 1, 10);

    noRet.wait();
}
.wait() 뿐만 아니라 wait_for() , wait_until() 함수 역시 있습니다. 그 결과로 std::future_status를 return 합니다.
  • std::future_status::deferred : 아직 해당 함수가 실행되지 않았습니다.
  • std::future_status::ready : 결과가 준비되었습니다.
  • std::future_status::timeout : 설정된 시간이 초과되었습니다.
자세한 사항은 http://en.cppreference.com/w/cpp/thread/future/wait_until 를 참조해 주시기 바랍니다.
std::async 생성시 실행 시점을 설정 할 수 있습니다.
첫번째 인자로 std::launch 상태를 추가할 경우에는 해당 상태대로 실행이 됩니다.
  • std::launch::async : 새로운 thread로 바로 실행합니다.
  • std::launch::deferred : wait() 나 get() 함수 호출시 실행합니다.
  • std::launch::deferred | std::launch::async : default 값으로써, 책에는 구현을 선택가능 하다고 되어 있는데, 기본적으로 설정된 정책에 따라 실행이 됩니다. 예를 들어서 gcc에서는 늘 deferred로 실행된다고 합니다.
아래 예제코드를 실행해 보시면 그 차이를 알 수 있습니다.
#include <future>
#include <iostream>
#include <chrono>

using namespace std::literals;

int sum(int nStart, int nEnd)
{
    std::cout << "thread id : " << std::this_thread::get_id() << " start" << std::endl;
    int nSum = 0;
    for (int i = nStart; i <= nEnd; i++)
    {
        nSum += i;
    }
    std::cout << "thread id : " << std::this_thread::get_id() << " end" << std::endl;
    return nSum;
}

void main()
{
    std::future<int> runAsync = std::async(std::launch::async, &sum, 1, 10000000);
    std::future<int> runDeferred = std::async(std::launch::deferred, &sum, 1, 10);

    std::cout << "main before sleep" << std::endl;
    std::this_thread::sleep_for(10ms);
    std::cout << "main after sleep" << std::endl;

    runDeferred.get();

    std::cout << "main before sleep2" << std::endl;
    std::this_thread::sleep_for(1s);
    std::cout << "main after sleep2" << std::endl;

    runAsync.get();
}
결과는 아래와 같이 나옵니다.
main before sleep
thread id : 1248 start
main after sleep
thread id : 11256 start
thread id : 11256 end
main before sleep2
thread id : 1248 end
main after sleep2
아마도 1248이 std::launch::async로 실행된 thread id 인 듯 합니다.
std::async로 선언된 후 시스템에서 thread가 준비되는대로 바로 실행을 하는 것을 확인 할 수 있습니다.
std::launch::deferred로 선언된 것은 .get()을 호출한 다음에 실행되는 것을 확인 할 수 있습니다. 1248의 경우에는.get()을 호출하기 전에 실행이 완료된 것을 확인 할 수 있습니다.
std::shared_future란 것도 있습니다.
이름에서 대충 상상이 가죠 ? future 객체를 공유하여 여러 곳에서 값을 get()을 통해서 공유가 가능합니다.
std::future는 .get()을 한 번 밖에 사용 할 수가 없기 때문에 다른 thread에서 사용시에는 .share()를 활용하여std::shared_future를 사용 하면 됩니다.
    std::future<int> f = std::async(sum, 1, 10);
    std::shared_future<int> sf = f.share();
    auto sf2 = f.share();

    std::cout << "shared_future : " << sf.get() << std::endl;
자세한 내용은 http://en.cppreference.com/w/cpp/thread/shared_future 를 참조해 주세요.

4.1.2.2 future 와 promise 활용 방법

std::promise<T>의 역할은 크게 2가지 입니다.
  • set_value() : T 타입의 값을 저장(set) 할 수 있습니다.
  • get_future() : std::future<T> 객체를 생성합니다.
Cover
위 2가지 역할을 이용해서 thread, async task 간의 값을 전달 할 수 있습니다.
아래 예제에서 각각의 경우에 대한 사용법을 확인 할 수 있습니다.
#include <future>
#include <iostream>

void getVec(std::promise<std::vector<int>>& p, int nStart, int nNum)
{
    std::vector<int> v;
    for (int i = 0; i < nNum; i++)
        v.push_back(nStart + i);
    p.set_value(std::move(v));
}

void main()
{
    // using thread
    std::promise<std::vector<int>> P;
    std::future<std::vector<int>> F = P.get_future();

    std::thread T(getVec, std::ref(P), 11, 5);
    T.detach();

    auto V = F.get();
    for (auto item : V)
        std::cout << "Get Values : " << item << std::endl;

    // using async task
    std::promise<std::vector<int>> P2;
    std::future<std::vector<int>> F2 = P2.get_future();

    std::async(std::launch::async, &getVec, std::ref(P2), 3, 3);

    auto V2 = F2.get();
    for (auto item : V2)
        std::cout << "Get Values : " << item << std::endl;
}
만약 실행 중 예외를 발생해야 하는 경우에는 set_value() 대신에 set_exception()을 호출하면 됩니다.
일반적인 경우에는 std::current_exception()을 전달하면 되지만, 사용자가 직접 전달할 경우에는 다음에 나오는 예제를 참고하시면 됩니다.
#include <future>
#include <iostream>

void getVec(std::promise<std::vector<int>>& p, int nStart, int nNum)
{
    if (nNum < 0)
    p.set_exception(std::make_exception_ptr(
        std::exception(std::logic_error("invalid nNum"))));

    std::vector<int> v;
    for (int i = 0; i < nNum; i++)
        v.push_back(nStart + i);
    p.set_value(std::move(v));
}

void main()
{
    try
    {
        std::promise<std::vector<int>> P3;
        std::future<std::vector<int>> F3 = P3.get_future();

        std::async(std::launch::async, &getVec, std::ref(P3), 3, -1);
        auto V3 = F3.get();
    }
    catch (const std::exception& e)
    {
        std::cout << "Exception from Promise : " << e.what() << std::endl;
    }
}
set_value() 와 set_exception()의 뒤에다가 _at_thread_exit()를 붙인 함수들도 제공하고 있습니다.
말 그대로 해당 thread가 종료될 때 set_value 와 set_exception의 역할을 수행하겠다는 뜻 입니다.
이게 어떤 차이가 있을까요 ?
  • set_value() , set_exception() : 해당 함수 수행 후 바로 std::future가 ready상태가 되므로 get(), wait()의 수행이 가능해집니다.
  • set_value_at_thread_exit() , set_exception_at_thread_exit() : 수행 중인 thread가 종료되기 전까지는 std::futureready 상태가 되지 않으므로 get(), wait()를 호출하더라도 대기를 하게 됩니다.
즉 std::future의 실행시점을 조금 더 늦추는 용도라던지, 리소스가 한정된 환경에서 여러 thread가 동시에 run 상태로 되지 않게 하기 위한 용도로 사용이 가능합니다.

4.1.2.3 packaged_task 활용 방법

std::packaged_task<T>는 std::future와 실행시킬 함수를 묶어서 관리합니다.
std::async와의 다른 점은 다음과 같습니다.
  • std::async
    • std::launch 값으로 실행 시점을 결정합니다.
    • 실행시 각각의 std::async가 서로 다른 thread에서 실행되는 것을 보장받지 못합니다.
    • .get() , .wait() 등의 호출을 하기 전까지 thread가 소멸되지 않습니다.
    • 간단한 비동기 작업에 대해서는 std::packaged_task<T>보다 훨씬 더 효율적입니다.
  • std::packaged_task
    • 실행시 명시적으로 std::thread를 선언하여서 실행시킵니다.
    • 그러므로 당연히 서로 다른 thread에서 실행되는 것을 보장 받습니다.
    • thread는 실행 후 소멸되며, .get()으로 가져올 값은 thread가 아닌 std::packaged_task<T>가 가지고 있습니다.
    • 그러므로 실행 후 결과를 나중에 사용할 때는 더 효율적일 수 있습니다.
Cover
사용법은 std::async를 사용 할 때 보다는 더 번거롭습니다.
std::packaged_task<T>의 T에는 함수의 시그너쳐(signature)를 넣어주고 생성시 인자로 실행할 함수 (함수객체, 함수포인터, Lambda식 등...)를 넣어 주면 됩니다. 여기서 잠깐 주목해봐야 할 곳이 있습니다. 바로 함수의 시그너쳐를 템플릿으로 사용한다는 점입니다. 즉, 함수 시그너쳐만 같다면, thread 간에 함수를 전달 할 수가 있습니다. .get_future() 함수를 이용하여std::future<T> 객체를 생성합니다.
std::thread를 생성하여 첫번째 인자로 std::packaged_task<T>를 R-Value 형태로 전달해야 합니다. (std::move를 사용하면 됩니다.)
간단한 사용 예제를 보겠습니다.
#include <future>
#include <iostream>

int sum(int nStart, int nEnd)
{
    std::cout << "sum[" << std::this_thread::get_id() << "] start" << std::endl;
    int nSum = 0;
    for (int i = nStart; i <= nEnd; i++)
    {
        nSum += i;
    }
    std::cout << "sum[" << std::this_thread::get_id() << "] end" << std::endl;
    return nSum;
}

int sumSq(int nStart, int nEnd)
{
    std::cout << "sumSq[" << std::this_thread::get_id() << "] start" << std::endl;
    int nSum = 0;
    for (int i = nStart; i <= nEnd; i++)
    {
        nSum += i * i;
    }
    std::cout << "sumSq[" << std::this_thread::get_id() << "] end" << std::endl;
    return nSum;
}

void main()
{
    std::packaged_task<int(int, int)> T1(sum);
    std::packaged_task<int(int, int)> T2(sumSq);
    std::future<int> F1 = T1.get_future();
    std::future<int> F2 = T2.get_future();

    std::thread t2(std::move(T2), 1, 100);
    std::thread t1(std::move(T1), 1, 100);

    t1.join();
    std::cout << "sum[1,100] = " << F1.get() << std::endl;

    t2.join();
    std::cout << "sumSq[1,100] = " << F2.get() << std::endl;
}
결과는 다음과 같습니다.
sumSq[5904] start
sum[14236] start
sum[14236] end
sumSq[5904] end
sum[1,100] = 5050
sumSq[1,100] = 338350
T2를 먼저 thread에 할당을 하였더니 sumSq가 먼저 실행되었습니다.

4.1.3 시간제한(timeout)을 이용하여 thread 순서제어

이건 아주 단순한 방법으로 thread 실행 중에 time delay를 주어서 처리하는 방법입니다.
  • thread 실행시 delay를 주는 방법
    • std::this_thread::sleep_for() : 특정 기간 동안 sleep상태
    • std::this_thread::sleep_until() : 특정 시점까지 sleep상태
위 함수를 이용하여 개발자가 임의로 thread 간의 실행 순서를 제어하는 방법 입니다.
앞에서 본 std::condition_variable , std::future에서의 wait_for() , wait_until() , get_for()get_until() 또한 활용이 가능합니다.
위 함수들의 인자로는 std::chrono를 활용하여 ~~동안이라는 기간 과 ~~까지 라는 시간을 전달해야 합니다.
std::chrono에 대해서는 아래 단락에서 설명해 드리겠습니다.

4.2 chrono

  • 해더파일 : <chrono>

4.2.1 현재시간 (now)

std::chrono::system_clock::now() 함수를 이용하면 std::chrono::system_clock::time_point 타입의 현재 시간은 얻을 수 있습니다.

4.2.2 기간 (duration)

타입

std::chrono::duration<T, std::ratio<N, D>
  • T : 숫자형의 표현 타입입니다. (e.g. short, int, long, double)
  • std::ratio<N, D> : 표현 수치입니다. N / D 초마다 T의 값을 1씩 증가시킵니다.
    • e.g. std::ratio<60,1> : 60초마다 값을 1씩 증가시키므로 1 minute 단위를 의미합니다.
    • e.g. std::ratio<1,1000> : 1/1000초마다 값을 1씩 증가시키므로 1 millisecond 단위를 의미합니다.
시스템에서 미리 정해 놓은 duration 들이 std::chrono namespace 안에 있습니다.
(nonoseconds , microseconds , milliseconds , seconds , minutes , hours)
duration 간의 conversion의 경우 큰 단위 -> 작은 단위로는 묵시적으로 변환이 가능하지만,
작은 단위 -> 큰 단위로는 std::duration_cast<>를 이용해서 가능하긴 하지만, 중간 값에 대해서는 반올림이 아니라 버림(truncation)이 되므로 주의해야 합니다.
std::chrono::milliseconds ms(54802);
std::chrono::seconds s = std::chrono::duration_cast<std::chrono::seconds> (ms)
// 밀리 초에서 초 단위로의 변환. 정밀도가 감소하기 때문에 명시적 변환이 필요하다.
duration 간의 덧셈(+) , 뺄셈(-)이 가능하며, 상수와의 곱셈(*),나눗셈(/)이 가능합니다.
5 * seconds(1) = seconds(5)
minutes(1) - seconds(55) = seconds(5)
duration에서 T 타입의 값을 얻는 방법은 .count() 멤버함수를 이용하면 됩니다.
std::chrono::milliseconds(1234).count() = 1234

4.2.3 시점 (time point)

타입

std::chrono::time_point<clock, std::chrono::duration<>>
  • clock : 어떤 시계를 사용할 것인지 정의해 줍니다. 예제로는 다음과 같은 것들이 있습니다.
    • std::chrono::system_clock : 시스템에서 사용하는 기본 clock 객체로써 OS의 영향을 받으며 tick rate가 균일하지 않습니다.
    • std::chrono::steady_clock : 비교적 균일한 tick rate를 가진 clock입니다. (is_steady = true)
    • std::chrono::high_resolution_clock : 최대 정밀도를 지원하는 clock입니다. 단위가 일정합니다.
대부분 아래와 같은 방식으로 선언 합니다.
auto const timeout = std::chrono::steady_clock::now()
                   + std::chrono::milliseconds(500);

4.3 Program Skill로의 접근

4.3.1 Functional Programming

먼저 함수형 프로그래밍 (functional programming)이 무엇인지 부터 알아야 겠죠 ?
함수의 결과가 오직 매개변수(parameter)에 의해서만 결정되며, 함수 외부 상태와는 완전 분리되어 있는 프로그래밍 스타일
을 의미합니다. 즉, 같은 값을 인자로 전달하면 항상 같은 결과가 나오는 함수를 말합니다.
간단한 예로는 sin()cos()과 같은 산술연산 함수들을 생각하면 됩니다.
그럼 함수형 프로그래밍이 multi-thread 프로그램에 대해서 program sill로의 접근과 무슨 상관이 있냐는 것을 생각해 봐야겠죠 ?
동시성(concurrency)에서는 공유 데이터에 대해서 어떻게 접근해야 하는지가 큰 문제였습니다.
그것 때문에 앞에서 mutex, lock 등에 대해서 공부했습니다.
함수형 프로그래밍에서는 이런 공유 데이터에 대한 문제를 고려하지 않아도 됩니다.
대신 thread 간의 함수의 결과값을 어떻게 전달해야 할 것인지만 생각하면 됩니다.
이것은 이미 앞에서 배운 std::future를 통해서 전달하면 됩니다.
책에 소개된 quirt-sort를 함수형으로 구현한 예제를 살펴보도록 하겠습니다.
그림을 먼저 살펴보자면 아래와 같은 형태로 재귀적(recursive)으로 구현할 예정입니다.
Cover
Listing 4.12 : A sequental implementation of Quicksort
template <typename T>
std::list<T> sequential_quick_sort(std::list<T> input)
{
    if( input.empty() ) return input;

    std::list<T> result;    
    result.splice( result.begin(), input, input.begin() ); // #1

    T const& pivot = *result.begin(); // #2
    auto divide_point = std::partition( input.begin(), input.end() ,
        [&](T const& t) {return t<pivot;} ); // #3

    std::list<T> lower_part;
    lower_part.splice(lower_part.end(), input, input.begin(), divide_point ); // #4

    auto new_lower( sequential_quick_sort( std::move(lower_part) ) ); // #5
    auto new_higher( sequential_quick_sort( std::move(input) ) );  // #6

    result.splice(result.end(), new_higher); // #7
    result.splice(result.begin(), new_lower); // #8

    return result;
}
다소 복잡해 보일 수 있지만 크게 어렵진 않습니다.
  • #1 : input의 첫번째 element를 result로 이동합니다. (이제 input에는 첫번째 element가 없습니다.)
  • #2, #3 : #1에 넣은 값보다 작은 값들을 추려내서 해당 원소들만 재배열(reorder)합니다. (여러번의 swap 연산이 있겠죠.)
  • #4 : 위 과정에서 추려낸 작은 값들을 lower_part로 이동합니다. (이제 input에는 pivot보다 작은 값은 없습니다.)
  • #5, #6 : 나뉘어진 lower_part, input 을 재귀적으로(recursive) 호출합니다.
  • #7, #8 : #5, #6 과정을 통해서 정렬된 값들을 result의 앞 , 뒤로 각각 이동시킵니다.
single thread 상에서의 구현으로는 잘되어 있습니다. 책에서는 많은 복사가 이루어졌다고 하는데, 글쎄요. readability를 해치지 않으면서 이보다 최적화를 더 잘하기가 쉽지 않을 듯합니다.
하지만 우리가 여기서 다뤄야 할 내용은 위 함수가 잘되었다, 아니다가 아니라 이걸 어떻게 multi-thread에서 동작하도록 구현할 것이냐는 것이죠 ? 바로 아래에 std::future<T>를 이용하여 위와 같은 기능을 하는 함수를 구현하였습니다.
Listing 4.13 : Parallel Quicksort using futures
template <typename T>
std::list<T> parallel_quick_sort(std::list<T> input)
{
    if( input.empty() ) return input;

    std::list<T> result;    

    result.splice( result.begin(), input, input.begin() ); 
    T const& pivot = *result.begin();

    auto divide_point = std::partition( input.begin(), input.end() ,
        [&](T const& t) {return t<pivot;} ); 

    std::list<T> lower_part;
    lower_part.splice( lower_part.end(), input, input.begin(), divide_point );

    std::future<std::list<T>> new_lower = std::async(&parallel_quick_sort<T>, std::move(lower_part)  ); // #1

    auto new_higher = parallel_quick_sort( std::move(input) ); // #2

    result.splice(result.end(), new_higher); // #3
    result.splice(result.begin(), new_lower.get() ); // #4

    return result;
}
  • 앞의 pivot을 기준으로 작은 부분과 큰 부분을 나누는 것은 List 4.12와 같으므로 설명을 생략하겠습니다.
  • #1 : 작은 부분을 std::async를 이용해서 비동기로 다른 thread에게 할당합니다.
  • #2 : 큰 부분에 관한 것은 현재 thread에서 직접 호출하여 진행합니다.
  • #3, #4 : #1, #2 과정을 거쳐서 나온 결과를 합치는 과정입니다.
문제없어 보이지만, std::async를 이용할 경우 .get()으로 값을 가져오기 전까지 thread가 해제되지 않을 수 있습니다.
만약 1000개의 원소를 sort 하기 위해서는 10번의 재귀호출이 필요한데, 그러면 최악의 경우에는 1024개의 thread가 생성되어 있을 수도 있습니다.
물론 OS에서 알아서 할당하고 해제하고, 여러개의 async task를 하나의 thread 상에서 돌아가게 만들어 주고 등의 작업을 해줍니다.
책에서는 이 방법이 아주 잘못된 것인것 처럼 설명을 했지만, 글쎄요 ? 제가 보이겐 이것도 괜찮습니다.
다만 OS에서 thread의 실행 및 해제를 판단합니다. C++ 개발자 입장에서는 내가 직접 다 관리하는게 아니면 믿지 못한다고 생각하는 경우가 많으니깐요.
그래서 아래와 같이 std::async대신 std::packaged_task를 활용해서 일단 thread는 내가 직접 해제하는 예제가 있습니다.
Listing 4.14 : A sample implementation of spawn_task
template<typename F, typename A>
std::future<std::result_of<F(A&&)>::type> spawn_task(F&& f, A&& a)
{
    typedef std::result_of<F(A&&)>>::type result_type; // 편의를 위한 결과 타입의 재정의

    std::packaged_task<result_type(A&&)> task( std::move(f) ); // packaged_task를 이용한 작업 생성.

    std::future<result_type> res(task.get_future()); // 작업의 결과물을 위한 future type.

    std::thread th(std::move(task), std::move(a)); // 백그라운드에서 작업 실행.
    th.detach();

    return res; // 앞서 정의한 future를 반환. 함수 외부에서는 get()통해 값을 받게 된다.
}
직접 thread를 생성하여 작업을 시킨 뒤 detach()해버립니다. 그리고는 값을 얻을 수 있는 std::future를 return 합니다.

4.3.2 Communicating sequential processes

먼저 CSP(Communicating sequential processes)가 무엇인지 부터 알아야 겠죠 ?
공유 데이터가 없는 경우, thread는 독립적으로 수행이 가능하므로, 메세지를 전달하는 것으로 구현이 가능합니다. 그 각각의 thread를 state machine의 형태로 구현을 하면 됩니다.
아주 간단히 설명하자면 위와 같으며, 자세한 사항은 아래 Link를 참고해 주세요.
CSP가 뭔지도 모르는데... 또 무슨 State Machine 이란 말이 나오네요.
예전에 embedded 분야에서 많이 사용한 RTOS (Real-time OS)에서 많이 사용하던 방식의 패턴 중 하나입니다.
모든 작업을 각각의 state로 나눈 다음에 각 상태에서는 작업을 수행한 다음에 다른 state로 작업을 넘기는 방법입니다.
아주 간단한 state machine의 예제를 살펴보겠습니다.
enum STATE { INIT = 0. INPUT, PRINT, EXIT };
STATE nState = INIT;
int nSum = 0;

while (nState != EXIT)
{
    switch (nState)
    {
    case INIT:
        nSum = 0;
        nState = INPUT;
        break;
    case INPUT:
        {
            int nInput = 0;
            std::cin >> nInput;
            if (nInput == 0)
                nState = PRINT;
            else if (nInput == -1)
                nState = EXIT;
            else
                nSum += nInput;
        }
        break;
    case PRINT:
        std::cout << nSum << std::endl;
        nState = INPUT;
        break;
    }
}
while을 통해서 무한히 switch문을 반복합니다.
nState가 각각의 상태를 뜻하고, switch문에서 그 상태값에 따라서 정해진 작업을 합니다.
각각의 작업이 마친 후 다음에 실행할 상태값으로 바꿔줍니다.
이 예제는 아주 간단한 예제입니다만,
State가 여러 단계로 나눠질 경우 이런 식의 while-switch 구문이 여러 단계로 중첩이 됩니다.
State도 nCurrentState, nNextState 등으로 나누어서 값을 넣어서 현재 상태와 다음 상태의 값을 보고 각각의 작업을 다르게 하는 식으로 구현을 하기도 합니다.
State Machine의 가장 큰 장점은 Diagram으로 쉽게 설계가 가능하며, diagram만 보고도 쉽게 구현 할 수 있습니다.
State Machine에 대해서는 아래 Link를 참고해 주세요.
이제 다시 책의 내용으로 돌아가겠습니다.
CSP의 구현을 위하여 ATM기의 동작을 State Machine으로 구현한 예제를 사용하였습니다.
Cover
위 그램대로 구현을 하면 됩니다.
하지만 옜날 방식의 State Machine이 아닌 thread를 이용한 CSP 형식의 구현은 다음과 같습니다.
  • 각각의 State를 class의 멤버함수로 선언합니다.
  • 전달하는 메세지는 struct로 구현합니다.
먼저 아래 예제 코드를 먼저 보고 설명 드리겠습니다. 참고로 여기서 사용한 메세지 전달(message-passing) 라이브러리는 Appendix C를 참고해 주시기 바랍니다.
Listing 4.15 : A simple implementation of an ATM logic class
struct card_inserted
{
    std::string account;
};

class atm
{
    messaging::receiver incoming;
    messaging::sender bank;
    messaging::sender interface_hardware;

    void (atm::*state)();

    std::string account;
    std::string pin;

    void waiting_for_card() // #1
    {
        interface_hardware.send(display_enter_card()); // #2 
         incoming.wait() // #3
           .handle<card_inserted>(
                [&](card_inserted const& msg) // #4
                {
                    account=msg.account;
                    pin="";
                    interface_hardware.send(display_enter_pin());
                    state=&atm::getting_pin;
                }
            );
    }

    void getting_pin();

public:
    void run() // #5
    {
        state=&atm::waiting_for_card; // #6
        try
        {
            for(;;)
            {
                (this->*state)(); // #7
            }
        }
        catch(messaging::close_queue const&)
        { }
    }
};
  • #5 : State Machine의 시작점 입니다.
  • #6 : 첫 state를 waiting_for_card로 설정한 다음에, for()를 통해 무한으로 실행시킵니다. 단 exception 발생시 빠져나오게 됩니다.
  • #7 : 현재 설정된 state를 실행시킵니다. 각각의 state는 class의 멤버 함수를 뜻합니다.
위 과정을 통해서 계속해서 state를 실행시킵니다.
그럼 처음으로 실행하는 waiting_for_card (#1)를 자세히 살펴보겠습니다.
  • #3 : 다음에 작업할 state를 기다립니다. 여기서는 card_inserted에 대해서만 동작합니다.
  • #4 : card_inserted state는 따로 멤버함수로 구현하지 않고, 간단하게 Lambda로 구현하였습니다.
그 구현부가 복잡하지 않고, 한 곳의 state에서만 호출이 되는 경우에는 Lambda로 구현하는 것도 좋은 방법입니다. 해당 state의 작업이 끝난 뒤 다음 state로 getting_pin를 호출합니다.
getting_pin에 대한 구현부는 다음과 같습니다.
Listing 4.16 : The "getting_pin" state function for the simple ATM implementation
void atm::getting_pin()
{
    incoming.wait()
        .handle<digit_pressed>( // #1
            [&](digit_pressed const& msg)
            {
                unsigned const pin_length=4;
                pin+=msg.digit;
                if(pin.length()==pin_length)
                {
                    bank.send(verify_pin(account,pin,incoming));
                    state=&atm::verifying_pin;
                }
            }
        )
        .handle<clear_last_pressed>( // #2
            [&](clear_last_pressed const& msg)
            {
                if(!pin.empty())
                {
                    pin.resize(pin.length()-1);
                }
            }
        )
        .handle<cancel_pressed>( // #3
            [&](cancel_pressed const& msg)
            {
                state=&atm::done_processing;
            }
        );
}
  • getting_pin state에서는 #1, #2, #3의 3가지 state를 수행합니다.
  • 모두 Lambda로 구현하였습니다.
  • #1, #3의 경우는 작업을 완료한뒤 다른 state로 상태를 바꿉니다.
위와 같이 CSP 스타일로 프로그래밍을 하시면 됩니다.
이와 같이 하면 동시성(concurrency) 프로그래밍을 단순하게 구현이 가능합니다.
하지만 각각의 thread가 완전히 독립적일때만 가능한 방법입니다.

댓글 없음:

댓글 쓰기