Apollo Cyber Study P14 Service

// Study: comments marked "Study:" are my own notes.

service/client_base

/******************************************************************************
* Copyright 2018 The Apollo Authors. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*****************************************************************************/

#ifndef CYBER_SERVICE_CLIENT_BASE_H_
#define CYBER_SERVICE_CLIENT_BASE_H_

#include <chrono>
#include <string>
#include <thread>

#include "cyber/common/macros.h"
#include "cyber/service_discovery/topology_manager.h"

namespace apollo {
namespace cyber {

class ClientBase {
 public:
  explicit ClientBase(const std::string& service_name)
      : service_name_(service_name) {}
  virtual ~ClientBase() {}

  virtual void Destroy() = 0;

  const std::string& ServiceName() const { return service_name_; }

  virtual bool ServiceIsReady() const = 0;

 protected:
  std::string service_name_;

  bool WaitForServiceNanoseconds(std::chrono::nanoseconds time_out) {
    bool has_service = false;
    auto step_duration = std::chrono::nanoseconds(5 * 1000 * 1000);
    while (time_out.count() > 0) {
      // Study: check whether the service has shown up in the topology yet
      has_service = service_discovery::TopologyManager::Instance()
                        ->service_manager()
                        ->HasService(service_name_);
      if (!has_service) {
        // Study: keep polling until the timeout is used up
        std::this_thread::sleep_for(step_duration);
        time_out -= step_duration;
      } else {
        break;
      }
    }
    return has_service;
  }
};

} // namespace cyber
} // namespace apollo

#endif // CYBER_SERVICE_CLIENT_BASE_H_
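
The only real logic in ClientBase is WaitForServiceNanoseconds(): it polls the topology's service_manager() in 5 ms steps until HasService() reports the server or the timeout is spent. Subclasses expose it through the templated WaitForService() in the next listing. A call site looks roughly like this (a sketch only; it assumes a `client` that was already created through the node API):

// Sketch: give service discovery up to 2 s before sending the first request.
// WaitForService() just duration_casts its argument to nanoseconds and
// forwards to WaitForServiceNanoseconds().
if (!client->WaitForService(std::chrono::seconds(2))) {
  AERROR << "service " << client->ServiceName() << " not discovered in time";
  return;
}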

service/client

template <typename Request, typename Response>
class Client : public ClientBase {
 public:
  using SharedRequest = typename std::shared_ptr<Request>;
  using SharedResponse = typename std::shared_ptr<Response>;
  using Promise = std::promise<SharedResponse>;
  using SharedPromise = std::shared_ptr<Promise>;
  using SharedFuture = std::shared_future<SharedResponse>;
  using CallbackType = std::function<void(SharedFuture)>;

  Client(const std::string& node_name, const std::string& service_name)
      : ClientBase(service_name),
        node_name_(node_name),
        request_channel_(service_name + SRV_CHANNEL_REQ_SUFFIX),
        response_channel_(service_name + SRV_CHANNEL_RES_SUFFIX),
        sequence_number_(0) {}

  Client() = delete;

  virtual ~Client() {}

  bool Init();

  // Study: traditional blocking request/response API
  SharedResponse SendRequest(
      SharedRequest request,
      const std::chrono::seconds& timeout_s = std::chrono::seconds(5));
  SharedResponse SendRequest(
      const Request& request,
      const std::chrono::seconds& timeout_s = std::chrono::seconds(5));

  // Study: instead of returning the response directly, return a future
  SharedFuture AsyncSendRequest(SharedRequest request);
  SharedFuture AsyncSendRequest(const Request& request);
  SharedFuture AsyncSendRequest(SharedRequest request, CallbackType&& cb);

  bool ServiceIsReady() const;
  void Destroy();

  template <typename RatioT = std::milli>
  bool WaitForService(std::chrono::duration<int64_t, RatioT> timeout =
                          std::chrono::duration<int64_t, RatioT>(-1)) {
    return WaitForServiceNanoseconds(
        std::chrono::duration_cast<std::chrono::nanoseconds>(timeout));
  }

 private:
  void HandleResponse(const std::shared_ptr<Response>& response,
                      const transport::MessageInfo& request_info);
  bool IsInit(void) const { return response_receiver_ != nullptr; }

  std::string node_name_;

  std::function<void(const std::shared_ptr<Response>&,
                     const transport::MessageInfo&)>
      response_callback_;

  std::unordered_map<uint64_t,
                     std::tuple<SharedPromise, CallbackType, SharedFuture>>
      pending_requests_;
  std::mutex pending_requests_mutex_;

  // Study: accesses the transport layer directly
  std::shared_ptr<transport::Transmitter<Request>> request_transmitter_;
  std::shared_ptr<transport::Receiver<Response>> response_receiver_;
  std::string request_channel_;
  std::string response_channel_;

  transport::Identity writer_id_;
  uint64_t sequence_number_;
};

template <typename Request, typename Response>
void Client<Request, Response>::Destroy() {}

template <typename Request, typename Response>
bool Client<Request, Response>::Init() {
  proto::RoleAttributes role;
  role.set_node_name(node_name_);
  role.set_channel_name(request_channel_);
  auto channel_id = common::GlobalData::RegisterChannel(request_channel_);
  role.set_channel_id(channel_id);
  role.mutable_qos_profile()->CopyFrom(
      transport::QosProfileConf::QOS_PROFILE_SERVICES_DEFAULT);
  auto transport = transport::Transport::Instance();
  // Study: RTPS is selected, so the client is built to talk across
  // processes / the network rather than only in-process
  request_transmitter_ =
      transport->CreateTransmitter<Request>(role, proto::OptionalMode::RTPS);
  if (request_transmitter_ == nullptr) {
    AERROR << "Create request pub failed.";
    return false;
  }
  writer_id_ = request_transmitter_->id();

  response_callback_ =
      std::bind(&Client<Request, Response>::HandleResponse, this,
                std::placeholders::_1, std::placeholders::_2);

  role.set_channel_name(response_channel_);
  channel_id = common::GlobalData::RegisterChannel(response_channel_);
  role.set_channel_id(channel_id);

  // Study: the receiver invokes the callback when a response arrives, so the
  // client never has to poll for responses
  response_receiver_ = transport->CreateReceiver<Response>(
      role,
      [=](const std::shared_ptr<Response>& response,
          const transport::MessageInfo& message_info,
          const proto::RoleAttributes& reader_attr) {
        (void)reader_attr;
        response_callback_(response, message_info);
      },
      proto::OptionalMode::RTPS);
  if (response_receiver_ == nullptr) {
    AERROR << "Create response sub failed.";
    request_transmitter_.reset();
    return false;
  }
  return true;
}

template <typename Request, typename Response>
typename Client<Request, Response>::SharedResponse
Client<Request, Response>::SendRequest(SharedRequest request,
                                       const std::chrono::seconds& timeout_s) {
  if (!IsInit()) {
    return nullptr;
  }
  auto future = AsyncSendRequest(request);
  if (!future.valid()) {
    return nullptr;
  }
  auto status = future.wait_for(timeout_s);
  if (status == std::future_status::ready) {
    return future.get();
  } else {
    return nullptr;
  }
}

template <typename Request, typename Response>
typename Client<Request, Response>::SharedResponse
Client<Request, Response>::SendRequest(const Request& request,
                                       const std::chrono::seconds& timeout_s) {
  if (!IsInit()) {
    return nullptr;
  }
  auto request_ptr = std::make_shared<const Request>(request);
  return SendRequest(request_ptr, timeout_s);
}

template <typename Request, typename Response>
typename Client<Request, Response>::SharedFuture
Client<Request, Response>::AsyncSendRequest(const Request& request) {
  auto request_ptr = std::make_shared<const Request>(request);
  return AsyncSendRequest(request_ptr);
}

// Study: the shared_ptr based overloads save the API user from managing
// request/response lifetimes
template <typename Request, typename Response>
typename Client<Request, Response>::SharedFuture
Client<Request, Response>::AsyncSendRequest(SharedRequest request) {
  return AsyncSendRequest(request, [](SharedFuture) {});
}

template <typename Request, typename Response>
typename Client<Request, Response>::SharedFuture
Client<Request, Response>::AsyncSendRequest(SharedRequest request,
                                            CallbackType&& cb) {
  if (IsInit()) {
    std::lock_guard<std::mutex> lock(pending_requests_mutex_);
    sequence_number_++;
    // Study: uses the transport module directly to write the request to the
    // request channel
    transport::MessageInfo info(writer_id_, sequence_number_, writer_id_);
    request_transmitter_->Transmit(request, info);
    SharedPromise call_promise = std::make_shared<Promise>();
    SharedFuture f(call_promise->get_future());
    // Study: record what to do when the receiver wakes this request up
    pending_requests_[info.seq_num()] =
        std::make_tuple(call_promise, std::forward<CallbackType>(cb), f);
    return f;
  } else {
    return std::shared_future<std::shared_ptr<Response>>();
  }
}

template <typename Request, typename Response>
bool Client<Request, Response>::ServiceIsReady() const {
  return true;
}

template <typename Request, typename Response>
void Client<Request, Response>::HandleResponse(
    const std::shared_ptr<Response>& response,
    const transport::MessageInfo& request_header) {
  ADEBUG << "client recv response.";
  std::lock_guard<std::mutex> lock(pending_requests_mutex_);
  // Study: several clients may call the same service, so first check whether
  // this response was addressed to us
  if (request_header.spare_id() != writer_id_) {
    return;
  }
  // Study: spare_id + seq_num together act as a unique id for each request
  uint64_t sequence_number = request_header.seq_num();
  if (this->pending_requests_.count(sequence_number) == 0) {
    return;
  }
  // Study: this is the response for a pending request; fulfill its promise
  auto tuple = this->pending_requests_[sequence_number];
  auto call_promise = std::get<0>(tuple);
  auto callback = std::get<1>(tuple);
  auto future = std::get<2>(tuple);
  this->pending_requests_.erase(sequence_number);
  call_promise->set_value(response);
  callback(future);
}
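
Putting the client together: AsyncSendRequest() stamps every request's MessageInfo with the transmitter's writer id and a monotonically increasing sequence number, then parks a promise in pending_requests_; HandleResponse() drops any response whose spare_id is not our writer id and uses seq_num to wake exactly the promise that is waiting. Usage follows the standard Cyber RT node API. Below is a hedged sketch, not code from this post: the Driver message and "test_server" name are only illustrative (Driver is the demo proto shipped with the Cyber examples), and node->CreateClient is assumed to be the factory that calls Init() for us.

#include <memory>

#include "cyber/cyber.h"
#include "cyber/examples/proto/examples.pb.h"  // illustrative Driver proto

using apollo::cyber::examples::proto::Driver;

int main(int argc, char* argv[]) {
  apollo::cyber::Init(argv[0]);
  auto node = apollo::cyber::CreateNode("client_node");
  auto client = node->CreateClient<Driver, Driver>("test_server");

  auto req = std::make_shared<Driver>();
  req->set_msg_id(1);

  // Synchronous path: blocks on the SharedFuture for up to 5 s (the default).
  auto res = client->SendRequest(req);
  if (res != nullptr) {
    AINFO << "sync response: " << res->ShortDebugString();
  }

  // Asynchronous path: the lambda runs inside HandleResponse once a response
  // carrying the matching seq_num arrives on the response channel.
  client->AsyncSendRequest(
      req, [](apollo::cyber::Client<Driver, Driver>::SharedFuture f) {
        AINFO << "async response: " << f.get()->ShortDebugString();
      });

  apollo::cyber::WaitForShutdown();
  return 0;
}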

service/service

/******************************************************************************
* Copyright 2018 The Apollo Authors. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*****************************************************************************/

#ifndef CYBER_SERVICE_SERVICE_H_
#define CYBER_SERVICE_SERVICE_H_

#include <list>
#include <memory>
#include <string>
#include <utility>

#include "cyber/common/types.h"
#include "cyber/node/node_channel_impl.h"
#include "cyber/scheduler/scheduler.h"
#include "cyber/service/service_base.h"

namespace apollo {
namespace cyber {

template <typename Request, typename Response>
class Service : public ServiceBase {
 public:
  using ServiceCallback = std::function<void(const std::shared_ptr<Request>&,
                                             std::shared_ptr<Response>&)>;
  Service(const std::string& node_name, const std::string& service_name,
          const ServiceCallback& service_callback)
      : ServiceBase(service_name),
        node_name_(node_name),
        service_callback_(service_callback),
        request_channel_(service_name + SRV_CHANNEL_REQ_SUFFIX),
        response_channel_(service_name + SRV_CHANNEL_RES_SUFFIX) {}

  Service(const std::string& node_name, const std::string& service_name,
          ServiceCallback&& service_callback)
      : ServiceBase(service_name),
        node_name_(node_name),
        service_callback_(service_callback),
        request_channel_(service_name + SRV_CHANNEL_REQ_SUFFIX),
        response_channel_(service_name + SRV_CHANNEL_RES_SUFFIX) {}

  Service() = delete;
  ~Service() {
    inited_ = false;
    // Study: the service owns an inner thread that keeps draining requests,
    // so it must be woken up and joined on destruction
    condition_.notify_all();
    if (thread_.joinable()) {
      thread_.join();
    }
  }
  bool Init();
  void destroy();

 private:
  void HandleRequest(const std::shared_ptr<Request>& request,
                     const transport::MessageInfo& message_info);

  void SendResponse(const transport::MessageInfo& message_info,
                    const std::shared_ptr<Response>& response);
  bool IsInit(void) const { return request_receiver_ != nullptr; }

  std::string node_name_;
  ServiceCallback service_callback_;

  std::function<void(const std::shared_ptr<Request>&,
                     const transport::MessageInfo&)>
      request_callback_;
  std::shared_ptr<transport::Transmitter<Response>> response_transmitter_;
  std::shared_ptr<transport::Receiver<Request>> request_receiver_;
  std::string request_channel_;
  std::string response_channel_;
  std::mutex service_handle_request_mutex_;

  volatile bool inited_;
  void Enqueue(std::function<void()>&& task);
  void Process();
  std::thread thread_;
  std::mutex queue_mutex_;
  std::condition_variable condition_;
  std::list<std::function<void()>> tasks_;
};

template <typename Request, typename Response>
void Service<Request, Response>::destroy() {
  inited_ = false;
  condition_.notify_all();
  if (thread_.joinable()) {
    thread_.join();
  }
}

template <typename Request, typename Response>
inline void Service<Request, Response>::Enqueue(std::function<void()>&& task) {
  std::lock_guard<std::mutex> lg(queue_mutex_);
  tasks_.emplace_back(std::move(task));
  condition_.notify_one();
}

template <typename Request, typename Response>
void Service<Request, Response>::Process() {
  while (!cyber::IsShutdown()) {
    std::unique_lock<std::mutex> ul(queue_mutex_);
    condition_.wait(ul, [this]() { return !inited_ || !this->tasks_.empty(); });
    if (!inited_) {
      break;
    }
    if (!tasks_.empty()) {
      auto task = tasks_.front();
      tasks_.pop_front();
      ul.unlock();
      task();
    }
  }
}

template <typename Request, typename Response>
bool Service<Request, Response>::Init() {
  if (IsInit()) {
    return true;
  }
  proto::RoleAttributes role;
  role.set_node_name(node_name_);
  role.set_channel_name(response_channel_);
  auto channel_id = common::GlobalData::RegisterChannel(response_channel_);
  role.set_channel_id(channel_id);
  role.mutable_qos_profile()->CopyFrom(
      transport::QosProfileConf::QOS_PROFILE_SERVICES_DEFAULT);
  auto transport = transport::Transport::Instance();
  response_transmitter_ =
      transport->CreateTransmitter<Response>(role, proto::OptionalMode::RTPS);
  if (response_transmitter_ == nullptr) {
    AERROR << " Create response pub failed.";
    return false;
  }

  request_callback_ =
      std::bind(&Service<Request, Response>::HandleRequest, this,
                std::placeholders::_1, std::placeholders::_2);

  role.set_channel_name(request_channel_);
  channel_id = common::GlobalData::RegisterChannel(request_channel_);
  role.set_channel_id(channel_id);
  // Study: receiving a request and handling it are decoupled. The receiver
  // callback only enqueues a task, so a slow service callback never blocks IO.
  request_receiver_ = transport->CreateReceiver<Request>(
      role,
      [=](const std::shared_ptr<Request>& request,
          const transport::MessageInfo& message_info,
          const proto::RoleAttributes& reader_attr) {
        (void)reader_attr;
        auto task = [this, request, message_info]() {
          this->HandleRequest(request, message_info);
        };
        Enqueue(std::move(task));
      },
      proto::OptionalMode::RTPS);
  inited_ = true;
  // Study: queued requests are handled on a separate worker thread
  thread_ = std::thread(&Service<Request, Response>::Process, this);
  if (request_receiver_ == nullptr) {
    AERROR << " Create request sub failed." << request_channel_;
    response_transmitter_.reset();
    return false;
  }
  return true;
}

template <typename Request, typename Response>
void Service<Request, Response>::HandleRequest(
    const std::shared_ptr<Request>& request,
    const transport::MessageInfo& message_info) {
  if (!IsInit()) {
    // LOG_DEBUG << "not inited error.";
    return;
  }
  // Study: run the service callback, then hand the response back to the
  // transport module
  ADEBUG << "handling request:" << request_channel_;
  std::lock_guard<std::mutex> lk(service_handle_request_mutex_);
  auto response = std::make_shared<Response>();
  service_callback_(request, response);
  transport::MessageInfo msg_info(message_info);
  msg_info.set_sender_id(response_transmitter_->id());
  SendResponse(msg_info, response);
}

template <typename Request, typename Response>
void Service<Request, Response>::SendResponse(
    const transport::MessageInfo& message_info,
    const std::shared_ptr<Response>& response) {
  if (!IsInit()) {
    // LOG_DEBUG << "not inited error.";
    return;
  }
  // publish return value ?
  // LOG_DEBUG << "send response id:" << message_id.sequence_number;
  response_transmitter_->Transmit(response, message_info);
}

} // namespace cyber
} // namespace apollo

#endif // CYBER_SERVICE_SERVICE_H_
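
The server side mirrors the client: the RTPS receiver callback only enqueues work, the Process() thread runs service_callback_ under service_handle_request_mutex_, and SendResponse() transmits the result with the caller's original MessageInfo echoed back, so spare_id still carries the client's writer id and seq_num still identifies the request for HandleResponse() on the other end. A hedged usage sketch, again borrowing the illustrative Driver proto and assuming node->CreateService is the factory that calls Init():

#include <memory>

#include "cyber/cyber.h"
#include "cyber/examples/proto/examples.pb.h"  // illustrative Driver proto

using apollo::cyber::examples::proto::Driver;

int main(int argc, char* argv[]) {
  apollo::cyber::Init(argv[0]);
  auto node = apollo::cyber::CreateNode("server_node");
  auto server = node->CreateService<Driver, Driver>(
      "test_server",
      [](const std::shared_ptr<Driver>& request,
         std::shared_ptr<Driver>& response) {
        // Runs on the Service's internal Process() thread, one request at a
        // time thanks to service_handle_request_mutex_.
        response->set_msg_id(request->msg_id());
      });
  apollo::cyber::WaitForShutdown();
  return 0;
}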