Skip to content

Commit

Permalink
Synchronize view change and retransmit task
Browse files Browse the repository at this point in the history
  • Loading branch information
wburns committed Jan 16, 2024
1 parent 84bbaac commit 4b711b6
Showing 1 changed file with 75 additions and 42 deletions.
117 changes: 75 additions & 42 deletions src/org/jgroups/protocols/UNICAST3.java
Original file line number Diff line number Diff line change
@@ -1,18 +1,27 @@
package org.jgroups.protocols;

import org.jgroups.*;
import org.jgroups.annotations.MBean;
import org.jgroups.annotations.ManagedAttribute;
import org.jgroups.annotations.ManagedOperation;
import org.jgroups.annotations.Property;
import org.jgroups.conf.AttributeType;
import org.jgroups.protocols.relay.RELAY;
import org.jgroups.stack.Protocol;
import org.jgroups.stack.ProtocolStack;
import org.jgroups.util.*;
import static org.jgroups.Message.Flag.NO_RELIABILITY;
import static org.jgroups.Message.TransientFlag.DONT_LOOPBACK;
import static org.jgroups.Message.TransientFlag.OOB_DELIVERED;
import static org.jgroups.protocols.UnicastHeader3.DATA;

import java.util.*;
import java.util.concurrent.*;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
Expand All @@ -24,10 +33,32 @@
import java.util.function.ToIntFunction;
import java.util.stream.Stream;

import static org.jgroups.Message.Flag.NO_RELIABILITY;
import static org.jgroups.Message.TransientFlag.DONT_LOOPBACK;
import static org.jgroups.Message.TransientFlag.OOB_DELIVERED;
import static org.jgroups.protocols.UnicastHeader3.DATA;
import org.jgroups.Address;
import org.jgroups.EmptyMessage;
import org.jgroups.Event;
import org.jgroups.Global;
import org.jgroups.Message;
import org.jgroups.ObjectMessage;
import org.jgroups.View;
import org.jgroups.annotations.MBean;
import org.jgroups.annotations.ManagedAttribute;
import org.jgroups.annotations.ManagedOperation;
import org.jgroups.annotations.Property;
import org.jgroups.conf.AttributeType;
import org.jgroups.protocols.relay.RELAY;
import org.jgroups.stack.Protocol;
import org.jgroups.stack.ProtocolStack;
import org.jgroups.util.AgeOutCache;
import org.jgroups.util.AverageMinMax;
import org.jgroups.util.ExpiryCache;
import org.jgroups.util.LongTuple;
import org.jgroups.util.MessageBatch;
import org.jgroups.util.SeqnoList;
import org.jgroups.util.ShutdownRejectedExecutionHandler;
import org.jgroups.util.Table;
import org.jgroups.util.TimeScheduler;
import org.jgroups.util.TimeService;
import org.jgroups.util.Util;


/**
Expand Down Expand Up @@ -619,33 +650,35 @@ public Object down(Event evt) {
switch (evt.getType()) {

case Event.VIEW_CHANGE: // remove connections to peers that are not members anymore !
View view=evt.getArg();
List<Address> new_members=view.getMembers();
Set<Address> non_members=new HashSet<>(send_table.keySet());
non_members.addAll(recv_table.keySet());
members=new_members;
new_members.forEach(non_members::remove);
if(cache != null)
cache.removeAll(new_members);

if(!non_members.isEmpty()) {
log.trace("%s: closing connections of non members %s", local_addr, non_members);
// remove all non members, except those from remote sites: https://issues.redhat.com/browse/JGRP-2729
non_members.stream().filter(this::isLocal).forEach(this::closeConnection);
}
if(!new_members.isEmpty()) {
for(Address mbr: new_members) {
Entry e=send_table.get(mbr);
if(e != null && e.state() == State.CLOSING)
e.state(State.OPEN);
e=recv_table.get(mbr);
if(e != null && e.state() == State.CLOSING)
e.state(State.OPEN);
synchronized (this) {
View view = evt.getArg();
List<Address> new_members = view.getMembers();
Set<Address> non_members = new HashSet<>(send_table.keySet());
non_members.addAll(recv_table.keySet());
members = new_members;
new_members.forEach(non_members::remove);
if (cache != null)
cache.removeAll(new_members);

if (!non_members.isEmpty()) {
log.trace("%s: closing connections of non members %s", local_addr, non_members);
// remove all non members, except those from remote sites: https://issues.redhat.com/browse/JGRP-2729
non_members.stream().filter(this::isLocal).forEach(this::closeConnection);
}
if (!new_members.isEmpty()) {
for (Address mbr : new_members) {
Entry e = send_table.get(mbr);
if (e != null && e.state() == State.CLOSING)
e.state(State.OPEN);
e = recv_table.get(mbr);
if (e != null && e.state() == State.CLOSING)
e.state(State.OPEN);
}
}
xmit_task_map.keySet().retainAll(new_members);
last_sync_sent.removeExpiredElements();
break;
}
xmit_task_map.keySet().retainAll(new_members);
last_sync_sent.removeExpiredElements();
break;
}

return down_prot.down(evt); // Pass on to the layer below us
Expand Down Expand Up @@ -1328,7 +1361,7 @@ public int removeConnections(boolean remove_send_connections, boolean remove_rec
}

@ManagedOperation(description="Triggers the retransmission task")
public void triggerXmit() {
public synchronized void triggerXmit() {
// check for gaps in the received messages and ask senders to send them again
for(Map.Entry<Address,ReceiverEntry> entry: recv_table.entrySet()) {
Address target=entry.getKey(); // target to send retransmit requests to
Expand Down

0 comments on commit 4b711b6

Please sign in to comment.