0%

비동기적으로 Python 다루기

0. 안녕, 비동기!

Python으로 동시다발적 HTTP 요청을 보내는 작업을 해야할 필요가 생겼습니다. Python 3에서 asynchronous한 작업을 처리하기가 수월해졌다는 이야기를 어디서 들은 것 같아서 구글링을 간단히 해봤습니다...만 내용들이 이해하기가 쉽지 않았습니다. 이와 관련된 수 많은 용어들의 정의와 그 범위가 사람들마다 미묘하게 달랐기 때문입니다. 뿐만 아니라, 'asynchronous'라는 용어 자체를 듣기만 해도 뭔가 어렵고 복잡하고 막연하게 두려운 느낌도 듭니다.

Async, Async Everywhere

하루 날을 잡고 asynchronous python과 관련된 자료들을 왕창 읽어보았습니다. 이 글은 키워드 generator, coroutine, asyncio와 신택스 yield, yield from, async, await 에 대한 이해가 없던 제가, 나름대로 이해를 하는 과정을 기록한 것입니다. 혹시나 제가 잘못 이해하고 있는 부분이 있으면 말씀해주시면 감사하겠습니다.

1. 선지자들의 기록

각 단계를 작성하기 위하여 다음과 같은 자료들을 참고하였습니다.


2. Generator

우리가 흔히 알고있는 Python의 함수, 즉 def sth_familiar_with(): ...과 같이 정의된 함수를 호출할 경우, 해당 함수의 시작부터 끝(혹은 return을 만날때)까지 진행이 됩니다. 그 뒤에 stack frame은 소멸되므로, 이 함수를 다시 호출할 경우 함수는 처음부터 다시 실행이 됩니다.

Generator는 약간 다릅니다. 결과물을 그때그때 생성해내는, 일종의 on-demand function 정도로 생각해볼 수 있겠습니다. 다음의 예시를 보겠습니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
>>> def hello_twice():
... print ('Gonna say hello')
... yield 'Hello'
... print ('Gonna say hello again')
... yield 'Hello again!'

>>> gen = hello_twice()
>>> print(next(gen))
Gonna say hello
Hello

>>> print(next(gen))
Gonna say hello again
Hello again!

위와 같이 함수를 정의할 경우, CPython 컴파일러가 함수 내 yield 키워드를 발견합니다. 그리고, 해당 함수를 generator function으로 처리합니다.

위의 7번 라인과 같이, generator 함수가 호출되면 generator object가 생성됩니다. 8번 라인에서 next(gen)을 실행하면, hello_twice의 3번 라인까지 실행이 됩니다. 실행 도중 yield 키워드를 만났을 경우, 해당 함수의 실행은 일시적으로 보류된 후 값을 return 합니다. 다시 next(gen)을 실행할 경우 (라인 12), 일시 보류되었던 함수의 진행이, 다음 yield문을 만날때까지 계속됩니다. (라인 4~5) 이 과정이 함수의 끝에 도달할 때 까지 반복됩니다.

Python의 stack frame은 (이름과 다르게) 메모리의 stack 영역에 올라가는 것이 아니라, heap 영역에 할당됩니다. 이러한 Python의 특징 덕분에, generator가 실행을 일시정지하고 값을 return할 때에 stack frame을 유지할 수 있습니다. 함수가 yield를 만나는 순간, 해당 stack frame과 코드를 어디서부터 재개할지에 대한 정보(last instruction pointer)를 보관하는 것입니다. (Syed Komail Abbas)

3. Coroutine

일단, 'Coroutine'이라는 용어가 Python-specific keyword가 아니라는 사실을 인지할 필요가 있을 것 같습니다.

Coroutines are computer-program components that generalize subroutines for non-preemptive multitasking, by allowing multiple entry points for suspending and resuming execution at certain locations.

따라서, 질문을 "Python의 coroutine은 무엇인가?"로 잡는 것 보다는, "Python에서는 coroutine을 어떻게 구현하였나?"로 생각하는 쪽이 혼선을 줄이는 방향이라고 생각합니다.

3-1. Generator based 'Coroutine'

Python이 가지고 있는 generator를 이용한다면, coroutine을 구현할 수 있어 보입니다. 함수 실행을 일시정지하거나 재개할 수 있는 entry point들이 존재한다는 점과, 그 때마다 값을 yield 할 수 있는 generator의 성질을 잘 이용하는 것이지요.

Python generator와 관련한 두가지 기능을 더 살펴봅시다. 하나는 yield되어 실행이 일시정지 되어있는 generator에게 값을 전달하는 기능입니다. 아래의 예시를 보겠습니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
>>> def echo_me():
... msg_first = yield 'Ready to echo'
... msg_second = yield msg_first
... yield msg_second

>>> gen = echo_me()
>>> print(next(gen))
Ready to echo

>>> print(gen.send('Hello'))
Hello

>>> print(gen.send('There!'))
There!

gen.send('Hello')로 인해 generator의 실행이 재개 될 경우, yield msg_first 신택스 자체가 'Hello'로 처리됩니다. 이처럼, 실행이 멈춰있는 generator object의 send method를 호출함으로써, generator에게 데이터를 전달할 수 있습니다.

다른 하나는, generator에서 다른 generator를 yield하는 기능입니다. 이는 PEP380 (Syntax for Delegating to a Subgenerator)에 명시되어있습니다. 아래의 예시를 보겠습니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
>>> def inner():
... yield 'INNER'
... yield 'GENERATOR'
... yield 'YEAH!'

>>> def outer():
... yield 'Get ready for ...'
... yield from inner()

>>> gen = outer()
>>> print(next(gen))
Get ready for ...

>>> print(next(gen), next(gen), next(gen))
INNER GENERATOR YEAH!

라인 8과 같이 yield from 키워드를 사용하여 generator 내부에서도 다른 generator를 생성하여 yield를 사용할 수 있습니다. 즉, yield from inner()from x from inner(): yield x와 같은 역할을 하는 셈입니다. PEP380에 따르면, 이 신택스는 Python 3.3부터 추가되었다고 합니다. (따라서, Python 2.7에서 위의 예시를 실행하면 yield from 신택스를 이해하지 못하고 에러가 발생합니다.)

위에서 정의된 echo_meinner, outer는 일종의 coroutine입니다. (실행을 일시정지하고 재개할 수 있는 entry points들이 존재하기 때문입니다.) Generator를 이용하여 만들었으므로, 'generator based coroutine'라고 생각할 수 있겠습니다. (여기까지는 generator와 coroutine이라는 용어의 구분이 명확하지 않습니다.)

이러한 generator based coroutine들이 값을 yield 할 경우, 항상 자신의 caller에게로 실행의 순서가 넘어가게 됩니다. 이러한 coroutine을 'Asymmetric coroutine'이라고 부르기도 합니다. 이와 구분되는 개념인 'Symmetric coroutine'의 경우에는 하나의 coroutine에서 값을 yield할 때, 다른 coroutine에게 실행 순서를 넘겨줄 수 있는 통제권을 가지고 있습니다. 무조건 자신의 caller에게 실행 순서를 넘겨주어야 하는 asymmetric coroutine과는 다르죠. (Moura)

3-2. asyncio Module

무조건 caller와만 상호작용하는 반쪽짜리 coroutine이라니.... 만약 caller가 이러한 asymmetric coroutine들을 유기적으로 엮어주는 역할을 해준다면 (예를 들면 하나의 coroutine이 suspend 되었을 때 다른 coroutine을 실행시켜주는 역할을 해준다면), 전체적으로 보았을 때 각각의 coroutine들이 서로에게 실행 순서를 넘겨주는 것 처럼 보일 수 있겠습니다.

asyncio 모듈이 이러한 역할을 해줍니다. 아래의 예시를 보겠습니다. asyncio docs의 예시 중 하나를 참고하였습니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
>>> import asyncio
>>> @asyncio.coroutine
... def echo_twice(msg):
... print(msg)
... yield from asyncio.sleep(1)
... print(msg)

>>> loop = asyncio.get_event_loop()
>>> asyncio.ensure_future(echo_twice('Hello'))
>>> asyncio.ensure_future(echo_twice('There'))
>>> loop.run_forever()
Hello
There
# 주춤...
Hello
There

asyncio.coroutine 데코레이터를 통해 echo_twice 함수를 coroutine으로 정의합니다. asyncio.ensure_future(...)를 통해, 실행할 coroutine을 생성합니다. asyncio.get_event_loop()로 받은 event loop의 run_forever를 실행함으로써 coroutine들을 실행합니다.

출력된 메시지를 확인해보면, 두 개의 coroutine이 동시에 진행된 것을 확인할 수 있습니다. echo_twice coroutine 실행 중 라인 5에 다다르면 asyncio.sleep(1)이 1초 뒤에 실행이 완료되는 (1초 뒤에 yield할 수 있는) coroutine을 생성합니다. 이 coroutine을 yield from하게 되면 event loop에게로 통제권이 넘어가고, event loop에 의해서 다른 coroutine이 실행됩니다. (Mansnun)

이러한 방법을 통해, 여러 coroutine들이 동시에 실행될 수 있습니다. 여기서 말하는 '동시에' 실행된다는 말은 parallelly가 아니라 concurrently를 의미합니다. Event loop은 single thread이므로 한번에 실행 되는 coroutine은 하나입니다. 여러 coroutine들이 하나의 thread를 타이밍 좋게 나누어 가지는 셈입니다. (Masnun) Concurrency에 관한 이야기와 asyncio에 관한 조금 더 디테일한 설명은 뒤에서 다시 하겠습니다.

3-3. Native Coroutine

Python 3.5부터 새로운 신택스인 asyncawait가 등장합니다. (PEP492) 이를 이용하면 위에서 선언한 echo_twice를 아래와 같이 다시 정의할 수 있습니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
>>> import asyncio
>>> async def echo_twice(msg):
... print(msg)
... await asyncio.sleep(1)
... print(msg)

>>> loop = asyncio.get_event_loop()
>>> asyncio.ensure_future(echo_twice('Hello'))
>>> asyncio.ensure_future(echo_twice('There'))
>>> loop.run_forever()
Hello
There
# 주춤...
Hello
There

@asyncio.coroutine 데코레이터 대신 async 키워드를, yield from 키워드 대신 await 키워드를 사용했습니다. Python의 신택스만을 이용하여 정의한 coroutine이니, 이를 'Native Coroutine'이라고 불러도 괜찮겠습니다. (Masnun) 실행 도중 라인 4의 await asyncio.sleep(1)에 다다르면, echo_twice 함수는 event loop에게 컨트롤을 넘겨줍니다. 이 때, generator based coroutine에서 사용되는 신택스와 native coroutine에서 사용되는 신택스는 혼용될 수 없습니다. 따라서, 아래와 같은 코드는 허용되지 않습니다.

1
2
3
4
5
6
7
8
9
10
11
12
>>> @asyncio.coroutine
... def this_is_nono():
... await asyncio.sleep(2)
...
File "<stdin>", line 3
SyntaxError: 'await' outside async function

>>> async def this_is_nono():
... yield from asyncio.sleep(2)
...
File "<stdin>", line 2
SyntaxError: 'yield from' inside async function

4. Concurrency와 Parallelism

CPython의 메모리 관리는 thread-safe하지 않습니다. 따라서 GIL(Global Interpreter Lock)이라 불리는 mutex를 사용하여 여러 쓰레드가 동시에 Python 코드를 실행하지 못하도록 보호합니다. (ThomasWouters)

여기서 머리가 복잡해졌습니다. 그럼 위에서 말했던 동시에 실행되는 coroutine은 어떻게 '동시에' 돌아갈 수 있는 것일까요? 이를 잘 이해하기 위해서는 concurrency와 parallelism을 구분하여 정의내릴 필요가 있습니다.

Concurrency is about dealing with lots of things at once Parallelism is about doing lots of things at once

두 용어는 다른 차원의 개념입니다. Parallelism은 하나의 작업이 여러개의 작은 작업들로 나뉘어져 물리적으로 동시에 진행되는 것을 말합니다. 여러개의 CPU가 동시에 돌아가는 것을 생각하면 되겠습니다. 반면, Concurrency는 여러 작업들이 같은 순간에 (physically or logically) 처리되고 있는 상태입니다.

이 정의를 통해 알 수 있는 사실이 몇 가지 있습니다. 1. Single Thread 환경에서도 concurrent한 작업을 수행할 수 있다. (CPU time-slicing 혹은 Application level의 scheduling을 활용) 2. Parallel한 작업은 Concurrent하다. 그 반대는 항상 성립하지 않는다. (Skrew Everything)

Python의 coroutine들은 concurrent하게 실행됩니다. Parallel하게 돌아가는 것이 아닙니다. 앞서 이야기 했듯이, Python은 GIL로 인하여 한번에 하나의 thread만 실행이 됩니다. 따라서, 위에서 이야기 했던 asynchronous한 coroutine들은 'One process, One thread' 환경에서 concurrent하게 돌아가는 것입니다. 아래는 Skrew Everthing님의 포스트에 삽입된 Parallelism과 Concurrency without Parallelism을 시각화한 그림입니다. Python의 GIL을 고려하였을 때, 그림의 아랫부분에 있는 Concurrency without Parallelism이 Python에서의 coroutine들의 동작 형태와 유사하다고 볼 수 있습니다.

그러므로, "하나의 작업을 여러개의 coroutine으로 나누어 실행 할 경우 작업이 빨라진다"라는 statement는 항상 옳지 않습니다. 'One process, One thread' 환경에서 돌아가기 때문에, coroutine을 context switching하는 cost로 인하여 경우에 따라서는 performance가 더 나빠질 가능성도 있습니다.


5. More about async, await and asyncio

Awaitable Objects

__await__() method가 구현되어 있는 object를 awaitable object라고 합니다. (Python Docs) 이름 그대로, await 될 수 있는 object를 의미합니다. Coroutines, Tasks, Futures가 awaitable object에 속합니다.

Coroutines 앞서 설명한 coroutine들은 awaitable object입니다. 따라서, 다른 coroutine에서 await 될 수 있습니다.

1
2
3
4
5
6
7
8
9
10
import asyncio

async def coro():
return 'hello from coro!'

async def main_coroutines():
coro() # Coroutine object입니다. 하지만 await 되지 않았으므로 실행되지는 않습니다.
print(await coro()) # 이 경우, `await coro()` 가 인사를 합니다.

asyncio.run(main_coroutines())

Futures Future는 비동기 연산의 결과물을 대신하는 low-level awaitable object입니다. (A Future is a special low-level awaitable object that represents an eventual result of an asynchronous operation. / Represents the result of an asynchronous computation.) Future object가 await될 경우, 다른 곳에서 resolve 되기 전 까지 해당 coroutine이 기다리게 됩니다. 일반적인 경우, application level 코드에서 Future object를 생성 할 필요는 없습니다.

Tasks Task는 coroutine을 concurrent하게 실행하기 위하여 schedule을 해 놓은 object입니다. asyncio.create_task() 함수 등을 이용하여 coroutine을 Task로 wrapping 할 경우, 해당 coroutine은 자동적으로 schedule 되어서 실행 가능한 상태가 됩니다. 따라서, 위의 main_coroutines()에서는 main_coroutine()coro()가 concurrent하게 돌아가지 않지만, 아래의 main_tasks()에서는 main_tasks()coro()가 concurrent하게 실행될 수 있습니다.

1
2
3
4
5
6
7
8
9
10
11
12
import asyncio

async def coro():
return 'hello from coro!'

async def main_tasks():
task = asyncio.create_task(coro()) # `coro()` coroutine과 `main_tasks()` coroutine이
# concurrent하게 돌아갈 수 있도록 scheduling 합니다

print(await task) # `await task` 가 인사를 합니다.

asyncio.run(main_tasks())

asyncio module

asyncio의 method 중 핵심적인 몇 가지를 정리해보면 아래와 같습니다.

asyncio.run(coro, *, debug=False)

전달 된 coroutine을 실행하고, asyncio의 event loop를 관리합니다. 같은 thread에서 다른 asyncio event loop가 실행중일 경우에는 실행할 수 없습니다. Python 3.7에서 새로 추가되었습니다.

asyncio.create_task(coro)

전달 된 coroutine을 wrapping하여 Task object를 만들고 schedule 합니다. Python 3.7에서 새로 추가되었습니다. 이전 Python에서는 asyncio.ensure_future()를 사용합니다.

asyncio.sleep(delay, result=None, loop=None)

delay초 동안 대기합니다. result가 주어진 경우, sleep이 완료된 후 해당 값을 caller에게 리턴합니다. sleep() 함수는 현재 task를 suspend 시키므로, 이 시점에 다른 task가 실행될 수 있습니다.

asyncio.gather(*aws, loop=None, return_exceptions=None)

주어진 awaitable objects들을 concurrent하게 실행합니다. 주어진 awaitable object 중 coroutine이 있으면, 자동으로 Task로 schedule 됩니다. 모든 awaitable들이 정상적으로 끝나면, 각 awaitable의 return value의 list가 return 됩니다. 순서는 aws의 순서와 동일합니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import asyncio

async def whoami_after_sleep(name, t):
print(f'I am {name} and gonna sleep for {t} seconds.')
await asyncio.sleep(t)
print(f'I am {name}. I slept for {t} seconds.')
return ('result', name, t)

async def main():
await asyncio.gather(
whoami_after_sleep('A', 1),
whoami_after_sleep('B', 2),
whoami_after_sleep('C', 3),
)

asyncio.run(main())
# I am A and gonna sleep for 1 seconds.
# I am B and gonna sleep for 2 seconds.
# I am C and gonna sleep for 3 seconds.
# I am A. I slept for 1 seconds.
# I am B. I slept for 2 seconds.
# I am C. I slept for 3 seconds.

asyncio.wait_for(aw, timeout, *, loop=None)

주어진 awaitable을 timeout초 동안 기다립니다. aw가 coroutine일 경우, 자동으로 Task로 schedule 됩니다. 해당 시간이 지나면, asyncio.TimeoutError가 발생합니다.

...

추가적인 설명은 Python Doc을 참조하면 되겠습니다.

Using asyncio based module : aiohttp

aiohttp는 asyncio를 활용한 HTTP 클라이언트/서버 모듈입니다. 이 모듈과 위의 내용들을 바탕으로 다수의 웹사이트를 불러오는 asynchronous를 만들어보면 다음과 같습니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import aiohttp
import asyncio
import time

urls = ['https://jsonplaceholder.typicode.com/posts/{}'.format(id) for id in range(1, 101)]

async def fetch(url):
print('start fetching', url)
async with aiohttp.ClientSession() as session:
async with session.get(url) as res:
assert res.status == 200
return await res.text()

async def main():
await asyncio.gather(*[fetch(url) for url in urls])


start = time.perf_counter()
asyncio.run(main())
end = time.perf_counter()
print ('Time Elapsed : {} sec'.format(end-start)) # 1.61 sec

이와 같은 동작을 하는 synchronous한 스크립트는 시간이 얼마나 걸릴까요? 아래와 같이 간단한 테스트를 해보았습니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import requests
import time

urls = ['https://jsonplaceholder.typicode.com/posts/{}'.format(id) for id in range(1, 101)]

def main():
with requests.Session() as client:
for url in urls:
res = client.get(url)
assert res.status_code == 200
print(res.status_code)

start = time.perf_counter()
main()
end = time.perf_counter()
print ('Time Elapsed : {} sec'.format(end-start)) # 10.41 sec

잘 짜여진 테스트는 아니지만, 10.41초에서 1.61초로 소요 시간이 크게 줄어들었음을 확인할 수 있습니다. 작업의 I/O 대기시간이 길어질수록 이러한 접근 방법은 더욱 더 효과적입니다.