dekaf: support reading documents at the requested offset
Do this by starting `Read`s up to the max size of one document before the requested offset
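
In other words, the `Read` now clamps its starting journal offset back by up to one maximum-sized document, then drops any documents that end before the offset the consumer actually requested. A minimal sketch of that flow, using the constant and field names from the diff below (`read_start` and `should_emit` are illustrative helpers, not functions in the codebase):

    // Read back far enough that the document covering the requested offset is included.
    const OFFSET_READBACK: i64 = (2 << 25) + 1; // 64mb, single document max size

    /// Where to start the journal read for a requested offset (illustrative).
    fn read_start(requested_offset: i64) -> i64 {
        // Never start before the beginning of the journal.
        std::cmp::max(0, requested_offset - OFFSET_READBACK)
    }

    /// Whether a document read from the journal should be emitted (illustrative).
    fn should_emit(next_offset: i64, offset_start: i64) -> bool {
        // Documents that end before the requested offset were only read
        // because of the readback, so they are skipped.
        next_offset >= offset_start
    }
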
jshearer committed Oct 23, 2024
1 parent d18effc commit af36b0c
Showing 2 changed files with 19 additions and 7 deletions.
20 changes: 16 additions & 4 deletions crates/dekaf/src/read.rs
@@ -7,7 +7,6 @@ use gazette::journal::{ReadJsonLine, ReadJsonLines};
 use gazette::{broker, journal, uuid};
 use kafka_protocol::records::{Compression, TimestampType};
 use lz4_flex::frame::BlockMode;
-use std::time::{Duration, Instant};
 
 pub struct Read {
     /// Journal offset to be served by this Read.
@@ -29,6 +28,9 @@ pub struct Read {
     // Keep these details around so we can create a new ReadRequest if we need to skip forward
     journal_name: String,
 
+    // Offset before which no documents should be emitted
+    offset_start: i64,
+
     pub(crate) rewrite_offsets_from: Option<i64>,
 }

@@ -47,6 +49,8 @@ pub enum ReadTarget {
     Docs(usize),
 }
 
+const OFFSET_READBACK: i64 = (2 << 25) + 1; // 64mb, single document max size
+
 impl Read {
     pub fn new(
         client: journal::Client,
@@ -61,7 +65,8 @@ impl Read {
 
         let stream = client.clone().read_json_lines(
             broker::ReadRequest {
-                offset,
+                // Start reading at least 1 document in the past
+                offset: std::cmp::max(0, offset - OFFSET_READBACK),
                 block: true,
                 journal: partition.spec.name.clone(),
                 begin_mod_time: not_before_sec as i64,
@@ -89,14 +94,15 @@ impl Read {
 
             journal_name: partition.spec.name.clone(),
             rewrite_offsets_from,
+            offset_start: offset,
         }
     }
 
     #[tracing::instrument(skip_all,fields(journal_name=self.journal_name))]
     pub async fn next_batch(
         mut self,
         target: ReadTarget,
-        timeout: Instant,
+        timeout: std::time::Instant,
     ) -> anyhow::Result<(Self, BatchResult)> {
         use kafka_protocol::records::{
             Compression, Record, RecordBatchEncoder, RecordEncodeOptions,
@@ -155,7 +161,9 @@ impl Read {
                     transient_errors = transient_errors + 1;
 
                     tracing::warn!(error = ?err, "Retrying transient read error");
-                    let delay = Duration::from_millis(rand::thread_rng().gen_range(300..2000));
+                    let delay = std::time::Duration::from_millis(
+                        rand::thread_rng().gen_range(300..2000),
+                    );
                     tokio::time::sleep(delay).await;
                     // We can retry transient errors just by continuing to poll the stream
                     continue;
@@ -182,6 +190,10 @@ impl Read {
                 ReadJsonLine::Doc { root, next_offset } => (root, next_offset),
             };
 
+            if next_offset < self.offset_start {
+                continue;
+            }
+
             let mut record_bytes: usize = 0;
 
             let Some(doc::ArchivedNode::String(uuid)) = self.uuid_ptr.query(root.get()) else {
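Why one document of readback is enough: a single document is at most 64mb here (per the OFFSET_READBACK comment), so a read that starts OFFSET_READBACK bytes early cannot begin after the start of the document that covers the requested offset. The shift arithmetic, spelled out as an illustrative check assuming the constant as defined above:

    // (2 << 25) == 2 * 2^25 == 2^26 bytes == 64 MiB
    let readback: i64 = (2 << 25) + 1;
    assert_eq!(readback, 64 * 1024 * 1024 + 1);
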
6 changes: 3 additions & 3 deletions crates/dekaf/src/session.rs
@@ -430,7 +430,7 @@ impl Session {
             // so long as the request is still a data preview request. If not, bail out
             Entry::Occupied(entry) => {
                 let data_preview_state = entry.get();
-                if fetch_offset > data_preview_state.offset
+                if fetch_offset >= data_preview_state.offset
                     || data_preview_state.offset - fetch_offset > 12
                 {
                     bail!("Session was used for fetching preview data, cannot be used for fetching non-preview data.")
@@ -1318,9 +1318,9 @@ impl Session {
             .fetch_partition_offset(partition as usize, -1)
             .await?
         {
-            // If fetch_offset is > latest_offset, this is a caught-up consumer
+            // If fetch_offset is >= latest_offset, this is a caught-up consumer
             // polling for new documents, not a data preview request.
-            if fetch_offset <= latest_offset && latest_offset - fetch_offset < 13 {
+            if fetch_offset < latest_offset && latest_offset - fetch_offset < 13 {
                 tracing::info!(
                     latest_offset,
                     diff = latest_offset - fetch_offset,
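The session.rs changes tighten the data-preview heuristic at both boundaries: a fetch counts as a data preview only when it starts strictly behind the latest offset and within 12 documents of it, while a fetch at or past the latest offset is treated as a caught-up consumer polling for new data. A standalone restatement of that predicate (a sketch, not the exact session code):

    /// True when a fetch looks like a data-preview request: strictly behind
    /// the latest offset, but by fewer than 13 documents.
    fn is_data_preview(fetch_offset: i64, latest_offset: i64) -> bool {
        fetch_offset < latest_offset && latest_offset - fetch_offset < 13
    }
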
