Skip to content

Commit

Permalink
Adding more logging around the rejected task executions at the Schedu…
Browse files Browse the repository at this point in the history
…ler and RxJava layer (#559)

* Add diagnostic events for logging visibility

* Refactor logging diagnostics into main Scheduler loop

* Refactor log timing and level and change privacies

* Revert ExecutorStateEvent to accept ExecutorService input type

* Minor style and messaging fixes

* Fix failing unit test

* Refactor diagnostic events to use factory for testing

* Fix constructor overloading for testing

* Refactor DiagnosticEventHandler to no args constructor
  • Loading branch information
micah-jaffe authored and sahilpalvia committed Jun 28, 2019
1 parent 6159b86 commit fa72cf1
Show file tree
Hide file tree
Showing 9 changed files with 524 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package software.amazon.kinesis.coordinator;

/**
* An interface to implement various types of stateful events that can be used for diagnostics throughout the KCL.
* Each event supplies its own log text through {@link #message()} and dispatches itself to a
* {@link DiagnosticEventHandler} through {@link #accept(DiagnosticEventHandler)}.
*/
interface DiagnosticEvent {
/**
* DiagnosticEvent is part of a visitor pattern and it accepts DiagnosticEventHandler visitors.
*
* @param visitor A handler that controls the behavior of the DiagnosticEvent when invoked.
*/
void accept(DiagnosticEventHandler visitor);

/**
* The string to output to logs when a DiagnosticEvent occurs.
*
* @return the log message describing this event
*/
String message();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package software.amazon.kinesis.coordinator;

import lombok.NoArgsConstructor;
import software.amazon.kinesis.leases.LeaseCoordinator;

import java.util.concurrent.ExecutorService;

/**
 * Factory for the {@link DiagnosticEvent} implementations used for logging and visibility. Going through a
 * factory instead of calling the event constructors directly lets callers substitute the factory when needed.
 */
@NoArgsConstructor
class DiagnosticEventFactory {
    /**
     * Builds an event from the given executor service and lease coordinator.
     *
     * @param executorService the executor service to describe
     * @param leaseCoordinator the lease coordinator handed to the event constructor
     * @return a new {@link ExecutorStateEvent}
     */
    ExecutorStateEvent executorStateEvent(ExecutorService executorService, LeaseCoordinator leaseCoordinator) {
        final ExecutorStateEvent stateEvent = new ExecutorStateEvent(executorService, leaseCoordinator);
        return stateEvent;
    }

    /**
     * Builds an event that pairs a rejected-task failure with an executor state event.
     *
     * @param executorStateEvent the executor state to attach to the rejection
     * @param t the throwable that was raised
     * @return a new {@link RejectedTaskEvent}
     */
    RejectedTaskEvent rejectedTaskEvent(ExecutorStateEvent executorStateEvent, Throwable t) {
        final RejectedTaskEvent rejection = new RejectedTaskEvent(executorStateEvent, t);
        return rejection;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package software.amazon.kinesis.coordinator;

/**
* An interface to implement behaviors associated with a {@link DiagnosticEvent}. Uses the visitor pattern to visit
* the DiagnosticEvent when the behavior is desired. A default implementation that performs simple logging is found in
* {@link DiagnosticEventLogger}.
*/
interface DiagnosticEventHandler {
/**
* Log or otherwise react to periodic pulses on the thread pool executor state.
*
* @param event the executor state event being visited
*/
void visit(ExecutorStateEvent event);

/**
* Log or otherwise react to rejected tasks in the RxJavaPlugin layer.
*
* @param event the rejected task event being visited
*/
void visit(RejectedTaskEvent event);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package software.amazon.kinesis.coordinator;

import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;

/**
* Internal implementation of {@link DiagnosticEventHandler} used by {@link Scheduler} to log executor state both
* 1) in normal conditions periodically, and 2) in reaction to rejected task exceptions.
*/
@NoArgsConstructor
@Slf4j
@KinesisClientInternalApi
class DiagnosticEventLogger implements DiagnosticEventHandler {
private static final long EXECUTOR_LOG_INTERVAL_MILLIS = 30000L;
private long nextExecutorLogTime = System.currentTimeMillis() + EXECUTOR_LOG_INTERVAL_MILLIS;

/**
* {@inheritDoc}
*
* Only log at info level every 30s to avoid over-logging, else log at debug level
*/
@Override
public void visit(ExecutorStateEvent event) {
if (System.currentTimeMillis() >= nextExecutorLogTime) {
log.info(event.message());
nextExecutorLogTime = System.currentTimeMillis() + EXECUTOR_LOG_INTERVAL_MILLIS;
} else {
log.debug(event.message());
}
}

/**
* {@inheritDoc}
*/
@Override
public void visit(RejectedTaskEvent event) {
log.error(event.message(), event.getThrowable());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package software.amazon.kinesis.coordinator;

import lombok.Getter;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.leases.LeaseCoordinator;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * {@link DiagnosticEvent} holding a snapshot of executor statistics plus the number of lease assignments, taken
 * at construction time. The executor statistics are only populated when the executor is a
 * {@link ThreadPoolExecutor}; otherwise those fields keep their default values and {@link #accept} does not
 * invoke the visitor.
 */
@Getter
@ToString(exclude = "isThreadPoolExecutor")
@Slf4j
@KinesisClientInternalApi
class ExecutorStateEvent implements DiagnosticEvent {
    private static final String MESSAGE = "Current thread pool executor state: ";

    // Field order is significant: it determines the order in the generated toString output.
    private boolean isThreadPoolExecutor;
    private String executorName;
    private int currentQueueSize;
    private int activeThreads;
    private int coreThreads;
    private int leasesOwned;
    private int largestPoolSize;
    private int maximumPoolSize;

    /**
     * Captures the current state of the executor and the lease coordinator.
     *
     * @param executor the executor service to inspect; statistics are read only from a ThreadPoolExecutor
     * @param leaseCoordinator source of the current assignments, whose count is recorded as leases owned
     */
    ExecutorStateEvent(ExecutorService executor, LeaseCoordinator leaseCoordinator) {
        this.isThreadPoolExecutor = executor instanceof ThreadPoolExecutor;

        if (this.isThreadPoolExecutor) {
            final ThreadPoolExecutor pool = (ThreadPoolExecutor) executor;
            this.executorName = pool.getClass().getSimpleName();
            this.currentQueueSize = pool.getQueue().size();
            this.activeThreads = pool.getActiveCount();
            this.coreThreads = pool.getCorePoolSize();
            this.largestPoolSize = pool.getLargestPoolSize();
            this.maximumPoolSize = pool.getMaximumPoolSize();
        }

        this.leasesOwned = leaseCoordinator.getAssignments().size();
    }

    /**
     * Dispatches this event to the visitor, but only when the inspected executor was a ThreadPoolExecutor
     * (default config); for any other executor service there is nothing meaningful to log.
     *
     * @param visitor the handler to invoke
     */
    @Override
    public void accept(DiagnosticEventHandler visitor) {
        if (!isThreadPoolExecutor) {
            return;
        }
        visitor.visit(this);
    }

    /**
     * @return the fixed message prefix followed by this event's string representation
     */
    @Override
    public String message() {
        return MESSAGE + this;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package software.amazon.kinesis.coordinator;

import lombok.Getter;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;

/**
 * {@link DiagnosticEvent} describing a rejected task: it carries the throwable that was raised together with an
 * {@link ExecutorStateEvent} whose message is appended to the advisory text.
 */
@Getter
@ToString
@Slf4j
@KinesisClientInternalApi
class RejectedTaskEvent implements DiagnosticEvent {
    private static final String MESSAGE = "Review your thread configuration to prevent task rejections. " +
            "Until next release, KCL will not be resilient to task rejections. ";

    private ExecutorStateEvent executorStateEvent;
    private Throwable throwable;

    /**
     * @param executorStateEvent executor state to report alongside the rejection
     * @param throwable the throwable associated with the rejected task
     */
    RejectedTaskEvent(ExecutorStateEvent executorStateEvent, Throwable throwable) {
        this.throwable = throwable;
        this.executorStateEvent = executorStateEvent;
    }

    /**
     * Dispatches this event to the visitor unconditionally.
     *
     * @param visitor the handler to invoke
     */
    @Override
    public void accept(DiagnosticEventHandler visitor) {
        visitor.visit(this);
    }

    /**
     * @return the advisory text followed by the message of the attached executor state event
     */
    @Override
    public String message() {
        final String executorState = executorStateEvent.message();
        return MESSAGE + executorState;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

import com.google.common.annotations.VisibleForTesting;

import io.reactivex.plugins.RxJavaPlugins;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.NoArgsConstructor;
Expand Down Expand Up @@ -95,6 +96,8 @@ public class Scheduler implements Runnable {
// parent shards
private final long parentShardPollIntervalMillis;
private final ExecutorService executorService;
private final DiagnosticEventFactory diagnosticEventFactory;
private final DiagnosticEventHandler diagnosticEventHandler;
// private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy;
private final LeaseCoordinator leaseCoordinator;
private final ShardSyncTaskManager shardSyncTaskManager;
Expand Down Expand Up @@ -140,6 +143,23 @@ public Scheduler(@NonNull final CheckpointConfig checkpointConfig,
@NonNull final MetricsConfig metricsConfig,
@NonNull final ProcessorConfig processorConfig,
@NonNull final RetrievalConfig retrievalConfig) {
this(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig, metricsConfig,
processorConfig, retrievalConfig, new DiagnosticEventFactory());
}

/**
* Customers do not currently have the ability to customize the DiagnosticEventFactory, but this visibility
* is desired for testing. This constructor is only used for testing to provide a mock DiagnosticEventFactory.
*/
@VisibleForTesting
protected Scheduler(@NonNull final CheckpointConfig checkpointConfig,
@NonNull final CoordinatorConfig coordinatorConfig,
@NonNull final LeaseManagementConfig leaseManagementConfig,
@NonNull final LifecycleConfig lifecycleConfig,
@NonNull final MetricsConfig metricsConfig,
@NonNull final ProcessorConfig processorConfig,
@NonNull final RetrievalConfig retrievalConfig,
@NonNull final DiagnosticEventFactory diagnosticEventFactory) {
this.checkpointConfig = checkpointConfig;
this.coordinatorConfig = coordinatorConfig;
this.leaseManagementConfig = leaseManagementConfig;
Expand Down Expand Up @@ -167,6 +187,8 @@ public Scheduler(@NonNull final CheckpointConfig checkpointConfig,
this.shardConsumerDispatchPollIntervalMillis = this.coordinatorConfig.shardConsumerDispatchPollIntervalMillis();
this.parentShardPollIntervalMillis = this.coordinatorConfig.parentShardPollIntervalMillis();
this.executorService = this.coordinatorConfig.coordinatorFactory().createExecutorService();
this.diagnosticEventFactory = diagnosticEventFactory;
this.diagnosticEventHandler = new DiagnosticEventLogger();

this.shardSyncTaskManager = this.leaseManagementConfig.leaseManagementFactory()
.createShardSyncTaskManager(this.metricsFactory);
Expand Down Expand Up @@ -212,7 +234,7 @@ public void run() {
try {
initialize();
log.info("Initialization complete. Starting worker loop.");
} catch (RuntimeException e) {
} catch (RuntimeException e) {
log.error("Unable to initialize after {} attempts. Shutting down.", maxInitializationAttempts, e);
workerStateChangeListener.onAllInitializationAttemptsFailed(e);
shutdown();
Expand All @@ -226,8 +248,10 @@ public void run() {
log.info("Worker loop is complete. Exiting from worker.");
}

private void initialize() {
@VisibleForTesting
void initialize() {
synchronized (lock) {
registerErrorHandlerForUndeliverableAsyncTaskExceptions();
workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.INITIALIZING);
boolean isDone = false;
Exception lastException = null;
Expand Down Expand Up @@ -305,6 +329,7 @@ void runProcessLoop() {
// clean up shard consumers for unassigned shards
cleanupShardConsumers(assignedShards);

logExecutorState();
slog.info("Sleeping ...");
Thread.sleep(shardConsumerDispatchPollIntervalMillis);
} catch (Exception e) {
Expand Down Expand Up @@ -612,6 +637,25 @@ void cleanupShardConsumers(Set<ShardInfo> assignedShards) {
}
}

/**
 * Installs an RxJava error handler so that exceptions raised in the RxJava layer, which could otherwise fail
 * silently, are surfaced to the KCL. Each throwable handed to the handler is wrapped, together with a fresh
 * executor state event, in a {@link RejectedTaskEvent} and passed to the diagnostic event handler.
 */
private void registerErrorHandlerForUndeliverableAsyncTaskExceptions() {
    RxJavaPlugins.setErrorHandler(undeliverable -> {
        // Capture the executor state at the moment of failure so it is reported with the rejection.
        final ExecutorStateEvent stateAtFailure =
                diagnosticEventFactory.executorStateEvent(executorService, leaseCoordinator);
        diagnosticEventFactory.rejectedTaskEvent(stateAtFailure, undeliverable).accept(diagnosticEventHandler);
    });
}

/**
 * Creates an executor state event for the current executor service and lease coordinator and hands it to the
 * diagnostic event handler.
 */
private void logExecutorState() {
    diagnosticEventFactory.executorStateEvent(executorService, leaseCoordinator)
            .accept(diagnosticEventHandler);
}

/**
* Logger for suppressing too much INFO logging. To avoid too much logging information Worker will output logging at
* INFO level for a single pass through the main loop every minute. At DEBUG level it will output all INFO logs on
Expand Down
Loading

0 comments on commit fa72cf1

Please sign in to comment.