36using namespace std::literals;
40 namespace discord_core_internal {
52 this->
areWeCurrentlyWorking.store(other.areWeCurrentlyWorking.load(std::memory_order_acquire), std::memory_order_release);
53 std::swap(this->
thread, other.thread);
54 tasks = std::move(other.tasks);
59 *
this = std::move(other);
72 using map_type = discord_core_api::unordered_map<uint64_t, unique_ptr<worker_thread>>;
79 uint64_t indexNew =
currentIndex.load(std::memory_order_acquire);
80 getMap().emplace(indexNew, makeUnique<worker_thread>());
81 getMap()[indexNew]->thread = std::jthread([=,
this](std::stop_token tokenNew)
mutable {
90 bool areWeAllBusy{
true };
91 uint64_t currentLowestValue{ std::numeric_limits<uint64_t>::max() };
92 uint64_t currentLowestIndex{ std::numeric_limits<uint64_t>::max() };
94 for (
auto& [key, value]: getMap()) {
95 if (!value->areWeCurrentlyWorking.load(std::memory_order_acquire)) {
97 if (value->tasks.size() < currentLowestValue) {
98 currentLowestValue = value->tasks.size();
99 currentLowestIndex = key;
107 uint64_t indexNew =
currentIndex.load(std::memory_order_acquire);
110 getMap().emplace(indexNew, makeUnique<worker_thread>());
111 getMap()[indexNew]->thread = std::jthread([=,
this](std::stop_token tokenNew)
mutable {
116 getMap()[currentLowestIndex]->tasks.send(std::move(coro));
121 doWeQuit.store(
true, std::memory_order_release);
135 while (!
doWeQuit.load(std::memory_order_acquire) && !tokenNew.stop_requested()) {
136 std::coroutine_handle<> coroHandle{};
137 if (thread->tasks.tryReceive(coroHandle)) {
138 thread->areWeCurrentlyWorking.store(
true, std::memory_order_release);
141 while (!coroHandle.done()) {
142 std::this_thread::sleep_for(1ms);
144 }
catch (
const std::runtime_error& error) {
145 message_printer::printError<print_message_type::general>(error.what());
147 thread->areWeCurrentlyWorking.store(
false, std::memory_order_release);
151 while (extraWorkers > 0) {
153 size_type currentHighestIndex{};
154 for (
const auto& [key, value]: *
this) {
155 if (key > currentHighestIndex) {
156 currentHighestIndex = key;
160 auto oldThread = begin() + currentHighestIndex;
161 if (oldThread->second->thread.joinable()) {
162 oldThread->second->thread.request_stop();
163 oldThread->second->thread.detach();
165 getMap().erase(oldThread->first);
169 std::this_thread::sleep_for(std::chrono::nanoseconds{ 100000 });
173 inline map_type& getMap() {
A class representing a coroutine-based thread pool.
std::shared_mutex workerAccessMutex
Shared mutex for worker thread access.
void threadFunction(worker_thread *thread, std::stop_token tokenNew)
Thread function for each worker thread.
std::atomic_bool doWeQuit
Whether or not we're quitting.
co_routine_thread_pool()
Constructor to create a coroutine thread pool. initializes the worker threads.
std::atomic_uint64_t currentIndex
current index of worker threads.
std::atomic_uint64_t currentCount
current count of worker threads.
const uint64_t threadCount
Total thread count.
void submitTask(std::coroutine_handle<> coro)
Submit a coroutine task to the thread pool.
A thread-safe messaging block for data-structures.
The main namespace for the forward-facing interfaces.
A struct representing a worker thread for coroutine-based tasks.
std::jthread thread
Joinable thread.
unbounded_message_block< std::coroutine_handle<> > tasks
Queue of coroutine tasks.
std::atomic_bool areWeCurrentlyWorking
Atomic flag indicating if the thread is working.