IPC message Queue function(msgget,msgsnd,msgrcv) example source
본 글은 쓰레드/프로세스간 통신에 사용되는 메시지 큐 함수인 msgget, msgsnd,
msgrcv 사용 예제를 싣고 있다.
개발을 하다 보면, 하나의 프로세스 하나의 쓰레드 만으로 원하는 성과를 낼 수
없는 경우가 많다. 다중 쓰레드, 프로세스를 사용해야 하는 경우가 많다. 그리고 각
프로세스나 쓰레드 간에 메시지나 데이터를 주고받아야 하는 경우도 많다.
프로세스간 데이터를 주고받는 방법에는 여러가지가 있다.
- 본 글의 예제인 메시지 큐를 사용.
- 공유 메모리를 사용
- Sock 통신을 사용하는 등등.
리눅스에서 제공하는 IPC(interprocessor communication) 메시지 큐는 아래
함수들을 이용하여 사용할 수 있다.
#include
<sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
//메시지 큐 생성
int msgget(key_t
key, int msgflg);
//메시지 큐에 송신
int msgsnd(int msqid, const void *msgp,
size_t msgsz, int msgflg);
//메시지 큐에서 수신
ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp, int
msgflg);
//메시지 큐 컨트롤
int msgctl(int msqid, int cmd, struct
msqid_ds *buf);
메시지 큐를 만들어 아래 그림과 같이 쓰레드/프로세스 간의 통신이 가능하다.
각각의 쓰레드/프로세스는 동일한 IPC 메시지 큐를 사용하기 위해 메시지 큐의
key를 공유해야 한다. long형식의 type데이터를
사용해 특정 쓰레드/프로세스만 메시지를 보낼 수도 있다.
아래 예제는 Host 모드로 실행되면 client 모드로 실행을 기다린 후 실행되면
1~10까지의 데이터를 보내고 그 합의 결과를 기다린 후 결과를 화면에 프린트하고,
Client모드로 실행되면 메시지 큐의 메시지를 보고 command의 대한 응답을 Host로
보내준다.
아래 예제에서는 메시지 큐 생성을 2개의 함수로 구분했다. 일반적으로
msgget만을 사용해 IPC 메시지 큐 생성이 가능하고, 만약 만들어진 큐가
있으면 그 아이디를 반환한다. 하지만, 종종 IPC 메시지 큐에 전에 사용하던 원하지
않는 메시지가 남아있는 경우가 있다. 이런 메시지를 없애고 싶을 때는 ‘msgctl(msgqid, IPC_RMID, 0);’를 사용해 IPC 메시지 큐를 한번 지워주고 새로 생성해야 한다.
#include <pthread.h>
#include <stdint.h>
#include <unistd.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <getopt.h>
#include <fcntl.h>
#include <string.h>
#include <stdarg.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <errno.h>
#include <net/if.h>
#include <sys/ioctl.h>
void _sleep_ms(long ms)
{
struct timeval stTimeVal;
stTimeVal.tv_sec = ms / 1000;
ms -= (stTimeVal.tv_sec * 1000);
stTimeVal.tv_usec = ms * 1000;
select(0, 0, 0, 0, &stTimeVal);
}
#define ipcmsg_key 2001
#define ipcmsg_type_hostmsg 0x01
#define ipcmsg_type_clientmsg 0x02
#define ipcmsg_command_terminate 0xff
#define ipcmsg_command_ping 0x01
#define ipcmsg_command_pong 0x02
#define ipcmsg_command_add 0x03
#define ipcmsg_command_result 0xa1
// 메시지 데이터 구조체, 맨앞에 long 형식의 데이터는 반드시
있어야한다.
typedef struct _msgdata_t
{
long type;
// must be > 0
// message data
int command;
int data;
}msgdata_t;
int create_msgq()
{
int msgqid =
msgget((key_t)ipcmsg_key, IPC_CREAT | 0666);
if (msgqid == -1)
{
perror("msgget fail:");
return -1;
}
/* 큐에 데이터가 남아있는 경우를 대비하여 한번 큐를 제거한 후 다시
생성한다.
*/
msgctl(msgqid, IPC_RMID, 0);
return
msgget((key_t)ipcmsg_key, IPC_CREAT | 0666);
}
int get_msgq()
{
return
msgget((key_t)ipcmsg_key, IPC_CREAT | 0666);
}
int send_msg(int msgqid, msgdata_t *msg)
{
int result = 0;
int len = sizeof(msgdata_t) -
sizeof(long);
result = msgsnd(msgqid, msg,
len, IPC_NOWAIT);
if (result == -1)
{
return 0;
}
return 1;
}
int read_msg(int msgqid, int type, msgdata_t *msg)
{
int result = 0;
int len = sizeof(msgdata_t) -
sizeof(long);
result = msgrcv(msgqid, msg,
len, type, IPC_NOWAIT);
if (result == -1)
{
return 0;
}
return 1;
}
int host_mode()
{
int i = 0;
msgdata_t msg;
int msgq_id = create_msgq();
printf("host started %d\n", msgq_id);
msg.type = ipcmsg_type_hostmsg;
msg.command = ipcmsg_command_ping;
msg.data = i + 1;
send_msg(msgq_id, &msg);
while (1)
{
// wait client
if (read_msg(msgq_id,
ipcmsg_type_clientmsg, &msg) > 0)
{
if (msg.command ==
ipcmsg_command_pong)
break;
}
_sleep_ms(200);
}
for (i = 0; i < 10; i++)
{
msg.type = ipcmsg_type_hostmsg;
msg.command = ipcmsg_command_add;
msg.data = i + 1;
send_msg(msgq_id, &msg);
_sleep_ms(200);
if (read_msg(msgq_id,
ipcmsg_type_clientmsg, &msg)>0)
printf("%d result: %d\n", i + 1,
msg.data);
}
msg.type = ipcmsg_type_hostmsg;
msg.command =
ipcmsg_command_terminate;
send_msg(msgq_id, &msg);
printf("host terminated\n");
return 0;
}
int client_mode()
{
int run = 1;
int result = 0;
int msgq_id = get_msgq();
if (msgq_id <= 0)
{
printf("client get_msgq fail\n");
return 0;
}
printf("client started %d\n",
msgq_id);
while (run)
{
msgdata_t msg;
if (read_msg(msgq_id,
ipcmsg_type_hostmsg, &msg) > 0)
{
switch (msg.command)
{
case ipcmsg_command_ping:
msg.type = ipcmsg_type_clientmsg;
msg.command = ipcmsg_command_pong;
send_msg(msgq_id, &msg);
break;
case ipcmsg_command_add:
result += msg.data;
msg.type = ipcmsg_type_clientmsg;
msg.command = ipcmsg_command_result;
msg.data = result;
send_msg(msgq_id, &msg);
break;
case ipcmsg_command_terminate:
run = 0;
break;
default:
break;
}
}
_sleep_ms(100);
}
printf("client terminated\n");
return 0;
}
int main(int argc, char *argv[])
{
if (argc >= 2)
client_mode();
else
host_mode();
return 0;
}
실행 결과
Host 모드
Client 모드
댓글
댓글 쓰기