
Data Structures in Concurrent Computing

Linked List

Layout

#include <pthread.h>
#include <list>

namespace concurrent {

template <typename T>
class Queue {
public:
    Queue() {
        pthread_mutex_init(&_lock, NULL);
    }
    ~Queue() {
        pthread_mutex_destroy(&_lock);
    }
    void push(const T& data);
    T pop();
private:
    std::list<T> _list;
    pthread_mutex_t _lock;
};

} // namespace concurrent

Insertion and Deletion

Insertion

template <typename T>
void concurrent::Queue<T>::push(const T& value) {
    pthread_mutex_lock(&_lock);
    _list.push_back(value);
    pthread_mutex_unlock(&_lock);
}

Deletion

template <typename T>
T concurrent::Queue<T>::pop() {
    pthread_mutex_lock(&_lock);
    // Check emptiness while holding the lock; checking before
    // locking would race with concurrent pops.
    if (_list.empty()) {
        pthread_mutex_unlock(&_lock);
        throw "element not found";
    }
    T _temp = _list.front();
    _list.pop_front();
    pthread_mutex_unlock(&_lock);
    return _temp;
}
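A minimal usage sketch, assuming the Queue above (the thread functions, the int payload, and the loop bounds are illustrative, not part of the original): one producer pushes ten values while a consumer pops them, retrying when the queue happens to be empty.

#include <pthread.h>
#include <cstdio>

// Hypothetical demo; assumes concurrent::Queue from above.
concurrent::Queue<int> queue;

void* producer(void*) {
    for (int i = 0; i < 10; ++i)
        queue.push(i);
    return NULL;
}

void* consumer(void*) {
    int consumed = 0;
    while (consumed < 10) {
        try {
            printf("%d\n", queue.pop());
            ++consumed;
        } catch (const char*) {
            // Queue was empty at this instant; try again.
        }
    }
    return NULL;
}

int main() {
    pthread_t p, c;
    pthread_create(&p, NULL, producer, NULL);
    pthread_create(&c, NULL, consumer, NULL);
    pthread_join(p, NULL);
    pthread_join(c, NULL);
    return 0;
}

The busy retry loop in the consumer is exactly the awkwardness the blocking queue further below removes.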

Now consider a long list in a workload where reads far outnumber writes. A single mutex makes every reader wait on every writer, so we need finer-grained locking. Let us redesign the class with separate locks for the two ends of the list: one for popping at the front and one for pushing at the back.

template <typename T>
class Queue {
public:
    Queue() {
        pthread_mutex_init(&_rlock, NULL);
        pthread_mutex_init(&_wlock, NULL);
    }
    ~Queue() {
        pthread_mutex_destroy(&_rlock);
        pthread_mutex_destroy(&_wlock);
    }
    void push(const T& data);
    T pop();
private:
    std::list<T> _list;
    pthread_mutex_t _rlock, _wlock;
};

Reimplement the push and pop

template <typename T>
void Queue<T>::push(const T& value) {
    pthread_mutex_lock(&_wlock);
    _list.push_back(value);
    pthread_mutex_unlock(&_wlock);
}

template <typename T>
T Queue<T>::pop() {
    pthread_mutex_lock(&_rlock);
    if (_list.empty()) {
        pthread_mutex_unlock(&_rlock);
        throw "element not found";
    }
    T _temp = _list.front();
    _list.pop_front();
    pthread_mutex_unlock(&_rlock);
    return _temp;
}
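Note that this split assumes the two ends of the list never interfere, which std::list does not actually guarantee when the queue is short. For a read-heavy workload, a safer tool is the reader-writer lock that pthreads already ships: many readers can hold it concurrently, while a writer gets exclusive access. A minimal sketch under that assumption (RWQueue and its size() accessor are hypothetical names added for illustration):

#include <pthread.h>
#include <list>
#include <cstddef>

// Sketch: read-only operations take the shared read lock and can run
// in parallel; mutating operations take the exclusive write lock.
template <typename T>
class RWQueue {
public:
    RWQueue()  { pthread_rwlock_init(&_rwlock, NULL); }
    ~RWQueue() { pthread_rwlock_destroy(&_rwlock); }

    void push(const T& value) {
        pthread_rwlock_wrlock(&_rwlock);  // exclusive: mutates the list
        _list.push_back(value);
        pthread_rwlock_unlock(&_rwlock);
    }

    size_t size() {
        pthread_rwlock_rdlock(&_rwlock);  // shared: many readers at once
        size_t n = _list.size();
        pthread_rwlock_unlock(&_rwlock);
        return n;
    }
private:
    std::list<T> _list;
    pthread_rwlock_t _rwlock;
};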

Next we add a pthread condition variable to the concurrent queue: instead of throwing when the list is empty, pop waits until an element arrives.

template <typename T>
class BlockingQueue {
public:
    BlockingQueue() {
        pthread_mutex_init(&_lock, NULL);
        pthread_cond_init(&_cond, NULL);
    }
    ~BlockingQueue() {
        pthread_mutex_destroy(&_lock);
        pthread_cond_destroy(&_cond);
    }
    void push(const T& data);
    T pop();
private:
    std::list<T> _list;
    pthread_mutex_t _lock;
    pthread_cond_t _cond;
};

The implementation of push

template <typename T>
void BlockingQueue<T>::push(const T& value) {
    pthread_mutex_lock(&_lock);
    const bool was_empty = _list.empty();
    _list.push_back(value);
    pthread_mutex_unlock(&_lock);
    // Wake consumers only on the empty -> non-empty transition;
    // otherwise no consumer can be blocked in pop().
    if (was_empty)
        pthread_cond_broadcast(&_cond);
}

The implementation of pop. Note the while loop around the wait: pthread_cond_wait can return spuriously, so the thread must re-check the predicate before proceeding.

template <typename T>
T BlockingQueue<T>::pop() {
    // The mutex must be held before calling pthread_cond_wait.
    pthread_mutex_lock(&_lock);
    while (_list.empty()) {
        // Atomically releases _lock while waiting and re-acquires it
        // on wakeup; the loop guards against spurious wakeups.
        pthread_cond_wait(&_cond, &_lock);
    }
    T _temp = _list.front();
    _list.pop_front();
    pthread_mutex_unlock(&_lock);
    return _temp;
}
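A quick sketch of the blocking behavior, assuming the BlockingQueue above (the thread functions and the value 42 are illustrative): the consumer starts first and simply blocks in pop() until the producer pushes something.

#include <pthread.h>
#include <cstdio>
#include <unistd.h>

// Hypothetical demo; assumes the BlockingQueue above.
BlockingQueue<int> bq;

void* consumer(void*) {
    // Blocks until the producer pushes a value.
    printf("got %d\n", bq.pop());
    return NULL;
}

void* producer(void*) {
    sleep(1);       // let the consumer block first
    bq.push(42);
    return NULL;
}

int main() {
    pthread_t c, p;
    pthread_create(&c, NULL, consumer, NULL);
    pthread_create(&p, NULL, producer, NULL);
    pthread_join(c, NULL);
    pthread_join(p, NULL);
    return 0;
}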

Design a size-limited blocking queue

In the previous implementation the queue could grow without bound. In this one we give it a fixed capacity, so push must also be able to block when the queue is full; that calls for a second condition variable.

template <typename T>
class BoundedBlockingQueue {
public:
    BoundedBlockingQueue(int size) : maxSize(size) {
        pthread_mutex_init(&_lock, NULL);
        pthread_cond_init(&_rcond, NULL);
        pthread_cond_init(&_wcond, NULL);
        _array.reserve(maxSize);
    }
    ~BoundedBlockingQueue() {
        pthread_mutex_destroy(&_lock);
        pthread_cond_destroy(&_rcond);
        pthread_cond_destroy(&_wcond);
    }
    void push(const T& data);
    T pop();
private:
    std::vector<T> _array;
    int maxSize;
    pthread_mutex_t _lock;
    pthread_cond_t _rcond, _wcond;
};

Push item into a bounded queue

template <typename T>
void BoundedBlockingQueue<T>::push(const T& value) {
    pthread_mutex_lock(&_lock);
    while (_array.size() == (size_t)maxSize) {
        // Full: wait for pop() to free a slot.
        pthread_cond_wait(&_wcond, &_lock);
    }
    // Check emptiness after the wait, immediately before inserting,
    // so the wakeup decision reflects the state we actually modified.
    const bool was_empty = _array.empty();
    _array.push_back(value);
    pthread_mutex_unlock(&_lock);
    if (was_empty)
        pthread_cond_broadcast(&_rcond);
}

Pop item out of a bounded queue

template <typename T>
T BoundedBlockingQueue<T>::pop() {
    pthread_mutex_lock(&_lock);
    while (_array.empty()) {
        // Empty: wait for push() to deliver an element.
        pthread_cond_wait(&_rcond, &_lock);
    }
    // Check fullness after the wait, immediately before removing.
    const bool was_full = (_array.size() == (size_t)maxSize);
    T _temp = _array.front();
    _array.erase(_array.begin());  // O(n) in a vector; fine for small queues
    pthread_mutex_unlock(&_lock);
    if (was_full)
        pthread_cond_broadcast(&_wcond);
    return _temp;
}
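A short sketch of the back-pressure this buys us, assuming the BoundedBlockingQueue above (the capacity of 2 and the loop bounds are illustrative): a fast producer is throttled as soon as the buffer fills, blocking in push() until the consumer catches up.

#include <pthread.h>
#include <cstdio>

// Hypothetical demo; assumes the BoundedBlockingQueue above.
BoundedBlockingQueue<int> bbq(2);   // capacity of 2

void* producer(void*) {
    for (int i = 0; i < 5; ++i) {
        bbq.push(i);                // blocks whenever the queue is full
        printf("pushed %d\n", i);
    }
    return NULL;
}

void* consumer(void*) {
    for (int i = 0; i < 5; ++i)
        printf("popped %d\n", bbq.pop());
    return NULL;
}

int main() {
    pthread_t p, c;
    pthread_create(&p, NULL, producer, NULL);
    pthread_create(&c, NULL, consumer, NULL);
    pthread_join(p, NULL);
    pthread_join(c, NULL);
    return 0;
}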