Workflow 3.0

Now with dependency injection, mermaid graphs, retries and callable support

rodber
rodber   GitHub

After over a year of development and extensive production use, Workflow 3.0 brings significant improvements to building multi-step procedures in PHP. This release focuses on simplifying asynchronous execution, improving developer experience, and adding essential resilience features.

# Dependency injection

Version 3.0 introduces PSR-11 container support for injecting dependencies into jobs at runtime:

$workflow = workflow(
    user: sync(
        FetchUser::class,
        userId: variable('id')
    ),
    notify: sync(
        SendEmail::class,
        email: response('user', 'email')
    )
);
$result = run($workflow, $container, id: 123);

This enables workflows to remain stateless while accessing services like databases or HTTP clients through the container.

# Mermaid graph visualization

Version 3.0 includes built-in support for generating Mermaid diagrams from workflow definitions:

$workflow = workflow(
    ja: async(
        fn (): int => 1
    ),
    jb: async(
        fn (): int => 2
    )
        ->withRunIf(response('ja'))
        ->withRunIfNot(variable('var')),
    j1: async(
        #[_return(new _arrayp(
            id: new _int(),
            name: new _string()
        ))]
        fn (): array => [
            'id' => 123,
            'name' => 'example',
        ]
    ),
    j2: sync(
        fn (int $n, string $m): int => $n + $m,
        n: response('j1', 'id'),
        m: response('j1', 'name')
    ),
    j3: sync(
        fn (int $a): int => $a,
        a: response('jb')
    ),
    j4: sync(
        fn (int $i, int $j): int => $i * $j,
        i: response('j2'),
        j: response('j3')
    ),
);
$mermaid = Mermaid::generate($workflow);

mermaid

Being able to render a workflow as a graph is a huge boost for understanding, communicating, and maintaining complex systems. A visual representation makes it easy to see the shape of your data flow, spot missing dependencies, and validate that the order of execution matches your intent.

Teams gain confidence faster because they can know what will happen before running anything: reviewers can validate job boundaries, architects can confirm that async tasks are correctly wired, and new contributors can onboard by reading a diagram rather than deciphering code.

# Callable + Invocable class support

Version 3.0 accepts any PHP callable or invocable class as a job, providing flexibility in how you define workflow steps:

workflow(
    // Closure for inline logic
    filter: sync(
        fn(array $data): array => array_filter($data, fn($item) => $item['active']),
        data: response('fetch')
    ),
    // Invocable class name (resolved via container)
    process: sync(ProcessData::class, items: response('filter')),
    // Named function
    format: sync('json_encode', value: response('process')),
    // Invocable object
    send: sync(new MessageDispatcher(), payload: response('format'))
);

This eliminates boilerplate for simple operations while maintaining support for Action classes when business logic requires full class structure. Callables enable inline data transformation without requiring dedicated action classes for single-use operations.

# Response property access

Version 3.0 extends response() to access public object properties directly, not just array keys:

class FetchUser extends Action
{
    public function __invoke(int $id): User
    {
        return new User(
            email: 'user@example.com',
            name: 'John Doe'
        );
    }
}

workflow(
    user: sync(FetchUser::class, id: variable('userId')),
    notify: sync(
        SendEmail::class,
        // Access public property directly
        email: response('user', 'email'),
        name: response('user', 'name')
    )
);

This works transparently with both arrays and objects, allowing actions to return domain objects without requiring array conversion. The workflow engine inspects the response and accesses properties or array keys accordingly.

# Retry policies

Transient failures in distributed systems are inevitable. Workflow 3.0 implements configurable retry policies:

workflow(
    fetch: sync(FetchFromApi::class, url: variable('endpoint'))
        ->withRetry(
            timeout: 300,
            maxAttempts: 5,
            delay: 10
        )
);

Retry policies are essential for handling transient failures in distributed systems, where network operations and external services may temporarily fail but succeed on subsequent attempts.

# True async execution

The parallel runner has been replaced with a true async implementation using AMPHP. This provides non-blocking execution without the overhead of process forking, leveraging PHP 8.1+ Fibers for efficient multitasking.

// Publish new podcast
workflow(
    process: async(
        ProcessPodcast::class,
        variable('podcast')
    ),
    optimize: async(
        OptimizePodcast::class,
        variable('podcast')
    ),
    releaseTransistorFM: async(
        ReleaseOnTransistorFM::class,
        response('optimize')
    ),
    releaseApplePodcasts: async(
        ReleaseOnApplePodcasts::class,
        response('optimize')
    ),
    transcribe: async(
        CreateAudioTranscription::class,
        response('process')
    ),
    translate: async(
        TranslateAudioTranscription::class,
        response('transcribe')
    ),
    notify: async(
        NotifySubscribers::class,
        variable('podcast'),
        response('releaseTransistorFM'),
        response('releaseApplePodcasts'),
    ),
    tweet: async(
        SendTweetAboutNewPodcast::class,
        variable('podcast'),
        response('translate'),
        response('releaseTransistorFM'),
        response('releaseApplePodcasts'),
    )
);

Independent jobs execute concurrently while the engine manages the resolution of the dependency graph. This follows asynchronous task-based execution model where the scheduler unrolls the graph and executes nodes as soon as their data dependencies (like response()) are satisfied. This shift significantly reduces memory footprint compared to the previous process-based model while maintaining strict execution order.

# Conditional execution

Version 3.0 adds withRunIfNot() for cleaner conditional logic:

workflow(
    // Check if app exists
    exists: sync(
        AppExists::class,
        id: variable('id')
    ),
    // Update the app if it exists
    update: sync(
        AppUpdate::class,
        id: variable('id')
    )->withRunIf(response('exists')),
    // Create the app if it does not exist
    create: sync(
        AppCreate::class,
        id: variable('id')
    )->withRunIfNot(response('exists'))
);

This complements withRunIf() and accepts boolean literals, variables, job responses, and callables. Conditional execution enables branching without complex orchestration logic.

This new version also supports passing integer values as conditions (e.g., withRunIf(1) / withRunIfNot(0)), treating non-zero values as truthy and zero as falsy.

# Type safety

Integration with chevere/parameter 2.0 provides runtime validation:

class ProcessOrder
{
    public function __invoke(
        #[_int(min: 1)]
        int $orderId,
        #[_string(regex: '/^[A-Z]{3}$/')]
        string $currency
    ): array {
        // Parameters validated before execution
    }
}

Workflow validates inputs before job execution and verifies response types match expected parameters in dependent jobs. This eliminates a class of runtime errors that would otherwise require extensive testing.

# Topological sorting

Workflow 3.0 uses Kahn’s topological sorting algorithm to determine execution order based on job dependencies and bindings between jobs. This makes the system deterministic, ensures jobs run only once their inputs are ready, and lets the engine detect cycles early instead of failing at runtime.

This means you can define your workflow in a natural, declarative way and trust the engine to execute jobs in the right order without needing to manually specify sequencing.

# Practical example

Here's a complete workflow for processing user uploads:

$workflow = workflow(
    validate: sync(
        fn(string $path, int $maxSize): bool =>
            file_exists($path) && filesize($path) <= $maxSize,
        path: variable('file'),
        maxSize: 5_000_000
    ),
    resize: async(ImageResize::class, path: variable('file'), width: 1200)
        ->withRunIf(response('validate'))
        ->withRetry(maxAttempts: 3, delay: 5),
    optimize: async(ImageOptimize::class, path: response('resize', 'output'))
        ->withRunIf(response('validate')),
    store: sync(
        StoreToS3::class,
        path: response('optimize', 'output'),
        bucket: variable('bucket')
    )
);

$result = run($workflow, $container,
    file: '/tmp/upload.jpg',
    bucket: 'user-uploads'
);

This workflow validates the file, resizes and optimizes it in parallel, then stores the result. The resize job retries on failure, and both processing jobs only run if validation succeeds.

# Migration notes

The parallel runner removal is the only breaking change. Applications using parallel execution should switch to async jobs with appropriate dependency declarations. The async runner provides better performance and simpler semantics.

# Conclusion

Workflow 3.0 represents three years of production refinement. The addition of container support, callables, retry policies, and true async execution address real-world requirements while maintaining the declarative approach that makes workflows maintainable.

The library continues following established patterns from workflow research and distributed systems literature. Each job remains independently testable, workflows stay declarative, and the dependency graph handles execution ordering automatically.

For complete documentation and examples, visit chevere.org/packages/workflow.

Rodolfo blogging since 2012.