IPC message Queue function(msgget,msgsnd,msgrcv) example source
  본 글은 쓰레드/프로세스간 통신에 사용되는 메시지 큐 함수인 msgget, msgsnd,
  msgrcv 사용 예제를 싣고 있다.
  개발을 하다 보면, 하나의 프로세스 하나의 쓰레드 만으로 원하는 성과를 낼 수
  없는 경우가 많다. 다중 쓰레드, 프로세스를 사용해야 하는 경우가 많다. 그리고 각
  프로세스나 쓰레드 간에 메시지나 데이터를 주고받아야 하는 경우도 많다. 
프로세스간 데이터를 주고받는 방법에는 여러가지가 있다.
  - 본 글의 예제인 메시지 큐를 사용.
- 공유 메모리를 사용
- Sock 통신을 사용하는 등등.
  리눅스에서 제공하는 IPC(interprocessor communication) 메시지 큐는 아래
  함수들을 이용하여 사용할 수 있다.
  #include
  <sys/types.h>
  #include <sys/ipc.h>
  #include <sys/msg.h>
//메시지 큐 생성
  int msgget(key_t
  key, int msgflg);
//메시지 큐에 송신
  int msgsnd(int msqid, const void *msgp,
  size_t msgsz, int msgflg);
//메시지 큐에서 수신
  ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp, int
  msgflg);
//메시지 큐 컨트롤 
  int msgctl(int msqid, int cmd, struct
  msqid_ds *buf);
  메시지 큐를 만들어 아래 그림과 같이 쓰레드/프로세스 간의 통신이 가능하다.
  각각의 쓰레드/프로세스는 동일한 IPC 메시지 큐를 사용하기 위해 메시지 큐의
  key를 공유해야 한다. long형식의 type데이터를
  사용해 특정 쓰레드/프로세스만 메시지를 보낼 수도 있다.
  아래 예제는 Host 모드로 실행되면 client 모드로 실행을 기다린 후 실행되면
  1~10까지의 데이터를 보내고 그 합의 결과를 기다린 후 결과를 화면에 프린트하고,
  Client모드로 실행되면 메시지 큐의 메시지를 보고 command의 대한 응답을 Host로
  보내준다.
  아래 예제에서는 메시지 큐 생성을 2개의 함수로 구분했다. 일반적으로
  msgget만을 사용해 IPC 메시지 큐 생성이 가능하고, 만약 만들어진 큐가
  있으면 그 아이디를 반환한다. 하지만, 종종 IPC 메시지 큐에 전에 사용하던 원하지
  않는 메시지가 남아있는 경우가 있다. 이런 메시지를 없애고 싶을 때는 ‘msgctl(msgqid, IPC_RMID, 0);’를 사용해 IPC 메시지 큐를 한번 지워주고 새로 생성해야 한다.
#include <pthread.h>
#include <stdint.h>
#include <unistd.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <getopt.h>
#include <fcntl.h>
#include <string.h>
#include <stdarg.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <errno.h>
#include <net/if.h>
#include <sys/ioctl.h>
void _sleep_ms(long ms)
{
 struct timeval stTimeVal;
 stTimeVal.tv_sec = ms / 1000;
   ms -= (stTimeVal.tv_sec * 1000);
   stTimeVal.tv_usec = ms * 1000;
   select(0, 0, 0, 0, &stTimeVal);
}
#define ipcmsg_key 2001
  #define ipcmsg_type_hostmsg 0x01
  #define ipcmsg_type_clientmsg 0x02
#define ipcmsg_command_terminate 0xff
  #define ipcmsg_command_ping 0x01
  #define ipcmsg_command_pong 0x02
  #define ipcmsg_command_add 0x03
  #define ipcmsg_command_result 0xa1
  // 메시지 데이터 구조체, 맨앞에 long 형식의 데이터는 반드시
    있어야한다. 
typedef struct _msgdata_t
{
   long type;
  // must be > 0
 // message data
   int command;
  
 int data;
}msgdata_t;
  int create_msgq()
{
   int msgqid =
  msgget((key_t)ipcmsg_key, IPC_CREAT | 0666);
 if (msgqid == -1)
 {
 perror("msgget fail:");
 return -1;
 }
   /* 큐에 데이터가 남아있는 경우를 대비하여 한번 큐를 제거한 후 다시
    생성한다.
   */
   msgctl(msgqid, IPC_RMID, 0);
   return
  msgget((key_t)ipcmsg_key, IPC_CREAT | 0666);
}
  int get_msgq()
{
   return
  msgget((key_t)ipcmsg_key, IPC_CREAT | 0666);
}
  int send_msg(int msgqid, msgdata_t *msg)
{
 int result = 0;
   int len = sizeof(msgdata_t) -
  sizeof(long);
   result = msgsnd(msgqid, msg,
  len, IPC_NOWAIT);
 if (result == -1)
 {
 return 0;
 }
 return 1;
}
int read_msg(int msgqid, int type, msgdata_t *msg)
{
 int result = 0;
   int len = sizeof(msgdata_t) -
  sizeof(long);
   result = msgrcv(msgqid, msg,
  len, type, IPC_NOWAIT);
 if (result == -1)
 {
 return 0;
 }
 return 1;
}
int host_mode()
{
 int i = 0;
 msgdata_t msg;
 int msgq_id = create_msgq();
   printf("host started %d\n", msgq_id);
   msg.type = ipcmsg_type_hostmsg;
   msg.command = ipcmsg_command_ping;
 msg.data = i + 1;
 send_msg(msgq_id, &msg);
 while (1)
 {
 // wait client
   if (read_msg(msgq_id,
  ipcmsg_type_clientmsg, &msg) > 0)
 {
   if (msg.command ==
  ipcmsg_command_pong)
 break;
 }
 _sleep_ms(200);
 }
 for (i = 0; i < 10; i++)
 {
   msg.type = ipcmsg_type_hostmsg;
   msg.command = ipcmsg_command_add;
 msg.data = i + 1;
 send_msg(msgq_id, &msg);
 _sleep_ms(200);
   if (read_msg(msgq_id,
  ipcmsg_type_clientmsg, &msg)>0)
   printf("%d result: %d\n", i + 1,
  msg.data);
 }
   msg.type = ipcmsg_type_hostmsg;
   msg.command =
  ipcmsg_command_terminate;
 send_msg(msgq_id, &msg);
 printf("host terminated\n");
 return 0;
}
int client_mode()
{
 int run = 1;
 int result = 0;
 int msgq_id = get_msgq();
 if (msgq_id <= 0)
 {
   printf("client get_msgq fail\n");
 return 0;
 }
   printf("client started %d\n",
  msgq_id);
 while (run)
 {
 msgdata_t msg;
   if (read_msg(msgq_id,
  ipcmsg_type_hostmsg, &msg) > 0)
 {
 switch (msg.command)
 {
 case ipcmsg_command_ping:
   msg.type = ipcmsg_type_clientmsg;
   msg.command = ipcmsg_command_pong;
 send_msg(msgq_id, &msg);
 break;
 case ipcmsg_command_add:
 result += msg.data;
   msg.type = ipcmsg_type_clientmsg;
   msg.command = ipcmsg_command_result;
 msg.data = result;
 send_msg(msgq_id, &msg);
 break;
   case ipcmsg_command_terminate:
 run = 0;
 break;
 default:
 break;
 }
 }
 _sleep_ms(100);
 }
   printf("client terminated\n");
 return 0;
}
int main(int argc, char *argv[])
{
 if (argc >= 2)
 client_mode();
 else
 host_mode();
 return 0;
}
실행 결과
Host 모드 
Client 모드

 
 
 
댓글
댓글 쓰기