Skip to content

Commit

Permalink
feat(spooler): Add partition_id tag to more metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
iambriccardo committed Nov 26, 2024
1 parent 35e9bff commit 15b9629
Show file tree
Hide file tree
Showing 4 changed files with 108 additions and 53 deletions.
5 changes: 4 additions & 1 deletion relay-server/benches/benches.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ fn benchmark_sqlite_envelope_stack(c: &mut Criterion) {
let temp_dir = TempDir::new().unwrap();
let db_path = temp_dir.path().join("test.db");
let db = setup_db(&db_path);
let envelope_store = SqliteEnvelopeStore::new(db.clone(), Duration::from_millis(100));
let envelope_store = SqliteEnvelopeStore::new(0, db.clone(), Duration::from_millis(100));

let runtime = Runtime::new().unwrap();

Expand All @@ -99,6 +99,7 @@ fn benchmark_sqlite_envelope_stack(c: &mut Criterion) {
});

let stack = SqliteEnvelopeStack::new(
0,
envelope_store.clone(),
disk_batch_size,
ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(),
Expand Down Expand Up @@ -135,6 +136,7 @@ fn benchmark_sqlite_envelope_stack(c: &mut Criterion) {
reset_db(db.clone()).await;

let mut stack = SqliteEnvelopeStack::new(
0,
envelope_store.clone(),
disk_batch_size,
ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(),
Expand Down Expand Up @@ -175,6 +177,7 @@ fn benchmark_sqlite_envelope_stack(c: &mut Criterion) {
});

let stack = SqliteEnvelopeStack::new(
0,
envelope_store.clone(),
disk_batch_size,
ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(),
Expand Down
74 changes: 49 additions & 25 deletions relay-server/src/services/buffer/envelope_stack/sqlite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,14 @@ pub struct SqliteEnvelopeStack {
/// Boolean representing whether calls to `push()` and `peek()` check disk in case not enough
/// elements are available in the `batches_buffer`.
check_disk: bool,
/// The tag value of this partition which is used for reporting purposes.
partition_tag: String,
}

impl SqliteEnvelopeStack {
/// Creates a new empty [`SqliteEnvelopeStack`].
pub fn new(
partition_id: u8,
envelope_store: SqliteEnvelopeStore,
batch_size_bytes: usize,
own_key: ProjectKey,
Expand All @@ -59,6 +62,7 @@ impl SqliteEnvelopeStack {
sampling_key,
batch: vec![],
check_disk,
partition_tag: partition_id.to_string(),
}
}

Expand All @@ -78,18 +82,25 @@ impl SqliteEnvelopeStack {
return Ok(());
};

relay_statsd::metric!(counter(RelayCounters::BufferSpooledEnvelopes) += batch.len() as u64);
relay_statsd::metric!(
counter(RelayCounters::BufferSpooledEnvelopes) += batch.len() as u64,
partition_id = &self.partition_tag
);

// When early return here, we are acknowledging that the elements that we popped from
// the buffer are lost in case of failure. We are doing this on purposes, since if we were
// to have a database corruption during runtime, and we were to put the values back into
// the buffer we will end up with an infinite cycle.
relay_statsd::metric!(timer(RelayTimers::BufferSpool), {
self.envelope_store
.insert_batch(batch)
.await
.map_err(SqliteEnvelopeStackError::EnvelopeStoreError)?;
});
relay_statsd::metric!(
timer(RelayTimers::BufferSpool),
partition_id = &self.partition_tag,
{
self.envelope_store
.insert_batch(batch)
.await
.map_err(SqliteEnvelopeStackError::EnvelopeStoreError)?;
}
);

// If we successfully spooled to disk, we know that data should be there.
self.check_disk = true;
Expand All @@ -106,12 +117,16 @@ impl SqliteEnvelopeStack {
/// envelope will not be unspooled and unspooling will continue with the remaining envelopes.
async fn unspool_from_disk(&mut self) -> Result<(), SqliteEnvelopeStackError> {
debug_assert!(self.batch.is_empty());
let batch = relay_statsd::metric!(timer(RelayTimers::BufferUnspool), {
self.envelope_store
.delete_batch(self.own_key, self.sampling_key)
.await
.map_err(SqliteEnvelopeStackError::EnvelopeStoreError)?
});
let batch = relay_statsd::metric!(
timer(RelayTimers::BufferUnspool),
partition_id = &self.partition_tag,
{
self.envelope_store
.delete_batch(self.own_key, self.sampling_key)
.await
.map_err(SqliteEnvelopeStackError::EnvelopeStoreError)?
}
);

match batch {
Some(batch) => {
Expand All @@ -121,7 +136,8 @@ impl SqliteEnvelopeStack {
}

relay_statsd::metric!(
counter(RelayCounters::BufferUnspooledEnvelopes) += self.batch.len() as u64
counter(RelayCounters::BufferUnspooledEnvelopes) += self.batch.len() as u64,
partition_id = &self.partition_tag
);
Ok(())
}
Expand All @@ -146,10 +162,11 @@ impl EnvelopeStack for SqliteEnvelopeStack {
self.spool_to_disk().await?;
}

let encoded_envelope =
relay_statsd::metric!(timer(RelayTimers::BufferEnvelopesSerialization), {
DatabaseEnvelope::try_from(envelope.as_ref())?
});
let encoded_envelope = relay_statsd::metric!(
timer(RelayTimers::BufferEnvelopesSerialization),
partition_id = &self.partition_tag,
{ DatabaseEnvelope::try_from(envelope.as_ref())? }
);
self.batch.push(encoded_envelope);

Ok(())
Expand Down Expand Up @@ -208,8 +225,9 @@ mod tests {
#[should_panic]
async fn test_push_with_mismatching_project_keys() {
let db = setup_db(false).await;
let envelope_store = SqliteEnvelopeStore::new(db, Duration::from_millis(100));
let envelope_store = SqliteEnvelopeStore::new(0, db, Duration::from_millis(100));
let mut stack = SqliteEnvelopeStack::new(
0,
envelope_store,
10,
ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
Expand All @@ -226,13 +244,14 @@ mod tests {
#[tokio::test]
async fn test_push_when_db_is_not_valid() {
let db = setup_db(false).await;
let envelope_store = SqliteEnvelopeStore::new(db, Duration::from_millis(100));
let envelope_store = SqliteEnvelopeStore::new(0, db, Duration::from_millis(100));

// Create envelopes first so we can calculate actual size
let envelopes = mock_envelopes(4);
let threshold_size = calculate_compressed_size(&envelopes) - 1;

let mut stack = SqliteEnvelopeStack::new(
0,
envelope_store,
threshold_size,
ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
Expand Down Expand Up @@ -271,8 +290,9 @@ mod tests {
#[tokio::test]
async fn test_pop_when_db_is_not_valid() {
let db = setup_db(false).await;
let envelope_store = SqliteEnvelopeStore::new(db, Duration::from_millis(100));
let envelope_store = SqliteEnvelopeStore::new(0, db, Duration::from_millis(100));
let mut stack = SqliteEnvelopeStack::new(
0,
envelope_store,
2,
ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
Expand All @@ -290,8 +310,9 @@ mod tests {
#[tokio::test]
async fn test_pop_when_stack_is_empty() {
let db = setup_db(true).await;
let envelope_store = SqliteEnvelopeStore::new(db, Duration::from_millis(100));
let envelope_store = SqliteEnvelopeStore::new(0, db, Duration::from_millis(100));
let mut stack = SqliteEnvelopeStack::new(
0,
envelope_store,
2,
ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
Expand All @@ -307,8 +328,9 @@ mod tests {
#[tokio::test]
async fn test_push_below_threshold_and_pop() {
let db = setup_db(true).await;
let envelope_store = SqliteEnvelopeStore::new(db, Duration::from_millis(100));
let envelope_store = SqliteEnvelopeStore::new(0, db, Duration::from_millis(100));
let mut stack = SqliteEnvelopeStack::new(
0,
envelope_store,
9999,
ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
Expand Down Expand Up @@ -346,14 +368,15 @@ mod tests {
#[tokio::test]
async fn test_push_above_threshold_and_pop() {
let db = setup_db(true).await;
let envelope_store = SqliteEnvelopeStore::new(db, Duration::from_millis(100));
let envelope_store = SqliteEnvelopeStore::new(0, db, Duration::from_millis(100));

// Create envelopes first so we can calculate actual size
let envelopes = mock_envelopes(7);
let threshold_size = calculate_compressed_size(&envelopes[..5]) - 1;

// Create stack with threshold just below the size of first 5 envelopes
let mut stack = SqliteEnvelopeStack::new(
0,
envelope_store,
threshold_size,
ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
Expand Down Expand Up @@ -419,8 +442,9 @@ mod tests {
#[tokio::test]
async fn test_drain() {
let db = setup_db(true).await;
let envelope_store = SqliteEnvelopeStore::new(db, Duration::from_millis(100));
let envelope_store = SqliteEnvelopeStore::new(0, db, Duration::from_millis(100));
let mut stack = SqliteEnvelopeStack::new(
0,
envelope_store.clone(),
10 * COMPRESSED_ENVELOPE_SIZE,
ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
Expand Down
Loading

0 comments on commit 15b9629

Please sign in to comment.