From 665f4b9ef83e9615d0bf9bdfe6013803c2792ce7 Mon Sep 17 00:00:00 2001 From: Marcell Leleszi Date: Fri, 14 Jun 2024 11:44:07 +0200 Subject: [PATCH] Support multitple connections using multithreading --- README.md | 13 +++++++++++++ src/datastore.h | 13 +++++++++++-- src/main.cpp | 3 +-- src/tcp_server.cpp | 28 +++++++++++++++++++--------- src/tcp_server.h | 2 +- 5 files changed, 45 insertions(+), 14 deletions(-) create mode 100644 README.md diff --git a/README.md b/README.md new file mode 100644 index 0000000..f846db5 --- /dev/null +++ b/README.md @@ -0,0 +1,13 @@ +# TODO + +- allow multiple connections + - multi hreaded: implement thread pool from scratch + - look into std::shared_mutex + - event loop, non blocking IO +- support more data structures + - lists (implement ziplists) + - sets + - hashes +- implement hasmap from scratch +- expiry +- support more commands \ No newline at end of file diff --git a/src/datastore.h b/src/datastore.h index bd2f583..d976a24 100644 --- a/src/datastore.h +++ b/src/datastore.h @@ -1,5 +1,6 @@ #pragma once +#include #include #include #include @@ -8,16 +9,24 @@ class DataStore { public: std::optional get(const std::string &key) { + std::lock_guard lock(mtx); if (store.find(key) == store.end()) return {}; return store[key]; } - void set(const std::string &key, const std::string &val) { store[key] = val; } + void set(const std::string &key, const std::string &val) { + std::lock_guard lock(mtx); + store[key] = val; + } - bool exists(const std::string &key) { return store.contains(key); } + bool exists(const std::string &key) { + std::lock_guard lock(mtx); + return store.contains(key); + } private: std::unordered_map store; + std::mutex mtx; }; \ No newline at end of file diff --git a/src/main.cpp b/src/main.cpp index ba57fab..4f4e78e 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -3,7 +3,6 @@ #include "tcp_server.h" int main() { - Controller controller; - TCPServer server{controller}; + TCPServer server; server.start("0.0.0.0", 6379); } diff --git a/src/tcp_server.cpp b/src/tcp_server.cpp index 7d1682d..f9f1952 100644 --- a/src/tcp_server.cpp +++ b/src/tcp_server.cpp @@ -1,10 +1,10 @@ #include #include #include -#include #include #include #include +#include #include #include @@ -14,7 +14,7 @@ #include "protocol.h" #include "tcp_server.h" -TCPServer::TCPServer(const Controller &controller) : controller{controller} { +TCPServer::TCPServer() : controller{} { m_serverFD = socket(AF_INET, SOCK_STREAM, 0); if (m_serverFD < 0) { throw std::runtime_error("Failed to create server socket!"); } @@ -58,9 +58,7 @@ TCPServer::TCPServer(const Controller &controller) : controller{controller} { spdlog::info("Client connected from {}:{}", clientIP, clientPort); } - handleRequest(connFD); - - close(connFD); + std::thread(&TCPServer::handleRequest, this, connFD).detach(); } } @@ -73,21 +71,30 @@ void TCPServer::handleRequest(int connFD) { // Read from socket ssize_t bytes_received = recv(connFD, data.data(), RECV_SIZE, 0); - if (bytes_received <= 0) { break; } + if (bytes_received <= 0) { + close(connFD); + break; + } buffer.insert(buffer.end(), data.begin(), data.begin() + bytes_received); // Parse message auto [message, length] = *parseMessage(buffer); - if (!std::holds_alternative(message)) { break; } + if (!std::holds_alternative(message)) { + close(connFD); + break; + } // If successfully parsed message, then erase buffer.erase(buffer.begin(), buffer.begin() + static_cast(length)); auto array = std::get(message).data; - if (!array) { break; } + if (!array) { + close(connFD); + break; + } // Convert message to internal command format std::vector command; @@ -101,7 +108,10 @@ void TCPServer::handleRequest(int connFD) { command.push_back(std::get(item)); } - if (err) { break; } + if (err) { + close(connFD); + break; + } // Handle command RedisType::RedisValue res = controller.handleCommand(command); diff --git a/src/tcp_server.h b/src/tcp_server.h index bb4a1fa..4b3faef 100644 --- a/src/tcp_server.h +++ b/src/tcp_server.h @@ -6,7 +6,7 @@ class TCPServer { public: - explicit TCPServer(const Controller &controller); + explicit TCPServer(); [[noreturn]] void start(const std::string &address, int port); void handleRequest(int conn_fd);