MasterSlaveReplication.php 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264
  1. <?php
  2. /*
  3. * This file is part of the Predis package.
  4. *
  5. * (c) Daniele Alessandri <suppakilla@gmail.com>
  6. *
  7. * For the full copyright and license information, please view the LICENSE
  8. * file that was distributed with this source code.
  9. */
  10. namespace Predis\Connection\Aggregate;
  11. use Predis\Command\CommandInterface;
  12. use Predis\Connection\NodeConnectionInterface;
  13. use Predis\Replication\ReplicationStrategy;
  14. /**
  15. * Aggregate connection handling replication of Redis nodes configured in a
  16. * single master / multiple slaves setup.
  17. *
  18. * @author Daniele Alessandri <suppakilla@gmail.com>
  19. */
  20. class MasterSlaveReplication implements ReplicationInterface
  21. {
  22. protected $strategy;
  23. protected $master;
  24. protected $slaves;
  25. protected $current;
  26. /**
  27. * {@inheritdoc}
  28. */
  29. public function __construct(ReplicationStrategy $strategy = null)
  30. {
  31. $this->slaves = array();
  32. $this->strategy = $strategy ?: new ReplicationStrategy();
  33. }
  34. /**
  35. * Checks if one master and at least one slave have been defined.
  36. */
  37. protected function check()
  38. {
  39. if (!isset($this->master) || !$this->slaves) {
  40. throw new \RuntimeException('Replication needs one master and at least one slave.');
  41. }
  42. }
  43. /**
  44. * Resets the connection state.
  45. */
  46. protected function reset()
  47. {
  48. $this->current = null;
  49. }
  50. /**
  51. * {@inheritdoc}
  52. */
  53. public function add(NodeConnectionInterface $connection)
  54. {
  55. $alias = $connection->getParameters()->alias;
  56. if ($alias === 'master') {
  57. $this->master = $connection;
  58. } else {
  59. $this->slaves[$alias ?: count($this->slaves)] = $connection;
  60. }
  61. $this->reset();
  62. }
  63. /**
  64. * {@inheritdoc}
  65. */
  66. public function remove(NodeConnectionInterface $connection)
  67. {
  68. if ($connection->getParameters()->alias === 'master') {
  69. $this->master = null;
  70. $this->reset();
  71. return true;
  72. } else {
  73. if (($id = array_search($connection, $this->slaves, true)) !== false) {
  74. unset($this->slaves[$id]);
  75. $this->reset();
  76. return true;
  77. }
  78. }
  79. return false;
  80. }
  81. /**
  82. * {@inheritdoc}
  83. */
  84. public function getConnection(CommandInterface $command)
  85. {
  86. if ($this->current === null) {
  87. $this->check();
  88. $this->current = $this->strategy->isReadOperation($command)
  89. ? $this->pickSlave()
  90. : $this->master;
  91. return $this->current;
  92. }
  93. if ($this->current === $this->master) {
  94. return $this->current;
  95. }
  96. if (!$this->strategy->isReadOperation($command)) {
  97. $this->current = $this->master;
  98. }
  99. return $this->current;
  100. }
  101. /**
  102. * {@inheritdoc}
  103. */
  104. public function getConnectionById($connectionId)
  105. {
  106. if ($connectionId === 'master') {
  107. return $this->master;
  108. }
  109. if (isset($this->slaves[$connectionId])) {
  110. return $this->slaves[$connectionId];
  111. }
  112. return;
  113. }
  114. /**
  115. * {@inheritdoc}
  116. */
  117. public function switchTo($connection)
  118. {
  119. $this->check();
  120. if (!$connection instanceof NodeConnectionInterface) {
  121. $connection = $this->getConnectionById($connection);
  122. }
  123. if ($connection !== $this->master && !in_array($connection, $this->slaves, true)) {
  124. throw new \InvalidArgumentException('Invalid connection or connection not found.');
  125. }
  126. $this->current = $connection;
  127. }
  128. /**
  129. * {@inheritdoc}
  130. */
  131. public function getCurrent()
  132. {
  133. return $this->current;
  134. }
  135. /**
  136. * {@inheritdoc}
  137. */
  138. public function getMaster()
  139. {
  140. return $this->master;
  141. }
  142. /**
  143. * {@inheritdoc}
  144. */
  145. public function getSlaves()
  146. {
  147. return array_values($this->slaves);
  148. }
  149. /**
  150. * Returns the underlying replication strategy.
  151. *
  152. * @return ReplicationStrategy
  153. */
  154. public function getReplicationStrategy()
  155. {
  156. return $this->strategy;
  157. }
  158. /**
  159. * Returns a random slave.
  160. *
  161. * @return NodeConnectionInterface
  162. */
  163. protected function pickSlave()
  164. {
  165. return $this->slaves[array_rand($this->slaves)];
  166. }
  167. /**
  168. * {@inheritdoc}
  169. */
  170. public function isConnected()
  171. {
  172. return $this->current ? $this->current->isConnected() : false;
  173. }
  174. /**
  175. * {@inheritdoc}
  176. */
  177. public function connect()
  178. {
  179. if ($this->current === null) {
  180. $this->check();
  181. $this->current = $this->pickSlave();
  182. }
  183. $this->current->connect();
  184. }
  185. /**
  186. * {@inheritdoc}
  187. */
  188. public function disconnect()
  189. {
  190. if ($this->master) {
  191. $this->master->disconnect();
  192. }
  193. foreach ($this->slaves as $connection) {
  194. $connection->disconnect();
  195. }
  196. }
  197. /**
  198. * {@inheritdoc}
  199. */
  200. public function writeRequest(CommandInterface $command)
  201. {
  202. $this->getConnection($command)->writeRequest($command);
  203. }
  204. /**
  205. * {@inheritdoc}
  206. */
  207. public function readResponse(CommandInterface $command)
  208. {
  209. return $this->getConnection($command)->readResponse($command);
  210. }
  211. /**
  212. * {@inheritdoc}
  213. */
  214. public function executeCommand(CommandInterface $command)
  215. {
  216. return $this->getConnection($command)->executeCommand($command);
  217. }
  218. /**
  219. * {@inheritdoc}
  220. */
  221. public function __sleep()
  222. {
  223. return array('master', 'slaves', 'strategy');
  224. }
  225. }