diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..6789e7a
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,24 @@
+*~
+target/
+*dependency-reduced-pom.xml
+.idea
+*.iml
+*.iws
+.DS_Store
+*.project
+*.classpath
+*.settings
+*.metadata
+*hbase-site.xml
+*.log
+*.swp
+*.tmp
+*.bak
+*.class
+
+tmp/**
+tmp/**/*
+temp/**
+temp/**/*
+
+repodata/
diff --git a/demo_loader/README.md b/demo_loader/README.md
new file mode 100644
index 0000000..e69de29
diff --git a/demo_loader/pom.xml b/demo_loader/pom.xml
new file mode 100644
index 0000000..addae33
--- /dev/null
+++ b/demo_loader/pom.xml
@@ -0,0 +1,81 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <groupId>com.hortonworks.metron</groupId>
+  <artifactId>demo-loader</artifactId>
+  <version>0.4.0</version>
+  <name>Demo CSV Loader</name>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.metron</groupId>
+      <artifactId>metron-data-management</artifactId>
+      <version>0.4.0</version>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <artifactId>maven-assembly-plugin</artifactId>
+        <configuration>
+          <descriptor>src/main/assembly/assembly.xml</descriptor>
+        </configuration>
+        <executions>
+          <execution>
+            <id>make-assembly</id>
+            <phase>package</phase>
+            <goals>
+              <goal>single</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <version>3.5.1</version>
+        <configuration>
+          <compilerId>javac-with-errorprone</compilerId>
+          <forceJavacCompilerUse>true</forceJavacCompilerUse>
+          <source>1.8</source>
+          <compilerArgument>-Xlint:unchecked</compilerArgument>
+          <target>1.8</target>
+          <showWarnings>true</showWarnings>
+        </configuration>
+        <dependencies>
+          <dependency>
+            <groupId>org.codehaus.plexus</groupId>
+            <artifactId>plexus-compiler-javac-errorprone</artifactId>
+            <version>2.8</version>
+          </dependency>
+          <dependency>
+            <groupId>com.google.errorprone</groupId>
+            <artifactId>error_prone_core</artifactId>
+            <version>2.0.14</version>
+          </dependency>
+        </dependencies>
+      </plugin>
+    </plugins>
+    <resources>
+      <resource>
+        <directory>src/main/resources</directory>
+      </resource>
+    </resources>
+  </build>
+</project>
diff --git a/demo_loader/src/main/assembly/assembly.xml b/demo_loader/src/main/assembly/assembly.xml
new file mode 100644
index 0000000..0b36f9d
--- /dev/null
+++ b/demo_loader/src/main/assembly/assembly.xml
@@ -0,0 +1,42 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. -->
+<assembly>
+  <id>archive</id>
+  <formats>
+    <format>tar.gz</format>
+  </formats>
+  <includeBaseDirectory>false</includeBaseDirectory>
+  <fileSets>
+    <fileSet>
+      <directory>${project.basedir}/src/main/scripts</directory>
+      <outputDirectory>/bin</outputDirectory>
+      <useDefaultExcludes>true</useDefaultExcludes>
+      <excludes>
+        <exclude>**/*.formatted</exclude>
+        <exclude>**/*.filtered</exclude>
+      </excludes>
+      <fileMode>0755</fileMode>
+      <lineEnding>unix</lineEnding>
+      <filtered>true</filtered>
+    </fileSet>
+    <fileSet>
+      <directory>${project.basedir}/target</directory>
+      <includes>
+        <include>${project.artifactId}-${project.version}.jar</include>
+      </includes>
+      <outputDirectory>/lib</outputDirectory>
+      <useDefaultExcludes>true</useDefaultExcludes>
+    </fileSet>
+  </fileSets>
+</assembly>
diff --git a/demo_loader/src/main/java/com/hortonworks/metron/loader/csv/DegreeAnalyzer.java b/demo_loader/src/main/java/com/hortonworks/metron/loader/csv/DegreeAnalyzer.java
new file mode 100644
index 0000000..cbdaa21
--- /dev/null
+++ b/demo_loader/src/main/java/com/hortonworks/metron/loader/csv/DegreeAnalyzer.java
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.hortonworks.metron.loader.csv;
+
+
+import org.apache.metron.guava.base.Splitter;
+import org.apache.metron.guava.collect.Iterables;
+
+import java.io.*;
+import java.nio.charset.Charset;
+import java.nio.file.Files;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+
+public class DegreeAnalyzer {
+
+  public static void processRecords(File f, Function<Map.Entry<String, String>, Void> op) throws IOException {
+    BufferedReader reader = Files.newBufferedReader(f.toPath(), Charset.defaultCharset());
+    for(String line = null;(line = reader.readLine()) != null;) {
+      Iterable<String> tokens = Splitter.on(",").split(line);
+      String left = Iterables.getFirst(tokens, "").trim();
+      String right = Iterables.getLast(tokens, "").trim();
+      if(left.length() > 0 && right.length() > 0) {
+        Map.Entry<String, String> lr = new AbstractMap.SimpleEntry<>(left, right);
+        op.apply(lr);
+      }
+    }
+  }
+
+  public static void main(String... argv) throws IOException {
+    final ArrayList<String> hosts = new ArrayList<>();
+    final Map<String, Integer> hostToIndex = new HashMap<>();
+    File inputFile = new File(argv[0]);
+    List<String> importantHosts = new ArrayList<>();
+    for(int i = 1;i < argv.length;++i) {
+      importantHosts.add(argv[i].trim());
+    }
+
+    Progress progress = new Progress(System.err);
+    AtomicInteger numRecs = new AtomicInteger(0);
+    // First pass: assign a dense index to every distinct host seen on either
+    // side of a record.
+    processRecords(inputFile
+            , entry -> {
+              if(!hostToIndex.containsKey(entry.getKey())) {
+                int idx = hosts.size();
+                hosts.add(entry.getKey());
+                hostToIndex.put(entry.getKey(), idx);
+              }
+              if(!hostToIndex.containsKey(entry.getValue())) {
+                int idx = hosts.size();
+                hosts.add(entry.getValue());
+                hostToIndex.put(entry.getValue(), idx);
+              }
+              numRecs.incrementAndGet();
+              progress.update();
+              return null;
+            }
+    );
+    progress.reset();
+    int tenPercent = numRecs.get()/10;
+    System.err.println("Preprocessed " + numRecs.get() + " records.");
+    final boolean[][] adjacencyMatrix = new boolean[hosts.size()][hosts.size()];
+    for(int i = 0;i < hosts.size();++i) {
+      for(int j = 0;j < hosts.size();++j) {
+        adjacencyMatrix[i][j] = i == j;
+      }
+    }
+    final AtomicInteger proc = new AtomicInteger(0);
+    // Second pass: mark an edge for every (source, destination) pair.
+    processRecords( inputFile
+            , entry -> {
+              int row = hostToIndex.get(entry.getKey());
+              int col = hostToIndex.get(entry.getValue());
+              adjacencyMatrix[row][col] = true;
+              int numProcessed = proc.incrementAndGet();
+              progress.update();
+              if(tenPercent > 0 && numProcessed % tenPercent == 0) {
+                System.err.println(" -- " + (numProcessed / tenPercent) + " %");
+              }
+              return null;
+            }
+    );
+    System.err.println("\nSquaring adjacency matrix (" + adjacencyMatrix.length + "x" + adjacencyMatrix.length + ") to make transitive links...");
+    List<Integer> rowIds = new ArrayList<>();
+    for(String importantHost : importantHosts) {
+      Integer idx = hostToIndex.get(importantHost);
+      if(idx != null) {
+        rowIds.add(idx);
+      }
+    }
+    boolean[][] transitiveMatrix = squareMatrix(adjacencyMatrix, rowIds);
+    Set<String> ret = new HashSet<>();
+    for(Integer idx : rowIds) {
+      boolean[] connectedHosts = transitiveMatrix[idx];
+      for(int i = 0;i < connectedHosts.length;++i) {
+        if(connectedHosts[i]) {
+          ret.add(hosts.get(i));
+        }
+      }
+    }
+    for(String s : ret) {
+      System.out.println(s);
+    }
+  }
+
+  private static boolean innerProduct(boolean[][] adjacencyMatrix, int rowId, int colId) {
+    boolean ret = false;
+    for(int i = 0;i < adjacencyMatrix.length;++i) {
+      boolean lhs = adjacencyMatrix[rowId][i];
+      boolean rhs = adjacencyMatrix[i][colId];
+      ret = ret | (rhs && lhs);
+    }
+    return ret;
+  }
+
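+  // Boolean matrix "squaring": entry (i, j) of the result is true when host j
+  // is reachable from host i in at most two hops (the diagonal is seeded true
+  // in main, so direct links survive). Only the rows for the important hosts
+  // are computed, since only those rows are inspected afterward.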
+  private static boolean[][] squareMatrix(boolean[][] adjacencyMatrix, List<Integer> rowIds) {
+    boolean[][] ret = new boolean[adjacencyMatrix.length][adjacencyMatrix.length];
+    for(Integer i : rowIds) {
+      for(int j = 0;j < adjacencyMatrix.length;++j) {
+        ret[i][j] = innerProduct(adjacencyMatrix, i, j);
+      }
+    }
+    return ret;
+  }
+
+}
diff --git a/demo_loader/src/main/java/com/hortonworks/metron/loader/csv/DemoConfig.java b/demo_loader/src/main/java/com/hortonworks/metron/loader/csv/DemoConfig.java
new file mode 100644
index 0000000..4a9e02b
--- /dev/null
+++ b/demo_loader/src/main/java/com/hortonworks/metron/loader/csv/DemoConfig.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.hortonworks.metron.loader.csv;
+
+import org.apache.metron.common.csv.CSVConverter;
+
+import java.util.List;
+import java.util.Map;
+
+public class DemoConfig {
+
+  public static class DataSource {
+    public String outputTopic;
+    public String inputFile;
+    public String filter;
+    public CSVConverter converter;
+
+    public String getOutputTopic() {
+      return outputTopic;
+    }
+
+    public void setOutputTopic(String outputTopic) {
+      this.outputTopic = outputTopic;
+    }
+
+    public String getInputFile() {
+      return inputFile;
+    }
+
+    public void setInputFile(String inputFile) {
+      this.inputFile = inputFile;
+    }
+
+    public CSVConverter getConverter() {
+      return converter;
+    }
+
+    public void setConfig(Map<String, Object> config) {
+      converter = new CSVConverter();
+      converter.initialize(config);
+    }
+
+    public String getFilter() {
+      return filter;
+    }
+
+    public void setFilter(String filter) {
+      this.filter = filter;
+    }
+  }
+
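+  // stepTimeMs: how many real milliseconds each logical time step takes on
+  // replay. timeOffset: epoch millis added to the data's logical timestamps;
+  // when null, DemoLoader substitutes System.currentTimeMillis().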
+  public int stepTimeMs;
+  public Long timeOffset;
+  public List<DataSource> sources;
+
+  public int getStepTimeMs() {
+    return stepTimeMs;
+  }
+
+  public void setStepTimeMs(int stepTimeMs) {
+    this.stepTimeMs = stepTimeMs;
+  }
+
+  public Long getTimeOffset() {
+    return timeOffset;
+  }
+
+  public void setTimeOffset(Long timeOffset) {
+    this.timeOffset = timeOffset;
+  }
+
+  public List<DataSource> getSources() {
+    return sources;
+  }
+
+  public void setSources(List<DataSource> sources) {
+    this.sources = sources;
+  }
+}
diff --git a/demo_loader/src/main/java/com/hortonworks/metron/loader/csv/DemoLoader.java b/demo_loader/src/main/java/com/hortonworks/metron/loader/csv/DemoLoader.java
new file mode 100644
index 0000000..03bde3c
--- /dev/null
+++ b/demo_loader/src/main/java/com/hortonworks/metron/loader/csv/DemoLoader.java
@@ -0,0 +1,417 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.hortonworks.metron.loader.csv;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.commons.cli.*;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.metron.common.dsl.Context;
+import org.apache.metron.common.dsl.MapVariableResolver;
+import org.apache.metron.common.dsl.StellarFunctions;
+import org.apache.metron.common.dsl.VariableResolver;
+import org.apache.metron.common.stellar.StellarPredicateProcessor;
+import org.apache.metron.common.utils.JSONUtils;
+import org.apache.metron.common.utils.KafkaUtils;
+import org.apache.metron.common.utils.cli.OptionHandler;
+import org.apache.metron.dataloads.nonbulk.flatfile.importer.LocalImporter;
+
+import javax.annotation.Nullable;
+import java.io.*;
+import java.nio.charset.Charset;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.*;
+import java.util.zip.GZIPInputStream;
+
+public class DemoLoader {
+  public enum LoadOptions {
+    HELP("h", new OptionHandler<LoadOptions>() {
+
+      @Nullable
+      @Override
+      public Option apply(@Nullable String s) {
+        return new Option(s, "help", false, "Generate Help screen");
+      }
+    }),
+    CONFIG("c", new OptionHandler<LoadOptions>() {
+      @Nullable
+      @Override
+      public Option apply(@Nullable String s) {
+        Option o = new Option(s, "config", true, "The demo config.");
+        o.setArgName("CONFIG_FILE");
+        o.setRequired(true);
+        return o;
+      }
+
+      @Override
+      public Optional<Object> getValue(LoadOptions option, CommandLine cli) {
+        return Optional.ofNullable(option.get(cli).trim());
+      }
+    }),
+    ZK_QUORUM("z", new OptionHandler<LoadOptions>() {
+      @Nullable
+      @Override
+      public Option apply(@Nullable String s) {
+        Option o = new Option(s, "zk_quorum", true, "The zookeeper quorum.");
+        o.setArgName("QUORUM");
+        o.setRequired(false);
+        return o;
+      }
+
+      @Override
+      public Optional<Object> getValue(LoadOptions option, CommandLine cli) {
+        return Optional.ofNullable(option.get(cli).trim());
+      }
+    }),
+    HOSTNAME_FILTER("hf", new OptionHandler<LoadOptions>() {
+      @Nullable
+      @Override
+      public Option apply(@Nullable String s) {
+        Option o = new Option(s, "host_filter", true, "The Hostname filter.");
+        o.setArgName("FILTER");
+        o.setRequired(false);
+        return o;
+      }
+
+      @Override
+      public Optional<Object> getValue(LoadOptions option, CommandLine cli) {
+        return Optional.ofNullable(option.get(cli).trim());
+      }
+    }),
+    KAFKA_PRODUCER_CONFIGS("kp", new OptionHandler<LoadOptions>() {
+      @Nullable
+      @Override
+      public Option apply(@Nullable String s) {
+        Option o = new Option(s, "producer_config", true, "The kafka producer configs.");
+        o.setArgName("CONFIG_FILE");
+        o.setRequired(false);
+        return o;
+      }
+
+      @Override
+      public Optional<Object> getValue(LoadOptions option, CommandLine cli) {
+        return Optional.ofNullable(option.get(cli).trim());
+      }
+    }),
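+    // TIME_START/TIME_END are offsets on the data's own logical clock (the
+    // integer 'timestamp' field of each record), not wall-clock times: step()
+    // replays logical steps start..end, one every stepTimeMs of real time.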
+    TIME_START("s", new OptionHandler<LoadOptions>() {
+      @Nullable
+      @Override
+      public Option apply(@Nullable String s) {
+        Option o = new Option(s, "start_time", true, "Time to start loading the data.");
+        o.setArgName("TS");
+        o.setRequired(false);
+        return o;
+      }
+
+      @Override
+      public Optional<Object> getValue(LoadOptions option, CommandLine cli) {
+        return Optional.ofNullable(option.get(cli).trim());
+      }
+    }),
+    TIME_END("e", new OptionHandler<LoadOptions>() {
+      @Nullable
+      @Override
+      public Option apply(@Nullable String s) {
+        Option o = new Option(s, "end_time", true, "Time to end loading the data.");
+        o.setArgName("TS");
+        o.setRequired(false);
+        return o;
+      }
+
+      @Override
+      public Optional<Object> getValue(LoadOptions option, CommandLine cli) {
+        return Optional.ofNullable(option.get(cli).trim());
+      }
+    });
+    Option option;
+    String shortCode;
+    OptionHandler<LoadOptions> handler;
+    LoadOptions(String shortCode, OptionHandler<LoadOptions> optionHandler) {
+      this.shortCode = shortCode;
+      this.handler = optionHandler;
+      this.option = optionHandler.apply(shortCode);
+    }
+
+    public boolean has(CommandLine cli) {
+      return cli.hasOption(shortCode);
+    }
+
+    public String get(CommandLine cli) {
+      return cli.getOptionValue(shortCode);
+    }
+
+    public static Options getOptions() {
+      Options ret = new Options();
+      for(LoadOptions o : LoadOptions.values()) {
+        ret.addOption(o.option);
+      }
+      return ret;
+    }
+
+    public static void printHelp() {
+      HelpFormatter formatter = new HelpFormatter();
+      formatter.printHelp("DemoLoader", getOptions());
+    }
+
+    public static CommandLine parse(CommandLineParser parser, String[] args) {
+      try {
+        CommandLine cli = parser.parse(getOptions(), args);
+        if(HELP.has(cli)) {
+          printHelp();
+          System.exit(0);
+        }
+        return cli;
+      } catch (ParseException e) {
+        System.err.println("Unable to parse args: " + String.join(" ", args));
+        e.printStackTrace(System.err);
+        printHelp();
+        System.exit(-1);
+        return null;
+      }
+    }
+  }
+
+  private static KafkaProducer<String, String> createProducer(List<String> brokers, Map<String, Object> config) {
+    Map<String, Object> producerConfig = new HashMap<>();
+    producerConfig.put("bootstrap.servers", brokers.get(0));
+    producerConfig.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+    producerConfig.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+    producerConfig.put("request.required.acks", 1);
+    producerConfig.putAll(config);
+    return new KafkaProducer<>(producerConfig);
+  }
+
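+  // Flow: parse the CLI, load the demo config, optionally build a Kafka
+  // producer from the brokers registered in ZooKeeper, optionally load a
+  // hostname filter set, then replay every configured source through step().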
+  public static void main(String... argv) throws Exception {
+    CommandLine cli = LoadOptions.parse(new PosixParser(), argv);
+    DemoConfig config = JSONUtils.INSTANCE.load(new File(LoadOptions.CONFIG.get(cli)), DemoConfig.class);
+    int stepTimeMs = config.getStepTimeMs();
+    Long timeOffset = Optional.ofNullable(config.getTimeOffset()).orElse(System.currentTimeMillis());
+
+    int start = 1;
+    if(LoadOptions.TIME_START.has(cli)) {
+      start = Integer.parseInt(LoadOptions.TIME_START.get(cli));
+    }
+
+    // NOTE: end defaults to -1, so nothing is replayed unless -e/--end_time is
+    // passed (see run_on_threats.sh).
+    int end = -1;
+    if(LoadOptions.TIME_END.has(cli)) {
+      end = Integer.parseInt(LoadOptions.TIME_END.get(cli));
+    }
+
+    KafkaProducer<String, String> producer = null;
+    if(LoadOptions.ZK_QUORUM.has(cli)) {
+      Map<String, Object> kafkaConfigs = new HashMap<>();
+      if (LoadOptions.KAFKA_PRODUCER_CONFIGS.has(cli)) {
+        kafkaConfigs = JSONUtils.INSTANCE.load(new File(LoadOptions.KAFKA_PRODUCER_CONFIGS.get(cli)), new TypeReference<Map<String, Object>>() {
+        });
+      }
+      List<String> brokers = KafkaUtils.INSTANCE.getBrokersFromZookeeper(LoadOptions.ZK_QUORUM.get(cli));
+      producer = createProducer(brokers, kafkaConfigs);
+    }
+    Set<String> hostnameFilter = null;
+    if(LoadOptions.HOSTNAME_FILTER.has(cli)) {
+      try(BufferedReader br = Files.newBufferedReader(new File(LoadOptions.HOSTNAME_FILTER.get(cli)).toPath(), Charset.defaultCharset())) {
+
+        hostnameFilter = new HashSet<>();
+        for(String line = null;(line = br.readLine()) != null;) {
+          if(line.trim().length() > 0) {
+            hostnameFilter.add(line.trim());
+          }
+        }
+      }
+    }
+    System.err.println("Loading the following sources:");
+    for(DemoConfig.DataSource ds : config.getSources()) {
+      System.err.println("\t" + toFileName(ds.getInputFile()) + " => " + ds.getOutputTopic());
+    }
+    System.err.println("Time step set to " + stepTimeMs + "ms.");
+    System.err.println("Time offset set to " + timeOffset + " ms since Unix epoch.");
+    System.err.println("Going from offset " + start + " to " + end);
+    step(start, end, config, producer, timeOffset, stepTimeMs, hostnameFilter);
+  }
+
+  public static String toFileName(String path) throws IOException {
+    File f = new File(path);
+    String fileName = f.getName();
+    if(Files.isSymbolicLink(f.toPath())) {
+      Path p = Files.readSymbolicLink(f.toPath());
+      fileName = p.getFileName().toFile().getName();
+    }
+    return fileName;
+  }
+
+  public static class SourceState {
+    DemoConfig.DataSource source;
+    Map<String, Object> buffer;
+    BufferedReader reader;
+    public boolean allDone = false;
+    public SourceState(DemoConfig.DataSource source, BufferedReader reader) {
+      this.source = source;
+      this.reader = reader;
+    }
+
+    public DemoConfig.DataSource getSource() {
+      return source;
+    }
+
+    public BufferedReader getReader() {
+      return reader;
+    }
+
+    private void updateBuffer() throws IOException {
+      String line = reader.readLine();
+      if(line != null) {
+        //System.out.println("Read " + line + " from " + getSource().getInputFile());
+        Map<String, String> v = source.getConverter().toMap(line);
+        buffer = new HashMap<>();
+        buffer.putAll(v);
+      }
+      else {
+        //System.out.println("Finished reading " + getSource().getInputFile());
+        allDone = true;
+        buffer = null;
+        try {
+          reader.close();
+        }
+        catch(Exception ex){
+          throw new IllegalStateException("Unable to close reader.", ex);
+        }
+      }
+    }
+
+    private Integer getTimestamp() {
+      if(buffer == null) {
+        return null;
+      }
+      Object tsObj = buffer.get("timestamp");
+      if(tsObj != null) {
+        return Integer.parseInt("" + tsObj);
+      }
+      else {
+        throw new IllegalStateException("You must have at least a timestamp field.");
+      }
+    }
+
+    public List<Map<String, Object>> read(int step) throws IOException {
+      List<Map<String, Object>> ret = new ArrayList<>();
+      if(allDone) {
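+        // Source exhausted: nothing more to emit for this or any later step.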
+        return ret;
+      }
+      if(buffer == null && !allDone) {
+        updateBuffer();
+      }
+      for( Integer ts = getTimestamp()
+         ; !allDone && ts != null && ts <= step
+         ; updateBuffer(), ts = getTimestamp()
+         ) {
+        if (ts == step) {
+          //System.out.println(ts + " == " + step + ": Added " + buffer + " to " + getSource().getInputFile());
+          ret.add(buffer);
+        }
+        else {
+          //System.out.println("Missed " + ts + " != " + step + " in " + getSource().getInputFile());
+        }
+      }
+      return ret;
+    }
+  }
+
+
+  public static void step( int start
+                         , int end
+                         , DemoConfig config
+                         , KafkaProducer<String, String> producer
+                         , Long timeOffset
+                         , Integer stepTimeMs
+                         , Set<String> hostnameFilter
+                         ) throws IOException, InterruptedException {
+    List<SourceState> states = new ArrayList<>();
+    for(DemoConfig.DataSource ds : config.getSources()) {
+      BufferedReader reader = null;
+      if(ds.getInputFile().endsWith(".gz")) {
+        reader = new BufferedReader(new InputStreamReader(new GZIPInputStream(new FileInputStream(ds.getInputFile())), Charset.defaultCharset()));
+      }
+      else {
+        reader = Files.newBufferedReader(new File(ds.getInputFile()).toPath(), Charset.defaultCharset());
+      }
+      states.add(new SourceState(ds, reader));
+    }
+    LocalImporter.Progress progress = new LocalImporter.Progress();
+    try {
+      for (int index = start; index <= end; ++index) {
+        progress.update();
+        boolean allDone = true;
+        long sTime = System.currentTimeMillis();
+        int numMessagesWritten = 0;
+        for (SourceState s : states) {
+          String fileName = toFileName(s.getSource().getInputFile());
+          List<Map<String, Object>> messages = s.read(index);
+          for (Map<String, Object> message : messages) {
+            int timestamp = Integer.parseInt("" + message.get("timestamp"));
+            message.put("time_offset", timestamp);
+            message.put("source_file", fileName);
+            // Rescale the logical timestamp to epoch millis.
+            message.put("timestamp", 1000L*timestamp + timeOffset);
+            if(matchesFilter(s.getSource(), hostnameFilter, message)) {
+              String jsonMap = JSONUtils.INSTANCE.toJSON(message, false);
+              if(producer != null) {
+                numMessagesWritten++;
+                producer.send(new ProducerRecord<>(s.getSource().getOutputTopic(), jsonMap));
+              }
+              else {
+                System.out.println(jsonMap);
+              }
+            }
+          }
+          allDone &= s.allDone;
+        }
+        if(allDone) {
+          break;
+        }
+        else if(numMessagesWritten > 0 && producer != null){
+          // Pace the replay so each logical step takes stepTimeMs of real time.
+          long eTime = System.currentTimeMillis();
+          long durationMs = eTime - sTime;
+          if(durationMs < stepTimeMs) {
+            long msSleeping = stepTimeMs - durationMs;
+            Thread.sleep(msSleeping);
+          }
+        }
+      }
+    }
+    finally {
+      if(producer != null) {
+        producer.close();
+      }
+    }
+  }
+
+  static boolean matchesFilter(DemoConfig.DataSource ds, Set<String> hostnameFilter, Map<String, Object> message) {
+    StellarPredicateProcessor processor = new StellarPredicateProcessor();
+    VariableResolver vr = null;
+    if(hostnameFilter != null) {
+      Map<String, Object> hf = new HashMap<>();
+      hf.put("hostname_filter", hostnameFilter);
+      vr = new MapVariableResolver(message, hf);
+    }
+    else {
+      vr = new MapVariableResolver(message);
+    }
+    return processor.parse(ds.getFilter(), vr, StellarFunctions.FUNCTION_RESOLVER(), Context.EMPTY_CONTEXT());
+  }
+}
diff --git a/demo_loader/src/main/java/com/hortonworks/metron/loader/csv/Progress.java b/demo_loader/src/main/java/com/hortonworks/metron/loader/csv/Progress.java
new file mode 100644
index 0000000..1bb85d5
--- /dev/null
+++ b/demo_loader/src/main/java/com/hortonworks/metron/loader/csv/Progress.java
@@ -0,0 +1,26 @@
+package com.hortonworks.metron.loader.csv;
+
+import java.io.PrintStream;
+
+public class Progress {
+  private int count = 0;
+  private String anim = "|/-\\";
+  private PrintStream pw;
+  public Progress(PrintStream pw) {
+    this.pw = pw;
+  }
+
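+  /** Defaults to writing progress to System.out. */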
+  public Progress() {
+    this(System.out);
+  }
+
+  public synchronized void reset() {
+    count = 0;
+  }
+
+  public synchronized void update() {
+    int currentCount = count++;
+    pw.print("\rProcessed " + currentCount + " - " + anim.charAt(currentCount % anim.length()));
+  }
+
+}
diff --git a/demo_loader/src/main/scripts/demo_loader.sh b/demo_loader/src/main/scripts/demo_loader.sh
new file mode 100755
index 0000000..98c5baa
--- /dev/null
+++ b/demo_loader/src/main/scripts/demo_loader.sh
@@ -0,0 +1,36 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+BIGTOP_DEFAULTS_DIR=${BIGTOP_DEFAULTS_DIR-/etc/default}
+[ -n "${BIGTOP_DEFAULTS_DIR}" -a -r ${BIGTOP_DEFAULTS_DIR}/hbase ] && . ${BIGTOP_DEFAULTS_DIR}/hbase
+
+# Autodetect JAVA_HOME if not defined
+if [ -e /usr/libexec/bigtop-detect-javahome ]; then
+  . /usr/libexec/bigtop-detect-javahome
+elif [ -e /usr/lib/bigtop-utils/bigtop-detect-javahome ]; then
+  . /usr/lib/bigtop-utils/bigtop-detect-javahome
+fi
+
+export METRON_VERSION=0.4.0
+export METRON_HOME=/usr/metron/$METRON_VERSION
+export CLASSNAME="com.hortonworks.metron.loader.csv.DemoLoader"
+export DM_JAR=metron-data-management-$METRON_VERSION.jar
+
+CP="$METRON_HOME/lib/$DM_JAR:$METRON_HOME/lib/demo-loader-$METRON_VERSION.jar"
+java $METRON_JVMFLAGS -cp $CP $CLASSNAME "$@"
diff --git a/use-cases/lanl/README.md b/use-cases/lanl/README.md
new file mode 100644
index 0000000..1333ed7
--- /dev/null
+++ b/use-cases/lanl/README.md
@@ -0,0 +1 @@
+TODO
diff --git a/use-cases/lanl/loader_config/config.json b/use-cases/lanl/loader_config/config.json
new file mode 100755
index 0000000..df428cc
--- /dev/null
+++ b/use-cases/lanl/loader_config/config.json
@@ -0,0 +1,23 @@
+{
+  "stepTimeMs" : 5000,
+  "timeOffset" : 1493837313104,
+  "sources" : [
+    {
+      "inputFile" : "/root/lanl/data/auth.txt",
+      "outputTopic" : "auth",
+      "filter" : "source_user == 'U66@DOM1' || (ip_src_addr not in [ null, 'TGT', '?'] && (not(exists(hostname_filter)) || ip_src_addr in hostname_filter) ) || (ip_dst_addr not in [ null, 'TGT', '?'
] && (not(exists(hostname_filter)) || ip_dst_addr in hostname_filter))", + "config" : { + "columns" : [ "timestamp" + , "source_user" + , "dest_user" + , "ip_src_addr" + , "ip_dst_addr" + , "auth_type" + , "logon_type" + , "auth_orientation" + , "success" + ] + } + } + ] +} diff --git a/use-cases/lanl/loader_config/data/host_filter.txt b/use-cases/lanl/loader_config/data/host_filter.txt new file mode 100755 index 0000000..3b63388 --- /dev/null +++ b/use-cases/lanl/loader_config/data/host_filter.txt @@ -0,0 +1,249 @@ +C22571 +C23660 +C23542 +C409 +C529 +C528 +C1295 +C12406 +C768 +C11557 +C15001 +C523 +C3235 +C15003 +C15007 +C13290 +C22698 +C1065 +C13629 +C16561 +C17651 +C19952 +C13988 +C5785 +C12414 +C12654 +C17536 +C15914 +C4581 +C14270 +C18191 +C5638 +C17682 +C10925 +C12547 +C12427 +C15266 +C13998 +C11336 +C5878 +C11455 +C14049 +C20738 +C19509 +C2493 +C4559 +C22462 +C14619 +C12318 +C19730 +C17552 +C436 +C553 +C22907 +C22902 +C22903 +C14296 +C9696 +C14053 +C22901 +C15160 +C14868 +C14866 +C2229 +C14744 +C2106 +C14984 +C14500 +C16811 +C12682 +C20515 +C2908 +C5186 +C14186 +C22483 +C23330 +C17693 +C14519 +C3440 +C1947 +C4651 +C13667 +C13303 +C3204 +C17338 +C453 +C14510 +C14993 +C13661 +C21711 +C1357 +C15180 +C4508 +C20097 +C14529 +C5279 +C13316 +C12347 +C20091 +C467 +C3534 +C586 +C101 +C13795 +C19308 +C13311 +C17926 +C4077 +C5045 +C20084 +C22023 +C21176 +C1243 +C14777 +C354 +C15855 +C16945 +C21739 +C23911 +C23912 +C14090 +C22027 +C26896 +C1453 +C3198 +C3990 +C920 +C2548 +C13457 +C2668 +C122 +C15767 +C17823 +C484 +C14540 +C21527 +C21524 +C21405 +C21075 +C3405 +C4858 +C13907 +C19795 +C1108 +C12257 +C14557 +C15753 +C13583 +C16969 +C23815 +C5154 +C1905 +C23812 +C709 +C21661 +C21782 +C13911 +C5358 +C1435 +C14202 +C21428 +C21302 +C23398 +C2893 +C20681 +C16065 +C17835 +C274 +C14692 +C12393 +C21535 +C3715 +C608 +C14903 +C2868 +C2989 +C14229 +C11996 +C15445 +C17744 +C13378 +C14225 +C18715 +C15207 +C14222 +C161 +C282 +C3163 +C13492 +C21202 +C1423 +C23297 +C1300 +C1420 +C2871 +C14917 +C13948 +C11768 +C19 +C4377 +C612 +C17611 +C3049 +C2514 +C15556 +C15678 +C14477 +C20345 +C23979 +C22526 +C21676 +C18170 +C21582 +C17082 +C13717 +C18179 +C625 +C988 +C5794 +C14804 +C3496 +C12745 +C2725 +C1515 +C19945 +C1754 +C17887 +C17407 +C14122 +C3021 +C1085 +C23767 +C17090 +C23990 +C2850 +C20000 +C18163 +C3143 +C4232 +C515 +C4233 +C17633 +C1645 +C992 +C13960 +C12630 +C14494 +C22668 +C22666 diff --git a/use-cases/lanl/loader_config/reset.sh b/use-cases/lanl/loader_config/reset.sh new file mode 100755 index 0000000..aab58ec --- /dev/null +++ b/use-cases/lanl/loader_config/reset.sh @@ -0,0 +1,13 @@ +#!/bin/bash +export METRON_HOME=/usr/metron/0.4.0 +echo "truncate 'profiler'" | hbase shell +curl -XDELETE "http://localhost:9200/flows*" && curl -XDELETE "http://localhost:9200/dns*" && curl -XDELETE "http://localhost:9200/error*" && curl -XDELETE "http://localhost:9200/auth*" +sudo su - hdfs -c "hadoop fs -rm -skipTrash -r /apps/metron/indexing/indexed/dns" +sudo su - hdfs -c "hadoop fs -rm -skipTrash -r /apps/metron/indexing/indexed/flows" +sudo su - hdfs -c "hadoop fs -rm -skipTrash -r /apps/metron/indexing/indexed/error" +sudo su - hdfs -c "hadoop fs -rm -skipTrash -r /apps/metron/indexing/indexed/auth" +storm kill enrichment +storm kill indexing +storm kill auth +storm kill profiler +$METRON_HOME/bin/start_enrichment_topology.sh && $METRON_HOME/bin/start_elasticsearch_topology.sh && $METRON_HOME/bin/start_parser_topology.sh -k node1:6667 -z node1:2181 -s auth && $METRON_HOME/bin/start_profiler_topology.sh diff --git 
a/use-cases/lanl/loader_config/run_on_threats.sh b/use-cases/lanl/loader_config/run_on_threats.sh
new file mode 100755
index 0000000..6627b4e
--- /dev/null
+++ b/use-cases/lanl/loader_config/run_on_threats.sh
@@ -0,0 +1,2 @@
+#!/bin/bash
+/usr/metron/0.4.0/bin/demo_loader.sh -e 1848158 -c ./config.json -z node1:2181 -hf data/host_filter.txt
diff --git a/use-cases/lanl/metron-alerts-0.4.0-archive.tar.gz b/use-cases/lanl/metron-alerts-0.4.0-archive.tar.gz
new file mode 100644
index 0000000..18a6c23
Binary files /dev/null and b/use-cases/lanl/metron-alerts-0.4.0-archive.tar.gz differ
diff --git a/use-cases/lanl/topology_config/profiler.properties b/use-cases/lanl/topology_config/profiler.properties
new file mode 100755
index 0000000..3a25bc7
--- /dev/null
+++ b/use-cases/lanl/topology_config/profiler.properties
@@ -0,0 +1,47 @@
+#
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+#
+
+##### Storm #####
+
+topology.worker.childopts=
+
+##### Profiler #####
+
+profiler.workers=1
+profiler.executors=0
+profiler.input.topic=indexing
+profiler.output.topic=enrichments
+profiler.period.duration=5
+profiler.period.duration.units=MINUTES
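+# NOTE: the enrichments in zookeeper/enrichments/auth.json read these profiles
+# with PROFILE_WINDOW('from 6 minutes ago'), one minute wider than the period
+# above, so that at least one fully flushed period is always in view.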
+profiler.ttl=30
+profiler.ttl.units=MINUTES
+profiler.hbase.salt.divisor=1000
+profiler.hbase.table=profiler
+profiler.hbase.column.family=P
+profiler.hbase.batch=10
+profiler.hbase.flush.interval.seconds=30
+
+##### Kafka #####
+
+kafka.zk=node1:2181
+kafka.broker=node1:6667
+# One of EARLIEST, LATEST, UNCOMMITTED_EARLIEST, UNCOMMITTED_LATEST
+kafka.start=LATEST
+kafka.security.protocol=PLAINTEXT
diff --git a/use-cases/lanl/topology_config/zookeeper/enrichments/auth.json b/use-cases/lanl/topology_config/zookeeper/enrichments/auth.json
new file mode 100755
index 0000000..400329f
--- /dev/null
+++ b/use-cases/lanl/topology_config/zookeeper/enrichments/auth.json
@@ -0,0 +1,64 @@
+{
+  "enrichment" : {
+    "fieldMap": {
+      "stellar" : {
+        "config" : {
+          "distinct_auth_attempts" : [
+            "window := PROFILE_WINDOW('from 6 minutes ago')",
+            "profile := PROFILE_GET('distinct_auth_attempts_by_user', user, window)",
+            "distinct_auth_attempts := STATS_MEAN( REDUCE(profile, (s, x) -> STATS_ADD(s, HLLP_CARDINALITY(x)) ,STATS_INIT()))",
+            "profile := null",
+            "window := null"
+          ],
+          "distinct_auth_attempts_stats" : [
+            "window := PROFILE_WINDOW('from 6 minutes ago')",
+            "profile := PROFILE_GET('distinct_auth_attempts_by_user_distribution', 'global', window)",
+            "stats := STATS_MERGE(profile)",
+            "distinct_auth_attempts_median := STATS_PERCENTILE(stats, 0.5)",
+            "distinct_auth_attempts_stddev := STATS_SD(stats)",
+            "profile := null",
+            "stats := null",
+            "window := null"
+          ],
+          "num_alerts_by_user" : [
+            "window := PROFILE_WINDOW('from 10 minutes ago')",
+            "profile := PROFILE_GET('num_alerts_by_user', user, window)",
+            "num_alerts_previous := REDUCE(profile, (s, x) -> s + x, 0)",
+            "profile := null",
+            "window := null"
+          ]
+
+        }
+      }
+    }
+  },
+  "threatIntel" : {
+    "fieldMap": {
+      "stellar" : {
+        "config" : [
+          "distinct_auth_attempts_alert := distinct_auth_attempts_stddev > 0 && distinct_auth_attempts > 0 && ABS(distinct_auth_attempts - distinct_auth_attempts_median) > 5*distinct_auth_attempts_stddev",
+          "is_regular_user := is_system_user == null || (not(is_system_user) && user != 'ANONYMOUS LOGON')",
+          "is_alert := num_alerts_previous == 0 && auth_orientation == 'LogOn' && distinct_auth_attempts_alert"
+        ]
+      }
+    },
+    "triageConfig" : {
+      "riskLevelRules" : [
+        {
+          "name" : "Too many distinct auth attempts: non-regular user",
+          "rule" : "is_regular_user != null && not(is_regular_user)",
+          "score" : 10,
+          "reason" : "FORMAT('The distinct number of machines that user %s attempted to login to (%d) is more than 5 standard deviations (%.2f) from the median (%.2f)', user, TO_INTEGER(distinct_auth_attempts), distinct_auth_attempts_stddev, distinct_auth_attempts_median)"
+        },
+        {
+          "name" : "Too many distinct auth attempts: regular user",
+          "rule" : "is_regular_user != null && is_regular_user",
+          "score" : 100,
+          "reason" : "FORMAT('The distinct number of machines that user %s attempted to login to (%d) is more than 5 standard deviations (%.2f) from the median (%.2f)', user, TO_INTEGER(distinct_auth_attempts), distinct_auth_attempts_stddev, distinct_auth_attempts_median)"
+        }
+      ],
+      "aggregator" : "MAX"
+    }
+  }
+}
diff --git a/use-cases/lanl/topology_config/zookeeper/enrichments/profiler.json b/use-cases/lanl/topology_config/zookeeper/enrichments/profiler.json
new file mode 100755
index 0000000..5805512
--- /dev/null
+++ b/use-cases/lanl/topology_config/zookeeper/enrichments/profiler.json
@@ -0,0 +1,6 @@
+{
+  "enrichment" : {
+  },
+  "threatIntel": {
+  }
+}
diff --git a/use-cases/lanl/topology_config/zookeeper/global.json b/use-cases/lanl/topology_config/zookeeper/global.json
new file mode 100644
index 0000000..c154f89
--- /dev/null
+++ b/use-cases/lanl/topology_config/zookeeper/global.json
@@ -0,0 +1,9 @@
+
+{
+  "es.clustername": "metron",
+  "es.ip": "node1:9300",
+  "es.date.format": "yyyy.MM.dd.HH",
+  "parser.error.topic": "indexing",
+  "profiler.client.period.duration" : "5",
+  "profiler.client.period.duration.units" : "MINUTES"
+}
diff --git a/use-cases/lanl/topology_config/zookeeper/indexing/auth.json b/use-cases/lanl/topology_config/zookeeper/indexing/auth.json
new file mode 100755
index 0000000..4c9022f
--- /dev/null
+++ b/use-cases/lanl/topology_config/zookeeper/indexing/auth.json
@@ -0,0 +1,17 @@
+{
+  "hdfs" : {
+    "index": "auth",
+    "batchSize": 5,
+    "enabled" : true
+  },
+  "elasticsearch" : {
+    "index": "auth",
+    "batchSize": 5,
+    "enabled" : true
+  },
+  "solr" : {
+    "index": "auth",
+    "batchSize": 5,
+    "enabled" : true
+  }
+}
diff --git a/use-cases/lanl/topology_config/zookeeper/indexing/profiler.json b/use-cases/lanl/topology_config/zookeeper/indexing/profiler.json
new file mode 100755
index 0000000..d445280
--- /dev/null
+++ b/use-cases/lanl/topology_config/zookeeper/indexing/profiler.json
@@ -0,0 +1,17 @@
+{
+  "hdfs" : {
+    "index": "profiler",
+    "batchSize": 5,
+    "enabled" : false
+  },
+  "elasticsearch" : {
+    "index": "profiler",
+    "batchSize": 5,
+    "enabled" : false
+  },
+  "solr" : {
+    "index": "profiler",
+    "batchSize": 5,
+    "enabled" : false
+  }
+}
diff --git a/use-cases/lanl/topology_config/zookeeper/parsers/auth.json b/use-cases/lanl/topology_config/zookeeper/parsers/auth.json
new file mode 100755
index 0000000..6ee0b90
--- /dev/null
+++ b/use-cases/lanl/topology_config/zookeeper/parsers/auth.json
@@ -0,0 +1,15 @@
+{
+  "parserClassName":"org.apache.metron.parsers.json.JSONMapParser",
+  "sensorTopic":"auth",
+  "fieldTransformations" : [
+    {
+      "transformation" : "STELLAR"
+      ,"output" : [ "user", "success", "is_system_user" ]
+      ,"config" : {
+        "user" : "GET_FIRST(SPLIT(source_user, '@'))",
+        "success" : "success == 'Success'",
+        "is_system_user" : "STARTS_WITH(user, 'C')"
+      }
+    }
+  ]
+}
diff --git a/use-cases/lanl/topology_config/zookeeper/profiler.json b/use-cases/lanl/topology_config/zookeeper/profiler.json
new file mode 100755
index 0000000..7feaed2
--- /dev/null
+++ b/use-cases/lanl/topology_config/zookeeper/profiler.json
@@ -0,0 +1,45 @@
+{
+  "profiles": [
+    {
+      "profile": "distinct_auth_attempts_by_user",
+      "foreach": "user",
+      "onlyif": "source.type == 'auth' && auth_orientation != null && auth_orientation == 'LogOn' && user != null && LENGTH(user) > 0 && ip_dst_addr != null && ip_dst_addr != '?'",
+      "init" : {
+        "total" : "HLLP_INIT(5,6)"
+      },
+      "update": {
+        "total" : "HLLP_ADD(total, ip_dst_addr)"
+      },
+      "result" : {
+        "profile" : "total",
+        "triage" : {
+          "total_count" : "HLLP_CARDINALITY(total)"
+        }
+      }
+    },
+    {
+      "profile": "distinct_auth_attempts_by_user_distribution",
+      "foreach": "'global'",
+      "onlyif": "source.type == 'profiler' && profile == 'distinct_auth_attempts_by_user'",
+      "init" : {
+        "s" : "STATS_INIT()"
+      },
+      "update": {
+        "s" : "STATS_ADD(s, total_count)"
+      },
+      "result": "s"
+    },
+    {
+      "profile": "num_alerts_by_user",
+      "foreach": "user",
+      "onlyif": "source.type == 'auth' && is_alert != null && (is_alert == true || is_alert == 'true')",
+      "init" : {
+        "count" : "0"
+      },
+      "update": {
+        "count" : "count + 1"
+      },
+      "result" : "count"
+    }
+  ]
+}