Skip to content

Commit

Permalink
Fix DMM parser and a couple of other minor issues. (#226)
Browse files Browse the repository at this point in the history
  • Loading branch information
giorgi1324 authored Nov 19, 2024
1 parent a7d5944 commit 594320e
Show file tree
Hide file tree
Showing 11 changed files with 60 additions and 39 deletions.
2 changes: 1 addition & 1 deletion src/debrid-collector/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ WORKDIR /app

ENV PYTHONUNBUFFERED=1

RUN apk add --update --no-cache python3=~3.11.9-r0 py3-pip && ln -sf python3 /usr/bin/python
RUN apk add --update --no-cache python3=~3.11 py3-pip && ln -sf python3 /usr/bin/python

COPY --from=build /src/out .

Expand Down
4 changes: 3 additions & 1 deletion src/metadata/Features/Configuration/PostgresConfiguration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,14 @@ public class PostgresConfiguration
private const string PasswordVariable = "PASSWORD";
private const string DatabaseVariable = "DB";
private const string PortVariable = "PORT";
private const string CommandTimeoutVariable = "COMMAND_TIMEOUT_SEC"; // Seconds

private string Host { get; init; } = Prefix.GetRequiredEnvironmentVariableAsString(HostVariable);
private string Username { get; init; } = Prefix.GetRequiredEnvironmentVariableAsString(UsernameVariable);
private string Password { get; init; } = Prefix.GetRequiredEnvironmentVariableAsString(PasswordVariable);
private string Database { get; init; } = Prefix.GetRequiredEnvironmentVariableAsString(DatabaseVariable);
private int PORT { get; init; } = Prefix.GetEnvironmentVariableAsInt(PortVariable, 5432);
private int CommandTimeout { get; init; } = Prefix.GetEnvironmentVariableAsInt(CommandTimeoutVariable, 300);

public string StorageConnectionString => $"Host={Host};Port={PORT};Username={Username};Password={Password};Database={Database};";
public string StorageConnectionString => $"Host={Host};Port={PORT};Username={Username};Password={Password};Database={Database};CommandTimeout={CommandTimeout}";
}
12 changes: 1 addition & 11 deletions src/metadata/Features/ImportImdbData/ImdbDbService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ private async Task ExecuteCommandAsync(Func<NpgsqlConnection, Task> operation, s
{
try
{
await using var connection = CreateNpgsqlConnection();
await using var connection = new NpgsqlConnection(configuration.StorageConnectionString);
await connection.OpenAsync();

await operation(connection);
Expand All @@ -145,16 +145,6 @@ private async Task ExecuteCommandAsync(Func<NpgsqlConnection, Task> operation, s
}
}

/// <summary>
/// Builds a new, unopened <see cref="NpgsqlConnection"/> from the configured storage
/// connection string, overriding its command timeout with a fixed value of 3000.
/// </summary>
/// <returns>A connection the caller is responsible for opening and disposing.</returns>
private NpgsqlConnection CreateNpgsqlConnection()
{
    var builder = new NpgsqlConnectionStringBuilder(configuration.StorageConnectionString);

    // Fixed override applied on top of whatever the configured connection string specifies.
    // NOTE(review): Npgsql documents CommandTimeout as seconds, i.e. 50 minutes here - confirm intended.
    builder.CommandTimeout = 3000;

    return new NpgsqlConnection(builder.ConnectionString);
}

private async Task ExecuteCommandWithTransactionAsync(Func<NpgsqlConnection, NpgsqlTransaction, Task> operation, NpgsqlTransaction transaction, string errorMessage)
{
try
Expand Down
2 changes: 1 addition & 1 deletion src/metadata/Metadata.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
<PackageReference Include="Dapper" Version="2.1.35" />
<PackageReference Include="Microsoft.Extensions.Hosting" Version="8.0.0" />
<PackageReference Include="Microsoft.Extensions.Http" Version="8.0.0" />
<PackageReference Include="Npgsql" Version="8.0.2" />
<PackageReference Include="Npgsql" Version="8.0.3" />
<PackageReference Include="Serilog" Version="3.1.1" />
<PackageReference Include="Serilog.AspNetCore" Version="8.0.1" />
<PackageReference Include="Serilog.Sinks.Console" Version="5.0.1" />
Expand Down
2 changes: 1 addition & 1 deletion src/producer/src/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ WORKDIR /app

ENV PYTHONUNBUFFERED=1

RUN apk add --update --no-cache python3=~3.11.9-r0 py3-pip && ln -sf python3 /usr/bin/python
RUN apk add --update --no-cache python3=~3.11 py3-pip && ln -sf python3 /usr/bin/python

COPY --from=build /src/out .

Expand Down
63 changes: 44 additions & 19 deletions src/producer/src/Features/Crawlers/Dmm/DebridMediaManagerCrawler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ public partial class DebridMediaManagerCrawler(
protected override string Source => "DMM";

private const int ParallelismCount = 4;

public override async Task Execute()
{
var tempDirectory = await dmmFileDownloader.DownloadFileToTempPath(CancellationToken.None);
Expand All @@ -24,7 +24,7 @@ public override async Task Execute()
logger.LogInformation("Found {Files} files to parse", files.Length);

var options = new ParallelOptions { MaxDegreeOfParallelism = ParallelismCount };

await Parallel.ForEachAsync(files, options, async (file, token) =>
{
var fileName = Path.GetFileName(file);
Expand Down Expand Up @@ -69,9 +69,9 @@ await Parallel.ForEachAsync(batchProcessables.Select(t => t.InfoHash), options,
if (page.TryGetValue(infoHash, out var dmmContent) &&
successfulResponses.TryGetValue(dmmContent.Filename, out var parsedResponse))
{
page[infoHash] = dmmContent with {ParseResponse = parsedResponse};
page[infoHash] = dmmContent with { ParseResponse = parsedResponse };
}

return ValueTask.CompletedTask;
});
}
Expand All @@ -86,7 +86,7 @@ await Parallel.ForEachAsync(batchProcessables.Select(t => t.InfoHash), options,
}

var pageSource = await File.ReadAllTextAsync(filePath);

var match = HashCollectionMatcher().Match(pageSource);

if (!match.Success)
Expand All @@ -106,9 +106,34 @@ await Parallel.ForEachAsync(batchProcessables.Select(t => t.InfoHash), options,

var decodedJson = LZString.DecompressFromEncodedURIComponent(encodedJson.Value);

var json = JsonDocument.Parse(decodedJson);

var torrents = await json.RootElement.EnumerateArray()
JsonElement arrayToProcess;
try
{
var json = JsonDocument.Parse(decodedJson);

if (json.RootElement.ValueKind == JsonValueKind.Object &&
json.RootElement.TryGetProperty("torrents", out var torrentsProperty) &&
torrentsProperty.ValueKind == JsonValueKind.Array)
{
arrayToProcess = torrentsProperty;
}
else if (json.RootElement.ValueKind == JsonValueKind.Array)
{
arrayToProcess = json.RootElement;
}
else
{
logger.LogWarning("Unexpected JSON format in {Name}", name);
return [];
}
}
catch (Exception ex)
{
logger.LogError("Failed to parse JSON {decodedJson} for {Name}: {Exception}", decodedJson, name, ex);
return [];
}

var torrents = await arrayToProcess.EnumerateArray()
.ToAsyncEnumerable()
.Select(ParsePageContent)
.Where(t => t is not null)
Expand All @@ -120,7 +145,7 @@ await Parallel.ForEachAsync(batchProcessables.Select(t => t.InfoHash), options,
await Storage.MarkPageAsIngested(filenameOnly);
return [];
}

var torrentDictionary = torrents
.Where(x => x is not null)
.GroupBy(x => x.InfoHash)
Expand All @@ -141,7 +166,7 @@ await Parallel.ForEachAsync(page, options, async (kvp, ct) =>
{
var (infoHash, dmmContent) = kvp;
var parsedTorrent = dmmContent.ParseResponse;
if (parsedTorrent is not {Success: true})
if (parsedTorrent is not { Success: true })
{
return;
}
Expand Down Expand Up @@ -192,27 +217,27 @@ private IngestedTorrent MapToTorrent(ImdbEntry result, long size, string infoHas
Category = AssignCategory(result),
RtnResponse = parsedTorrent.Response.ToJson(),
};


/// <summary>
/// Serializes <paramref name="best"/> to JSON and stores it through <c>cache.SetStringAsync</c>
/// under <paramref name="cacheKey"/>, with an absolute expiration of one day from now.
/// </summary>
/// <param name="cacheKey">Key under which the serialized entry is stored.</param>
/// <param name="best">Entry to serialize and cache.</param>
/// <returns>The task returned by the cache write; it is not awaited here.</returns>
private Task AddToCache(string cacheKey, ImdbEntry best) =>
    cache.SetStringAsync(
        cacheKey,
        JsonSerializer.Serialize(best),
        new DistributedCacheEntryOptions
        {
            // Entries expire one day after they are written.
            AbsoluteExpirationRelativeToNow = TimeSpan.FromDays(1),
        });

/// <summary>
/// Looks up <paramref name="cacheKey"/> via <c>cache.GetStringAsync</c> and, when a non-empty
/// string is found, deserializes it from JSON into an <c>ImdbEntry</c>.
/// </summary>
/// <param name="cacheKey">Key to look up.</param>
/// <returns>
/// <c>(false, null)</c> when the cached string is null or empty; otherwise <c>true</c> together
/// with whatever <see cref="JsonSerializer.Deserialize{TValue}(string, JsonSerializerOptions?)"/> produced.
/// </returns>
private async Task<(bool Success, ImdbEntry? Entry)> CheckIfInCacheAndReturn(string cacheKey)
{
    var cachedJson = await cache.GetStringAsync(cacheKey);

    // Cache miss (or empty payload): nothing to deserialize.
    if (string.IsNullOrEmpty(cachedJson))
    {
        return (false, null);
    }

    var cachedEntry = JsonSerializer.Deserialize<ImdbEntry>(cachedJson);
    return (true, cachedEntry);
}

Expand All @@ -222,17 +247,17 @@ private Task AddToCache(string cacheKey, ImdbEntry best)

return (pageIngested, filename);
}

/// <summary>
/// Maps the category of <paramref name="entry"/> to the category label used for ingested torrents.
/// </summary>
/// <param name="entry">Entry whose <c>Category</c> is inspected.</param>
/// <returns>
/// <c>"movies"</c> when the category equals "movie", <c>"tv"</c> when it equals "tvSeries"
/// (both compared ordinally, ignoring case), and <c>"unknown"</c> for anything else,
/// including a null category.
/// </returns>
private static string AssignCategory(ImdbEntry entry) =>
    // Switch on the raw value: the comparisons below already ignore case, and a
    // culture-sensitive ToLower() beforehand can corrupt the match (e.g. under tr-TR
    // "MOVIE" lowercases to "movıe", which is not ordinal-equal to "movie").
    // Dropping it also lets a null category fall through to "unknown" instead of throwing.
    entry.Category switch
    {
        var category when string.Equals(category, "movie", StringComparison.OrdinalIgnoreCase) => "movies",
        var category when string.Equals(category, "tvSeries", StringComparison.OrdinalIgnoreCase) => "tv",
        _ => "unknown",
    };

/// <summary>
/// Builds the cache key <c>category:year:title</c>, with category and title lower-cased
/// using the invariant culture.
/// </summary>
/// <param name="category">Category segment; lower-cased.</param>
/// <param name="title">Title segment; lower-cased.</param>
/// <param name="year">Year segment, formatted with its default string conversion.</param>
/// <returns>The colon-separated cache key.</returns>
private static string GetCacheKey(string category, string title, int year)
{
    var categorySegment = category.ToLowerInvariant();
    var titleSegment = title.ToLowerInvariant();

    return string.Join(":", categorySegment, year, titleSegment);
}

private static ExtractedDMMContent? ParsePageContent(JsonElement item)
{
if (!item.TryGetProperty("filename", out var filenameElement) ||
Expand All @@ -241,10 +266,10 @@ var category when string.Equals(category, "tvSeries", StringComparison.OrdinalIg
{
return null;
}

return new(filenameElement.GetString(), bytesElement.GetInt64(), hashElement.GetString());
}

private record DmmContent(string Filename, long Bytes, ParseTorrentTitleResponse? ParseResponse);
private record ExtractedDMMContent(string Filename, long Bytes, string InfoHash);
private record RtnBatchProcessable(string InfoHash, string Filename);
Expand Down
2 changes: 1 addition & 1 deletion src/qbit-collector/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ WORKDIR /app

ENV PYTHONUNBUFFERED=1

RUN apk add --update --no-cache python3=~3.11.9-r0 py3-pip && ln -sf python3 /usr/bin/python
RUN apk add --update --no-cache python3=~3.11 py3-pip && ln -sf python3 /usr/bin/python

COPY --from=build /src/out .

Expand Down
4 changes: 3 additions & 1 deletion src/shared/Configuration/PostgresConfiguration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,14 @@ public class PostgresConfiguration
private const string PasswordVariable = "PASSWORD";
private const string DatabaseVariable = "DB";
private const string PortVariable = "PORT";
private const string CommandTimeoutVariable = "COMMAND_TIMEOUT_SEC"; // Seconds

private string Host { get; init; } = Prefix.GetRequiredEnvironmentVariableAsString(HostVariable);
private string Username { get; init; } = Prefix.GetRequiredEnvironmentVariableAsString(UsernameVariable);
private string Password { get; init; } = Prefix.GetRequiredEnvironmentVariableAsString(PasswordVariable);
private string Database { get; init; } = Prefix.GetRequiredEnvironmentVariableAsString(DatabaseVariable);
private int PORT { get; init; } = Prefix.GetEnvironmentVariableAsInt(PortVariable, 5432);
private int CommandTimeout { get; init; } = Prefix.GetEnvironmentVariableAsInt(CommandTimeoutVariable, 300);

public string StorageConnectionString => $"Host={Host};Port={PORT};Username={Username};Password={Password};Database={Database};";
public string StorageConnectionString => $"Host={Host};Port={PORT};Username={Username};Password={Password};Database={Database};CommandTimeout={CommandTimeout}";
}
2 changes: 1 addition & 1 deletion src/shared/SharedContracts.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
<PackageReference Include="Dapper" Version="2.1.35" />
<PackageReference Include="MassTransit.Abstractions" Version="8.2.0" />
<PackageReference Include="MassTransit.RabbitMQ" Version="8.2.0" />
<PackageReference Include="Npgsql" Version="8.0.2" />
<PackageReference Include="Npgsql" Version="8.0.3" />
<PackageReference Include="pythonnet" Version="3.0.3" />
<PackageReference Include="Serilog" Version="3.1.1" />
<PackageReference Include="Serilog.Extensions.Hosting" Version="8.0.0" />
Expand Down
4 changes: 3 additions & 1 deletion src/tissue/Features/DataProcessing/PostgresConfiguration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,14 @@ public class PostgresConfiguration
private const string PasswordVariable = "PASSWORD";
private const string DatabaseVariable = "DB";
private const string PortVariable = "PORT";
private const string CommandTimeoutVariable = "COMMAND_TIMEOUT_SEC"; // Seconds

private string Host { get; init; } = Prefix.GetRequiredEnvironmentVariableAsString(HostVariable);
private string Username { get; init; } = Prefix.GetRequiredEnvironmentVariableAsString(UsernameVariable);
private string Password { get; init; } = Prefix.GetRequiredEnvironmentVariableAsString(PasswordVariable);
private string Database { get; init; } = Prefix.GetRequiredEnvironmentVariableAsString(DatabaseVariable);
private int PORT { get; init; } = Prefix.GetEnvironmentVariableAsInt(PortVariable, 5432);
private int CommandTimeout { get; init; } = Prefix.GetEnvironmentVariableAsInt(CommandTimeoutVariable, 300);

public string StorageConnectionString => $"Host={Host};Port={PORT};Username={Username};Password={Password};Database={Database};";
public string StorageConnectionString => $"Host={Host};Port={PORT};Username={Username};Password={Password};Database={Database};CommandTimeout={CommandTimeout}";
}
2 changes: 1 addition & 1 deletion src/tissue/Tissue.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
<PackageReference Include="Dapper" Version="2.1.28" />
<PackageReference Include="Microsoft.Extensions.Hosting" Version="8.0.0" />
<PackageReference Include="Microsoft.Extensions.Http" Version="8.0.0" />
<PackageReference Include="Npgsql" Version="8.0.1" />
<PackageReference Include="Npgsql" Version="8.0.3" />
<PackageReference Include="Serilog" Version="3.1.1" />
<PackageReference Include="Serilog.AspNetCore" Version="8.0.1" />
<PackageReference Include="Serilog.Sinks.Console" Version="5.0.1" />
Expand Down

0 comments on commit 594320e

Please sign in to comment.