Intro
비동기 프로그래밍이란 무엇일까요? 작업이 완료될 때까지 기다리지 않고 잠재적으로 오래 실행되는 작업을 시작하여 해당 작업이 실행되는 동안에도 다른 이벤트에 응답할 수 있게 하는 기술입니다. 이러한 비동기 프로그래밍이 필요한 이유는 Server Request, Disk Read/Write 등 CPU작업 없이 결과를 기다리기 심심할 때(?) 그 사이에 CPU작업을 할 수 있도록 설계하기 위함입니다.
만약 Server와 통신하는 프로그램이 항상 동기적으로 작성하게 된다면 Server Request를 요청한 다음에 Response가 올때까지 아무 작업도 하지 못한 채 기다려야만 합니다. CPU는 놀고있는데 말이죠. 이렇게 외부 요청, Disk작업이 진행될 때에도 CPU가 멈추지 않고 계속 진행하도록 하는 것이 바로 비동기 프로그래밍입니다.
Multi thread를 생성하는것과는 좀 다릅니다. Multi thread는 CPU가 가지는 여러 Thread를 동시 다발적으로 활용하기 위함입니다. 반면에 비동기 프로그래밍에서는 기본적으로 CPU가 Single Main thread를 통해 작업을 처리하며, CPU가 아닌 외부 작업 (예: I/O 작업)은 Main thread에서 실행되는 Event loop를 통해 관리됩니다.
Event loop는 비동기 작업을 스케쥴링하고 실행하는 기능 입니다. 그리고 비동기 작업 단위를 Python에서는 보통 async def로 비동기 함수로 정의하며 이를 Coroutine이라 합니다. Coroutine으로 정의한 작업이 완료되면 완료되면 Main thread로 결과가 전달됩니다. 이 과정에서 thread는 필요에 따라 생성되거나 재사용되며, 모든 작업이 끝난 뒤에는 반환됩니다. 이렇게 별도의 thread를 모아놓은 것을 Thread Pool이라 합니다. 이러한 기능들을 제공하는 library가 바로 asyncio입니다.
지금까지 설명을 정리하자면 다음과 같습니다.
- 비동기의 의의 : CPU 외에서 처리하는 작업(예 : Server, I/O)이 동작하는 동안에도 CPU를 동작하게 하기 위함.
- Event loop : 비동기 작업 스케쥴링, 실행하는 주체, asyncio.get_event_loop()로 생성할 수 있다.
- Coroutine : 비동기 작업 단위 혹은 함수. async def로 정의할 수 있다.
- Thread Pool : 작업을 병렬로 처리하기 위한 Thread 모음. concurrent.futures.ThreadPoolExecutor로 Thread 단위 작업에 대해 비동기 처리가 가능
물론 Event loop에 CPU작업을 할당하는 것도 가능합니다. 하지만 이는 비동기 프로그래밍의 취지에 맞지 않는 Scene이며, 때에 따라서는 속도가 기존보다 더 느려질 수 있습니다. 따라서 Event loop가 관리하는 비동기 작업들은 CPU 작업이 아닌 다른 작업이라 생각해도 됩니다.
그럼 비동기의 동작 예제를 알아보도록 하겠습니다.
예제 코드 : API 호출, CPU 작업 처리
다음은 비동기 프로그래밍의 특징과 활용 방법을 보여주는 코드입니다. 이 코드에서는 API 호출과 CPU 연산 작업을 결합하여 다양한 비동기 시나리오를 처리합니다. 먼저 기본 API 요청 함수부터 알아보겠습니다.
기본 API 요청 함수 : 다음 함수는 비동기 API 요청 작업을 정의합니다. (Get은 1초, Post는 10초 소요됩니다)
async def api_request_get_data() -> str:
print("API Get Request start")
await asyncio.sleep(1) # 1초 대기 (API 처리 시뮬레이션)
print("API Get Request Complete")
return "Get Data"
async def api_request_post_data(body: str) -> int:
print("API Post Request Start")
await asyncio.sleep(10) # 10초 대기 (API 처리 시뮬레이션)
print("API Post Request Complete")
return len(body)
이 코드를 보시면 함수를 선언할 때 async def로 선언한 것을 보실 수 있습니다. 이렇게 선언한 비동기 함수를 바로 코루틴(Coroutine)이라 합니다. 그리고 코루틴 내부를 살펴보면 await keyword가 등장합니다. 이것은 코루틴에서만 사용하는 keyword이며 해당 작업이 끝나는 것을 기다리라는 의미입니다. await를 생략한다면 asyncio.sleep(10)은 비동기로 알아서 처리되어버리고 바로 다음 작업을 진행하게 됩니다. Coroutine들이 순차적으로 진행되기를 원한다면 반드시 await를 입력하도록 합시다.
다음은 CPU 연산 함수입니다.
CPU 연산 함수 : CPU 작업은 계산 시간이 오래 걸리는 연산을 수행합니다. (한번 연산 시 4초가 소요됩니다)
def calculate(value: int) -> int:
print("Calculation Start")
time.sleep(4) # 4초 대기 (CPU 연산 시뮬레이션)
value += 10
value *= value
print("Calculation Complete")
return value
이 코드는 CPU를 사용하는 코드입니다. 이 코드는 코루틴도 아니며 비동기로 동작하도록 설계되어 있지도 않습니다. 하지만 이제 이 함수를 코루틴과 같이 사용을 해야 합니다. 어떻게 사용해야 할까요? 다양한 Scene에 대해 경험해 보면서 알아봅시다.
시나리오 1. API 요청과 CPU 작업의 순차적 진행
첫 번째 시나리오는 API 요청 → CPU 작업 → API 요청 → CPU 작업을 순차적으로 처리합니다. 사실상 비동기 프로그래밍의 이점을 살리기 어려운 시나리오 입니다만, 실제로 API요청이 끝나야 CPU작업을 할수 있고 그 다음에야 API 요청을 하게 되는 Scene이 없는 것은 아닙니다. 이 시나리오의 의의는 coroutine과 function의 혼합을 어떻게 사용하는지에 대한 예제입니다.
흐름을 그림으로 표현하면 다음과 같습니다.
시나리오대로 동작하게 하기 위해서는 다음과 같이 코드를 작성하면 됩니다.
async def scene1():
api_result = await api_request_get_data()
cpu_result = calculate(len(api_result))
api_result = await api_request_post_data(str(cpu_result) + api_result)
cpu_result = calculate(api_result)
print(f"[Scene1] result : {cpu_result=}")
코드 동작 결과는 다음과 같습니다. 총 19초가 걸렸습니다.
시나리오 2. API 요청과 CPU 작업 결과 병합 후 진행.
시나리오 1에서는 API와 CPU의 동기적 동작이 필요했지만 이번에는 API 요청을 한 사이에 CPU 작업을 해도 된다고 합시다. 그리고 API, CPU의 작업이 완료되면 그 결과를 합치고 다시 API 요청, CPU 작업을 진행합니다.
그림으로 표현하면 다음과 같습니다.
API와 CPU가 동시에 작업 후 결과를 Gather한 후 다시 API, CPU작업을 요청하는 시나리오 입니다. 각각 작업단위에서 CPU 연산이 먼저 끝날 수도, 비동기 작업인 API 요청이 먼저 끝날 수도 있습니다.
시나리오대로 동작하게 하기 위해서는 다음과 같이 코드를 작성하면 됩니다. event loop와 ThreadPool, asyncio.gather를 사용해서 시나리오대로 동작하도록 구현했습니다.
async def scene2():
loop = asyncio.get_running_loop()
with ThreadPoolExecutor() as pool:
api_future = asyncio.create_task(api_request_get_data())
cpu_future = loop.run_in_executor(pool, calculate, 10)
api_result, cpu_result = await asyncio.gather(api_future, cpu_future)
cpu_result += len(api_result)
api_future = asyncio.create_task(api_request_post_data(api_result))
cpu_future = loop.run_in_executor(pool, calculate, cpu_result)
api_result, cpu_result = await asyncio.gather(api_future, cpu_future)
cpu_result += api_result
print(f"[Scene2] result : {cpu_result=}")
코드 동작 결과는 다음과 같습니다. 총 14초가 걸렸습니다.
시나리오 3. API 요청과 CPU 작업의 독자적 진행
이번 시나리오에는 API는 API대로 요청하고 CPU는 CPU대로 동작하게 합니다. 동기 작업과 비동기 작업이 서로의 간섭 없이 진행되게 되며 이는 지금까지의 시나리오 중 가장 비동기의 강점이 잘 드러나는 시나리오 입니다. 그림으로 표현하면 다음과 같습니다.
시나리오대로 동작하게 하기 위해서는 다음과 같이 코드를 작성하면 됩니다. 함수의 연속 동작과 코루틴의 연속 동작을 위해 함수 내부에 따로 작성한 것을 확인할 수 있습니다.
async def scene3():
async def _api_tasks() -> int:
api_result = await api_request_get_data()
api_result = await api_request_post_data(api_result)
return api_result
def _cpu_tasks(number: int) -> int:
cpu_result = calculate(number)
cpu_result = calculate(cpu_result)
return cpu_result
loop = asyncio.get_running_loop()
with ThreadPoolExecutor() as pool:
api_future = asyncio.create_task(_api_tasks())
cpu_future = loop.run_in_executor(pool, _cpu_tasks, 20)
api_result, cpu_result = await asyncio.gather(api_future, cpu_future)
cpu_result += api_result
print(f"[Scene3] result : {cpu_result=}")
코드 동작 결과는 다음과 같습니다. 총 11초가 걸렸습니다. 지금까지의 시나리오 중 가장 짧게 걸린 시나리오 입니다.
마치며...
이 글을 잘못 이해하면 다음과 같은 질문을 할 수도 있습니다.
시나리오 3이 제일 빠르게 동작했으니 앞으로 시나리오 3처럼 비동기 프로그래밍을 하면 되나요?
아닙니다. 이 글에서 얻어가셔야 할 것은 최적화 시나리오가 아닙니다. 이 글의 목적은 CPU의 작업과 비동기 작업이 서로 어떤 영향을 주고 받는지에 따라 다양한 flow를 자유자재로 구현해야 한다는 것을 알려줍니다.
시나리오 1, 2, 3 모두 필요에 따라 실제로 사용될 수 있는 시나리오 입니다. 이 시나리오의 차이는 API 요청의 결과와 CPU 연산 결과가 서로 어떻게 영향을 미칠 것인지에 따라 다르며, 이는 옳고 그름이 없습니다. 여러분들의 환경에 따라서 다양한 비동기 시나리오를 자유자재로 구사하시길 바랍니다. 성숙한 비동기 프로그래밍으로 불필요한 CPU 유휴시간을 줄이고 좀더 성능 최적화된 설계를 구축하고 구현하시기를 바라며 글 마지막에는 전체 예제 코드를 남겨놓았습니다. 좋은 참고가 되길 바라며 글을 마치도록 하겠습니다. 감사합니다.
예제 Code 전체
import asyncio
from concurrent.futures import ThreadPoolExecutor
import time
async def api_request_get_data() -> str:
print("API Get Request start")
## API Request to Response takes 1 second
await asyncio.sleep(1)
server_response = "Get Data"
print("API Get Request Complete")
return server_response
async def api_request_post_data(body: str) -> int:
print("API Post Request Start")
## API Request to Response takes 10 seconds
await asyncio.sleep(10)
server_response = len(body)
print("API Post Request Complete")
return server_response
def calculate(value: int) -> int:
print("Calculation Start")
## Calculation start to end takes 4 seconds
time.sleep(4)
value += 10
value *= value
print("Calculation Complete")
return value
async def scene1():
"""
1. api get을 요청
2. api get 결과 기반 cpu 작업 요청
3. cpu 작업 결과 기반 api post를 요청
4. api post 결과 기반 cpu 작업을 요청
"""
api_result = await api_request_get_data()
cpu_result = calculate(len(api_result))
api_result = await api_request_post_data(str(cpu_result) + api_result)
cpu_result = calculate(api_result)
print(f"[Scene1] result : {cpu_result=}")
async def scene2():
"""
1. api, cpu task를 요청
2. 두 요청이 끝나면 요쳥 결과들을 cpu result에 반영
3. api 결과data 기반으로 api post 요청, cpu result 기반으로 cpu task를 요청
4. 두 요청이 또 끝나면 요쳥 결과들을 cpu result에 반영
"""
loop = asyncio.get_running_loop()
with ThreadPoolExecutor() as pool:
api_future = asyncio.create_task(api_request_get_data())
cpu_future = loop.run_in_executor(pool, calculate, 10)
api_result, cpu_result = await asyncio.gather(api_future, cpu_future)
## cpu result에 api result가 필요함
cpu_result += len(api_result)
api_future = asyncio.create_task(api_request_post_data(api_result))
cpu_future = loop.run_in_executor(pool, calculate, cpu_result)
api_result, cpu_result = await asyncio.gather(api_future, cpu_future)
cpu_result += api_result
print(f"[Scene2] result : {cpu_result=}")
async def scene3():
"""
1. api get 요청 후 결과 기반으로 api post 요청
2. cpu task 요청 후 결과 기반으로 cpu task 요청
3. api post 결과를 cpu task 결과에 반영
"""
async def _api_tasks() -> int:
api_result = await api_request_get_data()
api_result = await api_request_post_data(api_result)
return api_result
def _cpu_tasks(number: int) -> int:
cpu_result = calculate(number)
cpu_result = calculate(cpu_result)
return cpu_result
loop = asyncio.get_running_loop()
with ThreadPoolExecutor() as pool:
api_future = asyncio.create_task(_api_tasks())
cpu_future = loop.run_in_executor(pool, _cpu_tasks, 20)
api_result, cpu_result = await asyncio.gather(api_future, cpu_future)
cpu_result += api_result
print(f"[Scene3] result : {cpu_result=}")
async def main():
start_time = time.perf_counter()
await scene1()
print(f"[Scene1] Lead time : {time.perf_counter() - start_time:.2f} seconds")
print()
start_time = time.perf_counter()
await scene2()
print(f"[Scene2] Lead time : {time.perf_counter() - start_time:.2f} seconds")
print()
start_time = time.perf_counter()
await scene3()
print(f"[Scene3] Lead time : {time.perf_counter() - start_time:.2f} seconds")
print()
print("Done")
if __name__ == "__main__":
asyncio.run(main())
* reference
https://docs.python.org/ko/3/library/asyncio.html
https://developer.mozilla.org/ko/docs/Learn/JavaScript/Asynchronous/Introducing
'Python > Basic' 카테고리의 다른 글
Python - List Comprehension (0) | 2023.03.19 |
---|---|
Python - Int의 크기가 28bytes인 이유 (6) | 2023.03.01 |
Python - Mutable vs Immutable (2) | 2023.02.26 |
Python - 기본 문법정리 (0) | 2023.02.24 |
Python - List는 어떻게 데이터를 관리하는가? (0) | 2023.02.15 |