  • Stars: 783
  • Rank: 58,097 (Top 2 %)
  • Language: PHP
  • License: MIT License
  • Created over 8 years ago
  • Updated about 1 month ago

Repository Details

An advanced parallelization library for PHP, enabling efficient multitasking, optimized resource use, and better application responsiveness through multiple CPU threads.

amphp/parallel

AMPHP is a collection of event-driven libraries for PHP designed with fibers and concurrency in mind. amphp/parallel provides true parallel processing for PHP using multiple processes or threads, without blocking and without requiring any extensions.

To be as flexible as possible, this library comes with a collection of non-blocking concurrency tools that can be used independently as needed, as well as an "opinionated" worker API that allows you to assign units of work to a pool of worker processes.

Installation

This package can be installed as a Composer dependency.

composer require amphp/parallel

Usage

The basic usage of this library is to submit blocking tasks to be executed by a worker pool in order to avoid blocking the main event loop.

<?php

require __DIR__ . '/../vendor/autoload.php';

use Amp\Future;
use Amp\Parallel\Worker;
use function Amp\async;

$urls = [
    'https://secure.php.net',
    'https://amphp.org',
    'https://github.com',
];

$executions = [];
foreach ($urls as $url) {
    // FetchTask is just an example, you'll have to implement
    // the Task interface for your task.
    $executions[$url] = Worker\submit(new FetchTask($url));
}

// Each submission returns an Execution instance to allow two-way
// communication with a task. Here we're only interested in the
// task result, so we use the Future from Execution::getFuture()
$responses = Future\await(array_map(
    fn (Worker\Execution $e) => $e->getFuture(),
    $executions,
));

foreach ($responses as $url => $response) {
    \printf("Read %d bytes from %s\n", \strlen($response), $url);
}

FetchTask is just used as an example for a blocking function here. If you just want to fetch multiple HTTP resources concurrently, it's better to use amphp/http-client, our non-blocking HTTP client.

Note: The functions you call must be predefined or autoloadable by Composer, so they also exist in the worker process or thread.

Workers

Worker provides a simple interface for executing PHP code in parallel in a separate PHP process or thread. Classes implementing Task are used to define the code to be run in parallel.

Tasks

The Task interface has a single run() method that gets invoked in the worker to dispatch the work that needs to be done. The run() method can be written using blocking code since the code is executed in a separate process or thread.

Task instances are serialize'd in the main process and unserialize'd in the worker. That means that all data that is passed between the main process and a worker needs to be serializable.
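As a rough illustration of that round trip, the sketch below uses plain PHP only (no worker is involved). GreetTask is a hypothetical stand-in for a real Task implementation; it shows that scalar state survives serialize()/unserialize(), while a closure cannot be serialized at all.

```php
<?php

// Hypothetical stand-in for a Task: it only holds scalar, serializable state.
final class GreetTask
{
    public function __construct(
        private readonly string $name,
    ) {
    }

    public function run(): string
    {
        return "Hello, {$this->name}!";
    }
}

// Roughly what happens when a task is handed to a worker:
// serialized on one side, unserialized on the other.
$payload = serialize(new GreetTask('worker'));
$copy = unserialize($payload);

echo $copy->run(), "\n"; // prints "Hello, worker!"

// Closures are not serializable, so they cannot be part of a task's state.
try {
    serialize(fn () => 42);
    $closureSerializable = true;
} catch (\Exception $e) {
    $closureSerializable = false;
}

var_dump($closureSerializable);
```

In practice this suggests passing identifiers, strings, or arrays to a task's constructor and creating resources such as connections or file handles inside run().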

In the example below, a Task is defined which calls a blocking function (file_get_contents() is only an example of a blocking function, use http-client for non-blocking HTTP requests).

Child processes or threads executing tasks may be reused to execute multiple tasks.

use Amp\Cancellation;
use Amp\Parallel\Worker\Task;
use Amp\Sync\Channel;

class FetchTask implements Task
{
    public function __construct(
        private readonly string $url,
    ) {
    }

    public function run(Channel $channel, Cancellation $cancellation): string
    {
        return file_get_contents($this->url); // Example blocking function
    }
}

$worker = Amp\Parallel\Worker\createWorker();
$task = new FetchTask('https://amphp.org');

$execution = $worker->submit($task);

// $data will be the return value from FetchTask::run()
$data = $execution->await();

Sharing data between tasks

Tasks may wish to share data between task runs. A Cache instance stored in a static property that is only initialized within Task::run() is our recommended strategy to share data.

use Amp\Cache\LocalCache;
use Amp\Cancellation;
use Amp\Parallel\Worker\Task;
use Amp\Sync\Channel;

final class ExampleTask implements Task
{
    private static ?LocalCache $cache = null;
    
    public function run(Channel $channel, Cancellation $cancellation): mixed
    {
        $cache = self::$cache ??= new LocalCache();
        $cachedValue = $cache->get('cache-key');
        // Use and modify $cachedValue...
        $cache->set('cache-key', $updatedValue);
        return $updatedValue;
    }
}

You may wish to provide a hook to initialize the cache with mock data for testing.

A worker may be executing multiple tasks, so consider using AtomicCache instead of LocalCache when creating or updating cache values if a task uses async I/O to generate a cache value. AtomicCache has methods which provide mutual exclusion based on a cache key.
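A minimal sketch of the AtomicCache approach, assuming amphp/cache and amphp/sync are installed and that AtomicCache is constructed from a cache plus a keyed mutex (LocalKeyedMutex here). The cache key and the computed value are placeholders.

```php
<?php

require __DIR__ . '/vendor/autoload.php';

use Amp\Cache\AtomicCache;
use Amp\Cache\LocalCache;
use Amp\Sync\LocalKeyedMutex;

// Assumption: AtomicCache wraps an existing cache and a keyed mutex.
$cache = new AtomicCache(new LocalCache(), new LocalKeyedMutex());

$calls = 0;

// The closure runs only if the key is absent; concurrent callers for
// the same key wait on the mutex instead of computing the value twice.
$compute = function () use (&$calls): string {
    $calls++;
    return 'expensive value'; // Placeholder for a value produced with async I/O
};

$first = $cache->computeIfAbsent('cache-key', $compute);
$second = $cache->computeIfAbsent('cache-key', $compute);

printf("%s (computed %d time(s))\n", $second, $calls);
```

Inside a task, the AtomicCache instance would be kept in a static property in the same way as the LocalCache in the example above.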

Task cancellation

A Cancellation provided to Worker::submit() may be used to request cancellation of the task in the worker. When cancellation is requested in the parent, the Cancellation provided to Task::run() is cancelled. The task may choose to ignore this cancellation request or act accordingly and throw a CancelledException from Task::run(). If the cancellation request is ignored, the task may continue and return a value which will be returned to the parent as though cancellation had not been requested.
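A minimal sketch of a task that honors cancellation, assuming a hypothetical CountTask class stored in an autoloadable file. It waits with Amp\delay() rather than a blocking sleep, since a task that never yields may not observe the cancellation request until it has finished.

```php
<?php

require __DIR__ . '/vendor/autoload.php';

use Amp\Cancellation;
use Amp\Parallel\Worker\Task;
use Amp\Sync\Channel;
use function Amp\delay;

// Hypothetical task: works in small steps and stops when cancelled.
final class CountTask implements Task
{
    public function __construct(
        private readonly int $steps,
    ) {
    }

    public function run(Channel $channel, Cancellation $cancellation): int
    {
        for ($i = 0; $i < $this->steps; $i++) {
            // Throws CancelledException if cancellation was already requested.
            $cancellation->throwIfRequested();

            // Suspends the task for 100 ms; also throws CancelledException
            // if cancellation is requested while waiting.
            delay(0.1, cancellation: $cancellation);
        }

        return $this->steps;
    }
}
```

In the parent, a cancellation such as new Amp\TimeoutCancellation(1) could be passed as the second argument to Worker\submit(). Awaiting the execution should then fail with an Amp\CancelledException (or a subclass of it) once the task stops.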

Worker Pools

The easiest way to use workers is through a worker pool. Worker pools can be used to submit tasks in the same way as a worker, but rather than using a single worker process, the pool uses multiple workers to execute tasks. This allows multiple tasks to be executed simultaneously.

The WorkerPool interface extends Worker, adding methods to get information about the pool or pull a single Worker instance out of the pool. A pool uses multiple Worker instances to execute Task instances.

If a set of tasks should be run within a single worker, use the WorkerPool::getWorker() method to pull a single worker from the pool. The worker is automatically returned to the pool when the instance returned is destroyed.
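A short sketch of pulling a single worker from the global pool, assuming the FetchTask class from the earlier example is autoloadable:

```php
<?php

require __DIR__ . '/vendor/autoload.php';

use Amp\Parallel\Worker;

$pool = Worker\workerPool();

// Pull one worker out of the pool so both tasks run in the same
// child process or thread.
$worker = $pool->getWorker();

$first = $worker->submit(new FetchTask('https://amphp.org'))->await();
$second = $worker->submit(new FetchTask('https://github.com'))->await();

// Destroying the instance returns the worker to the pool.
unset($worker);
```

Running related tasks on one worker can be useful when they rely on state kept in that worker, such as a static cache.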

Global Worker Pool

A global worker pool is available and can be set using the function Amp\Parallel\Worker\workerPool(?WorkerPool $pool = null). Passing an instance of WorkerPool will set the global pool to the given instance. Invoking the function without an instance will return the current global instance.
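For example, the global pool could be replaced with a smaller one. This is a sketch that assumes ContextWorkerPool accepts the worker limit as its first constructor argument; the limit of 4 is arbitrary.

```php
<?php

require __DIR__ . '/vendor/autoload.php';

use Amp\Parallel\Worker;
use Amp\Parallel\Worker\ContextWorkerPool;

// Replace the global pool with one limited to 4 workers.
Worker\workerPool(new ContextWorkerPool(4));

// Called without arguments, the function returns the current global pool.
$pool = Worker\workerPool();

printf("Worker limit: %d\n", $pool->getLimit());
```

Setting the pool early, before any task is submitted, avoids starting workers in a default pool that is then discarded.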

Child Processes or Threads

Contexts simplify writing and running PHP in parallel. A script written to be run in parallel must return a callable that will be run in a child process or thread. The callable receives a single argument – an instance of Channel that can be used to send data between the parent and child processes or threads. Any serializable data can be sent across this channel. The Context object, which extends the Channel interface, is the other end of the communication channel.

Contexts are created using a ContextFactory. DefaultContextFactory will use the best available method of creating a context, creating a thread if ext-parallel is installed or otherwise using a child process. ParallelContextFactory (requires a ZTS build of PHP and ext-parallel to create threads) and ProcessContextFactory are also provided should you wish to create a specific context type.

In the example below, a child process or thread is used to call a blocking function (file_get_contents() is only an example of a blocking function, use http-client for non-blocking HTTP requests). The result of that function is then sent back to the parent using the Channel object. The return value of the child callable is available using the Context::join() method.

Child Process or Thread

# child.php

use Amp\Sync\Channel;

return function (Channel $channel): mixed {
    $url = $channel->receive();

    $data = file_get_contents($url); // Example blocking function

    $channel->send($data);

    return 'Any serializable data';
};

Parent Process

# parent.php

use Amp\Parallel\Context\ProcessContext;

// Creates and starts a child process or thread.
$context = Amp\Parallel\Context\contextFactory()->start(__DIR__ . '/child.php');

$url = 'https://google.com';
$context->send($url);

$requestData = $context->receive();
printf("Received %d bytes from %s\n", \strlen($requestData), $url);

$returnValue = $context->join();
printf("Child process exited with '%s'\n", $returnValue);

Child processes or threads are also great for CPU-intensive operations such as image manipulation or for running daemons that perform periodic tasks based on input from the parent.

Context creation

An execution context can be created using the function Amp\Parallel\Context\startContext(), which uses the global ContextFactory. The global factory is an instance of DefaultContextFactory by default, but this instance can be overridden using the function Amp\Parallel\Context\contextFactory().
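A sketch of overriding the global factory and then starting a context through it. It assumes the child.php script from the example above sits next to this file, and that ProcessContextFactory can be constructed without arguments.

```php
<?php

require __DIR__ . '/vendor/autoload.php';

use Amp\Parallel\Context;
use Amp\Parallel\Context\ProcessContextFactory;

// Override the global factory so contexts are always child processes,
// even if ext-parallel is available.
Context\contextFactory(new ProcessContextFactory());

// startContext() uses the global factory set above.
$context = Context\startContext(__DIR__ . '/child.php');

$context->send('https://amphp.org');
$data = $context->receive();

// The return value of the callable in child.php.
$returnValue = $context->join();
```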

Context factories are used by worker pools to create the context which executes tasks. Providing a custom ContextFactory to a worker pool allows custom bootstrapping or other behavior within pool workers.
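A sketch of a pool whose workers are always child processes. It assumes that ContextWorkerFactory takes an optional bootstrap path followed by an optional ContextFactory, and that ContextWorkerPool accepts a worker factory as its second argument; check both signatures against the installed version. FetchTask is the earlier example task and must be autoloadable.

```php
<?php

require __DIR__ . '/vendor/autoload.php';

use Amp\Parallel\Context\ProcessContextFactory;
use Amp\Parallel\Worker\ContextWorkerFactory;
use Amp\Parallel\Worker\ContextWorkerPool;

// Assumed constructor order: (bootstrap path, context factory).
$workerFactory = new ContextWorkerFactory(null, new ProcessContextFactory());

// Pool of up to 8 workers (arbitrary limit), each started as a child process.
$pool = new ContextWorkerPool(8, $workerFactory);

$data = $pool->submit(new FetchTask('https://amphp.org'))->await();

// Stop the workers once the pool is no longer needed.
$pool->shutdown();
```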

An execution context can be created by a ContextFactory. The worker pool uses context factories to create workers.

Debugging

Step debugging may be used in child processes with PhpStorm and Xdebug by listening for debug connections in the IDE.

In PhpStorm settings, under PHP > Debug, ensure the box "Can accept external connections" is checked. The specific ports used are not important; yours may differ.

For child processes to connect to the IDE and stop at breakpoints set in the child processes, turn on listening for debug connections.

No PHP ini settings need to be set manually. Settings set by PhpStorm when invoking the parent PHP process will be forwarded to child processes.

Run the parent script in debug mode from PhpStorm with breakpoints set in code executed in the child process. Execution should stop at any breakpoints set in the child.

When stopping at a breakpoint in a child process, execution of the parent process and any other child processes will continue. PhpStorm will open a new debugger tab for each child process connecting to the debugger, so you may need to limit the number of child processes created when debugging or the number of connections may become overwhelming! If you set breakpoints in the parent and child process, you may need to switch between debug tabs to resume both the parent and child.

Versioning

amphp/parallel follows the semver semantic versioning specification like all other amphp packages.

Security

If you discover any security-related issues, please use the private security issue reporter instead of using the public issue tracker.

License

The MIT License (MIT). Please see LICENSE for more information.
