DiscordCoreAPI
A Discord bot library written in C++, with custom asynchronous coroutines.
Loading...
Searching...
No Matches
TCPConnection.hpp
Go to the documentation of this file.
1/*
2 MIT License
3
4 DiscordCoreAPI, A bot library for Discord, written in C++, and featuring explicit multithreading through the usage of custom, asynchronous C++ CoRoutines.
5
6 Copyright 2022, 2023 Chris M. (RealTimeChris)
7
8 Permission is hereby granted, free of charge, to any person obtaining a copy
9 of this software and associated documentation files (the "Software"), to deal
10 in the Software without restriction, including without limitation the rights
11 to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
12 copies of the Software, and to permit persons to whom the Software is
13 furnished to do so, subject to the following conditions:
14
15 The above copyright notice and this permission notice shall be included in all
16 copies or substantial portions of the Software.
17
18 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
19 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
20 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
21 AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
22 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
23 OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
24 SOFTWARE.
25*/
26/// TCPConnection.hpp - Header file for the "Tcp connection" stuff.
27/// Dec 12, 2021
28/// https://discordcoreapi.com
29/// \file TCPConnection.hpp
30#pragma once
31
34
35#if !defined(OPENSSL_NO_DEPRECATED)
36 #define OPENSSL_NO_DEPRECATED
37#endif
38
39#include <openssl/err.h>
40#include <openssl/ssl.h>
41
42#if defined(_WIN32)
43 #if defined(EWOULDBLOCK)
44 #undef EWOULDBLOCK
45 #endif
46 #if defined(SHUT_RDWR)
47 #undef SHUT_RDWR
48 #endif
49 #if defined(pollfd)
50 #undef pollfd
51 #endif
52 #if defined(connect)
53 #undef connect
54 #endif
55 #if defined(errno)
56 #undef errno
57 #endif
58 #if defined(close)
59 #undef close
60 #endif
61 #if defined(poll)
62 #undef poll
63 #endif
64 #define EWOULDBLOCK WSAEWOULDBLOCK
65 #define SHUT_RDWR SD_BOTH
66 #define pollfd WSAPOLLFD
67 #define connect(x, y, z) WSAConnect(x, y, z, nullptr, nullptr, nullptr, nullptr)
68 #define errno WSAGetLastError()
69 #define close closesocket
70 #define poll(x, y, z) WSAPoll(x, y, z)
71 #pragma comment(lib, "Ws2_32.lib")
72 #include <WinSock2.h>
73 #include <ws2tcpip.h>
74DCA_INLINE bool isValidSocket(SOCKET s) {
75 return s != INVALID_SOCKET;
76};
77
78#else
79using SOCKET = int32_t;
80DCA_INLINE bool isValidSocket(SOCKET s) {
81 return s >= 0;
82};
83 #include <netinet/tcp.h>
84 #include <netinet/in.h>
85 #include <sys/socket.h>
86 #include <sys/types.h>
87 #include <arpa/inet.h>
88 #include <unistd.h>
89 #include <errno.h>
90 #include <netdb.h>
91 #include <fcntl.h>
92 #include <poll.h>
93#endif
94
95#if !defined(SOCKET_ERROR)
96 #define SOCKET_ERROR SOCKET(-1)
97#endif
98
99#if !defined(INVALID_SOCKET)
100 #define INVALID_SOCKET (-1)
101#endif
102
103namespace discord_core_api {
104
105 namespace discord_core_internal {
106
107 enum class connection_status {
108 NO_Error = 0,
109 CONNECTION_Error = 1,
110 POLLHUP_Error = 2,
111 POLLNVAL_Error = 3,
112 POLLERR_Error = 4,
113 READ_Error = 5,
114 WRITE_Error = 6,
115 SOCKET_Error = 7
116 };
117
118 DCA_INLINE jsonifier::string reportSSLError(jsonifier::string_view errorPosition, int32_t errorValue = 0, SSL* SSL = nullptr) {
119 std::stringstream stream{};
120 stream << errorPosition << " error: ";
121 if (SSL) {
122 stream << ERR_error_string(static_cast<unsigned long>(SSL_get_error(SSL, errorValue)), nullptr) << ", " << ERR_error_string(ERR_get_error(), nullptr);
123 } else {
124 stream << ERR_error_string(ERR_get_error(), nullptr);
125 }
126 return jsonifier::string{ stream.str() };
127 }
128
129 DCA_INLINE jsonifier::string reportError(jsonifier::string_view errorPosition) {
130 std::stringstream stream{};
131 stream << errorPosition << " error: ";
132#if defined(_WIN32)
133 #if defined(UNICODE)
134 unique_ptr<wchar_t[]> string{ makeUnique<wchar_t[]>(1024) };
135 #else
136 unique_ptr<char[]> string{ makeUnique<char[]>(1024) };
137 #endif
138 FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS, nullptr, static_cast<DWORD>(WSAGetLastError()), MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
139 static_cast<LPTSTR>(string.get()), 1024, nullptr);
140 stream << WSAGetLastError() << ", " << string;
141#else
142 stream << strerror(errno);
143#endif
144 return jsonifier::string{ stream.str() };
145 }
146
147#if defined(_WIN32)
148 struct wsadata_wrapper {
149 struct wsadata_deleter {
150 DCA_INLINE void operator()(WSADATA* other) {
151 WSACleanup();
152 delete other;
153 }
154 };
155
156 DCA_INLINE wsadata_wrapper() {
157 auto returnData = WSAStartup(MAKEWORD(2, 2), ptr.get());
158 if (returnData) {
159 message_printer::printError<print_message_type::general>(reportError("wsadata_wrapper::wsadata_wrapper()").data());
160 }
161 }
162
163 protected:
164 unique_ptr<WSADATA, wsadata_deleter> ptr{ makeUnique<WSADATA, wsadata_deleter>() };
165 };
166#endif
167
168 struct poll_fd_wrapper {
169 jsonifier::vector<uint64_t> indices{};
170 jsonifier::vector<pollfd> polls{};
171 };
172
173 struct ssl_ctx_wrapper {
174 struct ssl_ctx_deleter {
175 DCA_INLINE void operator()(SSL_CTX* other) {
176 if (other) {
177 SSL_CTX_free(other);
178 other = nullptr;
179 }
180 }
181 };
182
183 DCA_INLINE ssl_ctx_wrapper& operator=(SSL_CTX* other) {
184 ptr.reset(other);
185 return *this;
186 }
187
188 DCA_INLINE operator SSL_CTX*() {
189 return ptr.get();
190 }
191
192 protected:
193 unique_ptr<SSL_CTX, ssl_ctx_deleter> ptr{};
194 };
195
196 class ssl_wrapper {
197 public:
198 struct ssl_deleter {
199 DCA_INLINE void operator()(SSL* other) {
200 if (other) {
201 SSL_shutdown(other);
202 SSL_free(other);
203 other = nullptr;
204 }
205 }
206 };
207
208 DCA_INLINE ssl_wrapper() = default;
209
210 DCA_INLINE ssl_wrapper& operator=(ssl_wrapper&& other) noexcept {
211 ptr = std::move(other.ptr);
212 return *this;
213 }
214
215 DCA_INLINE ssl_wrapper(ssl_wrapper&& other) noexcept {
216 *this = std::move(other);
217 }
218
219 DCA_INLINE ssl_wrapper& operator=(SSL* other) {
220 ptr.reset(other);
221 return *this;
222 }
223
224 DCA_INLINE explicit operator bool() {
225 return ptr.operator bool();
226 }
227
228 DCA_INLINE operator SSL*() {
229 return ptr.get();
230 }
231
232 protected:
233 unique_ptr<SSL, ssl_deleter> ptr{};
234 };
235
236 class socket_wrapper {
237 public:
238 struct socket_deleter {
239 DCA_INLINE void operator()(SOCKET* ptrNew) {
240 if (ptrNew && *ptrNew != INVALID_SOCKET) {
241 shutdown(*ptrNew, SHUT_RDWR);
242 close(*ptrNew);
243 *ptrNew = INVALID_SOCKET;
244 delete ptrNew;
245 ptrNew = nullptr;
246 };
247 }
248 };
249
250 DCA_INLINE socket_wrapper() = default;
251
252 DCA_INLINE socket_wrapper& operator=(socket_wrapper&& other) noexcept {
253 ptr = std::move(other.ptr);
254 return *this;
255 }
256
257 DCA_INLINE socket_wrapper(socket_wrapper&& other) noexcept {
258 *this = std::move(other);
259 }
260
261 DCA_INLINE socket_wrapper& operator=(SOCKET other) {
262 ptr.reset(new SOCKET{ other });
263 return *this;
264 }
265
266 DCA_INLINE socket_wrapper(SOCKET other) {
267 *this = other;
268 }
269
270 DCA_INLINE explicit operator bool() {
271 return ptr.operator bool();
272 }
273
274 DCA_INLINE operator SOCKET() {
275 if (ptr.operator bool()) {
276 return *ptr;
277 } else {
278 return INVALID_SOCKET;
279 }
280 }
281
282 protected:
283 unique_ptr<SOCKET, socket_deleter> ptr{};
284 };
285
286 struct addrinfo_wrapper {
287 DCA_INLINE addrinfo* operator->() {
288 return ptr;
289 }
290
291 DCA_INLINE operator addrinfo**() {
292 return &ptr;
293 }
294
295 DCA_INLINE operator addrinfo*() {
296 return ptr;
297 }
298
299 protected:
300 addrinfo value{};
301 addrinfo* ptr{ &value };
302 };
303
304 class ssl_context_holder {
305 public:
306 DCA_INLINE static ssl_ctx_wrapper context{};
307 DCA_INLINE static std::mutex accessMutex{};
308
309 DCA_INLINE static bool initialize() {
310 if (ssl_context_holder::context = SSL_CTX_new(TLS_client_method()); !ssl_context_holder::context) {
311 return false;
312 }
313
314 if (!SSL_CTX_set_min_proto_version(ssl_context_holder::context, TLS1_2_VERSION)) {
315 return false;
316 }
317
318#if defined(SSL_OP_IGNORE_UNEXPECTED_EOF)
319 auto originalOptions{ SSL_CTX_get_options(ssl_context_holder::context) | SSL_OP_IGNORE_UNEXPECTED_EOF };
320 if (SSL_CTX_set_options(ssl_context_holder::context, SSL_OP_IGNORE_UNEXPECTED_EOF) != originalOptions) {
321 return false;
322 }
323#endif
324 return true;
325 }
326 };
327
328 template<typename value_type> class ssl_data_interface {
329 public:
330 template<typename value_type2> friend class tcp_connection;
331 friend class https_client;
332
333 ssl_data_interface& operator=(ssl_data_interface<value_type>&& other) noexcept {
334 outputBuffer = std::move(other.outputBuffer);
335 inputBuffer = std::move(other.inputBuffer);
336 bytesRead = other.bytesRead;
337 return *this;
338 }
339
340 ssl_data_interface(ssl_data_interface<value_type>&& other) noexcept {
341 *this = std::move(other);
342 }
343
344 template<typename value_type_new> DCA_INLINE void writeData(jsonifier::string_view_base<value_type_new> dataToWrite, bool priority) {
345 if (static_cast<value_type*>(this)->areWeStillConnected()) {
346 if (dataToWrite.size() > 0 && static_cast<value_type*>(this)->ssl) {
347 if (priority && dataToWrite.size() < maxBufferSize) {
348 outputBuffer.clear();
349 outputBuffer.writeData(dataToWrite.data(), dataToWrite.size());
350 static_cast<value_type*>(this)->processWriteData();
351 } else {
352 uint64_t remainingBytes{ dataToWrite.size() };
353 while (remainingBytes > 0) {
354 uint64_t amountToCollect{ dataToWrite.size() >= maxBufferSize ? maxBufferSize : dataToWrite.size() };
355 outputBuffer.writeData(dataToWrite.data(), amountToCollect);
356 dataToWrite = jsonifier::string_view_base{ dataToWrite.data() + amountToCollect, dataToWrite.size() - amountToCollect };
357 remainingBytes = dataToWrite.size();
358 }
359 }
360 }
361 return;
362 } else {
363 return;
364 }
365 }
366
367 DCA_INLINE auto getInputBuffer() {
368 return inputBuffer.readData();
369 }
370
371 DCA_INLINE int64_t getBytesRead() {
372 return bytesRead;
373 }
374
375 DCA_INLINE void reset() {
376 outputBuffer.clear();
377 inputBuffer.clear();
378 bytesRead = 0;
379 }
380
381 protected:
382 const uint64_t maxBufferSize{ (1024 * 16) };
383 ring_buffer<uint8_t, 16> outputBuffer{};
384 ring_buffer<uint8_t, 64> inputBuffer{};
385 int64_t bytesRead{};
386
387 DCA_INLINE ssl_data_interface() = default;
388
389 virtual ~ssl_data_interface() = default;
390 };
391
392 template<typename value_type> class tcp_connection : public ssl_data_interface<tcp_connection<value_type>> {
393 public:
394 connection_status currentStatus{ connection_status::NO_Error };
395 socket_wrapper socket{};
396 bool writeWantWrite{};
397 bool writeWantRead{};
398 bool readWantWrite{};
399 bool readWantRead{};
400 ssl_wrapper ssl{};
401
402 tcp_connection& operator=(tcp_connection&& other) = default;
403 tcp_connection(tcp_connection&& other) = default;
404 tcp_connection& operator=(const tcp_connection& other) = default;
405 tcp_connection(const tcp_connection& other) = default;
406
407 DCA_INLINE tcp_connection(const jsonifier::string& baseUrlNew, const uint16_t portNew) {
408 jsonifier::string addressString{};
409 auto httpsFind = baseUrlNew.find("https://");
410 auto comFind = baseUrlNew.find(".com");
411 auto orgFind = baseUrlNew.find(".org");
412 if (httpsFind != jsonifier::string::npos && comFind != jsonifier::string::npos) {
413 addressString = baseUrlNew.substr(httpsFind + jsonifier::string_view{ "https://" }.size(),
414 comFind + jsonifier::string_view{ ".com" }.size() - jsonifier::string_view{ "https://" }.size());
415 } else if (httpsFind != jsonifier::string::npos && orgFind != jsonifier::string::npos) {
416 addressString = baseUrlNew.substr(httpsFind + jsonifier::string_view{ "https://" }.size(),
417 orgFind + jsonifier::string_view{ ".org" }.size() - jsonifier::string_view{ "https://" }.size());
418 } else {
419 addressString = baseUrlNew;
420 }
421 addrinfo_wrapper hints{}, address{};
422 hints->ai_family = AF_INET;
423 hints->ai_socktype = SOCK_STREAM;
424 hints->ai_protocol = IPPROTO_TCP;
425
426 if (getaddrinfo(addressString.data(), jsonifier::toString(portNew).data(), hints, address)) {
427 message_printer::printError<print_message_type::general>(reportError("Tcp_connection::getaddrinfo(), to: " + baseUrlNew));
428 currentStatus = connection_status::CONNECTION_Error;
429 socket = INVALID_SOCKET;
430 return;
431 }
432
433 if (socket = ::socket(address->ai_family, address->ai_socktype, address->ai_protocol); !isValidSocket(socket.operator SOCKET())) {
434 message_printer::printError<print_message_type::general>(reportError("Tcp_connection::SOCKET(), to: " + baseUrlNew));
435 currentStatus = connection_status::CONNECTION_Error;
436 socket = INVALID_SOCKET;
437 return;
438 }
439
440 if (::connect(socket, address->ai_addr, static_cast<int32_t>(address->ai_addrlen)) == SOCKET_ERROR) {
441 message_printer::printError<print_message_type::general>(reportError("Tcp_connection::connect(), to: " + baseUrlNew));
442 currentStatus = connection_status::CONNECTION_Error;
443 socket = INVALID_SOCKET;
444 return;
445 }
446
447 std::unique_lock lock{ ssl_context_holder::accessMutex };
448 if (ssl = SSL_new(ssl_context_holder::context); !ssl) {
450 reportSSLError("Tcp_connection::connect::SSL_new(), to: " + baseUrlNew) + "\n" + reportError("Tcp_connection::connect::SSL_new(), to: " + baseUrlNew));
451 currentStatus = connection_status::CONNECTION_Error;
452 socket = INVALID_SOCKET;
453 ssl = nullptr;
454 return;
455 }
456 lock.unlock();
457
458 if (auto result{ SSL_set_fd(ssl, static_cast<int32_t>(socket)) }; result != 1) {
459 message_printer::printError<print_message_type::general>(reportSSLError("Tcp_connection::connect::SSL_set_fd(), to: " + baseUrlNew) + "\n" +
460 reportError("Tcp_connection::connect::SSL_set_fd(), to: " + baseUrlNew));
461 currentStatus = connection_status::CONNECTION_Error;
462 socket = INVALID_SOCKET;
463 ssl = nullptr;
464 return;
465 }
466
467 /* sni */
468 if (auto result{ SSL_set_tlsext_host_name(ssl, addressString.data()) }; result != 1) {
469 message_printer::printError<print_message_type::general>(reportSSLError("Tcp_connection::connect::SSL_set_tlsext_host_name(), to: " + baseUrlNew) + "\n" +
470 reportError("Tcp_connection::connect::SSL_set_tlsext_host_name(), to: " + baseUrlNew));
471 currentStatus = connection_status::CONNECTION_Error;
472 socket = INVALID_SOCKET;
473 ssl = nullptr;
474 return;
475 }
476
477 if (auto result{ SSL_connect(ssl) }; result != 1) {
478 message_printer::printError<print_message_type::general>(reportSSLError("Tcp_connection::connect::SSL_connect(), to: " + baseUrlNew) + "\n" +
479 reportError("Tcp_connection::connect::SSL_connect(), to: " + baseUrlNew));
480 currentStatus = connection_status::CONNECTION_Error;
481 socket = INVALID_SOCKET;
482 ssl = nullptr;
483 return;
484 }
485
486#if defined(_WIN32)
487 u_long value02{ 1 };
488 if (auto returnData{ ioctlsocket(socket, FIONBIO, &value02) }; returnData == SOCKET_ERROR) {
489 message_printer::printError<print_message_type::general>(reportError("Tcp_connection::connect::ioctlsocket(), to: " + baseUrlNew));
490 currentStatus = connection_status::CONNECTION_Error;
491 socket = INVALID_SOCKET;
492 ssl = nullptr;
493 return;
494 }
495#else
496 if (auto returnData{ fcntl(socket, F_SETFL, fcntl(socket, F_GETFL, 0) | O_NONBLOCK) }; returnData == SOCKET_ERROR) {
497 message_printer::printError<print_message_type::general>(reportError("Tcp_connection::connect::fcntl(), to: " + baseUrlNew));
498 currentStatus = connection_status::CONNECTION_Error;
499 socket = INVALID_SOCKET;
500 ssl = nullptr;
501 return;
502 }
503#endif
504 currentStatus = connection_status::NO_Error;
505 }
506
507 DCA_INLINE connection_status processIO(int32_t waitTimeInMs) {
508 if (!areWeStillConnected()) {
509 return currentStatus;
510 };
511 pollfd readWriteSet{};
512 readWriteSet.fd = static_cast<SOCKET>(socket);
513 if (writeWantRead || readWantRead) {
514 readWriteSet.events = POLLIN;
515 } else if (writeWantWrite || readWantWrite) {
516 readWriteSet.events = POLLOUT;
517 } else if (static_cast<value_type*>(this)->outputBuffer.getUsedSpace() > 0) {
518 readWriteSet.events = POLLIN | POLLOUT;
519 } else {
520 readWriteSet.events = POLLIN;
521 }
522 if (auto returnValue = poll(&readWriteSet, 1, waitTimeInMs); returnValue == SOCKET_ERROR) {
524 reportSSLError("Tcp_connection::processIO() 00") + "\n" + reportError("Tcp_connection::processIO() 00"));
525 socket = INVALID_SOCKET;
526 ssl = nullptr;
527 currentStatus = connection_status::SOCKET_Error;
528 return currentStatus;
529 } else if (returnValue == 0) {
530 return currentStatus;
531 } else {
532 if (readWriteSet.revents & POLLOUT || (POLLIN && writeWantRead)) {
533 if (!processWriteData()) {
535 reportSSLError("Tcp_connection::processIO() 01") + "\n" + reportError("Tcp_connection::processIO() 01"));
536 currentStatus = connection_status::WRITE_Error;
537 socket = INVALID_SOCKET;
538 ssl = nullptr;
539 return currentStatus;
540 }
541 }
542 if (readWriteSet.revents & POLLIN || (POLLOUT && readWantWrite)) {
543 if (!processReadData()) {
545 reportSSLError("Tcp_connection::processIO() 02") + "\n" + reportError("Tcp_connection::processIO() 02"));
546 currentStatus = connection_status::READ_Error;
547 socket = INVALID_SOCKET;
548 ssl = nullptr;
549 return currentStatus;
550 }
551 }
552 if (readWriteSet.revents & POLLERR) {
554 reportSSLError("Tcp_connection::processIO() 03") + "\n" + reportError("Tcp_connection::processIO() 03"));
555 currentStatus = connection_status::POLLERR_Error;
556 socket = INVALID_SOCKET;
557 ssl = nullptr;
558 }
559 if (readWriteSet.revents & POLLNVAL) {
561 reportSSLError("Tcp_connection::processIO() 04") + "\n" + reportError("Tcp_connection::processIO() 04"));
562 currentStatus = connection_status::POLLNVAL_Error;
563 socket = INVALID_SOCKET;
564 ssl = nullptr;
565 }
566 if (readWriteSet.revents & POLLHUP) {
567 currentStatus = connection_status::POLLHUP_Error;
568 socket = INVALID_SOCKET;
569 ssl = nullptr;
570 }
571 }
572 return currentStatus;
573 }
574
575 DCA_INLINE bool areWeStillConnected() {
576 if (socket.operator bool() && socket.operator SOCKET() != INVALID_SOCKET && currentStatus == connection_status::NO_Error && ssl.operator bool()) {
577 pollfd fdEvent = {};
578 fdEvent.fd = socket;
579 fdEvent.events = POLLOUT;
580 int32_t result = poll(&fdEvent, 1, 1);
581 if (result == SOCKET_ERROR || fdEvent.revents & POLLHUP || fdEvent.revents & POLLNVAL || fdEvent.revents & POLLERR) {
582 socket = INVALID_SOCKET;
583 ssl = nullptr;
584 return false;
585 }
586 return true;
587 } else {
588 return false;
589 }
590 }
591
592 DCA_INLINE bool processWriteData() {
593 writeWantRead = false;
594 writeWantWrite = false;
595 if (static_cast<value_type*>(this)->outputBuffer.getUsedSpace() > 0 && areWeStillConnected()) {
596 uint64_t bytesToWrite{ static_cast<value_type*>(this)->outputBuffer.getCurrentTail()->getUsedSpace() };
597
598 size_t writtenBytes{};
599 auto returnData{ SSL_write_ex(ssl, static_cast<value_type*>(this)->outputBuffer.readData().data(), bytesToWrite, &writtenBytes) };
600 auto errorValue{ SSL_get_error(ssl, returnData) };
601 switch (errorValue) {
602 case SSL_ERROR_WANT_READ: {
603 writeWantRead = true;
604 return true;
605 }
606 case SSL_ERROR_WANT_WRITE: {
607 writeWantWrite = true;
608 return true;
609 }
610 case SSL_ERROR_NONE: {
611 return true;
612 }
613 case SSL_ERROR_ZERO_RETURN: {
614 socket = INVALID_SOCKET;
615 ssl = nullptr;
616 return false;
617 }
618 default: {
619 return false;
620 }
621 }
622 }
623 return true;
624 }
625
626 DCA_INLINE bool processReadData() {
627 readWantRead = false;
628 readWantWrite = false;
629 if (!static_cast<value_type*>(this)->inputBuffer.isItFull() && areWeStillConnected()) {
630 do {
631 size_t readBytes{};
632 uint64_t bytesToRead{ static_cast<value_type*>(this)->maxBufferSize };
633 auto returnData{ SSL_read_ex(ssl, static_cast<value_type*>(this)->inputBuffer.getCurrentHead()->getCurrentHead(), bytesToRead, &readBytes) };
634 auto errorValue{ SSL_get_error(ssl, returnData) };
635 if (static_cast<int64_t>(readBytes) > 0) {
636 static_cast<value_type*>(this)->inputBuffer.getCurrentHead()->modifyReadOrWritePosition(ring_buffer_access_type::write, readBytes);
637 static_cast<value_type*>(this)->inputBuffer.modifyReadOrWritePosition(ring_buffer_access_type::write, 1);
638 static_cast<value_type*>(this)->bytesRead += readBytes;
639 static_cast<value_type*>(this)->handleBuffer();
640 }
641 switch (errorValue) {
642 case SSL_ERROR_WANT_READ: {
643 readWantRead = true;
644 return true;
645 }
646 case SSL_ERROR_WANT_WRITE: {
647 readWantWrite = true;
648 return true;
649 }
650 case SSL_ERROR_NONE: {
651 break;
652 }
653 case SSL_ERROR_ZERO_RETURN: {
654 socket = INVALID_SOCKET;
655 ssl = nullptr;
656 return true;
657 }
658 default: {
659 return false;
660 }
661 }
662 } while (areWeStillConnected() && SSL_pending(ssl) && !static_cast<value_type*>(this)->inputBuffer.isItFull() && !readWantRead);
663 }
664 return true;
665 }
666
667 template<typename value_type2> DCA_INLINE static unordered_map<uint64_t, value_type2*> processIO(unordered_map<uint64_t, value_type2*>& shardMap) {
668 unordered_map<uint64_t, value_type2*> returnData{};
669 poll_fd_wrapper readWriteSet{};
670 for (auto& [key, value]: shardMap) {
671 if (value->areWeStillConnected()) {
672 pollfd fdSet{};
673 fdSet.fd = static_cast<SOCKET>(value->socket);
674 if (value->writeWantRead || value->readWantRead) {
675 fdSet.events = POLLIN;
676 } else if (value->writeWantWrite || value->readWantWrite) {
677 fdSet.events = POLLOUT;
678 } else if (value->outputBuffer.getUsedSpace() > 0) {
679 fdSet.events = POLLIN | POLLOUT;
680 } else {
681 fdSet.events = POLLIN;
682 }
683 readWriteSet.indices.emplace_back(key);
684 readWriteSet.polls.emplace_back(fdSet);
685 } else {
686 returnData.emplace(key, value);
687 }
688 }
689
690 if (readWriteSet.polls.size() == 0) {
691 return returnData;
692 }
693 if (auto returnDataNew = poll(readWriteSet.polls.data(), static_cast<u_long>(readWriteSet.polls.size()), 1); returnDataNew == SOCKET_ERROR) {
694 bool didWeFindTheSocket{};
695 for (uint64_t x = 0; x < readWriteSet.polls.size(); ++x) {
696 if (readWriteSet.polls.at(x).revents & POLLERR || readWriteSet.polls.at(x).revents & POLLHUP || readWriteSet.polls.at(x).revents & POLLNVAL) {
697 shardMap.at(readWriteSet.indices.at(x))->currentStatus = connection_status::SOCKET_Error;
698 returnData.emplace(readWriteSet.indices.at(x), shardMap.at(readWriteSet.indices.at(x)));
699 readWriteSet.indices.erase(readWriteSet.indices.begin() + static_cast<int64_t>(x));
700 readWriteSet.polls.erase(readWriteSet.polls.begin() + static_cast<int64_t>(x));
701 didWeFindTheSocket = true;
702 }
703 }
704 if (!didWeFindTheSocket) {
705 for (uint64_t x = 0; x < readWriteSet.polls.size(); ++x) {
706 shardMap.at(readWriteSet.indices.at(x))->currentStatus = connection_status::SOCKET_Error;
707 returnData.emplace(readWriteSet.indices.at(x), shardMap.at(readWriteSet.indices.at(x)));
708 }
709 return returnData;
710 }
711
712 } else if (returnDataNew == 0) {
713 return returnData;
714 }
715 for (uint64_t x = 0; x < readWriteSet.polls.size(); ++x) {
716 if (readWriteSet.polls.at(x).revents & POLLOUT || (POLLIN && shardMap.at(readWriteSet.indices.at(x))->writeWantRead)) {
717 if (!shardMap.at(readWriteSet.indices.at(x))->processWriteData()) {
718 shardMap.at(readWriteSet.indices.at(x))->currentStatus = connection_status::WRITE_Error;
719 returnData.emplace(readWriteSet.indices.at(x), shardMap.at(readWriteSet.indices.at(x)));
720 continue;
721 }
722 }
723 if (readWriteSet.polls.at(x).revents & POLLIN || (POLLOUT && shardMap.at(readWriteSet.indices.at(x))->readWantWrite)) {
724 if (!shardMap.at(readWriteSet.indices.at(x))->processReadData()) {
725 shardMap.at(readWriteSet.indices.at(x))->currentStatus = connection_status::READ_Error;
726 returnData.emplace(readWriteSet.indices.at(x), shardMap.at(readWriteSet.indices.at(x)));
727 continue;
728 }
729 }
730 if (readWriteSet.polls.at(x).revents & POLLERR) {
731 shardMap.at(readWriteSet.indices.at(x))->currentStatus = connection_status::POLLERR_Error;
732 shardMap.at(readWriteSet.indices.at(x))->socket = INVALID_SOCKET;
733 shardMap.at(readWriteSet.indices.at(x))->ssl = nullptr;
734 returnData.emplace(readWriteSet.indices.at(x), shardMap.at(readWriteSet.indices.at(x)));
735 continue;
736 }
737 if (readWriteSet.polls.at(x).revents & POLLNVAL) {
738 shardMap.at(readWriteSet.indices.at(x))->currentStatus = connection_status::POLLNVAL_Error;
739 shardMap.at(readWriteSet.indices.at(x))->socket = INVALID_SOCKET;
740 shardMap.at(readWriteSet.indices.at(x))->ssl = nullptr;
741 returnData.emplace(readWriteSet.indices.at(x), shardMap.at(readWriteSet.indices.at(x)));
742 continue;
743 }
744 if (readWriteSet.polls.at(x).revents & POLLHUP) {
745 shardMap.at(readWriteSet.indices.at(x))->currentStatus = connection_status::POLLHUP_Error;
746 shardMap.at(readWriteSet.indices.at(x))->socket = INVALID_SOCKET;
747 shardMap.at(readWriteSet.indices.at(x))->ssl = nullptr;
748 returnData.emplace(readWriteSet.indices.at(x), shardMap.at(readWriteSet.indices.at(x)));
749 continue;
750 }
751 }
752 return returnData;
753 }
754
755 virtual DCA_INLINE void handleBuffer() = 0;
756
757 DCA_INLINE void disconnect() {
758 currentStatus = connection_status::CONNECTION_Error;
759 static_cast<value_type*>(this)->reset();
760 socket = INVALID_SOCKET;
761 ssl = nullptr;
762 }
763
764 virtual DCA_INLINE ~tcp_connection() = default;
765
766 protected:
767 DCA_INLINE tcp_connection() = default;
768 };
769 }
770
771#if defined(connect)
772 #undef connect
773#endif
774}
static DCA_INLINE void printError(const string_type &what, std::source_location where=std::source_location::current())
Print an error message of the specified type.
Definition Base.hpp:252
@ connect
Allows for joining of a voice channel.
@ stream
Allows the user to go live.
DCA_INLINE unique_ptr< value_type, deleter > makeUnique(arg_types &&... args)
Helper function to create a unique_ptr for a non-array object.
The main namespace for the forward-facing interfaces.