
Monday, July 17, 2017

Inter-Process Communication Using PHP, ZeroMQ, and React/ZMQ

Push external changes to existing web pages

The diagram above shows the goal: whenever an external PHP script or the database layer makes a change, all connected web clients should be notified. We can achieve this using React/ZMQ.

Requirements

ZeroMQ

Our WebSocket server listens on port 8080 for incoming connections from all client web pages, but how will it also get updates from another PHP script or another server process? Enter ZeroMQ. We could use raw sockets, like the ones Ratchet is built on, but ZeroMQ is a library that makes sockets easier to work with. Install the PHP zmq extension before continuing.

React/ZMQ

Ratchet is a WebSocket library built on top of a socket library called React. React handles connections and the raw I/O for Ratchet. In addition to React, which comes with Ratchet, we need another library that is part of the React suite: React/ZMQ. This library will bind ZeroMQ sockets to the Reactor core enabling us to handle both WebSockets and ZeroMQ sockets. To install, your composer.json file should look like this:

{
    "autoload": {
        "psr-0": {
            "MyApp": "src"
        }
    },
    "require": {
        "cboden/ratchet": "0.3.*",
        "react/zmq": "0.2.*|0.3.*"
    }
}
<?php
require dirname(__DIR__) . '/vendor/autoload.php';

$loop   = React\EventLoop\Factory::create();
$pusher = new MyApp\Pusher;

// Listen for the web server to make a ZeroMQ push after an ajax request
$context = new React\ZMQ\Context($loop);
$pull = $context->getSocket(ZMQ::SOCKET_PULL);
$pull->bind('tcp://127.0.0.1:55556'); // Binding to 127.0.0.1 means the only client that can connect is itself
$pull->on('message', array($pusher, 'onBlogEntry'));

// Set up our WebSocket server for clients wanting real-time updates
$webSock = new React\Socket\Server($loop);
$webSock->listen(8080, '0.0.0.0'); // Binding to 0.0.0.0 means remotes can connect
$webServer = new Ratchet\Server\IoServer(
    new Ratchet\Http\HttpServer(
        new Ratchet\WebSocket\WsServer(
            new Ratchet\Wamp\WampServer(
                $pusher
            )
        )
    ),
    $webSock
);

$loop->run();
<?php
namespace MyApp;

use Ratchet\MessageComponentInterface;
use Ratchet\ConnectionInterface;

class Chat implements MessageComponentInterface {
    protected $clients;

    public function __construct() {
        $this->clients = new \SplObjectStorage;
    }

    public function onOpen(ConnectionInterface $conn) {
        // Store the new connection to send messages to later
        $this->clients->attach($conn);
        echo "New connection! ({$conn->resourceId})\n";
    }

    public function onMessage(ConnectionInterface $from, $msg) {
        $numRecv = count($this->clients) - 1;
        echo sprintf('Connection %d sending message "%s" to %d other connection%s' . "\n"
            , $from->resourceId, $msg, $numRecv, $numRecv == 1 ? '' : 's');

        foreach ($this->clients as $client) {
            if ($from !== $client) {
                // The sender is not the receiver, send to each client connected
                $client->send($msg);
            }
        }
    }

    public function onClose(ConnectionInterface $conn) {
        // The connection is closed, remove it, as we can no longer send it messages
        $this->clients->detach($conn);
        echo "Connection {$conn->resourceId} has disconnected\n";
    }

    public function onError(ConnectionInterface $conn, \Exception $e) {
        echo "An error has occurred: {$e->getMessage()}\n";
        $conn->close();
    }
}
<html>
<head>
    <title>Testing</title>
    <script src="autobahn.js"></script>
    <script>
        var conn = new ab.Session('ws://localhost:8080',
            function() {
                conn.subscribe('kittensCategory', function(topic, data) {
                    // This is where you would add the new article to the DOM (beyond the scope of this tutorial)
                    console.log('New article published to category "' + topic + '" : ' + data.title);
                });
            },
            function() {
                console.warn('WebSocket connection closed');
            },
            {'skipSubprotocolCheck': true}
        );
    </script>
</head>
<body>
    <h1> Sample Client web page </h1>
</body>
</html>
<?php
// post.php
// The data to publish; 'category' must match the topic the client page subscribes to
$entryData = array(
    'category' => "kittensCategory",
    'title'    => "Ces Dialer",
    'article'  => "First Article",
    'when'     => time()
);

// Push the entry to the WebSocket server process over ZeroMQ
$context = new ZMQContext();
$socket = $context->getSocket(ZMQ::SOCKET_PUSH, 'my pusher');
$socket->connect("tcp://localhost:55556");
$socket->send(json_encode($entryData));
<?php
namespace MyApp;

use Ratchet\ConnectionInterface;
use Ratchet\Wamp\WampServerInterface;

class Pusher implements WampServerInterface {
    /**
     * A lookup of all the topics clients have subscribed to
     */
    protected $subscribedTopics = array();

    public function onSubscribe(ConnectionInterface $conn, $topic) {
        echo "subscribe\n";
        print_r($topic->getId());
        $this->subscribedTopics[$topic->getId()] = $topic;
    }

    public function onUnSubscribe(ConnectionInterface $conn, $topic) {
    }

    public function onOpen(ConnectionInterface $conn) {
    }

    public function onClose(ConnectionInterface $conn) {
    }

    public function onCall(ConnectionInterface $conn, $id, $topic, array $params) {
        // In this application if clients send data it's because the user hacked around in console
        $conn->callError($id, $topic, 'You are not allowed to make calls')->close();
    }

    public function onPublish(ConnectionInterface $conn, $topic, $event, array $exclude, array $eligible) {
        // In this application if clients send data it's because the user hacked around in console
        $conn->close();
    }

    public function onError(ConnectionInterface $conn, \Exception $e) {
    }

    /**
     * @param string JSON'ified string we'll receive from ZeroMQ
     */
    public function onBlogEntry($entry) {
        $entryData = json_decode($entry, true);
        print_r($entryData);

        // If the lookup topic object isn't set there is no one to publish to
        if (!array_key_exists($entryData['category'], $this->subscribedTopics)) {
            return;
        }

        $topic = $this->subscribedTopics[$entryData['category']];

        // re-send the data to all the clients subscribed to that category
        $topic->broadcast($entryData);
    }
}
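The onBlogEntry handler above assumes that every ZeroMQ message is valid JSON containing a category key. If that cannot be guaranteed, the payload could be checked before the topic lookup. The following is only a sketch of one way to do that: parseEntry is a hypothetical helper name, not part of Ratchet or React/ZMQ, and the rule that a category must be a non-empty string is an assumption made for this example.

```php
<?php
// Hypothetical helper (not part of Ratchet or React/ZMQ): decode the JSON
// string received from ZeroMQ and return it only if it looks like an entry.
function parseEntry($json)
{
    $data = json_decode($json, true);

    // json_decode() returns null on malformed input; a bare scalar is not an entry either.
    if (!is_array($data)) {
        return null;
    }

    // Assumption for this sketch: the category doubles as the topic ID,
    // so it has to be a non-empty string.
    if (!isset($data['category']) || !is_string($data['category']) || $data['category'] === '') {
        return null;
    }

    return $data;
}

// A well-formed entry, built the same way post.php builds it
$entry = parseEntry(json_encode(array('category' => 'kittensCategory', 'title' => 'Ces Dialer')));
var_dump($entry !== null);

// Malformed JSON and an entry without a category are both rejected
var_dump(parseEntry('not json'));
var_dump(parseEntry('{"title":"no category"}'));
```

Inside onBlogEntry, the result of such a helper could replace the direct json_decode call, returning early when it is null.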
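To run the application above, go to the project root directory "push" and start the WebSocket server with the command below:
php bin/push-server.php
Then open "push\src\MyApp\clientwebpage.html" in a browser. The page connects to the WebSocket server and listens for updates on the subscribed category. Any script that pushes to tcp://localhost:55556, such as the post.php example above, then triggers an update in the page.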

PHP ZMQ Extension Installation for Apache 2.4 & PHP 5.6


With the PHP ZMQ extension you can distribute work to background processes, provide flexible service brokering for a service-oriented architecture, and manage caches efficiently, using just PHP and the ZeroMQ libraries. Whether the problem is asynchronous communication, message distribution, or process management, ZeroMQ can help you build an architecture that is more resilient, more scalable, and more flexible, without introducing unnecessary overhead or requiring a heavyweight queue manager node.

Installing ZMQ in Windows

Required software: Apache 2.4 and PHP 5.6
  1. Download the ZMQ PHP extension build that matches your installed PHP version from the following URL:
    URL: ZMQ download URL
  2. Extract the downloaded archive.
  3. Copy php_zmq.dll to the php/ext directory.
  4. Copy libzmq.dll to the PHP root directory (php/).
  5. Add the following line to php.ini: extension=php_zmq.dll
  6. Restart Apache so the extension is loaded.
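
After these steps, a short script can confirm whether PHP actually picked up the extension. This is a minimal sketch using only built-in PHP functions; zmqStatus is a hypothetical helper name chosen for this example. Note that the command-line PHP and the Apache module may read different php.ini files, so it is worth running the check through the web server as well.

```php
<?php
// Hypothetical helper: report whether the zmq extension is available
// to the PHP process that runs this script.
function zmqStatus()
{
    if (!extension_loaded('zmq')) {
        // php_ini_loaded_file() returns false when no php.ini was loaded
        $ini = php_ini_loaded_file() ?: 'php.ini (none loaded)';
        return 'zmq extension is NOT loaded; check extension=php_zmq.dll in ' . $ini;
    }

    // phpversion() with an extension name returns that extension's version string
    return 'zmq extension is loaded, version ' . phpversion('zmq');
}

echo zmqStatus(), PHP_EOL;
```

If the script reports that the extension is not loaded, the path it prints shows which php.ini that PHP process is reading.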