본문 바로가기
IT/python

[python] asyncio 비동기 프로그래밍의 기초

by 통섭이 2023. 1. 30.

서두

  • 이글은 FastAPI를 사용하다가, 비동기를 좀더 깊이 알고자 작성하게 되었다.
  • python3 에서는 비동기 프로그래밍을 지원하고 있다. 구체적으로는 python3.4 에서는 asyncio 모듈이 추가되었고, python 3.5 에서는 async, await 키워드가 추가 되어서 비동기 프로그래밍을 지원하고 있다. 이를 간단히 설명해보고자 한다. (더 자세한 설명은 아래에 참고 링크를 참고 바랍니다)
  • 설명 순서는 1-1. iterator → 1-2. generator → 2-1. coroutine → 2-2. yield from → 3. future, tasks → 4. event loop → 5-1. asyncio → 5-2. async,await 이다
  • iterator와 generator 는 async의 핵심인 coroutine 을 이해하는데 도움이 되기 때문에 이부터 시작한다.

1-1. Iterator : iterable 한 객체로, 반복 가능하고, 값을 차례대로 꺼낼 수 있는 객체

  • 이 객체는 __iter__() 와 __next__() 함수를 가지고 있다.
    • __iter__() : __next__() method를 가진 객체를 리턴
    • __next__() : 호출 시 지금까지 읽은 값의 다음 값을 리턴
  • 대표적인 iterable 타입 : list, dict, set, str, bytes, tuple, range.
# 대표적인 iteralbe 타입을 이용해 iterator를 알아보자. dir()을 통해 알 수 있듯이, iterable 객체는 __iter__ 를 가지고 있다.
>>> a = [1,2,3]
>>> dir(a)
['__add__', '__class__', '__contains__', '__delattr__', '__delitem__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__getitem__', '__gt__', '__hash__', '__iadd__', '__imul__', '__init__', '__init_subclass__', '__iter__', '__le__', '__len__', '__lt__', '__mul__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__reversed__', '__rmul__', '__setattr__', '__setitem__', '__sizeof__', '__str__', '__subclasshook__', 'append', 'clear', 'copy', 'count', 'extend', 'index', 'insert', 'pop', 'remove', 'reverse', 'sort']

# iterable 객체를 iterator 객체를 이용하면, next()를 통해 다음 인자에 접근할 수 있다
>>> a_iter = iter(a) # a 에는 __iter__ 가 있으니 이렇게 해서 iterator 객체를 받을 수 있다
>>> next(a_iter)
1
>>> next(a_iter)
2
>>> next(a_iter)
3
>>> next(a_iter) # 더이상 꺼낼 게 없어서 StopIteration 예외 발생한다. 이를 try-exception으로 처리 가능하다
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
StopIteration

# iterator는 아래와 같이 for loop를 사용할 수도 있다
>>> a_iter = iter(a)
>>> for a_elem in a_iter:
...     print(a_elem)
...
1
2
3

.

# 아래 예시처럼 iterable 객체를 선언하고, 해당 객체에 __next__()를 구현하면 된다
class Counter(object):
    def __iter__(self):
        iter = Iterator()
        return iter

# iterator 객체는 __init__() 과 __next__() 모두가 있다.
class Iterator(object):
    def __init__(self):
        self.index = 0

    def __next__(self):
        if self.index > 10:
            raise StopIteration
        n = self.index * 2
        self.index += 1
        return n

>>> c = Counter() # iterable 객체를
>>> i = iter(c) # iterator로 선언
>>> next(i)
0
>>> next (i)
2

1-2. Generator : yield를 사용해 편하게 iterator 로 만든 객체

  • 아래 코드를 보면 __init__() 과 __next__()를 구현하는 것보다 yield를 통해 itertor를 구현하는게 훨씬 더 편한 것을 알 수 있다.
  • generator는 호출 시 실행 되지 않고 generator 객체가 리턴된다. 그리고 이는 iterator처럼 next()를 통해 사용 가능하다
>>> def test():
...   for i in range(10):
...     yield i * 2
...
>>> a = test()
>>> a
<generator object test at 0x10532cc80>
>>> next(a)
0
>>> next(a)
2
>>> next(a)
4
>>> dir(a) # yield를 통해 구현한 generator 에도 __iter__ 과 __next__가 존재한다
['__class__', '__del__', '__delattr__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__iter__', '__le__', '__lt__', '__name__', '__ne__', '__new__', '__next__', '__qualname__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', 'close', 'gi_code', 'gi_frame', 'gi_running', 'gi_yieldfrom', 'send', 'throw']

# 위와 같은 코드가 대괄호만 소괄호로 바꾸어 생성 가능하다
>>> [ x * 2 for x in range(10) ]
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
>>> (x * 2 for x in range(10))
<generator object <genexpr> at 0x10532cd60>
>>> a = (x * 2 for x in range(10))
>>> next(a)
0
>>> next(a)
2
>>> next(a)
4

2-1. Coroutine : yield를 기반으로 데이터와 제어권을 전달하는 generator

  • generator 의 단방향 데이터 전달을 양방향으로 하는 기능이 추가되어 상호 협력하는 루틴을 만듬.
  • 즉, coroutine은 generator의 확장된 버전이고, 이 때문에 사람들이 generator = coroutine이라고 많이 이야기한다.
  • 이를 통해 단일 thread 에서 다수의 작업을 concurrent(병렬적과는 다른 동시적인 의미)하게 실행할 수 있게 되었다.
>>> def coroutine1():
...   print('callee 1')
...   x = yield 1
...   print('callee 2: %d' % x)
...   x = yield 2
...   print('callee 3: %d' % x)
...
>>> task = coroutine1() # 실행되지 않고 coroutine 객체가 리턴됨 
>>> i = next(task) # 시작은 next() 로 해야 함
callee 1
>>> i = task.send(10) # 다음부터 값을 send()를 통해 값을 넘겨주면서 실행. 이게 바로 coroutine!! (generator의 확장!!)
callee 2: 10
>>> task.send(20) # 전부 실행되고는 StopIteration으로 끝남
callee 3: 20
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
StopIteration

# 참고사항
>>> task = coroutine1()
>>> task
<generator object coroutine1 at 0x105160c80> # 이렇게 그냥 generator 라고 나온다. 
  • iterator 부분에서 설명한 것처럼 StopIteration 은 try-except-finally 으로 처리가 가능한데, python 2.5 부터는 caller 에서 yield로 멈춰있는 coroutine에 exception 전달 지원이 되서, send() 처럼 throw(type, value, traceback) 을 이용해 coroutine에 전달할 수 있다. 또한 close()를 통해 종료 시킬 수도 있다.

2-2. Yield from : caller - sub-coroutine 간에 손쉽게 데이터 주고 받게 하는 도구

  • coroutine 안에서 다른 sub-coroutine을 호출 할 때 이전처럼 next()를 사용하면 caller 까지 yield 값이 돌아 오지만, send() , throw() , close() 를 쓰려면 부족하다.
>>> def subcoroutine():
...   yield 1
...   yield 2
...
>>> def coroutine():
...   for v in subcoroutine():
...     yield v
...
>>> x = coroutine()
>>> print(next(x))
1
>>> print(next(x))
2
>>> next(x)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
StopIteration
>>> def subcoroutine():
...   print("Subcoroutine")
...   x = yield 1
...   print("Recv: " + str(x))
...   x = yield 2
...   print("Recv: " + str(x))
...
>>> def coroutine():
...   yield from subcoroutine() # 오른쪽에는 iterable 과 generator 가 온다.

.

  • 또한 return 도 지원해서 StopIteration 을 지원한다. 끝까지 가면 아래와 같이 최종 값이 전달되면서 끝난다.
>>> def test():
...   yield 1
...   return 10
...
>>> x = test()
>>> next(x)
1
>>> next(x)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
StopIteration: 10

.

  • return 을 추가로 지원한 것은 yield from 의 기능을 확장 한 것이다. return 이 지원되면 yield from 에서 직접 값을 받을 수 있다. 이런 형태로는 sub-coroutine은 필요한 만큼 돌고 반환할 수 있다.
def sum(max):
  tot = 0
  for i in range(max):
    tot += i
    yield tot # iterator 중간 값을 caller 에서 받는다
  return tot

def coroutine():
  x = yield from sum(10) # caller가 아니라 중간의 parent coroutine에서 받는다
  print("Total:{}".format(x))

.

  • (여기서 갑자기 어려워지는데... 이부분은 그냥 읽고 넘어가자 아래 내용을 먼저 공부하고 다시 읽어보면 조금 더 이해가 갈 것이다) asyncio 는 event loop(일감을 넣는 queue라고 생각하자)에서는 coroutine 이 task 로 되면서 순차적으로 실행되면서 제어권을 주고받는다. asyncio는 비동기라서 특정 이벤트에 따라 호출되는 것은 일반 callback 일 수 있고, await으로 다른 coroutine 결과를 받은 비동기 coroutine일 수 있다.
>>> async def coroutine():
...   print("coroutine")
...
>>> a = coroutine()
>>> a # async def 는 coroutine 객체가 리턴된다
<coroutine object coroutine at 0x1053d01c0> 
  • asyncio는 event loop에서 coroutine을 사용하는데, 정확한 동작을 알기 위해서는 future, task와 event loop를 알면 이해에 도움이 되니 이를 먼저 알아보자.

3. future, tasks : event loop 에서 사용되는 코루틴 단위

  • Future
    • 어떤 작업의 실행 상태 및 결과를 저장. PENDING, CANCELLED, FINISHED 세가지 중 하나를 가지고, 작업의 실행 개시역할은 수행하지 않는다.
    • future.result() 사용해 work 함수 결과 완료하거나 exception 을 받을 수 있다
    • executor.submit() 으로 thread pool 에서 돌릴 함수를 등록하면 future을 리턴한다. 함수는 비동기로 실행된다
    • futures.as_completed() 쓰면 완료된 순서대로 generator 리턴 받는다.
    • add_done_callback() 을 써서 해당 퓨처 객체가 완료 될 때 호출될 콜백을 등록할 수 있다. 이는 event loop 동작시 중요한 역할을 한다!
  • tasks
    • 퓨처를 상속하는 클래스이다. 퓨처 기능 상속받아 실행 상태 및 결과를 저장하고, + 어떠한 작업의 실행을 개시도 한다.
    • 이를 위해 coroutine이 필요하다. task 객체는 coroutine 객체를 갖고 있는 특별한 객체인데, asyncio.run() asyncio.create_task() 를 호출할 때 인자로 coroutine 을 넘긴다. 그러면 task 객체가 생성되면서 event loop에 예약된다.
    • 생성되는 즉시 현재 thread에 설정된 event loop에게 자신의 __step() 메소드를 호출할 것을 요청한다.
    • __step() 은 coroutine 실행하는 메소드인데, 그렇게 실행된 coroutine은 await 을 이용해 다른 coroutine 부르게 되는데 이것을 coroutine chain이라한다.
    • event loop 와 task의 연쇄 반응에 대해서는 밑에서 다시 설명하도록 한다.
    • 동시 실행(concurrent)을 위해서는 asyncio.create_task() 함수 호출해 task 추가 생성 실행해야 한다.

4. event loop : 무한 루프를 돌며 매 루프마다 task를 하나씩 실행시키는 로직이다

  • coroutine 객체는 생성 및 반환된다고 바로 실행되지 않는다. await, asyncio.run(), asyncio.create_task() 중 하나를 써야 한다.
  • 이때 await은 coroutine 내에서만(async함수 내에서 다른 async 함수를 호출 할 때) 쓸 수 있어서 첫 실행 용도는 아니다. 일반적인 시작점은 asyncio.run() 이다. asyncio.create_task() 는 동시 실행 하고 싶은 경우에 쓴다.
  • asyncio.run() 함수는 새 루프 설정하고, 인자로 넘어온 coroutine 객체를 task로 예약하여 실행한 뒤, 완료되면 event loop를 닫는 역할을 한다.
loop = asyncio.get_event_loop()
loop.run_until_complete(first_coroutine())
loop.close()
  • event loop를 사용한 비동기 동작 원리
    • 루프를 생성한다
    • 최초 진입점인 first_coroutine 이 task 객체로 생성되고, 루프에 예약된다
    • __step() 메소드를 호출해 실행된다 (task객체의 _coro 인자에 coroutine이 있는데 이 corountine에 send() 를 호출해 실행)
    • await 을 마주찰때마다 coroutine chain을 형성한다.
    • 종착지인 sleep 이나 Network I/O 가 발생하는 부분에 도달하면, future 객체를 새로 생성하여 await 한다.
    • future 의 __await__ 메소드는 자기 자신을 yield 하도록 구현되어 coroutine chain을 따라 task 객체의 __step() 메소드까지 전달된다. 그렇게 전달된 future 객체는 task가 처리한다.
    • task 객체는 yield 된 퓨처를 받으면 우선 이것을 저장하고, add_done_callback() 메소드를 호출해, 완료 상태가 될 때 event loop에게 실행을 예약할 콜백 함수를 등록한다.
    • __step() 을 등록한는 것이다. 이후 자신의 실행 중단하고 루프에게 제어권 넘긴다
    • 루프는 실행 준비가 된 task들을 실행한다.
    • 만약 더 이상 예약해둔 task가 없으면 event loop는 끝난다.
    • loop.close() 해서 닫아주자. (남아있는 모든 데이터를 제거)

5-1. asyncio

  • python 3.4에서는 event loop 방식의 비동기 프로그래밍인 asyncio 가 표준 라이브러리로 포함되었다.
  • 지금까지 위에서 복잡하게 설명했지만, asyncio는 event loop 에 task를 예약해서 blocking 이 발생하는 경우는 제어권을 다른 task로 넘겨서 실행해 concurrency(동시성)를 달성하는 도구라고 생각하면 될 것 같다.
  • asyncio 에서도 asyncio.future를 제공한다. 차이점은 일반 함수가 아니라, coroutine을 전달하는 것이고, future.result() 함수가 blocking 되지 않는다.
  • asyncio 는 future 없이도 callback만 사용 가능하다. call_later() 을 사용하면 되는데, 이러면 asyncio 장점을 살릴 수 없다. coroutine 은 future을 이용해서 사용한다. 실제적으로 future 를 직접 사용하지 않고, 수행까지 할 수 있는 Task class 를 사용한다.
  • coroutine은 일반적으로 caller에서 반복적으로 next() send() 를 이용하여 yield에 멈춘 coroutine을 재개 시킨다. asyncio 의 event loop에서 send() 를 반복적으로 호출해서 coroutine도 event loop에서 마치 별도의 thread 에서 도는 셈이다.

5-2. async, await

  • python 3.5에서 coroutine 을 지정하는 async 와 yield 를 대체하는 await 가 추가 되었다. 이를 yield 를 하는 generator 기반의 coroutine(iteratore부터 generator 확장한것)과 달리 명명하기 위해 native coroutine이라고 한다. (그래서 yield를 사용하면 객체가 generator 타입으로 나오고, async def 를 사용하면 coroutine으로 나온다!)
  • native coroutine 은 함수 앞에 async def 를 사용하고 이는 호출 시, await을 붙인다.
  • awaitable 객체는 기존의 generator based coroutine 과 유사하게 __await__() 로 generator 얻고, 이를 send() 를 이용하여 반복한다
  • await 은 뒤에 있는 coroutine 을 event loop에 등록하고, event loop 에게 실행권을 반환한다. event loop와 coroutine 이 실행권을 주고 받으며 멀티태스킹 달성한다.
  • asyncio.create_task() 는 파라미터로 들어오는 coroutine 을 등록하고, 끝나면 결과 받을 수 있는 future 객체를 반환. future 도 awaitable이라 await을 앞에 붙여 event loop 에 실행권을 넘기면서 종료까지 기다릴 수 있다.
  • 비동기 가능하려면 그 작업을 다른 어딘가 맡겨 놓고 퓨처 객체를 await 하면서 실행 중인 task의 제어를 event loop에 넘겨야 한다.
  • 간단히 얘기해서 loop.run_in_executor() 메소드는 동기 함수를 별도의 쓰레드에 넘겨서 마치 sleep 혹은 I/O coroutine처럼 사용하게 해준다
  • 아래 예시에서 await 뒤의 loop.run_in_executor 안에 동기함수인 time.sleep 이 들어갔지만, 비동기 동작을 위해 직접 루프에 넣어줬다. 첫 인자 None은 기본 실행기 사용하겠다는 의미고, 직접 실행기 지정해 워커 쓰레드를 원하는 개수만큼 생성 가능하다. 두번째 인자는 함수 이름이고, 세번째 인자부터는 그 함수를 호출할때 넘길 인자들이다.
async def sleep(sec):
    await loop.run_in_executor(None, time.sleep, sec)  # time.sleep(sec)
    return sec

async def main():
    sec_list = [1, 2]
    tasks = [asyncio.create_task(sleep(sec)) for sec in sec_list]  # [Task 1 객체, Task 2 객체]
    tasks_results = await asyncio.gather(*tasks)  # [Task 1 객체의 결과 값, Task 2 객체의 결과 값]
    return tasks_results

start = time.time()

loop = asyncio.get_event_loop()
result = loop.run_until_complete(main())
loop.close()

end = time.time()

print('result : {}'.format(result))
print('total time : {0:.2f} sec'.format(end - start))

마무리

  • asyncio 에 대해 적어봤는데 적으면서도 아주 명확하진 않은 부분들이 있다...;; 그럼에도 불구하고 여기까지 읽어주신 분들께 감사드린다. 나중에 한번 수정을 해야겠다...;;

참고
https://blog.humminglab.io/posts/python-coroutine-programming-1/
https://it-eldorado.tistory.com/159?category=749661

댓글