0%

Python으로 이베스트증권 xingAPI 깔끔하게 사용하기 (w/ pandas)

0. Introduction

이베스트증권은 COM 형태와 dll 형태로 사용할 수 있는 xingAPI를 제공합니다. Python에서 COM object를 사용하는 라이브러리를 사용하면 xingAPI를 사용할 수 있지만, 사용법이 깔끔하지 못합니다.

개인적으로 사용하기 위하여 설탕을 팍팍쳐서 모듈을 만들었습니다. 이 모듈을 사용하면, 코드가 다음과 같이 정리가 됩니다.

>>> import ebest
>>> ebest.login()
[*] 접속 서버 ((r)eal / (D)emo / (a)ce) : # r
[*] 아이디 : # username
[*] 패스워드 : # password
[*] 공인인증서 암호 : # cert_password
[*] 로그인 성공

# XAQuery 요청
>>> ebest.query('t1101', dict(shcode='000020'))
{'t1101OutBlock': {'hname': '동화약품',
'price': 19100,
'sign': '5',
'change': 550,
# ...
'open': 19750,
'high': 19900,
'low': 18850}}

# XAQuery 요청 후 DataFrame 형태로 변환
>>> res = query('t8413', dict(
... shcode='000020', sdate='20200101', edate='20210101',
... gubun='2', qrycnt=2000, cts_date=' ', comp_yn='Y'
... ))
>>> pd.DataFrame(res.get('t8413OutBlock1')).set_index('date')
open high low close ... rate pricechk ratevalue sign
date ...
20200102 8340 8400 8290 8400 ... 0.0 0 0 2
20200103 8400 8450 8290 8360 ... 0.0 0 0 5
... ... ... ... ... ... ... ... ...
20201229 18750 19400 18750 19150 ... 0.0 0 0 2
20201230 19100 19800 18800 19650 ... 0.0 0 0 2
[248 rows x 11 columns]

# XAReal 실시간 데이터 구독 후 callback function으로 처리
>>> nws = Realtime('NWS', callback=print)
>>> nws.subscribe('NWS001')
>>> Realtime.listen()
NWS {'date': '20210104', 'time': '154734', 'title': '[포토] 코로나19 직무 교육현장 참관'}
NWS {'date': '20210104', 'time': '154752', 'title': '전성배 신임원장, "국가 디지털전환 선도"'}
NWS {'date': '20210104', 'time': '154750', 'title': '[포토] 정총리, 코로나19 교육현장 참관'}
# ...

여기를 클릭하시면 소스코드를 직접 확인하실 수 있고, 위 예시처럼 사용하실 수 있습니다. 모듈을 어떻게 구현하였는지 궁금하신 경우, 모듈의 소스코드를 실행 순서별로 설명을 해 놓은 본 포스팅의 본문을 확인하시면 되겠습니다.

개인적으로 사용하기 위해 구현한 코드인 관계로, 이런저런 문제점들이 많습니다. 😅 포스팅의 내용이나 소스에 개선이 필요한 경우, 피드백을 주시면 감사하겠습니다!


1. Requirements

  • Python 3 (32-bit) xingAPI는 32bit COM을 제공하므로, 64bit python에서 사용할 수 없습니다. 64bit 환경에서 32bit COM을 작동하게 하는 hacky한 방법이 있긴 하지만 레지스터값을 일일히 수정해야하는 과정이 번거로우므로, 32bit 파이썬을 사용하는 방법이 편리합니다. 32bit python을 따로 설치하는 것도 방법이지만, anaconda를 사용중이라면 아래와 같이 virtual environment를 이용하여 빠르게 환경을 구축할 수 있습니다.

    $ set CONDA_FORCE_32BIT=1
    $ conda create -n py37_32 python=3.7
    $ activate py37_32

  • xingAPI 이베스트증권사의 xingAPI 설치파일을 통해 API를 설치합니다. 이후, 다음 두 가지 작업이 필요합니다.

  1. C:\eBEST\xingAPI\reg.bat (혹은 (xingAPI 설치경로)\reg.bat)을 실행시켜 dll 등록
  2. DevCenter의 'Res 파일을 모두 다운로드' 버튼을 클릭하여 res파일을 다운로드
  • pypiwin32 Python에서 COM을 사용하기 위한 라이브러리 pypiwin32를 설치합니다.

    $ pip install pypiwin32

  • ... and other libraries 모듈에서 사용된 라이브러리는 아래와 같습니다.

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    import os, re, sys, time, pytz
    import pandas as pd
    from getpass import getpass
    from datetime import datetime, timedelta
    from pprint import pprint

    if sys.platform == 'win32':
    from win32com.client import DispatchWithEvents
    from pythoncom import PumpWaitingMessages
    else:
    raise Exception('xingAPI는 윈도우 환경에서만 사용 가능합니다')

2. 사용 가능한 res 파일 정보 읽어오기

xingAPI가 설치되어 있는 경로를 아래와 같이 명시해줍니다. 특별히 설치 경로를 변경하지 않았다면, C:\eBEST\xingAPI\ 폴더에 위치해 있습니다.

23
XINGAPI_PATH = '/eBEST/xingAPI/'

모듈이 로딩되는 시점에, 아래와 같이 xingAPI에서 제공하는 res 파일들에 대한 정보를 읽고 변수 meta_res에 저장해둡니다. 서버로부터 요청에 대한 응답을 수신할 때 제공되는 res 정보를 통해 데이터를 dictionary 형태로 formatting 하는데 사용할 수 있습니다. 이를 통해, 코드의 가독성을 높여주고, 타 라이브러리 (i.e. pandas)와의 호환성을 높일 수 있습니다.

26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
def build_meta_res():
""" res 파일들의 meta data

Example
-------
>>> build_meta_res()
{
't8413': {
'desc': '주식챠트(일주월)',
'input': {
't8413InBlock': {
'occurs': False,
'fields': [
{
'name': 'shcode',
'desc': '단축코드',
'type': 'char',
'size': 6
},
{ ... },
...
]
}
},
'output': {
't8413OutBlock1': {
'occurs': True,
'fields': [ 'price', ... ]
},
...
}
},
...
}
"""
meta = {}

fnames = filter(
lambda x: not re.match(r'.*\_\d+\.res$', x),
os.listdir(os.path.join(XINGAPI_PATH, 'res'))
)

def parse_field(line):
cols = line.split(',')
return {
'name': cols[1].strip(),
'desc': cols[0].strip(),
'type': cols[3].strip(),
'size': cols[4].strip()
}

def parse_file(lines):
parsed = {}
lines = list(map(lambda x: x.replace('\t','').replace('\n','').replace(';','').strip(), lines))
lines = list(filter(lambda x:x, lines))
for i in range(len(lines)):
if '.Func' in lines[i] or '.Feed' in lines[i]:
parsed['desc'] = lines[i].split(',')[1].strip()
elif lines[i] == 'begin':
latest_begin = i
elif lines[i] == 'end':
block_info = lines[latest_begin-1].split(',')

if not block_info[2] in parsed:
parsed[block_info[2]] = {}

parsed[block_info[2]][block_info[0]] = {
'occurs': 'occurs' in block_info,
'fields': list(map(parse_field, lines[latest_begin+1:i]))
}
return parsed

for fname in fnames:
meta[fname.replace('.res','')] = parse_file(
open(os.path.join(XINGAPI_PATH, 'res/', fname)).readlines()
)

return meta


meta_res = build_meta_res()

3. 서버에 접속하기 : XASession

Synchronous한 디자인에서 서버로 request를 하고 response를 받는 순서는 다음과 같습니다.

  1. 서버 응답 등의 이벤트를 처리하는 'Event Listener Class'를 정의
  2. 위의 class를 사용하여, xingAPI의 COM object를 생성
  3. 해당 object의 waiting property를 True로 설정
  4. COM object를 이용하여 서버로 데이터 요청
  5. 해당 object의 waiting property가 True인 경우, 반복적으로 PumpWaitingMessages를 호출 5-1. 이벤트 발생 시, 이벤트 처리 class가 자신의 waiting property를 False로 설정
    5-2. 수신된 데이터가 있을 경우, 자신의 response property에 수신된 데이터를 저장
  6. 해당 object의 response property를 참조하여, 수신된 데이터를 확인

서버에 접속하고 로그인하는 기능은 XASession COM에 명시되어 있습니다. COM object를 생성하기에 앞서서, 이벤트 발생 시 이를 처리할 Listener Class를 아래와 같이 선언합니다.

109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
class _SessionHandler:
def OnLogin(self, code, msg):
""" 서버와의 로그인이 끝나면 실행되는 함수

@arg code[str] 서버에서 받은 메시지 코드
@arg msg[str] 서버에서 받은 메시지 정보
"""
self.waiting = False

if code == '0000':
logger.info('[*] 로그인 성공')
else:
logger.warning('[*] 로그인 실패 : {}'.format(msg))

def OnDisconnect(self):
""" 서버와의 연결이 끊어졌을 때 실행되는 함수
"""
self.waiting = False

logger.info('[*] 서버와의 연결이 끊어졌습니다')

위의 class를 사용하여, XASession COM object를 생성합니다.

130
_session = DispatchWithEvents('XA_Session.XASession', _SessionHandler)

이제, _session object를 이용하여 xingAPI에 명시된 함수들을 호출할 수 있습니다. 예를들어, ConnectServer라는 함수를 호출하는 것은 아래와 같은 방법으로 가능합니다.

_session.ConnectServer('hts.ebestsec.co.kr', 20001)

이를 사용하여, 서버로 로그인을 하는 함수는 아래와 같이 만들 수 있다.

132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
def login(
server=None,
username=None,
password=None,
):
""" 로그인
"""
# 기존에 연결되어 있는 서버가 있으면, 연결을 끊는다
if _session.IsConnected():
_session.DisconnectServer()

# 로그인 시 필요한 정보를 입력받는다
login_server = (server or input('[*] 접속 서버 ((r)eal / (D)emo / (a)ce) : ')).lower()[:1]
login_server = {
'r': 'hts.ebestsec.co.kr',
'd': 'demo.ebestsec.co.kr',
'a': '127.0.0.1'
}.get(login_server, 'demo.ebestsec.co.kr')
login_port = 20001
login_id = username or input('[*] 아이디 : ')
login_pw = password or getpass('[*] 패스워드 : ')
login_cert = '' if login_server == 'demo.ebestsec.co.kr' else getpass('[*] 공인인증서 암호 : ')

# 로그인 요청을 보낸다
_session.waiting = True
_session.ConnectServer(login_server, login_port)
_session.Login(login_id, login_pw, login_cert, 0, 0)
while _session.waiting:
PumpWaitingMessages()
time.sleep(0.05)

4. 데이터 요청하기 : XAQuery

255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
def query(res, send, cont=False, timeout=10):
""" Query 요청

@arg res[str]`t1102` 사용할 res 파일명
@arg send[dict] 전송할 데이터
{
'Block1': [{'Field1': 'Value1', 'Field2': 'Value2'}, {...}, {...}],
'Block2': {'Field3': 'Value3', 'Field4': 'Value4'}
}

단일 InBlock의 경우에는 아래와 같이 간단한 형식도 입력받음
{'Field1': 'Value1', 'Field2': 'Value2'}
@arg cont[*bool=False] 연속조회 여부
@arg timeout[*int=10] 서버 응답 최대 대기 시간, -1인 경우 infinite time
"""

결과적으로 함수가 호출되는 모습은 다음과 같습니다.

query('t1101', {'shcode': '000020'})
query('t1104', {
't1104InBlock': {'code': '000020', 'nrec': 2},
't1104InBlock1': [
{'dat1': '2', 'dat2': '2', 'gubn': '1', 'indx': '0'},
{'dat1': '1', 'dat2': '1', 'gubn': '2', 'indx': '1'}
]
})

Query를 요청하고 결과를 수신하는 순서는 XASession과 동일합니다. 차이점은, XAQuery COM을 통해 Query 요청을 한다는 점입니다. 이벤트 class인 _QueryHandler가 구현되어 있다고 가정하고, query 함수를 아래와 같이 작성합니다.

191
_query_status = {}

255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
def query(res, send, cont=False, timeout=10):
""" Query 요청

@arg res[str]`t1102` 사용할 res 파일명
@arg send[dict] 전송할 데이터
{
'Block1': [{'Field1': 'Value1', 'Field2': 'Value2'}, {...}, {...}],
'Block2': {'Field3': 'Value3', 'Field4': 'Value4'}
}

단일 InBlock의 경우에는 아래와 같이 간단한 형식도 입력받음
{'Field1': 'Value1', 'Field2': 'Value2'}
@arg cont[*bool=False] 연속조회 여부
@arg timeout[*int=10] 서버 응답 최대 대기 시간, -1인 경우 infinite time
"""
# res 파일 로드
_query = DispatchWithEvents('XA_DataSet.XAQuery', _QueryHandler)
_query.init(res)

if not cont:
# 전송 현황 업데이트
if not res in _query_status:
_query_status[res] = []

while _query_status[res] and _query_status[res][-1] + 1 < time.time():
_query_status[res].pop()

# 초당 전송 횟수를 고려
tr_count_per_sec = _query.GetTRCountPerSec(res)
if len(_query_status[res]) >= tr_count_per_sec:
delay = max(_query_status[res][-1] + 1.05 - time.time(), 0)
time.sleep(delay)

# 기간(10분)당 전송 횟수를 고려
# TODO : 10분 제한이 걸리면 blocking state 진입
tr_count_limit = _query.GetTRCountLimit(res)
while tr_count_limit and _query.GetTRCountRequest(res) >= tr_count_limit:
time.sleep(1)
_query = DispatchWithEvents('XA_DataSet.XAQuery', _QueryHandler)
_query.init(res)

# simplified 된 input를 받았을 경우
send_first_value = list(send.values())[0]
if not (
isinstance (send_first_value, list) or
isinstance (send_first_value, dict)
):
send = { '{}InBlock'.format(res): send }

# 전송할 데이터를 설정
for block in send.keys():
if isinstance(send[block], dict):
for (k, v) in send[block].items():
_query.set_data(block, k, v)
elif isinstance(send[block], list):
for i in range(len(send[block])):
for (k, v) in send[block][i].items():
_query.set_data(block, k, v, i)
else:
raise ValueError('알 수 없는 형태의 데이터입니다')

else:
time.sleep(0.05)

# 데이터 요청
_query.Request(cont)

now = time.time()
if not cont:
_query_status[res].insert(0, now)
_query.waiting = True
while _query.waiting:
if timeout >= 0 and now + timeout < time.time():
_query.waiting = False
raise TimeoutError('Query Timeout')
PumpWaitingMessages()

return _query.response

_QueryHandler를 초기화(init)하고 요청을 위해 메시지를 준비(set_data)하는 method들을 구현합니다.

193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
class _QueryHandler:
def __init__(self):
self.response = {}
self.decomp = False
self.qrycnt = None
self.waiting = False
self.res = None

def init(self, res):
self.LoadFromResFile('/Res/{}.res'.format(res))
self.res = res

def set_data(self, block, k, v, index=0):
if k == 'comp_yn' and v.lower() == 'y':
self.decomp = True
elif k == 'qrycnt':
self.qrycnt = int(v)

self.SetFieldData(block, k, index, v)

이어서, 서버로부터 데이터 수신 시 불리는 이벤트 함수(OnReceiveData)와 이를 처리하는데 쓰이는 helper function(get_block_data)을 구현합니다. 이 과정에서, 2. 사용 가능한 res 파일 정보 읽어오기에서 선언한 meta_res를 사용하여 결과를 formatting 해줍니다.

210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
class _QueryHandler:
# (... continued)

def get_block_data(self, block, index=0):
block_data = {}
for field in meta_res[self.res]['output'][block]['fields']:
data = self.GetFieldData(block, field['name'], index)

if field['type'] == 'long':
if data == '-':
data = 0
data = int(data or 0)
elif field['type'] == 'double' or field['type'] == 'float':
data = float(data or 0.0)

block_data[field['name']] = data

return block_data

def OnReceiveData(self, res):
""" 요청 데이터 도착 Listener

self.GetFieldData(...)를 통해 전송받은 데이터 확인이 가능하다.

@arg res[str] 요청 res 파일명
"""
# decompress가 필요한 경우 압축해제
# TODO : OutBlock1 말고 다른 occurs가 있는 케이스 (ex. FOCCQ33600)
if self.decomp:
self.Decompress(res + 'OutBlock1')

for block in meta_res[res]['output'].keys():
# 해당 블럭이 occurs인 경우,
if meta_res[res]['output'][block]['occurs']:
row_res = []
for i in range(self.GetBlockCount(block)):
row_res.append(self.get_block_data(block, i))
# 해당 블럭이 단일 데이터인 경우,
else:
row_res = self.get_block_data(block)

self.response[block] = row_res

self.waiting = False

결과적으로, query를 호출할 경우 아래와 같이 동작합니다.

>>> query('t1101', {'shcode': '000020'})
{'t1101OutBlock': {'bid': 6780,
'bidho1': 9700,
'bidho10': 9600,
'bidho2': 9690,
(...)
'dnlmtprice': 6750,
'high': 9820,
'hname': '동화약품',
'ho_status': '2',
(...)
'yeprice': 0,
'yesign': '3',
'yevolume': 0}}

5. Wrapper Functions

위의 query 함수를 한번 더 wrapping해서, 조금 더 직관적인 함수들을 만들 수 있습니다. 예를 들어, xingAPI에서는 t8412를 이용하여 주식차트를 분 단위로 가져올 수 있습니다. 한번에 가져올 수 있는 최대 데이터의 양인 2,000개가 넘는 데이터에 대해서는 연속조회도 지원합니다. 해당 res를 호출하는 query 함수를 다음과 같이 wrapping 할 수 있습니다.

def transactions_min(shcode, interval=1, sdate=None, edate=None):
edate = edate or datetime.now().strftime('%Y%m%d')
sdate = sdate or edate

data = []
cts_date = ' '
cts_time = ' '

while True:
response = query('t8412', {
'shcode': shcode,
'ncnt': interval,
'qrycnt': 2000,
'nday': '0',
'sdate': sdate,
'edate': edate,
'cts_date': cts_date,
'cts_time': cts_time,
'comp_yn': 'Y'
}, len(data) > 0)

data = response['t8412OutBlock1'] + data
cts_date = response['t8412OutBlock']['cts_date']
cts_time = response['t8412OutBlock']['cts_time']
if not (cts_date or cts_time):
break

data = pd.DataFrame(data)
data.index = (data['date'] + data['time']).apply(lambda t: datetime.strptime(t, '%Y%m%d%H%M%S').astimezone(pytz.timezone('Asia/Seoul')))

data = data.rename(columns={'jdiff_vol': 'volumn'})
data = data[['open', 'high', 'low', 'close', 'volumn', 'value', 'jongchk', 'rate']]

return data

query 함수에서 결과물을 list of dict형태로 변환하므로, DataFrame 형태로 결과물을 바꾸는 작업이 간단해집니다. (data = pd.DataFrame(data))

위의 transactions_min 함수는 다음과 같이 사용할 수 있습니다.

>>> transactions_min('000020', interval=3, sdate='20201228')
open high low ... value jongchk rate
2020-12-28 09:03:00+09:00 20000 20050 19800 ... 748 0 0.0
2020-12-28 09:06:00+09:00 19800 19850 19650 ... 461 0 0.0
2020-12-28 09:09:00+09:00 19700 19700 19550 ... 584 0 0.0
... ... ... ... ... ... ...
2021-01-04 15:18:00+09:00 19100 19100 19000 ... 265 0 0.0
2021-01-04 15:20:00+09:00 19000 19100 18950 ... 138 0 0.0
2021-01-04 15:30:00+09:00 19100 19100 19100 ... 391 0 0.0
[489 rows x 8 columns]

6. 실시간 데이터 구독하기 : XAReal

실시간 데이터의 경우, XASession과 XAQuery와는 약간 다른 방법으로 데이터를 수신합니다.

  1. 데이터 수신 이벤트를 처리하는 Event Listener Class를 정의
  2. 위의 class를 사용하여, xingAPI의 COM object를 생성
  3. 해당 object의 callback property를 데이터 수신 시 실행 할 함수로 설정
  4. COM object를 이용하여 서버로 데이터 구독
  5. PumpWaitingMessages를 반복적으로 호출
    5-1. 데이터 수신 이벤트 발생 시, 이벤트 처리 class가 서버로부터 전달받은 데이터를 인자로 하여 callback property 함수를 실행
  6. (구독한 데이터가 더 이상 필요가 없는 경우) COM object를 이용하여 구독 취소

실시간 데이터를 위한 이벤트 리스너 클래스 (_RealtimeHandler)가 있다고 가정하고, 실시간 데이터 핸들링을 위한 클래스 Realtime을 구현하면 다음과 같습니다. xingAPI의 실시간 데이터의 경우 서버로 데이터를 요청할 시에 최대 하나의 field만 사용하므로, meta_res에서 해당 키 값의 이름을 가져와서 사용할 수 있습니다. (라인 357)

342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
class Realtime:
def __init__(self, res, callback):
self._res = res
self._instance = DispatchWithEvents('XA_DataSet.XAReal', _RealtimeHandler)
self._instance.LoadFromResFile(f'/Res/{res}.res')
self._instance.callback = callback

self.subscribed_keys = []

def subscribe(self, key=None):
if key in self.subscribed_keys:
print(f'{self._res}는 이미 {key} 데이터를 수신 중입니다.')
return None

if key:
self._instance.SetFieldData('InBlock', meta_res[self._res]['input']['InBlock']['fields'][0]['name'], key)
self._instance.AdviseRealData()

self.subscribed_keys.append(key)

def unsubscribe(self, key=None):
if key is None:
self._instance.UnadviseRealData()
else:
if key not in self.subscribed_keys:
raise ValueError(f'{self._res}{key} 데이터를 수신하고 있지 않습니다.')
self._instnace.UnadviseRealDataWithKey(key)

@staticmethod
def listen(delay=.01):
while True:
PumpWaitingMessages()
time.sleep(delay)

마지막으로, 실시간 데이터를 수신했을 때 호출되는 OnReceiveRealData를 정의해줍니다. 위에서 설정한 callback 함수를 호출합니다.

334
335
336
337
338
339
340
class _RealtimeHandler:
def OnReceiveRealData(self, res):
response = {}
for field in meta_res[res]['output']['OutBlock']['fields']:
response[field['name']] = self.GetFieldData('OutBlock', field['name'])

self.callback(res, response)

결과적으로, 위 코드를 사용하여 실시간 데이터를 요청하면 다음과 같이 실행이 됩니다.

>>> def callback(res, response):
... if res == 'NWS':
... print(f'현재시간 : {response["time"]} / 기사제목 : {response["title"]}')
>>> nws = Realtime('NWS', callback=callback)
>>> nws.subscribe('NWS001')
>>> Realtime.listen()
현재시간 154734 / 기사제목 [포토] 코로나19 직무 교육현장 참관
현재시간 154752 / 기사제목 전성배 신임원장, "국가 디지털전환 선도"
현재시간 154750 / 기사제목 [포토] 정총리, 코로나19 교육현장 참관
# ...

_RealtimeHandler가 인자로 res를 전달해주므로, callback 함수 내부적으로 어떤 실시간 데이터가 들어왔는지 구분할 수 있습니다.