304 lines
11 KiB
PHP
304 lines
11 KiB
PHP
<?php
|
|
|
|
namespace App\Jobs;
|
|
|
|
use App\Models\AiEditOutput;
|
|
use App\Models\AiEditRequest;
|
|
use App\Models\AiProviderRun;
|
|
use App\Services\AiEditing\AiEditingRuntimeConfig;
|
|
use App\Services\AiEditing\AiImageProviderManager;
|
|
use App\Services\AiEditing\AiObservabilityService;
|
|
use App\Services\AiEditing\AiProviderResult;
|
|
use App\Services\AiEditing\AiStatusNotificationService;
|
|
use App\Services\AiEditing\AiUsageLedgerService;
|
|
use App\Services\AiEditing\Safety\AiAbuseEscalationService;
|
|
use App\Services\AiEditing\Safety\AiSafetyPolicyService;
|
|
use Illuminate\Bus\Queueable;
|
|
use Illuminate\Contracts\Queue\ShouldQueue;
|
|
use Illuminate\Foundation\Bus\Dispatchable;
|
|
use Illuminate\Queue\InteractsWithQueue;
|
|
use Illuminate\Queue\SerializesModels;
|
|
use Illuminate\Support\Arr;
|
|
use Illuminate\Support\Facades\DB;
|
|
use Illuminate\Support\Str;
|
|
use Throwable;
|
|
|
|
class ProcessAiEditRequest implements ShouldQueue
|
|
{
|
|
use Dispatchable;
|
|
use InteractsWithQueue;
|
|
use Queueable;
|
|
use SerializesModels;
|
|
|
|
public int $tries = 3;
|
|
|
|
/**
|
|
* @var array<int, int>
|
|
*/
|
|
public array $backoff = [30, 120, 300];
|
|
|
|
public int $timeout = 90;
|
|
|
|
public function __construct(private readonly int $requestId)
|
|
{
|
|
$queue = (string) config('ai-editing.queue.name', 'default');
|
|
$this->onQueue($queue);
|
|
}
|
|
|
|
public function handle(
|
|
AiImageProviderManager $providers,
|
|
AiSafetyPolicyService $safetyPolicy,
|
|
AiAbuseEscalationService $abuseEscalation,
|
|
AiObservabilityService $observability,
|
|
AiStatusNotificationService $statusNotifications,
|
|
AiEditingRuntimeConfig $runtimeConfig,
|
|
AiUsageLedgerService $usageLedger
|
|
): void {
|
|
$request = AiEditRequest::query()->with('style')->find($this->requestId);
|
|
if (! $request) {
|
|
return;
|
|
}
|
|
|
|
if (! in_array($request->status, [AiEditRequest::STATUS_QUEUED, AiEditRequest::STATUS_PROCESSING], true)) {
|
|
return;
|
|
}
|
|
|
|
if ($request->status === AiEditRequest::STATUS_QUEUED) {
|
|
$request->forceFill([
|
|
'status' => AiEditRequest::STATUS_PROCESSING,
|
|
'started_at' => $request->started_at ?: now(),
|
|
])->save();
|
|
}
|
|
|
|
$attempt = ((int) $request->providerRuns()->max('attempt')) + 1;
|
|
$providerRun = AiProviderRun::query()->create([
|
|
'request_id' => $request->id,
|
|
'provider' => $request->provider,
|
|
'attempt' => $attempt,
|
|
'status' => AiProviderRun::STATUS_RUNNING,
|
|
'started_at' => now(),
|
|
]);
|
|
|
|
$result = $providers->forProvider($request->provider)->submit($request);
|
|
|
|
$this->finalizeProviderRun($providerRun, $result);
|
|
$this->applyProviderResult(
|
|
$request->fresh(['outputs']),
|
|
$providerRun,
|
|
$result,
|
|
$safetyPolicy,
|
|
$abuseEscalation,
|
|
$observability,
|
|
$statusNotifications,
|
|
$runtimeConfig,
|
|
$usageLedger
|
|
);
|
|
}
|
|
|
|
public function failed(Throwable $exception): void
|
|
{
|
|
$request = AiEditRequest::query()->find($this->requestId);
|
|
if (! $request) {
|
|
return;
|
|
}
|
|
|
|
if (! in_array($request->status, [AiEditRequest::STATUS_QUEUED, AiEditRequest::STATUS_PROCESSING], true)) {
|
|
return;
|
|
}
|
|
|
|
$message = trim($exception->getMessage());
|
|
$request->forceFill([
|
|
'status' => AiEditRequest::STATUS_FAILED,
|
|
'failure_code' => 'queue_job_failed',
|
|
'failure_message' => $message !== ''
|
|
? Str::limit($message, 500, '')
|
|
: 'AI edit processing failed in queue.',
|
|
'completed_at' => now(),
|
|
])->save();
|
|
|
|
app(AiObservabilityService::class)->recordTerminalOutcome(
|
|
$request,
|
|
AiEditRequest::STATUS_FAILED,
|
|
null,
|
|
false,
|
|
'process_failed_hook'
|
|
);
|
|
app(AiStatusNotificationService::class)->notifyTerminalOutcome($request->fresh());
|
|
}
|
|
|
|
private function finalizeProviderRun(AiProviderRun $run, AiProviderResult $result): void
|
|
{
|
|
$missingTaskId = $result->status === 'processing'
|
|
&& (! is_string($result->providerTaskId) || trim($result->providerTaskId) === '');
|
|
|
|
$status = $missingTaskId
|
|
? AiProviderRun::STATUS_FAILED
|
|
: ($result->status === 'succeeded'
|
|
? AiProviderRun::STATUS_SUCCEEDED
|
|
: ($result->status === 'processing' ? AiProviderRun::STATUS_RUNNING : AiProviderRun::STATUS_FAILED));
|
|
|
|
$run->forceFill([
|
|
'provider_task_id' => $result->providerTaskId,
|
|
'status' => $status,
|
|
'http_status' => $result->httpStatus,
|
|
'finished_at' => $status === AiProviderRun::STATUS_RUNNING ? null : now(),
|
|
'duration_ms' => $run->started_at ? (int) max(0, $run->started_at->diffInMilliseconds(now())) : null,
|
|
'cost_usd' => $result->costUsd,
|
|
'request_payload' => $result->requestPayload,
|
|
'response_payload' => $result->responsePayload,
|
|
'error_message' => $missingTaskId
|
|
? 'Provider returned processing state without task identifier.'
|
|
: $result->failureMessage,
|
|
])->save();
|
|
}
|
|
|
|
private function applyProviderResult(
|
|
AiEditRequest $request,
|
|
AiProviderRun $providerRun,
|
|
AiProviderResult $result,
|
|
AiSafetyPolicyService $safetyPolicy,
|
|
AiAbuseEscalationService $abuseEscalation,
|
|
AiObservabilityService $observability,
|
|
AiStatusNotificationService $statusNotifications,
|
|
AiEditingRuntimeConfig $runtimeConfig,
|
|
AiUsageLedgerService $usageLedger
|
|
): void {
|
|
if ($result->status === 'succeeded') {
|
|
$outputDecision = $safetyPolicy->evaluateProviderOutput($result);
|
|
if ($outputDecision->blocked) {
|
|
$abuseSignal = $abuseEscalation->recordOutputBlock(
|
|
(int) $request->tenant_id,
|
|
(int) $request->event_id,
|
|
'provider:'.$request->provider
|
|
);
|
|
$safetyReasons = $outputDecision->reasonCodes;
|
|
if (($abuseSignal['escalated'] ?? false) && ! in_array(AiAbuseEscalationService::REASON_CODE, $safetyReasons, true)) {
|
|
$safetyReasons[] = AiAbuseEscalationService::REASON_CODE;
|
|
}
|
|
$metadata = (array) ($request->metadata ?? []);
|
|
$metadata['abuse'] = $abuseSignal;
|
|
|
|
$request->forceFill([
|
|
'status' => AiEditRequest::STATUS_BLOCKED,
|
|
'safety_state' => $outputDecision->state,
|
|
'safety_reasons' => $safetyReasons,
|
|
'failure_code' => $outputDecision->failureCode ?? 'output_policy_blocked',
|
|
'failure_message' => $outputDecision->failureMessage,
|
|
'metadata' => $metadata,
|
|
'completed_at' => now(),
|
|
])->save();
|
|
|
|
$observability->recordTerminalOutcome(
|
|
$request,
|
|
AiEditRequest::STATUS_BLOCKED,
|
|
$providerRun->duration_ms,
|
|
true,
|
|
'process'
|
|
);
|
|
$statusNotifications->notifyTerminalOutcome($request->fresh());
|
|
|
|
return;
|
|
}
|
|
|
|
DB::transaction(function () use ($request, $result): void {
|
|
foreach ($result->outputs as $output) {
|
|
AiEditOutput::query()->updateOrCreate(
|
|
[
|
|
'request_id' => $request->id,
|
|
'provider_asset_id' => (string) Arr::get($output, 'provider_asset_id', ''),
|
|
],
|
|
[
|
|
'provider_url' => Arr::get($output, 'provider_url'),
|
|
'mime_type' => Arr::get($output, 'mime_type'),
|
|
'width' => Arr::get($output, 'width'),
|
|
'height' => Arr::get($output, 'height'),
|
|
'is_primary' => true,
|
|
'safety_state' => 'passed',
|
|
'safety_reasons' => [],
|
|
'generated_at' => now(),
|
|
'metadata' => ['provider' => $request->provider],
|
|
]
|
|
);
|
|
}
|
|
|
|
$request->forceFill([
|
|
'status' => AiEditRequest::STATUS_SUCCEEDED,
|
|
'safety_state' => 'passed',
|
|
'safety_reasons' => [],
|
|
'failure_code' => null,
|
|
'failure_message' => null,
|
|
'completed_at' => now(),
|
|
])->save();
|
|
});
|
|
|
|
$usageLedger->recordDebitForRequest($request->fresh(), $result->costUsd, [
|
|
'source' => 'process_job',
|
|
]);
|
|
|
|
$observability->recordTerminalOutcome(
|
|
$request,
|
|
AiEditRequest::STATUS_SUCCEEDED,
|
|
$providerRun->duration_ms,
|
|
false,
|
|
'process'
|
|
);
|
|
$statusNotifications->notifyTerminalOutcome($request->fresh());
|
|
|
|
return;
|
|
}
|
|
|
|
if ($result->status === 'processing') {
|
|
$providerTaskId = trim((string) ($result->providerTaskId ?? ''));
|
|
if ($providerTaskId === '') {
|
|
$request->forceFill([
|
|
'status' => AiEditRequest::STATUS_FAILED,
|
|
'failure_code' => 'provider_task_id_missing',
|
|
'failure_message' => 'Provider returned processing state without a task identifier.',
|
|
'completed_at' => now(),
|
|
])->save();
|
|
|
|
$observability->recordTerminalOutcome(
|
|
$request,
|
|
AiEditRequest::STATUS_FAILED,
|
|
$providerRun->duration_ms,
|
|
false,
|
|
'process'
|
|
);
|
|
$statusNotifications->notifyTerminalOutcome($request->fresh());
|
|
|
|
return;
|
|
}
|
|
|
|
$request->forceFill([
|
|
'status' => AiEditRequest::STATUS_PROCESSING,
|
|
'failure_code' => null,
|
|
'failure_message' => null,
|
|
])->save();
|
|
|
|
PollAiEditRequest::dispatch($request->id, $providerTaskId, 1)
|
|
->delay(now()->addSeconds(20))
|
|
->onQueue($runtimeConfig->queueName());
|
|
|
|
return;
|
|
}
|
|
|
|
$request->forceFill([
|
|
'status' => $result->status === 'blocked' ? AiEditRequest::STATUS_BLOCKED : AiEditRequest::STATUS_FAILED,
|
|
'safety_state' => $result->safetyState ?? $request->safety_state,
|
|
'safety_reasons' => $result->safetyReasons !== [] ? $result->safetyReasons : $request->safety_reasons,
|
|
'failure_code' => $result->failureCode,
|
|
'failure_message' => $result->failureMessage,
|
|
'completed_at' => now(),
|
|
])->save();
|
|
|
|
$observability->recordTerminalOutcome(
|
|
$request,
|
|
$result->status === 'blocked' ? AiEditRequest::STATUS_BLOCKED : AiEditRequest::STATUS_FAILED,
|
|
$providerRun->duration_ms,
|
|
$result->status === 'blocked',
|
|
'process'
|
|
);
|
|
$statusNotifications->notifyTerminalOutcome($request->fresh());
|
|
}
|
|
}
|