Note that the following elements in the code examples below must be replaced by actual values from Franca:
// "<Attribute>" the Franca name of the attribute
// "<AttributeType>" the Franca name of the attribute type
// "<broadcast>" the Franca name of the broadcast, starting with a lowercase letter
// "<Broadcast>" the Franca name of the broadcast, starting with capital letter
// "BroadcastFilter<Attribute>" Attribute is the Franca attribute's name
// "<Filter>" the Franca name of the broadcast filter
// "<interface>" the Franca interface name, starting with a lowercase letter
// "<Interface>" the Franca interface name, starting with capital letter
// "<method>" the Franca method name, starting with a lowercase letter
// "<Method>" the Franca method name, starting with capital letter
// "<OutputType>" the Franca broadcast output type name
// "<Package>" the Franca package name
// "<ProviderDomain>" the provider domain name used by provider and client
// "<ReturnType>" the Franca return type name
The Franca <Package> will be transformed to the Java package joynr.<Package>.
The Franca <TypeCollection> will be transformed to the Java package joynr.<Package>.<TypeCollection>.
Any Franca complex type <TypeCollection>.<Type> will result in the creation of a class joynr.<Package>.<TypeCollection>.<Type> (see above).
The same <Type> class is used wherever this type appears: as an element of other complex types, as a method input or output argument, or as a broadcast output argument.
Getter and setter methods are created for every element of a struct type. A standard constructor, a full arguments constructor and an object argument constructor are also created automatically.
Note that in order to use an instance of the class directly or indirectly as input or output argument for any joynr call (e.g. method call, broadcast publication etc.), all its members must be properly initialized; especially any references must be != null (i.e. point to initialized instances).
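The shape of such a class can be sketched in plain Java. This is a hand-written illustration, not generator output: the struct name RadioStation, its two members, the default values chosen in the standard constructor and the helper isFullyInitialized() are all assumptions made for the example.

```java
// Hypothetical sketch of a class for a Franca struct with a String member
// "name" and an integer member "frequency". Names and defaults are assumptions.
final class RadioStation {
    private String name;
    private Integer frequency;

    // Standard constructor: every member starts out non-null.
    RadioStation() {
        this.name = "";
        this.frequency = 0;
    }

    // Full arguments constructor.
    RadioStation(String name, Integer frequency) {
        this.name = name;
        this.frequency = frequency;
    }

    // Object argument constructor: copies another instance.
    RadioStation(RadioStation other) {
        this(other.name, other.frequency);
    }

    String getName() { return name; }
    void setName(String name) { this.name = name; }
    Integer getFrequency() { return frequency; }
    void setFrequency(Integer frequency) { this.frequency = frequency; }

    // Illustrative helper (not generated): true if no member is null,
    // i.e. the instance satisfies the initialization rule stated above.
    boolean isFullyInitialized() {
        return name != null && frequency != null;
    }
}
```

A check like isFullyInitialized() is one possible way to catch null members before an instance is handed to a joynr call.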
The Franca <Interface> will be used as a prefix to create the following Java classes or interfaces:
public abstract class joynr.<Package>.<Interface>AbstractProvider
public class joynr.<Package>.Default<Interface>Provider
public interface joynr.<Package>.<Interface>Async
public interface joynr.<Package>.<Interface>BroadcastInterface
public interface joynr.<Package>.<Interface>FireAndForget
public interface joynr.<Package>.<Interface>
public abstract class joynr.<Package>.<Interface><Broadcast>BroadcastFilter
public interface joynr.<Package>.<Interface>Provider
public interface joynr.<Package>.<Interface>Proxy
public interface joynr.<Package>.<Interface>SubscriptionInterface
public interface joynr.<Package>.<Interface>SubscriptionPublisher
public class joynr.<Package>.<Interface>SubscriptionPublisherImpl
public interface joynr.<Package>.<Interface>Sync
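The naming pattern in the list above is mechanical, so it can be reproduced with simple string concatenation. The following is only a sketch for orientation: the class GeneratedNames and its methods are hypothetical helpers, not part of joynr or its generator.

```java
import java.util.ArrayList;
import java.util.List;

final class GeneratedNames {
    // Suffixes copied from the list above; the empty suffix yields the plain interface.
    private static final String[] SUFFIXES = {
        "AbstractProvider", "Async", "BroadcastInterface", "FireAndForget", "",
        "Provider", "Proxy", "SubscriptionInterface", "SubscriptionPublisher",
        "SubscriptionPublisherImpl", "Sync"
    };

    // Fully qualified names derived from a Franca package and interface name.
    static List<String> forInterface(String pkg, String iface) {
        String base = "joynr." + pkg + ".";
        List<String> names = new ArrayList<>();
        names.add(base + "Default" + iface + "Provider");
        for (String suffix : SUFFIXES) {
            names.add(base + iface + suffix);
        }
        return names;
    }

    // One filter class exists per broadcast of the interface.
    static String broadcastFilter(String pkg, String iface, String broadcast) {
        return "joynr." + pkg + "." + iface + broadcast + "BroadcastFilter";
    }
}
```

For a package "vehicle" and an interface "Radio", forInterface returns names such as joynr.vehicle.RadioProxy and joynr.vehicle.DefaultRadioProvider.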
Choose how you want your application to connect to the joynr network by initializing the JoynrRuntime with the appropriate joynr RuntimeModule, which uses guice to inject the desired functionality.
If you have multiple nodes running locally, they can share a cluster controller, which handles access control, local discovery and message routing for the local cluster. joynr currently supports connecting Java nodes to a cluster controller via WebSockets. The individual applications are configured using a LibjoynrWebSocketRuntimeModule.
For a single node deployment, it may however be simpler to combine the cluster controller logic and the application in a single Java process. Use a CCInProcessRuntimeModule in this case.
See the Radio example, in particular MyRadioConsumerApplication and MyRadioProviderApplication, for a detailed example of how this is done.
See the Java Configuration Reference for a complete listing of all configuration properties available to joynr Java applications.
joynr is able to communicate with other clusters via HTTP using Atmosphere, or via MQTT using Eclipse Paho. Guice is also used to inject the required functionality. Although both transports (Atmosphere and MQTT) can be injected at the same time, joynr can only communicate over one global transport middleware at a time. The only exception is fire-and-forget method calls, which do not expect an answer.
After choosing which RuntimeModule you are using, override it with the AtmosphereMessagingModule or the MqttPahoModule. See the Radio example, in particular MyRadioConsumerApplication and MyRadioProviderApplication, for a detailed example of how this is done.
If using more than one global transport middleware, PROPERTY_MESSAGING_PRIMARYGLOBALTRANSPORT (see Java Configuration Reference) has to be set to select the transport middleware which is used to register providers. Providers will be reachable via the selected global transport middleware.
A Java joynr application inherits from the AbstractJoynrApplication class and contains at least a main(), run() and shutdown() method.
The following base imports are required for a Java Consumer application:
import io.joynr.arbitration.ArbitrationStrategy;
import io.joynr.arbitration.DiscoveryQos;
import io.joynr.arbitration.DiscoveryScope;
import io.joynr.exceptions.ApplicationException;
import io.joynr.exceptions.DiscoveryException;
import io.joynr.exceptions.JoynrCommunicationException;
import io.joynr.exceptions.JoynrRuntimeException;
import io.joynr.messaging.mqtt.paho.client.MqttPahoModule;
import io.joynr.messaging.MessagingPropertyKeys;
import io.joynr.messaging.MessagingQos;
import io.joynr.proxy.Callback;
import io.joynr.proxy.Future;
import io.joynr.proxy.ProxyBuilder;
import io.joynr.runtime.AbstractJoynrApplication;
import io.joynr.runtime.CCInProcessRuntimeModule;
import io.joynr.runtime.JoynrApplication;
import io.joynr.runtime.JoynrApplicationModule;
import io.joynr.runtime.JoynrInjectorFactory;
import java.io.IOException;
import java.util.Properties;
import joynr.<Package>.<Interface>Proxy;
import com.google.inject.Inject;
import com.google.inject.Module;
import com.google.inject.name.Named;
import com.google.inject.util.Modules;
// required imports
...
public class MyConsumerApplication extends AbstractJoynrApplication {
    @Inject
    @Named(APP_CONFIG_PROVIDER_DOMAIN)
    private String providerDomain;
    private <Interface>Proxy <interface>Proxy;
    public static void main(String[] args) throws IOException {
        // initialization, perhaps including setting the domain
        // for this instance of the application.
    }
    @Override
    public void run() {
        // main application logic
    }
    @Override
    public void shutdown() {
        // unregister and cleanup
    }
}
The main() method must set up the configuration (provider domain etc.) and create the JoynrApplication instance by instantiating a new JoynrApplicationModule. Then the run() method of the consumer application can be called to do the work.
As a prerequisite, the provider and consumer domain need to be defined using Properties as shown below.
public static void main(String[] args) throws IOException {
    String providerDomain = "<ProviderDomain>";
    Properties joynrConfig = new Properties();
    joynrConfig.setProperty(MessagingPropertyKeys.PERSISTENCE_FILE, STATIC_PERSISTENCE_FILE);
    joynrConfig.setProperty(PROPERTY_JOYNR_DOMAIN_LOCAL, "my_local_domain");
    Properties appConfig = new Properties();
    appConfig.setProperty(APP_CONFIG_PROVIDER_DOMAIN, providerDomain);
    Module runtimeModule = Modules.override(new CCInProcessRuntimeModule()).with(new MqttPahoModule());
    // the application class must be the class defined above
    JoynrApplication myConsumerApp =
        new JoynrInjectorFactory(joynrConfig, runtimeModule).createApplication(
            new JoynrApplicationModule(MyConsumerApplication.class, appConfig));
    myConsumerApp.run();
    myConsumerApp.shutdown();
}
The class DiscoveryQos configures how the search for a provider will be handled. It has the following members:
- discoveryTimeoutMs Timeout for the discovery process (milliseconds). If no compatible provider is found within this time, a DiscoveryException or NoCompatibleProviderFoundException is thrown; the latter contains the versions of the discovered incompatible providers.
- retryIntervalMs The time to wait between discovery retries after encountering a discovery error.
- cacheMaxAgeMs Defines the maximum allowed age of cached entries (milliseconds), only younger entries will be considered. If no suitable providers are found, then depending on the discoveryScope, a remote global lookup may be triggered.
- arbitrationStrategy The arbitration strategy (details see below)
- discoveryScope The discovery scope (details see below)
- providerMustSupportOnChange If set to true, select only providers which support onChange subscriptions (set by the provider in its providerQos settings)
- customParameters special parameters, that must match, e.g. keyword (see below)
The enumeration DiscoveryScope defines options to decide whether a suitable provider will be searched in the local capabilities directory or in the global one.
Available values are as follows:
- LOCAL_ONLY Only entries from the local capabilities directory will be searched
- LOCAL_THEN_GLOBAL Entries will be taken from local capabilities directory, unless no such entries exist, in which case global entries will be looked at as well.
- LOCAL_AND_GLOBAL Entries will be taken from local capabilities directory and from global capabilities directory.
- GLOBAL_ONLY Only the global entries will be looked at.
Default discovery scope: LOCAL_AND_GLOBAL
Whenever global entries are involved, they are first searched in the local cache. In case no global entries are found in the cache, a remote lookup is triggered.
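The four scope values can be modelled in a few lines of plain Java. This is a minimal sketch of the semantics described above, not joynr code: the class ScopedLookupSketch, its Scope enum and the string lists standing in for capability entries are hypothetical, and the caching of global entries is deliberately left out.

```java
import java.util.ArrayList;
import java.util.List;

final class ScopedLookupSketch {
    // Mirrors the names of the DiscoveryScope values for readability only.
    enum Scope { LOCAL_ONLY, LOCAL_THEN_GLOBAL, LOCAL_AND_GLOBAL, GLOBAL_ONLY }

    // Returns the entries that a lookup with the given scope would consider.
    static List<String> lookup(Scope scope, List<String> local, List<String> global) {
        List<String> result = new ArrayList<>();
        switch (scope) {
            case LOCAL_ONLY:
                result.addAll(local);
                break;
            case LOCAL_THEN_GLOBAL:
                // global entries are only used if no local entry exists
                result.addAll(local.isEmpty() ? global : local);
                break;
            case LOCAL_AND_GLOBAL:
                result.addAll(local);
                result.addAll(global);
                break;
            case GLOBAL_ONLY:
                result.addAll(global);
                break;
        }
        return result;
    }
}
```

The sketch makes the difference between LOCAL_THEN_GLOBAL and LOCAL_AND_GLOBAL explicit: the former falls back to global entries, the latter always merges both sources.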
The enumeration ArbitrationStrategy defines how the results of the scoped lookup will be sorted and / or filtered to select a Provider:
- LastSeen The participant that was last refreshed (i.e. with the most current last seen date) will be selected
- NotSet (not allowed in the app, otherwise arbitration will throw DiscoveryException)
- HighestPriority Entries will be considered according to priority
- Keyword Only entries that have a matching keyword will be considered
- FixedChannel select provider which matches the participantId provided as custom parameter in DiscoveryQos (see below), if existing
- Custom Allows you to provide an ArbitrationStrategyFunction to allow custom selection of discovered entries
Default arbitration strategy: LastSeen
The priority used by the arbitration strategy HighestPriority is set by the provider through the call providerQos.setPriority().
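The selection rules of the LastSeen, HighestPriority and Keyword strategies can be illustrated with a small model. This is a sketch under assumptions, not the joynr arbitrator: the Entry class and all method names are hypothetical, and it is assumed that a larger priority value means a higher priority.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

final class ArbitrationSketch {
    // Hypothetical stand-in for a discovered provider entry.
    static final class Entry {
        final String participantId;
        final long priority;
        final long lastSeenMs;
        final String keyword;

        Entry(String participantId, long priority, long lastSeenMs, String keyword) {
            this.participantId = participantId;
            this.priority = priority;
            this.lastSeenMs = lastSeenMs;
            this.keyword = keyword;
        }
    }

    // LastSeen: the entry with the most recent last seen date is selected.
    static Optional<Entry> lastSeen(List<Entry> entries) {
        return entries.stream().max(Comparator.comparingLong((Entry e) -> e.lastSeenMs));
    }

    // HighestPriority: the entry with the largest priority value is selected (assumption).
    static Optional<Entry> highestPriority(List<Entry> entries) {
        return entries.stream().max(Comparator.comparingLong((Entry e) -> e.priority));
    }

    // Keyword: only entries whose keyword matches are considered.
    static List<Entry> withKeyword(List<Entry> entries, String keyword) {
        return entries.stream()
                .filter(e -> keyword.equals(e.keyword))
                .collect(Collectors.toList());
    }
}
```

An empty Optional or an empty list corresponds to the case where no suitable provider is found.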
Class ArbitrationConstants provides keys for the key-value pairs of the custom parameters of DiscoveryQos:
- PRIORITY_PARAMETER (apparently not implemented as of now)
- KEYWORD_PARAMETER
- FIXEDPARTICIPANT_KEYWORD
Example for Keyword arbitration strategy:
discoveryQos.addCustomParameter(ArbitrationConstants.KEYWORD_PARAMETER, "keyword");
Example for FixedChannel arbitration strategy:
discoveryQos.addCustomParameter(ArbitrationConstants.FIXEDPARTICIPANT_KEYWORD, participantId);
Example for the creation of a DiscoveryQos class object:
DiscoveryQos discoveryQos = new DiscoveryQos();
discoveryQos.setDiscoveryTimeoutMs(10000); // optional, default 30000
discoveryQos.setRetryIntervalMs(1000); // optional, default 1000
discoveryQos.setCacheMaxAgeMs(Long.MAX_VALUE); // optional, default 0
// optional, default as stated above
discoveryQos.setArbitrationStrategy(ArbitrationStrategy.HighestPriority);
// optional, default as stated above
discoveryQos.setDiscoveryScope(DiscoveryScope.LOCAL_AND_GLOBAL);
discoveryQos.setProviderMustSupportOnChange(true); // optional, default false
discoveryQos.addCustomParameter(key, value); // optional, default none
The MessagingQos class defines the roundtrip timeout in milliseconds for RPC requests (getter/setter/method calls) and unsubscribe requests, and it allows the definition of additional custom message headers.
The ttl for subscription requests is calculated from the expiryDateMs in the SubscriptionQos settings.
For internal joynr messages, the value of PROPERTY_MESSAGING_MAXIMUM_TTL_MS is used.
GlobalDomainAccessControllerClient and GlobalCapabilitiesDirectoryClient use TTL_30_DAYS_IN_MS (30 days).
If no specific setting is given, the default roundtrip timeout is 60 seconds. The keys of custom message headers may contain ASCII alphanumeric characters or hyphens. The values of custom message headers may contain alphanumeric characters, space, semi-colon, colon, comma, plus, ampersand, question mark, hyphen, dot, star, forward slash and back slash. If a key or value is invalid, the API method called to introduce the custom message header throws an IllegalArgumentException.
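The character rules for custom header keys and values can be checked up front with two regular expressions. The following is a sketch of such a pre-check, not the validation joynr performs internally: the class CustomHeaderCheck and its methods are hypothetical, "alphanumeric" is assumed to mean ASCII letters and digits, and keys and values are assumed to be non-empty.

```java
import java.util.regex.Pattern;

final class CustomHeaderCheck {
    // Keys: ASCII letters, digits and hyphen.
    private static final Pattern KEY = Pattern.compile("[a-zA-Z0-9\\-]+");
    // Values: letters, digits, space and ; : , + & ? - . * / \
    private static final Pattern VALUE = Pattern.compile("[a-zA-Z0-9 ;:,+&?\\-.*/\\\\]+");

    static boolean isValidKey(String key) {
        return key != null && KEY.matcher(key).matches();
    }

    static boolean isValidValue(String value) {
        return value != null && VALUE.matcher(value).matches();
    }

    // Mirrors the documented behaviour of throwing IllegalArgumentException on invalid input.
    static void check(String key, String value) {
        if (!isValidKey(key)) {
            throw new IllegalArgumentException("invalid header key: " + key);
        }
        if (!isValidValue(value)) {
            throw new IllegalArgumentException("invalid header value: " + value);
        }
    }
}
```

Running such a check before calling the MessagingQos header methods lets an application report configuration mistakes with its own error message.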
Example:
long ttl_ms = 60000;
MessagingQos messagingQos = new MessagingQos(ttl_ms);
// optional custom headers
Map<String, String> customMessageHeaders = new HashMap<String, String>(); // requires java.util.Map and java.util.HashMap
customMessageHeaders.put("key1", "value1");
...
customMessageHeaders.put("keyN", "valueN");
messagingQos.putAllCustomMessageHeaders(customMessageHeaders);
...
messagingQos.putCustomMessageHeader("anotherKey", "anotherValue");
Inside the run() method, the consumer application instance must create one proxy per used Franca interface in order to be able to
- call its methods (RPC) either synchronously or asynchronously
- subscribe to or unsubscribe from its attributes, or update a subscription
- subscribe to or unsubscribe from its broadcasts, or update a subscription
In case no suitable provider can be found during discovery, a DiscoveryException or NoCompatibleProviderFoundException is thrown.
In case of communication errors, a JoynrCommunicationException is thrown.
@Override
public void run() {
    DiscoveryQos discoveryQos = new DiscoveryQos();
    MessagingQos messagingQos = new MessagingQos();
    // the qos can be fine tuned here by calling setters
    ProxyBuilder<<Interface>Proxy> proxyBuilder =
        runtime.getProxyBuilder(providerDomain, <Interface>Proxy.class);
    try {
        // Also can call proxyBuilder.build(callback)
        <interface>Proxy = proxyBuilder.
            setMessagingQos(messagingQos). // optional
            setDiscoveryQos(discoveryQos). // optional
            build();
        // call methods, subscribe to broadcasts etc.
        // enter some event loop
    } catch (DiscoveryException e) {
        // no provider found
    } catch (JoynrCommunicationException e) {
        // could not send message
    }
}
A callback can also be added to the proxyBuilder.build() call, allowing application code to be notified when the discovery process has completed.
It is also possible to obtain a proxy for targeting multiple providers. You can either do
this by specifying a set of domains, by providing a custom ArbitrationStrategyFunction
(see The discovery quality of service above) or a combination of the two.
When you create such a multi-proxy, a call to a method on that proxy will result in 'n'
calls to the providers, where 'n' is the number of providers targeted.
It is only possible to send calls to multiple providers if the methods are fire-and-forget.
Attempts to make calls to non-fire-and-forget methods from a multi-proxy will result in an
exception being thrown.
So, for example, if we change the code above to target two domains, we get:
...
Set<String> domains = new HashSet<>();
domains.add(providerDomainOne);
domains.add(providerDomainTwo);
ProxyBuilder<<Interface>Proxy> proxyBuilder =
runtime.getProxyBuilder(domains, <Interface>Proxy.class);
...
While the provider executes the call asynchronously in any case, the consumer will wait until the call is finished, i.e. the thread will be blocked. Note that the message order on Joynr RPCs will not be preserved.
Example for calls with single return parameter:
// for any Franca type named "<Type>" used
import joynr.<Package>.<TypeCollection>.<Type>;
...
public void run() {
    // setup proxy named <interface>Proxy
    ...
    try {
        <ReturnType> retval;
        retval = <interface>Proxy.<method>([inputVal1, ..., inputValN]);
    } catch (ApplicationException e) {
        // optional special error handling in case model contains error enumeration
    } catch (JoynrRuntimeException e) {
        // error handling
    }
}
In case of multiple return parameters the parameters will be wrapped into a class named <Method>Returned. Each parameter value is available through a public member variable inside this class.
Example:
// for any Franca type named "<Type>" used
import joynr.<Package>.<TypeCollection>.<Type>;
...
public void run() {
    // setup proxy named <interface>Proxy
    ...
    try {
        <Method>Returned retval;
        retval = <interface>Proxy.<method>([inputVal1, ..., inputValN]);
        // handle return parameters
        // retval.<returnParameter1>
        // ...
        // retval.<returnParameterN>
    } catch (ApplicationException e) {
        // optional special error handling in case model contains error enumeration
    } catch (JoynrRuntimeException e) {
        // error handling
    }
}
For methods which are modelled with error enumerations, additionally, ApplicationExceptions have to be caught. The ApplicationException serves as container for the actual error enumeration which can be retrieved by calling e.getError().
Using asynchronous method calls allows the current thread to continue its work. For this purpose a callback has to be provided with the API call in order to receive the result or the error, respectively. Note that the current thread will still be blocked until the joynr message has been internally set up and serialized. The message is then enqueued and handled by a joynr middleware thread.
The message order on Joynr RPCs will not be preserved.
If no return type exists, the term Void is used instead.
Example for calls with single return parameter:
// for any Franca type named "<Type>" used
import joynr.<Package>.<TypeCollection>.<Type>;
...
public class MyCallback implements Callback<<ReturnType>> {
    @Override
    public void onSuccess(<ReturnType> result) {
        // handle result
    }
    @Override
    public void onFailure(JoynrRuntimeException error) {
        // handle error
    }
}
...
public void run() {
    // setup proxy named "<interface>Proxy"
    ...
    Future<<ReturnType>> future;
    MyCallback myCallback = new MyCallback();
    future = <interface>Proxy.<method>(
        myCallback,
        [inputVal1, ..., inputValN]
    );
    try {
        long timeoutInMilliseconds;
        // set timeout value here
        <ReturnType> result = future.get(timeoutInMilliseconds);
    } catch (InterruptedException|JoynrRuntimeException e) {
        // handle error
    } catch (ApplicationException e) {
        // optional special error handling in case model contains error enumeration
    }
    ...
}
If the Franca model includes error enums, then the Callback will also need to implement onFailure for the modeled error:
@Override
public void onFailure(<Method>ErrorEnum errorEnum) {
    switch (errorEnum) {
    case <ENUM_LITERAL_A>:
        break;
    case <ENUM_LITERAL_B>:
        break;
    default:
        // handle default error case
        break;
    }
}
In case of multiple return parameters the parameters will be wrapped into a class named <Method>Returned. Each parameter value is available through a public member variable inside this class.
// for any Franca type named "<Type>" used
import joynr.<Package>.<TypeCollection>.<Type>;
...
public class MyCallback implements Callback<<Method>Returned> {
    @Override
    public void onSuccess(<Method>Returned result) {
        // handle result
    }
    @Override
    public void onFailure(JoynrRuntimeException error) {
        // handle error
    }
}
...
public void run() {
    // setup proxy named "<interface>Proxy"
    ...
    Future<<Method>Returned> future;
    MyCallback myCallback = new MyCallback();
    future = <interface>Proxy.<method>(
        myCallback,
        [inputVal1, ..., inputValN]
    );
    try {
        long timeoutInMilliseconds;
        // set timeout value here
        <Method>Returned result = future.get(timeoutInMilliseconds);
        // handle return parameters
        // result.<returnParameter1>
        // ...
        // result.<returnParameterN>
    } catch (InterruptedException|JoynrRuntimeException e) {
        // handle error
    } catch (ApplicationException e) {
        // optional special error handling in case model contains error enumeration
    }
    ...
}
If the Franca model includes error enums, then the Callback will also need to implement onFailure for the modeled error:
@Override
public void onFailure(<Method>ErrorEnum errorEnum) {
    switch (errorEnum) {
    case <ENUM_LITERAL_A>:
        break;
    case <ENUM_LITERAL_B>:
        break;
    default:
        // handle default error case
        break;
    }
}
In contrast to both the synchronous and asynchronous RPC mechanisms described above, the stateless asynchronous replies can be handled by any runtime in a cluster of an application.
In order to accomplish this, the application provides a callback implementation by registering it with the joynr runtime similar to the way providers are registered. Then, when a request is made for a service for which a callback was registered, the reply data is routed to that callback.
In order to be able to logically match the request with the reply, a unique ID is provided when sending the request, via the MessageIdCallback, which the application can use to persist any relevant context information. When the reply arrives, the same ID is provided to the callback as part of the ReplyContext as the last parameter, and the application can then use this to load the context information.
IMPORTANT: it is not guaranteed that the message has actually left the system when the MessageIdCallback is called. It is possible that the message gets stuck in the lower layers due to, e.g., infrastructure issues. If the application persists data for the message IDs returned, it may also want to run periodic clean-up jobs to see if there are any stale entries due to messages not being transmitted successfully.
So that an application can use the same service in multiple use cases, during registration of the callback and when creating the service proxy, a unique 'use case' name must be provided, matching the proxy to the callback.
IMPORTANT: due to the stateless nature of this communication pattern, there are some cases where the message can't be delivered but will not trigger a callback. Specifically, if the TTL of the message has expired when it reaches the provider, no error callback will be triggered. Equally, if the message gets lost by the infrastructure en route, no error callback will be triggered. If required, you must guard against these cases in your application code, e.g. by storing a timestamp in the context data you persist for a given message ID and tracking the success of the request / reply roundtrip using the callback methods. If the status is then not set within a given timeframe, you can react accordingly.
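The guard described above (persist a timestamp per message ID, clear it on reply, periodically look for stale entries) might be sketched as follows. The class PendingRequestStore and its methods are hypothetical and not part of joynr, and the in-memory map only stands in for whatever shared persistence a clustered application would actually use.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class PendingRequestStore {
    // message ID -> time at which the ID was reported for the request
    private final Map<String, Long> sentAtMs = new ConcurrentHashMap<>();

    // To be called from the MessageIdCallback.
    void onMessageId(String messageId, long nowMs) {
        sentAtMs.put(messageId, nowMs);
    }

    // To be called from the success and failure callbacks: the roundtrip completed.
    void onReply(String messageId) {
        sentAtMs.remove(messageId);
    }

    // Periodic clean-up: returns and forgets all IDs without a reply within timeoutMs.
    List<String> removeStale(long nowMs, long timeoutMs) {
        List<String> stale = new ArrayList<>();
        for (Map.Entry<String, Long> entry : sentAtMs.entrySet()) {
            if (nowMs - entry.getValue() > timeoutMs) {
                stale.add(entry.getKey());
            }
        }
        sentAtMs.keySet().removeAll(stale);
        return stale;
    }
}
```

The IDs returned by removeStale are the requests for which neither a success nor a failure callback arrived in time, so the application can retry or report them.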
For a full example showing how to use the stateless async API, see examples/stateless-async.
...
public class MyStatelessAsyncCallback implements <Interface>StatelessAsyncCallback {
    @Override
    public String getUseCase() {
        return <usecase>;
    }
    // Called for replies to proxy.myMethod(...)
    @Override
    public void myMethodSuccess(<service output parameters>, ReplyContext replyContext) {
        ... handle the reply data ...
    }
    @Override
    public void myMethodFailed(<application error enum>, ReplyContext replyContext) {
        ... handle business errors ...
    }
    @Override
    public void myMethodFailed(JoynrRuntimeException e, ReplyContext replyContext) {
        ... handle ProviderRuntimeExceptions and other runtime exceptions ...
    }
}
...
public void run() {
    ...
    runtime.registerStatelessAsyncCallback(new MyStatelessAsyncCallback());
    ...
}
...
...
public void run() {
    ...
    // callback already registered as above
    ...
    ProxyBuilder builder = runtime.getProxyBuilder(<domain>, <Interface>Proxy.class);
    ...
    builder.setStatelessAsyncCallbackUseCase(<usecase>);
    <Interface>StatelessAsync proxy = builder.build();
    // the MessageIdCallback receives the message ID of the request
    proxy.myMethod(<input parameters>, this::persistMyMethodContext);
    // ^ Reply handled by the myMethod* callbacks
    ...
}
...
It's essential that, for a given piece of business logic, the use case of the callback to be used matches the one passed into the proxy being built. The requests sent from that proxy will then be handled by the callback with the same use case identifier.
The abstract class SubscriptionQos has the following members:
- expiryDateMs Absolute time until notifications will be sent (milliseconds)
- publicationTtlMs Lifespan of a notification (milliseconds), the notification will be deleted afterwards
Known Issue: a subscriptionQos passed when subscribing to a non-selective broadcast is ignored. The API will be changed in the future: proxy subscribe calls will no longer take a subscriptionQos; instead the publication TTL will be settable on the provider side.
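Because expiryDateMs is an absolute point in time rather than a duration, applications usually compute it from the current time. The helper below is a small sketch of that arithmetic; the class ExpirySketch and its methods are hypothetical, and the way a remaining lifetime is derived here is an assumption for illustration, not a description of how joynr calculates message TTLs.

```java
final class ExpirySketch {
    // Converts a relative validity into an absolute expiry date, saturating on overflow.
    static long expiryDateMs(long nowMs, long validityMs) {
        if (validityMs > Long.MAX_VALUE - nowMs) {
            return Long.MAX_VALUE;
        }
        return nowMs + validityMs;
    }

    // Remaining lifetime relative to "now", never negative.
    static long remainingMs(long nowMs, long expiryDateMs) {
        return Math.max(0L, expiryDateMs - nowMs);
    }
}
```

With System.currentTimeMillis() as nowMs, expiryDateMs(now, 60000) yields a subscription that ends one minute from now.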
The class MulticastSubscriptionQos inherits from SubscriptionQos.
This class should be used for subscriptions to non-selective broadcasts.
The class PeriodicSubscriptionQos inherits from SubscriptionQos and has the following additional members:
- periodMs defines how long to wait before sending an update even if the value did not change
- alertAfterIntervalMs Timeout for notifications, afterwards a missed publication notification will be raised (milliseconds)
This class can be used for subscriptions to attributes.
Note that updates will be sent only based on the specified interval and not based on changes of the attribute.
The class OnChangeSubscriptionQos inherits from UnicastSubscriptionQos which inherits from SubscriptionQos. It has the following additional members:
- minIntervalMs Minimum time to wait between successive notifications (milliseconds)
- publicationTtlMs Notification messages will be sent with this time-to-live. If a notification message can not be delivered within its time to live, it will be deleted from the system. This value is provided in milliseconds.
This class should be used for subscriptions to selective broadcasts. It can also be used for subscriptions to attributes if no periodic update is required.
The class OnChangeWithKeepAliveSubscriptionQos inherits from OnChangeSubscriptionQos and has the following additional members:
- maxIntervalMs Maximum time to wait between notifications, if value has not changed
- alertAfterIntervalMs Timeout for notifications, afterwards a missed publication notification will be raised (milliseconds)
This class can be used for subscriptions to attributes. Updates will then be sent both periodically and after a change (i.e. this acts like a combination of PeriodicSubscriptionQos and OnChangeSubscriptionQos).
Using it for subscriptions to broadcasts is theoretically possible because of inheritance but makes no sense (in this case the additional members will be ignored).
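The interplay of minIntervalMs and maxIntervalMs can be expressed as a single decision function. This is a simplified model of the behaviour described above, not the joynr publication manager: the class KeepAliveSketch and its method are hypothetical, and the model ignores details such as deferring a changed value until minIntervalMs has elapsed.

```java
final class KeepAliveSketch {
    // Decides whether a publication should be sent at time nowMs.
    static boolean shouldPublish(long nowMs, long lastPublicationMs, boolean valueChanged,
                                 long minIntervalMs, long maxIntervalMs) {
        long elapsedMs = nowMs - lastPublicationMs;
        if (valueChanged && elapsedMs >= minIntervalMs) {
            return true; // on-change part, throttled by minIntervalMs
        }
        return elapsedMs >= maxIntervalMs; // keep-alive part
    }
}
```

With minIntervalMs = 100 and maxIntervalMs = 1000, a change 50 ms after the last publication is held back, while an unchanged value is republished after 1000 ms.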
Attribute subscription - depending on the subscription quality of service settings used - informs the application either periodically and / or on change of an attribute about the current value.
The subscriptionId can be retrieved via the callback (onSubscribed) and via the future returned by the subscribeTo call. It can be used later to update the subscription or to unsubscribe from it. The subscriptionId will be available when the subscription is successfully registered at the provider. If the subscription failed, a SubscriptionException will be returned via the callback (onError) and thrown by future.get().
To receive publications, a callback has to be provided by passing a listener class as outlined below. Since the callback is called by a communication middleware thread, it must not block, wait for user interaction, or perform lengthy computations. The callback methods (onReceive, onSubscribed, onError) are optional; only the required methods have to be implemented.
// for any Franca type named "<Type>" used
import joynr.<Package>.<TypeCollection>.<Type>;
...
import joynr.OnChangeWithKeepAliveSubscriptionQos;
import io.joynr.pubsub.subscription.AttributeSubscriptionAdapter;
...
public void run() {
    // setup proxy named "<interface>Proxy"
    ...
    // Quality of service settings can be provided as either
    // "PeriodicSubscriptionQos",
    // "OnChangeSubscriptionQos" or "OnChangeWithKeepAliveSubscriptionQos" class
    // referenced by <QosClass> below
    <QosClass> qos = new <QosClass>(... parameters ...);
    ...
    Future<String> subscriptionIdFuture = null;
    try {
        subscriptionIdFuture = <interface>Proxy.subscribeTo<Attribute>(
            new AttributeSubscriptionAdapter<<AttributeType>>() {
                // Gets called on every received publication
                @Override
                public void onReceive(<AttributeType> value) {
                    // handle info
                }
                // Gets called when the subscription is successfully registered at the provider
                @Override
                public void onSubscribed(String subscriptionId) {
                    // save the subscriptionId for updating the subscription or unsubscribing from it
                    // the subscriptionId can also be taken from the future returned by the subscribeTo call
                }
                // Gets called on every error that is detected on the subscription
                @Override
                public void onError(JoynrRuntimeException e) {
                    // handle subscription error, e.g.:
                    // - SubscriptionException if the subscription registration failed at the provider
                    // - PublicationMissedException if a periodic subscription publication does not
                    //   arrive in time
                }
            },
            qos
        );
    } catch (JoynrRuntimeException e) {
        // handle error
    }
    ...
    // get the subscriptionId from the Future when needed
    String subscriptionId;
    if (subscriptionIdFuture != null) {
        try {
            subscriptionId = subscriptionIdFuture.get();
        } catch (JoynrRuntimeException | InterruptedException | ApplicationException e) {
            // handle error
        }
    }
}
The subscribeTo method can also be used to update an existing subscription when the subscriptionId is passed as an additional parameter, as follows:
try {
    subscriptionIdFuture = <interface>Proxy.subscribeTo<Attribute>(
        subscriptionId,
        new AttributeSubscriptionAdapter<<AttributeType>>() {
            // Gets called on every received publication
            @Override
            public void onReceive(<AttributeType> value) {
                // handle info
            }
            // Gets called when the subscription is successfully updated at the provider
            @Override
            public void onSubscribed(String subscriptionId) {
                // save the subscriptionId for updating the subscription or unsubscribing from it
                // the subscriptionId can also be taken from the future returned by the subscribeTo call
            }
            // Gets called on every error that is detected on the subscription
            @Override
            public void onError(JoynrRuntimeException e) {
                // handle subscription error, e.g.:
                // - SubscriptionException if the subscription registration failed at the provider
                // - PublicationMissedException if a periodic subscription publication does not
                //   arrive in time
            }
        },
        qos
    );
} catch (JoynrRuntimeException e) {
    // handle error
}
Unsubscribing from an attribute subscription also requires the subscriptionId returned by the earlier subscribeTo call.
// for required imports see subscription info
...
public void run() {
    // setup proxy named "<interface>Proxy"
    ...
    String subscriptionId;
    ...
    try {
        // subscriptionId must have been assigned by previous call
        ...
        <interface>Proxy.unsubscribeFrom<Attribute>(subscriptionId);
    } catch (JoynrRuntimeException e) {
        // handle error
    }
}
A broadcast subscription informs the application in case a broadcast is fired by a provider. The output value is returned to the consumer via a callback function.
A broadcast is selective only if it is declared with the selective keyword in Franca, otherwise it is non-selective.
Non-selective broadcast subscriptions can be passed optional partitions. A partition is a hierarchical list of strings similar to a URL path. Subscribing to a partition will cause only those broadcasts to be sent to the consumer that match the partition. Note that the partition is set when subscribing on the consumer side, and must match the partition set on the provider side when the broadcast is performed.
Example: a consumer could set a partition of "europe", "germany", "munich" to receive broadcasts for Munich only. The matching provider would use the same partition when sending the broadcast.
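The matching rule from the example can be modelled as an exact comparison of the partition levels. This is only a sketch of that rule: the class PartitionSketch and its methods are hypothetical, and joining the levels with "/" is an assumption chosen to mirror the URL path analogy, not a statement about joynr's internal representation.

```java
import java.util.Arrays;

final class PartitionSketch {
    // Joins the hierarchy levels into a path-like key, e.g. "europe/germany/munich".
    static String key(String... partitions) {
        return String.join("/", partitions);
    }

    // A fired broadcast reaches the subscriber only if all levels are identical.
    static boolean matches(String[] subscribed, String[] fired) {
        return Arrays.equals(subscribed, fired);
    }
}
```

A subscriber using "europe", "germany", "munich" would therefore not receive a broadcast fired with "europe", "germany", "berlin" in this model.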
The subscriptionId can be retrieved via the callback (onSubscribed) and via the future returned by the subscribeTo call. It can be used later to update the subscription or to unsubscribe from it. The subscriptionId will be available when the subscription is successfully registered at the provider. If the subscription failed, a SubscriptionException will be returned via the callback (onError) and thrown by future.get().
To receive publications, a callback has to be provided by passing a listener class as outlined below. Since the callback is called by a communication middleware thread, it must not block, wait for user interaction, or perform lengthy computations. The callback methods (onReceive, onSubscribed, onError) are optional; only the required methods have to be implemented.
import joynr.MulticastSubscriptionQos;
...
// for any Franca broadcast named "<Broadcast>" used
import joynr.<Package>.<Interface>BroadcastInterface.<Broadcast>BroadcastAdapter;
...
// for any Franca type named "<Type>" used
import joynr.<Package>.<TypeCollection>.<Type>;
...
public void run() {
// setup proxy named "<interface>Proxy"
...
private Future<String> subscriptionIdFuture;
...
try {
long expiryDateMs;
long publicationTtlMs;
String partitionLevel1;
String partitionLevel2;
...
// provide values for expiryDateMs, publicationTtlMs here
...
MulticastSubscriptionQos qos =
new MulticastSubscriptionQos()
.setExpiryDateMs(expiryDateMs)
.setPublicationTtlMs(publicationTtlMs);
...
subscriptionIdFuture = <interface>Proxy.subscribeTo<Broadcast>Broadcast(
new <Broadcast>BroadcastAdapter() {
// Gets called on every received publication
@Override
public void onReceive(<OutputType> value) {
// handle broadcast info
}
// Gets called when the subscription is successfully registered at the provider
@Override
public void onSubscribed(String subscriptionId) {
// save the subscriptionId for updating the subscription or unsubscribing from it
// the subscriptionId can also be taken from the future returned by the subscribeTo call
}
// Gets called on every error that is detected on the subscription
@Override
public void onError(JoynrRuntimeException e) {
// handle error
}
},
qos,
partitionLevel1, // optional partitions
...
partitionLevelN // optional partitions
);
...
} catch (DiscoveryException e) {
// handle error
} catch (JoynrCommunicationException e) {
// handle error
}
...
// get the subscriptionId from the Future when needed
String subscriptionId;
if (subscriptionIdFuture != null) {
try {
subscriptionId = subscriptionIdFuture.get();
} catch (JoynrRuntimeException | InterruptedException | ApplicationException e) {
// handle error
}
}
}
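Since the listener callbacks run on a middleware thread, one way to keep them short is to hand the received value to an application-owned executor and return at once. The sketch below shows only this hand-off pattern. OffloadingListener is a hypothetical stand-in and does not extend the generated <Broadcast>BroadcastAdapter.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical stand-in for a broadcast listener; it only demonstrates the hand-off.
final class OffloadingListener<T> {
    private final Executor worker;
    private final Consumer<T> handler;

    OffloadingListener(Executor worker, Consumer<T> handler) {
        this.worker = worker;
        this.handler = handler;
    }

    // Would be called on the middleware thread: queues the work and returns immediately.
    void onReceive(T value) {
        worker.execute(() -> handler.accept(value));
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        OffloadingListener<String> listener = new OffloadingListener<>(
                worker,
                value -> System.out.println("handling " + value + " on " + Thread.currentThread().getName()));
        listener.onReceive("broadcast value");
        worker.shutdown();
        worker.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```

In a real consumer the same hand-off would sit inside the onReceive override of the adapter, with the executor owned and shut down by the application.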
The subscribeTo method can also be used to update an existing subscription, when the subscriptionId is passed as additional parameter as follows:
subscriptionIdFuture = <interface>Proxy.subscribeTo<Broadcast>Broadcast(
subscriptionId,
new <Broadcast>BroadcastAdapter() {
// Gets called on every received publication
@Override
public void onReceive(<OutputType> value) {
// handle broadcast info
}
// Gets called when the subscription is successfully updated at the provider
@Override
public void onSubscribed(String subscriptionId) {
// save the subscriptionId for updating the subscription or unsubscribing from it
// the subscriptionId can also be taken from the future returned by the subscribeTo call
}
// Gets called on every error that is detected on the subscription
@Override
public void onError(JoynrRuntimeException e) {
// handle error
}
},
qos,
partitionLevel1, // optional partitions
...
partitionLevelN // optional partitions
);
Selective Broadcasts use filter logic implemented by the provider and filter parameters set by the consumer to send only those broadcasts from the provider to the consumer that pass the filter. The broadcast output values are passed to the consumer via callback.
The subscriptionId can be retrieved via the callback (onSubscribed) and via the future returned by the subscribeTo call (see section Subscribing to a (non-selective) broadcast).
To receive the subscription, a callback has to be provided (cf. section Subscribing to a (non-selective) broadcast).
In addition to the normal broadcast subscription, the filter parameters for this broadcast must be created and initialized as additional parameters to the subscribeTo method. These filter parameters are used to receive only those broadcasts matching the provided filter criteria.
import joynr.OnChangeSubscriptionQos;
...
// for any Franca broadcast named "<Broadcast>" used
import joynr.<Package>.<Interface>BroadcastInterface.<Broadcast>BroadcastAdapter;
import joynr.<Package>.<Interface>BroadcastInterface.<Broadcast>FilterParameters;
...
// for any Franca type named "<Type>" used
import joynr.<Package>.<TypeCollection>.<Type>;
...
public void run() {
// setup proxy named "<interface>Proxy"
...
private Future<String> subscriptionIdFuture;
...
try {
long minIntervalMs;
long expiryDateMs;
long publicationTtlMs;
long validityMs;
...
// provide values for minIntervalMs, expiryDateMs, publicationTtlMs, validityMs here
...
OnChangeSubscriptionQos qos =
new OnChangeSubscriptionQos()
.setMinIntervalMs(minIntervalMs)
.setExpiryDateMs(expiryDateMs)
.setPublicationTtlMs(publicationTtlMs)
.setValidityMs(validityMs);
<Broadcast>FilterParameters filter = new <Broadcast>FilterParameters();
// foreach BroadcastFilterAttribute of that filter
filter.setBroadcastFilter<Attribute>(value);
...
subscriptionIdFuture = <interface>Proxy.subscribeTo<Broadcast>Broadcast(
new <Broadcast>BroadcastAdapter() {
// Gets called on every received publication
@Override
public void onReceive(<OutputType> value) {
// handle broadcast info
}
// Gets called when the subscription is successfully registered at the provider
@Override
public void onSubscribed(String subscriptionId) {
// save the subscriptionId for updating the subscription or unsubscribing from it
// the subscriptionId can also be taken from the future returned by the subscribeTo call
}
// Gets called on every error that is detected on the subscription
@Override
public void onError(JoynrRuntimeException e) {
// handle error
}
},
qos,
filter
);
...
} catch (DiscoveryException e) {
// handle error
} catch (JoynrCommunicationException e) {
// handle error
}
...
// to retrieve the subscriptionId, please refer to section "Subscribing to a (non-selective) broadcast"
}
The subscribeTo method can also be used to update an existing subscription, when the subscriptionId is passed as additional parameter as follows:
subscriptionIdFuture = <interface>Proxy.subscribeTo<Broadcast>Broadcast(
subscriptionId,
new <Broadcast>BroadcastAdapter() {
// Gets called on every received publication
@Override
public void onReceive(... OutputParameters ...) {
// handle broadcast info
}
// Gets called when the subscription is successfully updated at the provider
@Override
public void onSubscribed(String subscriptionId) {
// save the subscriptionId for updating the subscription or unsubscribing from it
// the subscriptionId can also be taken from the future returned by the subscribeTo call
}
// Gets called on every error that is detected on the subscription
@Override
public void onError(JoynrRuntimeException e) {
// handle error
}
},
qos,
filter
);
Unsubscribing from a broadcast subscription requires the subscriptionId returned by the earlier subscribe call.
public void run() {
// setup proxy named "<interface>Proxy"
...
private String subscriptionId;
...
try {
<interface>Proxy.unsubscribeFrom<Broadcast>Broadcast(subscriptionId);
...
} catch (DiscoveryException e) {
// handle error
} catch (JoynrCommunicationException e) {
// handle error
}
...
}
The shutdown method should be called on exit of the application. Inside the shutdown() method, the consumer should unsubscribe from any attributes and broadcasts it was subscribed to and terminate the instance.
@Override
public void shutdown() {
// for all proxies
if (<interface>Proxy != null) {
if (subscribed) {
// unsubscribe from attributes
// unsubscribe from broadcasts
}
}
runtime.shutdown(true);
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
// handle error
}
System.exit(0);
}
The Java Provider mainly consists of the following classes:
- A generic Provider Application Class
- One Provider Class for each Franca interface to be supported
The provider application class is used to register a provider class for each Franca interface to be supported.
import io.joynr.accesscontrol.StaticDomainAccessControlProvisioning;
import io.joynr.accesscontrol.StaticDomainAccessControlProvisioningModule;
import io.joynr.exceptions.JoynrRuntimeException;
import io.joynr.messaging.MessagingPropertyKeys;
import io.joynr.runtime.AbstractJoynrApplication;
import io.joynr.runtime.JoynrApplication;
import io.joynr.runtime.JoynrApplicationModule;
import io.joynr.runtime.JoynrInjectorFactory;
import java.io.IOException;
import java.util.Arrays;
import java.util.Properties;
import com.google.inject.Inject;
import com.google.inject.Module;
import com.google.inject.util.Modules;
import edu.umd.cs.findbugs.annotations.SuppressWarnings;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectMapper.DefaultTyping;
The class must extend AbstractJoynrApplication and can theoretically serve multiple Franca interfaces.
For each Franca interface implemented, the providing application creates an instance of My<Interface>Provider, which implements the service for that particular interface, and registers it as a provider at the Joynr Middleware.
The example below shows the code for one interface:
package myPackage;
...
// required imports
...
// for any Franca type named "<Type>" used
import joynr.<Package>.<TypeCollection>.<Type>;
...
public class MyProviderApplication extends AbstractJoynrApplication {
private static final String AUTH_TOKEN = "MyProvider_authToken";
public static final String STATIC_PERSISTENCE_FILE = "provider-joynr.properties";
private My<Interface>Provider <interface>Provider = null;
public static void main(String[] args) {
// ...
}
@Override
public void run() {
// ...
}
@Override
public void shutdown() {
// ...
}
}
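// provisionAccessControl() is declared with "throws Exception", so main must declare it too
public static void main(String[] args) throws Exception {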
String localDomain = "<ProviderDomain>";
Properties joynrConfig = new Properties();
joynrConfig.setProperty(MessagingPropertyKeys.PERSISTENCE_FILE, STATIC_PERSISTENCE_FILE);
joynrConfig.setProperty(PROPERTY_JOYNR_DOMAIN_LOCAL, localDomain);
Properties appConfig = new Properties();
provisionAccessControl(joynrConfig, localDomain);
Module runtimeModule = Modules.override(new CCInProcessRuntimeModule()).with(new MqttPahoModule());
JoynrApplication joynrApplication =
new JoynrInjectorFactory(joynrConfig,
runtimeModule,
new StaticDomainAccessControlProvisioningModule()).createApplication(
new JoynrApplicationModule(MyProviderApplication.class, appConfig)
);
joynrApplication.run();
joynrApplication.shutdown();
}
The ProviderQos has the following members:
- customParameters: e.g. the key-value for the arbitration strategy Keyword during discovery
- priority: the priority used for arbitration strategy HighestPriority during discovery
- scope: the Provider scope (see below), used in discovery
- supportsOnChangeSubscriptions: whether the provider supports subscriptions on changes
The ProviderScope can be
- LOCAL: The provider will be registered in the local capability directory
- GLOBAL: The provider will be registered in the local and global capability directory
Example:
ProviderQos providerQos = new ProviderQos();
providerQos.setCustomParameters(customParameters);
providerQos.setPriority(100);
providerQos.setScope(ProviderScope.GLOBAL);
providerQos.setSupportsOnChangeSubscriptions(true);
The run method registers the interface-specific provider class instance. From that time on, the provider is reachable from outside and reacts to incoming requests (e.g. method RPC etc.). It can be found by consumers through Discovery. Any specific broadcast filters must be added prior to registration.
@Override
public void run() {
<interface>Provider = new My<Interface>Provider();
// for any filter of a broadcast with filter
<interface>Provider.addBroadcastFilter(new <Filter>BroadcastFilter());
ProviderQos providerQos = new ProviderQos();
// use setters on providerQos as required
// set the priority, used for arbitration by highest priority
long priorityValue;
// set priorityValue
providerQos.setPriority(priorityValue);
runtime.registerProvider(localDomain, <interface>Provider, providerQos);
// loop here
}
By default, joynr generates a participantID for the provider instance. This participantID is persisted (see PROPERTY_PARTICIPANTIDS_PERSISISTENCE_FILE in Joynr Java Settings) and reused when a provider for the same interface is registered again on the same domain (e.g. after a restart).
A provider can also be registered with a fixed (predefined) participantID by adding a property to the joynrConfig (see The main method) before the runtimeModule is created.
import io.joynr.capabilities.ParticipantIdKeyUtil;
...
public static void main(String[] args) {
...
joynrConfig.setProperty(
ParticipantIdKeyUtil.getProviderParticipantIdKey(localDomain, <interface>Provider.class),
<customProviderParticipantID>);
...
}
The property key is also used as key in the persistence file. It is created as follows:
<JOYNR_PARTICIPANT_PREFIX><DOMAIN>.<INTERFACE_NAME>.v<MAJOR_VERSION>
It is strongly recommended to use the ParticipantIdKeyUtil to create the key as the key format might change again in the future.
Note: The provided fixed participantId is only used if there is no entry in the persistence file. If the provider has already been registered with a generated (default) participantId before, the persistence file or the entry for the provider has to be deleted to enable the fixed participantId.
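The precedence described in the note can be pictured with the sketch below. It is a simplified model and not joynr's implementation: ParticipantIdResolution, resolve and the key string are hypothetical, and plain java.util.Properties objects stand in for the persistence file and the joynrConfig.

```java
import java.util.Properties;
import java.util.UUID;

// Hypothetical model of the lookup order; joynr's real logic may differ in detail.
final class ParticipantIdResolution {
    private ParticipantIdResolution() {}

    // persisted: entries of the persistence file; config: joynrConfig; key: the participantId key.
    static String resolve(Properties persisted, Properties config, String key) {
        String persistedId = persisted.getProperty(key);
        if (persistedId != null) {
            // An existing entry wins, even if a fixed participantId is configured.
            return persistedId;
        }
        // No entry: use the fixed participantId if configured, otherwise generate one.
        return config.getProperty(key, UUID.randomUUID().toString());
    }

    public static void main(String[] args) {
        String key = "participant.myDomain.myInterface.v1"; // illustrative key only
        Properties config = new Properties();
        config.setProperty(key, "fixedId");

        Properties emptyFile = new Properties();
        System.out.println(resolve(emptyFile, config, key)); // fixedId

        Properties fileWithEntry = new Properties();
        fileWithEntry.setProperty(key, "generatedId");
        System.out.println(resolve(fileWithEntry, config, key)); // generatedId
    }
}
```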
The shutdown method should be called on exit of the application. It should cleanly unregister any providers the application had registered earlier.
@Override
@SuppressWarnings(value = "DM_EXIT", justification = "WORKAROUND to be removed")
public void shutdown() {
if (<interface>Provider != null) {
try {
runtime.unregisterProvider(localDomain, <interface>Provider);
} catch (JoynrRuntimeException e) {
// handle error
}
}
runtime.shutdown(true);
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
// do nothing; exiting application
}
System.exit(0);
}
The following access control provisioning allows anyone to access the interface:
private static void provisionAccessControl(Properties properties, String domain) throws Exception {
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.enableDefaultTypingAsProperty(DefaultTyping.JAVA_LANG_OBJECT, "_typeName");
MasterAccessControlEntry newMasterAccessControlEntry = new MasterAccessControlEntry(
"*",
domain,
MyProvider.INTERFACE_NAME,
TrustLevel.LOW,
Arrays.asList(TrustLevel.LOW),
TrustLevel.LOW,
Arrays.asList(TrustLevel.LOW),
"*",
Permission.YES,
Arrays.asList(Permission.YES)
);
MasterAccessControlEntry[] provisionedAccessControlEntries = { newMasterAccessControlEntry };
String provisionedAccessControlEntriesAsJson = objectMapper.writeValueAsString(provisionedAccessControlEntries);
properties.setProperty(StaticDomainAccessControlProvisioning.PROPERTY_PROVISIONED_MASTER_ACCESSCONTROLENTRIES,
provisionedAccessControlEntriesAsJson);
}
The provider class implements the attributes, methods and broadcasts of a particular Franca interface.
The following Joynr Java imports are required:
import io.joynr.provider.Deferred;
import io.joynr.provider.DeferredVoid;
import io.joynr.provider.Promise;
import joynr.exceptions.ProviderRuntimeException;
import joynr.<Package>.<Interface>AbstractProvider;
The provider class must either extend the generated class Default<Interface>Provider or alternatively at least its super class <Interface>AbstractProvider. In the latter case it must as well implement getter and / or setter methods itself for each Franca attribute (where required). The class Default<Interface>Provider already includes default implementations of getter and setter methods (where required).
In both cases it must implement a method for each method of the Franca interface. In order to send broadcasts the generated code of the super class <Interface>AbstractProvider can be used.
If the value of a notifiable attribute gets changed directly inside the implementation of a method or (non-default) setter, the <Attribute>Changed(<Attribute>) method needs to be called in order to inform subscribers about the value change.
package myPackage;
...
// required imports
...
public class My<Interface>Provider extends <Interface>AbstractProvider {
// member variables realizing the Franca interfaces Attributes go here, if any
<AttributeType> <Attribute>;
...
// default constructor
public My<Interface>Provider() {
// initialize members and attributes here, if any
}
...
// foreach Franca interface "<Attribute>" provide a getter method
...
// foreach Franca interface "<method>" provide an implementation
...
// foreach Franca "<broadcast>" you can use the provided method to fire the event
...
}
The asynchronous getter methods return the current value of an attribute. Since the current thread is blocked while the getter runs, activity should be kept as short as possible. In most cases, when a simple element is returned, the method can resolve the Promise immediately. However, if longer activity is required, it should be done in the background and the deferred should also be resolved by a background thread.
// for any Franca type named "<Type>" used
import joynr.<Package>.<TypeCollection>.<Type>;
...
@Override
public Promise<Deferred<<AttributeType>>> get<Attribute>() {
Deferred<<AttributeType>> deferred = new Deferred<<AttributeType>>();
<AttributeType> value;
...
// start some activity to get the value
// if complex, execute this asynchronously;
// once the value is available, resolve the Promise
// may be run from background thread, if required
deferred.resolve(value);
// if an error occurs, the Deferred can be rejected with a ProviderRuntimeException
deferred.reject(new ProviderRuntimeException(<errorMessage>));
...
// from current thread
return new Promise<Deferred<<AttributeType>>>(deferred);
}
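The "return at once, resolve later" pattern for longer-running getters can be sketched without the joynr classes. In the sketch below java.util.concurrent.CompletableFuture is only a stand-in for the Deferred/Promise pair, and BackgroundGetter, getValue and slowLookup are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

// Stand-in sketch: CompletableFuture plays the role of joynr's Deferred/Promise here.
final class BackgroundGetter {
    private final Executor background;

    BackgroundGetter(Executor background) {
        this.background = background;
    }

    // Returns immediately; the value is supplied later by the background executor.
    CompletableFuture<Integer> getValue() {
        CompletableFuture<Integer> deferred = new CompletableFuture<>();
        background.execute(() -> {
            try {
                deferred.complete(slowLookup());   // corresponds to deferred.resolve(value)
            } catch (RuntimeException e) {
                deferred.completeExceptionally(e); // corresponds to deferred.reject(...)
            }
        });
        return deferred;
    }

    // Placeholder for a longer-running computation.
    private int slowLookup() {
        return 42;
    }
}
```

With a thread pool as the executor, the calling thread is released as soon as the task has been queued.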
Since the current thread is blocked while the setter runs, activity should be kept as short as possible. In most cases, when a simple value is set, the method can resolve the Promise immediately. However, if longer activity is required, it should be done in the background and the deferred should also be resolved by a background thread.
// for any Franca type named "<Type>" used
import joynr.<Package>.<TypeCollection>.<Type>;
...
@Override
public Promise<DeferredVoid> set<Attribute>(<AttributeType> <attribute>) {
DeferredVoid deferred = new DeferredVoid();
...
// start some activity to set the value
// if complex, execute this asynchronously;
// once the value is set, resolve the Promise
// may be run from background thread, if required
deferred.resolve();
// if an error occurs, the Deferred can be rejected with a ProviderRuntimeException
deferred.reject(new ProviderRuntimeException(<errorMessage>));
// if attribute is notifiable (not marked as noSubscriptions in the Franca model),
// inform subscribers about the value change
<Attribute>Changed(<Attribute>);
...
// from current thread
return new Promise<DeferredVoid>(deferred);
}
The provider should always implement RPC calls asynchronously in order not to block the main thread longer than required. It also needs to take care not to overload the server, e.g. it must not accept an unlimited number of RPC requests that cause background activity. Once a limit is exceeded, further calls should be rejected until the number of outstanding activities falls below the limit again.
// for any Franca type named "<Type>" used
import joynr.<Package>.<TypeCollection>.<Type>;
...
// in case of single return parameter
@Override
public Promise<Deferred<<ReturnType>>> <method>(... parameters ...) {
Deferred<<ReturnType>> deferred = new Deferred<<ReturnType>>();
<ReturnType> returnValue;
...
// start some activity to perform the task;
// if complex, execute this asynchronously by background thread;
// once the task is finished, resolve the Promise providing
// the returnValue, if any (see following line).
deferred.resolve(returnValue);
// For methods which are modelled with error enumerations, the Promise can be rejected with such
// an error enumeration. It is then wrapped in an ApplicationException which serves as container
// for the actual error enumeration.
deferred.reject(<ErrorEnum>.<VALUE>);
// If no errors are modelled, the Deferred can be rejected with a ProviderRuntimeException
deferred.reject(new ProviderRuntimeException(<errorMessage>));
...
// from current thread
return new Promise<Deferred<<ReturnType>>>(deferred);
}
// in case of multiple return parameters
@Override
public Promise<<Method>Deferred> <method>(... parameters ...) {
<Method>Deferred deferred = new <Method>Deferred();
<ReturnType1> returnValue1;
...
<ReturnTypeN> returnValueN;
...
// start some activity to perform the task;
// if complex, execute this asynchronously by background thread;
// once the task is finished, resolve the Promise providing
// the returnValue, if any (see following line).
deferred.resolve(returnValue1, ..., returnValueN);
// For methods which are modelled with error enumerations, the Promise can be rejected with such
// an error enumeration. It is then wrapped in an ApplicationException which serves as container
// for the actual error enumeration.
deferred.reject(<ErrorEnum>.<VALUE>);
// If no errors are modelled, the Deferred can be rejected with a ProviderRuntimeException
deferred.reject(new ProviderRuntimeException(<errorMessage>));
...
// from current thread
return new Promise<<Method>Deferred>(deferred);
}
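The advice to reject calls once too many requests are outstanding could be realized with a counting semaphore, as sketched below. This is one possible approach under stated assumptions, not something joynr prescribes: BoundedRpcHandler and handle are hypothetical names, and CompletableFuture again stands in for the Deferred/Promise pair.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;

// Hypothetical sketch: limits the number of RPC tasks that are queued or running.
final class BoundedRpcHandler {
    private final Semaphore slots;
    private final Executor background;

    BoundedRpcHandler(int maxOutstanding, Executor background) {
        this.slots = new Semaphore(maxOutstanding);
        this.background = background;
    }

    <T> CompletableFuture<T> handle(Supplier<T> task) {
        CompletableFuture<T> deferred = new CompletableFuture<>();
        if (!slots.tryAcquire()) {
            // Limit reached: reject immediately instead of queueing more background work.
            deferred.completeExceptionally(new IllegalStateException("too many outstanding requests"));
            return deferred;
        }
        try {
            background.execute(() -> {
                try {
                    deferred.complete(task.get());
                } catch (RuntimeException e) {
                    deferred.completeExceptionally(e);
                } finally {
                    slots.release(); // free the slot once the task has finished
                }
            });
        } catch (RuntimeException e) {
            // The executor refused the task, so it never ran: free the slot here.
            slots.release();
            deferred.completeExceptionally(e);
        }
        return deferred;
    }
}
```

In a provider method, the rejection branch would correspond to calling deferred.reject with a ProviderRuntimeException or a modelled error enumeration.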
A broadcast can be emitted using the following method. Note that firing a broadcast blocks the current thread until the message is serialized.
// for any Franca type named "<Type>" used
import joynr.<Package>.<TypeCollection>.<Type>;
...
public void fire<Broadcast>Event() {
<OutputValueType1> outputValue1;
...
<OutputValueTypeN> outputValueN;
...
// setup outputValue(s)
...
// use the method provided by generators to send the broadcast
fire<Broadcast>(outputValue1, ... , outputValueN);
}
Optionally a partition can be set when firing a (non-selective) broadcast:
fire<Broadcast>(outputValue1,
outputValue2,
partitionLevel1,
...
partitionLevelN);
// Note: wildcards are only allowed on consumer side
The partition syntax is explained in the multicast concept.
In contrast to unfiltered broadcasts, to realize selective (filtered) broadcasts, the filter logic has to be implemented and registered by the provider. If multiple filters are registered on the same provider and broadcast, all filters are applied in a chain and the broadcast is only delivered if all filters in the chain return true.
A broadcast filter class implements a filtering function called filter() which returns a boolean value indicating whether the broadcast should be delivered. The input parameters of the filter() method reflect the output values of the broadcast.
import joynr.<Package>.<Interface>BroadcastInterface.<Broadcast>BroadcastFilterParameters;
import joynr.<Package>.<Interface><Broadcast>BroadcastFilter;
// for any Franca type named "<Type>" used
import joynr.<Package>.<TypeCollection>.<Type>;
...
public class <Filter>BroadcastFilter extends <Interface><Broadcast>BroadcastFilter {
...
@Override
public boolean filter(
<OutputValueType1> outputValue1,
...
<OutputValueTypeN> outputValueN,
<Broadcast>BroadcastFilterParameters filterParameters
) {
boolean returnValue;
...
// calculate result
...
return returnValue;
}
}
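The chaining rule for multiple filters (deliver only if every filter returns true) can be modelled as follows. FilterChain and shouldDeliver are hypothetical names, and java.util.function.BiPredicate stands in for the generated filter classes. The sketch is not the joynr implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiPredicate;

// Hypothetical model: V is the broadcast output value, P the consumer's filter parameters.
final class FilterChain<V, P> {
    private final List<BiPredicate<V, P>> filters;

    FilterChain(List<BiPredicate<V, P>> filters) {
        this.filters = new ArrayList<>(filters);
    }

    // The broadcast is delivered only if all filters in the chain accept it.
    boolean shouldDeliver(V outputValue, P filterParameters) {
        for (BiPredicate<V, P> filter : filters) {
            if (!filter.test(outputValue, filterParameters)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<BiPredicate<Integer, Integer>> filters = new ArrayList<>();
        filters.add((value, minimum) -> value >= minimum); // first filter: threshold
        filters.add((value, minimum) -> value % 2 == 0);   // second filter: even values only
        FilterChain<Integer, Integer> chain = new FilterChain<>(filters);
        System.out.println(chain.shouldDeliver(4, 2)); // true
        System.out.println(chain.shouldDeliver(5, 2)); // false
    }
}
```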
Joynr provides metrics which can be used to detect invalid states and situations which require a restart of an instance. In order to access this information, inject an implementation of the following interfaces via Guice:
io.joynr.messaging.mqtt.statusmetrics.MqttStatusReceiver
io.joynr.statusmetrics.StatusReceiver
See the documentation of each interface for more information.
In order to prevent queued messages being lost if a runtime should unexpectedly quit, e.g. because the application crashes or the container it is running in is terminated forcibly, you can provide one implementation of MessagePersister. This gives the chance to externally persist messages before they are passed into the joynr runtime for processing. See the JavaDoc in io.joynr.messaging.persistence.MessagePersister for details on how to implement this interface.
For each message which is added to the queue, the registered instance of MessagePersister will be asked to persist the given message. The decision whether the message needs to be persisted or not can be made in two ways:
- The implementation of MessagePersister decides itself. It can for example only persist messages which have a certain recipient set.
- The sender of the message has set a previously agreed application-specific custom header (key/value pair) and has thus marked the message as important and to be persisted. The implementation of MessagePersister must respect this and persist messages having the agreed header.
The decision how to persist a given message, e.g. to a file, database etc., should be made the same way.
After the MessagePersister has persisted the message (or decided not to), the message is added to the in-memory queue for processing as usual.
Once a message has been processed by joynr, the persister is asked to remove it from persistence. If the runtime crashes hard after a message has been persisted but before it has been processed and removed from persistence, there is a small chance that the message will be added for processing again at the next startup of the runtime. You should guard against this in your business code. This is not an issue for reply messages, as the reply handling callback will already have been removed, and hence the message will simply be discarded.
At startup time, the message persister is asked to provide all messages it has for the given message queue (by its ID), and these messages are then added back into the in-memory message queue for processing. From there they are processed, and as described above are then, upon successful completion, handed to the persister to be removed.
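The lifecycle described above (persist, process, remove, reload at startup) together with a simple duplicate guard can be modelled as in the sketch below. It is a hypothetical in-memory model: PersistenceLifecycle and its methods are invented for illustration and do not reproduce the signatures of io.joynr.messaging.persistence.MessagePersister. A real duplicate guard would also have to survive a crash, which an in-memory set does not.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical model of the persister lifecycle; not the joynr MessagePersister interface.
final class PersistenceLifecycle {
    private final Map<String, String> store = new LinkedHashMap<>(); // stands in for a file or database
    private final Set<String> handledIds = new HashSet<>();          // business-level duplicate guard
    final List<String> handled = new ArrayList<>();                  // payloads that were actually processed

    // Called when a message is added to the queue.
    void persist(String messageId, String payload) {
        store.put(messageId, payload);
    }

    // Called after processing: the message no longer needs to survive a crash.
    void remove(String messageId) {
        store.remove(messageId);
    }

    // Called at startup: everything still persisted is handed back for processing.
    Map<String, String> fetchAll() {
        return new LinkedHashMap<>(store);
    }

    // Processes a message at most once, even if it is delivered again after a restart.
    void process(String messageId, String payload) {
        if (handledIds.add(messageId)) {
            handled.add(payload);
        }
        remove(messageId);
    }
}
```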
See the examples/message-persistence project for an example of how to integrate a message persister implementation into your application.