Skill
Job Management
Job Management Integration Skill
Overview
This skill provides ready-to-use code for working with the Ubuntu backend job queue system.
Service: Ubuntu FastAPI Backend
Endpoint: https://api.proyaro.com/jobs (external) or http://10.0.0.11:8000/jobs (internal)
Authentication: Required (JWT Bearer token)
Job Types Supported
| Job Type | Purpose | Processing Time |
|---|---|---|
| speech_to_text | Whisper STT | 10-30 seconds |
| text_to_speech | XTTS-v2 TTS | 1-5 seconds |
| image_generation | ComfyUI (GPU) | 10-30 seconds |
| text_generation | MLX (proxied) | 5-20 seconds |
| embedding | Embeddings (GPU) | 1-10 seconds |
TypeScript/JavaScript Implementation
Basic Job Client
/**
 * Request body accepted by the job-creation endpoint.
 */
interface JobCreate {
  /** Which kind of job to run. */
  job_type:
    | 'speech_to_text'
    | 'text_to_speech'
    | 'image_generation'
    | 'text_generation'
    | 'embedding';
  /** Job-specific parameters; the expected keys depend on `job_type`. */
  parameters: Record<string, any>;
  /** Queue priority, -10 to 10, default 0. */
  priority?: number;
}
/**
 * A job record as returned by the backend.
 */
interface Job {
  id: number;
  user_id: number;
  /** Job type name (see `JobCreate.job_type` for the values this client sends). */
  job_type: string;
  /** Current lifecycle state of the job. */
  status:
    | 'pending'
    | 'queued'
    | 'processing'
    | 'completed'
    | 'failed'
    | 'cancelled';
  /** Parameters the job was created with. */
  parameters: Record<string, any>;
  priority: number;
  // Timestamps arrive as strings — presumably ISO-8601; confirm against the API.
  created_at: string;
  queued_at?: string;
  started_at?: string;
  completed_at?: string;
  /** Failure description, when one is provided. */
  error_message?: string;
  /** Result payload, when one is attached to the job. */
  result?: JobResult;
}
/**
 * Output produced for a job.
 */
interface JobResult {
  id: number;
  /** Id of the job this result belongs to. */
  job_id: number;
  /** Job-type-specific output (the usage example reads `result_data.text` for speech-to-text). */
  result_data: Record<string, any>;
  /** Processing duration in milliseconds. */
  processing_time_ms: number;
  model_version?: string;
}
/**
 * Error thrown by `JobClient` when the backend answers with a non-2xx status.
 *
 * Exposes the HTTP status both as `status` and as an axios-style
 * `response.status` / `response.data`, so callers can branch on the status
 * code instead of parsing the message (`statusText` is frequently empty).
 */
class JobApiError extends Error {
  /** HTTP status code of the failed response. */
  readonly status: number;
  /** Status, status text and parsed JSON body (or `null`) of the failed response. */
  readonly response: { status: number; statusText: string; data: unknown };

  constructor(action: string, status: number, statusText: string, data: unknown) {
    super(`Failed to ${action}: ${status} ${statusText}`.trim());
    this.name = 'JobApiError';
    this.status = status;
    this.response = { status, statusText, data };
  }
}

/**
 * Minimal fetch-based client for the `/jobs` REST API.
 *
 * Every request carries `Authorization: Bearer <token>`. Non-2xx responses
 * reject with a `JobApiError`.
 */
class JobClient {
  private readonly baseURL: string;
  private readonly token: string;

  /**
   * @param baseURL API origin; trailing slashes are stripped.
   * @param token JWT sent as a Bearer token.
   */
  constructor(baseURL: string = 'https://api.proyaro.com', token: string) {
    // Strip trailing slashes so `${baseURL}/jobs` never becomes `...//jobs`.
    this.baseURL = baseURL.replace(/\/+$/, '');
    this.token = token;
  }

  /**
   * Sends one authenticated request and rejects with `JobApiError` on a
   * non-2xx status. `action` only feeds the error message.
   */
  private async send(action: string, path: string, method: string = 'GET', body?: unknown) {
    const headers: Record<string, string> = {
      Authorization: `Bearer ${this.token}`,
    };
    if (body !== undefined) {
      headers['Content-Type'] = 'application/json';
    }

    const response = await fetch(`${this.baseURL}${path}`, {
      method,
      headers,
      body: body === undefined ? undefined : JSON.stringify(body),
    });

    if (!response.ok) {
      // Error bodies are optional and may not be JSON; never let that mask the HTTP error.
      let data: unknown = null;
      try {
        data = await response.json();
      } catch {
        data = null;
      }
      throw new JobApiError(action, response.status, response.statusText, data);
    }
    return response;
  }

  /** Create a new job. */
  async createJob(jobData: JobCreate): Promise<Job> {
    const response = await this.send('create job', '/jobs', 'POST', jobData);
    return response.json();
  }

  /** Get job status and result. */
  async getJob(jobId: number): Promise<{ job: Job; result: JobResult | null }> {
    const response = await this.send('get job', `/jobs/${jobId}`);
    return response.json();
  }

  /** List the user's jobs, paginated with `skip` / `limit`. */
  async listJobs(skip: number = 0, limit: number = 100): Promise<Job[]> {
    const query = new URLSearchParams({ skip: String(skip), limit: String(limit) });
    const response = await this.send('list jobs', `/jobs?${query}`);
    return response.json();
  }

  /** Cancel a pending/queued job. */
  async cancelJob(jobId: number): Promise<void> {
    // 204 is already covered by `response.ok`, so no special case is needed.
    await this.send('cancel job', `/jobs/${jobId}`, 'DELETE');
  }

  /** Clear job history. */
  async clearHistory(completedOnly: boolean = true): Promise<{ deleted_count: number }> {
    const query = new URLSearchParams({ completed_only: String(completedOnly) });
    const response = await this.send('clear history', `/jobs/clear?${query}`, 'DELETE');
    return response.json();
  }

  /**
   * Poll a job until it reaches a terminal state.
   *
   * @param jobId Job to wait for.
   * @param pollInterval Milliseconds between polls.
   * @param timeout Maximum total wait in milliseconds (default 5 minutes).
   * @returns The job's result once its status is `completed`.
   * @throws Error if the job fails, is cancelled, or the timeout elapses.
   */
  async waitForJob(
    jobId: number,
    pollInterval: number = 2000,
    timeout: number = 300000 // 5 minutes
  ): Promise<JobResult> {
    const deadline = Date.now() + timeout;

    while (true) {
      const { job, result } = await this.getJob(jobId);

      if (job.status === 'completed') {
        // The result may arrive next to the job or embedded in it; accept either.
        const finished = result ?? job.result;
        if (finished) {
          return finished;
        }
      }
      if (job.status === 'failed') {
        throw new Error(`Job ${jobId} failed: ${job.error_message}`);
      }
      if (job.status === 'cancelled') {
        throw new Error(`Job ${jobId} was cancelled`);
      }

      const remaining = deadline - Date.now();
      if (remaining <= 0) {
        throw new Error(`Job ${jobId} timed out after ${timeout}ms`);
      }
      // Never sleep past the deadline, so the timeout is honoured even with long intervals.
      await new Promise(resolve => setTimeout(resolve, Math.min(pollInterval, remaining)));
    }
  }
}
// Usage Example
/**
 * End-to-end sample: submit a speech-to-text job, wait for it by polling,
 * and print the transcription.
 */
async function example() {
  const client = new JobClient('https://api.proyaro.com', 'your-jwt-token');

  // Describe the STT job up front, then submit it.
  const sttRequest: JobCreate = {
    job_type: 'speech_to_text',
    parameters: {
      audio_path: '/app/audio_files/recording.wav',
      language: 'ar',
      task: 'transcribe',
    },
  };
  const { id: jobId } = await client.createJob(sttRequest);
  console.log(`Job created: ${jobId}`);

  // Block until the job reaches a terminal state.
  const { result_data: output } = await client.waitForJob(jobId);
  console.log('Transcription:', output.text);
}
React Hook with WebSocket
import { useCallback, useEffect, useRef, useState } from 'react';
/**
 * Configuration accepted by the `useJob` hook.
 */
interface UseJobOptions {
  /** API origin; when omitted the hook falls back to the production endpoints. */
  baseURL?: string;
  /** JWT used for REST calls and for the WebSocket connection. */
  token: string;
  /** When true, job updates are received over a WebSocket instead of polling. */
  enableWebSocket?: boolean;
}
/**
 * React hook that creates jobs and tracks their state, optionally fed by
 * WebSocket `job_update` messages.
 *
 * @param options API origin, JWT and whether to open a WebSocket.
 * @returns `jobs` (array snapshot of tracked jobs), `createJob`, `waitForJob`,
 *          and the live `ws` instance (or `null`).
 */
export function useJob(options: UseJobOptions) {
  const { baseURL, token, enableWebSocket } = options;
  const [jobs, setJobs] = useState<Map<number, Job>>(new Map());
  const [ws, setWs] = useState<WebSocket | null>(null);

  // Mirror of the latest `jobs` map. `waitForJob` keeps polling long after the
  // render that created it, so it must read through a ref; the `jobs` value
  // captured by the closure would never change.
  const jobsRef = useRef(jobs);
  useEffect(() => {
    jobsRef.current = jobs;
  }, [jobs]);

  // Connect to WebSocket
  useEffect(() => {
    if (!enableWebSocket) return;

    // http -> ws and https -> wss; anchored so only the scheme is rewritten.
    const origin = baseURL?.replace(/^http/, 'ws') || 'wss://api.proyaro.com';
    const websocket = new WebSocket(`${origin}/ws?token=${encodeURIComponent(token)}`);

    websocket.onmessage = (event) => {
      let update: any;
      try {
        update = JSON.parse(event.data);
      } catch {
        // Ignore frames that are not JSON instead of throwing inside the handler.
        return;
      }
      if (update?.type !== 'job_update') return;

      if (update.status === 'HISTORY_CLEARED') {
        setJobs(new Map());
        return;
      }
      setJobs(prev => {
        const next = new Map(prev);
        // Merge over what is already known; fields in `update.data` win.
        next.set(update.job_id, {
          ...next.get(update.job_id),
          id: update.job_id,
          status: update.status,
          ...update.data,
        } as Job);
        return next;
      });
    };

    setWs(websocket);
    return () => {
      websocket.close();
      // Do not keep exposing a closed socket.
      setWs(current => (current === websocket ? null : current));
    };
  }, [token, baseURL, enableWebSocket]);

  // Create job
  const createJob = useCallback(async (jobData: JobCreate): Promise<Job> => {
    const client = new JobClient(baseURL, token);
    const job = await client.createJob(jobData);
    // Add to local state
    setJobs(prev => {
      const next = new Map(prev);
      next.set(job.id, job);
      return next;
    });
    return job;
  }, [baseURL, token]);

  // Wait for job with WebSocket or polling
  const waitForJob = useCallback(async (jobId: number): Promise<JobResult> => {
    if (!enableWebSocket) {
      // Use polling
      return new JobClient(baseURL, token).waitForJob(jobId);
    }

    // Same 5-minute ceiling as JobClient.waitForJob's default, so a missed
    // update cannot leave the caller waiting forever.
    const timeoutMs = 300000;
    const deadline = Date.now() + timeoutMs;

    return new Promise<JobResult>((resolve, reject) => {
      const checkJob = () => {
        const job = jobsRef.current.get(jobId);
        if (job?.status === 'completed' && job.result) {
          resolve(job.result);
        } else if (job?.status === 'failed') {
          reject(new Error(job.error_message));
        } else if (job?.status === 'cancelled') {
          reject(new Error(`Job ${jobId} was cancelled`));
        } else if (Date.now() > deadline) {
          reject(new Error(`Job ${jobId} timed out after ${timeoutMs}ms`));
        } else {
          setTimeout(checkJob, 100);
        }
      };
      checkJob();
    });
  }, [baseURL, token, enableWebSocket]);

  return {
    jobs: Array.from(jobs.values()),
    createJob,
    waitForJob,
    ws,
  };
}
// Usage in Component
/**
 * Demo component: a button that submits a speech-to-text job, plus a live
 * list of the jobs tracked by `useJob`.
 */
function JobMonitor() {
  const { jobs, createJob, waitForJob } = useJob({
    token: 'your-jwt-token',
    enableWebSocket: true,
  });

  const handleCreateSTT = async () => {
    // The click handler's promise is not awaited by React, so failures must
    // be handled here or they surface as unhandled rejections.
    try {
      const job = await createJob({
        job_type: 'speech_to_text',
        parameters: {
          audio_path: '/app/audio_files/test.wav',
          language: 'ar',
        },
      });
      const result = await waitForJob(job.id);
      console.log('Result:', result);
    } catch (error) {
      console.error('Transcription job failed:', error);
    }
  };

  return (
    <div>
      <button onClick={handleCreateSTT}>Transcribe Audio</button>
      <div>
        <h3>Jobs ({jobs.length})</h3>
        {jobs.map(job => (
          <div key={job.id}>
            Job #{job.id}: {job.status}
          </div>
        ))}
      </div>
    </div>
  );
}
Python Implementation
import time
import requests
from typing import Dict, Any, Optional, List
from dataclasses import dataclass
@dataclass
class JobResult:
    """Typed view of the result payload returned for a job."""

    id: int
    # Id of the job this result belongs to.
    job_id: int
    # Job-type-specific output (the usage example reads result_data["text"]).
    result_data: Dict[str, Any]
    # Processing duration in milliseconds.
    processing_time_ms: int
    model_version: Optional[str] = None
class JobClient:
    """Client for Ubuntu backend job management API"""

    def __init__(self, base_url: str = "https://api.proyaro.com", token: Optional[str] = None):
        """
        Args:
            base_url: API origin; trailing slashes are stripped.
            token: JWT sent as a Bearer token. When omitted, no Authorization
                header is sent (previously the literal "Bearer None" was sent).
        """
        self.base_url = base_url.rstrip("/")
        self.token = token
        self.headers: Dict[str, str] = {"Content-Type": "application/json"}
        if token:
            self.headers["Authorization"] = f"Bearer {token}"

    def create_job(
        self,
        job_type: str,
        parameters: Dict[str, Any],
        priority: int = 0
    ) -> Dict[str, Any]:
        """
        Create a new job

        Args:
            job_type: Type of job (speech_to_text, text_to_speech, etc.)
            parameters: Job-specific parameters
            priority: Job priority (-10 to 10)

        Returns:
            Job dict with id, status, etc.

        Raises:
            requests.HTTPError: If the server answers with an error status.
        """
        response = requests.post(
            f"{self.base_url}/jobs",
            json={
                "job_type": job_type,
                "parameters": parameters,
                "priority": priority
            },
            headers=self.headers,
            timeout=30
        )
        response.raise_for_status()
        return response.json()

    def get_job(self, job_id: int) -> Dict[str, Any]:
        """
        Get job status and result

        Args:
            job_id: Job ID

        Returns:
            dict with 'job' and 'result' keys
        """
        response = requests.get(
            f"{self.base_url}/jobs/{job_id}",
            headers=self.headers,
            timeout=10
        )
        response.raise_for_status()
        return response.json()

    def list_jobs(self, skip: int = 0, limit: int = 100) -> List[Dict[str, Any]]:
        """List all user's jobs"""
        response = requests.get(
            f"{self.base_url}/jobs",
            # Let requests build and encode the query string.
            params={"skip": skip, "limit": limit},
            headers=self.headers,
            timeout=30
        )
        response.raise_for_status()
        return response.json()

    def cancel_job(self, job_id: int) -> None:
        """Cancel a pending/queued job"""
        response = requests.delete(
            f"{self.base_url}/jobs/{job_id}",
            headers=self.headers,
            timeout=10
        )
        response.raise_for_status()

    def clear_history(self, completed_only: bool = True) -> Dict[str, Any]:
        """Clear job history"""
        response = requests.delete(
            f"{self.base_url}/jobs/clear",
            # Send lowercase true/false (as the TypeScript client does) rather
            # than Python's "True"/"False" repr.
            params={"completed_only": "true" if completed_only else "false"},
            headers=self.headers,
            timeout=30
        )
        response.raise_for_status()
        return response.json()

    def wait_for_job(
        self,
        job_id: int,
        poll_interval: float = 2.0,
        timeout: float = 300.0
    ) -> "JobResult":
        """
        Poll job until completion

        Args:
            job_id: Job ID to wait for
            poll_interval: Seconds between polls
            timeout: Maximum wait time in seconds

        Returns:
            JobResult object

        Raises:
            TimeoutError: If job doesn't complete in time
            RuntimeError: If job fails or is cancelled
        """
        # Monotonic clock: immune to wall-clock adjustments during the wait.
        deadline = time.monotonic() + timeout

        while True:
            data = self.get_job(job_id)
            job = data["job"]
            # The result may arrive next to the job or embedded in it.
            result = data.get("result") or job.get("result")
            status = job["status"]

            if status == "completed" and result:
                return JobResult(
                    id=result["id"],
                    job_id=result["job_id"],
                    result_data=result["result_data"],
                    processing_time_ms=result["processing_time_ms"],
                    model_version=result.get("model_version")
                )
            if status == "failed":
                raise RuntimeError(f"Job {job_id} failed: {job.get('error_message')}")
            if status == "cancelled":
                raise RuntimeError(f"Job {job_id} was cancelled")

            remaining = deadline - time.monotonic()
            if remaining <= 0:
                raise TimeoutError(f"Job {job_id} timed out after {timeout}s")
            # Never sleep past the deadline.
            time.sleep(min(poll_interval, remaining))
# Usage Example
if __name__ == "__main__":
    # Demo: submit a speech-to-text job and block until its result is ready.
    client = JobClient(token="your-jwt-token")

    stt_parameters = {
        "audio_path": "/app/audio_files/recording.wav",
        "language": "ar",
        "task": "transcribe",
    }

    # Create speech-to-text job
    print("Creating STT job...")
    job = client.create_job(job_type="speech_to_text", parameters=stt_parameters)
    print(f"Job created: {job['id']}")

    # Wait for completion (polls with wait_for_job's default interval and timeout)
    print("Waiting for result...")
    result = client.wait_for_job(job["id"])
    print(f"Transcription: {result.result_data['text']}")
    print(f"Processing time: {result.processing_time_ms}ms")
Job Type Examples
Speech-to-Text
// Speech-to-text request parameters.
const sttParameters = {
  audio_path: '/app/audio_files/recording.wav',
  language: 'ar', // 'ar', 'en', or null for auto-detect
  task: 'transcribe', // 'transcribe' or 'translate'
  temperature: 0.0,
  beam_size: 5,
  vad_filter: true,
};
const sttJob = await client.createJob({
  job_type: 'speech_to_text',
  parameters: sttParameters,
});
Text-to-Speech
// Text-to-speech request parameters.
const ttsParameters = {
  text: 'مرحبا، كيف حالك؟',
  language: 'ar', // 'ar', 'en', etc.
  speed: 1.0, // 0.5 - 2.0
  speaker_wav: null, // Optional: voice cloning
};
const ttsJob = await client.createJob({
  job_type: 'text_to_speech',
  parameters: ttsParameters,
});
Image Generation
// Image-generation request parameters.
const imageParameters = {
  prompt: 'A beautiful sunset over pyramids',
  negative_prompt: 'cartoon, anime',
  width: 1024,
  height: 1024,
  steps: 9,
  cfg_scale: 1.0,
  seed: 42,
};
const imageJob = await client.createJob({
  job_type: 'image_generation',
  parameters: imageParameters,
});
Text Generation (via MLX)
// Text-generation request parameters.
const textParameters = {
  prompt: 'Write a product description',
  system_prompt: 'You are a marketing expert',
  max_tokens: 500,
  temperature: 0.7,
  model: 'qwen2.5-combined-egyptian-marketing-4bit',
};
const textJob = await client.createJob({
  job_type: 'text_generation',
  parameters: textParameters,
});
Embeddings
// Embedding request parameters.
const embeddingParameters = {
  texts: ['Text 1', 'Text 2', 'Text 3'],
  normalize: true,
  instruction: 'query:', // Optional: 'query:' or 'passage:'
};
const embeddingJob = await client.createJob({
  job_type: 'embedding',
  parameters: embeddingParameters,
});
Error Handling
/**
 * Best-effort extraction of an HTTP status code from a thrown value.
 *
 * Understands axios-style `error.response.status`, a plain `error.status`,
 * and the message format `Failed to create job: [code] [status text]`, which
 * is all a fetch-based client may provide.
 */
function createJobErrorStatus(error: unknown): number | undefined {
  if (typeof error !== 'object' || error === null) return undefined;
  const candidate = error as {
    status?: unknown;
    message?: unknown;
    response?: { status?: unknown } | null;
  };

  if (typeof candidate.response?.status === 'number') return candidate.response.status;
  if (typeof candidate.status === 'number') return candidate.status;

  if (typeof candidate.message === 'string') {
    const match = /^Failed to create job:\s*(\d{3})?\s*(.*)$/.exec(candidate.message);
    if (match) {
      if (match[1]) return Number(match[1]);
      const byReason: Record<string, number> = {
        'unauthorized': 401,
        'bad request': 400,
        'unprocessable entity': 422,
        'service unavailable': 503,
      };
      return byReason[match[2].trim().toLowerCase()];
    }
  }
  return undefined;
}

/** Renders the server-provided `detail` of an error body, if there is one. */
function createJobErrorDetail(error: unknown): string {
  const detail = (error as { response?: { data?: { detail?: unknown } | null } | null } | null)
    ?.response?.data?.detail;
  if (detail === undefined || detail === null) return 'no details provided';
  // `detail` may be structured (e.g. a list of validation errors) rather than a string.
  return typeof detail === 'string' ? detail : JSON.stringify(detail);
}

/**
 * Creates a job and translates well-known HTTP failures into readable errors.
 *
 * @param client Client used to submit the job.
 * @param jobData Job to create.
 * @returns The created job.
 * @throws Error with a friendly message for 401, 400/422 and 503; any other
 *         failure is rethrown unchanged.
 */
async function safeCreateJob(
  client: JobClient,
  jobData: JobCreate
): Promise<Job> {
  try {
    return await client.createJob(jobData);
  } catch (error: unknown) {
    const status = createJobErrorStatus(error);
    if (status === 401) {
      throw new Error('Authentication failed. Token may be expired.');
    }
    // FastAPI reports request-validation failures as 422, so treat it like 400.
    if (status === 400 || status === 422) {
      throw new Error('Invalid job parameters: ' + createJobErrorDetail(error));
    }
    if (status === 503) {
      throw new Error('AI service unavailable. Try again later.');
    }
    throw error;
  }
}
/**
 * Polls a job until it finishes, retrying transient request failures.
 *
 * Differences from a plain polling loop:
 * - errors thrown by `client.getJob` are retried, unless they carry a
 *   permanent 4xx HTTP status (other than 408/429), which is rethrown at once;
 * - `failed` and `cancelled` jobs reject immediately and are never retried;
 * - errors thrown by `onProgress` propagate to the caller instead of being
 *   mistaken for network errors.
 *
 * @param client Client used to query the job.
 * @param jobId Job to wait for.
 * @param onProgress Optional callback invoked with the job status on every successful poll.
 * @param pollInterval Milliseconds between polls (default 2000).
 * @param timeout Maximum total wait in milliseconds (default 300000).
 * @returns The job result once the job is `completed`.
 * @throws Error on failure, cancellation, timeout, or a permanent HTTP error.
 */
async function safeWaitForJob(
  client: JobClient,
  jobId: number,
  onProgress?: (status: string) => void,
  pollInterval: number = 2000,
  timeout: number = 300000
): Promise<JobResult> {
  const startTime = Date.now();
  const pause = () => new Promise<void>(resolve => setTimeout(resolve, pollInterval));

  // Reads an HTTP status from `error.response.status` or `error.status`, if present.
  const httpStatusOf = (error: unknown): number | undefined => {
    if (typeof error !== 'object' || error === null) return undefined;
    const candidate = error as { status?: unknown; response?: { status?: unknown } | null };
    if (typeof candidate.response?.status === 'number') return candidate.response.status;
    return typeof candidate.status === 'number' ? candidate.status : undefined;
  };

  while (true) {
    if (Date.now() - startTime > timeout) {
      throw new Error(`Job ${jobId} timed out`);
    }

    let snapshot: { job: Job; result: JobResult | null };
    try {
      snapshot = await client.getJob(jobId);
    } catch (error: unknown) {
      const status = httpStatusOf(error);
      // 401/403/404 and similar will not fix themselves; retrying only hides them.
      if (status !== undefined && status >= 400 && status < 500 && status !== 408 && status !== 429) {
        throw error;
      }
      // Retry on network errors
      await pause();
      continue;
    }

    const { job, result } = snapshot;
    // Report progress
    onProgress?.(job.status);

    if (job.status === 'completed' && result) {
      return result;
    }
    if (job.status === 'failed') {
      throw new Error(`Job failed: ${job.error_message}`);
    }
    if (job.status === 'cancelled') {
      throw new Error(`Job ${jobId} was cancelled`);
    }
    await pause();
  }
}
Best Practices
1. Use WebSocket for Real-time Updates
// Better than polling
const { jobs, createJob, waitForJob } = useJob({
token,
enableWebSocket: true, // ✅ Real-time updates
});
// vs polling (less efficient)
while (job.status !== 'completed') {
await sleep(2000);
job = await getJob(job.id); // ❌ Wastes bandwidth
}
2. Set Appropriate Priorities
// High priority for user-facing features
// Raise the priority of work a user is actively waiting on.
// (`{ ... }` is a placeholder for the job-specific parameters.)
const userJob = await createJob({
  job_type: 'speech_to_text',
  parameters: { ... },
  priority: 5, // Higher priority (priorities range from -10 to 10, default 0)
});
// Low priority for background tasks
const backgroundJob = await createJob({
  job_type: 'embedding',
  parameters: { ... },
  priority: -5, // Lower priority
});
3. Handle Timeouts Appropriately
// Per-job-type polling ceilings in milliseconds. Typed as a string-keyed
// record because `Job.job_type` is a plain `string`; indexing an untyped
// object literal with it does not compile under `noImplicitAny`.
const timeouts: Record<string, number> = {
  speech_to_text: 300000, // 5 minutes
  text_to_speech: 120000, // 2 minutes
  image_generation: 600000, // 10 minutes
  text_generation: 300000, // 5 minutes
  embedding: 180000, // 3 minutes
};
// Unknown job types fall back to waitForJob's own 5-minute default.
await client.waitForJob(jobId, 2000, timeouts[job.job_type] ?? 300000);
4. Clear Old Jobs Regularly
// Run during app cleanup or logout.
// NOTE(review): `true` is sent as `completed_only=true`; presumably the backend
// then deletes only completed jobs and leaves everything else — confirm whether
// failed/cancelled jobs are kept as well.
await client.clearHistory(true); // Keep only active jobs
Testing
/**
 * Smoke test for the create → retrieve → wait workflow.
 *
 * @throws Error as soon as a check fails. `console.assert` only logs, which
 *         previously let "All tests passed!" print even when a check failed.
 */
async function testJobWorkflow() {
  const check = (condition: unknown, message: string): void => {
    if (!condition) {
      throw new Error(`Assertion failed: ${message}`);
    }
  };

  const client = new JobClient('https://api.proyaro.com', 'test-token');

  // Test job creation
  console.log('Testing job creation...');
  const job = await client.createJob({
    job_type: 'embedding',
    parameters: {
      texts: ['Test'],
      normalize: true,
    },
  });
  check(job.id > 0, 'Job should have ID');
  // NOTE(review): a freshly created job may be reported as 'pending' before it
  // is 'queued'; both are accepted here — confirm against the backend.
  check(job.status === 'pending' || job.status === 'queued', 'Job should be queued');

  // Test job retrieval
  console.log('Testing job retrieval...');
  const { job: retrievedJob } = await client.getJob(job.id);
  check(retrievedJob.id === job.id, 'Should retrieve same job');

  // Test job completion
  console.log('Waiting for completion...');
  const result = await client.waitForJob(job.id, 1000, 60000);
  check(result.result_data, 'Should have result data');

  console.log('All tests passed!');
}
Skill Version: 1.0 · Last Updated: 2025-01-01
ProYaro AI Infrastructure Documentation • Version 1.2