diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamingCluster.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamingCluster.java index 1e40309215..62eafc048c 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamingCluster.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamingCluster.java @@ -39,7 +39,7 @@ public static enum StreamingType { * key - nimbus for storm. */ @JsonProperty - private Map deployments; + private Map deployments; public String getName() { return name; @@ -73,15 +73,15 @@ public void setDescription(String description) { this.description = description; } - public Map getDeployments() { + public Map getDeployments() { return deployments; } - public void setDeployments(Map deployments) { + public void setDeployments(Map deployments) { this.deployments = deployments; } - public static final String NIMBUS_HOST = "nimbusHost"; + public static final String NIMBUS_SEEDS = "nimbusSeeds"; public static final String NIMBUS_THRIFT_PORT = "nimbusThriftPort"; } diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/TopologyMgmtService.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/TopologyMgmtService.java index 4ca9d5ecd4..c54c55b7af 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/TopologyMgmtService.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/TopologyMgmtService.java @@ -38,14 +38,14 @@ public static class TopologyMeta { public TopologyUsage usage; public String clusterId; - public String nimbusHost; + public List nimbusSeeds; public String nimbusPort; } public static class StormClusterMeta { public String clusterId; - public String nimbusHost; + public List nimbusSeeds; public String nimbusPort; public String stormVersion; } diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/mock/TestTopologyMgmtService.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/mock/TestTopologyMgmtService.java index 80990a56a3..e32a7b81c5 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/mock/TestTopologyMgmtService.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/mock/TestTopologyMgmtService.java @@ -16,6 +16,7 @@ */ package org.apache.alert.coordinator.mock; +import java.util.Arrays; import java.util.List; import org.apache.commons.lang3.tuple.Pair; @@ -62,7 +63,7 @@ public TopologyMeta creatTopology() { TopologyMeta tm = new TopologyMeta(); tm.topologyId = namePrefix + (i++); tm.clusterId = "default-cluster"; - tm.nimbusHost = "localhost"; + tm.nimbusSeeds = Arrays.asList("localhost"); tm.nimbusPort = "3000"; Pair pair = createEmptyTopology(tm.topologyId); tm.topology = pair.getLeft(); diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/scheme/JsonScheme.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/scheme/JsonScheme.java index 4e2ad59bf3..98c56bb79e 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/scheme/JsonScheme.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/scheme/JsonScheme.java @@ -19,13 +19,15 @@ package org.apache.eagle.alert.engine.scheme; +import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.storm.spout.Scheme; import org.apache.storm.tuple.Fields; -import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.storm.utils.Utils; import org.slf4j.Logger; import java.io.IOException; import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.List; import java.util.Map; @@ -50,12 +52,21 @@ public Fields getOutputFields() { return new Fields("f1"); } + public static String deserializeString(ByteBuffer buffer) { + if (buffer.hasArray()) { + int base = buffer.arrayOffset(); + return new String(buffer.array(), base + buffer.position(), buffer.remaining()); + } else { + return new String(Utils.toByteArray(buffer), StandardCharsets.UTF_8); + } + } + @Override @SuppressWarnings("rawtypes") public List deserialize(ByteBuffer ser) { try { if (ser != null) { - Map map = mapper.readValue(ser.array(), Map.class); + Map map = mapper.readValue(deserializeString(ser), Map.class); return Arrays.asList(topic, map); } else { if (LOG.isDebugEnabled()) { diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/scheme/PlainStringScheme.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/scheme/PlainStringScheme.java index 767379e61c..c36f60fd63 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/scheme/PlainStringScheme.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/scheme/PlainStringScheme.java @@ -22,6 +22,7 @@ import org.apache.storm.spout.Scheme; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Values; +import org.apache.storm.utils.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,8 +50,13 @@ public PlainStringScheme(String topic, Map conf) { private static final Charset UTF8_CHARSET = StandardCharsets.UTF_8; public static final String STRING_SCHEME_KEY = "str"; - public static String deserializeString(byte[] buff) { - return new String(buff, UTF8_CHARSET); + public static String deserializeString(ByteBuffer buffer) { + if (buffer.hasArray()) { + int base = buffer.arrayOffset(); + return new String(buffer.array(), base + buffer.position(), buffer.remaining()); + } else { + return new String(Utils.toByteArray(buffer), StandardCharsets.UTF_8); + } } public Fields getOutputFields() { @@ -61,7 +67,7 @@ public Fields getOutputFields() { @Override public List deserialize(ByteBuffer ser) { Map m = new HashMap<>(); - m.put("value", deserializeString(ser.array())); + m.put("value", deserializeString(ser)); m.put("timestamp", System.currentTimeMillis()); return new Values(topic, m); } diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/topology/resource/impl/TopologyMgmtResourceImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/topology/resource/impl/TopologyMgmtResourceImpl.java index 6eaae7b800..c68edef69a 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/topology/resource/impl/TopologyMgmtResourceImpl.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/topology/resource/impl/TopologyMgmtResourceImpl.java @@ -35,10 +35,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.Optional; +import java.util.*; public class TopologyMgmtResourceImpl { @@ -46,7 +43,7 @@ public class TopologyMgmtResourceImpl { @SuppressWarnings("unused") private static final Logger LOG = LoggerFactory.getLogger(TopologyMgmtResourceImpl.class); - private static final String DEFAULT_NIMBUS_HOST = "sandbox.hortonworks.com"; + private static final List DEFAULT_NIMBUS_SEEDS = Arrays.asList("sandbox.hortonworks.com"); private static final Integer DEFAULT_NIMBUS_THRIFT_PORT = 6627; private static final String STORM_JAR_PATH = "topology.stormJarPath"; @@ -55,8 +52,7 @@ public class TopologyMgmtResourceImpl { private Map getStormConf(List clusters, String clusterId) throws Exception { Map stormConf = Utils.readStormConfig(); if (clusterId == null) { - // TODO: change to NIMBUS_SEEDS list in EAGLE-907 - stormConf.put(Config.NIMBUS_HOST, DEFAULT_NIMBUS_HOST); + stormConf.put(Config.NIMBUS_SEEDS, DEFAULT_NIMBUS_SEEDS); stormConf.put(Config.NIMBUS_THRIFT_PORT, DEFAULT_NIMBUS_THRIFT_PORT); } else { if (clusters == null) { @@ -69,9 +65,8 @@ private Map getStormConf(List clusters, String clusterId) thro } else { throw new Exception("Fail to find cluster: " + clusterId); } - // TODO: change to NIMBUS_SEEDS list in EAGLE-907 - stormConf.put(Config.NIMBUS_HOST, cluster.getDeployments().getOrDefault(StreamingCluster.NIMBUS_HOST, DEFAULT_NIMBUS_HOST)); - stormConf.put(Config.NIMBUS_THRIFT_PORT, Integer.valueOf(cluster.getDeployments().get(StreamingCluster.NIMBUS_THRIFT_PORT))); + stormConf.put(Config.NIMBUS_SEEDS, cluster.getDeployments().getOrDefault(StreamingCluster.NIMBUS_SEEDS, DEFAULT_NIMBUS_SEEDS)); + stormConf.put(Config.NIMBUS_THRIFT_PORT, Integer.valueOf(String.valueOf(cluster.getDeployments().get(StreamingCluster.NIMBUS_THRIFT_PORT)))); } return stormConf; } diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java index 7288700383..e92b044cc6 100644 --- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java +++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java @@ -30,12 +30,12 @@ import org.apache.eagle.app.utils.DynamicJarPathFinder; import org.apache.eagle.metadata.model.ApplicationEntity; import org.apache.storm.thrift.TException; -import org.apache.storm.utils.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.Int; import org.apache.storm.trident.spout.RichSpoutBatchExecutor; +import java.util.Arrays; import java.util.List; import java.util.Objects; @@ -70,8 +70,8 @@ public StormEnvironment environment() { public static final String TOPOLOGY_MESSAGE_TIMEOUT_SECS = "topology.message.timeout.secs"; - private static final String STORM_NIMBUS_HOST_CONF_PATH = "application.storm.nimbusHost"; - private static final String STORM_NIMBUS_HOST_DEFAULT = "localhost"; + private static final String STORM_NIMBUS_SEEDS_CONF_PATH = "application.storm.nimbusSeeds"; + private static final List STORM_NIMBUS_HOST_DEFAULT = Arrays.asList("localhost"); private static final Integer STORM_NIMBUS_THRIFT_DEFAULT = 6627; private static final String STORM_NIMBUS_THRIFT_CONF_PATH = "application.storm.nimbusThriftPort"; @@ -80,17 +80,20 @@ public StormEnvironment environment() { private org.apache.storm.Config getStormConfig(com.typesafe.config.Config config) { org.apache.storm.Config conf = new org.apache.storm.Config(); conf.put(RichSpoutBatchExecutor.MAX_BATCH_SIZE_CONF, Int.box(64 * 1024)); - // TOPOLOGY_RECEIVER_BUFFER_SIZE has no effect, so no need to set. ref: STORM-596 + // TOPOLOGY_RECEIVER_BUFFER_SIZE has no effect, and it is removed. ref: STORM-596 conf.put(org.apache.storm.Config.TOPOLOGY_TRANSFER_BUFFER_SIZE, Int.box(32)); conf.put(org.apache.storm.Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, Int.box(16384)); conf.put(org.apache.storm.Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, Int.box(16384)); conf.put(org.apache.storm.Config.NIMBUS_THRIFT_MAX_BUFFER_SIZE, Int.box(20480000)); - String nimbusHost = STORM_NIMBUS_HOST_DEFAULT; - if (environment.config().hasPath(STORM_NIMBUS_HOST_CONF_PATH)) { - nimbusHost = environment.config().getString(STORM_NIMBUS_HOST_CONF_PATH); - LOG.info("Overriding {} = {}",STORM_NIMBUS_HOST_CONF_PATH,nimbusHost); + conf.put(org.apache.storm.Config.STORM_NIMBUS_RETRY_TIMES, 5); + conf.put(org.apache.storm.Config.STORM_NIMBUS_RETRY_INTERVAL, 4000); + conf.put(org.apache.storm.Config.STORM_NIMBUS_RETRY_INTERVAL_CEILING, 60000); + List nimbusSeeds = STORM_NIMBUS_HOST_DEFAULT; + if (environment.config().hasPath(STORM_NIMBUS_SEEDS_CONF_PATH)) { + nimbusSeeds = environment.config().getStringList(STORM_NIMBUS_SEEDS_CONF_PATH); + LOG.info("Overriding {} = {}", STORM_NIMBUS_SEEDS_CONF_PATH,nimbusSeeds); } else { - LOG.info("Using default {} = {}",STORM_NIMBUS_HOST_CONF_PATH,STORM_NIMBUS_HOST_DEFAULT); + LOG.info("Using default {} = {}", STORM_NIMBUS_SEEDS_CONF_PATH,STORM_NIMBUS_HOST_DEFAULT); } Integer nimbusThriftPort = STORM_NIMBUS_THRIFT_DEFAULT; if (environment.config().hasPath(STORM_NIMBUS_THRIFT_CONF_PATH)) { @@ -99,8 +102,7 @@ private org.apache.storm.Config getStormConfig(com.typesafe.config.Config config } else { LOG.info("Using default {} = {}",STORM_NIMBUS_THRIFT_CONF_PATH,STORM_NIMBUS_THRIFT_DEFAULT); } - // TODO: change to NIMBUS_SEEDS list in EAGLE-907 - conf.put(Config.NIMBUS_HOST, nimbusHost); + conf.put(Config.NIMBUS_SEEDS, nimbusSeeds); conf.put(org.apache.storm.Config.NIMBUS_THRIFT_PORT, nimbusThriftPort); conf.put(Config.STORM_THRIFT_TRANSPORT_PLUGIN, "org.apache.storm.security.auth.SimpleTransportPlugin"); if (config.hasPath(WORKERS)) { diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/JsonSchema.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/JsonSchema.java index a4ade3cf2d..b6a14edc89 100644 --- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/JsonSchema.java +++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/JsonSchema.java @@ -19,10 +19,12 @@ import org.apache.storm.spout.Scheme; import org.apache.storm.tuple.Fields; import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.storm.utils.Utils; import org.slf4j.Logger; import java.io.IOException; import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.List; import java.util.Map; @@ -43,12 +45,21 @@ public Fields getOutputFields() { return new Fields("f1","f2"); } + public static String deserializeString(ByteBuffer buffer) { + if (buffer.hasArray()) { + int base = buffer.arrayOffset(); + return new String(buffer.array(), base + buffer.position(), buffer.remaining()); + } else { + return new String(Utils.toByteArray(buffer), StandardCharsets.UTF_8); + } + } + @Override @SuppressWarnings("rawtypes") public List deserialize(ByteBuffer ser) { try { if (ser != null) { - Map map = mapper.readValue(ser.array(), Map.class); + Map map = mapper.readValue(deserializeString(ser), Map.class); return Arrays.asList(map.hashCode(), map); } else { if (LOG.isDebugEnabled()) { diff --git a/eagle-examples/eagle-app-example/src/test/resources/application.conf b/eagle-examples/eagle-app-example/src/test/resources/application.conf index 0c9ba513fa..6a4c6fd225 100644 --- a/eagle-examples/eagle-app-example/src/test/resources/application.conf +++ b/eagle-examples/eagle-app-example/src/test/resources/application.conf @@ -51,7 +51,7 @@ } }, "storm": { - "nimbusHost": "localhost" + "nimbusSeeds": ["localhost"] "nimbusThriftPort": 6627 } }, diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/resources/application.conf b/eagle-jpm/eagle-hadoop-queue/src/main/resources/application.conf index 9d690846b2..9db6edf3b9 100644 --- a/eagle-jpm/eagle-hadoop-queue/src/main/resources/application.conf +++ b/eagle-jpm/eagle-hadoop-queue/src/main/resources/application.conf @@ -31,7 +31,7 @@ } "appId":"hadoopQueueMonitorJob", "mode":"LOCAL", - application.storm.nimbusHost=localhost, + application.storm.nimbusSeeds=["localhost"], "workers":1, "dataSinkConfig": { diff --git a/eagle-jpm/eagle-hadoop-queue/src/test/resources/application.conf b/eagle-jpm/eagle-hadoop-queue/src/test/resources/application.conf index 9d690846b2..9db6edf3b9 100644 --- a/eagle-jpm/eagle-hadoop-queue/src/test/resources/application.conf +++ b/eagle-jpm/eagle-hadoop-queue/src/test/resources/application.conf @@ -31,7 +31,7 @@ } "appId":"hadoopQueueMonitorJob", "mode":"LOCAL", - application.storm.nimbusHost=localhost, + application.storm.nimbusSeeds=["localhost"], "workers":1, "dataSinkConfig": { diff --git a/eagle-jpm/eagle-jpm-aggregation/src/main/resources/application.conf b/eagle-jpm/eagle-jpm-aggregation/src/main/resources/application.conf index 0421f0960b..9d5acc17da 100644 --- a/eagle-jpm/eagle-jpm-aggregation/src/main/resources/application.conf +++ b/eagle-jpm/eagle-jpm-aggregation/src/main/resources/application.conf @@ -48,6 +48,6 @@ "mode":"LOCAL", "workers" : 3, "siteId" : "sandbox", - application.storm.nimbusHost=localhost + application.storm.nimbusSeeds=["localhost"], topology.message.timeout.secs=1800 } \ No newline at end of file diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/resources/application.conf b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/application.conf index 3836e3acd8..c2e6ef6210 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/resources/application.conf +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/application.conf @@ -18,7 +18,7 @@ "mode":"LOCAL", "workers" : 3, "siteId" : "sandbox", - application.storm.nimbusHost=localhost + application.storm.nimbusSeeds=["localhost"], "stormConfig" : { "mrHistoryJobSpoutTasks" : 6, diff --git a/eagle-jpm/eagle-jpm-mr-history/src/test/resources/application-test.conf b/eagle-jpm/eagle-jpm-mr-history/src/test/resources/application-test.conf index 057e189bd0..13f6d6c97a 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/test/resources/application-test.conf +++ b/eagle-jpm/eagle-jpm-mr-history/src/test/resources/application-test.conf @@ -34,7 +34,7 @@ application { type = org.apache.eagle.app.sink.KafkaStreamSink } storm { - nimbusHost = "server.eagle.apache.org" + nimbusSeeds = ["server.eagle.apache.org"] nimbusThriftPort = 6627 } mailService { diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/resources/application.conf b/eagle-jpm/eagle-jpm-mr-running/src/main/resources/application.conf index 6d1be06667..03f3b4ff49 100644 --- a/eagle-jpm/eagle-jpm-mr-running/src/main/resources/application.conf +++ b/eagle-jpm/eagle-jpm-mr-running/src/main/resources/application.conf @@ -16,7 +16,7 @@ { "appId":"mrRunningJob", "mode":"LOCAL", - application.storm.nimbusHost=localhost, + application.storm.nimbusSeeds=["localhost"], "workers" : 8, "siteId" : "sandbox", diff --git a/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaSourcedSpoutScheme.java b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaSourcedSpoutScheme.java index e8c2451222..b829d4bea5 100644 --- a/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaSourcedSpoutScheme.java +++ b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaSourcedSpoutScheme.java @@ -20,6 +20,7 @@ import org.apache.storm.tuple.Fields; import com.typesafe.config.Config; import org.apache.eagle.dataproc.impl.storm.kafka.SpoutKafkaMessageDeserializer; +import org.apache.storm.utils.Utils; import java.lang.reflect.Constructor; import java.nio.ByteBuffer; @@ -52,7 +53,7 @@ public KafkaSourcedSpoutScheme(String deserClsName, Config context){ @Override public List deserialize(ByteBuffer ser) { - Object tmp = deserializer.deserialize(ser.array()); + Object tmp = deserializer.deserialize(Utils.toByteArray(ser)); Map map = (Map)tmp; if(tmp == null) return null; return Arrays.asList(map.get("user"), map.get("timestamp")); diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/kafka/MessageJsonScheme.java b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/kafka/MessageJsonScheme.java index 5e30384548..2610febd25 100644 --- a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/kafka/MessageJsonScheme.java +++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/kafka/MessageJsonScheme.java @@ -22,11 +22,13 @@ import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Values; import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.storm.utils.Utils; import org.slf4j.Logger; import org.apache.storm.kafka.StringScheme; import java.io.IOException; import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; import java.util.List; import java.util.Map; @@ -42,12 +44,21 @@ public Fields getOutputFields() { return new Fields(StringScheme.STRING_SCHEME_KEY); } + public static String deserializeString(ByteBuffer buffer) { + if (buffer.hasArray()) { + int base = buffer.arrayOffset(); + return new String(buffer.array(), base + buffer.position(), buffer.remaining()); + } else { + return new String(Utils.toByteArray(buffer), StandardCharsets.UTF_8); + } + } + @Override @SuppressWarnings("rawtypes") public List deserialize(ByteBuffer ser) { try { if (ser != null) { - Map map = mapper.readValue(ser.array(), Map.class); + Map map = mapper.readValue(deserializeString(ser), Map.class); Object message = map.get(MESSAGE_SCHEME_KEY); if (message != null) { return new Values(map.get(MESSAGE_SCHEME_KEY)); diff --git a/eagle-server-assembly/src/main/conf/eagle.conf b/eagle-server-assembly/src/main/conf/eagle.conf index a889914f29..a3cd623182 100644 --- a/eagle-server-assembly/src/main/conf/eagle.conf +++ b/eagle-server-assembly/src/main/conf/eagle.conf @@ -87,7 +87,7 @@ application { provider = org.apache.eagle.app.messaging.KafkaStreamProvider } storm { - nimbusHost = "server.eagle.apache.org" + nimbusSeeds = ["server.eagle.apache.org"] nimbusThriftPort = 6627 } mailService { diff --git a/eagle-server/src/main/resources/application.conf b/eagle-server/src/main/resources/application.conf index d657f54cc3..96370f743b 100644 --- a/eagle-server/src/main/resources/application.conf +++ b/eagle-server/src/main/resources/application.conf @@ -90,7 +90,7 @@ application { provider = org.apache.eagle.app.messaging.KafkaStreamProvider } storm { - nimbusHost = "server.eagle.apache.org" + nimbusSeeds = ["server.eagle.apache.org"] nimbusThriftPort = 6627 } updateStatus: {