C++11 异步任务实现(类Java ExecutorService)

使用Java时,并发库提供很多有用的工具,比如ExecutorService,方便执行异步任务。C++11通过使用线程和lambda,可以很容易实现类似的功能。尽在代码中。

类声明:
/// Task service backed by a set of AsyncThread workers and one shared FIFO queue.
///
/// Producers enqueue callables with Push(); consumers take them with Pop(),
/// which blocks while the queue is empty. The queue is protected by
/// m_asyncMutex, and m_asyncCond is what blocked consumers wait on.
///
/// Lifecycle: Initialize() creates the workers, Finalize() tears them down.
class AsyncService {
    // Project macro defined elsewhere; judging by its name it removes the copy
    // constructor and copy assignment -- confirm against its definition.
    MZ_DISALLOW_COPY_AND_ASSIGN(AsyncService)

public:
    // These aliases appear in the public signatures of Push() and Pop(), so
    // they are public too: callers can spell AsyncService::Async_t by name.

    /// A task: any callable taking no arguments and returning nothing.
    using Async_t = std::function<void()>;
    /// FIFO container for the tasks that have not been popped yet.
    using AsyncQueue_t = std::queue<Async_t>;

private:
    /// Heap array of m_threadSize worker pointers; nullptr until Initialize().
    AsyncThread** m_asyncThreads = nullptr;
    /// Number of workers in m_asyncThreads; reset to 0 by Finalize().
    int m_threadSize = 0;

    /// Guards m_asyncQueue.
    std::mutex m_asyncMutex;
    /// Signalled by Push(); waited on by Pop() until the queue is non-empty.
    std::condition_variable m_asyncCond;
    /// Pending tasks, oldest first.
    AsyncQueue_t m_asyncQueue;

public:
    AsyncService();

    ~AsyncService();

    /// Creates and initializes `threadSize` workers. Returns true on success.
    bool Initialize(int threadSize);

    /// Finalizes and destroys every worker and releases the worker array.
    void Finalize();

    /// Appends `async` to the task queue and signals the condition variable.
    void Push(Async_t async);

    /// Blocks until a task is available, then removes and returns the oldest one.
    Async_t Pop();
};
成员函数实现:
/// Default construction only: every data member already has an in-class
/// initializer or its own default constructor. No workers exist until
/// Initialize() is called.
AsyncService::AsyncService() = default;

/// Releases the workers if the owner forgot to call Finalize().
///
/// m_threadSize starts at 0 and Finalize() resets it to 0, so a positive value
/// here means Initialize() created workers that were never torn down. Without
/// this call the workers and the pointer array would leak, and the workers
/// would keep the `*this` they were constructed from after it is destroyed.
///
/// NOTE(review): destructors are implicitly noexcept, so an exception escaping
/// Finalize() here terminates the program -- confirm AsyncThread::Finalize()
/// does not throw.
AsyncService::~AsyncService() {
    if (m_threadSize > 0) {
        Finalize();
    }
}

/// Creates `threadSize` workers and initializes each one in index order.
///
/// @param threadSize Number of AsyncThread workers to create. Zero is
///                   accepted and creates no workers.
/// @return true once all workers have been created; false if `threadSize` is
///         negative or if workers from an earlier Initialize() still exist
///         (call Finalize() first).
bool AsyncService::Initialize(int threadSize) {
    // A negative count must never reach the array new-expression below.
    if (threadSize < 0) {
        return false;
    }

    // Re-initializing over a live pool would overwrite m_asyncThreads and leak
    // every existing worker. m_threadSize is reset to 0 by Finalize().
    if (m_threadSize > 0) {
        return false;
    }

    m_threadSize = threadSize;
    m_asyncThreads = new AsyncThread*[m_threadSize];
    for (int index = 0; index < m_threadSize; ++index) {
        m_asyncThreads[index] = new AsyncThread(*this);
        // NOTE(review): any status returned by AsyncThread::Initialize() is
        // ignored here, as before -- confirm whether it can report failure.
        m_asyncThreads[index]->Initialize();
    }

    return true;
}

void AsyncService::Finalize() {
for (auto index = 0; index < m_threadSize; ++index) {
m_asyncThreads[index]->Finalize();
SafeDelete(m_asyncThreads[index]);
}
SafeDeleteArray(m_asyncThreads);
m_threadSize = 0;
}

/// Enqueues a task and wakes one consumer blocked in Pop().
///
/// @param async Task to enqueue. It is taken by value and moved into the
///              queue, so passing a temporary (e.g. a lambda) costs no copy of
///              the callable.
void AsyncService::Push(Async_t async) {
    {
        // Hold the mutex only for the queue mutation.
        std::lock_guard<std::mutex> lock(m_asyncMutex);
        m_asyncQueue.push(std::move(async));
    }

    // Exactly one task was added and every waiter in Pop() waits on the same
    // "queue not empty" predicate, so waking a single waiter is sufficient.
    // Notifying after the unlock lets the woken thread take the mutex at once.
    m_asyncCond.notify_one();
}

/// Removes and returns the oldest queued task.
///
/// Blocks on the condition variable for as long as the queue is empty; there
/// is no timeout or cancellation path in this function, so the call returns
/// only after some thread has pushed a task.
///
/// @return The task that was at the front of the queue.
AsyncService::Async_t AsyncService::Pop() {
    std::unique_lock<std::mutex> lock(m_asyncMutex);
    // The predicate form re-checks the queue after every wake-up, so spurious
    // wake-ups and tasks taken by another consumer are handled.
    m_asyncCond.wait(lock, [this] {
        return !m_asyncQueue.empty();
    });

    // Move the task out instead of copying it: the front element is discarded
    // by pop() right after, and copying a std::function may allocate while the
    // mutex is held.
    Async_t async = std::move(m_asyncQueue.front());
    m_asyncQueue.pop();
    return async;
}

使用方式很简单,调用Push接口即可。

示例:
// Push() only places the callable in the queue; it is invoked later, once it
// has been taken out of the queue again. With [&], everything captured by
// reference must therefore stay alive until the task has finished running;
// capture by value anything that may go out of scope first.
asyncService.Push([&] {
// Do something
});

AsyncService 提供异步任务功能接口,使用 mutex 和 condition_variable 实现多线程任务队列的同步。各工作线程依次从队列中取出任务并执行。为方便使用,任务以 lambda 函数的形式提交。