CompositeStreamConnection.php 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125
  1. <?php
  2. /*
  3. * This file is part of the Predis package.
  4. *
  5. * (c) Daniele Alessandri <suppakilla@gmail.com>
  6. *
  7. * For the full copyright and license information, please view the LICENSE
  8. * file that was distributed with this source code.
  9. */
  10. namespace Predis\Connection;
  11. use Predis\Command\CommandInterface;
  12. use Predis\Protocol\ProtocolProcessorInterface;
  13. use Predis\Protocol\Text\ProtocolProcessor as TextProtocolProcessor;
  14. /**
  15. * Connection abstraction to Redis servers based on PHP's stream that uses an
  16. * external protocol processor defining the protocol used for the communication.
  17. *
  18. * @author Daniele Alessandri <suppakilla@gmail.com>
  19. */
  class CompositeStreamConnection extends StreamConnection implements CompositeConnectionInterface
  {
      /**
       * Protocol processor that writeRequest() and read() delegate to.
       */
      protected $protocol;

      /**
       * Stores the validated connection parameters and the protocol processor,
       * creating a text protocol processor when none is supplied.
       *
       * @param ParametersInterface        $parameters Initialization parameters for the connection.
       * @param ProtocolProcessorInterface $protocol   Protocol processor (optional).
       */
      public function __construct(
          ParametersInterface $parameters,
          ProtocolProcessorInterface $protocol = null
      ) {
          $this->parameters = $this->assertParameters($parameters);

          if (!$protocol) {
              $protocol = new TextProtocolProcessor();
          }

          $this->protocol = $protocol;
      }

      /**
       * Returns the protocol processor held by this connection.
       *
       * @return ProtocolProcessorInterface
       */
      public function getProtocol()
      {
          return $this->protocol;
      }

      /**
       * Hands the given buffer over to the connection's write() method.
       *
       * @param string $buffer Data to be written.
       */
      public function writeBuffer($buffer)
      {
          $this->write($buffer);
      }

      /**
       * Reads from the underlying resource until the requested number of bytes
       * has been collected.
       *
       * @param int $length Number of bytes to read, must be greater than 0.
       *
       * @throws \InvalidArgumentException When $length is not greater than 0.
       *
       * @return string The accumulated bytes.
       */
      public function readBuffer($length)
      {
          if ($length <= 0) {
              throw new \InvalidArgumentException('Length parameter must be greater than 0.');
          }

          $stream = $this->getResource();
          $remaining = $length;
          $payload = '';
          $pending = true;

          // fread() may hand back fewer bytes than requested, so keep asking
          // for whatever is still missing.
          while ($pending) {
              $piece = fread($stream, $remaining);

              if ($piece === '' || $piece === false) {
                  // An empty or failed read is reported as a connection error.
                  $this->onConnectionError('Error while reading bytes from the server.');
              }

              $payload .= $piece;
              $remaining -= strlen($piece);
              $pending = $remaining > 0;
          }

          return $payload;
      }

      /**
       * Reads from the underlying resource until a CRLF-terminated line has been
       * collected.
       *
       * @return string The line without its trailing CRLF.
       */
      public function readLine()
      {
          $stream = $this->getResource();
          $line = '';
          $terminated = false;

          // Keep appending fgets() results until the accumulated text ends
          // with "\r\n".
          while (!$terminated) {
              $piece = fgets($stream);

              if ($piece === '' || $piece === false) {
                  // An empty or failed read is reported as a connection error.
                  $this->onConnectionError('Error while reading line from the server.');
              }

              $line .= $piece;
              $terminated = substr($line, -2) === "\r\n";
          }

          // Strip the trailing CRLF before returning.
          return substr($line, 0, -2);
      }

      /**
       * Delegates writing of the command to the protocol processor, passing
       * this connection along.
       *
       * @param CommandInterface $command Command to be written.
       */
      public function writeRequest(CommandInterface $command)
      {
          $this->protocol->write($this, $command);
      }

      /**
       * Delegates reading to the protocol processor, passing this connection
       * along.
       *
       * @return mixed Whatever the protocol processor's read() returns.
       */
      public function read()
      {
          return $this->protocol->read($this);
      }

      /**
       * Extends the parent's list of serializable properties with the
       * protocol processor.
       *
       * @return array
       */
      public function __sleep()
      {
          $inherited = parent::__sleep();
          $own = array('protocol');

          return array_merge($inherited, $own);
      }
  }