Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove streaming cache method #888

Merged
merged 2 commits into from
May 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## Unreleased

### Changed
- Removed streaming cache method. Fixed is now the only option.

## [0.21.0-rc1]
### Added
### Changed
Expand Down
1 change: 0 additions & 1 deletion lading/src/bin/payloadtool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ fn generate_and_check(
let start = Instant::now();
let blocks = match block::Cache::fixed(&mut rng, total_bytes, &block_sizes, config)? {
block::Cache::Fixed { blocks, idx: _ } => blocks,
_ => unreachable!(),
};
info!("Payload generation took {:?}", start.elapsed());
debug!("Payload: {:#?}", blocks);
Expand Down
2 changes: 1 addition & 1 deletion lading/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ blackhole:
8_f64,
byte_unit::ByteUnit::MB
)?,
block_cache_method: block::CacheMethod::Streaming,
block_cache_method: block::CacheMethod::Fixed,
},
headers: HeaderMap::default(),
bytes_per_second: byte_unit::Byte::from_unit(
Expand Down
6 changes: 0 additions & 6 deletions lading/src/generator/file_gen/logrotate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,12 +163,6 @@ impl Server {
NonZeroU32::new(config.maximum_prebuild_cache_size_bytes.get_bytes() as u32)
.ok_or(Error::Zero)?;
let block_cache = match config.block_cache_method {
block::CacheMethod::Streaming => block::Cache::stream(
config.seed,
total_bytes,
&block_sizes,
config.variant.clone(),
)?,
block::CacheMethod::Fixed => {
block::Cache::fixed(&mut rng, total_bytes, &block_sizes, &config.variant)?
}
Expand Down
6 changes: 0 additions & 6 deletions lading/src/generator/file_gen/traditional.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,12 +184,6 @@ impl Server {
NonZeroU32::new(config.maximum_prebuild_cache_size_bytes.get_bytes() as u32)
.ok_or(Error::Zero)?;
let block_cache = match config.block_cache_method {
block::CacheMethod::Streaming => block::Cache::stream(
config.seed,
total_bytes,
&block_sizes,
config.variant.clone(),
)?,
block::CacheMethod::Fixed => {
block::Cache::fixed(&mut rng, total_bytes, &block_sizes, &config.variant)?
}
Expand Down
6 changes: 0 additions & 6 deletions lading/src/generator/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,12 +181,6 @@ impl Grpc {
NonZeroU32::new(config.maximum_prebuild_cache_size_bytes.get_bytes() as u32)
.ok_or(Error::Zero)?;
let block_cache = match config.block_cache_method {
block::CacheMethod::Streaming => block::Cache::stream(
config.seed,
total_bytes,
&block_sizes,
config.variant.clone(),
)?,
block::CacheMethod::Fixed => {
block::Cache::fixed(&mut rng, total_bytes, &block_sizes, &config.variant)?
}
Expand Down
6 changes: 0 additions & 6 deletions lading/src/generator/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,12 +156,6 @@ impl Http {
NonZeroU32::new(maximum_prebuild_cache_size_bytes.get_bytes() as u32)
.ok_or(Error::Zero)?;
let block_cache = match block_cache_method {
block::CacheMethod::Streaming => block::Cache::stream(
config.seed,
total_bytes,
&block_sizes,
variant.clone(),
)?,
block::CacheMethod::Fixed => {
block::Cache::fixed(&mut rng, total_bytes, &block_sizes, &variant)?
}
Expand Down
3 changes: 0 additions & 3 deletions lading/src/generator/splunk_hec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,9 +201,6 @@ impl SplunkHec {
NonZeroU32::new(config.maximum_prebuild_cache_size_bytes.get_bytes() as u32)
.ok_or(Error::Zero)?;
let block_cache = match config.block_cache_method {
block::CacheMethod::Streaming => {
block::Cache::stream(config.seed, total_bytes, &block_sizes, payload_config)?
}
block::CacheMethod::Fixed => {
block::Cache::fixed(&mut rng, total_bytes, &block_sizes, &payload_config)?
}
Expand Down
6 changes: 0 additions & 6 deletions lading/src/generator/unix_datagram.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,12 +149,6 @@ impl UnixDatagram {
NonZeroU32::new(config.maximum_prebuild_cache_size_bytes.get_bytes() as u32)
.ok_or(Error::Zero)?;
let block_cache = match config.block_cache_method {
block::CacheMethod::Streaming => block::Cache::stream(
config.seed,
total_bytes,
&block_sizes,
config.variant.clone(),
)?,
block::CacheMethod::Fixed => {
block::Cache::fixed(&mut rng, total_bytes, &block_sizes, &config.variant)?
}
Expand Down
6 changes: 0 additions & 6 deletions lading/src/generator/unix_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,12 +116,6 @@ impl UnixStream {
NonZeroU32::new(config.maximum_prebuild_cache_size_bytes.get_bytes() as u32)
.ok_or(Error::Zero)?;
let block_cache = match config.block_cache_method {
block::CacheMethod::Streaming => block::Cache::stream(
config.seed,
total_bytes,
&block_sizes,
config.variant.clone(),
)?,
block::CacheMethod::Fixed => {
block::Cache::fixed(&mut rng, total_bytes, &block_sizes, &config.variant)?
}
Expand Down
179 changes: 4 additions & 175 deletions lading_payload/src/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@
//! from that, decoupling the create/send operations. This module is the
//! mechanism by which 'blocks' -- that is, byte blobs of a predetermined size
//! -- are created.
use std::{collections::VecDeque, num::NonZeroU32};
use std::num::NonZeroU32;

use bytes::{buf::Writer, BufMut, Bytes, BytesMut};
use rand::{prelude::SliceRandom, rngs::StdRng, Rng, SeedableRng};
use rand::Rng;
use serde::{Deserialize, Serialize};
use tokio::sync::mpsc::{self, error::SendError, Sender};
use tokio::sync::mpsc::{error::SendError, Sender};
use tracing::{error, info, span, warn, Level};

const MAX_CHUNKS: usize = 16_384;
Expand Down Expand Up @@ -113,14 +113,12 @@ impl<'a> arbitrary::Arbitrary<'a> for Block {
pub enum CacheMethod {
/// Create a single fixed size block cache and rotate through it
Fixed,
/// Maintain a fixed sized block cache buffer and stream from it
Streaming,
}

/// The default cache method.
#[must_use]
pub fn default_cache_method() -> CacheMethod {
CacheMethod::Streaming
CacheMethod::Fixed
}

#[derive(Debug)]
Expand All @@ -143,47 +141,9 @@ pub enum Cache {
/// The store of blocks.
blocks: Vec<Block>,
},
/// A streaming cache of blocks. Blocks are generated on the fly and
/// streamed through a queue.
Stream {
/// The seed used to construct the `Cache`.
seed: [u8; 32],
/// The total number of bytes that will be generated.
total_bytes: u32,
/// The sizes of the blocks that will be generated.
block_chunks: Vec<u32>,
/// The payload that will be generated.
payload: crate::Config,
},
}

impl Cache {
/// Construct a streaming `Cache`.
///
/// This constructor makes an internal pool of `Block` instances up to
/// `total_bytes`, each of which are roughly the size of one of the
/// `block_byte_sizes`. Internally, `Blocks` are replaced as they are spun out.
///
/// # Errors
///
/// Function will return an error if `block_byte_sizes` is empty or if a member
/// of `block_byte_sizes` is larger than `total_bytes`.
pub fn stream(
seed: [u8; 32],
total_bytes: NonZeroU32,
block_byte_sizes: &[NonZeroU32],
payload: crate::Config,
) -> Result<Self, Error> {
let mut block_chunks: [u32; MAX_CHUNKS] = [0; MAX_CHUNKS];
let total_chunks = chunk_bytes(total_bytes, block_byte_sizes, &mut block_chunks)?;
Ok(Self::Stream {
seed,
total_bytes: total_bytes.get(),
block_chunks: block_chunks[..total_chunks].to_vec(),
payload,
})
}

/// Construct a `Cache` of fixed size.
///
/// This constructor makes an internal pool of `Block` instances up to
Expand Down Expand Up @@ -392,91 +352,6 @@ impl Cache {
snd.blocking_send(blocks[idx].clone())?;
idx = (idx + 1) % blocks.len();
},
Cache::Stream {
seed,
total_bytes,
block_chunks,
payload,
} => stream_inner(seed, total_bytes, &block_chunks, &payload, snd),
}
}
}

#[allow(clippy::needless_pass_by_value)]
#[inline]
fn stream_inner(
seed: [u8; 32],
total_bytes: u32,
block_chunks: &[u32],
payload: &crate::Config,
snd: Sender<Block>,
) -> Result<(), SpinError> {
let mut rng = StdRng::from_seed(seed);

match payload {
crate::Config::TraceAgent(enc) => {
let ta = match enc {
crate::Encoding::Json => crate::TraceAgent::json(&mut rng),
crate::Encoding::MsgPack => crate::TraceAgent::msg_pack(&mut rng),
};

stream_block_inner(&mut rng, total_bytes, &ta, block_chunks, &snd)
}
crate::Config::Syslog5424 => {
let pyld = crate::Syslog5424::default();
stream_block_inner(&mut rng, total_bytes, &pyld, block_chunks, &snd)
}
crate::Config::DogStatsD(conf) => {
match conf.valid() {
Ok(()) => (),
Err(e) => {
warn!("Invalid DogStatsD configuration: {}", e);
return Err(SpinError::InvalidConfig(e));
}
}
let pyld = crate::DogStatsD::new(*conf, &mut rng)?;

stream_block_inner(&mut rng, total_bytes, &pyld, block_chunks, &snd)
}
crate::Config::Fluent => {
let pyld = crate::Fluent::new(&mut rng);
stream_block_inner(&mut rng, total_bytes, &pyld, block_chunks, &snd)
}
crate::Config::SplunkHec { encoding } => {
let pyld = crate::SplunkHec::new(*encoding);
stream_block_inner(&mut rng, total_bytes, &pyld, block_chunks, &snd)
}
crate::Config::ApacheCommon => {
let pyld = crate::ApacheCommon::new(&mut rng);
stream_block_inner(&mut rng, total_bytes, &pyld, block_chunks, &snd)
}
crate::Config::Ascii => {
let pyld = crate::Ascii::new(&mut rng);
stream_block_inner(&mut rng, total_bytes, &pyld, block_chunks, &snd)
}
crate::Config::DatadogLog => {
let pyld = crate::DatadogLog::new(&mut rng);
stream_block_inner(&mut rng, total_bytes, &pyld, block_chunks, &snd)
}
crate::Config::Json => {
let pyld = crate::Json;
stream_block_inner(&mut rng, total_bytes, &pyld, block_chunks, &snd)
}
crate::Config::Static { ref static_path } => {
let pyld = crate::Static::new(static_path)?;
stream_block_inner(&mut rng, total_bytes, &pyld, block_chunks, &snd)
}
crate::Config::OpentelemetryTraces => {
let pyld = crate::OpentelemetryTraces::new(&mut rng);
stream_block_inner(&mut rng, total_bytes, &pyld, block_chunks, &snd)
}
crate::Config::OpentelemetryLogs => {
let pyld = crate::OpentelemetryLogs::new(&mut rng);
stream_block_inner(&mut rng, total_bytes, &pyld, block_chunks, &snd)
}
crate::Config::OpentelemetryMetrics => {
let pyld = crate::OpentelemetryMetrics::new(&mut rng);
stream_block_inner(&mut rng, total_bytes, &pyld, block_chunks, &snd)
}
}
}
Expand Down Expand Up @@ -651,52 +526,6 @@ where
}
}

#[inline]
fn stream_block_inner<R, S>(
mut rng: &mut R,
total_bytes: u32,
serializer: &S,
block_chunks: &[u32],
snd: &Sender<Block>,
) -> Result<(), SpinError>
where
S: crate::Serialize,
R: Rng + ?Sized,
{
let total_bytes: u64 = u64::from(total_bytes);
let mut accum_bytes: u64 = 0;
let mut cache: VecDeque<Block> = VecDeque::new();

loop {
// Attempt to read from the cache first, being sure to subtract the
// bytes we send out.
if let Some(block) = cache.pop_front() {
accum_bytes -= u64::from(block.total_bytes.get());
snd.blocking_send(block)?;
}
// There are no blocks in the cache. In order to minimize latency we
// push blocks into the sender until such time as it's full. When that
// happens we overflow into the cache until such time as that's full.
'refill: loop {
let block_size = block_chunks.choose(&mut rng).ok_or(SpinError::EmptyRng)?;
let block = construct_block(&mut rng, serializer, *block_size)?;
match snd.try_reserve() {
Ok(permit) => permit.send(block),
Err(err) => match err {
mpsc::error::TrySendError::Full(()) => {
if accum_bytes < total_bytes {
accum_bytes += u64::from(block.total_bytes.get());
cache.push_back(block);
break 'refill;
}
}
mpsc::error::TrySendError::Closed(()) => return Ok(()),
},
}
}
}
}

/// Construct a new block
///
/// # Panics
Expand Down
Loading