php消息隊列實現詳解
常見進程通信方式
System V IPC 總的包括:消息隊列,共享內存、信號量。
IPC(內部進程間通信)的使用註意:
- IPC資源僅在本機中使用,不能夠跨網絡使用(其實進程間通信方式除瞭socket 通信方式其他都是僅在本機中使用)。
- IPC的資源生存周期與內核相同。除非刪除,不然會與系統的生存周期相同。(也就是說如果你不主動刪除創建的ipc資源,那麼它會一直存在,除非系統關機,它才會被清除)
- (重要)每個IPC都有一個關鍵字key。每個IPC資源都有唯一的整型標識符,進程可以使用id對此資源進行訪問。
系統V IPC消息隊列
消息隊列實際上是一個隊列,由內核維護,使用msgget (msgget() 函數是linux系統調用函數,由c語言編寫) 函數來創建一個消息隊列,創建成功返回隊列 ID
在php 中通過封裝 system V IPC
函數來實現操作消息隊列,共享內存,與信號量。我們可以通過php官方文檔上查看這些函數的定義,其中msg_
為首的函數是用來操作消息隊列,sem_
為首的函數是對信號量操作,shm_
為首的函數是對共享內存的操作
其實 php 這些進程擴展說到底就是對 linux c 系統調用函數與c標準函數庫的封裝,然後在加一點php語言自己的處理,封裝成新的函數供 廣大 phper 調用,隻要在linux 下運行 不管你是 GO, JAVA ,python 其實都是對系統調用函數的封裝,隻是封裝後的名字不同,但是調用的都是linux 內核提供的同一套接口
php創建一個消息隊列
<?php $key = ftok('demo1.php',"x");//將文件與ID轉換為一個key,這個文件隻要真實存在就行,沒有特殊限制,一般都指定當前文件名 $msqid = msg_get_queue($key);// 創建一個消息隊列,返回一個資源id echo msg_send($msqid,1,"hello"); //往消息隊列寫入一條數據 hello 到隊列中 echo $msqid; // 輸出資源id
如何查看創建的消息隊列呢?
我們可用通過linux 提供的 ipcs
命令查看
如圖,第一列 Message Queues
代表消息隊列
key | msqid | owner | perms | used-bytes | messages |
---|---|---|---|---|---|
0x7801620f | 5 | root | 666 | 12 | 1 |
外部標識由調用函數時傳入 | 消息隊列標識id(用於進程內部通訊)每個隊列都有唯一標識,用於區分不同消息隊列 | 創建消息隊列用戶 | 操作權限位 | 消息隊列使用瞭多少字節 | 消息隊列有多少條消息 |
通過表格分析我們剛剛創建的消息隊列是一個,消息標識符為5
創建用戶為root ,操作權限為666,寫入瞭一條數據,占用瞭12個字節的隊列。
看到這,大傢可能發現兩個問題?
第一個問題:
為什麼我調用 php 函數 msg_get_queue()
創建消息隊列返回的不是消息隊列標識符5
,而是 Resource id #4
因為之前說過 php 函數其實是對linux c 系統調用函數的封裝,也就是說 msg_get_queue
函數 其實是對 msgget
系統調用函數的封裝,調用msgget
函數的返回內容應該是 消息隊列標識id,而現在封裝 msgget
函數的 msg_get_queue
函數返回的卻是資源描述符,這是為什麼?
這其實就是php內部做瞭處理,返回資源描述符,使得 msg_get_queue
可以配合其他php
函數一起使用,但內部調用msgget
函數返回的一定是 5
,也就是消息隊列標識符
如果不相信可使用 linux 下提供的 strace 工具
跟蹤系統調用。
第二個問題
通過php函數 msg_send
寫入瞭一條數據,數據內容為 ”hello“
字節長度應該是5個字節,但是消息隊列占用的字節長度缺少12個字節?
這是因為php 內部實現對寫入數據做瞭序列化操作,導致寫入消息隊列的字節長度為12而不是5個字節長度
<?php $a = serialize("hello"); echo 'hello 序列化長度:'.strlen($a)."\n"; echo 'hello 序列化內容:'.$a."\n"; echo "hello 未序列化長度:".strlen("hello")."\n";
當然如果你不想 msg_send
函數寫入隊列前對數據進行序列化,可以把第3個參數設置為false,不過需要註意的是,如果寫入數據不進行序列化,那麼使用 msg_receive
函數讀取隊列數據時也必須設置為不反序列化操作,不然會引發錯誤
讀取隊列內容
<?php $key = ftok('demo1.php',"x");//將文件與ID轉換為一個key $msqid = msg_get_queue($key);//消息隊列如果已創建,直接返回同一個資源描述符 //第一個參數資源描述符,第二個參數一般為0,表示從隊列第一條讀取,或者設置與msg_send函數第二個參數一致,第三個參數設置緩沖區長度,註意msg_send 函數默認會序列化數據,msg_receive函數默認會將讀取數據反序列化,如果緩沖區設置過小會反序列化失敗 msg_receive($msqid,1,$received_message_type,1024,$message);//讀取消息隊列數據,逐條讀取, echo $message.PHP_EOL;
成功從消息隊列中讀取出一條數據,內容hello
因為消息隊列內容被讀取,消息隊列占用字節被清空為0,消息隊列消息條數也為0
通過消息隊列的讀寫操作我們發現,他與上一篇的管道有什麼區別?
消息隊列不像管道通信寫入數據,必須讀端要打開,讀數據,寫端必須要打開,不然會阻塞無法通信,消息隊列可以隨時往隊列寫入數據,讀取數據
關閉序列化功能
寫端
<?php $key = ftok('demo3.php',"x");//將文件與ID轉換為一個key,這個文件隻要真實存在就行,沒有特殊限制,一般都指定當前文件名 $msqid = msg_get_queue($key);// 創建一個消息隊列,返回一個資源id // 第一個參數資源id,第二個參數消息類型,必須大於0,第三個參數寫入隊列內容,取消序列化操作 echo msg_send($msqid,1,"hello",false); //往消息隊列寫入一條數據 hello 到隊列中
因為沒有對寫入數據進行序列化操作,消息隊列占用的字節長度變成瞭5個字節。
讀端
<?php $key = ftok('demo3.php',"x");//將文件與ID轉換為一個key $msqid = msg_get_queue($key); //第一個參數資源描述符 //第二個參數一般為0,表示從隊列第一條讀取,或者設置與msg_send函數第二個參數一致 // 第三個參數是msg_send函數的第二個參數的值,也就是發送消息的消息類型 //第四個參數設置接收緩沖區長度,如果設置過小會反序列化失敗【如果使用序列化功能時候】 //當接收緩沖區大小設置過小並且關閉瞭反序列化【發送端發送數據也關閉瞭】,但是發送的數據超過瞭設置的長度,就會被截斷,並且額外的數據會被丟棄,必須設置第五個參數才有效 // 第五個參數 flags 標志符 msg_receive($msqid,0,$received_message_type,1,$message,false,MSG_NOERROR); echo $message.PHP_EOL;
隻讀取到一個h
是因為接受緩沖區長度設置為1個字節,所以隻接收到一個字節,其它部分被丟棄。
非阻塞讀取消息隊列
當隊列內沒有數據,讀取隊列msg_receive
函數是會阻塞的如何讓隊列無數據讀端也不阻塞繼續往下執行,可以通過 設置 flags 標志符 為 MSG_IPC_NOWAIT
,該標志符作用是,不管隊列有無數據立即返回結果
<?php $key = ftok('demo5.php',"x");//將文件與ID轉換為一個key $msqid = msg_get_queue($key); // fork 一個子進程 $pid = pcntl_fork(); if($pid == 0){ while(1){ // MSG_IPC_NOWAIT【啟用非阻塞】 msg_receive 函數其實是封裝 linux c msgrcv 系統調用函數,如果設置為非阻塞,函數調用次數非常高,會非常消耗cpu資源 //阻塞模式調用函數次數低,有數據才返回,對cpu友好 $ret = msg_receive($msqid,0,$msgType,1024,$msg,true,MSG_IPC_NOWAIT,$error_code); if($error_code != MSG_ENOMSG){ echo $msg."\n"; } sleep(2); echo "go"."\n"; } exit(0); }
父子進程消息隊列通信
<?php $key = ftok('demo5.php',"x");//將文件與ID轉換為一個key $msqid = msg_get_queue($key); $pid = pcntl_fork(); if($pid == 0){ while(1){ $ret = msg_receive($msqid,0,$msgType,1024,$msg,true,MSG_IPC_NOWAIT,$error_code); if($error_code != MSG_ENOMSG){ echo $msg."\n"; } } exit(0); } $i = 1; while(1){ msg_send($msqid,2,"hello world",true); sleep(2); if($i++ == 3){ // 殺死子進程 posix_kill($pid,SIGILL); break; } } //子進程退出回收子進程,防止孤兒進程 $pid = pcntl_wait($status); if($pid > 0) { echo "exit pid=".$pid."\n"; }
刪除消息隊列
<?php $key = ftok('demo5.php',"x");//將文件與ID轉換為一個key $msqid = msg_get_queue($key); $pid = pcntl_fork(); if($pid == 0){ while(1){ $ret = msg_receive($msqid,0,$msgType,1024,$msg,true,MSG_IPC_NOWAIT,$error_code); if($error_code != MSG_ENOMSG){ echo $msg."\n"; } } exit(0); } $i = 1; while(1){ msg_send($msqid,2,"hello world",true); sleep(2); if($i++ == 3){ // 殺死子進程 posix_kill($pid,SIGILL); break; } } //子進程退出回收子進程,防止孤兒進程 $pid = pcntl_wait($status); if($pid > 0) { echo "exit pid=".$pid."\n"; } //msg_remove_queue 函數移除消息隊列 if(msg_remove_queue($msqid)){ echo "remove ok\n"; }
讀取3次隊列數據後父進程殺死子進程,父進程回收子進程,然後父進程退出,最後銷毀消息隊列
刪除消息隊列也可以直接使用命令 ipcrm -q 7
到此這篇關於php消息隊列實現詳解的文章就介紹到這瞭,更多相關php消息隊列內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!