Apollo Cyber Study (cyber/base 2)

Continuing with cyber/base.
// Study: marks my own notes.

cyber/base/rw_lock_guard

Provides two RAII wrappers, ReadLockGuard and WriteLockGuard, for a read-write lock: each guard locks in its constructor and unlocks in its destructor.
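The guards are templated on the lock type. Roughly, they look like this (a simplified sketch of my own, not the verbatim header):

template <typename RWLock>
class ReadLockGuard {
 public:
  // Take the read lock on construction.
  explicit ReadLockGuard(RWLock& lock) : rw_lock_(lock) { rw_lock_.ReadLock(); }
  // Release it automatically when the guard goes out of scope.
  ~ReadLockGuard() { rw_lock_.ReadUnlock(); }

 private:
  ReadLockGuard(const ReadLockGuard&) = delete;
  ReadLockGuard& operator=(const ReadLockGuard&) = delete;
  RWLock& rw_lock_;
};

template <typename RWLock>
class WriteLockGuard {
 public:
  explicit WriteLockGuard(RWLock& lock) : rw_lock_(lock) { rw_lock_.WriteLock(); }
  ~WriteLockGuard() { rw_lock_.WriteUnlock(); }

 private:
  WriteLockGuard(const WriteLockGuard&) = delete;
  WriteLockGuard& operator=(const WriteLockGuard&) = delete;
  RWLock& rw_lock_;
};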

cyber/base/atomic_rw_lock

/******************************************************************************
* Copyright 2018 The Apollo Authors. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*****************************************************************************/

#ifndef CYBER_BASE_ATOMIC_RW_LOCK_H_
#define CYBER_BASE_ATOMIC_RW_LOCK_H_

#include <stdint.h>
#include <unistd.h>
#include <atomic>
#include <condition_variable>
#include <cstdlib>
#include <iostream>
#include <mutex>
#include <thread>

#include "cyber/base/rw_lock_guard.h"

namespace apollo {
namespace cyber {
namespace base {

class AtomicRWLock {
  // Study: This lock is only meant to be used through the lock guards,
  // so the lock and unlock functions are private and the guards are
  // declared as friends here.
  friend class ReadLockGuard<AtomicRWLock>;
  friend class WriteLockGuard<AtomicRWLock>;

 public:
  static const int32_t RW_LOCK_FREE = 0;
  static const int32_t WRITE_EXCLUSIVE = -1;
  static const uint32_t MAX_RETRY_TIMES = 5;
  AtomicRWLock() {}
  explicit AtomicRWLock(bool write_first) : write_first_(write_first) {}

 private:
  // all these functions can only be used by ReadLockGuard/WriteLockGuard;
  void ReadLock();
  void WriteLock();

  void ReadUnlock();
  void WriteUnlock();

  AtomicRWLock(const AtomicRWLock&) = delete;
  AtomicRWLock& operator=(const AtomicRWLock&) = delete;
  std::atomic<uint32_t> write_lock_wait_num_ = {0};
  std::atomic<int32_t> lock_num_ = {0};
  bool write_first_ = true;
};

// Study: Spin until no write lock is held (yield the thread if it is still
// held after N tries). In write-first mode, also wait while any writer is
// waiting for the lock.
inline void AtomicRWLock::ReadLock() {
  uint32_t retry_times = 0;
  int32_t lock_num = lock_num_.load();
  if (write_first_) {
    do {
      while (lock_num < RW_LOCK_FREE || write_lock_wait_num_.load() > 0) {
        if (++retry_times == MAX_RETRY_TIMES) {
          // saving cpu
          std::this_thread::yield();
          retry_times = 0;
        }
        lock_num = lock_num_.load();
      }
    } while (!lock_num_.compare_exchange_weak(lock_num, lock_num + 1,
                                              std::memory_order_acq_rel,
                                              std::memory_order_relaxed));
  } else {
    do {
      while (lock_num < RW_LOCK_FREE) {
        if (++retry_times == MAX_RETRY_TIMES) {
          // saving cpu
          std::this_thread::yield();
          retry_times = 0;
        }
        lock_num = lock_num_.load();
      }
    } while (!lock_num_.compare_exchange_weak(lock_num, lock_num + 1,
                                              std::memory_order_acq_rel,
                                              std::memory_order_relaxed));
  }
}

// Study: Don't think too much, just lock
inline void AtomicRWLock::WriteLock() {
  int32_t rw_lock_free = RW_LOCK_FREE;
  uint32_t retry_times = 0;
  write_lock_wait_num_.fetch_add(1);
  while (!lock_num_.compare_exchange_weak(rw_lock_free, WRITE_EXCLUSIVE,
                                          std::memory_order_acq_rel,
                                          std::memory_order_relaxed)) {
    // rw_lock_free is overwritten when the CAS fails, so reset it
    rw_lock_free = RW_LOCK_FREE;
    if (++retry_times == MAX_RETRY_TIMES) {
      // saving cpu
      std::this_thread::yield();
      retry_times = 0;
    }
  }
  write_lock_wait_num_.fetch_sub(1);
}

// Study: Read lock is +, unlock is -
inline void AtomicRWLock::ReadUnlock() { lock_num_.fetch_sub(1); }

// Study: Write lock is -, unlock is +
inline void AtomicRWLock::WriteUnlock() { lock_num_.fetch_add(1); }

} // namespace base
} // namespace cyber
} // namespace apollo

#endif // CYBER_BASE_ATOMIC_RW_LOCK_H_
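To see how the lock and the guards fit together, here is a quick usage sketch of my own (not from the Apollo source):

#include <thread>
#include <vector>

#include "cyber/base/atomic_rw_lock.h"

using apollo::cyber::base::AtomicRWLock;
using apollo::cyber::base::ReadLockGuard;
using apollo::cyber::base::WriteLockGuard;

AtomicRWLock rw_lock;  // write-first by default
int shared_value = 0;

void Reader() {
  ReadLockGuard<AtomicRWLock> guard(rw_lock);  // ReadLock() in the constructor
  int copy = shared_value;                     // readers may run concurrently
  (void)copy;
}                                              // ReadUnlock() in the destructor

void Writer() {
  WriteLockGuard<AtomicRWLock> guard(rw_lock);  // WriteLock() in the constructor
  ++shared_value;                               // exclusive access
}                                               // WriteUnlock() in the destructor

int main() {
  std::vector<std::thread> threads;
  for (int i = 0; i < 4; ++i) threads.emplace_back(Reader);
  threads.emplace_back(Writer);
  for (auto &t : threads) t.join();
  return 0;
}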

cyber/base/bounded_queue

A fixed-size, lock-free queue implemented with a circular array and atomic variables.
The atomic implementation is similar to atomic_fifo, so it is not described here.
The special feature of this class is its wait_strategy_, which enables WaitDequeue:
like Dequeue, but it waits while the queue is empty (see the sketch below).
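To make the waiting behaviour concrete, here is a simplified, mutex-based sketch of my own of a fixed-size circular queue with a blocking dequeue. The real BoundedQueue is lock-free (atomic head/tail indices) and delegates the waiting policy to its wait_strategy_; this only illustrates the semantics:

#include <condition_variable>
#include <cstdint>
#include <mutex>
#include <vector>

// Simplified illustration only; Apollo's BoundedQueue is lock-free and
// uses a pluggable WaitStrategy instead of a condition variable.
template <typename T>
class SimpleBoundedQueue {
 public:
  explicit SimpleBoundedQueue(uint64_t size) : buffer_(size) {}

  bool Enqueue(const T& element) {
    std::unique_lock<std::mutex> lock(mutex_);
    if (count_ == buffer_.size()) return false;  // full: fail immediately
    buffer_[tail_] = element;
    tail_ = (tail_ + 1) % buffer_.size();
    ++count_;
    cv_.notify_one();
    return true;
  }

  bool Dequeue(T* element) {
    std::unique_lock<std::mutex> lock(mutex_);
    if (count_ == 0) return false;  // empty: fail immediately
    Pop(element);
    return true;
  }

  // Like Dequeue, but blocks while the queue is empty.
  bool WaitDequeue(T* element) {
    std::unique_lock<std::mutex> lock(mutex_);
    cv_.wait(lock, [this] { return count_ > 0; });
    Pop(element);
    return true;
  }

 private:
  void Pop(T* element) {
    *element = buffer_[head_];
    head_ = (head_ + 1) % buffer_.size();
    --count_;
  }

  std::vector<T> buffer_;
  uint64_t head_ = 0, tail_ = 0, count_ = 0;
  std::mutex mutex_;
  std::condition_variable cv_;
};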

cyber/base/concurrent_object_pool

/******************************************************************************
* Copyright 2018 The Apollo Authors. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*****************************************************************************/

#ifndef CYBER_BASE_CONCURRENT_OBJECT_POOL_H_
#define CYBER_BASE_CONCURRENT_OBJECT_POOL_H_

#include <atomic>
#include <cstdlib>
#include <cstring>
#include <iostream>
#include <memory>
#include <stdexcept>
#include <utility>

#include "cyber/base/for_each.h"
#include "cyber/base/macros.h"

namespace apollo {
namespace cyber {
namespace base {

// Study: The internal data structure is a linked list that behaves like a
// stack but lives in one contiguous block of memory. A free-node list sits
// on top of the node array.
template <typename T>
class CCObjectPool : public std::enable_shared_from_this<CCObjectPool<T>> {
 public:
  explicit CCObjectPool(uint32_t size);
  virtual ~CCObjectPool();

  // Study: The arguments are forwarded to the constructor of T:
  // an rvalue argument is forwarded as an rvalue reference,
  // an lvalue as an lvalue reference.
  template <typename... Args>
  void ConstructAll(Args &&... args);

  template <typename... Args>
  std::shared_ptr<T> ConstructObject(Args &&... args);

  std::shared_ptr<T> GetObject();
  void ReleaseObject(T *);
  uint32_t size() const;

 private:
  struct Node {
    T object;
    Node *next;
  };

  struct alignas(2 * sizeof(Node *)) Head {
    uintptr_t count;
    Node *node;
  };

 private:
  CCObjectPool(CCObjectPool &) = delete;
  CCObjectPool &operator=(CCObjectPool &) = delete;
  bool FindFreeHead(Head *head);

  std::atomic<Head> free_head_;
  // Study: This is the backing storage for the real objects
  Node *node_arena_ = nullptr;
  uint32_t capacity_ = 0;
};

// Study: Initialize node_arena_
template <typename T>
CCObjectPool<T>::CCObjectPool(uint32_t size) : capacity_(size) {
  node_arena_ = static_cast<Node *>(CheckedCalloc(capacity_, sizeof(Node)));
  FOR_EACH(i, 0, capacity_ - 1) { node_arena_[i].next = node_arena_ + 1 + i; }
  node_arena_[capacity_ - 1].next = nullptr;
  free_head_.store({0, node_arena_}, std::memory_order_relaxed);
}

// Study: Construct an object in every slot of node_arena_, all with the same arguments
template <typename T>
template <typename... Args>
void CCObjectPool<T>::ConstructAll(Args &&... args) {
  FOR_EACH(i, 0, capacity_) {
    new (node_arena_ + i) T(std::forward<Args>(args)...);
  }
}

// Study: This is a dangerous implementation.
// The pool hands out shared pointers to its objects, but it does not
// destruct the objects when the pool itself is destructed, so a type T
// with heap-allocated members would leak. Presumably the assumption is
// that the pool is only destructed after every object has been released,
// either manually or through its shared pointer.
template <typename T>
CCObjectPool<T>::~CCObjectPool() {
  std::free(node_arena_);
}

template <typename T>
bool CCObjectPool<T>::FindFreeHead(Head *head) {
  Head new_head;
  Head old_head = free_head_.load(std::memory_order_acquire);
  do {
    // Study: Already at the tail, no free node left
    if (unlikely(old_head.node == nullptr)) {
      return false;
    }
    new_head.node = old_head.node->next;
    new_head.count = old_head.count + 1;
  } while (!free_head_.compare_exchange_weak(old_head, new_head,
                                             std::memory_order_acq_rel,
                                             std::memory_order_acquire));
  // Study: Return the old free head and advance free_head_ to the next node
  *head = old_head;
  return true;
}

// Study: Take one free slot; the custom deleter returns it to the pool once
// no one points to the object anymore
template <typename T>
std::shared_ptr<T> CCObjectPool<T>::GetObject() {
  Head free_head;
  if (unlikely(!FindFreeHead(&free_head))) {
    return nullptr;
  }
  auto self = this->shared_from_this();
  return std::shared_ptr<T>(reinterpret_cast<T *>(free_head.node),
                            [self](T *object) { self->ReleaseObject(object); });
}

// Study: Take the first node from the free list and construct a T in place
// on it; the deleter also calls ~T before returning the node to the pool
template <typename T>
template <typename... Args>
std::shared_ptr<T> CCObjectPool<T>::ConstructObject(Args &&... args) {
  Head free_head;
  if (unlikely(!FindFreeHead(&free_head))) {
    return nullptr;
  }
  auto self = this->shared_from_this();
  T *ptr = new (free_head.node) T(std::forward<Args>(args)...);
  return std::shared_ptr<T>(ptr, [self](T *object) {
    object->~T();
    self->ReleaseObject(object);
  });
}

// Study: When an object is released, relink its node onto the head of the
// free node list
template <typename T>
void CCObjectPool<T>::ReleaseObject(T *object) {
  Head new_head;
  Node *node = reinterpret_cast<Node *>(object);
  Head old_head = free_head_.load(std::memory_order_acquire);
  do {
    node->next = old_head.node;
    new_head.node = node;
    new_head.count = old_head.count + 1;
  } while (!free_head_.compare_exchange_weak(old_head, new_head,
                                             std::memory_order_acq_rel,
                                             std::memory_order_acquire));
}

} // namespace base
} // namespace cyber
} // namespace apollo

#endif // CYBER_BASE_CONCURRENT_OBJECT_POOL_H_
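A small usage sketch of my own (the Point struct is just an example type):

#include <memory>

#include "cyber/base/concurrent_object_pool.h"

using apollo::cyber::base::CCObjectPool;

struct Point {
  Point() = default;
  Point(double x, double y) : x(x), y(y) {}
  double x = 0.0;
  double y = 0.0;
};

int main() {
  // The pool must be owned by a shared_ptr, because GetObject()/ConstructObject()
  // call shared_from_this() so that the deleters keep the pool alive.
  auto pool = std::make_shared<CCObjectPool<Point>>(16);

  // Pattern 1: construct every slot up front, then hand out raw slots.
  pool->ConstructAll(1.0, 2.0);
  std::shared_ptr<Point> p1 = pool->GetObject();  // no constructor call here

  // Pattern 2: construct on demand; this deleter also runs ~Point().
  std::shared_ptr<Point> p2 = pool->ConstructObject(3.0, 4.0);

  // (In real code you would normally stick to one of the two patterns.)
  // When p1 and p2 go out of scope, their nodes return to the free list.
  return 0;
}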

cyber/base/object_pool

The plain version of concurrent_object_pool, without any concurrency guarantees.
Object construction and destruction inside the pool happen only in the pool constructor and destructor.
This is quite different from concurrent_object_pool,
which allows public calls to construct and release individual objects,
and which does not call the objects' destructors in the pool destructor.

Clearly the two classes rest on different usage assumptions, so they cannot easily replace each other.
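To make the contrast concrete, here is a minimal single-threaded sketch of my own of the object_pool idea (not the real header): objects are constructed once in the pool constructor and destructed in the pool destructor, and handing objects out or back only moves nodes on a free list.

#include <cstdint>
#include <cstdlib>
#include <memory>
#include <new>

template <typename T>
class SimpleObjectPool
    : public std::enable_shared_from_this<SimpleObjectPool<T>> {
 public:
  template <typename... Args>
  explicit SimpleObjectPool(uint32_t num, const Args &... args) : num_(num) {
    arena_ = static_cast<Node *>(std::calloc(num_, sizeof(Node)));
    for (uint32_t i = 0; i < num_; ++i) {
      new (&arena_[i].object) T(args...);  // construct every object here
      arena_[i].next = free_head_;
      free_head_ = &arena_[i];
    }
  }

  ~SimpleObjectPool() {
    for (uint32_t i = 0; i < num_; ++i) {
      arena_[i].object.~T();  // destruct every object here
    }
    std::free(arena_);
  }

  std::shared_ptr<T> GetObject() {
    if (free_head_ == nullptr) return nullptr;
    Node *node = free_head_;
    free_head_ = node->next;
    auto self = this->shared_from_this();
    // The deleter only relinks the node; no destructor runs here.
    return std::shared_ptr<T>(&node->object, [self, node](T *) {
      node->next = self->free_head_;
      self->free_head_ = node;
    });
  }

 private:
  struct Node {
    T object;
    Node *next;
  };

  SimpleObjectPool(const SimpleObjectPool &) = delete;
  SimpleObjectPool &operator=(const SimpleObjectPool &) = delete;

  Node *arena_ = nullptr;
  Node *free_head_ = nullptr;
  uint32_t num_ = 0;
};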