Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-1634] Fix "Could not build up connection to JobManager" issue on some systems #1

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions docs/streaming_guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public class StreamingWordCount {
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

DataStream<Tuple2<String, Integer>> dataStream = env
.socketTextStream("localhost", 9999)
.socketTextStream(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_VALUE, 9999)
.flatMap(new Splitter())
.groupBy(0)
.sum(1);
Expand Down Expand Up @@ -718,7 +718,7 @@ Example:

~~~java
DataStream<String> stream = env
.addSource(new FlumeSource<String>("localhost", 41414, new SimpleStringSchema()))
.addSource(new FlumeSource<String>(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_VALUE, 41414, new SimpleStringSchema()))
.print();
~~~

Expand All @@ -734,7 +734,7 @@ The followings have to be provided for the `FlumeSink(…)` constructor in order
Example:

~~~java
stream.addSink(new FlumeSink<String>("localhost", 42424, new StringToByteSerializer()));
stream.addSink(new FlumeSink<String>(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_VALUE, 42424, new StringToByteSerializer()));
~~~

##### Configuration file<a name="config_file"></a>
Expand Down Expand Up @@ -791,7 +791,7 @@ Example:

~~~java
DataStream<String> stream = env
.addSource(new RMQSource<String>("localhost", "hello", new SimpleStringSchema()))
.addSource(new RMQSource<String>(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_VALUE, "hello", new SimpleStringSchema()))
.print();
~~~

Expand All @@ -807,7 +807,7 @@ The followings have to be provided for the `RMQSink(…)` constructor in order:
Example:

~~~java
stream.addSink(new RMQSink<String>("localhost", "hello", new StringToByteSerializer()));
stream.addSink(new RMQSink<String>(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_VALUE, "hello", new StringToByteSerializer()));
~~~


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public void setUp() throws Exception {

final int freePort = NetUtils.getAvailablePort();
config = new Configuration();
config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, ConfigConstants.JOB_MANAGER_IPC_ADDRESS_VALUE);
config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, freePort);
config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT);

Expand All @@ -108,7 +108,7 @@ public void setUp() throws Exception {
when(generatorMock.compileJobGraph(optimizedPlanMock)).thenReturn(jobGraph);

try {
Tuple2<String, Object> address = new Tuple2<String, Object>("localhost", freePort);
Tuple2<String, Object> address = new Tuple2<String, Object>(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_VALUE, freePort);
jobManagerSystem = AkkaUtils.createActorSystem(config, new Some<Tuple2<String, Object>>(address));
}
catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ public final class ConfigConstants {
*/
public static final String JOB_MANAGER_IPC_ADDRESS_KEY = "jobmanager.rpc.address";

/**
* The config parameter defining the value of the JOB_MANAGER_IPC_ADDRESS
*/
public static final String JOB_MANAGER_IPC_ADDRESS_VALUE = GlobalConfiguration.getString(JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");

/**
* The config parameter defining the network port to connect to
* for communication with the job manager.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.io.ObjectOutputStream;
import java.io.OutputStreamWriter;

import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.Path;
Expand Down Expand Up @@ -184,7 +185,7 @@ private FileInputSplit createTempFile(String contents) throws IOException {
wrt.write(contents);
wrt.close();

return new FileInputSplit(0, new Path(this.tempFile.toURI().toString()), 0, this.tempFile.length(), new String[] {"localhost"});
return new FileInputSplit(0, new Path(this.tempFile.toURI().toString()), 0, this.tempFile.length(), new String[] {ConfigConstants.JOB_MANAGER_IPC_ADDRESS_VALUE});
}

protected static final class MyTextInputFormat extends org.apache.flink.api.common.io.DelimitedInputFormat<Record> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.io.IOException;
import java.util.Arrays;

import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.Path;
Expand Down Expand Up @@ -582,7 +583,7 @@ private FileInputSplit createTempFile(String content) throws IOException {
dos.writeBytes(content);
dos.close();

return new FileInputSplit(0, new Path(this.tempFile.toURI().toString()), 0, this.tempFile.length(), new String[] {"localhost"});
return new FileInputSplit(0, new Path(this.tempFile.toURI().toString()), 0, this.tempFile.length(), new String[] {ConfigConstants.JOB_MANAGER_IPC_ADDRESS_VALUE});
}

private final Value[] createIntValues(int num) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.flink.api.common.io.LocatableInputSplitAssigner;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.core.io.LocatableInputSplit;

import org.junit.Test;


Expand All @@ -41,7 +41,7 @@ public void testSerialSplitAssignmentWithNullHost() {
try {
final int NUM_SPLITS = 50;
final String[][] hosts = new String[][] {
new String[] { "localhost" },
new String[] { ConfigConstants.JOB_MANAGER_IPC_ADDRESS_VALUE },
new String[0],
null
};
Expand Down Expand Up @@ -298,7 +298,7 @@ public void testConcurrentSplitAssignmentNullHost() {
final int SUM_OF_IDS = (NUM_SPLITS-1) * (NUM_SPLITS) / 2;

final String[][] hosts = new String[][] {
new String[] { "localhost" },
new String[] { ConfigConstants.JOB_MANAGER_IPC_ADDRESS_VALUE },
new String[0],
null
};
Expand Down Expand Up @@ -518,11 +518,11 @@ public void testAssignmentOfManySplitsRandomly() {
final Random rand = new Random(seed);

for (int i = 0; i < splitHosts.length; i++) {
splitHosts[i] = "localHost" + i;
splitHosts[i] = ConfigConstants.JOB_MANAGER_IPC_ADDRESS_VALUE + i;
}
for (int i = 0; i < requestingHosts.length; i++) {
if (i % 2 == 0) {
requestingHosts[i] = "localHost" + i;
requestingHosts[i] = ConfigConstants.JOB_MANAGER_IPC_ADDRESS_VALUE + i;
} else {
requestingHosts[i] = "remoteHost" + i;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.rmi.registry.Registry;

import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;

/**
Expand Down Expand Up @@ -75,7 +76,7 @@ public class RemoteCollectorOutputFormat<T> implements OutputFormat<T> {
* @see RemoteCollectorOutputFormat#PORT
*/
public RemoteCollectorOutputFormat() {
this("localhost", 8888, null);
this(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_VALUE, 8888, null);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.Path;
Expand Down Expand Up @@ -618,7 +619,7 @@ private FileInputSplit createTempFile(String content) throws IOException {
wrt.write(content);
wrt.close();

return new FileInputSplit(0, new Path(tempFile.toURI().toString()), 0, tempFile.length(), new String[] {"localhost"});
return new FileInputSplit(0, new Path(tempFile.toURI().toString()), 0, tempFile.length(), new String[] {ConfigConstants.JOB_MANAGER_IPC_ADDRESS_VALUE});
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import java.io.IOException;

import org.junit.Test;

import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.Path;
Expand Down Expand Up @@ -183,7 +183,7 @@ private FileInputSplit createInputSplit(String content) throws IOException {
wrt.write(content);
wrt.close();

return new FileInputSplit(0, new Path(tempFile.toURI().toString()), 0, tempFile.length(), new String[] {"localhost"});
return new FileInputSplit(0, new Path(tempFile.toURI().toString()), 0, tempFile.length(), new String[] {ConfigConstants.JOB_MANAGER_IPC_ADDRESS_VALUE});
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import java.io.OutputStreamWriter;

import org.junit.Assert;

import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.core.fs.FileInputSplit;
Expand Down Expand Up @@ -330,7 +330,7 @@ private FileInputSplit createTempFile(String content) throws IOException {
dos.writeBytes(content);
dos.close();

return new FileInputSplit(0, new Path(this.tempFile.toURI().toString()), 0, this.tempFile.length(), new String[] {"localhost"});
return new FileInputSplit(0, new Path(this.tempFile.toURI().toString()), 0, this.tempFile.length(), new String[] {ConfigConstants.JOB_MANAGER_IPC_ADDRESS_VALUE});
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.io.FileOutputStream;
import java.io.IOException;

import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.Path;
Expand Down Expand Up @@ -177,7 +178,7 @@ private FileInputSplit createTempFile(int[] contents) throws IOException {

dos.close();

return new FileInputSplit(0, new Path(this.tempFile.toURI().toString()), 0, this.tempFile.length(), new String[] {"localhost"});
return new FileInputSplit(0, new Path(this.tempFile.toURI().toString()), 0, this.tempFile.length(), new String[] {ConfigConstants.JOB_MANAGER_IPC_ADDRESS_VALUE});
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,9 @@ abstract class FlinkMiniCluster(val userConfiguration: Configuration,
// Construction
// --------------------------------------------------------------------------

// NOTE: THIS MUST BE getByName("localhost"), which is 127.0.0.1 and
// NOTE: THIS MUST BE getByName(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_VALUE), which is 127.0.0.1 and
// not getLocalHost(), which may be 127.0.1.1
val HOSTNAME = InetAddress.getByName("localhost").getHostAddress()
val HOSTNAME = InetAddress.getByName(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_VALUE).getHostAddress()

val timeout = AkkaUtils.getTimeout(userConfiguration)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -638,7 +638,7 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo,
if (blobPort > 0) {

val address = new InetSocketAddress(
currentJobManager.flatMap(_.path.address.host).getOrElse("localhost"),
currentJobManager.flatMap(_.path.address.host).getOrElse(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_VALUE),
blobPort)

log.info("Determined BLOB server address to be {}.", address)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.runtime.blob;

import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.junit.Test;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.net.InetSocketAddress;
import java.security.MessageDigest;

import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.JobID;
import org.junit.AfterClass;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.InputSplit;
Expand Down Expand Up @@ -58,7 +59,7 @@ public class MockInputSplitProvider implements InputSplitProvider {
public void addInputSplits(final String path, final int noSplits) {

final InputSplit[] tmp = new InputSplit[noSplits];
final String[] hosts = { "localhost" };
final String[] hosts = { ConfigConstants.JOB_MANAGER_IPC_ADDRESS_VALUE };

final String localPath;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll {
val config = new Configuration()
config.setString(ConfigConstants.TASK_MANAGER_MAX_REGISTRATION_DURATION, "1 second")

val tm = TestingUtils.startTestingTaskManagerWithConfiguration("localhost",
val tm = TestingUtils.startTestingTaskManagerWithConfiguration(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_VALUE,
self.path.toString, config, _system)

watch(tm)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class TestingCluster(userConfiguration: Configuration, singleActorSystem: Boolea

override def generateConfiguration(userConfig: Configuration): Configuration = {
val cfg = new Configuration()
cfg.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost")
cfg.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, ConfigConstants.JOB_MANAGER_IPC_ADDRESS_VALUE)
cfg.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, NetUtils.getAvailablePort())
cfg.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 10)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ object TestingUtils {
val config = new Configuration()

val (tmConfig, netConfig, connectionInfo, _) =
TaskManager.parseTaskManagerConfiguration(config, "localhost", true, true)
TaskManager.parseTaskManagerConfiguration(config, ConfigConstants.JOB_MANAGER_IPC_ADDRESS_VALUE, true, true)

val tmProps = Props(classOf[TestingTaskManager], connectionInfo, jmURL, tmConfig, netConfig)
system.actorOf(tmProps)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,7 @@ public void testExternalProgram() {
String testData = getClass().getResource(TEST_DATA_FILE).toString();

PackagedProgram program = new PackagedProgram(new File(jarFile), new String[] { testData });

Client c = new Client(new InetSocketAddress("localhost", testMiniCluster.getJobManagerRPCPort()),
Client c = new Client(new InetSocketAddress(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_VALUE, testMiniCluster.getJobManagerRPCPort()),
new Configuration(), program.getUserCodeClassLoader());
c.run(program, 4, true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ public static void main(String[] args) throws Exception {
// This will execute the word-count embedded in a local context. replace this line by the commented
// succeeding line to send the job to a local installation or to a cluster for execution
LocalExecutor.execute(plan);
// PlanExecutor ex = new RemoteExecutor("localhost", 6123, "target/pact-examples-0.4-SNAPSHOT-WordCount.jar");
// PlanExecutor ex = new RemoteExecutor(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_VALUE, 6123, "target/pact-examples-0.4-SNAPSHOT-WordCount.jar");
// ex.executePlan(plan);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ public static void main(String[] args) throws Exception {
// This will execute the word-count embedded in a local context. replace this line by the commented
// succeeding line to send the job to a local installation or to a cluster for execution
LocalExecutor.execute(plan);
// PlanExecutor ex = new RemoteExecutor("localhost", 6123, "target/pact-examples-0.4-SNAPSHOT-WordCount.jar");
// PlanExecutor ex = new RemoteExecutor(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_VALUE, 6123, "target/pact-examples-0.4-SNAPSHOT-WordCount.jar");
// ex.executePlan(plan);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import java.io.IOException;
import java.net.InetSocketAddress;

import org.apache.flink.configuration.ConfigConstants;

import net.spy.memcached.MemcachedClient;

/**
Expand All @@ -36,7 +38,7 @@ public class MemcachedState<V> implements DBState<String, V> {

public MemcachedState() {
try {
memcached = new MemcachedClient(new InetSocketAddress("localhost", 11211));
memcached = new MemcachedClient(new InetSocketAddress(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_VALUE, 11211));
} catch (IOException e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import java.util.Iterator;
import java.util.Set;

import org.apache.flink.configuration.ConfigConstants;

import redis.clients.jedis.Jedis;

/**
Expand All @@ -40,7 +42,7 @@ public class RedisState<K extends Serializable, V extends Serializable> extends

public RedisState(DBSerializer<K> keySerializer, DBSerializer<V> valueSerializer) {
super(keySerializer, valueSerializer);
jedis = new Jedis("localhost");
jedis = new Jedis(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_VALUE);
}

public RedisState() {
Expand Down
Loading