우리는 보통 전일의 미국 증시, 원-달러 환율과 같은 핵심 지표를 바탕으로 당일의 국내 장 등락을 예측한다. 어제는 미국 장이 반등을 했으니 오늘 국내 장도 그러할꺼야, 나스닥이 엄청 빠졌으니 국내 기술주도 많이 빠지겠지? 더 관심있는 사람은 무엇도 볼 것이고, 무엇도 추가로 고려할 것이다.
수치적인 지표에서 한 발자국 벗어나면 뉴스 기사도 눈에 들어온다. 어제 파월이 자이언트 스텝*(설명필요)을 거론했네? 우리 증시가 많이 빠지겠구나. 이렇게 하나 둘 고려해야할 사항이 늘어난다면, 그 때부터는 인간의 영역으로는 불가능하다. 우리는 직업도 있고, 자녀도 돌봐야하고, 운동도 해야하고, 휴식도 취해야한다.
(세상의 모든 정보를 모은다면 주가 예측은 가능하다 +
파이썬을 통해서 한다면 시간적 투자 없이 자동으로 수집이 가능하다.)
2.1.1. 무슨 데이터를 어디서 수집할 것인가
본 게시글에서는 거시 경제 지표로 채권, 원자재, 환율, 지수 그리고 가상화폐를 사용할 것이다. 이들은 초기 데이터 셋에 해당하며 추후 예측 모델 구축, 분석을 진행하면서 필요한 데이터를 가감할 것이다.
각 지표의 데이터는 각종 지표가 망라된 investing.com에서 가져온다. Investing.com은 종목, ETF, 환율, 원자재를 포함하여 전세계 경제 지표를 원하는 시점을 선택하여 데이터를 가져올 수 있어 널리 사용된다.
Investing.com에서 데이터를 가져오는 방법은 두 가지가 있다.
첫 번째 방법으로는 크롤링 라이브러리 selenium과 beautifulsoap을 활용하는 방법이다. 해당 방법은 사용자가 직접 investing.com에서 데이터를 조회하듯 모든 데이터를 가져올 수 있다는 장점이 있으나, 동적 크롤링을 수행하면서 통신 장애로 인해 수집에 실패하는 경우가 종종 발생한다.
두 번째 방법은 investpy 라이브러리를 활용하는 것이다. Investpy는 그림 1-2와 같이 주식, 펀드, ETF, 지수 등을 함수로 제공하며 이를 통해 안정적으로 데이터를 수집할 수 있다.
Investpy는 그림 1-3과 같이 명령 프롬포트 창에서 하기 코드를 통해 설치가 가능하다.
pip install investpy
2.2. 채권 수집하기
가장 먼저 채권을 수집해보자.
investpy에서 제공하는 채권의 종류를 알기 위해서는 다음과 같이 코드를 작성한다.
investpy.get_bonds()
데이터의 형태를 보면 국가/채권이름/채권전체이름의 형태로 구성됨을 알 수 있다.
총 66개국의 연도별 국채를 제공하고 있고 우리는 이 중에서 특정 국가만을 선택해서 가져올 것이다.
개별 국채의 가격은 어떻게 가져올 수 있을까?
예시로 대한민국의 2년 국가채의 수익률을 가져와보자.
개별 국가채의 데이터는 get_bond_historical_data() 함수를 통해서 가져올 수 있으며 내부 인자는 다음과 같다.
investpy.get_bond_historical_data('국채 이름', from_date = 시작 날짜, end_date = 종료 날짜)
해당 함수는 시작과 종료 날짜를 필요로 한다. 따라서 날짜를 먼저 지정해야한다.
오늘의 날짜는 코드를 수행하는 당일로 할당하기 위해 datetime 라이브러리의 now 함수를 사용한다.
investpy.get_bond_historical_data('South Korea 2Y', from_date=start_date, to_date = end_date)
get_bond_historical_data 함수는 국채명을 통해서 채권의 과거 데이터를 가져온다.
따라서 원하는 채권의 과거 데이터를 가져오기 위해서는 국채명을 먼저 알아야한다.
66개의 나라 중 사용자가 지정한 국가들의 단기채(통상 2년)와 장기채(통상 10년)를 가져오는 코드는 다음과 같다.
get_bond_list = ['brazil','canada','china','france','germany','india','japan','russia','south korea','united kingdom','united states'] # (1)bond_infos = investpy.get_bonds()# (2)bond_infos = bond_infos.loc[bond_infos.country.isin(get_bond_list),:]# (3)select_list = [x for x in bond_infos.name if' 2Y'in x or'10Y'in x] # (4)bond_infos = bond_infos.loc[bond_infos.name.isin(select_list),:]# (5)bond_infos
(1-3) : 원하는 국가들을 조건을 통해 선별한다.
(4-5) : bond_infos의 이름 중 ' 2Y'나 '10Y'가 들어가는 경우를 재선별한다. 여기서 '2Y'가 아닌 ' 2Y'를 적용한 이유는 전자로 선별 시 12년채('12Y')도 선택되기 때문이다.
bond_infos의 결과는 그림 1-6과 같다.
그림 1-6과 같이 수집하고자 하는 채권 이름을 알게 되었다면, 다음으로는 반복문을 통해서 이를 수집하면 된다. 코드는 다음의 구조로 작성된다.
for name in bond_infos.name:# 데이터 수집 temp_df = investpy.get_bond_historical_data(name, from_date = start_date, to_date = end_date)# 데이터 병합 ...# 대기 시간 time.sleep(wait_time)
첫째로 선택된 채권명(name)을 통해 시계열 데이터를 수집한다.
이후 이전에 수집된 데이터와 병합(merge)하는 과정을 거치고 약간의 대기시간을 두고 다음 loop을 수행한다.
매 loop의 마지막에 대기 시간을 두는 이유는 investpy를 통해 단기간에 많은 요청이 들어올 시 접근이 차단되기 때문이다.
완성된 코드는 다음과 같다.
start_idx =0# (1)select_cols = ['Close'] # (2)for name intqdm(bond_infos.name[start_idx:]): wait_time = np.round(np.random.sample(1)[0] *5, 2)# (3)if wait_time <=1: wait_time =1.5 save_name = re.sub(" ","_",name)# (4) save_name = re.sub("\.","",save_name)if start_idx ==0:# (5) output_df = investpy.get_bond_historical_data(name, from_date = start_date, to_date = end_date) output_df = output_df.loc[:, select_cols] output_df.columns = [save_name +"_"+ x for x in select_cols]else:# (6) temp_df = investpy.get_bond_historical_data(name, from_date = start_date, to_date = end_date) temp_df = temp_df.loc[:, select_cols] temp_df.columns = [save_name +"_"+ x for x in select_cols] output_df = pd.merge(output_df, temp_df, left_index =True, right_index =True, how ='outer')# (7) time.sleep(wait_time) start_idx +=1# (8)output_df = output_df.sort_values("Date")# (9)
(1) : start_idx는 loop의 횟수를 측정하는 변수이다. 이를 통해 중간에 에러가 발생하여 loop문이 멈추더라도 멈추기 직전 loop에서부터 다시 시작할 수 있다.
(2) : select_cols는 그림 1-5의 데이터에서 시,고,저,종가 중 종가인 Close 만을 선택하겠다는 뜻이다. 추가하고자한다면 해당 리스트에 추가하면 된다.
(3) : wait_time은 각 loop 별 대기 시간을 뜻한다. (8) 코드 바로 윗줄의 time.sleep 함수에 사용되며 1~5 사이의 임의의 값이 할당된다.
(4) : save_name은 최종으로 병합된 데이터에 들어갈 컬럼명을 뜻한다. 채권의 경우 Brazil 2Y와 같이 띄어쓰기로 컬럼명이 작성되어있는데 편의를 위해 공백을 언더바("_")로 변경한다.
(5) : 만약 start_idx가 0이라면, 다른 말로 첫 번째 loop라면 최종 데이터(output_df)를 바로 선언한다.
(6) : 만약 start_idx가 0이 아니라면, 다른 말로 첫 번째 loop가 아니라면 진행될 코드이다.
(7) : (6) 조건이라면 temp_df로 해당 채권의 데이터를 가져온 후 이를 pd.merge() 함수를 통해 최종 데이터와 병합한다. 병합의 기준이 양 데이터의 index에 해당하는 Date를 사용하기 때문에 on이 아닌 left_index와 right_index 인자를 사용한다.
(8) : loop가 완료되면 start_idx에 1을 더해준다.
(9) : 모든 loop가 종료되면 최종 데이터를 Date를 기준으로 오름차순 정렬한다.
2.2. 원자재 수집하기
다음으로 원자재를 수집해보자.
원자재도 채권과 동일하게 get_commodities() 함수로 제공 가능한 원자재명을 확인 후 get_commodity_historical_data로 시계열 데이터를 가져오는 구조이다.
위의 말이 언급되면 우리는 '코로나 관련주', '백신 관련주'를 자연스레 포털에 검색하곤 한다. 화면에는 제약 회사, 바이오 회사, 진단 키트 관련 회사 등 많은 종목이 리스트업 될 것이다.
물망에 오른 수 개 내지 수십 개의 종목 중 상승하는 종목이 존재하는 반면 하락하는 종목도 있을 것이다. 즉 동일 이슈에 의해 같은 섹터로 묶인 종목이더라도 내가 산 종목만 하락하는 경우가 발생할 수 있다. 상승 종목을 선택할 능력이 떨어지는 투자자는 해당 섹터로 분류된 모든 종목들을 구매하는 편이 더 나을 것이다.
이와 같이 여러 종목들의 주가를 가중합하여 하나의 지표로 만든 것을 지수(index)라고 하며, 이를 추종하는 상품들로 인덱스 펀드와 ETF가 대표적이다. Investing.com에서 제공하는 지수는 다음 코드 결과를 통해 확인할 수 있으며 총 7,800개의 지수가 존재한다.
그림 1-21을 살펴보면 2.6.1.에서 수집한 국가별 주요 지수와 중복되는 시계열이 수집되지 않을까라는 우려가 존재한다. 가령 그림 1-20의 S&P/ASX 200과 1-21의 DJ Australia USD는 모두 호주 국가의 전반적인 시장 추세를 보여줄 것이다. 한 가지 차이점은 2.6.1.에서 수집한 지표는 해당 국가의 화폐를 사용하며(예: 호주-AUD) 2.6.2.의 경우 전부 미국 달러로 통일시켰다는 것이다.
따라서 2.6.1.에서 수집한 지수들은 개별 국가들 스스로의 내부 시장 변화를 보여주는 역할을 하는 반면, 2.6.2.의 지수들은 동일 화폐(USD)를 기준으로 국가의 경제 규모가 어떠한지 비교가 가능하다는 차이가 존재한다. 이에 더해 특정 국가와 미국 간의 상대 환율이 어떻게 변화하고 있는지 간접적으로 확인할 수 있다.
이는 동일한 지수를 가르키는 DJ South Korea와 DJ South Korea USD의 그래프를 비교해보면 쉽게 이해할 수 있다. 코스피 지수는 2022년도 들어서 상당한 하락이 발생하였는데, DJ South Korea USD의 경우 지수의 하락에 더하여 원화 가치 하락 또한 동반되면서 그 폭이 훨씬 큰 것을 알 수 있다. 그림 1-21의 오른쪽 그래프는 코스피 하락에 원화 가치의 하락이 더해져 보다 큰 폭으로 하락한 것을 확인할 수 있다.
2.6.3. 주요 산업 지수 수집하기
2.6.3.1. 전세계 산업 지수 수집하기
현재 헬스케어 산업은 호황인가? 반도체 산업은 불황인가?
이렇게 섹터별로 측정된 가치가 코스피에는 어떤 영향을 미칠까?
이를 확인하기 위해 먼저 전 세계 각 산업별로 MSCI에서 측정한 지수를 수집하면 다음과 같다.
cond_msci = [Trueif"MSCI World"in x elseFalsefor x in index_infos.name]cond_msci_detail = [Trueif"USD"notin x and"10 40"notin x elseFalsefor x in index_infos.name] # (1)cond_pmsector = index_infos['class']=='primary_sectors'COND3 = cond_msci & cond_pmsector & cond_msci_detail # (2)index_infos.loc[COND3,:]
(1) : MSCI 전세계 섹터별 지수 중 세부적인 산업 지수는 제거
(2) : MSCI 세계 섹터별 지수와 primary sector 와의 교집합 지수들
그림 1-23을 확인하면 텔레콤, 헬스케어, 금융, 에너지 등의 섹터가 존재함을 알 수 있다.
세부적으로 해당 지수가 어떤 것을 추종하는지를 살펴보자.
가장 먼저 MSCI World Utilities를 살펴볼텐데 구글에 MSCI World Utilities를 검색하면 msci.com에서 발간한 문서를 찾을 수 있다.
해당 문서를 들어가면 첫 장에는 지수에 대한 performance table 및 returns의 시각화 자료를 확인할 수 있다.
우리가 볼 것은 두 번째 장표에 존재하는데 해당 지수가 높은 비중으로 추종하고 있는 TOP 10 회사들의 리스트 및 비중이 나와있다.
가장 높은 비중을 지닌 NEXTERA ENERGY는 무슨 회사일까?
이 또한 구글에 검색하면 위키백과의 설명을 쉽게 찾을 수 있으며, 조금 더 자세한 정보를 얻고자 하는 경우 yahoo finance를 이용할 수 있다.
해당 설명을 확인하면 미국의 에너지 회사임을 알 수 있다. 그런데 에너지 회사의 비중이 가장 크다면 그림 1-23의 MSCI World Energy와 동일한 것이 아닌가? 란 의문이 들 수 있다.
의문점을 해결하기 위해 MSCI World Energy는 무엇을 추종하는지 살펴보도록 하자.
가장 많은 비중을 차지하는 ExxonMobil의 기업 설명은 그림 1-28과 같다.
그림 1-28의 설명을 확인하면 크루드 오일 및 천연 가스를 생산하는 기업임을 확인할 수 있다.
그렇다면 MSCI World Utilities와 MSCI World Energy는 동일한 지수인가?
(1) : main 함수의 인자는 두 가지로, opt는 total과 append 값을 가진다. total이 들어올 경우 초기에 실행하는 경우를 상정하여 사용자가 정의한 시작 날짜부터 당일까지 데이터를 모두 수집한다. append가 들어올 경우 두 번째 인자인 renew_date를 사용하는데, 여기서 renew_date는 최근 N일치의 N에 해당한다. Default 값은 5로 최근 5일치만 갱신한다는 뜻이다.
(2) : data_path의 경우 함수 인자로 받지 않고 전역 변수를 그대로 사용하기 위해 global로 할당하였다.
(3) : 전체 날짜를 수집하는 경우 실행됨.
(4) : 사용자가 정의한 시작 날짜
(5-6) : 수집 결과를 need_append_df.csv와 need_imputation_df.csv 파일로 저장함
(7) : append 인자가 들어올 경우 기존에 수집했던 데이터가 존재한다는 뜻이기에, need_append_df를 읽어오는 것 부터 시작함.
(8) : append 인자가 들어올 경우 시작 날짜는 크롤링 당일 기준으로 (1)에서 정의한 renew_date 만큼 차감한 날짜이다. 즉 이 경우 코드 실행일을 기준으로 5일 전부터 재수집을 실시한다.
(9) : 갱신할 일자의 데이터를 수집하며 이를 append_df 변수에 할당한다.
(10-11) : 기존에 수집된 데이터(total_df)에서 재수집된 데이터를 갱신한다.
최종적으로 수집 시 사용하는 코드는 하단의 전체 코드를 참고하면 된다.
포스팅 마무리
본 포스팅을 통해 investpy를 활용하여 데이터를 수집하는 방법을 알아보았다.
먼저 채권, 원자재, 가상화폐, 환율, 지수 섹터를 선정하였고 개별 섹터마다 분석가의 주관에 의해 종목들을 선별하였다.
다음으로 선별된 종목들의 과거 시계열 데이터를 수집하였으며 이를 날짜를 기준으로 병합하였다.
마지막으로 수집된 데이터를 저장하고, 중복 수집이 발생하지 않도록 최근의 날짜만을 갱신하는 main 함수를 작성하였다.
다음 포스팅으로는 이렇게 수집된 데이터 중 결측치가 존재하는 경우 이를 대치(imputation)하는 과정을 살펴볼 것이다.
전체 코드
## Importfrom tqdm import tqdmfrom functools import reduceimport numpy as npimport pandas as pdimport investpyimport datetimeimport reimport timeimport os## define functionsdefget_infos(options):assert options in ['bond','commodities','crypto','currency','index']if options =='bond': get_bond_list = ['brazil','canada','china','france','germany','india','japan','russia','south korea','united kingdom','united states']# total bond bond_infos = investpy.get_bonds()# select country bond_infos = bond_infos.loc[bond_infos.country.isin(get_bond_list),:]# select 2y and 10y select_list = [x for x in bond_infos.name if' 2Y'in x or'10Y'in x] bond_infos = bond_infos.loc[bond_infos.name.isin(select_list),:] infos = bond_infoselif options =='commodities': commo_infos = investpy.get_commodities() commo_name_list =list(commo_infos.loc[(commo_infos.group =='metals') & (commo_infos.currency =='USD')].name) commo_name_list.extend(['Brent Oil', 'Crude Oil WTI', 'Natural Gas', 'Lumber', 'Rough Rice', 'US Corn']) commo_infos = commo_infos.loc[commo_infos.name.isin(commo_name_list),:]# del u.k copper infos = commo_infos.loc[~commo_infos.index.isin([10])]elif options =='crypto': cryp_list = ['Bitcoin'] cryp_infos = investpy.get_cryptos() cryp_infos = cryp_infos.loc[cryp_infos.name.isin(cryp_list)] infos = cryp_infoselif options =='currency': curr_infos = investpy.get_currency_crosses() curr_name = ['EUR','JPY','USD','AUD','GBP','CNY','RUB'] curr_name = [x+"/KRW"for x in curr_name] curr_name.extend(["USD/"+ x for x in ['EUR', 'JPY', 'GBP']]) curr_infos = curr_infos.loc[curr_infos.name.isin(curr_name)] infos = curr_infoselif options =='index': index_infos = investpy.get_indices() cond_major = index_infos['class']=='major_indices' cond_world = index_infos.market =='world_indices' COND1 = cond_major & cond_world cond_global = index_infos.market =='global_indices' cond_dj = [Trueif"DJ"in x elseFalsefor x in index_infos.name] cond_local = [Trueif ("USD"in x) and ("EUR"notin x) elseFalsefor x in index_infos.name] COND2 = cond_global & cond_dj & cond_local & cond_major cond_msci = [Trueif"MSCI World"in x elseFalsefor x in index_infos.name] cond_pmsector = index_infos['class']=='primary_sectors' cond_msci_detail = [Trueif"USD"notin x and"10 40"notin x elseFalsefor x in index_infos.name] COND3 = cond_msci & cond_pmsector & cond_msci_detail cond_korea = index_infos.country =="south korea" cond_krx = index_infos.name.isin(['KOSDAQ'])# 코스닥 임의 추가 COND4 = cond_korea & cond_pmsector | cond_krx infos = index_infos.loc[COND1 | COND2 | COND3 | COND4,:] infos.index = [x for x inrange(infos.shape[0])]return infosdefget_option_data(name,country,start_date,end_date,options):if options =='bond':return investpy.get_bond_historical_data(name, from_date=start_date, to_date = end_date)elif options =='commodities':return investpy.get_commodity_historical_data(name, from_date=start_date, to_date = end_date)elif options =='crypto':return investpy.get_crypto_historical_data(name, from_date=start_date, to_date = end_date)elif options =='currency':return investpy.get_currency_cross_historical_data(name, from_date=start_date, to_date = end_date)elif options =='index':return investpy.get_index_historical_data(name, country, from_date=start_date, to_date = end_date)defget_hist_data(infos,options,start_date,end_date): country =Noneif options in ['bond','currency','index']: select_cols = ['Close']elif options in ['commodities','crypto']: select_cols = ['Close','Volume'] start_idx =0for name intqdm(infos.name[start_idx:]):if options in ['index']: country = infos.country[start_idx] wait_time = np.round(np.random.sample(1)[0] *5, 2)if wait_time <=1: wait_time =1.5 save_name = re.sub(" ","_",name) save_name = re.sub("\.","",save_name)try:if start_idx ==0: output_df =get_option_data(name, country, start_date, end_date, options) output_df = output_df.loc[:, select_cols] output_df.columns = [save_name +"_"+ x for x in select_cols]else: temp_df =get_option_data(name, country, start_date, end_date, options) temp_df = temp_df.loc[:, select_cols] temp_df.columns = [save_name +"_"+ x for x in select_cols] output_df = pd.merge(output_df, temp_df, left_index =True, right_index =True, how ='outer')except: start_idx +=1continue start_idx +=1 time.sleep(wait_time) output_df = output_df.sort_values("Date")return output_dfdefget_total(opt_list,start_date,end_date):for idx, opt inenumerate(opt_list):if idx ==0: infos =get_infos(opt) output_df =get_hist_data(infos, opt, start_date, end_date)else: temp_infos =get_infos(opt) temp_df =get_hist_data(temp_infos, opt, start_date, end_date) output_df = pd.merge(output_df, temp_df, left_index =True, right_index =True, how ='outer') time.sleep(3) output_df = output_df.reset_index() output_df = output_df.sort_values("Date")return output_dfdefmain(opt='append',renew_date=5):global data_path opt_list = ['bond','commodities','crypto','currency','index']if opt =='total':# date setting start_date ='01/01/2006' end_date = datetime.datetime.now().strftime("%d/%m/%Y") total_df =get_total(opt_list, start_date, end_date) total_df.to_csv(data_path +"need_append_df.csv", index =False) total_df.to_csv(data_path +"need_imputation_df.csv", index =False)else: total_df = pd.read_csv(data_path +"need_append_df.csv") start_date = (datetime.datetime.now()- datetime.timedelta(renew_date)).strftime("%d/%m/%Y") end_date = datetime.datetime.now().strftime("%d/%m/%Y") append_df =get_total(opt_list, start_date, end_date) total_df.Date = pd.to_datetime(total_df.Date) append_df.Date = pd.to_datetime(append_df.Date) total_df = total_df.loc[total_df.Date < append_df.Date[0]] total_df = pd.concat([total_df, append_df], axis =0) total_df.to_csv(data_path +"need_imputation_df.csv", index =False)return total_df## data pathfront_path ='C:\\Windows\\python_windows\\python_base_env\\notebooks'data_path ='{}\\Stock-price-prediction\\datasets\\'.format(front_path)## 전체 데이터를 수집하고자 하는 경우total_df =main(opt ='total')## 기 수집된 데이터를 활용하여 최근 데이터만 갱신할 경우total_df =main()