acquireCommandLock('storage:queue-health', $lockSeconds, (bool) $this->option('force')); if ($lock === false) { $this->warn('Queue health check already running.'); return self::SUCCESS; } $connection = config('queue.default'); $thresholds = config('storage-monitor.queue_health.thresholds', []); if (empty($thresholds)) { $thresholds = [ 'default' => ['warning' => 100, 'critical' => 300], 'media-storage' => ['warning' => 200, 'critical' => 500], 'media-security' => ['warning' => 50, 'critical' => 150], ]; } try { $queueSummaries = []; $alerts = []; foreach ($thresholds as $queueName => $limits) { $size = $this->readQueueSize($queueManager, $connection, (string) $queueName); $failed = $this->countFailedJobs((string) $queueName); $severity = $this->determineQueueSeverity($size, $limits); if ($severity !== 'ok') { $alerts[] = [ 'queue' => $queueName, 'type' => 'size', 'severity' => $severity, 'size' => $size, ]; } if ($failed > 0) { $alerts[] = [ 'queue' => $queueName, 'type' => 'failed_jobs', 'severity' => $failed >= 10 ? 'critical' : 'warning', 'failed' => $failed, ]; } $queueSummaries[] = [ 'queue' => $queueName, 'size' => $size, 'failed' => $failed, 'severity' => $severity, 'limits' => $limits, ]; } $stalledMinutes = max(0, (int) config('storage-monitor.queue_health.stalled_minutes', 10)); $stalledAssets = 0; if ($stalledMinutes > 0) { $stalledAssets = EventMediaAsset::query() ->where('status', 'pending') ->where('created_at', '<=', now()->subMinutes($stalledMinutes)) ->count(); if ($stalledAssets > 0) { $alerts[] = [ 'type' => 'pending_assets', 'severity' => 'warning', 'older_than_minutes' => $stalledMinutes, 'count' => $stalledAssets, ]; } } $snapshot = [ 'generated_at' => now()->toIso8601String(), 'connection' => $connection, 'queues' => $queueSummaries, 'alerts' => $alerts, 'stalled_assets' => $stalledAssets, ]; $cacheTtl = max(1, (int) config('storage-monitor.queue_health.cache_minutes', 10)); Cache::put('storage:queue-health:last', $snapshot, now()->addMinutes($cacheTtl)); Log::channel('storage-jobs')->info('Upload queue health snapshot generated', [ 'queues' => count($queueSummaries), 'alerts' => count($alerts), ]); $this->info(sprintf( 'Checked %d queue(s); %d alert(s).', count($queueSummaries), count($alerts) )); return self::SUCCESS; } finally { if ($lock instanceof Lock) { $lock->release(); } } } private function readQueueSize(QueueManager $manager, ?string $connection, string $queue): int { try { return $manager->connection($connection)->size($queue); } catch (\Throwable $exception) { Log::channel('storage-jobs')->warning('Unable to read queue size', [ 'queue' => $queue, 'connection' => $connection, 'message' => $exception->getMessage(), ]); return -1; } } private function countFailedJobs(string $queue): int { $table = config('queue.failed.table', 'failed_jobs'); if (! $this->failedJobsTableExists($table)) { return 0; } return (int) DB::table($table)->where('queue', $queue)->count(); } private function failedJobsTableExists(string $table): bool { static $cache = []; if (array_key_exists($table, $cache)) { return $cache[$table]; } return $cache[$table] = Schema::hasTable($table); } private function determineQueueSeverity(int $size, array $limits): string { if ($size < 0) { return 'unknown'; } $critical = (int) ($limits['critical'] ?? 0); $warning = (int) ($limits['warning'] ?? 0); if ($critical > 0 && $size >= $critical) { return 'critical'; } if ($warning > 0 && $size >= $warning) { return 'warning'; } return 'ok'; } }