acquireCommandLock('storage:queue-health', $lockSeconds, (bool) $this->option('force')); if ($lock === false) { $this->warn('Queue health check already running.'); return self::SUCCESS; } $connection = config('queue.default'); $thresholds = config('storage-monitor.queue_health.thresholds', []); if (empty($thresholds)) { $thresholds = [ 'default' => ['warning' => 100, 'critical' => 300], 'media-storage' => ['warning' => 200, 'critical' => 500], 'media-security' => ['warning' => 50, 'critical' => 150], ]; } try { $queueSummaries = []; $alerts = []; foreach ($thresholds as $queueName => $limits) { $size = $this->readQueueSize($queueManager, $connection, (string) $queueName); $failed = $this->countFailedJobs((string) $queueName); $severity = $this->determineQueueSeverity($size, $limits); if ($severity !== 'ok') { $alerts[] = [ 'queue' => $queueName, 'type' => 'size', 'severity' => $severity, 'size' => $size, ]; } if ($failed > 0) { $alerts[] = [ 'queue' => $queueName, 'type' => 'failed_jobs', 'severity' => $failed >= 10 ? 
'critical' : 'warning',
                        'failed' => $failed,
                    ];
                }

                // One summary row per configured queue, stored in the snapshot below.
                $queueSummaries[] = [
                    'queue' => $queueName,
                    'size' => $size,
                    'failed' => $failed,
                    'severity' => $severity,
                    'limits' => $limits,
                ];
            }

            // A configured value of 0 (or less) disables the stalled-asset check.
            $stalledMinutes = max(0, (int) config('storage-monitor.queue_health.stalled_minutes', 10));
            $stalledAssets = 0;

            if ($stalledMinutes > 0) {
                // Assets still "pending" that were created at least $stalledMinutes ago.
                $stalledAssets = EventMediaAsset::query()
                    ->where('status', 'pending')
                    ->where('created_at', '<=', now()->subMinutes($stalledMinutes))
                    ->count();

                if ($stalledAssets > 0) {
                    $alerts[] = [
                        'type' => 'pending_assets',
                        'severity' => 'warning',
                        'older_than_minutes' => $stalledMinutes,
                        'count' => $stalledAssets,
                    ];
                }
            }

            $snapshot = [
                'generated_at' => now()->toIso8601String(),
                'connection' => $connection,
                'queues' => $queueSummaries,
                'alerts' => $alerts,
                'stalled_assets' => $stalledAssets,
            ];

            // Keep the latest snapshot in the cache for at least one minute.
            $cacheTtl = max(1, (int) config('storage-monitor.queue_health.cache_minutes', 10));
            Cache::put('storage:queue-health:last', $snapshot, now()->addMinutes($cacheTtl));

            Log::channel('storage-jobs')->info('Upload queue health snapshot generated', [
                'queues' => count($queueSummaries),
                'alerts' => count($alerts),
            ]);

            $this->info(sprintf(
                'Checked %d queue(s); %d alert(s).',
                count($queueSummaries),
                count($alerts)
            ));

            $this->maybeNotifyGuests($alerts);

            return self::SUCCESS;
        } finally {
            // Always give the command lock back, even when the check throws.
            if ($lock instanceof Lock) {
                $lock->release();
            }
        }
    }

    /**
     * Run the guest-facing alert dispatchers when the health check raised
     * at least one alert.
     *
     * The alert entries themselves are not inspected: any non-empty list
     * triggers both the pending-upload and the failed-upload dispatcher.
     *
     * @param  array  $alerts  Alerts collected by the current health check run.
     */
    private function maybeNotifyGuests(array $alerts): void
    {
        if ($alerts === []) {
            return;
        }

        $this->dispatchPendingAlerts();
        $this->dispatchFailureAlerts();
    }

    /**
     * Notify guests of events whose uploads have been pending for a while.
     *
     * An event qualifies when it has at least `pending_event_threshold`
     * assets in status "pending" that were created at least
     * `pending_event_minutes` minutes ago (both read from the
     * `storage-monitor.queue_health` config and clamped to a minimum of 1).
     * At most 50 events are considered per run.
     */
    private function dispatchPendingAlerts(): void
    {
        $threshold = max(1, (int) config('storage-monitor.queue_health.pending_event_threshold', 5));
        $minutes = max(1, (int) config('storage-monitor.queue_health.pending_event_minutes', 8));

        $pending = EventMediaAsset::query()
            ->selectRaw('event_id, COUNT(*) as pending_count, MIN(created_at) as oldest_created_at')
            ->where('status', 'pending')
            ->where('created_at', '<=', now()->subMinutes($minutes))
            ->groupBy('event_id')
            ->havingRaw('COUNT(*) >= ?',
[$threshold])
            ->limit(50)
            ->get();

        // Resolve every affected event with a single query instead of one
        // lookup per aggregated row; getDictionary() keys the models by
        // their primary key.
        $events = Event::query()
            ->findMany($pending->pluck('event_id')->all())
            ->getDictionary();

        $title = 'Uploads werden noch verarbeitet …';

        foreach ($pending as $row) {
            $event = $events[(int) $row->event_id] ?? null;

            // Skip rows whose event could not be loaded, and events that
            // already received this alert within the suppression window.
            if (! $event || $this->recentlySentAlert($event->id, $title)) {
                continue;
            }

            $count = (int) $row->pending_count;
            $body = $count > 1
                ? sprintf('%d Fotos stehen noch in der Warteschlange. Wir sagen Bescheid, sobald alles gespeichert ist.', $count)
                : 'Ein Upload-Schub wird gerade verarbeitet. Danke für deine Geduld!';

            $this->guestNotifications->createNotification(
                $event,
                GuestNotificationType::UPLOAD_ALERT,
                $title,
                $body,
                [
                    'audience_scope' => GuestNotificationAudience::ALL,
                    'priority' => 1,
                    'expires_at' => now()->addMinutes(90),
                ]
            );

            $this->rememberAlert($event->id, $title);
        }
    }

    /**
     * Notify guests of events that recently accumulated failed uploads.
     *
     * An event qualifies when it has at least `failed_event_threshold`
     * assets in status "failed" whose `updated_at` lies within the last
     * `failed_event_minutes` minutes (both read from the
     * `storage-monitor.queue_health` config and clamped to a minimum of 1).
     * At most 50 events are considered per run, and an event that already
     * received this alert within the suppression window is skipped.
     */
    private function dispatchFailureAlerts(): void
    {
        $threshold = max(1, (int) config('storage-monitor.queue_health.failed_event_threshold', 2));
        $minutes = max(1, (int) config('storage-monitor.queue_health.failed_event_minutes', 30));

        $failed = EventMediaAsset::query()
            ->selectRaw('event_id, COUNT(*) as failed_count')
            ->where('status', 'failed')
            ->where('updated_at', '>=', now()->subMinutes($minutes))
            ->groupBy('event_id')
            ->havingRaw('COUNT(*) >= ?', [$threshold])
            ->limit(50)
            ->get();

        // Resolve every affected event with a single query instead of one
        // lookup per aggregated row; getDictionary() keys the models by
        // their primary key.
        $events = Event::query()
            ->findMany($failed->pluck('event_id')->all())
            ->getDictionary();

        $title = 'Einige Uploads mussten neu gestartet werden';

        foreach ($failed as $row) {
            $event = $events[(int) $row->event_id] ?? null;

            if (! $event || $this->recentlySentAlert($event->id, $title)) {
                continue;
            }

            $count = (int) $row->failed_count;
            $body = $count > 1
                ? sprintf('%d Fotos wurden automatisch erneut angestoßen. Bitte öffne kurz die App, falls deine Uploads hängen.', $count)
                : 'Ein Upload wurde neu gestartet. Öffne bitte kurz die App, damit nichts verloren geht.';

            $this->guestNotifications->createNotification(
                $event,
                GuestNotificationType::SUPPORT_TIP,
                $title,
                $body,
                [
                    'audience_scope' => GuestNotificationAudience::ALL,
                    'priority' => 2,
                    'expires_at' => now()->addHours(2),
                ]
            );

            $this->rememberAlert($event->id, $title);
        }
    }

    /**
     * Whether an alert with this title was already recorded for the event
     * and its cache marker has not expired yet.
     */
    private function recentlySentAlert(int $eventId, string $title): bool
    {
        return Cache::has($this->alertCacheKey($eventId, $title));
    }

    /**
     * Record that an alert with this title was sent for the event.
     *
     * The marker lives for `storage-monitor.queue_health.guest_alert_ttl`
     * minutes (default 30), but never less than 5 minutes.
     */
    private function rememberAlert(int $eventId, string $title): void
    {
        $ttl = max(5, (int) config('storage-monitor.queue_health.guest_alert_ttl', 30));

        Cache::put($this->alertCacheKey($eventId, $title), true, now()->addMinutes($ttl));
    }

    /**
     * Build the cache key used to de-duplicate guest alerts.
     *
     * The title is hashed so the key has a fixed-length suffix regardless
     * of the title's length or characters.
     */
    private function alertCacheKey(int $eventId, string $title): string
    {
        return sprintf('guest-queue-alert:%d:%s', $eventId, sha1($title));
    }

    /**
     * Read the number of jobs waiting on a queue.
     *
     * This is best-effort: any failure is logged to the "storage-jobs"
     * channel and reported as -1 so the health check can continue with the
     * remaining queues.
     *
     * @param  QueueManager  $manager     Queue manager used to resolve the connection.
     * @param  string|null   $connection  Connection name as read from config.
     * @param  string        $queue       Queue name.
     * @return int  Queue size, or -1 when it could not be determined.
     */
    private function readQueueSize(QueueManager $manager, ?string $connection, string $queue): int
    {
        try {
            return $manager->connection($connection)->size($queue);
        } catch (\Throwable $exception) {
            Log::channel('storage-jobs')->warning('Unable to read queue size', [
                'queue' => $queue,
                'connection' => $connection,
                'message' => $exception->getMessage(),
            ]);

            return -1;
        }
    }

    /**
     * Count the failed jobs recorded for a queue.
     *
     * The table is looked up on the connection named by
     * `queue.failed.database`, falling back to the default connection when
     * that option is not set, so the count is taken from the same place the
     * table name in `queue.failed.table` refers to.
     *
     * @return int  Number of failed jobs, or 0 when the table does not exist.
     */
    private function countFailedJobs(string $queue): int
    {
        $table = config('queue.failed.table', 'failed_jobs');
        $connection = config('queue.failed.database');

        if (! $this->failedJobsTableExists($table, $connection)) {
            return 0;
        }

        return (int) DB::connection($connection)
            ->table($table)
            ->where('queue', $queue)
            ->count();
    }

    /**
     * Check (and memoise for the lifetime of the process) whether the
     * failed-jobs table exists.
     *
     * @param  string       $table       Table name.
     * @param  string|null  $connection  Connection to inspect; null uses the default connection.
     */
    private function failedJobsTableExists(string $table, ?string $connection = null): bool
    {
        static $cache = [];

        // The same table name may exist on one connection but not another.
        $cacheKey = ($connection ?? '') . '|' . $table;

        if (! array_key_exists($cacheKey, $cache)) {
            $cache[$cacheKey] = Schema::connection($connection)->hasTable($table);
        }

        return $cache[$cacheKey];
    }

    /**
     * Map a queue size onto a severity label.
     *
     * @param  int    $size    Queue size; a negative value means the size could not be read.
     * @param  array  $limits  Optional "warning" and "critical" thresholds; a missing or
     *                         non-positive threshold disables that level.
     * @return string  One of "unknown", "critical", "warning" or "ok".
     */
    private function determineQueueSeverity(int $size, array $limits): string
    {
        if ($size < 0) {
            return 'unknown';
        }

        $critical = (int) ($limits['critical'] ?? 0);
        $warning = (int) ($limits['warning'] ?? 0);

        // Check the stricter level first so it wins when both are exceeded.
        if ($critical > 0 && $size >= $critical) {
            return 'critical';
        }

        if ($warning > 0 && $size >= $warning) {
            return 'warning';
        }

        return 'ok';
    }
}