Skip to content

Commit

Permalink
webdav/pool: support RFC 3230 headers with multiple checksum algorithms
Browse files Browse the repository at this point in the history
Motivation:

Support HTTP clients that need to know the checksums of a file generated
using multiple algorithms.

Modification:

Update QualityValue to support an optional mapping.  Add new unit-tests
to verify correct behaviour.

Update RFC3230 string parsing to support multiple algorithms.  Update
unit tests to match new API and (where appropriate) new behaviour.  Add
some additional unit tests to verify case insensitivity.

Update WebDAV door to send multiple checksums to pool if `Want-Digest`
is specified on HTTP PUT or COPY (HTTP-TPC) requests and the request
header value contains multiple (supported) algorithms with the same
q-value.  Door will inform client of requested checksum values; for
HTTP-TPC requests, this requires the client to support response
trailers.

Update pool to calculate multiple checksum algorithms if clients
includes a `Want-Digest` header on HTTP PUT request with multiple
(supported) algorithms with the same q-value (assuming door redirects
client to pool).

Update WebDAV door to respond will all matching, known checksum values
if client included a multi-algorithm `Want-Digest` header on GET or HEAD
requests.

Result:

dCache now supports accepting data (via HTTP or HTTP-TPC) and
calculating multiple client-requests checksum algorithms.

Querying checksums via RFC 3230 now supports returning multiple
checksums.

Target: master
Request: 10.2
Requires-notes: yes
Requires-book: no
  • Loading branch information
paulmillar committed Dec 7, 2024
1 parent 484e967 commit ff119ff
Show file tree
Hide file tree
Showing 8 changed files with 269 additions and 116 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,8 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.PostConstruct;
import javax.security.auth.Subject;
import javax.servlet.http.HttpServletRequest;
Expand Down Expand Up @@ -1595,8 +1597,9 @@ private static boolean isDigestRequested() {
case HEAD:
case GET:
return wantDigest()
.flatMap(Checksums::parseWantDigest)
.isPresent();
.map(Checksums::parseWantDigest)
.map(c -> !c.isEmpty())
.orElse(false);
default:
return false;
}
Expand Down Expand Up @@ -1674,7 +1677,7 @@ void removeExtendedAttribute(FsPath path, String name) throws CacheException {
private class HttpTransfer extends RedirectedTransfer<String> {

private URI _location;
private ChecksumType _wantedChecksum;
private Set<ChecksumType> _wantedChecksums = EnumSet.noneOf(ChecksumType.class);
private InetSocketAddress _clientAddressForPool;
protected HttpProtocolInfo.Disposition _disposition;
private boolean _isSSL;
Expand All @@ -1696,9 +1699,19 @@ public HttpTransfer(PnfsHandler pnfs, Subject subject,
}

protected ProtocolInfo createProtocolInfo(InetSocketAddress address) {
List<ChecksumType> wantedChecksums = _wantedChecksum == null
? Collections.emptyList()
: List.of(_wantedChecksum);
List<ChecksumType> wantedChecksums;
if (_wantedChecksums.isEmpty()) {
wantedChecksums = Collections.emptyList();
} else {
ChecksumType preferred = _wantedChecksums.stream()
.sorted(Checksums.PREFERRED_CHECKSUM_TYPE_ORDERING)
.findFirst()
.orElseThrow(() -> new RuntimeException("Failed to identified preferred checksum in " + _wantedChecksums));
wantedChecksums = Stream.concat(
Stream.of(preferred),
_wantedChecksums.stream().filter(c -> c != preferred))
.collect(Collectors.toList());
}
HttpProtocolInfo protocolInfo =
new HttpProtocolInfo(
_isSSL ? PROTOCOL_INFO_SSL_NAME : PROTOCOL_INFO_NAME,
Expand Down Expand Up @@ -1728,8 +1741,8 @@ public void setLocation(URI location) {
_location = location;
}

public void setWantedChecksum(ChecksumType type) {
_wantedChecksum = type;
public void setWantedChecksums(Set<ChecksumType> checksums) {
_wantedChecksums = requireNonNull(checksums);
}

public void setProxyTransfer(boolean isProxyTransfer) {
Expand Down Expand Up @@ -1841,8 +1854,9 @@ public WriteTransfer(PnfsHandler pnfs, Subject subject,
_mtime = OwncloudClients.parseMTime(request);

wantDigest()
.flatMap(Checksums::parseWantDigest)
.ifPresent(this::setWantedChecksum);
.map(Checksums::parseWantDigest)
.filter(v -> !v.isEmpty())
.ifPresent(this::setWantedChecksums);

try {
_contentMd5 = Optional.ofNullable(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import javax.annotation.PreDestroy;
import javax.security.auth.Subject;
Expand Down Expand Up @@ -763,7 +764,7 @@ private class RemoteTransfer {
private final ImmutableMap<String, String> _transferHeaders;
private final Direction _direction;
private final boolean _overwriteAllowed;
private final Optional<String> _wantDigest;
private final Set<ChecksumType> _wantedDigests;
private final PnfsHandler _pnfs;
private final Instant _whenSubmitted = Instant.now();
private final SettableFuture<Optional<String>> _transferResult = SettableFuture.create();
Expand Down Expand Up @@ -818,7 +819,9 @@ public RemoteTransfer(Subject subject, Restriction restriction,
_transferHeaders = transferHeaders;
_direction = direction;
_overwriteAllowed = overwriteAllowed;
_wantDigest = wantDigest;
_wantedDigests = wantDigest
.map(Checksums::parseWantDigest)
.orElse(EnumSet.noneOf(ChecksumType.class));
}

private IoDoorEntry describe() {
Expand Down Expand Up @@ -850,9 +853,9 @@ private FileAttributes resolvePath() throws ErrorResponseException {
try {
switch (_direction) {
case PUSH:
EnumSet<FileAttribute> desired = _wantDigest.isPresent()
? EnumSet.of(PNFSID, SIZE, TYPE, CHECKSUM)
: EnumSet.of(PNFSID, SIZE, TYPE);
EnumSet<FileAttribute> desired = _wantedDigests.isEmpty()
? EnumSet.of(PNFSID, SIZE, TYPE)
: EnumSet.of(PNFSID, SIZE, TYPE, CHECKSUM);
desired.addAll(TransferManagerHandler.ATTRIBUTES_FOR_PUSH);
try {
FileAttributes attributes = _pnfs.getFileAttributes(_path.toString(),
Expand Down Expand Up @@ -975,7 +978,7 @@ public synchronized ListenableFuture<Optional<String>> start()
_async = servletRequest.startAsync();
_async.setTimeout(0); // Disable timeout as we don't know how long we'll take.

if (_direction == Direction.PULL && _wantDigest.isPresent()) {
if (_direction == Direction.PULL && !_wantedDigests.isEmpty()) {
// Ensure this is called before any perf-marker data is sent.
addTrailerCallback();
}
Expand Down Expand Up @@ -1029,14 +1032,25 @@ private IpProtocolInfo buildProtocolInfo() throws ErrorResponseException {
"Unknown " + target + " hostname");
}

Optional<ChecksumType> desiredChecksum = _wantDigest.flatMap(
Checksums::parseWantDigest);
var desiredChecksums = desiredChecksum
.map(List::of)
.orElseGet(Collections::emptyList);
List<ChecksumType> desiredChecksums;
if (_wantedDigests.isEmpty()) {
desiredChecksums = Collections.emptyList();
} else {
ChecksumType preferred = _wantedDigests.stream()
.sorted(Checksums.PREFERRED_CHECKSUM_TYPE_ORDERING)
.findFirst()
.orElseThrow(() -> new RuntimeException("Failed to identified preferred checksum in " + _wantedDigests));
desiredChecksums = Stream.concat(
Stream.of(preferred),
_wantedDigests.stream().filter(c -> c != preferred))
.collect(Collectors.toList());
}

switch (_type) {
case GSIFTP:
Optional<ChecksumType> desiredChecksum = desiredChecksums.isEmpty()
? Optional.empty()
: Optional.of(desiredChecksums.get(0));
return new RemoteGsiftpTransferProtocolInfo("RemoteGsiftpTransfer",
1, 1, address, _destination.toASCIIString(), null,
null, buffer, MiB.toBytes(1), _privateKey, _certificateChain,
Expand Down Expand Up @@ -1080,19 +1094,18 @@ private HttpFields getTrailers() {
}

private void fetchChecksums() {
if (_direction == Direction.PULL && _wantDigest.isPresent()) {
Optional<String> empty = Optional.empty();
_digestValue = _wantDigest.map(h -> {
try {
FileAttributes attributes = _pnfs.getFileAttributes(_path,
EnumSet.of(CHECKSUM));
return Checksums.digestHeader(h, attributes);
} catch (CacheException e) {
LOGGER.warn("Failed to acquire checksum of fetched file: {}",
e.getMessage());
return empty;
}
}).orElse(empty);
if (_direction == Direction.PULL && !_wantedDigests.isEmpty()) {
try {
FileAttributes attributes = _pnfs.getFileAttributes(_path,
EnumSet.of(CHECKSUM));
_digestValue = Checksums.digestHeader(_wantedDigests, attributes);
} catch (CacheException e) {
LOGGER.warn("Failed to acquire checksum of fetched file: {}",
e.getMessage());
_digestValue = Optional.empty();
}
} else {
_digestValue = Optional.empty();
}
}

Expand Down Expand Up @@ -1127,13 +1140,13 @@ private void addDigestResponseHeader(FileAttributes attributes) {

switch (_direction) {
case PULL:
if (_wantDigest.isPresent()) {
if (!_wantedDigests.isEmpty()) {
response.setHeader("Trailer", "Digest");
}
break;

case PUSH:
_wantDigest.flatMap(h -> Checksums.digestHeader(h, attributes))
Checksums.digestHeader(_wantedDigests, attributes)
.ifPresent(v -> response.setHeader("Digest", v));
break;
}
Expand Down
5 changes: 5 additions & 0 deletions modules/dcache/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,11 @@
<artifactId>everit-json-schema</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.github.npathai</groupId>
<artifactId>hamcrest-optional</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,15 @@
import java.nio.file.StandardOpenOption;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.dcache.namespace.FileAttribute;
import org.dcache.pool.movers.NettyTransferService;
import org.dcache.pool.movers.RepositoryFileRegion;
Expand Down Expand Up @@ -132,7 +135,7 @@ public class HttpPoolRequestHandler extends HttpRequestHandler {
*/
private NettyTransferService<HttpProtocolInfo>.NettyMoverChannel _writeChannel;

private Optional<ChecksumType> _wantedDigest;
private Set<ChecksumType> _wantedDigests = EnumSet.noneOf(ChecksumType.class);

/**
* A simple data class to encapsulate the errors to return by the mover to the pool for file
Expand Down Expand Up @@ -522,8 +525,10 @@ protected ChannelFuture doOnPut(ChannelHandlerContext context, HttpRequest reque
}

file.getProtocolInfo().getWantedChecksums().forEach(file::addChecksumType);
_wantedDigest = wantDigest(request).flatMap(Checksums::parseWantDigest);
_wantedDigest.ifPresent(file::addChecksumType);
_wantedDigests = wantDigest(request)
.map(Checksums::parseWantDigest)
.orElseGet(() -> EnumSet.noneOf(ChecksumType.class));
_wantedDigests.forEach(file::addChecksumType);

if (is100ContinueExpected(request)) {
context.writeAndFlush(new DefaultFullHttpResponse(HTTP_1_1, CONTINUE))
Expand Down Expand Up @@ -589,10 +594,11 @@ protected ChannelFuture doOnContent(ChannelHandlerContext context, HttpContent c
@Override
public void onSuccess(Void result) {
try {
Optional<String> digest = _wantedDigest
.flatMap(t -> Checksums.digestHeader(t,
writeChannel.getFileAttributes()));
context.writeAndFlush(new HttpPutResponse(size, location, digest),
Optional<String> digestResponseHeader =
Checksums.digestHeader(_wantedDigests,
writeChannel.getFileAttributes());
context.writeAndFlush(new HttpPutResponse(size,
location, digestResponseHeader),
promise);
} catch (IOException e) {
context.writeAndFlush(
Expand Down
Loading

0 comments on commit ff119ff

Please sign in to comment.