added a help system, replaced the words "tenant" and "Pwa" with better alternatives. corrected and implemented cron jobs. prepared going live on a coolify-powered system.
This commit is contained in:
184
app/Console/Commands/CheckUploadQueuesCommand.php
Normal file
184
app/Console/Commands/CheckUploadQueuesCommand.php
Normal file
@@ -0,0 +1,184 @@
|
||||
<?php
|
||||
|
||||
namespace App\Console\Commands;
|
||||
|
||||
use App\Console\Concerns\InteractsWithCacheLocks;
|
||||
use App\Models\EventMediaAsset;
|
||||
use Illuminate\Console\Command;
|
||||
use Illuminate\Contracts\Cache\Lock;
|
||||
use Illuminate\Queue\QueueManager;
|
||||
use Illuminate\Support\Facades\Cache;
|
||||
use Illuminate\Support\Facades\DB;
|
||||
use Illuminate\Support\Facades\Log;
|
||||
use Illuminate\Support\Facades\Schema;
|
||||
|
||||
class CheckUploadQueuesCommand extends Command
|
||||
{
|
||||
use InteractsWithCacheLocks;
|
||||
|
||||
protected $signature = 'storage:check-upload-queues {--force : Execute even if another health check is running}';
|
||||
|
||||
protected $description = 'Inspect upload-related queues and flag stalled or overloaded workers.';
|
||||
|
||||
public function handle(QueueManager $queueManager): int
|
||||
{
|
||||
$lockSeconds = (int) config('storage-monitor.queue_health.lock_seconds', 120);
|
||||
$lock = $this->acquireCommandLock('storage:queue-health', $lockSeconds, (bool) $this->option('force'));
|
||||
|
||||
if ($lock === false) {
|
||||
$this->warn('Queue health check already running.');
|
||||
|
||||
return self::SUCCESS;
|
||||
}
|
||||
|
||||
$connection = config('queue.default');
|
||||
$thresholds = config('storage-monitor.queue_health.thresholds', []);
|
||||
if (empty($thresholds)) {
|
||||
$thresholds = [
|
||||
'default' => ['warning' => 100, 'critical' => 300],
|
||||
'media-storage' => ['warning' => 200, 'critical' => 500],
|
||||
'media-security' => ['warning' => 50, 'critical' => 150],
|
||||
];
|
||||
}
|
||||
|
||||
try {
|
||||
$queueSummaries = [];
|
||||
$alerts = [];
|
||||
|
||||
foreach ($thresholds as $queueName => $limits) {
|
||||
$size = $this->readQueueSize($queueManager, $connection, (string) $queueName);
|
||||
$failed = $this->countFailedJobs((string) $queueName);
|
||||
$severity = $this->determineQueueSeverity($size, $limits);
|
||||
|
||||
if ($severity !== 'ok') {
|
||||
$alerts[] = [
|
||||
'queue' => $queueName,
|
||||
'type' => 'size',
|
||||
'severity' => $severity,
|
||||
'size' => $size,
|
||||
];
|
||||
}
|
||||
|
||||
if ($failed > 0) {
|
||||
$alerts[] = [
|
||||
'queue' => $queueName,
|
||||
'type' => 'failed_jobs',
|
||||
'severity' => $failed >= 10 ? 'critical' : 'warning',
|
||||
'failed' => $failed,
|
||||
];
|
||||
}
|
||||
|
||||
$queueSummaries[] = [
|
||||
'queue' => $queueName,
|
||||
'size' => $size,
|
||||
'failed' => $failed,
|
||||
'severity' => $severity,
|
||||
'limits' => $limits,
|
||||
];
|
||||
}
|
||||
|
||||
$stalledMinutes = max(0, (int) config('storage-monitor.queue_health.stalled_minutes', 10));
|
||||
$stalledAssets = 0;
|
||||
if ($stalledMinutes > 0) {
|
||||
$stalledAssets = EventMediaAsset::query()
|
||||
->where('status', 'pending')
|
||||
->where('created_at', '<=', now()->subMinutes($stalledMinutes))
|
||||
->count();
|
||||
|
||||
if ($stalledAssets > 0) {
|
||||
$alerts[] = [
|
||||
'type' => 'pending_assets',
|
||||
'severity' => 'warning',
|
||||
'older_than_minutes' => $stalledMinutes,
|
||||
'count' => $stalledAssets,
|
||||
];
|
||||
}
|
||||
}
|
||||
|
||||
$snapshot = [
|
||||
'generated_at' => now()->toIso8601String(),
|
||||
'connection' => $connection,
|
||||
'queues' => $queueSummaries,
|
||||
'alerts' => $alerts,
|
||||
'stalled_assets' => $stalledAssets,
|
||||
];
|
||||
|
||||
$cacheTtl = max(1, (int) config('storage-monitor.queue_health.cache_minutes', 10));
|
||||
Cache::put('storage:queue-health:last', $snapshot, now()->addMinutes($cacheTtl));
|
||||
|
||||
Log::channel('storage-jobs')->info('Upload queue health snapshot generated', [
|
||||
'queues' => count($queueSummaries),
|
||||
'alerts' => count($alerts),
|
||||
]);
|
||||
|
||||
$this->info(sprintf(
|
||||
'Checked %d queue(s); %d alert(s).',
|
||||
count($queueSummaries),
|
||||
count($alerts)
|
||||
));
|
||||
|
||||
return self::SUCCESS;
|
||||
} finally {
|
||||
if ($lock instanceof Lock) {
|
||||
$lock->release();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private function readQueueSize(QueueManager $manager, ?string $connection, string $queue): int
|
||||
{
|
||||
try {
|
||||
return $manager->connection($connection)->size($queue);
|
||||
} catch (\Throwable $exception) {
|
||||
Log::channel('storage-jobs')->warning('Unable to read queue size', [
|
||||
'queue' => $queue,
|
||||
'connection' => $connection,
|
||||
'message' => $exception->getMessage(),
|
||||
]);
|
||||
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
private function countFailedJobs(string $queue): int
|
||||
{
|
||||
$table = config('queue.failed.table', 'failed_jobs');
|
||||
|
||||
if (! $this->failedJobsTableExists($table)) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
return (int) DB::table($table)->where('queue', $queue)->count();
|
||||
}
|
||||
|
||||
private function failedJobsTableExists(string $table): bool
|
||||
{
|
||||
static $cache = [];
|
||||
|
||||
if (array_key_exists($table, $cache)) {
|
||||
return $cache[$table];
|
||||
}
|
||||
|
||||
return $cache[$table] = Schema::hasTable($table);
|
||||
}
|
||||
|
||||
private function determineQueueSeverity(int $size, array $limits): string
|
||||
{
|
||||
if ($size < 0) {
|
||||
return 'unknown';
|
||||
}
|
||||
|
||||
$critical = (int) ($limits['critical'] ?? 0);
|
||||
$warning = (int) ($limits['warning'] ?? 0);
|
||||
|
||||
if ($critical > 0 && $size >= $critical) {
|
||||
return 'critical';
|
||||
}
|
||||
|
||||
if ($warning > 0 && $size >= $warning) {
|
||||
return 'warning';
|
||||
}
|
||||
|
||||
return 'ok';
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user