callError($id, $topic, 'You are not allowed to make calls')->close();
}
public function onPublish(ConnectionInterface $conn, $topic, $event, array $exclude, array $eligible) {
// In this application if clients send data it's because the user hacked around in console
$conn->close();
}
public function onError(ConnectionInterface $conn, \Exception $e) {
}
}
これを ''/src/Pusher.php'' に保存します。WAMPに必要なメソッドを作成し、誰からもデータを送信されないようにし、送信された場合はその接続を閉じます。ここでは、プッシュアプリケーションを作成しており、WebSocketからの受信メッセージは受け付けていません。これらのメッセージはすべてAJAXから送信されます。
\\
==== ブログ送信の編集 ====
次に、新しいブログ投稿を処理する既存のWebサイトのコードに ZeroMQ のおまじないを少し追加します。ここでのコードは、あなたが実際にブログで設置している Drupal や WordPress のような高度なアーキテクチャと比較すると、少し基本的で古風なものかもしれませんが、ここでは基本的なことに焦点を当てています。
$_POST['category']
, 'title' => $_POST['title']
, 'article' => $_POST['article']
, 'when' => time()
);
$pdo->prepare("INSERT INTO blogs (title, article, category, published) VALUES (?, ?, ?, ?)")
->execute($entryData['title'], $entryData['article'], $entryData['category'], $entryData['when']);
// This is our new stuff
$context = new ZMQContext();
$socket = $context->getSocket(ZMQ::SOCKET_PUSH, 'my pusher');
$socket->connect("tcp://localhost:5555");
$socket->send(json_encode($entryData));
ブログ投稿をデータベースに記録した後、ソケットサーバーへの ZeroMQ 接続を開き、投稿と同じ情報を含むシリアル化されたメッセージを配信します。(注:適切な消毒を行ってください。これはとりあえずの汚い例です)
\\
==== ZeroMQ メッセージの処理 ====
アプリケーションスタブクラスに戻りましょう。そのままにしておくと、WebSocket 接続のみを処理することになります。前回のコードスニペットで見たように、データを送信するポート ''5555'' でローカルホストへの接続を開きました。ZeroMQメッセージに対する処理を追加し、WebSocketクライアントに再送信するようにしましょう。
subscribedTopics[$topic->getId()] = $topic;
}
/**
* @param string JSON'ified string we'll receive from ZeroMQ
*/
public function onBlogEntry($entry) {
$entryData = json_decode($entry, true);
// If the lookup topic object isn't set there is no one to publish to
// トピック探索オブジェクトがセットされていない場合、公開する相手はありません
if (!array_key_exists($entryData['category'], $this->subscribedTopics)) {
return;
}
$topic = $this->subscribedTopics[$entryData['category']];
// re-send the data to all the clients subscribed to that category
// そのカテゴリを購読している全てのクライアントにデータを再送信します
$topic->broadcast($entryData);
}
/* The rest of our methods were as they were, omitted from docs to save space
残りのメソッドはそのままで、スペースを節約するためにドキュメントから省略しました
*/
}
\\
==== 全てを結合する ====
これまで、メッセージの送信、受信、および処理のすべてのロジックについて説明してきました。次に、すべてを結合して、すべてを管理する実行可能なスクリプトを作成します。I/O、WebSocket、WAMP、ZeroMQ の各コンポーネントを使用してRatchetアプリケーションを構築し、イベントループを実行します。
getSocket(ZMQ::SOCKET_PULL);
$pull->bind('tcp://127.0.0.1:5555'); // Binding to 127.0.0.1 means the only client that can connect is itself
$pull->on('message', array($pusher, 'onBlogEntry'));
// Set up our WebSocket server for clients wanting real-time updates
// リアルタイム更新が必要なクライアント向けにWebSocketサーバーをセットアップします
$webSock = new React\Socket\Server('0.0.0.0:8080', $loop); // Binding to 0.0.0.0 means remotes can connect
$webServer = new Ratchet\Server\IoServer(
new Ratchet\Http\HttpServer(
new Ratchet\WebSocket\WsServer(
new Ratchet\Wamp\WampServer(
$pusher
)
)
),
$webSock
);
$loop->run();
コードを ''/bin/push-server.php'' として保存し、実行します。
$ php bin/push-server.php
\\
==== クライアントサイド ====
サーバーサイドのコードが完成し、稼働しているので、これらのリアルタイムの投稿を取得する時が来ました!これらの更新で具体的に何を行うのかは、このドキュメントの範囲を超えています。これらのメッセージをデバッグコンソールに出力するだけにします。
最後に、このJavascriptを配置したページを1つのブラウザウィンドウで開き、別のブラウザから「kittensCategory」にブログエントリを投稿して、最初からコンソールのログを確認します。それが機能していたら、次のステップは、受信したデータを操作性の良いDOMの中に組み込むことです。
これがローカルで機能している場合(ローカルホストが開発環境であると想定)、ローカルホストの参照と、場合によっては適切なサーバーホスト名/IPアドレスへの結合を変更することができます。
\\
===== チュートリアルの実行 =====
本項ではチュートリアルの実行について説明します。[[#チュートリアルの説明|前項]]でも述べたように、筆者の環境では ZeroMQ を使った実装がWindowsで出来なかったので、以下の環境に変更してチュートリアルの実行を行いました。
* サーバ側は、''React\ZMQ'' から ''React\Socket'' に変更(ZeroMQ でなくて、rawソケットを使用)
* ブログポストからは ZeroMQ によるプッシュを PHPのソケット関数に変更
* ZeroMQ を使わないので、PHPは7.2から7.3に変更
以下では、実行で使用したプロジェクトとソースコードを掲載しますが、変更点以外は、前項の[[#チュートリアルの説明]]と同じです。
\\
==== プロジェクトの作成 ====
まず、プロジェクトフォルダ(以下 ''{Project-Folder}'' と呼びます)を作成し、そこにComposerファイルを設置します。
{
"autoload": {
"psr-4": {
"MyApp\\": "src"
}
},
"require": {
"cboden/ratchet": "^0.4"
}
}
上の設定では、プロジェクトフォルダ下の ''src'' フォルダにアプリケーションを設置し、そこを ''MyApp'' 名前空間として定義します。
Composer の規則に従い、ratchet は ''vender/cboden/ratchet'' の下に保存されます。
また、プロジェクトフォルダ下に以下のフォルダを作成します:
* ''bin'' --- WebSocketsサーバー用のスクリプトを格納します(CLIモード)
* ''src'' --- Pusherクラスを格納します
* ''blog'' --- ブログ側のソースコード一式
準備が出来たら、コマンドプロンプトでプロジェクトフォルダに移動して、''composer install'' を実行します。
\\
==== WebSockサーバー ====
以下のWebSockサーバーを ''{Project-Folder}/bin'' の下に配置します。
{{fa>folder-open-o}} ** {Project-Folder}/bin/push-server.php **
on('connection', function (ConnectionInterface $conn) use ($pusher) {
// ソケットでメッセージを受信した時
$conn->on('data', function ($data) use ($conn, $pusher) {
//ブログ投稿の更新をプッシュします
$pusher->onBlogEntry($data);
$conn->close();
});
});
// イベントループ
$loop->run();
* 前半のWebSocketサーバーのセットアップの部分は[[#全てを結合する|オリジナル]]のコードと同じです。
* 後半部分は、プッシュ用のメッセージ受信のために着信TCPソケットをセットアップし、それをWebSocketサーバーと同じイベントループで処理しています。
* プッシュ用のメッセージを受信した時、Pusher の onBlogEntry() を明示的に呼び出しています。
Pusherクラスについては、デバッグライトを追加した以外は、[[#ZeroMQ メッセージの処理|オリジナル]]のコードと同じです。これは、''{Project-Folder}/src'' に配置します。
{{fa>folder-open-o}} ** {Project-Folder}/src/Pusher.php **
remoteAddress},{$conn->resourceId},'{$topic->getId()}')\n";
$this->subscribedTopics[$topic->getId()] = $topic;
}
/**
* @param string ZeroMQから受信したJSON化された文字列
*/
public function onBlogEntry($entry) {
echo "onBlogEntry():\n";
$entryData = json_decode($entry, true);
// トピック探索オブジェクトがセットされていない場合、公開する相手はありません
if (!array_key_exists($entryData['category'], $this->subscribedTopics)) {
return;
}
$topic = $this->subscribedTopics[$entryData['category']];
// そのカテゴリを購読している全てのクライアントにデータを再送信します
$topic->broadcast($entryData);
echo json_encode($entryData)."\n";
}
public function onUnSubscribe(ConnectionInterface $conn, $topic) {
echo "onUnSubscribe({$conn->remoteAddress},{$conn->resourceId})\n";
}
public function onOpen(ConnectionInterface $conn) {
echo "onOpen({$conn->remoteAddress},{$conn->resourceId})\n";
}
public function onClose(ConnectionInterface $conn) {
echo "onClose({$conn->remoteAddress},{$conn->resourceId})\n";
}
public function onCall(ConnectionInterface $conn, $id, $topic, array $params) {
echo "onCall({$conn->remoteAddress},{$conn->resourceId})\n";
// このアプリケーションでは、クライアントがデータを送信する場合、それはユーザーがコンソールでハッキングしたためです
$conn->callError($id, $topic, 'You are not allowed to make calls')->close();
}
public function onPublish(ConnectionInterface $conn, $topic, $event, array $exclude, array $eligible) {
echo "onPublish({$conn->remoteAddress},{$conn->resourceId})\n";
// このアプリケーションでは、クライアントがデータを送信する場合、それはユーザーがコンソールでハッキングしたためです
$conn->close();
}
public function onError(ConnectionInterface $conn, \Exception $e) {
echo "onError({$conn->remoteAddress},{$conn->resourceId})\n";
}
}
\\
==== ブログ投稿ページ ====
新しいブログ投稿を処理する為のコードは、新規に作成し直しました。このコードは ''{Project-Folder}/blog'' に配置します。
{{fa>folder-open-o}} ** {Project-Folder}/blog/entry.php **
$_POST['category']
, 'title' => $_POST['title']
, 'article' => $_POST['article']
, 'when' => time()
);
$instance = stream_socket_client('tcp://127.0.0.1:5555');
fwrite($instance, json_encode($entryData));
header("Location: {$_SERVER['REQUEST_URI']}");
exit();
}
?>
Blog Stab
* ブログが投稿された時(POSTメソッドでは)、WebSocketサーバーへPHPのソケット関数(stream_socket_client)を使い、メッセージを送信しています。\\ \\
* GETメソッドでこのスクリプトが呼び出された場合は、投稿用のHTMLをレンダリングします:
* ブラウザ表示用のフレームワークとしてbootstrapを使用しています。
* 投稿ページは ''entry.template.php'' に保存されているものを include() して使用します。
\\
==== クライアントサイド ====
クライアントサイドのメインのソースコードは上に掲載した [[#ブログ投稿ページ|entry.php]] 内のHTMLです。メインで使っている他のソースコードは以下の通りです。
* template/entry.template.php --- ブログ投稿画面のHTML
* js/main.js --- WebSocketとの接続、購読を行うJavaScript
* js/autobahn.js --- WAMP(V1)のJavaScriptライブラリー
以下は、ブログ投稿画面のHTMLです。このコードは ''{Project-Folder}/blog/template'' に配置します。
{{fa>folder-open-o}} ** {Project-Folder}/blog/template/entry.template.php **
Blog Entry
以下は、WebSocketとの接続、購読を行うJavaScriptです。このコードは ''{Project-Folder}/blog/js'' に配置します。
{{fa>folder-open-o}} ** {Project-Folder}/blog/js/main.js **
/*
* Subscribe Session
*/
var conn = new ab.Session('ws://localhost:8080',
function() {
conn.subscribe('kittensCategory', function(topic, data) {
console.log('New article published to category "' + topic + '" : ' + data.title);
$('#new-articles').prepend(''+data.title+' '+data.article+' ');
});
},
function() {
console.warn('WebSocket connection closed');
},
{'skipSubprotocolCheck': true}
);
* ページロード時に、WebSocketサーバー(
{Project-Folder}/blog/js/autobahn.js
このライブラリーは、以下より取得して設置しました。
https://gist.githubusercontent.com/cboden/fcae978cfc016d506639c5241f94e772/raw/e974ce895df527c83b8e010124a034cfcf6c9f4b/autobahn.js
この autobahn.js は、WAMP のバージョン1対応のライブラリです。Ratchet は WAMP のバージョン2をサポートしていないので、敢えて [[https://github.com/crossbario/autobahn-js|AutobahnJS]] の古いライブラリーを使用しています。
''補足''
WAMPとは WebSocket Application Messaging Protocol の略で、次の機能を提供するWebSocketのサブプロトコルです。
* ルーティングされたリモートプロシージャコール
* パブリッシュ & サブスクライブ
詳しくは以下を参照:
* https://wamp-proto.org/
\\
==== プログラムの実行 ====
コマンドプロンプトでプロジェクトフォルダに移動してWebSocketサーバーを起動します。
php bin/push-server.php
ブラウザで ''entry.php'' を2つのブラウザーからアクセスします。一方のブラウザでプログを投稿すると、その内容は他方のブラウザに表示されます。
[{{:ratchet:0.4:rachet-push02.png?nolink&400}}]
[{{:ratchet:0.4:rachet-push03.png?nolink&400}}]
\\