python asyncio를 이용한 async socket server client example code
본 포스트는 python asyncio 라이브러리를 사용한 async socket 서버와 클라이언트 예제 코드를 싣고 있다. 이전 non-blocking socket의 예제를 asyncio 라이브러리에 맞게 변형하였다.
python socket API 와 non-blocking socket server client example code
1. asyncio (Asynchronous I/O)
asyncio는 async/await 구문을 사용해 multi-thread처럼 동시에 진행하는 코드를 작성하는 라이브러리다. 네트워크 및 웹 서버, DB 연결, 분산 작업 대기열을 제공하는 분산 프레임 워크의 기반으로 사용된다고 한다. ( https://docs.python.org/3.7/library/asyncio.html)
async socket server/client 구현에 필요한 asyncio 라이브러리 API는 예제 코드를 보면서, 설명하도록 하겠다.
2. Async socket client example code
import asyncio
async def pingpong_client():
try:
reader, writer = await
asyncio.open_connection(host='localhost',port=8000)
except OSError:
print('connection fail')
return
for _ in range(10):
writer.write(b'ping')
print('send: ping')
data = await reader.read(8)
print('recv:',data.decode())
await asyncio.sleep(1)
writer.write(b'done')
print('send: ping')
writer.close()
await writer.wait_closed()
print('connection was closed')
if __name__ == "__main__":
try:
loop = asyncio.get_running_loop()
except RuntimeError:
loop = None
if loop and loop.is_running():
print('Async event loop already running')
tsk = loop.create_task(pingpong_client())
else:
print('Starting new event loop')
asyncio.run(pingpong_client())
2.1. coroutine 실행
asyncio는 coroutine으로 구현되어 있으며, 클라이언트 또한 coroutine으로 구현했다. coroutine는 'async def'를 사용하여 구현할 수 있다.
corutine의 실행은 asyncio.run을 사용하여 실행할 수 있다.
asyncio.run(coro, *, debug=False)
하지만 asyncio.run 실행 시 아래와 같이 RuntimeError가 발생하는 경우가 있다.
"RuntimeError: asyncio.run() cannot be called from a running event loop"
이 경우 실행중인 이벤트 루프를 받아와 coroutine 함수를 Task로 감싸고 실행해야 한다. 이벤트를 받아오고 Task를 생성하는 API는 아래와 같다. (python 3.7)
asyncio.get_running_loop()
asyncio.create_task(coro)
get_running_loop()는 현재 실행중인 루프가 없을 때 RuntimeError를 발생하기 때문에 이에 대한 처리도 필요하다.
2.3. Host Connection
asyncio.open_connection(host=None, port=None, *, loop=None, limit=None, ssl=None, family=0, proto=0, flags=0, sock=None, local_addr=None, server_hostname=None, ssl_handshake_timeout=None)
coroutine 함수,
awaitable object이다.
주어진 호스트와 연결 후 asyncio.StreamReader,
asyncio.StreamWriter object를 반환한다. 예제 코드의 pingpong_client를 참조.
호스트와 연결에 실패할 경우 아래와 같은 OSError가 발생한다.
OSError: Multiple exceptions: [Errno 10061] Connect call failed ('::1', 8000, 0, 0), [Errno 10061] Connect call failed ('127.0.0.1', 8000)
이 경우를 대비하여 에러 처리 코드는 다음과 같이 할 수 있다.
try:
reader, writer = await
asyncio.open_connection(host='localhost',port=8000)
except OSError:
print('connection fail')
return
2.4. Stream Object
Stream은 네트워크 연결 시 별도의 socket API를 사용하지 않고도 비동기로 데이터를 주고 받을 수 있도록 만들어 졌다. 위 open_connection시 reader, writer 두개의 Stream Object가 반환된다.
asyncio.StreamReader.read(n=-1)
coroutine이며, Stream으로 부터 n바이트의 데이터를 읽어온다. 만약 n이 -1이면, EOF까지 읽어 온다. 만약 데이터에 EOF가 없는 경우 데이터가 반환되지 않는다.
asyncio.StreamWriter.write(data)
Stream에 데이터를 쓴다.
asyncio.StreamWriter.close()
Stream을 닫는다.
asyncio.StreamWriter.wait_close()
coroutine이며, Stream이 닫힐 때까지 기다린다.
3. Async socket server example code
import asyncio
async def handle_asyncclient(reader, writer):
print('client :', writer.get_extra_info('peername'))
while True:
data = await reader.read(8)
if data == b'ping':
writer.write(b'pong')
await writer.drain()
print('recv: ping -> send:
pong')
elif data == b'done':
print('recv: done')
break
elif len(data) == 0:
break
writer.close()
await writer.wait_closed()
print('connection was closed')
async def server_asyncmain():
server = await
asyncio.start_server(handle_asyncclient,'localhost',8000)
if server is not None:
print('server started')
#
await asyncio.sleep(60)
server.close()
await server.wait_closed()
print('server was closed')
if __name__ == "__main__":
try:
loop = asyncio.get_running_loop()
except RuntimeError:
loop = None
if loop and loop.is_running():
print('Async event loop already running')
tsk = loop.create_task(server_asyncmain())
else:
print('Starting new event loop')
asyncio.run(server_asyncmain())
3.1. 서버 실행 및 종료
asyncio.start_server(client_connected_cb, host=None, port=None, *, loop=None, limit=None, family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE, sock=None, backlog=100, ssl=None, reuse_address=None, reuse_port=None, ssl_handshake_timeout=None, start_serving=True)
coroutine 이며, socket 서버를 생성한다. ayncio.Server object를 반환한다. 새로운 client가 연결되면, client_connected_cb를 새로운 Task로 생성한다. client_connected_cb은 StreamReader, StreamWriter를 argument로 받는다. StreamBuffer의 디폴트 크기는 64KB이다.
argument start_serving는 기본으로 True로 되어있어 자동으로 즉시 서버가 실행된다. 만약 False인 경우 Server.start_serving()나 Server.serve_forever()를 호출해야 서버가 실행된다. start_serving과 serve_forever는 coroutine이다.
async with server:
await server.serve_forever()
asyncio.Server.close()
실행된 서비스를 중지한다. 실행된 서버는 close를 호출해야 종료된다. 예제 코드에서는 60초를 기다린 후 서버 서비스를 중지하도록 했다.
asyncio.Server.wait_closed()
coroutine이며, 서비스가 종료될 때까지 기다린다.
3.2. 추가 정보 받아오기
StreamWriter.get_extra_info(name, default=None)
name 스트링에 명시된 정보를 받아온다.
socket 관련 정보는 아래와 같은 정보를 받아올 수 있다.
- 'peername' : socket연결된 상대 측의 주소를 받아온다.
- 'socket' : socket.socket 인스턴스를 받아온다.
- 'sockname' : socket의 주소를 받아온다.
추가 사항은 다음 사이트에서 확인할 수 있다.
https://docs.python.org/3.7/library/asyncio-protocol.html#asyncio.BaseTransport.get_extra_info
4. 실행 결과
4.1. 서버 실행 결과
Async event loop already running
server started
client : ('::1', 62207, 0, 0)
recv: ping -> send: pong
recv: ping -> send: pong
recv: ping -> send: pong
recv: ping -> send: pong
recv: ping -> send: pong
recv: ping -> send: pong
recv: ping -> send: pong
recv: ping -> send: pong
recv: ping -> send: pong
recv: ping -> send: pong
recv: done
connection was closed
server was closed
4.2. 클라이언트 실행 결과
Async event loop already running
send: ping
recv: pong
send: ping
recv: pong
send: ping
recv: pong
send: ping
recv: pong
send: ping
recv: pong
send: ping
recv: pong
send: ping
recv: pong
send: ping
recv: pong
send: ping
recv: pong
send: ping
recv: pong
send: ping
connection was closed
댓글
댓글 쓰기