168 lines
5.5 KiB
TypeScript
168 lines
5.5 KiB
TypeScript
|
|
/**
 * Generic offline write engine.
 *
 * Every offline write is recorded as an {@link OutboxOp} carrying a stable
 * idempotency key. On reconnect the outbox is drained in causal (insertion)
 * order:
 *  - local ids (local_*) created by earlier ops are remapped to their real
 *    server ids before an op that references them is sent;
 *  - each op is sent with its idempotency key, so a replay after a lost response
 *    is de-duplicated by the server instead of creating a duplicate;
 *  - failures are classified: offline → stop; server 5xx / in-progress →
 *    retry next pass; client 4xx → count an attempt and poison after MAX.
 */
|
||
|
|
import { isAxiosError } from "axios";
|
||
|
|
import {
|
||
|
|
apiDelete,
|
||
|
|
apiPatch,
|
||
|
|
apiPost,
|
||
|
|
apiPut,
|
||
|
|
ApiClientError,
|
||
|
|
type WriteOptions,
|
||
|
|
} from "@/lib/api/client";
|
||
|
|
import {
|
||
|
|
getIdMap,
|
||
|
|
getOutboxOps,
|
||
|
|
removeOutboxOp,
|
||
|
|
setIdMapEntry,
|
||
|
|
updateOutboxOp,
|
||
|
|
type OutboxOp,
|
||
|
|
} from "@/lib/offline/offline-db";
|
||
|
|
|
||
|
|
/**
 * Number of counted failures after which a `failed` op is considered poisoned:
 * the drain loop skips it and `getPoisonedOps` reports it.
 */
const MAX_ATTEMPTS = 5;

/**
 * Global pattern for local placeholder ids: the literal prefix `local_`
 * followed by one or more underscore-separated alphanumeric segments,
 * e.g. `local_1717…_a1b2c3`.
 */
const LOCAL_ID_RE = /local_[A-Za-z0-9]+(?:_[A-Za-z0-9]+)*/g;
|
||
|
|
|
||
|
|
/**
 * Read a dot-separated `path` (e.g. `"data.id"`) out of `obj`.
 *
 * @param obj - Value to walk; anything that is not a non-null object ends the walk.
 * @param path - Property names joined by `.`.
 * @returns The value found at `path` when it is a string, otherwise `undefined`
 *   (missing segment, non-object intermediate, or a non-string leaf).
 */
function getByPath(obj: unknown, path: string): string | undefined {
  const leaf = path.split(".").reduce<unknown>((node, key) => {
    // Once the walk leaves object territory it can only ever yield undefined.
    const walkable = typeof node === "object" && node !== null;
    return walkable ? (node as Record<string, unknown>)[key] : undefined;
  }, obj);

  if (typeof leaf !== "string") return undefined;
  return leaf;
}
|
||
|
|
|
||
|
|
/**
 * Replace known local ids in the op's url/body with their server ids.
 *
 * Local ids are matched as whole tokens (as defined by `LOCAL_ID_RE`), never as
 * raw substrings. A mapped id that is merely a prefix of a longer, different
 * local id (`local_1` vs `local_12` / `local_1_x`) therefore leaves the longer
 * id intact instead of corrupting it into `<serverId>2`.
 *
 * The body is rewritten through its JSON text, so the returned body is a
 * JSON round-trip of `op.body` (or `op.body` itself when there is nothing to
 * serialise).
 *
 * @param op - The op's `url`, `body` and, if it creates an entity, the
 *   placeholder id (`createsClientId`) it is allowed to keep referencing.
 * @param idMap - Known local id → server id mappings.
 * @returns The rewritten `url` and `body`, plus `blocked: true` if the op still
 *   references an unresolved local id (its creator hasn't synced yet) other
 *   than the id this op itself creates.
 */
export function remapReferences(
  op: Pick<OutboxOp, "url" | "body" | "createsClientId">,
  idMap: Record<string, string>
): { url: string; body: unknown; blocked: boolean } {
  let blocked = false;

  // Resolve one whole id token; unknown tokens are kept and (unless it is the
  // op's own placeholder) mark the op as blocked.
  const resolve = (token: string, encode: (serverId: string) => string): string => {
    if (Object.prototype.hasOwnProperty.call(idMap, token)) {
      return encode(String(idMap[token]));
    }
    if (token !== op.createsClientId) blocked = true;
    return token;
  };

  // A replacer function is used so `$` sequences in server ids stay literal.
  const url = op.url.replace(LOCAL_ID_RE, (token) => resolve(token, (id) => id));

  // JSON.stringify yields undefined for values JSON cannot represent (e.g. a
  // bare function); treat that like "no body" instead of crashing in JSON.parse.
  const serialized: string | undefined =
    op.body === undefined ? undefined : JSON.stringify(op.body);
  if (serialized === undefined) {
    return { url, body: op.body, blocked };
  }

  // Ids can only occur inside JSON string literals, so the server id is
  // inserted in its JSON-escaped form (quotes stripped) to keep the text valid.
  const bodyStr = serialized.replace(LOCAL_ID_RE, (token) =>
    resolve(token, (id) => JSON.stringify(id).slice(1, -1))
  );

  return { url, body: JSON.parse(bodyStr), blocked };
}
|
||
|
|
|
||
|
|
/**
 * Send one outbox op through the API helper that matches its HTTP method.
 *
 * The op's idempotency key is forwarded with every request via the write
 * options. `DELETE` sends no body and resolves to `undefined`; the other
 * methods resolve to whatever the corresponding helper returns.
 *
 * @param op - The outbox op (its `method` and `idempotencyKey` are read).
 * @param url - Request url, already remapped by the caller.
 * @param body - Request body, already remapped by the caller.
 * @returns The helper's result for POST/PUT/PATCH, `undefined` for DELETE.
 * @throws Error when `op.method` is not one of the supported methods. Without
 *   this, such an op would resolve `undefined` and be treated by the caller as
 *   successfully sent even though no request was made.
 */
async function sendOp(op: OutboxOp, url: string, body: unknown): Promise<unknown> {
  const opts: WriteOptions = { idempotencyKey: op.idempotencyKey };
  switch (op.method) {
    case "POST":
      return apiPost(url, body, opts);
    case "PUT":
      return apiPut(url, body, opts);
    case "PATCH":
      return apiPatch(url, body, opts);
    case "DELETE":
      await apiDelete(url, opts);
      return undefined;
    default:
      // Reachable only with a corrupted/unknown stored method; fail loudly
      // rather than report a send that never happened.
      throw new Error(`Unsupported outbox method: ${String(op.method)}`);
  }
}
|
||
|
|
|
||
|
|
/**
 * How a failed send should be handled:
 * - `offline`: no response was received;
 * - `transient`: the same request may succeed later;
 * - `permanent`: retrying the same payload is not expected to help.
 */
type Disposition = "offline" | "transient" | "permanent";

/**
 * 4xx statuses that ask the client to try again later rather than rejecting
 * the payload: 408 Request Timeout, 425 Too Early, 429 Too Many Requests.
 */
const RETRYABLE_CLIENT_STATUSES: ReadonlySet<number> = new Set([408, 425, 429]);

/** True for server errors (5xx) and for the retry-later 4xx statuses. */
function isRetryableStatus(status: number): boolean {
  return status >= 500 || RETRYABLE_CLIENT_STATUSES.has(status);
}

/**
 * Classify an error thrown while sending an op.
 *
 * @param err - Whatever the send rejected with.
 * @returns
 *  - `"transient"` for an `ApiClientError` with code `IDEMPOTENCY_IN_PROGRESS`,
 *    and for any 5xx / 408 / 425 / 429 status;
 *  - `"offline"` for an axios error that carries no response;
 *  - `"permanent"` for everything else, including other 4xx statuses, an
 *    `ApiClientError` without a status, and non-HTTP errors.
 */
function classify(err: unknown): Disposition {
  if (err instanceof ApiClientError) {
    if (err.code === "IDEMPOTENCY_IN_PROGRESS") return "transient"; // another tab/pass owns it
    if (err.status !== undefined && isRetryableStatus(err.status)) return "transient";
    return "permanent"; // validation / 4xx — retrying the same payload won't help
  }
  if (isAxiosError(err)) {
    if (!err.response) return "offline"; // network down
    return isRetryableStatus(err.response.status) ? "transient" : "permanent";
  }
  return "permanent";
}
|
||
|
|
|
||
|
|
/**
 * Turn a thrown value into text suitable for storing as an op's last error.
 *
 * @param err - Any thrown value.
 * @returns `err.message` for `Error` instances, otherwise `String(err)`.
 */
function errMessage(err: unknown): string {
  return err instanceof Error ? err.message : String(err);
}
|
||
|
|
|
||
|
|
/** Outcome of one {@link drainOutbox} call. */
export type DrainResult = {
  /** Ops sent and removed from the outbox during this pass. */
  sent: number;
  /** Ops still in the outbox afterwards (blocked and poisoned ones included). */
  remaining: number;
  /** `false` when the pass was skipped: another pass is running, or the browser reports offline. */
  ran: boolean;
  /** Present only when the pass was cut short by a local (non-network) failure. */
  error?: string;
};

/** Module-level guard so only one drain pass runs at a time. */
let draining = false;

/** Current outbox size, or `fallback` when the outbox cannot be read. */
async function countOutboxOps(fallback: number): Promise<number> {
  try {
    return (await getOutboxOps()).length;
  } catch {
    return fallback;
  }
}

/** Record a counted failure on `op`; at MAX_ATTEMPTS the drain loop skips it as poisoned. */
async function recordFailure(op: OutboxOp, err: unknown): Promise<void> {
  await updateOutboxOp(op.id, {
    status: "failed",
    attempts: op.attempts + 1,
    lastError: errMessage(err),
  });
}

/**
 * Drain the outbox once, in causal order. Never throws.
 *
 * For each op, in the order `getOutboxOps` returns them:
 *  - poisoned ops (`failed` with `attempts >= MAX_ATTEMPTS`) are skipped;
 *  - ops still referencing an unresolved local id are skipped until a later pass;
 *  - otherwise the op is sent; on success any created id is stored in the id
 *    map and the op is removed from the outbox;
 *  - a send failure is classified: `offline` stops the pass, `transient`
 *    records the error without counting an attempt, `permanent` counts one.
 *
 * Only failures of the send itself are classified. A failure of local
 * bookkeeping (reading or writing the outbox / id map) ends the pass and is
 * reported through `error`; it never counts against an op, which stays
 * queued and is re-sent with the same idempotency key on a later pass.
 *
 * @returns Counts for this pass; `ran` is `false` when the pass was skipped.
 */
export async function drainOutbox(): Promise<DrainResult> {
  const isOffline = typeof navigator !== "undefined" && !navigator.onLine;
  if (draining || isOffline) {
    return { sent: 0, remaining: await countOutboxOps(0), ran: false };
  }

  draining = true;
  let sent = 0;
  let known = 0; // outbox size implied by the last successful read
  let error: string | undefined;

  try {
    const idMap = await getIdMap();
    const ops = await getOutboxOps();
    known = ops.length;

    for (const op of ops) {
      if (op.status === "failed" && op.attempts >= MAX_ATTEMPTS) continue; // poisoned

      let remapped: ReturnType<typeof remapReferences>;
      try {
        remapped = remapReferences(op, idMap);
      } catch (err) {
        // A payload that cannot be prepared will fail the same way every pass:
        // count it so it eventually poisons instead of wedging the whole queue.
        await recordFailure(op, err);
        continue;
      }
      if (remapped.blocked) continue; // a dependency hasn't synced yet; revisit next pass

      let result: unknown;
      try {
        result = await sendOp(op, remapped.url, remapped.body);
      } catch (err) {
        const disposition = classify(err);
        if (disposition === "offline") break; // stop the whole pass; resume when online
        if (disposition === "transient") {
          await updateOutboxOp(op.id, { lastError: errMessage(err) }); // retry, don't burn an attempt
        } else {
          await recordFailure(op, err);
        }
        continue;
      }

      // The send succeeded. Bookkeeping errors below propagate to the outer
      // catch rather than being mistaken for a rejection of this op.
      if (op.createsClientId) {
        const serverId = getByPath(result, op.idField ?? "id");
        // NOTE(review): when no string id is found the op is still removed, so
        // ops referencing this local id can never be unblocked — confirm intent.
        if (serverId) {
          idMap[op.createsClientId] = serverId;
          await setIdMapEntry(op.createsClientId, serverId);
        }
      }
      await removeOutboxOp(op.id);
      sent++;
      known--;
    }
  } catch (err) {
    error = errMessage(err);
  } finally {
    draining = false;
  }

  const remaining = await countOutboxOps(Math.max(known, 0));
  return error === undefined
    ? { sent, remaining, ran: true }
    : { sent, remaining, ran: true, error };
}
|
||
|
|
|
||
|
|
/**
 * List the outbox ops that have exhausted their retries and need user
 * attention: status `failed` with at least `MAX_ATTEMPTS` counted attempts.
 *
 * @returns The matching ops, in the order the outbox returns them.
 */
export async function getPoisonedOps(): Promise<OutboxOp[]> {
  const poisoned: OutboxOp[] = [];
  for (const candidate of await getOutboxOps()) {
    const isFailed = candidate.status === "failed";
    if (isFailed && candidate.attempts >= MAX_ATTEMPTS) {
      poisoned.push(candidate);
    }
  }
  return poisoned;
}
|