Apollo Cyber Study P16 Task

// Study: 是我的筆記

task/task_manager.h

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
/******************************************************************************
* Copyright 2018 The Apollo Authors. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*****************************************************************************/

#ifndef CYBER_TASK_TASK_MANAGER_H_
#define CYBER_TASK_TASK_MANAGER_H_

#include <atomic>
#include <future>
#include <memory>
#include <string>
#include <type_traits>
#include <utility>
#include <vector>

#include "cyber/base/bounded_queue.h"
#include "cyber/scheduler/scheduler_factory.h"

namespace apollo {
namespace cyber {

class TaskManager {
public:
virtual ~TaskManager();

void Shutdown();

// Study: F&& = allow using std::move for the func, args&& is the same reason
// decltype === std::result_of, it mean this enqueue return type is equal to the
// return type of F(Args...) wrapped by std::future
template <typename F, typename... Args>
auto Enqueue(F&& func, Args&&... args)
-> std::future<typename std::result_of<F(Args...)>::type> {
using return_type = typename std::result_of<F(Args...)>::type;
// Study: Combine the F(Args...) in to one thing, don;t need to pass argument later
auto task = std::make_shared<std::packaged_task<return_type()>>(
std::bind(std::forward<F>(func), std::forward<Args>(args)...));
if (!stop_.load()) {
task_queue_->Enqueue([task]() { (*task)(); });
for (auto& task : tasks_) {
scheduler::Instance()->NotifyTask(task);
}
}

// Study: Wrap by future, provide a way to get the result after it done
std::future<return_type> res(task->get_future());
return res;
}

private:
uint32_t num_threads_ = 0;
uint32_t task_queue_size_ = 1000;
std::atomic<bool> stop_ = {false};
std::vector<uint64_t> tasks_;
std::shared_ptr<base::BoundedQueue<std::function<void()>>> task_queue_;
DECLARE_SINGLETON(TaskManager);
};

} // namespace cyber
} // namespace apollo

#endif // CYBER_TASK_TASK_MANAGER_H_

task/task_manager.cc

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
/**************Scheduler::****************************************************************
* Copyright 2018 The Apollo Authors. All Rights Reserved.
*
* Licensed under the Apache License, Vesched_infoon 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*****************************************************************************/

#include "cyber/task/task_manager.h"

#include "cyber/common/global_data.h"
#include "cyber/croutine/croutine.h"
#include "cyber/croutine/routine_factory.h"
#include "cyber/scheduler/scheduler_factory.h"

namespace apollo {
namespace cyber {

using apollo::cyber::common::GlobalData;
// Study: Classifiy the internal task and the user define task
static const char* const task_prefix = "/internal/task";

TaskManager::TaskManager()
: task_queue_size_(1000),
task_queue_(new base::BoundedQueue<std::function<void()>>()) {

// Study: BlockWaitStrategy is similiar to traditional os handle
if (!task_queue_->Init(task_queue_size_, new base::BlockWaitStrategy())) {
AERROR << "Task queue init failed";
throw std::runtime_error("Task queue init failed");
}
auto func = [this]() {
while (!stop_) {
std::function<void()> task;
// Study: No Task, then make this routine to wait data
if (!task_queue_->Dequeue(&task)) {
auto routine = croutine::CRoutine::GetCurrentRoutine();
routine->HangUp();
continue;
}
task();
}
};

auto pool_size = scheduler::Instance()->TaskPoolSize();
// Study: If not provide the data visitor to CreateRoutineFactory
// it assume the function inside can get data by themselve
auto factory = croutine::CreateRoutineFactory(std::move(func));
tasks_.reserve(pool_size);
for (uint32_t i = 0; i < pool_size; i++) {
auto task_name = task_prefix + std::to_string(i);
tasks_.push_back(common::GlobalData::RegisterTaskName(task_name));
scheduler::Instance()->CreateTask(factory, task_name);
}
}

TaskManager::~TaskManager() { Shutdown(); }

void TaskManager::Shutdown() {
if (stop_.exchange(true)) {
return;
}

for (uint32_t i = 0; i < num_threads_; i++) {
scheduler::Instance()->RemoveTask(task_prefix + std::to_string(i));
}
}

} // namespace cyber
} // namespace apollo

task/task.h

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
/******************************************************************************
* Copyright 2018 The Apollo Authors. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*****************************************************************************/

#ifndef CYBER_TASK_TASK_H_
#define CYBER_TASK_TASK_H_

#include <future>
#include <utility>

#include "cyber/task/task_manager.h"

namespace apollo {
namespace cyber {

template <typename F, typename... Args>
static auto Async(F&& f, Args&&... args)
-> std::future<typename std::result_of<F(Args...)>::type> {
// Study: A Wrapper Function only
return TaskManager::Instance()->Enqueue(std::forward<F>(f),
std::forward<Args>(args)...);
}

// Study: A Wrapper Function that support both routine and thread
static inline void Yield() {
if (croutine::CRoutine::GetCurrentRoutine()) {
croutine::CRoutine::Yield();
} else {
std::this_thread::yield();
}
}

// Study: A Wrapper Function that support both routine and thread
template <typename Rep, typename Period>
static void SleepFor(const std::chrono::duration<Rep, Period>& sleep_duration) {
auto routine = croutine::CRoutine::GetCurrentRoutine();
if (routine == nullptr) {
std::this_thread::sleep_for(sleep_duration);
} else {
routine->Sleep(sleep_duration);
}
}

// Study: Actually same as the above function, just use different time unit
static inline void USleep(useconds_t usec) {
auto routine = croutine::CRoutine::GetCurrentRoutine();
if (routine == nullptr) {
std::this_thread::sleep_for(std::chrono::microseconds{usec});
} else {
routine->Sleep(croutine::Duration(usec));
}
}

} // namespace cyber
} // namespace apollo

#endif // CYBER_TASK_TASK_H_