Redis Queue

name: redis-queue-skill
description: Robust Redis job queue implementation patterns for async task processing. Use when implementing (1) reliable job queues with persistence, (2) Dead Letter Queue (DLQ) handling, (3) job retry strategies with exponential backoff, (4) priority queues, (5) job progress tracking, (6) worker pool management, or (7) queue monitoring. Triggers on: Redis queue, job processing, async tasks, background workers, DLQ.

Redis Queue Production Patterns

Redis Configuration for Queues

# redis.conf - Optimized for job queues
appendonly yes
appendfsync everysec
maxmemory 4gb
maxmemory-policy noeviction  # Never lose jobs
tcp-keepalive 300
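
The patterns below assume an async redis-py client created with decode_responses=True, so popped members and hash fields come back as str rather than bytes. A minimal connection sketch (host and port are deployment-specific assumptions):

import redis.asyncio as redis

redis_client = redis.Redis(
    host="localhost",        # assumed; point at your Redis instance
    port=6379,
    decode_responses=True,   # str instead of bytes for all replies
    socket_keepalive=True,
)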

Queue Data Structures

# Queue keys
jobs:pending:{priority}     # ZSET - scored by enqueue timestamp (FIFO within a priority)
jobs:delayed                # ZSET - retries scored by ready-at timestamp
jobs:processing             # HASH - job_id -> worker_id
jobs:completed              # ZSET - scored by completion timestamp, for TTL cleanup
jobs:dlq                    # LIST - failed jobs
jobs:data:{job_id}          # HASH - serialized job JSON under the "data" field

Job Schema

from pydantic import BaseModel
from datetime import datetime
from enum import Enum
from typing import Optional, Any

class JobPriority(str, Enum):
    HIGH = "high"
    NORMAL = "normal"
    LOW = "low"

class JobStatus(str, Enum):
    QUEUED = "queued"
    PROCESSING = "processing"
    COMPLETED = "completed"
    FAILED = "failed"
    DLQ = "dlq"

class Job(BaseModel):
    id: str
    type: str  # "text_to_image", "stt", "tts", "llm"
    priority: JobPriority = JobPriority.NORMAL
    payload: dict[str, Any]
    status: JobStatus = JobStatus.QUEUED
    progress: int = 0
    result: Optional[dict] = None
    error: Optional[str] = None
    retry_count: int = 0
    max_retries: int = 3
    created_at: datetime
    updated_at: datetime
    started_at: Optional[datetime] = None
    completed_at: Optional[datetime] = None
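
A quick construction sketch before enqueueing (uuid4 ids and the example payload are assumptions; any collision-free string id works):

from uuid import uuid4

now = datetime.utcnow()
job = Job(
    id=uuid4().hex,
    type="text_to_image",
    payload={"prompt": "a lighthouse at dusk"},  # hypothetical payload
    created_at=now,
    updated_at=now,
)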

Queue Service Implementation

import redis.asyncio as redis
from datetime import datetime
import json

class QueueService:
    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client
        self.queues = {
            JobPriority.HIGH: "jobs:pending:high",
            JobPriority.NORMAL: "jobs:pending:normal",
            JobPriority.LOW: "jobs:pending:low",
        }

    async def get_job(self, job_id: str) -> Job:
        """Load a job from its data hash."""
        data = await self.redis.hget(f"jobs:data:{job_id}", "data")
        return Job.model_validate_json(data)

    async def _update_job(self, job: Job) -> None:
        """Persist the full job state back to Redis."""
        job.updated_at = datetime.utcnow()
        await self.redis.hset(f"jobs:data:{job.id}", "data", job.model_dump_json())

    async def enqueue(self, job: Job) -> str:
        """Add job to queue atomically."""
        pipe = self.redis.pipeline()
        
        # Store the serialized job under a single hash field
        pipe.hset(f"jobs:data:{job.id}", "data", job.model_dump_json())
        
        # Add to priority queue (score = timestamp for FIFO within priority)
        score = job.created_at.timestamp()
        pipe.zadd(self.queues[job.priority], {job.id: score})
        
        # Publish for real-time listeners
        pipe.publish("jobs:events", json.dumps({"type": "created", "job_id": job.id}))
        
        await pipe.execute()
        return job.id
    
    async def dequeue(self, worker_id: str, timeout: int = 5) -> Optional[Job]:
        """Pop the highest-priority job, blocking up to `timeout` seconds."""
        # BZPOPMIN accepts several keys and pops from the first non-empty
        # one, so listing queues high -> low enforces priority in a single
        # blocking call (looping with one key per call would stall on an
        # empty high queue while normal/low jobs wait).
        keys = [self.queues[p] for p in (JobPriority.HIGH, JobPriority.NORMAL, JobPriority.LOW)]
        result = await self.redis.bzpopmin(keys, timeout=timeout)
        if not result:
            return None
        _, job_id, _ = result

        # Claim the job for this worker. A crash between the pop and this
        # hset can lose a job; use a reliable-queue pattern if that matters.
        await self.redis.hset("jobs:processing", job_id, worker_id)

        job = await self.get_job(job_id)
        job.status = JobStatus.PROCESSING
        job.started_at = datetime.utcnow()
        await self._update_job(job)
        return job
    
    async def complete(self, job_id: str, result: dict):
        """Mark job as completed."""
        job = await self.get_job(job_id)
        job.status = JobStatus.COMPLETED
        job.result = result
        job.progress = 100
        job.completed_at = datetime.utcnow()
        
        pipe = self.redis.pipeline()
        pipe.hdel("jobs:processing", job_id)
        pipe.zadd("jobs:completed", {job_id: datetime.utcnow().timestamp()})
        pipe.hset(f"jobs:data:{job_id}", mapping=job.model_dump_json())
        pipe.publish("jobs:events", json.dumps({"type": "completed", "job_id": job_id}))
        await pipe.execute()
    
    async def fail(self, job_id: str, error: str):
        """Handle job failure with retry or DLQ."""
        job = await self.get_job(job_id)
        job.retry_count += 1
        job.error = error
        
        if job.retry_count < job.max_retries:
            # Exponential backoff: 2s, then 4s; the third failure (with
            # max_retries=3) goes to the DLQ below.
            delay = 2 ** job.retry_count
            job.status = JobStatus.QUEUED

            pipe = self.redis.pipeline()
            pipe.hdel("jobs:processing", job_id)
            # Park in jobs:delayed until the backoff expires. BZPOPMIN pops
            # the lowest score immediately regardless of clock time, so
            # future-dated jobs must live outside the pending queues until
            # promote_delayed() moves them back.
            ready_at = datetime.utcnow().timestamp() + delay
            pipe.zadd("jobs:delayed", {job_id: ready_at})
            pipe.hset(f"jobs:data:{job_id}", "data", job.model_dump_json())
            await pipe.execute()
        else:
            # Send to DLQ
            job.status = JobStatus.DLQ

            pipe = self.redis.pipeline()
            pipe.hdel("jobs:processing", job_id)
            pipe.rpush("jobs:dlq", job_id)
            pipe.hset(f"jobs:data:{job_id}", "data", job.model_dump_json())
            pipe.publish("jobs:events", json.dumps({"type": "dlq", "job_id": job_id}))
            await pipe.execute()

    async def promote_delayed(self, limit: int = 100):
        """Move due retries from jobs:delayed back into their pending queues."""
        now = datetime.utcnow().timestamp()
        due = await self.redis.zrangebyscore("jobs:delayed", 0, now, start=0, num=limit)
        for job_id in due:
            job = await self.get_job(job_id)
            pipe = self.redis.pipeline()
            pipe.zrem("jobs:delayed", job_id)
            pipe.zadd(self.queues[job.priority], {job_id: now})
            await pipe.execute()

    async def update_progress(self, job_id: str, progress: int):
        """Update job progress (0-100) and notify listeners."""
        job = await self.get_job(job_id)
        job.progress = progress
        await self._update_job(job)
        await self.redis.publish(
            "jobs:progress", json.dumps({"job_id": job_id, "progress": progress})
        )
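
Retries parked in jobs:delayed need something to promote them once their backoff expires. A minimal scheduler coroutine to run alongside the workers (the 1-second poll interval is an assumption):

import asyncio

async def delayed_job_scheduler(queue: QueueService):
    """Periodically move due retries back into the pending queues."""
    while True:
        await queue.promote_delayed()
        await asyncio.sleep(1)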

Worker Implementation

import asyncio
import signal

class Worker:
    def __init__(self, queue: QueueService, worker_id: str):
        self.queue = queue
        self.worker_id = worker_id
        self.running = True
    
    async def run(self):
        """Main worker loop with graceful shutdown."""
        # Handle SIGTERM gracefully (add_signal_handler is Unix-only)
        loop = asyncio.get_running_loop()
        loop.add_signal_handler(signal.SIGTERM, self._shutdown)
        
        while self.running:
            job = await self.queue.dequeue(self.worker_id, timeout=5)
            if not job:
                continue
            
            try:
                result = await self._process_job(job)
                await self.queue.complete(job.id, result)
            except Exception as e:
                await self.queue.fail(job.id, str(e))
    
    async def _process_job(self, job: Job) -> dict:
        """Process job with progress updates."""
        # Report progress during long operations
        for step in range(10):
            await asyncio.sleep(1)  # Simulated work
            await self.queue.update_progress(job.id, (step + 1) * 10)
        
        return {"output": "processed"}
    
    def _shutdown(self):
        """Graceful shutdown - finish current job."""
        self.running = False
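
A sketch for running several workers in one process against a shared QueueService (pool size and id format are assumptions):

async def run_worker_pool(queue: QueueService, size: int = 4):
    workers = [Worker(queue, worker_id=f"worker-{i}") for i in range(size)]
    await asyncio.gather(*(w.run() for w in workers))

Note that add_signal_handler keeps a single handler per signal, so with several workers in one process only the last-registered shutdown handler survives; for pools, move the signal handling out of Worker.run and into the pool runner.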

DLQ Processing

async def process_dlq(queue: QueueService):
    """Manually review DLQ jobs (blocking input() - run as a CLI tool, not in a worker)."""
    skipped = []
    while True:
        job_id = await queue.redis.lpop("jobs:dlq")
        if not job_id:
            break

        job = await queue.get_job(job_id)
        print(f"DLQ Job: {job.id}, Error: {job.error}, Type: {job.type}")

        # Options: retry, delete, or keep in the DLQ
        action = input("Action (retry/delete/skip): ")
        if action == "retry":
            job.retry_count = 0
            job.error = None
            job.status = JobStatus.QUEUED
            await queue.enqueue(job)
        elif action == "delete":
            await queue.redis.delete(f"jobs:data:{job_id}")
        else:
            skipped.append(job_id)

    # Re-add skipped jobs so they are not silently dropped from the DLQ
    if skipped:
        await queue.redis.rpush("jobs:dlq", *skipped)

Queue Monitoring

async def get_queue_stats(redis_client: redis.Redis) -> dict:
    """Get queue statistics for monitoring."""
    pipe = redis_client.pipeline()
    pipe.zcard("jobs:pending:high")
    pipe.zcard("jobs:pending:normal")
    pipe.zcard("jobs:pending:low")
    pipe.hlen("jobs:processing")
    pipe.llen("jobs:dlq")
    pipe.zcard("jobs:completed")
    
    results = await pipe.execute()
    
    return {
        "pending_high": results[0],
        "pending_normal": results[1],
        "pending_low": results[2],
        "processing": results[3],
        "dlq": results[4],
        "completed_24h": results[5],  # With TTL cleanup
        "total_pending": results[0] + results[1] + results[2],
    }
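
The stats are convenient to expose over HTTP for dashboards. A minimal FastAPI sketch (the app and route are assumptions; redis_client is the shared client from the connection sketch above):

from fastapi import FastAPI

app = FastAPI()

@app.get("/queue/stats")
async def queue_stats():
    return await get_queue_stats(redis_client)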

Prometheus Metrics

from prometheus_client import Gauge, Counter, Histogram

QUEUE_DEPTH = Gauge("job_queue_depth", "Jobs waiting", ["priority"])
JOBS_PROCESSED = Counter("jobs_processed_total", "Total processed", ["type", "status"])
JOB_DURATION = Histogram("job_duration_seconds", "Processing time", ["type"])

async def update_metrics(queue: QueueService):
    """Periodic metrics update."""
    stats = await get_queue_stats(queue.redis)
    QUEUE_DEPTH.labels("high").set(stats["pending_high"])
    QUEUE_DEPTH.labels("normal").set(stats["pending_normal"])
    QUEUE_DEPTH.labels("low").set(stats["pending_low"])
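
To serve the metrics and keep the gauges fresh, a periodic loop can pair update_metrics with prometheus_client's start_http_server (the port and 15-second interval are assumptions):

from prometheus_client import start_http_server

async def metrics_loop(queue: QueueService, port: int = 9100):
    start_http_server(port)  # serves /metrics from a background thread
    while True:
        await update_metrics(queue)
        await asyncio.sleep(15)

JOBS_PROCESSED and JOB_DURATION would be updated from the worker, e.g. JOBS_PROCESSED.labels(job.type, "completed").inc() after a successful job and JOB_DURATION.labels(job.type).observe(elapsed_seconds) around _process_job.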

Backpressure Handling

from fastapi import HTTPException

MAX_QUEUE_DEPTH = {"high": 100, "normal": 200, "low": 50}

async def enqueue_with_backpressure(queue: QueueService, job: Job) -> str:
    """Reject jobs when queue is full."""
    stats = await get_queue_stats(queue.redis)
    
    limit = MAX_QUEUE_DEPTH[job.priority.value]
    current = stats[f"pending_{job.priority.value}"]
    
    if current >= limit:
        raise HTTPException(
            status_code=503,
            detail="Queue full",
            headers={"Retry-After": "60"}
        )
    
    return await queue.enqueue(job)
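
From an API route this looks like the following sketch (route shape is an assumption; queue is the shared QueueService created at startup):

@app.post("/jobs")
async def submit_job(job: Job):
    job_id = await enqueue_with_backpressure(queue, job)
    return {"job_id": job_id}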

Production Checklist

  • AOF persistence enabled (appendonly yes)
  • maxmemory-policy noeviction to prevent job loss
  • Atomic operations via pipelines
  • Exponential backoff for retries
  • DLQ for jobs exceeding max retries
  • Progress tracking via pub/sub
  • Graceful worker shutdown on SIGTERM
  • Queue depth metrics exposed
  • Backpressure handling (503 when full)
  • Job TTL cleanup for completed jobs (see sketch below)
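
A minimal sketch of the TTL cleanup mentioned above, assuming a 24-hour retention window and periodic invocation (e.g., from the same scheduler as promote_delayed):

from datetime import timedelta

async def cleanup_completed(queue: QueueService, retention_hours: int = 24):
    """Delete completed jobs older than the retention window."""
    cutoff = (datetime.utcnow() - timedelta(hours=retention_hours)).timestamp()
    old_ids = await queue.redis.zrangebyscore("jobs:completed", 0, cutoff)
    if not old_ids:
        return
    pipe = queue.redis.pipeline()
    for job_id in old_ids:
        pipe.delete(f"jobs:data:{job_id}")
    pipe.zremrangebyscore("jobs:completed", 0, cutoff)
    await pipe.execute()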

ProYaro AI Infrastructure Documentation • Version 1.2