rebalancer.py
import django, json, secrets, asyncio
from asgiref.sync import sync_to_async
from django.db.models import Sum, F
from datetime import datetime, timedelta
from gui.lnd_deps import lightning_pb2 as ln
from gui.lnd_deps import lightning_pb2_grpc as lnrpc
from gui.lnd_deps import router_pb2 as lnr
from gui.lnd_deps import router_pb2_grpc as lnrouter
from gui.lnd_deps.lnd_connect import lnd_connect, async_lnd_connect
from os import environ
from typing import List
environ['DJANGO_SETTINGS_MODULE'] = 'lndg.settings'
django.setup()
from gui.models import Rebalancer, Channels, LocalSettings, Forwards, Autopilot
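# Overview: this is LNDg's standalone auto-rebalancer. main() starts an asyncio queue in
# which a manager task (async_queue_manager) schedules circular rebalance jobs and one or
# more worker tasks (async_run_rebalancer) execute them against LND over gRPC, recording
# progress and results on the Rebalancer model. DJANGO_SETTINGS_MODULE is set and
# django.setup() is called above so the script can use the Django ORM outside the web app;
# the small sync_to_async wrappers below keep those synchronous ORM calls off the event loop.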
@sync_to_async
def get_out_cans(rebalance, auto_rebalance_channels):
try:
return list(auto_rebalance_channels.filter(auto_rebalance=False, percent_outbound__gte=F('ar_out_target')).exclude(remote_pubkey=rebalance.last_hop_pubkey).values_list('chan_id', flat=True))
except Exception as e:
print(f"{datetime.now().strftime('%c')} : Error getting outbound cands: {str(e)}")
@sync_to_async
def save_record(record):
try:
record.save()
except Exception as e:
print(f"{datetime.now().strftime('%c')} : Error saving database record: {str(e)}")
@sync_to_async
def inbound_cans_len(inbound_cans):
try:
return len(inbound_cans)
except Exception as e:
print(f"{datetime.now().strftime('%c')} : Error getting inbound cands: {str(e)}")
async def run_rebalancer(rebalance, worker):
try:
#Reduce potential rebalance value in percent out to avoid going below AR-OUT-Target
auto_rebalance_channels = Channels.objects.filter(is_active=True, is_open=True, private=False).annotate(percent_outbound=((Sum('local_balance')+Sum('pending_outbound')-rebalance.value)*100)/Sum('capacity')).annotate(inbound_can=(((Sum('remote_balance')+Sum('pending_inbound'))*100)/Sum('capacity'))/Sum('ar_in_target'))
outbound_cans = await get_out_cans(rebalance, auto_rebalance_channels)
if len(outbound_cans) == 0 and rebalance.manual == False:
print(f"{datetime.now().strftime('%c')} : No outbound_cans")
rebalance.status = 406
rebalance.start = datetime.now()
rebalance.stop = datetime.now()
await save_record(rebalance)
return None
elif str(outbound_cans).replace('\'', '') != rebalance.outgoing_chan_ids and rebalance.manual == False:
rebalance.outgoing_chan_ids = str(outbound_cans).replace('\'', '')
rebalance.start = datetime.now()
try:
#Open connection with lnd via grpc
stub = lnrpc.LightningStub(lnd_connect())
routerstub = lnrouter.RouterStub(async_lnd_connect())
chan_ids = json.loads(rebalance.outgoing_chan_ids)
timeout = rebalance.duration * 60
invoice_response = stub.AddInvoice(ln.Invoice(value=rebalance.value, expiry=timeout))
print(f"{datetime.now().strftime('%c')} : {worker} starting rebalance for: {rebalance.target_alias} {rebalance.last_hop_pubkey=} {rebalance.value=} {rebalance.duration=} {chan_ids=}")
async for payment_response in routerstub.SendPaymentV2(lnr.SendPaymentRequest(payment_request=str(invoice_response.payment_request), fee_limit_msat=int(rebalance.fee_limit*1000), outgoing_chan_ids=chan_ids, last_hop_pubkey=bytes.fromhex(rebalance.last_hop_pubkey), timeout_seconds=(timeout-5), allow_self_payment=True), timeout=(timeout+60)):
#print(f"{datetime.now().strftime('%c')} : DEBUG {worker} got a payment response: {payment_response.status=} {payment_response.failure_reason=} {payment_response.payment_hash=}")
if payment_response.status == 1 and rebalance.status == 0:
#IN-FLIGHT
rebalance.payment_hash = payment_response.payment_hash
rebalance.status = 1
await save_record(rebalance)
elif payment_response.status == 2:
#SUCCESSFUL
rebalance.status = 2
rebalance.fees_paid = payment_response.fee_msat/1000
successful_out = payment_response.htlcs[0].route.hops[0].pub_key
elif payment_response.status == 3:
#FAILURE
if payment_response.failure_reason == 1:
#FAILURE_REASON_TIMEOUT
rebalance.status = 3
elif payment_response.failure_reason == 2:
#FAILURE_REASON_NO_ROUTE
rebalance.status = 4
elif payment_response.failure_reason == 3:
#FAILURE_REASON_ERROR
rebalance.status = 5
elif payment_response.failure_reason == 4:
#FAILURE_REASON_INCORRECT_PAYMENT_DETAILS
rebalance.status = 6
elif payment_response.failure_reason == 5:
#FAILURE_REASON_INSUFFICIENT_BALANCE
rebalance.status = 7
elif payment_response.status == 0:
rebalance.status = 400
except Exception as e:
if str(e.code()) == 'StatusCode.DEADLINE_EXCEEDED':
rebalance.status = 408
else:
rebalance.status = 400
print(f"{datetime.now().strftime('%c')} : Error while sending payment: {str(e)}")
finally:
rebalance.stop = datetime.now()
await save_record(rebalance)
print(f"{datetime.now().strftime('%c')} : {worker} completed payment attempts for: {rebalance.payment_hash=}")
original_alias = rebalance.target_alias
inc=1.21
dec=2
if rebalance.status ==2:
await update_channels(stub, rebalance.last_hop_pubkey, successful_out)
#Reduce potential rebalance value in percent out to avoid going below AR-OUT-Target
auto_rebalance_channels = Channels.objects.filter(is_active=True, is_open=True, private=False).annotate(percent_outbound=((Sum('local_balance')+Sum('pending_outbound')-rebalance.value*inc)*100)/Sum('capacity')).annotate(inbound_can=(((Sum('remote_balance')+Sum('pending_inbound'))*100)/Sum('capacity'))/Sum('ar_in_target'))
inbound_cans = auto_rebalance_channels.filter(remote_pubkey=rebalance.last_hop_pubkey).filter(auto_rebalance=True, inbound_can__gte=1)
outbound_cans = await get_out_cans(rebalance, auto_rebalance_channels)
if await inbound_cans_len(inbound_cans) > 0 and len(outbound_cans) > 0:
next_rebalance = Rebalancer(value=int(rebalance.value*inc), fee_limit=round(rebalance.fee_limit*inc, 3), outgoing_chan_ids=str(outbound_cans).replace('\'', ''), last_hop_pubkey=rebalance.last_hop_pubkey, target_alias=original_alias, duration=1)
await save_record(next_rebalance)
print(f"{datetime.now().strftime('%c')} : RapidFire up {next_rebalance.target_alias=} {next_rebalance.value=} {rebalance.value=}")
else:
next_rebalance = None
            # For failed rebalances, retry in rapid fire with reduced values until we give up.
            elif rebalance.status > 2 and rebalance.value > 69420:
                #Previous RapidFire with increased value failed, try with a lower value, down to 69420.
if rebalance.duration > 1:
next_value = await estimate_liquidity ( payment_response )
if next_value < 1000:
next_rebalance = None
return next_rebalance
else:
next_value = rebalance.value/dec
inbound_cans = auto_rebalance_channels.filter(remote_pubkey=rebalance.last_hop_pubkey).filter(auto_rebalance=True, inbound_can__gte=1)
if await inbound_cans_len(inbound_cans) > 0 and len(outbound_cans) > 0:
next_rebalance = Rebalancer(value=int(next_value), fee_limit=round(rebalance.fee_limit/(rebalance.value/next_value), 3), outgoing_chan_ids=str(outbound_cans).replace('\'', ''), last_hop_pubkey=rebalance.last_hop_pubkey, target_alias=original_alias, duration=1)
await save_record(next_rebalance)
print(f"{datetime.now().strftime('%c')} : RapidFire Down {next_rebalance.target_alias=} {next_rebalance.value=} {rebalance.value=}")
else:
next_rebalance = None
else:
next_rebalance = None
return next_rebalance
except Exception as e:
print(f"{datetime.now().strftime('%c')} : Error running rebalance attempt: {str(e)}")
@sync_to_async
def estimate_liquidity( payment ):
try:
estimated_liquidity = 0
if payment.status == 3:
attempt = None
for attempt in payment.htlcs:
total_hops=len(attempt.route.hops)
#Failure Codes https://github.com/lightningnetwork/lnd/blob/9f013f5058a7780075bca393acfa97aa0daec6a0/lnrpc/lightning.proto#L4200
#print(f"{datetime.now().strftime('%c')} : DEBUG Liquidity Estimation {attempt.attempt_id=} {attempt.status=} {attempt.failure.code=} {attempt.failure.failure_source_index=} {total_hops=} {attempt.route.total_amt=} {payment.value_msat/1000=} {estimated_liquidity=} {payment.payment_hash=}")
if attempt.failure.failure_source_index == total_hops:
#Failure from last hop indicating liquidity available
estimated_liquidity = attempt.route.total_amt if attempt.route.total_amt > estimated_liquidity else estimated_liquidity
chan_id=attempt.route.hops[len(attempt.route.hops)-1].chan_id
#print(f"{datetime.now().strftime('%c')} : DEBUG Liquidity Estimation {attempt.attempt_id=} {attempt.status=} {attempt.failure.code=} {chan_id=} {attempt.route.total_amt=} {payment.value_msat/1000=} {estimated_liquidity=} {payment.payment_hash=}")
print(f"{datetime.now().strftime('%c')} : Estimated Liquidity {estimated_liquidity=} {payment.payment_hash=} {payment.status=} {payment.failure_reason=}")
except Exception as e:
print(f"{datetime.now().strftime('%c')} : Error estimating liquidity: {str(e)}")
estimated_liquidity = 0
return estimated_liquidity
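# update_channels pulls fresh local/remote balances for the incoming (last hop) and
# outgoing (first hop) channels from LND's ListChannels and writes them back to the
# Channels table, so the rapid-fire scheduling in run_rebalancer above works from
# post-payment balances rather than stale ones.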
@sync_to_async
def update_channels(stub, incoming_channel, outgoing_channel):
try:
# Incoming channel update
channel = stub.ListChannels(ln.ListChannelsRequest(peer=bytes.fromhex(incoming_channel))).channels[0]
db_channel = Channels.objects.filter(chan_id=channel.chan_id)[0]
db_channel.local_balance = channel.local_balance
db_channel.remote_balance = channel.remote_balance
db_channel.save()
# Outgoing channel update
channel = stub.ListChannels(ln.ListChannelsRequest(peer=bytes.fromhex(outgoing_channel))).channels[0]
db_channel = Channels.objects.filter(chan_id=channel.chan_id)[0]
db_channel.local_balance = channel.local_balance
db_channel.remote_balance = channel.remote_balance
db_channel.save()
except Exception as e:
print(f"{datetime.now().strftime('%c')} : Error updating channel balances: {str(e)}")
@sync_to_async
def auto_schedule() -> List[Rebalancer]:
try:
        #No rebalancer jobs have been scheduled, let's look for any channels with an auto_rebalance flag and make the best request if we find one
if LocalSettings.objects.filter(key='AR-Enabled').exists():
enabled = int(LocalSettings.objects.filter(key='AR-Enabled')[0].value)
else:
LocalSettings(key='AR-Enabled', value='0').save()
enabled = 0
if enabled == 0:
return []
auto_rebalance_channels = Channels.objects.filter(is_active=True, is_open=True, private=False).annotate(percent_outbound=((Sum('local_balance')+Sum('pending_outbound'))*100)/Sum('capacity')).annotate(inbound_can=(((Sum('remote_balance')+Sum('pending_inbound'))*100)/Sum('capacity'))/Sum('ar_in_target'))
if len(auto_rebalance_channels) == 0:
return []
if not LocalSettings.objects.filter(key='AR-Outbound%').exists():
LocalSettings(key='AR-Outbound%', value='75').save()
if not LocalSettings.objects.filter(key='AR-Inbound%').exists():
LocalSettings(key='AR-Inbound%', value='100').save()
outbound_cans = list(auto_rebalance_channels.filter(auto_rebalance=False, percent_outbound__gte=F('ar_out_target')).values_list('chan_id', flat=True))
already_scheduled = Rebalancer.objects.exclude(last_hop_pubkey='').filter(status=0).values_list('last_hop_pubkey')
inbound_cans = auto_rebalance_channels.filter(auto_rebalance=True, inbound_can__gte=1).exclude(remote_pubkey__in=already_scheduled).order_by('-inbound_can')
if len(inbound_cans) == 0 or len(outbound_cans) == 0:
return []
if LocalSettings.objects.filter(key='AR-MaxFeeRate').exists():
max_fee_rate = int(LocalSettings.objects.filter(key='AR-MaxFeeRate')[0].value)
else:
LocalSettings(key='AR-MaxFeeRate', value='100').save()
max_fee_rate = 100
if LocalSettings.objects.filter(key='AR-Variance').exists():
variance = int(LocalSettings.objects.filter(key='AR-Variance')[0].value)
else:
LocalSettings(key='AR-Variance', value='0').save()
variance = 0
if LocalSettings.objects.filter(key='AR-WaitPeriod').exists():
wait_period = int(LocalSettings.objects.filter(key='AR-WaitPeriod')[0].value)
else:
LocalSettings(key='AR-WaitPeriod', value='30').save()
wait_period = 30
if not LocalSettings.objects.filter(key='AR-Target%').exists():
LocalSettings(key='AR-Target%', value='5').save()
if not LocalSettings.objects.filter(key='AR-MaxCost%').exists():
LocalSettings(key='AR-MaxCost%', value='65').save()
to_schedule = []
for target in inbound_cans:
target_fee_rate = int(target.local_fee_rate * (target.ar_max_cost/100))
if target_fee_rate > 0 and target_fee_rate > target.remote_fee_rate:
target_value = int(target.ar_amt_target+(target.ar_amt_target*((secrets.choice(range(-1000,1001))/1000)*variance/100)))
target_fee = round(target_fee_rate*target_value*0.000001, 3) if target_fee_rate <= max_fee_rate else round(max_fee_rate*target_value*0.000001, 3)
if target_fee == 0:
return []
if LocalSettings.objects.filter(key='AR-Time').exists():
target_time = int(LocalSettings.objects.filter(key='AR-Time')[0].value)
else:
LocalSettings(key='AR-Time', value='5').save()
target_time = 5
                # TLDR: willing to pay up to target_fee sats in fees to move target_value sats
if Rebalancer.objects.filter(last_hop_pubkey=target.remote_pubkey).exclude(status=0).exists():
last_rebalance = Rebalancer.objects.filter(last_hop_pubkey=target.remote_pubkey).exclude(status=0).order_by('-id')[0]
if not (last_rebalance.status == 2 or (last_rebalance.status in [3, 4, 5, 6, 7, 400, 408, 499] and (int((datetime.now() - last_rebalance.stop).total_seconds() / 60) > wait_period)) or (last_rebalance.status == 1 and (int((datetime.now() - last_rebalance.start).total_seconds() / 60) > wait_period))):
continue
print(f"{datetime.now().strftime('%c')} : Creating Auto Rebalance Request for: {target.chan_id}")
print(f"{datetime.now().strftime('%c')} : Request routing through: {outbound_cans}")
print(f"{datetime.now().strftime('%c')} : {target_value} / {target.ar_amt_target}")
print(f"{datetime.now().strftime('%c')} : {target_fee}")
print(f"{datetime.now().strftime('%c')} : {target_time}")
new_rebalance = Rebalancer(value=target_value, fee_limit=target_fee, outgoing_chan_ids=str(outbound_cans).replace('\'', ''), last_hop_pubkey=target.remote_pubkey, target_alias=target.alias, duration=target_time)
new_rebalance.save()
to_schedule.append(new_rebalance)
return to_schedule
except Exception as e:
print(f"{datetime.now().strftime('%c')} : Error scheduling rebalances: {str(e)}")
@sync_to_async
def auto_enable():
try:
if LocalSettings.objects.filter(key='AR-Autopilot').exists():
enabled = int(LocalSettings.objects.filter(key='AR-Autopilot')[0].value)
else:
LocalSettings(key='AR-Autopilot', value='0').save()
enabled = 0
if LocalSettings.objects.filter(key='AR-APDays').exists():
apdays = int(LocalSettings.objects.filter(key='AR-APDays')[0].value)
else:
LocalSettings(key='AR-APDays', value='7').save()
apdays = 7
if enabled == 1:
lookup_channels=Channels.objects.filter(is_active=True, is_open=True, private=False)
channels = lookup_channels.values('remote_pubkey').annotate(outbound_percent=((Sum('local_balance')+Sum('pending_outbound'))*1000)/Sum('capacity')).annotate(inbound_percent=((Sum('remote_balance')+Sum('pending_inbound'))*1000)/Sum('capacity')).order_by()
filter_day = datetime.now() - timedelta(days=apdays)
forwards = Forwards.objects.filter(forward_date__gte=filter_day)
for channel in channels:
outbound_percent = int(round(channel['outbound_percent']/10, 0))
inbound_percent = int(round(channel['inbound_percent']/10, 0))
chan_list = lookup_channels.filter(remote_pubkey=channel['remote_pubkey']).values('chan_id')
routed_in_apday = forwards.filter(chan_id_in__in=chan_list).count()
routed_out_apday = forwards.filter(chan_id_out__in=chan_list).count()
iapD = 0 if routed_in_apday == 0 else int(forwards.filter(chan_id_in__in=chan_list).aggregate(Sum('amt_in_msat'))['amt_in_msat__sum']/10000000)/100
oapD = 0 if routed_out_apday == 0 else int(forwards.filter(chan_id_out__in=chan_list).aggregate(Sum('amt_out_msat'))['amt_out_msat__sum']/10000000)/100
for peer_channel in lookup_channels.filter(chan_id__in=chan_list):
#print('Processing: ', peer_channel.alias, ' : ', peer_channel.chan_id, ' : ', oapD, " : ", iapD, ' : ', outbound_percent, ' : ', inbound_percent)
if peer_channel.ar_out_target == 100 and peer_channel.auto_rebalance == True:
#Special Case for LOOP, Wos, etc. Always Auto Rebalance if enabled to keep outbound full.
print(f"{datetime.now().strftime('%c')} : Skipping AR enabled and 100% oTarget channel... {peer_channel.alias=} {peer_channel.chan_id=}")
pass
elif oapD > (iapD*1.10) and outbound_percent > 75:
#print('Case 1: Pass')
pass
elif oapD > (iapD*1.10) and inbound_percent > 75 and peer_channel.auto_rebalance == False:
#print('Case 2: Enable AR - o7D > i7D AND Inbound Liq > 75%')
peer_channel.auto_rebalance = True
peer_channel.save()
Autopilot(chan_id=peer_channel.chan_id, peer_alias=peer_channel.alias, setting='Enabled', old_value=0, new_value=1).save()
print(f"{datetime.now().strftime('%c')} : Auto Pilot Enabled: {peer_channel.alias=} {peer_channel.chan_id=} {oapD=} {iapD=}")
elif oapD < (iapD*1.10) and outbound_percent > 75 and peer_channel.auto_rebalance == True:
#print('Case 3: Disable AR - o7D < i7D AND Outbound Liq > 75%')
peer_channel.auto_rebalance = False
peer_channel.save()
Autopilot(chan_id=peer_channel.chan_id, peer_alias=peer_channel.alias, setting='Enabled', old_value=1, new_value=0).save()
print(f"{datetime.now().strftime('%c')} : Auto Pilot Disabled (3): {peer_channel.alias=} {peer_channel.chan_id=} {oapD=} {iapD=}" )
elif oapD < (iapD*1.10) and inbound_percent > 75:
#print('Case 4: Pass')
pass
else:
#print('Case 5: Pass')
pass
except Exception as e:
print(f"{datetime.now().strftime('%c')} : Error during auto channel enabling: {str(e)}")
@sync_to_async
def get_pending_rebals():
try:
rebalances = Rebalancer.objects.filter(status=0).order_by('id')
return rebalances, len(rebalances)
except Exception as e:
print(f"{datetime.now().strftime('%c')} : Error getting pending rebalances: {str(e)}")
shutdown_rebalancer = False
active_rebalances = []
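# async_queue_manager seeds the queue with rebalances already pending in the database,
# then loops roughly every 30 seconds: run auto_enable, run auto_schedule, enqueue any
# newly scheduled jobs, and shut the rebalancer down once both the queue and the list of
# active rebalances are empty.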
async def async_queue_manager(rebalancer_queue):
print(f"{datetime.now().strftime('%c')} : Queue manager is starting...")
pending_rebalances, rebal_count = await get_pending_rebals()
if rebal_count > 0:
for rebalance in pending_rebalances:
await rebalancer_queue.put(rebalance)
try:
while True:
global active_rebalances
print(f"{datetime.now().strftime('%c')} : Queue currently has {rebalancer_queue.qsize()} items...")
print(f"{datetime.now().strftime('%c')} : There are currently {len(active_rebalances)} tasks in progress...")
print(f"{datetime.now().strftime('%c')} : Queue manager is checking for more work...")
await auto_enable()
scheduled = await auto_schedule()
if len(scheduled) > 0:
print(f"{datetime.now().strftime('%c')} : Scheduling {len(scheduled)} more jobs...")
for rebalance in scheduled:
await rebalancer_queue.put(rebalance)
elif rebalancer_queue.qsize() == 0 and len(active_rebalances) == 0:
print(f"{datetime.now().strftime('%c')} : Queue is still empty, stoping the rebalancer...")
global shutdown_rebalancer
shutdown_rebalancer = True
return
await asyncio.sleep(30)
except Exception as e:
print(f"{datetime.now().strftime('%c')} : Queue manager exception: {str(e)}")
finally:
print(f"{datetime.now().strftime('%c')} : Queue manager has shut down...")
async def async_run_rebalancer(worker, rebalancer_queue):
while True:
global active_rebalances, shutdown_rebalancer
if not rebalancer_queue.empty():
rebalance = await rebalancer_queue.get()
print(f"{datetime.now().strftime('%c')} : {worker} is starting a new request...")
active_rebalance_id = None
if rebalance != None:
active_rebalance_id = rebalance.id
active_rebalances.append(active_rebalance_id)
while rebalance != None:
rebalance = await run_rebalancer(rebalance, worker)
if active_rebalance_id != None:
active_rebalances.remove(active_rebalance_id)
print(f"{datetime.now().strftime('%c')} : {worker} completed its request...")
else:
if shutdown_rebalancer == True:
return
await asyncio.sleep(3)
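# start_queue wires everything together: one manager task plus worker_count worker tasks
# sharing a single asyncio.Queue. Minimal standalone usage sketch (hypothetical worker
# count; main() below normally derives it from the AR-Workers setting):
#   asyncio.run(start_queue(worker_count=2))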
async def start_queue(worker_count=1):
rebalancer_queue = asyncio.Queue()
manager = asyncio.create_task(async_queue_manager(rebalancer_queue))
workers = [asyncio.create_task(async_run_rebalancer("Worker " + str(worker_num+1), rebalancer_queue)) for worker_num in range(worker_count)]
await asyncio.gather(manager, *workers)
print(f"{datetime.now().strftime('%c')} : Manager and workers have stopped...")
def main():
if Rebalancer.objects.filter(status=1).exists():
unknown_errors = Rebalancer.objects.filter(status=1)
for unknown_error in unknown_errors:
unknown_error.status = 400
unknown_error.stop = datetime.now()
unknown_error.save()
if LocalSettings.objects.filter(key='AR-Workers').exists():
worker_count = int(LocalSettings.objects.filter(key='AR-Workers')[0].value)
else:
LocalSettings(key='AR-Workers', value='1').save()
worker_count = 1
asyncio.run(start_queue(worker_count))
print(f"{datetime.now().strftime('%c')} : Rebalancer successfully shutdown...")
if __name__ == '__main__':
main()