DiscordCoreAPI
A Discord bot library written in C++, with custom asynchronous coroutines.
Loading...
Searching...
No Matches
VoiceConnection.cpp
Go to the documentation of this file.
1/*
2 MIT License
3
4 DiscordCoreAPI, A bot library for Discord, written in C++, and featuring explicit multithreading through the usage of custom, asynchronous C++ CoRoutines.
5
6 Copyright 2022, 2023 Chris M. (RealTimeChris)
7
8 Permission is hereby granted, free of charge, to any person obtaining a copy
9 of this software and associated documentation files (the "Software"), to deal
10 in the Software without restriction, including without limitation the rights
11 to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
12 copies of the Software, and to permit persons to whom the Software is
13 furnished to do so, subject to the following conditions:
14
15 The above copyright notice and this permission notice shall be included in all
16 copies or substantial portions of the Software.
17
18 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
19 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
20 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
21 AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
22 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
23 OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
24 SOFTWARE.
25*/
26/// VoiceConnection.cpp - Source file for the voice connection class.
27/// Jul 15, 2021
28/// https://discordcoreapi.com
29/// \file VoiceConnection.cpp
30
35
36namespace jsonifier {
37
38 template<> struct core<discord_core_api::voice_session_description_data> {
39 using value_type = discord_core_api::voice_session_description_data;
40 static constexpr auto parseValue = createValue("secret_key", &value_type::secretKey);
41 };
42
43 template<> struct core<discord_core_api::discord_core_internal::websocket_message_data<discord_core_api::speaking_data>> {
44 using value_type = discord_core_api::discord_core_internal::websocket_message_data<discord_core_api::speaking_data>;
45 static constexpr auto parseValue = createValue("d", &value_type::d);
46 };
47
48 template<> struct core<discord_core_api::speaking_data> {
49 using value_type = discord_core_api::speaking_data;
50 static constexpr auto parseValue = createValue("ssrc", &value_type::ssrc, "user_id", &value_type::userId);
51 };
52
53 template<> struct core<discord_core_api::voice_connection_hello_data> {
54 using value_type = discord_core_api::voice_connection_hello_data;
55 static constexpr auto parseValue = createValue("heartbeat_interval", &value_type::heartBeatInterval);
56 };
57
58 template<> struct core<discord_core_api::voice_user_disconnect_data> {
59 using value_type = discord_core_api::voice_user_disconnect_data;
60 static constexpr auto parseValue = createValue("user_id", &value_type::userId);
61 };
62
63 template<> struct core<discord_core_api::voice_socket_ready_data> {
64 using value_type = discord_core_api::voice_socket_ready_data;
65 static constexpr auto parseValue = createValue("modes", &value_type::modes, "ip", &value_type::ip, "port", &value_type::port, "ssrc", &value_type::ssrc);
66 };
67
68}
69
70namespace discord_core_api {
71
// One audio_mixer per thread (thread_local); used further down by
// voice_connection_bridge::applyGainRamp() and voice_connection_bridge::mixAudio().
72 static DCA_INLINE thread_local discord_core_internal::audio_mixer audioMixer{};
73
74 voice_user::voice_user(snowflake userIdNew) {
75 userId = userIdNew;
76 }
77
78 voice_user& voice_user::operator=(voice_user&& other) noexcept {
79 payloads = std::move(other.payloads);
80 decoder = std::move(other.decoder);
81 userId = other.userId;
82 return *this;
83 }
84
85 discord_core_internal::opus_decoder_wrapper& voice_user::getDecoder() {
86 return decoder;
87 }
88
89 void voice_user::insertPayload(jsonifier::string_view_base<uint8_t> data) {
90 payloads.writeData(data.data(), data.size());
91 }
92
93 jsonifier::string_view_base<uint8_t> voice_user::extractPayload() {
94 return payloads.readData();
95 }
96
97 snowflake voice_user::getUserId() {
98 return userId;
99 }
100
101 rtp_packet_encrypter::rtp_packet_encrypter(uint32_t ssrcNew, const jsonifier::string_base<uint8_t>& keysNew) {
102 keys = keysNew;
103 ssrc = ssrcNew;
104 }
105
106 jsonifier::string_view_base<uint8_t> rtp_packet_encrypter::encryptPacket(discord_core_internal::encoder_return_data& audioData) {
107 if (keys.size() > 0) {
108 ++sequence;
109 timeStamp += static_cast<uint32_t>(audioData.sampleCount);
110 static constexpr uint8_t headerSize{ 12 };
111 static constexpr uint8_t version{ 0x80 };
112 static constexpr uint8_t flags{ 0x78 };
113 uint8_t header[headerSize]{};
114 discord_core_internal::storeBits(header, version);
115 discord_core_internal::storeBits(header + 1, flags);
116 discord_core_internal::storeBits(header + 2, sequence);
117 discord_core_internal::storeBits(header + 4, timeStamp);
118 discord_core_internal::storeBits(header + 8, ssrc);
119 uint8_t nonceForLibSodium[crypto_secretbox_NONCEBYTES]{};
120 for (int8_t x = 0; x < headerSize; ++x) {
121 nonceForLibSodium[x] = header[x];
122 }
123 const uint64_t numOfBytes{ headerSize + audioData.data.size() + crypto_secretbox_MACBYTES };
124 if (data.size() < numOfBytes) {
125 data.resize(numOfBytes);
126 }
127 for (uint8_t x = 0; x < headerSize; ++x) {
128 data.at(x) = static_cast<uint8_t>(header[x]);
129 }
130 if (crypto_secretbox_easy(data.data() + headerSize, audioData.data.data(), audioData.data.size(), nonceForLibSodium, keys.data()) != 0) {
131 return {};
132 }
133 return jsonifier::string_view_base<uint8_t>{ data.data(), numOfBytes };
134 }
135 return {};
136 }
137
138 moving_averager::moving_averager(uint64_t collectionCountNew) {
139 collectionCount = collectionCountNew;
140 }
141
142 moving_averager moving_averager::operator+=(int64_t value) {
143 values.emplace_front(value);
144 if (values.size() >= collectionCount) {
145 values.pop_back();
146 }
147 return *this;
148 }
149
150 moving_averager::operator float() {
151 float returnData{};
152 if (values.size() > 0) {
153 for (auto& value: values) {
154 returnData += static_cast<float>(value);
155 }
156 return returnData / static_cast<float>(values.size());
157 } else {
158 return 0.0f;
159 }
160 }
161
162 voice_connection_bridge::voice_connection_bridge(unordered_map<uint64_t, unique_ptr<voice_user>>* voiceUsersPtrNew, jsonifier::string_base<uint8_t>& encryptionKeyNew,
163 stream_type streamType, const jsonifier::string& baseUrlNew, const uint16_t portNew, snowflake guildIdNew,
164 std::coroutine_handle<discord_core_api::co_routine<void, false>::promise_type>* tokenNew)
165 : udp_connection{ baseUrlNew, portNew, streamType, tokenNew } {
166 voiceUsersPtr = voiceUsersPtrNew;
167 encryptionKey = encryptionKeyNew;
168 guildId = guildIdNew;
169 token = tokenNew;
170 }
171
172 DCA_INLINE void voice_connection_bridge::applyGainRamp(int64_t sampleCount) {
173 increment = (endGain - currentGain) / static_cast<float>(sampleCount);
174 for (int64_t x = 0; x < sampleCount / audioMixer.byteBlocksPerRegister; ++x) {
175 audioMixer.collectSingleRegister(upSampledVector.data() + (x * audioMixer.byteBlocksPerRegister), downSampledVector.data() + (x * audioMixer.byteBlocksPerRegister),
176 currentGain, increment);
177 currentGain += increment * static_cast<float>(audioMixer.byteBlocksPerRegister);
178 }
179 }
180
181 bool compareUint8Strings(jsonifier::string_view_base<uint8_t> stringToCheck, const char* wordToCheck) {
182 jsonifier::string newString{};
183 auto stringLength = std::char_traits<char>::length(wordToCheck);
184 if (stringLength == stringToCheck.size()) {
185 newString.resize(stringToCheck.size());
186 std::memcpy(newString.data(), stringToCheck.data(), stringLength);
187 return wordToCheck == newString;
188 }
189 return false;
190 }
191
192 void voice_connection_bridge::parseOutgoingVoiceData() {
193 jsonifier::string_view_base<uint8_t> buffer = getInputBuffer();
194 if (compareUint8Strings(buffer, "goodbye") || compareUint8Strings(buffer, "connected1") || compareUint8Strings(buffer, "connecting")) {
195 return;
196 }
197 if (buffer.size() > 0) {
198 audio_frame_data frame{};
199 frame += buffer;
200 frame.type = audio_frame_type::raw_pcm;
201 discord_core_client::getInstance()->getSongAPI(guildId).audioDataBuffer.send(std::move(frame));
202 }
203 }
204
205 void voice_connection_bridge::disconnect() {
206 if (streamType != stream_type::none) {
207 outputBuffer.clear();
208 static constexpr char newStringArray[]{ "goodbye" };
209 jsonifier::string_base<uint8_t> newString{};
210 static constexpr auto stringSize = std::size(newStringArray);
211 newString.resize(stringSize - 1);
212 std::memcpy(newString.data(), newStringArray, stringSize - 1);
213 writeData(newString);
214 try {
215 processIO();
216 } catch (const dca_exception& error) {
217 message_printer::printError<print_message_type::websocket>(error.what());
218 }
219 }
220 udp_connection::disconnect();
221 }
222
223 void voice_connection_bridge::handleAudioBuffer() {
224 parseOutgoingVoiceData();
225 }
226
227 void voice_connection_bridge::mixAudio() {
228 opus_int32 voiceUserCountReal{};
229 int64_t decodedSize{};
230 std::fill(upSampledVector.data(), upSampledVector.data() + upSampledVector.size(), 0);
231 for (auto& [key, value]: *voiceUsersPtr) {
232 jsonifier::string_view_base<uint8_t> payload{ value->extractPayload() };
233 if (payload.size() <= 44) {
234 continue;
235 } else {
236 static constexpr uint64_t headerSize{ 12 };
237 const uint64_t csrcCount{ static_cast<uint64_t>(payload.at(0)) & 0b0000'1111 };
238 const uint64_t offsetToData{ headerSize + sizeof(uint32_t) * csrcCount };
239 const uint64_t encryptedDataLength{ payload.size() - offsetToData };
240
241 if (decryptedDataString.size() < encryptedDataLength) {
242 decryptedDataString.resize(encryptedDataLength);
243 }
244
245 uint8_t nonce[24]{};
246 for (uint64_t x = 0; x < headerSize; ++x) {
247 nonce[x] = payload[x];
248 }
249
250 if (crypto_secretbox_open_easy(decryptedDataString.data(), payload.data() + offsetToData, encryptedDataLength, nonce, encryptionKey.data())) {
251 continue;
252 }
253
254 jsonifier::string_view_base newString{ decryptedDataString.data(), encryptedDataLength - crypto_secretbox_MACBYTES };
255
256 if (static_cast<int8_t>(payload[0] >> 4) & 0b0001) {
257 uint16_t extenstionLengthInWords{};
258 std::memcpy(&extenstionLengthInWords, newString.data() + 2, sizeof(int16_t));
259 extenstionLengthInWords = ntohs(extenstionLengthInWords);
260 const uint64_t extensionLength{ sizeof(uint32_t) * extenstionLengthInWords };
261 const uint64_t extensionHeaderLength{ sizeof(uint16_t) * 2 };
262 newString = newString.substr(extensionHeaderLength + extensionLength);
263 }
264
265 if (newString.size() > 44) {
266 jsonifier::string_view_base<opus_int16> decodedData{};
267 try {
268 decodedData = value->getDecoder().decodeData(newString);
269 } catch (const dca_exception& error) {
270 message_printer::printError<print_message_type::websocket>(error.what());
271 }
272 if (decodedData.size() > 0) {
273 decodedSize = static_cast<int64_t>(std::max(static_cast<uint64_t>(decodedSize), decodedData.size()));
274 ++voiceUserCountReal;
275 auto newPtr = decodedData.data();
276 auto newerPtr = upSampledVector.data();
277 for (uint64_t x = 0; x < decodedData.size() / audioMixer.byteBlocksPerRegister;
278 ++x, newPtr += audioMixer.byteBlocksPerRegister, newerPtr += audioMixer.byteBlocksPerRegister) {
279 audioMixer.combineSamples(newPtr, newerPtr);
280 }
281 }
282 }
283 }
284 }
285 if (decodedSize > 0) {
286 voiceUserCountAverage += voiceUserCountReal;
287 endGain = 1.0f / voiceUserCountAverage;
288 applyGainRamp(decodedSize);
289 if (resampleVector.size() < static_cast<uint64_t>(decodedSize) * 2) {
290 resampleVector.resize(static_cast<uint64_t>(decodedSize) * 2);
291 }
292 std::memcpy(resampleVector.data(), downSampledVector.data(), static_cast<uint64_t>(decodedSize) * 2);
293 writeData(jsonifier::string_view_base<uint8_t>{ resampleVector.data(), static_cast<uint64_t>(decodedSize * 2) });
294 currentGain = endGain;
295 }
296 }
297
298 voice_udpconnection::voice_udpconnection(const jsonifier::string& baseUrlNew, uint16_t portNew, stream_type streamType, voice_connection* ptrNew,
299 std::coroutine_handle<discord_core_api::co_routine<void, false>::promise_type>* tokenNew)
300 : udp_connection{ baseUrlNew, portNew, streamType, tokenNew } {
301 voiceConnection = ptrNew;
302 };
303
304 void voice_udpconnection::disconnect() {
305 if (streamType != stream_type::none) {
306 outputBuffer.clear();
307 static constexpr char newStringArray[] = { "goodbye" };
308 jsonifier::string_base<uint8_t> newString{};
309 auto stringSize = std::size(newStringArray);
310 newString.resize(stringSize - 1);
311 std::memcpy(newString.data(), newStringArray, stringSize - 1);
312 writeData(newString);
313 try {
314 processIO();
315 } catch (const dca_exception& error) {
316 message_printer::printError<print_message_type::websocket>(error.what());
317 }
318 }
319 udp_connection::disconnect();
320 }
321
// voice_connection constructor. NOTE(review): the signature line is missing from this listing;
// only the initializer list and body are visible.
// Initializes the websocket_core base with the client's config manager and the voice websocket type.
323 : websocket_core(&discord_core_client::getInstance()->configManager, discord_core_internal::websocket_type::voice) {
324 dataOpCode = discord_core_internal::websocket_op_code::Op_Text;
325 msPerPacket = 20;
// Samples per packet follow from the sample rate and the packet duration in milliseconds.
326 samplesPerPacket = sampleRatePerSecond / 1000 * msPerPacket;
327 configManager = &discord_core_client::getInstance()->configManager;
328 baseShard = baseShardNew;
329 doWeQuit = doWeQuitNew;
330 }
331
// Returns the channel id held in voiceConnectInitData.
// NOTE(review): the signature line of this accessor is missing from this listing.
333 return voiceConnectInitData.channelId;
334 }
335
336 void voice_connection::parseIncomingVoiceData(jsonifier::string_view_base<uint8_t> rawDataBufferNew) {
337 if ((72 <= (static_cast<int8_t>(rawDataBufferNew.at(1)) & 0b0111'1111) && ((static_cast<int8_t>(rawDataBufferNew.at(1)) & 0b0111'1111) <= 76)) ||
338 rawDataBufferNew.size() <= 44) {
339 return;
340 }
341 uint32_t speakerSsrc{};
342 std::memcpy(&speakerSsrc, rawDataBufferNew.data() + 8, sizeof(uint32_t));
343 speakerSsrc = ntohl(speakerSsrc);
344 if (!voiceUsers.contains(speakerSsrc)) {
345 voiceUsers.emplace(speakerSsrc, makeUnique<voice_user>());
346 }
347 voiceUsers[speakerSsrc]->insertPayload(rawDataBufferNew);
348 }
349
// Body of the connect routine. NOTE(review): the signature line is missing from this listing;
// `initData` is presumably its parameter.
// Stores the init data, marks the previous state as stopped and the current state as connecting,
// and starts the runVoice() coroutine unless it is already running.
351 voiceConnectInitData = initData;
352 prevActiveState.store(voice_active_state::stopped, std::memory_order_release);
353 activeState.store(voice_active_state::connecting, std::memory_order_release);
354 if (taskThread.getStatus() != co_routine_status::running) {
355 taskThread = runVoice();
356 } else {
// NOTE(review): this repeats the store made two statements earlier.
357 activeState.store(voice_active_state::connecting, std::memory_order_release);
358 }
359 }
360
361 unbounded_message_block<audio_frame_data>& voice_connection::getAudioBuffer() {
362 return discord_core_client::getInstance()->getSongAPI(voiceConnectInitData.guildId).audioDataBuffer;
363 }
364
365 void voice_connection::checkForAndSendHeartBeat(const bool isImmedate) {
366 if (heartBeatStopWatch.hasTimeElapsed() || isImmedate) {
367 discord_core_internal::websocket_message_data<uint32_t> message{};
368 message.jsonifierExcludedKeys.emplace("T");
369 message.jsonifierExcludedKeys.emplace("s");
370 jsonifier::string_base<uint8_t> string{};
371 message.d = static_cast<uint32_t>(std::chrono::duration_cast<nanoseconds>(sys_clock::now().time_since_epoch()).count());
372 message.op = 3;
373 parser.serializeJson(message, string);
374 createHeader(string, dataOpCode);
375 if (!sendMessage(string, true)) {
376 onClosed();
377 return;
378 }
379 if (activeState.load(std::memory_order_acquire) == voice_active_state::paused || activeState.load(std::memory_order_acquire) == voice_active_state::stopped) {
380 sendSilence();
381 }
382 haveWeReceivedHeartbeatAck = false;
383 heartBeatStopWatch.reset();
384 }
385 }
386
387 void voice_connection::sendSpeakingMessage(const bool isSpeaking) {
388 discord_core_internal::websocket_message_data<discord_core_internal::send_speaking_data> data{};
389 data.jsonifierExcludedKeys.emplace("T");
390 data.jsonifierExcludedKeys.emplace("s");
391 if (!isSpeaking) {
392 data.d.type = discord_core_internal::send_speaking_type::None;
393 sendSilence();
394 if (udpConnection.processIO() != discord_core_internal::connection_status::NO_Error) {
395 onClosed();
396 return;
397 }
398 } else {
399 data.d.type = discord_core_internal::send_speaking_type::Microphone;
400 }
401 data.d.delay = 0;
402 data.d.ssrc = audioSSRC;
403 data.op = 5;
404 jsonifier::string_base<uint8_t> string{};
405 parser.serializeJson(data, string);
406 createHeader(string, dataOpCode);
407 sendMessage(string, true);
408 }
409
// Handles one message received on the voice websocket: logs it, parses the envelope and
// dispatches on its op code. Always returns true.
// NOTE(review): the `case ...: {` label lines of the switch are missing from this listing;
// each branch below is described by what its visible body does.
410 bool voice_connection::onMessageReceived(jsonifier::string_view_base<uint8_t> data) {
411 discord_core_internal::websocket_message message{};
412 message_printer::printSuccess<print_message_type::websocket>("message received from voice websocket: " + jsonifier::string{ data });
413 parser.parseJson(message, data);
414 switch (static_cast<voice_socket_op_codes>(message.op)) {
// Branch (label not visible): ready payload - remembers ssrc, ip and port, picks the
// "xsalsa20_poly1305" mode when offered and advances to Initializing_DatagramSocket.
416 discord_core_internal::websocket_message_data<voice_socket_ready_data> dataNew{};
417 parser.parseJson(dataNew, data);
418 audioSSRC = dataNew.d.ssrc;
419 voiceIp = dataNew.d.ip;
420 port = dataNew.d.port;
421 for (auto& value: dataNew.d.modes) {
422 if (value == "xsalsa20_poly1305") {
423 audioEncryptionMode = value;
424 }
425 }
426 connectionState.store(voice_connection_state::Initializing_DatagramSocket, std::memory_order_release);
427 break;
428 }
// Branch (label not visible): session description - rebuilds encryptionKey from the received
// secret key, recreates the packet encrypter and advances to Collecting_Init_Data.
430 discord_core_internal::websocket_message_data<voice_session_description_data> dataNew{};
431 encryptionKey.clear();
432 parser.parseJson(dataNew, data);
433 for (auto& value: dataNew.d.secretKey) {
434 encryptionKey.emplace_back(static_cast<uint8_t>(value));
435 }
436 packetEncrypter = rtp_packet_encrypter{ audioSSRC, encryptionKey };
437 connectionState.store(voice_connection_state::Collecting_Init_Data, std::memory_order_release);
438 break;
439 }
// Branch (label not visible): speaking payload - registers a voice_user for the ssrc when
// streaming is enabled and the user is either not a bot or bot audio is being streamed.
441 discord_core_internal::websocket_message_data<speaking_data> dataNew{};
442 parser.parseJson(dataNew, data);
443 const uint32_t ssrc = dataNew.d.ssrc;
444 auto userId = dataNew.d.userId;
445 unique_ptr<voice_user> user{ makeUnique<voice_user>(userId) };
446 if (voiceConnectInitData.streamInfo.type != stream_type::none &&
447 (voiceConnectInitData.streamInfo.streamBotAudio || !users::getCachedUser({ .userId = user->getUserId() }).getFlagValue(user_flags::Bot))) {
448 if (!voiceUsers.contains(ssrc)) {
449 voiceUsers.emplace(ssrc, std::move(user));
450 }
451 }
452 break;
453 }
// Branch (label not visible): marks the outstanding heartbeat as acknowledged.
455 haveWeReceivedHeartbeatAck = true;
456 break;
457 }
// Branch (label not visible): hello payload - configures and restarts the heartbeat stop watch
// with the received interval, enables heartbeating and advances to Sending_Identify.
459 discord_core_internal::websocket_message_data<voice_connection_hello_data> dataNew{};
460 parser.parseJson(dataNew, data);
461 heartBeatStopWatch = stop_watch<milliseconds>{ dataNew.d.heartBeatInterval };
462 heartBeatStopWatch.reset();
463 areWeHeartBeating = true;
464 connectionState.store(voice_connection_state::Sending_Identify, std::memory_order_release);
465 currentState.store(discord_core_internal::websocket_state::authenticated, std::memory_order_release);
466 haveWeReceivedHeartbeatAck = true;
467 break;
468 }
// Branch (label not visible): moves the connection state to Initializing_DatagramSocket.
470 connectionState.store(voice_connection_state::Initializing_DatagramSocket, std::memory_order_release);
471 break;
472 }
// Branch (label not visible): user disconnect - removes the voice user with the matching id.
// The inner break leaves the loop right after erase(), so iteration never continues on the
// modified map.
474 discord_core_internal::websocket_message_data<voice_user_disconnect_data> dataNew{};
475 parser.parseJson(dataNew, data);
476 const auto userId = dataNew.d.userId;
477 for (auto& [key, value]: voiceUsers) {
478 if (userId == value->getUserId()) {
479 voiceUsers.erase(key);
480 break;
481 }
482 }
483 break;
484 }
// Remaining branches (labels not visible): three of them fall through to a final branch that
// does nothing but break.
486 [[fallthrough]];
487 }
489 [[fallthrough]];
490 }
492 [[fallthrough]];
493 }
495 break;
496 }
497 }
498 return true;
499 }
500
// Drives the connection handshake as a state machine over connectionState. Each branch performs
// one step, stores the next state and calls connectInternal() again; any failure calls
// onClosed() and returns. Gives up (sets *doWeQuit) once maxReconnectTries is reached.
// NOTE(review): the `case ...: {` label lines of the switch are missing from this listing;
// each branch below is described by what its visible body does.
501 void voice_connection::connectInternal() {
502 stop_watch<milliseconds> stopWatch{ 10000ms };
503 stopWatch.reset();
504 if (currentReconnectTries >= maxReconnectTries) {
505 doWeQuit->store(true, std::memory_order_release);
506 return;
507 }
508 if (streamSocket) {
509 streamSocket->inputBuffer.clear();
510 streamSocket->outputBuffer.clear();
511 }
512 areWeHeartBeating = false;
513 switch (connectionState.load(std::memory_order_acquire)) {
// Branch (label not visible): registers this connection's data buffer with the base shard,
// requests the voice connection data and waits for it; then derives baseUrl from the endpoint
// (text before the first ':') and moves on to Initializing_WebSocket.
515 baseShard->voiceConnectionDataBufferMap[voiceConnectInitData.guildId.operator const uint64_t&()] = &voiceConnectionDataBuffer;
516 baseShard->voiceConnectionDataBufferMap[voiceConnectInitData.guildId.operator const uint64_t&()]->clearContents();
517 baseShard->getVoiceConnectionData(voiceConnectInitData);
518
519 if (waitForTimeToPass(voiceConnectionDataBuffer, voiceConnectionData, 10000)) {
520 onClosed();
521 return;
522 }
523 baseUrl = voiceConnectionData.endPoint.substr(0, voiceConnectionData.endPoint.find(":"));
524 connectionState.store(voice_connection_state::Initializing_WebSocket, std::memory_order_release);
525 connectInternal();
526 break;
527 }
// Branch (label not visible): opens the websocket to baseUrl on port 443 and pumps TCP I/O
// until currentState reaches Collecting_Hello (or a stop is requested), then moves on to
// Collecting_Hello.
529 currentState.store(discord_core_internal::websocket_state::upgrading, std::memory_order_release);
530 if (!websocket_core::connect(baseUrl, "/?v=4", 443)) {
531 onClosed();
532 return;
533 }
534 shard.at(0) = 0;
535 shard.at(1) = 1;
536 while (currentState.load(std::memory_order_acquire) != discord_core_internal::websocket_state::Collecting_Hello && !token.promise().stopRequested()) {
537 if (websocket_core::tcpConnection.processIO(10) != discord_core_internal::connection_status::NO_Error) {
538 onClosed();
539 return;
540 }
541 }
542 connectionState.store(voice_connection_state::Collecting_Hello, std::memory_order_release);
543 connectInternal();
544 break;
545 }
// Branch (label not visible): pumps TCP I/O until connectionState becomes Sending_Identify,
// closing on timeout (stopWatch, 10000ms) or I/O error; resets the reconnect counter on success.
547 stopWatch.reset();
548 while (connectionState.load(std::memory_order_acquire) != voice_connection_state::Sending_Identify && !token.promise().stopRequested()) {
549 if (stopWatch.hasTimeElapsed()) {
550 onClosed();
551 return;
552 }
553 if (websocket_core::tcpConnection.processIO(10) != discord_core_internal::connection_status::NO_Error) {
554 onClosed();
555 return;
556 }
557 std::this_thread::sleep_for(1ms);
558 }
559 currentReconnectTries = 0;
560 connectInternal();
561 break;
562 }
// Branch (label not visible): builds and sends the identify message (op 0) from the guild id,
// session id, token and user id, then moves on to Collecting_Ready.
564 haveWeReceivedHeartbeatAck = true;
565 discord_core_internal::websocket_message_data<discord_core_internal::voice_identify_data> data{};
566 data.jsonifierExcludedKeys.emplace("T");
567 data.jsonifierExcludedKeys.emplace("s");
568 data.d.serverId = voiceConnectInitData.guildId.operator jsonifier::string();
569 data.d.sessionId = voiceConnectionData.sessionId;
570 data.d.token = voiceConnectionData.token;
571 data.d.userId = voiceConnectInitData.userId;
572 data.op = 0;
573 data.s = 0;
574 jsonifier::string_base<uint8_t> string{};
575 parser.serializeJson(data, string);
576 createHeader(string, dataOpCode);
577 if (!websocket_core::sendMessage(string, true)) {
578 onClosed();
579 return;
580 }
581 connectionState.store(voice_connection_state::Collecting_Ready, std::memory_order_release);
582 connectInternal();
583 break;
584 }
// Branch (label not visible): pumps TCP I/O until connectionState becomes
// Initializing_DatagramSocket, closing on timeout or I/O error.
586 stopWatch.reset();
587 while (connectionState.load(std::memory_order_acquire) != voice_connection_state::Initializing_DatagramSocket && !token.promise().stopRequested()) {
588 if (stopWatch.hasTimeElapsed()) {
589 onClosed();
590 return;
591 }
592 if (websocket_core::tcpConnection.processIO(100) != discord_core_internal::connection_status::NO_Error) {
593 onClosed();
594 return;
595 }
596 std::this_thread::sleep_for(1ms);
597 }
598 connectInternal();
599 break;
600 }
// Branch (label not visible): establishes the UDP connection via voiceConnect() and moves on
// to Sending_Select_Protocol.
602 if (!voiceConnect()) {
603 onClosed();
604 return;
605 }
606 connectionState.store(voice_connection_state::Sending_Select_Protocol, std::memory_order_release);
607 connectInternal();
608 break;
609 }
// Branch (label not visible): sends the select-protocol message (op 1) carrying the encryption
// mode, external ip and port, then moves on to Collecting_Session_Description.
611 discord_core_internal::websocket_message_data<discord_core_internal::voice_socket_protocol_payload_data> data{};
612 data.jsonifierExcludedKeys.emplace("T");
613 data.jsonifierExcludedKeys.emplace("s");
614 data.d.data.mode = audioEncryptionMode;
615 data.d.data.address = externalIp;
616 data.d.data.port = port;
617 data.op = 1;
618 jsonifier::string_base<uint8_t> string{};
619 parser.serializeJson(data, string);
620 createHeader(string, dataOpCode);
621 if (!websocket_core::sendMessage(string, true)) {
// NOTE(review): the lone ';' below is an empty statement with no effect.
622 ;
623 onClosed();
624 return;
625 }
626 connectionState.store(voice_connection_state::Collecting_Session_Description, std::memory_order_release);
627 connectInternal();
628 break;
629 }
// Branch (label not visible): pumps TCP I/O until connectionState becomes Collecting_Init_Data,
// then clears the shard-side data buffer, restores the previous active state and, when
// streaming is configured, creates the stream bridge and starts playback. This is the final
// step: it returns without calling connectInternal() again.
631 stopWatch.reset();
632 while (connectionState.load(std::memory_order_acquire) != voice_connection_state::Collecting_Init_Data && !token.promise().stopRequested()) {
633 if (stopWatch.hasTimeElapsed()) {
634 onClosed();
635 return;
636 }
637 if (websocket_core::tcpConnection.processIO(10) != discord_core_internal::connection_status::NO_Error) {
638 onClosed();
639 return;
640 }
641 std::this_thread::sleep_for(1ms);
642 }
643 baseShard->voiceConnectionDataBufferMap[voiceConnectInitData.guildId.operator const uint64_t&()]->clearContents();
644 connectionState.store(voice_connection_state::Collecting_Init_Data, std::memory_order_release);
645 activeState.store(prevActiveState.load(std::memory_order_acquire), std::memory_order_release);
646 if (voiceConnectInitData.streamInfo.type != stream_type::none) {
647 streamSocket = makeUnique<voice_connection_bridge>(&voiceUsers, encryptionKey, voiceConnectInitData.streamInfo.type, voiceConnectInitData.streamInfo.address,
648 voiceConnectInitData.streamInfo.port, voiceConnectInitData.guildId, &token);
649 if (streamSocket->currentStatus != discord_core_internal::connection_status::NO_Error) {
650 onClosed();
651 return;
652 }
653 play();
654 }
655 return;
656 }
657 }
658 }
659
// Main coroutine of the voice connection. After switching to a new thread it loops until a stop
// is requested, *doWeQuit is set or the state becomes exiting, and dispatches on activeState.
// NOTE(review): most `case ...: {` label lines and a few catch-body lines are missing from this
// listing; each branch below is described by what its visible body does.
660 co_routine<void, false> voice_connection::runVoice() {
661 token = co_await newThreadAwaitable<void, false>();
662 stop_watch<milliseconds> stopWatch{ 20000ms };
663 stopWatch.reset();
664 stop_watch<milliseconds> sendSilenceStopWatch{ 5000ms };
665 sendSilenceStopWatch.reset();
666 while (!token.promise().stopRequested() && !doWeQuit->load(std::memory_order_acquire) && activeState.load(std::memory_order_acquire) != voice_active_state::exiting) {
667 try {
668 switch (activeState.load(std::memory_order_acquire)) {
// Branch (label not visible): runs the connection handshake, then announces "not speaking".
670 connectInternal();
671 sendSpeakingMessage(false);
672 break;
673 }
// Branch (label not visible): idle loop while the state is stopped - keeps UDP and TCP I/O
// serviced, sends heartbeats, and calls onClosed() on any I/O error or lost connection.
675 sendSpeakingMessage(false);
676 xferAudioData.clearData();
677 while (!token.promise().stopRequested() && activeState.load(std::memory_order_acquire) == voice_active_state::stopped) {
678 if (udpConnection.processIO() != discord_core_internal::connection_status::NO_Error) {
679 onClosed();
680 }
681 std::this_thread::sleep_for(1ms);
682 if (!token.promise().stopRequested() && voice_connection::areWeConnected()) {
683 if (websocket_core::tcpConnection.processIO(10) != discord_core_internal::connection_status::NO_Error) {
684 onClosed();
685 } else if (!websocket_core::areWeConnected()) {
686 onClosed();
687 }
688 } else {
689 onClosed();
690 }
691 if (!token.promise().stopRequested() && voice_connection::areWeConnected()) {
692 checkForAndSendHeartBeat(false);
693 }
694 }
695 break;
696 }
// Branch (label not visible): idle loop while the state is paused - same servicing as the
// stopped loop above, but the transfer buffer is not cleared first.
698 sendSpeakingMessage(false);
699 while (!token.promise().stopRequested() && activeState.load(std::memory_order_acquire) == voice_active_state::paused) {
700 if (udpConnection.processIO() != discord_core_internal::connection_status::NO_Error) {
701 onClosed();
702 }
703 std::this_thread::sleep_for(1ms);
704 if (!token.promise().stopRequested() && voice_connection::areWeConnected()) {
705 if (websocket_core::tcpConnection.processIO(10) != discord_core_internal::connection_status::NO_Error) {
706 onClosed();
707 } else if (!websocket_core::areWeConnected()) {
708 onClosed();
709 }
710 } else {
711 onClosed();
712 }
713 if (!token.promise().stopRequested() && voice_connection::areWeConnected()) {
714 checkForAndSendHeartBeat(false);
715 }
716 }
717 break;
718 }
// Branch (label not visible): playback loop while the state is playing. Each iteration pulls
// one frame from the song API buffer, encodes/encrypts it, waits until targetTime and sends it.
720 sendSpeakingMessage(false);
721 sendSpeakingMessage(true);
722 sendSilence();
723 xferAudioData.clearData();
724
725 auto targetTime{ sys_clock::now() + intervalCount };
726
727 while (!token.promise().stopRequested() && activeState.load(std::memory_order_acquire) == voice_active_state::playing) {
728 int64_t bytesPerSample{ 4 };
729 if (!token.promise().stopRequested() && voice_connection::areWeConnected()) {
730 checkForAndSendHeartBeat(false);
731 }
732 discord_core_client::getInstance()->getSongAPI(voiceConnectInitData.guildId).audioDataBuffer.tryReceive(xferAudioData);
// A pending skip is only honoured once no frame data is left to send.
733 if ((doWeSkip.load(std::memory_order_acquire) && xferAudioData.currentSize == 0)) {
734 skipInternal();
735 }
736
737 audio_frame_type frameType{ xferAudioData.type };
738 uint64_t frameSize{};
739 if (xferAudioData.currentSize <= 0) {
740 xferAudioData.clearData();
741 } else if (xferAudioData.type == audio_frame_type::raw_pcm) {
// For raw PCM the send interval is derived from the frame's byte count, bytesPerSample and the
// sample rate; frameSize is capped at the bytes of one packet of msPerPacket milliseconds.
742 intervalCount = nanoseconds{ static_cast<uint64_t>(static_cast<double>(xferAudioData.currentSize / bytesPerSample) /
743 static_cast<double>(sampleRatePerSecond) * static_cast<double>(nsPerSecond)) };
744 uint64_t framesPerSecond = 1000 / static_cast<uint64_t>(msPerPacket);
745 frameSize = std::min(bytesPerSample * sampleRatePerSecond / framesPerSecond, xferAudioData.data.size());
746 } else {
// Any other frame type uses a fixed 20 ms (20000000 ns) interval.
747 intervalCount = nanoseconds{ 20000000 };
748 }
749 jsonifier::string_view_base<uint8_t> frame{};
750 switch (frameType) {
// Branch (label not visible): the frame is run through the encoder before encryption.
752 auto encodedFrameData = encoder.encodeData(jsonifier::string_view_base<uint8_t>(xferAudioData.data.data(), frameSize));
753 xferAudioData.clearData();
754 if (encodedFrameData.data.size() != 0) {
755 frame = packetEncrypter.encryptPacket(encodedFrameData);
756 }
757 break;
758 }
// Branch (label not visible): the frame bytes are encrypted as-is with a fixed sample count
// of 960.
760 try {
761 discord_core_internal::encoder_return_data returnData{};
762 returnData.data = { xferAudioData.data.data(), static_cast<uint64_t>(xferAudioData.currentSize) };
763 returnData.sampleCount = 960;
764 if (returnData.data.size() != 0) {
765 frame = packetEncrypter.encryptPacket(returnData);
766 xferAudioData.clearData();
767 }
768 } catch (const dca_exception& error) {
// NOTE(review): one line of this catch body is missing from this listing.
770 break;
771 }
772 break;
773 }
// Branch (label not visible): the frame is discarded.
775 xferAudioData.clearData();
776 break;
777 }
778 }
// Pacing: service the websocket only when at least 60% of the interval is still free, sleep
// for 95% of the remaining wait, then spin for the rest before sending.
779 auto waitTime = targetTime - sys_clock::now();
780 auto waitTimeCount = waitTime.count();
781 int64_t minimumFreeTimeForCheckingProcessIO{ static_cast<int64_t>(static_cast<double>(intervalCount.count()) * 0.60l) };
782 if (voice_connection::areWeConnected()) {
783 if (waitTimeCount >= minimumFreeTimeForCheckingProcessIO && !token.promise().stopRequested()) {
784 if (websocket_core::tcpConnection.processIO(0) != discord_core_internal::connection_status::NO_Error) {
785 onClosed();
786 }
787 }
788
789 } else {
790 onClosed();
791 }
792
793 waitTime = targetTime - sys_clock::now();
794 waitTimeCount = static_cast<int64_t>(static_cast<double>(waitTime.count()) * 0.95l);
795 if (waitTimeCount > 0) {
796 nanoSleep(waitTimeCount);
797 }
798 waitTime = targetTime - sys_clock::now();
799 waitTimeCount = waitTime.count();
800 if (waitTimeCount > 0 && waitTimeCount < intervalCount.count()) {
801 spinLock(static_cast<uint64_t>(waitTimeCount));
802 }
803 if (udpConnection.areWeStillConnected()) {
804 udpConnection.writeData(frame);
805 if (udpConnection.processIO() != discord_core_internal::connection_status::NO_Error) {
806 onClosed();
807 }
808 } else {
809 xferAudioData.clearData();
810 onClosed();
811 }
812
813 targetTime = sys_clock::now() + intervalCount;
814
// When a stream bridge exists, mix the received audio and flush it; on failure wait five
// seconds before closing.
815 if (streamSocket) {
816 streamSocket->mixAudio();
817 if (streamSocket->areWeStillConnected()) {
818 if (streamSocket->processIO() != discord_core_internal::connection_status::NO_Error) {
819 std::this_thread::sleep_for(5s);
820 onClosed();
821 }
822 } else {
823 std::this_thread::sleep_for(5s);
824 onClosed();
825 }
826 }
827 }
828 break;
829 }
830 case voice_active_state::exiting: {
831 co_return;
832 }
833 }
834 if (token.promise().stopRequested() || activeState == voice_active_state::exiting) {
835 co_return;
836 }
837 std::this_thread::sleep_for(1ms);
838 } catch (const dca_exception& error) {
// NOTE(review): the body line of this catch is missing from this listing.
840 }
841 }
842 };
843
// Fires the guild's song-completion event to move on to the next song, or calls stop() when no
// handler is registered. If the handler throws a dca_exception, waits 150ms and retries
// recursively; after 10 attempts it calls stop() and gives up.
// currentRecursionDepth: number of attempts made so far.
844 void voice_connection::skipInternal(uint32_t currentRecursionDepth) {
845 if (currentRecursionDepth >= 10) {
846 stop();
847 return;
848 }
849 ++currentRecursionDepth;
850 song_completion_event_data completionEventData{};
851 completionEventData.guildId = voiceConnectInitData.guildId;
852 completionEventData.wasItAFail = wasItAFail.load(std::memory_order_acquire);
853 completionEventData.guildMemberId = currentUserId;
854 try {
855 xferAudioData.clearData();
856 if (discord_core_client::getInstance()->getSongAPI(voiceConnectInitData.guildId).onSongCompletionEvent.functions.size() > 0) {
857 discord_core_client::getInstance()->getSongAPI(voiceConnectInitData.guildId).onSongCompletionEvent(completionEventData);
858 } else {
859 stop();
860 }
861 } catch (const dca_exception& error) {
// NOTE(review): one line of this catch body is missing from this listing.
863 std::this_thread::sleep_for(150ms);
864 skipInternal(currentRecursionDepth);
865 }
866 }
867
868 bool voice_connection::areWeCurrentlyPlaying() {
869 return (activeState.load(std::memory_order_acquire) == voice_active_state::playing) || activeState.load(std::memory_order_acquire) == voice_active_state::paused;
870 }
871
872 void voice_udpconnection::handleAudioBuffer() {
873 if (voiceConnection->connectionState.load(std::memory_order_acquire) == voice_connection_state::Initializing_DatagramSocket) {
874 } else {
875 jsonifier::string_view_base<uint8_t> string = getInputBuffer();
876 if (voiceConnection->streamSocket && voiceConnection->encryptionKey.size() > 0) {
877 voiceConnection->parseIncomingVoiceData(string);
878 }
879 }
880 }
881
882 bool voice_connection::areWeConnected() {
883 return websocket_core::areWeConnected() && udpConnection.areWeStillConnected();
884 }
885
886 bool voice_connection::voiceConnect() {
887 udpConnection = voice_udpconnection{ voiceIp, port, stream_type::none, this, &token };
888 if (!udpConnection.areWeStillConnected()) {
889 return false;
890 }
891 uint8_t packet[74]{};
892 static constexpr uint16_t val1601{ 0x01 };
893 static constexpr uint16_t val1602{ 70 };
894 packet[0] = static_cast<uint8_t>(val1601 >> 8);
895 packet[1] = static_cast<uint8_t>(val1601 >> 0);
896 packet[2] = static_cast<uint8_t>(val1602 >> 8);
897 packet[3] = static_cast<uint8_t>(val1602 >> 0);
898 packet[4] = static_cast<uint8_t>(audioSSRC >> 24);
899 packet[5] = static_cast<uint8_t>(audioSSRC >> 16);
900 packet[6] = static_cast<uint8_t>(audioSSRC >> 8);
901 packet[7] = static_cast<uint8_t>(audioSSRC);
902 udpConnection.getInputBuffer();
903 udpConnection.writeData(jsonifier::string_view_base<uint8_t>{ packet, std::size(packet) });
904 jsonifier::string_view_base<uint8_t> inputStringFirst{};
905 jsonifier::string_base<uint8_t> inputString{};
906
907 stop_watch<milliseconds> stopWatch{ 5500ms };
908 stopWatch.reset();
909 while (inputStringFirst.size() < 74 && !doWeQuit->load(std::memory_order_acquire) && activeState.load(std::memory_order_acquire) != voice_active_state::exiting) {
910 if (udpConnection.processIO() != discord_core_internal::connection_status::NO_Error) {
911 onClosed();
912 return false;
913 }
914 inputStringFirst = udpConnection.getInputBuffer();
915 std::this_thread::sleep_for(1ms);
916 if (stopWatch.hasTimeElapsed()) {
917 return false;
918 }
919 }
920 inputString.insert(inputString.begin(), inputStringFirst.begin(), inputStringFirst.end());
921 inputString = inputString.substr(8);
922 const auto endLineFind = inputString.find(static_cast<uint8_t>('\u0000'), static_cast<uint64_t>(6));
923 if (endLineFind != jsonifier::string::npos) {
924 inputString = inputString.substr(0, endLineFind);
925 }
926 jsonifier::string_view_base returnString{ inputStringFirst.data() + 8, inputString.size() };
927 if (externalIp.size() < returnString.size()) {
928 externalIp.resize(returnString.size());
929 }
930 std::memcpy(externalIp.data(), returnString.data(), returnString.size());
931 voiceConnectionDataBuffer.clearContents();
932 return true;
933 }
934
935 void voice_connection::sendSilence() {
936 jsonifier::vector<jsonifier::string_base<uint8_t>> frames{};
937 static constexpr uint8_t arrayNew[3]{ 0xf8, 0xff, 0xfe };
938 for (uint64_t x = 0; x < 5; ++x) {
939 discord_core_internal::encoder_return_data frame{};
940 frame.data = jsonifier::string_view_base<uint8_t>{ arrayNew, 3 };
941 frame.sampleCount = 3;
942 auto packetNew = packetEncrypter.encryptPacket(frame);
943 frames.emplace_back(jsonifier::string_base<uint8_t>{ packetNew.data(), packetNew.size() });
944 }
945 for (auto& value: frames) {
946 udpConnection.writeData(value);
947 if (udpConnection.processIO() != discord_core_internal::connection_status::NO_Error) {
948 onClosed();
949 return;
950 }
951 }
952 }
953
954 bool voice_connection::pauseToggle() {
955 if (activeState.load(std::memory_order_acquire) == voice_active_state::paused) {
956 activeState.store(voice_active_state::playing, std::memory_order_release);
957 return true;
958 } else if (activeState.load(std::memory_order_acquire) == voice_active_state::playing) {
959 activeState.store(voice_active_state::paused, std::memory_order_release);
960 return true;
961 } else {
962 return false;
963 }
964 }
965
966 void voice_connection::disconnect() {
967 activeState.store(voice_active_state::exiting, std::memory_order_release);
968 if (taskThread.getStatus() == co_routine_status::running) {
969 taskThread.cancel();
970 }
971 if (streamSocket) {
972 streamSocket->disconnect();
973 streamSocket.reset();
974 };
975 udpConnection.disconnect();
976 websocket_core::disconnect();
977 areWeHeartBeating = false;
978 currentReconnectTries = 0;
979 voiceUsers.clear();
980 prevActiveState.store(voice_active_state::stopped, std::memory_order_release);
981 activeState.store(voice_active_state::connecting, std::memory_order_release);
982 connectionState.store(voice_connection_state::Collecting_Init_Data, std::memory_order_release);
983 currentState.store(discord_core_internal::websocket_state::disconnected, std::memory_order_release);
984 }
985
986 void voice_connection::onClosed() {
987 connectionState.store(voice_connection_state::Collecting_Init_Data, std::memory_order_release);
988 if (activeState.load(std::memory_order_acquire) != voice_active_state::exiting && currentReconnectTries < maxReconnectTries) {
989 if (activeState.load(std::memory_order_acquire) != voice_active_state::connecting) {
990 prevActiveState.store(activeState.load(std::memory_order_acquire), std::memory_order_release);
991 }
992 websocket_core::disconnect();
993 ++currentReconnectTries;
994 activeState.store(voice_active_state::connecting, std::memory_order_release);
995 if (streamSocket) {
996 streamSocket->disconnect();
997 }
998 udpConnection.disconnect();
999 } else if (currentReconnectTries >= maxReconnectTries) {
1000 activeState.store(voice_active_state::exiting, std::memory_order_release);
1001 }
1002 }
1003
1004 bool voice_connection::stop() {
1005 activeState.store(voice_active_state::stopped, std::memory_order_release);
1006 doWeSkip.store(false, std::memory_order_release);
1007 return true;
1008 }
1009
1010 bool voice_connection::play() {
1011 activeState.store(voice_active_state::playing, std::memory_order_release);
1012 return true;
1013 }
1014
1015 bool voice_connection::skip(bool wasItAFailNew) {
1016 if (activeState.load(std::memory_order_acquire) == voice_active_state::stopped) {
1017 return false;
1018 } else {
1019 wasItAFail.store(wasItAFailNew, std::memory_order_release);
1020 doWeSkip.store(true, std::memory_order_release);
1021 return true;
1022 }
1023 }
1024}
A co_routine - representing a potentially asynchronous operation/function.
A websocket client, for communication via a tcp-connection.
static DCA_INLINE void printSuccess(const string_type &what, std::source_location where=std::source_location::current())
Print a success message of the specified type.
Definition Base.hpp:289
static DCA_INLINE void printError(const string_type &what, std::source_location where=std::source_location::current())
Print an error message of the specified type.
Definition Base.hpp:252
A class representing a snowflake identifier with various operations.
Definition Base.hpp:701
A thread-safe messaging block for data-structures.
A smart pointer class that provides unique ownership semantics.
Definition UniquePtr.hpp:44
static user_cache_data getCachedUser(get_user_data dataPackage)
Collects a given user from the library's cache.
snowflake getChannelId()
Collects the currently connected-to voice channel_data's id.
void connect(const voice_connect_init_data &initData)
Connects to a currently held voice channel.
voice_connection(discord_core_internal::websocket_client *baseShardNew, std::atomic_bool *doWeQuitNew)
DCA_INLINE void storeBits(value_type *to, return_type num)
Stores the bits of a number into a character array.
Definition Etf.hpp:88
DCA_INLINE auto newThreadAwaitable()
An awaitable that can be used to launch the co_routine onto a new thread - as well as return the hand...
DCA_INLINE unique_ptr< value_type, deleter > makeUnique(arg_types &&... args)
Helper function to create a unique_ptr for a non-array object.
audio_frame_type
Audio frame types.
stream_type
For selecting the type of streamer that the given bot is, one must be one server and one of client pe...
Definition Base.hpp:868
The main namespace for the forward-facing interfaces.
voice_socket_op_codes
The various opcodes that could be sent/received by the voice-websocket.
@ resumed
Acknowledge a successful session resume.
@ heartbeat
Keep the websocket connection alive.
@ Ready_Server
Complete the websocket handshake.
@ hello
Time to wait between sending heartbeats in milliseconds.
@ Heartbeat_ACK
Sent to acknowledge a received client heartbeat.
@ identify
Begin a voice websocket connection.
@ Client_Disconnect
A client has disconnected from the voice channel.
@ Select_Protocol
Select the voice protocol.
@ speaking
Indicate which users are speaking.
@ Collecting_Init_Data
Collecting initialization data.
@ Collecting_Ready
Collecting the client ready.
@ Sending_Identify
Sending the identify payload.
@ Collecting_Hello
collecting the client hello.
@ Initializing_WebSocket
Initializing the websocket.
@ Initializing_DatagramSocket
Initializing the datagram UDP socket.
@ Collecting_Session_Description
Collecting the session-description payload.
@ Sending_Select_Protocol
Sending the select-protocol payload.
Structure to hold the encoded data and sample count returned by the encoder.
A wrapper class for the opus audio decoder.
For connecting to a voice-channel. "streamInfo" is used when a SOCKET is created to connect this bot ...