Skip to content

Commit

Permalink
feat: Minimal server/client arch
Browse files Browse the repository at this point in the history
  • Loading branch information
can-keklik committed Jan 28, 2024
1 parent 1c71027 commit 8d5c300
Show file tree
Hide file tree
Showing 10 changed files with 237 additions and 100 deletions.
1 change: 1 addition & 0 deletions cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@ edition = "2021"
[dependencies]
clap = { version = "4.4.6", features = ["derive"] }
liblykia = { path = "../liblykia" }
tokio = { version = "~1.35.1" }
73 changes: 55 additions & 18 deletions cli/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use std::{fs::File, io::{BufReader, Read}};

use clap::Parser;
use liblykia::Request;
use liblykia::protocol::connection::{CommunicationError, Connection, Message, Request};
use tokio::net::TcpStream;

#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
struct Args {
Expand All @@ -12,26 +14,13 @@ struct Args {
print_ast: bool,
}

pub fn init() {
/*pub fn init() {
let args = Args::parse();
match args.filename {
Some(filename) => run_file(&filename, args.print_ast),
None => run_repl(),
}
}

fn run_file(filename: &str, print_ast: bool) {
let file = File::open(filename).expect("File couldn't be opened.");

let mut content: String = String::new();

BufReader::new(file)
.read_to_string(&mut content)
.expect("File couldn't be read.");

// send file to server
println!("{:?}", Request::Execute(content));
}
}*/

fn run_repl() {
/*println!("REPL mode");
Expand All @@ -49,6 +38,54 @@ fn run_repl() {
}*/
}

fn main() {
init();

pub struct ClientSession {
conn: Connection,
}

impl ClientSession {
pub fn new(stream: TcpStream) -> Self {
ClientSession {
conn: Connection::new(stream)
}
}

pub async fn handle(&mut self) {
if let Some(message) = self.conn.read().await.unwrap() {
println!("{:?}", message);
}
}

pub async fn send(&mut self, msg: Message) -> Result<(), CommunicationError> {
self.conn.write(msg).await
}
}

async fn run_file(filename: &str, print_ast: bool) {
let file = File::open(filename).expect("File couldn't be opened.");

let mut content: String = String::new();


BufReader::new(file)
.read_to_string(&mut content)
.expect("File couldn't be read.");

let socket = TcpStream::connect("localhost:19191").await.unwrap();

// Initialize the connection state. This allocates read/write buffers to
// perform redis protocol frame parsing.
let mut session = ClientSession::new(socket);

session.send(Message::Request(Request::Run(content))).await.unwrap();

}

#[tokio::main]
async fn main() {
let args = Args::parse();
match args.filename {
Some(filename) => run_file(&filename, args.print_ast).await,
None => (),
};
}
13 changes: 12 additions & 1 deletion liblykia/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,15 @@ version = "0.1.0"
edition = "2021"

[dependencies]
bson = "2.9.0"
bson = { version = "2.9.0" }
serde = { version = "1.0.188", features=["derive", "rc"] }
bytes = "1.5.0"
tokio = { version = "~1.35.1", features = [
"macros",
"rt",
"rt-multi-thread",
"net",
"io-util",
"time",
"sync",
] }
12 changes: 1 addition & 11 deletions liblykia/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1 @@
use bson::Bson;

#[derive(Debug, Clone)]
pub enum Request {
Execute(String),
}

#[derive(Debug, Clone)]
pub enum Response {
Execute(Bson),
}
pub mod protocol;
78 changes: 78 additions & 0 deletions liblykia/src/protocol/connection.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
use bson::Bson;
use bytes::BytesMut;
use serde::{Deserialize, Serialize};
use tokio::{io::{copy, AsyncReadExt, AsyncWriteExt, BufWriter}, net::TcpStream};

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum Request {
Run(String)
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum Response {
Value(Bson)
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum Message {
Request(Request),
Response(Response)
}

#[derive(Debug)]
pub enum CommunicationError {
BsonError(bson::ser::Error),
IoError(std::io::Error),
GenericError(String),
}

impl From<std::io::Error> for CommunicationError {
fn from(value: std::io::Error) -> Self {
CommunicationError::IoError(value)
}
}

impl From<bson::ser::Error> for CommunicationError {
fn from(value: bson::ser::Error) -> Self {
CommunicationError::BsonError(value)
}
}

pub struct Connection {
pub stream: BufWriter<TcpStream>,
pub read_buffer: BytesMut,
}

impl Connection {
pub fn new(stream: TcpStream) -> Connection {
Connection {
stream: BufWriter::new(stream),
read_buffer: BytesMut::with_capacity(4096),
}
}

pub async fn read(&mut self) -> Result<Option<Message>, CommunicationError> {
loop {
// TODO(vck): Replace .to_vec call with something cheaper
if let Ok(parsed) = bson::from_slice::<Message>(&self.read_buffer.to_vec()) {
return Ok(Some(parsed));
}

if 0 == self.stream.read_buf(&mut self.read_buffer).await? {
if self.read_buffer.is_empty() {
return Ok(None);
} else {
return Err(CommunicationError::GenericError("Connection reset by peer".to_owned()));
}
}
}
}

pub async fn write(&mut self, message: Message) -> Result<(), CommunicationError> {
let vec = bson::to_vec(&bson::to_bson(&message)?)?;
let mut buffer = vec.as_slice();
copy(&mut buffer, &mut self.stream).await?;
self.stream.flush().await?;
Ok(())
}
}
1 change: 1 addition & 0 deletions liblykia/src/protocol/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pub mod connection;
12 changes: 2 additions & 10 deletions server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,23 +6,15 @@ edition = "2021"

[dependencies]
liblykia = { path = "../liblykia" }
bson = { version = "2.9.0" }
bumpalo = "3.12.2"
phf = { version = "0.11", default-features = false, features = ["macros"] }
rustc-hash = "1.1.0"
serde = { version = "1.0.188", features=["derive", "rc"] }
serde_json = "1.0.105"
ariadne = { features = ["auto-color"] }
assert-json-diff = "2.0.2"
tokio = { version = "~1.35.1", features = [
"macros",
"rt",
"rt-multi-thread",
"net",
"io-util",
"time",
"sync",
] }

tokio = { version = "~1.35.1" }
tokio-stream = { version = "~0.1.6", features = ["net"] }

[dev-dependencies]
Expand Down
56 changes: 0 additions & 56 deletions server/src/cli/mod.rs

This file was deleted.

1 change: 0 additions & 1 deletion server/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
pub mod cli;
pub mod lang;
pub mod runtime;
pub mod util;
90 changes: 87 additions & 3 deletions server/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,89 @@
use lykiadb_server::cli;
use std::io::Error;

use liblykia::protocol::connection::{CommunicationError, Connection, Message, Request, Response};
use lykiadb_server::runtime::types::RV;
use lykiadb_server::runtime::{Runtime, RuntimeMode};
use tokio::net::{TcpListener, TcpStream};
use tokio_stream::wrappers::TcpListenerStream;
use tokio_stream::StreamExt as _;

struct Server {
listener: Option<TcpListener>
}

pub struct ServerSession {
conn: Connection,
//runtime: Runtime
}

impl ServerSession {
pub fn new(stream: TcpStream) -> Self {
ServerSession {
conn: Connection::new(stream),
//runtime: Runtime::new(RuntimeMode::File)
}
}

pub async fn handle(&mut self) {
if let Some(message) = self.conn.read().await.unwrap() {
println!("{:?}", message);
match message {
Message::Request(req) => match req {
Request::Run(command) => {
/*let execution = self.runtime.interpret(&command);
if execution.is_ok() {
let bson_response = bson::to_bson(&execution.ok().or_else(|| Some(RV::Undefined)));
self.conn.write(Message::Response(Response::Value(bson_response.unwrap()))).await;
}*/
self.conn.write(Message::Response(Response::Value(bson::to_bson(&1515).unwrap()))).await.unwrap();
},
},
_ => panic!("Unsupported message type")
}
}
}

pub async fn send(&mut self, msg: Message) -> Result<(), CommunicationError> {
self.conn.write(msg).await
}
}

impl Server {
pub fn new() -> Result<Self, Error> {
Ok(Server {
listener: None,
})
}

pub async fn listen(mut self, addr: &str) -> Result<Self, Error> {
let listener = TcpListener::bind(addr).await?;
println!("Listening on {}", listener.local_addr()?);
self.listener = Some(listener);
Ok(self)
}

pub async fn serve(self) -> Result<(), Error> {
if let Some(listener) = self.listener {
let mut stream = TcpListenerStream::new(listener);
while let Some(socket) = stream.try_next().await? {
let peer = socket.peer_addr()?;
tokio::spawn(async move {
let mut session = ServerSession::new(socket);
println!("Client {} connected", peer);
session.handle().await;
println!("Client {} disconnected", peer);
});
}
}
Ok(())
}

fn main() {
cli::init();
}
#[tokio::main]
async fn main() -> Result<(), Error> {
Server::new()?
.listen("0.0.0.0:19191")
.await?
.serve()
.await
}

0 comments on commit 8d5c300

Please sign in to comment.