IPC message Queue function(msgget,msgsnd,msgrcv) example source

본 글은 쓰레드/프로세스간 통신에 사용되는 메시지 큐 함수인 msgget, msgsnd, msgrcv 사용 예제를 싣고 있다.

개발을 하다 보면, 하나의 프로세스 하나의 쓰레드 만으로 원하는 성과를 낼 수 없는 경우가 많다. 다중 쓰레드, 프로세스를 사용해야 하는 경우가 많다. 그리고 각 프로세스나 쓰레드 간에 메시지나 데이터를 주고받아야 하는 경우도 많다. 
프로세스간 데이터를 주고받는 방법에는 여러가지가 있다.
- 본 글의 예제인 메시지 큐를 사용.
- 공유 메모리를 사용
- Sock 통신을 사용하는 등등.

리눅스에서 제공하는 IPC(interprocessor communication) 메시지 큐는 아래 함수들을 이용하여 사용할 수 있다.

#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

//메시지 큐 생성
int msgget(key_t key, int msgflg);

//메시지 큐에 송신
int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg);

//메시지 큐에서 수신
ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp, int msgflg);

//메시지 큐 컨트롤 
int msgctl(int msqid, int cmd, struct msqid_ds *buf);


메시지 큐를 만들어 아래 그림과 같이 쓰레드/프로세스 간의 통신이 가능하다. 각각의 쓰레드/프로세스는 동일한 IPC 메시지 큐를 사용하기 위해 메시지 큐의 key를 공유해야 한다. long형식의 type데이터를 사용해 특정 쓰레드/프로세스만 메시지를 보낼 수도 있다.
 

아래 예제는 Host 모드로 실행되면 client 모드로 실행을 기다린 후 실행되면 1~10까지의 데이터를 보내고 그 합의 결과를 기다린 후 결과를 화면에 프린트하고, Client모드로 실행되면 메시지 큐의 메시지를 보고 command의 대한 응답을 Host로 보내준다.
아래 예제에서는 메시지 큐 생성을 2개의 함수로 구분했다. 일반적으로 msgget만을 사용해 IPC 메시지 큐 생성이 가능하고, 만약 만들어진 큐가 있으면 그 아이디를 반환한다. 하지만, 종종 IPC 메시지 큐에 전에 사용하던 원하지 않는 메시지가 남아있는 경우가 있다. 이런 메시지를 없애고 싶을 때는 ‘msgctl(msgqid, IPC_RMID, 0);’를 사용해 IPC 메시지 큐를 한번 지워주고 새로 생성해야 한다.

#include <pthread.h>
#include <stdint.h>
#include <unistd.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <getopt.h>
#include <fcntl.h>
#include <string.h>
#include <stdarg.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <errno.h>
#include <net/if.h>
#include <sys/ioctl.h>


void _sleep_ms(long ms)
{
struct timeval stTimeVal;
stTimeVal.tv_sec = ms / 1000;
ms -= (stTimeVal.tv_sec * 1000);
stTimeVal.tv_usec = ms * 1000;

select(0, 0, 0, 0, &stTimeVal);
}

#define ipcmsg_key 2001
#define ipcmsg_type_hostmsg 0x01
#define ipcmsg_type_clientmsg 0x02


#define ipcmsg_command_terminate 0xff
#define ipcmsg_command_ping 0x01
#define ipcmsg_command_pong 0x02

#define ipcmsg_command_add 0x03
#define ipcmsg_command_result 0xa1

// 메시지 데이터 구조체, 맨앞에 long 형식의 데이터는 반드시 있어야한다. 
typedef struct _msgdata_t
{
long type; // must be > 0

// message data
int command;
int data;

}msgdata_t;

int create_msgq()
{
int msgqid = msgget((key_t)ipcmsg_key, IPC_CREAT | 0666);
if (msgqid == -1)
{
perror("msgget fail:");
return -1;
}

/* 큐에 데이터가 남아있는 경우를 대비하여 한번 큐를 제거한 후 다시 생성한다.
*/
msgctl(msgqid, IPC_RMID, 0);

return msgget((key_t)ipcmsg_key, IPC_CREAT | 0666);
}

int get_msgq()
{
return msgget((key_t)ipcmsg_key, IPC_CREAT | 0666);
}


int send_msg(int msgqid, msgdata_t *msg)
{
int result = 0;
int len = sizeof(msgdata_t) - sizeof(long);

result = msgsnd(msgqid, msg, len, IPC_NOWAIT);

if (result == -1)
{
return 0;
}

return 1;
}

int read_msg(int msgqid, int type, msgdata_t *msg)
{
int result = 0;
int len = sizeof(msgdata_t) - sizeof(long);

result = msgrcv(msgqid, msg, len, type, IPC_NOWAIT);

if (result == -1)
{
return 0;
}

return 1;
}


int host_mode()
{
int i = 0;
msgdata_t msg;
int msgq_id = create_msgq();

printf("host started %d\n", msgq_id);


msg.type = ipcmsg_type_hostmsg;
msg.command = ipcmsg_command_ping;
msg.data = i + 1;
send_msg(msgq_id, &msg);

while (1)
{
// wait client
if (read_msg(msgq_id, ipcmsg_type_clientmsg, &msg) > 0)
{
if (msg.command == ipcmsg_command_pong)
break;
}
_sleep_ms(200);
}

for (i = 0; i < 10; i++)
{
msg.type = ipcmsg_type_hostmsg;
msg.command = ipcmsg_command_add;
msg.data = i + 1;
send_msg(msgq_id, &msg);

_sleep_ms(200);
if (read_msg(msgq_id, ipcmsg_type_clientmsg, &msg)>0)
printf("%d result: %d\n", i + 1, msg.data);
}

msg.type = ipcmsg_type_hostmsg;
msg.command = ipcmsg_command_terminate;
send_msg(msgq_id, &msg);

printf("host terminated\n");

return 0;
}


int client_mode()
{
int run = 1;
int result = 0;
int msgq_id = get_msgq();

if (msgq_id <= 0)
{
printf("client get_msgq fail\n");
return 0;
}
printf("client started %d\n", msgq_id);

while (run)
{
msgdata_t msg;
if (read_msg(msgq_id, ipcmsg_type_hostmsg, &msg) > 0)
{
switch (msg.command)
{
case ipcmsg_command_ping:
msg.type = ipcmsg_type_clientmsg;
msg.command = ipcmsg_command_pong;
send_msg(msgq_id, &msg);
break;
case ipcmsg_command_add:
result += msg.data;
msg.type = ipcmsg_type_clientmsg;
msg.command = ipcmsg_command_result;
msg.data = result;
send_msg(msgq_id, &msg);
break;
case ipcmsg_command_terminate:
run = 0;
break;
default:
break;
}
}

_sleep_ms(100);
}

printf("client terminated\n");

return 0;
}

int main(int argc, char *argv[])
{
if (argc >= 2)
client_mode();
else
host_mode();

return 0;
}

 
실행 결과
Host 모드 
 

Client 모드
 


댓글

이 블로그의 인기 게시물

windows에서 간단하게 크롬캐스트(Chromecast)를 통해 윈도우 화면 미러링 방법

딥러닝을 사용한 로또 번호 예측 실험

간단한 cfar 알고리즘에 대해

리눅스 디바이스 드라이버 기초와 예제

mkfs.fat Device or resource busy 에러 해결법