Skip to content

Commit

Permalink
Make ackUri always required client-side
Browse files Browse the repository at this point in the history
This leaves the decision whether client ack is handled or noop
to the spooling manager implementation.
  • Loading branch information
wendigo committed Nov 18, 2024
1 parent 50a0b22 commit 58a7e59
Show file tree
Hide file tree
Showing 15 changed files with 49 additions and 73 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,9 @@ public InputStream load(SpooledSegment segment)
@Override
public void acknowledge(SpooledSegment segment)
{
if (!segment.getAckUri().isPresent()) {
return;
}
Request ackRequest = new Request.Builder()
.get()
.url(segment.getAckUri().get().toString())
.url(segment.getAckUri().toString())
.headers(toHeaders(segment.getHeaders()))
.build();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public static Segment inlined(byte[] data, DataAttributes attributes)
return new InlineSegment(data, attributes);
}

public static Segment spooled(URI retrieveUri, Optional<URI> ackUri, DataAttributes attributes, Map<String, List<String>> headers)
public static Segment spooled(URI retrieveUri, URI ackUri, DataAttributes attributes, Map<String, List<String>> headers)
{
return new SpooledSegment(retrieveUri, ackUri, attributes, headers);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import java.net.URI;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import static com.google.common.base.MoreObjects.firstNonNull;
import static java.lang.String.format;
Expand All @@ -31,20 +30,20 @@ public final class SpooledSegment
extends Segment
{
private final URI dataUri;
private final Optional<URI> ackUri;
private final URI ackUri;
private final Map<String, List<String>> headers;

@JsonCreator
public SpooledSegment(
@JsonProperty("uri") URI dataUri,
@JsonProperty("ackUri") Optional<URI> ackUri,
@JsonProperty("ackUri") URI ackUri,
@JsonProperty("metadata") Map<String, Object> metadata,
@JsonProperty("headers") Map<String, List<String>> headers)
{
this(dataUri, ackUri, new DataAttributes(metadata), headers);
}

SpooledSegment(URI dataUri, Optional<URI> ackUri, DataAttributes metadata, Map<String, List<String>> headers)
SpooledSegment(URI dataUri, URI ackUri, DataAttributes metadata, Map<String, List<String>> headers)
{
super(metadata);
this.dataUri = requireNonNull(dataUri, "dataUri is null");
Expand All @@ -59,7 +58,7 @@ public URI getDataUri()
}

@JsonProperty("ackUri")
public Optional<URI> getAckUri()
public URI getAckUri()
{
return ackUri;
}
Expand All @@ -74,6 +73,6 @@ public Map<String, List<String>> getHeaders()
@Override
public String toString()
{
return format("SpooledSegment{offset=%d, rows=%d, size=%d, headers=%s, ack=%b}", getOffset(), getRowsCount(), getSegmentSize(), headers.keySet(), ackUri.isPresent());
return format("SpooledSegment{offset=%d, rows=%d, size=%d, headers=%s}", getOffset(), getRowsCount(), getSegmentSize(), headers.keySet());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.OptionalDouble;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;
Expand Down Expand Up @@ -295,6 +294,6 @@ private static QueryResults fromSegments(Segment... segments)

private static Segment spooledSegment()
{
return spooled(URI.create("http://localhost"), Optional.empty(), DataAttributes.empty(), ImmutableMap.of());
return spooled(URI.create("http://localhost"), URI.create("http://localhost"), DataAttributes.empty(), ImmutableMap.of());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,6 @@ static class OutputSpoolingOperator
implements Operator
{
private final OutputSpoolingController controller;
private final boolean explicitAck;

enum State
{
Expand Down Expand Up @@ -166,7 +165,6 @@ public OutputSpoolingOperator(OperatorContext operatorContext, QueryDataEncoder
spoolingConfig.getMaximumSegmentSize().toBytes(),
spoolingConfig.getInitialSegmentSize().toBytes(),
spoolingConfig.getMaximumSegmentSize().toBytes());
this.explicitAck = spoolingConfig.isExplicitAck();
this.userMemoryContext = operatorContext.newLocalUserMemoryContext(OutputSpoolingOperator.class.getSimpleName());
this.queryDataEncoder = requireNonNull(queryDataEncoder, "queryDataEncoder is null");
this.spoolingManager = requireNonNull(spoolingManager, "spoolingManager is null");
Expand Down Expand Up @@ -280,7 +278,7 @@ private Page spool(List<Page> pages, boolean finished)
controller.recordEncoded(attributes.get(SEGMENT_SIZE, Integer.class));

// This page is small (hundreds of bytes) so there is no point in tracking its memory usage
return emptySingleRowPage(SpooledBlock.forLocation(spoolingManager.location(segmentHandle), attributes, explicitAck).serialize());
return emptySingleRowPage(SpooledBlock.forLocation(spoolingManager.location(segmentHandle), attributes).serialize());
}
catch (IOException e) {
throw new UncheckedIOException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,12 @@ public class CoordinatorSegmentResource
private final SpoolingManager spoolingManager;
private final SegmentRetrievalMode retrievalMode;
private final InternalNodeManager nodeManager;
private final boolean explicitAck;

@Inject
public CoordinatorSegmentResource(SpoolingManager spoolingManager, SpoolingConfig config, InternalNodeManager nodeManager)
{
this.spoolingManager = requireNonNull(spoolingManager, "spoolingManager is null");
this.retrievalMode = requireNonNull(config, "config is null").getRetrievalMode();
this.explicitAck = config.isExplicitAck();
this.nodeManager = requireNonNull(nodeManager, "nodeManager is null");
}

Expand Down Expand Up @@ -99,12 +97,6 @@ public Response download(@Context UriInfo uriInfo, @PathParam("identifier") Stri
public Response acknowledge(@PathParam("identifier") String identifier, @Context HttpHeaders headers)
throws IOException
{
if (!explicitAck) {
return Response.status(Response.Status.NOT_ACCEPTABLE)
.entity("Explicit segment acknowledgment is disabled")
.build();
}

try {
spoolingManager.acknowledge(handle(identifier, headers));
return Response.ok().build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,9 @@
import static io.airlift.json.JsonCodec.listJsonCodec;
import static io.airlift.json.JsonCodec.mapJsonCodec;
import static io.airlift.slice.Slices.utf8Slice;
import static io.trino.spi.type.BooleanType.BOOLEAN;
import static io.trino.spi.type.VarcharType.VARCHAR;

public record SpooledBlock(Slice identifier, Optional<URI> directUri, Map<String, List<String>> headers, DataAttributes attributes, boolean explicitAck)
public record SpooledBlock(Slice identifier, Optional<URI> directUri, Map<String, List<String>> headers, DataAttributes attributes)
{
private static final JsonCodec<Map<String, List<String>>> HEADERS_CODEC = mapJsonCodec(String.class, listJsonCodec(String.class));
private static final JsonCodec<DataAttributes> ATTRIBUTES_CODEC = JsonCodec.jsonCodec(DataAttributes.class);
Expand All @@ -48,8 +47,7 @@ public record SpooledBlock(Slice identifier, Optional<URI> directUri, Map<String
new RowType.Field(Optional.of("identifier"), VARCHAR),
new RowType.Field(Optional.of("directLocation"), VARCHAR),
new RowType.Field(Optional.of("headers"), VARCHAR),
new RowType.Field(Optional.of("metadata"), VARCHAR),
new RowType.Field(Optional.of("explicitAck"), BOOLEAN)));
new RowType.Field(Optional.of("metadata"), VARCHAR)));

public static final String SPOOLING_METADATA_COLUMN_NAME = "$spooling:metadata$";
public static final Symbol SPOOLING_METADATA_SYMBOL = new Symbol(SPOOLING_METADATA_TYPE, SPOOLING_METADATA_COLUMN_NAME);
Expand All @@ -65,33 +63,29 @@ public static SpooledBlock deserialize(Page page)
VARCHAR.getSlice(row.getRawFieldBlock(0), 0),
Optional.empty(), // Not a direct location
HEADERS_CODEC.fromJson(VARCHAR.getSlice(row.getRawFieldBlock(2), 0).toStringUtf8()),
ATTRIBUTES_CODEC.fromJson(VARCHAR.getSlice(row.getRawFieldBlock(3), 0).toStringUtf8()),
BOOLEAN.getBoolean(row.getRawFieldBlock(4), 0));
ATTRIBUTES_CODEC.fromJson(VARCHAR.getSlice(row.getRawFieldBlock(3), 0).toStringUtf8()));
}

return new SpooledBlock(
VARCHAR.getSlice(row.getRawFieldBlock(0), 0),
Optional.of(URI.create(VARCHAR.getSlice(row.getRawFieldBlock(1), 0).toStringUtf8())),
HEADERS_CODEC.fromJson(VARCHAR.getSlice(row.getRawFieldBlock(2), 0).toStringUtf8()),
ATTRIBUTES_CODEC.fromJson(VARCHAR.getSlice(row.getRawFieldBlock(3), 0).toStringUtf8()),
BOOLEAN.getBoolean(row.getRawFieldBlock(4), 0));
ATTRIBUTES_CODEC.fromJson(VARCHAR.getSlice(row.getRawFieldBlock(3), 0).toStringUtf8()));
}

public static SpooledBlock forLocation(SpooledLocation location, DataAttributes attributes, boolean explicitAck)
public static SpooledBlock forLocation(SpooledLocation location, DataAttributes attributes)
{
return switch (location) {
case DirectLocation directLocation -> new SpooledBlock(
directLocation.identifier(),
Optional.of(directLocation.directUri()),
directLocation.headers(),
attributes,
explicitAck);
attributes);
case CoordinatorLocation coordinatorLocation -> new SpooledBlock(
coordinatorLocation.identifier(),
Optional.empty(),
coordinatorLocation.headers(),
attributes,
explicitAck);
attributes);
};
}

Expand All @@ -115,7 +109,6 @@ void serialize(RowBlockBuilder rowBlockBuilder)
}
VARCHAR.writeSlice(rowEntryBuilder.get(2), utf8Slice(HEADERS_CODEC.toJson(headers)));
VARCHAR.writeSlice(rowEntryBuilder.get(3), utf8Slice(ATTRIBUTES_CODEC.toJson(attributes)));
BOOLEAN.writeBoolean(rowEntryBuilder.get(4), explicitAck);
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import java.io.IOException;
import java.net.URI;
import java.util.List;
import java.util.Optional;
import java.util.function.Consumer;

import static io.trino.client.spooling.DataAttribute.ROWS_COUNT;
Expand Down Expand Up @@ -79,7 +78,7 @@ public QueryData produce(ExternalUriInfo uriInfo, Session session, QueryResultRo
builder.withSegment(spooled(
metadata.directUri()
.orElseGet(() -> buildSegmentDownloadURI(uriBuilder, metadata.identifier())),
metadata.explicitAck() ? Optional.of(buildSegmentAckURI(uriBuilder, metadata.identifier())) : Optional.empty(),
buildSegmentAckURI(uriBuilder, metadata.identifier()),
attributes,
metadata.headers()));
currentOffset += attributes.get(ROWS_COUNT, Long.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ public class SpoolingConfig
private Optional<Duration> storageRedirectTtl = Optional.empty();

private boolean allowInlining = true;
private boolean explicitAck = true;
private long maximumInlinedRows = 1000;
private DataSize maximumInlinedSize = DataSize.of(128, KILOBYTE);
private DataSize initialSegmentSize = DataSize.of(8, MEGABYTE);
Expand Down Expand Up @@ -124,19 +123,6 @@ public SpoolingConfig setAllowInlining(boolean allowInlining)
return this;
}

public boolean isExplicitAck()
{
return explicitAck;
}

@ConfigDescription("Allow client to acknowledge segment retrieval and its eager removal")
@Config("protocol.spooling.explicit-ack.enabled")
public SpoolingConfig setExplicitAck(boolean explicitAck)
{
this.explicitAck = explicitAck;
return this;
}

public long getMaximumInlinedRows()
{
return maximumInlinedRows;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import java.net.URI;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalDouble;
import java.util.Set;

Expand Down Expand Up @@ -139,11 +138,11 @@ public void testSpooledQueryDataSerialization()
inlined("super".getBytes(UTF_8), dataAttributes(0, 100, 5)),
spooled(
URI.create("http://localhost:8080/v1/download/20160128_214710_00012_rk68b/segments/1"),
Optional.of(URI.create("http://localhost:8080/v1/ack/20160128_214710_00012_rk68b/segments/1")),
URI.create("http://localhost:8080/v1/ack/20160128_214710_00012_rk68b/segments/1"),
dataAttributes(100, 100, 1024), Map.of("x-amz-server-side-encryption", List.of("AES256"))),
spooled(
URI.create("http://localhost:8080/v1/download/20160128_214710_00012_rk68b/segments/2"),
Optional.empty(),
URI.create("http://localhost:8080/v1/ack/20160128_214710_00012_rk68b/segments/2"),
dataAttributes(200, 100, 1024), Map.of("x-amz-server-side-encryption", List.of("AES256")))))
.withAttributes(DataAttributes.builder()
.set(SCHEMA, "serializedSchema")
Expand Down Expand Up @@ -180,6 +179,7 @@ public void testSpooledQueryDataSerialization()
{
"type": "spooled",
"uri": "http://localhost:8080/v1/download/20160128_214710_00012_rk68b/segments/2",
"ackUri": "http://localhost:8080/v1/ack/20160128_214710_00012_rk68b/segments/2",
"metadata": {
"rowOffset": 200,
"rowsCount": 100,
Expand All @@ -199,15 +199,9 @@ public void testEncodedQueryDataToString()

EncodedQueryData spooledQueryData = new EncodedQueryData("json+zstd", ImmutableMap.of("decryption_key", "secret"), ImmutableList.of(spooled(
URI.create("http://coordinator:8080/v1/segments/uuid"),
Optional.of(URI.create("http://coordinator:8080/v1/segments/uuid")),
dataAttributes(10, 2, 1256), headers())));
assertThat(spooledQueryData.toString()).isEqualTo("EncodedQueryData{encoding=json+zstd, segments=[SpooledSegment{offset=10, rows=2, size=1256, headers=[x-amz-server-side-encryption], ack=true}], metadata=[decryption_key]}");

EncodedQueryData spooledQueryDataWithoutAck = new EncodedQueryData("json+zstd", ImmutableMap.of("decryption_key", "secret"), ImmutableList.of(spooled(
URI.create("http://coordinator:8080/v1/segments/uuid"),
Optional.empty(),
dataAttributes(10, 2, 1256), headers())));
assertThat(spooledQueryDataWithoutAck.toString()).isEqualTo("EncodedQueryData{encoding=json+zstd, segments=[SpooledSegment{offset=10, rows=2, size=1256, headers=[x-amz-server-side-encryption], ack=false}], metadata=[decryption_key]}");
assertThat(spooledQueryData.toString()).isEqualTo("EncodedQueryData{encoding=json+zstd, segments=[SpooledSegment{offset=10, rows=2, size=1256, headers=[x-amz-server-side-encryption]}], metadata=[decryption_key]}");
}

private void testRoundTrip(QueryData queryData, String expectedDataRepresentation)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,31 +55,31 @@ public void verifySerialization(Slice identifier, Optional<URI> directUri, Map<S

public void verifySerializationRoundTrip(Slice identifier, Optional<URI> directUri, Map<String, List<String>> headers)
{
SpooledBlock metadata = new SpooledBlock(identifier, directUri, headers, createDataAttributes(10, 1200), true);
SpooledBlock metadata = new SpooledBlock(identifier, directUri, headers, createDataAttributes(10, 1200));
Page page = new Page(metadata.serialize());
SpooledBlock retrieved = SpooledBlock.deserialize(page);
assertThat(metadata).isEqualTo(retrieved);
}

private void verifySerializationRoundTripWithNonEmptyPage(Slice identifier, Optional<URI> directUri, Map<String, List<String>> headers)
{
SpooledBlock metadata = new SpooledBlock(identifier, directUri, headers, createDataAttributes(10, 1100), false);
SpooledBlock metadata = new SpooledBlock(identifier, directUri, headers, createDataAttributes(10, 1100));
Page page = new Page(blockWithPositions(1, true), metadata.serialize());
SpooledBlock retrieved = SpooledBlock.deserialize(page);
assertThat(metadata).isEqualTo(retrieved);
}

private void verifyThrowsErrorOnNonNullPositions(Slice identifier, Optional<URI> directUri, Map<String, List<String>> headers)
{
SpooledBlock metadata = new SpooledBlock(identifier, directUri, headers, createDataAttributes(20, 1200), true);
SpooledBlock metadata = new SpooledBlock(identifier, directUri, headers, createDataAttributes(20, 1200));

assertThatThrownBy(() -> SpooledBlock.deserialize(new Page(blockWithPositions(1, false), metadata.serialize())))
.hasMessage("Spooling metadata block must have all but last channels null");
}

private void verifyThrowsErrorOnMultiplePositions(Slice identifier, Optional<URI> directUri, Map<String, List<String>> headers)
{
SpooledBlock metadata = new SpooledBlock(identifier, directUri, headers, createDataAttributes(30, 1300), false);
SpooledBlock metadata = new SpooledBlock(identifier, directUri, headers, createDataAttributes(30, 1300));
RowBlockBuilder rowBlockBuilder = SPOOLING_METADATA_TYPE.createBlockBuilder(null, 2);
metadata.serialize(rowBlockBuilder);
metadata.serialize(rowBlockBuilder);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,7 @@ public void testDefaults()
.setMaximumSegmentSize(DataSize.of(16, MEGABYTE))
.setMaximumInlinedRows(1000)
.setMaximumInlinedSize(DataSize.of(128, KILOBYTE))
.setAllowInlining(true)
.setExplicitAck(true));
.setAllowInlining(true));
}

@Test
Expand All @@ -58,7 +57,6 @@ public void testExplicitPropertyMappings()
.put("protocol.spooling.retrieval-mode", "coordinator_storage_redirect")
.put("protocol.spooling.coordinator-storage-redirect-ttl", "60s")
.put("protocol.spooling.inlining.enabled", "false")
.put("protocol.spooling.explicit-ack.enabled", "false")
.put("protocol.spooling.initial-segment-size", "1kB")
.put("protocol.spooling.maximum-segment-size", "8kB")
.put("protocol.spooling.inlining.max-rows", "1024")
Expand All @@ -73,8 +71,7 @@ public void testExplicitPropertyMappings()
.setMaximumSegmentSize(DataSize.of(8, KILOBYTE))
.setMaximumInlinedRows(1024)
.setMaximumInlinedSize(DataSize.of(1, MEGABYTE))
.setAllowInlining(false)
.setExplicitAck(false);
.setAllowInlining(false);

assertFullMapping(properties, expected);
}
Expand Down
Loading

0 comments on commit 58a7e59

Please sign in to comment.