Tasks

Introduction

The Task Manager is a component that provides a clean, intuitive interface for handling asynchronous tasks using Swoole’s task worker capabilities. It allows you to offload time-consuming operations from your main application flow.

Why Use Task Manager?

Asynchronous task processing is essential for maintaining responsive applications, especially when dealing with operations that:

  • Take significant time to complete
  • Access external resources (APIs, file systems, etc.)
  • Perform CPU-intensive calculations
  • Send emails or notifications
  • Process uploads or media files
  • Generate reports or exports

By offloading these operations to task workers, your main application threads remain free to handle new requests, significantly improving your application’s scalability and user experience.

Installation

composer require ody/task

Basic Usage

Creating a Task

All tasks must implement the TaskInterface and provide a handle() method:

<?php

namespace App\Tasks;

use Ody\Task\TaskInterface;

class SendWelcomeEmailTask implements TaskInterface
{
    public function handle(array $params = [])
    {
        $email = $params['email'] ?? null;
        $name = $params['name'] ?? 'User';
        
        if (!$email) {
            return ['status' => 'error', 'message' => 'Email is required'];
        }
        
        // Logic to send email here
        
        return [
            'status' => 'success',
            'message' => "Welcome email sent to {$email}"
        ];
    }
}

Executing Tasks

To execute a task immediately:

use Ody\Task\Task;

// Simple execution
$taskId = Task::execute(\App\Tasks\SendWelcomeEmailTask::class, [
    'email' => 'user@example.com',
    'name' => 'John Doe'
]);

Setting Priority

You can set task priority to ensure important tasks are processed first:

// High priority task
$taskId = Task::execute(\App\Tasks\ProcessPaymentTask::class, [
    'amount' => 99.99,
    'userId' => 123
], Task::PRIORITY_HIGH);

// Low priority task
$taskId = Task::execute(\App\Tasks\GenerateReportTask::class, [
    'reportType' => 'monthly',
    'format' => 'pdf'
], Task::PRIORITY_LOW);

Delayed Execution

To schedule a task to run in the future:

// Run a task after 5 seconds
$taskId = Task::later(\App\Tasks\SendReminderTask::class, [
    'userId' => 456,
    'message' => 'Don\'t forget to complete your profile!'
], 5000); // 5000ms = 5 seconds

Task Retry Mechanisms

This feature adds automatic retry capability for tasks that fail, with exponential backoff. The system will automatically handle retry attempts when a task throws an exception, with increasing delays between attempts.

// Execute a task with retry (will retry up to 3 times with exponential backoff)
$taskId = Task::withRetry(
    \App\Tasks\PaymentProcessorTask::class,
    ['order_id' => 12345],
    [
        'attempts' => 3,        // Maximum attempts
        'delay' => 1000,        // Initial delay (1 second)
        'multiplier' => 2       // Each retry doubles the delay
    ]
);

Task Groups and Batches

Task Groups

Groups let you organize related tasks and wait for their collective completion:

// Create a task group
$group = Task::group('email-notifications')
    ->add(\App\Tasks\SendWelcomeEmailTask::class, ['user_id' => 123])
    ->add(\App\Tasks\NotifyAdminTask::class, ['new_user_id' => 123])
    ->add(\App\Tasks\UpdateStatisticsTask::class, ['event' => 'new_signup'])
    ->concurrency(2)           // Only run 2 tasks at a time
    ->allowFailures(true);     // Continue even if some tasks fail

// Dispatch all tasks in the group
$taskIds = $group->dispatch();

// Wait for all tasks to complete and get their results
$results = $group->wait(10000);  // Wait up to 10 seconds

// Or cancel all pending tasks in the group
$group->cancel();

Task Batches

Batches are optimized for processing multiple similar tasks:

// Process many items with the same task class
$userIds = [1, 2, 3, 4, 5];
$tasks = [];

foreach ($userIds as $userId) {
    $tasks[] = [
        'class' => \App\Tasks\ProcessUserDataTask::class,
        'params' => ['user_id' => $userId]
    ];
}

// Create and dispatch the batch
$batch = Task::batch($tasks);
$taskIds = $batch->dispatch();

// Wait for all tasks to complete or timeout after 30 seconds
$results = $batch->wait(30000);

Task Monitoring and Reporting

// Get status of a specific task
$taskStatus = Task::status($taskId);
// Returns: id, status, attempts, created_at, started_at, completed_at, execution_time, result, error

// Get overall metrics
$metrics = TaskManager::getInstance()->getMetrics();
// Returns: total_tasks, completed_tasks, failed_tasks, retried_tasks, cancelled_tasks, average_execution_time

Task Cancellation

// Cancel a specific task
$cancelled = Task::cancel($taskId);

// Cancel all tasks in a group
$group->cancel();

// Cancel all tasks in a batch
$batch->cancel();

Note: Only pending tasks can be cancelled. Running tasks will continue to completion.

Task Middleware

Middleware allows you to modify tasks or add cross-cutting concerns:

// Register global middleware that applies to all tasks
TaskManager::getInstance()->registerMiddleware(new LoggingMiddleware());

// Apply middleware to a specific
WIP

Framework Integration

Server Configuration

When setting up your Swoole server, you need to enable task workers and initialize the task handler:

$server = new Swoole\Server('0.0.0.0', 9501);

// Configure Swoole server
$server->set([
    'worker_num' => 4,            // Number of worker processes
    'task_worker_num' => 8,       // Number of task worker processes
    'task_enable_coroutine' => true,  // Enable coroutines in task workers
]);

// Initialize the task handler
\Ody\Task\TaskHandler::init($server);

// Start the server
$server->start();

Advanced Usage

Task Chaining

You can chain tasks by triggering a new task from within a task’s handle() method:

public function handle(array $params = [])
{
    // Do something...
    
    // Then trigger another task
    Task::execute(\App\Tasks\AnotherTask::class, [
        'previousResult' => $result
    ]);
    
    return ['status' => 'success'];
}

Task Batching

For operations that require processing multiple items, consider implementing a batch task pattern:

// Create a batch of tasks
foreach ($userIds as $userId) {
    Task::execute(\App\Tasks\NotifyUserTask::class, [
        'userId' => $userId,
        'message' => $notificationMessage
    ]);
}

API Reference

Task Class

Static Methods

MethodDescription
execute(string $taskClass, array $params = [], int $priority = self::PRIORITY_NORMAL): intExecute a task immediately with the given parameters and priority. Returns the task ID.
later(string $taskClass, array $params = [], int $delayMs = 1000, int $priority = self::PRIORITY_NORMAL): intSchedule a task to execute after the specified delay in milliseconds. Returns the task ID.

Constants

ConstantValueDescription
PRIORITY_HIGH20High priority tasks are processed before normal and low priority tasks
PRIORITY_NORMAL10Default priority level
PRIORITY_LOW5Low priority tasks are processed after high and normal priority tasks

TaskInterface

Methods

MethodDescription
handle(array $params = [])The main method that will be called when the task is executed. Should return a result that will be passed to the finish event.

TaskManager Class

Methods

MethodDescription
getInstance(): TaskManagerGet the singleton instance of the TaskManager
setServer(Server $server): voidSet the Swoole server instance
enqueue(string $taskClass, array $params = [], int $priority = Task::PRIORITY_NORMAL): intAdd a task to the queue for immediate execution
enqueueDelayed(string $taskClass, array $params = [], int $delayMs = 1000, int $priority = Task::PRIORITY_NORMAL): intAdd a task to be executed after the specified delay
getNextTask(): ?arrayGet the next task from the highest priority queue
taskComplete(int $taskId, $result): voidProcess a completed task

TaskHandler Class

Methods

MethodDescription
init(Server $server): voidInitialize the task handler with the Swoole server
handleTask(array $data)Handle a task execution

Best Practices

  1. Keep Tasks Small and Focused: Each task should do one thing well
  2. Make Tasks Idempotent: When possible, design tasks to be safely retried if they fail
  3. Include Error Handling: Always handle exceptions within your tasks to prevent task worker crashes
  4. Consider Timeouts: For tasks that might take a long time, implement timeout mechanisms
  5. Log Task Execution: Add logging to track task execution and troubleshoot issues

Troubleshooting

Tasks Not Executing

  • Ensure you have configured enough task workers (task_worker_num)
  • Check that you’ve properly initialized the TaskHandler
  • Verify that your task class implements TaskInterface correctly

Memory Leaks

  • Avoid storing large amounts of data in class properties
  • Use dependency injection rather than globals for accessing services
  • Ensure resources (file handles, database connections) are properly closed

Task Workers Crashing

  • Add try-catch blocks around your task code
  • Implement proper error logging
  • Avoid using functions that are not coroutine-safe if using task_enable_coroutine