Files
fotospiel-app/app/Jobs/PollAiEditRequest.php
Codex Agent 8cc0918881
Some checks failed
linter / quality (push) Has been cancelled
tests / ci (push) Has been cancelled
tests / ui (push) Has been cancelled
feat(ai-edits): add output storage backfill flow and coverage
2026-02-07 10:10:45 +01:00

265 lines
10 KiB
PHP

<?php
namespace App\Jobs;
use App\Models\AiEditOutput;
use App\Models\AiEditRequest;
use App\Models\AiProviderRun;
use App\Services\AiEditing\AiEditingRuntimeConfig;
use App\Services\AiEditing\AiEditOutputStorageService;
use App\Services\AiEditing\AiImageProviderManager;
use App\Services\AiEditing\AiObservabilityService;
use App\Services\AiEditing\AiStatusNotificationService;
use App\Services\AiEditing\AiUsageLedgerService;
use App\Services\AiEditing\Safety\AiAbuseEscalationService;
use App\Services\AiEditing\Safety\AiSafetyPolicyService;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
use Illuminate\Support\Arr;
use Illuminate\Support\Str;
use Throwable;
/**
 * Queue job that polls an AI image provider for the result of a previously
 * submitted edit task and moves the owning AiEditRequest to its next state.
 *
 * Every execution writes one AiProviderRun row describing the poll. While the
 * provider keeps answering "processing" the job re-dispatches itself with an
 * incremented poll counter until the configured maximum is reached.
 */
class PollAiEditRequest implements ShouldQueue
{
    use Dispatchable;
    use InteractsWithQueue;
    use Queueable;
    use SerializesModels;

    /** Number of attempts the queue worker makes before giving up on the job. */
    public int $tries = 3;

    /**
     * Delay in seconds before each queue retry.
     *
     * @var array<int, int>
     */
    public array $backoff = [20, 60, 120];

    /** Maximum runtime of a single attempt, in seconds. */
    public int $timeout = 60;

    /**
     * @param int    $requestId      Primary key of the AiEditRequest being polled.
     * @param string $providerTaskId Task identifier handed to the provider's poll call.
     * @param int    $pollAttempt    1-based counter of polls performed for this task.
     */
    public function __construct(
        private readonly int $requestId,
        private readonly string $providerTaskId,
        private readonly int $pollAttempt = 1,
    ) {
        $this->onQueue((string) config('ai-editing.queue.name', 'default'));
    }

    /**
     * Poll the provider once and act on the reported status.
     *
     * - "succeeded": run the output safety policy, then either block the request
     *   or persist the outputs, debit usage and mark the request succeeded.
     * - "processing": schedule the next poll, or fail the request once the
     *   maximum number of polls has been used.
     * - anything else: mark the request blocked or failed as reported.
     *
     * The job is a no-op when the request no longer exists or is not in the
     * processing state.
     *
     * @throws Throwable Re-thrown from the provider call after the run row has
     *                   been marked failed, so the queue can retry the job.
     */
    public function handle(
        AiImageProviderManager $providers,
        AiSafetyPolicyService $safetyPolicy,
        AiAbuseEscalationService $abuseEscalation,
        AiObservabilityService $observability,
        AiStatusNotificationService $statusNotifications,
        AiEditOutputStorageService $outputStorage,
        AiEditingRuntimeConfig $runtimeConfig,
        AiUsageLedgerService $usageLedger
    ): void {
        $request = AiEditRequest::query()->with('outputs')->find($this->requestId);
        if (! $request || $request->status !== AiEditRequest::STATUS_PROCESSING) {
            return;
        }

        $run = AiProviderRun::query()->create([
            'request_id' => $request->id,
            'provider' => $request->provider,
            'attempt' => ((int) $request->providerRuns()->max('attempt')) + 1,
            'provider_task_id' => $this->providerTaskId,
            'status' => AiProviderRun::STATUS_RUNNING,
            'started_at' => now(),
        ]);

        try {
            $result = $providers->forProvider($request->provider)->poll($request, $this->providerTaskId);
        } catch (Throwable $exception) {
            // Without this the run row would stay in STATUS_RUNNING forever,
            // once per queue retry. Close it, then let the queue handle the error.
            $reason = trim($exception->getMessage());
            $this->markRunFailed(
                $run,
                $reason !== '' ? Str::limit($reason, 500, '') : 'Provider poll threw an exception.'
            );

            throw $exception;
        }

        $this->recordRunResult($run, $result);

        if ($result->status === 'succeeded') {
            $outputDecision = $safetyPolicy->evaluateProviderOutput($result);

            if ($outputDecision->blocked) {
                $this->blockForOutputPolicy($request, $outputDecision, $abuseEscalation);
                $this->announceTerminalOutcome(
                    $request,
                    $run,
                    AiEditRequest::STATUS_BLOCKED,
                    true,
                    $observability,
                    $statusNotifications
                );

                return;
            }

            $this->storeOutputs($request, $result, $outputStorage);

            $request->forceFill([
                'status' => AiEditRequest::STATUS_SUCCEEDED,
                'safety_state' => 'passed',
                'safety_reasons' => [],
                'failure_code' => null,
                'failure_message' => null,
                'completed_at' => now(),
            ])->save();

            $usageLedger->recordDebitForRequest($request->fresh(), $result->costUsd, [
                'source' => 'poll_job',
                'poll_attempt' => $this->pollAttempt,
            ]);

            $this->announceTerminalOutcome(
                $request,
                $run,
                AiEditRequest::STATUS_SUCCEEDED,
                false,
                $observability,
                $statusNotifications
            );

            return;
        }

        if ($result->status === 'processing') {
            $maxPolls = $runtimeConfig->maxPolls();

            if ($this->pollAttempt < $maxPolls) {
                // Still running on the provider side: try again in 20 seconds.
                self::dispatch($request->id, $this->providerTaskId, $this->pollAttempt + 1)
                    ->delay(now()->addSeconds(20))
                    ->onQueue($runtimeConfig->queueName());

                return;
            }

            $this->markRunFailed($run, sprintf('Polling exhausted after %d attempt(s).', $maxPolls));

            $request->forceFill([
                'status' => AiEditRequest::STATUS_FAILED,
                'failure_code' => 'provider_poll_timeout',
                'failure_message' => sprintf('Polling timed out after %d attempt(s).', $maxPolls),
                'completed_at' => now(),
            ])->save();

            $this->announceTerminalOutcome(
                $request,
                $run,
                AiEditRequest::STATUS_FAILED,
                false,
                $observability,
                $statusNotifications
            );

            return;
        }

        // Any other provider status is terminal: "blocked" maps to a blocked
        // request, everything else to a failed one.
        $wasBlocked = $result->status === 'blocked';
        $terminalStatus = $wasBlocked ? AiEditRequest::STATUS_BLOCKED : AiEditRequest::STATUS_FAILED;

        $request->forceFill([
            'status' => $terminalStatus,
            'safety_state' => $result->safetyState ?? $request->safety_state,
            'safety_reasons' => $result->safetyReasons !== [] ? $result->safetyReasons : $request->safety_reasons,
            'failure_code' => $result->failureCode,
            'failure_message' => $result->failureMessage,
            'completed_at' => now(),
        ])->save();

        $this->announceTerminalOutcome(
            $request,
            $run,
            $terminalStatus,
            $wasBlocked,
            $observability,
            $statusNotifications
        );
    }

    /**
     * Failure hook for the job: marks a still-open request as failed.
     *
     * Requests that are missing or already outside the queued/processing
     * states are left untouched. The exception message, capped at 500
     * characters, becomes the failure message.
     */
    public function failed(Throwable $exception): void
    {
        $request = AiEditRequest::query()->find($this->requestId);
        if (! $request) {
            return;
        }

        $openStatuses = [AiEditRequest::STATUS_QUEUED, AiEditRequest::STATUS_PROCESSING];
        if (! in_array($request->status, $openStatuses, true)) {
            return;
        }

        $message = trim($exception->getMessage());

        $request->forceFill([
            'status' => AiEditRequest::STATUS_FAILED,
            'failure_code' => 'queue_job_failed',
            'failure_message' => $message !== ''
                ? Str::limit($message, 500, '')
                : 'AI edit polling failed in queue.',
            'completed_at' => now(),
        ])->save();

        // Services are resolved from the container here because this hook
        // receives only the exception.
        app(AiObservabilityService::class)->recordTerminalOutcome(
            $request,
            AiEditRequest::STATUS_FAILED,
            null,
            false,
            'poll_failed_hook'
        );
        app(AiStatusNotificationService::class)->notifyTerminalOutcome($request->fresh());
    }

    /**
     * Milliseconds elapsed since the run started, clamped at zero, or null
     * when the run has no start timestamp.
     */
    private function elapsedMilliseconds(AiProviderRun $run): ?int
    {
        $startedAt = $run->started_at;

        return $startedAt ? (int) max(0, $startedAt->diffInMilliseconds(now())) : null;
    }

    /**
     * Close the run row as failed with the given error message.
     */
    private function markRunFailed(AiProviderRun $run, string $errorMessage): void
    {
        $run->forceFill([
            'status' => AiProviderRun::STATUS_FAILED,
            'finished_at' => now(),
            'duration_ms' => $this->elapsedMilliseconds($run),
            'error_message' => $errorMessage,
        ])->save();
    }

    /**
     * Copy the provider's poll result onto the run row.
     *
     * A "processing" result keeps the run open (no finished_at); every other
     * status closes it.
     */
    private function recordRunResult(AiProviderRun $run, object $result): void
    {
        $runStatus = match ($result->status) {
            'succeeded' => AiProviderRun::STATUS_SUCCEEDED,
            'processing' => AiProviderRun::STATUS_RUNNING,
            default => AiProviderRun::STATUS_FAILED,
        };

        $run->forceFill([
            'status' => $runStatus,
            'http_status' => $result->httpStatus,
            'finished_at' => $result->status === 'processing' ? null : now(),
            'duration_ms' => $this->elapsedMilliseconds($run),
            'cost_usd' => $result->costUsd,
            'request_payload' => $result->requestPayload,
            'response_payload' => $result->responsePayload,
            'error_message' => $result->failureMessage,
        ])->save();
    }

    /**
     * Mark the request blocked after the output safety policy rejected the
     * provider result, recording the abuse signal in the request metadata.
     */
    private function blockForOutputPolicy(
        AiEditRequest $request,
        object $outputDecision,
        AiAbuseEscalationService $abuseEscalation
    ): void {
        $abuseSignal = $abuseEscalation->recordOutputBlock(
            (int) $request->tenant_id,
            (int) $request->event_id,
            'provider:'.$request->provider
        );

        // Append the escalation reason code once when the signal says the
        // block was escalated.
        $safetyReasons = $outputDecision->reasonCodes;
        $escalated = $abuseSignal['escalated'] ?? false;
        if ($escalated && ! in_array(AiAbuseEscalationService::REASON_CODE, $safetyReasons, true)) {
            $safetyReasons[] = AiAbuseEscalationService::REASON_CODE;
        }

        $metadata = (array) ($request->metadata ?? []);
        $metadata['abuse'] = $abuseSignal;

        $request->forceFill([
            'status' => AiEditRequest::STATUS_BLOCKED,
            'safety_state' => $outputDecision->state,
            'safety_reasons' => $safetyReasons,
            'failure_code' => $outputDecision->failureCode ?? 'output_policy_blocked',
            'failure_message' => $outputDecision->failureMessage,
            'metadata' => $metadata,
            'completed_at' => now(),
        ])->save();
    }

    /**
     * Persist every provider output and upsert its AiEditOutput row, keyed by
     * request id and provider asset id (falling back to the provider task id).
     */
    private function storeOutputs(
        AiEditRequest $request,
        object $result,
        AiEditOutputStorageService $outputStorage
    ): void {
        foreach ($result->outputs as $output) {
            $persisted = $outputStorage->persist($request, is_array($output) ? $output : []);
            $extraMetadata = Arr::get($persisted, 'metadata');

            AiEditOutput::query()->updateOrCreate(
                [
                    'request_id' => $request->id,
                    'provider_asset_id' => (string) Arr::get($persisted, 'provider_asset_id', $this->providerTaskId),
                ],
                [
                    'storage_disk' => Arr::get($persisted, 'storage_disk'),
                    'storage_path' => Arr::get($persisted, 'storage_path'),
                    'provider_url' => Arr::get($persisted, 'provider_url'),
                    'mime_type' => Arr::get($persisted, 'mime_type'),
                    'width' => Arr::get($persisted, 'width'),
                    'height' => Arr::get($persisted, 'height'),
                    'bytes' => Arr::get($persisted, 'bytes'),
                    'checksum' => Arr::get($persisted, 'checksum'),
                    // NOTE(review): every output is flagged primary; confirm this
                    // is intended when a provider returns more than one output.
                    'is_primary' => true,
                    'safety_state' => 'passed',
                    'safety_reasons' => [],
                    'generated_at' => now(),
                    'metadata' => array_merge(
                        ['provider' => $request->provider],
                        is_array($extraMetadata) ? $extraMetadata : []
                    ),
                ]
            );
        }
    }

    /**
     * Report a terminal outcome reached by a poll to observability and send
     * the status notification for the freshly reloaded request.
     *
     * @param string $status  Terminal AiEditRequest status being reported.
     * @param bool   $blocked Fourth argument of recordTerminalOutcome; true on
     *                        the blocked paths, false otherwise.
     */
    private function announceTerminalOutcome(
        AiEditRequest $request,
        AiProviderRun $run,
        string $status,
        bool $blocked,
        AiObservabilityService $observability,
        AiStatusNotificationService $statusNotifications
    ): void {
        $observability->recordTerminalOutcome($request, $status, $run->duration_ms, $blocked, 'poll');
        $statusNotifications->notifyTerminalOutcome($request->fresh());
    }
}