Skip to content

Commit

Permalink
store data refs in data db to fix multi-tenant error
Browse files Browse the repository at this point in the history
  • Loading branch information
kayhhh committed Apr 30, 2024
1 parent 18ef6df commit fa06912
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 28 deletions.
2 changes: 1 addition & 1 deletion crates/dwn/src/actor/records/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ impl<'a, D: DataStore, M: MessageStore> RecordsReadBuilder<'a, D, M> {
_ => None,
};

// If we don't have the data, read from remote.
// If we don't have the data, check remote.
if data_cid.is_some() && reply.record.data.is_none() {
if let Some(found) = self.read_remote().await? {
return Ok(found);
Expand Down
3 changes: 0 additions & 3 deletions crates/dwn/src/handlers/records/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ pub async fn handle_records_read(
latest.data = match &latest.descriptor {
Descriptor::RecordsWrite(descriptor) => {
if let Some(data_cid) = &descriptor.data_cid {
println!("Reading cid: {}", data_cid);
let data_cid = Cid::try_from(data_cid.as_str()).map_err(|e| {
HandleMessageError::InvalidDescriptor(format!("Invalid data CID: {}", e))
})?;
Expand All @@ -73,8 +72,6 @@ pub async fn handle_records_read(
_ => None,
};

println!("Read data: {:?}", latest.data);

Ok(RecordsReadReply {
record: Box::new(latest),
status: Status::ok(),
Expand Down
42 changes: 19 additions & 23 deletions crates/dwn/src/store/surrealdb/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,7 @@ impl<T: Connection> MessageStore for SurrealStore<T> {
.await
.map_err(MessageStoreError::Backend)?;

let id = Thing::from((
Table::from(MESSAGE_TABLE.to_string()).to_string(),
Id::String(cid),
));
let id = Thing::from((Table::from(MESSAGE_TABLE).to_string(), Id::String(cid)));

let message: Option<DbMessage> = db
.select(id.clone())
Expand Down Expand Up @@ -71,23 +68,23 @@ impl<T: Connection> MessageStore for SurrealStore<T> {
Id::String(data_cid.to_string()),
));

let db_data_ref: Option<DbDataCidRefs> = db
let db_data_ref: Option<DataRefs> = db
.select(id.clone())
.await
.map_err(|err| MessageStoreError::Backend(anyhow!(err)))?;

if let Some(db_data_ref) = db_data_ref {
if db_data_ref.ref_count > 1 {
// Decrement the reference count for the data CID.
db.update::<Option<DbDataCidRefs>>(id)
.content(DbDataCidRefs {
db.update::<Option<DataRefs>>(id)
.content(DataRefs {
ref_count: db_data_ref.ref_count - 1,
})
.await
.map_err(|err| MessageStoreError::Backend(anyhow!(err)))?;
} else {
// Delete the data if this is the only reference.
db.delete::<Option<DbDataCidRefs>>(id)
db.delete::<Option<DataRefs>>(id)
.await
.map_err(|err| MessageStoreError::Backend(anyhow!(err)))?;

Expand All @@ -110,36 +107,33 @@ impl<T: Connection> MessageStore for SurrealStore<T> {
// TODO: Only store data in data store if over a certain size.

if let Some(data) = message.data.take() {
// Check if the data is already stored.
let db = self
.message_db(&tenant)
.await
.map_err(MessageStoreError::Backend)?;
let db = self.data_db().await.map_err(MessageStoreError::Backend)?;

// Check if the data is already stored.
let cid = data.cid()?.to_string();

let id = Thing::from((
Table::from(DATA_REF_TABLE).to_string(),
Id::String(cid.clone()),
));

let db_data_ref: Option<DbDataCidRefs> = db
let db_data_ref: Option<DataRefs> = db
.select(id.clone())
.await
.map_err(|err| MessageStoreError::Backend(anyhow!(err)))?;

if let Some(db_data_ref) = db_data_ref {
// Add one to the reference count.
db.update::<Option<DbDataCidRefs>>(id)
.content(DbDataCidRefs {
db.update::<Option<DataRefs>>(id.clone())
.content(DataRefs {
ref_count: db_data_ref.ref_count + 1,
})
.await
.map_err(|err| MessageStoreError::Backend(anyhow!(err)))?;
} else {
// Create a new data CID object.
db.create::<Option<DbDataCidRefs>>(id)
.content(DbDataCidRefs { ref_count: 1 })
db.create::<Option<DataRefs>>(id.clone())
.content(DataRefs { ref_count: 1 })
.await
.map_err(|err| MessageStoreError::Backend(anyhow!(err)))?;

Expand All @@ -150,17 +144,17 @@ impl<T: Connection> MessageStore for SurrealStore<T> {
data_cid = Some(cid);
}

let cbor = encode_cbor(&message)?;
let message_cid = cbor.cid();

let db = self
.message_db(&tenant)
.await
.map_err(MessageStoreError::Backend)?;

let cbor = encode_cbor(&message)?;
let message_cid = cbor.cid();

// Store the message.
let id = Thing::from((
Table::from(MESSAGE_TABLE.to_string()).to_string(),
Table::from(MESSAGE_TABLE).to_string(),
Id::String(message_cid.to_string()),
));

Expand Down Expand Up @@ -201,6 +195,8 @@ impl<T: Connection> MessageStore for SurrealStore<T> {
})
.unwrap_or_default();

// TODO: When updating a record, if data changes delete old data / decrement ref

db.create::<Option<DbMessage>>(id)
.content(DbMessage {
author,
Expand Down Expand Up @@ -472,7 +468,7 @@ impl<T: Connection> MessageStore for SurrealStore<T> {
}

#[derive(Serialize, Deserialize, Debug)]
struct DbDataCidRefs {
struct DataRefs {
ref_count: usize,
}

Expand Down
13 changes: 12 additions & 1 deletion crates/dwn/tests/records.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ async fn test_records() {
let store = SurrealStore::new(db).await.unwrap();
let dwn = Arc::new(DWN::from(store));

let actor = Actor::new_did_key(dwn).unwrap();
let actor = Actor::new_did_key(dwn.clone()).unwrap();

// Create a new record.
let data = "Hello, world!".bytes().collect::<Vec<_>>();
Expand Down Expand Up @@ -72,6 +72,17 @@ async fn test_records() {
assert_eq!(read.status.code, 200);
assert_eq!(read.record.data, None);

// Create a record with the same data from another tenant.
let actor_two = Actor::new_did_key(dwn).unwrap();
let create_two = actor_two
.create_record()
.data(data.clone())
.data_format(Application::Json.into())
.process()
.await
.unwrap();
assert_eq!(create_two.reply.status.code, 200);

// Create a new record.
let create = actor
.create_record()
Expand Down

0 comments on commit fa06912

Please sign in to comment.