From bfcc7abcfdc700ff00ddbc72725b0c13bdece827 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Ot=C3=A1vio?=
Date: Sun, 10 Dec 2023 15:12:29 +0000
Subject: [PATCH 01/12] Cache credentials reducing IO

---
 gui/lnd_deps/lnd_connect.py | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/gui/lnd_deps/lnd_connect.py b/gui/lnd_deps/lnd_connect.py
index ecef6f86..cac664d7 100644
--- a/gui/lnd_deps/lnd_connect.py
+++ b/gui/lnd_deps/lnd_connect.py
@@ -1,7 +1,7 @@
 import os, codecs, grpc
 from lndg import settings
 
-def creds():
+def get_creds():
     #Open connection with lnd via grpc
     with open(os.path.expanduser(settings.LND_MACAROON_PATH), 'rb') as f:
         macaroon_bytes = f.read()
@@ -15,11 +15,12 @@ def metadata_callback(context, callback):
     creds = grpc.composite_channel_credentials(cert_creds, auth_creds)
     return creds
 
+creds = get_creds()
 def lnd_connect():
-    return grpc.secure_channel(settings.LND_RPC_SERVER, creds(), options=[('grpc.max_send_message_length', int(settings.LND_MAX_MESSAGE)*1000000), ('grpc.max_receive_message_length', int(settings.LND_MAX_MESSAGE)*1000000),])
+    return grpc.secure_channel(settings.LND_RPC_SERVER, creds, options=[('grpc.max_send_message_length', int(settings.LND_MAX_MESSAGE)*1000000), ('grpc.max_receive_message_length', int(settings.LND_MAX_MESSAGE)*1000000),])
 
 def async_lnd_connect():
-    return grpc.aio.secure_channel(settings.LND_RPC_SERVER, creds(), options=[('grpc.max_send_message_length', int(settings.LND_MAX_MESSAGE)*1000000), ('grpc.max_receive_message_length', int(settings.LND_MAX_MESSAGE)*1000000),])
+    return grpc.aio.secure_channel(settings.LND_RPC_SERVER, creds, options=[('grpc.max_send_message_length', int(settings.LND_MAX_MESSAGE)*1000000), ('grpc.max_receive_message_length', int(settings.LND_MAX_MESSAGE)*1000000),])
 
 def main():
     pass

From 77933548f8a6f153edd4ef903f7a4925de4bc79d Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Ot=C3=A1vio?=
Date: Sun, 10 Dec 2023 15:18:17 +0000
Subject: [PATCH 02/12] Reuse grpc conn & simplify worker_count reload

---
 rebalancer.py | 95 ++++++++++++++++++++++-----------------------------
 1 file changed, 40 insertions(+), 55 deletions(-)

diff --git a/rebalancer.py b/rebalancer.py
index dd0e7d9d..74d9f6b7 100644
--- a/rebalancer.py
+++ b/rebalancer.py
@@ -36,7 +36,7 @@ def inbound_cans_len(inbound_cans):
     except Exception as e:
         print(f"{datetime.now().strftime('%c')} : [Rebalancer] : Error getting inbound cands: {str(e)}")
 
-async def run_rebalancer(rebalance, worker):
+async def run_rebalancer(rebalance, conn, worker):
     try:
         #Reduce potential rebalance value in percent out to avoid going below AR-OUT-Target
         auto_rebalance_channels = Channels.objects.filter(is_active=True, is_open=True, private=False).annotate(percent_outbound=((Sum('local_balance')+Sum('pending_outbound')-rebalance.value)*100)/Sum('capacity')).annotate(inbound_can=(((Sum('remote_balance')+Sum('pending_inbound'))*100)/Sum('capacity'))/Sum('ar_in_target'))
@@ -52,12 +52,11 @@
             rebalance.outgoing_chan_ids = str(outbound_cans).replace('\'', '')
         rebalance.start = datetime.now()
         try:
-            #Open connection with lnd via grpc
-            stub = lnrpc.LightningStub(lnd_connect())
-            routerstub = lnrouter.RouterStub(async_lnd_connect())
+            stub = lnrpc.LightningStub(conn)
+            routerstub = lnrouter.RouterStub(conn)
             chan_ids = json.loads(rebalance.outgoing_chan_ids)
             timeout = rebalance.duration * 60
-            invoice_response = stub.AddInvoice(ln.Invoice(value=rebalance.value, expiry=timeout))
+            invoice_response = await 
stub.AddInvoice(ln.Invoice(value=rebalance.value, expiry=timeout)) print(f"{datetime.now().strftime('%c')} : [Rebalancer] : {worker} starting rebalance for {rebalance.target_alias} {rebalance.last_hop_pubkey} for {rebalance.value} sats and duration {rebalance.duration}, using {len(chan_ids)} outbound channels") async for payment_response in routerstub.SendPaymentV2(lnr.SendPaymentRequest(payment_request=str(invoice_response.payment_request), fee_limit_msat=int(rebalance.fee_limit*1000), outgoing_chan_ids=chan_ids, last_hop_pubkey=bytes.fromhex(rebalance.last_hop_pubkey), timeout_seconds=(timeout-5), allow_self_payment=True), timeout=(timeout+60)): if payment_response.status == 1 and rebalance.status == 0: @@ -312,12 +311,16 @@ def get_pending_rebals(): except Exception as e: print(f"{datetime.now().strftime('%c')} : [Rebalancer] : Error getting pending rebalances: {str(e)}") -async def async_queue_manager(rebalancer_queue): - global scheduled_rebalances, active_rebalances, shutdown_rebalancer - print(f"{datetime.now().strftime('%c')} : [Rebalancer] : Queue manager is starting...") +async def async_queue_manager(rebalancer_queue: asyncio.Queue, worker_count): + global scheduled_rebalances, active_rebalances + print(f"{datetime.now().strftime('%c')} : [Rebalancer] : Queue manager is starting with {worker_count} workers...") try: while True: - if shutdown_rebalancer == True: + new_worker_count = await get_worker_count() + if worker_count != new_worker_count: + while not rebalancer_queue.empty(): + await rebalancer_queue.get() #Empty queue to restart with new worker_count config + print(f"{datetime.now().strftime('%c')} : [Rebalancer] : Reloading worker count...") return print(f"{datetime.now().strftime('%c')} : [Rebalancer] : Queue currently has {rebalancer_queue.qsize()} items...") print(f"{datetime.now().strftime('%c')} : [Rebalancer] : There are currently {len(active_rebalances)} tasks in progress...") @@ -338,72 +341,54 @@ async def async_queue_manager(rebalancer_queue): await rebalancer_queue.put(rebalance) elif rebalancer_queue.qsize() == 0 and len(active_rebalances) == 0: print(f"{datetime.now().strftime('%c')} : [Rebalancer] : Queue is still empty, stoping the rebalancer...") - shutdown_rebalancer = True return await asyncio.sleep(30) except Exception as e: print(f"{datetime.now().strftime('%c')} : [Rebalancer] : Queue manager exception: {str(e)}") - shutdown_rebalancer = True finally: print(f"{datetime.now().strftime('%c')} : [Rebalancer] : Queue manager has shut down...") -async def async_run_rebalancer(worker, rebalancer_queue): - global scheduled_rebalances, active_rebalances, shutdown_rebalancer - while True: - if not rebalancer_queue.empty() and not shutdown_rebalancer: - rebalance = await rebalancer_queue.get() - print(f"{datetime.now().strftime('%c')} : [Rebalancer] : {worker} is starting a new request...") - active_rebalance_id = None - if rebalance != None: - active_rebalance_id = rebalance.id - active_rebalances.append(active_rebalance_id) - scheduled_rebalances.remove(active_rebalance_id) - while rebalance != None: - rebalance = await run_rebalancer(rebalance, worker) - if active_rebalance_id != None: - active_rebalances.remove(active_rebalance_id) - print(f"{datetime.now().strftime('%c')} : [Rebalancer] : {worker} completed its request...") - else: - if shutdown_rebalancer == True: - return +async def async_run_rebalancer(manager: asyncio.Task, rebalancer_queue, conn): + global scheduled_rebalances, active_rebalances + worker = asyncio.current_task().get_name() + + while 
not manager.done(): + rebalance = await rebalancer_queue.get() + print(f"{datetime.now().strftime('%c')} : [Rebalancer] : {worker} is starting a new request...") + active_rebalance_id = None + if rebalance != None: + active_rebalance_id = rebalance.id + active_rebalances.append(active_rebalance_id) + scheduled_rebalances.remove(active_rebalance_id) + while rebalance != None: + rebalance = await run_rebalancer(rebalance, conn, worker) + if active_rebalance_id != None: + active_rebalances.remove(active_rebalance_id) + print(f"{datetime.now().strftime('%c')} : [Rebalancer] : {worker} completed its request...") await asyncio.sleep(3) -async def start_queue(worker_count=1): +async def start_queue(conn): + workers = await get_worker_count() rebalancer_queue = asyncio.Queue() - manager = asyncio.create_task(async_queue_manager(rebalancer_queue)) - workers = [asyncio.create_task(async_run_rebalancer("Worker " + str(worker_num+1), rebalancer_queue)) for worker_num in range(worker_count)] + manager = asyncio.create_task(async_queue_manager(rebalancer_queue, workers)) + workers = [asyncio.create_task(async_run_rebalancer(manager, rebalancer_queue, conn), name=f"Worker {id}") for id in range(1, workers+1)] + await asyncio.gather(manager, *workers) print(f"{datetime.now().strftime('%c')} : [Rebalancer] : Manager and workers have stopped...") @sync_to_async def get_worker_count(): - if LocalSettings.objects.filter(key='AR-Workers').exists(): - return int(LocalSettings.objects.filter(key='AR-Workers')[0].value) - else: - return 1 - -async def update_worker_count(): - global worker_count, shutdown_rebalancer - while True: - updated_worker_count = await get_worker_count() - if updated_worker_count != worker_count: - worker_count = updated_worker_count - shutdown_rebalancer = True - print(f"{datetime.now().strftime('%c')} : [Rebalancer] : New worker count detected...restarting rebalancer") - await asyncio.sleep(20) + return int(LocalSettings.objects.filter(key='AR-Workers')[0].value) def main(): - global scheduled_rebalances, active_rebalances, shutdown_rebalancer, worker_count - if LocalSettings.objects.filter(key='AR-Workers').exists(): - worker_count = int(LocalSettings.objects.filter(key='AR-Workers')[0].value) - else: + global scheduled_rebalances, active_rebalances + if not LocalSettings.objects.filter(key='AR-Workers').exists(): LocalSettings(key='AR-Workers', value='1').save() - worker_count = 1 + + conn = async_lnd_connect() loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) - loop.create_task(update_worker_count()) while True: - shutdown_rebalancer = False scheduled_rebalances = [] active_rebalances = [] if Rebalancer.objects.filter(status=1).exists(): @@ -412,7 +397,7 @@ def main(): unknown_error.status = 400 unknown_error.stop = datetime.now() unknown_error.save() - loop.run_until_complete(start_queue(worker_count)) + loop.run_until_complete(start_queue(conn)) print(f"{datetime.now().strftime('%c')} : [Rebalancer] : Rebalancer successfully exited...sleeping for 20 seconds") sleep(20) From 08052d85c8f257779585da3e627608b65c98d7e8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ot=C3=A1vio?= Date: Sun, 10 Dec 2023 16:45:32 +0000 Subject: [PATCH 03/12] Fix skipping cancelled rebals. 
& reduce redundant log info --- rebalancer.py | 34 +++++++++++++++++----------------- 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/rebalancer.py b/rebalancer.py index 74d9f6b7..93074395 100644 --- a/rebalancer.py +++ b/rebalancer.py @@ -155,17 +155,16 @@ def estimate_liquidity( payment ): return estimated_liquidity -@sync_to_async -def update_channels(stub, incoming_channel, outgoing_channel): +async def update_channels(stub, incoming_channel, outgoing_channel): try: # Incoming channel update - channel = stub.ListChannels(ln.ListChannelsRequest(peer=bytes.fromhex(incoming_channel))).channels[0] + channel = await stub.ListChannels(ln.ListChannelsRequest(peer=bytes.fromhex(incoming_channel))).channels[0] db_channel = Channels.objects.filter(chan_id=channel.chan_id)[0] db_channel.local_balance = channel.local_balance db_channel.remote_balance = channel.remote_balance db_channel.save() # Outgoing channel update - channel = stub.ListChannels(ln.ListChannelsRequest(peer=bytes.fromhex(outgoing_channel))).channels[0] + channel = await stub.ListChannels(ln.ListChannelsRequest(peer=bytes.fromhex(outgoing_channel))).channels[0] db_channel = Channels.objects.filter(chan_id=channel.chan_id)[0] db_channel.local_balance = channel.local_balance db_channel.remote_balance = channel.remote_balance @@ -320,11 +319,9 @@ async def async_queue_manager(rebalancer_queue: asyncio.Queue, worker_count): if worker_count != new_worker_count: while not rebalancer_queue.empty(): await rebalancer_queue.get() #Empty queue to restart with new worker_count config - print(f"{datetime.now().strftime('%c')} : [Rebalancer] : Reloading worker count...") + print(f"{datetime.now().strftime('%c')} : [Rebalancer] : Reloading worker count...", end="") return - print(f"{datetime.now().strftime('%c')} : [Rebalancer] : Queue currently has {rebalancer_queue.qsize()} items...") - print(f"{datetime.now().strftime('%c')} : [Rebalancer] : There are currently {len(active_rebalances)} tasks in progress...") - print(f"{datetime.now().strftime('%c')} : [Rebalancer] : Queue manager is checking for more work...") + print(f"{datetime.now().strftime('%c')} : [Rebalancer] : Queue currently has {rebalancer_queue.qsize()} items with {len(active_rebalances)} tasks in progress") pending_rebalances, rebal_count = await get_pending_rebals() if rebal_count > 0: for rebalance in pending_rebalances: @@ -335,18 +332,19 @@ async def async_queue_manager(rebalancer_queue: asyncio.Queue, worker_count): await auto_enable() scheduled = await auto_schedule() if len(scheduled) > 0: - print(f"{datetime.now().strftime('%c')} : [Rebalancer] : Scheduling {len(scheduled)} more jobs...") + print(f"{datetime.now().strftime('%c')} : [Rebalancer] : Scheduling {len(scheduled)} rebalancing jobs: ", end="") for rebalance in scheduled: + print(str(rebalance.id) + ", ") scheduled_rebalances.append(rebalance.id) await rebalancer_queue.put(rebalance) elif rebalancer_queue.qsize() == 0 and len(active_rebalances) == 0: - print(f"{datetime.now().strftime('%c')} : [Rebalancer] : Queue is still empty, stoping the rebalancer...") + print(f"{datetime.now().strftime('%c')} : [Rebalancer] : Queue is still empty, stoping the rebalancer...", end="") return await asyncio.sleep(30) except Exception as e: print(f"{datetime.now().strftime('%c')} : [Rebalancer] : Queue manager exception: {str(e)}") finally: - print(f"{datetime.now().strftime('%c')} : [Rebalancer] : Queue manager has shut down...") + print("Stopped") async def async_run_rebalancer(manager: asyncio.Task, 
rebalancer_queue, conn): global scheduled_rebalances, active_rebalances @@ -354,9 +352,11 @@ async def async_run_rebalancer(manager: asyncio.Task, rebalancer_queue, conn): while not manager.done(): rebalance = await rebalancer_queue.get() - print(f"{datetime.now().strftime('%c')} : [Rebalancer] : {worker} is starting a new request...") active_rebalance_id = None if rebalance != None: + pending = await sync_to_async(Rebalancer.objects.filter(id=rebalance.id, status=0).exists)() + if not pending: + continue # Make sure only pending requests executes (excludes cancelled) active_rebalance_id = rebalance.id active_rebalances.append(active_rebalance_id) scheduled_rebalances.remove(active_rebalance_id) @@ -364,17 +364,18 @@ async def async_run_rebalancer(manager: asyncio.Task, rebalancer_queue, conn): rebalance = await run_rebalancer(rebalance, conn, worker) if active_rebalance_id != None: active_rebalances.remove(active_rebalance_id) - print(f"{datetime.now().strftime('%c')} : [Rebalancer] : {worker} completed its request...") + print(f"{datetime.now().strftime('%c')} : [Rebalancer] : {worker} completed its request...") await asyncio.sleep(3) -async def start_queue(conn): +async def start_queue(): + conn = async_lnd_connect() workers = await get_worker_count() rebalancer_queue = asyncio.Queue() manager = asyncio.create_task(async_queue_manager(rebalancer_queue, workers)) workers = [asyncio.create_task(async_run_rebalancer(manager, rebalancer_queue, conn), name=f"Worker {id}") for id in range(1, workers+1)] await asyncio.gather(manager, *workers) - print(f"{datetime.now().strftime('%c')} : [Rebalancer] : Manager and workers have stopped...") + print(f"{datetime.now().strftime('%c')} : [Rebalancer] : Manager and workers have stopped") @sync_to_async def get_worker_count(): @@ -385,7 +386,6 @@ def main(): if not LocalSettings.objects.filter(key='AR-Workers').exists(): LocalSettings(key='AR-Workers', value='1').save() - conn = async_lnd_connect() loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) while True: @@ -397,7 +397,7 @@ def main(): unknown_error.status = 400 unknown_error.stop = datetime.now() unknown_error.save() - loop.run_until_complete(start_queue(conn)) + loop.run_until_complete(start_queue()) print(f"{datetime.now().strftime('%c')} : [Rebalancer] : Rebalancer successfully exited...sleeping for 20 seconds") sleep(20) From 4a839e6115e54b9e73fdb60fad6938341449ed1f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ot=C3=A1vio?= Date: Sun, 10 Dec 2023 18:09:07 +0000 Subject: [PATCH 04/12] Fix logging --- rebalancer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rebalancer.py b/rebalancer.py index 93074395..c0e3cadd 100644 --- a/rebalancer.py +++ b/rebalancer.py @@ -334,7 +334,7 @@ async def async_queue_manager(rebalancer_queue: asyncio.Queue, worker_count): if len(scheduled) > 0: print(f"{datetime.now().strftime('%c')} : [Rebalancer] : Scheduling {len(scheduled)} rebalancing jobs: ", end="") for rebalance in scheduled: - print(str(rebalance.id) + ", ") + print(str(rebalance.id) + ", ", end="") scheduled_rebalances.append(rebalance.id) await rebalancer_queue.put(rebalance) elif rebalancer_queue.qsize() == 0 and len(active_rebalances) == 0: From 22e46132ff2b16a211df61ec807c748712a1e491 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ot=C3=A1vio?= Date: Mon, 11 Dec 2023 14:04:01 +0000 Subject: [PATCH 05/12] Fix logging & move rapid_fire logic to a separate function --- rebalancer.py | 143 ++++++++++++++++++++++++-------------------------- 1 file changed, 69 insertions(+), 74 
deletions(-) diff --git a/rebalancer.py b/rebalancer.py index c0e3cadd..babd5689 100644 --- a/rebalancer.py +++ b/rebalancer.py @@ -7,7 +7,7 @@ from gui.lnd_deps import lightning_pb2_grpc as lnrpc from gui.lnd_deps import router_pb2 as lnr from gui.lnd_deps import router_pb2_grpc as lnrouter -from gui.lnd_deps.lnd_connect import lnd_connect, async_lnd_connect +from gui.lnd_deps.lnd_connect import async_lnd_connect from os import environ from typing import List @@ -16,9 +16,10 @@ from gui.models import Rebalancer, Channels, LocalSettings, Forwards, Autopilot @sync_to_async -def get_out_cans(rebalance, auto_rebalance_channels): +def get_out_cans(rebalance): try: - return list(auto_rebalance_channels.filter(auto_rebalance=False, percent_outbound__gte=F('ar_out_target')).exclude(remote_pubkey=rebalance.last_hop_pubkey).values_list('chan_id', flat=True)) + pub_active_chans = Channels.objects.filter(is_active=True, is_open=True, private=False).annotate(percent_outbound=((Sum('local_balance')+Sum('pending_outbound')-rebalance.value)*100)/Sum('capacity')).annotate(inbound_can=(((Sum('remote_balance')+Sum('pending_inbound'))*100)/Sum('capacity'))/Sum('ar_in_target')) + return list(pub_active_chans.filter(auto_rebalance=False, percent_outbound__gte=F('ar_out_target')).exclude(remote_pubkey=rebalance.last_hop_pubkey).values_list('chan_id', flat=True)) except Exception as e: print(f"{datetime.now().strftime('%c')} : [Rebalancer] : Error getting outbound cands: {str(e)}") @@ -36,20 +37,7 @@ def inbound_cans_len(inbound_cans): except Exception as e: print(f"{datetime.now().strftime('%c')} : [Rebalancer] : Error getting inbound cands: {str(e)}") -async def run_rebalancer(rebalance, conn, worker): - try: - #Reduce potential rebalance value in percent out to avoid going below AR-OUT-Target - auto_rebalance_channels = Channels.objects.filter(is_active=True, is_open=True, private=False).annotate(percent_outbound=((Sum('local_balance')+Sum('pending_outbound')-rebalance.value)*100)/Sum('capacity')).annotate(inbound_can=(((Sum('remote_balance')+Sum('pending_inbound'))*100)/Sum('capacity'))/Sum('ar_in_target')) - outbound_cans = await get_out_cans(rebalance, auto_rebalance_channels) - if len(outbound_cans) == 0 and rebalance.manual == False: - print(f"{datetime.now().strftime('%c')} : [Rebalancer] : No outbound_cans") - rebalance.status = 406 - rebalance.start = datetime.now() - rebalance.stop = datetime.now() - await save_record(rebalance) - return None - elif str(outbound_cans).replace('\'', '') != rebalance.outgoing_chan_ids and rebalance.manual == False: - rebalance.outgoing_chan_ids = str(outbound_cans).replace('\'', '') +async def run(rebalance: Rebalancer, conn, worker) -> Rebalancer: rebalance.start = datetime.now() try: stub = lnrpc.LightningStub(conn) @@ -68,7 +56,6 @@ async def run_rebalancer(rebalance, conn, worker): #SUCCESSFUL rebalance.status = 2 rebalance.fees_paid = payment_response.fee_msat/1000 - successful_out = payment_response.htlcs[0].route.hops[0].pub_key elif payment_response.status == 3: #FAILURE if payment_response.failure_reason == 1: @@ -98,44 +85,41 @@ async def run_rebalancer(rebalance, conn, worker): rebalance.stop = datetime.now() await save_record(rebalance) print(f"{datetime.now().strftime('%c')} : [Rebalancer] : {worker} completed payment attempts for: {rebalance.payment_hash}") - original_alias = rebalance.target_alias - inc=1.21 - dec=2 - if rebalance.status ==2: - await update_channels(stub, rebalance.last_hop_pubkey, successful_out) - #Reduce potential rebalance value in 
percent out to avoid going below AR-OUT-Target - auto_rebalance_channels = Channels.objects.filter(is_active=True, is_open=True, private=False).annotate(percent_outbound=((Sum('local_balance')+Sum('pending_outbound')-rebalance.value*inc)*100)/Sum('capacity')).annotate(inbound_can=(((Sum('remote_balance')+Sum('pending_inbound'))*100)/Sum('capacity'))/Sum('ar_in_target')) - inbound_cans = auto_rebalance_channels.filter(remote_pubkey=rebalance.last_hop_pubkey).filter(auto_rebalance=True, inbound_can__gte=1) - outbound_cans = await get_out_cans(rebalance, auto_rebalance_channels) - if await inbound_cans_len(inbound_cans) > 0 and len(outbound_cans) > 0: - next_rebalance = Rebalancer(value=int(rebalance.value*inc), fee_limit=round(rebalance.fee_limit*inc, 3), outgoing_chan_ids=str(outbound_cans).replace('\'', ''), last_hop_pubkey=rebalance.last_hop_pubkey, target_alias=original_alias, duration=1) - await save_record(next_rebalance) - print(f"{datetime.now().strftime('%c')} : [Rebalancer] : RapidFire increase for {next_rebalance.target_alias} from {next_rebalance.value} to {rebalance.value}") - else: - next_rebalance = None - # For failed rebalances, try in rapid fire with reduced balances until give up. - elif rebalance.status > 2 and rebalance.value > 69420: - #Previous Rapidfire with increased value failed, try with lower value up to 69420. - if rebalance.duration > 1: - next_value = await estimate_liquidity ( payment_response ) - if next_value < 1000: - next_rebalance = None - return next_rebalance - else: - next_value = rebalance.value/dec + return await rapid_fire(rebalance, stub, payment_response) + +async def rapid_fire(rebalance: Rebalancer, stub: lnrpc.LightningStub, payment_response) -> Rebalancer: + if rebalance.status < 2: + return None + outbound_cans = await get_out_cans(rebalance) + pub_active_chans = Channels.objects.filter(is_active=True, is_open=True, private=False).annotate(percent_outbound=((Sum('local_balance')+Sum('pending_outbound')-rebalance.value)*100)/Sum('capacity')).annotate(inbound_can=(((Sum('remote_balance')+Sum('pending_inbound'))*100)/Sum('capacity'))/Sum('ar_in_target')) + inbound_cans = pub_active_chans.filter(remote_pubkey=rebalance.last_hop_pubkey).filter(auto_rebalance=True, inbound_can__gte=1) + if rebalance.status == 2: + inc=1.21 + successful_out = payment_response.htlcs[0].route.hops[0].pub_key + await update_channels(stub, rebalance.last_hop_pubkey, successful_out) + #Reduce potential rebalance value in percent out to avoid going below AR-OUT-Target + if await inbound_cans_len(inbound_cans) > 0 and len(outbound_cans) > 0: + next_rebalance = Rebalancer(value=int(rebalance.value*inc), fee_limit=round(rebalance.fee_limit*inc, 3), outgoing_chan_ids=str(outbound_cans).replace('\'', ''), last_hop_pubkey=rebalance.last_hop_pubkey, target_alias=rebalance.target_alias, duration=1) + await save_record(next_rebalance) + print(f"{datetime.now().strftime('%c')} : [Rebalancer] : RapidFire increase for {next_rebalance.target_alias} from {rebalance.value} to {next_rebalance.value} ") + return next_rebalance + # For failed rebalances, try in rapid fire with reduced balances until give up. + elif rebalance.value > 69420: + dec=2 + #Previous Rapidfire with increased value failed, try with lower value up to 69420. 
+ if rebalance.duration > 1: + next_value = await estimate_liquidity(payment_response) + if next_value < 1000: + return None + else: + next_value = rebalance.value/dec - inbound_cans = auto_rebalance_channels.filter(remote_pubkey=rebalance.last_hop_pubkey).filter(auto_rebalance=True, inbound_can__gte=1) - if await inbound_cans_len(inbound_cans) > 0 and len(outbound_cans) > 0: - next_rebalance = Rebalancer(value=int(next_value), fee_limit=round(rebalance.fee_limit/(rebalance.value/next_value), 3), outgoing_chan_ids=str(outbound_cans).replace('\'', ''), last_hop_pubkey=rebalance.last_hop_pubkey, target_alias=original_alias, duration=1) - await save_record(next_rebalance) - print(f"{datetime.now().strftime('%c')} : [Rebalancer] : RapidFire decrease for {next_rebalance.target_alias} from {next_rebalance.value} to {rebalance.value}") - else: - next_rebalance = None - else: - next_rebalance = None + if await inbound_cans_len(inbound_cans) > 0 and len(outbound_cans) > 0: + next_rebalance = Rebalancer(value=int(next_value), fee_limit=round(rebalance.fee_limit/(rebalance.value/next_value), 3), outgoing_chan_ids=str(outbound_cans).replace('\'', ''), last_hop_pubkey=rebalance.last_hop_pubkey, target_alias=rebalance.target_alias, duration=1) + await save_record(next_rebalance) + print(f"{datetime.now().strftime('%c')} : [Rebalancer] : RapidFire decrease for {next_rebalance.target_alias} from {rebalance.value} to {next_rebalance.value} ") return next_rebalance - except Exception as e: - print(f"{datetime.now().strftime('%c')} : [Rebalancer] : Error running rebalance attempt: {str(e)}") + return None @sync_to_async def estimate_liquidity( payment ): @@ -158,13 +142,13 @@ def estimate_liquidity( payment ): async def update_channels(stub, incoming_channel, outgoing_channel): try: # Incoming channel update - channel = await stub.ListChannels(ln.ListChannelsRequest(peer=bytes.fromhex(incoming_channel))).channels[0] + channel = (await stub.ListChannels(ln.ListChannelsRequest(peer=bytes.fromhex(incoming_channel)))).channels[0] db_channel = Channels.objects.filter(chan_id=channel.chan_id)[0] db_channel.local_balance = channel.local_balance db_channel.remote_balance = channel.remote_balance db_channel.save() # Outgoing channel update - channel = await stub.ListChannels(ln.ListChannelsRequest(peer=bytes.fromhex(outgoing_channel))).channels[0] + channel = (await stub.ListChannels(ln.ListChannelsRequest(peer=bytes.fromhex(outgoing_channel)))).channels[0] db_channel = Channels.objects.filter(chan_id=channel.chan_id)[0] db_channel.local_balance = channel.local_balance db_channel.remote_balance = channel.remote_balance @@ -310,7 +294,7 @@ def get_pending_rebals(): except Exception as e: print(f"{datetime.now().strftime('%c')} : [Rebalancer] : Error getting pending rebalances: {str(e)}") -async def async_queue_manager(rebalancer_queue: asyncio.Queue, worker_count): +async def queue_manager(rebalancer_queue: asyncio.Queue, worker_count): global scheduled_rebalances, active_rebalances print(f"{datetime.now().strftime('%c')} : [Rebalancer] : Queue manager is starting with {worker_count} workers...") try: @@ -337,6 +321,7 @@ async def async_queue_manager(rebalancer_queue: asyncio.Queue, worker_count): print(str(rebalance.id) + ", ", end="") scheduled_rebalances.append(rebalance.id) await rebalancer_queue.put(rebalance) + print("") elif rebalancer_queue.qsize() == 0 and len(active_rebalances) == 0: print(f"{datetime.now().strftime('%c')} : [Rebalancer] : Queue is still empty, stoping the rebalancer...", end="") return @@ 
-346,33 +331,43 @@ async def async_queue_manager(rebalancer_queue: asyncio.Queue, worker_count): finally: print("Stopped") -async def async_run_rebalancer(manager: asyncio.Task, rebalancer_queue, conn): +async def worker_manager(manager: asyncio.Task, rebalancer_queue, conn): global scheduled_rebalances, active_rebalances worker = asyncio.current_task().get_name() - while not manager.done(): + await asyncio.sleep(3) rebalance = await rebalancer_queue.get() - active_rebalance_id = None - if rebalance != None: - pending = await sync_to_async(Rebalancer.objects.filter(id=rebalance.id, status=0).exists)() - if not pending: - continue # Make sure only pending requests executes (excludes cancelled) - active_rebalance_id = rebalance.id - active_rebalances.append(active_rebalance_id) - scheduled_rebalances.remove(active_rebalance_id) + if rebalance == None: + continue + + if not await sync_to_async(Rebalancer.objects.filter(id=rebalance.id, status=0).exists)(): + continue # Make sure only pending requests executes (excludes cancelled) + + if not rebalance.manual: + outbound_cans = await get_out_cans(rebalance) + if len(outbound_cans) == 0: + print(f"{datetime.now().strftime('%c')} : [Rebalancer] : No outbound_cans") + rebalance.status = 406 + rebalance.start = rebalance.stop = datetime.now() + await save_record(rebalance) + continue + rebalance.outgoing_chan_ids = str(outbound_cans).replace('\'', '') + + active_rebalance_id = rebalance.id + active_rebalances.append(active_rebalance_id) + scheduled_rebalances.remove(active_rebalance_id) while rebalance != None: - rebalance = await run_rebalancer(rebalance, conn, worker) - if active_rebalance_id != None: - active_rebalances.remove(active_rebalance_id) - print(f"{datetime.now().strftime('%c')} : [Rebalancer] : {worker} completed its request...") - await asyncio.sleep(3) + rebalance = await run(rebalance, conn, worker) + + active_rebalances.remove(active_rebalance_id) + print(f"{datetime.now().strftime('%c')} : [Rebalancer] : {worker} completed its request...") async def start_queue(): conn = async_lnd_connect() workers = await get_worker_count() rebalancer_queue = asyncio.Queue() - manager = asyncio.create_task(async_queue_manager(rebalancer_queue, workers)) - workers = [asyncio.create_task(async_run_rebalancer(manager, rebalancer_queue, conn), name=f"Worker {id}") for id in range(1, workers+1)] + manager = asyncio.create_task(queue_manager(rebalancer_queue, workers)) + workers = [asyncio.create_task(worker_manager(manager, rebalancer_queue, conn), name=f"Worker {id}") for id in range(1, workers+1)] await asyncio.gather(manager, *workers) print(f"{datetime.now().strftime('%c')} : [Rebalancer] : Manager and workers have stopped") From 7b6cc787d94ba24c8b4ef24c5a33ac1261867752 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ot=C3=A1vio?= Date: Mon, 11 Dec 2023 20:38:40 +0000 Subject: [PATCH 06/12] Add `log_fail` decorator helper & use async ORM methods --- rebalancer.py | 428 +++++++++++++++++++++++--------------------------- 1 file changed, 200 insertions(+), 228 deletions(-) diff --git a/rebalancer.py b/rebalancer.py index babd5689..350db50b 100644 --- a/rebalancer.py +++ b/rebalancer.py @@ -15,92 +15,84 @@ django.setup() from gui.models import Rebalancer, Channels, LocalSettings, Forwards, Autopilot -@sync_to_async -def get_out_cans(rebalance): - try: - pub_active_chans = Channels.objects.filter(is_active=True, is_open=True, 
private=False).annotate(percent_outbound=((Sum('local_balance')+Sum('pending_outbound')-rebalance.value)*100)/Sum('capacity')).annotate(inbound_can=(((Sum('remote_balance')+Sum('pending_inbound'))*100)/Sum('capacity'))/Sum('ar_in_target')) - return list(pub_active_chans.filter(auto_rebalance=False, percent_outbound__gte=F('ar_out_target')).exclude(remote_pubkey=rebalance.last_hop_pubkey).values_list('chan_id', flat=True)) - except Exception as e: - print(f"{datetime.now().strftime('%c')} : [Rebalancer] : Error getting outbound cands: {str(e)}") +def log_fail(msg): + def decorator(func): + def wrapper(*args, **kwargs): + try: + return func(*args, **kwargs) + except Exception as e: + print(f"{datetime.now().strftime('%c')} : [Rebalancer] : {msg}: {e} at \"{func.__name__}{str(args)}\":{e.__traceback__.tb_next.tb_lineno}") + return wrapper + return decorator @sync_to_async -def save_record(record): - try: - record.save() - except Exception as e: - print(f"{datetime.now().strftime('%c')} : [Rebalancer] : Error saving database record: {str(e)}") - -@sync_to_async -def inbound_cans_len(inbound_cans): - try: - return len(inbound_cans) - except Exception as e: - print(f"{datetime.now().strftime('%c')} : [Rebalancer] : Error getting inbound cands: {str(e)}") +@log_fail("Error getting outbound cans") +def get_out_cans(rebalance): + pub_active_chans = Channels.objects.filter(is_active=True, is_open=True, private=False).annotate(percent_outbound=((Sum('local_balance')+Sum('pending_outbound')-rebalance.value)*100)/Sum('capacity')).annotate(inbound_can=(((Sum('remote_balance')+Sum('pending_inbound'))*100)/Sum('capacity'))/Sum('ar_in_target')) + return pub_active_chans, list(pub_active_chans.filter(auto_rebalance=False, percent_outbound__gte=F('ar_out_target')).exclude(remote_pubkey=rebalance.last_hop_pubkey).values_list('chan_id', flat=True)) async def run(rebalance: Rebalancer, conn, worker) -> Rebalancer: - rebalance.start = datetime.now() - try: - stub = lnrpc.LightningStub(conn) - routerstub = lnrouter.RouterStub(conn) - chan_ids = json.loads(rebalance.outgoing_chan_ids) - timeout = rebalance.duration * 60 - invoice_response = await stub.AddInvoice(ln.Invoice(value=rebalance.value, expiry=timeout)) - print(f"{datetime.now().strftime('%c')} : [Rebalancer] : {worker} starting rebalance for {rebalance.target_alias} {rebalance.last_hop_pubkey} for {rebalance.value} sats and duration {rebalance.duration}, using {len(chan_ids)} outbound channels") - async for payment_response in routerstub.SendPaymentV2(lnr.SendPaymentRequest(payment_request=str(invoice_response.payment_request), fee_limit_msat=int(rebalance.fee_limit*1000), outgoing_chan_ids=chan_ids, last_hop_pubkey=bytes.fromhex(rebalance.last_hop_pubkey), timeout_seconds=(timeout-5), allow_self_payment=True), timeout=(timeout+60)): - if payment_response.status == 1 and rebalance.status == 0: - #IN-FLIGHT - rebalance.payment_hash = payment_response.payment_hash - rebalance.status = 1 - await save_record(rebalance) - elif payment_response.status == 2: - #SUCCESSFUL - rebalance.status = 2 - rebalance.fees_paid = payment_response.fee_msat/1000 - elif payment_response.status == 3: - #FAILURE - if payment_response.failure_reason == 1: - #FAILURE_REASON_TIMEOUT - rebalance.status = 3 - elif payment_response.failure_reason == 2: - #FAILURE_REASON_NO_ROUTE - rebalance.status = 4 - elif payment_response.failure_reason == 3: - #FAILURE_REASON_ERROR - rebalance.status = 5 - elif payment_response.failure_reason == 4: - #FAILURE_REASON_INCORRECT_PAYMENT_DETAILS - 
rebalance.status = 6 - elif payment_response.failure_reason == 5: - #FAILURE_REASON_INSUFFICIENT_BALANCE - rebalance.status = 7 - elif payment_response.status == 0: - rebalance.status = 400 - except Exception as e: - if str(e.code()) == 'StatusCode.DEADLINE_EXCEEDED': - rebalance.status = 408 - else: + rebalance.start = datetime.now() + try: + stub = lnrpc.LightningStub(conn) + routerstub = lnrouter.RouterStub(conn) + chan_ids = json.loads(rebalance.outgoing_chan_ids) + timeout = rebalance.duration * 60 + invoice_response = await stub.AddInvoice(ln.Invoice(value=rebalance.value, expiry=timeout)) + print(f"{datetime.now().strftime('%c')} : [Rebalancer] : {worker} starting rebalance for {rebalance.target_alias} {rebalance.last_hop_pubkey} for {rebalance.value} sats and duration {rebalance.duration}, using {len(chan_ids)} outbound channels") + async for payment_response in routerstub.SendPaymentV2(lnr.SendPaymentRequest(payment_request=str(invoice_response.payment_request), fee_limit_msat=int(rebalance.fee_limit*1000), outgoing_chan_ids=chan_ids, last_hop_pubkey=bytes.fromhex(rebalance.last_hop_pubkey), timeout_seconds=(timeout-5), allow_self_payment=True), timeout=(timeout+60)): + if payment_response.status == 1 and rebalance.status == 0: + #IN-FLIGHT + rebalance.payment_hash = payment_response.payment_hash + rebalance.status = 1 + await rebalance.asave() + elif payment_response.status == 2: + #SUCCESSFUL + rebalance.status = 2 + rebalance.fees_paid = payment_response.fee_msat/1000 + elif payment_response.status == 3: + #FAILURE + if payment_response.failure_reason == 1: + #FAILURE_REASON_TIMEOUT + rebalance.status = 3 + elif payment_response.failure_reason == 2: + #FAILURE_REASON_NO_ROUTE + rebalance.status = 4 + elif payment_response.failure_reason == 3: + #FAILURE_REASON_ERROR + rebalance.status = 5 + elif payment_response.failure_reason == 4: + #FAILURE_REASON_INCORRECT_PAYMENT_DETAILS + rebalance.status = 6 + elif payment_response.failure_reason == 5: + #FAILURE_REASON_INSUFFICIENT_BALANCE + rebalance.status = 7 + elif payment_response.status == 0: rebalance.status = 400 - print(f"{datetime.now().strftime('%c')} : [Rebalancer] : Error while sending payment: {str(e)}") - finally: - rebalance.stop = datetime.now() - await save_record(rebalance) - print(f"{datetime.now().strftime('%c')} : [Rebalancer] : {worker} completed payment attempts for: {rebalance.payment_hash}") - return await rapid_fire(rebalance, stub, payment_response) + except Exception as e: + rebalance.status = 408 if str(e.code()) == 'StatusCode.DEADLINE_EXCEEDED' else 400 + print(f"{datetime.now().strftime('%c')} : [Rebalancer] : Error while sending payment: {str(e)}") + finally: + rebalance.stop = datetime.now() + await rebalance.asave() + print(f"{datetime.now().strftime('%c')} : [Rebalancer] : {worker} completed payment attempts for: {rebalance.payment_hash}") + return await rapid_fire(rebalance, stub, payment_response) +@log_fail("Error on rapid_fire") async def rapid_fire(rebalance: Rebalancer, stub: lnrpc.LightningStub, payment_response) -> Rebalancer: if rebalance.status < 2: return None - outbound_cans = await get_out_cans(rebalance) - pub_active_chans = Channels.objects.filter(is_active=True, is_open=True, private=False).annotate(percent_outbound=((Sum('local_balance')+Sum('pending_outbound')-rebalance.value)*100)/Sum('capacity')).annotate(inbound_can=(((Sum('remote_balance')+Sum('pending_inbound'))*100)/Sum('capacity'))/Sum('ar_in_target')) - inbound_cans = 
pub_active_chans.filter(remote_pubkey=rebalance.last_hop_pubkey).filter(auto_rebalance=True, inbound_can__gte=1) + pub_active_chans, outbound_cans = await get_out_cans(rebalance) + inbound_cans_len = await pub_active_chans.filter(remote_pubkey=rebalance.last_hop_pubkey, auto_rebalance=True, inbound_can__gte=1).acount() if rebalance.status == 2: inc=1.21 successful_out = payment_response.htlcs[0].route.hops[0].pub_key - await update_channels(stub, rebalance.last_hop_pubkey, successful_out) + await update_channel(stub, successful_out) # Outgoing channel update + await update_channel(stub, rebalance.last_hop_pubkey) # Incoming channel update #Reduce potential rebalance value in percent out to avoid going below AR-OUT-Target - if await inbound_cans_len(inbound_cans) > 0 and len(outbound_cans) > 0: + if inbound_cans_len > 0 and len(outbound_cans) > 0: next_rebalance = Rebalancer(value=int(rebalance.value*inc), fee_limit=round(rebalance.fee_limit*inc, 3), outgoing_chan_ids=str(outbound_cans).replace('\'', ''), last_hop_pubkey=rebalance.last_hop_pubkey, target_alias=rebalance.target_alias, duration=1) - await save_record(next_rebalance) + await next_rebalance.asave() print(f"{datetime.now().strftime('%c')} : [Rebalancer] : RapidFire increase for {next_rebalance.target_alias} from {rebalance.value} to {next_rebalance.value} ") return next_rebalance # For failed rebalances, try in rapid fire with reduced balances until give up. @@ -114,9 +106,9 @@ async def rapid_fire(rebalance: Rebalancer, stub: lnrpc.LightningStub, payment_r else: next_value = rebalance.value/dec - if await inbound_cans_len(inbound_cans) > 0 and len(outbound_cans) > 0: + if inbound_cans_len > 0 and len(outbound_cans) > 0: next_rebalance = Rebalancer(value=int(next_value), fee_limit=round(rebalance.fee_limit/(rebalance.value/next_value), 3), outgoing_chan_ids=str(outbound_cans).replace('\'', ''), last_hop_pubkey=rebalance.last_hop_pubkey, target_alias=rebalance.target_alias, duration=1) - await save_record(next_rebalance) + await next_rebalance.asave() print(f"{datetime.now().strftime('%c')} : [Rebalancer] : RapidFire decrease for {next_rebalance.target_alias} from {rebalance.value} to {next_rebalance.value} ") return next_rebalance return None @@ -136,170 +128,153 @@ def estimate_liquidity( payment ): except Exception as e: print(f"{datetime.now().strftime('%c')} : [Rebalancer] : Error estimating liquidity: {str(e)}") estimated_liquidity = 0 - return estimated_liquidity -async def update_channels(stub, incoming_channel, outgoing_channel): - try: - # Incoming channel update - channel = (await stub.ListChannels(ln.ListChannelsRequest(peer=bytes.fromhex(incoming_channel)))).channels[0] - db_channel = Channels.objects.filter(chan_id=channel.chan_id)[0] - db_channel.local_balance = channel.local_balance - db_channel.remote_balance = channel.remote_balance - db_channel.save() - # Outgoing channel update - channel = (await stub.ListChannels(ln.ListChannelsRequest(peer=bytes.fromhex(outgoing_channel)))).channels[0] - db_channel = Channels.objects.filter(chan_id=channel.chan_id)[0] - db_channel.local_balance = channel.local_balance - db_channel.remote_balance = channel.remote_balance - db_channel.save() - except Exception as e: - print(f"{datetime.now().strftime('%c')} : [Rebalancer] : Error updating channel balances: {str(e)}") +@log_fail("Error updating channel balances") +async def update_channel(stub, chann): + channel = (await stub.ListChannels(ln.ListChannelsRequest(peer=bytes.fromhex(chann)))).channels[0] + db_channel = await 
Channels.objects.aget(chan_id=channel.chan_id) + db_channel.local_balance = channel.local_balance + db_channel.remote_balance = channel.remote_balance + await db_channel.asave() @sync_to_async +@log_fail("Error scheduling rebalances") def auto_schedule() -> List[Rebalancer]: - try: - #No rebalancer jobs have been scheduled, lets look for any channels with an auto_rebalance flag and make the best request if we find one - to_schedule = [] - if LocalSettings.objects.filter(key='AR-Enabled').exists(): - enabled = int(LocalSettings.objects.filter(key='AR-Enabled')[0].value) - else: - LocalSettings(key='AR-Enabled', value='0').save() - enabled = 0 - if enabled == 0: - return [] - - auto_rebalance_channels = Channels.objects.filter(is_active=True, is_open=True, private=False).annotate(percent_outbound=((Sum('local_balance')+Sum('pending_outbound'))*100)/Sum('capacity')).annotate(inbound_can=(((Sum('remote_balance')+Sum('pending_inbound'))*100)/Sum('capacity'))/Sum('ar_in_target')) - if len(auto_rebalance_channels) == 0: - return [] - - if not LocalSettings.objects.filter(key='AR-Outbound%').exists(): - LocalSettings(key='AR-Outbound%', value='75').save() - if not LocalSettings.objects.filter(key='AR-Inbound%').exists(): - LocalSettings(key='AR-Inbound%', value='90').save() - outbound_cans = list(auto_rebalance_channels.filter(auto_rebalance=False, percent_outbound__gte=F('ar_out_target')).values_list('chan_id', flat=True)) - already_scheduled = Rebalancer.objects.exclude(last_hop_pubkey='').filter(status=0).values_list('last_hop_pubkey') - inbound_cans = auto_rebalance_channels.filter(auto_rebalance=True, inbound_can__gte=1).exclude(remote_pubkey__in=already_scheduled).order_by('-inbound_can') - if len(inbound_cans) == 0 or len(outbound_cans) == 0: - return [] + #No rebalancer jobs have been scheduled, lets look for any channels with an auto_rebalance flag and make the best request if we find one + to_schedule = [] + if LocalSettings.objects.filter(key='AR-Enabled').exists(): + enabled = int(LocalSettings.objects.filter(key='AR-Enabled')[0].value) + else: + LocalSettings(key='AR-Enabled', value='0').save() + enabled = 0 + if enabled == 0: + return [] + + auto_rebalance_channels = Channels.objects.filter(is_active=True, is_open=True, private=False).annotate(percent_outbound=((Sum('local_balance')+Sum('pending_outbound'))*100)/Sum('capacity')).annotate(inbound_can=(((Sum('remote_balance')+Sum('pending_inbound'))*100)/Sum('capacity'))/Sum('ar_in_target')) + if len(auto_rebalance_channels) == 0: + return [] + if not LocalSettings.objects.filter(key='AR-Outbound%').exists(): + LocalSettings(key='AR-Outbound%', value='75').save() + if not LocalSettings.objects.filter(key='AR-Inbound%').exists(): + LocalSettings(key='AR-Inbound%', value='90').save() + outbound_cans = list(auto_rebalance_channels.filter(auto_rebalance=False, percent_outbound__gte=F('ar_out_target')).values_list('chan_id', flat=True)) + already_scheduled = Rebalancer.objects.exclude(last_hop_pubkey='').filter(status=0).values_list('last_hop_pubkey') + inbound_cans = auto_rebalance_channels.filter(auto_rebalance=True, inbound_can__gte=1).exclude(remote_pubkey__in=already_scheduled).order_by('-inbound_can') + if len(inbound_cans) == 0 or len(outbound_cans) == 0: + return [] + + if LocalSettings.objects.filter(key='AR-MaxFeeRate').exists(): + max_fee_rate = int(LocalSettings.objects.filter(key='AR-MaxFeeRate')[0].value) + else: + LocalSettings(key='AR-MaxFeeRate', value='500').save() + max_fee_rate = 500 + if 
LocalSettings.objects.filter(key='AR-Variance').exists(): + variance = int(LocalSettings.objects.filter(key='AR-Variance')[0].value) + else: + LocalSettings(key='AR-Variance', value='0').save() + variance = 0 + if LocalSettings.objects.filter(key='AR-WaitPeriod').exists(): + wait_period = int(LocalSettings.objects.filter(key='AR-WaitPeriod')[0].value) + else: + LocalSettings(key='AR-WaitPeriod', value='30').save() + wait_period = 30 + if not LocalSettings.objects.filter(key='AR-Target%').exists(): + LocalSettings(key='AR-Target%', value='3').save() + if not LocalSettings.objects.filter(key='AR-MaxCost%').exists(): + LocalSettings(key='AR-MaxCost%', value='65').save() + for target in inbound_cans: + target_fee_rate = int(target.local_fee_rate * (target.ar_max_cost/100)) + if target_fee_rate > 0 and target_fee_rate > target.remote_fee_rate: + target_value = int(target.ar_amt_target+(target.ar_amt_target*((secrets.choice(range(-1000,1001))/1000)*variance/100))) + target_fee = round(target_fee_rate*target_value*0.000001, 3) if target_fee_rate <= max_fee_rate else round(max_fee_rate*target_value*0.000001, 3) + if target_fee == 0: + continue - if LocalSettings.objects.filter(key='AR-MaxFeeRate').exists(): - max_fee_rate = int(LocalSettings.objects.filter(key='AR-MaxFeeRate')[0].value) - else: - LocalSettings(key='AR-MaxFeeRate', value='500').save() - max_fee_rate = 500 - if LocalSettings.objects.filter(key='AR-Variance').exists(): - variance = int(LocalSettings.objects.filter(key='AR-Variance')[0].value) - else: - LocalSettings(key='AR-Variance', value='0').save() - variance = 0 - if LocalSettings.objects.filter(key='AR-WaitPeriod').exists(): - wait_period = int(LocalSettings.objects.filter(key='AR-WaitPeriod')[0].value) - else: - LocalSettings(key='AR-WaitPeriod', value='30').save() - wait_period = 30 - if not LocalSettings.objects.filter(key='AR-Target%').exists(): - LocalSettings(key='AR-Target%', value='3').save() - if not LocalSettings.objects.filter(key='AR-MaxCost%').exists(): - LocalSettings(key='AR-MaxCost%', value='65').save() - for target in inbound_cans: - target_fee_rate = int(target.local_fee_rate * (target.ar_max_cost/100)) - if target_fee_rate > 0 and target_fee_rate > target.remote_fee_rate: - target_value = int(target.ar_amt_target+(target.ar_amt_target*((secrets.choice(range(-1000,1001))/1000)*variance/100))) - target_fee = round(target_fee_rate*target_value*0.000001, 3) if target_fee_rate <= max_fee_rate else round(max_fee_rate*target_value*0.000001, 3) - if target_fee == 0: + if LocalSettings.objects.filter(key='AR-Time').exists(): + target_time = int(LocalSettings.objects.filter(key='AR-Time')[0].value) + else: + LocalSettings(key='AR-Time', value='5').save() + target_time = 5 + # TLDR: willing to pay 1 sat for every value_per_fee sats moved + if Rebalancer.objects.filter(last_hop_pubkey=target.remote_pubkey).exclude(status=0).exists(): + last_rebalance = Rebalancer.objects.filter(last_hop_pubkey=target.remote_pubkey).exclude(status=0).order_by('-id')[0] + if not (last_rebalance.status == 2 or (last_rebalance.status > 2 and (int((datetime.now() - last_rebalance.stop).total_seconds() / 60) > wait_period)) or (last_rebalance.status == 1 and ((int((datetime.now() - last_rebalance.start).total_seconds() / 60) - last_rebalance.duration) > wait_period))): continue - - if LocalSettings.objects.filter(key='AR-Time').exists(): - target_time = int(LocalSettings.objects.filter(key='AR-Time')[0].value) - else: - LocalSettings(key='AR-Time', value='5').save() - target_time = 5 - # TLDR: 
willing to pay 1 sat for every value_per_fee sats moved - if Rebalancer.objects.filter(last_hop_pubkey=target.remote_pubkey).exclude(status=0).exists(): - last_rebalance = Rebalancer.objects.filter(last_hop_pubkey=target.remote_pubkey).exclude(status=0).order_by('-id')[0] - if not (last_rebalance.status == 2 or (last_rebalance.status > 2 and (int((datetime.now() - last_rebalance.stop).total_seconds() / 60) > wait_period)) or (last_rebalance.status == 1 and ((int((datetime.now() - last_rebalance.start).total_seconds() / 60) - last_rebalance.duration) > wait_period))): - continue - print(f"{datetime.now().strftime('%c')} : [Rebalancer] : Creating Auto Rebalance Request for: {target.chan_id}") - print(f"{datetime.now().strftime('%c')} : [Rebalancer] : Value: {target_value} / {target.ar_amt_target} | Fee: {target_fee} | Duration: {target_time}") - print(f"{datetime.now().strftime('%c')} : [Rebalancer] : Request routing outbound via: {outbound_cans}") - new_rebalance = Rebalancer(value=target_value, fee_limit=target_fee, outgoing_chan_ids=str(outbound_cans).replace('\'', ''), last_hop_pubkey=target.remote_pubkey, target_alias=target.alias, duration=target_time) - new_rebalance.save() - to_schedule.append(new_rebalance) - return to_schedule - except Exception as e: - print(f"{datetime.now().strftime('%c')} : [Rebalancer] : Error scheduling rebalances: {str(e)}") - return to_schedule + print(f"{datetime.now().strftime('%c')} : [Rebalancer] : Creating Auto Rebalance Request for: {target.chan_id}") + print(f"{datetime.now().strftime('%c')} : [Rebalancer] : Value: {target_value} / {target.ar_amt_target} | Fee: {target_fee} | Duration: {target_time}") + print(f"{datetime.now().strftime('%c')} : [Rebalancer] : Request routing outbound via: {outbound_cans}") + new_rebalance = Rebalancer(value=target_value, fee_limit=target_fee, outgoing_chan_ids=str(outbound_cans).replace('\'', ''), last_hop_pubkey=target.remote_pubkey, target_alias=target.alias, duration=target_time) + new_rebalance.save() + to_schedule.append(new_rebalance) + return to_schedule @sync_to_async +@log_fail("Error during auto channel enabling") def auto_enable(): - try: - if LocalSettings.objects.filter(key='AR-Autopilot').exists(): - enabled = int(LocalSettings.objects.filter(key='AR-Autopilot')[0].value) - else: - LocalSettings(key='AR-Autopilot', value='0').save() - enabled = 0 - if LocalSettings.objects.filter(key='AR-APDays').exists(): - apdays = int(LocalSettings.objects.filter(key='AR-APDays')[0].value) - else: - LocalSettings(key='AR-APDays', value='7').save() - apdays = 7 - if enabled == 1: - lookup_channels=Channels.objects.filter(is_active=True, is_open=True, private=False) - channels = lookup_channels.values('remote_pubkey').annotate(outbound_percent=((Sum('local_balance')+Sum('pending_outbound'))*1000)/Sum('capacity')).annotate(inbound_percent=((Sum('remote_balance')+Sum('pending_inbound'))*1000)/Sum('capacity')).order_by() - filter_day = datetime.now() - timedelta(days=apdays) - forwards = Forwards.objects.filter(forward_date__gte=filter_day) - for channel in channels: - outbound_percent = int(round(channel['outbound_percent']/10, 0)) - inbound_percent = int(round(channel['inbound_percent']/10, 0)) - chan_list = lookup_channels.filter(remote_pubkey=channel['remote_pubkey']).values('chan_id') - routed_in_apday = forwards.filter(chan_id_in__in=chan_list).count() - routed_out_apday = forwards.filter(chan_id_out__in=chan_list).count() - iapD = 0 if routed_in_apday == 0 else 
int(forwards.filter(chan_id_in__in=chan_list).aggregate(Sum('amt_in_msat'))['amt_in_msat__sum']/10000000)/100 - oapD = 0 if routed_out_apday == 0 else int(forwards.filter(chan_id_out__in=chan_list).aggregate(Sum('amt_out_msat'))['amt_out_msat__sum']/10000000)/100 - for peer_channel in lookup_channels.filter(chan_id__in=chan_list): - if peer_channel.ar_out_target == 100 and peer_channel.auto_rebalance == True: - #Special Case for LOOP, Wos, etc. Always Auto Rebalance if enabled to keep outbound full. - print(f"{datetime.now().strftime('%c')} : [Rebalancer] : Skipping AR enabled and 100% oTarget channel: {peer_channel.alias} {peer_channel.chan_id}") - pass - elif oapD > (iapD*1.10) and outbound_percent > 75: - #print('Case 1: Pass') - pass - elif oapD > (iapD*1.10) and inbound_percent > 75 and peer_channel.auto_rebalance == False: - #print('Case 2: Enable AR - o7D > i7D AND Inbound Liq > 75%') - peer_channel.auto_rebalance = True - peer_channel.save() - Autopilot(chan_id=peer_channel.chan_id, peer_alias=peer_channel.alias, setting='Enabled', old_value=0, new_value=1).save() - print(f"{datetime.now().strftime('%c')} : [Rebalancer] : Auto Pilot Enabled for {peer_channel.alias} {peer_channel.chan_id}: {oapD} {iapD}") - elif oapD < (iapD*1.10) and outbound_percent > 75 and peer_channel.auto_rebalance == True: - #print('Case 3: Disable AR - o7D < i7D AND Outbound Liq > 75%') - peer_channel.auto_rebalance = False - peer_channel.save() - Autopilot(chan_id=peer_channel.chan_id, peer_alias=peer_channel.alias, setting='Enabled', old_value=1, new_value=0).save() - print(f"{datetime.now().strftime('%c')} : [Rebalancer] : Auto Pilot Disabled for {peer_channel.alias} {peer_channel.chan_id}: {oapD} {iapD}" ) - elif oapD < (iapD*1.10) and inbound_percent > 75: - #print('Case 4: Pass') - pass - else: - #print('Case 5: Pass') - pass - except Exception as e: - print(f"{datetime.now().strftime('%c')} : [Rebalancer] : Error during auto channel enabling: {str(e)}") + if LocalSettings.objects.filter(key='AR-Autopilot').exists(): + enabled = int(LocalSettings.objects.filter(key='AR-Autopilot')[0].value) + else: + LocalSettings(key='AR-Autopilot', value='0').save() + enabled = 0 + if LocalSettings.objects.filter(key='AR-APDays').exists(): + apdays = int(LocalSettings.objects.filter(key='AR-APDays')[0].value) + else: + LocalSettings(key='AR-APDays', value='7').save() + apdays = 7 + if enabled == 0: + return + lookup_channels=Channels.objects.filter(is_active=True, is_open=True, private=False) + channels = lookup_channels.values('remote_pubkey').annotate(outbound_percent=((Sum('local_balance')+Sum('pending_outbound'))*1000)/Sum('capacity')).annotate(inbound_percent=((Sum('remote_balance')+Sum('pending_inbound'))*1000)/Sum('capacity')).order_by() + filter_day = datetime.now() - timedelta(days=apdays) + forwards = Forwards.objects.filter(forward_date__gte=filter_day) + for channel in channels: + outbound_percent = int(round(channel['outbound_percent']/10, 0)) + inbound_percent = int(round(channel['inbound_percent']/10, 0)) + chan_list = lookup_channels.filter(remote_pubkey=channel['remote_pubkey']).values('chan_id') + routed_in_apday = forwards.filter(chan_id_in__in=chan_list).count() + routed_out_apday = forwards.filter(chan_id_out__in=chan_list).count() + iapD = 0 if routed_in_apday == 0 else int(forwards.filter(chan_id_in__in=chan_list).aggregate(Sum('amt_in_msat'))['amt_in_msat__sum']/10000000)/100 + oapD = 0 if routed_out_apday == 0 else 
int(forwards.filter(chan_id_out__in=chan_list).aggregate(Sum('amt_out_msat'))['amt_out_msat__sum']/10000000)/100 + for peer_channel in lookup_channels.filter(chan_id__in=chan_list): + if peer_channel.ar_out_target == 100 and peer_channel.auto_rebalance == True: + #Special Case for LOOP, Wos, etc. Always Auto Rebalance if enabled to keep outbound full. + print(f"{datetime.now().strftime('%c')} : [Rebalancer] : Skipping AR enabled and 100% oTarget channel: {peer_channel.alias} {peer_channel.chan_id}") + pass + elif oapD > (iapD*1.10) and outbound_percent > 75: + #print('Case 1: Pass') + pass + elif oapD > (iapD*1.10) and inbound_percent > 75 and peer_channel.auto_rebalance == False: + #print('Case 2: Enable AR - o7D > i7D AND Inbound Liq > 75%') + peer_channel.auto_rebalance = True + peer_channel.save() + Autopilot(chan_id=peer_channel.chan_id, peer_alias=peer_channel.alias, setting='Enabled', old_value=0, new_value=1).save() + print(f"{datetime.now().strftime('%c')} : [Rebalancer] : Auto Pilot Enabled for {peer_channel.alias} {peer_channel.chan_id}: {oapD} {iapD}") + elif oapD < (iapD*1.10) and outbound_percent > 75 and peer_channel.auto_rebalance == True: + #print('Case 3: Disable AR - o7D < i7D AND Outbound Liq > 75%') + peer_channel.auto_rebalance = False + peer_channel.save() + Autopilot(chan_id=peer_channel.chan_id, peer_alias=peer_channel.alias, setting='Enabled', old_value=1, new_value=0).save() + print(f"{datetime.now().strftime('%c')} : [Rebalancer] : Auto Pilot Disabled for {peer_channel.alias} {peer_channel.chan_id}: {oapD} {iapD}" ) + elif oapD < (iapD*1.10) and inbound_percent > 75: + #print('Case 4: Pass') + pass + else: + #print('Case 5: Pass') + pass @sync_to_async +@log_fail("Error getting pending rebalances") def get_pending_rebals(): - try: - rebalances = Rebalancer.objects.filter(status=0).order_by('id') - return rebalances, len(rebalances) - except Exception as e: - print(f"{datetime.now().strftime('%c')} : [Rebalancer] : Error getting pending rebalances: {str(e)}") + rebalances = Rebalancer.objects.filter(status=0).order_by('id') + return rebalances, len(rebalances) async def queue_manager(rebalancer_queue: asyncio.Queue, worker_count): global scheduled_rebalances, active_rebalances print(f"{datetime.now().strftime('%c')} : [Rebalancer] : Queue manager is starting with {worker_count} workers...") try: while True: - new_worker_count = await get_worker_count() + new_worker_count = (await LocalSettings.objects.aget(key='AR-Workers')).value if worker_count != new_worker_count: while not rebalancer_queue.empty(): await rebalancer_queue.get() #Empty queue to restart with new worker_count config @@ -323,7 +298,7 @@ async def queue_manager(rebalancer_queue: asyncio.Queue, worker_count): await rebalancer_queue.put(rebalance) print("") elif rebalancer_queue.qsize() == 0 and len(active_rebalances) == 0: - print(f"{datetime.now().strftime('%c')} : [Rebalancer] : Queue is still empty, stoping the rebalancer...", end="") + print(f"{datetime.now().strftime('%c')} : [Rebalancer] : Queue is still empty, stoping queue manager...", end="") return await asyncio.sleep(30) except Exception as e: @@ -336,7 +311,7 @@ async def worker_manager(manager: asyncio.Task, rebalancer_queue, conn): worker = asyncio.current_task().get_name() while not manager.done(): await asyncio.sleep(3) - rebalance = await rebalancer_queue.get() + rebalance = await rebalancer_queue.get() if not rebalancer_queue.empty() else None if rebalance == None: continue @@ -344,12 +319,12 @@ async def worker_manager(manager: 
asyncio.Task, rebalancer_queue, conn): continue # Make sure only pending requests executes (excludes cancelled) if not rebalance.manual: - outbound_cans = await get_out_cans(rebalance) + _, outbound_cans = await get_out_cans(rebalance) if len(outbound_cans) == 0: print(f"{datetime.now().strftime('%c')} : [Rebalancer] : No outbound_cans") rebalance.status = 406 rebalance.start = rebalance.stop = datetime.now() - await save_record(rebalance) + await rebalance.asave() continue rebalance.outgoing_chan_ids = str(outbound_cans).replace('\'', '') @@ -361,10 +336,11 @@ async def worker_manager(manager: asyncio.Task, rebalancer_queue, conn): active_rebalances.remove(active_rebalance_id) print(f"{datetime.now().strftime('%c')} : [Rebalancer] : {worker} completed its request...") + print(f"{datetime.now().strftime('%c')} : [Rebalancer] : Exiting {worker}...") async def start_queue(): conn = async_lnd_connect() - workers = await get_worker_count() + workers = (await LocalSettings.objects.aget(key='AR-Workers')).value rebalancer_queue = asyncio.Queue() manager = asyncio.create_task(queue_manager(rebalancer_queue, workers)) workers = [asyncio.create_task(worker_manager(manager, rebalancer_queue, conn), name=f"Worker {id}") for id in range(1, workers+1)] @@ -372,10 +348,6 @@ async def start_queue(): await asyncio.gather(manager, *workers) print(f"{datetime.now().strftime('%c')} : [Rebalancer] : Manager and workers have stopped") -@sync_to_async -def get_worker_count(): - return int(LocalSettings.objects.filter(key='AR-Workers')[0].value) - def main(): global scheduled_rebalances, active_rebalances if not LocalSettings.objects.filter(key='AR-Workers').exists(): From fa451e8e0c1c68e4bff5cbc87487afd1df65488c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ot=C3=A1vio?= Date: Mon, 11 Dec 2023 20:46:09 +0000 Subject: [PATCH 07/12] Use native async method --- rebalancer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rebalancer.py b/rebalancer.py index 350db50b..40ea119e 100644 --- a/rebalancer.py +++ b/rebalancer.py @@ -315,7 +315,7 @@ async def worker_manager(manager: asyncio.Task, rebalancer_queue, conn): if rebalance == None: continue - if not await sync_to_async(Rebalancer.objects.filter(id=rebalance.id, status=0).exists)(): + if not await Rebalancer.objects.filter(id=rebalance.id, status=0).aexists(): continue # Make sure only pending requests executes (excludes cancelled) if not rebalance.manual: From 0a872320a4b7d72e5e73bf5a4d02aca9f7cb5e8c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ot=C3=A1vio?= Date: Mon, 11 Dec 2023 21:50:26 +0000 Subject: [PATCH 08/12] Convert to int --- rebalancer.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rebalancer.py b/rebalancer.py index 40ea119e..da6b1699 100644 --- a/rebalancer.py +++ b/rebalancer.py @@ -274,7 +274,7 @@ async def queue_manager(rebalancer_queue: asyncio.Queue, worker_count): print(f"{datetime.now().strftime('%c')} : [Rebalancer] : Queue manager is starting with {worker_count} workers...") try: while True: - new_worker_count = (await LocalSettings.objects.aget(key='AR-Workers')).value + new_worker_count = int((await LocalSettings.objects.aget(key='AR-Workers')).value) if worker_count != new_worker_count: while not rebalancer_queue.empty(): await rebalancer_queue.get() #Empty queue to restart with new worker_count config @@ -340,7 +340,7 @@ async def worker_manager(manager: asyncio.Task, rebalancer_queue, conn): async def start_queue(): conn = async_lnd_connect() - workers = (await 
LocalSettings.objects.aget(key='AR-Workers')).value + workers = int((await LocalSettings.objects.aget(key='AR-Workers')).value) rebalancer_queue = asyncio.Queue() manager = asyncio.create_task(queue_manager(rebalancer_queue, workers)) workers = [asyncio.create_task(worker_manager(manager, rebalancer_queue, conn), name=f"Worker {id}") for id in range(1, workers+1)] From 71287ee7bec3cab0bb6382ea9d6aad2622993537 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ot=C3=A1vio?= Date: Wed, 13 Dec 2023 00:45:27 +0000 Subject: [PATCH 09/12] Fix rapidfire refactor --- rebalancer.py | 47 ++++++++++++++++++----------------------------- 1 file changed, 18 insertions(+), 29 deletions(-) diff --git a/rebalancer.py b/rebalancer.py index da6b1699..f1e051ff 100644 --- a/rebalancer.py +++ b/rebalancer.py @@ -27,7 +27,7 @@ def wrapper(*args, **kwargs): @sync_to_async @log_fail("Error getting outbound cans") -def get_out_cans(rebalance): +def get_out_cans(rebalance: Rebalancer): pub_active_chans = Channels.objects.filter(is_active=True, is_open=True, private=False).annotate(percent_outbound=((Sum('local_balance')+Sum('pending_outbound')-rebalance.value)*100)/Sum('capacity')).annotate(inbound_can=(((Sum('remote_balance')+Sum('pending_inbound'))*100)/Sum('capacity'))/Sum('ar_in_target')) return pub_active_chans, list(pub_active_chans.filter(auto_rebalance=False, percent_outbound__gte=F('ar_out_target')).exclude(remote_pubkey=rebalance.last_hop_pubkey).values_list('chan_id', flat=True)) @@ -80,37 +80,26 @@ async def run(rebalance: Rebalancer, conn, worker) -> Rebalancer: @log_fail("Error on rapid_fire") async def rapid_fire(rebalance: Rebalancer, stub: lnrpc.LightningStub, payment_response) -> Rebalancer: - if rebalance.status < 2: - return None - pub_active_chans, outbound_cans = await get_out_cans(rebalance) - inbound_cans_len = await pub_active_chans.filter(remote_pubkey=rebalance.last_hop_pubkey, auto_rebalance=True, inbound_can__gte=1).acount() - if rebalance.status == 2: - inc=1.21 - successful_out = payment_response.htlcs[0].route.hops[0].pub_key - await update_channel(stub, successful_out) # Outgoing channel update - await update_channel(stub, rebalance.last_hop_pubkey) # Incoming channel update - #Reduce potential rebalance value in percent out to avoid going below AR-OUT-Target + async def next_rapid_fire(inc:float): + test = Rebalancer(value=int(rebalance.value*inc), last_hop_pubkey=rebalance.last_hop_pubkey) + if test.value < 1000: + return None + pub_active_chans, outbound_cans = await get_out_cans(test) + inbound_cans_len = await pub_active_chans.filter(remote_pubkey=rebalance.last_hop_pubkey, auto_rebalance=True, inbound_can__gte=1).acount() if inbound_cans_len > 0 and len(outbound_cans) > 0: - next_rebalance = Rebalancer(value=int(rebalance.value*inc), fee_limit=round(rebalance.fee_limit*inc, 3), outgoing_chan_ids=str(outbound_cans).replace('\'', ''), last_hop_pubkey=rebalance.last_hop_pubkey, target_alias=rebalance.target_alias, duration=1) await next_rebalance.asave() - print(f"{datetime.now().strftime('%c')} : [Rebalancer] : RapidFire increase for {next_rebalance.target_alias} from {rebalance.value} to {next_rebalance.value} ") + next_rebalance = Rebalancer(value=test.value, fee_limit=round(rebalance.fee_limit*inc, 3), outgoing_chan_ids=str(outbound_cans).replace('\'', ''), last_hop_pubkey=rebalance.last_hop_pubkey, target_alias=rebalance.target_alias, duration=1) + print(f"{datetime.now().strftime('%c')} : [Rebalancer] : new RapidFire for {next_rebalance.target_alias} from {rebalance.value} to 
{next_rebalance.value} ") return next_rebalance - # For failed rebalances, try in rapid fire with reduced balances until give up. - elif rebalance.value > 69420: - dec=2 - #Previous Rapidfire with increased value failed, try with lower value up to 69420. - if rebalance.duration > 1: - next_value = await estimate_liquidity(payment_response) - if next_value < 1000: - return None - else: - next_value = rebalance.value/dec - if inbound_cans_len > 0 and len(outbound_cans) > 0: - next_rebalance = Rebalancer(value=int(next_value), fee_limit=round(rebalance.fee_limit/(rebalance.value/next_value), 3), outgoing_chan_ids=str(outbound_cans).replace('\'', ''), last_hop_pubkey=rebalance.last_hop_pubkey, target_alias=rebalance.target_alias, duration=1) - await next_rebalance.asave() - print(f"{datetime.now().strftime('%c')} : [Rebalancer] : RapidFire decrease for {next_rebalance.target_alias} from {rebalance.value} to {next_rebalance.value} ") - return next_rebalance + if rebalance.status == 2: + await update_channel(payment_response.htlcs[0].route.hops[0].pub_key, stub) # Outgoing channel update + await update_channel(rebalance.last_hop_pubkey, stub) # Incoming channel update + return await next_rapid_fire(1.21) + elif rebalance.status > 2 and rebalance.value > 69420: # For failed rebalances, try in rapid fire with reduced balances until give up. + #Previous Rapidfire with increased value failed, try with lower value up to 69420. + incr = (await estimate_liquidity(payment_response))/rebalance.value if rebalance.duration > 1 else .5 + return await next_rapid_fire(incr) return None @sync_to_async @@ -131,7 +120,7 @@ def estimate_liquidity( payment ): return estimated_liquidity @log_fail("Error updating channel balances") -async def update_channel(stub, chann): +async def update_channel(chann, stub): channel = (await stub.ListChannels(ln.ListChannelsRequest(peer=bytes.fromhex(chann)))).channels[0] db_channel = await Channels.objects.aget(chan_id=channel.chan_id) db_channel.local_balance = channel.local_balance From 67b1b26c3461ac33e2e72caae8cf6dd92f639fa9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ot=C3=A1vio?= Date: Sat, 16 Dec 2023 12:30:43 +0000 Subject: [PATCH 10/12] Fix order --- rebalancer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rebalancer.py b/rebalancer.py index f1e051ff..bf112559 100644 --- a/rebalancer.py +++ b/rebalancer.py @@ -87,8 +87,8 @@ async def next_rapid_fire(inc:float): pub_active_chans, outbound_cans = await get_out_cans(test) inbound_cans_len = await pub_active_chans.filter(remote_pubkey=rebalance.last_hop_pubkey, auto_rebalance=True, inbound_can__gte=1).acount() if inbound_cans_len > 0 and len(outbound_cans) > 0: - await next_rebalance.asave() next_rebalance = Rebalancer(value=test.value, fee_limit=round(rebalance.fee_limit*inc, 3), outgoing_chan_ids=str(outbound_cans).replace('\'', ''), last_hop_pubkey=rebalance.last_hop_pubkey, target_alias=rebalance.target_alias, duration=1) + await next_rebalance.asave() print(f"{datetime.now().strftime('%c')} : [Rebalancer] : new RapidFire for {next_rebalance.target_alias} from {rebalance.value} to {next_rebalance.value} ") return next_rebalance From db7c109494b97c8b33a246a8c84a1f15132aaedc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ot=C3=A1vio?= Date: Mon, 18 Dec 2023 14:05:04 +0000 Subject: [PATCH 11/12] Only try failed RF if previous rebalance was RP --- rebalancer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rebalancer.py b/rebalancer.py index bf112559..1e9488ec 100644 --- a/rebalancer.py +++ 
b/rebalancer.py
@@ -96,7 +96,7 @@ async def next_rapid_fire(inc:float):
         await update_channel(payment_response.htlcs[0].route.hops[0].pub_key, stub) # Outgoing channel update
         await update_channel(rebalance.last_hop_pubkey, stub) # Incoming channel update
         return await next_rapid_fire(1.21)
-    elif rebalance.status > 2 and rebalance.value > 69420: # For failed rebalances, try in rapid fire with reduced balances until give up.
+    elif rebalance.status > 2 and rebalance.duration == 1 and rebalance.value > 69420: # For failed rebalances, try in rapid fire with reduced balances until give up.
         #Previous Rapidfire with increased value failed, try with lower value up to 69420.
         incr = (await estimate_liquidity(payment_response))/rebalance.value if rebalance.duration > 1 else .5
         return await next_rapid_fire(incr)
     return None

From d0e3b484a5f591b4be650d025e52b4efdbd68a1e Mon Sep 17 00:00:00 2001
From: Goto
Date: Wed, 14 Feb 2024 14:24:26 +0000
Subject: [PATCH 12/12] Pause refresh if user is hovering row|input

---
 gui/templates/base.html | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/gui/templates/base.html b/gui/templates/base.html
index 5c98649b..54ae7d36 100644
--- a/gui/templates/base.html
+++ b/gui/templates/base.html
@@ -155,7 +155,7 @@

My Lnd Overview

     async function auto_refresh(async_callback) {
       await sleep(21000)
-      while(document.hidden || document.cookie.includes('refresh=false')) {
+      while(document.hidden || document.cookie.includes('refresh=false') || document.activeElement.nodeName == "INPUT" || document.querySelectorAll('tr:hover').length === 1) {
         await sleep(100)
       }
       await async_callback()
@@ -163,4 +163,4 @@

My Lnd Overview

 //END: BASE CONFIG
 //-------------------------------------------------------------------------------------------------------------------------
-</html>
+</html>
\ No newline at end of file
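
The autopilot hunk above boils the per-peer decision down to two comparisons: how much the peer routed out versus in over the AR-APDays window (oapD vs. iapD), and which side of the channel holds more than 75% of the liquidity. The sketch below restates just that rule as plain Python; the function name and flat arguments are illustrative stand-ins for the Django queryset fields the patch actually uses, and the special case for 100% outbound-target peers (LOOP, WoS, etc.) is kept.

    def autopilot_decision(oapD, iapD, outbound_percent, inbound_percent,
                           auto_rebalance, ar_out_target):
        # Peers pinned at a 100% outbound target always keep AR as it is.
        if ar_out_target == 100 and auto_rebalance:
            return auto_rebalance
        if oapD > iapD * 1.10 and inbound_percent > 75:
            return True    # routing out more than in while inbound piles up: enable AR
        if oapD < iapD * 1.10 and outbound_percent > 75:
            return False   # routing in more than out while outbound piles up: disable AR
        return auto_rebalance  # every other combination is left untouched

Each flip is also written to the Autopilot model in the patch, so enable/disable decisions stay auditable.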
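
The queue_manager/worker_manager split above rests on one idea: the manager owns every stop condition (the AR-Workers setting changed, or there is nothing left to queue) and simply returns, while each worker polls manager.done() instead of sharing a global shutdown flag. A minimal self-contained sketch of that pattern follows; the settings lookup is stubbed out and all names are illustrative.

    import asyncio

    async def get_worker_count() -> int:
        # stand-in for: int((await LocalSettings.objects.aget(key='AR-Workers')).value)
        return 2

    async def queue_manager(queue: asyncio.Queue, worker_count: int):
        while True:
            if await get_worker_count() != worker_count:
                while not queue.empty():
                    queue.get_nowait()  # drain so the whole pool can restart with the new count
                return
            if queue.empty():
                return                  # nothing queued; the real manager also waits on in-flight work
            await asyncio.sleep(30)

    async def worker(manager: asyncio.Task, queue: asyncio.Queue):
        while not manager.done():       # workers exit once the manager has returned
            await asyncio.sleep(3)
            job = queue.get_nowait() if not queue.empty() else None
            if job is None:
                continue
            print(f"{asyncio.current_task().get_name()} processing {job}")

    async def start_pool():
        count = await get_worker_count()
        queue: asyncio.Queue = asyncio.Queue()
        for job in ("rebalance-1", "rebalance-2"):
            queue.put_nowait(job)
        manager = asyncio.create_task(queue_manager(queue, count))
        workers = [asyncio.create_task(worker(manager, queue), name=f"Worker {i}")
                   for i in range(1, count + 1)]
        await asyncio.gather(manager, *workers)

    asyncio.run(start_pool())

Because the manager exits instead of flipping a flag, picking up a new worker count is simply a matter of invoking start_pool() again.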
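
PATCH 07 and PATCH 08 swap the sync_to_async wrappers for the native async queryset methods Django has shipped since 4.1 (aget, aexists, asave, acount) and make the int() conversion explicit, since the stored AR-Workers value needs it. A before/after sketch; the import path is an assumption and the helper names are illustrative.

    from asgiref.sync import sync_to_async
    from gui.models import LocalSettings, Rebalancer  # assumed import path

    # Before: wrap a synchronous ORM call so it can be awaited.
    @sync_to_async
    def get_worker_count_wrapped() -> int:
        return int(LocalSettings.objects.filter(key='AR-Workers')[0].value)

    # After: await the ORM directly (Django >= 4.1); .value still needs int().
    async def get_worker_count_native() -> int:
        return int((await LocalSettings.objects.aget(key='AR-Workers')).value)

    async def is_still_pending(rebalance_id: int) -> bool:
        # aexists() replaces sync_to_async(Rebalancer.objects.filter(...).exists)()
        return await Rebalancer.objects.filter(id=rebalance_id, status=0).aexists()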
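
PATCH 09 through PATCH 11 fold the rapid-fire follow-up into a single nested next_rapid_fire helper with one sizing rule: a settled rebalance is retried 21% larger, and a failed one is retried at half size only when the failed attempt was itself a rapid-fire (duration == 1) and still above 69420 sats, with a hard floor of 1000 sats and the fee limit scaled in proportion. A condensed restatement of that rule; the helper below is illustrative and leaves out the outbound/inbound candidate checks and the Rebalancer row the patch persists.

    def next_rapid_fire_size(status: int, value: int, fee_limit: float, duration: int):
        """Return (next_value, next_fee_limit) for the follow-up attempt, or None to stop."""
        if status == 2:
            inc = 1.21                    # settled: push 21% more through the same last hop
        elif status > 2 and duration == 1 and value > 69420:
            inc = 0.5                     # failed rapid-fire attempt: halve and try again
        else:
            return None                   # not rapid-fire, or already at/below 69420: give up
        next_value = int(value * inc)
        if next_value < 1000:             # same floor next_rapid_fire() enforces
            return None
        return next_value, round(fee_limit * inc, 3)  # fee limit scales with the value

Every follow-up is created with duration=1, which is what marks it as rapid-fire for the next round.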