Skill
Redis Queue
name: redis-queue-skill
description: Robust Redis job queue implementation patterns for async task processing. Use when implementing (1) reliable job queues with persistence, (2) Dead Letter Queue (DLQ) handling, (3) job retry strategies with exponential backoff, (4) priority queues, (5) job progress tracking, (6) worker pool management, or (7) queue monitoring. Triggers on Redis queue, job processing, async tasks, background workers, DLQ.
Redis Queue Production Patterns
Redis Configuration for Queues
# redis.conf - Optimized for job queues
appendonly yes
appendfsync everysec
maxmemory 4gb
# Never lose jobs: refuse writes when memory is full instead of evicting keys
maxmemory-policy noeviction
tcp-keepalive 300
Queue Data Structures
# Queue keys
jobs:pending:{priority} # ZSET - sorted by timestamp
jobs:processing # HASH - job_id -> worker_id
jobs:completed # ZSET - for TTL cleanup
jobs:dlq # LIST - failed jobs
jobs:data:{job_id} # HASH - job payload and metadata
Job Schema
from pydantic import BaseModel
from datetime import datetime
from enum import Enum
from typing import Optional, Any
class JobPriority(str, Enum):
    """Priority level of a job.

    Members are also plain strings, so they compare equal to their values
    and serialize as "high" / "normal" / "low".
    """

    HIGH = "high"
    NORMAL = "normal"
    LOW = "low"
class JobStatus(str, Enum):
    """Lifecycle state of a job.

    Members are also plain strings, so they compare equal to their values
    and serialize as the lowercase state name.
    """

    QUEUED = "queued"
    PROCESSING = "processing"
    COMPLETED = "completed"
    FAILED = "failed"
    DLQ = "dlq"
class Job(BaseModel):
    """A unit of background work together with its tracking metadata."""

    # --- identity and routing ---
    id: str
    type: str  # "text_to_image", "stt", "tts", "llm"
    priority: JobPriority = JobPriority.NORMAL
    payload: dict[str, Any]

    # --- execution state ---
    status: JobStatus = JobStatus.QUEUED
    progress: int = 0
    result: Optional[dict] = None
    error: Optional[str] = None

    # --- retry bookkeeping ---
    retry_count: int = 0
    max_retries: int = 3

    # --- timestamps (created_at / updated_at must be supplied by the caller) ---
    created_at: datetime
    updated_at: datetime
    started_at: Optional[datetime] = None
    completed_at: Optional[datetime] = None
Queue Service Implementation
import asyncio
import json
import time
from datetime import datetime, timezone

import redis.asyncio as redis
class QueueService:
    """Redis-backed priority job queue with retries and a dead letter queue.

    Storage layout:

    * ``jobs:data:{job_id}`` - HASH with one field per ``Job`` attribute,
      each value JSON-encoded (so ``update_progress`` can touch a single
      field and ``get_job`` can rebuild the whole model).
    * ``jobs:pending:{priority}`` - ZSET of job ids; the score is the Unix
      time at which the job becomes runnable.
    * ``jobs:processing`` - HASH ``job_id -> worker_id``.
    * ``jobs:completed`` - ZSET scored by completion time.
    * ``jobs:dlq`` - LIST of job ids that exhausted their retries.

    Limitation: claiming a job and recording it in ``jobs:processing`` are
    separate commands, and nothing here reclaims jobs whose worker died.
    """

    # Seconds to sleep between polls while no runnable job is available.
    POLL_INTERVAL = 0.2

    def __init__(self, redis_client: redis.Redis):
        """Bind the service to an asyncio Redis client."""
        self.redis = redis_client
        self.queues = {
            JobPriority.HIGH: "jobs:pending:high",
            JobPriority.NORMAL: "jobs:pending:normal",
            JobPriority.LOW: "jobs:pending:low",
        }

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _utcnow() -> datetime:
        """Return the current UTC time as a naive datetime.

        Same value as the deprecated ``datetime.utcnow()``.
        """
        return datetime.now(timezone.utc).replace(tzinfo=None)

    @staticmethod
    def _data_key(job_id: str) -> str:
        """Return the Redis key of the hash holding a job's fields."""
        return f"jobs:data:{job_id}"

    @staticmethod
    def _as_text(value) -> str:
        """Decode ``bytes`` replies (clients without ``decode_responses``)."""
        return value.decode() if isinstance(value, bytes) else value

    @staticmethod
    def _encode(job: Job) -> dict[str, str]:
        """Flatten a job into a ``field -> JSON string`` mapping for HSET.

        Redis hash values must be scalars, so nested values (payload, result)
        and ``None`` are JSON-encoded per field.
        """
        return {
            field: json.dumps(value)
            for field, value in job.model_dump(mode="json").items()
        }

    @classmethod
    def _decode(cls, raw: dict) -> Job:
        """Rebuild a ``Job`` from the mapping returned by HGETALL."""
        fields = {cls._as_text(key): json.loads(value) for key, value in raw.items()}
        return Job.model_validate(fields)

    async def _require_job(self, job_id: str) -> Job:
        """Load a job or raise ``KeyError`` if its data hash is missing."""
        job = await self.get_job(job_id)
        if job is None:
            raise KeyError(f"Unknown job: {job_id}")
        return job

    async def _update_job(self, job: Job) -> None:
        """Persist all fields of ``job`` and refresh ``updated_at``."""
        job.updated_at = self._utcnow()
        await self.redis.hset(self._data_key(job.id), mapping=self._encode(job))

    async def _claim_due_job(self, worker_id: str) -> Optional[Job]:
        """Claim the oldest runnable job from the highest non-empty priority.

        A job is runnable when its score is <= now.  ZREM returns 1 for
        exactly one caller, so it acts as the atomic claim between workers.
        """
        for priority in (JobPriority.HIGH, JobPriority.NORMAL, JobPriority.LOW):
            queue = self.queues[priority]
            while True:
                due = await self.redis.zrangebyscore(
                    queue, "-inf", time.time(), start=0, num=1
                )
                if not due:
                    break  # nothing runnable at this priority
                member = due[0]
                if not await self.redis.zrem(queue, member):
                    continue  # another worker claimed it first; look again
                job_id = self._as_text(member)
                job = await self.get_job(job_id)
                if job is None:
                    continue  # id without stored data; nothing to run
                await self.redis.hset("jobs:processing", job_id, worker_id)
                job.status = JobStatus.PROCESSING
                job.started_at = self._utcnow()
                await self._update_job(job)
                return job
        return None

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    async def get_job(self, job_id: str) -> Optional[Job]:
        """Return the stored job, or ``None`` if no data exists for the id."""
        raw = await self.redis.hgetall(self._data_key(job_id))
        if not raw:
            return None
        return self._decode(raw)

    async def enqueue(self, job: Job) -> str:
        """Store the job and add it to its priority queue in one transaction.

        Returns the job id.
        """
        created = job.created_at
        if created.tzinfo is None:
            # Naive datetimes are treated as UTC, matching the timestamps
            # this service writes itself.
            created = created.replace(tzinfo=timezone.utc)
        # Score = creation time for FIFO within a priority.  Clamp to "now"
        # so a clock/timezone mismatch can never delay a fresh job.
        score = min(created.timestamp(), time.time())

        async with self.redis.pipeline() as pipe:
            pipe.hset(self._data_key(job.id), mapping=self._encode(job))
            pipe.zadd(self.queues[job.priority], {job.id: score})
            # Publish for real-time listeners
            pipe.publish(
                "jobs:events", json.dumps({"type": "created", "job_id": job.id})
            )
            await pipe.execute()
        return job.id

    async def dequeue(self, worker_id: str, timeout: int = 5) -> Optional[Job]:
        """Claim the highest-priority runnable job, waiting up to ``timeout`` s.

        All priorities are re-checked on every poll, so a HIGH job arriving
        while lower queues are empty is picked up promptly, and jobs whose
        retry backoff has not elapsed are left in place.  ``timeout=0`` waits
        indefinitely (the BZPOPMIN convention).  Returns ``None`` on timeout.
        """
        deadline = time.monotonic() + timeout if timeout else None
        while True:
            job = await self._claim_due_job(worker_id)
            if job is not None:
                return job
            if deadline is None:
                await asyncio.sleep(self.POLL_INTERVAL)
                continue
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                return None
            await asyncio.sleep(min(self.POLL_INTERVAL, remaining))

    async def complete(self, job_id: str, result: dict):
        """Mark a job as completed and record its result.

        Raises ``KeyError`` if the job has no stored data.
        """
        job = await self._require_job(job_id)
        now = self._utcnow()
        job.status = JobStatus.COMPLETED
        job.result = result
        job.progress = 100
        job.completed_at = now
        job.updated_at = now

        async with self.redis.pipeline() as pipe:
            pipe.hdel("jobs:processing", job_id)
            pipe.zadd("jobs:completed", {job_id: time.time()})
            pipe.hset(self._data_key(job_id), mapping=self._encode(job))
            pipe.publish(
                "jobs:events", json.dumps({"type": "completed", "job_id": job_id})
            )
            await pipe.execute()

    async def fail(self, job_id: str, error: str):
        """Record a failure, then re-queue with backoff or move to the DLQ.

        Raises ``KeyError`` if the job has no stored data.
        """
        job = await self._require_job(job_id)
        job.retry_count += 1
        job.error = error
        job.updated_at = self._utcnow()

        if job.retry_count < job.max_retries:
            # Exponential backoff: 2, 4, 8, ... seconds for retry 1, 2, 3, ...
            delay = 2 ** job.retry_count
            job.status = JobStatus.QUEUED
            async with self.redis.pipeline() as pipe:
                pipe.hdel("jobs:processing", job_id)
                # Future score: dequeue skips the job until the delay elapses.
                pipe.zadd(self.queues[job.priority], {job_id: time.time() + delay})
                pipe.hset(self._data_key(job_id), mapping=self._encode(job))
                await pipe.execute()
        else:
            # Retries exhausted: send to DLQ
            job.status = JobStatus.DLQ
            async with self.redis.pipeline() as pipe:
                pipe.hdel("jobs:processing", job_id)
                pipe.rpush("jobs:dlq", job_id)
                pipe.hset(self._data_key(job_id), mapping=self._encode(job))
                pipe.publish(
                    "jobs:events", json.dumps({"type": "dlq", "job_id": job_id})
                )
                await pipe.execute()

    async def update_progress(self, job_id: str, progress: int):
        """Update job progress (0-100) and notify ``jobs:progress`` listeners."""
        # JSON-encode so the field matches the format written by _encode.
        await self.redis.hset(self._data_key(job_id), "progress", json.dumps(progress))
        await self.redis.publish(
            "jobs:progress", json.dumps({"job_id": job_id, "progress": progress})
        )
Worker Implementation
import asyncio
import signal
class Worker:
    """Pulls jobs from a ``QueueService`` and processes them one at a time."""

    def __init__(self, queue: QueueService, worker_id: str):
        """Store the queue handle and the id reported when claiming jobs."""
        self.queue = queue
        self.worker_id = worker_id
        self.running = True

    async def run(self):
        """Main worker loop with graceful shutdown.

        Runs until ``_shutdown`` clears ``self.running``; the job in flight
        is always finished (completed or failed) before the loop exits.
        """
        loop = asyncio.get_running_loop()
        handler_installed = False
        try:
            # Handle SIGTERM gracefully
            loop.add_signal_handler(signal.SIGTERM, self._shutdown)
            handler_installed = True
        except (NotImplementedError, RuntimeError):
            # Signal handlers are unavailable on Windows event loops and
            # outside the main thread; the worker can still be stopped by
            # calling _shutdown() directly.
            pass

        try:
            while self.running:
                job = await self.queue.dequeue(self.worker_id, timeout=5)
                if not job:
                    continue
                try:
                    result = await self._process_job(job)
                    await self.queue.complete(job.id, result)
                except Exception as e:
                    # Some exceptions have an empty message; fall back to
                    # repr so the stored error is never blank.
                    await self.queue.fail(job.id, str(e) or repr(e))
        finally:
            if handler_installed:
                loop.remove_signal_handler(signal.SIGTERM)

    async def _process_job(self, job: Job) -> dict:
        """Process job with progress updates (simulated work)."""
        # Report progress during long operations
        for step in range(10):
            await asyncio.sleep(1)  # Simulated work
            await self.queue.update_progress(job.id, (step + 1) * 10)
        return {"output": "processed"}

    def _shutdown(self):
        """Graceful shutdown - finish current job, then leave the loop."""
        self.running = False
DLQ Processing
async def process_dlq(queue: QueueService):
    """Interactively review each job currently in the DLQ.

    For every job the operator chooses one of:

    * ``retry``  - reset the retry counter and enqueue the job again;
    * ``delete`` - remove the job's stored data;
    * anything else (e.g. ``skip``) - leave the job in the DLQ.
    """
    # Only visit the jobs present at the start: skipped jobs are pushed back
    # to the tail and must not be offered again in the same pass.
    backlog = await queue.redis.llen("jobs:dlq")
    for _ in range(backlog):
        raw_id = await queue.redis.lpop("jobs:dlq")
        if not raw_id:
            break
        job_id = raw_id.decode() if isinstance(raw_id, bytes) else raw_id

        job = await queue.get_job(job_id)
        if job is None:
            # Nothing left to inspect or retry for this id.
            print(f"DLQ Job: {job_id} has no stored data, dropping")
            continue
        print(f"DLQ Job: {job.id}, Error: {job.error}, Type: {job.type}")

        # Options: retry, delete, or manual intervention.
        # input() blocks, so run it in a thread to keep the event loop free.
        answer = await asyncio.to_thread(input, "Action (retry/delete/skip): ")
        action = answer.strip().lower()

        if action == "retry":
            job.retry_count = 0
            job.status = JobStatus.QUEUED
            await queue.enqueue(job)
        elif action == "delete":
            await queue.redis.delete(f"jobs:data:{job_id}")
        else:
            # Skip (or unrecognized input): keep the job for a later review
            # instead of silently losing it.
            await queue.redis.rpush("jobs:dlq", job_id)
Queue Monitoring
async def get_queue_stats(redis_client: redis.Redis) -> dict:
    """Collect queue sizes for monitoring in a single pipeline round trip.

    Returns a dict with the size of each pending queue, the number of jobs
    being processed, the DLQ length, the size of the completed set and the
    sum of the three pending queues.
    """
    batch = redis_client.pipeline()
    for pending_key in (
        "jobs:pending:high",
        "jobs:pending:normal",
        "jobs:pending:low",
    ):
        batch.zcard(pending_key)
    batch.hlen("jobs:processing")
    batch.llen("jobs:dlq")
    batch.zcard("jobs:completed")

    counts = await batch.execute()
    high, normal, low = counts[0], counts[1], counts[2]

    return {
        "pending_high": high,
        "pending_normal": normal,
        "pending_low": low,
        "processing": counts[3],
        "dlq": counts[4],
        # Size of the whole completed set; only a 24h figure if older
        # entries are trimmed elsewhere.
        "completed_24h": counts[5],
        "total_pending": high + normal + low,
    }
Prometheus Metrics
from prometheus_client import Gauge, Counter, Histogram
# Gauge of jobs waiting, labelled by priority.
QUEUE_DEPTH = Gauge("job_queue_depth", "Jobs waiting", ["priority"])
# Running count of processed jobs, labelled by job type and final status.
JOBS_PROCESSED = Counter("jobs_processed_total", "Total processed", ["type", "status"])
# Distribution of processing time in seconds, labelled by job type.
JOB_DURATION = Histogram("job_duration_seconds", "Processing time", ["type"])


async def update_metrics(queue: QueueService):
    """Refresh the queue-depth gauge from current queue statistics.

    Intended to be called periodically.
    """
    snapshot = await get_queue_stats(queue.redis)
    for level in ("high", "normal", "low"):
        QUEUE_DEPTH.labels(level).set(snapshot[f"pending_{level}"])
Backpressure Handling
# Maximum number of pending jobs accepted per priority.
MAX_QUEUE_DEPTH = {"high": 100, "normal": 200, "low": 50}


async def enqueue_with_backpressure(queue: QueueService, job: Job) -> str:
    """Enqueue ``job`` unless its priority queue has reached its limit.

    Returns the job id on success.  Raises ``HTTPException`` with status 503
    and a ``Retry-After: 60`` header when the queue is full.
    """
    level = job.priority.value
    snapshot = await get_queue_stats(queue.redis)
    depth = snapshot[f"pending_{level}"]

    if depth < MAX_QUEUE_DEPTH[level]:
        return await queue.enqueue(job)

    # NOTE(review): HTTPException is not imported anywhere in this file;
    # presumably it is the web framework's exception - confirm the import.
    raise HTTPException(
        status_code=503,
        detail="Queue full",
        headers={"Retry-After": "60"},
    )
Production Checklist
- AOF persistence enabled (`appendonly yes`)
- `maxmemory-policy noeviction` to prevent job loss
- Atomic operations via pipelines
- Exponential backoff for retries
- DLQ for jobs exceeding max retries
- Progress tracking via pub/sub
- Graceful worker shutdown on SIGTERM
- Queue depth metrics exposed
- Backpressure handling (503 when full)
- Job TTL cleanup for completed jobs
ProYaro AI Infrastructure Documentation • Version 1.2