41 lines
1.5 KiB
C#
41 lines
1.5 KiB
C#
|
|
using Microsoft.EntityFrameworkCore;
|
||
|
|
using TeamUp.Modules.Assembler.Domain;
|
||
|
|
using TeamUp.Modules.Assembler.Persistence;
|
||
|
|
|
||
|
|
namespace TeamUp.Modules.Assembler.Queue;
|
||
|
|
|
||
|
|
/// <summary>
/// Agent-run job queue persisted in Postgres through <see cref="AssemblerDbContext"/>.
/// Enqueueing inserts a row; claiming selects one pending row with
/// <c>FOR UPDATE SKIP LOCKED</c> inside a transaction, so concurrent workers skip rows
/// that another transaction has already locked rather than waiting on them.
/// </summary>
internal sealed class JobQueue(AssemblerDbContext db, TimeProvider clock)
{
    // Selects the pending row with the earliest "CreatedAtUtc" and takes a row lock on it.
    // SKIP LOCKED tells Postgres to pass over rows it cannot lock immediately.
    private const string ClaimSql =
        "SELECT * FROM assembler.jobs WHERE \"Status\" = 'Pending' " +
        "ORDER BY \"CreatedAtUtc\" LIMIT 1 FOR UPDATE SKIP LOCKED";

    /// <summary>Creates a job stamped with the current UTC time and saves it.</summary>
    /// <param name="type">Job type passed to the <see cref="Job"/> constructor.</param>
    /// <param name="payload">Job payload passed to the <see cref="Job"/> constructor.</param>
    /// <param name="cancellationToken">Cancels the save.</param>
    /// <returns>The job instance that was added and saved.</returns>
    public async Task<Job> EnqueueAsync(string type, string payload, CancellationToken cancellationToken = default)
    {
        var enqueuedAt = clock.GetUtcNow();
        var queued = new Job(type, payload, enqueuedAt);

        db.Jobs.Add(queued);
        await db.SaveChangesAsync(cancellationToken);

        return queued;
    }

    /// <summary>
    /// Tries to claim one pending job for <paramref name="worker"/>. The select, the
    /// <c>MarkProcessing</c> call and the save all run in a single transaction.
    /// </summary>
    /// <param name="worker">Worker identifier handed to <c>Job.MarkProcessing</c>.</param>
    /// <param name="cancellationToken">Cancels any of the database calls.</param>
    /// <returns>The claimed job, or <c>null</c> when the query returned no row.</returns>
    public async Task<Job?> ClaimNextAsync(string worker, CancellationToken cancellationToken = default)
    {
        await using var tx = await db.Database.BeginTransactionAsync(cancellationToken);

        Job? candidate = await db.Jobs
            .FromSqlRaw(ClaimSql)
            .FirstOrDefaultAsync(cancellationToken);

        if (candidate is not null)
        {
            // The row lock is held until commit, so the status change is saved before the lock is released.
            candidate.MarkProcessing(worker, clock.GetUtcNow());
            await db.SaveChangesAsync(cancellationToken);
            await tx.CommitAsync(cancellationToken);
            return candidate;
        }

        // Nothing to claim: end the transaction explicitly before returning.
        await tx.RollbackAsync(cancellationToken);
        return null;
    }
}
|