Delay send2 weight server #27

Open · wants to merge 2 commits into base: master
DefaultStoreConfiguration.java
@@ -53,6 +53,9 @@ public class DefaultStoreConfiguration implements StoreConfiguration {
private static final int SCHEDULE_CLEAN_BEFORE_DISPATCH_TIMES_IN_HOUR = 24;
private static final int DEFAULT_SEGMENT_SCALE_IN_MIN = 60;

public static final String BROKER_WEIGHT_TASK_TIMER_INTERVAL = "broker.weight.task.timer.interval";
public static final int DEFAULT_BROKER_WEIGHT_TASK_TIMER_INTERVAL = 3 * 1000;

private volatile int segmentScale;
private volatile long inAdvanceLoadMillis;
private volatile long loadBlockingExitMillis;
SenderExecutor.java
@@ -20,14 +20,14 @@
import com.google.common.collect.Maps;
import qunar.tc.qmq.broker.BrokerClusterInfo;
import qunar.tc.qmq.broker.BrokerGroupInfo;
import qunar.tc.qmq.broker.BrokerLoadBalance;
import qunar.tc.qmq.broker.BrokerService;
import qunar.tc.qmq.broker.impl.PollBrokerLoadBalance;
import qunar.tc.qmq.common.ClientType;
import qunar.tc.qmq.common.Disposable;
import qunar.tc.qmq.configuration.DynamicConfig;
import qunar.tc.qmq.delay.DelayLogFacade;
import qunar.tc.qmq.delay.ScheduleIndex;
import qunar.tc.qmq.delay.sender.loadbalance.InSendingNumWeightLoadBalancer;
import qunar.tc.qmq.delay.sender.loadbalance.LoadBalancer;

import java.util.List;
import java.util.Map;
@@ -42,15 +42,15 @@ class SenderExecutor implements Disposable {
private static final int DEFAULT_SEND_THREAD = 1;

private final ConcurrentMap<String, SenderGroup> groupSenders = new ConcurrentHashMap<>();
private final BrokerLoadBalance brokerLoadBalance;
private final Sender sender;
private final DelayLogFacade store;
private final int sendThreads;
private final LoadBalancer balancer;

SenderExecutor(final Sender sender, DelayLogFacade store, DynamicConfig sendConfig) {
this.sender = sender;
this.store = store;
this.brokerLoadBalance = PollBrokerLoadBalance.getInstance();
this.balancer = new InSendingNumWeightLoadBalancer(sendConfig);
this.sendThreads = sendConfig.getInt("delay.send.threads", DEFAULT_SEND_THREAD);
}

@@ -62,7 +62,7 @@ void execute(final List<ScheduleIndex> indexList, final SenderGroup.ResultHandle
}

private void doExecute(final SenderGroup group, final List<ScheduleIndex> list, final SenderGroup.ResultHandler handler) {
group.send(list, sender, handler);
group.send(list, sender, handler, balancer);
}

private Map<SenderGroup, List<ScheduleIndex>> groupByBroker(final List<ScheduleIndex> indexList, final BrokerService brokerService) {
@@ -100,7 +100,7 @@ private SenderGroup getGroup(BrokerGroupInfo groupInfo, int sendThreads) {

private BrokerGroupInfo loadGroup(String subject, BrokerService brokerService) {
BrokerClusterInfo cluster = brokerService.getClusterBySubject(ClientType.PRODUCER, subject);
return brokerLoadBalance.loadBalance(cluster, null);
return balancer.select(cluster);
}

private Map<String, List<ScheduleIndex>> groupBySubject(List<ScheduleIndex> list) {
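Note: the LoadBalancer interface referenced above is not part of this diff. Judging only from its call sites in this PR (balancer.select(cluster) in SenderExecutor and balancer.getBrokerGroupStats(groupInfo) in SenderGroup), it presumably looks roughly like the sketch below; the names and exact shape are inferred, not taken from the repository.

package qunar.tc.qmq.delay.sender.loadbalance;

import qunar.tc.qmq.broker.BrokerClusterInfo;
import qunar.tc.qmq.broker.BrokerGroupInfo;

// Hypothetical sketch, inferred from the call sites in this PR; the real
// interface may differ in naming or carry additional methods.
public interface LoadBalancer {
    // pick the broker group that should receive the next batch
    BrokerGroupInfo select(BrokerClusterInfo clusterInfo);

    // per-group counters used to track messages queued or in flight
    BrokerGroupStats getBrokerGroupStats(BrokerGroupInfo brokerGroupInfo);
}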
SenderGroup.java
@@ -26,6 +26,8 @@
import qunar.tc.qmq.delay.DelayLogFacade;
import qunar.tc.qmq.delay.ScheduleIndex;
import qunar.tc.qmq.delay.monitor.QMon;
import qunar.tc.qmq.delay.sender.loadbalance.BrokerGroupStats;
import qunar.tc.qmq.delay.sender.loadbalance.LoadBalancer;
import qunar.tc.qmq.delay.store.model.ScheduleSetRecord;
import qunar.tc.qmq.metrics.Metrics;
import qunar.tc.qmq.netty.exception.ClientSendException;
@@ -62,33 +62,37 @@ public class SenderGroup implements Disposable {
this.executorService = new ThreadPoolExecutor(1, sendThreads, 1L, TimeUnit.MINUTES,
new LinkedBlockingQueue<>(), new ThreadFactoryBuilder()
.setNameFormat("delay-sender-" + groupInfo.getGroupName() + "-%d").build());

Metrics.gauge("sendGroupQueueSize", new String[]{"brokerGroup"}, new String[]{groupInfo.getGroupName()}, () -> (double) executorService.getQueue().size());
}

public void send(final List<ScheduleIndex> records, final Sender sender, final ResultHandler handler) {
executorService.execute(() -> doSend(records, sender, handler));
public void send(final List<ScheduleIndex> records, final Sender sender, final ResultHandler handler, final LoadBalancer balancer) {
final BrokerGroupStats stats = balancer.getBrokerGroupStats(groupInfo.get());
stats.incrementToSendCount(records.size());
executorService.execute(() -> doSend(records, sender, handler, balancer));
}

private void doSend(final List<ScheduleIndex> batch, final Sender sender, final ResultHandler handler) {
private void doSend(final List<ScheduleIndex> batch, final Sender sender, final ResultHandler handler, final LoadBalancer balancer) {
BrokerGroupInfo groupInfo = this.groupInfo.get();
String groupName = groupInfo.getGroupName();
List<List<ScheduleIndex>> partitions = Lists.partition(batch, MAX_SEND_BATCH_SIZE);

for (List<ScheduleIndex> partition : partitions) {
send(sender, handler, groupInfo, groupName, partition);
send(sender, handler, groupInfo, partition, balancer);
}
}

private void send(Sender sender, ResultHandler handler, BrokerGroupInfo groupInfo, String groupName, List<ScheduleIndex> list) {
private void send(final Sender sender, final ResultHandler handler, final BrokerGroupInfo groupInfo, final List<ScheduleIndex> list, final LoadBalancer balancer) {
try {
long start = System.currentTimeMillis();
List<ScheduleSetRecord> records = store.recoverLogRecord(list);
QMon.loadMsgTime(System.currentTimeMillis() - start);

Datagram response = sendMessages(records, sender);
release(records);
monitor(list, groupName);
monitor(list, groupInfo.getGroupName());
if (response == null) {
handler.fail(list);
groupInfo.markFailed();
fail(list, groupInfo.getGroupName(), handler);
} else {
final int responseCode = response.getHeader().getCode();
final Map<String, SendResult> resultMap = getSendResult(response);
@@ -98,9 +104,7 @@ private void send(Sender sender, ResultHandler handler, BrokerGroupInfo groupInf
groupInfo.markFailed();
}

monitorSendFail(list, groupInfo.getGroupName());

handler.fail(list);
fail(list, groupInfo.getGroupName(), handler);
return;
}

@@ -121,11 +125,19 @@ private void send(Sender sender, ResultHandler handler, BrokerGroupInfo groupInf
handler.success(records, failedMessageIds);
}
} catch (Throwable e) {
LOGGER.error("sender group send batch failed,broker:{},batch size:{}", groupName, list.size(), e);
handler.fail(list);
LOGGER.error("sender group send batch failed,broker:{},batch size:{}", groupInfo.getGroupName(), list.size(), e);
fail(list, groupInfo.getGroupName(), handler);
} finally {
BrokerGroupStats stats = balancer.getBrokerGroupStats(groupInfo);
stats.decrementToSendCount(list.size());
}
}

private void fail(final List<ScheduleIndex> list, final String groupName, final ResultHandler handler) {
monitorSendFail(list, groupName);
handler.fail(list);
}

private void release(List<ScheduleSetRecord> records) {
for (ScheduleSetRecord record : records) {
record.release();
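Note on the bookkeeping above: send() increments the group's toSend counter by the full batch size when a batch is queued, and each partition decrements it in the finally block once its send attempt finishes, so BrokerGroupStats.toSend approximates the number of messages queued or in flight per broker group. That count is the only signal the new load balancer (below) uses to compute weights.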
BrokerGroupStats.java (new file)
@@ -0,0 +1,40 @@
package qunar.tc.qmq.delay.sender.loadbalance;

import qunar.tc.qmq.broker.BrokerGroupInfo;

import java.util.concurrent.atomic.AtomicLong;

/**
* @author xufeng.deng [email protected]
* @since 2019-01-08 15:08
*/
public class BrokerGroupStats {
private final BrokerGroupInfo brokerGroupInfo;

// stats not yet tracked: send time, send failed, send success

// number of messages currently queued or in flight ("to send") for this broker group
private final AtomicLong toSend;

public BrokerGroupStats(final BrokerGroupInfo brokerGroupInfo) {
this.brokerGroupInfo = brokerGroupInfo;
this.toSend = new AtomicLong(0);
}

public void incrementToSendCount(long toSendNum) {
toSend.addAndGet(toSendNum);
}

public void decrementToSendCount(long sendNum) {
// decrement atomically; a separate get()/set() pair can lose concurrent updates from other sender threads
toSend.addAndGet(-sendNum);
}

long getToSendCount() {
return toSend.get();
}
}
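The LoadBalanceStats registry used by InSendingNumWeightLoadBalancer below is also not included in this diff. A minimal sketch of what such a registry could look like, assuming it simply hands out one shared BrokerGroupStats per broker group (implementation details are guessed):

package qunar.tc.qmq.delay.sender.loadbalance;

import qunar.tc.qmq.broker.BrokerGroupInfo;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical sketch: one BrokerGroupStats instance per broker group,
// keyed by group name so all senders share the same counters.
class LoadBalanceStats {
    private final ConcurrentMap<String, BrokerGroupStats> stats = new ConcurrentHashMap<>();

    BrokerGroupStats getBrokerGroupStats(BrokerGroupInfo groupInfo) {
        return stats.computeIfAbsent(groupInfo.getGroupName(), name -> new BrokerGroupStats(groupInfo));
    }
}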
InSendingNumWeightLoadBalancer.java (new file)
@@ -0,0 +1,181 @@
package qunar.tc.qmq.delay.sender.loadbalance;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import qunar.tc.qmq.broker.BrokerClusterInfo;
import qunar.tc.qmq.broker.BrokerGroupInfo;
import qunar.tc.qmq.configuration.DynamicConfig;

import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;

import static qunar.tc.qmq.delay.config.DefaultStoreConfiguration.BROKER_WEIGHT_TASK_TIMER_INTERVAL;
import static qunar.tc.qmq.delay.config.DefaultStoreConfiguration.DEFAULT_BROKER_WEIGHT_TASK_TIMER_INTERVAL;

/**
* @author xufeng.deng [email protected]
* @since 2019-01-08 16:03
*/
public class InSendingNumWeightLoadBalancer extends RandomLoadBalancer {
private static final Logger LOG = LoggerFactory.getLogger(InSendingNumWeightLoadBalancer.class);

private final LoadBalanceStats stats;

private volatile List<Long> accumulatedWeights = Collections.synchronizedList(new ArrayList<>());

private volatile List<BrokerGroupInfo> brokerGroups = Collections.synchronizedList(new ArrayList<>());

private final AtomicBoolean brokerWeightAssignmentInProgress = new AtomicBoolean(false);

private final Timer brokerWeightTimer;

private final Random random = new Random();

public InSendingNumWeightLoadBalancer(final DynamicConfig config) {
stats = new LoadBalanceStats();
brokerWeightTimer = new Timer("brokerWeightTimer", true);
scheduleBrokerWeight(config);
}

private void scheduleBrokerWeight(final DynamicConfig config) {
brokerWeightTimer.schedule(new DynamicBrokerGroupWeightTask(), 0, config.getInt(BROKER_WEIGHT_TASK_TIMER_INTERVAL, DEFAULT_BROKER_WEIGHT_TASK_TIMER_INTERVAL));
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
LOG.info("Stopping brokerWeightTimer.");
brokerWeightTimer.cancel();
}));
}

@Override
public BrokerGroupInfo select(BrokerClusterInfo clusterInfo) {
List<BrokerGroupInfo> arrivalGroups = clusterInfo.getGroups();
if (arrivalGroups == null || arrivalGroups.isEmpty()) return null;

List<Long> currentWeights = getAccumulatedWeights();
List<BrokerGroupInfo> stayGroupInfos = getBrokerGroups();

int groupsSize = arrivalGroups.size();
refreshBrokerGroups(arrivalGroups, stayGroupInfos);

BrokerGroupInfo brokerGroupInfo = null;
int cyclicCount = 0;
while (brokerGroupInfo == null && cyclicCount++ < groupsSize * 3) {
int brokerIndex = 0;
long maxTotalWeight = currentWeights.size() == 0 ? 0 : currentWeights.get(currentWeights.size() - 1);
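// while the accumulated weights are still small (little traffic in flight), the counts carry
// no useful signal, so fall back to the base random selection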
if (maxTotalWeight < 1000) {
brokerGroupInfo = super.select(clusterInfo);
} else {
// draw a random point in [0, maxTotalWeight); a raw nextLong() multiplied by the total would overflow and go negative
long randomWeight = (long) (random.nextDouble() * maxTotalWeight);
int n = 0;
for (Long l : currentWeights) {
if (l >= randomWeight) {
brokerIndex = n;
break;
} else {
++n;
}
}

brokerGroupInfo = stayGroupInfos.get(brokerIndex);

if (brokerGroupInfo == null) {
Thread.yield();
continue;
}

if (brokerGroupInfo.isAvailable()) {
return brokerGroupInfo;
}

brokerGroupInfo = null;
}
}

return brokerGroupInfo;
}

private void refreshBrokerGroups(List<BrokerGroupInfo> arrivalGroups, List<BrokerGroupInfo> stayBrokerGroups) {
Set<BrokerGroupInfo> oldSet = Sets.newHashSet(stayBrokerGroups);
Set<BrokerGroupInfo> newSet = Sets.newHashSet(arrivalGroups);
Set<BrokerGroupInfo> removals = Sets.difference(oldSet, newSet);
Set<BrokerGroupInfo> adds = Sets.difference(newSet, oldSet);
if (!removals.isEmpty() || !adds.isEmpty()) {
List<BrokerGroupInfo> attached = Lists.newArrayList(stayBrokerGroups);
attached.removeAll(removals);
attached.addAll(adds);
setBrokerGroups(attached);
}
}

class DynamicBrokerGroupWeightTask extends TimerTask {

@Override
public void run() {
BrokerWeight brokerWeight = new BrokerWeight();
try {
brokerWeight.maintainWeights();
} catch (Exception e) {
LOG.error("Error running DynamicBrokerGroupWeightTask.", e);
}
}
}

class BrokerWeight {
void maintainWeights() {
if (!brokerWeightAssignmentInProgress.compareAndSet(false, true)) {
return;
}

try {
doMaintain();
} catch (Exception e) {
LOG.error("Error calculating broker weights.");
} finally {
brokerWeightAssignmentInProgress.set(false);
}
}

private void doMaintain() {
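// each group's weight is (total in-flight across all groups) minus its own in-flight count,
// so less-loaded groups get a larger share; the weights are stored as a running (accumulated)
// sum so that select() can map a single random value onto a group in one pass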
long total = 0;
List<BrokerGroupInfo> groups = getBrokerGroups();
for (BrokerGroupInfo brokerGroup : groups) {
final BrokerGroupStats brokerGroupStats = stats.getBrokerGroupStats(brokerGroup);
total += brokerGroupStats.getToSendCount();
}

long weightSoFar = 0;
List<Long> finalWeights = Lists.newArrayListWithCapacity(groups.size());
for (BrokerGroupInfo brokerGroup : groups) {
final BrokerGroupStats brokerGroupStats = stats.getBrokerGroupStats(brokerGroup);
long weight = total - brokerGroupStats.getToSendCount();
weightSoFar += weight;
finalWeights.add(weightSoFar);
}
setAccumulatedWeights(finalWeights);
}
}

@Override
public BrokerGroupStats getBrokerGroupStats(BrokerGroupInfo brokerGroupInfo) {
return stats.getBrokerGroupStats(brokerGroupInfo);
}

private void setAccumulatedWeights(final List<Long> weights) {
this.accumulatedWeights = weights;
}

private void setBrokerGroups(final List<BrokerGroupInfo> brokerGroups) {
this.brokerGroups = brokerGroups;
}

private List<BrokerGroupInfo> getBrokerGroups() {
return ImmutableList.copyOf(brokerGroups);
}

private List<Long> getAccumulatedWeights() {
return ImmutableList.copyOf(accumulatedWeights);
}

}
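RandomLoadBalancer, the base class that select() falls back to while the weights are still small, is not shown in this diff either. A plausible sketch, assuming it implements the same hypothetical LoadBalancer interface sketched earlier and simply picks a random available group:

package qunar.tc.qmq.delay.sender.loadbalance;

import qunar.tc.qmq.broker.BrokerClusterInfo;
import qunar.tc.qmq.broker.BrokerGroupInfo;

import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical sketch of the fallback strategy: uniform random choice,
// scanning forward from a random offset until an available group is found.
public class RandomLoadBalancer implements LoadBalancer {
    private final LoadBalanceStats stats = new LoadBalanceStats();

    @Override
    public BrokerGroupInfo select(BrokerClusterInfo clusterInfo) {
        List<BrokerGroupInfo> groups = clusterInfo.getGroups();
        if (groups == null || groups.isEmpty()) return null;
        int start = ThreadLocalRandom.current().nextInt(groups.size());
        for (int i = 0; i < groups.size(); i++) {
            BrokerGroupInfo group = groups.get((start + i) % groups.size());
            if (group.isAvailable()) return group;
        }
        // no available group: return the randomly chosen one and let the caller handle failure
        return groups.get(start);
    }

    @Override
    public BrokerGroupStats getBrokerGroupStats(BrokerGroupInfo brokerGroupInfo) {
        return stats.getBrokerGroupStats(brokerGroupInfo);
    }
}

To illustrate the weighting in doMaintain: with three groups whose in-flight counts are 10, 30 and 60 (total 100), the per-group weights become 90, 70 and 40 and the accumulated list is [90, 160, 200], so the least-loaded group owns the largest share of the random range that select() draws from.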