Skip to content

Commit

Permalink
Merge pull request fedimint#6049 from dpc/24-09-19-rocksdb-refactor
Browse files Browse the repository at this point in the history
refactor: eliminate duplication in rocksdb impl
  • Loading branch information
dpc authored Sep 20, 2024
2 parents da8c105 + a79301c commit be1fde5
Showing 1 changed file with 16 additions and 61 deletions.
77 changes: 16 additions & 61 deletions fedimint-rocksdb/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,21 @@ use tracing::debug;

use crate::envs::FM_ROCKSDB_WRITE_BUFFER_SIZE_ENV;

// turn an `iter` into a `Stream` where every `next` is ran inside
// `block_in_place` to offload the blocking calls
fn convert_to_async_stream<'i, I>(iter: I) -> impl futures::Stream<Item = I::Item>
where
I: Iterator + Send + 'i,
I::Item: Send,
{
stream::unfold(iter, |mut iter| async {
fedimint_core::runtime::block_in_place(|| {
let item = iter.next();
item.map(|item| (item, iter))
})
})
}

#[derive(Debug)]
pub struct RocksDb(rocksdb::OptimisticTransactionDB);

Expand Down Expand Up @@ -202,21 +217,6 @@ impl<'a> IDatabaseTransactionOpsCore for RocksDbTransaction<'a> {
}

async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> Result<PrefixStream<'_>> {
// turn an `iter` into a `Stream` where every `next` is ran inside
// `block_in_place` to offload the blocking calls
fn convert_to_async_stream<'i, I>(iter: I) -> impl futures::Stream<Item = I::Item>
where
I: Iterator + Send + 'i,
I::Item: Send,
{
stream::unfold(iter, |mut iter| async {
fedimint_core::runtime::block_in_place(|| {
let item = iter.next();
item.map(|item| (item, iter))
})
})
}

Ok(fedimint_core::runtime::block_in_place(|| {
let prefix = key_prefix.to_vec();
let mut options = rocksdb::ReadOptions::default();
Expand All @@ -236,21 +236,6 @@ impl<'a> IDatabaseTransactionOpsCore for RocksDbTransaction<'a> {
}

async fn raw_find_by_range(&mut self, range: Range<&[u8]>) -> Result<PrefixStream<'_>> {
// turn an `iter` into a `Stream` where every `next` is ran inside
// `block_in_place` to offload the blocking calls
fn convert_to_async_stream<'i, I>(iter: I) -> impl futures::Stream<Item = I::Item>
where
I: Iterator + Send + 'i,
I::Item: Send,
{
stream::unfold(iter, |mut iter| async {
fedimint_core::runtime::block_in_place(|| {
let item = iter.next();
item.map(|item| (item, iter))
})
})
}

Ok(fedimint_core::runtime::block_in_place(|| {
let range = Range {
start: range.start.to_vec(),
Expand Down Expand Up @@ -322,7 +307,7 @@ impl<'a> IDatabaseTransactionOpsCore for RocksDbTransaction<'a> {
.starts_with(&prefix)
.then_some((key_bytes.to_vec(), value_bytes.to_vec()))
});
Box::pin(stream::iter(rocksdb_iter))
Box::pin(convert_to_async_stream(rocksdb_iter))
}))
}
}
Expand Down Expand Up @@ -367,21 +352,6 @@ impl<'a> IDatabaseTransactionOpsCore for RocksDbReadOnlyTransaction<'a> {
}

async fn raw_find_by_range(&mut self, range: Range<&[u8]>) -> Result<PrefixStream<'_>> {
// turn an `iter` into a `Stream` where every `next` is ran inside
// `block_in_place` to offload the blocking calls
fn convert_to_async_stream<'i, I>(iter: I) -> impl futures::Stream<Item = I::Item>
where
I: Iterator + Send + 'i,
I::Item: Send,
{
stream::unfold(iter, |mut iter| async {
fedimint_core::runtime::block_in_place(|| {
let item = iter.next();
item.map(|item| (item, iter))
})
})
}

Ok(fedimint_core::runtime::block_in_place(|| {
let range = Range {
start: range.start.to_vec(),
Expand All @@ -403,21 +373,6 @@ impl<'a> IDatabaseTransactionOpsCore for RocksDbReadOnlyTransaction<'a> {
}

async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> Result<PrefixStream<'_>> {
// turn an `iter` into a `Stream` where every `next` is ran inside
// `block_in_place` to offload the blocking calls
fn convert_to_async_stream<'i, I>(iter: I) -> impl futures::Stream<Item = I::Item>
where
I: Iterator + Send + 'i,
I::Item: Send,
{
stream::unfold(iter, |mut iter| async {
fedimint_core::runtime::block_in_place(|| {
let item = iter.next();
item.map(|item| (item, iter))
})
})
}

Ok(fedimint_core::runtime::block_in_place(|| {
let prefix = key_prefix.to_vec();
let mut options = rocksdb::ReadOptions::default();
Expand Down

0 comments on commit be1fde5

Please sign in to comment.