PHP實現RabbitMQ消息列隊的示例代碼
業務場景
項目公司是主php做開發的,框架為thinkphp。眾所周知,php本身的運行效率存在一定的缺陷,所以如果有一個很復雜很耗時的業務時,必須開發一個常駐內存的程序。首先我想到瞭php的workerman與swoole,但是這裡應上面的標題哈,想將耗時任務交給另一個服務器,同時列隊處理。所以這裡我想獨立部署一個rabbitMQ服務器用於處理列隊任務。
當rabbitMQ服務器我們準備好瞭,建立瞭一個持久化命名為ceshi的列隊,如下:
項目上生產者和消費者的開發我這裡全部采用tinkphp6+workerman,為便於管理。這裡這麼做也是因為發現workerman中對rabbitMQ的文檔解釋太少瞭!
所以開始踩坑!
1、首先部署好thinkphp6框架
過程去看thinkphp6手冊
2、安裝workerman擴展
過程去看thinkphp6手冊
3、生產者
配置一個workerman類
創建的Send類代碼如下:
<?php namespace app\workerman; use Bunny\Channel; use Workerman\RabbitMQ\Client; use think\worker\Server; class Send extends Server { //websocket地址,一會用於測試。 protected $socket = 'websocket://127.0.0.1:2345'; /** * 收到信息 * @param $connection * @param $data */ public function onMessage($connection, $data) { //websocket發送過來的消息 $connection->send('我收到你的信息瞭:'.$data); //rabbitMQ配置 $options = [ 'host'=>'127.0.0.1',//rabbitMQ IP 'port'=>5672,//rabbitMQ 通訊端口 'user'=>'admin',//rabbitMQ 賬號 'password'=>'123456'//rabbitMQ 密碼 ]; (new Client($options))->connect()->then(function (Client $client) { return $client->channel(); })->then(function (Channel $channel) { /** * 創建隊列(Queue) * name: ceshi // 隊列名稱 * passive: false // 如果設置true存在則返回OK,否則就報錯。設置false存在返回OK,不存在則自動創建 * durable: true // 是否持久化,設置false是存放到內存中RabbitMQ重啟後會丟失, * 設置true則代表是一個持久的隊列,服務重啟之後也會存在,因為服務會把持久化的Queue存放在硬盤上,當服務重啟的時候,會重新加載之前被持久化的Queue * exclusive: false // 是否排他,指定該選項為true則隊列隻對當前連接有效,連接斷開後自動刪除 * auto_delete: false // 是否自動刪除,當最後一個消費者斷開連接之後隊列是否自動被刪除 */ return $channel->queueDeclare('ceshi', false, true, false, false)->then(function () use ($channel) { return $channel; }); })->then(function (Channel $channel) use($data){ echo "發送消息內容:".$data."\n"; /** * 發送消息 * body 發送的數據 * headers 數據頭,建議 ['content_type' => 'text/plain'],這樣消費端是springboot註解接收直接是字符串類型 * exchange 交換器名稱 * routingKey 路由key * mandatory * immediate * @return bool|PromiseInterface|int */ return $channel->publish($data, ['content_type' => 'text/plain'], '', 'ceshi')->then(function () use ($channel) { return $channel; }); })->then(function (Channel $channel) { //echo " [x] Sent 'Hello World!'\n"; $client = $channel->getClient(); return $channel->close()->then(function () use ($client) { return $client; }); })->then(function (Client $client) { $client->disconnect(); }); } /** * 當連接建立時觸發的回調函數 * @param $connection */ public function onConnect($connection) { } /** * 當連接斷開時觸發的回調函數 * @param $connection */ public function onClose($connection) { } /** * 當客戶端的連接上發生錯誤時觸發 * @param $connection * @param $code * @param $msg */ public function onError($connection, $code, $msg) { echo "error $code $msg\n"; } /** * 每個進程啟動 * @param $worker */ public function onWorkerStart($worker) { } }
上述都OK以後咱們可以項目路徑下通過命令啟動這個生產者:
php think worker:server
測試發送數據:
通過這個網站
連接【ws://127.0.0.1:2345】後發送數據!
前往rabbitMQ控制臺
列隊中有一條消息產生並且等待瞭!
這個時候你可能問,如果我發送數據不想通過ws發送而是接口發送怎麼辦?
笨思路唄:接口給內置服務器發消息->內置服務去發消息給rabbitMQ
將協議改為tcp
然後重新啟動服務
然後去tp6創建一個路由接口
接口代碼
<?php namespace app\controller; use app\BaseController; class Index extends BaseController { public function index(string $msg) { //連接本地tcp服務 $client = stream_socket_client('tcp://127.0.0.1:2345', $errno, $errmsg, 1); //發送字符串 fwrite($client, $msg."\n"); //斷開服務 fclose($client); return 'OK'; } }
執行結果:
說明接口成功的將數據發送給瞭本地內置的tcp服務。
同時,內置服務將收到的數據給瞭rabbitMQ服務列隊中。
生產者完成。
4、消費者
同生產者一樣新創建一個thinkphp6及安裝workerman擴展,註意端口別和生產者沖突!這裡我設置的是2346端口
創建的Receive類代碼如下:
<?php namespace app\workerman; use Bunny\Channel; use Bunny\Message; use Workerman\RabbitMQ\Client; use think\worker\Server; class Receive extends Server { protected $socket = 'tcp://127.0.0.1:2346'; /** * 收到信息 * @param $connection * @param $data */ public function onMessage($connection, $data) { } /** * 當連接建立時觸發的回調函數 * @param $connection */ public function onConnect($connection) { } /** * 當連接斷開時觸發的回調函數 * @param $connection */ public function onClose($connection) { } /** * 當客戶端的連接上發生錯誤時觸發 * @param $connection * @param $code * @param $msg */ public function onError($connection, $code, $msg) { echo "error $code $msg\n"; } /** * 每個進程啟動 * @param $worker */ public function onWorkerStart($worker) { //rabbitMQ配置 $options = [ 'host'=>'127.0.0.1',//rabbitMQ IP 'port'=>5672,//rabbitMQ 通訊端口 'user'=>'admin',//rabbitMQ 賬號 'password'=>'123456'//rabbitMQ 密碼 ]; (new Client($options))->connect()->then(function (Client $client) { return $client->channel(); })->then(function (Channel $channel) { /** * 創建隊列(Queue) * name: ceshi // 隊列名稱 * passive: false // 如果設置true存在則返回OK,否則就報錯。設置false存在返回OK,不存在則自動創建 * durable: true // 是否持久化,設置false是存放到內存中RabbitMQ重啟後會丟失, * 設置true則代表是一個持久的隊列,服務重啟之後也會存在,因為服務會把持久化的Queue存放在硬盤上,當服務重啟的時候,會重新加載之前被持久化的Queue * exclusive: false // 是否排他,指定該選項為true則隊列隻對當前連接有效,連接斷開後自動刪除 * auto_delete: false // 是否自動刪除,當最後一個消費者斷開連接之後隊列是否自動被刪除 */ return $channel->queueDeclare('ceshi', false, true, false, false)->then(function () use ($channel) { return $channel; }); })->then(function (Channel $channel) { echo ' [*] Waiting for messages. To exit press CTRL+C', "\n"; $channel->consume( function (Message $message, Channel $channel, Client $client) { echo "接收消息內容:", $message->content, "\n"; }, 'ceshi', '', false, true ); }); } }
都OK以後咱們可以項目路徑下通過命令啟動這個消費者:
php think worker:server
此時應該會自動消費掉rabbitMQ中等待的消息!
到這裡消費者也就結束啦!
5、整體測試
接下來我用cmd來啟動兩個服務,然後用接口發送消息和消費測試!
至於具體怎麼靈活應用自行開拓大腦哦~
比如php項目有些業務吃力,可以去做個java的消費端,讓java來完成任務~
以上就是PHP實現RabbitMQ消息列隊的示例代碼的詳細內容,更多關於PHP RabbitMQ消息列隊的資料請關註LevelAH其它相關文章!
推薦閱讀:
- php中Workerman框架實例講解
- Java RabbitMQ的工作隊列與消息應答詳解
- RabbitMQ冪等性與優先級及惰性詳細全面講解
- 高級消息隊列協議AMQP簡介
- C#利用RabbitMQ實現點對點消息傳輸