設計并發隊列
復制代碼 代碼如下:
#include pthread.h>
#include list>
using namespace std;
template typename T>
class Queue
{
public:
Queue( )
{
pthread_mutex_init(_lock, NULL);
}
~Queue( )
{
pthread_mutex_destroy(_lock);
}
void push(const T data);
T pop( );
private:
listT> _list;
pthread_mutex_t _lock;
};
template typename T>
void QueueT>::push(const T value )
{
pthread_mutex_lock(_lock);
_list.push_back(value);
pthread_mutex_unlock(_lock);
}
template typename T>
T QueueT>::pop( )
{
if (_list.empty( ))
{
throw "element not found";
}
pthread_mutex_lock(_lock);
T _temp = _list.front( );
_list.pop_front( );
pthread_mutex_unlock(_lock);
return _temp;
}
上述代碼是有效的。但是,請考慮這樣的情況:您有一個很長的隊列(可能包含超過 100,000 個元素),而且在代碼執行期間的某個時候,從隊列中讀取數據的線程遠遠多于添加數據的線程。因為添加和取出數據操作使用相同的互斥鎖,所以讀取數據的速度會影響寫數據的線程訪問鎖。那么,使用兩個鎖怎么樣?一個鎖用于讀取操作,另一個用于寫操作。給出修改后的 Queue 類。
復制代碼 代碼如下:
template typename T>
class Queue
{
public:
Queue( )
{
pthread_mutex_init(_rlock, NULL);
pthread_mutex_init(_wlock, NULL);
}
~Queue( )
{
pthread_mutex_destroy(_rlock);
pthread_mutex_destroy(_wlock);
}
void push(const T data);
T pop( );
private:
listT> _list;
pthread_mutex_t _rlock, _wlock;
};
template typename T>
void QueueT>::push(const T value )
{
pthread_mutex_lock(_wlock);
_list.push_back(value);
pthread_mutex_unlock(_wlock);
}
template typename T>
T QueueT>::pop( )
{
if (_list.empty( ))
{
throw "element not found";
}
pthread_mutex_lock(_rlock);
T _temp = _list.front( );
_list.pop_front( );
pthread_mutex_unlock(_rlock);
return _temp;
}
設計并發阻塞隊列
目前,如果讀線程試圖從沒有數據的隊列讀取數據,僅僅會拋出異常并繼續執行。但是,這種做法不總是我們想要的,讀線程很可能希望等待(即阻塞自身),直到有數據可用時為止。這種隊列稱為阻塞的隊列。如何讓讀線程在發現隊列是空的之后等待?一種做法是定期輪詢隊列。但是,因為這種做法不保證隊列中有數據可用,它可能會導致浪費大量 CPU 周期。推薦的方法是使用條件變量,即 pthread_cond_t 類型的變量。
復制代碼 代碼如下:
template typename T>
class BlockingQueue
{
public:
BlockingQueue ( )
{
pthread_mutexattr_init(_attr);
// set lock recursive
pthread_mutexattr_settype(_attr,PTHREAD_MUTEX_RECURSIVE_NP);
pthread_mutex_init(_lock,_attr);
pthread_cond_init(_cond, NULL);
}
~BlockingQueue ( )
{
pthread_mutex_destroy(_lock);
pthread_cond_destroy(_cond);
}
void push(const T data);
bool push(const T data, const int seconds); //time-out push
T pop( );
T pop(const int seconds); // time-out pop
private:
listT> _list;
pthread_mutex_t _lock;
pthread_mutexattr_t _attr;
pthread_cond_t _cond;
};
template typename T>
T BlockingQueueT>::pop( )
{
pthread_mutex_lock(_lock);
while (_list.empty( ))
{
pthread_cond_wait(_cond, _lock) ;
}
T _temp = _list.front( );
_list.pop_front( );
pthread_mutex_unlock(_lock);
return _temp;
}
template typename T>
void BlockingQueue T>::push(const T value )
{
pthread_mutex_lock(_lock);
const bool was_empty = _list.empty( );
_list.push_back(value);
pthread_mutex_unlock(_lock);
if (was_empty)
pthread_cond_broadcast(_cond);
}
并發阻塞隊列設計有兩個要注意的方面:
1.可以不使用 pthread_cond_broadcast,而是使用 pthread_cond_signal。但是,pthread_cond_signal 會釋放至少一個等待條件變量的線程,這個線程不一定是等待時間最長的讀線程。盡管使用 pthread_cond_signal 不會損害阻塞隊列的功能,但是這可能會導致某些讀線程的等待時間過長。
2.可能會出現虛假的線程喚醒。因此,在喚醒讀線程之后,要確認列表非空,然后再繼續處理。強烈建議使用基于 while 循環的 pop()。
設計有超時限制的并發阻塞隊列
在許多系統中,如果無法在特定的時間段內處理新數據,就根本不處理數據了。例如,新聞頻道的自動收報機顯示來自金融交易所的實時股票行情,它每 n 秒收到一次新數據。如果在 n 秒內無法處理以前的一些數據,就應該丟棄這些數據并顯示最新的信息。根據這個概念,我們來看看如何給并發隊列的添加和取出操作增加超時限制。這意味著,如果系統無法在指定的時間限制內執行添加和取出操作,就應該根本不執行操作。
復制代碼 代碼如下:
template typename T>
bool BlockingQueue T>::push(const T data, const int seconds)
{
struct timespec ts1, ts2;
const bool was_empty = _list.empty( );
clock_gettime(CLOCK_REALTIME, ts1);
pthread_mutex_lock(_lock);
clock_gettime(CLOCK_REALTIME, ts2);
if ((ts2.tv_sec – ts1.tv_sec) seconds)
{
was_empty = _list.empty( );
_list.push_back(value);
}
pthread_mutex_unlock(_lock);
if (was_empty)
pthread_cond_broadcast(_cond);
}
template typename T>
T BlockingQueue T>::pop(const int seconds)
{
struct timespec ts1, ts2;
clock_gettime(CLOCK_REALTIME, ts1);
pthread_mutex_lock(_lock);
clock_gettime(CLOCK_REALTIME, ts2);
// First Check: if time out when get the _lock
if ((ts1.tv_sec – ts2.tv_sec) seconds)
{
ts2.tv_sec += seconds; // specify wake up time
while(_list.empty( ) (result == 0))
{
result = pthread_cond_timedwait(_cond, _lock, ts2) ;
}
if (result == 0) // Second Check: if time out when timedwait
{
T _temp = _list.front( );
_list.pop_front( );
pthread_mutex_unlock(_lock);
return _temp;
}
}
pthread_mutex_unlock(lock);
throw "timeout happened";
}
設計有大小限制的并發阻塞隊列
最后,討論有大小限制的并發阻塞隊列。這種隊列與并發阻塞隊列相似,但是對隊列的大小有限制。在許多內存有限的嵌入式系統中,確實需要有大小限制的隊列。
對于阻塞隊列,只有讀線程需要在隊列中沒有數據時等待。對于有大小限制的阻塞隊列,如果隊列滿了,寫線程也需要等待。
復制代碼 代碼如下:
template typename T>
class BoundedBlockingQueue
{
public:
BoundedBlockingQueue (int size) : maxSize(size)
{
pthread_mutex_init(_lock, NULL);
pthread_cond_init(_rcond, NULL);
pthread_cond_init(_wcond, NULL);
_array.reserve(maxSize);
}
~BoundedBlockingQueue ( )
{
pthread_mutex_destroy(_lock);
pthread_cond_destroy(_rcond);
pthread_cond_destroy(_wcond);
}
void push(const T data);
T pop( );
private:
vectorT> _array; // or T* _array if you so prefer
int maxSize;
pthread_mutex_t _lock;
pthread_cond_t _rcond, _wcond;
};
template typename T>
void BoundedBlockingQueue T>::push(const T value )
{
pthread_mutex_lock(_lock);
const bool was_empty = _array.empty( );
while (_array.size( ) == maxSize)
{
pthread_cond_wait(_wcond, _lock);
}
_array.push_back(value);
pthread_mutex_unlock(_lock);
if (was_empty)
pthread_cond_broadcast(_rcond);
}
template typename T>
T BoundedBlockingQueueT>::pop( )
{
pthread_mutex_lock(_lock);
const bool was_full = (_array.size( ) == maxSize);
while(_array.empty( ))
{
pthread_cond_wait(_rcond, _lock) ;
}
T _temp = _array.front( );
_array.erase( _array.begin( ));
pthread_mutex_unlock(_lock);
if (was_full)
pthread_cond_broadcast(_wcond);
return _temp;
}
要注意的第一點是,這個阻塞隊列有兩個條件變量而不是一個。如果隊列滿了,寫線程等待 _wcond 條件變量;讀線程在從隊列中取出數據之后需要通知所有線程。同樣,如果隊列是空的,讀線程等待 _rcond 變量,寫線程在把數據插入隊列中之后向所有線程發送廣播消息。如果在發送廣播通知時沒有線程在等待 _wcond 或 _rcond,會發生什么?什么也不會發生;系統會忽略這些消息。還要注意,兩個條件變量使用相同的互斥鎖。
您可能感興趣的文章:- linux中高并發socket最大連接數的優化詳解
- Linux netstat命令查看并發連接數的方法
- Linux下高并發socket最大連接數所受的各種限制(詳解)
- linux并發連接50萬的配置方法
- 淺談Linux環境下并發編程中C語言fork()函數的使用
- Linux下apache如何限制并發連接和下載速度
- Linux并發執行很簡單,這么做就對了