Skip to content

Commit

Permalink
Merge pull request #40 from tvorova-tvornica/add-tracing-decorator
Browse files Browse the repository at this point in the history
Implement tracing job decorator
  • Loading branch information
i-vukman authored Oct 13, 2023
2 parents a664add + 4b730c1 commit 4d90f6f
Show file tree
Hide file tree
Showing 9 changed files with 112 additions and 37 deletions.
1 change: 1 addition & 0 deletions src/AnagramSolver/AnagramSolver.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
<PrivateAssets>all</PrivateAssets>
</PackageReference>
<PackageReference Include="Npgsql.EntityFrameworkCore.PostgreSQL" Version="7.0.4" />
<PackageReference Include="Scrutor" Version="4.2.2" />
<PackageReference Include="Sentry.AspNetCore" Version="3.39.1" />
<PackageReference Include="System.Interactive.Async" Version="6.0.1" />
</ItemGroup>
Expand Down
6 changes: 6 additions & 0 deletions src/AnagramSolver/BackgroundJobs/IJob.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
namespace AnagramSolver.BackgroundJobs;

public interface IJob<T>
{
Task ExecuteAsync(T jobData);
}
57 changes: 57 additions & 0 deletions src/AnagramSolver/BackgroundJobs/TracingJobDecorator.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@

using System.Runtime.ExceptionServices;
using Sentry;

namespace AnagramSolver.BackgroundJobs;

public class TracingJobDecorator<T> : IJob<T>
{
private readonly IJob<T> _decorated;
private readonly Func<IHub> _getHub;

public TracingJobDecorator(
IJob<T> decorated,
Func<IHub> getHub)
{
_decorated = decorated;
_getHub = getHub;
}

public async Task ExecuteAsync(T jobData)
{
var hub = _getHub();

if (!hub.IsEnabled)
{
await _decorated.ExecuteAsync(jobData).ConfigureAwait(false);
return;
}

var transaction = _getHub().StartTransaction("BackgroundJob", $"{_decorated.GetType().Name}");

hub.ConfigureScope(scope => {
scope.Transaction = transaction;
});

Exception? exception = null;
try
{
await _decorated.ExecuteAsync(jobData).ConfigureAwait(false);
}
catch(Exception e)
{
exception = e;
}
finally
{
if (exception is null)
{
transaction.Finish(SpanStatus.Ok);
}
else
{
transaction.Finish(exception);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
using AnagramSolver.Data;
using Hangfire;
using Microsoft.EntityFrameworkCore;
using static AnagramSolver.BackgroundJobs.WikiDataImport.EnqueueScheduledCelebritiesPageImportsJob;
using static AnagramSolver.BackgroundJobs.WikiDataImport.ImportCelebritiesPageJob;
using static AnagramSolver.Data.Entities.ImportWikiDataCelebritiesPageRequest;

namespace AnagramSolver.BackgroundJobs.WikiDataImport;

public class EnqueueScheduledCelebritiesPageImportsJob
public class EnqueueScheduledCelebritiesPageImportsJob : IJob<EnqueueScheduledCelebritiesPageImportsJobData>
{
private readonly AnagramSolverContext _db;
private const int DelayStepInSeconds = 8;
Expand All @@ -15,20 +17,22 @@ public EnqueueScheduledCelebritiesPageImportsJob(AnagramSolverContext db)
_db = db;
}

public async Task EnqueueAsync(int importRequestId)
public async Task ExecuteAsync(EnqueueScheduledCelebritiesPageImportsJobData data)
{
var importRequest = await _db.ImportWikiDataCelebritiesRequests
.Include(x => x.ImportPageRequests)
.SingleAsync(x => x.Id == importRequestId);
.SingleAsync(x => x.Id == data.ImportRequestId);

var scheduledPageRequests = importRequest.ImportPageRequests
.Where(x => x.Status == ImportWikiDataCelebritiesPageRequestStatus.Scheduled)
.ToList();

var delayInSeconds = DelayStepInSeconds;
scheduledPageRequests.ForEach(x => {
BackgroundJob.Schedule<ImportCelebritiesPageJob>(y => y.ImportAsync(x.Id), TimeSpan.FromSeconds(delayInSeconds));
BackgroundJob.Schedule<IJob<ImportCelebritiesPageJobData>>(y => y.ExecuteAsync(new ImportCelebritiesPageJobData(x.Id)), TimeSpan.FromSeconds(delayInSeconds));
delayInSeconds += DelayStepInSeconds;
});
}

public record EnqueueScheduledCelebritiesPageImportsJobData(int ImportRequestId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,49 +4,40 @@
using AnagramSolver.HttpClients;
using AnagramSolver.HttpClients.Dto;
using Microsoft.EntityFrameworkCore;
using Sentry;
using static AnagramSolver.BackgroundJobs.WikiDataImport.ImportCelebritiesPageJob;
using static AnagramSolver.Data.Entities.ImportWikiDataCelebritiesPageRequest;

namespace AnagramSolver.BackgroundJobs.WikiDataImport;

public class ImportCelebritiesPageJob
public class ImportCelebritiesPageJob : IJob<ImportCelebritiesPageJobData>
{
private readonly AnagramSolverContext _db;
private readonly WikiDataHttpClient _httpClient;
private readonly IHub _sentryHub;

public ImportCelebritiesPageJob(AnagramSolverContext db,
WikiDataHttpClient httpClient,
IHub sentryHub)
WikiDataHttpClient httpClient)
{
_db = db;
_httpClient = httpClient;
_sentryHub = sentryHub;
}

public async Task ImportAsync(int importPageRequestId)
public async Task ExecuteAsync(ImportCelebritiesPageJobData data)
{
var transaction = _sentryHub.StartTransaction("background-job", "import-celebrities-page");
var span = transaction.StartChild("import");

var pageRequest = await _db.ImportWikiDataCelebritiesPageRequests
.Include(x => x.ImportCelebritiesRequest)
.Where(x => x.Status == ImportWikiDataCelebritiesPageRequestStatus.Scheduled)
.FirstOrDefaultAsync(x => x.Id == importPageRequestId);
.FirstOrDefaultAsync(x => x.Id == data.ImportPageRequestId);

if (pageRequest == null)
{
throw new Exception($"Cannot find import page request (id:{importPageRequestId}) with 'Scheduled' status");
throw new Exception($"Cannot find import page request (id:{data.ImportPageRequestId}) with 'Scheduled' status");
}

var wikiDataCelebritiesByPageId = await GetWikiDataCelebritiesByPageIdAsync(pageRequest);
await UpsertCelebritiesAsync(wikiDataCelebritiesByPageId);
pageRequest.MarkProcessed();

await _db.SaveChangesAsync();

span.Finish();
transaction.Finish();
}

private async Task<Dictionary<string, WikiDataCelebritiesResponse.WikiDataCelebrity>> GetWikiDataCelebritiesByPageIdAsync(ImportWikiDataCelebritiesPageRequest pageRequest)
Expand Down Expand Up @@ -99,4 +90,6 @@ private async Task UpsertCelebritiesAsync(Dictionary<string, WikiDataCelebrities

_db.Celebrities.AddRange(celebritiesToInsert);
}

public record ImportCelebritiesPageJobData(int ImportPageRequestId);
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
using AnagramSolver.Data;
using Microsoft.EntityFrameworkCore;
using static AnagramSolver.BackgroundJobs.WikiDataImport.ImportCelebrityRequestsProcessorJob;
using static AnagramSolver.Data.Entities.ImportWikiDataCelebritiesPageRequest;
using static AnagramSolver.Data.Entities.ImportWikiDataCelebritiesRequest;

namespace AnagramSolver.BackgroundJobs.WikiDataImport;

public class ImportCelebrityRequestsProcessorJob
public class ImportCelebrityRequestsProcessorJob : IJob<ImportCelebrityRequestsProcessorJobData>
{
private readonly AnagramSolverContext _db;

Expand All @@ -14,7 +15,7 @@ public ImportCelebrityRequestsProcessorJob(AnagramSolverContext db)
_db = db;
}

public async Task ProcessAsync()
public async Task ExecuteAsync(ImportCelebrityRequestsProcessorJobData _)
{
var processedRequests = await _db.ImportWikiDataCelebritiesRequests
.Where(x => x.Status == ImportWikiDataCelebritiesRequestStatus.PageRequestsScheduled &&
Expand All @@ -24,4 +25,6 @@ public async Task ProcessAsync()
processedRequests.ForEach(x => x.MarkProcessed());
await _db.SaveChangesAsync();
}

public record ImportCelebrityRequestsProcessorJobData();
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
using AnagramSolver.Data;
using Hangfire;
using Microsoft.EntityFrameworkCore;
using static AnagramSolver.BackgroundJobs.WikiDataImport.EnqueueScheduledCelebritiesPageImportsJob;
using static AnagramSolver.BackgroundJobs.WikiDataImport.ImportCelebrityRequestsSchedulerJob;
using static AnagramSolver.BackgroundJobs.WikiDataImport.ScheduleCelebritiesPageImportsJob;
using static AnagramSolver.Data.Entities.ImportWikiDataCelebritiesRequest;

namespace AnagramSolver.BackgroundJobs.WikiDataImport;

public class ImportCelebrityRequestsSchedulerJob
public class ImportCelebrityRequestsSchedulerJob : IJob<ImportCelebrityRequestsSchedulerJobData>
{
private readonly AnagramSolverContext _db;

Expand All @@ -14,7 +17,7 @@ public ImportCelebrityRequestsSchedulerJob(AnagramSolverContext db)
_db = db;
}

public async Task ScheduleSingleAsync()
public async Task ExecuteAsync(ImportCelebrityRequestsSchedulerJobData _)
{
var isAnyRequestProcessing = await _db.ImportWikiDataCelebritiesRequests
.AnyAsync(x => x.Status != ImportWikiDataCelebritiesRequestStatus.Requested &&
Expand All @@ -35,10 +38,12 @@ public async Task ScheduleSingleAsync()
return;
}

var jobId = BackgroundJob.Schedule<ScheduleCelebritiesPageImportsJob>(y => y.ScheduleAsync(requestedImport.Id), TimeSpan.FromSeconds(5));
BackgroundJob.ContinueJobWith<EnqueueScheduledCelebritiesPageImportsJob>(jobId, y => y.EnqueueAsync(requestedImport.Id));
var jobId = BackgroundJob.Schedule<IJob<ScheduleCelebritiesPageImportsJobData>>(y => y.ExecuteAsync(new ScheduleCelebritiesPageImportsJobData(requestedImport.Id)), TimeSpan.FromSeconds(5));
BackgroundJob.ContinueJobWith<IJob<EnqueueScheduledCelebritiesPageImportsJobData>>(jobId, y => y.ExecuteAsync(new EnqueueScheduledCelebritiesPageImportsJobData(requestedImport.Id)));
requestedImport.MarkScheduled();

await _db.SaveChangesAsync();
}

public record ImportCelebrityRequestsSchedulerJobData();
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
using AnagramSolver.Data;
using AnagramSolver.HttpClients;
using Microsoft.EntityFrameworkCore;
using static AnagramSolver.BackgroundJobs.WikiDataImport.ScheduleCelebritiesPageImportsJob;
using static AnagramSolver.Data.Entities.ImportWikiDataCelebritiesRequest;

namespace AnagramSolver.BackgroundJobs.WikiDataImport;

public class ScheduleCelebritiesPageImportsJob
public class ScheduleCelebritiesPageImportsJob : IJob<ScheduleCelebritiesPageImportsJobData>
{
private readonly AnagramSolverContext _db;
private readonly WikiDataHttpClient _httpClient;
Expand All @@ -21,14 +22,14 @@ public ScheduleCelebritiesPageImportsJob(
_logger = logger;
}

public async Task ScheduleAsync(int importCelebritiesRequestId)
public async Task ExecuteAsync(ScheduleCelebritiesPageImportsJobData data)
{
var request = await _db.ImportWikiDataCelebritiesRequests.SingleOrDefaultAsync(x => x.Id == importCelebritiesRequestId &&
var request = await _db.ImportWikiDataCelebritiesRequests.SingleOrDefaultAsync(x => x.Id == data.ImportCelebritiesRequestId &&
x.Status == ImportWikiDataCelebritiesRequestStatus.Scheduled);

if (request is null)
{
throw new Exception($"Cannot find import request (requestId: {importCelebritiesRequestId}) with status 'Scheduled'");
throw new Exception($"Cannot find import request (requestId: {data.ImportCelebritiesRequestId}) with status 'Scheduled'");
}

var totalCelebrityCount = await _httpClient.GetTotalCelebrityCountAsync(request.WikiDataOccupationId, request.WikiDataNationalityId);
Expand All @@ -43,5 +44,7 @@ public async Task ScheduleAsync(int importCelebritiesRequestId)

await _db.SaveChangesAsync();
}

public record ScheduleCelebritiesPageImportsJobData(int ImportCelebritiesRequestId);
}

19 changes: 11 additions & 8 deletions src/AnagramSolver/Program.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
using AnagramSolver.BackgroundJobs;
using AnagramSolver.BackgroundJobs.WikiDataImport;
using AnagramSolver.Data;
using AnagramSolver.Exceptions;
using AnagramSolver.HttpClients;
Expand All @@ -10,6 +9,8 @@
using Microsoft.AspNetCore.Diagnostics;
using Microsoft.EntityFrameworkCore;
using static System.Net.Mime.MediaTypeNames;
using static AnagramSolver.BackgroundJobs.WikiDataImport.ImportCelebrityRequestsProcessorJob;
using static AnagramSolver.BackgroundJobs.WikiDataImport.ImportCelebrityRequestsSchedulerJob;

var builder = WebApplication.CreateBuilder(args);
builder.WebHost.UseShutdownTimeout(TimeSpan.FromSeconds(55));
Expand All @@ -30,11 +31,13 @@
builder.Services.AddDbContext<AnagramSolverContext>(options =>
options.UseNpgsql(builder.Configuration.GetValue<string>("CONNECTION_STRING")));

builder.Services.AddTransient<ImportCelebritiesPageJob>();
builder.Services.AddTransient<EnqueueScheduledCelebritiesPageImportsJob>();
builder.Services.AddTransient<ImportCelebrityRequestsSchedulerJob>();
builder.Services.AddTransient<ImportCelebrityRequestsProcessorJob>();
builder.Services.AddTransient<ScheduleCelebritiesPageImportsJob>();
builder.Services.Scan(scan => scan.FromCallingAssembly()
.AddClasses(classes => classes.AssignableTo(typeof(IJob<>)).Where(_ => !_.IsGenericType))
.AsImplementedInterfaces()
.WithTransientLifetime());

builder.Services.Decorate(typeof(IJob<>), typeof(TracingJobDecorator<>));

builder.Services.AddHttpClient<WikiDataHttpClient>();

builder.Services.AddAuthentication(CookieAuthenticationDefaults.AuthenticationScheme)
Expand Down Expand Up @@ -131,7 +134,7 @@

app.MapFallbackToFile("index.html");

RecurringJob.AddOrUpdate<ImportCelebrityRequestsSchedulerJob>("ImportWikiDataCelebrityRequestsSchedulerJob", x => x.ScheduleSingleAsync(), Cron.Minutely);
RecurringJob.AddOrUpdate<ImportCelebrityRequestsProcessorJob>("ProcessImportWikiDataCelebrityRequestsJob", x => x.ProcessAsync(), Cron.Minutely);
RecurringJob.AddOrUpdate<IJob<ImportCelebrityRequestsSchedulerJobData>>("ImportWikiDataCelebrityRequestsSchedulerJob", x => x.ExecuteAsync(new ImportCelebrityRequestsSchedulerJobData()), Cron.Minutely);
RecurringJob.AddOrUpdate<IJob<ImportCelebrityRequestsProcessorJobData>>("ProcessImportWikiDataCelebrityRequestsJob", x => x.ExecuteAsync(new ImportCelebrityRequestsProcessorJobData()), Cron.Minutely);

app.Run();

0 comments on commit 4d90f6f

Please sign in to comment.