Apollo Cyber Study. P8 io

First of all, cyber/io is a module that is not used inside cyber itself.
It isn't used over in modules/ either. On top of that, it operates directly at the epoll event layer, while we go straight through grpc, so I doubt there will be a chance to use it later on.
Still, it's worth a look.

// Study: marks my own notes

cyber/io/poll_data

The basic data structures for the io module.


struct PollResponse {
explicit PollResponse(uint32_t e = 0) : events(e) {}

uint32_t events;
};

// Study: A request to get data from io,
// and the callback that should be called once the io event actually arrives
struct PollRequest {
int fd = -1;
uint32_t events = 0;
int timeout_ms = -1;
std::function<void(const PollResponse&)> callback = nullptr;
};

struct PollCtrlParam {
int operation;
// Study: File descriptor
int fd;
// Study: epoll is the mechanism used to wait for io readiness
epoll_event event;
};
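
To make these structs concrete, here is a minimal sketch of how a PollRequest might be filled in. This is my own example, not code from the Apollo tree; the header path and the apollo::cyber::io namespace are assumptions based on the file location.

// Hypothetical usage sketch (not from the Apollo source):
// watch `fd` for readable events with a 100 ms timeout.
#include <sys/epoll.h>

#include <iostream>

#include "cyber/io/poll_data.h"  // assumed header path

void WatchReadable(int fd) {
  apollo::cyber::io::PollRequest req;
  req.fd = fd;           // file descriptor to watch
  req.events = EPOLLIN;  // interested in "readable" events
  req.timeout_ms = 100;  // give up after 100 ms
  req.callback = [](const apollo::cyber::io::PollResponse& rsp) {
    // events == 0 means the request timed out without any io event
    std::cout << "io events: " << rsp.events << std::endl;
  };
  // The request is then handed to the Poller singleton (next section).
}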

cyber/io/poller

Poller is a singleton

class Poller {
public:
using RequestPtr = std::shared_ptr<PollRequest>;
using RequestMap = std::unordered_map<int, RequestPtr>;
using CtrlParamMap = std::unordered_map<int, PollCtrlParam>;

virtual ~Poller();

void Shutdown();

bool Register(const PollRequest& req);
bool Unregister(const PollRequest& req);

private:
bool Init();
void Clear();
void Poll(int timeout_ms);
void ThreadFunc();
void HandleChanges();
int GetTimeoutMs();
void Notify();

int epoll_fd_ = -1;
std::thread thread_;
std::atomic<bool> is_shutdown_ = {true};

int pipe_fd_[2] = {-1, -1};
std::mutex pipe_mutex_;

RequestMap requests_;
CtrlParamMap ctrl_params_;
base::AtomicRWLock poll_data_lock_;

const int kPollSize = 32;
const int kPollTimeoutMs = 100;

DECLARE_SINGLETON(Poller)
};

Poller::Poller() {
if (!Init()) {
AERROR << "Poller init failed!";
Clear();
}
}

Poller::~Poller() { Shutdown(); }

void Poller::Shutdown() {
if (is_shutdown_.exchange(true)) {
return;
}
Clear();
}

bool Poller::Register(const PollRequest& req) {
// Study: Same as if(is_shutdown)
if (is_shutdown_.load()) {
return false;
}

// Study: a valid file descriptor is never less than 0
if (req.fd < 0 || req.callback == nullptr) {
AERROR << "input is invalid";
return false;
}

PollCtrlParam ctrl_param;
ctrl_param.fd = req.fd;
ctrl_param.event.data.fd = req.fd;
ctrl_param.event.events = req.events;

{
// Study: io events may happen frequently while this Poller is a singleton,
// so a read-write lock is used to maximize io concurrency
WriteLockGuard<AtomicRWLock> lck(poll_data_lock_);
if (requests_.count(req.fd) == 0) {
ctrl_param.operation = EPOLL_CTL_ADD;
requests_[req.fd] = std::make_shared<PollRequest>();
} else {
// Study: this fd is already registered by someone, so overwrite the existing request
ctrl_param.operation = EPOLL_CTL_MOD;
}
*requests_[req.fd] = req;
ctrl_params_[ctrl_param.fd] = ctrl_param;
}

Notify();
return true;
}

bool Poller::Unregister(const PollRequest& req) {
if (is_shutdown_.load()) {
return false;
}

if (req.fd < 0 || req.callback == nullptr) {
AERROR << "input is invalid";
return false;
}

{
WriteLockGuard<AtomicRWLock> lck(poll_data_lock_);
auto size = requests_.erase(req.fd);
if (size == 0) {
AERROR << "unregister failed, can't find fd: " << req.fd;
return false;
}

// Study: the request is removed, but a DEL operation still needs to go to epoll_ctl
PollCtrlParam ctrl_param;
ctrl_param.operation = EPOLL_CTL_DEL;
ctrl_param.fd = req.fd;
ctrl_params_[ctrl_param.fd] = ctrl_param;
}

Notify();
return true;
}

bool Poller::Init() {
// The epoll data structure needed for kernel-space operation
epoll_fd_ = epoll_create(kPollSize);
if (epoll_fd_ < 0) {
AERROR << "epoll create failed, " << strerror(errno);
return false;
}

// Study: http://man7.org/linux/man-pages/man2/pipe.2.html
// pipe_fd_[0] is the read end, pipe_fd_[1] is the write end.
// Pipes are normally used for inter-process communication; here it is a
// self-pipe used to wake up epoll_wait (see Notify()).
// create pipe, and set nonblock
if (pipe(pipe_fd_) == -1) {
AERROR << "create pipe failed, " << strerror(errno);
return false;
}
// Study: pipe flag set
if (fcntl(pipe_fd_[0], F_SETFL, O_NONBLOCK) == -1) {
AERROR << "set nonblock failed, " << strerror(errno);
return false;
}
if (fcntl(pipe_fd_[1], F_SETFL, O_NONBLOCK) == -1) {
AERROR << "set nonblock failed, " << strerror(errno);
return false;
}

// Study: the callback only drains the wake-up pipe
// add pipe[0] to epoll
auto request = std::make_shared<PollRequest>();
request->fd = pipe_fd_[0];
request->events = EPOLLIN;
request->timeout_ms = -1;
request->callback = [this](const PollResponse&) {
char c = 0;
while (read(pipe_fd_[0], &c, 1) > 0) {
}
};
requests_[request->fd] = request;

PollCtrlParam ctrl_param;
ctrl_param.operation = EPOLL_CTL_ADD;
ctrl_param.fd = pipe_fd_[0];
ctrl_param.event.data.fd = pipe_fd_[0];
ctrl_param.event.events = EPOLLIN;
ctrl_params_[ctrl_param.fd] = ctrl_param;

is_shutdown_.exchange(false);
// Study: The main task of the poller is ThreadFunc
thread_ = std::thread(&Poller::ThreadFunc, this);
// Study: How is the priority of this thread decided against other component tasks?
// Leave that to the scheduler
scheduler::Instance()->SetInnerThreadAttr(&thread_, "io_poller");
return true;
}

void Poller::Clear() {
if (thread_.joinable()) {
thread_.join();
}

if (epoll_fd_ >= 0) {
close(epoll_fd_);
epoll_fd_ = -1;
}

if (pipe_fd_[0] >= 0) {
close(pipe_fd_[0]);
pipe_fd_[0] = -1;
}

if (pipe_fd_[1] >= 0) {
close(pipe_fd_[1]);
pipe_fd_[1] = -1;
}

{
WriteLockGuard<AtomicRWLock> lck(poll_data_lock_);
requests_.clear();
ctrl_params_.clear();
}
}

// Study: the core function of this class;
// it polls io events for every fd in the request map
void Poller::Poll(int timeout_ms) {
epoll_event evt[kPollSize];
auto before_time_ns = Time::Now().ToNanosecond();
int ready_num = epoll_wait(epoll_fd_, evt, kPollSize, timeout_ms);
auto after_time_ns = Time::Now().ToNanosecond();
// Study: how much time was spent in the epoll_wait kernel call
int interval_ms =
static_cast<int>((after_time_ns - before_time_ns) / 1000000);
// Study: treat 0 as "very small" rather than literally zero (round up to 1 ms)
if (interval_ms == 0) {
interval_ms = 1;
}

std::unordered_map<int, PollResponse> responses;
{
ReadLockGuard<AtomicRWLock> lck(poll_data_lock_);
for (auto& item : requests_) {
auto& request = item.second;
if (ctrl_params_.count(request->fd) != 0) {
continue;
}

// Study: every request has a time limit;
// the request is aborted once it times out
if (request->timeout_ms > 0) {
request->timeout_ms -= interval_ms;
if (request->timeout_ms < 0) {
request->timeout_ms = 0;
}
}

if (request->timeout_ms == 0) {
responses[item.first] = PollResponse();
request->timeout_ms = -1;
}
}
}

// Study: epoll returned events; fetch each event and assign it to the corresponding response
if (ready_num > 0) {
for (int i = 0; i < ready_num; ++i) {
int fd = evt[i].data.fd;
uint32_t events = evt[i].events;
responses[fd] = PollResponse(events);
}
}

for (auto& item : responses) {
int fd = item.first;
auto& response = item.second;

ReadLockGuard<AtomicRWLock> lck(poll_data_lock_);
auto search = requests_.find(fd);
if (search != requests_.end()) {
search->second->timeout_ms = -1;
// Study: let the callback know which events arrived
search->second->callback(response);
}
}

if (ready_num < 0) {
if (errno != EINTR) {
AERROR << "epoll wait failed, " << strerror(errno);
}
}
}

void Poller::ThreadFunc() {
// block all signals in this thread
sigset_t signal_set;
sigfillset(&signal_set);
pthread_sigmask(SIG_BLOCK, &signal_set, NULL);

// Study: Loop, loop, loop
while (!is_shutdown_.load()) {
HandleChanges();
int timeout_ms = GetTimeoutMs();
ADEBUG << "this poll timeout ms: " << timeout_ms;
Poll(timeout_ms);
}
}

void Poller::HandleChanges() {
CtrlParamMap local_params;
{
ReadLockGuard<AtomicRWLock> lck(poll_data_lock_);
if (ctrl_params_.empty()) {
return;
}
local_params.swap(ctrl_params_);
}

// Study: Update epoll struct
for (auto& pair : local_params) {
auto& item = pair.second;
ADEBUG << "epoll ctl, op[" << item.operation << "] fd[" << item.fd
<< "] events[" << item.event.events << "]";
if (epoll_ctl(epoll_fd_, item.operation, item.fd, &item.event) != 0 &&
errno != EBADF) {
AERROR << "epoll ctl failed, " << strerror(errno);
}
}
}

// min heap can be used to optimize
int Poller::GetTimeoutMs() {
int timeout_ms = kPollTimeoutMs;
ReadLockGuard<AtomicRWLock> lck(poll_data_lock_);
for (auto& item : requests_) {
auto& req = item.second;
if (req->timeout_ms >= 0 && req->timeout_ms < timeout_ms) {
timeout_ms = req->timeout_ms;
}
}
return timeout_ms;
}

void Poller::Notify() {
std::unique_lock<std::mutex> lock(pipe_mutex_, std::try_to_lock);
if (!lock.owns_lock()) {
return;
}

// Study: this self-pipe trick seems to be the common way to signal changes; I am not familiar with this area
char msg = 'C';
if (write(pipe_fd_[1], &msg, 1) < 0) {
AWARN << "notify failed, " << strerror(errno);
}
}
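
Not from the Apollo tree: a small sketch of what driving the Poller singleton directly might look like, using the same self-pipe idea as Notify(). The header paths are assumptions; Poller::Instance() and Register() are as shown above.

// Hypothetical sketch: register a pipe read end with the Poller singleton
// and wake it up by writing to the other end.
#include <sys/epoll.h>
#include <unistd.h>

#include "cyber/io/poll_data.h"  // assumed header paths
#include "cyber/io/poller.h"

using apollo::cyber::io::Poller;
using apollo::cyber::io::PollRequest;
using apollo::cyber::io::PollResponse;

void PollerDemo() {
  int fds[2];
  if (pipe(fds) == -1) {
    return;
  }
  int read_fd = fds[0];

  PollRequest req;
  req.fd = read_fd;
  req.events = EPOLLIN;
  req.timeout_ms = -1;  // no timeout, wait for a real event
  req.callback = [read_fd](const PollResponse& rsp) {
    if (rsp.events & EPOLLIN) {
      // runs on the poller thread; drain whatever was written
      char c = 0;
      while (read(read_fd, &c, 1) > 0) {
      }
    }
  };
  Poller::Instance()->Register(req);

  // Writing to the other end makes epoll_wait return and the callback fire.
  char msg = 'x';
  write(fds[1], &msg, 1);
  // A real program would keep the fds alive and eventually Unregister them.
}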

cyber/io/poll_handler

/******************************************************************************
* Copyright 2018 The Apollo Authors. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*****************************************************************************/

#include "cyber/io/poll_handler.h"
#include "cyber/common/log.h"
#include "cyber/io/poller.h"

namespace apollo {
namespace cyber {
namespace io {

using croutine::CRoutine;
using croutine::RoutineState;

PollHandler::PollHandler(int fd)
: fd_(fd), is_read_(false), is_blocking_(false), routine_(nullptr) {}

// Study: This is a blocking operation; it waits until an event arrives or the timeout expires
bool PollHandler::Block(int timeout_ms, bool is_read) {
// Study: validate the input state
if (!Check(timeout_ms)) {
return false;
}

// Study: someone is already blocking on this handler
if (is_blocking_.exchange(true)) {
AINFO << "poll handler is blocking.";
return false;
}

// Study: Construct request_. If it were me, I would pass request_ through the argument list;
// that would make the code easier to read
Fill(timeout_ms, is_read);
// Study: Let the Poller handle the request, Poller will do epoll operation
if (!Poller::Instance()->Register(request_)) {
is_blocking_.exchange(false);
return false;
}

// Study: Let the scheduler reschedule this task and switch back to the main stack.
// This swaps the current croutine on this thread back to the main stack.
// The internal implementation is assembly; you can think of it as a Python yield.
// Execution of this function stops here, and when the scheduler wakes the
// croutine up again, it continues from this line.
routine_->Yield(RoutineState::IO_WAIT);

// Study: execution reaches here again after the IO_WAIT state was cleared,
// which means ResponseCallback must have been called
bool result = false;
uint32_t target_events = is_read ? EPOLLIN : EPOLLOUT;
if (response_.events & target_events) {
result = true;
}
is_blocking_.exchange(false);

return result;
}

bool PollHandler::Unblock() {
is_blocking_.exchange(false);
return Poller::Instance()->Unregister(request_);
}

bool PollHandler::Check(int timeout_ms) {
if (timeout_ms == 0) {
AINFO << "timeout[" << timeout_ms
<< "] must be larger than zero or less than zero.";
return false;
}

if (fd_ < 0) {
AERROR << "invalid fd[" << fd_ << "]";
return false;
}

routine_ = CRoutine::GetCurrentRoutine();
if (routine_ == nullptr) {
AERROR << "routine nullptr, please use IO in routine context.";
return false;
}

return true;
}

void PollHandler::Fill(int timeout_ms, bool is_read) {
is_read_.exchange(is_read);

request_.fd = fd_;
request_.events = EPOLLET | EPOLLONESHOT;
if (is_read) {
request_.events |= EPOLLIN;
} else {
request_.events |= EPOLLOUT;
}
request_.timeout_ms = timeout_ms;
request_.callback =
std::bind(&PollHandler::ResponseCallback, this, std::placeholders::_1);
}

void PollHandler::ResponseCallback(const PollResponse& rsp) {
if (!is_blocking_.load() || routine_ == nullptr) {
return;
}

response_ = rsp;

if (routine_->state() == RoutineState::IO_WAIT) {
routine_->SetUpdateFlag();
}
}

} // namespace io
} // namespace cyber
} // namespace apollo
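
A sketch of how PollHandler would be used, assuming we are already inside a croutine (Check() refuses to run otherwise). Again my own example, not Apollo code.

// Hypothetical sketch: wait up to 50 ms for `fd` to become readable,
// then read from it. Must run inside a croutine, otherwise Check() fails.
#include <unistd.h>

#include "cyber/io/poll_handler.h"

ssize_t ReadWithWait(int fd, void* buf, size_t len) {
  apollo::cyber::io::PollHandler handler(fd);
  // Block(timeout_ms, is_read) yields the croutine with IO_WAIT and only
  // returns true if EPOLLIN was reported before the timeout expired.
  if (!handler.Block(50 /* timeout_ms */, true /* is_read */)) {
    return -1;  // timed out, or no readable event
  }
  return read(fd, buf, len);
}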

cyber/io/session

Session::Session() : Session(-1) {}

Session::Session(int fd) : fd_(fd), poll_handler_(nullptr) {
poll_handler_.reset(new PollHandler(fd_));
}

int Session::Socket(int domain, int type, int protocol) {
if (fd_ != -1) {
AINFO << "session has hold a valid fd[" << fd_ << "]";
return -1;
}
// Study: every io connection starts from socket(); it is forced non-blocking here
int sock_fd = socket(domain, type | SOCK_NONBLOCK, protocol);
if (sock_fd != -1) {
set_fd(sock_fd);
}
return sock_fd;
}

int Session::Listen(int backlog) {
ACHECK(fd_ != -1);
return listen(fd_, backlog);
}

// Study: Bind to address
int Session::Bind(const struct sockaddr *addr, socklen_t addrlen) {
ACHECK(fd_ != -1);
ACHECK(addr != nullptr);
return bind(fd_, addr, addrlen);
}

auto Session::Accept(struct sockaddr *addr, socklen_t *addrlen) -> SessionPtr {
ACHECK(fd_ != -1);

int sock_fd = accept4(fd_, addr, addrlen, SOCK_NONBLOCK);
// Study: Wait event and accept connection from socket
while (sock_fd == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
poll_handler_->Block(-1, true);
sock_fd = accept4(fd_, addr, addrlen, SOCK_NONBLOCK);
}

if (sock_fd == -1) {
return nullptr;
}

return std::make_shared<Session>(sock_fd);
}

int Session::Connect(const struct sockaddr *addr, socklen_t addrlen) {
ACHECK(fd_ != -1);

int optval;
socklen_t optlen = sizeof(optval);
// Study: Connect to a socket
int res = connect(fd_, addr, addrlen);
if (res == -1 && errno == EINPROGRESS) {
poll_handler_->Block(-1, false);
getsockopt(fd_, SOL_SOCKET, SO_ERROR, reinterpret_cast<void *>(&optval),
&optlen);
if (optval == 0) {
res = 0;
} else {
errno = optval;
}
}
return res;
}

int Session::Close() {
ACHECK(fd_ != -1);

poll_handler_->Unblock();
int res = close(fd_);
fd_ = -1;
return res;
}

ssize_t Session::Recv(void *buf, size_t len, int flags, int timeout_ms) {
ACHECK(buf != nullptr);
ACHECK(fd_ != -1);

// Study: Receive data from the file descriptor
ssize_t nbytes = recv(fd_, buf, len, flags);
if (timeout_ms == 0) {
return nbytes;
}

// Study: if the first attempt did not get any data
while (nbytes == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
// Study: Wait io event and try again
if (poll_handler_->Block(timeout_ms, true)) {
nbytes = recv(fd_, buf, len, flags);
}
if (timeout_ms > 0) {
break;
}
}
return nbytes;
}

// Study: similar to Recv, but also returns the address of the opposite side
ssize_t Session::RecvFrom(void *buf, size_t len, int flags,
struct sockaddr *src_addr, socklen_t *addrlen,
int timeout_ms) {
ACHECK(buf != nullptr);
ACHECK(fd_ != -1);

ssize_t nbytes = recvfrom(fd_, buf, len, flags, src_addr, addrlen);
if (timeout_ms == 0) {
return nbytes;
}

while (nbytes == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
if (poll_handler_->Block(timeout_ms, true)) {
nbytes = recvfrom(fd_, buf, len, flags, src_addr, addrlen);
}
if (timeout_ms > 0) {
break;
}
}
return nbytes;
}

ssize_t Session::Send(const void *buf, size_t len, int flags, int timeout_ms) {
ACHECK(buf != nullptr);
ACHECK(fd_ != -1);

ssize_t nbytes = send(fd_, buf, len, flags);
if (timeout_ms == 0) {
return nbytes;
}

// Study: keep retrying the send while errno says the socket would block
while ((nbytes == -1) && (errno == EAGAIN || errno == EWOULDBLOCK)) {
if (poll_handler_->Block(timeout_ms, false)) {
nbytes = send(fd_, buf, len, flags);
}
if (timeout_ms > 0) {
break;
}
}
return nbytes;
}

ssize_t Session::SendTo(const void *buf, size_t len, int flags,
const struct sockaddr *dest_addr, socklen_t addrlen,
int timeout_ms) {
ACHECK(buf != nullptr);
ACHECK(dest_addr != nullptr);
ACHECK(fd_ != -1);

ssize_t nbytes = sendto(fd_, buf, len, flags, dest_addr, addrlen);
if (timeout_ms == 0) {
return nbytes;
}

while ((nbytes == -1) && (errno == EAGAIN || errno == EWOULDBLOCK)) {
if (poll_handler_->Block(timeout_ms, false)) {
nbytes = sendto(fd_, buf, len, flags, dest_addr, addrlen);
}
if (timeout_ms > 0) {
break;
}
}
return nbytes;
}

ssize_t Session::Read(void *buf, size_t count, int timeout_ms) {
ACHECK(buf != nullptr);
ACHECK(fd_ != -1);

ssize_t nbytes = read(fd_, buf, count);
if (timeout_ms == 0) {
return nbytes;
}

while ((nbytes == -1) && (errno == EAGAIN || errno == EWOULDBLOCK)) {
if (poll_handler_->Block(timeout_ms, true)) {
nbytes = read(fd_, buf, count);
}
if (timeout_ms > 0) {
break;
}
}
return nbytes;
}

ssize_t Session::Write(const void *buf, size_t count, int timeout_ms) {
ACHECK(buf != nullptr);
ACHECK(fd_ != -1);

ssize_t nbytes = write(fd_, buf, count);
if (timeout_ms == 0) {
return nbytes;
}

while ((nbytes == -1) && (errno == EAGAIN || errno == EWOULDBLOCK)) {
if (poll_handler_->Block(timeout_ms, false)) {
nbytes = write(fd_, buf, count);
}
if (timeout_ms > 0) {
break;
}
}
return nbytes;
}
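
To wrap up, a rough sketch of a tiny echo server built on Session. It is my own example and has to run inside a croutine (e.g. spawned from a component or with cyber::Async), because every Accept/Recv/Send may yield through PollHandler; the header path and namespace are assumptions.

// Hypothetical sketch: accept one connection at a time and echo what it sends.
#include <netinet/in.h>
#include <sys/socket.h>

#include <cstring>

#include "cyber/io/session.h"  // assumed header path

using apollo::cyber::io::Session;

void EchoServer(uint16_t port) {
  Session server;
  server.Socket(AF_INET, SOCK_STREAM, 0);

  struct sockaddr_in addr;
  std::memset(&addr, 0, sizeof(addr));
  addr.sin_family = AF_INET;
  addr.sin_addr.s_addr = htonl(INADDR_ANY);
  addr.sin_port = htons(port);
  server.Bind(reinterpret_cast<struct sockaddr *>(&addr), sizeof(addr));
  server.Listen(10);

  while (true) {
    // Accept() yields the croutine until a connection arrives.
    auto conn = server.Accept(nullptr, nullptr);
    if (conn == nullptr) {
      continue;
    }
    char buf[1024];
    // timeout_ms = -1: keep yielding until data actually arrives.
    ssize_t n = conn->Recv(buf, sizeof(buf), 0, -1);
    if (n > 0) {
      conn->Send(buf, static_cast<size_t>(n), 0, -1);
    }
    conn->Close();
  }
}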