ConnectionErrorProof.php 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130
  1. <?php
  2. /*
  3. * This file is part of the Predis package.
  4. *
  5. * (c) Daniele Alessandri <suppakilla@gmail.com>
  6. *
  7. * For the full copyright and license information, please view the LICENSE
  8. * file that was distributed with this source code.
  9. */
  10. namespace Predis\Pipeline;
  11. use Predis\CommunicationException;
  12. use Predis\Connection\Aggregate\ClusterInterface;
  13. use Predis\Connection\ConnectionInterface;
  14. use Predis\Connection\NodeConnectionInterface;
  15. use Predis\NotSupportedException;
  16. /**
  17. * Command pipeline that does not throw exceptions on connection errors, but
  18. * returns the exception instances as the rest of the response elements.
  19. *
  20. * @todo Awful naming!
  21. *
  22. * @author Daniele Alessandri <suppakilla@gmail.com>
  23. */
  24. class ConnectionErrorProof extends Pipeline
  25. {
  26. /**
  27. * {@inheritdoc}
  28. */
  29. protected function getConnection()
  30. {
  31. return $this->getClient()->getConnection();
  32. }
  33. /**
  34. * {@inheritdoc}
  35. */
  36. protected function executePipeline(ConnectionInterface $connection, \SplQueue $commands)
  37. {
  38. if ($connection instanceof NodeConnectionInterface) {
  39. return $this->executeSingleNode($connection, $commands);
  40. } elseif ($connection instanceof ClusterInterface) {
  41. return $this->executeCluster($connection, $commands);
  42. } else {
  43. $class = get_class($connection);
  44. throw new NotSupportedException("The connection class '$class' is not supported.");
  45. }
  46. }
  47. /**
  48. * {@inheritdoc}
  49. */
  50. protected function executeSingleNode(NodeConnectionInterface $connection, \SplQueue $commands)
  51. {
  52. $responses = array();
  53. $sizeOfPipe = count($commands);
  54. foreach ($commands as $command) {
  55. try {
  56. $connection->writeRequest($command);
  57. } catch (CommunicationException $exception) {
  58. return array_fill(0, $sizeOfPipe, $exception);
  59. }
  60. }
  61. for ($i = 0; $i < $sizeOfPipe; ++$i) {
  62. $command = $commands->dequeue();
  63. try {
  64. $responses[$i] = $connection->readResponse($command);
  65. } catch (CommunicationException $exception) {
  66. $add = count($commands) - count($responses);
  67. $responses = array_merge($responses, array_fill(0, $add, $exception));
  68. break;
  69. }
  70. }
  71. return $responses;
  72. }
  73. /**
  74. * {@inheritdoc}
  75. */
  76. protected function executeCluster(ClusterInterface $connection, \SplQueue $commands)
  77. {
  78. $responses = array();
  79. $sizeOfPipe = count($commands);
  80. $exceptions = array();
  81. foreach ($commands as $command) {
  82. $cmdConnection = $connection->getConnection($command);
  83. if (isset($exceptions[spl_object_hash($cmdConnection)])) {
  84. continue;
  85. }
  86. try {
  87. $cmdConnection->writeRequest($command);
  88. } catch (CommunicationException $exception) {
  89. $exceptions[spl_object_hash($cmdConnection)] = $exception;
  90. }
  91. }
  92. for ($i = 0; $i < $sizeOfPipe; ++$i) {
  93. $command = $commands->dequeue();
  94. $cmdConnection = $connection->getConnection($command);
  95. $connectionHash = spl_object_hash($cmdConnection);
  96. if (isset($exceptions[$connectionHash])) {
  97. $responses[$i] = $exceptions[$connectionHash];
  98. continue;
  99. }
  100. try {
  101. $responses[$i] = $cmdConnection->readResponse($command);
  102. } catch (CommunicationException $exception) {
  103. $responses[$i] = $exception;
  104. $exceptions[$connectionHash] = $exception;
  105. }
  106. }
  107. return $responses;
  108. }
  109. }