diff --git a/build.xml b/build.xml index 4dc6e995cf..defee3d4b0 100644 --- a/build.xml +++ b/build.xml @@ -363,11 +363,10 @@ - + - diff --git a/src/java/org/apache/cassandra/io/sstable/Descriptor.java b/src/java/org/apache/cassandra/io/sstable/Descriptor.java index ec79f675ea..e344f916e7 100644 --- a/src/java/org/apache/cassandra/io/sstable/Descriptor.java +++ b/src/java/org/apache/cassandra/io/sstable/Descriptor.java @@ -270,7 +270,7 @@ public static Pair fromFilename(File directory, String name, nexttok = tokenStack.pop(); // generation OR format type SSTableFormat.Type fmt = SSTableFormat.Type.LEGACY; - if (!CharMatcher.DIGIT.matchesAllOf(nexttok)) + if (!CharMatcher.inRange('0', '9').matchesAllOf(nexttok)) { fmt = SSTableFormat.Type.validate(nexttok); nexttok = tokenStack.pop(); diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableFormat.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableFormat.java index 43919469fd..8f254fcb93 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/SSTableFormat.java +++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableFormat.java @@ -60,7 +60,7 @@ private Type(String name, SSTableFormat info) { //Since format comes right after generation //we disallow formats with numeric names - assert !CharMatcher.DIGIT.matchesAllOf(name); + assert !CharMatcher.inRange('0', '9').matchesAllOf(name); this.name = name; this.info = info; diff --git a/src/java/org/apache/cassandra/locator/InetAddressAndPort.java b/src/java/org/apache/cassandra/locator/InetAddressAndPort.java index 6daa2e132c..584e13b424 100644 --- a/src/java/org/apache/cassandra/locator/InetAddressAndPort.java +++ b/src/java/org/apache/cassandra/locator/InetAddressAndPort.java @@ -154,7 +154,7 @@ public static InetAddressAndPort getByNameOverrideDefaults(String name, Integer { port = hap.getPort(); } - return getByAddressOverrideDefaults(InetAddress.getByName(hap.getHostText()), port); + return getByAddressOverrideDefaults(InetAddress.getByName(hap.getHost()), port); } public static InetAddressAndPort getByAddress(byte[] address) throws UnknownHostException diff --git a/src/java/org/apache/cassandra/repair/RepairJob.java b/src/java/org/apache/cassandra/repair/RepairJob.java index 6f89a86ae2..c04f79ad0b 100644 --- a/src/java/org/apache/cassandra/repair/RepairJob.java +++ b/src/java/org/apache/cassandra/repair/RepairJob.java @@ -87,7 +87,7 @@ public void run() } // When all snapshot complete, send validation requests ListenableFuture> allSnapshotTasks = Futures.allAsList(snapshotTasks); - validations = Futures.transform(allSnapshotTasks, new AsyncFunction, List>() + validations = Futures.transformAsync(allSnapshotTasks, new AsyncFunction, List>() { public ListenableFuture> apply(List endpoints) { @@ -105,7 +105,7 @@ public ListenableFuture> apply(List endpoints) } // When all validations complete, submit sync tasks - ListenableFuture> syncResults = Futures.transform(validations, new AsyncFunction, List>() + ListenableFuture> syncResults = Futures.transformAsync(validations, new AsyncFunction, List>() { public ListenableFuture> apply(List trees) { @@ -228,7 +228,7 @@ public void onSuccess(TreeResponse result) // failure is handled at root of job chain public void onFailure(Throwable t) {} - }); + }, taskExecutor); currentTask = nextTask; } // start running tasks @@ -285,7 +285,7 @@ public void onSuccess(TreeResponse result) // failure is handled at root of job chain public void onFailure(Throwable t) {} - }); + }, taskExecutor); currentTask = nextTask; } // start running tasks diff --git a/src/java/org/apache/cassandra/repair/RepairRunnable.java b/src/java/org/apache/cassandra/repair/RepairRunnable.java index 7a9590becc..c68378a5eb 100644 --- a/src/java/org/apache/cassandra/repair/RepairRunnable.java +++ b/src/java/org/apache/cassandra/repair/RepairRunnable.java @@ -309,7 +309,7 @@ public void onFailure(Throwable t) totalProgress, message)); } - }); + }, executor); futures.add(session); } @@ -318,7 +318,7 @@ public void onFailure(Throwable t) final Collection> successfulRanges = new ArrayList<>(); final AtomicBoolean hasFailure = new AtomicBoolean(); final ListenableFuture> allSessions = Futures.successfulAsList(futures); - ListenableFuture anticompactionResult = Futures.transform(allSessions, new AsyncFunction, Object>() + ListenableFuture anticompactionResult = Futures.transformAsync(allSessions, new AsyncFunction, Object>() { @SuppressWarnings("unchecked") public ListenableFuture apply(List results) @@ -337,7 +337,7 @@ public ListenableFuture apply(List results) } return ActiveRepairService.instance.finishParentSession(parentSession, allNeighbors, successfulRanges); } - }); + }, executor); Futures.addCallback(anticompactionResult, new FutureCallback() { public void onSuccess(Object result) @@ -385,7 +385,7 @@ private void repairComplete() } executor.shutdownNow(); } - }); + }, executor); } private void addRangeToNeighbors(List, ? extends Collection>>> neighborRangeList, Range range, Set neighbors) diff --git a/src/java/org/apache/cassandra/repair/RepairSession.java b/src/java/org/apache/cassandra/repair/RepairSession.java index 1207d36316..1a4718c50f 100644 --- a/src/java/org/apache/cassandra/repair/RepairSession.java +++ b/src/java/org/apache/cassandra/repair/RepairSession.java @@ -296,7 +296,7 @@ public void onFailure(Throwable t) Tracing.traceRepair("Session completed with the following error: {}", t); forceShutdown(t); } - }); + }, taskExecutor); } public void terminate() diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java index 626aa918ff..b022ac7983 100644 --- a/src/java/org/apache/cassandra/service/ActiveRepairService.java +++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java @@ -183,7 +183,7 @@ public void run() failureDetector.unregisterFailureDetectionEventListener(task); gossiper.unregister(task); } - }, MoreExecutors.sameThreadExecutor()); + }, MoreExecutors.directExecutor()); } public synchronized void terminateSessions() diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index 44bdb6782f..2cf404aee1 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -41,6 +41,7 @@ import com.google.common.base.Predicate; import com.google.common.collect.*; import com.google.common.util.concurrent.*; +import com.google.common.util.concurrent.MoreExecutors; import org.apache.commons.lang3.StringUtils; @@ -1726,7 +1727,7 @@ public void onFailure(Throwable e) progressSupport.progress("bootstrap", new ProgressEvent(ProgressEventType.ERROR, 1, 1, message)); progressSupport.progress("bootstrap", new ProgressEvent(ProgressEventType.COMPLETE, 1, 1, "Resume bootstrap complete")); } - }); + }, MoreExecutors.directExecutor()); return true; } else @@ -2847,7 +2848,7 @@ public void onFailure(Throwable t) // We still want to send the notification sendReplicationNotification(notifyEndpoint); } - }); + }, MoreExecutors.directExecutor()); } // needs to be modified to accept either a keyspace or ARS. diff --git a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java index 481e93d48c..d90d844a1e 100644 --- a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java +++ b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java @@ -24,6 +24,7 @@ import com.google.common.util.concurrent.AbstractFuture; import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.MoreExecutors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -140,7 +141,7 @@ private void attachConnection(InetAddress from, int sessionIndex, IncomingStream public void addEventListener(StreamEventHandler listener) { - Futures.addCallback(this, listener); + Futures.addCallback(this, listener, MoreExecutors.directExecutor()); eventListeners.add(listener); } diff --git a/test/unit/org/apache/cassandra/hints/HintsServiceTest.java b/test/unit/org/apache/cassandra/hints/HintsServiceTest.java index 077a9d165b..7440c3002f 100644 --- a/test/unit/org/apache/cassandra/hints/HintsServiceTest.java +++ b/test/unit/org/apache/cassandra/hints/HintsServiceTest.java @@ -27,6 +27,7 @@ import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import org.junit.After; import org.junit.Before; import org.junit.BeforeClass; @@ -125,7 +126,7 @@ public void onSuccess(@Nullable Boolean aBoolean) { HintsService.instance.resumeDispatch(); } - }); + }, MoreExecutors.directExecutor()); Futures.allAsList( noMessagesWhilePaused, diff --git a/test/unit/org/apache/cassandra/net/MockMessagingSpy.java b/test/unit/org/apache/cassandra/net/MockMessagingSpy.java index 2219c5ac76..3b98824aec 100644 --- a/test/unit/org/apache/cassandra/net/MockMessagingSpy.java +++ b/test/unit/org/apache/cassandra/net/MockMessagingSpy.java @@ -54,7 +54,7 @@ public class MockMessagingSpy */ public ListenableFuture> captureMockedMessageIn() { - return Futures.transform(captureMockedMessageInN(1), (List> result) -> result.isEmpty() ? null : result.get(0)); + return Futures.transform(captureMockedMessageInN(1), (List> result) -> result.isEmpty() ? null : result.get(0), executor); } /** @@ -90,7 +90,7 @@ public ListenableFuture expectMockedMessageIn(int noOfMessages) */ public ListenableFuture> captureMessageOut() { - return Futures.transform(captureMessageOut(1), (List> result) -> result.isEmpty() ? null : result.get(0)); + return Futures.transform(captureMessageOut(1), (List> result) -> result.isEmpty() ? null : result.get(0), executor); } /** diff --git a/test/unit/org/apache/cassandra/repair/RepairJobTest.java b/test/unit/org/apache/cassandra/repair/RepairJobTest.java index 979ecc5812..25a022b014 100644 --- a/test/unit/org/apache/cassandra/repair/RepairJobTest.java +++ b/test/unit/org/apache/cassandra/repair/RepairJobTest.java @@ -237,7 +237,7 @@ public void testNoTreesRetainedAfterDifference() throws Throwable // SyncTasks themselves should not contain significant memory assertTrue(ObjectSizes.measureDeep(syncTasks) < 0.8 * singleTreeSize); - ListenableFuture> syncResults = Futures.transform(Futures.immediateFuture(mockTreeResponses), new AsyncFunction, List>() + ListenableFuture> syncResults = Futures.transformAsync(Futures.immediateFuture(mockTreeResponses), new AsyncFunction, List>() { public ListenableFuture> apply(List treeResponses) { diff --git a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java index 8f3061a042..4752e74550 100644 --- a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java +++ b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java @@ -25,6 +25,7 @@ import com.google.common.collect.Iterables; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.MoreExecutors; import org.junit.BeforeClass; import org.junit.Test; import org.junit.runner.RunWith; @@ -125,7 +126,7 @@ public void onFailure(Throwable t) { fail(); } - }); + }, MoreExecutors.directExecutor()); // should be complete immediately futureResult.get(100, TimeUnit.MILLISECONDS); }