38namespace DiscordCoreAPI {
40 namespace DiscordCoreInternal {
59 class CoRoutineThreadPool :
protected DiscordCoreAPI::UnorderedMap<uint64_t, UniquePtr<WorkerThread>> {
61 using map_type = DiscordCoreAPI::UnorderedMap<uint64_t, UniquePtr<WorkerThread>>;
70 uint64_t indexNew =
currentIndex.load(std::memory_order_acquire);
71 emplace(indexNew, std::move(workerThread));
72 getMap()[indexNew]->thread = makeUnique<ThreadWrapper>([=,
this](
StopToken stopToken) {
81 bool areWeAllBusy{
true };
82 uint64_t currentLowestValue{ std::numeric_limits<uint64_t>::max() };
83 uint64_t currentLowestIndex{ std::numeric_limits<uint64_t>::max() };
85 for (
auto& [key, value]: getMap()) {
86 if (!value->areWeCurrentlyWorking.load(std::memory_order_acquire)) {
88 if (value->tasks.size() < currentLowestValue) {
89 currentLowestValue = value->tasks.size();
90 currentLowestIndex = key;
99 uint64_t indexNew =
currentIndex.load(std::memory_order_acquire);
102 getMap().emplace(indexNew, std::move(workerThread));
103 getMap()[indexNew]->thread = makeUnique<ThreadWrapper>([=,
this](
StopToken stopToken) {
108 getMap()[currentLowestIndex]->tasks.send(std::move(coro));
123 if (!getMap().contains(index)) {
126 std::coroutine_handle<> coroHandle{};
127 if (getMap()[index]->tasks.tryReceive(coroHandle)) {
128 getMap()[index]->areWeCurrentlyWorking.store(
true, std::memory_order_release);
133 if (!contains(index)) {
136 getMap()[index]->areWeCurrentlyWorking.store(
false, std::memory_order_release);
140 while (extraWorkers > 0) {
143 auto oldThread = begin();
144 if (oldThread->second->thread->joinable()) {
145 oldThread->second->thread->requestStop();
146 oldThread->second->thread->detach();
148 getMap().erase(oldThread->first);
152 std::this_thread::sleep_for(Nanoseconds{ 100000 });
156 inline map_type& getMap() {
DiscordCoreClient - The main class for this library.
A struct representing a worker thread for coroutine-based tasks.
UniquePtr< ThreadWrapper > thread
Joinable thread.
std::atomic_bool areWeCurrentlyWorking
Atomic flag indicating if the thread is working.
UnboundedMessageBlock< std::coroutine_handle<> > tasks
Queue of coroutine tasks.
A class representing a coroutine-based thread pool.
const uint64_t threadCount
Total thread count.
std::atomic_uint64_t currentIndex
Current index of worker threads.
CoRoutineThreadPool()
Constructor to create a coroutine thread pool. Initializes the worker threads.
void submitTask(std::coroutine_handle<> coro)
Submit a coroutine task to the thread pool.
void threadFunction(StopToken stopToken, uint64_t index)
Thread function for each worker thread.
std::shared_mutex workerAccessMutex
Shared mutex for worker thread access.
std::atomic_uint64_t currentCount
Current count of worker threads.
A token used to control thread stopping.
bool stopRequested()
Check if stop has been requested.
A thread-safe messaging block for data-structures.
A smart pointer class that provides unique ownership semantics.