Apollo Cyber Study P16 Task

// Study: 是我的筆記

task/task_manager.h

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
/******************************************************************************
* Copyright 2018 The Apollo Authors. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*****************************************************************************/

#ifndef CYBER_TASK_TASK_MANAGER_H_
#define CYBER_TASK_TASK_MANAGER_H_

#include <atomic>
#include <future>
#include <memory>
#include <string>
#include <type_traits>
#include <utility>
#include <vector>

#include "cyber/base/bounded_queue.h"
#include "cyber/scheduler/scheduler_factory.h"

namespace apollo {
namespace cyber {

class TaskManager {
public:
virtual ~TaskManager();

void Shutdown();

// Study: F&& = allow using std::move for the func, args&& is the same reason
// decltype === std::result_of, it mean this enqueue return type is equal to the
// return type of F(Args...) wrapped by std::future
template <typename F, typename... Args>
auto Enqueue(F&& func, Args&&... args)
-> std::future<typename std::result_of<F(Args...)>::type> {
using return_type = typename std::result_of<F(Args...)>::type;
// Study: Combine the F(Args...) in to one thing, don;t need to pass argument later
auto task = std::make_shared<std::packaged_task<return_type()>>(
std::bind(std::forward<F>(func), std::forward<Args>(args)...));
if (!stop_.load()) {
task_queue_->Enqueue([task]() { (*task)(); });
for (auto& task : tasks_) {
scheduler::Instance()->NotifyTask(task);
}
}

// Study: Wrap by future, provide a way to get the result after it done
std::future<return_type> res(task->get_future());
return res;
}

private:
uint32_t num_threads_ = 0;
uint32_t task_queue_size_ = 1000;
std::atomic<bool> stop_ = {false};
std::vector<uint64_t> tasks_;
std::shared_ptr<base::BoundedQueue<std::function<void()>>> task_queue_;
DECLARE_SINGLETON(TaskManager);
};

} // namespace cyber
} // namespace apollo

#endif // CYBER_TASK_TASK_MANAGER_H_

task/task_manager.cc

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
/**************Scheduler::****************************************************************
* Copyright 2018 The Apollo Authors. All Rights Reserved.
*
* Licensed under the Apache License, Vesched_infoon 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*****************************************************************************/

#include "cyber/task/task_manager.h"

#include "cyber/common/global_data.h"
#include "cyber/croutine/croutine.h"
#include "cyber/croutine/routine_factory.h"
#include "cyber/scheduler/scheduler_factory.h"

namespace apollo {
namespace cyber {

using apollo::cyber::common::GlobalData;
// Study: Classifiy the internal task and the user define task
static const char* const task_prefix = "/internal/task";

TaskManager::TaskManager()
: task_queue_size_(1000),
task_queue_(new base::BoundedQueue<std::function<void()>>()) {

// Study: BlockWaitStrategy is similiar to traditional os handle
if (!task_queue_->Init(task_queue_size_, new base::BlockWaitStrategy())) {
AERROR << "Task queue init failed";
throw std::runtime_error("Task queue init failed");
}
auto func = [this]() {
while (!stop_) {
std::function<void()> task;
// Study: No Task, then make this routine to wait data
if (!task_queue_->Dequeue(&task)) {
auto routine = croutine::CRoutine::GetCurrentRoutine();
routine->HangUp();
continue;
}
task();
}
};

auto pool_size = scheduler::Instance()->TaskPoolSize();
// Study: If not provide the data visitor to CreateRoutineFactory
// it assume the function inside can get data by themselve
auto factory = croutine::CreateRoutineFactory(std::move(func));
tasks_.reserve(pool_size);
for (uint32_t i = 0; i < pool_size; i++) {
auto task_name = task_prefix + std::to_string(i);
tasks_.push_back(common::GlobalData::RegisterTaskName(task_name));
scheduler::Instance()->CreateTask(factory, task_name);
}
}

TaskManager::~TaskManager() { Shutdown(); }

void TaskManager::Shutdown() {
if (stop_.exchange(true)) {
return;
}

for (uint32_t i = 0; i < num_threads_; i++) {
scheduler::Instance()->RemoveTask(task_prefix + std::to_string(i));
}
}

} // namespace cyber
} // namespace apollo

task/task.h

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
/******************************************************************************
* Copyright 2018 The Apollo Authors. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*****************************************************************************/

#ifndef CYBER_TASK_TASK_H_
#define CYBER_TASK_TASK_H_

#include <future>
#include <utility>

#include "cyber/task/task_manager.h"

namespace apollo {
namespace cyber {

template <typename F, typename... Args>
static auto Async(F&& f, Args&&... args)
-> std::future<typename std::result_of<F(Args...)>::type> {
// Study: A Wrapper Function only
return TaskManager::Instance()->Enqueue(std::forward<F>(f),
std::forward<Args>(args)...);
}

// Study: A Wrapper Function that support both routine and thread
static inline void Yield() {
if (croutine::CRoutine::GetCurrentRoutine()) {
croutine::CRoutine::Yield();
} else {
std::this_thread::yield();
}
}

// Study: A Wrapper Function that support both routine and thread
template <typename Rep, typename Period>
static void SleepFor(const std::chrono::duration<Rep, Period>& sleep_duration) {
auto routine = croutine::CRoutine::GetCurrentRoutine();
if (routine == nullptr) {
std::this_thread::sleep_for(sleep_duration);
} else {
routine->Sleep(sleep_duration);
}
}

// Study: Actually same as the above function, just use different time unit
static inline void USleep(useconds_t usec) {
auto routine = croutine::CRoutine::GetCurrentRoutine();
if (routine == nullptr) {
std::this_thread::sleep_for(std::chrono::microseconds{usec});
} else {
routine->Sleep(croutine::Duration(usec));
}
}

} // namespace cyber
} // namespace apollo

#endif // CYBER_TASK_TASK_H_

Apollo Cyber Study P15 Service Discovery

// Study: 是我的筆記

service_discovery/role/role

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
/******************************************************************************
* Copyright 2018 The Apollo Authors. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*****************************************************************************/

#ifndef CYBER_SERVICE_DISCOVERY_ROLE_ROLE_H_
#define CYBER_SERVICE_DISCOVERY_ROLE_ROLE_H_

#include <stdint.h>
#include <memory>
#include <string>

#include "cyber/proto/role_attributes.pb.h"

namespace apollo {
namespace cyber {
namespace service_discovery {

class RoleBase;
using RolePtr = std::shared_ptr<RoleBase>;
using RoleNode = RoleBase;
using RoleNodePtr = std::shared_ptr<RoleNode>;

class RoleWriter;
using RoleWriterPtr = std::shared_ptr<RoleWriter>;
using RoleReader = RoleWriter;
using RoleReaderPtr = std::shared_ptr<RoleReader>;

class RoleServer;
using RoleServerPtr = std::shared_ptr<RoleServer>;
using RoleClient = RoleServer;
using RoleClientPtr = std::shared_ptr<RoleClient>;

// Study: The only different of RoleWriter and RoleServer is how they do match
class RoleBase {
public:
RoleBase();
explicit RoleBase(const proto::RoleAttributes& attr,
uint64_t timestamp_ns = 0);
virtual ~RoleBase() = default;

virtual bool Match(const proto::RoleAttributes& target_attr) const;
bool IsEarlierThan(const RoleBase& other) const;

const proto::RoleAttributes& attributes() const { return attributes_; }
void set_attributes(const proto::RoleAttributes& attr) { attributes_ = attr; }

uint64_t timestamp_ns() const { return timestamp_ns_; }
void set_timestamp_ns(uint64_t timestamp_ns) { timestamp_ns_ = timestamp_ns; }

protected:
proto::RoleAttributes attributes_;
uint64_t timestamp_ns_;
};

class RoleWriter : public RoleBase {
public:
RoleWriter() {}
explicit RoleWriter(const proto::RoleAttributes& attr,
uint64_t timestamp_ns = 0);
virtual ~RoleWriter() = default;

bool Match(const proto::RoleAttributes& target_attr) const override;
};

class RoleServer : public RoleBase {
public:
RoleServer() {}
explicit RoleServer(const proto::RoleAttributes& attr,
uint64_t timestamp_ns = 0);
virtual ~RoleServer() = default;

bool Match(const proto::RoleAttributes& target_attr) const override;
};

} // namespace service_discovery
} // namespace cyber
} // namespace apollo

#endif // CYBER_SERVICE_DISCOVERY_ROLE_ROLE_H_

service_discovery/container/warehouse_base
service_discovery/container/single_value_warehouse
service_discovery/container/muti_value_warehouse

Just a wrapper for a map of role using unordered map(map/multi-map)

service_discovery/container/graph

graph structure with graph structure maintain

service_discovery/communication/SubscriberListener
https://fast-rtps.docs.eprosima.com/en/latest/pubsub.html?highlight=SubscriberListener

service_discovery/communication/ParticipantListener

https://fast-rtps.docs.eprosima.com/en/latest/advanced.html?highlight=ParticipantListener

The participant listener interface includes methods which are called each time a Publisher or a Subscriber is discovered. This allows you to create your own network analysis tools.

service_discovery/specific_manager/manager

The base calss of specific manager. It will listen a specifc channel (each type of manager will have their own channel) to record the target graph change.
It also support callback (what to do when target graph change) and gettter (The status of it target graph).

service_discovery/specific_manager/service_manager

Mainly used to check whether a service exist

service_discovery/specific_manager/node_manager

A simple tracking for all node(s) (check node duplicate/existence/basic config)

service_discovery/specific_manager/channel_manager

Target for the relationship between nodes and channel

service_discovery/topology_manager

The manager of all service. Group those manager into one manager, easy to start, stop.

Apollo Cyber Study P14 Service

// Study: 是我的筆記

service/client_base

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
/******************************************************************************
* Copyright 2018 The Apollo Authors. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*****************************************************************************/

#ifndef CYBER_SERVICE_CLIENT_BASE_H_
#define CYBER_SERVICE_CLIENT_BASE_H_

#include <chrono>
#include <string>

#include "cyber/common/macros.h"

namespace apollo {
namespace cyber {

class ClientBase {
public:
explicit ClientBase(const std::string& service_name)
: service_name_(service_name) {}
virtual ~ClientBase() {}

virtual void Destroy() = 0;

const std::string& ServiceName() const { return service_name_; }

virtual bool ServiceIsReady() const = 0;

protected:
std::string service_name_;

bool WaitForServiceNanoseconds(std::chrono::nanoseconds time_out) {
bool has_service = false;
auto step_duration = std::chrono::nanoseconds(5 * 1000 * 1000);
while (time_out.count() > 0) {
// Study: Check have service yet?
has_service = service_discovery::TopologyManager::Instance()
->service_manager()
->HasService(service_name_);
if (!has_service) {
// Study: loop until time out
std::this_thread::sleep_for(step_duration);
time_out -= step_duration;
} else {
break;
}
}
return has_service;
}
};

} // namespace cyber
} // namespace apollo

#endif // CYBER_SERVICE_CLIENT_BASE_H_

service/client

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
template <typename Request, typename Response>
class Client : public ClientBase {
public:
using SharedRequest = typename std::shared_ptr<Request>;
using SharedResponse = typename std::shared_ptr<Response>;
using Promise = std::promise<SharedResponse>;
using SharedPromise = std::shared_ptr<Promise>;
using SharedFuture = std::shared_future<SharedResponse>;
using CallbackType = std::function<void(SharedFuture)>;

Client(const std::string& node_name, const std::string& service_name)
: ClientBase(service_name),
node_name_(node_name),
request_channel_(service_name + SRV_CHANNEL_REQ_SUFFIX),
response_channel_(service_name + SRV_CHANNEL_RES_SUFFIX),
sequence_number_(0) {}

Client() = delete;

virtual ~Client() {}

bool Init();

// Study: Traditional request response api
SharedResponse SendRequest(
SharedRequest request,
const std::chrono::seconds& timeout_s = std::chrono::seconds(5));
SharedResponse SendRequest(
const Request& request,
const std::chrono::seconds& timeout_s = std::chrono::seconds(5));

// Study: Don't return the response directly, return a future response
SharedFuture AsyncSendRequest(SharedRequest request);
SharedFuture AsyncSendRequest(const Request& request);
SharedFuture AsyncSendRequest(SharedRequest request, CallbackType&& cb);

bool ServiceIsReady() const;
void Destroy();

template <typename RatioT = std::milli>
bool WaitForService(std::chrono::duration<int64_t, RatioT> timeout =
std::chrono::duration<int64_t, RatioT>(-1)) {
return WaitForServiceNanoseconds(
std::chrono::duration_cast<std::chrono::nanoseconds>(timeout));
}

private:
void HandleResponse(const std::shared_ptr<Response>& response,
const transport::MessageInfo& request_info);
bool IsInit(void) const { return response_receiver_ != nullptr; }

std::string node_name_;

std::function<void(const std::shared_ptr<Response>&,
const transport::MessageInfo&)>
response_callback_;

std::unordered_map<uint64_t,
std::tuple<SharedPromise, CallbackType, SharedFuture>>
pending_requests_;
std::mutex pending_requests_mutex_;

// Study: Access the transport layer directly
std::shared_ptr<transport::Transmitter<Request>> request_transmitter_;
std::shared_ptr<transport::Receiver<Response>> response_receiver_;
std::string request_channel_;
std::string response_channel_;

transport::Identity writer_id_;
uint64_t sequence_number_;
};

template <typename Request, typename Response>
void Client<Request, Response>::Destroy() {}

template <typename Request, typename Response>
bool Client<Request, Response>::Init() {
proto::RoleAttributes role;
role.set_node_name(node_name_);
role.set_channel_name(request_channel_);
auto channel_id = common::GlobalData::RegisterChannel(request_channel_);
role.set_channel_id(channel_id);
role.mutable_qos_profile()->CopyFrom(
transport::QosProfileConf::QOS_PROFILE_SERVICES_DEFAULT);
auto transport = transport::Transport::Instance();
// Study: Selected rtps, so the client seem to be used to handle network conection
request_transmitter_ =
transport->CreateTransmitter<Request>(role, proto::OptionalMode::RTPS);
if (request_transmitter_ == nullptr) {
AERROR << "Create request pub failed.";
return false;
}
writer_id_ = request_transmitter_->id();

response_callback_ =
std::bind(&Client<Request, Response>::HandleResponse, this,
std::placeholders::_1, std::placeholders::_2);

role.set_channel_name(response_channel_);
channel_id = common::GlobalData::RegisterChannel(response_channel_);
role.set_channel_id(channel_id);

// Study: Let the receiver to wake up itself callback instead of polling itself
response_receiver_ = transport->CreateReceiver<Response>(
role,
[=](const std::shared_ptr<Response>& request,
const transport::MessageInfo& message_info,
const proto::RoleAttributes& reader_attr) {
(void)message_info;
(void)reader_attr;
response_callback_(request, message_info);
},
proto::OptionalMode::RTPS);
if (response_receiver_ == nullptr) {
AERROR << "Create response sub failed.";
request_transmitter_.reset();
return false;
}
return true;
}

template <typename Request, typename Response>
typename Client<Request, Response>::SharedResponse
Client<Request, Response>::SendRequest(SharedRequest request,
const std::chrono::seconds& timeout_s) {
if (!IsInit()) { return nullptr; }
auto future = AsyncSendRequest(request);
if (!future.valid()) {
return nullptr;
}
auto status = future.wait_for(timeout_s);
if (status == std::future_status::ready) {
return future.get();
} else {
return nullptr;
}
}

template <typename Request, typename Response>
typename Client<Request, Response>::SharedResponse
Client<Request, Response>::SendRequest(const Request& request,
const std::chrono::seconds& timeout_s) {
if (!IsInit()) { return nullptr; }
auto request_ptr = std::make_shared<const Request>(request);
return SendRequest(request_ptr, timeout_s);
}

template <typename Request, typename Response>
typename Client<Request, Response>::SharedFuture
Client<Request, Response>::AsyncSendRequest(const Request& request) {
auto request_ptr = std::make_shared<const Request>(request);
return AsyncSendRequest(request_ptr);
}

// Study: Using SharedXX can be reduce the work for this api user
template <typename Request, typename Response>
typename Client<Request, Response>::SharedFuture
Client<Request, Response>::AsyncSendRequest(SharedRequest request) {
return AsyncSendRequest(request, [](SharedFuture) {});
}

template <typename Request, typename Response>
typename Client<Request, Response>::SharedFuture
Client<Request, Response>::AsyncSendRequest(SharedRequest request,
CallbackType&& cb) {
if (IsInit()) {
std::lock_guard<std::mutex> lock(pending_requests_mutex_);
sequence_number_++;
// Study: Using transport module directly, write the response to the response channel
transport::MessageInfo info(writer_id_, sequence_number_, writer_id_);
request_transmitter_->Transmit(request, info);
SharedPromise call_promise = std::make_shared<Promise>();
SharedFuture f(call_promise->get_future());
// Study: Record what should do when receiver wake it up
pending_requests_[info.seq_num()] =
std::make_tuple(call_promise, std::forward<CallbackType>(cb), f);
return f;
} else {
return std::shared_future<std::shared_ptr<Response>>();
}
}

template <typename Request, typename Response>
bool Client<Request, Response>::ServiceIsReady() const {
return true;
}

template <typename Request, typename Response>
void Client<Request, Response>::HandleResponse(
const std::shared_ptr<Response>& response,
const transport::MessageInfo& request_header) {
ADEBUG << "client recv response.";
std::lock_guard<std::mutex> lock(pending_requests_mutex_);
// Study: Allow multiple client access to the same service, need to classify who should get this response
if (request_header.spare_id() != writer_id_) {
return;
}
// Study: The spare_id + seq_num can be the uuid of each request
uint64_t sequence_number = request_header.seq_num();
if (this->pending_requests_.count(sequence_number) == 0) {
return;
}
// Study: Is the response of each request
auto tuple = this->pending_requests_[sequence_number];
auto call_promise = std::get<0>(tuple);
auto callback = std::get<1>(tuple);
auto future = std::get<2>(tuple);
this->pending_requests_.erase(sequence_number);
call_promise->set_value(response);
callback(future);
}

service/service

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
/******************************************************************************
* Copyright 2018 The Apollo Authors. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*****************************************************************************/

#ifndef CYBER_SERVICE_SERVICE_H_
#define CYBER_SERVICE_SERVICE_H_

#include <list>
#include <memory>
#include <string>
#include <utility>

#include "cyber/common/types.h"
#include "cyber/node/node_channel_impl.h"
#include "cyber/scheduler/scheduler.h"
#include "cyber/service/service_base.h"

namespace apollo {
namespace cyber {

template <typename Request, typename Response>
class Service : public ServiceBase {
public:
using ServiceCallback = std::function<void(const std::shared_ptr<Request>&,
std::shared_ptr<Response>&)>;
Service(const std::string& node_name, const std::string& service_name,
const ServiceCallback& service_callback)
: ServiceBase(service_name),
node_name_(node_name),
service_callback_(service_callback),
request_channel_(service_name + SRV_CHANNEL_REQ_SUFFIX),
response_channel_(service_name + SRV_CHANNEL_RES_SUFFIX) {}

Service(const std::string& node_name, const std::string& service_name,
ServiceCallback&& service_callback)
: ServiceBase(service_name),
node_name_(node_name),
service_callback_(service_callback),
request_channel_(service_name + SRV_CHANNEL_REQ_SUFFIX),
response_channel_(service_name + SRV_CHANNEL_RES_SUFFIX) {}

Service() = delete;
~Service() {
inited_ = false;
// Study: need a inner thread to keep checking request
condition_.notify_all();
if (thread_.joinable()) {
thread_.join();
}
}
bool Init();
void destroy();

private:
void HandleRequest(const std::shared_ptr<Request>& request,
const transport::MessageInfo& message_info);

void SendResponse(const transport::MessageInfo& message_info,
const std::shared_ptr<Response>& response);
bool IsInit(void) const { return request_receiver_ != nullptr; }

std::string node_name_;
ServiceCallback service_callback_;

std::function<void(const std::shared_ptr<Request>&,
const transport::MessageInfo&)>
request_callback_;
std::shared_ptr<transport::Transmitter<Response>> response_transmitter_;
std::shared_ptr<transport::Receiver<Request>> request_receiver_;
std::string request_channel_;
std::string response_channel_;
std::mutex service_handle_request_mutex_;

volatile bool inited_;
void Enqueue(std::function<void()>&& task);
void Process();
std::thread thread_;
std::mutex queue_mutex_;
std::condition_variable condition_;
std::list<std::function<void()>> tasks_;
};

template <typename Request, typename Response>
void Service<Request, Response>::destroy() {
inited_ = false;
condition_.notify_all();
if (thread_.joinable()) {
thread_.join();
}
}

template <typename Request, typename Response>
inline void Service<Request, Response>::Enqueue(std::function<void()>&& task) {
std::lock_guard<std::mutex> lg(queue_mutex_);
tasks_.emplace_back(std::move(task));
condition_.notify_one();
}

template <typename Request, typename Response>
void Service<Request, Response>::Process() {
while (!cyber::IsShutdown()) {
std::unique_lock<std::mutex> ul(queue_mutex_);
condition_.wait(ul, [this]() { return !inited_ || !this->tasks_.empty(); });
if (!inited_) {
break;
}
if (!tasks_.empty()) {
auto task = tasks_.front();
tasks_.pop_front();
ul.unlock();
task();
}
}
}

template <typename Request, typename Response>
bool Service<Request, Response>::Init() {
if (IsInit()) {
return true;
}
proto::RoleAttributes role;
role.set_node_name(node_name_);
role.set_channel_name(response_channel_);
auto channel_id = common::GlobalData::RegisterChannel(response_channel_);
role.set_channel_id(channel_id);
role.mutable_qos_profile()->CopyFrom(
transport::QosProfileConf::QOS_PROFILE_SERVICES_DEFAULT);
auto transport = transport::Transport::Instance();
response_transmitter_ =
transport->CreateTransmitter<Response>(role, proto::OptionalMode::RTPS);
if (response_transmitter_ == nullptr) {
AERROR << " Create response pub failed.";
return false;
}

request_callback_ =
std::bind(&Service<Request, Response>::HandleRequest, this,
std::placeholders::_1, std::placeholders::_2);

role.set_channel_name(request_channel_);
channel_id = common::GlobalData::RegisterChannel(request_channel_);
role.set_channel_id(channel_id);
// Stduy: The process of getting request and handle request is separated
// Can maximize the performance since the callblock operation will not block io
request_receiver_ = transport->CreateReceiver<Request>(
role,
[=](const std::shared_ptr<Request>& request,
const transport::MessageInfo& message_info,
const proto::RoleAttributes& reader_attr) {
(void)reader_attr;
(void)reader_attr;
auto task = [this, request, message_info]() {
this->HandleRequest(request, message_info);
};
Enqueue(std::move(task));
},
proto::OptionalMode::RTPS);
inited_ = true;
// Study: Handle the received request in other thread
thread_ = std::thread(&Service<Request, Response>::Process, this);
if (request_receiver_ == nullptr) {
AERROR << " Create request sub failed." << request_channel_;
response_transmitter_.reset();
return false;
}
return true;
}

template <typename Request, typename Response>
void Service<Request, Response>::HandleRequest(
const std::shared_ptr<Request>& request,
const transport::MessageInfo& message_info) {
if (!IsInit()) {
// LOG_DEBUG << "not inited error.";
return;
// Study: Handle request and put the response back to transport module again
}
ADEBUG << "handling request:" << request_channel_;
std::lock_guard<std::mutex> lk(service_handle_request_mutex_);
auto response = std::make_shared<Response>();
service_callback_(request, response);
transport::MessageInfo msg_info(message_info);
msg_info.set_sender_id(response_transmitter_->id());
SendResponse(msg_info, response);
}

template <typename Request, typename Response>
void Service<Request, Response>::SendResponse(
const transport::MessageInfo& message_info,
const std::shared_ptr<Response>& response) {
if (!IsInit()) {
// LOG_DEBUG << "not inited error.";
return;
}
// publish return value ?
// LOG_DEBUG << "send response id:" << message_id.sequence_number;
response_transmitter_->Transmit(response, message_info);
}

} // namespace cyber
} // namespace apollo

#endif // CYBER_SERVICE_SERVICE_H_

Apollo Cyber Study P13 Blocker

因為Blocker中其實沒有甚麼特別值得說的。
就大概說一下它是做甚麼的吧

在cyber中,從數據傳輸,分發有關的主要為transport,data visitorblocker三個模塊

  • transport負責最底層的數據傳輸 工作,也提供了跨進程數據傳輸的能力。而寫數據到channel都是直接由transport去處理的
  • data visitor則根據由transport模塊得到的數據去提供一個讀數據的接口,而這接口在cyber內部被大量使用,比如ComponentProc接的數據就是從data visitor來的
  • blocker則是把從Node::Reader那邊得到數據(Node::Reader的數據是由data visitor得到的 ),然後放到自己的message queue中,再提供一個接口給Node::Reader用。主要就幫Reader管理數據相關的邏輯。而Node::Reader的接口也是開發者們會用到的(而abi跟以前adapter提供的很像 )。

cyber外部模塊的開發者只會用到Node::Reader,而不是data visitor,transportBlocker

Apollo Cyber Study P12 Transport RTPS

先來看一下rtps是甚麼
https://wiki.wireshark.org/Protocols/rtps

Apollo用了eprosima-fast-rtps的rtps實現。
可以先看一下Docs
其concept和apollo內部的channel,receiver,transmitter是差不多的

1
2
3
4
5
6
7

At the top of RTPS, we find the Domain, which defines a separate plane of communication. Several domains can coexist at the same time independently. A domain contains any number of Participants, elements capable of sending and receiving data. To do this, the participants use their Endpoints:

Reader: Endpoint able to receive data.
Writer: Endpoint able to send data.

A Participant can have any number of writer and reader endpoints.
1
2
3
4
5
6
7
8
9
10
比較不一樣的是

Communication revolves around Topics, which define the data being exchanged. Topics don’t belong to any participant in particular; instead, all interested participants keep track of changes to the topic data and make sure to keep each other up to date. The unit of communication is called a Change, which represents an update to a topic. Endpoints register these changes on their History, a data structure that serves as a cache for recent changes. When you publish a change through a writer endpoint, the following steps happen behind the scenes:

The change is added to the writer’s history cache.
The writer informs any readers it knows about.
Any interested (subscribed) readers request the change.
After receiving data, readers update their history cache with the new change.

By choosing Quality of Service policies, you can affect how these history caches are managed in several ways, but the communication loop remains the same. You can read more information in Configuration.

比起shared memory版的, rtps版多了

  • qos (define quality of service)
  • have history cache in both reader and writer side

而因為eprosima-fast-rtps已經做了很大一部份的事情了,所以在cyber的相關代碼比shm的簡淺多了

// Study: 是我的筆記

cyber/transport/rtps/sub_listener

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
/******************************************************************************
* Copyright 2018 The Apollo Authors. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*****************************************************************************/

#include "cyber/transport/rtps/sub_listener.h"

#include "cyber/common/log.h"
#include "cyber/common/util.h"

namespace apollo {
namespace cyber {
namespace transport {

SubListener::SubListener(const NewMsgCallback& callback)
: callback_(callback) {}

SubListener::~SubListener() {}

void SubListener::onNewDataMessage(eprosima::fastrtps::Subscriber* sub) {
RETURN_IF_NULL(sub);
RETURN_IF_NULL(callback_);
// Study: Using a mutex make life easy, espacially when you are dueing with third library
std::lock_guard<std::mutex> lock(mutex_);

// fetch channel name
auto channel_id = common::Hash(sub->getAttributes().topic.getTopicName());
eprosima::fastrtps::SampleInfo_t m_info;
// Study: Cyber use the same message type for all data type, this can saving the work for creating message type for all data type
// since it must have a type conversion later
UnderlayMessage m;

// Study: Get data if possible
RETURN_IF(!sub->takeNextData(reinterpret_cast<void*>(&m), &m_info));
RETURN_IF(m_info.sampleKind != eprosima::fastrtps::ALIVE);

// fetch MessageInfo
char* ptr =
reinterpret_cast<char*>(&m_info.related_sample_identity.writer_guid());
Identity sender_id(false);
sender_id.set_data(ptr);
msg_info_.set_sender_id(sender_id);

Identity spare_id(false);
spare_id.set_data(ptr + ID_SIZE);
msg_info_.set_spare_id(spare_id);

uint64_t seq_num =
((int64_t)m_info.related_sample_identity.sequence_number().high) << 32 |
m_info.related_sample_identity.sequence_number().low;
msg_info_.set_seq_num(seq_num);

// fetch message string
std::shared_ptr<std::string> msg_str =
std::make_shared<std::string>(m.data());

// callback
callback_(channel_id, msg_str, msg_info_);
}

// Study: Not called at all
void SubListener::onSubscriptionMatched(
eprosima::fastrtps::Subscriber* sub,
eprosima::fastrtps::MatchingInfo& info) {
(void)sub;
(void)info;
}

} // namespace transport
} // namespace cyber
} // namespace apollo

transport/rtps/participant

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
/******************************************************************************
* Copyright 2018 The Apollo Authors. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*****************************************************************************/

#include "cyber/transport/rtps/participant.h"

#include "cyber/common/global_data.h"
#include "cyber/common/log.h"
#include "cyber/proto/transport_conf.pb.h"

namespace apollo {
namespace cyber {
namespace transport {

Participant::Participant(const std::string& name, int send_port,
eprosima::fastrtps::ParticipantListener* listener)
: shutdown_(false),
name_(name),
send_port_(send_port),
listener_(listener),
fastrtps_participant_(nullptr) {}

Participant::~Participant() {}

void Participant::Shutdown() {
if (shutdown_.exchange(true)) {
return;
}

std::lock_guard<std::mutex> lk(mutex_);
if (fastrtps_participant_ != nullptr) {
eprosima::fastrtps::Domain::removeParticipant(fastrtps_participant_);
fastrtps_participant_ = nullptr;
listener_ = nullptr;
}
}

eprosima::fastrtps::Participant* Participant::fastrtps_participant() {
if (shutdown_.load()) {
return nullptr;
}

std::lock_guard<std::mutex> lk(mutex_);
if (fastrtps_participant_ != nullptr) {
return fastrtps_participant_;
}

CreateFastRtpsParticipant(name_, send_port_, listener_);
return fastrtps_participant_;
}

void Participant::CreateFastRtpsParticipant(
const std::string& name, int send_port,
eprosima::fastrtps::ParticipantListener* listener) {
// Study: DomainId: Publishers and Subscribers can only talk to each other if their Participants belong to the same DomainId.
// A interesting part, it selected one domain id only which mean it is impossible to have multiple domain
// But in original fast-rtps, it support multiplt domain
uint32_t domain_id = 80;

// Study: Set in the bash script
const char* val = ::getenv("CYBER_DOMAIN_ID");
if (val != nullptr) {
try {
domain_id = std::stoi(val);
} catch (const std::exception& e) {
AERROR << "convert domain_id error " << e.what();
return;
}
}

auto part_attr_conf = std::make_shared<proto::RtpsParticipantAttr>();
auto& global_conf = common::GlobalData::Instance()->Config();
if (global_conf.has_transport_conf() &&
global_conf.transport_conf().has_participant_attr()) {
part_attr_conf->CopyFrom(global_conf.transport_conf().participant_attr());
}

eprosima::fastrtps::ParticipantAttributes attr;
attr.rtps.defaultSendPort = send_port;
attr.rtps.port.domainIDGain =
static_cast<uint16_t>(part_attr_conf->domain_id_gain());
attr.rtps.port.portBase = static_cast<uint16_t>(part_attr_conf->port_base());
attr.rtps.use_IP6_to_send = false;
attr.rtps.builtin.use_SIMPLE_RTPSParticipantDiscoveryProtocol = true;
attr.rtps.builtin.use_SIMPLE_EndpointDiscoveryProtocol = true;
attr.rtps.builtin.m_simpleEDP.use_PublicationReaderANDSubscriptionWriter =
true;
attr.rtps.builtin.m_simpleEDP.use_PublicationWriterANDSubscriptionReader =
true;
attr.rtps.builtin.domainId = domain_id;
attr.rtps.builtin.leaseDuration.seconds = part_attr_conf->lease_duration();
attr.rtps.builtin.leaseDuration_announcementperiod.seconds =
part_attr_conf->announcement_period();

attr.rtps.setName(name.c_str());

// Study: Since rtps is a network protocol, it must know a ip
std::string ip_env("127.0.0.1");
const char* ip_val = ::getenv("CYBER_IP");
if (ip_val != nullptr) {
ip_env = ip_val;
if (ip_env.size() == 0) {
AERROR << "invalid CYBER_IP (an empty string)";
return;
}
}
ADEBUG << "cyber ip: " << ip_env;

eprosima::fastrtps::rtps::Locator_t locator;
locator.port = 0;
RETURN_IF(!locator.set_IP4_address(ip_env));

locator.kind = LOCATOR_KIND_UDPv4;

attr.rtps.defaultUnicastLocatorList.push_back(locator);
attr.rtps.defaultOutLocatorList.push_back(locator);
attr.rtps.builtin.metatrafficUnicastLocatorList.push_back(locator);

locator.set_IP4_address(239, 255, 0, 1);
attr.rtps.builtin.metatrafficMulticastLocatorList.push_back(locator);

fastrtps_participant_ =
eprosima::fastrtps::Domain::createParticipant(attr, listener);
RETURN_IF_NULL(fastrtps_participant_);
eprosima::fastrtps::Domain::registerType(fastrtps_participant_, &type_);
}

} // namespace transport
} // namespace cyber
} // namespace apollo

transport/rtps/underlay_message
transport/rtps/underlay_message_type

The class generated by the Fast RTPS IDL, used to serialize and deserialize data

transport/dispatcher/shm_dispatcher

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
/******************************************************************************
* Copyright 2018 The Apollo Authors. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*****************************************************************************/

#include "cyber/transport/dispatcher/rtps_dispatcher.h"

namespace apollo {
namespace cyber {
namespace transport {

RtpsDispatcher::RtpsDispatcher() : participant_(nullptr) {}

RtpsDispatcher::~RtpsDispatcher() { Shutdown(); }

void RtpsDispatcher::Shutdown() {
if (is_shutdown_.exchange(true)) {
return;
}

{
std::lock_guard<std::mutex> lock(subs_mutex_);
for (auto& item : subs_) {
item.second.sub = nullptr;
}
}

participant_ = nullptr;
}

void RtpsDispatcher::AddSubscriber(const RoleAttributes& self_attr) {
if (participant_ == nullptr) {
AWARN << "please set participant firstly.";
return;
}

uint64_t channel_id = self_attr.channel_id();
std::lock_guard<std::mutex> lock(subs_mutex_);
if (subs_.count(channel_id) > 0) {
return;
}

Subscriber new_sub;
eprosima::fastrtps::SubscriberAttributes sub_attr;
auto& qos = self_attr.qos_profile();
RETURN_IF(!AttributesFiller::FillInSubAttr(self_attr.channel_name(), qos,
&sub_attr));

// Study: So the SubListener is just a subscriber + callback....
new_sub.sub_listener = std::make_shared<SubListener>(
std::bind(&RtpsDispatcher::OnMessage, this, std::placeholders::_1,
std::placeholders::_2, std::placeholders::_3));

new_sub.sub = eprosima::fastrtps::Domain::createSubscriber(
participant_->fastrtps_participant(), sub_attr,
new_sub.sub_listener.get());
RETURN_IF_NULL(new_sub.sub);
subs_[channel_id] = new_sub;
}

void RtpsDispatcher::OnMessage(uint64_t channel_id,
const std::shared_ptr<std::string>& msg_str,
const MessageInfo& msg_info) {
if (is_shutdown_.load()) {
return;
}

// Study: Similiar to shm part impl, no big deal
ListenerHandlerBasePtr* handler_base = nullptr;
if (msg_listeners_.Get(channel_id, &handler_base)) {
auto handler =
std::dynamic_pointer_cast<ListenerHandler<std::string>>(*handler_base);
handler->Run(msg_str, msg_info);
}
}

} // namespace transport
} // namespace cyber
} // namespace apollo

transport/transmitter/rtps_transmitter

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
/******************************************************************************
* Copyright 2018 The Apollo Authors. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*****************************************************************************/

#ifndef CYBER_TRANSPORT_TRANSMITTER_RTPS_TRANSMITTER_H_
#define CYBER_TRANSPORT_TRANSMITTER_RTPS_TRANSMITTER_H_

#include <memory>
#include <string>

#include "cyber/common/log.h"
#include "cyber/message/message_traits.h"
#include "cyber/transport/rtps/attributes_filler.h"
#include "cyber/transport/rtps/participant.h"
#include "cyber/transport/transmitter/transmitter.h"
#include "fastrtps/Domain.h"
#include "fastrtps/attributes/PublisherAttributes.h"
#include "fastrtps/participant/Participant.h"
#include "fastrtps/publisher/Publisher.h"

namespace apollo {
namespace cyber {
namespace transport {

template <typename M>
class RtpsTransmitter : public Transmitter<M> {
public:
using MessagePtr = std::shared_ptr<M>;

RtpsTransmitter(const RoleAttributes& attr,
const ParticipantPtr& participant);
virtual ~RtpsTransmitter();

void Enable() override;
void Disable() override;

bool Transmit(const MessagePtr& msg, const MessageInfo& msg_info) override;

private:
bool Transmit(const M& msg, const MessageInfo& msg_info);

ParticipantPtr participant_;
eprosima::fastrtps::Publisher* publisher_;
};

template <typename M>
RtpsTransmitter<M>::RtpsTransmitter(const RoleAttributes& attr,
const ParticipantPtr& participant)
: Transmitter<M>(attr), participant_(participant), publisher_(nullptr) {}

template <typename M>
RtpsTransmitter<M>::~RtpsTransmitter() {
Disable();
}

template <typename M>
void RtpsTransmitter<M>::Enable() {
if (this->enabled_) {
return;
}

RETURN_IF_NULL(participant_);

eprosima::fastrtps::PublisherAttributes pub_attr;
// Study: Need fill the attribute according to the qos profile
// Annoying
RETURN_IF(!AttributesFiller::FillInPubAttr(
this->attr_.channel_name(), this->attr_.qos_profile(), &pub_attr));
publisher_ = eprosima::fastrtps::Domain::createPublisher(
participant_->fastrtps_participant(), pub_attr);
RETURN_IF_NULL(publisher_);
this->enabled_ = true;
}

template <typename M>
void RtpsTransmitter<M>::Disable() {
if (this->enabled_) {
publisher_ = nullptr;
this->enabled_ = false;
}
}

template <typename M>
bool RtpsTransmitter<M>::Transmit(const MessagePtr& msg,
const MessageInfo& msg_info) {
return Transmit(*msg, msg_info);
}

template <typename M>
bool RtpsTransmitter<M>::Transmit(const M& msg, const MessageInfo& msg_info) {
if (!this->enabled_) {
ADEBUG << "not enable.";
return false;
}

UnderlayMessage m;
// Study: m.data is just a str, can reuse the original existing serializaion and deserialzation
RETURN_VAL_IF(!message::SerializeToString(msg, &m.data()), false);

eprosima::fastrtps::rtps::WriteParams wparams;

char* ptr =
reinterpret_cast<char*>(&wparams.related_sample_identity().writer_guid());

memcpy(ptr, msg_info.sender_id().data(), ID_SIZE);
memcpy(ptr + ID_SIZE, msg_info.spare_id().data(), ID_SIZE);

wparams.related_sample_identity().sequence_number().high =
(int32_t)((msg_info.seq_num() & 0xFFFFFFFF00000000) >> 32);
wparams.related_sample_identity().sequence_number().low =
(int32_t)(msg_info.seq_num() & 0xFFFFFFFF);

if (participant_->is_shutdown()) {
return false;
}
return publisher_->write(reinterpret_cast<void*>(&m), wparams);
}

} // namespace transport
} // namespace cyber
} // namespace apollo

#endif // CYBER_TRANSPORT_TRANSMITTER_RTPS_TRANSMITTER_H_

transport/receiver/rtps_receiver

Same as shm_receiver, just changed shm_dispatcher to rtps_dispatcher

Apollo Cyber Study. P11 Transport 2

// Study: 是我的筆記

cyber/transport/dispatcher/dispatcher

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
/******************************************************************************
* Copyright 2018 The Apollo Authors. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*****************************************************************************/

#ifndef CYBER_TRANSPORT_DISPATCHER_DISPATCHER_H_
#define CYBER_TRANSPORT_DISPATCHER_DISPATCHER_H_

#include <atomic>
#include <functional>
#include <iostream>
#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>

#include "cyber/base/atomic_hash_map.h"
#include "cyber/base/atomic_rw_lock.h"
#include "cyber/common/global_data.h"
#include "cyber/common/log.h"
#include "cyber/proto/role_attributes.pb.h"
#include "cyber/transport/message/listener_handler.h"
#include "cyber/transport/message/message_info.h"

namespace apollo {
namespace cyber {
namespace transport {

using apollo::cyber::base::AtomicHashMap;
using apollo::cyber::base::AtomicRWLock;
using apollo::cyber::base::ReadLockGuard;
using apollo::cyber::base::WriteLockGuard;
using apollo::cyber::common::GlobalData;
using cyber::proto::RoleAttributes;

class Dispatcher;
using DispatcherPtr = std::shared_ptr<Dispatcher>;

template <typename MessageT>
// Study: Fuck this naming, if it is a callback, just name it xxxCallback
// Name it XXXListener will make people think this is a class instead of function
using MessageListener =
std::function<void(const std::shared_ptr<MessageT>&, const MessageInfo&)>;

class Dispatcher {
public:
Dispatcher();
virtual ~Dispatcher();

virtual void Shutdown();

template <typename MessageT>
void AddListener(const RoleAttributes& self_attr,
const MessageListener<MessageT>& listener);

template <typename MessageT>
void AddListener(const RoleAttributes& self_attr,
const RoleAttributes& opposite_attr,
const MessageListener<MessageT>& listener);

template <typename MessageT>
void RemoveListener(const RoleAttributes& self_attr);

template <typename MessageT>
void RemoveListener(const RoleAttributes& self_attr,
const RoleAttributes& opposite_attr);

bool HasChannel(uint64_t channel_id);

protected:
std::atomic<bool> is_shutdown_;
// key: channel_id of message
AtomicHashMap<uint64_t, ListenerHandlerBasePtr> msg_listeners_;
base::AtomicRWLock rw_lock_;
};

template <typename MessageT>
void Dispatcher::AddListener(const RoleAttributes& self_attr,
const MessageListener<MessageT>& listener) {
if (is_shutdown_.load()) {
return;
}
uint64_t channel_id = self_attr.channel_id();

// Study: ListenerHandler is just one more layer of abstract
// It make the work that can be done in 1 abstract level
// into 3 to 4 layer, fuck it
std::shared_ptr<ListenerHandler<MessageT>> handler;
ListenerHandlerBasePtr* handler_base = nullptr;
if (msg_listeners_.Get(channel_id, &handler_base)) {
handler =
std::dynamic_pointer_cast<ListenerHandler<MessageT>>(*handler_base);
if (handler == nullptr) {
AERROR << "please ensure that readers with the same channel["
<< self_attr.channel_name()
<< "] in the same process have the same message type";
return;
}
} else {
ADEBUG << "new reader for channel:"
<< GlobalData::GetChannelById(channel_id);
// Study Do think too much, Just a abstract to maintain all callback
handler.reset(new ListenerHandler<MessageT>());
msg_listeners_.Set(channel_id, handler);
}
handler->Connect(self_attr.id(), listener);
}

template <typename MessageT>
void Dispatcher::AddListener(const RoleAttributes& self_attr,
const RoleAttributes& opposite_attr,
const MessageListener<MessageT>& listener) {
if (is_shutdown_.load()) {
return;
}
uint64_t channel_id = self_attr.channel_id();

std::shared_ptr<ListenerHandler<MessageT>> handler;
ListenerHandlerBasePtr* handler_base = nullptr;
if (msg_listeners_.Get(channel_id, &handler_base)) {
// Study: Assume all the existing handler for same channel is same type
handler =
std::dynamic_pointer_cast<ListenerHandler<MessageT>>(*handler_base);
if (handler == nullptr) {
AERROR << "please ensure that readers with the same channel["
<< self_attr.channel_name()
<< "] in the same process have the same message type";
return;
}
} else {
ADEBUG << "new reader for channel:"
<< GlobalData::GetChannelById(channel_id);
handler.reset(new ListenerHandler<MessageT>());
msg_listeners_.Set(channel_id, handler);
}
handler->Connect(self_attr.id(), opposite_attr.id(), listener);
}

template <typename MessageT>
void Dispatcher::RemoveListener(const RoleAttributes& self_attr) {
if (is_shutdown_.load()) {
return;
}
uint64_t channel_id = self_attr.channel_id();

ListenerHandlerBasePtr* handler_base = nullptr;
if (msg_listeners_.Get(channel_id, &handler_base)) {
(*handler_base)->Disconnect(self_attr.id());
}
}

template <typename MessageT>
void Dispatcher::RemoveListener(const RoleAttributes& self_attr,
const RoleAttributes& opposite_attr) {
if (is_shutdown_.load()) {
return;
}
uint64_t channel_id = self_attr.channel_id();

ListenerHandlerBasePtr* handler_base = nullptr;
if (msg_listeners_.Get(channel_id, &handler_base)) {
(*handler_base)->Disconnect(self_attr.id(), opposite_attr.id());
}
}

} // namespace transport
} // namespace cyber
} // namespace apollo

#endif // CYBER_TRANSPORT_DISPATCHER_DISPATCHER_H_

cyber/transport/receiver/receiver

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
/******************************************************************************
* Copyright 2018 The Apollo Authors. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*****************************************************************************/

#ifndef CYBER_TRANSPORT_RECEIVER_RECEIVER_H_
#define CYBER_TRANSPORT_RECEIVER_RECEIVER_H_

#include <functional>
#include <memory>

#include "cyber/transport/common/endpoint.h"
#include "cyber/transport/message/history.h"
#include "cyber/transport/message/message_info.h"

namespace apollo {
namespace cyber {
namespace transport {

template <typename M>
class Receiver : public Endpoint {
public:
using MessagePtr = std::shared_ptr<M>;
using MessageListener = std::function<void(
const MessagePtr&, const MessageInfo&, const RoleAttributes&)>;

Receiver(const RoleAttributes& attr, const MessageListener& msg_listener);
virtual ~Receiver();

virtual void Enable() = 0;
virtual void Disable() = 0;
virtual void Enable(const RoleAttributes& opposite_attr) = 0;
virtual void Disable(const RoleAttributes& opposite_attr) = 0;

protected:
void OnNewMessage(const MessagePtr& msg, const MessageInfo& msg_info);

MessageListener msg_listener_;
};

template <typename M>
Receiver<M>::Receiver(const RoleAttributes& attr,
const MessageListener& msg_listener)
: Endpoint(attr), msg_listener_(msg_listener) {}

template <typename M>
Receiver<M>::~Receiver() {}

template <typename M>
void Receiver<M>::OnNewMessage(const MessagePtr& msg,
const MessageInfo& msg_info) {
if (msg_listener_ != nullptr) {
msg_listener_(msg, msg_info, attr_);
}
}

} // namespace transport
} // namespace cyber
} // namespace apollo

#endif // CYBER_TRANSPORT_RECEIVER_RECEIVER_H_

Apollo Cyber Study. P10 Transport

// Study: 是我的筆記

time to fight boss

  • How the transport connecto to data visitor

The transport will call the DataDispatcher in cyber/node/reader_base to provide data for
data visitor to visitor.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
transport::Transport::Instance()->CreateReceiver<MessageT>(
role_attr, [](const std::shared_ptr<MessageT>& msg,
const transport::MessageInfo& msg_info,
const proto::RoleAttributes& reader_attr) {
(void)msg_info;
(void)reader_attr;
PerfEventCache::Instance()->AddTransportEvent(
TransPerf::TRANS_TO, reader_attr.channel_id(),
msg_info.seq_num());
data::DataDispatcher<MessageT>::Instance()->Dispatch(
reader_attr.channel_id(), msg);
PerfEventCache::Instance()->AddTransportEvent(
TransPerf::WRITE_NOTIFY, reader_attr.channel_id(),
msg_info.seq_num());
});

cyber/transport/transport

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
/******************************************************************************
* Copyright 2018 The Apollo Authors. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*****************************************************************************/

#ifndef CYBER_TRANSPORT_TRANSPORT_H_
#define CYBER_TRANSPORT_TRANSPORT_H_

#include <atomic>
#include <memory>
#include <string>

#include "cyber/common/macros.h"
#include "cyber/proto/transport_conf.pb.h"
#include "cyber/transport/dispatcher/intra_dispatcher.h"
#include "cyber/transport/dispatcher/rtps_dispatcher.h"
#include "cyber/transport/dispatcher/shm_dispatcher.h"
#include "cyber/transport/qos/qos_profile_conf.h"
#include "cyber/transport/receiver/hybrid_receiver.h"
#include "cyber/transport/receiver/intra_receiver.h"
#include "cyber/transport/receiver/receiver.h"
#include "cyber/transport/receiver/rtps_receiver.h"
#include "cyber/transport/receiver/shm_receiver.h"
#include "cyber/transport/rtps/participant.h"
#include "cyber/transport/shm/notifier_factory.h"
#include "cyber/transport/transmitter/hybrid_transmitter.h"
#include "cyber/transport/transmitter/intra_transmitter.h"
#include "cyber/transport/transmitter/rtps_transmitter.h"
#include "cyber/transport/transmitter/shm_transmitter.h"
#include "cyber/transport/transmitter/transmitter.h"

namespace apollo {
namespace cyber {
namespace transport {

using apollo::cyber::proto::OptionalMode;

class Transport {
public:
virtual ~Transport();

void Shutdown();

template <typename M>
auto CreateTransmitter(const RoleAttributes& attr,
const OptionalMode& mode = OptionalMode::HYBRID) ->
typename std::shared_ptr<Transmitter<M>>;

template <typename M>
auto CreateReceiver(const RoleAttributes& attr,
const typename Receiver<M>::MessageListener& msg_listener,
const OptionalMode& mode = OptionalMode::HYBRID) ->
typename std::shared_ptr<Receiver<M>>;

ParticipantPtr participant() const { return participant_; }

private:
void CreateParticipant();

std::atomic<bool> is_shutdown_;
ParticipantPtr participant_;
NotifierPtr notifier_;
IntraDispatcherPtr intra_dispatcher_;
ShmDispatcherPtr shm_dispatcher_;
RtpsDispatcherPtr rtps_dispatcher_;

DECLARE_SINGLETON(Transport)
};

template <typename M>
auto Transport::CreateTransmitter(const RoleAttributes& attr,
const OptionalMode& mode) ->
typename std::shared_ptr<Transmitter<M>> {
if (is_shutdown_.load()) {
AINFO << "transport has been shut down.";
return nullptr;
}

std::shared_ptr<Transmitter<M>> transmitter = nullptr;
RoleAttributes modified_attr = attr;

// Study: qos = quality of service,
// it defined the transport quality
// 1. history policy
// 2. capacity of history
// 3. message frequency
// 4. reliability
// 5. durability
if (!modified_attr.has_qos_profile()) {
modified_attr.mutable_qos_profile()->CopyFrom(
QosProfileConf::QOS_PROFILE_DEFAULT);
}

// Study: Support 4 transport mode - intra, shm, rtps, hybrid
switch (mode) {
// Study: intra = same process
case OptionalMode::INTRA:
transmitter = std::make_shared<IntraTransmitter<M>>(modified_attr);
break;

// Study: shm = shared memory
case OptionalMode::SHM:
transmitter = std::make_shared<ShmTransmitter<M>>(modified_attr);
break;

// Study: real time publish subscribe
case OptionalMode::RTPS:
transmitter =
std::make_shared<RtpsTransmitter<M>>(modified_attr, participant());
break;

default:
transmitter =
std::make_shared<HybridTransmitter<M>>(modified_attr, participant());
break;
}

RETURN_VAL_IF_NULL(transmitter, nullptr);
if (mode != OptionalMode::HYBRID) {
// Study: Init the transmitter
transmitter->Enable();
}
return transmitter;
}

// Study: Basically same as above, just change transmitter to receiver
template <typename M>
auto Transport::CreateReceiver(
const RoleAttributes& attr,
const typename Receiver<M>::MessageListener& msg_listener,
const OptionalMode& mode) -> typename std::shared_ptr<Receiver<M>> {
if (is_shutdown_.load()) {
AINFO << "transport has been shut down.";
return nullptr;
}

std::shared_ptr<Receiver<M>> receiver = nullptr;
RoleAttributes modified_attr = attr;
if (!modified_attr.has_qos_profile()) {
modified_attr.mutable_qos_profile()->CopyFrom(
QosProfileConf::QOS_PROFILE_DEFAULT);
}

switch (mode) {
case OptionalMode::INTRA:
receiver =
std::make_shared<IntraReceiver<M>>(modified_attr, msg_listener);
break;

case OptionalMode::SHM:
receiver = std::make_shared<ShmReceiver<M>>(modified_attr, msg_listener);
break;

case OptionalMode::RTPS:
receiver = std::make_shared<RtpsReceiver<M>>(modified_attr, msg_listener);
break;

default:
receiver = std::make_shared<HybridReceiver<M>>(
modified_attr, msg_listener, participant());
break;
}

RETURN_VAL_IF_NULL(receiver, nullptr);
if (mode != OptionalMode::HYBRID) {
receiver->Enable();
}
return receiver;
}

} // namespace transport
} // namespace cyber
} // namespace apollo

#endif // CYBER_TRANSPORT_TRANSPORT_H_

cyber/transport/shm/readable_info

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
// Study: ReadableInfo basically is a struct that have 3 element:
// host_id_, block_index_, channel_id_
// It just allow the serialize and deserialize of this struct
const size_t ReadableInfo::kSize = sizeof(uint64_t) * 2 + sizeof(uint32_t);

ReadableInfo::ReadableInfo() : host_id_(0), block_index_(0), channel_id_(0) {}

ReadableInfo::ReadableInfo(uint64_t host_id, uint32_t block_index,
uint64_t channel_id)
: host_id_(host_id), block_index_(block_index), channel_id_(channel_id) {}

ReadableInfo::~ReadableInfo() {}

ReadableInfo& ReadableInfo::operator=(const ReadableInfo& other) {
if (this != &other) {
this->host_id_ = other.host_id_;
this->block_index_ = other.block_index_;
this->channel_id_ = other.channel_id_;
}
return *this;
}

// Study: Actually, it will be much better to allow put in char array directly
// Using a string for serialize is always a bad idea
bool ReadableInfo::SerializeTo(std::string* dst) const {
RETURN_VAL_IF_NULL(dst, false);

dst->assign(reinterpret_cast<char*>(const_cast<uint64_t*>(&host_id_)),
sizeof(host_id_));
dst->append(reinterpret_cast<char*>(const_cast<uint32_t*>(&block_index_)),
sizeof(block_index_));
dst->append(reinterpret_cast<char*>(const_cast<uint64_t*>(&channel_id_)),
sizeof(channel_id_));
return true;
}

bool ReadableInfo::DeserializeFrom(const std::string& src) {
return DeserializeFrom(src.data(), src.size());
}

// Study: Read data byte by byte
bool ReadableInfo::DeserializeFrom(const char* src, std::size_t len) {
RETURN_VAL_IF_NULL(src, false);
if (len != kSize) {
AWARN << "src size[" << len << "] mismatch.";
return false;
}

char* ptr = const_cast<char*>(src);
memcpy(reinterpret_cast<char*>(&host_id_), ptr, sizeof(host_id_));
ptr += sizeof(host_id_);
memcpy(reinterpret_cast<char*>(&block_index_), ptr, sizeof(block_index_));
ptr += sizeof(block_index_);
memcpy(reinterpret_cast<char*>(&channel_id_), ptr, sizeof(channel_id_));

return true;
}

cyber/transport/shm/notifier_base

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// Study: 就是一個callback
class NotifierBase;
using NotifierPtr = NotifierBase*;

class NotifierBase {
public:
virtual ~NotifierBase() = default;

virtual void Shutdown() = 0;

// Study: Put a data into it
virtual bool Notify(const ReadableInfo& info) = 0;
// Study: Anyone can call this to wait at most timeout_ms
// until someone have call Notify()
virtual bool Listen(int timeout_ms, ReadableInfo* info) = 0;
};

cyber/transport/shm/condition_notifier

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41

const uint32_t kBufLength = 4096;

// Study: ConditionNotifier
class ConditionNotifier : public NotifierBase {
// Study: Used a circular to store data, and added a mtx to protect multi-thread issue
// use cv to wait new data. This data is put into the shared memory
struct Indicator {
std::mutex mtx;
std::condition_variable cv;
uint64_t written_info_num = 0;
ReadableInfo infos[kBufLength];
};

public:
virtual ~ConditionNotifier();

void Shutdown() override;
bool Notify(const ReadableInfo& info) override;
bool Listen(int timeout_ms, ReadableInfo* info) override;

static const char* Type() { return "condition"; }

private:
bool Init();
// Study: Operation to the shared memory
bool OpenOrCreate();
bool OpenOnly();
bool Remove();
void Reset();

key_t key_ = 0;
// Study: The shared memory used to share indicator_ among process
void* managed_shm_ = nullptr;
size_t shm_size_ = 0;
Indicator* indicator_ = nullptr;
uint64_t next_listen_num_ = 0;
std::atomic<bool> is_shutdown_ = {false};

DECLARE_SINGLETON(ConditionNotifier)
};
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
/******************************************************************************
* Copyright 2018 The Apollo Authors. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*****************************************************************************/

#include "cyber/transport/shm/condition_notifier.h"

#include <pthread.h>
#include <sys/ipc.h>
#include <sys/shm.h>
#include <thread>

#include "cyber/common/log.h"
#include "cyber/common/util.h"

namespace apollo {
namespace cyber {
namespace transport {

using common::Hash;

ConditionNotifier::ConditionNotifier() {
// Study: The unique key for the shared memory
// Different process use the same key to access the same shared memory
key_ = static_cast<key_t>(Hash("/apollo/cyber/transport/shm/notifier"));
ADEBUG << "condition notifier key: " << key_;
shm_size_ = sizeof(Indicator);

if (!Init()) {
AERROR << "fail to init condition notifier.";
is_shutdown_.exchange(true);
}
}

ConditionNotifier::~ConditionNotifier() { Shutdown(); }

void ConditionNotifier::Shutdown() {
if (is_shutdown_.exchange(true)) {
return;
}

indicator_->cv.notify_all();
// Study: Ensure break the thread that called Listen
std::this_thread::sleep_for(std::chrono::milliseconds(100));
Reset();
}

bool ConditionNotifier::Notify(const ReadableInfo& info) {
if (is_shutdown_.load()) {
ADEBUG << "notifier is shutdown.";
return false;
}

{
// Study: Add the data into the circular array
std::unique_lock<std::mutex> lck(indicator_->mtx);
auto idx = indicator_->written_info_num % kBufLength;
indicator_->infos[idx] = info;
++indicator_->written_info_num;
}

// Study: Let the thread who listening know new data come
indicator_->cv.notify_all();

return true;
}

bool ConditionNotifier::Listen(int timeout_ms, ReadableInfo* info) {
if (info == nullptr) {
AERROR << "info nullptr.";
return false;
}

if (is_shutdown_.load()) {
ADEBUG << "notifier is shutdown.";
return false;
}

std::unique_lock<std::mutex> lck(indicator_->mtx);
// Study: Check whether is listening to a not exist message in shared memory
// since the indicator live in shared memory in some case
// you cannot gurannte the value of written_info_num
if (next_listen_num_ >= indicator_->written_info_num) {
uint64_t target = next_listen_num_;
// Study: If the value not exist, then wait
if (!indicator_->cv.wait_for(
lck, std::chrono::milliseconds(timeout_ms), [target, this]() {
return this->indicator_->written_info_num > target ||
this->is_shutdown_.load();
})) {
ADEBUG << "timeout";
return false;
}

if (is_shutdown_.load()) {
AINFO << "notifier is shutdown.";
return false;
}
}

// Study: A new ConditionNotifier, need follow to the lastest of indicator_
if (next_listen_num_ == 0) {
next_listen_num_ = indicator_->written_info_num - 1;
}

auto idx = next_listen_num_ % kBufLength;
*info = indicator_->infos[idx];
next_listen_num_ += 1;

return true;
}

bool ConditionNotifier::Init() { return OpenOrCreate(); }

bool ConditionNotifier::OpenOrCreate() {
// create managed_shm_
int retry = 0;
int shmid = 0;
while (retry < 2) {
// Study: Create the shared memory
shmid = shmget(key_, shm_size_, 0644 | IPC_CREAT | IPC_EXCL);
if (shmid != -1) {
break;
}

if (EINVAL == errno) {
AINFO << "need larger space, recreate.";
Reset();
Remove();
++retry;
} else if (EEXIST == errno) {
ADEBUG << "shm already exist, open only.";
return OpenOnly();
} else {
break;
}
}

if (shmid == -1) {
AERROR << "create shm failed, error code: " << strerror(errno);
return false;
}

// attach managed_shm_
managed_shm_ = shmat(shmid, nullptr, 0);
if (managed_shm_ == reinterpret_cast<void*>(-1)) {
AERROR << "attach shm failed.";
shmctl(shmid, IPC_RMID, 0);
return false;
}

// create indicator_
// Study: Create the indicator_ using the created shared memory
indicator_ = new (managed_shm_) Indicator();
if (indicator_ == nullptr) {
AERROR << "create indicator failed.";
shmdt(managed_shm_);
managed_shm_ = nullptr;
shmctl(shmid, IPC_RMID, 0);
return false;
}

// Study: Necessary to set the mutex attr if want it to work in shared memory mode
pthread_mutexattr_t mtx_attr;
pthread_mutexattr_init(&mtx_attr);
pthread_mutexattr_setpshared(&mtx_attr, PTHREAD_PROCESS_SHARED);
pthread_mutex_init(indicator_->mtx.native_handle(), &mtx_attr);

pthread_condattr_t cond_attr;
pthread_condattr_init(&cond_attr);
pthread_condattr_setpshared(&cond_attr, PTHREAD_PROCESS_SHARED);
pthread_cond_init(indicator_->cv.native_handle(), &cond_attr);

ADEBUG << "open or create true.";
return true;
}

// Study: The indicator_ only need created by one process,
// The other people just need open it
bool ConditionNotifier::OpenOnly() {
// get managed_shm_
int shmid = shmget(key_, 0, 0644);
if (shmid == -1) {
AERROR << "get shm failed.";
return false;
}

// attach managed_shm_
managed_shm_ = shmat(shmid, nullptr, 0);
if (managed_shm_ == reinterpret_cast<void*>(-1)) {
AERROR << "attach shm failed.";
return false;
}

// get indicator_
indicator_ = reinterpret_cast<Indicator*>(managed_shm_);
if (indicator_ == nullptr) {
AERROR << "get indicator failed.";
shmdt(managed_shm_);
managed_shm_ = nullptr;
return false;
}

ADEBUG << "open true.";
return true;
}

bool ConditionNotifier::Remove() {
int shmid = shmget(key_, 0, 0644);
if (shmid == -1 || shmctl(shmid, IPC_RMID, 0) == -1) {
AERROR << "remove shm failed, error code: " << strerror(errno);
return false;
}
ADEBUG << "remove success.";

return true;
}

void ConditionNotifier::Reset() {
indicator_ = nullptr;
if (managed_shm_ != nullptr) {
shmdt(managed_shm_);
managed_shm_ = nullptr;
}
}

} // namespace transport
} // namespace cyber
} // namespace apollo

cyver/transport/shm/multicast_notifier

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
MulticastNotifier::MulticastNotifier() {
if (!Init()) {
Shutdown();
}
}

MulticastNotifier::~MulticastNotifier() { Shutdown(); }

void MulticastNotifier::Shutdown() {
if (is_shutdown_.exchange(true)) {
return;
}

if (notify_fd_ != -1) {
close(notify_fd_);
notify_fd_ = -1;
}
memset(&notify_addr_, 0, sizeof(notify_addr_));

if (listen_fd_ != -1) {
close(listen_fd_);
listen_fd_ = -1;
}
memset(&listen_addr_, 0, sizeof(listen_addr_));
}

bool MulticastNotifier::Notify(const ReadableInfo& info) {
if (is_shutdown_.load()) {
return false;
}

std::string info_str;
info.SerializeTo(&info_str);
ssize_t nbytes =
sendto(notify_fd_, info_str.c_str(), info_str.size(), 0,
(struct sockaddr*)&notify_addr_, sizeof(notify_addr_));
return nbytes > 0;
}

bool MulticastNotifier::Listen(int timeout_ms, ReadableInfo* info) {
if (is_shutdown_.load()) {
return false;
}

if (info == nullptr) {
AERROR << "info nullptr.";
return false;
}

struct pollfd fds;
fds.fd = listen_fd_;
fds.events = POLLIN;
// Study: Poll data from the multicast network
int ready_num = poll(&fds, 1, timeout_ms);
if (ready_num > 0) {
char buf[32] = {0}; // larger than ReadableInfo::kSize
ssize_t nbytes = recvfrom(listen_fd_, buf, 32, 0, nullptr, nullptr);
if (nbytes == -1) {
AERROR << "fail to recvfrom, " << strerror(errno);
return false;
}
return info->DeserializeFrom(buf, nbytes);
} else if (ready_num == 0) {
ADEBUG << "timeout, no readableinfo.";
} else {
if (errno == EINTR) {
AINFO << "poll was interrupted.";
} else {
AERROR << "fail to poll, " << strerror(errno);
}
}
return false;
}

bool MulticastNotifier::Init() {
// Study: the ip range from 224.0.0.0 to 239.255.255.255 is targeted for multicasting
std::string mcast_ip("239.255.0.100");
uint16_t mcast_port = 8888;

// Study: If user have a custom ip and port, use it
auto& g_conf = GlobalData::Instance()->Config();
if (g_conf.has_transport_conf() && g_conf.transport_conf().has_shm_conf() &&
g_conf.transport_conf().shm_conf().has_shm_locator()) {
auto& locator = g_conf.transport_conf().shm_conf().shm_locator();
mcast_ip = locator.ip();
mcast_port = static_cast<uint16_t>(locator.port());
}

ADEBUG << "multicast notifier ip: " << mcast_ip;
ADEBUG << "multicast notifier port: " << mcast_port;

// Study: Create socket for send notify
notify_fd_ = socket(AF_INET, SOCK_DGRAM, 0);
if (notify_fd_ == -1) {
AERROR << "fail to create notify fd, " << strerror(errno);
return false;
}

memset(&notify_addr_, 0, sizeof(notify_addr_));
notify_addr_.sin_family = AF_INET;
notify_addr_.sin_addr.s_addr = inet_addr(mcast_ip.c_str());
notify_addr_.sin_port = htons(mcast_port);

// Study: Create socket for listen
listen_fd_ = socket(AF_INET, SOCK_DGRAM, 0);
if (listen_fd_ == -1) {
AERROR << "fail to create listen fd, " << strerror(errno);
return false;
}

if (fcntl(listen_fd_, F_SETFL, O_NONBLOCK) == -1) {
AERROR << "fail to set listen fd nonblock, " << strerror(errno);
return false;
}

memset(&listen_addr_, 0, sizeof(listen_addr_));
listen_addr_.sin_family = AF_INET;
listen_addr_.sin_addr.s_addr = htonl(INADDR_ANY);
listen_addr_.sin_port = htons(mcast_port);

int yes = 1;
if (setsockopt(listen_fd_, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)) < 0) {
AERROR << "fail to setsockopt SO_REUSEADDR, " << strerror(errno);
return false;
}

if (bind(listen_fd_, (struct sockaddr*)&listen_addr_, sizeof(listen_addr_)) <
0) {
AERROR << "fail to bind addr, " << strerror(errno);
return false;
}

int loop = 1;
if (setsockopt(listen_fd_, IPPROTO_IP, IP_MULTICAST_LOOP, &loop,
sizeof(loop)) < 0) {
AERROR << "fail to setsockopt IP_MULTICAST_LOOP, " << strerror(errno);
return false;
}

struct ip_mreq mreq;
mreq.imr_multiaddr.s_addr = inet_addr(mcast_ip.c_str());
mreq.imr_interface.s_addr = htonl(INADDR_ANY);
if (setsockopt(listen_fd_, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq,
sizeof(mreq)) < 0) {
AERROR << "fail to setsockopt IP_ADD_MEMBERSHIP, " << strerror(errno);
return false;
}

return true;
}

cyber/transport/shm/state

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// Study: The state for segment
// This guarantee the thread safe issue
class State {
public:
explicit State(const uint64_t& ceiling_msg_size);
virtual ~State();

void IncreaseWroteNum() { wrote_num_.fetch_add(1); }
void ResetWroteNum() { wrote_num_.store(0); }

void DecreaseReferenceCounts() {
uint32_t current_reference_count = reference_count_.load();
do {
if (current_reference_count == 0) {
return;
}
} while (!reference_count_.compare_exchange_strong(
current_reference_count, current_reference_count - 1));
}

void IncreaseReferenceCounts() { reference_count_.fetch_add(1); }

void set_need_remap(bool need) { need_remap_.store(need); }
bool need_remap() { return need_remap_; }

uint64_t ceiling_msg_size() { return ceiling_msg_size_.load(); }
uint32_t reference_counts() { return reference_count_.load(); }
uint32_t wrote_num() { return wrote_num_.load(); }

private:
std::atomic<bool> need_remap_ = {false};
std::atomic<uint32_t> wrote_num_ = {0};
std::atomic<uint32_t> reference_count_ = {0};
std::atomic<uint64_t> ceiling_msg_size_;
};

cyber/transport/shm/segment

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367

Segment::Segment(uint64_t channel_id, const ReadWriteMode& mode)
: init_(false),
mode_(mode),
conf_(),
state_(nullptr),
blocks_(nullptr),
managed_shm_(nullptr),
block_buf_lock_(),
block_buf_addrs_() {
// Study: Obviously, one channel only own one segment
id_ = static_cast<key_t>(channel_id);
}

Segment::~Segment() { Destroy(); }

bool Segment::AcquireBlockToWrite(std::size_t msg_size,
WritableBlock* writable_block) {
RETURN_VAL_IF_NULL(writable_block, false);
if (!init_ && !Init()) {
AERROR << "init failed, can't write now.";
return false;
}

bool result = true;
if (state_->need_remap()) {
result = Remap();
}

// Study: Check whether the msg size is larger that
if (msg_size > conf_.ceiling_msg_size()) {
conf_.Update(msg_size);
result = Recreate();
}

if (!result) {
AERROR << "segment update failed.";
return false;
}

uint32_t index = GetNextWritableBlockIndex();
writable_block->index = index;
writable_block->block = &blocks_[index];
writable_block->buf = block_buf_addrs_[index];
return true;
}

void Segment::ReleaseWrittenBlock(const WritableBlock& writable_block) {
auto index = writable_block.index;
if (index >= conf_.block_num()) {
return;
}
blocks_[index].ReleaseWriteLock();
}

// Study: read a block after locked it
bool Segment::AcquireBlockToRead(ReadableBlock* readable_block) {
RETURN_VAL_IF_NULL(readable_block, false);

if (!init_ && !Init()) {
AERROR << "init failed, can't read now.";
return false;
}
auto index = readable_block->index;
if (index >= conf_.block_num()) {
AERROR << "invalid block_index[" << index << "].";
return false;
}

bool result = true;
if (state_->need_remap()) {
result = Remap();
}

if (!result) {
AERROR << "segment update failed.";
return false;
}

if (!blocks_[index].TryLockForRead()) {
return false;
}
readable_block->block = blocks_ + index;
readable_block->buf = block_buf_addrs_[index];
return true;
}

// Study: Unlock the block after readed it
void Segment::ReleaseReadBlock(const ReadableBlock& readable_block) {
auto index = readable_block.index;
if (index >= conf_.block_num()) {
return;
}
blocks_[index].ReleaseReadLock();
}

bool Segment::Init() {
if (mode_ == READ_ONLY) {
return OpenOnly();
} else {
return OpenOrCreate();
}
}

bool Segment::OpenOrCreate() {
if (init_) {
return true;
}

// create managed_shm_
int retry = 0;
int shmid = 0;
while (retry < 2) {
// Study: Create shared memory
shmid = shmget(id_, conf_.managed_shm_size(), 0644 | IPC_CREAT | IPC_EXCL);
if (shmid != -1) {
break;
}

if (EINVAL == errno) {
AINFO << "need larger space, recreate.";
Reset();
Remove();
++retry;
} else if (EEXIST == errno) {
ADEBUG << "shm already exist, open only.";
return OpenOnly();
} else {
break;
}
}

if (shmid == -1) {
AERROR << "create shm failed, error code: " << strerror(errno);
return false;
}

// attach managed_shm_
// Study: Using the managed_shm_ to get the shared memory ptr
managed_shm_ = shmat(shmid, nullptr, 0);
if (managed_shm_ == reinterpret_cast<void*>(-1)) {
AERROR << "attach shm failed.";
shmctl(shmid, IPC_RMID, 0);
return false;
}

// create field state_
// Study: The state is put in shared memory
state_ = new (managed_shm_) State(conf_.ceiling_msg_size());
if (state_ == nullptr) {
AERROR << "create state failed.";
shmdt(managed_shm_);
managed_shm_ = nullptr;
shmctl(shmid, IPC_RMID, 0);
return false;
}

// Study: The ceiling_msg_size will be update, so need sync
conf_.Update(state_->ceiling_msg_size());

// create field blocks_
// Study: Used the remain memory to create the block
blocks_ = new (static_cast<char*>(managed_shm_) + sizeof(State))
Block[conf_.block_num()];
if (blocks_ == nullptr) {
AERROR << "create blocks failed.";
state_->~State();
state_ = nullptr;
shmdt(managed_shm_);
managed_shm_ = nullptr;
shmctl(shmid, IPC_RMID, 0);
return false;
}

// create block buf
// Study: Use the remain remain memory to create the block buf addr map
// Size of block_buf_addrs_ is fixed
uint32_t i = 0;
for (; i < conf_.block_num(); ++i) {
uint8_t* addr =
new (static_cast<char*>(managed_shm_) + sizeof(State) +
conf_.block_num() * sizeof(Block) + i * conf_.block_buf_size())
uint8_t[conf_.block_buf_size()];
std::lock_guard<std::mutex> _g(block_buf_lock_);
block_buf_addrs_[i] = addr;
}

// Study: Cleanup if failure
if (i != conf_.block_num()) {
AERROR << "create block buf failed.";
state_->~State();
state_ = nullptr;
blocks_ = nullptr;
{
std::lock_guard<std::mutex> _g(block_buf_lock_);
block_buf_addrs_.clear();
}
shmdt(managed_shm_);
managed_shm_ = nullptr;
shmctl(shmid, IPC_RMID, 0);
return false;
}

// Study: One more process referenced to this shared memory
state_->IncreaseReferenceCounts();
init_ = true;
ADEBUG << "open or create true.";
return true;
}

bool Segment::OpenOnly() {
if (init_) {
return true;
}

// get managed_shm_
int shmid = shmget(id_, 0, 0644);
if (shmid == -1) {
AERROR << "get shm failed.";
return false;
}

// attach managed_shm_
managed_shm_ = shmat(shmid, nullptr, 0);
if (managed_shm_ == reinterpret_cast<void*>(-1)) {
AERROR << "attach shm failed.";
return false;
}

// get field state_
state_ = reinterpret_cast<State*>(managed_shm_);
if (state_ == nullptr) {
AERROR << "get state failed.";
shmdt(managed_shm_);
managed_shm_ = nullptr;
return false;
}

conf_.Update(state_->ceiling_msg_size());

// get field blocks_
blocks_ = reinterpret_cast<Block*>(static_cast<char*>(managed_shm_) +
sizeof(State));
if (blocks_ == nullptr) {
AERROR << "get blocks failed.";
state_ = nullptr;
shmdt(managed_shm_);
managed_shm_ = nullptr;
return false;
}

// get block buf
uint32_t i = 0;
for (; i < conf_.block_num(); ++i) {
uint8_t* addr = reinterpret_cast<uint8_t*>(
static_cast<char*>(managed_shm_) + sizeof(State) +
conf_.block_num() * sizeof(Block) + i * conf_.block_buf_size());

if (addr == nullptr) {
break;
}

std::lock_guard<std::mutex> _g(block_buf_lock_);
block_buf_addrs_[i] = addr;
}

if (i != conf_.block_num()) {
AERROR << "open only failed.";
state_->~State();
state_ = nullptr;
blocks_ = nullptr;
{
std::lock_guard<std::mutex> _g(block_buf_lock_);
block_buf_addrs_.clear();
}
shmdt(managed_shm_);
managed_shm_ = nullptr;
shmctl(shmid, IPC_RMID, 0);
return false;
}

state_->IncreaseReferenceCounts();
init_ = true;
ADEBUG << "open only true.";
return true;
}

bool Segment::Remove() {
int shmid = shmget(id_, 0, 0644);
if (shmid == -1 || shmctl(shmid, IPC_RMID, 0) == -1) {
AERROR << "remove shm failed, error code: " << strerror(errno);
return false;
}

ADEBUG << "remove success.";
return true;
}

bool Segment::Destroy() {
if (!init_) {
return true;
}
init_ = false;

try {
state_->DecreaseReferenceCounts();
uint32_t reference_counts = state_->reference_counts();
if (reference_counts == 0) {
return Remove();
}
} catch (...) {
AERROR << "exception.";
return false;
}
ADEBUG << "destory.";
return true;
}

// Study: Reset the shared memory and the actualy content
void Segment::Reset() {
state_ = nullptr;
blocks_ = nullptr;
{
std::lock_guard<std::mutex> _g(block_buf_lock_);
block_buf_addrs_.clear();
}

if (managed_shm_ != nullptr) {
shmdt(managed_shm_);
managed_shm_ = nullptr;
}
}

// Study: Map again to the shared memory
bool Segment::Remap() {
init_ = false;
ADEBUG << "before reset.";
Reset();
ADEBUG << "after reset.";
return OpenOnly();
}

bool Segment::Recreate() {
init_ = false;
state_->set_need_remap(true);
Reset();
Remove();
return OpenOrCreate();
}

uint32_t Segment::GetNextWritableBlockIndex() {
uint32_t try_idx = state_->wrote_num();

auto max_mod_num = conf_.block_num() - 1;
while (1) {
if (try_idx >= conf_.block_num()) {
try_idx &= max_mod_num;
}

if (blocks_[try_idx].TryLockForWrite()) {
state_->IncreaseWroteNum();
return try_idx;
}

++try_idx;
}
}

cyber/transport/dispatcher/shm_dispatcher

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
/******************************************************************************
* Copyright 2018 The Apollo Authors. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*****************************************************************************/

#include "cyber/transport/dispatcher/shm_dispatcher.h"
#include "cyber/common/global_data.h"
#include "cyber/common/util.h"
#include "cyber/scheduler/scheduler_factory.h"
#include "cyber/transport/shm/readable_info.h"

namespace apollo {
namespace cyber {
namespace transport {

using common::GlobalData;

ShmDispatcher::ShmDispatcher() : host_id_(0) { Init(); }

ShmDispatcher::~ShmDispatcher() { Shutdown(); }

void ShmDispatcher::Shutdown() {
if (is_shutdown_.exchange(true)) {
return;
}

// Study: The ShmDispatcher
if (thread_.joinable()) {
thread_.join();
}

{
ReadLockGuard<AtomicRWLock> lock(segments_lock_);
segments_.clear();
}
}

// Study: We can see that one ShmDispatcher may interace to multiple channel
void ShmDispatcher::AddSegment(const RoleAttributes& self_attr) {
uint64_t channel_id = self_attr.channel_id();
WriteLockGuard<AtomicRWLock> lock(segments_lock_);
// Study: One channel only allow have one memory segment
if (segments_.count(channel_id) > 0) {
return;
}
// Study need new place, allocate it
auto segment = std::make_shared<Segment>(channel_id, READ_ONLY);
segments_[channel_id] = segment;
previous_indexes_[channel_id] = UINT32_MAX;
}

void ShmDispatcher::ReadMessage(uint64_t channel_id, uint32_t block_index) {
ADEBUG << "Reading sharedmem message: "
<< GlobalData::GetChannelById(channel_id)
<< " from block: " << block_index;
auto rb = std::make_shared<ReadableBlock>();
rb->index = block_index;

// Study: If want to read data, lock it first
if (!segments_[channel_id]->AcquireBlockToRead(rb.get())) {
AWARN << "fail to acquire block, channel: "
<< GlobalData::GetChannelById(channel_id)
<< " index: " << block_index;
return;
}

MessageInfo msg_info;
const char* msg_info_addr =
reinterpret_cast<char*>(rb->buf) + rb->block->msg_size();
// Study: Get the data from memory and change it to a object
if (msg_info.DeserializeFrom(msg_info_addr, rb->block->msg_info_size())) {
// Study: Callback
OnMessage(channel_id, rb, msg_info);
} else {
AERROR << "error msg info of channel:"
<< GlobalData::GetChannelById(channel_id);
}

// Study: Always release after acquire
segments_[channel_id]->ReleaseReadBlock(*rb);
}

void ShmDispatcher::OnMessage(uint64_t channel_id,
const std::shared_ptr<ReadableBlock>& rb,
const MessageInfo& msg_info) {
if (is_shutdown_.load()) {
return;
}
ListenerHandlerBasePtr* handler_base = nullptr;
// Study: Someone is listening to the newest data in this channel
if (msg_listeners_.Get(channel_id, &handler_base)) {
auto handler = std::dynamic_pointer_cast<ListenerHandler<ReadableBlock>>(
*handler_base);
handler->Run(rb, msg_info);
} else {
AERROR << "Cant find " << GlobalData::GetChannelById(channel_id)
<< "'s handler.";
}
}

void ShmDispatcher::ThreadFunc() {
ReadableInfo readable_info;
while (!is_shutdown_.load()) {
// Study: Check whether new message come
if (!notifier_->Listen(100, &readable_info)) {
ADEBUG << "listen failed.";
continue;
}

// Study: Where the info come
uint64_t host_id = readable_info.host_id();
if (host_id != host_id_) {
ADEBUG << "shm readable info from other host.";
continue;
}

// Study: channel id, and the index of new data
uint64_t channel_id = readable_info.channel_id();
uint32_t block_index = readable_info.block_index();

{
ReadLockGuard<AtomicRWLock> lock(segments_lock_);
// Study: The new message is not related to me
if (segments_.count(channel_id) == 0) {
continue;
}
// Study: History index
// check block index
if (previous_indexes_.count(channel_id) == 0) {
previous_indexes_[channel_id] = UINT32_MAX;
}
uint32_t& previous_index = previous_indexes_[channel_id];
if (block_index != 0 && previous_index != UINT32_MAX) {
if (block_index == previous_index) {
ADEBUG << "Receive SAME index " << block_index << " of channel "
<< channel_id;
} else if (block_index < previous_index) {
ADEBUG << "Receive PREVIOUS message. last: " << previous_index
<< ", now: " << block_index;
} else if (block_index - previous_index > 1) {
ADEBUG << "Receive JUMP message. last: " << previous_index
<< ", now: " << block_index;
}
}
previous_index = block_index;

// Study: let the msg_listeners_ do callback
ReadMessage(channel_id, block_index);
}
}
}

bool ShmDispatcher::Init() {
host_id_ = common::Hash(GlobalData::Instance()->HostIp());
notifier_ = NotifierFactory::CreateNotifier();
thread_ = std::thread(&ShmDispatcher::ThreadFunc, this);
// Study: Not managed by scheduler
scheduler::Instance()->SetInnerThreadAttr(&thread_, "shm_disp");
return true;
}


template <typename MessageT>
void ShmDispatcher::AddListener(const RoleAttributes& self_attr,
const MessageListener<MessageT>& listener) {
// FIXME: make it more clean
// Study: Basically same as listener(x, y), just added a data Parsing
auto listener_adapter = [listener](const std::shared_ptr<ReadableBlock>& rb,
const MessageInfo& msg_info) {
auto msg = std::make_shared<MessageT>();
RETURN_IF(!message::ParseFromArray(
rb->buf, static_cast<int>(rb->block->msg_size()), msg.get()));
listener(msg, msg_info);
};

Dispatcher::AddListener<ReadableBlock>(self_attr, listener_adapter);
// Study: Ensure the shared memory for message data exist
// The channel info is included in self_attr
AddSegment(self_attr);
}

template <typename MessageT>
void ShmDispatcher::AddListener(const RoleAttributes& self_attr,
const RoleAttributes& opposite_attr,
const MessageListener<MessageT>& listener) {
// FIXME: make it more clean
auto listener_adapter = [listener](const std::shared_ptr<ReadableBlock>& rb,
const MessageInfo& msg_info) {
auto msg = std::make_shared<MessageT>();
RETURN_IF(!message::ParseFromArray(
rb->buf, static_cast<int>(rb->block->msg_size()), msg.get()));
listener(msg, msg_info);
};

Dispatcher::AddListener<ReadableBlock>(self_attr, opposite_attr,
listener_adapter);
// Study: Ensure the shared memory for message data exist
AddSegment(self_attr);
}

} // namespace transport
} // namespace cyber
} // namespace apollo

Apollo Cyber Study. P9 logger

// Study: 是我的筆記

cyber/logger/logger_util

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
inline int64_t CycleClock_Now() {
struct timeval tv;
gettimeofday(&tv, nullptr);
return static_cast<int64_t>(tv.tv_sec) * 1000000 + tv.tv_usec;
}

static inline void GetHostName(std::string* hostname) {
struct utsname buf;
// Study: http://man7.org/linux/man-pages/man2/uname.2.html
// a syscall for uname
if (0 != uname(&buf)) {
*buf.nodename = '\0';
}
*hostname = buf.nodename;
}

const std::vector<std::string>& GetLoggingDirectories();

int32_t GetMainThreadPid();

bool PidHasChanged();

inline int32_t MaxLogSize() {
return (FLAGS_max_log_size > 0 ? FLAGS_max_log_size : 1);
}

// Study: If the log_messaeg already have [xx], use xx as modules_name
// Otherwise, use the ProcessGroup (set in conf) as module name
inline void FindModuleName(std::string* log_message, std::string* module_name) {
auto lpos = log_message->find('[');
if (lpos != std::string::npos) {
auto rpos = log_message->find(']', lpos);
if (rpos != std::string::npos) {
module_name->assign(*log_message, lpos + 1, rpos - lpos - 1);
auto cut_length = rpos - lpos + 1;
log_message->erase(lpos, cut_length);
}
}
if (module_name->empty()) {
*module_name = common::GlobalData::Instance()->ProcessGroup();
}
}


namespace {
// Study: syscall
static int32_t g_main_thread_pid = getpid();
}

int32_t GetMainThreadPid() { return g_main_thread_pid; }

bool PidHasChanged() {
int32_t pid = getpid();
if (g_main_thread_pid == pid) {
return false;
}
g_main_thread_pid = pid;
return true;
}

const std::vector<std::string>& GetLoggingDirectories() {
static std::vector<std::string> logging_directories_list;
if (logging_directories_list.empty()) {
if (!FLAGS_log_dir.empty()) {
logging_directories_list.emplace_back(FLAGS_log_dir.c_str());
} else {
logging_directories_list.emplace_back("./");
}
}
return logging_directories_list;
}

cyber/logger/log_file_object

Take over the low level file-interactive function. Can be seem as the real logger

cyber/logger/async_logger

Just watch its describe is ok

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// Wrapper for a glog Logger which asynchronously writes log messages.
// This class starts a new thread responsible for forwarding the messages
// to the logger, and performs double buffering. Writers append to the
// current buffer and then wake up the logger thread. The logger swaps in
// a new buffer and writes any accumulated messages to the wrapped
// Logger.
//
// This double-buffering design dramatically improves performance, especially
// for logging messages which require flushing the underlying file (i.e WARNING
// and above for default). The flush can take a couple of milliseconds, and in
// some cases can even block for hundreds of milliseconds or more. With the
// double-buffered approach, threads can proceed with useful work while the IO
// thread blocks.
//
// The semantics provided by this wrapper are slightly weaker than the default
// glog semantics. By default, glog will immediately (synchronously) flush
// WARNING
// and above to the underlying file, whereas here we are deferring that flush to
// a separate thread. This means that a crash just after a 'LOG_WARN' would
// may be missing the message in the logs, but the perf benefit is probably
// worth it. We do take care that a glog FATAL message flushes all buffered log
// messages before exiting.
//
// NOTE: the logger limits the total amount of buffer space, so if the
// underlying
// log blocks for too long, eventually the threads generating the log messages
// will block as well. This prevents runaway memory usage.

This logger is registered using this,
then can use the regular glog operation to use it

1
google::base::SetLogger(FLAGS_minloglevel, async_logger);

cyber/logger/logger

Similiar to regular glog, but use LogFileObject to maintain the underlying write and naming, etc
Don’t have the double buffering, etc.

Apollo Cyber Study. P8 io

先說一下 cyber/iocyber內部是沒有用到的一個模塊。
modules那也沒有用到。而且它是直接操作epoll event層,我們都直接走grpc,感覺之後也沒有機會用到
不過也看一看吧。

// Study: 是我的筆記

cyber/io/poll_data

The basic data structure for the io modules.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23

struct PollResponse {
explicit PollResponse(uint32_t e = 0) : events(e) {}

uint32_t events;
};

// Study: A request to get data from io.
// And what should be call after the io really come
struct PollRequest {
int fd = -1;
uint32_t events = 0;
int timeout_ms = -1;
std::function<void(const PollResponse&)> callback = nullptr;
};

struct PollCtrlParam {
int operation;
// Study: File descriptor
int fd;
// Study: epoll is a way to get data from io\
epoll_event event;
};

cyber/io/poller

Poller is a singleton

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
class Poller {
public:
using RequestPtr = std::shared_ptr<PollRequest>;
using RequestMap = std::unordered_map<int, RequestPtr>;
using CtrlParamMap = std::unordered_map<int, PollCtrlParam>;

virtual ~Poller();

void Shutdown();

bool Register(const PollRequest& req);
bool Unregister(const PollRequest& req);

private:
bool Init();
void Clear();
void Poll(int timeout_ms);
void ThreadFunc();
void HandleChanges();
int GetTimeoutMs();
void Notify();

int epoll_fd_ = -1;
std::thread thread_;
std::atomic<bool> is_shutdown_ = {true};

int pipe_fd_[2] = {-1, -1};
std::mutex pipe_mutex_;

RequestMap requests_;
CtrlParamMap ctrl_params_;
base::AtomicRWLock poll_data_lock_;

const int kPollSize = 32;
const int kPollTimeoutMs = 100;

DECLARE_SINGLETON(Poller)
};

Poller::Poller() {
if (!Init()) {
AERROR << "Poller init failed!";
Clear();
}
}

Poller::~Poller() { Shutdown(); }

void Poller::Shutdown() {
if (is_shutdown_.exchange(true)) {
return;
}
Clear();
}

bool Poller::Register(const PollRequest& req) {
// Study: Same as if(is_shutdown)
if (is_shutdown_.load()) {
return false;
}

// Study: file descriptor would not less than 0
if (req.fd < 0 || req.callback == nullptr) {
AERROR << "input is invalid";
return false;
}

PollCtrlParam ctrl_param;
ctrl_param.fd = req.fd;
ctrl_param.event.data.fd = req.fd;
ctrl_param.event.events = req.events;

{
// Study: Since the io may happen frequently while this poller is a singleton
// A read write lock is required to maximize the io
WriteLockGuard<AtomicRWLock> lck(poll_data_lock_);
if (requests_.count(req.fd) == 0) {
ctrl_param.operation = EPOLL_CTL_ADD;
requests_[req.fd] = std::make_shared<PollRequest>();
} else {
// Study: Already someone want to read the same io, overwrite it
ctrl_param.operation = EPOLL_CTL_MOD;
}
*requests_[req.fd] = req;
ctrl_params_[ctrl_param.fd] = ctrl_param;
}

Notify();
return true;
}

bool Poller::Unregister(const PollRequest& req) {
if (is_shutdown_.load()) {
return false;
}

if (req.fd < 0 || req.callback == nullptr) {
AERROR << "input is invalid";
return false;
}

{
WriteLockGuard<AtomicRWLock> lck(poll_data_lock_);
auto size = requests_.erase(req.fd);
if (size == 0) {
AERROR << "unregister failed, can't find fd: " << req.fd;
return false;
}

// Study: Removed the request, but need a del operation ctrl
PollCtrlParam ctrl_param;
ctrl_param.operation = EPOLL_CTL_DEL;
ctrl_param.fd = req.fd;
ctrl_params_[ctrl_param.fd] = ctrl_param;
}

Notify();
return true;
}

bool Poller::Init() {
// The epoll datastructure needed for kernel space operation
epoll_fd_ = epoll_create(kPollSize);
if (epoll_fd_ < 0) {
AERROR << "epoll create failed, " << strerror(errno);
return false;
}

// Study: http://man7.org/linux/man-pages/man2/pipe.2.html
// pipe_fd_[0] is read, pipe_fd_[1] is write
// Allow inter-process communication
// create pipe, and set nonblock
if (pipe(pipe_fd_) == -1) {
AERROR << "create pipe failed, " << strerror(errno);
return false;
}
// Study: pipe flag set
if (fcntl(pipe_fd_[0], F_SETFL, O_NONBLOCK) == -1) {
AERROR << "set nonblock failed, " << strerror(errno);
return false;
}
if (fcntl(pipe_fd_[1], F_SETFL, O_NONBLOCK) == -1) {
AERROR << "set nonblock failed, " << strerror(errno);
return false;
}

// Study: Read the data from epoll
// add pipe[0] to epoll
auto request = std::make_shared<PollRequest>();
request->fd = pipe_fd_[0];
request->events = EPOLLIN;
request->timeout_ms = -1;
request->callback = [this](const PollResponse&) {
char c = 0;
while (read(pipe_fd_[0], &c, 1) > 0) {
}
};
requests_[request->fd] = request;

PollCtrlParam ctrl_param;
ctrl_param.operation = EPOLL_CTL_ADD;
ctrl_param.fd = pipe_fd_[0];
ctrl_param.event.data.fd = pipe_fd_[0];
ctrl_param.event.events = EPOLLIN;
ctrl_params_[ctrl_param.fd] = ctrl_param;

is_shutdown_.exchange(false);
// Study: The main task of the poller is ThreadFunc
thread_ = std::thread(&Poller::ThreadFunc, this);
// Study: How to determine the priority of this thread task and other component task?
// Let the schedule to determine it
scheduler::Instance()->SetInnerThreadAttr(&thread_, "io_poller");
return true;
}

void Poller::Clear() {
if (thread_.joinable()) {
thread_.join();
}

if (epoll_fd_ >= 0) {
close(epoll_fd_);
epoll_fd_ = -1;
}

if (pipe_fd_[0] >= 0) {
close(pipe_fd_[0]);
pipe_fd_[0] = -1;
}

if (pipe_fd_[1] >= 0) {
close(pipe_fd_[1]);
pipe_fd_[1] = -1;
}

{
WriteLockGuard<AtomicRWLock> lck(poll_data_lock_);
requests_.clear();
ctrl_params_.clear();
}
}

// Study: the core function of this class
// this function pool all the data from io device according to the request map
void Poller::Poll(int timeout_ms) {
epoll_event evt[kPollSize];
auto before_time_ns = Time::Now().ToNanosecond();
int ready_num = epoll_wait(epoll_fd_, evt, kPollSize, timeout_ms);
auto after_time_ns = Time::Now().ToNanosecond();
// Study: How many time used for epoll kernal space operation
int interval_ms =
static_cast<int>((after_time_ns - before_time_ns) / 1000000);
// Study: Assume 0 is very very small instead of real zero
if (interval_ms == 0) {
interval_ms = 1;
}

std::unordered_map<int, PollResponse> responses;
{
ReadLockGuard<AtomicRWLock> lck(poll_data_lock_);
for (auto& item : requests_) {
auto& request = item.second;
if (ctrl_params_.count(request->fd) != 0) {
continue;
}

// Study: Every request have a time limit
// The request will be abort after timeout
if (request->timeout_ms > 0) {
request->timeout_ms -= interval_ms;
if (request->timeout_ms < 0) {
request->timeout_ms = 0;
}
}

if (request->timeout_ms == 0) {
responses[item.first] = PollResponse();
request->timeout_ms = -1;
}
}
}

// Study: epoll have data, then get the event and assigned to cooresponding response
if (ready_num > 0) {
for (int i = 0; i < ready_num; ++i) {
int fd = evt[i].data.fd;
uint32_t events = evt[i].events;
responses[fd] = PollResponse(events);
}
}

for (auto& item : responses) {
int fd = item.first;
auto& response = item.second;

ReadLockGuard<AtomicRWLock> lck(poll_data_lock_);
auto search = requests_.find(fd);
if (search != requests_.end()) {
search->second->timeout_ms = -1;
// Study: Let the callback know there is event
search->second->callback(response);
}
}

if (ready_num < 0) {
if (errno != EINTR) {
AERROR << "epoll wait failed, " << strerror(errno);
}
}
}

void Poller::ThreadFunc() {
// block all signals in this thread
sigset_t signal_set;
sigfillset(&signal_set);
pthread_sigmask(SIG_BLOCK, &signal_set, NULL);

// Study: Loop, loop, loop
while (!is_shutdown_.load()) {
HandleChanges();
int timeout_ms = GetTimeoutMs();
ADEBUG << "this poll timeout ms: " << timeout_ms;
Poll(timeout_ms);
}
}

void Poller::HandleChanges() {
CtrlParamMap local_params;
{
ReadLockGuard<AtomicRWLock> lck(poll_data_lock_);
if (ctrl_params_.empty()) {
return;
}
local_params.swap(ctrl_params_);
}

// Study: Update epoll struct
for (auto& pair : local_params) {
auto& item = pair.second;
ADEBUG << "epoll ctl, op[" << item.operation << "] fd[" << item.fd
<< "] events[" << item.event.events << "]";
if (epoll_ctl(epoll_fd_, item.operation, item.fd, &item.event) != 0 &&
errno != EBADF) {
AERROR << "epoll ctl failed, " << strerror(errno);
}
}
}

// min heap can be used to optimize
int Poller::GetTimeoutMs() {
int timeout_ms = kPollTimeoutMs;
ReadLockGuard<AtomicRWLock> lck(poll_data_lock_);
for (auto& item : requests_) {
auto& req = item.second;
if (req->timeout_ms >= 0 && req->timeout_ms < timeout_ms) {
timeout_ms = req->timeout_ms;
}
}
return timeout_ms;
}

void Poller::Notify() {
std::unique_lock<std::mutex> lock(pipe_mutex_, std::try_to_lock);
if (!lock.owns_lock()) {
return;
}

// Study: Maybe this is the common way for change notify, i am not familiar in this sector
char msg = 'C';
if (write(pipe_fd_[1], &msg, 1) < 0) {
AWARN << "notify failed, " << strerror(errno);
}
}

cyber/io/poll_handler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
/******************************************************************************
* Copyright 2018 The Apollo Authors. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*****************************************************************************/

#include "cyber/io/poll_handler.h"
#include "cyber/common/log.h"
#include "cyber/io/poller.h"

namespace apollo {
namespace cyber {
namespace io {

using croutine::CRoutine;
using croutine::RoutineState;

PollHandler::PollHandler(int fd)
: fd_(fd), is_read_(false), is_blocking_(false), routine_(nullptr) {}

// Study: This is a blocking operation, will wait until timeout
bool PollHandler::Block(int timeout_ms, bool is_read) {
// Study: validate the input state
if (!Check(timeout_ms)) {
return false;
}

// Study: Already someone is blocked
if (is_blocking_.exchange(true)) {
AINFO << "poll handler is blocking.";
return false;
}

// Study: Construct request_, If i do that, I will add request_ to argument list
// It can convient for the code reading
Fill(timeout_ms, is_read);
// Study: Let the Poller handle the request, Poller will do epoll operation
if (!Poller::Instance()->Register(request_)) {
is_blocking_.exchange(false);
return false;
}

// Study: Let scheduler reschedule this task, back to MainStack
// This will swap the current croutine in this thread back to main stack
// The internal impl is asm code, you can think it as python yield
// The current function execution will be stopped here
// If the schedule wake up this function, it will start continue from this line
routine_->Yield(RoutineState::IO_WAIT);

// Study: This function is call again after the IO_WAIT state changed,
// it mean that ResponseCallback must be called
bool result = false;
uint32_t target_events = is_read ? EPOLLIN : EPOLLOUT;
if (response_.events & target_events) {
result = true;
}
is_blocking_.exchange(false);

return result;
}

bool PollHandler::Unblock() {
is_blocking_.exchange(false);
return Poller::Instance()->Unregister(request_);
}

bool PollHandler::Check(int timeout_ms) {
if (timeout_ms == 0) {
AINFO << "timeout[" << timeout_ms
<< "] must be larger than zero or less than zero.";
return false;
}

if (fd_ < 0) {
AERROR << "invalid fd[" << fd_ << "]";
return false;
}

routine_ = CRoutine::GetCurrentRoutine();
if (routine_ == nullptr) {
AERROR << "routine nullptr, please use IO in routine context.";
return false;
}

return true;
}

void PollHandler::Fill(int timeout_ms, bool is_read) {
is_read_.exchange(is_read);

request_.fd = fd_;
request_.events = EPOLLET | EPOLLONESHOT;
if (is_read) {
request_.events |= EPOLLIN;
} else {
request_.events |= EPOLLOUT;
}
request_.timeout_ms = timeout_ms;
request_.callback =
std::bind(&PollHandler::ResponseCallback, this, std::placeholders::_1);
}

void PollHandler::ResponseCallback(const PollResponse& rsp) {
if (!is_blocking_.load() || routine_ == nullptr) {
return;
}

response_ = rsp;

if (routine_->state() == RoutineState::IO_WAIT) {
routine_->SetUpdateFlag();
}
}

} // namespace io
} // namespace cyber
} // namespace apollo

cyber/base/session

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
Session::Session() : Session(-1) {}

Session::Session(int fd) : fd_(fd), `poll_handler_`(nullptr) {
poll_handler_.reset(new PollHandler(fd_));
}

int Session::Socket(int domain, int type, int protocol) {
if (fd_ != -1) {
AINFO << "session has hold a valid fd[" << fd_ << "]";
return -1;
}
// Study: every io connection can use socket to make connection
int sock_fd = socket(domain, type | SOCK_NONBLOCK, protocol);
if (sock_fd != -1) {
set_fd(sock_fd);
}
return sock_fd;
}

int Session::Listen(int backlog) {
ACHECK(fd_ != -1);
return listen(fd_, backlog);
}

// Study: Bind to address
int Session::Bind(const struct sockaddr *addr, socklen_t addrlen) {
ACHECK(fd_ != -1);
ACHECK(addr != nullptr);
return bind(fd_, addr, addrlen);
}

auto Session::Accept(struct sockaddr *addr, socklen_t *addrlen) -> SessionPtr {
ACHECK(fd_ != -1);

int sock_fd = accept4(fd_, addr, addrlen, SOCK_NONBLOCK);
// Study: Wait event and accept connection from socket
while (sock_fd == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
poll_handler_->Block(-1, true);
sock_fd = accept4(fd_, addr, addrlen, SOCK_NONBLOCK);
}

if (sock_fd == -1) {
return nullptr;
}

return std::make_shared<Session>(sock_fd);
}

int Session::Connect(const struct sockaddr *addr, socklen_t addrlen) {
ACHECK(fd_ != -1);

int optval;
socklen_t optlen = sizeof(optval);
// Study: Connect to a socket
int res = connect(fd_, addr, addrlen);
if (res == -1 && errno == EINPROGRESS) {
poll_handler_->Block(-1, false);
getsockopt(fd_, SOL_SOCKET, SO_ERROR, reinterpret_cast<void *>(&optval),
&optlen);
if (optval == 0) {
res = 0;
} else {
errno = optval;
}
}
return res;
}

int Session::Close() {
ACHECK(fd_ != -1);

poll_handler_->Unblock();
int res = close(fd_);
fd_ = -1;
return res;
}

ssize_t Session::Recv(void *buf, size_t len, int flags, int timeout_ms) {
ACHECK(buf != nullptr);
ACHECK(fd_ != -1);

// Study: Receive data from the file descriptor
ssize_t nbytes = recv(fd_, buf, len, flags);
if (timeout_ms == 0) {
return nbytes;
}

// Study: If cannot get data from first try
while (nbytes == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
// Study: Wait io event and try again
if (poll_handler_->Block(timeout_ms, true)) {
nbytes = recv(fd_, buf, len, flags);
}
if (timeout_ms > 0) {
break;
}
}
return nbytes;
}

// Study: Similiar to recv, but can get the oppsite side address
ssize_t Session::RecvFrom(void *buf, size_t len, int flags,
struct sockaddr *src_addr, socklen_t *addrlen,
int timeout_ms) {
ACHECK(buf != nullptr);
ACHECK(fd_ != -1);

ssize_t nbytes = recvfrom(fd_, buf, len, flags, src_addr, addrlen);
if (timeout_ms == 0) {
return nbytes;
}

while (nbytes == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
if (poll_handler_->Block(timeout_ms, true)) {
nbytes = recvfrom(fd_, buf, len, flags, src_addr, addrlen);
}
if (timeout_ms > 0) {
break;
}
}
return nbytes;
}

ssize_t Session::Send(const void *buf, size_t len, int flags, int timeout_ms) {
ACHECK(buf != nullptr);
ACHECK(fd_ != -1);

ssize_t nbytes = send(fd_, buf, len, flags);
if (timeout_ms == 0) {
return nbytes;
}

// Study: Try send if the errno is ok
while ((nbytes == -1) && (errno == EAGAIN || errno == EWOULDBLOCK)) {
if (poll_handler_->Block(timeout_ms, false)) {
nbytes = send(fd_, buf, len, flags);
}
if (timeout_ms > 0) {
break;
}
}
return nbytes;
}

ssize_t Session::SendTo(const void *buf, size_t len, int flags,
const struct sockaddr *dest_addr, socklen_t addrlen,
int timeout_ms) {
ACHECK(buf != nullptr);
ACHECK(dest_addr != nullptr);
ACHECK(fd_ != -1);

ssize_t nbytes = sendto(fd_, buf, len, flags, dest_addr, addrlen);
if (timeout_ms == 0) {
return nbytes;
}

while ((nbytes == -1) && (errno == EAGAIN || errno == EWOULDBLOCK)) {
if (poll_handler_->Block(timeout_ms, false)) {
nbytes = sendto(fd_, buf, len, flags, dest_addr, addrlen);
}
if (timeout_ms > 0) {
break;
}
}
return nbytes;
}

ssize_t Session::Read(void *buf, size_t count, int timeout_ms) {
ACHECK(buf != nullptr);
ACHECK(fd_ != -1);

ssize_t nbytes = read(fd_, buf, count);
if (timeout_ms == 0) {
return nbytes;
}

while ((nbytes == -1) && (errno == EAGAIN || errno == EWOULDBLOCK)) {
if (poll_handler_->Block(timeout_ms, true)) {
nbytes = read(fd_, buf, count);
}
if (timeout_ms > 0) {
break;
}
}
return nbytes;
}

ssize_t Session::Write(const void *buf, size_t count, int timeout_ms) {
ACHECK(buf != nullptr);
ACHECK(fd_ != -1);

ssize_t nbytes = write(fd_, buf, count);
if (timeout_ms == 0) {
return nbytes;
}

while ((nbytes == -1) && (errno == EAGAIN || errno == EWOULDBLOCK)) {
if (poll_handler_->Block(timeout_ms, false)) {
nbytes = write(fd_, buf, count);
}
if (timeout_ms > 0) {
break;
}
}
return nbytes;
}

Apollo Cyber Study. P7 Data

在解決scheduler, blocker,transport,component幾個大哥前先解決cyber/data

// Study: 是我的筆記

cyber/data/cache_buffer

就一個circular array為實現的buffer, 可加不可減

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
/******************************************************************************
* Copyright 2018 The Apollo Authors. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*****************************************************************************/

#ifndef CYBER_DATA_CACHE_BUFFER_H_
#define CYBER_DATA_CACHE_BUFFER_H_

#include <mutex>
#include <vector>

namespace apollo {
namespace cyber {
namespace data {

template <typename T>
class CacheBuffer {
public:
using value_type = T;
using size_type = std::size_t;

explicit CacheBuffer(uint32_t size) {
capacity_ = size + 1;
buffer_.resize(capacity_);
}

CacheBuffer(const CacheBuffer& rhs) {
// Study: Since this is a constructor, not need to lock this object
std::lock_guard<std::mutex> lg(rhs.mutex_);
head_ = rhs.head_;
tail_ = rhs.tail_;
buffer_ = rhs.buffer_;
capacity_ = rhs.capacity_;
}

T& operator[](const uint64_t& pos) { return buffer_[GetIndex(pos)]; }
const T& at(const uint64_t& pos) const { return buffer_[GetIndex(pos)]; }

uint64_t Head() const { return head_ + 1; }
uint64_t Tail() const { return tail_; }
uint64_t Size() const { return tail_ - head_; }

const T& Front() const { return buffer_[GetIndex(head_ + 1)]; }
const T& Back() const { return buffer_[GetIndex(tail_)]; }

// Study: Since tail will never have change to do minus, so this will correct
bool Empty() const { return tail_ == 0; }
// Study: Since tail and head is add only, so this will correct
bool Full() const { return capacity_ - 1 == tail_ - head_; }

void Fill(const T& value) {
if (Full()) {
// Study: using tail + 1 here is same as head
buffer_[GetIndex(head_)] = value;
++head_;
++tail_;
} else {
buffer_[GetIndex(tail_ + 1)] = value;
++tail_;
}
}

std::mutex& Mutex() { return mutex_; }

private:
CacheBuffer& operator=(const CacheBuffer& other) = delete;
uint64_t GetIndex(const uint64_t& pos) const { return pos % capacity_; }

uint64_t head_ = 0;
uint64_t tail_ = 0;
uint64_t capacity_ = 0;
std::vector<T> buffer_;
mutable std::mutex mutex_;
};

} // namespace data
} // namespace cyber
} // namespace apollo

#endif // CYBER_DATA_CACHE_BUFFER_H_

cyber/data/data_notifier

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
/******************************************************************************
* Copyright 2018 The Apollo Authors. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*****************************************************************************/

#ifndef CYBER_DATA_DATA_NOTIFIER_H_
#define CYBER_DATA_DATA_NOTIFIER_H_

#include <memory>
#include <mutex>
#include <vector>

#include "cyber/common/log.h"
#include "cyber/common/macros.h"
#include "cyber/data/cache_buffer.h"
#include "cyber/event/perf_event_cache.h"
#include "cyber/time/time.h"

namespace apollo {
namespace cyber {
namespace data {

using apollo::cyber::Time;
using apollo::cyber::base::AtomicHashMap;
using apollo::cyber::event::PerfEventCache;

struct Notifier {
std::function<void()> callback;
};

// Study: Seem to be a observer pattern
class DataNotifier {
public:
using NotifyVector = std::vector<std::shared_ptr<Notifier>>;
~DataNotifier() {}

void AddNotifier(uint64_t channel_id,
const std::shared_ptr<Notifier>& notifier);

bool Notify(const uint64_t channel_id);

private:
std::mutex notifies_map_mutex_;
AtomicHashMap<uint64_t, NotifyVector> notifies_map_;

DECLARE_SINGLETON(DataNotifier)
};

inline DataNotifier::DataNotifier() {}

// Study: It is a centralized place for adding notifier, this is called by the DataVisitor
inline void DataNotifier::AddNotifier(
uint64_t channel_id, const std::shared_ptr<Notifier>& notifier) {
std::lock_guard<std::mutex> lock(notifies_map_mutex_);
NotifyVector* notifies = nullptr;
if (notifies_map_.Get(channel_id, &notifies)) {
notifies->emplace_back(notifier);
} else {
NotifyVector new_notify = {notifier};
notifies_map_.Set(channel_id, new_notify);
}
}

// Study: Call all the cb for specific topic
inline bool DataNotifier::Notify(const uint64_t channel_id) {
NotifyVector* notifies = nullptr;
if (notifies_map_.Get(channel_id, &notifies)) {
for (auto& notifier : *notifies) {
if (notifier->callback) {
notifier->callback();
}
}
return true;
}
return false;
}

} // namespace data
} // namespace cyber
} // namespace apollo

#endif // CYBER_DATA_DATA_NOTIFIER_H_

cyber/data/channel_buffer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
/******************************************************************************
* Copyright 2018 The Apollo Authors. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*****************************************************************************/

#ifndef CYBER_DATA_CHANNEL_BUFFER_H_
#define CYBER_DATA_CHANNEL_BUFFER_H_

#include <algorithm>
#include <functional>
#include <memory>
#include <vector>

#include "cyber/common/global_data.h"
#include "cyber/common/log.h"
#include "cyber/data/data_notifier.h"
#include "cyber/proto/component_conf.pb.h"

namespace apollo {
namespace cyber {
namespace data {

using apollo::cyber::common::GlobalData;

template <typename T>
class ChannelBuffer {
public:
using BufferType = CacheBuffer<std::shared_ptr<T>>;
// Study: It show that this is a higher level usage of CacheBuffer
ChannelBuffer(uint64_t channel_id, BufferType* buffer)
: channel_id_(channel_id), buffer_(buffer) {}

// Study: the index of index will be change, since it underlaying CacheBuffer
// is using a add-only head, tail which cannot have abstract
bool Fetch(uint64_t* index, std::shared_ptr<T>& m); // NOLINT

bool Latest(std::shared_ptr<T>& m); // NOLINT

bool FetchMulti(uint64_t fetch_size, std::vector<std::shared_ptr<T>>* vec);

uint64_t channel_id() const { return channel_id_; }
std::shared_ptr<BufferType> Buffer() const { return buffer_; }

private:
uint64_t channel_id_;
std::shared_ptr<BufferType> buffer_;
};

template <typename T>
bool ChannelBuffer<T>::Fetch(uint64_t* index,
std::shared_ptr<T>& m) { // NOLINT
std::lock_guard<std::mutex> lock(buffer_->Mutex());
if (buffer_->Empty()) {
return false;
}

// Study: The meaning of it index is not actually index
// ridiculous
if (*index == 0) {
*index = buffer_->Tail();
} else if (*index == buffer_->Tail() + 1) {
return false;
} else if (*index < buffer_->Head()) {
// Study: If the Fetch is slower than the buffer grow
auto interval = buffer_->Tail() - *index;
AWARN << "channel[" << GlobalData::GetChannelById(channel_id_) << "] "
<< "read buffer overflow, drop_message[" << interval << "] pre_index["
<< *index << "] current_index[" << buffer_->Tail() << "] ";
*index = buffer_->Tail();
}
m = buffer_->at(*index);
return true;
}

template <typename T>
bool ChannelBuffer<T>::Latest(std::shared_ptr<T>& m) { // NOLINT
std::lock_guard<std::mutex> lock(buffer_->Mutex());
if (buffer_->Empty()) {
return false;
}

// Study: Simple the last one
m = buffer_->Back();
return true;
}

// Study: Actuatlly this func is more like Latest than Fetch
// Get N Latest
template <typename T>
bool ChannelBuffer<T>::FetchMulti(uint64_t fetch_size,
std::vector<std::shared_ptr<T>>* vec) {
std::lock_guard<std::mutex> lock(buffer_->Mutex());
if (buffer_->Empty()) {
return false;
}

auto num = std::min(buffer_->Size(), fetch_size);
vec->reserve(num);
for (auto index = buffer_->Tail() - num + 1; index <= buffer_->Tail();
++index) {
vec->emplace_back(buffer_->at(index));
}
return true;
}

} // namespace data
} // namespace cyber
} // namespace apollo

#endif // CYBER_DATA_CHANNEL_BUFFER_H_

cyber/data/data_dispatcher

Just a singleton class to dispatch message to the channel buffer.

cyber/data/data_visitor_base

Nothing special. It just ensure the data visitor have 1 callback for the data channel.
And use a pointer point to the data notifier singleton

cyber/data/data_visitor

Just comment on the 4 template argument DataVisitor, since the other is just a partial version of this

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
/******************************************************************************
* Copyright 2018 The Apollo Authors. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*****************************************************************************/

#ifndef CYBER_DATA_DATA_VISITOR_H_
#define CYBER_DATA_DATA_VISITOR_H_

#include <algorithm>
#include <functional>
#include <memory>
#include <vector>

#include "cyber/common/log.h"
#include "cyber/data/channel_buffer.h"
#include "cyber/data/data_dispatcher.h"
#include "cyber/data/data_visitor_base.h"
#include "cyber/data/fusion/all_latest.h"
#include "cyber/data/fusion/data_fusion.h"

namespace apollo {
namespace cyber {
namespace data {

struct VisitorConfig {
VisitorConfig(uint64_t id, uint32_t size)
: channel_id(id), queue_size(size) {}
uint64_t channel_id;
uint32_t queue_size;
};

template <typename T>
using BufferType = CacheBuffer<std::shared_ptr<T>>;

// Study: It also implem the 3 argument, 2 argument, 1 argument version
// If want to use more argument, can use the same way to extend
// Yes, this is stupid but work
// - If you want to use variadic template to fix this,
// will cause crazy source code expand, at least 4 time size
// - If move the type declare to runtime instead of compile time
// it can just use writer, reader to replace
template <typename M0, typename M1 = NullType, typename M2 = NullType,
typename M3 = NullType>
class DataVisitor : public DataVisitorBase {
public:
explicit DataVisitor(const std::vector<VisitorConfig>& configs)
: buffer_m0_(configs[0].channel_id,
new BufferType<M0>(configs[0].queue_size)),
buffer_m1_(configs[1].channel_id,
new BufferType<M1>(configs[1].queue_size)),
buffer_m2_(configs[2].channel_id,
new BufferType<M2>(configs[2].queue_size)),
buffer_m3_(configs[3].channel_id,
new BufferType<M3>(configs[3].queue_size)) {
// Study: subscribe the data for different channel
DataDispatcher<M0>::Instance()->AddBuffer(buffer_m0_);
DataDispatcher<M1>::Instance()->AddBuffer(buffer_m1_);
DataDispatcher<M2>::Instance()->AddBuffer(buffer_m2_);
DataDispatcher<M3>::Instance()->AddBuffer(buffer_m3_);
// Study: Using the first argument M0 as the main data
// do sync logic when first channel have data
// It mean the data input frequency in same as the frequency of M0
data_notifier_->AddNotifier(buffer_m0_.channel_id(), notifier_);
data_fusion_ = new fusion::AllLatest<M0, M1, M2, M3>(
buffer_m0_, buffer_m1_, buffer_m2_, buffer_m3_);
}

~DataVisitor() {
if (data_fusion_) {
delete data_fusion_;
data_fusion_ = nullptr;
}
}

bool TryFetch(std::shared_ptr<M0>& m0, std::shared_ptr<M1>& m1, // NOLINT
std::shared_ptr<M2>& m2, std::shared_ptr<M3>& m3) { // NOLINT
// Study: next_msg_index_ is the index used to fetch data in m0 channel buffer
if (data_fusion_->Fusion(&next_msg_index_, m0, m1, m2, m3)) {
next_msg_index_++;
return true;
}
return false;
}

private:
fusion::DataFusion<M0, M1, M2, M3>* data_fusion_ = nullptr;
ChannelBuffer<M0> buffer_m0_;
ChannelBuffer<M1> buffer_m1_;
ChannelBuffer<M2> buffer_m2_;
ChannelBuffer<M3> buffer_m3_;
};

} // namespace data
} // namespace cyber
} // namespace apollo

#endif // CYBER_DATA_DATA_VISITOR_H_

cyber/data/fusion/data_fusion

Just a base class. It just implemented the all_latest.
I should implement the version that include the interpolate.
It would be simple, but need to use template explicit instantiate
since different data haev different interpolate method

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
/******************************************************************************
* Copyright 2018 The Apollo Authors. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*****************************************************************************/

#ifndef CYBER_DATA_FUSION_DATA_FUSION_H_
#define CYBER_DATA_FUSION_DATA_FUSION_H_

#include <deque>
#include <memory>
#include <string>
#include <type_traits>
#include <typeinfo>
#include <vector>

#include "cyber/common/types.h"

namespace apollo {
namespace cyber {
namespace data {
namespace fusion {

template <typename M0, typename M1 = NullType, typename M2 = NullType,
typename M3 = NullType>
class DataFusion {
public:
virtual ~DataFusion() {}
virtual bool Fusion(uint64_t* index, std::shared_ptr<M0>& m0, // NOLINT
std::shared_ptr<M1>& m1, // NOLINT
std::shared_ptr<M2>& m2, // NOLINT
std::shared_ptr<M3>& m3) = 0; // NOLINT
};

template <typename M0, typename M1, typename M2>
class DataFusion<M0, M1, M2, NullType> {
public:
virtual ~DataFusion() {}

virtual bool Fusion(uint64_t* index, std::shared_ptr<M0>& m0, // NOLINT
std::shared_ptr<M1>& m1, // NOLINT
std::shared_ptr<M2>& m2) = 0; // NOLINT
};

template <typename M0, typename M1>
class DataFusion<M0, M1, NullType, NullType> {
public:
virtual ~DataFusion() {}

virtual bool Fusion(uint64_t* index, std::shared_ptr<M0>& m0, // NOLINT
std::shared_ptr<M1>& m1) = 0; // NOLINT
};

} // namespace fusion
} // namespace data
} // namespace cyber
} // namespace apollo

#endif // CYBER_DATA_FUSION_DATA_FUSION_H_

cyber/data/fusion/all_latest

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

template <typename M0, typename M1 = NullType, typename M2 = NullType,
typename M3 = NullType>
class AllLatest : public DataFusion<M0, M1, M2, M3> {
public:
AllLatest(const ChannelBuffer<M0>& buffer_0,
const ChannelBuffer<M1>& buffer_1,
const ChannelBuffer<M2>& buffer_2,
const ChannelBuffer<M3>& buffer_3)
: buffer_m0_(buffer_0),
buffer_m1_(buffer_1),
buffer_m2_(buffer_2),
buffer_m3_(buffer_3) {}
bool Fusion(uint64_t* index, std::shared_ptr<M0>& m0, std::shared_ptr<M1>& m1,
std::shared_ptr<M2>& m2, std::shared_ptr<M3>& m3) override {
// Study: this would work well if the datavisior frequency is near to M0,
// Otherwise, haha
return buffer_m0_.Fetch(index, m0) && buffer_m1_.Latest(m1) &&
buffer_m2_.Latest(m2) && buffer_m3_.Latest(m3);
}

private:
ChannelBuffer<M0> buffer_m0_;
ChannelBuffer<M1> buffer_m1_;
ChannelBuffer<M2> buffer_m2_;
ChannelBuffer<M3> buffer_m3_;
};