python asyncio를 이용한 async socket server client example code

 본 포스트는 python asyncio 라이브러리를 사용한 async socket 서버와 클라이언트 예제 코드를 싣고 있다. 이전 non-blocking socket의 예제를 asyncio 라이브러리에 맞게 변형하였다. 

python socket API 와 non-blocking socket server client example code


1. asyncio (Asynchronous I/O)

asyncio는 async/await 구문을 사용해 multi-thread처럼 동시에 진행하는 코드를 작성하는 라이브러리다. 네트워크 및 웹 서버, DB 연결, 분산 작업 대기열을 제공하는 분산 프레임 워크의 기반으로 사용된다고 한다. ( https://docs.python.org/3.7/library/asyncio.html)

async socket server/client 구현에 필요한 asyncio 라이브러리 API는 예제 코드를 보면서, 설명하도록 하겠다.


2. Async socket client example code


import asyncio

async def pingpong_client():
    try:
        reader, writer = await asyncio.open_connection(host='localhost',port=8000)
    except OSError:
        print('connection fail')
        return
    
    for _ in range(10):
        writer.write(b'ping')
        print('send: ping')
        
        data = await reader.read(8)
        print('recv:',data.decode())
        await asyncio.sleep(1)
    
    writer.write(b'done')
    print('send: ping')
    writer.close()
    await writer.wait_closed()
    print('connection was closed')
    


if __name__ == "__main__":
    
    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        loop = None
    
    if loop and loop.is_running():
        print('Async event loop already running')
        tsk = loop.create_task(pingpong_client())
    else:
        print('Starting new event loop')
        asyncio.run(pingpong_client())


2.1. coroutine 실행

asyncio는 coroutine으로 구현되어 있으며, 클라이언트 또한 coroutine으로 구현했다. coroutine는 'async def'를 사용하여 구현할 수 있다.

corutine의 실행은 asyncio.run을 사용하여 실행할 수 있다.

asyncio.run(coro, *, debug=False)

하지만 asyncio.run 실행 시 아래와 같이 RuntimeError가 발생하는 경우가 있다.

"RuntimeError: asyncio.run() cannot be called from a running event loop"

이 경우 실행중인 이벤트 루프를 받아와 coroutine 함수를 Task로 감싸고 실행해야 한다. 이벤트를 받아오고 Task를 생성하는 API는 아래와 같다. (python 3.7)

asyncio.get_running_loop()
asyncio.create_task(coro)

get_running_loop()는 현재 실행중인 루프가 없을 때 RuntimeError를 발생하기 때문에 이에 대한 처리도 필요하다. 


2.3. Host Connection

asyncio.open_connection(host=None, port=None, *, loop=None, limit=None, ssl=None, family=0, proto=0, flags=0, sock=None, local_addr=None, server_hostname=None, ssl_handshake_timeout=None)

coroutine 함수, awaitable object이다.
주어진 호스트와 연결 후 asyncio.StreamReader, asyncio.StreamWriter object를 반환한다. 예제 코드의 pingpong_client를 참조.

호스트와 연결에 실패할 경우 아래와 같은 OSError가 발생한다.

OSError: Multiple exceptions: [Errno 10061] Connect call failed ('::1', 8000, 0, 0), [Errno 10061] Connect call failed ('127.0.0.1', 8000)

이 경우를 대비하여 에러 처리 코드는 다음과 같이 할 수 있다.

try:
    reader, writer = await asyncio.open_connection(host='localhost',port=8000)
except OSError:
    print('connection fail')
    return


2.4. Stream Object

Stream은 네트워크 연결 시 별도의 socket API를 사용하지 않고도 비동기로 데이터를 주고 받을 수 있도록 만들어 졌다. 위 open_connection시 reader, writer 두개의 Stream Object가 반환된다. 

asyncio.StreamReader.read(n=-1)

coroutine이며, Stream으로 부터 n바이트의 데이터를 읽어온다. 만약 n이 -1이면, EOF까지 읽어 온다. 만약 데이터에 EOF가 없는 경우 데이터가 반환되지 않는다.

asyncio.StreamWriter.write(data)

Stream에 데이터를 쓴다.

asyncio.StreamWriter.close()

Stream을 닫는다.

asyncio.StreamWriter.wait_close()

coroutine이며, Stream이 닫힐 때까지 기다린다.


3. Async socket server example code


import asyncio    
    
async def handle_asyncclient(reader, writer):
    print('client :', writer.get_extra_info('peername'))
    while True:
        data = await reader.read(8)
        if data == b'ping':
            writer.write(b'pong')
            await writer.drain()
            print('recv: ping -> send: pong')
        elif data == b'done':
            print('recv: done')
            break
        elif len(data) == 0:
            break
    
    writer.close()
    await writer.wait_closed()
    print('connection was closed')
    
async def server_asyncmain():
    server = await asyncio.start_server(handle_asyncclient,'localhost',8000)  
    if server is not None:
        print('server started')
        #
        await asyncio.sleep(60)
        server.close()
        await server.wait_closed()
        print('server was closed')


if __name__ == "__main__":
    
    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        loop = None
    
    if loop and loop.is_running():
        print('Async event loop already running')
        tsk = loop.create_task(server_asyncmain())
    else:
        print('Starting new event loop')
        asyncio.run(server_asyncmain())   


3.1. 서버 실행 및 종료

asyncio.start_server(client_connected_cb, host=None, port=None, *, loop=None, limit=None, family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE, sock=None, backlog=100, ssl=None, reuse_address=None, reuse_port=None, ssl_handshake_timeout=None, start_serving=True)

coroutine 이며, socket 서버를 생성한다. ayncio.Server object를 반환한다. 새로운 client가 연결되면, client_connected_cb를 새로운 Task로 생성한다. client_connected_cb은 StreamReader, StreamWriter를 argument로 받는다. StreamBuffer의 디폴트 크기는 64KB이다. 

argument start_serving는 기본으로 True로 되어있어 자동으로 즉시 서버가 실행된다. 만약 False인 경우  Server.start_serving()나 Server.serve_forever()를 호출해야 서버가 실행된다. start_serving과 serve_forever는 coroutine이다.

async with server:
    await server.serve_forever()

asyncio.Server.close()

실행된 서비스를 중지한다. 실행된 서버는 close를 호출해야 종료된다. 예제 코드에서는 60초를 기다린 후 서버 서비스를 중지하도록 했다.

asyncio.Server.wait_closed()

coroutine이며, 서비스가 종료될 때까지 기다린다.


3.2. 추가 정보 받아오기

StreamWriter.get_extra_info(name, default=None)

name 스트링에 명시된 정보를 받아온다. 

socket 관련 정보는 아래와 같은 정보를 받아올 수 있다.
- 'peername' : socket연결된 상대 측의 주소를 받아온다.
- 'socket' : socket.socket 인스턴스를 받아온다.
- 'sockname' : socket의 주소를 받아온다.

추가 사항은 다음 사이트에서 확인할 수 있다.
https://docs.python.org/3.7/library/asyncio-protocol.html#asyncio.BaseTransport.get_extra_info


4. 실행 결과

서버와 클라이언트 실행 결과는 아래와 같다. 통신이 잘 이루어지는 것을 볼 수 있다. 

4.1. 서버 실행 결과

Async event loop already running
server started
client : ('::1', 62207, 0, 0)
recv: ping -> send: pong
recv: ping -> send: pong
recv: ping -> send: pong
recv: ping -> send: pong
recv: ping -> send: pong
recv: ping -> send: pong
recv: ping -> send: pong
recv: ping -> send: pong
recv: ping -> send: pong
recv: ping -> send: pong
recv: done
connection was closed
server was closed

4.2. 클라이언트 실행 결과

Async event loop already running
send: ping
recv: pong
send: ping
recv: pong
send: ping
recv: pong
send: ping
recv: pong
send: ping
recv: pong
send: ping
recv: pong
send: ping
recv: pong
send: ping
recv: pong
send: ping
recv: pong
send: ping
recv: pong
send: ping
connection was closed

댓글

이 블로그의 인기 게시물

간단한 cfar 알고리즘에 대해

바로 프로젝트 적용 가능한 FIR Filter (low/high/band pass filter )를 c나 python으로 만들기

windows에서 간단하게 크롬캐스트(Chromecast)를 통해 윈도우 화면 미러링 방법

CA-CFAR 예제 코드