Atomic.php 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119
  1. <?php
  2. /*
  3. * This file is part of the Predis package.
  4. *
  5. * (c) Daniele Alessandri <suppakilla@gmail.com>
  6. *
  7. * For the full copyright and license information, please view the LICENSE
  8. * file that was distributed with this source code.
  9. */
  10. namespace Predis\Pipeline;
  11. use Predis\ClientException;
  12. use Predis\ClientInterface;
  13. use Predis\Connection\ConnectionInterface;
  14. use Predis\Connection\NodeConnectionInterface;
  15. use Predis\Response\ErrorInterface as ErrorResponseInterface;
  16. use Predis\Response\ResponseInterface;
  17. use Predis\Response\ServerException;
  18. /**
  19. * Command pipeline wrapped into a MULTI / EXEC transaction.
  20. *
  21. * @author Daniele Alessandri <suppakilla@gmail.com>
  22. */
  23. class Atomic extends Pipeline
  24. {
  25. /**
  26. * {@inheritdoc}
  27. */
  28. public function __construct(ClientInterface $client)
  29. {
  30. if (!$client->getProfile()->supportsCommands(array('multi', 'exec', 'discard'))) {
  31. throw new ClientException(
  32. "The current profile does not support 'MULTI', 'EXEC' and 'DISCARD'."
  33. );
  34. }
  35. parent::__construct($client);
  36. }
  37. /**
  38. * {@inheritdoc}
  39. */
  40. protected function getConnection()
  41. {
  42. $connection = $this->getClient()->getConnection();
  43. if (!$connection instanceof NodeConnectionInterface) {
  44. $class = __CLASS__;
  45. throw new ClientException("The class '$class' does not support aggregate connections.");
  46. }
  47. return $connection;
  48. }
  49. /**
  50. * {@inheritdoc}
  51. */
  52. protected function executePipeline(ConnectionInterface $connection, \SplQueue $commands)
  53. {
  54. $profile = $this->getClient()->getProfile();
  55. $connection->executeCommand($profile->createCommand('multi'));
  56. foreach ($commands as $command) {
  57. $connection->writeRequest($command);
  58. }
  59. foreach ($commands as $command) {
  60. $response = $connection->readResponse($command);
  61. if ($response instanceof ErrorResponseInterface) {
  62. $connection->executeCommand($profile->createCommand('discard'));
  63. throw new ServerException($response->getMessage());
  64. }
  65. }
  66. $executed = $connection->executeCommand($profile->createCommand('exec'));
  67. if (!isset($executed)) {
  68. // TODO: should be throwing a more appropriate exception.
  69. throw new ClientException(
  70. 'The underlying transaction has been aborted by the server.'
  71. );
  72. }
  73. if (count($executed) !== count($commands)) {
  74. $expected = count($commands);
  75. $received = count($executed);
  76. throw new ClientException(
  77. "Invalid number of responses [expected $expected, received $received]."
  78. );
  79. }
  80. $responses = array();
  81. $sizeOfPipe = count($commands);
  82. $exceptions = $this->throwServerExceptions();
  83. for ($i = 0; $i < $sizeOfPipe; ++$i) {
  84. $command = $commands->dequeue();
  85. $response = $executed[$i];
  86. if (!$response instanceof ResponseInterface) {
  87. $responses[] = $command->parseResponse($response);
  88. } elseif ($response instanceof ErrorResponseInterface && $exceptions) {
  89. $this->exception($connection, $response);
  90. } else {
  91. $responses[] = $response;
  92. }
  93. unset($executed[$i]);
  94. }
  95. return $responses;
  96. }
  97. }