Skip to content

Commit

Permalink
Merge pull request #530 from elandau/bugfix/unregister_onChange
Browse files Browse the repository at this point in the history
API to unsubscribe to property change notifications
  • Loading branch information
elandau authored Nov 14, 2017
2 parents cc79f44 + 9c60288 commit efa67f8
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 8 deletions.
31 changes: 26 additions & 5 deletions archaius2-api/src/main/java/com/netflix/archaius/api/Property.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,14 @@
* @param <T>
*/
public interface Property<T> extends Supplier<T> {
/**
* Token returned when calling onChange through which change notification can be
* unsubscribed.
*/
interface Subscription {
void unsubscribe();
}

/**
* Return the most recent value of the property.
*
Expand Down Expand Up @@ -72,21 +80,34 @@ public void accept(T t) {
default void removeListener(PropertyListener<T> listener) {}

/**
* Register for notification whenever the property value changes.
* @deprecated Use {@link Property#subscribe(Consumer)}
* @param consumer
*/
@Deprecated
default Subscription onChange(Consumer<T> consumer) {
return subscribe(consumer);
}

/**
* Subscribe for notification whenever the property value changes.
* {@link Property#onChange(Consumer)} should be called last when chaining properties
* since the notification only applies to the state of the chained property
* up until this point. Changes to subsequent Property objects returned from {@link Property#orElse}
* or {@link Property#map(Function)} will not trigger calls to this consumer.
* @param consumer
* @return This property object
* @return Subscription that may be unsubscribed to no longer get change notifications
*/
default void onChange(Consumer<T> consumer) {
addListener(new PropertyListener<T>() {
default Subscription subscribe(Consumer<T> consumer) {
PropertyListener<T> listener = new PropertyListener<T>() {
@Override
public void onChange(T value) {
consumer.accept(value);
}
});
};

addListener(listener);
return () -> removeListener(listener);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.netflix.archaius.api.Property;
import com.netflix.archaius.api.PropertyContainer;
import com.netflix.archaius.api.PropertyFactory;
import com.netflix.archaius.api.PropertyListener;

public class DefaultPropertyFactory implements PropertyFactory, ConfigListener {
private static final Logger LOG = LoggerFactory.getLogger(DefaultPropertyFactory.class);
Expand Down Expand Up @@ -192,7 +193,8 @@ private class PropertyImpl<T> implements Property<T> {
private final KeyAndType<T> keyAndType;
private final Supplier<T> supplier;
private final AtomicStampedReference<T> cache = new AtomicStampedReference<>(null, -1);

private final ConcurrentMap<PropertyListener<?>, Subscription> oldSubscriptions = new ConcurrentHashMap<>();

public PropertyImpl(KeyAndType<T> keyAndType, Supplier<T> supplier) {
this.keyAndType = keyAndType;
this.supplier = supplier;
Expand Down Expand Up @@ -226,8 +228,8 @@ public String getKey() {
}

@Override
public void onChange(Consumer<T> consumer) {
listeners.add(new Runnable() {
public Subscription subscribe(Consumer<T> consumer) {
Runnable action = new Runnable() {
private T current = get();
@Override
public synchronized void run() {
Expand All @@ -245,9 +247,32 @@ public synchronized void run() {
}
consumer.accept(current);
}
};

listeners.add(action);
return () -> listeners.remove(action);
}

@Deprecated
public void addListener(PropertyListener<T> listener) {
Subscription cancel = onChange(new Consumer<T>() {
@Override
public void accept(T t) {
listener.accept(t);
}
});
oldSubscriptions.put(listener, cancel);
}

/**
* Remove a listener previously registered by calling addListener
* @param listener
*/
@Deprecated
public void removeListener(PropertyListener<T> listener) {
Optional.ofNullable(oldSubscriptions.remove(listener)).ifPresent(Subscription::unsubscribe);
}

@Override
public Property<T> orElse(T defaultValue) {
return new PropertyImpl<T>(keyAndType, () -> Optional.ofNullable(supplier.get()).orElse(defaultValue));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.netflix.archaius.DefaultPropertyFactory;
import com.netflix.archaius.api.Config;
import com.netflix.archaius.api.Property;
import com.netflix.archaius.api.Property.Subscription;
import com.netflix.archaius.api.PropertyFactory;
import com.netflix.archaius.api.PropertyListener;
import com.netflix.archaius.api.config.SettableConfig;
Expand Down Expand Up @@ -217,6 +218,49 @@ public void onParseError(Throwable error) {
Assert.assertEquals(456, current.get().intValue());
}

@Test
public void unregisterOldCallback() {
SettableConfig config = new DefaultSettableConfig();

DefaultPropertyFactory factory = DefaultPropertyFactory.from(config);

PropertyListener<Integer> listener = Mockito.mock(PropertyListener.class);

Property<Integer> prop = factory.getProperty("foo").asInteger(1);
prop.addListener(listener);

Mockito.verify(listener, Mockito.never()).accept(Mockito.anyInt());
config.setProperty("foo", "2");
Mockito.verify(listener, Mockito.times(1)).accept(Mockito.anyInt());

prop.removeListener(listener);
config.setProperty("foo", "3");

Mockito.verify(listener, Mockito.times(1)).accept(Mockito.anyInt());
}

@Test
public void unsubscribeOnChange() {
SettableConfig config = new DefaultSettableConfig();

DefaultPropertyFactory factory = DefaultPropertyFactory.from(config);

Consumer<Integer> consumer = Mockito.mock(Consumer.class);

Property<Integer> prop = factory.getProperty("foo").asInteger(1);
Subscription sub = prop.onChange(consumer);

Mockito.verify(consumer, Mockito.never()).accept(Mockito.anyInt());
config.setProperty("foo", "2");
Mockito.verify(consumer, Mockito.times(1)).accept(Mockito.anyInt());

sub.unsubscribe();

config.setProperty("foo", "3");

Mockito.verify(consumer, Mockito.times(1)).accept(Mockito.anyInt());
}

@Test
public void chainedPropertyNoneSet() {
MapConfig config = MapConfig.builder().build();
Expand Down

0 comments on commit efa67f8

Please sign in to comment.