Given that Weka is not recommended as the algorithm library, Python's algorithm libraries are an excellent alternative.
Commonly used Python algorithm libraries:
pandas: the key is understanding its two core data structures, DataFrame and Series
pandas.DataFrame – pandas 0.23.4 documentation
scikit-learn: a rich collection of algorithms
API Reference – scikit-learn 0.20.2 documentation
scipy
numpy
Anaconda is an excellent choice as the core distribution for these libraries, but note that while Anaconda is first-rate at providing algorithm packages, its support for non-algorithm packages is limited.
Local development:
During development, start the Java project locally and invoke the local Anaconda Python interpreter to run the Python program.
Because the Python program depends on third-party packages, the only option is to spawn a separate process via Runtime.
All input parameters are converted to String; locate the Python interpreter path and the Python main-file path, then pass everything in:
String[] arguments = new String[]{AlgorithmLibConfig.PYTHON_INTERPRETER_PATH, AlgorithmLibConfig.PYTHON_ALGORITHM_MAIN_FILE_PATH, "zerodeal_dropFeatures", dataFileName, zeroPercentThreshold};
pythonAlgoCommonBiz.callPythonFile(arguments);
Since a Python return value cannot be captured directly, the Python side prints its result to standard output, and the Java side reads it from the process's input stream. (The two sides must agree on an output format so the result can be parsed.)
@Override
public String callPythonFile(String[] arguments) {
    LOGGER.info("[callPythonFile]: calling the Python library file; function name: " + arguments[2]);
    StringBuilder pyReInfo = new StringBuilder();
    try {
        Process process = Runtime.getRuntime().exec(arguments);
        // Note: the Python side cannot hand a value back via a return statement;
        // it must write the result to standard output, which is read here
        // through the process's input stream.
        BufferedReader in = new BufferedReader(new InputStreamReader(process.getInputStream()));
        String line;
        while ((line = in.readLine()) != null) {
            pyReInfo.append(line);
            LOGGER.info(line);
        }
        in.close();
        // process.waitFor() returns 0 when the Python script succeeded,
        // and a non-zero value (e.g. 1) when it failed.
        int re = process.waitFor();
        if (re != 0) {
            LOGGER.error("[callPythonFile]: calling the Python library file failed; function name: " + arguments[2]);
            throw new Exception();
        }
    } catch (Exception e) {
        throw new BizException("[callPythonFile]: calling the Python library file failed; function name: " + arguments[2]);
    }
    return pyReInfo.toString();
}
The Python side reads the incoming arguments via the sys module:
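A minimal sketch of how the Python entry point might unpack those arguments (the layout follows the Java String[] example above; the helper name parse_args is illustrative):

```python
import sys

def parse_args(argv):
    # After the script path, the Java side passes the function name,
    # then that function's parameters, all as Strings.
    return {
        "func_name": argv[1],         # e.g. "zerodeal_dropFeatures"
        "data_file": argv[2],         # dataFileName
        "threshold": float(argv[3]),  # zeroPercentThreshold arrives as a String
    }

# In the real entry point this would be parse_args(sys.argv); shown here
# with the values the Java example above would send:
args = parse_args(["main.py", "zerodeal_dropFeatures", "data.csv", "0.8"])
# Results must be print()-ed, since the Java caller reads them from
# this process's standard output stream.
print("funName=%s;resCode=1" % args["func_name"])
```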
In production, the interaction between Java and Python is implemented over RPC:
Java-side request code:
@Override
public String rpcCallPython(Map<String, String> params) {
    LOGGER.info("[rpcCallPython]: remote call to the Python service started; option: " + params.get("option"));
    String callUrl = AlgorithmLibConfig.CALL_PYTHON_ALGORITHM_URL;
    String resCode;
    String resInfo = "";
    try {
        // Open the connection
        java.net.URL url = new URL(callUrl);
        HttpURLConnection connection = (HttpURLConnection) url.openConnection();
        connection.setDoOutput(true);
        connection.setDoInput(true);
        connection.setUseCaches(false);
        connection.setInstanceFollowRedirects(true);
        connection.setRequestMethod("POST");                               // request method
        connection.setRequestProperty("Accept", "application/json");       // expected response format
        connection.setRequestProperty("Content-Type", "application/json"); // request body format
        LOGGER.info("connection.connect() is begin!");
        connection.connect();
        LOGGER.info("connection.connect() was success!");
        // Send the data
        OutputStreamWriter out = new OutputStreamWriter(connection.getOutputStream(), "UTF-8");
        out.append(JSON.toJSONString(params));
        out.flush();
        out.close();
        int code = connection.getResponseCode();
        InputStream is;
        if (code == 200) {
            LOGGER.info("Remote connection succeeded!");
            is = connection.getInputStream();
        } else {
            LOGGER.info("Remote connection failed, code = " + code);
            is = connection.getErrorStream();
            throw new BizException("[callPy]: remote connection to the Python server failed");
        }
        // Read the response
        int length = connection.getContentLength();
        if (length != -1) {
            byte[] data = new byte[length];
            byte[] temp = new byte[512];
            int readLen = 0;
            int destPos = 0;
            while ((readLen = is.read(temp)) > 0) {
                System.arraycopy(temp, 0, data, destPos, readLen);
                destPos += readLen;
            }
            String result = new String(data, "UTF-8");
            Map<String, Object> ret = JSON.parseObject(result, new TypeReference<Map<String, Object>>() {
            });
            resCode = ret.get("resCode").toString();
            resInfo = ret.get("resInfo").toString();
            if (!resCode.equals("1")) {
                throw new Exception("[callPy] returned a resCode other than 1!");
            }
        }
    } catch (Exception e) {
        throw new BizException("[rpcCallPython]: remote call to the Python service failed; option: " + params.get("option"));
    }
    return resInfo;
}
The Python side uses the Flask framework:
APP_NAME = "fund-algorithm-labs-service"
app = Flask(APP_NAME)
app = init.make_app()
@app.route('/check.txt')
def check():
    path = os.path.join(init.getProjectDir(), 'check.txt')
    f = open(path)
    return f.read()
@app.route('/callPy/', methods=['POST'])
def callPy():
    try:
        # temporary storage directory on the server
        os.chdir("/data/tmpfile")
        # os.chdir("D:\\csv")
        returnVal = {'resCode': '0', 'resInfo': '', 'funName': 'none'}
        option = request.json['option']
        fileName = request.json['fileName']
        modelId = request.json['modelId']
        TMP_FILE_LOCAL = '/data/tmpfile/'
        # TMP_FILE_LOCAL = 'D:\\csv\\'
        TMP_FILE_REMOTE = "/rms/datamodel/" + str(modelId) + "/datafile/"
        logger.info("callPy starting and option is " + option)
        if option == "dropna":
            logger.info("dropna enter")
            axis = request.json['axis']
            subset = request.json['subset']
            na_deal.dropna(pd.read_csv(fileName), int(axis), [subset])
            file_local = TMP_FILE_LOCAL + "data_dropna.csv"
            file_remote = TMP_FILE_REMOTE + "data_dropna.csv"
            ftpOper.ftp_main(0, file_local, file_remote)
            returnVal['funName'] = 'dropna'
        # ...... (other option branches elided)
        else:
            returnVal['resCode'] = '-1'
            returnVal['resInfo'] = 'the param [option] matches no choice!'
            return json.dumps(returnVal)
        returnVal['resCode'] = '1'
        return json.dumps(returnVal)
    except Exception as e:
        returnVal['resCode'] = '-1'
        # the exception must be stringified before concatenation
        returnVal['resInfo'] = 'call /callPy/ raise error: ' + str(e)
        logger.error('call /callPy/ raise error: %s', e)
        return json.dumps(returnVal)
if __name__ == "__main__":
    # for the official startup method, see: http://flask.pocoo.org/docs/0.12/quickstart/#a-minimal-application
    # port=8383 debug=True
    app.run(host="0.0.0.0")
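For a quick end-to-end check, the same JSON POST that the Java client performs can be sketched in Python with the standard library (the host/port and helper names are illustrative, not part of the project):

```python
import json
import urllib.request

def build_callpy_request(base_url, params):
    # Mirror the Java client: POST a JSON body with JSON Accept/Content-Type headers.
    return urllib.request.Request(
        base_url + "/callPy/",
        data=json.dumps(params).encode("utf-8"),
        headers={"Accept": "application/json",
                 "Content-Type": "application/json"},
        method="POST",
    )

def parse_callpy_response(body):
    # The Flask side replies with {"resCode": ..., "resInfo": ..., "funName": ...};
    # resCode "1" means success (the same check the Java rpcCallPython makes).
    ret = json.loads(body)
    if ret.get("resCode") != "1":
        raise RuntimeError("callPy failed: " + str(ret.get("resInfo")))
    return ret["resInfo"]

req = build_callpy_request("http://127.0.0.1:5000",
                           {"option": "dropna", "fileName": "data.csv",
                            "modelId": 1, "axis": "0", "subset": "col_a"})
# urllib.request.urlopen(req) would send it; parse_callpy_response(resp.read())
# would then validate the reply.
```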
Python project architecture:
The final structure is a three-part coordination architecture: Java – FTP/DB – Python.
Appendix:
Python and FTP:
import logging
from ftplib import FTP
from zhaogang import config
ftp = FTP()
logger = logging.getLogger('root')
def ftp_main(oper, localPath, remotePath):
    ftpIp = config.get("others", "ftp.server.ip")
    ftpAccount = config.get("others", "ftp.server.user.name")
    ftpPassword = config.get("others", "ftp.server.user.password")
    ftp.connect(host=ftpIp, port=21, timeout=30)
    ftp.login(user=ftpAccount, passwd=ftpPassword)
    if oper == 0:
        ftp_upload(localPath, remotePath)
    elif oper == 1:
        ftp_download(localPath, remotePath)
    ftp.quit()

def ftp_upload(file_local, file_remote):
    logger.info("----------ftp_upload start----------")
    fp = open(file_local, "rb")
    ftp.storbinary("STOR " + file_remote, fp=fp, blocksize=1024)
    fp.close()
    logger.info("----------ftp_upload end----------")

def ftp_download(file_local, file_remote):
    logger.info("----------ftp_download start----------")
    fp = open(file_local, "wb")
    ftp.retrbinary("RETR %s" % file_remote, fp.write, blocksize=1024)
    fp.close()
    logger.info("----------ftp_download end----------")
Python and MySQL:
try:
    data = {'model_id': tModelIdList, 'data_range_head': tRangeHeadList, 'data_range_val': tRangeValueList,
            'data_score_head': tScoreHeadList,
            'data_score_val': tScoreValueList, 'gmt_create': tGmtCreateList}
    df = pd.DataFrame(data=data)
    mysql.saveDataFrame2db(df)
except Exception as e:
    logger.info('mysql.saveDataFrame2db: something went wrong')
    logger.error(e)
import logging.config
import pandas as pd
import sqlalchemy
from sqlalchemy import create_engine

logger = logging.getLogger('root')
# # dev
# DB_USER = "#"
# DB_PASS = "#"
# DB_HOST = "#"
# DB_PORT = "#"
# DB_NAME = "#"
# Using MySQL first raised ImportError: No module named 'MySQLdb'.
# Running pip install MySQL-python then failed with ImportError: No module named
# 'ConfigParser'; it turns out MySQL-python does not support Python 3.
# The replacement is PyMySQL: run pip install PyMySQL and change the connection
# string to mysql+pymysql://username:password@server/db
# connect_info = 'mysql+mysqldb://{}:{}@{}:{}/{}?charset=utf8'.format(DB_USER, DB_PASS, DB_HOST, DB_PORT,
#                                                                     DB_NAME)
connect_info = 'mysql+pymysql://{}:{}@{}:{}/{}?charset=utf8'.format(DB_USER, DB_PASS, DB_HOST, DB_PORT,
                                                                    DB_NAME)
def saveDataFrame2db(dataframe):
    try:
        logger.info("----------save dataFrame to DB begin----------")
        # Write the DataFrame into MySQL. A connection must first be created
        # via sqlalchemy.create_engine, with the character set set to utf8,
        # otherwise some latin characters cannot be handled.
        con = create_engine(connect_info)
        dataframe.to_sql(name='model_scorecard',
                         con=con,
                         if_exists='append',
                         index=False,
                         dtype={
                             'model_id': sqlalchemy.types.BIGINT(),
                             'data_range_head': sqlalchemy.types.VARCHAR(),
                             'data_range_val': sqlalchemy.types.VARCHAR(),
                             'data_score_head': sqlalchemy.types.VARCHAR(),
                             'data_score_val': sqlalchemy.types.VARCHAR(),
                             'gmt_create': sqlalchemy.types.DateTime()
                         })
        logger.info("----------save dataFrame to DB end----------")
    except Exception as e:
        logger.info('save dataFrame to DB raised an error')
        logger.error(e)

def readDataFrame2db(sql):
    try:
        # build the database connection engine with sqlalchemy
        con = create_engine(connect_info)
        dataframe = pd.read_sql(sql=sql, con=con)
        return dataframe
    except Exception as e:
        logger.error(e)
Algorithm workflow:
Based on Zhou Zhihua's "Machine Learning" (the watermelon book)
Duplicate-value handling:
- drop_duplicates
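For instance, duplicate rows are removed with pandas' drop_duplicates (a minimal sketch):

```python
import pandas as pd

# Three rows, the first two fully identical; keep the first occurrence.
df = pd.DataFrame({"a": [1, 1, 2], "b": [3, 3, 4]})
deduped = df.drop_duplicates()
print(len(deduped))  # 2
```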
Zero-value handling:
- drop dimensions where the share of zeros is too high
- drop samples where the share of zeros is too high
- replace zeros with the mean
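A sketch of the column-side zero handling (the function name and threshold semantics are assumptions; the sample-dropping and mean-replacement variants follow the same pattern):

```python
import pandas as pd

def drop_zero_heavy_columns(df, threshold):
    # Share of zero entries per column; drop columns above the threshold.
    zero_ratio = (df == 0).mean()
    return df.loc[:, zero_ratio <= threshold]

df = pd.DataFrame({"a": [0, 0, 0, 1], "b": [1, 2, 3, 4]})
print(list(drop_zero_heavy_columns(df, 0.5).columns))  # ['b']
```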
Outlier handling:
- delete outliers
Correlation handling:
- del_corrr
Normalization:
- MinMaxScaler
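MinMaxScaler rescales each column linearly to [0, 1], for example:

```python
import numpy as np
from sklearn.preprocessing import MinMaxScaler

# One column with min 1 and max 5: maps to 0, 0.5, 1.
X = np.array([[1.0], [3.0], [5.0]])
scaled = MinMaxScaler().fit_transform(X)
print(scaled.ravel())  # [0.  0.5 1. ]
```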
y generation:
- manually/expert-generated y (industry data)
- machine-generated y
Missing-value handling:
- dropna
- fillna
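A small illustration of the two pandas options: dropping rows with missing values versus filling them (here with the column mean):

```python
import numpy as np
import pandas as pd

df = pd.DataFrame({"x": [1.0, np.nan, 3.0], "y": [4.0, 5.0, np.nan]})
dropped = df.dropna()          # only the fully populated first row survives
filled = df.fillna(df.mean())  # NaN in x becomes 2.0, NaN in y becomes 4.5
```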
Outlier detection:
- single dimension: boxplot
- single dimension: 3σ rule
- multiple dimensions: Isolation Forest
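As an example of the single-dimension rule (the list's "3α" is presumably the 3σ rule), values more than three standard deviations from the mean are flagged:

```python
import numpy as np

def three_sigma_outliers(x):
    # Flag values farther than 3 standard deviations from the mean.
    x = np.asarray(x, dtype=float)
    return np.abs(x - x.mean()) > 3 * x.std()

data = np.array([10.0] * 30 + [1000.0])
mask = three_sigma_outliers(data)
print(mask[-1], mask[0])  # True False
```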
Standardization:
- StandardScaler
Continuous-variable binning:
- model-based binning (Ctree)
- equal-width binning
WOE transformation:
- WOE transformation
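A sketch of computing the Weight of Evidence per bin (the sign convention, with bad counts in the numerator, varies between texts; the names are illustrative):

```python
import numpy as np
import pandas as pd

def woe_table(binned, y):
    # WOE_i = ln( (bad_i / bad_total) / (good_i / good_total) )
    df = pd.DataFrame({"bin": binned, "y": y})
    grouped = df.groupby("bin")["y"]
    bad = grouped.sum()            # y == 1 counts per bin
    good = grouped.count() - bad   # y == 0 counts per bin
    return np.log((bad / bad.sum()) / (good / good.sum()))

woe = woe_table(["A", "A", "A", "B", "B", "B"], [1, 0, 0, 1, 1, 0])
# bin A: 1 bad / 2 good -> ln(0.5); bin B: 2 bad / 1 good -> ln(2)
```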
Parameter tuning:
- GridSearchCV
Supervised learning:
- Logistic Regression
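The two steps combine naturally: GridSearchCV tunes the regularization strength of a LogisticRegression via cross-validation (a sketch on synthetic data):

```python
from sklearn.datasets import make_classification
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import GridSearchCV

X, y = make_classification(n_samples=200, random_state=0)
# 3-fold cross-validated search over the regularization strength C.
grid = GridSearchCV(LogisticRegression(max_iter=1000),
                    param_grid={"C": [0.01, 0.1, 1.0, 10.0]},
                    cv=3)
grid.fit(X, y)
best_C = grid.best_params_["C"]  # the cross-validated choice of C
```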
Composite evaluation models:
- entropy weight method
- PCA
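PCA, for instance, reduces the feature matrix to a few principal components whose explained-variance ratios can serve as weights in a composite score (a sketch on random data):

```python
import numpy as np
from sklearn.decomposition import PCA

X = np.random.RandomState(0).rand(100, 5)
pca = PCA(n_components=2).fit(X)
reduced = pca.transform(X)               # shape (100, 2)
weights = pca.explained_variance_ratio_  # variance share per component
```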
x generation (feature extraction):
- manually/expert-generated features (business knowledge)
- machine-generated features
Feature selection:
- expert feature selection
- machine feature selection (RandomForestClassifier)
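The machine-driven variant can rank features by a random forest's impurity-based importances and keep the top k (a sketch; k = 3 here):

```python
import numpy as np
from sklearn.datasets import make_classification
from sklearn.ensemble import RandomForestClassifier

X, y = make_classification(n_samples=300, n_features=8,
                           n_informative=3, random_state=0)
rf = RandomForestClassifier(n_estimators=100, random_state=0).fit(X, y)
# Indices of the 3 most important features, highest first.
top_k = np.argsort(rf.feature_importances_)[::-1][:3]
```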
Model evaluation and selection:
- generalization: error rate and accuracy
- generalization: ROC and AUC
- industry comparison
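The two generalization metrics can be computed with scikit-learn (the toy labels and scores below are illustrative):

```python
from sklearn.metrics import accuracy_score, roc_auc_score

y_true = [0, 0, 1, 1]
y_score = [0.1, 0.4, 0.35, 0.8]
# Accuracy (1 - error rate) needs a decision threshold; AUC is threshold-free.
y_pred = [1 if s >= 0.5 else 0 for s in y_score]
acc = accuracy_score(y_true, y_pred)   # 0.75
auc = roc_auc_score(y_true, y_score)   # 0.75
```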