MAJ Librarys

This commit is contained in:
EoleDev 2016-03-09 16:03:46 +01:00
parent 1d3ed3af6d
commit 03ef74d0cf
17 changed files with 347 additions and 194 deletions

View file

@ -1,5 +1,11 @@
# CHANGELOG
## 1.1.0 - 2016-03-07
* Update EachPromise to prevent recurring on a iterator when advancing, as this
could trigger fatal generator errors.
* Update Promise to allow recursive waiting without unwrapping exceptions.
## 1.0.3 - 2015-10-15
* Update EachPromise to immediately resolve when the underlying promise iterator

View file

@ -24,6 +24,9 @@ class EachPromise implements PromisorInterface
/** @var Promise */
private $aggregate;
/** @var bool */
private $mutex;
/**
* Configuration hash can include the following key value pairs:
*
@ -81,6 +84,7 @@ class EachPromise implements PromisorInterface
private function createPromise()
{
$this->mutex = false;
$this->aggregate = new Promise(function () {
reset($this->pending);
if (empty($this->pending) && !$this->iterable->valid()) {
@ -169,11 +173,21 @@ class EachPromise implements PromisorInterface
private function advanceIterator()
{
// Place a lock on the iterator so that we ensure to not recurse,
// preventing fatal generator errors.
if ($this->mutex) {
return false;
}
$this->mutex = true;
try {
$this->iterable->next();
$this->mutex = false;
return true;
} catch (\Exception $e) {
$this->aggregate->reject($e);
$this->mutex = false;
return false;
}
}
@ -186,9 +200,11 @@ class EachPromise implements PromisorInterface
}
unset($this->pending[$idx]);
$this->advanceIterator();
if (!$this->checkIfFinished()) {
// Only refill pending promises if we are not locked, preventing the
// EachPromise to recursively invoke the provided iterator, which
// cause a fatal error: "Cannot resume an already running generator"
if ($this->advanceIterator() && !$this->checkIfFinished()) {
// Add more pending promises if possible.
$this->refillPending();
}

View file

@ -61,17 +61,19 @@ class Promise implements PromiseInterface
{
$this->waitIfPending();
if (!$unwrap) {
return null;
}
$inner = $this->result instanceof PromiseInterface
? $this->result->wait($unwrap)
: $this->result;
if ($this->result instanceof PromiseInterface) {
return $this->result->wait($unwrap);
} elseif ($this->state === self::FULFILLED) {
return $this->result;
} else {
// It's rejected so "unwrap" and throw an exception.
throw exception_for($this->result);
if ($unwrap) {
if ($this->result instanceof PromiseInterface
|| $this->state === self::FULFILLED
) {
return $inner;
} else {
// It's rejected so "unwrap" and throw an exception.
throw exception_for($inner);
}
}
}
@ -257,11 +259,10 @@ class Promise implements PromiseInterface
$this->waitList = null;
foreach ($waitList as $result) {
descend:
$result->waitIfPending();
if ($result->result instanceof Promise) {
while ($result->result instanceof Promise) {
$result = $result->result;
goto descend;
$result->waitIfPending();
}
}
}

View file

@ -56,6 +56,7 @@ class TaskQueue
*/
public function run()
{
/** @var callable $task */
while ($task = array_shift($this->queue)) {
$task();
}

View file

@ -146,9 +146,9 @@ function inspect(PromiseInterface $promise)
'value' => $promise->wait()
];
} catch (RejectionException $e) {
return ['state' => 'rejected', 'reason' => $e->getReason()];
return ['state' => PromiseInterface::REJECTED, 'reason' => $e->getReason()];
} catch (\Exception $e) {
return ['state' => 'rejected', 'reason' => $e];
return ['state' => PromiseInterface::REJECTED, 'reason' => $e];
}
}
@ -304,10 +304,10 @@ function settle($promises)
return each(
$promises,
function ($value, $idx) use (&$results) {
$results[$idx] = ['state' => 'fulfilled', 'value' => $value];
$results[$idx] = ['state' => PromiseInterface::FULFILLED, 'value' => $value];
},
function ($reason, $idx) use (&$results) {
$results[$idx] = ['state' => 'rejected', 'reason' => $reason];
$results[$idx] = ['state' => PromiseInterface::REJECTED, 'reason' => $reason];
}
)->then(function () use (&$results) {
ksort($results);

View file

@ -39,8 +39,8 @@ class EachPromiseTest extends \PHPUnit_Framework_TestCase
public function testIsWaitable()
{
$a = new Promise(function () use (&$a) { $a->resolve('a'); });
$b = new Promise(function () use (&$b) { $b->resolve('b'); });
$a = $this->createSelfResolvingPromise('a');
$b = $this->createSelfResolvingPromise('b');
$called = [];
$each = new EachPromise([$a, $b], [
'fulfilled' => function ($value) use (&$called) { $called[] = $value; }
@ -54,7 +54,7 @@ class EachPromiseTest extends \PHPUnit_Framework_TestCase
public function testCanResolveBeforeConsumingAll()
{
$called = 0;
$a = new Promise(function () use (&$a) { $a->resolve('a'); });
$a = $this->createSelfResolvingPromise('a');
$b = new Promise(function () { $this->fail(); });
$each = new EachPromise([$a, $b], [
'fulfilled' => function ($value, $idx, Promise $aggregate) use (&$called) {
@ -291,4 +291,46 @@ class EachPromiseTest extends \PHPUnit_Framework_TestCase
}
$this->assertEquals(range(0, 9), $results);
}
private function createSelfResolvingPromise($value)
{
$p = new Promise(function () use (&$p, $value) {
$p->resolve($value);
});
return $p;
}
public function testMutexPreventsGeneratorRecursion()
{
$results = $promises = [];
for ($i = 0; $i < 20; $i++) {
$p = $this->createSelfResolvingPromise($i);
$pending[] = $p;
$promises[] = $p;
}
$iter = function () use (&$promises, &$pending) {
foreach ($promises as $promise) {
// Resolve a promises, which will trigger the then() function,
// which would cause the EachPromise to try to add more
// promises to the queue. Without a lock, this would trigger
// a "Cannot resume an already running generator" fatal error.
if ($p = array_pop($pending)) {
$p->wait();
}
yield $promise;
}
};
$each = new EachPromise($iter(), [
'concurrency' => 5,
'fulfilled' => function ($r) use (&$results, &$pending) {
$results[] = $r;
}
]);
$each->promise()->wait();
$this->assertCount(20, $results);
}
}

View file

@ -172,6 +172,18 @@ class PromiseTest extends \PHPUnit_Framework_TestCase
$this->assertEquals('Whoop', $p->wait());
}
public function testWaitsOnAPromiseChainEvenWhenNotUnwrapped()
{
$p2 = new Promise(function () use (&$p2) {
$p2->reject('Fail');
});
$p = new Promise(function () use ($p2, &$p) {
$p->resolve($p2);
});
$p->wait(false);
$this->assertSame(Promise::REJECTED, $p2->getState());
}
public function testCannotCancelNonPending()
{
$p = new Promise();

View file

@ -1,5 +1,11 @@
# CHANGELOG
## 1.2.3 - 2016-02-18
* Fixed support in `GuzzleHttp\Psr7\CachingStream` for seeking forward on remote
streams, which can sometimes return fewer bytes than requested with `fread`.
* Fixed handling of gzipped responses with FNAME headers.
## 1.2.2 - 2016-01-22
* Added support for URIs without any authority.

View file

@ -38,7 +38,7 @@ echo $composed(); // abc, 123. Above all listen to me.
`GuzzleHttp\Psr7\BufferStream`
Provides a buffer stream that can be written to to fill a buffer, and read
Provides a buffer stream that can be written to fill a buffer, and read
from to remove bytes from the buffer.
This stream returns a "hwm" metadata value that tells upstream consumers
@ -106,7 +106,7 @@ echo $stream; // 0123456789
Compose stream implementations based on a hash of functions.
Allows for easy testing and extension of a provided stream without needing to
Allows for easy testing and extension of a provided stream without needing
to create a concrete class for a simple extension point.
```php

View file

@ -60,9 +60,12 @@ class CachingStream implements StreamInterface
$diff = $byte - $this->stream->getSize();
if ($diff > 0) {
// If the seek byte is greater the number of read bytes, then read
// the difference of bytes to cache the bytes and inherently seek.
$this->read($diff);
// Read the remoteStream until we have read in at least the amount
// of bytes requested, or we reach the end of the file.
while ($diff > 0 && !$this->remoteStream->eof()) {
$this->read($diff);
$diff = $byte - $this->stream->getSize();
}
} else {
// We can just do a normal seek since we've already seen this byte.
$this->stream->seek($byte);

View file

@ -20,10 +20,33 @@ class InflateStream implements StreamInterface
public function __construct(StreamInterface $stream)
{
// Skip the first 10 bytes
$stream = new LimitStream($stream, -1, 10);
// read the first 10 bytes, ie. gzip header
$header = $stream->read(10);
$filenameHeaderLength = $this->getLengthOfPossibleFilenameHeader($stream, $header);
// Skip the header, that is 10 + length of filename + 1 (nil) bytes
$stream = new LimitStream($stream, -1, 10 + $filenameHeaderLength);
$resource = StreamWrapper::getResource($stream);
stream_filter_append($resource, 'zlib.inflate', STREAM_FILTER_READ);
$this->stream = new Stream($resource);
}
/**
* @param StreamInterface $stream
* @param $header
* @return int
*/
private function getLengthOfPossibleFilenameHeader(StreamInterface $stream, $header)
{
$filename_header_length = 0;
if (substr(bin2hex($header), 6, 2) === '08') {
// we have a filename, read until nil
$filename_header_length = 1;
while ($stream->read(1) !== chr(0)) {
$filename_header_length++;
}
}
return $filename_header_length;
}
}

View file

@ -98,6 +98,33 @@ class CachingStreamTest extends \PHPUnit_Framework_TestCase
$this->assertEquals('ing', $this->body->read(3));
}
public function testCanSeekToReadBytesWithPartialBodyReturned()
{
$stream = fopen('php://temp', 'r+');
fwrite($stream, 'testing');
fseek($stream, 0);
$this->decorated = $this->getMockBuilder('\GuzzleHttp\Psr7\Stream')
->setConstructorArgs([$stream])
->setMethods(['read'])
->getMock();
$this->decorated->expects($this->exactly(2))
->method('read')
->willReturnCallback(function($length) use ($stream){
return fread($stream, 2);
});
$this->body = new CachingStream($this->decorated);
$this->assertEquals(0, $this->body->tell());
$this->body->seek(4, SEEK_SET);
$this->assertEquals(4, $this->body->tell());
$this->body->seek(0);
$this->assertEquals('test', $this->body->read(4));
}
public function testWritesToBufferStream()
{
$this->body->read(2);

View file

@ -13,4 +13,27 @@ class InflateStreamtest extends \PHPUnit_Framework_TestCase
$b = new InflateStream($a);
$this->assertEquals('test', (string) $b);
}
public function testInflatesStreamsWithFilename()
{
$content = $this->getGzipStringWithFilename('test');
$a = Psr7\stream_for($content);
$b = new InflateStream($a);
$this->assertEquals('test', (string) $b);
}
private function getGzipStringWithFilename($original_string)
{
$gzipped = bin2hex(gzencode($original_string));
$header = substr($gzipped, 0, 20);
// set FNAME flag
$header[6]=0;
$header[7]=8;
// make a dummy filename
$filename = "64756d6d7900";
$rest = substr($gzipped, 20);
return hex2bin($header . $filename . $rest);
}
}

View file

@ -154,7 +154,7 @@ class UriTest extends \PHPUnit_Framework_TestCase
[self::RFC3986_BASE, 'g;x=1/../y', 'http://a/b/c/y'],
['http://u@a/b/c/d;p?q', '.', 'http://u@a/b/c/'],
['http://u:p@a/b/c/d;p?q', '.', 'http://u:p@a/b/c/'],
//[self::RFC3986_BASE, 'http:g', 'http:g'],
['http://a/b/c/d/', 'e', 'http://a/b/c/d/e'],
];
}