Apollo Cyber Study. P10 Transport

// Study: 是我的筆記

time to fight boss

  • How the transport connecto to data visitor

The transport will call the DataDispatcher in cyber/node/reader_base to provide data for
data visitor to visitor.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
transport::Transport::Instance()->CreateReceiver<MessageT>(
role_attr, [](const std::shared_ptr<MessageT>& msg,
const transport::MessageInfo& msg_info,
const proto::RoleAttributes& reader_attr) {
(void)msg_info;
(void)reader_attr;
PerfEventCache::Instance()->AddTransportEvent(
TransPerf::TRANS_TO, reader_attr.channel_id(),
msg_info.seq_num());
data::DataDispatcher<MessageT>::Instance()->Dispatch(
reader_attr.channel_id(), msg);
PerfEventCache::Instance()->AddTransportEvent(
TransPerf::WRITE_NOTIFY, reader_attr.channel_id(),
msg_info.seq_num());
});

cyber/transport/transport

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
/******************************************************************************
* Copyright 2018 The Apollo Authors. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*****************************************************************************/

#ifndef CYBER_TRANSPORT_TRANSPORT_H_
#define CYBER_TRANSPORT_TRANSPORT_H_

#include <atomic>
#include <memory>
#include <string>

#include "cyber/common/macros.h"
#include "cyber/proto/transport_conf.pb.h"
#include "cyber/transport/dispatcher/intra_dispatcher.h"
#include "cyber/transport/dispatcher/rtps_dispatcher.h"
#include "cyber/transport/dispatcher/shm_dispatcher.h"
#include "cyber/transport/qos/qos_profile_conf.h"
#include "cyber/transport/receiver/hybrid_receiver.h"
#include "cyber/transport/receiver/intra_receiver.h"
#include "cyber/transport/receiver/receiver.h"
#include "cyber/transport/receiver/rtps_receiver.h"
#include "cyber/transport/receiver/shm_receiver.h"
#include "cyber/transport/rtps/participant.h"
#include "cyber/transport/shm/notifier_factory.h"
#include "cyber/transport/transmitter/hybrid_transmitter.h"
#include "cyber/transport/transmitter/intra_transmitter.h"
#include "cyber/transport/transmitter/rtps_transmitter.h"
#include "cyber/transport/transmitter/shm_transmitter.h"
#include "cyber/transport/transmitter/transmitter.h"

namespace apollo {
namespace cyber {
namespace transport {

using apollo::cyber::proto::OptionalMode;

class Transport {
public:
virtual ~Transport();

void Shutdown();

template <typename M>
auto CreateTransmitter(const RoleAttributes& attr,
const OptionalMode& mode = OptionalMode::HYBRID) ->
typename std::shared_ptr<Transmitter<M>>;

template <typename M>
auto CreateReceiver(const RoleAttributes& attr,
const typename Receiver<M>::MessageListener& msg_listener,
const OptionalMode& mode = OptionalMode::HYBRID) ->
typename std::shared_ptr<Receiver<M>>;

ParticipantPtr participant() const { return participant_; }

private:
void CreateParticipant();

std::atomic<bool> is_shutdown_;
ParticipantPtr participant_;
NotifierPtr notifier_;
IntraDispatcherPtr intra_dispatcher_;
ShmDispatcherPtr shm_dispatcher_;
RtpsDispatcherPtr rtps_dispatcher_;

DECLARE_SINGLETON(Transport)
};

template <typename M>
auto Transport::CreateTransmitter(const RoleAttributes& attr,
const OptionalMode& mode) ->
typename std::shared_ptr<Transmitter<M>> {
if (is_shutdown_.load()) {
AINFO << "transport has been shut down.";
return nullptr;
}

std::shared_ptr<Transmitter<M>> transmitter = nullptr;
RoleAttributes modified_attr = attr;

// Study: qos = quality of service,
// it defined the transport quality
// 1. history policy
// 2. capacity of history
// 3. message frequency
// 4. reliability
// 5. durability
if (!modified_attr.has_qos_profile()) {
modified_attr.mutable_qos_profile()->CopyFrom(
QosProfileConf::QOS_PROFILE_DEFAULT);
}

// Study: Support 4 transport mode - intra, shm, rtps, hybrid
switch (mode) {
// Study: intra = same process
case OptionalMode::INTRA:
transmitter = std::make_shared<IntraTransmitter<M>>(modified_attr);
break;

// Study: shm = shared memory
case OptionalMode::SHM:
transmitter = std::make_shared<ShmTransmitter<M>>(modified_attr);
break;

// Study: real time publish subscribe
case OptionalMode::RTPS:
transmitter =
std::make_shared<RtpsTransmitter<M>>(modified_attr, participant());
break;

default:
transmitter =
std::make_shared<HybridTransmitter<M>>(modified_attr, participant());
break;
}

RETURN_VAL_IF_NULL(transmitter, nullptr);
if (mode != OptionalMode::HYBRID) {
// Study: Init the transmitter
transmitter->Enable();
}
return transmitter;
}

// Study: Basically same as above, just change transmitter to receiver
template <typename M>
auto Transport::CreateReceiver(
const RoleAttributes& attr,
const typename Receiver<M>::MessageListener& msg_listener,
const OptionalMode& mode) -> typename std::shared_ptr<Receiver<M>> {
if (is_shutdown_.load()) {
AINFO << "transport has been shut down.";
return nullptr;
}

std::shared_ptr<Receiver<M>> receiver = nullptr;
RoleAttributes modified_attr = attr;
if (!modified_attr.has_qos_profile()) {
modified_attr.mutable_qos_profile()->CopyFrom(
QosProfileConf::QOS_PROFILE_DEFAULT);
}

switch (mode) {
case OptionalMode::INTRA:
receiver =
std::make_shared<IntraReceiver<M>>(modified_attr, msg_listener);
break;

case OptionalMode::SHM:
receiver = std::make_shared<ShmReceiver<M>>(modified_attr, msg_listener);
break;

case OptionalMode::RTPS:
receiver = std::make_shared<RtpsReceiver<M>>(modified_attr, msg_listener);
break;

default:
receiver = std::make_shared<HybridReceiver<M>>(
modified_attr, msg_listener, participant());
break;
}

RETURN_VAL_IF_NULL(receiver, nullptr);
if (mode != OptionalMode::HYBRID) {
receiver->Enable();
}
return receiver;
}

} // namespace transport
} // namespace cyber
} // namespace apollo

#endif // CYBER_TRANSPORT_TRANSPORT_H_

cyber/transport/shm/readable_info

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
// Study: ReadableInfo basically is a struct that have 3 element:
// host_id_, block_index_, channel_id_
// It just allow the serialize and deserialize of this struct
const size_t ReadableInfo::kSize = sizeof(uint64_t) * 2 + sizeof(uint32_t);

ReadableInfo::ReadableInfo() : host_id_(0), block_index_(0), channel_id_(0) {}

ReadableInfo::ReadableInfo(uint64_t host_id, uint32_t block_index,
uint64_t channel_id)
: host_id_(host_id), block_index_(block_index), channel_id_(channel_id) {}

ReadableInfo::~ReadableInfo() {}

ReadableInfo& ReadableInfo::operator=(const ReadableInfo& other) {
if (this != &other) {
this->host_id_ = other.host_id_;
this->block_index_ = other.block_index_;
this->channel_id_ = other.channel_id_;
}
return *this;
}

// Study: Actually, it will be much better to allow put in char array directly
// Using a string for serialize is always a bad idea
bool ReadableInfo::SerializeTo(std::string* dst) const {
RETURN_VAL_IF_NULL(dst, false);

dst->assign(reinterpret_cast<char*>(const_cast<uint64_t*>(&host_id_)),
sizeof(host_id_));
dst->append(reinterpret_cast<char*>(const_cast<uint32_t*>(&block_index_)),
sizeof(block_index_));
dst->append(reinterpret_cast<char*>(const_cast<uint64_t*>(&channel_id_)),
sizeof(channel_id_));
return true;
}

bool ReadableInfo::DeserializeFrom(const std::string& src) {
return DeserializeFrom(src.data(), src.size());
}

// Study: Read data byte by byte
bool ReadableInfo::DeserializeFrom(const char* src, std::size_t len) {
RETURN_VAL_IF_NULL(src, false);
if (len != kSize) {
AWARN << "src size[" << len << "] mismatch.";
return false;
}

char* ptr = const_cast<char*>(src);
memcpy(reinterpret_cast<char*>(&host_id_), ptr, sizeof(host_id_));
ptr += sizeof(host_id_);
memcpy(reinterpret_cast<char*>(&block_index_), ptr, sizeof(block_index_));
ptr += sizeof(block_index_);
memcpy(reinterpret_cast<char*>(&channel_id_), ptr, sizeof(channel_id_));

return true;
}

cyber/transport/shm/notifier_base

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// Study: 就是一個callback
class NotifierBase;
using NotifierPtr = NotifierBase*;

class NotifierBase {
public:
virtual ~NotifierBase() = default;

virtual void Shutdown() = 0;

// Study: Put a data into it
virtual bool Notify(const ReadableInfo& info) = 0;
// Study: Anyone can call this to wait at most timeout_ms
// until someone have call Notify()
virtual bool Listen(int timeout_ms, ReadableInfo* info) = 0;
};

cyber/transport/shm/condition_notifier

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41

const uint32_t kBufLength = 4096;

// Study: ConditionNotifier
class ConditionNotifier : public NotifierBase {
// Study: Used a circular to store data, and added a mtx to protect multi-thread issue
// use cv to wait new data. This data is put into the shared memory
struct Indicator {
std::mutex mtx;
std::condition_variable cv;
uint64_t written_info_num = 0;
ReadableInfo infos[kBufLength];
};

public:
virtual ~ConditionNotifier();

void Shutdown() override;
bool Notify(const ReadableInfo& info) override;
bool Listen(int timeout_ms, ReadableInfo* info) override;

static const char* Type() { return "condition"; }

private:
bool Init();
// Study: Operation to the shared memory
bool OpenOrCreate();
bool OpenOnly();
bool Remove();
void Reset();

key_t key_ = 0;
// Study: The shared memory used to share indicator_ among process
void* managed_shm_ = nullptr;
size_t shm_size_ = 0;
Indicator* indicator_ = nullptr;
uint64_t next_listen_num_ = 0;
std::atomic<bool> is_shutdown_ = {false};

DECLARE_SINGLETON(ConditionNotifier)
};
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
/******************************************************************************
* Copyright 2018 The Apollo Authors. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*****************************************************************************/

#include "cyber/transport/shm/condition_notifier.h"

#include <pthread.h>
#include <sys/ipc.h>
#include <sys/shm.h>
#include <thread>

#include "cyber/common/log.h"
#include "cyber/common/util.h"

namespace apollo {
namespace cyber {
namespace transport {

using common::Hash;

ConditionNotifier::ConditionNotifier() {
// Study: The unique key for the shared memory
// Different process use the same key to access the same shared memory
key_ = static_cast<key_t>(Hash("/apollo/cyber/transport/shm/notifier"));
ADEBUG << "condition notifier key: " << key_;
shm_size_ = sizeof(Indicator);

if (!Init()) {
AERROR << "fail to init condition notifier.";
is_shutdown_.exchange(true);
}
}

ConditionNotifier::~ConditionNotifier() { Shutdown(); }

void ConditionNotifier::Shutdown() {
if (is_shutdown_.exchange(true)) {
return;
}

indicator_->cv.notify_all();
// Study: Ensure break the thread that called Listen
std::this_thread::sleep_for(std::chrono::milliseconds(100));
Reset();
}

bool ConditionNotifier::Notify(const ReadableInfo& info) {
if (is_shutdown_.load()) {
ADEBUG << "notifier is shutdown.";
return false;
}

{
// Study: Add the data into the circular array
std::unique_lock<std::mutex> lck(indicator_->mtx);
auto idx = indicator_->written_info_num % kBufLength;
indicator_->infos[idx] = info;
++indicator_->written_info_num;
}

// Study: Let the thread who listening know new data come
indicator_->cv.notify_all();

return true;
}

bool ConditionNotifier::Listen(int timeout_ms, ReadableInfo* info) {
if (info == nullptr) {
AERROR << "info nullptr.";
return false;
}

if (is_shutdown_.load()) {
ADEBUG << "notifier is shutdown.";
return false;
}

std::unique_lock<std::mutex> lck(indicator_->mtx);
// Study: Check whether is listening to a not exist message in shared memory
// since the indicator live in shared memory in some case
// you cannot gurannte the value of written_info_num
if (next_listen_num_ >= indicator_->written_info_num) {
uint64_t target = next_listen_num_;
// Study: If the value not exist, then wait
if (!indicator_->cv.wait_for(
lck, std::chrono::milliseconds(timeout_ms), [target, this]() {
return this->indicator_->written_info_num > target ||
this->is_shutdown_.load();
})) {
ADEBUG << "timeout";
return false;
}

if (is_shutdown_.load()) {
AINFO << "notifier is shutdown.";
return false;
}
}

// Study: A new ConditionNotifier, need follow to the lastest of indicator_
if (next_listen_num_ == 0) {
next_listen_num_ = indicator_->written_info_num - 1;
}

auto idx = next_listen_num_ % kBufLength;
*info = indicator_->infos[idx];
next_listen_num_ += 1;

return true;
}

bool ConditionNotifier::Init() { return OpenOrCreate(); }

bool ConditionNotifier::OpenOrCreate() {
// create managed_shm_
int retry = 0;
int shmid = 0;
while (retry < 2) {
// Study: Create the shared memory
shmid = shmget(key_, shm_size_, 0644 | IPC_CREAT | IPC_EXCL);
if (shmid != -1) {
break;
}

if (EINVAL == errno) {
AINFO << "need larger space, recreate.";
Reset();
Remove();
++retry;
} else if (EEXIST == errno) {
ADEBUG << "shm already exist, open only.";
return OpenOnly();
} else {
break;
}
}

if (shmid == -1) {
AERROR << "create shm failed, error code: " << strerror(errno);
return false;
}

// attach managed_shm_
managed_shm_ = shmat(shmid, nullptr, 0);
if (managed_shm_ == reinterpret_cast<void*>(-1)) {
AERROR << "attach shm failed.";
shmctl(shmid, IPC_RMID, 0);
return false;
}

// create indicator_
// Study: Create the indicator_ using the created shared memory
indicator_ = new (managed_shm_) Indicator();
if (indicator_ == nullptr) {
AERROR << "create indicator failed.";
shmdt(managed_shm_);
managed_shm_ = nullptr;
shmctl(shmid, IPC_RMID, 0);
return false;
}

// Study: Necessary to set the mutex attr if want it to work in shared memory mode
pthread_mutexattr_t mtx_attr;
pthread_mutexattr_init(&mtx_attr);
pthread_mutexattr_setpshared(&mtx_attr, PTHREAD_PROCESS_SHARED);
pthread_mutex_init(indicator_->mtx.native_handle(), &mtx_attr);

pthread_condattr_t cond_attr;
pthread_condattr_init(&cond_attr);
pthread_condattr_setpshared(&cond_attr, PTHREAD_PROCESS_SHARED);
pthread_cond_init(indicator_->cv.native_handle(), &cond_attr);

ADEBUG << "open or create true.";
return true;
}

// Study: The indicator_ only need created by one process,
// The other people just need open it
bool ConditionNotifier::OpenOnly() {
// get managed_shm_
int shmid = shmget(key_, 0, 0644);
if (shmid == -1) {
AERROR << "get shm failed.";
return false;
}

// attach managed_shm_
managed_shm_ = shmat(shmid, nullptr, 0);
if (managed_shm_ == reinterpret_cast<void*>(-1)) {
AERROR << "attach shm failed.";
return false;
}

// get indicator_
indicator_ = reinterpret_cast<Indicator*>(managed_shm_);
if (indicator_ == nullptr) {
AERROR << "get indicator failed.";
shmdt(managed_shm_);
managed_shm_ = nullptr;
return false;
}

ADEBUG << "open true.";
return true;
}

bool ConditionNotifier::Remove() {
int shmid = shmget(key_, 0, 0644);
if (shmid == -1 || shmctl(shmid, IPC_RMID, 0) == -1) {
AERROR << "remove shm failed, error code: " << strerror(errno);
return false;
}
ADEBUG << "remove success.";

return true;
}

void ConditionNotifier::Reset() {
indicator_ = nullptr;
if (managed_shm_ != nullptr) {
shmdt(managed_shm_);
managed_shm_ = nullptr;
}
}

} // namespace transport
} // namespace cyber
} // namespace apollo

cyver/transport/shm/multicast_notifier

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
MulticastNotifier::MulticastNotifier() {
if (!Init()) {
Shutdown();
}
}

MulticastNotifier::~MulticastNotifier() { Shutdown(); }

void MulticastNotifier::Shutdown() {
if (is_shutdown_.exchange(true)) {
return;
}

if (notify_fd_ != -1) {
close(notify_fd_);
notify_fd_ = -1;
}
memset(&notify_addr_, 0, sizeof(notify_addr_));

if (listen_fd_ != -1) {
close(listen_fd_);
listen_fd_ = -1;
}
memset(&listen_addr_, 0, sizeof(listen_addr_));
}

bool MulticastNotifier::Notify(const ReadableInfo& info) {
if (is_shutdown_.load()) {
return false;
}

std::string info_str;
info.SerializeTo(&info_str);
ssize_t nbytes =
sendto(notify_fd_, info_str.c_str(), info_str.size(), 0,
(struct sockaddr*)&notify_addr_, sizeof(notify_addr_));
return nbytes > 0;
}

bool MulticastNotifier::Listen(int timeout_ms, ReadableInfo* info) {
if (is_shutdown_.load()) {
return false;
}

if (info == nullptr) {
AERROR << "info nullptr.";
return false;
}

struct pollfd fds;
fds.fd = listen_fd_;
fds.events = POLLIN;
// Study: Poll data from the multicast network
int ready_num = poll(&fds, 1, timeout_ms);
if (ready_num > 0) {
char buf[32] = {0}; // larger than ReadableInfo::kSize
ssize_t nbytes = recvfrom(listen_fd_, buf, 32, 0, nullptr, nullptr);
if (nbytes == -1) {
AERROR << "fail to recvfrom, " << strerror(errno);
return false;
}
return info->DeserializeFrom(buf, nbytes);
} else if (ready_num == 0) {
ADEBUG << "timeout, no readableinfo.";
} else {
if (errno == EINTR) {
AINFO << "poll was interrupted.";
} else {
AERROR << "fail to poll, " << strerror(errno);
}
}
return false;
}

bool MulticastNotifier::Init() {
// Study: the ip range from 224.0.0.0 to 239.255.255.255 is targeted for multicasting
std::string mcast_ip("239.255.0.100");
uint16_t mcast_port = 8888;

// Study: If user have a custom ip and port, use it
auto& g_conf = GlobalData::Instance()->Config();
if (g_conf.has_transport_conf() && g_conf.transport_conf().has_shm_conf() &&
g_conf.transport_conf().shm_conf().has_shm_locator()) {
auto& locator = g_conf.transport_conf().shm_conf().shm_locator();
mcast_ip = locator.ip();
mcast_port = static_cast<uint16_t>(locator.port());
}

ADEBUG << "multicast notifier ip: " << mcast_ip;
ADEBUG << "multicast notifier port: " << mcast_port;

// Study: Create socket for send notify
notify_fd_ = socket(AF_INET, SOCK_DGRAM, 0);
if (notify_fd_ == -1) {
AERROR << "fail to create notify fd, " << strerror(errno);
return false;
}

memset(&notify_addr_, 0, sizeof(notify_addr_));
notify_addr_.sin_family = AF_INET;
notify_addr_.sin_addr.s_addr = inet_addr(mcast_ip.c_str());
notify_addr_.sin_port = htons(mcast_port);

// Study: Create socket for listen
listen_fd_ = socket(AF_INET, SOCK_DGRAM, 0);
if (listen_fd_ == -1) {
AERROR << "fail to create listen fd, " << strerror(errno);
return false;
}

if (fcntl(listen_fd_, F_SETFL, O_NONBLOCK) == -1) {
AERROR << "fail to set listen fd nonblock, " << strerror(errno);
return false;
}

memset(&listen_addr_, 0, sizeof(listen_addr_));
listen_addr_.sin_family = AF_INET;
listen_addr_.sin_addr.s_addr = htonl(INADDR_ANY);
listen_addr_.sin_port = htons(mcast_port);

int yes = 1;
if (setsockopt(listen_fd_, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)) < 0) {
AERROR << "fail to setsockopt SO_REUSEADDR, " << strerror(errno);
return false;
}

if (bind(listen_fd_, (struct sockaddr*)&listen_addr_, sizeof(listen_addr_)) <
0) {
AERROR << "fail to bind addr, " << strerror(errno);
return false;
}

int loop = 1;
if (setsockopt(listen_fd_, IPPROTO_IP, IP_MULTICAST_LOOP, &loop,
sizeof(loop)) < 0) {
AERROR << "fail to setsockopt IP_MULTICAST_LOOP, " << strerror(errno);
return false;
}

struct ip_mreq mreq;
mreq.imr_multiaddr.s_addr = inet_addr(mcast_ip.c_str());
mreq.imr_interface.s_addr = htonl(INADDR_ANY);
if (setsockopt(listen_fd_, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq,
sizeof(mreq)) < 0) {
AERROR << "fail to setsockopt IP_ADD_MEMBERSHIP, " << strerror(errno);
return false;
}

return true;
}

cyber/transport/shm/state

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// Study: The state for segment
// This guarantee the thread safe issue
class State {
public:
explicit State(const uint64_t& ceiling_msg_size);
virtual ~State();

void IncreaseWroteNum() { wrote_num_.fetch_add(1); }
void ResetWroteNum() { wrote_num_.store(0); }

void DecreaseReferenceCounts() {
uint32_t current_reference_count = reference_count_.load();
do {
if (current_reference_count == 0) {
return;
}
} while (!reference_count_.compare_exchange_strong(
current_reference_count, current_reference_count - 1));
}

void IncreaseReferenceCounts() { reference_count_.fetch_add(1); }

void set_need_remap(bool need) { need_remap_.store(need); }
bool need_remap() { return need_remap_; }

uint64_t ceiling_msg_size() { return ceiling_msg_size_.load(); }
uint32_t reference_counts() { return reference_count_.load(); }
uint32_t wrote_num() { return wrote_num_.load(); }

private:
std::atomic<bool> need_remap_ = {false};
std::atomic<uint32_t> wrote_num_ = {0};
std::atomic<uint32_t> reference_count_ = {0};
std::atomic<uint64_t> ceiling_msg_size_;
};

cyber/transport/shm/segment

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367

Segment::Segment(uint64_t channel_id, const ReadWriteMode& mode)
: init_(false),
mode_(mode),
conf_(),
state_(nullptr),
blocks_(nullptr),
managed_shm_(nullptr),
block_buf_lock_(),
block_buf_addrs_() {
// Study: Obviously, one channel only own one segment
id_ = static_cast<key_t>(channel_id);
}

Segment::~Segment() { Destroy(); }

bool Segment::AcquireBlockToWrite(std::size_t msg_size,
WritableBlock* writable_block) {
RETURN_VAL_IF_NULL(writable_block, false);
if (!init_ && !Init()) {
AERROR << "init failed, can't write now.";
return false;
}

bool result = true;
if (state_->need_remap()) {
result = Remap();
}

// Study: Check whether the msg size is larger that
if (msg_size > conf_.ceiling_msg_size()) {
conf_.Update(msg_size);
result = Recreate();
}

if (!result) {
AERROR << "segment update failed.";
return false;
}

uint32_t index = GetNextWritableBlockIndex();
writable_block->index = index;
writable_block->block = &blocks_[index];
writable_block->buf = block_buf_addrs_[index];
return true;
}

void Segment::ReleaseWrittenBlock(const WritableBlock& writable_block) {
auto index = writable_block.index;
if (index >= conf_.block_num()) {
return;
}
blocks_[index].ReleaseWriteLock();
}

// Study: read a block after locked it
bool Segment::AcquireBlockToRead(ReadableBlock* readable_block) {
RETURN_VAL_IF_NULL(readable_block, false);

if (!init_ && !Init()) {
AERROR << "init failed, can't read now.";
return false;
}
auto index = readable_block->index;
if (index >= conf_.block_num()) {
AERROR << "invalid block_index[" << index << "].";
return false;
}

bool result = true;
if (state_->need_remap()) {
result = Remap();
}

if (!result) {
AERROR << "segment update failed.";
return false;
}

if (!blocks_[index].TryLockForRead()) {
return false;
}
readable_block->block = blocks_ + index;
readable_block->buf = block_buf_addrs_[index];
return true;
}

// Study: Unlock the block after readed it
void Segment::ReleaseReadBlock(const ReadableBlock& readable_block) {
auto index = readable_block.index;
if (index >= conf_.block_num()) {
return;
}
blocks_[index].ReleaseReadLock();
}

bool Segment::Init() {
if (mode_ == READ_ONLY) {
return OpenOnly();
} else {
return OpenOrCreate();
}
}

bool Segment::OpenOrCreate() {
if (init_) {
return true;
}

// create managed_shm_
int retry = 0;
int shmid = 0;
while (retry < 2) {
// Study: Create shared memory
shmid = shmget(id_, conf_.managed_shm_size(), 0644 | IPC_CREAT | IPC_EXCL);
if (shmid != -1) {
break;
}

if (EINVAL == errno) {
AINFO << "need larger space, recreate.";
Reset();
Remove();
++retry;
} else if (EEXIST == errno) {
ADEBUG << "shm already exist, open only.";
return OpenOnly();
} else {
break;
}
}

if (shmid == -1) {
AERROR << "create shm failed, error code: " << strerror(errno);
return false;
}

// attach managed_shm_
// Study: Using the managed_shm_ to get the shared memory ptr
managed_shm_ = shmat(shmid, nullptr, 0);
if (managed_shm_ == reinterpret_cast<void*>(-1)) {
AERROR << "attach shm failed.";
shmctl(shmid, IPC_RMID, 0);
return false;
}

// create field state_
// Study: The state is put in shared memory
state_ = new (managed_shm_) State(conf_.ceiling_msg_size());
if (state_ == nullptr) {
AERROR << "create state failed.";
shmdt(managed_shm_);
managed_shm_ = nullptr;
shmctl(shmid, IPC_RMID, 0);
return false;
}

// Study: The ceiling_msg_size will be update, so need sync
conf_.Update(state_->ceiling_msg_size());

// create field blocks_
// Study: Used the remain memory to create the block
blocks_ = new (static_cast<char*>(managed_shm_) + sizeof(State))
Block[conf_.block_num()];
if (blocks_ == nullptr) {
AERROR << "create blocks failed.";
state_->~State();
state_ = nullptr;
shmdt(managed_shm_);
managed_shm_ = nullptr;
shmctl(shmid, IPC_RMID, 0);
return false;
}

// create block buf
// Study: Use the remain remain memory to create the block buf addr map
// Size of block_buf_addrs_ is fixed
uint32_t i = 0;
for (; i < conf_.block_num(); ++i) {
uint8_t* addr =
new (static_cast<char*>(managed_shm_) + sizeof(State) +
conf_.block_num() * sizeof(Block) + i * conf_.block_buf_size())
uint8_t[conf_.block_buf_size()];
std::lock_guard<std::mutex> _g(block_buf_lock_);
block_buf_addrs_[i] = addr;
}

// Study: Cleanup if failure
if (i != conf_.block_num()) {
AERROR << "create block buf failed.";
state_->~State();
state_ = nullptr;
blocks_ = nullptr;
{
std::lock_guard<std::mutex> _g(block_buf_lock_);
block_buf_addrs_.clear();
}
shmdt(managed_shm_);
managed_shm_ = nullptr;
shmctl(shmid, IPC_RMID, 0);
return false;
}

// Study: One more process referenced to this shared memory
state_->IncreaseReferenceCounts();
init_ = true;
ADEBUG << "open or create true.";
return true;
}

bool Segment::OpenOnly() {
if (init_) {
return true;
}

// get managed_shm_
int shmid = shmget(id_, 0, 0644);
if (shmid == -1) {
AERROR << "get shm failed.";
return false;
}

// attach managed_shm_
managed_shm_ = shmat(shmid, nullptr, 0);
if (managed_shm_ == reinterpret_cast<void*>(-1)) {
AERROR << "attach shm failed.";
return false;
}

// get field state_
state_ = reinterpret_cast<State*>(managed_shm_);
if (state_ == nullptr) {
AERROR << "get state failed.";
shmdt(managed_shm_);
managed_shm_ = nullptr;
return false;
}

conf_.Update(state_->ceiling_msg_size());

// get field blocks_
blocks_ = reinterpret_cast<Block*>(static_cast<char*>(managed_shm_) +
sizeof(State));
if (blocks_ == nullptr) {
AERROR << "get blocks failed.";
state_ = nullptr;
shmdt(managed_shm_);
managed_shm_ = nullptr;
return false;
}

// get block buf
uint32_t i = 0;
for (; i < conf_.block_num(); ++i) {
uint8_t* addr = reinterpret_cast<uint8_t*>(
static_cast<char*>(managed_shm_) + sizeof(State) +
conf_.block_num() * sizeof(Block) + i * conf_.block_buf_size());

if (addr == nullptr) {
break;
}

std::lock_guard<std::mutex> _g(block_buf_lock_);
block_buf_addrs_[i] = addr;
}

if (i != conf_.block_num()) {
AERROR << "open only failed.";
state_->~State();
state_ = nullptr;
blocks_ = nullptr;
{
std::lock_guard<std::mutex> _g(block_buf_lock_);
block_buf_addrs_.clear();
}
shmdt(managed_shm_);
managed_shm_ = nullptr;
shmctl(shmid, IPC_RMID, 0);
return false;
}

state_->IncreaseReferenceCounts();
init_ = true;
ADEBUG << "open only true.";
return true;
}

bool Segment::Remove() {
int shmid = shmget(id_, 0, 0644);
if (shmid == -1 || shmctl(shmid, IPC_RMID, 0) == -1) {
AERROR << "remove shm failed, error code: " << strerror(errno);
return false;
}

ADEBUG << "remove success.";
return true;
}

bool Segment::Destroy() {
if (!init_) {
return true;
}
init_ = false;

try {
state_->DecreaseReferenceCounts();
uint32_t reference_counts = state_->reference_counts();
if (reference_counts == 0) {
return Remove();
}
} catch (...) {
AERROR << "exception.";
return false;
}
ADEBUG << "destory.";
return true;
}

// Study: Reset the shared memory and the actualy content
void Segment::Reset() {
state_ = nullptr;
blocks_ = nullptr;
{
std::lock_guard<std::mutex> _g(block_buf_lock_);
block_buf_addrs_.clear();
}

if (managed_shm_ != nullptr) {
shmdt(managed_shm_);
managed_shm_ = nullptr;
}
}

// Study: Map again to the shared memory
bool Segment::Remap() {
init_ = false;
ADEBUG << "before reset.";
Reset();
ADEBUG << "after reset.";
return OpenOnly();
}

bool Segment::Recreate() {
init_ = false;
state_->set_need_remap(true);
Reset();
Remove();
return OpenOrCreate();
}

uint32_t Segment::GetNextWritableBlockIndex() {
uint32_t try_idx = state_->wrote_num();

auto max_mod_num = conf_.block_num() - 1;
while (1) {
if (try_idx >= conf_.block_num()) {
try_idx &= max_mod_num;
}

if (blocks_[try_idx].TryLockForWrite()) {
state_->IncreaseWroteNum();
return try_idx;
}

++try_idx;
}
}

cyber/transport/dispatcher/shm_dispatcher

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
/******************************************************************************
* Copyright 2018 The Apollo Authors. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*****************************************************************************/

#include "cyber/transport/dispatcher/shm_dispatcher.h"
#include "cyber/common/global_data.h"
#include "cyber/common/util.h"
#include "cyber/scheduler/scheduler_factory.h"
#include "cyber/transport/shm/readable_info.h"

namespace apollo {
namespace cyber {
namespace transport {

using common::GlobalData;

ShmDispatcher::ShmDispatcher() : host_id_(0) { Init(); }

ShmDispatcher::~ShmDispatcher() { Shutdown(); }

void ShmDispatcher::Shutdown() {
if (is_shutdown_.exchange(true)) {
return;
}

// Study: The ShmDispatcher
if (thread_.joinable()) {
thread_.join();
}

{
ReadLockGuard<AtomicRWLock> lock(segments_lock_);
segments_.clear();
}
}

// Study: We can see that one ShmDispatcher may interace to multiple channel
void ShmDispatcher::AddSegment(const RoleAttributes& self_attr) {
uint64_t channel_id = self_attr.channel_id();
WriteLockGuard<AtomicRWLock> lock(segments_lock_);
// Study: One channel only allow have one memory segment
if (segments_.count(channel_id) > 0) {
return;
}
// Study need new place, allocate it
auto segment = std::make_shared<Segment>(channel_id, READ_ONLY);
segments_[channel_id] = segment;
previous_indexes_[channel_id] = UINT32_MAX;
}

void ShmDispatcher::ReadMessage(uint64_t channel_id, uint32_t block_index) {
ADEBUG << "Reading sharedmem message: "
<< GlobalData::GetChannelById(channel_id)
<< " from block: " << block_index;
auto rb = std::make_shared<ReadableBlock>();
rb->index = block_index;

// Study: If want to read data, lock it first
if (!segments_[channel_id]->AcquireBlockToRead(rb.get())) {
AWARN << "fail to acquire block, channel: "
<< GlobalData::GetChannelById(channel_id)
<< " index: " << block_index;
return;
}

MessageInfo msg_info;
const char* msg_info_addr =
reinterpret_cast<char*>(rb->buf) + rb->block->msg_size();
// Study: Get the data from memory and change it to a object
if (msg_info.DeserializeFrom(msg_info_addr, rb->block->msg_info_size())) {
// Study: Callback
OnMessage(channel_id, rb, msg_info);
} else {
AERROR << "error msg info of channel:"
<< GlobalData::GetChannelById(channel_id);
}

// Study: Always release after acquire
segments_[channel_id]->ReleaseReadBlock(*rb);
}

void ShmDispatcher::OnMessage(uint64_t channel_id,
const std::shared_ptr<ReadableBlock>& rb,
const MessageInfo& msg_info) {
if (is_shutdown_.load()) {
return;
}
ListenerHandlerBasePtr* handler_base = nullptr;
// Study: Someone is listening to the newest data in this channel
if (msg_listeners_.Get(channel_id, &handler_base)) {
auto handler = std::dynamic_pointer_cast<ListenerHandler<ReadableBlock>>(
*handler_base);
handler->Run(rb, msg_info);
} else {
AERROR << "Cant find " << GlobalData::GetChannelById(channel_id)
<< "'s handler.";
}
}

void ShmDispatcher::ThreadFunc() {
ReadableInfo readable_info;
while (!is_shutdown_.load()) {
// Study: Check whether new message come
if (!notifier_->Listen(100, &readable_info)) {
ADEBUG << "listen failed.";
continue;
}

// Study: Where the info come
uint64_t host_id = readable_info.host_id();
if (host_id != host_id_) {
ADEBUG << "shm readable info from other host.";
continue;
}

// Study: channel id, and the index of new data
uint64_t channel_id = readable_info.channel_id();
uint32_t block_index = readable_info.block_index();

{
ReadLockGuard<AtomicRWLock> lock(segments_lock_);
// Study: The new message is not related to me
if (segments_.count(channel_id) == 0) {
continue;
}
// Study: History index
// check block index
if (previous_indexes_.count(channel_id) == 0) {
previous_indexes_[channel_id] = UINT32_MAX;
}
uint32_t& previous_index = previous_indexes_[channel_id];
if (block_index != 0 && previous_index != UINT32_MAX) {
if (block_index == previous_index) {
ADEBUG << "Receive SAME index " << block_index << " of channel "
<< channel_id;
} else if (block_index < previous_index) {
ADEBUG << "Receive PREVIOUS message. last: " << previous_index
<< ", now: " << block_index;
} else if (block_index - previous_index > 1) {
ADEBUG << "Receive JUMP message. last: " << previous_index
<< ", now: " << block_index;
}
}
previous_index = block_index;

// Study: let the msg_listeners_ do callback
ReadMessage(channel_id, block_index);
}
}
}

bool ShmDispatcher::Init() {
host_id_ = common::Hash(GlobalData::Instance()->HostIp());
notifier_ = NotifierFactory::CreateNotifier();
thread_ = std::thread(&ShmDispatcher::ThreadFunc, this);
// Study: Not managed by scheduler
scheduler::Instance()->SetInnerThreadAttr(&thread_, "shm_disp");
return true;
}


template <typename MessageT>
void ShmDispatcher::AddListener(const RoleAttributes& self_attr,
const MessageListener<MessageT>& listener) {
// FIXME: make it more clean
// Study: Basically same as listener(x, y), just added a data Parsing
auto listener_adapter = [listener](const std::shared_ptr<ReadableBlock>& rb,
const MessageInfo& msg_info) {
auto msg = std::make_shared<MessageT>();
RETURN_IF(!message::ParseFromArray(
rb->buf, static_cast<int>(rb->block->msg_size()), msg.get()));
listener(msg, msg_info);
};

Dispatcher::AddListener<ReadableBlock>(self_attr, listener_adapter);
// Study: Ensure the shared memory for message data exist
// The channel info is included in self_attr
AddSegment(self_attr);
}

template <typename MessageT>
void ShmDispatcher::AddListener(const RoleAttributes& self_attr,
const RoleAttributes& opposite_attr,
const MessageListener<MessageT>& listener) {
// FIXME: make it more clean
auto listener_adapter = [listener](const std::shared_ptr<ReadableBlock>& rb,
const MessageInfo& msg_info) {
auto msg = std::make_shared<MessageT>();
RETURN_IF(!message::ParseFromArray(
rb->buf, static_cast<int>(rb->block->msg_size()), msg.get()));
listener(msg, msg_info);
};

Dispatcher::AddListener<ReadableBlock>(self_attr, opposite_attr,
listener_adapter);
// Study: Ensure the shared memory for message data exist
AddSegment(self_attr);
}

} // namespace transport
} // namespace cyber
} // namespace apollo