Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: Adjustments to Stores & Agents #9

Merged
merged 8 commits into from
Mar 21, 2024
41 changes: 19 additions & 22 deletions src/delegation/agent.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use super::{payload::Payload, policy::Predicate, store::Store, Delegation};
use crate::ability::arguments::Named;
use crate::did;
use crate::{
crypto::{signature::Envelope, varsig, Nonce},
did::Did,
Expand All @@ -14,40 +15,38 @@ use std::{collections::BTreeMap, marker::PhantomData};
use thiserror::Error;
use web_time::SystemTime;

/// A stateful agent capable of delegatint to others, and being delegated to.
/// A stateful agent capable of delegating to others, and being delegated to.
///
/// This is helpful for sessions where more than one delegation will be made.
#[derive(Debug)]
pub struct Agent<
'a,
DID: Did,
S: Store<DID, V, Enc>,
V: varsig::Header<Enc>,
Enc: Codec + TryFrom<u64> + Into<u64>,
DID: Did = did::preset::Verifier,
V: varsig::Header<Enc> + Clone = varsig::header::Preset,
Enc: Codec + Into<u64> + TryFrom<u64> = varsig::encoding::Preset,
> {
/// The [`Did`][Did] of the agent.
pub did: &'a DID,
pub did: DID,

/// The attached [`deleagtion::Store`][super::store::Store].
pub store: &'a mut S,
pub store: S,

signer: &'a <DID as Did>::Signer,
signer: <DID as Did>::Signer,
_marker: PhantomData<(V, Enc)>,
}

impl<
'a,
DID: Did + Clone,
S: Store<DID, V, Enc> + Clone,
DID: Did + Clone,
V: varsig::Header<Enc> + Clone,
Enc: Codec + TryFrom<u64> + Into<u64>,
> Agent<'a, DID, S, V, Enc>
> Agent<S, DID, V, Enc>
where
Ipld: Encode<Enc>,
Payload<DID>: TryFrom<Named<Ipld>>,
Named<Ipld>: From<Payload<DID>>,
{
pub fn new(did: &'a DID, signer: &'a <DID as Did>::Signer, store: &'a mut S) -> Self {
pub fn new(did: DID, signer: <DID as Did>::Signer, store: S) -> Self {
Self {
did,
store,
Expand All @@ -73,7 +72,7 @@ where
let nonce = Nonce::generate_12(&mut salt);

if let Some(ref sub) = subject {
if sub == self.did {
if sub == &self.did {
let payload: Payload<DID> = Payload {
issuer: self.did.clone(),
audience,
Expand All @@ -88,19 +87,17 @@ where
};

return Ok(
Delegation::try_sign(self.signer, varsig_header, payload).expect("FIXME")
Delegation::try_sign(&self.signer, varsig_header, payload).expect("FIXME")
);
}
}

let to_delegate = &self
let proofs = &self
.store
.get_chain(&self.did, &subject, "/".into(), vec![], now)
.map_err(DelegateError::StoreError)?
.ok_or(DelegateError::ProofsNotFound)?
.first()
.1
.payload();
.ok_or(DelegateError::ProofsNotFound)?;
let to_delegate = proofs.first().1.payload();

let mut policy = to_delegate.policy.clone();
policy.append(&mut new_policy.clone());
Expand All @@ -118,19 +115,19 @@ where
not_before: not_before.map(Into::into),
};

Ok(Delegation::try_sign(self.signer, varsig_header, payload).expect("FIXME"))
Ok(Delegation::try_sign(&self.signer, varsig_header, payload).expect("FIXME"))
}

pub fn receive(
&mut self,
&self,
cid: Cid, // FIXME remove and generate from the capsule header?
delegation: Delegation<DID, V, Enc>,
) -> Result<(), ReceiveError<S::DelegationStoreError, DID>> {
if self.store.get(&cid).is_ok() {
return Ok(());
}

if delegation.audience() != self.did {
if delegation.audience() != &self.did {
return Err(ReceiveError::WrongAudience(delegation.audience().clone()));
}

Expand Down
100 changes: 76 additions & 24 deletions src/delegation/store/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,11 @@ use libipld_core::codec::Encode;
use libipld_core::ipld::Ipld;
use libipld_core::{cid::Cid, codec::Codec};
use nonempty::NonEmpty;
use std::collections::{BTreeMap, BTreeSet};
use std::sync::{Arc, Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard};
use std::{
collections::{BTreeMap, BTreeSet},
convert::Infallible,
};
use web_time::SystemTime;

#[cfg_attr(doc, aquamarine::aquamarine)]
Expand Down Expand Up @@ -69,36 +73,77 @@ use web_time::SystemTime;
/// linkStyle 6 stroke:orange;
/// linkStyle 1 stroke:orange;
/// ```
#[derive(Debug, Clone, PartialEq)]
#[derive(Debug, Clone)]
pub struct MemoryStore<
DID: did::Did + Ord = did::preset::Verifier,
V: varsig::Header<C> = varsig::header::Preset,
C: Codec + TryFrom<u64> + Into<u64> = varsig::encoding::Preset,
> {
ucans: BTreeMap<Cid, Delegation<DID, V, C>>,
inner: Arc<RwLock<MemoryStoreInner<DID, V, C>>>,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A couple questions, oh wise Rustacean 🦀

Generality

I guess RwLock here because it's the most general option? Is this a typical interface for libraries (as opposed to applications)? It really makes me want HKTs 😛

Deadlocks

From the std docs:

The priority policy of the lock is dependent on the underlying operating system’s implementation, and this type does not guarantee that any particular policy will be used.
[...]
Potential deadlock example

That's... unsettling. Are there ways to guard against starvation other than "don't use a bad OS"?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh wise Rustacean 🦀

😂 🤦‍♂️ 😅

Deadlocks [...] That's... unsettling. Are there ways to guard against starvation other than "don't use a bad OS"?

It is unsettling! And you're wise in reading the documentation more than I have 😛
I chose RwLock because it should be faster than Mutex or equally fast in all cases, but admittedly, the deadlock potential on some OSes worries me, too. And performance shouldn't matter too much here, and if someone is worried about that, they could/should write a store implementation based on DashMap.
Should we change it to Mutex?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reading up a bit I think I had some misconceptions about RwLocks. They can be slower than Mutexes in some cases. Some people recommend "Use Mutex unless you know what you're doing".
And I clearly don't! So let's switch to mutex 👍

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By the way -

Is this a typical interface for libraries (as opposed to applications)?

I'm not sure I understand. The RwLock/Mutex is totally an implementation detail and shouldn't leak to the interface. It's a private field.
The MemoryStore/MemoryStoreInner is a typical way of structuring code.

}

#[derive(Debug, Clone, PartialEq)]
struct MemoryStoreInner<
DID: did::Did + Ord = did::preset::Verifier,
V: varsig::Header<C> = varsig::header::Preset,
C: Codec + TryFrom<u64> + Into<u64> = varsig::encoding::Preset,
> {
ucans: BTreeMap<Cid, Arc<Delegation<DID, V, C>>>,
index: BTreeMap<Option<DID>, BTreeMap<DID, BTreeSet<Cid>>>,
revocations: BTreeSet<Cid>,
}

impl MemoryStore {
impl<DID: did::Did + Ord, V: varsig::Header<C>, C: Codec + TryFrom<u64> + Into<u64>>
MemoryStore<DID, V, C>
{
pub fn new() -> Self {
Self::default()
}

pub fn len(&self) -> usize {
self.ucans.len()
self.read().ucans.len()
}

pub fn is_empty(&self) -> bool {
self.ucans.is_empty() // FIXME acocunt for revocations?
self.read().ucans.is_empty() // FIXME acocunt for revocations?
}

fn read(&self) -> RwLockReadGuard<'_, MemoryStoreInner<DID, V, C>> {
expede marked this conversation as resolved.
Show resolved Hide resolved
match self.inner.read() {
Ok(guard) => guard,
Err(poison) => {
// We ignore lock poisoning for simplicity
poison.into_inner()
}
}
}

fn write(&self) -> RwLockWriteGuard<'_, MemoryStoreInner<DID, V, C>> {
match self.inner.write() {
Ok(guard) => guard,
Err(poison) => {
// We ignore lock poisoning for simplicity
poison.into_inner()
}
}
}
}

impl<DID: Did + Ord, V: varsig::Header<C>, C: Codec + TryFrom<u64> + Into<u64>> Default
impl<DID: did::Did + Ord, V: varsig::Header<C>, C: Codec + TryFrom<u64> + Into<u64>> Default
for MemoryStore<DID, V, C>
{
fn default() -> Self {
MemoryStore {
Self {
inner: Default::default(),
}
}
}

impl<DID: Did + Ord, V: varsig::Header<C>, C: Codec + TryFrom<u64> + Into<u64>> Default
for MemoryStoreInner<DID, V, C>
{
fn default() -> Self {
MemoryStoreInner {
ucans: BTreeMap::new(),
index: BTreeMap::new(),
revocations: BTreeSet::new(),
Expand All @@ -117,34 +162,39 @@ where
delegation::Payload<DID>: TryFrom<Named<Ipld>>,
Delegation<DID, V, Enc>: Encode<Enc>,
{
type DelegationStoreError = String; // FIXME misisng
type DelegationStoreError = Infallible;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we account for a poisoned RwLock?

Copy link
Member Author

@matheus23 matheus23 Mar 22, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought about this a bit. The poisoning is mostly important if you keep using a delegation store after it had panicked.
I don't think we have a reasonable way of restoring the invariants when it happens, or at least I don't think it's worth looking into that.
And given this is in the implementation of get_chain:

let all_powerlines = read_tx.index.get(&None).unwrap_or(&blank_map);
let all_aud_for_subject = read_tx.index.get(subject).unwrap_or(&blank_map);
let powerline_candidates = all_powerlines.get(aud).unwrap_or(&blank_set);
let sub_candidates = all_aud_for_subject.get(aud).unwrap_or(&blank_set);

Broken invariants at least wouldn't cause further panics.


fn get(&self, cid: &Cid) -> Result<&Delegation<DID, V, Enc>, Self::DelegationStoreError> {
self.ucans
.get(cid)
.ok_or(format!("not found in delegation memstore: {:?}", cid).into())
fn get(
&self,
cid: &Cid,
) -> Result<Option<Arc<Delegation<DID, V, Enc>>>, Self::DelegationStoreError> {
// cheap Arc clone
Ok(self.read().ucans.get(cid).cloned())
// FIXME
}

fn insert(
&mut self,
&self,
cid: Cid,
delegation: Delegation<DID, V, Enc>,
) -> Result<(), Self::DelegationStoreError> {
self.index
let mut write_tx = self.write();

write_tx
.index
.entry(delegation.subject().clone())
.or_default()
.entry(delegation.audience().clone())
.or_default()
.insert(cid);

self.ucans.insert(cid.clone(), delegation);
write_tx.ucans.insert(cid.clone(), Arc::new(delegation));

Ok(())
}

fn revoke(&mut self, cid: Cid) -> Result<(), Self::DelegationStoreError> {
self.revocations.insert(cid);
fn revoke(&self, cid: Cid) -> Result<(), Self::DelegationStoreError> {
self.write().revocations.insert(cid);
Ok(())
}

Expand All @@ -155,12 +205,14 @@ where
command: String,
policy: Vec<Predicate>, // FIXME
now: SystemTime,
) -> Result<Option<NonEmpty<(Cid, &Delegation<DID, V, Enc>)>>, Self::DelegationStoreError> {
) -> Result<Option<NonEmpty<(Cid, Arc<Delegation<DID, V, Enc>>)>>, Self::DelegationStoreError>
{
let blank_set = BTreeSet::new();
let blank_map = BTreeMap::new();
let read_tx = self.read();

let all_powerlines = self.index.get(&None).unwrap_or(&blank_map);
let all_aud_for_subject = self.index.get(subject).unwrap_or(&blank_map);
let all_powerlines = read_tx.index.get(&None).unwrap_or(&blank_map);
let all_aud_for_subject = read_tx.index.get(subject).unwrap_or(&blank_map);
let powerline_candidates = all_powerlines.get(aud).unwrap_or(&blank_set);
let sub_candidates = all_aud_for_subject.get(aud).unwrap_or(&blank_set);

Expand All @@ -185,11 +237,11 @@ where
}

'inner: for cid in parent_cid_candidates {
if self.revocations.contains(cid) {
if read_tx.revocations.contains(cid) {
continue;
}

if let Some(delegation) = self.ucans.get(cid) {
if let Some(delegation) = read_tx.ucans.get(cid) {
if delegation.check_time(now).is_err() {
continue;
}
Expand Down Expand Up @@ -217,7 +269,7 @@ where
}
}

hypothesis_chain.push((cid.clone(), delegation));
hypothesis_chain.push((cid.clone(), Arc::clone(delegation)));

let issuer = delegation.issuer().clone();

Expand Down
36 changes: 21 additions & 15 deletions src/delegation/store/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,27 @@ use crate::{
};
use libipld_core::{cid::Cid, codec::Codec};
use nonempty::NonEmpty;
use std::fmt::Debug;
use std::{fmt::Debug, sync::Arc};
use web_time::SystemTime;

pub trait Store<DID: Did, V: varsig::Header<Enc>, Enc: Codec + TryFrom<u64> + Into<u64>> {
type DelegationStoreError: Debug;

fn get(&self, cid: &Cid) -> Result<&Delegation<DID, V, Enc>, Self::DelegationStoreError>;
fn get(
&self,
cid: &Cid,
) -> Result<Option<Arc<Delegation<DID, V, Enc>>>, Self::DelegationStoreError>;

fn insert(
&mut self,
&self,
cid: Cid,
delegation: Delegation<DID, V, Enc>,
) -> Result<(), Self::DelegationStoreError>;

// FIXME validate invocation
// store invocation
// just... move to invocation
fn revoke(&mut self, cid: Cid) -> Result<(), Self::DelegationStoreError>;
fn revoke(&self, cid: Cid) -> Result<(), Self::DelegationStoreError>;

fn get_chain(
&self,
Expand All @@ -31,7 +34,7 @@ pub trait Store<DID: Did, V: varsig::Header<Enc>, Enc: Codec + TryFrom<u64> + In
command: String,
policy: Vec<Predicate>,
now: SystemTime,
) -> Result<Option<NonEmpty<(Cid, &Delegation<DID, V, Enc>)>>, Self::DelegationStoreError>;
) -> Result<Option<NonEmpty<(Cid, Arc<Delegation<DID, V, Enc>>)>>, Self::DelegationStoreError>;

fn get_chain_cids(
&self,
Expand Down Expand Up @@ -60,32 +63,34 @@ pub trait Store<DID: Did, V: varsig::Header<Enc>, Enc: Codec + TryFrom<u64> + In
fn get_many(
&self,
cids: &[Cid],
) -> Result<Vec<&Delegation<DID, V, Enc>>, Self::DelegationStoreError> {
cids.iter().try_fold(vec![], |mut acc, cid| {
acc.push(self.get(cid)?);
Ok(acc)
})
) -> Result<Vec<Option<Arc<Delegation<DID, V, Enc>>>>, Self::DelegationStoreError> {
cids.iter()
.map(|cid| self.get(cid))
.collect::<Result<_, Self::DelegationStoreError>>()
}
}

impl<T: Store<DID, V, C>, DID: Did, V: varsig::Header<C>, C: Codec + TryFrom<u64> + Into<u64>>
Store<DID, V, C> for &mut T
Store<DID, V, C> for &T
{
type DelegationStoreError = <T as Store<DID, V, C>>::DelegationStoreError;

fn get(&self, cid: &Cid) -> Result<&Delegation<DID, V, C>, Self::DelegationStoreError> {
fn get(
&self,
cid: &Cid,
) -> Result<Option<Arc<Delegation<DID, V, C>>>, Self::DelegationStoreError> {
(**self).get(cid)
}

fn insert(
&mut self,
&self,
cid: Cid,
delegation: Delegation<DID, V, C>,
) -> Result<(), Self::DelegationStoreError> {
(**self).insert(cid, delegation)
}

fn revoke(&mut self, cid: Cid) -> Result<(), Self::DelegationStoreError> {
fn revoke(&self, cid: Cid) -> Result<(), Self::DelegationStoreError> {
(**self).revoke(cid)
}

Expand All @@ -96,7 +101,8 @@ impl<T: Store<DID, V, C>, DID: Did, V: varsig::Header<C>, C: Codec + TryFrom<u64
command: String,
policy: Vec<Predicate>,
now: SystemTime,
) -> Result<Option<NonEmpty<(Cid, &Delegation<DID, V, C>)>>, Self::DelegationStoreError> {
) -> Result<Option<NonEmpty<(Cid, Arc<Delegation<DID, V, C>>)>>, Self::DelegationStoreError>
{
(**self).get_chain(audience, subject, command, policy, now)
}
}
Loading
Loading