Skip to content

Commit

Permalink
fix(server): release lock before async save to prevent deadlock (#1567)
Browse files Browse the repository at this point in the history
- In `consumer_offsets.rs`, clone the consumer offset's path and
  drop the mutable guard before awaiting the async save.
- Refactor `save_consumer_offset` to accept a new offset value
  and path instead of a ConsumerOffset reference.
- Update error messages and trace logs accordingly.
- Bump version in Cargo.toml from 0.4.210 to 0.4.211.
  • Loading branch information
hubcio authored Feb 21, 2025
1 parent dc36d85 commit 1f246c7
Show file tree
Hide file tree
Showing 7 changed files with 23 additions and 28 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 5 additions & 6 deletions integration/tests/streaming/consumer_offset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,10 @@ async fn assert_persisted_offset(
consumer_offset: &ConsumerOffset,
expected_offsets_count: u32,
) {
storage.save_consumer_offset(consumer_offset).await.unwrap();
storage
.save_consumer_offset(consumer_offset.offset, &consumer_offset.path)
.await
.unwrap();
let consumer_offsets = storage
.load_consumer_offsets(consumer_offset.kind, path)
.await
Expand All @@ -51,11 +54,7 @@ async fn assert_persisted_offset(
assert_eq!(consumer_offsets.len(), expected_offsets_count);
let loaded_consumer_offset = consumer_offsets.get(expected_offsets_count - 1).unwrap();

// TODO(hubcio): This is a workaround: sometimes offset is 4, sometimes 5
let offset_ok = loaded_consumer_offset.offset == consumer_offset.offset
|| loaded_consumer_offset.offset == consumer_offset.offset + 1
|| loaded_consumer_offset.offset == consumer_offset.offset - 1;
assert!(offset_ok);
assert!(loaded_consumer_offset.offset == consumer_offset.offset);

assert_eq!(loaded_consumer_offset.kind, consumer_offset.kind);
assert_eq!(
Expand Down
2 changes: 1 addition & 1 deletion server/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "server"
version = "0.4.210"
version = "0.4.211"
edition = "2021"
build = "src/build.rs"
license = "Apache-2.0"
Expand Down
9 changes: 5 additions & 4 deletions server/src/streaming/partitions/consumer_offsets.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,14 +78,15 @@ impl Partition {
let consumer_offsets = self.get_consumer_offsets(kind);
if let Some(mut consumer_offset) = consumer_offsets.get_mut(&consumer_id) {
consumer_offset.offset = offset;
let path = consumer_offset.path.clone();
drop(consumer_offset);
self.storage
.partition
.save_consumer_offset(&consumer_offset)
.save_consumer_offset(offset, &path)
.await
.with_error_context(|error| {
format!(
"{COMPONENT} (error: {error}) - failed to save consumer offset, consumer ID: {}, offset: {}",
consumer_id, offset
"{COMPONENT} (error: {error}) - failed to save consumer offset, consumer ID: {consumer_id}, offset: {offset}, path: {path}",
)
})?;
return Ok(());
Expand All @@ -98,7 +99,7 @@ impl Partition {
let consumer_offset = ConsumerOffset::new(kind, consumer_id, offset, path);
self.storage
.partition
.save_consumer_offset(&consumer_offset)
.save_consumer_offset(offset, &consumer_offset.path)
.await
.with_error_context(|error| {
format!(
Expand Down
4 changes: 2 additions & 2 deletions server/src/streaming/partitions/partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ pub struct ConsumerOffset {
pub kind: ConsumerKind,
pub consumer_id: u32,
pub offset: u64,
pub path: String,
pub path: Arc<String>,
}

impl ConsumerOffset {
Expand All @@ -63,7 +63,7 @@ impl ConsumerOffset {
kind,
consumer_id,
offset,
path: format!("{path}/{consumer_id}"),
path: Arc::new(format!("{path}/{consumer_id}")),
}
}
}
Expand Down
18 changes: 6 additions & 12 deletions server/src/streaming/partitions/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -335,21 +335,15 @@ impl PartitionStorage for FilePartitionStorage {
Ok(())
}

async fn save_consumer_offset(&self, offset: &ConsumerOffset) -> Result<(), IggyError> {
async fn save_consumer_offset(&self, offset: u64, path: &str) -> Result<(), IggyError> {
self.persister
.overwrite(&offset.path, &offset.offset.to_le_bytes())
.overwrite(path, &offset.to_le_bytes())
.await
.with_error_context(|error| format!(
"{COMPONENT} (error: {error}) - failed to overwrite consumer offset with value: {}, kind: {}, consumer ID: {}, path: {}",
offset.offset, offset.kind, offset.consumer_id, offset.path,
"{COMPONENT} (error: {error}) - failed to overwrite consumer offset with value: {}, path: {}",
offset, path,
))?;
trace!(
"Stored consumer offset value: {} for {} with ID: {}, path: {}",
offset.offset,
offset.kind,
offset.consumer_id,
offset.path
);
trace!("Stored consumer offset value: {}, path: {}", offset, path);
Ok(())
}

Expand Down Expand Up @@ -390,7 +384,7 @@ impl PartitionStorage for FilePartitionStorage {
continue;
}

let path = path.unwrap().to_string();
let path = Arc::new(path.unwrap().to_string());
let consumer_id = consumer_id.unwrap();
let mut file = file::open(&path)
.await
Expand Down
5 changes: 3 additions & 2 deletions server/src/streaming/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,8 @@ pub trait PartitionStorage: Send {
fn delete(&self, partition: &Partition) -> impl Future<Output = Result<(), IggyError>> + Send;
fn save_consumer_offset(
&self,
offset: &ConsumerOffset,
offset: u64,
path: &str,
) -> impl Future<Output = Result<(), IggyError>> + Send;
fn load_consumer_offsets(
&self,
Expand Down Expand Up @@ -177,7 +178,7 @@ impl PartitionStorageKind {
-> Result<(), IggyError>;
async fn save(&self, partition: &mut Partition) -> Result<(), IggyError>;
async fn delete(&self, partition: &Partition) -> Result<(), IggyError>;
async fn save_consumer_offset(&self, offset: &ConsumerOffset) -> Result<(), IggyError>;
async fn save_consumer_offset(&self, offset: u64, path: &str) -> Result<(), IggyError>;
async fn load_consumer_offsets(
&self,
kind: ConsumerKind,
Expand Down

0 comments on commit 1f246c7

Please sign in to comment.