Skip to content

Commit

Permalink
Update config listener
Browse files Browse the repository at this point in the history
  • Loading branch information
hexiaofeng committed Feb 27, 2025
1 parent 7e2f125 commit bab5060
Showing 1 changed file with 43 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.jd.live.agent.bootstrap.logger.Logger;
import com.jd.live.agent.bootstrap.logger.LoggerFactory;
import com.jd.live.agent.core.parser.ConfigParser;
import com.jd.live.agent.core.util.type.ValuePath;
import com.jd.live.agent.governance.subscription.config.ConfigEvent;
import com.jd.live.agent.governance.subscription.config.ConfigEvent.EventType;
import com.jd.live.agent.governance.subscription.config.ConfigListener;
Expand All @@ -36,6 +37,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;

import static com.jd.live.agent.governance.subscription.config.ConfigListener.SYSTEM_ALL;
import static com.jd.live.agent.implement.service.config.nacos.client.NacosClientApi.DEFAULT_GROUP;
Expand All @@ -52,7 +54,7 @@ public class NacosConfigurator implements Configurator {

private final List<NacosSubscription> subscriptions;

private final Map<String, List<SynchronousListener>> listeners = new ConcurrentHashMap<>();
private final Map<String, NameListener> listeners = new ConcurrentHashMap<>();

private final AtomicReference<ConfigCache> ref = new AtomicReference<>();

Expand Down Expand Up @@ -96,7 +98,7 @@ public Object getProperty(String name) {
public void addListener(String name, ConfigListener listener) {
if (name != null && !name.isEmpty() && listener != null) {
SynchronousListener syncListener = new SynchronousListener(listener);
listeners.computeIfAbsent(name, k -> new CopyOnWriteArrayList<>()).add(syncListener);
listeners.computeIfAbsent(name, NameListener::new).addListener(syncListener);
if (!SYSTEM_ALL.equals(name)) {
long ver = version.get();
Object value = getProperty(name);
Expand All @@ -123,11 +125,9 @@ public void removeListener(String name, ConfigListener listener) {
* @param event The configuration event to be published.
*/
private void publish(ConfigEvent event) {
List<SynchronousListener> watchers = listeners.get(event.getName());
if (watchers != null) {
for (ConfigListener listener : watchers) {
listener.onUpdate(event);
}
NameListener listener = listeners.get(event.getName());
if (listener != null) {
listener.onUpdate(event);
}
}

Expand Down Expand Up @@ -247,6 +247,42 @@ public void receiveConfigInfo(String configInfo) {
}
}

private static class NameListener {

@Getter
private final String name;

@Getter
private final ValuePath path;

private final List<SynchronousListener> listeners = new CopyOnWriteArrayList<>();

NameListener(String name) {
this.name = name;
this.path = new ValuePath(name);
}

public void addListener(SynchronousListener listener) {
if (listener != null) {
listeners.add(listener);
}
}

public void removeIf(Predicate<SynchronousListener> predicate) {
listeners.removeIf(predicate);
}

public boolean isEmpty() {
return listeners.isEmpty();
}

public void onUpdate(ConfigEvent event) {
for (SynchronousListener listener : listeners) {
listener.onUpdate(event);
}
}
}

/**
* A static inner class that implements the ConfigListener interface and provides synchronized access to the listener.
*/
Expand Down

0 comments on commit bab5060

Please sign in to comment.