前段时间开始做股票数据分析的业余项目,希望能提高自己对大型数据量的处理能力。目前大致的想法是用python的tushare模块获取数据,用Java的框架做发布。
1.Tushare模块的说明
tushare是国内金融从业者@JimmyTu(挖地兔)搜集国内各个公开渠道的股票数据接口,并把这些接口整理后用python写的一个模块。文档地址为:http://tushare.org/index.html
首先说明一下tushare获取到的数据格式,这里以分笔历史纪录交易为例:
import tushare as ts
df = ts.get_tick_data('600848',date='2014-01-09')
df.head(4)
time price change volume amount type
0 15:00:00 6.05 -- 8 4840 卖盘
1 14:59:55 6.05 -- 50 30250 卖盘
2 14:59:35 6.05 -- 20 12100 卖盘
3 14:59:30 6.05 -0.01 165 99825 卖盘
请求得到的结果是pandas模块中的一种基本数据结构类——DataFrame,这是一种类似二维表的数据结构。DataFrame类提供了多种数据处理、存储的方法,其中也包括了将数据存入数据库的to_sql方法,更多用法可查阅pandas的官方文档。
2.数据库设计
目前每日分笔数据采用动态建表的方式保存,即将每天所有股票的分笔数据存储在一张以当天日期命名的表中,并且间隔一定时间周期后再分库。
3.代码
以下为一个初步的使用多线程请求历史分笔记录的python3代码
# -*- coding: utf-8 -*-
"""
Created on Fri Jan 2 12:37:43 2017
@author: jerry
"""
import tushare as ts
import time
import queue
import threading
import pandas as ps
from sqlalchemy import create_engine
THREADS_NUM = 25 # 采集线程数
THREADS_EXITFLAG = 0 # 线程退出标志
TICKS_DATA_DATE = '2017-01-04' # 指定采集日期
MYSQL_ENGINE = 'mysql://root:pwd@ip:port/dbname?charset=utf8'
class GetStockData(threading.Thread):
def __init__(self, threadID, q):
threading.Thread.__init__(self)
self.threadID = threadID
self.q = q
def run(self):
print ('线程%s开始下载' % (self.threadID))
self._process_data()
def _process_data(self):
engine = create_engine(MYSQL_ENGINE)
while not THREADS_EXITFLAG:
if not self.q.empty():
code = self.q.get()
remain_num = self.q.qsize()
tick_data = get_tick(code, TICKS_DATA_DATE)
#根据当请求的股票当日停牌时,返回的数据有三行
if len(tick_data) > 3:
save_to_mysql(TICKS_DATA_DATE, tick_data, engine, code, remain_num)
time.sleep(0.05)
else:
break
def get_stock_basics():
"""
获取当日股票列表
Return
--------
DataFrame
"""
basics = ts.get_stock_basics()
return basics
def get_tick(stockCode=None, date=None):
"""
根据股票列表的股票代码获取当日/指定日期历史分笔
Return
--------
DataFrame
"""
tick_data = ''
if date != None and date != '':
tick_data = ts.get_tick_data(stockCode, date)
else:
tick_data = ts.get_today_ticks(stockCode)
if not tick_data.dropna(axis=0, how='any', thresh=None).empty:
tick_data.insert(0, 'code', stockCode) #插入股票代码字段
return tick_data
def save_to_mysql(tablename=None, data=None, engine=None, code=None, num=None):
"""
保存获取的数据到MySQL数据库中
Return
--------
"""
for i in range(3):
try:
data.to_sql(tablename, engine, if_exists='append')
print('save %s %s' % (code, num))
break
except BaseException as e:
print ('Save Error %s ' % (code))
return
def main():
# reload(sys)
# sys.setdefaultencoding('utf8')
stock_codes = get_stock_basics()
threads = []
try:
"""
根据股票代码列表创建队列
"""
stocks = queue.Queue(len(stock_codes))
for code in stock_codes.index:
code = str(code)
if (len(code) != 6):
code = (6 - len(code)) * '0' + code
stocks.put(code)
"""
创建并运行线程
"""
for n in range(THREADS_NUM):
thread = GetStockData(n, stocks)
thread.start()
threads.append(thread)
while not stocks.empty():
pass
print ('数据请求完毕。')
THREADS_EXITFLAG = 1
for t in threads:
t.join()
except BaseException as e:
print ('Error', e)
return
if __name__ == '__main__':
print ('开始请求%s的数据' % (TICKS_DATA_DATE))
main()
代码很简单,请求当日的股票列表,并将列表中的股票代码放入队列中,尔后开启指定数量的线程并根据队列中的股票代码请求数据,请求完毕后将数据保存至mysql数据库。
当然,以上只是个初步版本,还需要修改一些纰漏之处,并增加日志等功能。