Commit

refactor
amitayh committed Feb 21, 2024
1 parent ae64471 commit eac3a03
Showing 5 changed files with 161 additions and 124 deletions.
78 changes: 78 additions & 0 deletions src/storage/attribute_builder.rs
@@ -0,0 +1,78 @@
use std::rc::Rc;

use crate::datom::*;
use crate::schema::attribute::*;
use crate::schema::*;

pub struct AttributeBuilder {
id: u64,
version: u64,
ident: Option<Rc<str>>,
value_type: Option<ValueType>,
cardinality: Option<Cardinality>,
doc: Option<Rc<str>>,
unique: bool,
}

impl AttributeBuilder {
pub fn new(id: u64) -> Self {
Self {
id,
version: 0,
ident: None,
value_type: None,
cardinality: None,
doc: None,
unique: false,
}
}

pub fn consume(&mut self, datom: Datom) {
self.version = self.version.max(datom.tx);
match datom {
Datom {
attribute: DB_ATTR_IDENT_ID,
value: Value::Str(ident),
..
} => self.ident = Some(ident),
Datom {
attribute: DB_ATTR_TYPE_ID,
value: Value::U64(value_type),
..
} => self.value_type = ValueType::try_from(value_type).ok(),
Datom {
attribute: DB_ATTR_CARDINALITY_ID,
value: Value::U64(cardinality),
..
} => self.cardinality = Cardinality::try_from(cardinality).ok(),
Datom {
attribute: DB_ATTR_DOC_ID,
value: Value::Str(doc),
..
} => self.doc = Some(doc),
Datom {
attribute: DB_ATTR_UNIQUE_ID,
value: Value::U64(1),
..
} => self.unique = true,
_ => (),
}
}

pub fn build(self) -> Option<Attribute> {
let ident = self.ident?;
let value_type = self.value_type?;
let cardinality = self.cardinality?;
Some(Attribute {
id: self.id,
version: self.version,
definition: AttributeDefinition {
ident,
value_type,
cardinality,
doc: self.doc,
unique: self.unique,
},
})
}
}
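
A minimal usage sketch, not part of this commit: build() yields None until the required ident, value_type and cardinality datoms have all been consumed (doc and unique remain optional). The test module name and the entity id 42 are hypothetical.

#[cfg(test)]
mod attribute_builder_sketch {
    use crate::storage::attribute_builder::AttributeBuilder;

    // Hypothetical test: with no datoms consumed, the required fields are still
    // None, so build() cannot produce an Attribute.
    #[test]
    fn build_returns_none_without_required_datoms() {
        let builder = AttributeBuilder::new(42);
        assert!(builder.build().is_none());
    }
}
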
78 changes: 2 additions & 76 deletions src/storage/attribute_resolver.rs
@@ -5,6 +5,7 @@ use thiserror::Error;
use crate::datom::*;
use crate::schema::attribute::*;
use crate::schema::*;
use crate::storage::attribute_builder::*;
use crate::storage::ReadStorage;

use super::Restricts;
@@ -61,7 +62,7 @@ fn resolve_id<'a, S: ReadStorage<'a>>(
attribute_id: u64,
tx: u64,
) -> Result<Option<Attribute>, S::Error> {
let mut builder = Builder::new(attribute_id);
let mut builder = AttributeBuilder::new(attribute_id);
// [?attribute _ _]
let restricts = Restricts::new(tx).with_entity(attribute_id);
for datom in storage.find(restricts) {
@@ -70,81 +71,6 @@
Ok(builder.build())
}

// ------------------------------------------------------------------------------------------------

struct Builder {
id: u64,
version: u64,
ident: Option<Rc<str>>,
value_type: Option<ValueType>,
cardinality: Option<Cardinality>,
doc: Option<Rc<str>>,
unique: bool,
}

impl Builder {
fn new(id: u64) -> Self {
Self {
id,
version: 0,
ident: None,
value_type: None,
cardinality: None,
doc: None,
unique: false,
}
}

fn consume(&mut self, datom: Datom) {
self.version = self.version.max(datom.tx);
match datom {
Datom {
attribute: DB_ATTR_IDENT_ID,
value: Value::Str(ident),
..
} => self.ident = Some(ident),
Datom {
attribute: DB_ATTR_TYPE_ID,
value: Value::U64(value_type),
..
} => self.value_type = ValueType::try_from(value_type).ok(),
Datom {
attribute: DB_ATTR_CARDINALITY_ID,
value: Value::U64(cardinality),
..
} => self.cardinality = Cardinality::try_from(cardinality).ok(),
Datom {
attribute: DB_ATTR_DOC_ID,
value: Value::Str(doc),
..
} => self.doc = Some(doc),
Datom {
attribute: DB_ATTR_UNIQUE_ID,
value: Value::U64(1),
..
} => self.unique = true,
_ => (),
}
}

fn build(self) -> Option<Attribute> {
let ident = self.ident?;
let value_type = self.value_type?;
let cardinality = self.cardinality?;
Some(Attribute {
id: self.id,
version: self.version,
definition: AttributeDefinition {
ident,
value_type,
cardinality,
doc: self.doc,
unique: self.unique,
},
})
}
}

#[cfg(test)]
mod tests {
use std::cell::Cell;
1 change: 1 addition & 0 deletions src/storage/mod.rs
@@ -1,4 +1,5 @@
pub mod attribute_resolver;
pub mod attribute_builder;
pub mod disk;
pub mod memory;
pub mod restricts;
5 changes: 4 additions & 1 deletion src/tx/mod.rs
@@ -122,7 +122,10 @@ pub enum TransactionError<S> {
#[error("storage error")]
StorageError(#[from] S),
#[error("invalid attribute type")]
InvalidAttributeType,
InvalidAttributeType {
attribute: u64,
value: Value,
},
#[error("duplicate temp ID `{0}`")]
DuplicateTempId(Rc<str>),
#[error("temp ID `{0}` not found")]
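
A minimal sketch, not part of this commit, of what the new fields enable: a caller can recover which attribute/value pair failed the type check rather than only learning that some value had the wrong type. The helper name is hypothetical and assumes only the variant shape shown above (and that Value is Clone, which the transactor already relies on).

use crate::datom::Value;
use crate::tx::TransactionError;

// Hypothetical helper: pull the offending attribute id and value out of the
// enriched InvalidAttributeType variant; any other error yields None.
fn invalid_type_details<E>(err: &TransactionError<E>) -> Option<(u64, Value)> {
    match err {
        TransactionError::InvalidAttributeType { attribute, value } => {
            Some((*attribute, value.clone()))
        }
        _ => None,
    }
}
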
123 changes: 76 additions & 47 deletions src/tx/transactor.rs
@@ -22,7 +22,7 @@ pub struct Transactor {
impl Transactor {
pub fn new() -> Self {
Self {
next_entity_id: 100,
next_entity_id: 100, // TODO: How to initialize with latest ID from storage?
attribute_resolver: AttributeResolver::new(),
}
}
@@ -99,9 +99,8 @@ impl Transactor {
datoms: &mut Vec<Datom>,
unique_values: &mut HashSet<(u64, Value)>,
) -> Result<(), S::Error> {
let operation_attributes = operation.attributes.len();
let entity = self.resolve_entity(operation.entity, temp_ids)?;
let mut retract_attributes = HashSet::with_capacity(operation_attributes);
let mut retract_attributes = HashSet::with_capacity(operation.attributes.len());
for attribute_value in operation.attributes {
let attribute =
self.attribute_resolver
@@ -113,32 +112,11 @@
retract_attributes.insert(attribute.id);
}

let value = match attribute_value.value {
AttributeValue::Value(value) => Ok(value),
AttributeValue::TempId(temp_id) => temp_ids.get(&temp_id).map(Value::Ref),
}?;

if attribute.definition.value_type != ValueType::from(&value) {
// Value type is incompatible with attribute, reject transaction.
return Err(TransactionError::InvalidAttributeType);
}

let value = resolve_value(attribute_value.value, temp_ids)?;
verify_type(attribute, &value)?;
if attribute.definition.unique {
if !unique_values.insert((attribute.id, value.clone())) {
return Err(TransactionError::DuplicateUniqueValue {
attribute: attribute.id,
value,
});
}
let restricts = Restricts::new(tx)
.with_attribute(attribute.id)
.with_value(value.clone());
if storage.find(restricts).count() > 0 {
return Err(TransactionError::DuplicateUniqueValue {
attribute: attribute.id,
value,
});
}
verify_uniqueness1(attribute, &value, unique_values)?;
verify_uniqueness2(attribute, &value, storage, tx)?;
}

datoms.push(Datom {
@@ -151,30 +129,12 @@
}

for attribute_id in retract_attributes {
self.retract_old_values(storage, entity, attribute_id, tx, datoms)?;
retract_old_values(storage, entity, attribute_id, tx, datoms)?;
}

Ok(())
}

fn retract_old_values<'a, S: ReadStorage<'a>>(
&self,
storage: &'a S,
entity: u64,
attribute: u64,
tx: u64,
datoms: &mut Vec<Datom>,
) -> Result<(), S::Error> {
// Retract previous values
let restricts = Restricts::new(tx)
.with_entity(entity)
.with_attribute(attribute);
for datom in storage.find(restricts) {
datoms.push(Datom::retract(entity, attribute, datom?.value, tx));
}
Ok(())
}

fn resolve_entity<E>(
&mut self,
entity: OperatedEntity,
@@ -188,6 +148,75 @@
}
}

fn resolve_value<E>(attribute_value: AttributeValue, temp_ids: &TempIds) -> Result<Value, E> {
match attribute_value {
AttributeValue::Value(value) => Ok(value),
AttributeValue::TempId(temp_id) => temp_ids.get(&temp_id).map(Value::Ref),
}
}

fn verify_type<E>(attribute: &Attribute, value: &Value) -> Result<(), E> {
if attribute.definition.value_type != ValueType::from(value) {
// Value type is incompatible with attribute, reject transaction.
return Err(TransactionError::InvalidAttributeType {
attribute: attribute.id,
value: value.clone(),
});
}
Ok(())
}

fn verify_uniqueness1<E>(
attribute: &Attribute,
value: &Value,
unique_values: &mut HashSet<(u64, Value)>,
) -> Result<(), E> {
// Find duplicate values within transaction.
if !unique_values.insert((attribute.id, value.clone())) {
return Err(TransactionError::DuplicateUniqueValue {
attribute: attribute.id,
value: value.clone(),
});
}
Ok(())
}

fn verify_uniqueness2<'a, S: ReadStorage<'a>>(
attribute: &Attribute,
value: &Value,
storage: &'a S,
basis_tx: u64,
) -> Result<(), S::Error> {
// Find duplicate values previously saved.
let restricts = Restricts::new(basis_tx)
.with_attribute(attribute.id)
.with_value(value.clone());
if storage.find(restricts).count() > 0 {
return Err(TransactionError::DuplicateUniqueValue {
attribute: attribute.id,
value: value.clone(),
});
}
Ok(())
}

fn retract_old_values<'a, S: ReadStorage<'a>>(
storage: &'a S,
entity: u64,
attribute: u64,
tx: u64,
datoms: &mut Vec<Datom>,
) -> Result<(), S::Error> {
// Retract previous values
let restricts = Restricts::new(tx)
.with_entity(entity)
.with_attribute(attribute);
for datom in storage.find(restricts) {
datoms.push(Datom::retract(entity, attribute, datom?.value, tx));
}
Ok(())
}

struct TempIds(HashMap<TempId, EntityId>);

impl TempIds {
