Skip to content

Commit

Permalink
Merge branch 'main' into sm-s3-unittest
Browse files Browse the repository at this point in the history
  • Loading branch information
malhotrashivam committed May 14, 2024
2 parents b7d8d76 + 2dbbf32 commit 5e80889
Show file tree
Hide file tree
Showing 43 changed files with 1,598 additions and 718 deletions.
1 change: 0 additions & 1 deletion Base/src/main/java/io/deephaven/base/FileUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
//
package io.deephaven.base;

import io.deephaven.base.verify.Assert;
import io.deephaven.base.verify.Require;
import org.jetbrains.annotations.Nullable;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,15 @@ enum ChannelType {
private final RAPriQueue<PerPathPool> releasePriority =
new RAPriQueue<>(8, PerPathPool.RAPQ_ADAPTER, PerPathPool.class);

public CachedChannelProvider(@NotNull final SeekableChannelsProvider wrappedProvider,
public static CachedChannelProvider create(@NotNull final SeekableChannelsProvider wrappedProvider,
final int maximumPooledCount) {
if (wrappedProvider instanceof CachedChannelProvider) {
throw new IllegalArgumentException("Cannot wrap a CachedChannelProvider in another CachedChannelProvider");
}
return new CachedChannelProvider(wrappedProvider, maximumPooledCount);
}

private CachedChannelProvider(@NotNull final SeekableChannelsProvider wrappedProvider,
final int maximumPooledCount) {
this.wrappedProvider = wrappedProvider;
this.maximumPooledCount = Require.gtZero(maximumPooledCount, "maximumPooledCount");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,18 +37,19 @@ private SeekableChannelsProviderLoader() {
}

/**
* Create a new {@link SeekableChannelsProvider} based on given URI and object using the plugins loaded by the
* {@link ServiceLoader}. For example, for a "S3" URI, we will create a {@link SeekableChannelsProvider} which can
* read files from S3.
* Create a new {@link SeekableChannelsProvider} compatible for reading from and writing to the given URI, using the
* plugins loaded by the {@link ServiceLoader}. For example, for a "S3" URI, we will create a
* {@link SeekableChannelsProvider} which can read files from S3.
*
* @param uri The URI
* @param object An optional object to pass to the {@link SeekableChannelsProviderPlugin} implementations.
* @param specialInstructions An optional object to pass special instructions to the provider.
* @return A {@link SeekableChannelsProvider} for the given URI.
*/
public SeekableChannelsProvider fromServiceLoader(@NotNull final URI uri, @Nullable final Object object) {
public SeekableChannelsProvider fromServiceLoader(@NotNull final URI uri,
@Nullable final Object specialInstructions) {
for (final SeekableChannelsProviderPlugin plugin : providers) {
if (plugin.isCompatible(uri, object)) {
return plugin.createProvider(uri, object);
if (plugin.isCompatible(uri, specialInstructions)) {
return plugin.createProvider(uri, specialInstructions);
}
}
throw new UnsupportedOperationException("No plugin found for uri: " + uri);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.function.Supplier;
import java.util.stream.Stream;

import static org.junit.jupiter.api.Assertions.fail;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertSame;
Expand All @@ -32,7 +33,7 @@ public class CachedChannelProviderTest {
@Test
public void testSimpleRead() throws IOException {
final SeekableChannelsProvider wrappedProvider = new TestChannelProvider();
final CachedChannelProvider cachedChannelProvider = new CachedChannelProvider(wrappedProvider, 100);
final CachedChannelProvider cachedChannelProvider = CachedChannelProvider.create(wrappedProvider, 100);
for (int ii = 0; ii < 100; ++ii) {
final SeekableByteChannel[] sameFile = new SeekableByteChannel[10];
for (int jj = 0; jj < sameFile.length; ++jj) {
Expand All @@ -55,7 +56,7 @@ public void testSimpleRead() throws IOException {
@Test
public void testSimpleReadWrite() throws IOException {
SeekableChannelsProvider wrappedProvider = new TestChannelProvider();
CachedChannelProvider cachedChannelProvider = new CachedChannelProvider(wrappedProvider, 100);
CachedChannelProvider cachedChannelProvider = CachedChannelProvider.create(wrappedProvider, 100);
for (int i = 0; i < 1000; i++) {
SeekableByteChannel rc =
((i / 100) % 2 == 0 ? cachedChannelProvider.getReadChannel(wrappedProvider.makeContext(), "r" + i)
Expand All @@ -69,7 +70,7 @@ public void testSimpleReadWrite() throws IOException {
@Test
public void testSimpleWrite() throws IOException {
SeekableChannelsProvider wrappedProvider = new TestChannelProvider();
CachedChannelProvider cachedChannelProvider = new CachedChannelProvider(wrappedProvider, 100);
CachedChannelProvider cachedChannelProvider = CachedChannelProvider.create(wrappedProvider, 100);
for (int i = 0; i < 1000; i++) {
SeekableByteChannel rc = cachedChannelProvider.getWriteChannel("w" + i, false);
// Call write to hit the assertions inside the mock channel
Expand All @@ -86,7 +87,7 @@ public void testSimpleWrite() throws IOException {
@Test
public void testSimpleAppend() throws IOException {
SeekableChannelsProvider wrappedProvider = new TestChannelProvider();
CachedChannelProvider cachedChannelProvider = new CachedChannelProvider(wrappedProvider, 100);
CachedChannelProvider cachedChannelProvider = CachedChannelProvider.create(wrappedProvider, 100);
for (int i = 0; i < 1000; i++) {
SeekableByteChannel rc = cachedChannelProvider.getWriteChannel("a" + i, true);
rc.close();
Expand All @@ -100,7 +101,7 @@ public void testSimpleAppend() throws IOException {
@Test
public void testCloseOrder() throws IOException {
SeekableChannelsProvider wrappedProvider = new TestChannelProvider();
CachedChannelProvider cachedChannelProvider = new CachedChannelProvider(wrappedProvider, 100);
CachedChannelProvider cachedChannelProvider = CachedChannelProvider.create(wrappedProvider, 100);
for (int i = 0; i < 20; i++) {
List<SeekableByteChannel> channels = new ArrayList<>();
for (int j = 0; j < 50; j++) {
Expand All @@ -121,7 +122,7 @@ public void testCloseOrder() throws IOException {
@Test
public void testReuse() throws IOException {
final SeekableChannelsProvider wrappedProvider = new TestChannelProvider();
final CachedChannelProvider cachedChannelProvider = new CachedChannelProvider(wrappedProvider, 50);
final CachedChannelProvider cachedChannelProvider = CachedChannelProvider.create(wrappedProvider, 50);
final SeekableByteChannel[] someResult = new SeekableByteChannel[50];
final ByteBuffer buffer = ByteBuffer.allocate(1);
for (int ci = 0; ci < someResult.length; ++ci) {
Expand Down Expand Up @@ -149,7 +150,7 @@ public void testReuse() throws IOException {
@Test
public void testReuse10() throws IOException {
final SeekableChannelsProvider wrappedProvider = new TestChannelProvider();
final CachedChannelProvider cachedChannelProvider = new CachedChannelProvider(wrappedProvider, 100);
final CachedChannelProvider cachedChannelProvider = CachedChannelProvider.create(wrappedProvider, 100);
final SeekableByteChannel[] someResult = new SeekableByteChannel[100];
for (int pi = 0; pi < 10; ++pi) {
for (int ci = 0; ci < 10; ++ci) {
Expand All @@ -173,6 +174,17 @@ public void testReuse10() throws IOException {
assertEquals(0, closed.size());
}

@Test
void testRewrapCachedChannelProvider() {
final SeekableChannelsProvider wrappedProvider = new TestChannelProvider();
final CachedChannelProvider cachedChannelProvider = CachedChannelProvider.create(wrappedProvider, 100);
try {
CachedChannelProvider.create(cachedChannelProvider, 100);
fail("Expected IllegalArgumentException on rewrapping CachedChannelProvider");
} catch (final IllegalArgumentException expected) {
}
}


private class TestChannelProvider implements SeekableChannelsProvider {

Expand Down
26 changes: 26 additions & 0 deletions Util/src/main/java/io/deephaven/util/thread/ThreadHelpers.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
//
// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
//
package io.deephaven.util.thread;

import io.deephaven.configuration.Configuration;

public class ThreadHelpers {
/**
* Get the number of threads to use for a given configuration key, defaulting to the number of available processors
* if the configuration key is set to a non-positive value, or the configuration key is not set and the provided
* default is non-positive.
*
* @param configKey The configuration key to look up
* @param defaultValue The default value to use if the configuration key is not set
* @return The number of threads to use
*/
public static int getOrComputeThreadCountProperty(final String configKey, final int defaultValue) {
final int numThreads = Configuration.getInstance().getIntegerWithDefault(configKey, defaultValue);
if (numThreads <= 0) {
return Runtime.getRuntime().availableProcessors();
} else {
return numThreads;
}
}
}
Loading

0 comments on commit 5e80889

Please sign in to comment.