Skill

Job Management

Job Management Integration Skill

Overview

This skill provides ready-to-use code for working with the Ubuntu backend job queue system.

- Service: Ubuntu FastAPI Backend
- Endpoint: https://api.proyaro.com/jobs (external) or http://10.0.0.11:8000/jobs (internal)
- Authentication: Required (JWT Bearer token)


Job Types Supported

| Job Type | Purpose | Processing Time |
| --- | --- | --- |
| speech_to_text | Whisper STT | 10-30 seconds |
| text_to_speech | XTTS-v2 TTS | 1-5 seconds |
| image_generation | ComfyUI (GPU) | 10-30 seconds |
| text_generation | MLX (proxied) | 5-20 seconds |
| embedding | Embeddings (GPU) | 1-10 seconds |

TypeScript/JavaScript Implementation

Basic Job Client

/**
 * Request body for creating a job. `JobClient.createJob` serializes this
 * object as JSON and POSTs it to `/jobs`.
 */
interface JobCreate {
  /** Which kind of job to create; must be one of the listed literals. */
  job_type: 'speech_to_text' | 'text_to_speech' | 'image_generation' | 'text_generation' | 'embedding';
  /** Job-specific options; the keys used in this document differ per `job_type`. */
  parameters: Record<string, any>;
  priority?: number; // -10 to 10, default 0
}

/**
 * A job record as exchanged with the `/jobs` endpoints.
 */
interface Job {
  id: number;
  user_id: number;
  /** Typed as a plain string here; presumably one of the `JobCreate` job types — confirm. */
  job_type: string;
  /**
   * Current lifecycle state. The pollers in this document stop on
   * `completed`, `failed` and `cancelled`; every other value keeps them waiting.
   */
  status: 'pending' | 'queued' | 'processing' | 'completed' | 'failed' | 'cancelled';
  parameters: Record<string, any>;
  priority: number;
  // Timestamps travel as strings — presumably ISO-8601; TODO confirm against the API.
  created_at: string;
  queued_at?: string;
  started_at?: string;
  completed_at?: string;
  /** Included in the error raised by the pollers when `status` is `failed`. */
  error_message?: string;
  /**
   * Result attached directly to the job (read by the React hook's `waitForJob`).
   * NOTE(review): `JobClient.getJob` also returns the result as a separate
   * `result` field next to `job` — confirm which one the backend populates.
   */
  result?: JobResult;
}

/**
 * Output of a finished job, as returned under the `result` key by
 * `JobClient.getJob` and resolved by `JobClient.waitForJob`.
 */
interface JobResult {
  id: number;
  /** Id of the job this result belongs to. */
  job_id: number;
  /** Job-type-specific payload; the speech-to-text examples read `result_data.text`. */
  result_data: Record<string, any>;
  /** Processing duration; milliseconds according to the field name. */
  processing_time_ms: number;
  model_version?: string;
}

/**
 * Error thrown by `JobClient` when the backend answers with a non-2xx status.
 *
 * It is still a plain `Error` for existing `catch` blocks, but additionally
 * exposes the numeric HTTP `status` (and the parsed error body, when there is
 * one) so callers can distinguish e.g. 401 from 503 without parsing messages.
 */
class JobApiError extends Error {
  /** HTTP status code of the failed response. */
  readonly status: number;
  /** `detail` field of the JSON error body if present, otherwise the whole parsed body. */
  readonly detail?: unknown;

  constructor(message: string, status: number, detail?: unknown) {
    super(message);
    this.name = 'JobApiError';
    this.status = status;
    this.detail = detail;
  }
}

/**
 * Thin fetch-based client for the job queue REST API.
 *
 * Every request carries the JWT as a Bearer token. Non-2xx responses are
 * turned into a `JobApiError`.
 */
class JobClient {
  private baseURL: string;
  private token: string;

  /**
   * @param baseURL Root URL of the API (without `/jobs`). Pass `undefined` to
   *   use the default; trailing slashes are stripped so paths never start with `//`.
   * @param token JWT sent in the `Authorization` header.
   */
  constructor(baseURL: string = 'https://api.proyaro.com', token: string) {
    this.baseURL = baseURL.replace(/\/+$/, '');
    this.token = token;
  }

  /**
   * Performs one authenticated request and rejects with `JobApiError` on a
   * non-2xx status.
   *
   * @param action Human-readable prefix for the error message, e.g. "Failed to get job".
   * @param path Path (and query string) appended to the base URL.
   * @param init Optional HTTP method and JSON-encoded body.
   */
  private async request(
    action: string,
    path: string,
    init: { method?: string; body?: string } = {}
  ) {
    const headers: Record<string, string> = {
      Authorization: `Bearer ${this.token}`,
    };
    // Only requests that actually carry a body declare a content type.
    if (init.body !== undefined) {
      headers['Content-Type'] = 'application/json';
    }

    const response = await fetch(`${this.baseURL}${path}`, { ...init, headers });
    if (response.ok) {
      return response;
    }

    let detail: unknown;
    try {
      const payload: unknown = await response.json();
      detail =
        typeof payload === 'object' && payload !== null && 'detail' in payload
          ? (payload as { detail: unknown }).detail
          : payload;
    } catch {
      // Body was empty or not JSON; the status code alone has to do.
    }

    // statusText is an empty string over HTTP/2, so the numeric code is always included.
    const reason = `${response.status} ${response.statusText}`.trim();
    throw new JobApiError(`${action}: ${reason}`, response.status, detail);
  }

  /**
   * Creates a new job.
   * @throws JobApiError when the backend rejects the request.
   */
  async createJob(jobData: JobCreate): Promise<Job> {
    const response = await this.request('Failed to create job', '/jobs', {
      method: 'POST',
      body: JSON.stringify(jobData),
    });
    return response.json();
  }

  /**
   * Fetches a job together with its result (`null` while none is available).
   * @throws JobApiError when the backend rejects the request.
   */
  async getJob(jobId: number): Promise<{ job: Job; result: JobResult | null }> {
    const response = await this.request('Failed to get job', `/jobs/${jobId}`);
    return response.json();
  }

  /**
   * Lists the user's jobs using `skip`/`limit` pagination.
   * @throws JobApiError when the backend rejects the request.
   */
  async listJobs(skip: number = 0, limit: number = 100): Promise<Job[]> {
    const response = await this.request(
      'Failed to list jobs',
      `/jobs?skip=${skip}&limit=${limit}`
    );
    return response.json();
  }

  /**
   * Cancels a pending/queued job. Any 2xx answer (including 204) counts as success.
   * @throws JobApiError when the backend rejects the request.
   */
  async cancelJob(jobId: number): Promise<void> {
    await this.request('Failed to cancel job', `/jobs/${jobId}`, { method: 'DELETE' });
  }

  /**
   * Clears job history.
   * @param completedOnly Forwarded as the `completed_only` query parameter.
   * @throws JobApiError when the backend rejects the request.
   */
  async clearHistory(completedOnly: boolean = true): Promise<{ deleted_count: number }> {
    const response = await this.request(
      'Failed to clear history',
      `/jobs/clear?completed_only=${completedOnly}`,
      { method: 'DELETE' }
    );
    return response.json();
  }

  /**
   * Polls a job until it reaches a final state.
   *
   * @param jobId Job to wait for.
   * @param pollInterval Milliseconds between polls.
   * @param timeout Maximum total wait in milliseconds (default 5 minutes).
   * @returns The job's result once its status is `completed` and a result is present.
   * @throws Error if the job fails, is cancelled, or the timeout elapses.
   */
  async waitForJob(
    jobId: number,
    pollInterval: number = 2000,
    timeout: number = 300000 // 5 minutes
  ): Promise<JobResult> {
    const deadline = Date.now() + timeout;

    while (true) {
      const { job, result } = await this.getJob(jobId);

      // A completed job without a result keeps polling: the result may be attached later.
      if (job.status === 'completed' && result) {
        return result;
      }
      if (job.status === 'failed') {
        throw new Error(`Job ${jobId} failed: ${job.error_message}`);
      }
      if (job.status === 'cancelled') {
        throw new Error(`Job ${jobId} was cancelled`);
      }

      const remaining = deadline - Date.now();
      if (remaining <= 0) {
        throw new Error(`Job ${jobId} timed out after ${timeout}ms`);
      }

      // Never sleep past the deadline; the last poll happens right at it.
      const delay = Math.min(pollInterval, remaining);
      await new Promise(resolve => setTimeout(resolve, delay));
    }
  }
}

// Usage example: submit an Arabic transcription job and print the recognised text
async function example() {
  const api = new JobClient('https://api.proyaro.com', 'your-jwt-token');

  // Describe the speech-to-text request up front, then submit it
  const sttRequest = {
    job_type: 'speech_to_text' as const,
    parameters: {
      audio_path: '/app/audio_files/recording.wav',
      language: 'ar',
      task: 'transcribe',
    },
  };
  const { id: jobId } = await api.createJob(sttRequest);

  console.log(`Job created: ${jobId}`);

  // Block until the backend reports a result, then show the transcription
  const { result_data: transcription } = await api.waitForJob(jobId);
  console.log('Transcription:', transcription.text);
}

React Hook with WebSocket

import { useState, useEffect, useCallback, useRef } from 'react';

/**
 * Configuration accepted by the `useJob` hook.
 */
interface UseJobOptions {
  /** API root; when omitted the hook and `JobClient` fall back to their built-in defaults. */
  baseURL?: string;
  /** JWT used for REST calls and appended as `?token=` to the WebSocket URL. */
  token: string;
  /** When truthy the hook opens a WebSocket for job updates; otherwise `waitForJob` polls over HTTP. */
  enableWebSocket?: boolean;
}

/**
 * React hook that tracks jobs in local state, optionally kept up to date
 * through the backend WebSocket.
 *
 * @param options Connection settings; see `UseJobOptions`.
 * @returns `jobs` (array of tracked jobs), `createJob`, `waitForJob` and the
 *   current `ws` instance (or `null`).
 */
export function useJob(options: UseJobOptions) {
  const [jobs, setJobs] = useState<Map<number, Job>>(new Map());
  const [ws, setWs] = useState<WebSocket | null>(null);

  // Mirror of the latest job map. waitForJob outlives the render that created
  // it, so reading the `jobs` variable directly would only ever see the map as
  // it was when the callback was created and the promise would never settle.
  const jobsRef = useRef(jobs);
  useEffect(() => {
    jobsRef.current = jobs;
  }, [jobs]);

  // Connect to WebSocket
  useEffect(() => {
    if (!options.enableWebSocket) return;

    // http -> ws and https -> wss; anchored so only the scheme is rewritten.
    const wsBase = (options.baseURL || 'https://api.proyaro.com')
      .replace(/\/+$/, '')
      .replace(/^http/i, 'ws');
    const websocket = new WebSocket(
      `${wsBase}/ws?token=${encodeURIComponent(options.token)}`
    );

    websocket.onmessage = (event) => {
      let update;
      try {
        update = JSON.parse(event.data);
      } catch {
        // Ignore frames that are not JSON instead of throwing inside the handler.
        return;
      }

      if (!update || update.type !== 'job_update') return;

      if (update.status === 'HISTORY_CLEARED') {
        setJobs(new Map());
        return;
      }

      // Merge the update into whatever is already known about the job.
      setJobs(prev => {
        const newJobs = new Map(prev);
        newJobs.set(update.job_id, {
          ...newJobs.get(update.job_id),
          id: update.job_id,
          status: update.status,
          ...update.data,
        });
        return newJobs;
      });
    };

    setWs(websocket);

    return () => {
      websocket.close();
      // Do not keep handing a closed socket to consumers.
      setWs(current => (current === websocket ? null : current));
    };
  }, [options.token, options.baseURL, options.enableWebSocket]);

  // Create job
  const createJob = useCallback(async (jobData: JobCreate): Promise<Job> => {
    const client = new JobClient(options.baseURL, options.token);
    const job = await client.createJob(jobData);

    // Add to local state
    setJobs(prev => {
      const newJobs = new Map(prev);
      newJobs.set(job.id, job);
      return newJobs;
    });

    return job;
  }, [options.baseURL, options.token]);

  // Wait for job with WebSocket or polling
  const waitForJob = useCallback(async (jobId: number): Promise<JobResult> => {
    if (!options.enableWebSocket) {
      // Use polling
      const client = new JobClient(options.baseURL, options.token);
      return client.waitForJob(jobId);
    }

    const checkIntervalMs = 100;
    const timeoutMs = 300000; // same 5-minute ceiling as JobClient.waitForJob
    const deadline = Date.now() + timeoutMs;

    // Wait for a WebSocket update to land in state.
    // NOTE(review): this resolves only if the update payload carries `result`;
    // confirm that the backend includes it in `update.data`.
    return new Promise<JobResult>((resolve, reject) => {
      const checkJob = () => {
        const job = jobsRef.current.get(jobId);
        if (job?.status === 'completed' && job.result) {
          resolve(job.result);
        } else if (job?.status === 'failed') {
          reject(new Error(job.error_message));
        } else if (job?.status === 'cancelled') {
          reject(new Error(`Job ${jobId} was cancelled`));
        } else if (Date.now() > deadline) {
          // Without a ceiling the timer would re-arm forever.
          reject(new Error(`Job ${jobId} timed out after ${timeoutMs}ms`));
        } else {
          setTimeout(checkJob, checkIntervalMs);
        }
      };
      checkJob();
    });
  }, [options.baseURL, options.token, options.enableWebSocket]);

  return {
    jobs: Array.from(jobs.values()),
    createJob,
    waitForJob,
    ws,
  };
}

// Example component: one button that starts a transcription, plus a live job list
function JobMonitor() {
  const jobApi = useJob({
    token: 'your-jwt-token',
    enableWebSocket: true,
  });

  // Click handler: create the speech-to-text job, then log its result when done
  async function handleCreateSTT() {
    const created = await jobApi.createJob({
      job_type: 'speech_to_text',
      parameters: {
        audio_path: '/app/audio_files/test.wav',
        language: 'ar',
      },
    });

    const outcome = await jobApi.waitForJob(created.id);
    console.log('Result:', outcome);
  }

  // One row per tracked job, keyed by job id
  const jobRows = jobApi.jobs.map(entry => (
    <div key={entry.id}>
      Job #{entry.id}: {entry.status}
    </div>
  ));

  return (
    <div>
      <button onClick={handleCreateSTT}>Transcribe Audio</button>

      <div>
        <h3>Jobs ({jobApi.jobs.length})</h3>
        {jobRows}
      </div>
    </div>
  );
}

Python Implementation

import time
import requests
from typing import Dict, Any, Optional, List
from dataclasses import dataclass


@dataclass
class JobResult:
    """Result of a completed job, built by ``JobClient.wait_for_job`` from the
    ``result`` object of the job endpoint response."""
    id: int
    job_id: int
    # Job-type-specific payload; the usage example reads result_data['text'].
    result_data: Dict[str, Any]
    # Processing duration; milliseconds according to the field name.
    processing_time_ms: int
    model_version: Optional[str] = None


class JobClient:
    """Client for Ubuntu backend job management API"""

    def __init__(self, base_url: str = "https://api.proyaro.com", token: Optional[str] = None):
        """
        Args:
            base_url: API root without the ``/jobs`` suffix. Trailing slashes
                are stripped so request URLs never contain ``//jobs``.
            token: JWT sent as a Bearer token. When missing or empty no
                ``Authorization`` header is sent at all (instead of the
                meaningless ``Bearer None``).
        """
        self.base_url = base_url.rstrip("/")
        self.token = token
        self.headers = {"Content-Type": "application/json"}
        if token:
            self.headers["Authorization"] = f"Bearer {token}"

    def create_job(
        self,
        job_type: str,
        parameters: Dict[str, Any],
        priority: int = 0
    ) -> Dict[str, Any]:
        """
        Create a new job

        Args:
            job_type: Type of job (speech_to_text, text_to_speech, etc.)
            parameters: Job-specific parameters
            priority: Job priority (-10 to 10)

        Returns:
            Job dict with id, status, etc.

        Raises:
            requests.HTTPError: If the backend answers with an error status
        """
        response = requests.post(
            f"{self.base_url}/jobs",
            json={
                "job_type": job_type,
                "parameters": parameters,
                "priority": priority
            },
            headers=self.headers,
            timeout=30
        )
        response.raise_for_status()
        return response.json()

    def get_job(self, job_id: int) -> Dict[str, Any]:
        """
        Get job status and result

        Args:
            job_id: Job ID

        Returns:
            dict with 'job' and 'result' keys

        Raises:
            requests.HTTPError: If the backend answers with an error status
        """
        response = requests.get(
            f"{self.base_url}/jobs/{job_id}",
            headers=self.headers,
            timeout=10
        )
        response.raise_for_status()
        return response.json()

    def list_jobs(self, skip: int = 0, limit: int = 100) -> List[Dict[str, Any]]:
        """List all user's jobs, paginated with ``skip`` and ``limit``"""
        response = requests.get(
            f"{self.base_url}/jobs",
            # Let requests build and encode the query string.
            params={"skip": skip, "limit": limit},
            headers=self.headers,
            timeout=30
        )
        response.raise_for_status()
        return response.json()

    def cancel_job(self, job_id: int) -> None:
        """Cancel a pending/queued job"""
        response = requests.delete(
            f"{self.base_url}/jobs/{job_id}",
            headers=self.headers,
            timeout=10
        )
        response.raise_for_status()

    def clear_history(self, completed_only: bool = True) -> Dict[str, Any]:
        """Clear job history; ``completed_only`` is forwarded as a query parameter"""
        response = requests.delete(
            f"{self.base_url}/jobs/clear",
            # Send a lowercase boolean rather than Python's "True"/"False" repr.
            params={"completed_only": "true" if completed_only else "false"},
            headers=self.headers,
            timeout=30
        )
        response.raise_for_status()
        return response.json()

    def wait_for_job(
        self,
        job_id: int,
        poll_interval: float = 2.0,
        timeout: float = 300.0
    ) -> JobResult:
        """
        Poll job until completion

        Args:
            job_id: Job ID to wait for
            poll_interval: Seconds between polls
            timeout: Maximum wait time in seconds

        Returns:
            JobResult object

        Raises:
            TimeoutError: If job doesn't complete in time
            RuntimeError: If job fails or is cancelled
        """
        # Monotonic clock: unaffected by wall-clock adjustments while waiting.
        deadline = time.monotonic() + timeout

        while True:
            data = self.get_job(job_id)
            job = data["job"]
            result = data.get("result")
            status = job["status"]

            # A completed job without a result keeps polling until one appears.
            if status == "completed" and result:
                return JobResult(
                    id=result["id"],
                    job_id=result["job_id"],
                    result_data=result["result_data"],
                    processing_time_ms=result["processing_time_ms"],
                    model_version=result.get("model_version")
                )

            if status == "failed":
                raise RuntimeError(f"Job {job_id} failed: {job.get('error_message')}")

            if status == "cancelled":
                raise RuntimeError(f"Job {job_id} was cancelled")

            remaining = deadline - time.monotonic()
            if remaining <= 0:
                raise TimeoutError(f"Job {job_id} timed out after {timeout}s")

            # Never sleep past the deadline.
            time.sleep(min(poll_interval, remaining))


# Usage example: submit one transcription job and report its outcome
if __name__ == "__main__":
    api = JobClient(token="your-jwt-token")

    # Submit the speech-to-text request
    print("Creating STT job...")
    stt_parameters = {
        "audio_path": "/app/audio_files/recording.wav",
        "language": "ar",
        "task": "transcribe",
    }
    created = api.create_job(job_type="speech_to_text", parameters=stt_parameters)
    job_id = created["id"]
    print(f"Job created: {job_id}")

    # Block until the backend reports a result
    print("Waiting for result...")
    outcome = api.wait_for_job(job_id)
    transcription = outcome.result_data["text"]
    print(f"Transcription: {transcription}")
    print(f"Processing time: {outcome.processing_time_ms}ms")

Job Type Examples

Speech-to-Text

// Create a speech_to_text job (handled by Whisper according to the job-type table).
// NOTE(review): audio_path presumably has to exist on the backend host, and
// temperature / beam_size / vad_filter look like Whisper decoding options —
// confirm the accepted keys against the backend.
const sttJob = await client.createJob({
  job_type: 'speech_to_text',
  parameters: {
    audio_path: '/app/audio_files/recording.wav',
    language: 'ar',           // 'ar', 'en', or null for auto-detect
    task: 'transcribe',       // 'transcribe' or 'translate'
    temperature: 0.0,
    beam_size: 5,
    vad_filter: true,
  },
});

Text-to-Speech

// Create a text_to_speech job (XTTS-v2 according to the job-type table).
// The sample text is Arabic, matching language: 'ar'.
const ttsJob = await client.createJob({
  job_type: 'text_to_speech',
  parameters: {
    text: 'مرحبا، كيف حالك؟',
    language: 'ar',           // 'ar', 'en', etc.
    speed: 1.0,               // 0.5 - 2.0
    speaker_wav: null,        // Optional: voice cloning
  },
});

Image Generation

// Create an image_generation job (ComfyUI according to the job-type table).
// NOTE(review): steps, cfg_scale and seed are presumably sampler settings
// forwarded to the image workflow — confirm the accepted keys and ranges.
const imageJob = await client.createJob({
  job_type: 'image_generation',
  parameters: {
    prompt: 'A beautiful sunset over pyramids',
    negative_prompt: 'cartoon, anime',
    width: 1024,
    height: 1024,
    steps: 9,
    cfg_scale: 1.0,
    seed: 42,
  },
});

Text Generation (via MLX)

// Create a text_generation job (proxied to MLX according to the job-type table).
// NOTE(review): `model` names a specific deployed model — confirm it is still
// available before copying this snippet.
const textJob = await client.createJob({
  job_type: 'text_generation',
  parameters: {
    prompt: 'Write a product description',
    system_prompt: 'You are a marketing expert',
    max_tokens: 500,
    temperature: 0.7,
    model: 'qwen2.5-combined-egyptian-marketing-4bit',
  },
});

Embeddings

// Create an embedding job for several texts in one request.
// NOTE(review): presumably one vector is returned per entry of `texts` — confirm
// the shape of result_data against the backend.
const embeddingJob = await client.createJob({
  job_type: 'embedding',
  parameters: {
    texts: ['Text 1', 'Text 2', 'Text 3'],
    normalize: true,
    instruction: 'query:',    // Optional: 'query:' or 'passage:'
  },
});

Error Handling

/**
 * Best-effort extraction of an HTTP status code from a thrown value.
 *
 * Understands errors that expose `status` directly, axios-style errors that
 * expose `response.status`, and, as a last resort, plain errors whose message
 * only contains the HTTP reason phrase.
 */
function httpStatusOf(error: unknown): number | undefined {
  if (typeof error !== 'object' || error === null) return undefined;

  const candidate = error as {
    status?: unknown;
    response?: { status?: unknown } | null;
    message?: unknown;
  };
  if (typeof candidate.status === 'number') return candidate.status;

  const nested = candidate.response?.status;
  if (typeof nested === 'number') return nested;

  if (typeof candidate.message === 'string') {
    // A fetch-based client that throws `Error("...: " + statusText)` leaves
    // the reason phrase as the only hint about the status.
    if (/\bUnauthorized\b/i.test(candidate.message)) return 401;
    if (/\bBad Request\b/i.test(candidate.message)) return 400;
    if (/\bService Unavailable\b/i.test(candidate.message)) return 503;
  }
  return undefined;
}

/** Returns a printable description of the server-provided error detail, if any. */
function errorDetailOf(error: unknown): string {
  const candidate = error as {
    detail?: unknown;
    response?: { data?: { detail?: unknown } | null } | null;
    message?: unknown;
  } | null;
  const detail =
    candidate?.detail ?? candidate?.response?.data?.detail ?? candidate?.message;
  if (typeof detail === 'string') return detail;
  // Structured details (e.g. a list of validation errors) would otherwise print as "[object Object]".
  return detail === undefined ? 'unknown error' : JSON.stringify(detail);
}

/**
 * Creates a job and translates the most common HTTP failures into
 * user-presentable errors.
 *
 * @param client Client used to submit the job.
 * @param jobData Job definition to submit.
 * @returns The created job.
 * @throws Error with a friendly message for 401, 400 and 503; any other
 *   failure is rethrown unchanged.
 */
async function safeCreateJob(
  client: JobClient,
  jobData: JobCreate
): Promise<Job> {
  try {
    return await client.createJob(jobData);
  } catch (error: unknown) {
    switch (httpStatusOf(error)) {
      case 401:
        throw new Error('Authentication failed. Token may be expired.');
      case 400:
        throw new Error('Invalid job parameters: ' + errorDetailOf(error));
      case 503:
        throw new Error('AI service unavailable. Try again later.');
      default:
        throw error;
    }
  }
}

/**
 * True for errors that expose a 4xx `status` which retrying cannot fix
 * (everything except 408 Request Timeout and 429 Too Many Requests).
 */
function isNonRetryableClientError(error: unknown): boolean {
  if (typeof error !== 'object' || error === null) return false;
  const status = (error as { status?: unknown }).status;
  return (
    typeof status === 'number' &&
    status >= 400 &&
    status < 500 &&
    status !== 408 &&
    status !== 429
  );
}

/**
 * Polls a job until it finishes, tolerating transient request failures.
 *
 * Only the status request itself is retried. Final job states (`failed`,
 * `cancelled`) and errors thrown by `onProgress` propagate immediately
 * rather than being mistaken for network problems.
 *
 * @param client Client used to query the job.
 * @param jobId Job to wait for.
 * @param onProgress Optional callback invoked with the status after every successful poll.
 * @param pollInterval Milliseconds between polls (default 2000).
 * @param timeout Maximum total wait in milliseconds (default 300000).
 * @returns The job result once the job is `completed` and a result is present.
 * @throws Error when the job fails, is cancelled, the timeout elapses, or the
 *   status request fails with a non-retryable 4xx status.
 */
async function safeWaitForJob(
  client: JobClient,
  jobId: number,
  onProgress?: (status: string) => void,
  pollInterval: number = 2000,
  timeout: number = 300000
): Promise<JobResult> {
  const startTime = Date.now();
  const pause = () => new Promise<void>(resolve => setTimeout(resolve, pollInterval));

  while (true) {
    if (Date.now() - startTime > timeout) {
      throw new Error(`Job ${jobId} timed out`);
    }

    let snapshot: Awaited<ReturnType<JobClient['getJob']>>;
    try {
      snapshot = await client.getJob(jobId);
    } catch (error: unknown) {
      // 401/403/404 and similar will not heal by waiting; surface them right away.
      if (isNonRetryableClientError(error)) {
        throw error;
      }
      // Retry on network errors
      await pause();
      continue;
    }

    const { job, result } = snapshot;

    // Report progress
    onProgress?.(job.status);

    if (job.status === 'completed' && result) {
      return result;
    }

    if (job.status === 'failed') {
      throw new Error(`Job failed: ${job.error_message}`);
    }

    // A cancelled job never completes; without this branch the loop would spin until the timeout.
    if (job.status === 'cancelled') {
      throw new Error(`Job ${jobId} was cancelled`);
    }

    await pause();
  }
}

Best Practices

1. Use WebSocket for Real-time Updates

// Better than polling: with enableWebSocket the hook opens a WebSocket and
// applies incoming job_update messages to its local job state.
const { jobs, createJob, waitForJob } = useJob({
  token,
  enableWebSocket: true,  // ✅ Real-time updates
});

// vs polling (less efficient): one request every 2 seconds until the job
// reports 'completed'. `sleep` and `getJob` stand for helpers that are not
// defined in this snippet. As written, the loop never exits when the job ends
// up 'failed' or 'cancelled'.
while (job.status !== 'completed') {
  await sleep(2000);
  job = await getJob(job.id);  // ❌ Wastes bandwidth
}

2. Set Appropriate Priorities

// `priority` is optional on the job request (documented range -10 to 10, default 0).
// `{ ... }` below is a placeholder for the job-specific parameters, not valid syntax.

// High priority for user-facing features
const userJob = await createJob({
  job_type: 'speech_to_text',
  parameters: { ... },
  priority: 5,  // Higher priority
});

// Low priority for background tasks
const backgroundJob = await createJob({
  job_type: 'embedding',
  parameters: { ... },
  priority: -5,  // Lower priority
});

3. Handle Timeouts Appropriately

// Upper bound, in milliseconds, to wait for each job type.
// Annotated as Record<string, number> because `job.job_type` is a plain
// `string`: indexing the inferred literal type with it is rejected under
// `noImplicitAny`.
const timeouts: Record<string, number> = {
  speech_to_text: 300000,    // 5 minutes
  text_to_speech: 120000,    // 2 minutes
  image_generation: 600000,  // 10 minutes
  text_generation: 300000,   // 5 minutes
  embedding: 180000,         // 3 minutes
};

// A job type missing from the map yields `undefined`, which makes waitForJob
// fall back to its own default timeout.
await client.waitForJob(jobId, 2000, timeouts[job.job_type]);

4. Clear Old Jobs Regularly

// In app cleanup/logout
// Passing true sends completed_only=true to DELETE /jobs/clear.
await client.clearHistory(true);  // Keep only active jobs

Testing

/**
 * End-to-end smoke test: create an embedding job, fetch it back, and wait for
 * its result.
 *
 * @throws Error on the first failed check, so "All tests passed!" is only
 *   printed when every check actually held.
 */
async function testJobWorkflow() {
  // console.assert merely logs a failed assertion and carries on, which would
  // let the success message print after a failure; throw instead.
  const check = (condition: unknown, message: string): void => {
    if (!condition) {
      throw new Error(`Assertion failed: ${message}`);
    }
  };

  const client = new JobClient('https://api.proyaro.com', 'test-token');

  // Test job creation
  console.log('Testing job creation...');
  const job = await client.createJob({
    job_type: 'embedding',
    parameters: {
      texts: ['Test'],
      normalize: true,
    },
  });
  check(job.id > 0, 'Job should have ID');
  check(job.status === 'queued', 'Job should be queued');

  // Test job retrieval
  console.log('Testing job retrieval...');
  const { job: retrievedJob } = await client.getJob(job.id);
  check(retrievedJob.id === job.id, 'Should retrieve same job');

  // Test job completion
  console.log('Waiting for completion...');
  const result = await client.waitForJob(job.id, 1000, 60000);
  check(result.result_data, 'Should have result data');

  console.log('All tests passed!');
}

Skill Version: 1.0 • Last Updated: 2025-01-01

ProYaro AI Infrastructure Documentation • Version 1.2