diff --git a/crates/dwn/src/actor/records/read.rs b/crates/dwn/src/actor/records/read.rs index 2b3c3a7..0c4cb70 100644 --- a/crates/dwn/src/actor/records/read.rs +++ b/crates/dwn/src/actor/records/read.rs @@ -2,7 +2,7 @@ use crate::{ actor::{Actor, MessageBuilder, PrepareError, ProcessMessageError}, message::{ descriptor::{records::RecordsRead, Descriptor}, - Data, DwnRequest, Message, + DwnRequest, Message, }, reply::{MessageReply, RecordsReadReply}, store::{DataStore, MessageStore}, @@ -69,18 +69,8 @@ impl<'a, D: DataStore, M: MessageStore> RecordsReadBuilder<'a, D, M> { _ => None, }; - let missing_data = if data_cid.is_some() { - match &reply.record.data { - Some(Data::Base64(data)) => data.is_empty(), - Some(Data::Encrypted(encrypted)) => encrypted.ciphertext.is_empty(), - None => false, - } - } else { - false - }; - - // If we don't have the data, check remote. - if missing_data { + // If we don't have the data, read from remote. + if data_cid.is_some() && reply.record.data.is_none() { if let Some(found) = self.read_remote().await? { return Ok(found); } diff --git a/crates/dwn/src/handlers/records/read.rs b/crates/dwn/src/handlers/records/read.rs index 6164950..36565c5 100644 --- a/crates/dwn/src/handlers/records/read.rs +++ b/crates/dwn/src/handlers/records/read.rs @@ -1,4 +1,3 @@ -use base64::{engine::general_purpose::URL_SAFE_NO_PAD, Engine}; use libipld::Cid; use crate::{ @@ -7,7 +6,7 @@ use crate::{ records::{FilterDateSort, RecordsFilter}, Descriptor, }, - Data, DwnRequest, EncryptedData, + DwnRequest, }, reply::{MessageReply, RecordsReadReply, Status}, store::{DataStore, MessageStore}, @@ -58,14 +57,15 @@ pub async fn handle_records_read( .to_owned(); // Read data. - let data_bytes = match &latest.descriptor { + latest.data = match &latest.descriptor { Descriptor::RecordsWrite(descriptor) => { if let Some(data_cid) = &descriptor.data_cid { + println!("Reading cid: {}", data_cid); let data_cid = Cid::try_from(data_cid.as_str()).map_err(|e| { HandleMessageError::InvalidDescriptor(format!("Invalid data CID: {}", e)) })?; let res = data_store.get(data_cid.to_string()).await?; - res.map(|res| res.data) + res.map(|res| res.into()) } else { None } @@ -73,23 +73,7 @@ pub async fn handle_records_read( _ => None, }; - if let Some(bytes) = data_bytes { - match &latest.data { - Some(Data::Base64(_)) => { - latest.data = Some(Data::new_base64(&bytes)); - } - Some(Data::Encrypted(data)) => { - latest.data = Some(Data::Encrypted(EncryptedData { - ciphertext: URL_SAFE_NO_PAD.encode(&bytes), - iv: data.iv.clone(), - protected: data.protected.clone(), - recipients: data.recipients.clone(), - tag: data.tag.clone(), - })) - } - None => {} - } - } + println!("Read data: {:?}", latest.data); Ok(RecordsReadReply { record: Box::new(latest), diff --git a/crates/dwn/src/handlers/records/write.rs b/crates/dwn/src/handlers/records/write.rs index 80aaad4..12ea8f4 100644 --- a/crates/dwn/src/handlers/records/write.rs +++ b/crates/dwn/src/handlers/records/write.rs @@ -52,13 +52,23 @@ pub async fn handle_records_write( )); } - // Validate data CID is present. - // We do not validate the CID, because we may want to store a record - // without storing its associated data. - if message.data.is_some() && descriptor.data_cid.is_none() { - return Err(HandleMessageError::InvalidDescriptor( - "Data CID missing".to_string(), - )); + // Validate data CID if data present. + if let Some(data) = &message.data { + let cid = data.cid()?; + + let data_cid = + descriptor + .data_cid + .as_deref() + .ok_or(HandleMessageError::InvalidDescriptor( + "Data CID missing".to_string(), + ))?; + + if cid.to_string() != data_cid { + return Err(HandleMessageError::InvalidDescriptor( + "Data CID does not match data".to_string(), + )); + } } // Get messages for the record. diff --git a/crates/dwn/src/store/mod.rs b/crates/dwn/src/store/mod.rs index 97514bd..ac9e544 100644 --- a/crates/dwn/src/store/mod.rs +++ b/crates/dwn/src/store/mod.rs @@ -1,5 +1,6 @@ use std::future::Future; +use base64::DecodeError; use libipld::Cid; use serde::{Deserialize, Serialize}; use thiserror::Error; @@ -8,12 +9,13 @@ use crate::{ encode::EncodeError, message::{ descriptor::{protocols::ProtocolsFilter, records::RecordsFilter}, - DecodeError, Message, + Message, }, }; #[cfg(feature = "s3")] mod s3; +mod stored_data; #[cfg(feature = "surrealdb")] mod surrealdb; @@ -22,6 +24,8 @@ pub use s3::S3; #[cfg(feature = "surrealdb")] pub use surrealdb::SurrealStore; +use self::stored_data::StoredData; + #[derive(Error, Debug)] pub enum DataStoreError { #[error("No data found for CID")] @@ -37,21 +41,15 @@ pub trait DataStore: Send + Sync { fn get( &self, cid: String, - ) -> impl Future, DataStoreError>> + Send + Sync; + ) -> impl Future, DataStoreError>> + Send + Sync; fn put( &self, cid: String, - value: Vec, + value: StoredData, ) -> impl Future> + Send + Sync; } -#[derive(Debug)] -pub struct GetDataResults { - pub size: usize, - pub data: Vec, -} - #[derive(Debug, Deserialize, Serialize)] pub struct PutDataResults { #[serde(rename = "dataSize")] @@ -94,19 +92,19 @@ pub enum MessageStoreError { #[error("Message missing data")] MissingData, #[error(transparent)] - MessageEncode(#[from] EncodeError), + Encode(#[from] EncodeError), #[error(transparent)] - DataEncodeError(#[from] libipld_core::error::SerdeError), + Decode(#[from] DecodeError), + #[error(transparent)] + DataEncode(#[from] libipld_core::error::SerdeError), #[error("Not found")] NotFound, #[error(transparent)] - MessageDecodeError(#[from] DecodeError), - #[error(transparent)] Cid(#[from] libipld::cid::Error), #[error("Failed to create block {0}")] - CreateBlockError(anyhow::Error), + CreateBlock(anyhow::Error), #[error("Failed to interact with backend: {0}")] - BackendError(anyhow::Error), + Backend(anyhow::Error), #[error(transparent)] - DataStoreError(#[from] DataStoreError), + DataStore(#[from] DataStoreError), } diff --git a/crates/dwn/src/store/stored_data.rs b/crates/dwn/src/store/stored_data.rs new file mode 100644 index 0000000..fdc68d7 --- /dev/null +++ b/crates/dwn/src/store/stored_data.rs @@ -0,0 +1,60 @@ +use base64::{engine::general_purpose::URL_SAFE_NO_PAD, DecodeError, Engine}; +use serde::{Deserialize, Serialize}; + +use crate::message::{Data, EncryptedData, Protected}; + +#[derive(Debug, Serialize, Deserialize)] +pub enum StoredData { + Base64(Vec), + Encrypted { + ciphertext: Vec, + iv: String, + protected: Protected, + recipients: Vec, + tag: String, + }, +} + +impl StoredData { + pub fn len(&self) -> usize { + serde_json::to_vec(self).unwrap().len() + } +} + +impl TryFrom for StoredData { + type Error = DecodeError; + + fn try_from(value: Data) -> Result { + match value { + Data::Base64(data) => Ok(Self::Base64(URL_SAFE_NO_PAD.decode(data.as_bytes())?)), + Data::Encrypted(data) => Ok(Self::Encrypted { + ciphertext: URL_SAFE_NO_PAD.decode(data.ciphertext.as_bytes())?, + iv: data.iv, + protected: data.protected, + recipients: data.recipients, + tag: data.tag, + }), + } + } +} + +impl From for Data { + fn from(value: StoredData) -> Self { + match value { + StoredData::Base64(bytes) => Self::new_base64(&bytes), + StoredData::Encrypted { + ciphertext, + iv, + protected, + recipients, + tag, + } => Self::Encrypted(EncryptedData { + ciphertext: URL_SAFE_NO_PAD.encode(ciphertext), + iv, + protected, + recipients, + tag, + }), + } + } +} diff --git a/crates/dwn/src/store/surrealdb/data.rs b/crates/dwn/src/store/surrealdb/data.rs index 1cc9175..e2a32f7 100644 --- a/crates/dwn/src/store/surrealdb/data.rs +++ b/crates/dwn/src/store/surrealdb/data.rs @@ -1,10 +1,11 @@ +use anyhow::anyhow; use serde::{Deserialize, Serialize}; use surrealdb::{ sql::{Id, Table, Thing}, Connection, }; -use crate::store::{DataStore, DataStoreError, GetDataResults, PutDataResults}; +use crate::store::{DataStore, DataStoreError, PutDataResults, StoredData}; use super::SurrealStore; @@ -18,32 +19,23 @@ impl DataStore for SurrealStore { db.delete::>(id) .await - .map_err(|e| DataStoreError::BackendError(anyhow::anyhow!(e)))?; + .map_err(|e| DataStoreError::BackendError(anyhow!(e)))?; Ok(()) } - async fn get(&self, cid: String) -> Result, DataStoreError> { + async fn get(&self, cid: String) -> Result, DataStoreError> { let db = self.data_db().await.map_err(DataStoreError::BackendError)?; let id = Thing::from((Table::from(DATA_TABLE_NAME).to_string(), Id::String(cid))); - let res: DbData = match db - .select(id) - .await - .map_err(|e| DataStoreError::BackendError(anyhow::anyhow!(e)))? - { - Some(res) => res, - None => return Ok(None), - }; - - Ok(Some(GetDataResults { - size: res.data.len(), - data: res.data, - })) + let res: Result, _> = db.select(id).await; + + res.map(|r| r.map(|r| r.data)) + .map_err(|e| DataStoreError::BackendError(anyhow!(e))) } - async fn put(&self, cid: String, data: Vec) -> Result { + async fn put(&self, cid: String, data: StoredData) -> Result { let db = self.data_db().await.map_err(DataStoreError::BackendError)?; let id = Thing::from(( @@ -56,7 +48,7 @@ impl DataStore for SurrealStore { db.create::>(id) .content(DbData { cid, data }) .await - .map_err(|e| DataStoreError::BackendError(anyhow::anyhow!(e)))?; + .map_err(|e| DataStoreError::BackendError(anyhow!(e)))?; Ok(PutDataResults { size }) } @@ -65,5 +57,5 @@ impl DataStore for SurrealStore { #[derive(Serialize, Deserialize, Debug)] struct DbData { cid: String, - data: Vec, + data: StoredData, } diff --git a/crates/dwn/src/store/surrealdb/message.rs b/crates/dwn/src/store/surrealdb/message.rs index 732df96..e243370 100644 --- a/crates/dwn/src/store/surrealdb/message.rs +++ b/crates/dwn/src/store/surrealdb/message.rs @@ -1,7 +1,6 @@ use std::collections::HashMap; use anyhow::anyhow; -use base64::{engine::general_purpose::URL_SAFE_NO_PAD, Engine}; use libipld::Cid; use serde::{Deserialize, Serialize}; use surrealdb::{ @@ -19,7 +18,7 @@ use crate::{ records::{FilterDateCreated, FilterDateSort, RecordsFilter}, Descriptor, }, - Data, EncryptedData, Message, + Message, }, store::{DataStore, MessageStore, MessageStoreError}, }; @@ -39,7 +38,7 @@ impl MessageStore for SurrealStore { let db = self .message_db(tenant) .await - .map_err(MessageStoreError::BackendError)?; + .map_err(MessageStoreError::Backend)?; let id = Thing::from(( Table::from(MESSAGE_TABLE.to_string()).to_string(), @@ -49,7 +48,7 @@ impl MessageStore for SurrealStore { let message: Option = db .select(id.clone()) .await - .map_err(|err| MessageStoreError::BackendError(anyhow!(err)))?; + .map_err(|err| MessageStoreError::Backend(anyhow!(err)))?; let message = match message { Some(message) => message, @@ -64,7 +63,7 @@ impl MessageStore for SurrealStore { // Delete the message. db.delete::>(id) .await - .map_err(|err| MessageStoreError::BackendError(anyhow!(err)))?; + .map_err(|err| MessageStoreError::Backend(anyhow!(err)))?; if let Some(data_cid) = message.data_cid { let id = Thing::from(( @@ -75,7 +74,7 @@ impl MessageStore for SurrealStore { let db_data_ref: Option = db .select(id.clone()) .await - .map_err(|err| MessageStoreError::BackendError(anyhow!(err)))?; + .map_err(|err| MessageStoreError::Backend(anyhow!(err)))?; if let Some(db_data_ref) = db_data_ref { if db_data_ref.ref_count > 1 { @@ -85,12 +84,12 @@ impl MessageStore for SurrealStore { ref_count: db_data_ref.ref_count - 1, }) .await - .map_err(|err| MessageStoreError::BackendError(anyhow!(err)))?; + .map_err(|err| MessageStoreError::Backend(anyhow!(err)))?; } else { // Delete the data if this is the only reference. db.delete::>(id) .await - .map_err(|err| MessageStoreError::BackendError(anyhow!(err)))?; + .map_err(|err| MessageStoreError::Backend(anyhow!(err)))?; data_store.delete(data_cid).await?; } @@ -111,27 +110,11 @@ impl MessageStore for SurrealStore { // TODO: Only store data in data store if over a certain size. if let Some(data) = message.data.take() { - // Keep the data type, but clear the data. - match &data { - Data::Base64(_) => { - message.data = Some(Data::Base64(String::new())); - } - Data::Encrypted(data) => { - message.data = Some(Data::Encrypted(EncryptedData { - ciphertext: String::new(), - iv: data.iv.clone(), - tag: data.tag.clone(), - protected: data.protected.clone(), - recipients: data.recipients.clone(), - })); - } - } - // Check if the data is already stored. let db = self .message_db(&tenant) .await - .map_err(MessageStoreError::BackendError)?; + .map_err(MessageStoreError::Backend)?; let cid = data.cid()?.to_string(); @@ -143,7 +126,7 @@ impl MessageStore for SurrealStore { let db_data_ref: Option = db .select(id.clone()) .await - .map_err(|err| MessageStoreError::BackendError(anyhow!(err)))?; + .map_err(|err| MessageStoreError::Backend(anyhow!(err)))?; if let Some(db_data_ref) = db_data_ref { // Add one to the reference count. @@ -152,25 +135,16 @@ impl MessageStore for SurrealStore { ref_count: db_data_ref.ref_count + 1, }) .await - .map_err(|err| MessageStoreError::BackendError(anyhow!(err)))?; + .map_err(|err| MessageStoreError::Backend(anyhow!(err)))?; } else { // Create a new data CID object. db.create::>(id) .content(DbDataCidRefs { ref_count: 1 }) .await - .map_err(|err| MessageStoreError::BackendError(anyhow!(err)))?; - - let bytes = match data { - Data::Base64(data) => URL_SAFE_NO_PAD - .decode(data.as_bytes()) - .map_err(|err| MessageStoreError::BackendError(anyhow!(err)))?, - Data::Encrypted(data) => URL_SAFE_NO_PAD - .decode(data.ciphertext.as_bytes()) - .map_err(|err| MessageStoreError::BackendError(anyhow!(err)))?, - }; + .map_err(|err| MessageStoreError::Backend(anyhow!(err)))?; // Store data in the data store. - data_store.put(cid.clone(), bytes).await?; + data_store.put(cid.clone(), data.try_into()?).await?; } data_cid = Some(cid); @@ -182,7 +156,7 @@ impl MessageStore for SurrealStore { let db = self .message_db(&tenant) .await - .map_err(MessageStoreError::BackendError)?; + .map_err(MessageStoreError::Backend)?; // Store the message. let id = Thing::from(( @@ -239,7 +213,7 @@ impl MessageStore for SurrealStore { tenant, }) .await - .map_err(|err| MessageStoreError::BackendError(anyhow!(err)))?; + .map_err(|err| MessageStoreError::Backend(anyhow!(err)))?; Ok(*message_cid) } @@ -253,7 +227,7 @@ impl MessageStore for SurrealStore { let db = self .message_db(&tenant) .await - .map_err(MessageStoreError::BackendError)?; + .map_err(MessageStoreError::Backend)?; let mut conditions = Conditions::new_and(); conditions.add("message.descriptor.interface = 'Protocols'".to_string()); @@ -285,11 +259,11 @@ impl MessageStore for SurrealStore { let mut res = query .await - .map_err(|err| MessageStoreError::BackendError(anyhow!(err)))?; + .map_err(|err| MessageStoreError::Backend(anyhow!(err)))?; let mut db_messages: Vec = res .take(0) - .map_err(|err| MessageStoreError::BackendError(anyhow!(err)))?; + .map_err(|err| MessageStoreError::Backend(anyhow!(err)))?; db_messages.sort_by(|a, b| a.message_timestamp.cmp(&b.message_timestamp)); @@ -309,7 +283,7 @@ impl MessageStore for SurrealStore { let db = self .message_db(&tenant) .await - .map_err(MessageStoreError::BackendError)?; + .map_err(MessageStoreError::Backend)?; let mut binds = HashMap::new(); let mut conditions = Conditions::new_and(); @@ -343,7 +317,7 @@ impl MessageStore for SurrealStore { if let Some(protocol) = protocols.first() { let descriptor = match &protocol.descriptor { Descriptor::ProtocolsConfigure(desc) => desc, - _ => return Err(MessageStoreError::BackendError(anyhow!("Invalid protocol"))), + _ => return Err(MessageStoreError::Backend(anyhow!("Invalid protocol"))), }; if let Some(definition) = &descriptor.definition { @@ -431,7 +405,7 @@ impl MessageStore for SurrealStore { if let Some(FilterDateCreated { from, to }) = filter.message_timestamp { if let Some(from) = from { let from = from.format(&Rfc3339).map_err(|err| { - MessageStoreError::BackendError(anyhow!("Failed to format date: {}", err)) + MessageStoreError::Backend(anyhow!("Failed to format date: {}", err)) })?; binds.insert("from".to_string(), from.to_string()); conditions.add("message_timestamp >= $from".to_string()); @@ -439,7 +413,7 @@ impl MessageStore for SurrealStore { if let Some(to) = to { let to = to.format(&Rfc3339).map_err(|err| { - MessageStoreError::BackendError(anyhow!("Failed to format date: {}", err)) + MessageStoreError::Backend(anyhow!("Failed to format date: {}", err)) })?; binds.insert("to".to_string(), to.to_string()); conditions.add("message_timestamp <= $to".to_string()); @@ -484,11 +458,11 @@ impl MessageStore for SurrealStore { let mut res = query .await - .map_err(|err| MessageStoreError::BackendError(anyhow!(err)))?; + .map_err(|err| MessageStoreError::Backend(anyhow!(err)))?; let db_messages: Vec = res .take(0) - .map_err(|err| MessageStoreError::BackendError(anyhow!(err)))?; + .map_err(|err| MessageStoreError::Backend(anyhow!(err)))?; Ok(db_messages .into_iter()