amphp/sync
AMPHP is a collection of event-driven libraries for PHP designed with fibers and concurrency in mind. amphp/sync specifically provides synchronization primitives such as locks and semaphores for asynchronous and concurrent programming.
Installation
This package can be installed as a Composer dependency.
composer require amphp/sync
Usage
The weak link when managing concurrency is humans, so amphp/sync provides abstractions to hide some of the complexity.
Mutex
Mutual exclusion can be achieved using Amp\Sync\synchronized() and any Mutex implementation, or by manually using the Mutex instance to acquire a Lock. As long as the resulting Lock object isn't released using Lock::release() or by being garbage collected, the holder of the lock can exclusively run some code, as long as all other parties running the same code also acquire a lock before doing so.
function writeExclusively(Amp\Sync\Mutex $mutex, string $filePath, string $data) {
    $lock = $mutex->acquire();

    try {
        Amp\File\write($filePath, $data);
    } finally {
        $lock->release();
    }
}
The same can be written more concisely using Amp\Sync\synchronized():

function writeExclusively(Amp\Sync\Mutex $mutex, string $filePath, string $data) {
    Amp\Sync\synchronized($mutex, fn () => Amp\File\write($filePath, $data));
}
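Either variant accepts any Mutex implementation. As a minimal usage sketch within a single process (assuming amphp/file is installed as in the snippets above; the file path is illustrative):

use Amp\Sync\LocalMutex;

// LocalMutex provides mutual exclusion between fibers of the current process.
$mutex = new LocalMutex();

writeExclusively($mutex, '/tmp/example.txt', 'Hello, world!');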
Semaphore
Semaphores are another synchronization primitive in addition to mutual exclusion. Instead of providing exclusive access to a single party, they provide access to a limited set of N parties at the same time. This makes them great for controlling concurrency, e.g. limiting an HTTP client to X concurrent requests so the HTTP server doesn't get overwhelmed.
Similar to Mutex, Lock instances can be acquired using Semaphore::acquire(). Please refer to the Mutex documentation for further usage information, as the two are basically equivalent, except that a Mutex is always a Semaphore with a count of exactly one party. In many cases you can use amphp/pipeline instead of using a Semaphore directly.
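As a minimal sketch, a LocalSemaphore could limit concurrent operations within a single process to ten at a time ($urls and the fetch() helper are assumptions borrowed from the "Concurrency Approaches" examples further below):

use Amp\Future;
use Amp\Sync\LocalSemaphore;
use function Amp\async;

// Allow at most 10 of the operations below to run at the same time.
$semaphore = new LocalSemaphore(10);

$futures = [];
foreach ($urls as $url) {
    $futures[$url] = async(function () use ($semaphore, $url) {
        $lock = $semaphore->acquire(); // Suspends until one of the 10 slots is free.

        try {
            return fetch($url); // Assumed helper, see "Concurrency Approaches" below.
        } finally {
            $lock->release();
        }
    });
}

$results = Future\await($futures);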
Parcel
A Parcel is used to synchronize access to a value across multiple execution contexts, such as multiple coroutines or multiple processes. The example below demonstrates using a LocalParcel to share an integer between two coroutines.
use Amp\Future;
use Amp\Sync\LocalMutex;
use Amp\Sync\LocalParcel;
use function Amp\async;
use function Amp\delay;

$parcel = new LocalParcel(new LocalMutex(), 42);

$future1 = async(function () use ($parcel): void {
    echo "Coroutine 1 started\n";

    $value = $parcel->synchronized(function (int $value): int {
        delay(1); // Delay for 1s to simulate I/O.
        return $value * 2;
    });

    echo "Value after access in coroutine 1: ", $value, "\n";
});

$future2 = async(function () use ($parcel): void {
    echo "Coroutine 2 started\n";

    $value = $parcel->synchronized(function (int $value): int {
        delay(1); // Delay again in this coroutine.
        return $value + 8;
    });

    echo "Value after access in coroutine 2: ", $value, "\n";
});

Future\await([$future1, $future2]);
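Because the parcel serializes access through its mutex, coroutine 2 only sees the value after coroutine 1 has updated it. Assuming the coroutines are started in the order they were created, the script should print something like:

Coroutine 1 started
Coroutine 2 started
Value after access in coroutine 1: 84
Value after access in coroutine 2: 92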
Channels
Channels are used to send data between execution contexts, such as multiple coroutines or multiple processes. The example below shares two Channel instances between two coroutines. These channels are connected; data sent on one channel is received on the paired channel, and vice versa.
use Amp\Future;
use function Amp\async;
use function Amp\delay;
use function Amp\Sync\createChannelPair;

[$left, $right] = createChannelPair();

$future1 = async(function () use ($left): void {
    echo "Coroutine 1 started\n";

    delay(1); // Delay to simulate I/O.
    $left->send(42);

    $value = $left->receive();
    echo "Received ", $value, " in coroutine 1\n";
});

$future2 = async(function () use ($right): void {
    echo "Coroutine 2 started\n";

    $value = $right->receive();
    echo "Received ", $value, " in coroutine 2\n";

    delay(1); // Delay to simulate I/O.
    $right->send($value * 2);
});

Future\await([$future1, $future2]);
Sharing data between processes
To share data between processes in PHP, the data must be serializable and exchanged via external storage or an IPC (inter-process communication) channel.
Parcels in external storage
SharedMemoryParcel uses shared memory in conjunction with PosixSemaphore wrapped in SemaphoreMutex (though another cross-context mutex implementation may be used, such as RedisMutex in amphp/redis).

Note: ext-shmop and ext-sysvmsg are required for SharedMemoryParcel and PosixSemaphore, respectively.

amphp/redis provides RedisParcel for storing shared data in Redis.
Channels over pipes
Channels between processes can be created by layering serialization (Native PHP serialization, JSON serialization, etc.) on a pipe between those processes.
StreamChannel in amphp/byte-stream creates a channel from any ReadableStream and WritableStream. This allows a channel to be created from a variety of stream sources, such as sockets or process pipes.

ProcessContext and task Execution objects in amphp/parallel provide a Channel to send data between processes and tasks.
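As a rough sketch of the child side of such a channel over standard I/O (assuming amphp/byte-stream is installed; the worker script name is hypothetical, and the parent process would be expected to wrap the corresponding process pipes in a StreamChannel in the same way):

// child.php (hypothetical worker script started by a parent process)
use Amp\ByteStream\StreamChannel;
use function Amp\ByteStream\getStdin;
use function Amp\ByteStream\getStdout;

// Layer serialization over the process pipes: receive on STDIN, send on STDOUT.
$channel = new StreamChannel(getStdin(), getStdout());

$message = $channel->receive();          // Wait for a value sent by the parent.
$channel->send('Received: ' . $message); // Reply over the paired stream.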
Concurrency Approaches
Given you have a list of URLs you want to crawl, let's discuss a few possible approaches. For simplicity, we will assume a fetch function already exists, which takes a URL and returns the HTTP status code (which is everything we want to know for these examples).
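One way such a fetch helper could be sketched is with amphp/http-client (an assumption for these examples; any non-blocking HTTP client that returns the status code would do):

use Amp\Http\Client\HttpClientBuilder;
use Amp\Http\Client\Request;

// Hypothetical implementation of the fetch() helper assumed by the examples below.
function fetch(string $url): int
{
    static $client = null;
    $client ??= HttpClientBuilder::buildDefault();

    // request() suspends the current fiber until the response arrives.
    $response = $client->request(new Request($url));

    return $response->getStatus();
}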
Approach 1: Sequential
A simple loop using non-blocking I/O, but with no concurrency while fetching the individual URLs; the second request starts only after the first has completed.
$urls = [...];

$results = [];
foreach ($urls as $url) {
    $results[$url] = fetch($url);
}

var_dump($results);
Approach 2: Everything Concurrently
Almost the same loop, but awaiting all operations at once; all requests start immediately. This might not be feasible with too many URLs.
$urls = [...];

$results = [];
foreach ($urls as $url) {
    $results[$url] = Amp\async(fetch(...), $url);
}

$results = Amp\Future\await($results);

var_dump($results);
Approach 3: Concurrent Chunks
Splitting the jobs into chunks of ten; all requests within a chunk are made concurrently, but the chunks are processed sequentially, so the timing of each chunk depends on its slowest response; the eleventh request starts only after the first ten requests have completed.
$urls = [...];

$results = [];
foreach (\array_chunk($urls, 10) as $chunk) {
    $futures = [];
    foreach ($chunk as $url) {
        $futures[$url] = Amp\async(fetch(...), $url);
    }

    $results = \array_merge($results, Amp\Future\await($futures));
}

var_dump($results);
Approach 4: ConcurrentIterator
The amphp/pipeline library provides concurrent iterators which can be used to process and consume data concurrently in multiple fibers.
use Amp\Pipeline\Pipeline;

$urls = [...];

$results = Pipeline::fromIterable($urls)
    ->concurrent(10)  // Process up to 10 URLs concurrently.
    ->unordered()     // Results may arrive out of order.
    ->map(fetch(...)) // Map each URL to its HTTP status code via fetch().
    ->toArray();

var_dump($results);
See the documentation in amphp/pipeline for more information on using Pipelines for concurrency.
Versioning
amphp/sync follows the semver semantic versioning specification like all other amphp packages.
Security
If you discover any security-related issues, please email [email protected] instead of using the issue tracker.
License
The MIT License (MIT). Please see LICENSE for more information.