51 lines
1.8 KiB
C#
51 lines
1.8 KiB
C#
|
|
using Microsoft.Extensions.DependencyInjection;
|
||
|
|
using Microsoft.Extensions.Hosting;
|
||
|
|
using Microsoft.Extensions.Logging;
|
||
|
|
using TeamUp.Modules.Assembler.Queue;
|
||
|
|
using TeamUp.Modules.Assembler.Runtime;
|
||
|
|
|
||
|
|
namespace TeamUp.Modules.Assembler.Worker;
|
||
|
|
|
||
|
|
/// <summary>Drains the agent-run queue on the worker host: claim (SKIP LOCKED) → process, repeat.</summary>
|
||
|
|
internal sealed class JobProcessor(IServiceScopeFactory scopeFactory, ILogger<JobProcessor> logger) : BackgroundService
|
||
|
|
{
|
||
|
|
private static readonly TimeSpan PollInterval = TimeSpan.FromSeconds(2);
|
||
|
|
private readonly string _worker = $"{Environment.MachineName}:{Environment.ProcessId}";
|
||
|
|
|
||
|
|
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
|
||
|
|
{
|
||
|
|
logger.LogInformation("Agent-run job processor started ({Worker}).", _worker);
|
||
|
|
|
||
|
|
using var timer = new PeriodicTimer(PollInterval);
|
||
|
|
while (!stoppingToken.IsCancellationRequested)
|
||
|
|
{
|
||
|
|
try
|
||
|
|
{
|
||
|
|
await DrainAsync(stoppingToken);
|
||
|
|
await timer.WaitForNextTickAsync(stoppingToken);
|
||
|
|
}
|
||
|
|
catch (OperationCanceledException)
|
||
|
|
{
|
||
|
|
break;
|
||
|
|
}
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
private async Task DrainAsync(CancellationToken cancellationToken)
|
||
|
|
{
|
||
|
|
while (!cancellationToken.IsCancellationRequested)
|
||
|
|
{
|
||
|
|
await using var scope = scopeFactory.CreateAsyncScope();
|
||
|
|
var queue = scope.ServiceProvider.GetRequiredService<JobQueue>();
|
||
|
|
var job = await queue.ClaimNextAsync(_worker, cancellationToken);
|
||
|
|
if (job is null)
|
||
|
|
{
|
||
|
|
break;
|
||
|
|
}
|
||
|
|
|
||
|
|
var executor = scope.ServiceProvider.GetRequiredService<AgentRunExecutor>();
|
||
|
|
await executor.ProcessAsync(job, cancellationToken);
|
||
|
|
}
|
||
|
|
}
|
||
|
|
}
|