python asyncio를 이용한 async socket server client example code

 본 포스트는 python asyncio 라이브러리를 사용한 async socket 서버와 클라이언트 예제 코드를 싣고 있다. 이전 non-blocking socket의 예제를 asyncio 라이브러리에 맞게 변형하였다. 

python socket API 와 non-blocking socket server client example code


1. asyncio (Asynchronous I/O)

asyncio는 async/await 구문을 사용해 multi-thread처럼 동시에 진행하는 코드를 작성하는 라이브러리다. 네트워크 및 웹 서버, DB 연결, 분산 작업 대기열을 제공하는 분산 프레임 워크의 기반으로 사용된다고 한다. ( https://docs.python.org/3.7/library/asyncio.html)

async socket server/client 구현에 필요한 asyncio 라이브러리 API는 예제 코드를 보면서, 설명하도록 하겠다.


2. Async socket client example code


import asyncio

async def pingpong_client():
    try:
        reader, writer = await asyncio.open_connection(host='localhost',port=8000)
    except OSError:
        print('connection fail')
        return
    
    for _ in range(10):
        writer.write(b'ping')
        print('send: ping')
        
        data = await reader.read(8)
        print('recv:',data.decode())
        await asyncio.sleep(1)
    
    writer.write(b'done')
    print('send: ping')
    writer.close()
    await writer.wait_closed()
    print('connection was closed')
    


if __name__ == "__main__":
    
    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        loop = None
    
    if loop and loop.is_running():
        print('Async event loop already running')
        tsk = loop.create_task(pingpong_client())
    else:
        print('Starting new event loop')
        asyncio.run(pingpong_client())


2.1. coroutine 실행

asyncio는 coroutine으로 구현되어 있으며, 클라이언트 또한 coroutine으로 구현했다. coroutine는 'async def'를 사용하여 구현할 수 있다.

corutine의 실행은 asyncio.run을 사용하여 실행할 수 있다.

asyncio.run(coro, *, debug=False)

하지만 asyncio.run 실행 시 아래와 같이 RuntimeError가 발생하는 경우가 있다.

"RuntimeError: asyncio.run() cannot be called from a running event loop"

이 경우 실행중인 이벤트 루프를 받아와 coroutine 함수를 Task로 감싸고 실행해야 한다. 이벤트를 받아오고 Task를 생성하는 API는 아래와 같다. (python 3.7)

asyncio.get_running_loop()
asyncio.create_task(coro)

get_running_loop()는 현재 실행중인 루프가 없을 때 RuntimeError를 발생하기 때문에 이에 대한 처리도 필요하다. 


2.3. Host Connection

asyncio.open_connection(host=None, port=None, *, loop=None, limit=None, ssl=None, family=0, proto=0, flags=0, sock=None, local_addr=None, server_hostname=None, ssl_handshake_timeout=None)

coroutine 함수, awaitable object이다.
주어진 호스트와 연결 후 asyncio.StreamReader, asyncio.StreamWriter object를 반환한다. 예제 코드의 pingpong_client를 참조.

호스트와 연결에 실패할 경우 아래와 같은 OSError가 발생한다.

OSError: Multiple exceptions: [Errno 10061] Connect call failed ('::1', 8000, 0, 0), [Errno 10061] Connect call failed ('127.0.0.1', 8000)

이 경우를 대비하여 에러 처리 코드는 다음과 같이 할 수 있다.

try:
    reader, writer = await asyncio.open_connection(host='localhost',port=8000)
except OSError:
    print('connection fail')
    return


2.4. Stream Object

Stream은 네트워크 연결 시 별도의 socket API를 사용하지 않고도 비동기로 데이터를 주고 받을 수 있도록 만들어 졌다. 위 open_connection시 reader, writer 두개의 Stream Object가 반환된다. 

asyncio.StreamReader.read(n=-1)

coroutine이며, Stream으로 부터 n바이트의 데이터를 읽어온다. 만약 n이 -1이면, EOF까지 읽어 온다. 만약 데이터에 EOF가 없는 경우 데이터가 반환되지 않는다.

asyncio.StreamWriter.write(data)

Stream에 데이터를 쓴다.

asyncio.StreamWriter.close()

Stream을 닫는다.

asyncio.StreamWriter.wait_close()

coroutine이며, Stream이 닫힐 때까지 기다린다.


3. Async socket server example code


import asyncio    
    
async def handle_asyncclient(reader, writer):
    print('client :', writer.get_extra_info('peername'))
    while True:
        data = await reader.read(8)
        if data == b'ping':
            writer.write(b'pong')
            await writer.drain()
            print('recv: ping -> send: pong')
        elif data == b'done':
            print('recv: done')
            break
        elif len(data) == 0:
            break
    
    writer.close()
    await writer.wait_closed()
    print('connection was closed')
    
async def server_asyncmain():
    server = await asyncio.start_server(handle_asyncclient,'localhost',8000)  
    if server is not None:
        print('server started')
        #
        await asyncio.sleep(60)
        server.close()
        await server.wait_closed()
        print('server was closed')


if __name__ == "__main__":
    
    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        loop = None
    
    if loop and loop.is_running():
        print('Async event loop already running')
        tsk = loop.create_task(server_asyncmain())
    else:
        print('Starting new event loop')
        asyncio.run(server_asyncmain())   


3.1. 서버 실행 및 종료

asyncio.start_server(client_connected_cb, host=None, port=None, *, loop=None, limit=None, family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE, sock=None, backlog=100, ssl=None, reuse_address=None, reuse_port=None, ssl_handshake_timeout=None, start_serving=True)

coroutine 이며, socket 서버를 생성한다. ayncio.Server object를 반환한다. 새로운 client가 연결되면, client_connected_cb를 새로운 Task로 생성한다. client_connected_cb은 StreamReader, StreamWriter를 argument로 받는다. StreamBuffer의 디폴트 크기는 64KB이다. 

argument start_serving는 기본으로 True로 되어있어 자동으로 즉시 서버가 실행된다. 만약 False인 경우  Server.start_serving()나 Server.serve_forever()를 호출해야 서버가 실행된다. start_serving과 serve_forever는 coroutine이다.

async with server:
    await server.serve_forever()

asyncio.Server.close()

실행된 서비스를 중지한다. 실행된 서버는 close를 호출해야 종료된다. 예제 코드에서는 60초를 기다린 후 서버 서비스를 중지하도록 했다.

asyncio.Server.wait_closed()

coroutine이며, 서비스가 종료될 때까지 기다린다.


3.2. 추가 정보 받아오기

StreamWriter.get_extra_info(name, default=None)

name 스트링에 명시된 정보를 받아온다. 

socket 관련 정보는 아래와 같은 정보를 받아올 수 있다.
- 'peername' : socket연결된 상대 측의 주소를 받아온다.
- 'socket' : socket.socket 인스턴스를 받아온다.
- 'sockname' : socket의 주소를 받아온다.

추가 사항은 다음 사이트에서 확인할 수 있다.
https://docs.python.org/3.7/library/asyncio-protocol.html#asyncio.BaseTransport.get_extra_info


4. 실행 결과

서버와 클라이언트 실행 결과는 아래와 같다. 통신이 잘 이루어지는 것을 볼 수 있다. 

4.1. 서버 실행 결과

Async event loop already running
server started
client : ('::1', 62207, 0, 0)
recv: ping -> send: pong
recv: ping -> send: pong
recv: ping -> send: pong
recv: ping -> send: pong
recv: ping -> send: pong
recv: ping -> send: pong
recv: ping -> send: pong
recv: ping -> send: pong
recv: ping -> send: pong
recv: ping -> send: pong
recv: done
connection was closed
server was closed

4.2. 클라이언트 실행 결과

Async event loop already running
send: ping
recv: pong
send: ping
recv: pong
send: ping
recv: pong
send: ping
recv: pong
send: ping
recv: pong
send: ping
recv: pong
send: ping
recv: pong
send: ping
recv: pong
send: ping
recv: pong
send: ping
recv: pong
send: ping
connection was closed

댓글

이 블로그의 인기 게시물

windows에서 간단하게 크롬캐스트(Chromecast)를 통해 윈도우 화면 미러링 방법

쉽게 설명한 파티클 필터(particle filter) 동작 원리와 예제

딥러닝을 사용한 로또 번호 예측 실험

간단한 cfar 알고리즘에 대해

아두이노(arduino) 심박센서 (heart rate sensor) 심박수 측정 example code