Pipeline.php 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247
  1. <?php
  2. /*
  3. * This file is part of the Predis package.
  4. *
  5. * (c) Daniele Alessandri <suppakilla@gmail.com>
  6. *
  7. * For the full copyright and license information, please view the LICENSE
  8. * file that was distributed with this source code.
  9. */
  10. namespace Predis\Pipeline;
  11. use Predis\ClientContextInterface;
  12. use Predis\ClientException;
  13. use Predis\ClientInterface;
  14. use Predis\Command\CommandInterface;
  15. use Predis\Connection\Aggregate\ReplicationInterface;
  16. use Predis\Connection\ConnectionInterface;
  17. use Predis\Response\ErrorInterface as ErrorResponseInterface;
  18. use Predis\Response\ResponseInterface;
  19. use Predis\Response\ServerException;
  /**
   * Implementation of a command pipeline in which write and read operations of
   * Redis commands are pipelined to alleviate the effects of network round-trips.
   *
   * Commands are buffered in a FIFO queue and are only written to the connection
   * when the pipeline is flushed. The responses collected by each flush are
   * appended to an internal list, which is what execute() returns.
   *
   * {@inheritdoc}
   *
   * @author Daniele Alessandri <suppakilla@gmail.com>
   */
  class Pipeline implements ClientContextInterface
  {
      /**
       * Client used to create commands, read options and obtain the connection.
       *
       * @var ClientInterface
       */
      private $client;

      /**
       * FIFO buffer of the commands queued and not yet flushed.
       *
       * @var \SplQueue
       */
      private $pipeline;

      /**
       * Responses accumulated by every flush performed so far.
       *
       * @var array
       */
      private $responses = array();

      /**
       * True while execute() is in progress; guards against nested execution.
       *
       * @var bool
       */
      private $running = false;

      /**
       * @param ClientInterface $client Client instance used by the context.
       */
      public function __construct(ClientInterface $client)
      {
          $this->client = $client;
          $this->pipeline = new \SplQueue();
      }

      /**
       * Queues a command into the pipeline buffer.
       *
       * The command instance is built by the underlying client from the method
       * name and its arguments; nothing is sent until the pipeline is flushed.
       *
       * @param string $method    Command ID.
       * @param array  $arguments Arguments for the command.
       *
       * @return $this
       */
      public function __call($method, $arguments)
      {
          $command = $this->client->createCommand($method, $arguments);
          $this->recordCommand($command);

          return $this;
      }

      /**
       * Appends a command instance to the tail of the pipeline buffer.
       *
       * @param CommandInterface $command Command to be queued in the buffer.
       */
      protected function recordCommand(CommandInterface $command)
      {
          $this->pipeline->enqueue($command);
      }

      /**
       * Queues a command instance into the pipeline buffer.
       *
       * Despite its name, this method does not send anything: the command is
       * only recorded and will be written on the next flush.
       *
       * @param CommandInterface $command Command instance to be queued in the buffer.
       *
       * @return $this
       */
      public function executeCommand(CommandInterface $command)
      {
          $this->recordCommand($command);

          return $this;
      }

      /**
       * Throws an exception on -ERR responses returned by Redis.
       *
       * The connection is closed before throwing.
       * NOTE(review): presumably this prevents responses that are still unread
       * from being picked up by later commands -- confirm against the
       * connection implementation.
       *
       * @param ConnectionInterface    $connection Redis connection that returned the error.
       * @param ErrorResponseInterface $response   Instance of the error response.
       *
       * @throws ServerException
       */
      protected function exception(ConnectionInterface $connection, ErrorResponseInterface $response)
      {
          $connection->disconnect();
          $message = $response->getMessage();

          throw new ServerException($message);
      }

      /**
       * Returns the underlying connection to be used by the pipeline.
       *
       * When the client's connection is a replication aggregate, it is switched
       * to the master node before being returned.
       *
       * @return ConnectionInterface
       */
      protected function getConnection()
      {
          $connection = $this->getClient()->getConnection();

          if ($connection instanceof ReplicationInterface) {
              $connection->switchTo('master');
          }

          return $connection;
      }

      /**
       * Implements the logic to flush the queued commands and read the responses
       * from the current connection.
       *
       * All requests are written first, then the responses are read back in the
       * same order while the commands are removed from the queue.
       *
       * @param ConnectionInterface $connection Current connection instance.
       * @param \SplQueue           $commands   Queued commands.
       *
       * @throws ServerException When an error response is received and the
       *                         client is configured to throw on server errors.
       *
       * @return array
       */
      protected function executePipeline(ConnectionInterface $connection, \SplQueue $commands)
      {
          // Write phase: iterating with foreach leaves the queue untouched.
          foreach ($commands as $command) {
              $connection->writeRequest($command);
          }

          $responses = array();
          $exceptions = $this->throwServerExceptions();

          // Read phase: every command is dequeued as its response is consumed.
          while (!$commands->isEmpty()) {
              $command = $commands->dequeue();
              $response = $connection->readResponse($command);

              if (!$response instanceof ResponseInterface) {
                  // Plain payloads are handed to the command for parsing.
                  $responses[] = $command->parseResponse($response);
              } elseif ($response instanceof ErrorResponseInterface && $exceptions) {
                  $this->exception($connection, $response);
              } else {
                  // Response objects (including errors when exceptions are
                  // disabled) are returned as they are.
                  $responses[] = $response;
              }
          }

          return $responses;
      }

      /**
       * Flushes the buffer holding all of the commands queued so far.
       *
       * When $send is true and the buffer is not empty, the commands are executed
       * and their responses appended to the ones already collected. In every
       * other case the buffer is replaced by a new empty queue, which discards
       * any pending command without sending it.
       *
       * @param bool $send Specifies if the commands in the buffer should be sent to Redis.
       *
       * @return $this
       */
      public function flushPipeline($send = true)
      {
          if ($send && !$this->pipeline->isEmpty()) {
              $responses = $this->executePipeline($this->getConnection(), $this->pipeline);
              $this->responses = array_merge($this->responses, $responses);
          } else {
              $this->pipeline = new \SplQueue();
          }

          return $this;
      }

      /**
       * Marks the running status of the pipeline.
       *
       * @param bool $bool Sets the running status of the pipeline.
       *
       * @throws ClientException When asked to start while already running.
       */
      private function setRunning($bool)
      {
          if ($bool && $this->running) {
              throw new ClientException('The current pipeline context is already being executed.');
          }

          $this->running = $bool;
      }

      /**
       * Handles the actual execution of the whole pipeline.
       *
       * The optional callable receives this pipeline instance so that it can
       * queue commands; the buffer is then flushed. The running flag is always
       * cleared before returning or re-throwing, so a failed run never leaves the
       * instance locked in the "already being executed" state.
       *
       * @param mixed $callable Optional callback for execution.
       *
       * @throws \Exception
       * @throws \Throwable
       * @throws \InvalidArgumentException
       *
       * @return array Responses accumulated by this pipeline instance.
       */
      public function execute($callable = null)
      {
          if ($callable && !is_callable($callable)) {
              throw new \InvalidArgumentException('The argument must be a callable object.');
          }

          // Deliberately outside the try block: a nested execute() must fail
          // without resetting the flag owned by the outer run.
          $this->setRunning(true);

          $failure = null;

          try {
              if ($callable) {
                  call_user_func($callable, $this);
              }

              $this->flushPipeline();
          } catch (\Exception $failure) {
              // Re-thrown below, once the running flag has been cleared.
          } catch (\Throwable $failure) {
              // PHP 7+ engine errors (e.g. a TypeError raised by the callback)
              // are not \Exception instances. This clause never matches on
              // PHP 5, where \Throwable does not exist.
          }

          $this->setRunning(false);

          if ($failure) {
              throw $failure;
          }

          return $this->responses;
      }

      /**
       * Returns if the pipeline should throw exceptions on server errors.
       *
       * The value is read from the "exceptions" option of the underlying client.
       *
       * @return bool
       */
      protected function throwServerExceptions()
      {
          return (bool) $this->client->getOptions()->exceptions;
      }

      /**
       * Returns the underlying client instance used by the pipeline object.
       *
       * @return ClientInterface
       */
      public function getClient()
      {
          return $this->client;
      }
  }
  213. }