Skip to content

Commit

Permalink
chore: optimize ScopedTimedBackgroundWorker implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
AlisaAkiron committed Jan 18, 2025
1 parent 36d09d1 commit 691f57a
Show file tree
Hide file tree
Showing 4 changed files with 126 additions and 53 deletions.
3 changes: 2 additions & 1 deletion src/PallasBot.Application.Common/Extensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using Microsoft.Extensions.Hosting;
using PallasBot.Application.Common.Jobs;
using PallasBot.Application.Common.Services;
using PallasBot.Domain.Extensions;

namespace PallasBot.Application.Common;

Expand All @@ -11,6 +12,6 @@ public static void AddApplicationCommonServices(this IHostApplicationBuilder bui
{
builder.Services.AddSingleton<GitHubApiService>();

// builder.Services.AddHostedService<GitHubOrganizationSyncJob>();
builder.Services.AddScopedTimedBackgroundWorker<GitHubOrganizationSyncJob, GitHubOrganizationSyncService>();
}
}
59 changes: 42 additions & 17 deletions src/PallasBot.Application.Common/Jobs/GitHubOrganizationSyncJob.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,12 @@
using PallasBot.Application.Common.Services;
using PallasBot.Domain.Abstract;
using PallasBot.Domain.Entities;
using PallasBot.Domain.Extensions;
using PallasBot.EntityFrameworkCore;

namespace PallasBot.Application.Common.Jobs;

public class GitHubOrganizationSyncJob : ScopedTimedBackgroundWorker
public class GitHubOrganizationSyncJob : ScopedTimedBackgroundWorker<GitHubOrganizationSyncService>
{
public GitHubOrganizationSyncJob(ILoggerFactory loggerFactory, IServiceProvider serviceProvider)
: base(
Expand All @@ -26,22 +27,46 @@ public GitHubOrganizationSyncJob(ILoggerFactory loggerFactory, IServiceProvider

protected override string Name => nameof(GitHubOrganizationSyncJob);

protected override async Task ExecuteInScopeAsync(IServiceProvider serviceProvider, CancellationToken cancellationToken)
protected override async Task ExecuteAsync(GitHubOrganizationSyncService service, CancellationToken cancellationToken)
{
var gitHubApiService = serviceProvider.GetRequiredService<GitHubApiService>();
var dbContext = serviceProvider.GetRequiredService<PallasBotDbContext>();
var publishEndpoint = serviceProvider.GetRequiredService<IPublishEndpoint>();
await service.SyncOrganizationAsync(cancellationToken);
}
}

Logger.LogInformation("GitHub organization sync job started");
public class GitHubOrganizationSyncService
{
private readonly GitHubApiService _gitHubApiService;
private readonly PallasBotDbContext _dbContext;
private readonly IPublishEndpoint _publishEndpoint;
private readonly ILogger<GitHubOrganizationSyncService> _logger;

private const string OrganizationName = "MaaAssistantArknights";
private const string RepositoryName = "MaaAssistantArknights";

public GitHubOrganizationSyncService(
GitHubApiService gitHubApiService,
IPublishEndpoint publishEndpoint,
PallasBotDbContext dbContext,
ILogger<GitHubOrganizationSyncService> logger)
{
_gitHubApiService = gitHubApiService;
_publishEndpoint = publishEndpoint;
_dbContext = dbContext;
_logger = logger;
}

public async Task SyncOrganizationAsync(CancellationToken cancellationToken)
{
_logger.LogInformation("GitHub organization sync job started");

var accessToken = await gitHubApiService.GetGitHubAppAccessTokenAsync();
var accessToken = await _gitHubApiService.GetGitHubAppAccessTokenAsync();

var members = await gitHubApiService.GetOrganizationMembersAsync("MaaAssistantArknights", accessToken.Token);
var contributors = await gitHubApiService.GetRepoContributorsAsync("MaaAssistantArknights", "MaaAssistantArknights", accessToken.Token);
var members = await _gitHubApiService.GetOrganizationMembersAsync(OrganizationName, accessToken.Token);
var contributors = await _gitHubApiService.GetRepoContributorsAsync(OrganizationName, RepositoryName, accessToken.Token);

var changes = await PersistGitHubOrganizationInfoAsync(dbContext, members, contributors);
var changes = await PersistGitHubOrganizationInfoAsync(members, contributors);

var msg = await dbContext.GitHubUserBindings
var msg = await _dbContext.GitHubUserBindings
.Where(x => changes.Contains(x.GitHubLogin))
.Select(x => new TryAssignMaaRoleMqo
{
Expand All @@ -50,17 +75,17 @@ protected override async Task ExecuteInScopeAsync(IServiceProvider serviceProvid
})
.ToListAsync(cancellationToken);

await publishEndpoint.PublishBatch(msg, cancellationToken);
await _publishEndpoint.PublishBatch(msg, cancellationToken);
}

private static async Task<List<string>> PersistGitHubOrganizationInfoAsync(PallasBotDbContext dbContext, List<GitHubUser> members, List<GitHubUser> contributors)
private async Task<List<string>> PersistGitHubOrganizationInfoAsync(List<GitHubUser> members, List<GitHubUser> contributors)
{
var logins = members.Select(x => x.Login)
.Concat(contributors.Select(x => x.Login))
.Distinct()
.ToList();

var existingUsers = await dbContext.GitHubContributors
var existingUsers = await _dbContext.GitHubContributors
.Where(x => logins.Contains(x.GitHubLogin))
.ToListAsync();

Expand Down Expand Up @@ -101,15 +126,15 @@ private static async Task<List<string>> PersistGitHubOrganizationInfoAsync(Palla

if (isNew)
{
await dbContext.GitHubContributors.AddAsync(user);
await _dbContext.GitHubContributors.AddAsync(user);
}
else
{
dbContext.GitHubContributors.Update(user);
_dbContext.GitHubContributors.Update(user);
}
}

await dbContext.SaveChangesAsync();
await _dbContext.SaveChangesAsync();

return changes.Distinct().ToList();
}
Expand Down
94 changes: 59 additions & 35 deletions src/PallasBot.Domain/Abstract/ScopedTimedBackgroundWorker.cs
Original file line number Diff line number Diff line change
@@ -1,81 +1,105 @@
using Microsoft.Extensions.DependencyInjection;
using System.Diagnostics.CodeAnalysis;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;

namespace PallasBot.Domain.Abstract;

public abstract class ScopedTimedBackgroundWorker : IHostedService, IDisposable
[SuppressMessage("Design", "CA1031:Do not catch general exception types")]
public abstract class ScopedTimedBackgroundWorker<TService> : IHostedService, IDisposable
where TService : notnull
{
private readonly ScopedTimedBackgroundWorkerOptions _options;
private readonly IServiceProvider _serviceProvider;
private readonly SemaphoreSlim _lock = new(1, 1);
private readonly ScopedTimedBackgroundWorkerOptions _options;
private readonly ILogger _logger;

private PeriodicTimer? _periodicTimer;

protected CancellationToken CancellationToken { get; set; }
protected ILogger Logger { get; }
private readonly SemaphoreSlim _semaphore = new(1, 1);
private readonly CancellationTokenSource _taskCancellationTokenSource = new();

protected CancellationToken ServiceCancellationToken { get; set; }

protected ScopedTimedBackgroundWorker(
ScopedTimedBackgroundWorkerOptions options,
ILoggerFactory loggerFactory,
IServiceProvider serviceProvider)
{
_options = options;
_serviceProvider = serviceProvider;

Logger = loggerFactory.CreateLogger("BackgroundWorker");
_options = options;
_logger = loggerFactory.CreateLogger("BackgroundWorker");
}

protected abstract string Name { get; }
protected abstract Task ExecuteInScopeAsync(IServiceProvider serviceProvider, CancellationToken cancellationToken);
protected abstract Task ExecuteAsync(TService service, CancellationToken cancellationToken);

private async Task ExecuteInScopeAsync(CancellationToken cancellationToken)
{
await _semaphore.WaitAsync(cancellationToken);
try
{
using var scope = _serviceProvider.CreateScope();
var service = scope.ServiceProvider.GetRequiredService<TService>();
await ExecuteAsync(service, cancellationToken);
}
catch (Exception e)
{
_logger.LogError(e, "Error occurred in the execution of job {Name}", Name);
}
finally
{
_semaphore.Release();
}
}

public Task StartAsync(CancellationToken cancellationToken)
{
CancellationToken = cancellationToken;
ServiceCancellationToken = cancellationToken;
_periodicTimer = new PeriodicTimer(_options.Interval);

_ = Task.Run(async () =>
{
_logger.LogInformation("Starting job {Name} {JobConfig}", Name, _options);

if (_options.RunOnStart)
{
await ExecuteAsync(cancellationToken);
_logger.LogInformation("Running job {Name} on start", Name);
await ExecuteInScopeAsync(_taskCancellationTokenSource.Token);
}

while (await _periodicTimer.WaitForNextTickAsync(cancellationToken))
while (_periodicTimer is not null && _taskCancellationTokenSource.Token.IsCancellationRequested is false)
{
await ExecuteAsync(cancellationToken);
try
{
await _periodicTimer.WaitForNextTickAsync(_taskCancellationTokenSource.Token);

if (_taskCancellationTokenSource.Token.IsCancellationRequested is false)
{
_logger.LogInformation("Timer elapsed, running job {Name}", Name);
await ExecuteInScopeAsync(_taskCancellationTokenSource.Token);
}
}
catch (OperationCanceledException)
{
_logger.LogInformation("Job {Name} was cancelled", Name);
return;
}
}
}, cancellationToken);
}, ServiceCancellationToken);

return Task.CompletedTask;
}

public Task StopAsync(CancellationToken cancellationToken)
public async Task StopAsync(CancellationToken cancellationToken)
{
await _taskCancellationTokenSource.CancelAsync();
_periodicTimer?.Dispose();
return Task.CompletedTask;
}

private async Task ExecuteAsync(CancellationToken cancellationToken)
{
await _lock.WaitAsync(cancellationToken);
using var scope = _serviceProvider.CreateScope();
try
{
await ExecuteInScopeAsync(scope.ServiceProvider, cancellationToken);
}
catch (Exception e)
{
Logger.LogError(e, "Error occurred while executing the background worker {WorkerName}", Name);
}
finally
{
_lock.Release();
}
}

public void Dispose()
{
GC.SuppressFinalize(this);

_periodicTimer?.Dispose();
}
}
Expand Down
23 changes: 23 additions & 0 deletions src/PallasBot.Domain/Extensions/DependencyInjectionExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;

namespace PallasBot.Domain.Extensions;

public static class DependencyInjectionExtensions
{
public static TReturn RunInScope<TService, TReturn>(this IServiceProvider serviceProvider, Func<TService, TReturn> operation)
where TService : notnull
{
using var scope = serviceProvider.CreateScope();
var service = scope.ServiceProvider.GetRequiredService<TService>();
return operation.Invoke(service);
}

public static void AddScopedTimedBackgroundWorker<TWorker, TService>(this IServiceCollection services)
where TWorker : class, IHostedService
where TService : class
{
services.AddScoped<TService>();
services.AddHostedService<TWorker>();
}
}

0 comments on commit 691f57a

Please sign in to comment.