Init Server Composer Components

This commit is contained in:
Eole 2016-01-21 10:29:26 +01:00
parent 35db27b0e6
commit a44cc1d2e3
177 changed files with 24745 additions and 0 deletions

View file

@ -0,0 +1,16 @@
<?php
namespace GuzzleHttp\Promise;
/**
* Exception thrown when too many errors occur in the some() or any() methods.
*/
class AggregateException extends RejectionException
{
public function __construct($msg, array $reasons)
{
parent::__construct(
$reasons,
sprintf('%s; %d rejected promises', $msg, count($reasons))
);
}
}

View file

@ -0,0 +1,9 @@
<?php
namespace GuzzleHttp\Promise;
/**
* Exception that is set as the reason for a promise that has been cancelled.
*/
class CancellationException extends RejectionException
{
}

View file

@ -0,0 +1,207 @@
<?php
namespace GuzzleHttp\Promise;
/**
* Represents a promise that iterates over many promises and invokes
* side-effect functions in the process.
*/
class EachPromise implements PromisorInterface
{
private $pending = [];
/** @var \Iterator */
private $iterable;
/** @var callable|int */
private $concurrency;
/** @var callable */
private $onFulfilled;
/** @var callable */
private $onRejected;
/** @var Promise */
private $aggregate;
/**
* Configuration hash can include the following key value pairs:
*
* - fulfilled: (callable) Invoked when a promise fulfills. The function
* is invoked with three arguments: the fulfillment value, the index
* position from the iterable list of the promise, and the aggregate
* promise that manages all of the promises. The aggregate promise may
* be resolved from within the callback to short-circuit the promise.
* - rejected: (callable) Invoked when a promise is rejected. The
* function is invoked with three arguments: the rejection reason, the
* index position from the iterable list of the promise, and the
* aggregate promise that manages all of the promises. The aggregate
* promise may be resolved from within the callback to short-circuit
* the promise.
* - concurrency: (integer) Pass this configuration option to limit the
* allowed number of outstanding concurrently executing promises,
* creating a capped pool of promises. There is no limit by default.
*
* @param mixed $iterable Promises or values to iterate.
* @param array $config Configuration options
*/
public function __construct($iterable, array $config = [])
{
$this->iterable = iter_for($iterable);
if (isset($config['concurrency'])) {
$this->concurrency = $config['concurrency'];
}
if (isset($config['fulfilled'])) {
$this->onFulfilled = $config['fulfilled'];
}
if (isset($config['rejected'])) {
$this->onRejected = $config['rejected'];
}
}
public function promise()
{
if ($this->aggregate) {
return $this->aggregate;
}
try {
$this->createPromise();
$this->iterable->rewind();
$this->refillPending();
} catch (\Exception $e) {
$this->aggregate->reject($e);
}
return $this->aggregate;
}
private function createPromise()
{
$this->aggregate = new Promise(function () {
reset($this->pending);
if (empty($this->pending) && !$this->iterable->valid()) {
$this->aggregate->resolve(null);
return;
}
// Consume a potentially fluctuating list of promises while
// ensuring that indexes are maintained (precluding array_shift).
while ($promise = current($this->pending)) {
next($this->pending);
$promise->wait();
if ($this->aggregate->getState() !== PromiseInterface::PENDING) {
return;
}
}
});
// Clear the references when the promise is resolved.
$clearFn = function () {
$this->iterable = $this->concurrency = $this->pending = null;
$this->onFulfilled = $this->onRejected = null;
};
$this->aggregate->then($clearFn, $clearFn);
}
private function refillPending()
{
if (!$this->concurrency) {
// Add all pending promises.
while ($this->addPending() && $this->advanceIterator());
return;
}
// Add only up to N pending promises.
$concurrency = is_callable($this->concurrency)
? call_user_func($this->concurrency, count($this->pending))
: $this->concurrency;
$concurrency = max($concurrency - count($this->pending), 0);
// Concurrency may be set to 0 to disallow new promises.
if (!$concurrency) {
return;
}
// Add the first pending promise.
$this->addPending();
// Note this is special handling for concurrency=1 so that we do
// not advance the iterator after adding the first promise. This
// helps work around issues with generators that might not have the
// next value to yield until promise callbacks are called.
while (--$concurrency
&& $this->advanceIterator()
&& $this->addPending());
}
private function addPending()
{
if (!$this->iterable || !$this->iterable->valid()) {
return false;
}
$promise = promise_for($this->iterable->current());
$idx = $this->iterable->key();
$this->pending[$idx] = $promise->then(
function ($value) use ($idx) {
if ($this->onFulfilled) {
call_user_func(
$this->onFulfilled, $value, $idx, $this->aggregate
);
}
$this->step($idx);
},
function ($reason) use ($idx) {
if ($this->onRejected) {
call_user_func(
$this->onRejected, $reason, $idx, $this->aggregate
);
}
$this->step($idx);
}
);
return true;
}
private function advanceIterator()
{
try {
$this->iterable->next();
return true;
} catch (\Exception $e) {
$this->aggregate->reject($e);
return false;
}
}
private function step($idx)
{
// If the promise was already resolved, then ignore this step.
if ($this->aggregate->getState() !== PromiseInterface::PENDING) {
return;
}
unset($this->pending[$idx]);
$this->advanceIterator();
if (!$this->checkIfFinished()) {
// Add more pending promises if possible.
$this->refillPending();
}
}
private function checkIfFinished()
{
if (!$this->pending && !$this->iterable->valid()) {
// Resolve the promise if there's nothing left to do.
$this->aggregate->resolve(null);
return true;
}
return false;
}
}

View file

@ -0,0 +1,80 @@
<?php
namespace GuzzleHttp\Promise;
/**
* A promise that has been fulfilled.
*
* Thenning off of this promise will invoke the onFulfilled callback
* immediately and ignore other callbacks.
*/
class FulfilledPromise implements PromiseInterface
{
private $value;
public function __construct($value)
{
if (method_exists($value, 'then')) {
throw new \InvalidArgumentException(
'You cannot create a FulfilledPromise with a promise.');
}
$this->value = $value;
}
public function then(
callable $onFulfilled = null,
callable $onRejected = null
) {
// Return itself if there is no onFulfilled function.
if (!$onFulfilled) {
return $this;
}
$queue = queue();
$p = new Promise([$queue, 'run']);
$value = $this->value;
$queue->add(static function () use ($p, $value, $onFulfilled) {
if ($p->getState() === self::PENDING) {
try {
$p->resolve($onFulfilled($value));
} catch (\Exception $e) {
$p->reject($e);
}
}
});
return $p;
}
public function otherwise(callable $onRejected)
{
return $this->then(null, $onRejected);
}
public function wait($unwrap = true, $defaultDelivery = null)
{
return $unwrap ? $this->value : null;
}
public function getState()
{
return self::FULFILLED;
}
public function resolve($value)
{
if ($value !== $this->value) {
throw new \LogicException("Cannot resolve a fulfilled promise");
}
}
public function reject($reason)
{
throw new \LogicException("Cannot reject a fulfilled promise");
}
public function cancel()
{
// pass
}
}

View file

@ -0,0 +1,268 @@
<?php
namespace GuzzleHttp\Promise;
/**
* Promises/A+ implementation that avoids recursion when possible.
*
* @link https://promisesaplus.com/
*/
class Promise implements PromiseInterface
{
private $state = self::PENDING;
private $result;
private $cancelFn;
private $waitFn;
private $waitList;
private $handlers = [];
/**
* @param callable $waitFn Fn that when invoked resolves the promise.
* @param callable $cancelFn Fn that when invoked cancels the promise.
*/
public function __construct(
callable $waitFn = null,
callable $cancelFn = null
) {
$this->waitFn = $waitFn;
$this->cancelFn = $cancelFn;
}
public function then(
callable $onFulfilled = null,
callable $onRejected = null
) {
if ($this->state === self::PENDING) {
$p = new Promise(null, [$this, 'cancel']);
$this->handlers[] = [$p, $onFulfilled, $onRejected];
$p->waitList = $this->waitList;
$p->waitList[] = $this;
return $p;
}
// Return a fulfilled promise and immediately invoke any callbacks.
if ($this->state === self::FULFILLED) {
return $onFulfilled
? promise_for($this->result)->then($onFulfilled)
: promise_for($this->result);
}
// It's either cancelled or rejected, so return a rejected promise
// and immediately invoke any callbacks.
$rejection = rejection_for($this->result);
return $onRejected ? $rejection->then(null, $onRejected) : $rejection;
}
public function otherwise(callable $onRejected)
{
return $this->then(null, $onRejected);
}
public function wait($unwrap = true)
{
$this->waitIfPending();
if (!$unwrap) {
return null;
}
if ($this->result instanceof PromiseInterface) {
return $this->result->wait($unwrap);
} elseif ($this->state === self::FULFILLED) {
return $this->result;
} else {
// It's rejected so "unwrap" and throw an exception.
throw exception_for($this->result);
}
}
public function getState()
{
return $this->state;
}
public function cancel()
{
if ($this->state !== self::PENDING) {
return;
}
$this->waitFn = $this->waitList = null;
if ($this->cancelFn) {
$fn = $this->cancelFn;
$this->cancelFn = null;
try {
$fn();
} catch (\Exception $e) {
$this->reject($e);
}
}
// Reject the promise only if it wasn't rejected in a then callback.
if ($this->state === self::PENDING) {
$this->reject(new CancellationException('Promise has been cancelled'));
}
}
public function resolve($value)
{
$this->settle(self::FULFILLED, $value);
}
public function reject($reason)
{
$this->settle(self::REJECTED, $reason);
}
private function settle($state, $value)
{
if ($this->state !== self::PENDING) {
// Ignore calls with the same resolution.
if ($state === $this->state && $value === $this->result) {
return;
}
throw $this->state === $state
? new \LogicException("The promise is already {$state}.")
: new \LogicException("Cannot change a {$this->state} promise to {$state}");
}
if ($value === $this) {
throw new \LogicException('Cannot fulfill or reject a promise with itself');
}
// Clear out the state of the promise but stash the handlers.
$this->state = $state;
$this->result = $value;
$handlers = $this->handlers;
$this->handlers = null;
$this->waitList = $this->waitFn = null;
$this->cancelFn = null;
if (!$handlers) {
return;
}
// If the value was not a settled promise or a thenable, then resolve
// it in the task queue using the correct ID.
if (!method_exists($value, 'then')) {
$id = $state === self::FULFILLED ? 1 : 2;
// It's a success, so resolve the handlers in the queue.
queue()->add(static function () use ($id, $value, $handlers) {
foreach ($handlers as $handler) {
self::callHandler($id, $value, $handler);
}
});
} elseif ($value instanceof Promise
&& $value->getState() === self::PENDING
) {
// We can just merge our handlers onto the next promise.
$value->handlers = array_merge($value->handlers, $handlers);
} else {
// Resolve the handlers when the forwarded promise is resolved.
$value->then(
static function ($value) use ($handlers) {
foreach ($handlers as $handler) {
self::callHandler(1, $value, $handler);
}
},
static function ($reason) use ($handlers) {
foreach ($handlers as $handler) {
self::callHandler(2, $reason, $handler);
}
}
);
}
}
/**
* Call a stack of handlers using a specific callback index and value.
*
* @param int $index 1 (resolve) or 2 (reject).
* @param mixed $value Value to pass to the callback.
* @param array $handler Array of handler data (promise and callbacks).
*
* @return array Returns the next group to resolve.
*/
private static function callHandler($index, $value, array $handler)
{
/** @var PromiseInterface $promise */
$promise = $handler[0];
// The promise may have been cancelled or resolved before placing
// this thunk in the queue.
if ($promise->getState() !== self::PENDING) {
return;
}
try {
if (isset($handler[$index])) {
$promise->resolve($handler[$index]($value));
} elseif ($index === 1) {
// Forward resolution values as-is.
$promise->resolve($value);
} else {
// Forward rejections down the chain.
$promise->reject($value);
}
} catch (\Exception $reason) {
$promise->reject($reason);
}
}
private function waitIfPending()
{
if ($this->state !== self::PENDING) {
return;
} elseif ($this->waitFn) {
$this->invokeWaitFn();
} elseif ($this->waitList) {
$this->invokeWaitList();
} else {
// If there's not wait function, then reject the promise.
$this->reject('Cannot wait on a promise that has '
. 'no internal wait function. You must provide a wait '
. 'function when constructing the promise to be able to '
. 'wait on a promise.');
}
queue()->run();
if ($this->state === self::PENDING) {
$this->reject('Invoking the wait callback did not resolve the promise');
}
}
private function invokeWaitFn()
{
try {
$wfn = $this->waitFn;
$this->waitFn = null;
$wfn(true);
} catch (\Exception $reason) {
if ($this->state === self::PENDING) {
// The promise has not been resolved yet, so reject the promise
// with the exception.
$this->reject($reason);
} else {
// The promise was already resolved, so there's a problem in
// the application.
throw $reason;
}
}
}
private function invokeWaitList()
{
$waitList = $this->waitList;
$this->waitList = null;
foreach ($waitList as $result) {
descend:
$result->waitIfPending();
if ($result->result instanceof Promise) {
$result = $result->result;
goto descend;
}
}
}
}

View file

@ -0,0 +1,93 @@
<?php
namespace GuzzleHttp\Promise;
/**
* A promise represents the eventual result of an asynchronous operation.
*
* The primary way of interacting with a promise is through its then method,
* which registers callbacks to receive either a promises eventual value or
* the reason why the promise cannot be fulfilled.
*
* @link https://promisesaplus.com/
*/
interface PromiseInterface
{
const PENDING = 'pending';
const FULFILLED = 'fulfilled';
const REJECTED = 'rejected';
/**
* Appends fulfillment and rejection handlers to the promise, and returns
* a new promise resolving to the return value of the called handler.
*
* @param callable $onFulfilled Invoked when the promise fulfills.
* @param callable $onRejected Invoked when the promise is rejected.
*
* @return PromiseInterface
*/
public function then(
callable $onFulfilled = null,
callable $onRejected = null
);
/**
* Appends a rejection handler callback to the promise, and returns a new
* promise resolving to the return value of the callback if it is called,
* or to its original fulfillment value if the promise is instead
* fulfilled.
*
* @param callable $onRejected Invoked when the promise is rejected.
*
* @return PromiseInterface
*/
public function otherwise(callable $onRejected);
/**
* Get the state of the promise ("pending", "rejected", or "fulfilled").
*
* The three states can be checked against the constants defined on
* PromiseInterface: PENDING, FULFILLED, and REJECTED.
*
* @return string
*/
public function getState();
/**
* Resolve the promise with the given value.
*
* @param mixed $value
* @throws \RuntimeException if the promise is already resolved.
*/
public function resolve($value);
/**
* Reject the promise with the given reason.
*
* @param mixed $reason
* @throws \RuntimeException if the promise is already resolved.
*/
public function reject($reason);
/**
* Cancels the promise if possible.
*
* @link https://github.com/promises-aplus/cancellation-spec/issues/7
*/
public function cancel();
/**
* Waits until the promise completes if possible.
*
* Pass $unwrap as true to unwrap the result of the promise, either
* returning the resolved value or throwing the rejected exception.
*
* If the promise cannot be waited on, then the promise will be rejected.
*
* @param bool $unwrap
*
* @return mixed
* @throws \LogicException if the promise has no wait function or if the
* promise does not settle after waiting.
*/
public function wait($unwrap = true);
}

View file

@ -0,0 +1,15 @@
<?php
namespace GuzzleHttp\Promise;
/**
* Interface used with classes that return a promise.
*/
interface PromisorInterface
{
/**
* Returns a promise.
*
* @return PromiseInterface
*/
public function promise();
}

View file

@ -0,0 +1,84 @@
<?php
namespace GuzzleHttp\Promise;
/**
* A promise that has been rejected.
*
* Thenning off of this promise will invoke the onRejected callback
* immediately and ignore other callbacks.
*/
class RejectedPromise implements PromiseInterface
{
private $reason;
public function __construct($reason)
{
if (method_exists($reason, 'then')) {
throw new \InvalidArgumentException(
'You cannot create a RejectedPromise with a promise.');
}
$this->reason = $reason;
}
public function then(
callable $onFulfilled = null,
callable $onRejected = null
) {
// If there's no onRejected callback then just return self.
if (!$onRejected) {
return $this;
}
$queue = queue();
$reason = $this->reason;
$p = new Promise([$queue, 'run']);
$queue->add(static function () use ($p, $reason, $onRejected) {
if ($p->getState() === self::PENDING) {
try {
// Return a resolved promise if onRejected does not throw.
$p->resolve($onRejected($reason));
} catch (\Exception $e) {
// onRejected threw, so return a rejected promise.
$p->reject($e);
}
}
});
return $p;
}
public function otherwise(callable $onRejected)
{
return $this->then(null, $onRejected);
}
public function wait($unwrap = true, $defaultDelivery = null)
{
if ($unwrap) {
throw exception_for($this->reason);
}
}
public function getState()
{
return self::REJECTED;
}
public function resolve($value)
{
throw new \LogicException("Cannot resolve a rejected promise");
}
public function reject($reason)
{
if ($reason !== $this->reason) {
throw new \LogicException("Cannot reject a rejected promise");
}
}
public function cancel()
{
// pass
}
}

View file

@ -0,0 +1,47 @@
<?php
namespace GuzzleHttp\Promise;
/**
* A special exception that is thrown when waiting on a rejected promise.
*
* The reason value is available via the getReason() method.
*/
class RejectionException extends \RuntimeException
{
/** @var mixed Rejection reason. */
private $reason;
/**
* @param mixed $reason Rejection reason.
* @param string $description Optional description
*/
public function __construct($reason, $description = null)
{
$this->reason = $reason;
$message = 'The promise was rejected';
if ($description) {
$message .= ' with reason: ' . $description;
} elseif (is_string($reason)
|| (is_object($reason) && method_exists($reason, '__toString'))
) {
$message .= ' with reason: ' . $this->reason;
} elseif ($reason instanceof \JsonSerializable) {
$message .= ' with reason: '
. json_encode($this->reason, JSON_PRETTY_PRINT);
}
parent::__construct($message);
}
/**
* Returns the rejection reason.
*
* @return mixed
*/
public function getReason()
{
return $this->reason;
}
}

View file

@ -0,0 +1,79 @@
<?php
namespace GuzzleHttp\Promise;
/**
* A task queue that executes tasks in a FIFO order.
*
* This task queue class is used to settle promises asynchronously and
* maintains a constant stack size. You can use the task queue asynchronously
* by calling the `run()` function of the global task queue in an event loop.
*
* GuzzleHttp\Promise\queue()->run();
*/
class TaskQueue
{
private $enableShutdown = true;
private $queue = [];
public function __construct($withShutdown = true)
{
if ($withShutdown) {
register_shutdown_function(function () {
if ($this->enableShutdown) {
// Only run the tasks if an E_ERROR didn't occur.
$err = error_get_last();
if (!$err || ($err['type'] ^ E_ERROR)) {
$this->run();
}
}
});
}
}
/**
* Returns true if the queue is empty.
*
* @return bool
*/
public function isEmpty()
{
return !$this->queue;
}
/**
* Adds a task to the queue that will be executed the next time run is
* called.
*
* @param callable $task
*/
public function add(callable $task)
{
$this->queue[] = $task;
}
/**
* Execute all of the pending task in the queue.
*/
public function run()
{
while ($task = array_shift($this->queue)) {
$task();
}
}
/**
* The task queue will be run and exhausted by default when the process
* exits IFF the exit is not the result of a PHP E_ERROR error.
*
* You can disable running the automatic shutdown of the queue by calling
* this function. If you disable the task queue shutdown process, then you
* MUST either run the task queue (as a result of running your event loop
* or manually using the run() method) or wait on each outstanding promise.
*
* Note: This shutdown will occur before any destructors are triggered.
*/
public function disableShutdown()
{
$this->enableShutdown = false;
}
}

View file

@ -0,0 +1,495 @@
<?php
namespace GuzzleHttp\Promise;
/**
* Get the global task queue used for promise resolution.
*
* This task queue MUST be run in an event loop in order for promises to be
* settled asynchronously. It will be automatically run when synchronously
* waiting on a promise.
*
* <code>
* while ($eventLoop->isRunning()) {
* GuzzleHttp\Promise\queue()->run();
* }
* </code>
*
* @return TaskQueue
*/
function queue()
{
static $queue;
if (!$queue) {
$queue = new TaskQueue();
}
return $queue;
}
/**
* Adds a function to run in the task queue when it is next `run()` and returns
* a promise that is fulfilled or rejected with the result.
*
* @param callable $task Task function to run.
*
* @return PromiseInterface
*/
function task(callable $task)
{
$queue = queue();
$promise = new Promise([$queue, 'run']);
$queue->add(function () use ($task, $promise) {
try {
$promise->resolve($task());
} catch (\Exception $e) {
$promise->reject($e);
}
});
return $promise;
}
/**
* Creates a promise for a value if the value is not a promise.
*
* @param mixed $value Promise or value.
*
* @return PromiseInterface
*/
function promise_for($value)
{
if ($value instanceof PromiseInterface) {
return $value;
}
// Return a Guzzle promise that shadows the given promise.
if (method_exists($value, 'then')) {
$wfn = method_exists($value, 'wait') ? [$value, 'wait'] : null;
$cfn = method_exists($value, 'cancel') ? [$value, 'cancel'] : null;
$promise = new Promise($wfn, $cfn);
$value->then([$promise, 'resolve'], [$promise, 'reject']);
return $promise;
}
return new FulfilledPromise($value);
}
/**
* Creates a rejected promise for a reason if the reason is not a promise. If
* the provided reason is a promise, then it is returned as-is.
*
* @param mixed $reason Promise or reason.
*
* @return PromiseInterface
*/
function rejection_for($reason)
{
if ($reason instanceof PromiseInterface) {
return $reason;
}
return new RejectedPromise($reason);
}
/**
* Create an exception for a rejected promise value.
*
* @param mixed $reason
*
* @return \Exception
*/
function exception_for($reason)
{
return $reason instanceof \Exception
? $reason
: new RejectionException($reason);
}
/**
* Returns an iterator for the given value.
*
* @param mixed $value
*
* @return \Iterator
*/
function iter_for($value)
{
if ($value instanceof \Iterator) {
return $value;
} elseif (is_array($value)) {
return new \ArrayIterator($value);
} else {
return new \ArrayIterator([$value]);
}
}
/**
* Synchronously waits on a promise to resolve and returns an inspection state
* array.
*
* Returns a state associative array containing a "state" key mapping to a
* valid promise state. If the state of the promise is "fulfilled", the array
* will contain a "value" key mapping to the fulfilled value of the promise. If
* the promise is rejected, the array will contain a "reason" key mapping to
* the rejection reason of the promise.
*
* @param PromiseInterface $promise Promise or value.
*
* @return array
*/
function inspect(PromiseInterface $promise)
{
try {
return [
'state' => PromiseInterface::FULFILLED,
'value' => $promise->wait()
];
} catch (RejectionException $e) {
return ['state' => 'rejected', 'reason' => $e->getReason()];
} catch (\Exception $e) {
return ['state' => 'rejected', 'reason' => $e];
}
}
/**
* Waits on all of the provided promises, but does not unwrap rejected promises
* as thrown exception.
*
* Returns an array of inspection state arrays.
*
* @param PromiseInterface[] $promises Traversable of promises to wait upon.
*
* @return array
* @see GuzzleHttp\Promise\inspect for the inspection state array format.
*/
function inspect_all($promises)
{
$results = [];
foreach ($promises as $key => $promise) {
$results[$key] = inspect($promise);
}
return $results;
}
/**
* Waits on all of the provided promises and returns the fulfilled values.
*
* Returns an array that contains the value of each promise (in the same order
* the promises were provided). An exception is thrown if any of the promises
* are rejected.
*
* @param mixed $promises Iterable of PromiseInterface objects to wait on.
*
* @return array
* @throws \Exception on error
*/
function unwrap($promises)
{
$results = [];
foreach ($promises as $key => $promise) {
$results[$key] = $promise->wait();
}
return $results;
}
/**
* Given an array of promises, return a promise that is fulfilled when all the
* items in the array are fulfilled.
*
* The promise's fulfillment value is an array with fulfillment values at
* respective positions to the original array. If any promise in the array
* rejects, the returned promise is rejected with the rejection reason.
*
* @param mixed $promises Promises or values.
*
* @return Promise
*/
function all($promises)
{
$results = [];
return each(
$promises,
function ($value, $idx) use (&$results) {
$results[$idx] = $value;
},
function ($reason, $idx, Promise $aggregate) {
$aggregate->reject($reason);
}
)->then(function () use (&$results) {
ksort($results);
return $results;
});
}
/**
* Initiate a competitive race between multiple promises or values (values will
* become immediately fulfilled promises).
*
* When count amount of promises have been fulfilled, the returned promise is
* fulfilled with an array that contains the fulfillment values of the winners
* in order of resolution.
*
* This prommise is rejected with a {@see GuzzleHttp\Promise\AggregateException}
* if the number of fulfilled promises is less than the desired $count.
*
* @param int $count Total number of promises.
* @param mixed $promises Promises or values.
*
* @return Promise
*/
function some($count, $promises)
{
$results = [];
$rejections = [];
return each(
$promises,
function ($value, $idx, PromiseInterface $p) use (&$results, $count) {
if ($p->getState() !== PromiseInterface::PENDING) {
return;
}
$results[$idx] = $value;
if (count($results) >= $count) {
$p->resolve(null);
}
},
function ($reason) use (&$rejections) {
$rejections[] = $reason;
}
)->then(
function () use (&$results, &$rejections, $count) {
if (count($results) !== $count) {
throw new AggregateException(
'Not enough promises to fulfill count',
$rejections
);
}
ksort($results);
return array_values($results);
}
);
}
/**
* Like some(), with 1 as count. However, if the promise fulfills, the
* fulfillment value is not an array of 1 but the value directly.
*
* @param mixed $promises Promises or values.
*
* @return PromiseInterface
*/
function any($promises)
{
return some(1, $promises)->then(function ($values) { return $values[0]; });
}
/**
* Returns a promise that is fulfilled when all of the provided promises have
* been fulfilled or rejected.
*
* The returned promise is fulfilled with an array of inspection state arrays.
*
* @param mixed $promises Promises or values.
*
* @return Promise
* @see GuzzleHttp\Promise\inspect for the inspection state array format.
*/
function settle($promises)
{
$results = [];
return each(
$promises,
function ($value, $idx) use (&$results) {
$results[$idx] = ['state' => 'fulfilled', 'value' => $value];
},
function ($reason, $idx) use (&$results) {
$results[$idx] = ['state' => 'rejected', 'reason' => $reason];
}
)->then(function () use (&$results) {
ksort($results);
return $results;
});
}
/**
* Given an iterator that yields promises or values, returns a promise that is
* fulfilled with a null value when the iterator has been consumed or the
* aggregate promise has been fulfilled or rejected.
*
* $onFulfilled is a function that accepts the fulfilled value, iterator
* index, and the aggregate promise. The callback can invoke any necessary side
* effects and choose to resolve or reject the aggregate promise if needed.
*
* $onRejected is a function that accepts the rejection reason, iterator
* index, and the aggregate promise. The callback can invoke any necessary side
* effects and choose to resolve or reject the aggregate promise if needed.
*
* @param mixed $iterable Iterator or array to iterate over.
* @param callable $onFulfilled
* @param callable $onRejected
*
* @return Promise
*/
function each(
$iterable,
callable $onFulfilled = null,
callable $onRejected = null
) {
return (new EachPromise($iterable, [
'fulfilled' => $onFulfilled,
'rejected' => $onRejected
]))->promise();
}
/**
* Like each, but only allows a certain number of outstanding promises at any
* given time.
*
* $concurrency may be an integer or a function that accepts the number of
* pending promises and returns a numeric concurrency limit value to allow for
* dynamic a concurrency size.
*
* @param mixed $iterable
* @param int|callable $concurrency
* @param callable $onFulfilled
* @param callable $onRejected
*
* @return mixed
*/
function each_limit(
$iterable,
$concurrency,
callable $onFulfilled = null,
callable $onRejected = null
) {
return (new EachPromise($iterable, [
'fulfilled' => $onFulfilled,
'rejected' => $onRejected,
'concurrency' => $concurrency
]))->promise();
}
/**
* Like each_limit, but ensures that no promise in the given $iterable argument
* is rejected. If any promise is rejected, then the aggregate promise is
* rejected with the encountered rejection.
*
* @param mixed $iterable
* @param int|callable $concurrency
* @param callable $onFulfilled
*
* @return mixed
*/
function each_limit_all(
$iterable,
$concurrency,
callable $onFulfilled = null
) {
return each_limit(
$iterable,
$concurrency,
$onFulfilled,
function ($reason, $idx, PromiseInterface $aggregate) {
$aggregate->reject($reason);
}
);
}
/**
* Returns true if a promise is fulfilled.
*
* @param PromiseInterface $promise
*
* @return bool
*/
function is_fulfilled(PromiseInterface $promise)
{
return $promise->getState() === PromiseInterface::FULFILLED;
}
/**
* Returns true if a promise is rejected.
*
* @param PromiseInterface $promise
*
* @return bool
*/
function is_rejected(PromiseInterface $promise)
{
return $promise->getState() === PromiseInterface::REJECTED;
}
/**
* Returns true if a promise is fulfilled or rejected.
*
* @param PromiseInterface $promise
*
* @return bool
*/
function is_settled(PromiseInterface $promise)
{
return $promise->getState() !== PromiseInterface::PENDING;
}
/**
* Creates a promise that is resolved using a generator that yields values or
* promises (somewhat similar to C#'s async keyword).
*
* When called, the coroutine function will start an instance of the generator
* and returns a promise that is fulfilled with its final yielded value.
*
* Control is returned back to the generator when the yielded promise settles.
* This can lead to less verbose code when doing lots of sequential async calls
* with minimal processing in between.
*
* use GuzzleHttp\Promise;
*
* function createPromise($value) {
* return new Promise\FulfilledPromise($value);
* }
*
* $promise = Promise\coroutine(function () {
* $value = (yield createPromise('a'));
* try {
* $value = (yield createPromise($value . 'b'));
* } catch (\Exception $e) {
* // The promise was rejected.
* }
* yield $value . 'c';
* });
*
* // Outputs "abc"
* $promise->then(function ($v) { echo $v; });
*
* @param callable $generatorFn Generator function to wrap into a promise.
*
* @return Promise
* @link https://github.com/petkaantonov/bluebird/blob/master/API.md#generators inspiration
*/
function coroutine(callable $generatorFn)
{
$generator = $generatorFn();
return __next_coroutine($generator->current(), $generator)->then();
}
/** @internal */
function __next_coroutine($yielded, \Generator $generator)
{
return promise_for($yielded)->then(
function ($value) use ($generator) {
$nextYield = $generator->send($value);
return $generator->valid()
? __next_coroutine($nextYield, $generator)
: $value;
},
function ($reason) use ($generator) {
$nextYield = $generator->throw(exception_for($reason));
// The throw was caught, so keep iterating on the coroutine
return __next_coroutine($nextYield, $generator);
}
);
}

View file

@ -0,0 +1,6 @@
<?php
// Don't redefine the functions if included multiple times.
if (!function_exists('GuzzleHttp\Promise\promise_for')) {
require __DIR__ . '/functions.php';
}