Skip to content
This repository has been archived by the owner on May 12, 2021. It is now read-only.

Commit

Permalink
[EAGLE-906] Phase 2: migrate nimbus.host to nimbus.seeds
Browse files Browse the repository at this point in the history
  • Loading branch information
senjaliya committed Feb 23, 2017
1 parent 6543eab commit 0ac0657
Show file tree
Hide file tree
Showing 19 changed files with 83 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public static enum StreamingType {
* key - nimbus for storm.
*/
@JsonProperty
private Map<String, String> deployments;
private Map<String, Object> deployments;

public String getName() {
return name;
Expand Down Expand Up @@ -73,15 +73,15 @@ public void setDescription(String description) {
this.description = description;
}

public Map<String, String> getDeployments() {
public Map<String, Object> getDeployments() {
return deployments;
}

public void setDeployments(Map<String, String> deployments) {
public void setDeployments(Map<String, Object> deployments) {
this.deployments = deployments;
}

public static final String NIMBUS_HOST = "nimbusHost";
public static final String NIMBUS_SEEDS = "nimbusSeeds";
public static final String NIMBUS_THRIFT_PORT = "nimbusThriftPort";

}
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,14 @@ public static class TopologyMeta {
public TopologyUsage usage;

public String clusterId;
public String nimbusHost;
public List<String> nimbusSeeds;
public String nimbusPort;

}

public static class StormClusterMeta {
public String clusterId;
public String nimbusHost;
public List<String> nimbusSeeds;
public String nimbusPort;
public String stormVersion;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.alert.coordinator.mock;

import java.util.Arrays;
import java.util.List;

import org.apache.commons.lang3.tuple.Pair;
Expand Down Expand Up @@ -62,7 +63,7 @@ public TopologyMeta creatTopology() {
TopologyMeta tm = new TopologyMeta();
tm.topologyId = namePrefix + (i++);
tm.clusterId = "default-cluster";
tm.nimbusHost = "localhost";
tm.nimbusSeeds = Arrays.asList("localhost");
tm.nimbusPort = "3000";
Pair<Topology, TopologyUsage> pair = createEmptyTopology(tm.topologyId);
tm.topology = pair.getLeft();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,15 @@

package org.apache.eagle.alert.engine.scheme;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.storm.spout.Scheme;
import org.apache.storm.tuple.Fields;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.storm.utils.Utils;
import org.slf4j.Logger;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
Expand All @@ -50,12 +52,21 @@ public Fields getOutputFields() {
return new Fields("f1");
}

/**
 * Decodes the remaining bytes of {@code buffer} as a UTF-8 string.
 *
 * <p>When the buffer is array-backed, decodes directly from the backing array
 * without copying and without advancing the buffer's position.</p>
 *
 * @param buffer the buffer holding UTF-8 encoded text; must not be null
 * @return the decoded string
 */
public static String deserializeString(ByteBuffer buffer) {
    if (buffer.hasArray()) {
        int base = buffer.arrayOffset();
        // Pass UTF-8 explicitly: new String(byte[], int, int) decodes with the
        // platform default charset, which would diverge from the else branch.
        return new String(buffer.array(), base + buffer.position(), buffer.remaining(), StandardCharsets.UTF_8);
    } else {
        // NOTE(review): Utils.toByteArray advances the buffer's position, while the
        // array-backed branch does not — confirm callers never reuse the buffer.
        return new String(Utils.toByteArray(buffer), StandardCharsets.UTF_8);
    }
}

@Override
@SuppressWarnings("rawtypes")
public List<Object> deserialize(ByteBuffer ser) {
try {
if (ser != null) {
Map map = mapper.readValue(ser.array(), Map.class);
Map map = mapper.readValue(deserializeString(ser), Map.class);
return Arrays.asList(topic, map);
} else {
if (LOG.isDebugEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.storm.spout.Scheme;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -49,8 +50,13 @@ public PlainStringScheme(String topic, Map conf) {
private static final Charset UTF8_CHARSET = StandardCharsets.UTF_8;
public static final String STRING_SCHEME_KEY = "str";

public static String deserializeString(byte[] buff) {
return new String(buff, UTF8_CHARSET);
/**
 * Decodes the remaining bytes of {@code buffer} as a UTF-8 string.
 *
 * <p>When the buffer is array-backed, decodes directly from the backing array
 * without copying and without advancing the buffer's position.</p>
 *
 * @param buffer the buffer holding UTF-8 encoded text; must not be null
 * @return the decoded string
 */
public static String deserializeString(ByteBuffer buffer) {
    if (buffer.hasArray()) {
        int base = buffer.arrayOffset();
        // Pass UTF-8 explicitly: new String(byte[], int, int) decodes with the
        // platform default charset, which would diverge from the else branch.
        return new String(buffer.array(), base + buffer.position(), buffer.remaining(), StandardCharsets.UTF_8);
    } else {
        // NOTE(review): Utils.toByteArray advances the buffer's position, while the
        // array-backed branch does not — confirm callers never reuse the buffer.
        return new String(Utils.toByteArray(buffer), StandardCharsets.UTF_8);
    }
}

public Fields getOutputFields() {
Expand All @@ -61,7 +67,7 @@ public Fields getOutputFields() {
@Override
public List<Object> deserialize(ByteBuffer ser) {
Map m = new HashMap<>();
m.put("value", deserializeString(ser.array()));
m.put("value", deserializeString(ser));
m.put("timestamp", System.currentTimeMillis());
return new Values(topic, m);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,18 +35,15 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.*;


public class TopologyMgmtResourceImpl {
private static final IMetadataDao dao = MetadataDaoFactory.getInstance().getMetadataDao();
@SuppressWarnings("unused")
private static final Logger LOG = LoggerFactory.getLogger(TopologyMgmtResourceImpl.class);

private static final String DEFAULT_NIMBUS_HOST = "sandbox.hortonworks.com";
private static final List<String> DEFAULT_NIMBUS_SEEDS = Arrays.asList("sandbox.hortonworks.com");
private static final Integer DEFAULT_NIMBUS_THRIFT_PORT = 6627;
private static final String STORM_JAR_PATH = "topology.stormJarPath";

Expand All @@ -55,8 +52,7 @@ public class TopologyMgmtResourceImpl {
private Map getStormConf(List<StreamingCluster> clusters, String clusterId) throws Exception {
Map<String, Object> stormConf = Utils.readStormConfig();
if (clusterId == null) {
// TODO: change to NIMBUS_SEEDS list in EAGLE-907
stormConf.put(Config.NIMBUS_HOST, DEFAULT_NIMBUS_HOST);
stormConf.put(Config.NIMBUS_SEEDS, DEFAULT_NIMBUS_SEEDS);
stormConf.put(Config.NIMBUS_THRIFT_PORT, DEFAULT_NIMBUS_THRIFT_PORT);
} else {
if (clusters == null) {
Expand All @@ -69,9 +65,8 @@ private Map getStormConf(List<StreamingCluster> clusters, String clusterId) thro
} else {
throw new Exception("Fail to find cluster: " + clusterId);
}
// TODO: change to NIMBUS_SEEDS list in EAGLE-907
stormConf.put(Config.NIMBUS_HOST, cluster.getDeployments().getOrDefault(StreamingCluster.NIMBUS_HOST, DEFAULT_NIMBUS_HOST));
stormConf.put(Config.NIMBUS_THRIFT_PORT, Integer.valueOf(cluster.getDeployments().get(StreamingCluster.NIMBUS_THRIFT_PORT)));
stormConf.put(Config.NIMBUS_SEEDS, cluster.getDeployments().getOrDefault(StreamingCluster.NIMBUS_SEEDS, DEFAULT_NIMBUS_SEEDS));
stormConf.put(Config.NIMBUS_THRIFT_PORT, Integer.valueOf(String.valueOf(cluster.getDeployments().get(StreamingCluster.NIMBUS_THRIFT_PORT))));
}
return stormConf;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,12 @@
import org.apache.eagle.app.utils.DynamicJarPathFinder;
import org.apache.eagle.metadata.model.ApplicationEntity;
import org.apache.storm.thrift.TException;
import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Int;
import org.apache.storm.trident.spout.RichSpoutBatchExecutor;

import java.util.Arrays;
import java.util.List;
import java.util.Objects;

Expand Down Expand Up @@ -70,8 +70,8 @@ public StormEnvironment environment() {

public static final String TOPOLOGY_MESSAGE_TIMEOUT_SECS = "topology.message.timeout.secs";

private static final String STORM_NIMBUS_HOST_CONF_PATH = "application.storm.nimbusHost";
private static final String STORM_NIMBUS_HOST_DEFAULT = "localhost";
private static final String STORM_NIMBUS_SEEDS_CONF_PATH = "application.storm.nimbusSeeds";
private static final List<String> STORM_NIMBUS_HOST_DEFAULT = Arrays.asList("localhost");
private static final Integer STORM_NIMBUS_THRIFT_DEFAULT = 6627;
private static final String STORM_NIMBUS_THRIFT_CONF_PATH = "application.storm.nimbusThriftPort";

Expand All @@ -80,17 +80,20 @@ public StormEnvironment environment() {
private org.apache.storm.Config getStormConfig(com.typesafe.config.Config config) {
org.apache.storm.Config conf = new org.apache.storm.Config();
conf.put(RichSpoutBatchExecutor.MAX_BATCH_SIZE_CONF, Int.box(64 * 1024));
// TOPOLOGY_RECEIVER_BUFFER_SIZE has no effect, so no need to set. ref: STORM-596
// TOPOLOGY_RECEIVER_BUFFER_SIZE has no effect and has been removed upstream; see STORM-596.
conf.put(org.apache.storm.Config.TOPOLOGY_TRANSFER_BUFFER_SIZE, Int.box(32));
conf.put(org.apache.storm.Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, Int.box(16384));
conf.put(org.apache.storm.Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, Int.box(16384));
conf.put(org.apache.storm.Config.NIMBUS_THRIFT_MAX_BUFFER_SIZE, Int.box(20480000));
String nimbusHost = STORM_NIMBUS_HOST_DEFAULT;
if (environment.config().hasPath(STORM_NIMBUS_HOST_CONF_PATH)) {
nimbusHost = environment.config().getString(STORM_NIMBUS_HOST_CONF_PATH);
LOG.info("Overriding {} = {}",STORM_NIMBUS_HOST_CONF_PATH,nimbusHost);
conf.put(org.apache.storm.Config.STORM_NIMBUS_RETRY_TIMES, 5);
conf.put(org.apache.storm.Config.STORM_NIMBUS_RETRY_INTERVAL, 4000);
conf.put(org.apache.storm.Config.STORM_NIMBUS_RETRY_INTERVAL_CEILING, 60000);
List<String> nimbusSeeds = STORM_NIMBUS_HOST_DEFAULT;
if (environment.config().hasPath(STORM_NIMBUS_SEEDS_CONF_PATH)) {
nimbusSeeds = environment.config().getStringList(STORM_NIMBUS_SEEDS_CONF_PATH);
LOG.info("Overriding {} = {}", STORM_NIMBUS_SEEDS_CONF_PATH,nimbusSeeds);
} else {
LOG.info("Using default {} = {}",STORM_NIMBUS_HOST_CONF_PATH,STORM_NIMBUS_HOST_DEFAULT);
LOG.info("Using default {} = {}", STORM_NIMBUS_SEEDS_CONF_PATH,STORM_NIMBUS_HOST_DEFAULT);
}
Integer nimbusThriftPort = STORM_NIMBUS_THRIFT_DEFAULT;
if (environment.config().hasPath(STORM_NIMBUS_THRIFT_CONF_PATH)) {
Expand All @@ -99,8 +102,7 @@ private org.apache.storm.Config getStormConfig(com.typesafe.config.Config config
} else {
LOG.info("Using default {} = {}",STORM_NIMBUS_THRIFT_CONF_PATH,STORM_NIMBUS_THRIFT_DEFAULT);
}
// TODO: change to NIMBUS_SEEDS list in EAGLE-907
conf.put(Config.NIMBUS_HOST, nimbusHost);
conf.put(Config.NIMBUS_SEEDS, nimbusSeeds);
conf.put(org.apache.storm.Config.NIMBUS_THRIFT_PORT, nimbusThriftPort);
conf.put(Config.STORM_THRIFT_TRANSPORT_PLUGIN, "org.apache.storm.security.auth.SimpleTransportPlugin");
if (config.hasPath(WORKERS)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@
import org.apache.storm.spout.Scheme;
import org.apache.storm.tuple.Fields;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.storm.utils.Utils;
import org.slf4j.Logger;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
Expand All @@ -43,12 +45,21 @@ public Fields getOutputFields() {
return new Fields("f1","f2");
}

/**
 * Decodes the remaining bytes of {@code buffer} as a UTF-8 string.
 *
 * <p>When the buffer is array-backed, decodes directly from the backing array
 * without copying and without advancing the buffer's position.</p>
 *
 * @param buffer the buffer holding UTF-8 encoded text; must not be null
 * @return the decoded string
 */
public static String deserializeString(ByteBuffer buffer) {
    if (buffer.hasArray()) {
        int base = buffer.arrayOffset();
        // Pass UTF-8 explicitly: new String(byte[], int, int) decodes with the
        // platform default charset, which would diverge from the else branch.
        return new String(buffer.array(), base + buffer.position(), buffer.remaining(), StandardCharsets.UTF_8);
    } else {
        // NOTE(review): Utils.toByteArray advances the buffer's position, while the
        // array-backed branch does not — confirm callers never reuse the buffer.
        return new String(Utils.toByteArray(buffer), StandardCharsets.UTF_8);
    }
}

@Override
@SuppressWarnings("rawtypes")
public List<Object> deserialize(ByteBuffer ser) {
try {
if (ser != null) {
Map map = mapper.readValue(ser.array(), Map.class);
Map map = mapper.readValue(deserializeString(ser), Map.class);
return Arrays.asList(map.hashCode(), map);
} else {
if (LOG.isDebugEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
}
},
"storm": {
"nimbusHost": "localhost"
"nimbusSeeds": ["localhost"]
"nimbusThriftPort": 6627
}
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
}
"appId":"hadoopQueueMonitorJob",
"mode":"LOCAL",
application.storm.nimbusHost=localhost,
application.storm.nimbusSeeds=["localhost"],
"workers":1,

"dataSinkConfig": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
}
"appId":"hadoopQueueMonitorJob",
"mode":"LOCAL",
application.storm.nimbusHost=localhost,
application.storm.nimbusSeeds=["localhost"],
"workers":1,

"dataSinkConfig": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,6 @@
"mode":"LOCAL",
"workers" : 3,
"siteId" : "sandbox",
application.storm.nimbusHost=localhost
application.storm.nimbusSeeds=["localhost"],
topology.message.timeout.secs=1800
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
"mode":"LOCAL",
"workers" : 3,
"siteId" : "sandbox",
application.storm.nimbusHost=localhost
application.storm.nimbusSeeds=["localhost"],

"stormConfig" : {
"mrHistoryJobSpoutTasks" : 6,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ application {
type = org.apache.eagle.app.sink.KafkaStreamSink
}
storm {
nimbusHost = "server.eagle.apache.org"
nimbusSeeds = ["server.eagle.apache.org"]
nimbusThriftPort = 6627
}
mailService {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
{
"appId":"mrRunningJob",
"mode":"LOCAL",
application.storm.nimbusHost=localhost,
application.storm.nimbusSeeds=["localhost"],
"workers" : 8,
"siteId" : "sandbox",

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.storm.tuple.Fields;
import com.typesafe.config.Config;
import org.apache.eagle.dataproc.impl.storm.kafka.SpoutKafkaMessageDeserializer;
import org.apache.storm.utils.Utils;

import java.lang.reflect.Constructor;
import java.nio.ByteBuffer;
Expand Down Expand Up @@ -52,7 +53,7 @@ public KafkaSourcedSpoutScheme(String deserClsName, Config context){

@Override
public List<Object> deserialize(ByteBuffer ser) {
Object tmp = deserializer.deserialize(ser.array());
Object tmp = deserializer.deserialize(Utils.toByteArray(ser));
Map<String, Object> map = (Map<String, Object>)tmp;
if(tmp == null) return null;
return Arrays.asList(map.get("user"), map.get("timestamp"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.apache.storm.kafka.StringScheme;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;

Expand All @@ -42,12 +44,21 @@ public Fields getOutputFields() {
return new Fields(StringScheme.STRING_SCHEME_KEY);
}

/**
 * Decodes the remaining bytes of {@code buffer} as a UTF-8 string.
 *
 * <p>When the buffer is array-backed, decodes directly from the backing array
 * without copying and without advancing the buffer's position.</p>
 *
 * @param buffer the buffer holding UTF-8 encoded text; must not be null
 * @return the decoded string
 */
public static String deserializeString(ByteBuffer buffer) {
    if (buffer.hasArray()) {
        int base = buffer.arrayOffset();
        // Pass UTF-8 explicitly: new String(byte[], int, int) decodes with the
        // platform default charset, which would diverge from the else branch.
        return new String(buffer.array(), base + buffer.position(), buffer.remaining(), StandardCharsets.UTF_8);
    } else {
        // NOTE(review): Utils.toByteArray advances the buffer's position, while the
        // array-backed branch does not — confirm callers never reuse the buffer.
        return new String(Utils.toByteArray(buffer), StandardCharsets.UTF_8);
    }
}

@Override
@SuppressWarnings("rawtypes")
public List<Object> deserialize(ByteBuffer ser) {
try {
if (ser != null) {
Map map = mapper.readValue(ser.array(), Map.class);
Map map = mapper.readValue(deserializeString(ser), Map.class);
Object message = map.get(MESSAGE_SCHEME_KEY);
if (message != null) {
return new Values(map.get(MESSAGE_SCHEME_KEY));
Expand Down
Loading

0 comments on commit 0ac0657

Please sign in to comment.