IPC message Queue function(msgget,msgsnd,msgrcv) example source

본 글은 쓰레드/프로세스간 통신에 사용되는 메시지 큐 함수인 msgget, msgsnd, msgrcv 사용 예제를 싣고 있다.

개발을 하다 보면, 하나의 프로세스 하나의 쓰레드 만으로 원하는 성과를 낼 수 없는 경우가 많다. 다중 쓰레드, 프로세스를 사용해야 하는 경우가 많다. 그리고 각 프로세스나 쓰레드 간에 메시지나 데이터를 주고받아야 하는 경우도 많다. 
프로세스간 데이터를 주고받는 방법에는 여러가지가 있다.
- 본 글의 예제인 메시지 큐를 사용.
- 공유 메모리를 사용
- Sock 통신을 사용하는 등등.

리눅스에서 제공하는 IPC(interprocessor communication) 메시지 큐는 아래 함수들을 이용하여 사용할 수 있다.

#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

//메시지 큐 생성
int msgget(key_t key, int msgflg);

//메시지 큐에 송신
int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg);

//메시지 큐에서 수신
ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp, int msgflg);

//메시지 큐 컨트롤 
int msgctl(int msqid, int cmd, struct msqid_ds *buf);


메시지 큐를 만들어 아래 그림과 같이 쓰레드/프로세스 간의 통신이 가능하다. 각각의 쓰레드/프로세스는 동일한 IPC 메시지 큐를 사용하기 위해 메시지 큐의 key를 공유해야 한다. long형식의 type데이터를 사용해 특정 쓰레드/프로세스만 메시지를 보낼 수도 있다.
 

아래 예제는 Host 모드로 실행되면 client 모드로 실행을 기다린 후 실행되면 1~10까지의 데이터를 보내고 그 합의 결과를 기다린 후 결과를 화면에 프린트하고, Client모드로 실행되면 메시지 큐의 메시지를 보고 command의 대한 응답을 Host로 보내준다.
아래 예제에서는 메시지 큐 생성을 2개의 함수로 구분했다. 일반적으로 msgget만을 사용해 IPC 메시지 큐 생성이 가능하고, 만약 만들어진 큐가 있으면 그 아이디를 반환한다. 하지만, 종종 IPC 메시지 큐에 전에 사용하던 원하지 않는 메시지가 남아있는 경우가 있다. 이런 메시지를 없애고 싶을 때는 ‘msgctl(msgqid, IPC_RMID, 0);’를 사용해 IPC 메시지 큐를 한번 지워주고 새로 생성해야 한다.

#include <pthread.h>
#include <stdint.h>
#include <unistd.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <getopt.h>
#include <fcntl.h>
#include <string.h>
#include <stdarg.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <errno.h>
#include <net/if.h>
#include <sys/ioctl.h>


void _sleep_ms(long ms)
{
struct timeval stTimeVal;
stTimeVal.tv_sec = ms / 1000;
ms -= (stTimeVal.tv_sec * 1000);
stTimeVal.tv_usec = ms * 1000;

select(0, 0, 0, 0, &stTimeVal);
}

#define ipcmsg_key 2001
#define ipcmsg_type_hostmsg 0x01
#define ipcmsg_type_clientmsg 0x02


#define ipcmsg_command_terminate 0xff
#define ipcmsg_command_ping 0x01
#define ipcmsg_command_pong 0x02

#define ipcmsg_command_add 0x03
#define ipcmsg_command_result 0xa1

// 메시지 데이터 구조체, 맨앞에 long 형식의 데이터는 반드시 있어야한다. 
typedef struct _msgdata_t
{
long type; // must be > 0

// message data
int command;
int data;

}msgdata_t;

int create_msgq()
{
int msgqid = msgget((key_t)ipcmsg_key, IPC_CREAT | 0666);
if (msgqid == -1)
{
perror("msgget fail:");
return -1;
}

/* 큐에 데이터가 남아있는 경우를 대비하여 한번 큐를 제거한 후 다시 생성한다.
*/
msgctl(msgqid, IPC_RMID, 0);

return msgget((key_t)ipcmsg_key, IPC_CREAT | 0666);
}

int get_msgq()
{
return msgget((key_t)ipcmsg_key, IPC_CREAT | 0666);
}


int send_msg(int msgqid, msgdata_t *msg)
{
int result = 0;
int len = sizeof(msgdata_t) - sizeof(long);

result = msgsnd(msgqid, msg, len, IPC_NOWAIT);

if (result == -1)
{
return 0;
}

return 1;
}

int read_msg(int msgqid, int type, msgdata_t *msg)
{
int result = 0;
int len = sizeof(msgdata_t) - sizeof(long);

result = msgrcv(msgqid, msg, len, type, IPC_NOWAIT);

if (result == -1)
{
return 0;
}

return 1;
}


int host_mode()
{
int i = 0;
msgdata_t msg;
int msgq_id = create_msgq();

printf("host started %d\n", msgq_id);


msg.type = ipcmsg_type_hostmsg;
msg.command = ipcmsg_command_ping;
msg.data = i + 1;
send_msg(msgq_id, &msg);

while (1)
{
// wait client
if (read_msg(msgq_id, ipcmsg_type_clientmsg, &msg) > 0)
{
if (msg.command == ipcmsg_command_pong)
break;
}
_sleep_ms(200);
}

for (i = 0; i < 10; i++)
{
msg.type = ipcmsg_type_hostmsg;
msg.command = ipcmsg_command_add;
msg.data = i + 1;
send_msg(msgq_id, &msg);

_sleep_ms(200);
if (read_msg(msgq_id, ipcmsg_type_clientmsg, &msg)>0)
printf("%d result: %d\n", i + 1, msg.data);
}

msg.type = ipcmsg_type_hostmsg;
msg.command = ipcmsg_command_terminate;
send_msg(msgq_id, &msg);

printf("host terminated\n");

return 0;
}


int client_mode()
{
int run = 1;
int result = 0;
int msgq_id = get_msgq();

if (msgq_id <= 0)
{
printf("client get_msgq fail\n");
return 0;
}
printf("client started %d\n", msgq_id);

while (run)
{
msgdata_t msg;
if (read_msg(msgq_id, ipcmsg_type_hostmsg, &msg) > 0)
{
switch (msg.command)
{
case ipcmsg_command_ping:
msg.type = ipcmsg_type_clientmsg;
msg.command = ipcmsg_command_pong;
send_msg(msgq_id, &msg);
break;
case ipcmsg_command_add:
result += msg.data;
msg.type = ipcmsg_type_clientmsg;
msg.command = ipcmsg_command_result;
msg.data = result;
send_msg(msgq_id, &msg);
break;
case ipcmsg_command_terminate:
run = 0;
break;
default:
break;
}
}

_sleep_ms(100);
}

printf("client terminated\n");

return 0;
}

int main(int argc, char *argv[])
{
if (argc >= 2)
client_mode();
else
host_mode();

return 0;
}

 
실행 결과
Host 모드 
 

Client 모드
 


댓글

이 블로그의 인기 게시물

바로 프로젝트 적용 가능한 FIR Filter (low/high/band pass filter )를 c나 python으로 만들기

간단한 cfar 알고리즘에 대해

base64 인코딩 디코딩 예제 c 소스

쉽게 설명한 파티클 필터(particle filter) 동작 원리와 예제

python ctypes LoadLibrary로 windows dll 로드 및 함수 호출 예제