鍍金池/ 教程/ Linux/ 消息隊(duì)列
命名管道
消息隊(duì)列
進(jìn)程創(chuàng)建與終止
信號(hào)量
進(jìn)程組,會(huì)話和作業(yè)控制
共享內(nèi)存
進(jìn)程間通信簡介
子進(jìn)程監(jiān)視
其他進(jìn)程
覆蓋進(jìn)程映像
進(jìn)程信息
進(jìn)程映像
內(nèi)存映射
相關(guān)系統(tǒng)調(diào)用(System V)
進(jìn)程資源
System V & Posix
信號(hào)
進(jìn)程間通信教程
管道

消息隊(duì)列

為什么已經(jīng)擁有了共享內(nèi)存時(shí)需要消息隊(duì)列呢? 這將是多種原因,讓我們將其分解為多個(gè)點(diǎn)來簡化 -

  • 據(jù)了解,一旦消息被一個(gè)進(jìn)程接收到,它將不再可用于任何其他進(jìn)程。 而在共享內(nèi)存中,數(shù)據(jù)可供多個(gè)進(jìn)程訪問。
  • 如果想使用小信息格式進(jìn)行通信。
  • 當(dāng)多個(gè)進(jìn)程同時(shí)進(jìn)行通信時(shí),共享內(nèi)存數(shù)據(jù)需要同步保護(hù)。
  • 使用共享內(nèi)存的寫入和讀取頻率很高,那么實(shí)現(xiàn)功能將會(huì)非常復(fù)雜。 在這種情況下不值得使用。
  • 如果所有的進(jìn)程不需要訪問共享內(nèi)存,但是很少的進(jìn)程只需要它,那么用消息隊(duì)列實(shí)現(xiàn)會(huì)更好。
  • 如果想要與不同的數(shù)據(jù)包進(jìn)行通信,比如進(jìn)程A正在發(fā)送消息類型1給進(jìn)程B,消息類型10給進(jìn)程C,消息類型20給進(jìn)程D。在這種情況下,用消息隊(duì)列實(shí)現(xiàn)是比較簡單的。 為了將給定的消息類型簡化為1,10,20,它可以是0+ve-ve,如下所述。
  • 當(dāng)然,消息隊(duì)列的順序是FIFO(先進(jìn)先出)。 插入到隊(duì)列中的第一條消息是第一條要檢索的消息。

使用共享內(nèi)存或消息隊(duì)列取決于應(yīng)用程序的需要以及如何有效地使用它。

使用消息隊(duì)列的通信可以通過以下方式進(jìn)行 -

  • 通過一個(gè)進(jìn)程寫入共享內(nèi)存,并由另一個(gè)進(jìn)程讀取共享內(nèi)存。 正如我們所知道的,讀取也可以用多個(gè)進(jìn)程來完成。
  • 通過一個(gè)進(jìn)程用不同的數(shù)據(jù)包寫入共享存儲(chǔ)器,并通過多個(gè)進(jìn)程,即按照消息類型讀出。

    看到消息隊(duì)列中的某些信息后,現(xiàn)在是檢查支持消息隊(duì)列的系統(tǒng)調(diào)用(System V)的時(shí)候了。

要使用消息隊(duì)列執(zhí)行通信,請(qǐng)執(zhí)行以下步驟 -

第1步 - 創(chuàng)建一個(gè)消息隊(duì)列或連接到一個(gè)已經(jīng)存在的消息隊(duì)列(msgget())
第2步 - 寫入消息隊(duì)列(msgsnd())
第3步 - 從消息隊(duì)列中讀取(msgrcv())
第4步 - 對(duì)消息隊(duì)列(msgctl())執(zhí)行控制操作

現(xiàn)在,讓我們看看上述調(diào)用的語法和某些信息。

#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

int msgget(key_t key, int msgflg)

這個(gè)系統(tǒng)調(diào)用創(chuàng)建或分配一個(gè)System V消息隊(duì)列。需要傳遞以下參數(shù) -

  • 第一個(gè)參數(shù)key用于識(shí)別消息隊(duì)列。key可以是任意值,也可以是來自庫函數(shù)ftok()的值。
  • 第二個(gè)參數(shù)shmflg指定所需的消息隊(duì)列標(biāo)志,例如IPC_CREAT(如果不存在則創(chuàng)建消息隊(duì)列)或IPC_EXCL(與IPC_CREAT一起用于創(chuàng)建消息隊(duì)列,如果消息隊(duì)列已經(jīng)存在,則調(diào)用失敗)。 還需要傳遞權(quán)限。

注 - 有關(guān)權(quán)限的詳細(xì)信息,請(qǐng)參閱前面幾節(jié)。

這個(gè)調(diào)用會(huì)在成功時(shí)返回一個(gè)有效的消息隊(duì)列標(biāo)識(shí)符(用于進(jìn)一步調(diào)用消息隊(duì)列),在失敗的情況下返回-1。 要知道失敗的原因,請(qǐng)檢查errno變量或perror()函數(shù)。

關(guān)于這個(gè)調(diào)用的各種錯(cuò)誤是EACCESS(權(quán)限被拒絕),EEXIST(隊(duì)列已經(jīng)存在不能創(chuàng)建),ENOENT(隊(duì)列不存在),ENOMEM(沒有足夠的內(nèi)存來創(chuàng)建隊(duì)列)等

#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

int msgsnd(int msgid, const void *msgp, size_t msgsz, int msgflg)

此系統(tǒng)調(diào)用將消息發(fā)送/附加到消息隊(duì)列(System V)中。 需要傳遞以下參數(shù) -

  • 第一個(gè)參數(shù)msgid識(shí)別消息隊(duì)列,即消息隊(duì)列標(biāo)識(shí)符。 msgget()成功時(shí)收到標(biāo)識(shí)符的值
  • 第二個(gè)參數(shù)msgp是發(fā)送給調(diào)用者的消息的指針,定義在以下形式的結(jié)構(gòu)中 -

    struct msgbuf {
     long mtype;
     char mtext[1];
    };
    

    變量mtype用于與不同的消息類型進(jìn)行通信,在msgrcv()調(diào)用中詳細(xì)解釋。 變量mtext是一個(gè)數(shù)組或其他大小由msgsz(正值)指定的結(jié)構(gòu)。 如果沒有提到mtext字段,則將其視為0大小消息,這是允許的。

  • 第三個(gè)參數(shù)msgsz是消息的大小(消息應(yīng)該以空字符結(jié)尾)

  • 第四個(gè)參數(shù)msgflg表示某些標(biāo)志,例如IPC_NOWAIT(當(dāng)在隊(duì)列中找不到消息時(shí)立即返回)或MSG_NOERROR(截?cái)嘞⑽谋?,如果超過msgsz字節(jié))

這個(gè)調(diào)用在成功時(shí)將返回0,在失敗的情況下為-1。 要知道失敗的原因,請(qǐng)檢查errno變量或perror()函數(shù)。

#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

int msgrcv(int msgid, const void *msgp, size_t msgsz, long msgtype, int msgflg)

該系統(tǒng)調(diào)用從消息隊(duì)列(系統(tǒng)V)中檢索消息。 需要傳遞以下參數(shù) -

  • 第一個(gè)參數(shù)msgid識(shí)別消息隊(duì)列,即消息隊(duì)列標(biāo)識(shí)符。 msgget()成功時(shí)收到標(biāo)識(shí)符的值
  • 第二個(gè)參數(shù)msgp是從調(diào)用者接收的消息的指針。 它在以下形式的結(jié)構(gòu)中被定義 -
    struct msgbuf {
     long mtype;
     char mtext[1];
    };
    
    變量mtype用于與不同的消息類型進(jìn)行通信。 變量mtext是一個(gè)數(shù)組或其他大小由msgsz(正值)指定的結(jié)構(gòu)。 如果沒有提到mtext字段,則將其視為0大小消息,這是允許的。
  • 第三個(gè)參數(shù)msgsz是收到的消息的大小(消息應(yīng)該以空字符結(jié)尾)
  • 第四個(gè)參數(shù)msgtype表示消息的類型 -
    • 如果msgtype0 - 讀取隊(duì)列中的第一個(gè)收到的消息。
    • 如果msgtype+ve - 讀取類型為msgtype的隊(duì)列中的第一條消息(如果msgtype10,則只讀取類型10的第一條消息,即使其他類型可能位于隊(duì)列中的開頭)
    • 如果msgtype-ve - 讀取小于或等于消息類型的絕對(duì)值的最小類型的第一個(gè)消息(例如,如果msgtype-5,則它讀取類型小于5的第一個(gè)消息,即消息類型從15)
  • 第五個(gè)參數(shù)msgflg表示某些標(biāo)志,例如IPC_NOWAIT(當(dāng)隊(duì)列中沒有消息時(shí)立即返回,或MSG_NOERROR(如果超過了msgsz字節(jié)則截?cái)嘞⑽谋?

這個(gè)調(diào)用將返回成功時(shí)在mtext數(shù)組中實(shí)際接收的字節(jié)數(shù),在失敗的情況下返回-1。 要知道失敗的原因,請(qǐng)檢查errno變量或perror()函數(shù)。

#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

int msgctl(int msgid, int cmd, struct msqid_ds *buf)

這個(gè)系統(tǒng)調(diào)用執(zhí)行消息隊(duì)列(系統(tǒng)V)的控制操作。 需要傳遞以下參數(shù) -

  • 第一個(gè)參數(shù)msgid識(shí)別消息隊(duì)列,即消息隊(duì)列標(biāo)識(shí)符。 msgget()成功時(shí)收到標(biāo)識(shí)符的值
  • 第二個(gè)參數(shù)cmd是對(duì)消息隊(duì)列執(zhí)行所需控制操作的命令。 cmd的有效值是 -
    • IPC_STAT - 將struct msqid_ds的每個(gè)成員的當(dāng)前值的信息復(fù)制到由buf指向的傳遞結(jié)構(gòu)中。 該命令需要消息隊(duì)列的讀取權(quán)限。
    • IPC_SET - 設(shè)置結(jié)構(gòu)buf指向的用戶ID,所有者的組ID,權(quán)限等。
    • IPC_RMID - 立即刪除消息隊(duì)列。
    • IPC_INFO - 返回有關(guān)buf指向的結(jié)構(gòu)中的消息隊(duì)列限制和參數(shù)的信息,該結(jié)構(gòu)的類型為struct msginfo
    • MSG_INFO - 返回一個(gè)msginfo結(jié)構(gòu),其中包含有關(guān)消息隊(duì)列消耗的系統(tǒng)資源的信息。
  • 第三個(gè)參數(shù)buf是一個(gè)指向名為struct msqid_ds的消息隊(duì)列結(jié)構(gòu)的指針。 這個(gè)結(jié)構(gòu)的值將被用于任一集或者按照cmd得到。

這個(gè)調(diào)用將根據(jù)傳遞的命令返回值。 IPC_INFO和MSG_INFO或MSG_STAT的成功返回消息隊(duì)列的索引或標(biāo)識(shí)符,其他操作返回0,失敗時(shí)返回-1。 要知道失敗的原因,請(qǐng)檢查errno變量或perror()函數(shù)。

上面已經(jīng)看到有關(guān)消息隊(duì)列的基本信息和系統(tǒng)調(diào)用,現(xiàn)在是時(shí)候來看看程序代碼了。

讓我們看看這個(gè)程序?qū)崿F(xiàn)的描述 -

第1步 - 創(chuàng)建兩個(gè)進(jìn)程,一個(gè)用于發(fā)送到消息隊(duì)列(msgq_send.c),另一個(gè)用于從消息隊(duì)列(msgq_recv.c)
第2步 - 使用ftok()函數(shù)創(chuàng)建鍵(Key)。 為此,最初創(chuàng)建文件msgq.txt以獲取唯一的鍵。
第3步 - 發(fā)送過程執(zhí)行以下操作。

  • 讀取用戶輸入的字符串
  • 刪除新行,如果存在
  • 發(fā)送到消息隊(duì)列
  • 重復(fù)這個(gè)過程直到輸入結(jié)束(CTRL + D)
  • 一旦收到輸入結(jié)束,發(fā)送消息“end”來表示進(jìn)程結(jié)束。

第4步 - 在接收過程中,執(zhí)行以下操作。

  • 從隊(duì)列中讀取消息
  • 顯示輸出
  • 如果收到的消息是“end”,則結(jié)束該過程并退出。

為了簡化,我們沒有使用這個(gè)示例的消息類型。 另外,一個(gè)進(jìn)程正在寫入隊(duì)列,另一個(gè)進(jìn)程正在從隊(duì)列中讀取。 這可以根據(jù)需要進(jìn)行擴(kuò)展,即理想情況下一個(gè)進(jìn)程將寫入隊(duì)列中,多個(gè)進(jìn)程從隊(duì)列中讀取。

現(xiàn)在,讓我們看看一下進(jìn)程(消息發(fā)送到隊(duì)列) - 文件:msgq_send.c

/* Filename: msgq_send.c */
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <errno.h>
#include <string.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

#define PERMS 0644
struct my_msgbuf {
   long mtype;
   char mtext[200];
};

int main(void) {
   struct my_msgbuf buf;
   int msqid;
   int len;
   key_t key;
   system("touch msgq.txt");

   if ((key = ftok("msgq.txt", 'B')) == -1) {
      perror("ftok");
      exit(1);
   }

   if ((msqid = msgget(key, PERMS | IPC_CREAT)) == -1) {
      perror("msgget");
      exit(1);
   }
   printf("message queue: ready to send messages.\n");
   printf("Enter lines of text, ^D to quit:\n");
   buf.mtype = 1; /* we don't really care in this case */

   while(fgets(buf.mtext, sizeof buf.mtext, stdin) != NULL) {
      len = strlen(buf.mtext);
      /* remove newline at end, if it exists */
      if (buf.mtext[len-1] == '\n') buf.mtext[len-1] = '\0';
      if (msgsnd(msqid, &buf, len+1, 0) == -1) /* +1 for '\0' */
      perror("msgsnd");
   }
   strcpy(buf.mtext, "end");
   len = strlen(buf.mtext);
   if (msgsnd(msqid, &buf, len+1, 0) == -1) /* +1 for '\0' */
   perror("msgsnd");

   if (msgctl(msqid, IPC_RMID, NULL) == -1) {
      perror("msgctl");
      exit(1);
   }
   printf("message queue: done sending messages.\n");
   return 0;
}

執(zhí)行上面示例代碼,得到以下輸出結(jié)果 -

message queue: ready to send messages.
Enter lines of text, ^D to quit:
this is line 1
this is line 2
message queue: done sending messages.

以下是來自消息接收過程的代碼(從隊(duì)列中檢索消息) - 文件:msgq_recv.c -

/* Filename: msgq_recv.c */
#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

#define PERMS 0644
struct my_msgbuf {
   long mtype;
   char mtext[200];
};

int main(void) {
   struct my_msgbuf buf;
   int msqid;
   int toend;
   key_t key;

   if ((key = ftok("msgq.txt", 'B')) == -1) {
      perror("ftok");
      exit(1);
   }

   if ((msqid = msgget(key, PERMS)) == -1) { /* connect to the queue */
      perror("msgget");
      exit(1);
   }
   printf("message queue: ready to receive messages.\n");

   for(;;) { /* normally receiving never ends but just to make conclusion 
             /* this program ends wuth string of end */
      if (msgrcv(msqid, &buf, sizeof(buf.mtext), 0, 0) == -1) {
         perror("msgrcv");
         exit(1);
      }
      printf("recvd: \"%s\"\n", buf.mtext);
      toend = strcmp(buf.mtext,"end");
      if (toend == 0)
      break;
   }
   printf("message queue: done receiving messages.\n");
   system("rm msgq.txt");
   return 0;
}

執(zhí)行上面示例代碼,得到以下輸出結(jié)果 -

message queue: ready to receive messages.
recvd: "this is line 1"
recvd: "this is line 2"
recvd: "end"
message queue: done receiving messages.