Skip to content

Commit

Permalink
refactor data storage + enforce data CID
Browse files Browse the repository at this point in the history
  • Loading branch information
kayhhh committed Apr 30, 2024
1 parent 2c34aaa commit 18ef6df
Show file tree
Hide file tree
Showing 7 changed files with 133 additions and 125 deletions.
16 changes: 3 additions & 13 deletions crates/dwn/src/actor/records/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::{
actor::{Actor, MessageBuilder, PrepareError, ProcessMessageError},
message::{
descriptor::{records::RecordsRead, Descriptor},
Data, DwnRequest, Message,
DwnRequest, Message,
},
reply::{MessageReply, RecordsReadReply},
store::{DataStore, MessageStore},
Expand Down Expand Up @@ -69,18 +69,8 @@ impl<'a, D: DataStore, M: MessageStore> RecordsReadBuilder<'a, D, M> {
_ => None,
};

let missing_data = if data_cid.is_some() {
match &reply.record.data {
Some(Data::Base64(data)) => data.is_empty(),
Some(Data::Encrypted(encrypted)) => encrypted.ciphertext.is_empty(),
None => false,
}
} else {
false
};

// If we don't have the data, check remote.
if missing_data {
// If we don't have the data, read from remote.
if data_cid.is_some() && reply.record.data.is_none() {
if let Some(found) = self.read_remote().await? {
return Ok(found);
}
Expand Down
26 changes: 5 additions & 21 deletions crates/dwn/src/handlers/records/read.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use base64::{engine::general_purpose::URL_SAFE_NO_PAD, Engine};
use libipld::Cid;

use crate::{
Expand All @@ -7,7 +6,7 @@ use crate::{
records::{FilterDateSort, RecordsFilter},
Descriptor,
},
Data, DwnRequest, EncryptedData,
DwnRequest,
},
reply::{MessageReply, RecordsReadReply, Status},
store::{DataStore, MessageStore},
Expand Down Expand Up @@ -58,38 +57,23 @@ pub async fn handle_records_read(
.to_owned();

// Read data.
let data_bytes = match &latest.descriptor {
latest.data = match &latest.descriptor {
Descriptor::RecordsWrite(descriptor) => {
if let Some(data_cid) = &descriptor.data_cid {
println!("Reading cid: {}", data_cid);
let data_cid = Cid::try_from(data_cid.as_str()).map_err(|e| {
HandleMessageError::InvalidDescriptor(format!("Invalid data CID: {}", e))
})?;
let res = data_store.get(data_cid.to_string()).await?;
res.map(|res| res.data)
res.map(|res| res.into())
} else {
None
}
}
_ => None,
};

if let Some(bytes) = data_bytes {
match &latest.data {
Some(Data::Base64(_)) => {
latest.data = Some(Data::new_base64(&bytes));
}
Some(Data::Encrypted(data)) => {
latest.data = Some(Data::Encrypted(EncryptedData {
ciphertext: URL_SAFE_NO_PAD.encode(&bytes),
iv: data.iv.clone(),
protected: data.protected.clone(),
recipients: data.recipients.clone(),
tag: data.tag.clone(),
}))
}
None => {}
}
}
println!("Read data: {:?}", latest.data);

Ok(RecordsReadReply {
record: Box::new(latest),
Expand Down
24 changes: 17 additions & 7 deletions crates/dwn/src/handlers/records/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,23 @@ pub async fn handle_records_write(
));
}

// Validate data CID is present.
// We do not validate the CID, because we may want to store a record
// without storing its associated data.
if message.data.is_some() && descriptor.data_cid.is_none() {
return Err(HandleMessageError::InvalidDescriptor(
"Data CID missing".to_string(),
));
// Validate data CID if data present.
if let Some(data) = &message.data {
let cid = data.cid()?;

let data_cid =
descriptor
.data_cid
.as_deref()
.ok_or(HandleMessageError::InvalidDescriptor(
"Data CID missing".to_string(),
))?;

if cid.to_string() != data_cid {
return Err(HandleMessageError::InvalidDescriptor(
"Data CID does not match data".to_string(),
));
}
}

// Get messages for the record.
Expand Down
30 changes: 14 additions & 16 deletions crates/dwn/src/store/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::future::Future;

use base64::DecodeError;
use libipld::Cid;
use serde::{Deserialize, Serialize};
use thiserror::Error;
Expand All @@ -8,12 +9,13 @@ use crate::{
encode::EncodeError,
message::{
descriptor::{protocols::ProtocolsFilter, records::RecordsFilter},
DecodeError, Message,
Message,
},
};

#[cfg(feature = "s3")]
mod s3;
mod stored_data;
#[cfg(feature = "surrealdb")]
mod surrealdb;

Expand All @@ -22,6 +24,8 @@ pub use s3::S3;
#[cfg(feature = "surrealdb")]
pub use surrealdb::SurrealStore;

use self::stored_data::StoredData;

#[derive(Error, Debug)]
pub enum DataStoreError {
#[error("No data found for CID")]
Expand All @@ -37,21 +41,15 @@ pub trait DataStore: Send + Sync {
fn get(
&self,
cid: String,
) -> impl Future<Output = Result<Option<GetDataResults>, DataStoreError>> + Send + Sync;
) -> impl Future<Output = Result<Option<StoredData>, DataStoreError>> + Send + Sync;

fn put(
&self,
cid: String,
value: Vec<u8>,
value: StoredData,
) -> impl Future<Output = Result<PutDataResults, DataStoreError>> + Send + Sync;
}

#[derive(Debug)]
pub struct GetDataResults {
pub size: usize,
pub data: Vec<u8>,
}

#[derive(Debug, Deserialize, Serialize)]
pub struct PutDataResults {
#[serde(rename = "dataSize")]
Expand Down Expand Up @@ -94,19 +92,19 @@ pub enum MessageStoreError {
#[error("Message missing data")]
MissingData,
#[error(transparent)]
MessageEncode(#[from] EncodeError),
Encode(#[from] EncodeError),
#[error(transparent)]
DataEncodeError(#[from] libipld_core::error::SerdeError),
Decode(#[from] DecodeError),
#[error(transparent)]
DataEncode(#[from] libipld_core::error::SerdeError),
#[error("Not found")]
NotFound,
#[error(transparent)]
MessageDecodeError(#[from] DecodeError),
#[error(transparent)]
Cid(#[from] libipld::cid::Error),
#[error("Failed to create block {0}")]
CreateBlockError(anyhow::Error),
CreateBlock(anyhow::Error),
#[error("Failed to interact with backend: {0}")]
BackendError(anyhow::Error),
Backend(anyhow::Error),
#[error(transparent)]
DataStoreError(#[from] DataStoreError),
DataStore(#[from] DataStoreError),
}
60 changes: 60 additions & 0 deletions crates/dwn/src/store/stored_data.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
use base64::{engine::general_purpose::URL_SAFE_NO_PAD, DecodeError, Engine};
use serde::{Deserialize, Serialize};

use crate::message::{Data, EncryptedData, Protected};

#[derive(Debug, Serialize, Deserialize)]
pub enum StoredData {
Base64(Vec<u8>),
Encrypted {
ciphertext: Vec<u8>,
iv: String,
protected: Protected,
recipients: Vec<String>,
tag: String,
},
}

impl StoredData {
pub fn len(&self) -> usize {
serde_json::to_vec(self).unwrap().len()
}
}

impl TryFrom<Data> for StoredData {
type Error = DecodeError;

fn try_from(value: Data) -> Result<Self, Self::Error> {
match value {
Data::Base64(data) => Ok(Self::Base64(URL_SAFE_NO_PAD.decode(data.as_bytes())?)),
Data::Encrypted(data) => Ok(Self::Encrypted {
ciphertext: URL_SAFE_NO_PAD.decode(data.ciphertext.as_bytes())?,
iv: data.iv,
protected: data.protected,
recipients: data.recipients,
tag: data.tag,
}),
}
}
}

impl From<StoredData> for Data {
fn from(value: StoredData) -> Self {
match value {
StoredData::Base64(bytes) => Self::new_base64(&bytes),
StoredData::Encrypted {
ciphertext,
iv,
protected,
recipients,
tag,
} => Self::Encrypted(EncryptedData {
ciphertext: URL_SAFE_NO_PAD.encode(ciphertext),
iv,
protected,
recipients,
tag,
}),
}
}
}
30 changes: 11 additions & 19 deletions crates/dwn/src/store/surrealdb/data.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
use anyhow::anyhow;
use serde::{Deserialize, Serialize};
use surrealdb::{
sql::{Id, Table, Thing},
Connection,
};

use crate::store::{DataStore, DataStoreError, GetDataResults, PutDataResults};
use crate::store::{DataStore, DataStoreError, PutDataResults, StoredData};

use super::SurrealStore;

Expand All @@ -18,32 +19,23 @@ impl<T: Connection> DataStore for SurrealStore<T> {

db.delete::<Option<DbData>>(id)
.await
.map_err(|e| DataStoreError::BackendError(anyhow::anyhow!(e)))?;
.map_err(|e| DataStoreError::BackendError(anyhow!(e)))?;

Ok(())
}

async fn get(&self, cid: String) -> Result<Option<GetDataResults>, DataStoreError> {
async fn get(&self, cid: String) -> Result<Option<StoredData>, DataStoreError> {
let db = self.data_db().await.map_err(DataStoreError::BackendError)?;

let id = Thing::from((Table::from(DATA_TABLE_NAME).to_string(), Id::String(cid)));

let res: DbData = match db
.select(id)
.await
.map_err(|e| DataStoreError::BackendError(anyhow::anyhow!(e)))?
{
Some(res) => res,
None => return Ok(None),
};

Ok(Some(GetDataResults {
size: res.data.len(),
data: res.data,
}))
let res: Result<Option<DbData>, _> = db.select(id).await;

res.map(|r| r.map(|r| r.data))
.map_err(|e| DataStoreError::BackendError(anyhow!(e)))
}

async fn put(&self, cid: String, data: Vec<u8>) -> Result<PutDataResults, DataStoreError> {
async fn put(&self, cid: String, data: StoredData) -> Result<PutDataResults, DataStoreError> {
let db = self.data_db().await.map_err(DataStoreError::BackendError)?;

let id = Thing::from((
Expand All @@ -56,7 +48,7 @@ impl<T: Connection> DataStore for SurrealStore<T> {
db.create::<Option<DbData>>(id)
.content(DbData { cid, data })
.await
.map_err(|e| DataStoreError::BackendError(anyhow::anyhow!(e)))?;
.map_err(|e| DataStoreError::BackendError(anyhow!(e)))?;

Ok(PutDataResults { size })
}
Expand All @@ -65,5 +57,5 @@ impl<T: Connection> DataStore for SurrealStore<T> {
#[derive(Serialize, Deserialize, Debug)]
struct DbData {
cid: String,
data: Vec<u8>,
data: StoredData,
}
Loading

0 comments on commit 18ef6df

Please sign in to comment.