php消息隊列實現詳解

常見進程通信方式

System V IPC 總的包括:消息隊列,共享內存、信號量。

IPC(內部進程間通信)的使用註意:

  • IPC資源僅在本機中使用,不能夠跨網絡使用(其實進程間通信方式除瞭socket 通信方式其他都是僅在本機中使用)。
  • IPC的資源生存周期與內核相同。除非刪除,不然會與系統的生存周期相同。(也就是說如果你不主動刪除創建的ipc資源,那麼它會一直存在,除非系統關機,它才會被清除)
  • (重要)每個IPC都有一個關鍵字key。每個IPC資源都有唯一的整型標識符,進程可以使用id對此資源進行訪問。

系統V IPC消息隊列

消息隊列實際上是一個隊列,由內核維護,使用msgget (msgget() 函數是linux系統調用函數,由c語言編寫) 函數來創建一個消息隊列,創建成功返回隊列 ID

在php 中通過封裝 system V IPC 函數來實現操作消息隊列,共享內存,與信號量。我們可以通過php官方文檔上查看這些函數的定義,其中msg_ 為首的函數是用來操作消息隊列,sem_ 為首的函數是對信號量操作,shm_ 為首的函數是對共享內存的操作

其實 php 這些進程擴展說到底就是對 linux c 系統調用函數與c標準函數庫的封裝,然後在加一點php語言自己的處理,封裝成新的函數供 廣大 phper 調用,隻要在linux 下運行 不管你是 GO, JAVA ,python 其實都是對系統調用函數的封裝,隻是封裝後的名字不同,但是調用的都是linux 內核提供的同一套接口

php創建一個消息隊列

<?php
$key = ftok('demo1.php',"x");//將文件與ID轉換為一個key,這個文件隻要真實存在就行,沒有特殊限制,一般都指定當前文件名
$msqid = msg_get_queue($key);// 創建一個消息隊列,返回一個資源id
echo msg_send($msqid,1,"hello"); //往消息隊列寫入一條數據 hello 到隊列中
echo $msqid; // 輸出資源id

如何查看創建的消息隊列呢?

我們可用通過linux 提供的 ipcs 命令查看

如圖,第一列 Message Queues 代表消息隊列

key msqid owner perms used-bytes messages
0x7801620f 5 root 666 12 1
外部標識由調用函數時傳入 消息隊列標識id(用於進程內部通訊)每個隊列都有唯一標識,用於區分不同消息隊列 創建消息隊列用戶 操作權限位 消息隊列使用瞭多少字節 消息隊列有多少條消息

通過表格分析我們剛剛創建的消息隊列是一個,消息標識符為5 創建用戶為root ,操作權限為666,寫入瞭一條數據,占用瞭12個字節的隊列。

看到這,大傢可能發現兩個問題?

第一個問題:

為什麼我調用 php 函數 msg_get_queue() 創建消息隊列返回的不是消息隊列標識符5,而是 Resource id #4

因為之前說過 php 函數其實是對linux c 系統調用函數的封裝,也就是說 msg_get_queue 函數 其實是對 msgget 系統調用函數的封裝,調用msgget 函數的返回內容應該是 消息隊列標識id,而現在封裝 msgget 函數的 msg_get_queue 函數返回的卻是資源描述符,這是為什麼?

這其實就是php內部做瞭處理,返回資源描述符,使得 msg_get_queue 可以配合其他php

函數一起使用,但內部調用msgget 函數返回的一定是 5,也就是消息隊列標識符

如果不相信可使用 linux 下提供的 strace 工具 跟蹤系統調用。

第二個問題

通過php函數 msg_send 寫入瞭一條數據,數據內容為 ”hello“ 字節長度應該是5個字節,但是消息隊列占用的字節長度缺少12個字節?

這是因為php 內部實現對寫入數據做瞭序列化操作,導致寫入消息隊列的字節長度為12而不是5個字節長度

<?php
$a = serialize("hello");
echo 'hello 序列化長度:'.strlen($a)."\n";
echo 'hello 序列化內容:'.$a."\n";
echo "hello 未序列化長度:".strlen("hello")."\n";

當然如果你不想 msg_send 函數寫入隊列前對數據進行序列化,可以把第3個參數設置為false,不過需要註意的是,如果寫入數據不進行序列化,那麼使用 msg_receive 函數讀取隊列數據時也必須設置為不反序列化操作,不然會引發錯誤

讀取隊列內容

<?php
$key = ftok('demo1.php',"x");//將文件與ID轉換為一個key
$msqid = msg_get_queue($key);//消息隊列如果已創建,直接返回同一個資源描述符
//第一個參數資源描述符,第二個參數一般為0,表示從隊列第一條讀取,或者設置與msg_send函數第二個參數一致,第三個參數設置緩沖區長度,註意msg_send 函數默認會序列化數據,msg_receive函數默認會將讀取數據反序列化,如果緩沖區設置過小會反序列化失敗
msg_receive($msqid,1,$received_message_type,1024,$message);//讀取消息隊列數據,逐條讀取,
echo $message.PHP_EOL;

成功從消息隊列中讀取出一條數據,內容hello

因為消息隊列內容被讀取,消息隊列占用字節被清空為0,消息隊列消息條數也為0

通過消息隊列的讀寫操作我們發現,他與上一篇的管道有什麼區別?

消息隊列不像管道通信寫入數據,必須讀端要打開,讀數據,寫端必須要打開,不然會阻塞無法通信,消息隊列可以隨時往隊列寫入數據,讀取數據

關閉序列化功能

寫端

<?php
$key = ftok('demo3.php',"x");//將文件與ID轉換為一個key,這個文件隻要真實存在就行,沒有特殊限制,一般都指定當前文件名
$msqid = msg_get_queue($key);// 創建一個消息隊列,返回一個資源id
// 第一個參數資源id,第二個參數消息類型,必須大於0,第三個參數寫入隊列內容,取消序列化操作
echo msg_send($msqid,1,"hello",false); //往消息隊列寫入一條數據 hello 到隊列中

因為沒有對寫入數據進行序列化操作,消息隊列占用的字節長度變成瞭5個字節。

讀端

<?php
$key = ftok('demo3.php',"x");//將文件與ID轉換為一個key
$msqid = msg_get_queue($key);
//第一個參數資源描述符
//第二個參數一般為0,表示從隊列第一條讀取,或者設置與msg_send函數第二個參數一致
// 第三個參數是msg_send函數的第二個參數的值,也就是發送消息的消息類型
//第四個參數設置接收緩沖區長度,如果設置過小會反序列化失敗【如果使用序列化功能時候】
//當接收緩沖區大小設置過小並且關閉瞭反序列化【發送端發送數據也關閉瞭】,但是發送的數據超過瞭設置的長度,就會被截斷,並且額外的數據會被丟棄,必須設置第五個參數才有效
// 第五個參數 flags 標志符
msg_receive($msqid,0,$received_message_type,1,$message,false,MSG_NOERROR);
echo $message.PHP_EOL;

隻讀取到一個h​ 是因為接受緩沖區長度設置為1個字節,所以隻接收到一個字節,其它部分被丟棄。

非阻塞讀取消息隊列

當隊列內沒有數據,讀取隊列msg_receive函數是會阻塞的如何讓隊列無數據讀端也不阻塞繼續往下執行,可以通過 設置 flags 標志符 為 MSG_IPC_NOWAIT,該標志符作用是,不管隊列有無數據立即返回結果

<?php
$key = ftok('demo5.php',"x");//將文件與ID轉換為一個key
$msqid = msg_get_queue($key);
// fork 一個子進程
$pid = pcntl_fork();
if($pid == 0){
    while(1){
    // MSG_IPC_NOWAIT【啟用非阻塞】 msg_receive 函數其實是封裝 linux c msgrcv 系統調用函數,如果設置為非阻塞,函數調用次數非常高,會非常消耗cpu資源
    //阻塞模式調用函數次數低,有數據才返回,對cpu友好
        $ret = msg_receive($msqid,0,$msgType,1024,$msg,true,MSG_IPC_NOWAIT,$error_code);
        if($error_code != MSG_ENOMSG){
            echo $msg."\n";
        }
        sleep(2);
        echo "go"."\n";
    }
    exit(0);
}

父子進程消息隊列通信

<?php
$key = ftok('demo5.php',"x");//將文件與ID轉換為一個key
$msqid = msg_get_queue($key);
$pid = pcntl_fork();
if($pid == 0){
    while(1){
        $ret = msg_receive($msqid,0,$msgType,1024,$msg,true,MSG_IPC_NOWAIT,$error_code);
        if($error_code != MSG_ENOMSG){
            echo $msg."\n";
        }
    }
    exit(0);
}
$i = 1;
while(1){
    msg_send($msqid,2,"hello world",true);
    sleep(2);
    if($i++ == 3){
    	// 殺死子進程
        posix_kill($pid,SIGILL);
        break;
    }
}
//子進程退出回收子進程,防止孤兒進程
$pid = pcntl_wait($status);
if($pid > 0)
{
    echo "exit pid=".$pid."\n";
}

刪除消息隊列

<?php
$key = ftok('demo5.php',"x");//將文件與ID轉換為一個key
$msqid = msg_get_queue($key);
$pid = pcntl_fork();
if($pid == 0){
    while(1){
        $ret = msg_receive($msqid,0,$msgType,1024,$msg,true,MSG_IPC_NOWAIT,$error_code);
        if($error_code != MSG_ENOMSG){
            echo $msg."\n";
        }

    }
    exit(0);
}
$i = 1;
while(1){
    msg_send($msqid,2,"hello world",true);
    sleep(2);
    if($i++ == 3){
    	// 殺死子進程
        posix_kill($pid,SIGILL);
        break;
    }
}
//子進程退出回收子進程,防止孤兒進程
$pid = pcntl_wait($status);
if($pid > 0)
{
    echo "exit pid=".$pid."\n";
}
//msg_remove_queue 函數移除消息隊列
if(msg_remove_queue($msqid)){
    echo "remove ok\n";
}

讀取3次隊列數據後父進程殺死子進程,父進程回收子進程,然後父進程退出,最後銷毀消息隊列

刪除消息隊列也可以直接使用命令 ipcrm -q 7

到此這篇關於php消息隊列實現詳解的文章就介紹到這瞭,更多相關php消息隊列內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!

推薦閱讀: