# o2versioner: Distributed Versioning in Asynchronous Rust

An implementation of "Distributed Versioning: Consistent Replication for Scaling Back-end Databases of Dynamic Content Web Sites" (Link to Paper) by Cristiana Amza, Alan L. Cox and Willy Zwaenepoel.
## Using dev build

```sh
# build
cargo build
# build and run
cargo run -- --help
# build and test
cargo test
# build and test - show stdout output at the end
cargo test -- --show-output
# build and test - show stdout live
cargo test -- --nocapture
```
## Using release build

```sh
# build
cargo build --release
# build and run
cargo run --release -- --help
# build and test
cargo test --release
# build and test - show stdout output at the end
cargo test --release -- --show-output
# build and test - show stdout live
cargo test --release -- --nocapture
```
Refer to: conf.rs

```sh
# count lines of Rust source
find o2versioner/ -name '*.rs' | xargs wc -l | sort -nr
```

Refer to: README

```sh
# count lines of Python source
find -name '*.py' | xargs wc -l | sort -nr
```
```sh
# Connect to the admin addr via netcat or telnet
netcat localhost 9999
telnet localhost 9999

# Send a "perf" command to the admin
perf
```
Single run analysis:

```sh
python3 -m analyzer single ./final_perf/xx
```

Multi run analysis:

```sh
python3 -m analyzer multi ./final_perf
```
## Features

- Scheduler
- Sequencer
- DbProxy
- Msql interface
- Better debugging and logging
- msql: simple sql
- msql: Msql and MsqlText interface
- msql: annotation-based
- msql: query auto annotation
- Begin tx stmt
- Query stmt
- Commit & Abort tx stmt
- Single write (and unoptimized single read)
- Single read
- Early release
```
o2versioner
├── src
│   ├── comm        # communication-related
│   ├── core        # core data structure and algorithm
│   ├── dbproxy     # dbproxy library
│   ├── scheduler   # scheduler library
│   ├── sequencer   # sequencer library
│   ├── util        # utility library
│   ├── lib.rs      # declaration of the mods above
│   └── main.rs     # main executable
└── tests           # system level testing
```
## Sequencer

`sequencer_main()` - main entrance

- Handler
  - For every incoming tcp connection - `tokio::spawn()` (a sketch follows this list)
    - Runs until the connection is closed
    - Processes all requests arriving through this connection
    - Receives a single request, processes it, and sends one response back
- Keeps a central state for the versions assigned for each table
- Lifetime lasts until all incoming connections are closed, if the max connection count is set
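Below is a minimal sketch of the per-connection pattern described above. The address, buffer handling, and echo-style processing are illustrative placeholders; the real framing and version logic live in the sequencer module and conf.rs.

```rust
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpListener;

#[tokio::main]
async fn main() -> std::io::Result<()> {
    // Illustrative address; the real one comes from conf.rs
    let listener = TcpListener::bind("127.0.0.1:9876").await?;
    loop {
        let (mut socket, _peer) = listener.accept().await?;
        // One task per incoming connection, running until the peer closes
        tokio::spawn(async move {
            let mut buf = vec![0u8; 4096];
            loop {
                // Receive a single request ...
                let n = match socket.read(&mut buf).await {
                    Ok(0) | Err(_) => break, // connection closed
                    Ok(n) => n,
                };
                // ... process it (e.g. assign table versions) and send one
                // response back. Echoing stands in for the real protocol.
                if socket.write_all(&buf[..n]).await.is_err() {
                    break;
                }
            }
        });
    }
}
```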
## Scheduler

`scheduler_main()` - main entrance. Modularized, decoupled, running concurrently, communicating through channels:

- Handler: receives and replies to client requests
- Dispatcher: processes client requests
- Transceiver: communicates with the Dbproxy
### Handler

- For every incoming tcp connection - `tokio::spawn()`
  - Runs until the connection is closed
  - Processes all requests arriving through this connection
  - Keeps a connection/session state
  - Receives a single request, processes it, and sends one response back
  - For a Sequencer action, sends a request to the Sequencer and waits for the reply
  - For a Dbproxy action, sends a request to the Dispatcher and waits for the reply
- Manages a `DispatcherAddr` object to the Dispatcher
- Manages a pool of connections to the Sequencer
- Lifetime lasts until all incoming connections are closed, if the max connection count is set
### Dispatcher

- Manages the DbVN for each Dbproxy
- A single event loop receives requests from the Handler via a `DispatcherAddr` object. Each request sent to the event loop also carries a single-use `oneshot::Sender` channel for replying back to the Handler (a sketch follows this list)
- Only the response from the first Dbproxy to reply is sent back to the Handler; the remaining responses are not sent back, but they are still needed to update the internal state of the Dispatcher
- Lifetime lasts until all `DispatcherAddr` objects are dropped
- Incoming queries are executed concurrently
- For each query, the query is sent to each Transceiver in serial. Only after the query has been sent to all Transceivers are the Transceiver replies awaited, concurrently. Since no Dbproxy reply can arrive before the query has been sent to every Transceiver, this guarantees the query ordering within the same transaction.
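A minimal sketch of this address/event-loop pattern, with illustrative payload types (the real `DispatcherAddr` and its messages live in src/scheduler):

```rust
use tokio::sync::{mpsc, oneshot};

// Placeholder payloads; the real ones carry Msql statements and metadata
struct Request(String);
struct Response(String);

// What travels to the event loop: the request plus a single-use
// oneshot channel for replying back to the Handler
struct Envelope {
    req: Request,
    reply: oneshot::Sender<Response>,
}

// Handlers hold clones of this address; the event loop below lives
// until every DispatcherAddr is dropped and the channel closes
#[derive(Clone)]
struct DispatcherAddr(mpsc::Sender<Envelope>);

impl DispatcherAddr {
    async fn request(&self, req: Request) -> Option<Response> {
        let (tx, rx) = oneshot::channel();
        self.0.send(Envelope { req, reply: tx }).await.ok()?;
        rx.await.ok()
    }
}

// Dispatcher side: the single event loop
async fn event_loop(mut rx: mpsc::Receiver<Envelope>) {
    while let Some(env) = rx.recv().await {
        // ... consult the DbVNs, fan out to the Transceivers, and forward
        //     only the first Dbproxy reply ...
        let _ = env.reply.send(Response(format!("done: {}", env.req.0)));
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel(32);
    tokio::spawn(event_loop(rx));
    let addr = DispatcherAddr(tx);
    let resp = addr.request(Request("SELECT 1;".into())).await.unwrap();
    println!("{}", resp.0);
}
```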
### Transceiver

- Manages a single `TcpStream` socket for a single Dbproxy. The socket is used for reading from and writing to the Dbproxy concurrently. The `TransceiverAddr` mechanism works the same way as `DispatcherAddr`
- Two separate event loops in serial:
  - Receives requests from the Dispatcher and forwards them to the Dbproxy
  - Receives responses from the Dbproxy and forwards them to the Dispatcher
- For each client (with the single Dbproxy), a `LinkedList` is used as a FIFO queue tracking the outstanding requests: push front upon transmitting, pop back upon receiving. The outgoing request and the incoming response both carry a `RequestMeta` that uniquely identifies a request for each client; this is used to make sure that the Dbproxy does not reorder the queries within a single transaction (a sketch follows this list)
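A sketch of that FIFO bookkeeping, with an illustrative `RequestMeta` shape (the real one lives in src/core):

```rust
use std::collections::LinkedList;

// Illustrative: uniquely identifies a request for one client
#[derive(Debug, PartialEq, Eq, Clone)]
struct RequestMeta {
    client: String,
    request_id: u64,
}

// FIFO queue of outstanding requests for one client:
// push front upon transmitting, pop back upon receiving
struct Outstanding(LinkedList<RequestMeta>);

impl Outstanding {
    fn transmitted(&mut self, meta: RequestMeta) {
        self.0.push_front(meta);
    }

    fn received(&mut self, meta: &RequestMeta) -> bool {
        match self.0.pop_back() {
            // The oldest outstanding request must be the one answered:
            // a mismatch would mean the Dbproxy reordered queries
            // within a single transaction
            Some(oldest) => &oldest == meta,
            None => false,
        }
    }
}

fn main() {
    let mut q = Outstanding(LinkedList::new());
    let m1 = RequestMeta { client: "c1".into(), request_id: 1 };
    let m2 = RequestMeta { client: "c1".into(), request_id: 2 };
    q.transmitted(m1.clone());
    q.transmitted(m2.clone());
    assert!(q.received(&m1)); // replies must come back in FIFO order
    assert!(q.received(&m2));
}
```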
### Admin Handler (Optional)

- Only processes a single incoming tcp connection at a time
- Receives a single request in raw bytes, processes it, and sends one response back
- Supports remotely stopping the main Handler from taking in any new connections; this also stops the Sequencer from taking in any new connections
- Can send Block and Unblock requests to the Sequencer to block new transactions
## Dbproxy

`dbproxy_main()` - main entrance. Modularized, decoupled, running concurrently, communicating through channels:

- Transceiver: receives from and replies to the Scheduler
- Dispatcher: processes sql requests

### Dispatcher

- A FIFO `Queue`, in the order in which requests are received from the Scheduler via `Transceiver::Receiver`
- Manages a DbVN
- An event loop that is triggered by new incoming requests into the `Queue` or by a version release
- For each iteration:
  - Find the requests whose versions are ready, so that they can be executed
  - If multiple requests of the same transaction are in the `Queue`, only the first one is checked
  - Each transaction is spawned as a separate task, with a channel open for receiving subsequent requests within the same transaction (a sketch follows this list)
  - Once a request is finished, the reply is sent to `Transceiver::Responder`
  - Once the transaction is finished, the task is shut down
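A sketch of the per-transaction task pattern, with an illustrative `Statement` type and transaction id: the first statement spawns the task, and later statements of the same transaction are funneled to it over its channel:

```rust
use std::collections::HashMap;
use std::time::Duration;
use tokio::sync::mpsc;

// Illustrative stand-in for a pending sql request
#[derive(Debug)]
struct Statement(String);

// One channel per in-flight transaction, keyed by an illustrative txn id
type TxnTable = HashMap<u64, mpsc::Sender<Statement>>;

async fn dispatch(table: &mut TxnTable, txn_id: u64, stmt: Statement) {
    if let Some(tx) = table.get(&txn_id) {
        // A subsequent statement of an already-running transaction
        let _ = tx.send(stmt).await;
    } else {
        // First statement: spawn the transaction as a separate task
        let (tx, mut rx) = mpsc::channel(32);
        table.insert(txn_id, tx);
        tokio::spawn(async move {
            println!("txn {txn_id}: begin with {stmt:?}");
            // Receive subsequent statements until the transaction ends
            while let Some(next) = rx.recv().await {
                println!("txn {txn_id}: exec {next:?}");
                // ... on COMMIT/ABORT: reply via Transceiver::Responder,
                //     release versions, and break so the task shuts down ...
            }
        });
    }
}

#[tokio::main]
async fn main() {
    let mut table = TxnTable::new();
    dispatch(&mut table, 1, Statement("BEGIN TRAN".into())).await;
    dispatch(&mut table, 1, Statement("UPDATE t SET x = 1;".into())).await;
    // Demo only: give the spawned transaction task a moment to run
    tokio::time::sleep(Duration::from_millis(50)).await;
}
```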
### Transceiver

- Manages a single `TcpStream` socket. The socket is used for reading from and writing to the Scheduler concurrently.
- Two separate event loops in parallel (a sketch follows this list):
  - Receiver: receives requests from the Scheduler and pushes them into the `Queue`
  - Responder: sends responses back to the Scheduler, and also performs the version release
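A sketch of the two parallel loops sharing one socket via `tokio::net::TcpStream::into_split()`; the address and byte-level handling are placeholders for the real framing:

```rust
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;
use tokio::sync::mpsc;

#[tokio::main]
async fn main() -> std::io::Result<()> {
    // Illustrative address; the real one is read from conf.rs
    let stream = TcpStream::connect("127.0.0.1:9876").await?;
    let (mut read_half, mut write_half) = stream.into_split();
    let (tx, mut rx) = mpsc::channel::<Vec<u8>>(32);

    // Receiver loop: read requests from the Scheduler and push into the Queue
    let receiver = tokio::spawn(async move {
        let mut buf = vec![0u8; 4096];
        while let Ok(n) = read_half.read(&mut buf).await {
            if n == 0 { break; }
            // ... decode the request and push it into the FIFO Queue ...
        }
    });

    // Responder loop: send responses back and perform the version release
    let responder = tokio::spawn(async move {
        while let Some(resp) = rx.recv().await {
            if write_half.write_all(&resp).await.is_err() { break; }
            // ... release versions for the finished request ...
        }
    });

    drop(tx); // demo only: close the response channel so the responder exits
    let _ = tokio::try_join!(receiver, responder);
    Ok(())
}
```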
## Notes on asynchronous Rust

- Everything revolves around objects that implement `trait Future<Output=T>` or `trait Stream<Item=T>`. `Future<Output=T>` is an asynchronous version of `T`; `Stream<Item=T>` is an asynchronous version of `Iterator<Item=T>`. `Stream<Item=T>` is essentially an iterator of `Future<Output=T>` that resolves into `T` when being handled. A `Future` or `Stream` must be run to completion; otherwise its enclosed closures won't be executed.
- An object implementing `trait Future<Output=T>` can only be transformed into another object implementing `trait Future<Output=Y>`, with side effects, once it is resolved. This can be done via the provided methods of `trait FutureExt` (and/or `trait TryFutureExt`), which are available for every object that implements `trait Future`; or by using the keyword `.await` to nonblockingly yield `T` from `Future<Output=T>`.
- However, `.await` must reside within `async` functions or blocks, which return an anonymous object that implements `trait Future`. The only way for a `Future` to fully resolve is via an executor, such as `tokio::spawn`.
- Functions or closures with `.await` inside must be declared `async`, and they return an anonymous object that implements `trait Future`.
- `.await` means nonblockingly executing the future. The program still executes from top to bottom as usual; `.await` only means the current task won't block the thread that runs it. After `.await` returns, the next line executes.
- Multithreading: the OS maps N threads onto K CPU cores. Asynchronous: Tokio maps N spawned tasks (via `tokio::spawn()`) onto K worker threads.
- `.await` (being nonblocking) means yielding the current task, so that the current worker thread can execute other spawned async tasks. Blocking means the current async task fully occupies the current worker thread, spinning and doing nothing, which wastes the worker thread pool's resources.
- By default, all `Future`s are executed one after another, as they are treated as a single task. On the other hand, `tokio::spawn()` spawns the argument `Future` as a separate task, which may run on the current thread or on another thread depending on the `tokio::runtime`; in any case, the spawned task is "decoupled" from the parent task, so the two can run concurrently (not necessarily in parallel).
- `tokio::spawn()` returns a handle as a `Future`, and this `Future` must also be properly `.await`ed or `tokio::try_join!()`ed for it to execute to completion, similar to a thread join (a sketch follows the code block below).
- `async` closures are not yet supported, but the same effect can be achieved by having a closure return a `Future`. This can be done by:
```rust
|item| {
    // clone the data that needs to go into the async block
    let x_cloned = x.clone();
    // This block returns a `Future<Output=Result<(), _>>` that captures the enclosed actions
    async move {
        some_async_fn(x_cloned).await;
        Ok(())
    }
}
```
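For reference, a minimal sketch contrasting sequential awaiting with spawned tasks; `work` is a made-up stand-in for any async workload:

```rust
use std::time::Duration;

// Hypothetical async workload used only for illustration
async fn work(id: u32) -> u32 {
    tokio::time::sleep(Duration::from_millis(100)).await;
    id
}

#[tokio::main]
async fn main() {
    // Sequential: both futures belong to the same task, ~200ms total
    let a = work(1).await;
    let b = work(2).await;

    // Concurrent: each future is spawned as its own task, ~100ms total.
    // The JoinHandles are themselves futures and must be joined.
    let (c, d) = tokio::try_join!(tokio::spawn(work(3)), tokio::spawn(work(4))).unwrap();
    println!("{a} {b} {c} {d}");
}
```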
- `trait Stream` does not imply `trait Future`; they are different.
- An object implementing `trait Stream` can be drained to completion. This draining process is, however, a `Future`.
- `Stream<Item=T>` is essentially `Iterator<Item: Future<Output=T>>`, and operations on a `Stream` are similar to those on `Iterator` objects. The provided methods live in `trait StreamExt` (and/or `trait TryStreamExt`), which are free for objects implementing `trait Stream`. All of these methods essentially apply a closure to the `T` yielded by the `Stream<Item=T>` (a sketch follows this list).
  - `StreamExt::map()`: synchronously converts `Stream<Item=T>` to `Stream<Item=Y>` by mapping each yielded `T` directly to `Y` via the closure. The closure is synchronous.
  - `StreamExt::then()`: asynchronously converts `Stream<Item=T>` to `Stream<Item=Y>` by mapping each yielded `T` to a `Future<Output=Y>` via the closure. The closure is asynchronous. This `Future<Output=Y>` is then resolved into `Y` (as if `.await`ed on) implicitly and automatically by the method.
  - `StreamExt::for_each()`: asynchronously converts `Stream<Item=T>` to `Future<Output=()>` by mapping each yielded `T` to a `Future<Output=()>` via the closure. The closure is asynchronous. As all items inside the `Stream<Item=T>` are asynchronously converted to `Future<Output=()>`, this essentially drains the `Stream` to completion. The returned `Future<Output=()>` can then be `.await`ed or `tokio::spawn()`ed so that the side effects of the closure are executed. Note that all the `Future<Output=()>`s returned by the closure are yielded back to back as one serial task; `StreamExt::for_each_concurrent()` instead runs them as concurrent tasks, once each `T` is yielded and its `Future<Output=()>` is returned by the closure.
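A short sketch of these combinators, assuming the `futures` crate (which provides this `StreamExt`) is available alongside tokio:

```rust
use futures::stream::{self, StreamExt};

#[tokio::main]
async fn main() {
    // map: synchronous closure, Stream<Item=i32> -> Stream<Item=i32>
    let doubled: Vec<i32> = stream::iter(1..=3).map(|x| x * 2).collect().await;
    assert_eq!(doubled, vec![2, 4, 6]);

    // then: asynchronous closure, each item goes through a Future<Output=Y>
    let plus_one: Vec<i32> = stream::iter(1..=3)
        .then(|x| async move { x + 1 })
        .collect()
        .await;
    assert_eq!(plus_one, vec![2, 3, 4]);

    // for_each: drains the stream into a single Future<Output=()>,
    // running the returned futures one after another (serially)
    stream::iter(1..=3)
        .for_each(|x| async move { println!("serial {x}") })
        .await;

    // for_each_concurrent: same draining, but up to 2 of the
    // per-item futures run concurrently
    stream::iter(1..=3)
        .for_each_concurrent(2, |x| async move { println!("concurrent {x}") })
        .await;
}
```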