股票数据分析(一)数据获取

前段时间开始做股票数据分析的业余项目,希望能提高自己对大型数据量的处理能力。目前大致的想法是用python的tushare模块获取数据,用Java的框架做发布。

1.Tushare模块的说明

tushare是国内金融从业者@JimmyTu(挖地兔)搜集国内各个公开渠道的股票数据接口,并把这些接口整理后用python写的一个模块。文档地址为:http://tushare.org/index.html

首先说明一下tushare获取到的数据格式,这里以分笔历史纪录交易为例:

import tushare as ts
df = ts.get_tick_data('600848',date='2014-01-09')
df.head(4)
     time       price change  volume  amount  type
0    15:00:00   6.05     --       8    4840   卖盘
1    14:59:55   6.05     --      50   30250   卖盘
2    14:59:35   6.05     --      20   12100   卖盘
3    14:59:30   6.05  -0.01     165   99825   卖盘

请求得到的结果是pandas模块中的一种基本数据结构类——DataFrame,这是一种类似二维表的数据结构。DataFrame类提供了多种数据处理、存储的方法,其中也包括了将数据存入数据库的to_sql方法,更多用法可查阅pandas的官方文档。

2.数据库设计

目前每日分笔数据采用动态建表的方式保存,即将每天所有股票的分笔数据存储在一张以当天日期命名的表中,并且间隔一定时间周期后再分库。

3.代码

以下为一个初步的使用多线程请求历史分笔记录的python3代码

# -*- coding: utf-8 -*-
"""
Created on Fri Jan  2 12:37:43 2017

@author: jerry
"""

import tushare as ts
import time
import queue
import threading
import pandas as ps
from sqlalchemy import create_engine

THREADS_NUM = 25  # 采集线程数
THREADS_EXITFLAG = 0  # 线程退出标志
TICKS_DATA_DATE = '2017-01-04'  # 指定采集日期
MYSQL_ENGINE = 'mysql://root:pwd@ip:port/dbname?charset=utf8'

class GetStockData(threading.Thread):
    def __init__(self, threadID, q):
        threading.Thread.__init__(self)
        self.threadID = threadID
        self.q = q

    def run(self):
        print ('线程%s开始下载' % (self.threadID))
        self._process_data()

    def _process_data(self):
        engine = create_engine(MYSQL_ENGINE)
        while not THREADS_EXITFLAG:
            if not self.q.empty():
                code = self.q.get()
                remain_num = self.q.qsize()
                tick_data = get_tick(code, TICKS_DATA_DATE)
                #根据当请求的股票当日停牌时,返回的数据有三行
                if len(tick_data) > 3:
                    save_to_mysql(TICKS_DATA_DATE, tick_data, engine, code, remain_num)
                time.sleep(0.05)
            else:
                break

def get_stock_basics():
    """
    获取当日股票列表
    Return
    --------
    DataFrame
    """
    basics = ts.get_stock_basics()
    return basics

def get_tick(stockCode=None, date=None):
    """
    根据股票列表的股票代码获取当日/指定日期历史分笔
    Return
    --------
    DataFrame
    """
    tick_data = ''
    if date != None and date != '':
        tick_data = ts.get_tick_data(stockCode, date)
    else:
        tick_data = ts.get_today_ticks(stockCode)
    if not tick_data.dropna(axis=0, how='any', thresh=None).empty:
        tick_data.insert(0, 'code', stockCode) #插入股票代码字段
    return tick_data


def save_to_mysql(tablename=None, data=None, engine=None, code=None, num=None):
    """
    保存获取的数据到MySQL数据库中
    Return
    --------
    """
    for i in range(3):
        try:
            data.to_sql(tablename, engine, if_exists='append')
            print('save %s %s' % (code, num))
            break
        except BaseException as e:
            print ('Save Error %s ' % (code))
    return


def main():
    #    reload(sys)
    #    sys.setdefaultencoding('utf8')

    stock_codes = get_stock_basics()
    threads = []
    try:
        """
        根据股票代码列表创建队列
        """
        stocks = queue.Queue(len(stock_codes))
        for code in stock_codes.index:
            code = str(code)
            if (len(code) != 6):
                code = (6 - len(code)) * '0' + code
            stocks.put(code)

        """
        创建并运行线程
        """
        for n in range(THREADS_NUM):
            thread = GetStockData(n, stocks)
            thread.start()
            threads.append(thread)

        while not stocks.empty():
            pass
        print ('数据请求完毕。')
        THREADS_EXITFLAG = 1

        for t in threads:
            t.join()
    except BaseException as e:
        print ('Error', e)
    return

if __name__ == '__main__':
    print ('开始请求%s的数据' % (TICKS_DATA_DATE))
    main()

代码很简单,请求当日的股票列表,并将列表中的股票代码放入队列中,尔后开启指定数量的线程并根据队列中的股票代码请求数据,请求完毕后将数据保存至mysql数据库。
当然,以上只是个初步版本,还需要修改一些纰漏之处,并增加日志等功能。

点赞