Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes key locking issues #155

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 0 additions & 88 deletions src/BTCPayServer.Lightning.LNDhub/AsyncDuplicateLock.cs

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="AsyncKeyedLock" Version="6.3.4" />
<PackageReference Include="System.Threading.Channels" Version="8.0.0" />
</ItemGroup>
<ItemGroup>
Expand Down
71 changes: 40 additions & 31 deletions src/BTCPayServer.Lightning.LNDhub/LndHubClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using AsyncKeyedLock;
using BTCPayServer.Lightning.LNDhub.JsonConverters;
using BTCPayServer.Lightning.LNDhub.Models;
using NBitcoin;
Expand All @@ -32,7 +33,11 @@ public class LndHubClient
private static readonly HttpClient _sharedClient = new ();
private static readonly ConcurrentDictionary<string, AuthResponse> _cache = new();
public readonly string CacheKey;
private static readonly AsyncDuplicateLock _locker = new();
private static readonly AsyncKeyedLocker<string> _locker = new(o =>
{
o.PoolSize = 20;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need these settings? Does this restrict to only 20 concurrent locks?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, the pooling is there to improve performance and reduce memory allocations by reusing objects. In case the pool is empty, an object is created. The pool is thus not a restriction.

o.PoolInitialFill = 1;
});

public LndHubClient(Uri baseUri, string login, string password, Network network, HttpClient httpClient)
{
Expand Down Expand Up @@ -202,49 +207,53 @@ public async Task<ILightningInvoiceListener> CreateInvoiceSession(CancellationTo

private async Task ClearAccessToken()
{
using var release = await _locker.LockAsync(CacheKey);
_cache.TryRemove(CacheKey, out _);
using (await _locker.LockAsync(CacheKey))
{
_cache.TryRemove(CacheKey, out _);
}
}

private async Task<string> GetAccessToken(CancellationToken cancellation = default)
{
using var release = await _locker.LockAsync(CacheKey, cancellation);
AuthResponse response;
if (_cache.TryGetValue(CacheKey, out var cached))
using (await _locker.LockAsync(CacheKey, cancellation))
{
if (cached.Expiry <= DateTimeOffset.UtcNow)
AuthResponse response;
if (_cache.TryGetValue(CacheKey, out var cached))
{
_cache.TryRemove(CacheKey, out _);
if (cached.Expiry <= DateTimeOffset.UtcNow)
{
_cache.TryRemove(CacheKey, out _);
}
else if (cached.Expiry - DateTimeOffset.UtcNow > TimeSpan.FromMinutes(5))
{
return cached.AccessToken;
}

response = await Post<AuthRequest, AuthResponse>("auth?type=refresh_token",
new AuthRequest { RefreshToken = cached.RefreshToken }, cancellation);
}
else if (cached.Expiry - DateTimeOffset.UtcNow > TimeSpan.FromMinutes(5))
else
{
return cached.AccessToken;
response = await Post<AuthRequest, AuthResponse>("auth?type=auth",
new AuthRequest { Login = _login, Password = _password }, cancellation);
}

response = await Post<AuthRequest, AuthResponse>("auth?type=refresh_token",
new AuthRequest {RefreshToken = cached.RefreshToken}, cancellation);
}
else
{
response = await Post<AuthRequest, AuthResponse>("auth?type=auth",
new AuthRequest {Login = _login, Password = _password}, cancellation);
}

if (response.Expiry is null)
{
try
if (response.Expiry is null)
{
response.Expiry = DateTimeOffset.FromUnixTimeSeconds(
long.Parse(ParseClaimsFromJwt(response.AccessToken).First(claim => claim.Type == "exp").Value));
}
catch (Exception)
{
//it's ok if we dont parse it, once auth fails we try again
try
{
response.Expiry = DateTimeOffset.FromUnixTimeSeconds(
long.Parse(ParseClaimsFromJwt(response.AccessToken).First(claim => claim.Type == "exp").Value));
}
catch (Exception)
{
//it's ok if we dont parse it, once auth fails we try again
}
}
}

_cache.AddOrReplace(CacheKey, response);
return response.AccessToken;
_cache.AddOrReplace(CacheKey, response);
return response.AccessToken;
}
}
private static IEnumerable<Claim> ParseClaimsFromJwt(string jwt)
{
Expand Down
36 changes: 23 additions & 13 deletions src/BTCPayServer.Lightning.LNDhub/LndHubInvoiceListener.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using AsyncKeyedLock;
using BTCPayServer.Lightning.LNDhub.Models;
using NBitcoin;

Expand Down Expand Up @@ -54,31 +55,36 @@ public void Dispose()
Dispose(true);
}

static readonly AsyncDuplicateLock _locker = new();
static readonly AsyncKeyedLocker<string> _locker = new(o =>
{
o.PoolSize = 20;
o.PoolInitialFill = 1;
});
static readonly ConcurrentDictionary<string, InvoiceData[]> _activeListeners = new();

private async Task ListenLoop()
{
try
{
var releaser = await _locker.LockOrBustAsync(_client.CacheKey, _cts.Token);
if (releaser is null)
AsyncKeyedLockTimeoutReleaser<string> releaser = null;
try
{
while (!_cts.IsCancellationRequested &&releaser is null)
releaser = await _locker.LockAsync(_client.CacheKey, 0, _cts.Token);
if (!releaser.EnteredSemaphore)
{
if (_activeListeners.TryGetValue(_client.CacheKey, out var invoicesData))
while (!_cts.IsCancellationRequested && !releaser.EnteredSemaphore)
{
await HandleInvoicesData(invoicesData);
if (_activeListeners.TryGetValue(_client.CacheKey, out var invoicesData))
{
await HandleInvoicesData(invoicesData);
}
releaser = await _locker.LockAsync(_client.CacheKey, 0, _cts.Token);

if (!releaser.EnteredSemaphore)
await Task.Delay(2500, _cts.Token);
}
releaser = await _locker.LockOrBustAsync(_client.CacheKey, _cts.Token);

if(releaser is null)
await Task.Delay(2500, _cts.Token);
}
}

using (releaser)
{
while (!_cts.IsCancellationRequested)
{
var invoicesData = await _client.GetInvoices(_cts.Token);
Expand All @@ -88,6 +94,10 @@ private async Task ListenLoop()
await Task.Delay(2500, _cts.Token);
}
}
finally
{
releaser.Dispose();
}
}
catch when (_cts.IsCancellationRequested)
{
Expand Down
28 changes: 17 additions & 11 deletions tests/AsycLockTests.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
using System;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;
using AsyncKeyedLock;
using BTCPayServer.Lightning.LndHub;
using Xunit;
using Xunit.Abstractions;
Expand All @@ -19,7 +20,11 @@ public AsyncDuplicateLockTests(ITestOutputHelper outputHelper)

public class Wallet
{
private static AsyncDuplicateLock _lock = new();
private static AsyncKeyedLocker<string> _lock = new(o =>
{
o.PoolSize = 20;
o.PoolInitialFill = 1;
});

public string Id { get; set; }
public decimal Balance { get; private set; }
Expand Down Expand Up @@ -85,24 +90,25 @@ public async Task Spend_WhenConcurrentlyExceedingBalance_ShouldPreventOverdraw()
[Fact]
public async Task LockAsync_MultipleParallelForeach_ShouldNotDuplicateEntries()
{
var lockObj = new AsyncDuplicateLock();
var lockObj = new AsyncKeyedLocker<char>(o =>
{
o.PoolSize = 20;
o.PoolInitialFill = 1;
});
var alphabet = "ABCDEFGHIJKLMNOPQRSTUVWXYZ";
var resultList = new ConcurrentDictionary<string, int>();
int iterationsPerLetter = 100; // Number of iterations per letter

async Task WriteToList(char letter)
{
while (true)

{
var release = await lockObj.LockOrBustAsync(letter);
if (release is null)
{
continue;
}

using (release)
using (var releaser = await lockObj.LockAsync(letter, 0))
{
if (!releaser.EnteredSemaphore)
{
continue;
}
try
{
if (resultList.TryGetValue(letter.ToString(), out var count) &&
Expand Down