Apollo Cyber Study. P7 Data

在解決scheduler, blocker,transport,component幾個大哥前先解決cyber/data

// Study: 是我的筆記

cyber/data/cache_buffer

就一個circular array為實現的buffer, 可加不可減

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
/******************************************************************************
* Copyright 2018 The Apollo Authors. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*****************************************************************************/

#ifndef CYBER_DATA_CACHE_BUFFER_H_
#define CYBER_DATA_CACHE_BUFFER_H_

#include <mutex>
#include <vector>

namespace apollo {
namespace cyber {
namespace data {

template <typename T>
class CacheBuffer {
public:
using value_type = T;
using size_type = std::size_t;

explicit CacheBuffer(uint32_t size) {
capacity_ = size + 1;
buffer_.resize(capacity_);
}

CacheBuffer(const CacheBuffer& rhs) {
// Study: Since this is a constructor, not need to lock this object
std::lock_guard<std::mutex> lg(rhs.mutex_);
head_ = rhs.head_;
tail_ = rhs.tail_;
buffer_ = rhs.buffer_;
capacity_ = rhs.capacity_;
}

T& operator[](const uint64_t& pos) { return buffer_[GetIndex(pos)]; }
const T& at(const uint64_t& pos) const { return buffer_[GetIndex(pos)]; }

uint64_t Head() const { return head_ + 1; }
uint64_t Tail() const { return tail_; }
uint64_t Size() const { return tail_ - head_; }

const T& Front() const { return buffer_[GetIndex(head_ + 1)]; }
const T& Back() const { return buffer_[GetIndex(tail_)]; }

// Study: Since tail will never have change to do minus, so this will correct
bool Empty() const { return tail_ == 0; }
// Study: Since tail and head is add only, so this will correct
bool Full() const { return capacity_ - 1 == tail_ - head_; }

void Fill(const T& value) {
if (Full()) {
// Study: using tail + 1 here is same as head
buffer_[GetIndex(head_)] = value;
++head_;
++tail_;
} else {
buffer_[GetIndex(tail_ + 1)] = value;
++tail_;
}
}

std::mutex& Mutex() { return mutex_; }

private:
CacheBuffer& operator=(const CacheBuffer& other) = delete;
uint64_t GetIndex(const uint64_t& pos) const { return pos % capacity_; }

uint64_t head_ = 0;
uint64_t tail_ = 0;
uint64_t capacity_ = 0;
std::vector<T> buffer_;
mutable std::mutex mutex_;
};

} // namespace data
} // namespace cyber
} // namespace apollo

#endif // CYBER_DATA_CACHE_BUFFER_H_

cyber/data/data_notifier

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
/******************************************************************************
* Copyright 2018 The Apollo Authors. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*****************************************************************************/

#ifndef CYBER_DATA_DATA_NOTIFIER_H_
#define CYBER_DATA_DATA_NOTIFIER_H_

#include <memory>
#include <mutex>
#include <vector>

#include "cyber/common/log.h"
#include "cyber/common/macros.h"
#include "cyber/data/cache_buffer.h"
#include "cyber/event/perf_event_cache.h"
#include "cyber/time/time.h"

namespace apollo {
namespace cyber {
namespace data {

using apollo::cyber::Time;
using apollo::cyber::base::AtomicHashMap;
using apollo::cyber::event::PerfEventCache;

struct Notifier {
std::function<void()> callback;
};

// Study: Seem to be a observer pattern
class DataNotifier {
public:
using NotifyVector = std::vector<std::shared_ptr<Notifier>>;
~DataNotifier() {}

void AddNotifier(uint64_t channel_id,
const std::shared_ptr<Notifier>& notifier);

bool Notify(const uint64_t channel_id);

private:
std::mutex notifies_map_mutex_;
AtomicHashMap<uint64_t, NotifyVector> notifies_map_;

DECLARE_SINGLETON(DataNotifier)
};

inline DataNotifier::DataNotifier() {}

// Study: It is a centralized place for adding notifier, this is called by the DataVisitor
inline void DataNotifier::AddNotifier(
uint64_t channel_id, const std::shared_ptr<Notifier>& notifier) {
std::lock_guard<std::mutex> lock(notifies_map_mutex_);
NotifyVector* notifies = nullptr;
if (notifies_map_.Get(channel_id, &notifies)) {
notifies->emplace_back(notifier);
} else {
NotifyVector new_notify = {notifier};
notifies_map_.Set(channel_id, new_notify);
}
}

// Study: Call all the cb for specific topic
inline bool DataNotifier::Notify(const uint64_t channel_id) {
NotifyVector* notifies = nullptr;
if (notifies_map_.Get(channel_id, &notifies)) {
for (auto& notifier : *notifies) {
if (notifier->callback) {
notifier->callback();
}
}
return true;
}
return false;
}

} // namespace data
} // namespace cyber
} // namespace apollo

#endif // CYBER_DATA_DATA_NOTIFIER_H_

cyber/data/channel_buffer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
/******************************************************************************
* Copyright 2018 The Apollo Authors. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*****************************************************************************/

#ifndef CYBER_DATA_CHANNEL_BUFFER_H_
#define CYBER_DATA_CHANNEL_BUFFER_H_

#include <algorithm>
#include <functional>
#include <memory>
#include <vector>

#include "cyber/common/global_data.h"
#include "cyber/common/log.h"
#include "cyber/data/data_notifier.h"
#include "cyber/proto/component_conf.pb.h"

namespace apollo {
namespace cyber {
namespace data {

using apollo::cyber::common::GlobalData;

template <typename T>
class ChannelBuffer {
public:
using BufferType = CacheBuffer<std::shared_ptr<T>>;
// Study: It show that this is a higher level usage of CacheBuffer
ChannelBuffer(uint64_t channel_id, BufferType* buffer)
: channel_id_(channel_id), buffer_(buffer) {}

// Study: the index of index will be change, since it underlaying CacheBuffer
// is using a add-only head, tail which cannot have abstract
bool Fetch(uint64_t* index, std::shared_ptr<T>& m); // NOLINT

bool Latest(std::shared_ptr<T>& m); // NOLINT

bool FetchMulti(uint64_t fetch_size, std::vector<std::shared_ptr<T>>* vec);

uint64_t channel_id() const { return channel_id_; }
std::shared_ptr<BufferType> Buffer() const { return buffer_; }

private:
uint64_t channel_id_;
std::shared_ptr<BufferType> buffer_;
};

template <typename T>
bool ChannelBuffer<T>::Fetch(uint64_t* index,
std::shared_ptr<T>& m) { // NOLINT
std::lock_guard<std::mutex> lock(buffer_->Mutex());
if (buffer_->Empty()) {
return false;
}

// Study: The meaning of it index is not actually index
// ridiculous
if (*index == 0) {
*index = buffer_->Tail();
} else if (*index == buffer_->Tail() + 1) {
return false;
} else if (*index < buffer_->Head()) {
// Study: If the Fetch is slower than the buffer grow
auto interval = buffer_->Tail() - *index;
AWARN << "channel[" << GlobalData::GetChannelById(channel_id_) << "] "
<< "read buffer overflow, drop_message[" << interval << "] pre_index["
<< *index << "] current_index[" << buffer_->Tail() << "] ";
*index = buffer_->Tail();
}
m = buffer_->at(*index);
return true;
}

template <typename T>
bool ChannelBuffer<T>::Latest(std::shared_ptr<T>& m) { // NOLINT
std::lock_guard<std::mutex> lock(buffer_->Mutex());
if (buffer_->Empty()) {
return false;
}

// Study: Simple the last one
m = buffer_->Back();
return true;
}

// Study: Actuatlly this func is more like Latest than Fetch
// Get N Latest
template <typename T>
bool ChannelBuffer<T>::FetchMulti(uint64_t fetch_size,
std::vector<std::shared_ptr<T>>* vec) {
std::lock_guard<std::mutex> lock(buffer_->Mutex());
if (buffer_->Empty()) {
return false;
}

auto num = std::min(buffer_->Size(), fetch_size);
vec->reserve(num);
for (auto index = buffer_->Tail() - num + 1; index <= buffer_->Tail();
++index) {
vec->emplace_back(buffer_->at(index));
}
return true;
}

} // namespace data
} // namespace cyber
} // namespace apollo

#endif // CYBER_DATA_CHANNEL_BUFFER_H_

cyber/data/data_dispatcher

Just a singleton class to dispatch message to the channel buffer.

cyber/data/data_visitor_base

Nothing special. It just ensure the data visitor have 1 callback for the data channel.
And use a pointer point to the data notifier singleton

cyber/data/data_visitor

Just comment on the 4 template argument DataVisitor, since the other is just a partial version of this

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
/******************************************************************************
* Copyright 2018 The Apollo Authors. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*****************************************************************************/

#ifndef CYBER_DATA_DATA_VISITOR_H_
#define CYBER_DATA_DATA_VISITOR_H_

#include <algorithm>
#include <functional>
#include <memory>
#include <vector>

#include "cyber/common/log.h"
#include "cyber/data/channel_buffer.h"
#include "cyber/data/data_dispatcher.h"
#include "cyber/data/data_visitor_base.h"
#include "cyber/data/fusion/all_latest.h"
#include "cyber/data/fusion/data_fusion.h"

namespace apollo {
namespace cyber {
namespace data {

struct VisitorConfig {
VisitorConfig(uint64_t id, uint32_t size)
: channel_id(id), queue_size(size) {}
uint64_t channel_id;
uint32_t queue_size;
};

template <typename T>
using BufferType = CacheBuffer<std::shared_ptr<T>>;

// Study: It also implem the 3 argument, 2 argument, 1 argument version
// If want to use more argument, can use the same way to extend
// Yes, this is stupid but work
// - If you want to use variadic template to fix this,
// will cause crazy source code expand, at least 4 time size
// - If move the type declare to runtime instead of compile time
// it can just use writer, reader to replace
template <typename M0, typename M1 = NullType, typename M2 = NullType,
typename M3 = NullType>
class DataVisitor : public DataVisitorBase {
public:
explicit DataVisitor(const std::vector<VisitorConfig>& configs)
: buffer_m0_(configs[0].channel_id,
new BufferType<M0>(configs[0].queue_size)),
buffer_m1_(configs[1].channel_id,
new BufferType<M1>(configs[1].queue_size)),
buffer_m2_(configs[2].channel_id,
new BufferType<M2>(configs[2].queue_size)),
buffer_m3_(configs[3].channel_id,
new BufferType<M3>(configs[3].queue_size)) {
// Study: subscribe the data for different channel
DataDispatcher<M0>::Instance()->AddBuffer(buffer_m0_);
DataDispatcher<M1>::Instance()->AddBuffer(buffer_m1_);
DataDispatcher<M2>::Instance()->AddBuffer(buffer_m2_);
DataDispatcher<M3>::Instance()->AddBuffer(buffer_m3_);
// Study: Using the first argument M0 as the main data
// do sync logic when first channel have data
// It mean the data input frequency in same as the frequency of M0
data_notifier_->AddNotifier(buffer_m0_.channel_id(), notifier_);
data_fusion_ = new fusion::AllLatest<M0, M1, M2, M3>(
buffer_m0_, buffer_m1_, buffer_m2_, buffer_m3_);
}

~DataVisitor() {
if (data_fusion_) {
delete data_fusion_;
data_fusion_ = nullptr;
}
}

bool TryFetch(std::shared_ptr<M0>& m0, std::shared_ptr<M1>& m1, // NOLINT
std::shared_ptr<M2>& m2, std::shared_ptr<M3>& m3) { // NOLINT
// Study: next_msg_index_ is the index used to fetch data in m0 channel buffer
if (data_fusion_->Fusion(&next_msg_index_, m0, m1, m2, m3)) {
next_msg_index_++;
return true;
}
return false;
}

private:
fusion::DataFusion<M0, M1, M2, M3>* data_fusion_ = nullptr;
ChannelBuffer<M0> buffer_m0_;
ChannelBuffer<M1> buffer_m1_;
ChannelBuffer<M2> buffer_m2_;
ChannelBuffer<M3> buffer_m3_;
};

} // namespace data
} // namespace cyber
} // namespace apollo

#endif // CYBER_DATA_DATA_VISITOR_H_

cyber/data/fusion/data_fusion

Just a base class. It just implemented the all_latest.
I should implement the version that include the interpolate.
It would be simple, but need to use template explicit instantiate
since different data haev different interpolate method

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
/******************************************************************************
* Copyright 2018 The Apollo Authors. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*****************************************************************************/

#ifndef CYBER_DATA_FUSION_DATA_FUSION_H_
#define CYBER_DATA_FUSION_DATA_FUSION_H_

#include <deque>
#include <memory>
#include <string>
#include <type_traits>
#include <typeinfo>
#include <vector>

#include "cyber/common/types.h"

namespace apollo {
namespace cyber {
namespace data {
namespace fusion {

template <typename M0, typename M1 = NullType, typename M2 = NullType,
typename M3 = NullType>
class DataFusion {
public:
virtual ~DataFusion() {}
virtual bool Fusion(uint64_t* index, std::shared_ptr<M0>& m0, // NOLINT
std::shared_ptr<M1>& m1, // NOLINT
std::shared_ptr<M2>& m2, // NOLINT
std::shared_ptr<M3>& m3) = 0; // NOLINT
};

template <typename M0, typename M1, typename M2>
class DataFusion<M0, M1, M2, NullType> {
public:
virtual ~DataFusion() {}

virtual bool Fusion(uint64_t* index, std::shared_ptr<M0>& m0, // NOLINT
std::shared_ptr<M1>& m1, // NOLINT
std::shared_ptr<M2>& m2) = 0; // NOLINT
};

template <typename M0, typename M1>
class DataFusion<M0, M1, NullType, NullType> {
public:
virtual ~DataFusion() {}

virtual bool Fusion(uint64_t* index, std::shared_ptr<M0>& m0, // NOLINT
std::shared_ptr<M1>& m1) = 0; // NOLINT
};

} // namespace fusion
} // namespace data
} // namespace cyber
} // namespace apollo

#endif // CYBER_DATA_FUSION_DATA_FUSION_H_

cyber/data/fusion/all_latest

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

template <typename M0, typename M1 = NullType, typename M2 = NullType,
typename M3 = NullType>
class AllLatest : public DataFusion<M0, M1, M2, M3> {
public:
AllLatest(const ChannelBuffer<M0>& buffer_0,
const ChannelBuffer<M1>& buffer_1,
const ChannelBuffer<M2>& buffer_2,
const ChannelBuffer<M3>& buffer_3)
: buffer_m0_(buffer_0),
buffer_m1_(buffer_1),
buffer_m2_(buffer_2),
buffer_m3_(buffer_3) {}
bool Fusion(uint64_t* index, std::shared_ptr<M0>& m0, std::shared_ptr<M1>& m1,
std::shared_ptr<M2>& m2, std::shared_ptr<M3>& m3) override {
// Study: this would work well if the datavisior frequency is near to M0,
// Otherwise, haha
return buffer_m0_.Fetch(index, m0) && buffer_m1_.Latest(m1) &&
buffer_m2_.Latest(m2) && buffer_m3_.Latest(m3);
}

private:
ChannelBuffer<M0> buffer_m0_;
ChannelBuffer<M1> buffer_m1_;
ChannelBuffer<M2> buffer_m2_;
ChannelBuffer<M3> buffer_m3_;
};