Apollo Cyber Study P12 Transport RTPS

先來看一下rtps是甚麼
https://wiki.wireshark.org/Protocols/rtps

Apollo用了eprosima-fast-rtps的rtps實現。
可以先看一下Docs
其concept和apollo內部的channel,receiver,transmitter是差不多的

1
2
3
4
5
6
7

At the top of RTPS, we find the Domain, which defines a separate plane of communication. Several domains can coexist at the same time independently. A domain contains any number of Participants, elements capable of sending and receiving data. To do this, the participants use their Endpoints:

Reader: Endpoint able to receive data.
Writer: Endpoint able to send data.

A Participant can have any number of writer and reader endpoints.
1
2
3
4
5
6
7
8
9
10
比較不一樣的是

Communication revolves around Topics, which define the data being exchanged. Topics don’t belong to any participant in particular; instead, all interested participants keep track of changes to the topic data and make sure to keep each other up to date. The unit of communication is called a Change, which represents an update to a topic. Endpoints register these changes on their History, a data structure that serves as a cache for recent changes. When you publish a change through a writer endpoint, the following steps happen behind the scenes:

The change is added to the writer’s history cache.
The writer informs any readers it knows about.
Any interested (subscribed) readers request the change.
After receiving data, readers update their history cache with the new change.

By choosing Quality of Service policies, you can affect how these history caches are managed in several ways, but the communication loop remains the same. You can read more information in Configuration.

比起shared memory版的, rtps版多了

  • qos (define quality of service)
  • have history cache in both reader and writer side

而因為eprosima-fast-rtps已經做了很大一部份的事情了,所以在cyber的相關代碼比shm的簡淺多了

// Study: 是我的筆記

cyber/transport/rtps/sub_listener

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
/******************************************************************************
* Copyright 2018 The Apollo Authors. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*****************************************************************************/

#include "cyber/transport/rtps/sub_listener.h"

#include "cyber/common/log.h"
#include "cyber/common/util.h"

namespace apollo {
namespace cyber {
namespace transport {

SubListener::SubListener(const NewMsgCallback& callback)
: callback_(callback) {}

SubListener::~SubListener() {}

void SubListener::onNewDataMessage(eprosima::fastrtps::Subscriber* sub) {
RETURN_IF_NULL(sub);
RETURN_IF_NULL(callback_);
// Study: Using a mutex make life easy, espacially when you are dueing with third library
std::lock_guard<std::mutex> lock(mutex_);

// fetch channel name
auto channel_id = common::Hash(sub->getAttributes().topic.getTopicName());
eprosima::fastrtps::SampleInfo_t m_info;
// Study: Cyber use the same message type for all data type, this can saving the work for creating message type for all data type
// since it must have a type conversion later
UnderlayMessage m;

// Study: Get data if possible
RETURN_IF(!sub->takeNextData(reinterpret_cast<void*>(&m), &m_info));
RETURN_IF(m_info.sampleKind != eprosima::fastrtps::ALIVE);

// fetch MessageInfo
char* ptr =
reinterpret_cast<char*>(&m_info.related_sample_identity.writer_guid());
Identity sender_id(false);
sender_id.set_data(ptr);
msg_info_.set_sender_id(sender_id);

Identity spare_id(false);
spare_id.set_data(ptr + ID_SIZE);
msg_info_.set_spare_id(spare_id);

uint64_t seq_num =
((int64_t)m_info.related_sample_identity.sequence_number().high) << 32 |
m_info.related_sample_identity.sequence_number().low;
msg_info_.set_seq_num(seq_num);

// fetch message string
std::shared_ptr<std::string> msg_str =
std::make_shared<std::string>(m.data());

// callback
callback_(channel_id, msg_str, msg_info_);
}

// Study: Not called at all
void SubListener::onSubscriptionMatched(
eprosima::fastrtps::Subscriber* sub,
eprosima::fastrtps::MatchingInfo& info) {
(void)sub;
(void)info;
}

} // namespace transport
} // namespace cyber
} // namespace apollo

transport/rtps/participant

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
/******************************************************************************
* Copyright 2018 The Apollo Authors. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*****************************************************************************/

#include "cyber/transport/rtps/participant.h"

#include "cyber/common/global_data.h"
#include "cyber/common/log.h"
#include "cyber/proto/transport_conf.pb.h"

namespace apollo {
namespace cyber {
namespace transport {

Participant::Participant(const std::string& name, int send_port,
eprosima::fastrtps::ParticipantListener* listener)
: shutdown_(false),
name_(name),
send_port_(send_port),
listener_(listener),
fastrtps_participant_(nullptr) {}

Participant::~Participant() {}

void Participant::Shutdown() {
if (shutdown_.exchange(true)) {
return;
}

std::lock_guard<std::mutex> lk(mutex_);
if (fastrtps_participant_ != nullptr) {
eprosima::fastrtps::Domain::removeParticipant(fastrtps_participant_);
fastrtps_participant_ = nullptr;
listener_ = nullptr;
}
}

eprosima::fastrtps::Participant* Participant::fastrtps_participant() {
if (shutdown_.load()) {
return nullptr;
}

std::lock_guard<std::mutex> lk(mutex_);
if (fastrtps_participant_ != nullptr) {
return fastrtps_participant_;
}

CreateFastRtpsParticipant(name_, send_port_, listener_);
return fastrtps_participant_;
}

void Participant::CreateFastRtpsParticipant(
const std::string& name, int send_port,
eprosima::fastrtps::ParticipantListener* listener) {
// Study: DomainId: Publishers and Subscribers can only talk to each other if their Participants belong to the same DomainId.
// A interesting part, it selected one domain id only which mean it is impossible to have multiple domain
// But in original fast-rtps, it support multiplt domain
uint32_t domain_id = 80;

// Study: Set in the bash script
const char* val = ::getenv("CYBER_DOMAIN_ID");
if (val != nullptr) {
try {
domain_id = std::stoi(val);
} catch (const std::exception& e) {
AERROR << "convert domain_id error " << e.what();
return;
}
}

auto part_attr_conf = std::make_shared<proto::RtpsParticipantAttr>();
auto& global_conf = common::GlobalData::Instance()->Config();
if (global_conf.has_transport_conf() &&
global_conf.transport_conf().has_participant_attr()) {
part_attr_conf->CopyFrom(global_conf.transport_conf().participant_attr());
}

eprosima::fastrtps::ParticipantAttributes attr;
attr.rtps.defaultSendPort = send_port;
attr.rtps.port.domainIDGain =
static_cast<uint16_t>(part_attr_conf->domain_id_gain());
attr.rtps.port.portBase = static_cast<uint16_t>(part_attr_conf->port_base());
attr.rtps.use_IP6_to_send = false;
attr.rtps.builtin.use_SIMPLE_RTPSParticipantDiscoveryProtocol = true;
attr.rtps.builtin.use_SIMPLE_EndpointDiscoveryProtocol = true;
attr.rtps.builtin.m_simpleEDP.use_PublicationReaderANDSubscriptionWriter =
true;
attr.rtps.builtin.m_simpleEDP.use_PublicationWriterANDSubscriptionReader =
true;
attr.rtps.builtin.domainId = domain_id;
attr.rtps.builtin.leaseDuration.seconds = part_attr_conf->lease_duration();
attr.rtps.builtin.leaseDuration_announcementperiod.seconds =
part_attr_conf->announcement_period();

attr.rtps.setName(name.c_str());

// Study: Since rtps is a network protocol, it must know a ip
std::string ip_env("127.0.0.1");
const char* ip_val = ::getenv("CYBER_IP");
if (ip_val != nullptr) {
ip_env = ip_val;
if (ip_env.size() == 0) {
AERROR << "invalid CYBER_IP (an empty string)";
return;
}
}
ADEBUG << "cyber ip: " << ip_env;

eprosima::fastrtps::rtps::Locator_t locator;
locator.port = 0;
RETURN_IF(!locator.set_IP4_address(ip_env));

locator.kind = LOCATOR_KIND_UDPv4;

attr.rtps.defaultUnicastLocatorList.push_back(locator);
attr.rtps.defaultOutLocatorList.push_back(locator);
attr.rtps.builtin.metatrafficUnicastLocatorList.push_back(locator);

locator.set_IP4_address(239, 255, 0, 1);
attr.rtps.builtin.metatrafficMulticastLocatorList.push_back(locator);

fastrtps_participant_ =
eprosima::fastrtps::Domain::createParticipant(attr, listener);
RETURN_IF_NULL(fastrtps_participant_);
eprosima::fastrtps::Domain::registerType(fastrtps_participant_, &type_);
}

} // namespace transport
} // namespace cyber
} // namespace apollo

transport/rtps/underlay_message
transport/rtps/underlay_message_type

The class generated by the Fast RTPS IDL, used to serialize and deserialize data

transport/dispatcher/shm_dispatcher

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
/******************************************************************************
* Copyright 2018 The Apollo Authors. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*****************************************************************************/

#include "cyber/transport/dispatcher/rtps_dispatcher.h"

namespace apollo {
namespace cyber {
namespace transport {

RtpsDispatcher::RtpsDispatcher() : participant_(nullptr) {}

RtpsDispatcher::~RtpsDispatcher() { Shutdown(); }

void RtpsDispatcher::Shutdown() {
if (is_shutdown_.exchange(true)) {
return;
}

{
std::lock_guard<std::mutex> lock(subs_mutex_);
for (auto& item : subs_) {
item.second.sub = nullptr;
}
}

participant_ = nullptr;
}

void RtpsDispatcher::AddSubscriber(const RoleAttributes& self_attr) {
if (participant_ == nullptr) {
AWARN << "please set participant firstly.";
return;
}

uint64_t channel_id = self_attr.channel_id();
std::lock_guard<std::mutex> lock(subs_mutex_);
if (subs_.count(channel_id) > 0) {
return;
}

Subscriber new_sub;
eprosima::fastrtps::SubscriberAttributes sub_attr;
auto& qos = self_attr.qos_profile();
RETURN_IF(!AttributesFiller::FillInSubAttr(self_attr.channel_name(), qos,
&sub_attr));

// Study: So the SubListener is just a subscriber + callback....
new_sub.sub_listener = std::make_shared<SubListener>(
std::bind(&RtpsDispatcher::OnMessage, this, std::placeholders::_1,
std::placeholders::_2, std::placeholders::_3));

new_sub.sub = eprosima::fastrtps::Domain::createSubscriber(
participant_->fastrtps_participant(), sub_attr,
new_sub.sub_listener.get());
RETURN_IF_NULL(new_sub.sub);
subs_[channel_id] = new_sub;
}

void RtpsDispatcher::OnMessage(uint64_t channel_id,
const std::shared_ptr<std::string>& msg_str,
const MessageInfo& msg_info) {
if (is_shutdown_.load()) {
return;
}

// Study: Similiar to shm part impl, no big deal
ListenerHandlerBasePtr* handler_base = nullptr;
if (msg_listeners_.Get(channel_id, &handler_base)) {
auto handler =
std::dynamic_pointer_cast<ListenerHandler<std::string>>(*handler_base);
handler->Run(msg_str, msg_info);
}
}

} // namespace transport
} // namespace cyber
} // namespace apollo

transport/transmitter/rtps_transmitter

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
/******************************************************************************
* Copyright 2018 The Apollo Authors. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*****************************************************************************/

#ifndef CYBER_TRANSPORT_TRANSMITTER_RTPS_TRANSMITTER_H_
#define CYBER_TRANSPORT_TRANSMITTER_RTPS_TRANSMITTER_H_

#include <memory>
#include <string>

#include "cyber/common/log.h"
#include "cyber/message/message_traits.h"
#include "cyber/transport/rtps/attributes_filler.h"
#include "cyber/transport/rtps/participant.h"
#include "cyber/transport/transmitter/transmitter.h"
#include "fastrtps/Domain.h"
#include "fastrtps/attributes/PublisherAttributes.h"
#include "fastrtps/participant/Participant.h"
#include "fastrtps/publisher/Publisher.h"

namespace apollo {
namespace cyber {
namespace transport {

template <typename M>
class RtpsTransmitter : public Transmitter<M> {
public:
using MessagePtr = std::shared_ptr<M>;

RtpsTransmitter(const RoleAttributes& attr,
const ParticipantPtr& participant);
virtual ~RtpsTransmitter();

void Enable() override;
void Disable() override;

bool Transmit(const MessagePtr& msg, const MessageInfo& msg_info) override;

private:
bool Transmit(const M& msg, const MessageInfo& msg_info);

ParticipantPtr participant_;
eprosima::fastrtps::Publisher* publisher_;
};

template <typename M>
RtpsTransmitter<M>::RtpsTransmitter(const RoleAttributes& attr,
const ParticipantPtr& participant)
: Transmitter<M>(attr), participant_(participant), publisher_(nullptr) {}

template <typename M>
RtpsTransmitter<M>::~RtpsTransmitter() {
Disable();
}

template <typename M>
void RtpsTransmitter<M>::Enable() {
if (this->enabled_) {
return;
}

RETURN_IF_NULL(participant_);

eprosima::fastrtps::PublisherAttributes pub_attr;
// Study: Need fill the attribute according to the qos profile
// Annoying
RETURN_IF(!AttributesFiller::FillInPubAttr(
this->attr_.channel_name(), this->attr_.qos_profile(), &pub_attr));
publisher_ = eprosima::fastrtps::Domain::createPublisher(
participant_->fastrtps_participant(), pub_attr);
RETURN_IF_NULL(publisher_);
this->enabled_ = true;
}

template <typename M>
void RtpsTransmitter<M>::Disable() {
if (this->enabled_) {
publisher_ = nullptr;
this->enabled_ = false;
}
}

template <typename M>
bool RtpsTransmitter<M>::Transmit(const MessagePtr& msg,
const MessageInfo& msg_info) {
return Transmit(*msg, msg_info);
}

template <typename M>
bool RtpsTransmitter<M>::Transmit(const M& msg, const MessageInfo& msg_info) {
if (!this->enabled_) {
ADEBUG << "not enable.";
return false;
}

UnderlayMessage m;
// Study: m.data is just a str, can reuse the original existing serializaion and deserialzation
RETURN_VAL_IF(!message::SerializeToString(msg, &m.data()), false);

eprosima::fastrtps::rtps::WriteParams wparams;

char* ptr =
reinterpret_cast<char*>(&wparams.related_sample_identity().writer_guid());

memcpy(ptr, msg_info.sender_id().data(), ID_SIZE);
memcpy(ptr + ID_SIZE, msg_info.spare_id().data(), ID_SIZE);

wparams.related_sample_identity().sequence_number().high =
(int32_t)((msg_info.seq_num() & 0xFFFFFFFF00000000) >> 32);
wparams.related_sample_identity().sequence_number().low =
(int32_t)(msg_info.seq_num() & 0xFFFFFFFF);

if (participant_->is_shutdown()) {
return false;
}
return publisher_->write(reinterpret_cast<void*>(&m), wparams);
}

} // namespace transport
} // namespace cyber
} // namespace apollo

#endif // CYBER_TRANSPORT_TRANSMITTER_RTPS_TRANSMITTER_H_

transport/receiver/rtps_receiver

Same as shm_receiver, just changed shm_dispatcher to rtps_dispatcher