여기를 클릭하시면 소스코드를 직접 확인하실 수 있고, 위 예시처럼 사용하실 수 있습니다. 모듈을 어떻게 구현하였는지 궁금하신 경우, 모듈의 소스코드를 실행 순서별로 설명을 해 놓은 본 포스팅의 본문을 확인하시면 되겠습니다.
개인적으로 사용하기 위해 구현한 코드인 관계로, 이런저런 문제점들이 많습니다. 😅 포스팅의 내용이나 소스에 개선이 필요한 경우, 피드백을 주시면 감사하겠습니다!
1. Requirements
Python 3 (32-bit) xingAPI는 32bit COM을 제공하므로, 64bit python에서 사용할 수 없습니다. 64bit 환경에서 32bit COM을 작동하게 하는 hacky한 방법이 있긴 하지만 레지스터값을 일일히 수정해야하는 과정이 번거로우므로, 32bit 파이썬을 사용하는 방법이 편리합니다. 32bit python을 따로 설치하는 것도 방법이지만, anaconda를 사용중이라면 아래와 같이 virtual environment를 이용하여 빠르게 환경을 구축할 수 있습니다.
xingAPI 이베스트증권사의 xingAPI 설치파일을 통해 API를 설치합니다. 이후, 다음 두 가지 작업이 필요합니다.
C:\eBEST\xingAPI\reg.bat (혹은 (xingAPI 설치경로)\reg.bat)을 실행시켜 dll 등록
DevCenter의 ‘Res 파일을 모두 다운로드’ 버튼을 클릭하여 res파일을 다운로드
pypiwin32 Python에서 COM을 사용하기 위한 라이브러리 pypiwin32를 설치합니다.
$ pip install pypiwin32
… and other libraries 모듈에서 사용된 라이브러리는 아래와 같습니다.
1 2 3 4 5 6 7 8 9 10 11
import os, re, sys, time, pytz import pandas as pd from getpass import getpass from datetime import datetime, timedelta from pprint import pprint
if sys.platform == 'win32': from win32com.client import DispatchWithEvents from pythoncom import PumpWaitingMessages else: raise Exception('xingAPI는 윈도우 환경에서만 사용 가능합니다')
2. 사용 가능한 res 파일 정보 읽어오기
xingAPI가 설치되어 있는 경로를 아래와 같이 명시해줍니다. 특별히 설치 경로를 변경하지 않았다면, C:\eBEST\xingAPI\ 폴더에 위치해 있습니다.
23
XINGAPI_PATH = '/eBEST/xingAPI/'
모듈이 로딩되는 시점에, 아래와 같이 xingAPI에서 제공하는 res 파일들에 대한 정보를 읽고 변수 meta_res에 저장해둡니다. 서버로부터 요청에 대한 응답을 수신할 때 제공되는 res 정보를 통해 데이터를 dictionary 형태로 formatting 하는데 사용할 수 있습니다. 이를 통해, 코드의 가독성을 높여주고, 타 라이브러리 (i.e. pandas)와의 호환성을 높일 수 있습니다.
Synchronous한 디자인에서 서버로 request를 하고 response를 받는 순서는 다음과 같습니다.
서버 응답 등의 이벤트를 처리하는 ‘Event Listener Class’를 정의
위의 class를 사용하여, xingAPI의 COM object를 생성
해당 object의 waiting property를 True로 설정
COM object를 이용하여 서버로 데이터 요청
해당 object의 waiting property가 True인 경우, 반복적으로 PumpWaitingMessages를 호출 5-1. 이벤트 발생 시, 이벤트 처리 class가 자신의 waiting property를 False로 설정 5-2. 수신된 데이터가 있을 경우, 자신의 response property에 수신된 데이터를 저장
해당 object의 response property를 참조하여, 수신된 데이터를 확인
서버에 접속하고 로그인하는 기능은 XASession COM에 명시되어 있습니다. COM object를 생성하기에 앞서서, 이벤트 발생 시 이를 처리할 Listener Class를 아래와 같이 선언합니다.
class_SessionHandler: defOnLogin(self, code, msg): """ 서버와의 로그인이 끝나면 실행되는 함수 @arg code[str] 서버에서 받은 메시지 코드 @arg msg[str] 서버에서 받은 메시지 정보 """ self.waiting = False if code == '0000': logger.info('[*] 로그인 성공') else: logger.warning('[*] 로그인 실패 : {}'.format(msg)) defOnDisconnect(self): """ 서버와의 연결이 끊어졌을 때 실행되는 함수 """ self.waiting = False logger.info('[*] 서버와의 연결이 끊어졌습니다')
defquery(res, send, cont=False, timeout=10): """ Query 요청 @arg res[str]`t1102` 사용할 res 파일명 @arg send[dict] 전송할 데이터 { 'Block1': [{'Field1': 'Value1', 'Field2': 'Value2'}, {...}, {...}], 'Block2': {'Field3': 'Value3', 'Field4': 'Value4'} } 단일 InBlock의 경우에는 아래와 같이 간단한 형식도 입력받음 {'Field1': 'Value1', 'Field2': 'Value2'} @arg cont[*bool=False] 연속조회 여부 @arg timeout[*int=10] 서버 응답 최대 대기 시간, -1인 경우 infinite time """
defquery(res, send, cont=False, timeout=10): """ Query 요청 @arg res[str]`t1102` 사용할 res 파일명 @arg send[dict] 전송할 데이터 { 'Block1': [{'Field1': 'Value1', 'Field2': 'Value2'}, {...}, {...}], 'Block2': {'Field3': 'Value3', 'Field4': 'Value4'} } 단일 InBlock의 경우에는 아래와 같이 간단한 형식도 입력받음 {'Field1': 'Value1', 'Field2': 'Value2'} @arg cont[*bool=False] 연속조회 여부 @arg timeout[*int=10] 서버 응답 최대 대기 시간, -1인 경우 infinite time """ # res 파일 로드 _query = DispatchWithEvents('XA_DataSet.XAQuery', _QueryHandler) _query.init(res) ifnot cont: # 전송 현황 업데이트 ifnot res in _query_status: _query_status[res] = [] while _query_status[res] and _query_status[res][-1] + 1 < time.time(): _query_status[res].pop() # 초당 전송 횟수를 고려 tr_count_per_sec = _query.GetTRCountPerSec(res) iflen(_query_status[res]) >= tr_count_per_sec: delay = max(_query_status[res][-1] + 1.05 - time.time(), 0) time.sleep(delay) # 기간(10분)당 전송 횟수를 고려 # TODO : 10분 제한이 걸리면 blocking state 진입 tr_count_limit = _query.GetTRCountLimit(res) while tr_count_limit and _query.GetTRCountRequest(res) >= tr_count_limit: time.sleep(1) _query = DispatchWithEvents('XA_DataSet.XAQuery', _QueryHandler) _query.init(res) # simplified 된 input를 받았을 경우 send_first_value = list(send.values())[0] ifnot ( isinstance (send_first_value, list) or isinstance (send_first_value, dict) ): send = { '{}InBlock'.format(res): send } # 전송할 데이터를 설정 for block in send.keys(): ifisinstance(send[block], dict): for (k, v) in send[block].items(): _query.set_data(block, k, v) elifisinstance(send[block], list): for i inrange(len(send[block])): for (k, v) in send[block][i].items(): _query.set_data(block, k, v, i) else: raise ValueError('알 수 없는 형태의 데이터입니다') else: time.sleep(0.05) # 데이터 요청 _query.Request(cont) now = time.time() ifnot cont: _query_status[res].insert(0, now) _query.waiting = True while _query.waiting: if timeout >= 0and now + timeout < time.time(): _query.waiting = False raise TimeoutError('Query Timeout') PumpWaitingMessages() return _query.response
_QueryHandler를 초기화(init)하고 요청을 위해 메시지를 준비(set_data)하는 method들을 구현합니다.
이어서, 서버로부터 데이터 수신 시 불리는 이벤트 함수(OnReceiveData)와 이를 처리하는데 쓰이는 helper function(get_block_data)을 구현합니다. 이 과정에서, 2. 사용 가능한 res 파일 정보 읽어오기에서 선언한 meta_res를 사용하여 결과를 formatting 해줍니다.
class_QueryHandler: # (... continued) defget_block_data(self, block, index=0): block_data = {} for field in meta_res[self.res]['output'][block]['fields']: data = self.GetFieldData(block, field['name'], index) if field['type'] == 'long': if data == '-': data = 0 data = int(data or0) elif field['type'] == 'double'or field['type'] == 'float': data = float(data or0.0) block_data[field['name']] = data return block_data defOnReceiveData(self, res): """ 요청 데이터 도착 Listener self.GetFieldData(...)를 통해 전송받은 데이터 확인이 가능하다. @arg res[str] 요청 res 파일명 """ # decompress가 필요한 경우 압축해제 # TODO : OutBlock1 말고 다른 occurs가 있는 케이스 (ex. FOCCQ33600) if self.decomp: self.Decompress(res + 'OutBlock1') for block in meta_res[res]['output'].keys(): # 해당 블럭이 occurs인 경우, if meta_res[res]['output'][block]['occurs']: row_res = [] for i inrange(self.GetBlockCount(block)): row_res.append(self.get_block_data(block, i)) # 해당 블럭이 단일 데이터인 경우, else: row_res = self.get_block_data(block) self.response[block] = row_res self.waiting = False
위의 query 함수를 한번 더 wrapping해서, 조금 더 직관적인 함수들을 만들 수 있습니다. 예를 들어, xingAPI에서는 t8412를 이용하여 주식차트를 분 단위로 가져올 수 있습니다. 한번에 가져올 수 있는 최대 데이터의 양인 2,000개가 넘는 데이터에 대해서는 연속조회도 지원합니다. 해당 res를 호출하는 query 함수를 다음과 같이 wrapping 할 수 있습니다.
deftransactions_min(shcode, interval=1, sdate=None, edate=None): edate = edate or datetime.now().strftime('%Y%m%d') sdate = sdate or edate data = [] cts_date = ' ' cts_time = ' ' whileTrue: response = query('t8412', { 'shcode': shcode, 'ncnt': interval, 'qrycnt': 2000, 'nday': '0', 'sdate': sdate, 'edate': edate, 'cts_date': cts_date, 'cts_time': cts_time, 'comp_yn': 'Y' }, len(data) > 0) data = response['t8412OutBlock1'] + data cts_date = response['t8412OutBlock']['cts_date'] cts_time = response['t8412OutBlock']['cts_time'] ifnot (cts_date or cts_time): break data = pd.DataFrame(data) data.index = (data['date'] + data['time']).apply(lambda t: datetime.strptime(t, '%Y%m%d%H%M%S').astimezone(pytz.timezone('Asia/Seoul'))) data = data.rename(columns={'jdiff_vol': 'volumn'}) data = data[['open', 'high', 'low', 'close', 'volumn', 'value', 'jongchk', 'rate']] return data
query 함수에서 결과물을 list of dict형태로 변환하므로, DataFrame 형태로 결과물을 바꾸는 작업이 간단해집니다. (data = pd.DataFrame(data))
실시간 데이터의 경우, XASession과 XAQuery와는 약간 다른 방법으로 데이터를 수신합니다.
데이터 수신 이벤트를 처리하는 Event Listener Class를 정의
위의 class를 사용하여, xingAPI의 COM object를 생성
해당 object의 callback property를 데이터 수신 시 실행 할 함수로 설정
COM object를 이용하여 서버로 데이터 구독
PumpWaitingMessages를 반복적으로 호출 5-1. 데이터 수신 이벤트 발생 시, 이벤트 처리 class가 서버로부터 전달받은 데이터를 인자로 하여 callback property 함수를 실행
(구독한 데이터가 더 이상 필요가 없는 경우) COM object를 이용하여 구독 취소
실시간 데이터를 위한 이벤트 리스너 클래스 (_RealtimeHandler)가 있다고 가정하고, 실시간 데이터 핸들링을 위한 클래스 Realtime을 구현하면 다음과 같습니다. xingAPI의 실시간 데이터의 경우 서버로 데이터를 요청할 시에 최대 하나의 field만 사용하므로, meta_res에서 해당 키 값의 이름을 가져와서 사용할 수 있습니다. (라인 357)
classRealtime: def__init__(self, res, callback): self._res = res self._instance = DispatchWithEvents('XA_DataSet.XAReal', _RealtimeHandler) self._instance.LoadFromResFile(f'/Res/{res}.res') self._instance.callback = callback self.subscribed_keys = [] defsubscribe(self, key=None): if key in self.subscribed_keys: print(f'{self._res}는 이미 {key} 데이터를 수신 중입니다.') returnNone if key: self._instance.SetFieldData('InBlock', meta_res[self._res]['input']['InBlock']['fields'][0]['name'], key) self._instance.AdviseRealData() self.subscribed_keys.append(key) defunsubscribe(self, key=None): if key isNone: self._instance.UnadviseRealData() else: if key notin self.subscribed_keys: raise ValueError(f'{self._res}는 {key} 데이터를 수신하고 있지 않습니다.') self._instnace.UnadviseRealDataWithKey(key) @staticmethod deflisten(delay=.01): whileTrue: PumpWaitingMessages() time.sleep(delay)
마지막으로, 실시간 데이터를 수신했을 때 호출되는 OnReceiveRealData를 정의해줍니다. 위에서 설정한 callback 함수를 호출합니다.
334 335 336 337 338 339 340
class_RealtimeHandler: defOnReceiveRealData(self, res): response = {} for field in meta_res[res]['output']['OutBlock']['fields']: response[field['name']] = self.GetFieldData('OutBlock', field['name'])