DOKKAEBI HILL

방문을 환영합니다!

실전편 >

[실전]Asynchronous I/O 적용 [0] 2025-01-14

[실전]Asynchronous I/O 적용

open_in_new

몇 년전 동학 개미 운동이 있었죠. 저도 그 당시 동참한 개미 중 한명입니다!

그 때부터 주식을 포함해서 금융, 경제, 재테크에 관심을 계속 가지게 됩니다.

그러다가 우연히 "인공지능 투자가 퀀트 - 알고리즘 금융 시장을 침공하다." 라는 책을 알게 되었고,

황금 알을 낳는 거위를 만드는 꿈과 함께 국내 주가 데이터를 수집하고 있습니다.

데이터를 제공해주는 한국투자증권(KIS)에 감사드리며 거래 수수료로 보답하겠습니다. 🙇🏻‍♂️ (다음 스텝을 차일피일 미루고 있지만..)


결과부터 말씀드리면, 

수집 데이터 | 일간 국내 주식 분봉 데이터 (포스팅을 위해 상장 주식 절반만 가지고 테스트 했습니다. 대략. 1430개 종목)

인서트 데이터 | 약 29만 row

case 1. 동기 수집 / 동기 인서트 총 742초 ( 610 + 132 )

case 2. 비동기 수집 / 동기 인서트 총 548초 ( 483 + 65 )

case 3. 비동기 수집 / 비동기 인서트 총 384초 (336 + 48) 기(비동기 인서트 누적 합)

환경 | python 3.12, airflow


주의 사항이 있다면,

KIS API는 1초에 20회 호출 제한 있습니다.

또한 주식일별분봉 조회의 경우 건당 100건의 데이터만 포함하여, 한 종목의 하루치 데이터를 수집하려면 총 4번 호출해야 합니다. 

따라서 산술적으로 가장 이상적인 경우에 수집을 위해 1430 * 4 / 20 = 286초 가 소요됩니다.


이런 부분을 고려하여, 

동기 수집의 경우 매 호출마다 0.05초 sleep.

비동기 수집의 경우 20회 호출 사이에 1초간 sleep을 두었습니다. await는 데이터 저장 같은 이전 과정의 완료가 보장되어야 하는 경우에만

사용하여 최대한 비동기 처리의 이점을 확보하고자 했습니다.


case 1


가장 단순한 방법입니다.

nested for loop 에서 시간별, ticker 별로 0.05초 마다 request를 합니다. 

그리고 시간에 대한 수집이 끝났을 때 sync_insert 함수를 통해서 insert 합니다. 

(간혹 호출 중에 에러가 나는 경우는 별도로 retry 프로세스를 둬서 정상적으로 수집, 삽입되도록 했으나 여기서는 생략하겠습니다. case2, 3도 마찬가지)


case 2

case 2은 case 3과 비교하기 위해 추가한 것으로.

비동기 처리 과정 중에 block되는 구간(동기 인서트)이 성능에 미치는 영향을 보이고자 추고했습니다!



async_requests 함수는 20개 종목을 받고 이를 실제로 하나씩 호출하는 async_request를 호출합니다.
api request에는 aiohttp를 사용했습니다. 

  • 매초 20개씩 호출을 보장하기 위해서 create_task 앞에 await 없이 코루틴을 실행시켰습니다. (762 줄)
  • 작업(코루틴)이 완료(completed)된 후에 데이터를 삽입할 수 있도록 gather와 await을 사용했습니다. (731, 762 줄)
  • 안쪽 while loop이 끝날 때까지 수집한 데이터를 삽입합니다. (772 줄) 여기서 처럼 동기식으로 데이터를 삽입하면 해당 구간에서 block이 됩니다. 바깥의 for loop이 돌면서 계속 수집을 진행할 수 있는데 그걸 막는 것이죠.

case 3

case 2와 달리 비동기로 데이터를 삽입합니다. 그리고 while loop 바깥에서 insert_task가 끝나길 gather와 await로 기다립니다. 
( 삽입이 끝나기 전에 airflow task가 끝나는 것을 막기 위함입니다. )
case 2와 같은 지점에서 데이터를 삽입하지만 이 역시도 비동기로 I/O 처리를 하기 때문에 바깥 for loop을 돌면서 계속 수집을 진행할 수 있습니다. 그 결과 case 2보다 1.43배 빠르게 끝낼 수 있었습니다. 삽입하는 데이터의 크기가 크면 그 차이는 더 크겠고요.


동기 처리와 비동기 처리의 차이에 대해서 알아보았습니다.

중요 !
1. case 2와 3을 통해서는 비동기 처리 중에 동기 처리가 있으면 말짱 꽝이라는 것을 알 수 있었습니다.
2. 한가지 더 중요한 점,
이 예시에서 params (dictionary 타입)를 코루틴 함수의 매개변수로 사용하는데,  이처럼 mutable object 파라미터의 값을 조작하면서 사용하는 경우에 critical section issue와 같은 문제가 생길 수 있습니다. mutable object는 값이 바뀌어도 메모리 주소는 변하지 않기 때문입니다.
즉, 다른 코루틴에서 변경된 값을 그대로 사용할 수 있습니다.
실제로 데이터를 수집할 때마다 수백, 수천개의 데이터에서 회사는 다른데 주가가 똑같은 문제가 있었고 오랜 삽질 끝에 이 문제라는 것을 알게 되었습니다.



게시글을 삭제하시겠습니까?

'delete'를 입력하고 '삭제' 버튼을 눌러주세요.

All rights reserved.

GUEST님 환영합니다! ^_^