- tushare是一个非常神奇的Python模块包,基于新浪的API,可提供并不限于股票的历史数据。
- 数据库选用的是sqlite3,单文件,轻量化,不需要配置。
以下是完整代码,且使用的是多线程的方式。此处提到的多线程的方法可以参考Python黑魔法,一行实现并行化这篇文章,讲的很好。
准备工作
import tushare as ts
from sqlalchemy import create_engine #注1
import sqlite3
import pandas as pd #注2
from multiprocessing.dummy import Pool as ThreadPool #注3
conn1=sqlite3.connect('Stocklist.db') #注4
engine = create_engine('sqlite:///History.db', echo = False) #注5
conn2 = sqlite3.connect('History.db')
cur2=conn2.cursor() #注6
stocklist = []
errorlist = []
alreadylist = []
cur1=conn1.cursor()
query1 = "select * from Allist" #注7
cur1.execute(query1) #注8
stocklist = cur1.fetchall() #注9
cur1.close()
conn1.close()
query2 = "select name from sqlite_master where type='table' order by name" #注10
alreadylist = pd.read_sql(query2, conn2) #注11
注1:sqlalchemy
是Python自带的与数据库联结的包,导入创建数据库联结的函数
注2:导入pandas包,Python上的科学计算用的包
注3:多线程
注4:Stocklist.db是存放股票列表的数据库
注5:创建与sqlite数据库的联结,名字为History.db
注6:创建一个游标
注7:SQL语句。Allist
是Stocklist.db中的表,存放股票列表的。
注8:执行SQL语句
注9:获取执行SQL查询后的结果,stocklist
是tuple类型
注10:意思是获取所有表名
注11:另一种读数据库的方法,直接用pandas读取数据库,alreadylist
是DataFrame类型,有代码的那一列名为name
获取数据并保存的函数
def save(stock):
code = stock[0][:6] #注1
if code not in list(alreadylist.name): #注2
marketday = stock[1] #注1
i= 0
try:
startday = pd.Timestamp(marketday)
df = ts.get_h_data(code, start=str(startday)[:10], retry_count = 5)
df = df.sort_index(ascending=True) #注4
ma_list = [5,10,20,60]
for ma in ma_list:
df['MA_' + str(ma)] = pd.rolling_mean(df.close, ma) #注5
df.to_sql(code, engine, if_exists='append') #注6
except:
errorlist.append(stock[0])
print errorless #注7
该函数的思路是这样的:
- 利用tushare的
get_h_data
函数获取数据。 - 由于会出现如网络错误或其他错误,导致该程序重新执行,所以必须验证以防止添加重复数据。
注1:stock
取自上文的stocklist
, 由于是tuple,含有两列,第一列取做code
,第二列取作marketday
,后者是该股票的上市日。
注2:上文中用SQL语句查询出了一个DataFramealreadlist
,包含了History.db数据库中已有的表名,用alreadlist.name
取出,name
是alreadlist
的列名。
注3:pd.Timestamp
可以把文本类型的日期转成时间戳类型的,这样就可以进行时间的运算,例如通过pd.Timedelta
。然后就照startday
和enday
的写法,三年一个跨度拉取数据。
注4:数据拉过来是以date
为索引的,但是还需要重新排序,因而这样写以升序排列。
注5:没有移动均线的数据,因而手动计算。pandas直接自带移动平均数的计算函数pd.rolling_mean
,两个参数分别是计算对象和计算参数。
注6:写入数据库,if_exists='append'
意为追加的形式。
注7:用try...except
的方式来避免异常中断,错误的股票写入errorlist
,最后程序结束时打印出来。
多线程处理
pool = ThreadPool(4)
try:
pool.map(save, stocklist)
except:
pool.map(save, stocklist)
f = open('Notsaved.txt', 'w')
print >> f, errorlist
f.close()
pool.close()
pool.join()
注:pool.map(save, stocklist)
意思就是从stocklist
中取每一个元素送入save
的函数中运行。最后把上段代码的errorlist
打印成文件。
每日的更新
import tushare as ts
from sqlalchemy import create_engine
import sqlite3
import pandas as pd
from datetime import datetime as dt
con = sqlite3.connect('History.db')
query1 = "select name from sqlite_master where type='table' order by name"
stocklist = pd.read_sql(query1, con).name
engine = create_engine('sqlite:///History.db', echo = False)
updatestock = []
for stock in stocklist:
query2 = "select * from '%s' order by date" %stock
df = pd.read_sql(query2, con)
df = df.set_index('date')
if dt.now().weekday() == 5: #注1
today = str(pd.Timestamp(dt.now())-pd.Timedelta(days = 1))[:10] #注2
elif dt.now().weekday() == 6:
today = str(pd.Timestamp(dt.now())-pd.Timedelta(days = 2))[:10]
else:
today = str(pd.Timestamp(dt.now()))[:10]
if today != df.ix[-1].name[:10]:
try:
df = ts.get_h_data(stock, start = df.ix[-1].name[:10], retry_count = 5)
df.to_sql(stock, engine, if_exists='append')
updatestock.append(stock)
except:
continue
f = open('updated.txt','w')
print >>f, updatestock
f.close()
注1:dt.now()
是指今天,dt.now().weekday
是返回今天是星期几,5代表星期六,6代表星期天。
注2:today
指的是最近的一个交易日,df.ix[-1].name
是数据库中最新的一天,if today != df.ix[-1].name[:10]
意思就是,如果数据库最新的一天不是最近一个交易日,则要开始更新数据。
清洗数据库
import pandas as pd
import sqlite3
from multiprocessing.dummy import Pool as ThreadPool
con = sqlite3.connect('History.db')
query1 = "select name from sqlite_master where type='table' order by name"
stocklist = pd.read_sql(query1, con).name
delstock = []
f = open('Deleted.txt', 'w')
for stock in stocklist:
query2 = "select * from '%s' order by date" %stock
df = pd.read_sql(query2, con)
cur=con.cursor()
query4 = "delete from '%s' where rowid not in(select max(rowid) from '%s' group by date)" %(stock, stock) #注1
cur.execute(query4)
con.commit()
con.close()
print >> f, delstock
f.close()
注1:这句SQL语句的意思是以date
分组,删除重复的行
注2:最后执行cur.execute(…)完后要con.commit()
提交,才能有效