Skip to content

Commit

Permalink
Merge pull request #145 from kmwtechnology/LC-469
Browse files Browse the repository at this point in the history
  • Loading branch information
spencersolomon6 authored Aug 30, 2024
2 parents 4fc8e65 + 70e0bb0 commit fe026a7
Show file tree
Hide file tree
Showing 56 changed files with 941 additions and 83 deletions.
2 changes: 1 addition & 1 deletion lucille-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@
<dependency>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-core</artifactId>
<version>4.2.3</version>
<version>4.2.18</version>
</dependency>

<dependency>
Expand Down
71 changes: 43 additions & 28 deletions lucille-core/src/main/java/com/kmwllc/lucille/core/Runner.java
Original file line number Diff line number Diff line change
Expand Up @@ -122,35 +122,10 @@ public static void main(String[] args) throws Exception {
return;
}

StopWatch stopWatch = new StopWatch();
stopWatch.start();
RunResult result;

try {

RunType runType;
if (cli.hasOption("usekafka")) {
if (cli.hasOption("local")) {
runType = RunType.KAFKA_LOCAL;
} else {
runType = RunType.KAFKA_DISTRIBUTED;
}
} else {
runType = RunType.LOCAL;
}

result = run(config, runType);

// log detailed metrics
Slf4jReporter.forRegistry(SharedMetricRegistries.getOrCreate(LogUtils.METRICS_REG))
.outputTo(log).withLoggingLevel(getMetricsLoggingLevel(config)).build().report();
RunType runType = getRunType(cli.hasOption("useKafka"), cli.hasOption("local"));

// log run summary
log.info(result.toString());
} finally {
stopWatch.stop();
log.info(String.format("Run took %.2f secs.", (double) stopWatch.getTime(TimeUnit.MILLISECONDS) / 1000));
}
// Kick off the run with a log of the result
RunResult result = runWithResultLog(config, runType);

if (result.getStatus()) {
System.exit(0);
Expand Down Expand Up @@ -243,6 +218,46 @@ public static void renderConfig(Config config) {
log.info(config.root().render(renderOptions));
}

/**
* Derives the RunType for the new run from the 'useKafka' and 'local' parameters.
*/
static RunType getRunType(boolean useKafka, boolean local) {
if (useKafka) {
if (local) {
return RunType.KAFKA_LOCAL;
} else {
return RunType.KAFKA_DISTRIBUTED;
}
} else {
return RunType.LOCAL;
}
}

/**
* Kicks off a new Lucille run and logs information about the run to the console after completion.
*/
public static RunResult runWithResultLog(Config config, RunType runType) throws Exception {
StopWatch stopWatch = new StopWatch();
stopWatch.start();
RunResult result;

try {
result = run(config, runType);

// log detailed metrics
Slf4jReporter.forRegistry(SharedMetricRegistries.getOrCreate(LogUtils.METRICS_REG))
.outputTo(log).withLoggingLevel(getMetricsLoggingLevel(config)).build().report();

// log run summary
log.info(result.toString());

return result;
} finally {
stopWatch.stop();
log.info(String.format("Run took %.2f secs.", (double) stopWatch.getTime(TimeUnit.MILLISECONDS) / 1000));
}
}

/**
* Generates a run ID and performs an end-to-end run of the designated type.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package com.kmwllc.lucille.core;

import com.kmwllc.lucille.core.Runner;
import com.kmwllc.lucille.core.Runner.RunType;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Public API for starting lucille runs and viewing their status. Will be used by external resources, namely the Admin API to kick
* off lucille runs.
*/
public class RunnerManager {

private static final Logger log = LoggerFactory.getLogger(RunnerManager.class);

// Use the eager-initialization singleton pattern
private static volatile RunnerManager instance = new RunnerManager();

private RunnerManager() {}

public static RunnerManager getInstance() {
return instance;
}

// Check whether the CompleteableFuture exists and is not done
synchronized public boolean isRunning() {
return !future.isDone();
}

/**
* Blocks the calling thread until the current lucille run is completed. This method is primarily intended to be used for testing,
* but has been made public to facilitate testing in other modules, namely lucille-plugins/lucille-api.
*/
public void waitForRunCompletion() throws ExecutionException, InterruptedException {
future.get();
}

private CompletableFuture<Void> future = CompletableFuture.completedFuture(null);

/**
* Main entrypoint for kicking off lucille runs. This method spawns a new thread for lucille to run in, but will not start a new
* instance of lucille until the previous one terminates.
*
* @return boolean representing whether the lucille run was initiated or not. This will return false if and only if the lucille
* run was skipped due to the previous run still existing.
*/
synchronized public boolean run() {
return runWithConfig(ConfigFactory.load());
}

/**
* Internal abstraction used to support testing.
*/
synchronized protected boolean runWithConfig(Config config) {
if (isRunning()) {
log.warn("Skipping new run; previous lucille run is still in progress.");
return false;
}

future = CompletableFuture.runAsync(() -> {
try {
log.info("Starting lucille run via the Runner Manager.");
log.info(config.entrySet().toString());

// For now we will always use local mode without kafka
RunType runType = Runner.getRunType(false, true);

Runner.runWithResultLog(config, runType);
} catch (Exception e) {
log.error("Failed to run lucille via the Runner Manager.", e);
}
});

return true;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package com.kmwllc.lucille.indexer;

import com.kmwllc.lucille.core.Document;
import com.kmwllc.lucille.core.Indexer;
import com.kmwllc.lucille.message.IndexerMessenger;
import com.typesafe.config.Config;
import java.util.List;

/**
* The NopIndexer performs no operations and does not send documents to any index. It is intended to be used for testing or
* validating that a pipeline is properly configured without ingesting content.
*/
public class NopIndexer extends Indexer {

public NopIndexer(Config config, IndexerMessenger messenger, boolean bypass, String metricsPrefix) {
super(config, messenger, metricsPrefix);
}

@Override
public boolean validateConnection() {
return true;
}

@Override
protected void sendToIndex(List<Document> documents) throws Exception {
// no-op
}

@Override
public void closeConnection() {
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package com.kmwllc.lucille.connector;

import com.kmwllc.lucille.core.ConnectorException;
import com.kmwllc.lucille.core.Publisher;
import com.typesafe.config.Config;

public class SleepConnector extends AbstractConnector {

private final int duration;

public SleepConnector(Config config) {
super(config);
this.duration = config.getInt("duration");
}

@Override
public void execute(Publisher publisher) throws ConnectorException {
try {
Thread.sleep(duration);
} catch (InterruptedException e) {
throw new ConnectorException("Sleep was interrupted", e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package com.kmwllc.lucille.core;

import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import java.util.concurrent.ExecutionException;
import org.junit.Before;
import org.junit.Test;
import scala.Int;

public class RunnerManagerTest {

@Test
public void testRunnerManagerFull() throws InterruptedException, ExecutionException {
RunnerManager runnerManager = RunnerManager.getInstance();
Config config = ConfigFactory.load("RunnerManagerTest/sleep.conf");

// Ensure no lucille run is running at the start of the test
assertFalse(runnerManager.isRunning());

// Kick off a lucille run and ensure it is not skipped
assertTrue(runnerManager.runWithConfig(config));

// While we lucille is running, ensure lucille isRunning and a new run is skipped
assertTrue(runnerManager.isRunning());
assertFalse(runnerManager.runWithConfig(config));

// Wait until the run is over
runnerManager.waitForRunCompletion();

// Ensure lucille is not running and make sure we can now kick off a new run
assertFalse(runnerManager.isRunning());
assertTrue(runnerManager.runWithConfig(config));

// Wait for all lucille threads to finish before exiting
runnerManager.waitForRunCompletion();
}

@Test
public void testWaitForRunCompletion() throws InterruptedException, ExecutionException {
RunnerManager runnerManager = RunnerManager.getInstance();
Config config = ConfigFactory.load("RunnerManagerTest/sleep.conf");

// Ensure lucille is not running first
assertFalse(runnerManager.isRunning());

runnerManager.runWithConfig(config);

// Ensure lucille is running, wait for it stop and ensure its stopped
assertTrue(runnerManager.isRunning());
runnerManager.waitForRunCompletion();
assertFalse(runnerManager.isRunning());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@ public class ConfigValidationTest {

@Test
public void testConditions() {
testException(NoopStage.class, "conditions-field-missing.conf");
testException(NoopStage.class, "conditions-field-renamed.conf");
testException(NopStage.class, "conditions-field-missing.conf");
testException(NopStage.class, "conditions-field-renamed.conf");

testException(NoopStage.class, "conditions-optional-unknown.conf");
testException(NoopStage.class, "conditions-optional-renamed.conf");
testException(NopStage.class, "conditions-optional-unknown.conf");
testException(NopStage.class, "conditions-optional-renamed.conf");

testException(NoopStage.class, "conditions-optional-wrong.conf");
testException(NopStage.class, "conditions-optional-wrong.conf");
}

@Test
Expand Down Expand Up @@ -69,7 +69,7 @@ public void testDuplicatePipeline() throws Exception {
assertEquals(2, exceptions1.size());

testException(exceptions1.get(0), IllegalArgumentException.class,
"com.kmwllc.lucille.stage.NoopStage: Stage config contains unknown property invalid_property");
"com.kmwllc.lucille.stage.NopStage: Stage config contains unknown property invalid_property");
testException(exceptions1.get(1), IllegalArgumentException.class, "Stage config must contain property fields");
}

Expand All @@ -83,7 +83,7 @@ public void testUnusedPipeline() throws Exception {
assertEquals(2, exceptions1.size());

testException(exceptions1.get(0), IllegalArgumentException.class,
"com.kmwllc.lucille.stage.NoopStage: Stage config contains unknown property invalid_property");
"com.kmwllc.lucille.stage.NopStage: Stage config contains unknown property invalid_property");

testException(exceptions1.get(1), IllegalArgumentException.class, "Stage config must contain property fields");
}
Expand Down Expand Up @@ -117,7 +117,7 @@ public void testValidationModeException() throws Exception {
List<Exception> exceptions2 = exceptions.get("pipeline2");
assertEquals(2, exceptions2.size());

testException(exceptions1.get(0), IllegalArgumentException.class, "com.kmwllc.lucille.stage.NoopStage: " +
testException(exceptions1.get(0), IllegalArgumentException.class, "com.kmwllc.lucille.stage.NopStage: " +
"Stage config contains unknown property invalid_property");

// TODO note that for the following two exceptions, the fields are retrieved before
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,10 @@
import com.typesafe.config.Config;

import java.util.Iterator;
import java.util.List;

public class NoopStage extends Stage {
public class NopStage extends Stage {

public NoopStage(Config config) {
public NopStage(Config config) {
super(config);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@ pipelines: [
name: "pipeline1",
stages: [
{
class: "com.kmwllc.lucille.stage.NoopStage",
class: "com.kmwllc.lucille.stage.NopStage",
invalid_property: true
},
{
class: "com.kmwllc.lucille.stage.NoopStage",
class: "com.kmwllc.lucille.stage.NopStage",
conditions = [
{
values = ["MA", "1234", "US", "Russia", "4567"]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@ pipelines: [
name: "pipeline1",
stages: [
{
class: "com.kmwllc.lucille.stage.NoopStage",
class: "com.kmwllc.lucille.stage.NopStage",
invalid_property: true
},
{
class: "com.kmwllc.lucille.stage.NoopStage",
class: "com.kmwllc.lucille.stage.NopStage",
conditions = [
{
values = ["MA", "1234", "US", "Russia", "4567"]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ pipelines: [
name: "pipeline1",
stages: [
{
class: "com.kmwllc.lucille.stage.NoopStage",
class: "com.kmwllc.lucille.stage.NopStage",
invalid_property: true
}
]
Expand Down
Loading

0 comments on commit fe026a7

Please sign in to comment.