Skip to content

Commit

Permalink
Merge branch 'main' into sm-s3-unittest
Browse files Browse the repository at this point in the history
  • Loading branch information
malhotrashivam committed May 16, 2024
2 parents 8ded8be + 757058b commit 2fa825e
Show file tree
Hide file tree
Showing 16 changed files with 587 additions and 63 deletions.
41 changes: 26 additions & 15 deletions Base/src/main/java/io/deephaven/base/FileUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ public boolean accept(File dir, String name) {

public static final Pattern REPEATED_URI_SEPARATOR_PATTERN = Pattern.compile("//+");

public static final String FILE_URI_SCHEME = "file";

/**
* Cleans the specified path. All files and subdirectories in the path will be deleted. (ie you'll be left with an
* empty directory).
Expand Down Expand Up @@ -282,21 +284,36 @@ public static URI convertToURI(final String source, final boolean isDirectory) {
URI uri;
try {
uri = new URI(source);
if (uri.getScheme() == null) {
// Convert to a "file" URI
return convertToURI(new File(source), isDirectory);
}
if (uri.getScheme().equals(FILE_URI_SCHEME)) {
return convertToURI(new File(uri), isDirectory);
}
String path = uri.getPath();
final boolean endsWithSlash = path.charAt(path.length() - 1) == URI_SEPARATOR_CHAR;
if (!isDirectory && endsWithSlash) {
throw new IllegalArgumentException("Non-directory URI should not end with a slash: " + uri);
}
boolean isUpdated = false;
if (isDirectory && !endsWithSlash) {
path = path + URI_SEPARATOR_CHAR;
isUpdated = true;
}
// Replace two or more consecutive slashes in the path with a single slash
final String path = uri.getPath();
if (path.contains(REPEATED_URI_SEPARATOR)) {
final String canonicalizedPath = REPEATED_URI_SEPARATOR_PATTERN.matcher(path).replaceAll(URI_SEPARATOR);
uri = new URI(uri.getScheme(), uri.getUserInfo(), uri.getHost(), uri.getPort(), canonicalizedPath,
uri.getQuery(), uri.getFragment());
path = REPEATED_URI_SEPARATOR_PATTERN.matcher(path).replaceAll(URI_SEPARATOR);
isUpdated = true;
}
if (isUpdated) {
uri = new URI(uri.getScheme(), uri.getUserInfo(), uri.getHost(), uri.getPort(), path, uri.getQuery(),
uri.getFragment());
}
} catch (final URISyntaxException e) {
// If the URI is invalid, assume it's a file path
return convertToURI(new File(source), isDirectory);
}
if (uri.getScheme() == null) {
// Convert to a "file" URI
return convertToURI(new File(source), isDirectory);
}
return uri;
}

Expand All @@ -314,17 +331,11 @@ public static URI convertToURI(final File file, final boolean isDirectory) {
if (File.separatorChar != URI_SEPARATOR_CHAR) {
absPath = absPath.replace(File.separatorChar, URI_SEPARATOR_CHAR);
}
if (absPath.charAt(0) != URI_SEPARATOR_CHAR) {
absPath = URI_SEPARATOR_CHAR + absPath;
}
if (isDirectory && absPath.charAt(absPath.length() - 1) != URI_SEPARATOR_CHAR) {
absPath = absPath + URI_SEPARATOR_CHAR;
}
if (absPath.startsWith(REPEATED_URI_SEPARATOR)) {
absPath = REPEATED_URI_SEPARATOR + absPath;
}
try {
return new URI("file", null, absPath, null);
return new URI(FILE_URI_SCHEME, null, absPath, null);
} catch (final URISyntaxException e) {
throw new IllegalStateException("Failed to convert file to URI: " + file, e);
}
Expand Down
68 changes: 68 additions & 0 deletions Base/src/test/java/io/deephaven/base/FileUtilsTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
//
// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
//
package io.deephaven.base;

import java.io.File;
import java.io.IOException;
import java.net.URISyntaxException;
import java.nio.file.Path;

import junit.framework.TestCase;
import org.junit.Assert;

public class FileUtilsTest extends TestCase {

public void testConvertToFileURI() throws IOException {
final File currentDir = new File("").getAbsoluteFile();
fileUriTestHelper(currentDir.toString(), true, currentDir.toURI().toString());

final File someFile = new File(currentDir, "tempFile");
fileUriTestHelper(someFile.getPath(), false, someFile.toURI().toString());

// Check if trailing slash gets added for a directory
final String expectedDirURI = "file:" + currentDir.getPath() + "/path/to/directory/";
fileUriTestHelper(currentDir.getPath() + "/path/to/directory", true, expectedDirURI);

// Check if multiple slashes get normalized
fileUriTestHelper(currentDir.getPath() + "////path///to////directory////", true, expectedDirURI);

// Check if multiple slashes in the beginning get normalized
fileUriTestHelper("////" + currentDir.getPath() + "/path/to/directory", true, expectedDirURI);

// Check for bad inputs for files with trailing slashes
final String expectedFileURI = someFile.toURI().toString();
fileUriTestHelper(someFile.getPath() + "/", false, expectedFileURI);
Assert.assertEquals(expectedFileURI,
FileUtils.convertToURI("file:" + someFile.getPath() + "/", false).toString());
}

private static void fileUriTestHelper(final String filePath, final boolean isDirectory,
final String expectedURIString) {
Assert.assertEquals(expectedURIString, FileUtils.convertToURI(filePath, isDirectory).toString());
Assert.assertEquals(expectedURIString, FileUtils.convertToURI(new File(filePath), isDirectory).toString());
Assert.assertEquals(expectedURIString, FileUtils.convertToURI(Path.of(filePath), isDirectory).toString());
}

public void testConvertToS3URI() throws URISyntaxException {
Assert.assertEquals("s3://bucket/key", FileUtils.convertToURI("s3://bucket/key", false).toString());

// Check if trailing slash gets added for a directory
Assert.assertEquals("s3://bucket/key/".toString(), FileUtils.convertToURI("s3://bucket/key", true).toString());

// Check if multiple slashes get normalized
Assert.assertEquals("s3://bucket/key/", FileUtils.convertToURI("s3://bucket///key///", true).toString());

try {
FileUtils.convertToURI("", false);
Assert.fail("Expected IllegalArgumentException");
} catch (IllegalArgumentException expected) {
}

try {
FileUtils.convertToURI("s3://bucket/key/", false);
Assert.fail("Expected IllegalArgumentException");
} catch (IllegalArgumentException expected) {
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import io.deephaven.engine.rowset.RowSetFactory;
import io.deephaven.engine.rowset.impl.RowSetUtils;
import io.deephaven.engine.rowset.chunkattributes.OrderedRowKeys;
import io.deephaven.internal.log.LoggerFactory;
import io.deephaven.io.logger.Logger;
import io.deephaven.util.QueryConstants;
import io.deephaven.chunk.Chunk;
import io.deephaven.chunk.LongChunk;
Expand All @@ -26,6 +28,8 @@
* its own offset in those arrays.
*/
public class BucketState {
private static final Logger log = LoggerFactory.getLogger(BucketState.class);

private final WritableRowSet rowSet = RowSetFactory.empty();

private RowSet cachedRowSet;
Expand Down Expand Up @@ -310,22 +314,16 @@ public void validate(final boolean usePrev, final DownsampleChunkContext context
values[columnIndex].validate(offset, keyChunk.get(indexInChunk), valueChunks[columnIndex],
indexInChunk, trackNulls ? nulls[columnIndex] : null);
} catch (final RuntimeException e) {
System.out.println(rowSet);
final String msg =
"Bad data! indexInChunk=" + indexInChunk + ", col=" + columnIndex + ", usePrev="
+ usePrev + ", offset=" + offset + ", rowSet=" + keyChunk.get(indexInChunk);
+ usePrev + ", offset=" + offset + ", indexInChunk="
+ keyChunk.get(indexInChunk);
log.error().append(msg).append(", rowSet=").append(rowSet).endl();
throw new IllegalStateException(msg, e);
}
}
}
}
Assert.eqTrue(makeRowSet().subsetOf(rowSet), "makeRowSet().subsetOf(rowSet)");
}

public void close() {
if (cachedRowSet != null) {
cachedRowSet.close();
}
rowSet.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -317,12 +317,6 @@ private DownsamplerListener(
allYColumnIndexes = IntStream.range(0, key.yColumnNames.length).toArray();
}

@Override
protected void destroy() {
super.destroy();
states.values().forEach(BucketState::close);
}

@Override
public void onUpdate(final TableUpdate upstream) {
try (final DownsampleChunkContext context =
Expand Down Expand Up @@ -684,7 +678,6 @@ private void performRescans(final DownsampleChunkContext context) {
// if it has no keys at all, remove it so we quit checking it
iterator.remove();
releasePosition(bucket.getOffset());
bucket.close();
} else {
bucket.rescanIfNeeded(context);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,9 @@ public boolean equals(Object obj) {

return false;
}

@Override
public int hashCode() {
return displayString.hashCode();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import org.apache.parquet.schema.MessageType;
import org.jetbrains.annotations.NotNull;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.stream.Stream;

import static io.deephaven.extensions.trackedfile.TrackedSeekableChannelsProviderPlugin.FILE_URI_SCHEME;
import static io.deephaven.base.FileUtils.FILE_URI_SCHEME;

/**
* {@link SeekableChannelsProvider} implementation that is constrained by a Deephaven {@link TrackedFileHandleFactory}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,14 @@

import java.net.URI;

import static io.deephaven.base.FileUtils.FILE_URI_SCHEME;

/**
* {@link SeekableChannelsProviderPlugin} implementation used for reading files from local disk.
*/
@AutoService(SeekableChannelsProviderPlugin.class)
public final class TrackedSeekableChannelsProviderPlugin implements SeekableChannelsProviderPlugin {

static final String FILE_URI_SCHEME = "file";

@Override
public boolean isCompatible(@NotNull final URI uri, @Nullable final Object object) {
return FILE_URI_SCHEME.equals(uri.getScheme());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,8 @@ public void subscribeToLogs(
GrpcUtil.safelyError(responseObserver, Code.FAILED_PRECONDITION, "Remote console disabled");
return;
}
// Session close logic implicitly handled in
// io.deephaven.server.session.SessionServiceGrpcImpl.SessionServiceInterceptor
final LogsClient client =
new LogsClient(request, (ServerCallStreamObserver<LogSubscriptionData>) responseObserver);
client.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,8 @@ public void onCompleted() {
@Override
public StreamObserver<StreamRequest> messageStream(StreamObserver<StreamResponse> responseObserver) {
SessionState session = sessionService.getCurrentSession();
// Session close logic implicitly handled in
// io.deephaven.server.session.SessionServiceGrpcImpl.SessionServiceInterceptor
return new SendMessageObserver(session, responseObserver);
}

Expand Down
Loading

0 comments on commit 2fa825e

Please sign in to comment.