ext_proc: refactoring onData() to make it modularized (envoyproxy#36999)
ext_proc: refactoring onData() to make it modular, based on the review
comments:
https://github.com/envoyproxy/envoy/pull/34942/files#r1829466794

Fixes: envoyproxy#36970

---------

Signed-off-by: Yanjun Xiang <[email protected]>
yanjunxiang-google authored Nov 7, 2024
1 parent 285a9d6 commit 1c90708
Showing 2 changed files with 131 additions and 110 deletions.
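
In short, the commit replaces the three inline case bodies in onData()'s body-mode switch with calls to per-mode helpers: handleDataBufferedMode(), handleDataStreamedMode(), and handleDataBufferedPartialMode(). A condensed sketch of the resulting dispatch, abridged from the diff below (not a standalone compilable excerpt, since the surrounding Envoy types and the rest of the function are omitted):

FilterDataStatus Filter::onData(ProcessorState& state, Buffer::Instance& data, bool end_stream) {
  // ... observability-mode handling and earlier checks elided (see diff) ...
  FilterDataStatus result;
  switch (state.bodyMode()) {
  case ProcessingMode::BUFFERED:
    result = handleDataBufferedMode(state, data, end_stream);
    break;
  case ProcessingMode::STREAMED:
    result = handleDataStreamedMode(state, data, end_stream);
    break;
  case ProcessingMode::BUFFERED_PARTIAL:
    result = handleDataBufferedPartialMode(state, data, end_stream);
    break;
  case ProcessingMode::NONE:
    ABSL_FALLTHROUGH_INTENDED;
  default:
    result = FilterDataStatus::Continue;
    break;
  }
  // ... rest of onData() elided ...
}
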
233 changes: 123 additions & 110 deletions source/extensions/filters/http/ext_proc/ext_proc.cc
@@ -553,6 +553,124 @@ FilterHeadersStatus Filter::decodeHeaders(RequestHeaderMap& headers, bool end_st
return status;
}

FilterDataStatus Filter::handleDataBufferedMode(ProcessorState& state, Buffer::Instance& data,
bool end_stream) {
if (end_stream) {
switch (openStream()) {
case StreamOpenState::Error:
return FilterDataStatus::StopIterationNoBuffer;
case StreamOpenState::IgnoreError:
return FilterDataStatus::Continue;
case StreamOpenState::Ok:
// Fall through
break;
}

// The body has been buffered and we need to send the buffer
ENVOY_LOG(debug, "Sending request body message");
state.addBufferedData(data);
ProcessingRequest req = setupBodyChunk(state, *state.bufferedData(), end_stream);
sendBodyChunk(state, ProcessorState::CallbackState::BufferedBodyCallback, req);
// Since we just moved the data into the buffer, return NoBuffer
// so that we do not buffer this chunk twice.
state.setPaused(true);
return FilterDataStatus::StopIterationNoBuffer;
}
ENVOY_LOG(trace, "onData: Buffering");
state.setPaused(true);
return FilterDataStatus::StopIterationAndBuffer;
}

FilterDataStatus Filter::handleDataStreamedMode(ProcessorState& state, Buffer::Instance& data,
bool end_stream) {
// STREAMED body mode works as follows:
//
// 1) As data callbacks come in to the filter, it "moves" the data into a new buffer, which it
// dispatches via gRPC message to the external processor, and then keeps in a queue. It
// may request a watermark if the queue is higher than the buffer limit to prevent running
// out of memory.
// 2) As a result, filters farther down the chain see empty buffers in some data callbacks.
// 3) When a response comes back from the external processor, it injects the processor's result
// into the filter chain using "inject**codedData". (The processor may respond indicating that
// there is no change, which means that the original buffer stored in the queue is what gets
// injected.)
//
// This way, we pipeline data from the proxy to the external processor, and give the processor
// the ability to modify each chunk, in order. Doing this any other way would have required
// substantial changes to the filter manager. See
// https://github.com/envoyproxy/envoy/issues/16760 for a discussion.
switch (openStream()) {
case StreamOpenState::Error:
return FilterDataStatus::StopIterationNoBuffer;
case StreamOpenState::IgnoreError:
return FilterDataStatus::Continue;
case StreamOpenState::Ok:
// Fall through
break;
}

// Need to first enqueue the data into the chunk queue before sending.
ProcessingRequest req = setupBodyChunk(state, data, end_stream);
state.enqueueStreamingChunk(data, end_stream);
// If the current state is HeadersCallback, stay in that state.
if (state.callbackState() == ProcessorState::CallbackState::HeadersCallback) {
sendBodyChunk(state, ProcessorState::CallbackState::HeadersCallback, req);
} else {
sendBodyChunk(state, ProcessorState::CallbackState::StreamedBodyCallback, req);
}
if (end_stream || state.callbackState() == ProcessorState::CallbackState::HeadersCallback) {
state.setPaused(true);
return FilterDataStatus::StopIterationNoBuffer;
} else {
return FilterDataStatus::Continue;
}
}

FilterDataStatus Filter::handleDataBufferedPartialMode(ProcessorState& state,
Buffer::Instance& data, bool end_stream) {
// BUFFERED_PARTIAL mode works as follows:
//
// 1) As data chunks arrive, we move the data into a new buffer, which we store
// in the buffer queue, and continue the filter stream with an empty buffer. This
// is the same thing that we do in STREAMING mode.
// 2) If end of stream is reached before the queue reaches the buffer limit, we
// send the buffered data to the server and essentially behave as if we are in
// buffered mode.
// 3) If instead the buffer limit is reached before end of stream, then we also
// send the buffered data to the server, and raise the watermark to prevent Envoy
// from running out of memory while we wait.
// 4) It is possible that Envoy will keep sending us data even in that case, so
// we must continue to queue data and prepare to re-inject it later.
if (state.partialBodyProcessed()) {
// We already sent and received the buffer, so everything else just falls through.
ENVOY_LOG(trace, "Partial buffer limit reached");
// Make sure that we do not accidentally try to modify the headers before
// we continue, which will result in them possibly being sent.
state.setHeaders(nullptr);
return FilterDataStatus::Continue;
} else if (state.callbackState() == ProcessorState::CallbackState::BufferedPartialBodyCallback) {
// More data came in while we were waiting for a callback result. We need
// to queue it and deliver it later in case the callback changes the data.
state.enqueueStreamingChunk(data, end_stream);
ENVOY_LOG(trace, "Call in progress for partial mode");
state.setPaused(true);
return FilterDataStatus::StopIterationNoBuffer;
} else {
state.enqueueStreamingChunk(data, end_stream);
if (end_stream || state.queueOverHighLimit()) {
// At either end of stream or when the buffer is full, it's time to send what we have
// to the processor.
bool terminate;
FilterDataStatus chunk_result;
std::tie(terminate, chunk_result) = sendStreamChunk(state);
if (terminate) {
return chunk_result;
}
}
return FilterDataStatus::StopIterationNoBuffer;
}
}

FilterDataStatus Filter::onData(ProcessorState& state, Buffer::Instance& data, bool end_stream) {
if (config_->observabilityMode()) {
return sendDataInObservabilityMode(data, state, end_stream);
@@ -591,121 +709,16 @@ FilterDataStatus Filter::onData(ProcessorState& state, Buffer::Instance& data, b
FilterDataStatus result;
switch (state.bodyMode()) {
case ProcessingMode::BUFFERED:
if (end_stream) {
switch (openStream()) {
case StreamOpenState::Error:
return FilterDataStatus::StopIterationNoBuffer;
case StreamOpenState::IgnoreError:
return FilterDataStatus::Continue;
case StreamOpenState::Ok:
// Fall through
break;
}

// The body has been buffered and we need to send the buffer
ENVOY_LOG(debug, "Sending request body message");
state.addBufferedData(data);
auto req = setupBodyChunk(state, *state.bufferedData(), end_stream);
sendBodyChunk(state, ProcessorState::CallbackState::BufferedBodyCallback, req);
// Since we just moved the data into the buffer, return NoBuffer
// so that we do not buffer this chunk twice.
state.setPaused(true);
result = FilterDataStatus::StopIterationNoBuffer;
break;
}
ENVOY_LOG(trace, "onData: Buffering");
state.setPaused(true);
result = FilterDataStatus::StopIterationAndBuffer;
result = handleDataBufferedMode(state, data, end_stream);
break;
case ProcessingMode::STREAMED: {
// STREAMED body mode works as follows:
//
// 1) As data callbacks come in to the filter, it "moves" the data into a new buffer, which it
// dispatches via gRPC message to the external processor, and then keeps in a queue. It
// may request a watermark if the queue is higher than the buffer limit to prevent running
// out of memory.
// 2) As a result, filters farther down the chain see empty buffers in some data callbacks.
// 3) When a response comes back from the external processor, it injects the processor's result
// into the filter chain using "inject**codedData". (The processor may respond indicating that
// there is no change, which means that the original buffer stored in the queue is what gets
// injected.)
//
// This way, we pipeline data from the proxy to the external processor, and give the processor
// the ability to modify each chunk, in order. Doing this any other way would have required
// substantial changes to the filter manager. See
// https://github.com/envoyproxy/envoy/issues/16760 for a discussion.
switch (openStream()) {
case StreamOpenState::Error:
return FilterDataStatus::StopIterationNoBuffer;
case StreamOpenState::IgnoreError:
return FilterDataStatus::Continue;
case StreamOpenState::Ok:
// Fall through
break;
}

// Need to first enqueue the data into the chunk queue before sending.
auto req = setupBodyChunk(state, data, end_stream);
state.enqueueStreamingChunk(data, end_stream);
// If the current state is HeadersCallback, stay in that state.
if (state.callbackState() == ProcessorState::CallbackState::HeadersCallback) {
sendBodyChunk(state, ProcessorState::CallbackState::HeadersCallback, req);
} else {
sendBodyChunk(state, ProcessorState::CallbackState::StreamedBodyCallback, req);
}
if (end_stream || state.callbackState() == ProcessorState::CallbackState::HeadersCallback) {
state.setPaused(true);
result = FilterDataStatus::StopIterationNoBuffer;
} else {
result = FilterDataStatus::Continue;
}
case ProcessingMode::STREAMED:
result = handleDataStreamedMode(state, data, end_stream);
break;
}
case ProcessingMode::BUFFERED_PARTIAL:
// BUFFERED_PARTIAL mode works as follows:
//
// 1) As data chunks arrive, we move the data into a new buffer, which we store
// in the buffer queue, and continue the filter stream with an empty buffer. This
// is the same thing that we do in STREAMING mode.
// 2) If end of stream is reached before the queue reaches the buffer limit, we
// send the buffered data to the server and essentially behave as if we are in
// buffered mode.
// 3) If instead the buffer limit is reached before end of stream, then we also
// send the buffered data to the server, and raise the watermark to prevent Envoy
// from running out of memory while we wait.
// 4) It is possible that Envoy will keep sending us data even in that case, so
// we must continue to queue data and prepare to re-inject it later.
if (state.partialBodyProcessed()) {
// We already sent and received the buffer, so everything else just falls through.
ENVOY_LOG(trace, "Partial buffer limit reached");
// Make sure that we do not accidentally try to modify the headers before
// we continue, which will result in them possibly being sent.
state.setHeaders(nullptr);
result = FilterDataStatus::Continue;
} else if (state.callbackState() ==
ProcessorState::CallbackState::BufferedPartialBodyCallback) {
// More data came in while we were waiting for a callback result. We need
// to queue it and deliver it later in case the callback changes the data.
state.enqueueStreamingChunk(data, end_stream);
ENVOY_LOG(trace, "Call in progress for partial mode");
state.setPaused(true);
result = FilterDataStatus::StopIterationNoBuffer;
} else {
state.enqueueStreamingChunk(data, end_stream);
if (end_stream || state.queueOverHighLimit()) {
// At either end of stream or when the buffer is full, it's time to send what we have
// to the processor.
bool terminate;
FilterDataStatus chunk_result;
std::tie(terminate, chunk_result) = sendStreamChunk(state);
if (terminate) {
return chunk_result;
}
}
result = FilterDataStatus::StopIterationNoBuffer;
}
result = handleDataBufferedPartialMode(state, data, end_stream);
break;
case ProcessingMode::NONE:
ABSL_FALLTHROUGH_INTENDED;
default:
result = FilterDataStatus::Continue;
break;
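
The STREAMED-mode comment in handleDataStreamedMode() above describes a pipeline: each data callback's buffer is moved into a queue and dispatched to the external processor, and the processor's reply is injected back into the filter chain in order, with a watermark raised if the queue grows past the buffer limit. Below is a standalone toy model of that queue-and-inject pattern. It is not Envoy code: the class name, method names, and watermark value are illustrative assumptions, and the gRPC dispatch and data injection are only simulated.

// Toy model of the STREAMED-mode pipeline sketched in the comment above.
// Not Envoy code; names and the watermark are illustrative assumptions.
#include <cstddef>
#include <deque>
#include <iostream>
#include <optional>
#include <string>

class StreamedPipeline {
public:
  explicit StreamedPipeline(size_t high_watermark) : high_watermark_(high_watermark) {}

  // Called as data arrives from the proxy: move the chunk into the queue
  // (a real filter would also dispatch a gRPC message here). Returns true
  // if the caller should apply back-pressure because the queue is above
  // the watermark.
  bool onChunk(std::string chunk) {
    queued_bytes_ += chunk.size();
    queue_.push_back(std::move(chunk));
    return queued_bytes_ > high_watermark_;
  }

  // Called when the external processor responds, in order. If the processor
  // supplied a mutation it replaces the queued original; otherwise the
  // original chunk is what gets injected downstream.
  std::string onProcessorResponse(std::optional<std::string> mutation) {
    std::string original = std::move(queue_.front());
    queue_.pop_front();
    queued_bytes_ -= original.size();
    return mutation ? std::move(*mutation) : std::move(original);
  }

private:
  std::deque<std::string> queue_;
  size_t queued_bytes_{0};
  const size_t high_watermark_;
};

int main() {
  StreamedPipeline pipeline(16); // tiny watermark, just for the demo
  bool over_watermark = pipeline.onChunk("hello ");
  pipeline.onChunk("world");
  std::cout << "over watermark after first chunk: " << over_watermark << "\n";
  // The simulated processor leaves the first chunk alone and rewrites the second.
  std::cout << pipeline.onProcessorResponse(std::nullopt);
  std::cout << pipeline.onProcessorResponse(std::string("WORLD")) << "\n";
}
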
8 changes: 8 additions & 0 deletions source/extensions/filters/http/ext_proc/ext_proc.h
@@ -469,7 +469,15 @@ class Filter : public Logger::Loggable<Logger::Id::ext_proc>,

// Returns a pair: whether to terminate, and the FilterDataStatus to return if terminating.
std::pair<bool, Http::FilterDataStatus> sendStreamChunk(ProcessorState& state);

Http::FilterDataStatus handleDataBufferedMode(ProcessorState& state, Buffer::Instance& data,
bool end_stream);
Http::FilterDataStatus handleDataStreamedMode(ProcessorState& state, Buffer::Instance& data,
bool end_stream);
Http::FilterDataStatus handleDataBufferedPartialMode(ProcessorState& state,
Buffer::Instance& data, bool end_stream);
Http::FilterDataStatus onData(ProcessorState& state, Buffer::Instance& data, bool end_stream);

Http::FilterTrailersStatus onTrailers(ProcessorState& state, Http::HeaderMap& trailers);
void setDynamicMetadata(Http::StreamFilterCallbacks* cb, const ProcessorState& state,
const envoy::service::ext_proc::v3::ProcessingResponse& response);
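
The BUFFERED_PARTIAL comment block in handleDataBufferedPartialMode() above describes a second strategy: chunks accumulate in a queue until either end of stream or the buffer limit is reached, the accumulated data is flushed to the processor exactly once, and data that keeps arriving afterwards is still queued for later re-injection. Below is a standalone toy model of just that flush-once decision. It is not Envoy code: it omits the watermark and header handling, and the class name, method names, and limit are illustrative assumptions.

// Toy model of the BUFFERED_PARTIAL flush decision sketched in the comment
// above. Not Envoy code; names and the limit are illustrative assumptions.
#include <cstddef>
#include <iostream>
#include <string>
#include <vector>

class PartialBuffer {
public:
  explicit PartialBuffer(size_t limit) : limit_(limit) {}

  // Queue a chunk. Returns the concatenated data to send to the processor
  // the first time end_stream or the limit triggers a flush; otherwise
  // returns an empty string and keeps accumulating.
  std::string onChunk(const std::string& chunk, bool end_stream) {
    queued_.push_back(chunk);
    queued_bytes_ += chunk.size();
    if (flushed_) {
      return ""; // already sent the partial buffer once; keep queueing
    }
    if (end_stream || queued_bytes_ >= limit_) {
      flushed_ = true;
      std::string out;
      for (const std::string& piece : queued_) {
        out += piece;
      }
      return out;
    }
    return "";
  }

private:
  std::vector<std::string> queued_;
  size_t queued_bytes_{0};
  bool flushed_{false};
  const size_t limit_;
};

int main() {
  PartialBuffer buf(10);
  std::cout << "[" << buf.onChunk("abcd", false) << "]\n";   // below limit: nothing flushed
  std::cout << "[" << buf.onChunk("efghij", false) << "]\n"; // limit reached: flushes "abcdefghij"
  std::cout << "[" << buf.onChunk("klm", true) << "]\n";     // already flushed: only queues
}
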
