风控算法库(下)

在不推荐使用weka作为算法库的前提下,使用Python的算法库是非常好的选择。

Python常用算法库:

pandas:核心在与了解DataFrame与Series两种数据结构

pandas.DataFrame – pandas 0.23.4 documentation

scikit-learn:包含丰富算法库

API Reference – scikit-learn 0.20.2 documentation

scipy

numpy

使用Anaconda作为算法库核心是非常好的选择,但是也要注意,anaconda下载算法包一流,但是对非算法包的支持有限。

公瑾:Anaconda与python

本地开发:

开发阶段在本地启动java项目,启动本地的anaconda的Python解释器来执行Python程序。

由于Python程序中带有第三方包,因此只能通过Runtime开进程来实现。

输入的参数全转换为String类型,定位好Python解释器路径与Python主文件路径后全部传入即可:

String[] arguments = new String[]{AlgorithmLibConfig.PYTHON_INTERPRETER_PATH, AlgorithmLibConfig.PYTHON_ALGORITHM_MAIN_FILE_PATH, “zerodeal_dropFeatures”, dataFileName, zeroPercentThreshold};

pythonAlgoCommonBiz.callPythonFile(arguments);

因为无法用在Python中通过return来获取返回值,因此在Python中通过print将结果输出到流中,然后在java中读取。(两者之间要定义好输出的格式便于解析结果)

    @Override
    public String callPythonFile(String[] arguments) {
        LOGGER.info("[callPythonFile]:调用python库文件开始,执行的函数名称是:" + arguments[2]);

        StringBuilder pyReInfo = new StringBuilder();
        try {
            // String path = this.getClass().getProtectionDomain().getCodeSource().getLocation().getPath();             Process process = Runtime.getRuntime().exec(arguments);
            BufferedReader in = new BufferedReader(new InputStreamReader(process.getInputStream()));
            String line;
            while ((line = in.readLine()) != null) {
                // 需要注意的是,不能在Python中通过return语句返回结果,                 // 只能将返回值写入到标准输出流中,然后在Java中通过标准输入流读取Python的输出值。                 pyReInfo.append(line);
                LOGGER.info(line);
            }
            in.close();
            // java代码中的process.waitFor()             // 返回值为0表示我们调用python脚本成功,             // 返回值为1表示调用python脚本失败             int re = process.waitFor();
            if (re != 0) {
                LOGGER.error("[callPythonFile]:调用python库文件失败,失败的函数是:" + arguments[2]);
                throw new Exception();
            }
        } catch (Exception e) {
            throw new BizException("[callPythonFile]:调用python库文件失败,失败的函数是:" + arguments[2]);
        }

        return pyReInfo.toString();
    }

Python通过sys包获取参数:

《风控算法库(下)》
《风控算法库(下)》

线上实现Java与Python的交互需要用到RPC:

公瑾:关于Java与Python的跨服务器调用方案

java端的请求代码:

    @Override
    public String rpcCallPython(Map<String, String> params) {

        LOGGER.info("[rpcCallPython]:远程调用Python服务开始,执行的python选项是:" + params.get("option"));

        String callUrl = AlgorithmLibConfig.CALL_PYTHON_ALGORITHM_URL;

        String resCode;
        String resInfo = "";
        try {
            //创建连接             java.net.URL url = new URL(callUrl);
            HttpURLConnection connection = (HttpURLConnection) url.openConnection();
            connection.setDoOutput(true);
            connection.setDoInput(true);
            connection.setUseCaches(false);
            connection.setInstanceFollowRedirects(true);
            connection.setRequestMethod("POST");                                // 设置请求方式             connection.setRequestProperty("Accept", "application/json");        // 设置接收数据的格式             connection.setRequestProperty("Content-Type", "application/json");  // 设置发送数据的格式             LOGGER.info("connection.connect() is begin!");
            connection.connect();
            LOGGER.info("connection.connect() was success!");

            //发送数据             OutputStreamWriter out = new OutputStreamWriter(connection.getOutputStream(), "UTF-8");
            out.append(JSON.toJSONString(params));
            out.flush();
            out.close();

            int code = connection.getResponseCode();
            InputStream is;
            if (code == 200) {
                LOGGER.info("远程连接成功!");
                is = connection.getInputStream();
            } else {
                LOGGER.info("远程连接失败,code = " + code);
                is = connection.getErrorStream();
                throw new BizException("[callPy]:远程连接Python服务器失败");
// return ;             }

            // System.out.println(System.currentTimeMillis()); 
            // 读取响应             int length = connection.getContentLength();// 获取长度             if (length != -1) {
                byte[] data = new byte[length];
                byte[] temp = new byte[512];
                int readLen = 0;
                int destPos = 0;
                while ((readLen = is.read(temp)) > 0) {
                    System.arraycopy(temp, 0, data, destPos, readLen);
                    destPos += readLen;
                }
                String result = new String(data, "UTF-8"); // utf-8编码                 Map<String, Object> ret = JSON.parseObject(result, new TypeReference<Map<String, Object>>() {
                });
                // System.out.println("ret = " + ret);                 resCode = ret.get("resCode").toString();
                resInfo = ret.get("resInfo").toString();
                // System.out.println("resInfo = " + resInfo); 
                if (!resCode.equals("1")) {
                    throw new Exception("[callPy]的resCode不为1!");
                }

            }
            // System.out.println(System.currentTimeMillis());         } catch (Exception e) {
            throw new BizException("[rpcCallPython]:远程调用Python服务失败,执行的选项名称是:" + params.get("option"));
        }

        return resInfo;
    }

Python端使用Flask框架:

init.make_app()

APP_NAME = fund-algorithm-labs-service
app = Flask(APP_NAME)

app = init.make_app()



@app.route('/check.txt')
def check():
    path = os.path.join(init.getProjectDir(), 'check.txt')
    f = open(path)
    return f.read()

@app.route('/callPy/', methods=['POST'])
def callPy():
    try:
        # 服务器上的临时存储目录
        os.chdir("/data/tmpfile")
        # os.chdir("D:\\csv")

        # if not request.json or 'option' not in request.json or 'fileName' not in request.json:
        # abort(400, message="the vital param [option] or [fileName] doesn't exist")
        returnVal = {'resCode': '0', 'resInfo': '', 'funName': 'none'}
        option = request.json['option']

        fileName = request.json['fileName']
        modelId = request.json['modelId']
        TMP_FILE_LOCAL = '/data/tmpfile/'
        # TMP_FILE_LOCAL = 'D:\\csv\\'
        TMP_FILE_REMOTE = "/rms/datamodel/" + str(modelId) + "/datafile/"

        logger.info("callPy starting and option is " + option)
        if option == "dropna":
            logger.info("dropna enter")
            axis = request.json['axis']
            subset = request.json['subset']
            na_deal.dropna(pd.read_csv(fileName), int(axis), [subset])

            file_local = TMP_FILE_LOCAL + "data_dropna.csv"
            file_remote = TMP_FILE_REMOTE + "data_dropna.csv"
            ftpOper.ftp_main(0, file_local, file_remote)

            returnVal['funName'] = 'dropna'
。。。。。。
        else:
            # abort(400, message="the vital param [option] have no choice!")
            returnVal['resCode'] = '-1'
            returnVal['resInfo'] = 'the param[option] have no choice!'
            return json.dumps(returnVal)

        returnVal['resCode'] = '1'
        # returnVal['resInfo'] = 'call success'
        return json.dumps(returnVal)

    except Exception as e:
        returnVal['resCode'] = '-1'
        returnVal['resInfo'] = 'call /callPy/ raise error: ' + e
        logger.error('call /callPy/ raise error: %s', e)
        return json.dumps(returnVal)


if __name__ == "__main__":
    # 官方启动方式参见:http://flask.pocoo.org/docs/0.12/quickstart/#a-minimal-application
    # port=8383 debug=True
    app.run(host="0.0.0.0")

Python项目架构:

《风控算法库(下)》
《风控算法库(下)》

最终构造成Java-FTP/DB-Python的三位一体协调结构。

附录:

Python与FTP:

import logging
from ftplib import FTP
from zhaogang import config

ftp = FTP()
logger = logging.getLogger('root')


def ftp_main(oper, localPath, remotePath):
    ftpIp = config.get("others", "ftp.server.ip")
    ftpAccount = config.get("others", "ftp.server.user.name")
    ftpPassword = config.get("others", "ftp.server.user.password")
    ftp.connect(host=ftpIp, port=21, timeout=30)
    ftp.login(user=ftpAccount, passwd=ftpPassword)

    if oper == 0:
        ftp_upload(localPath, remotePath)
    elif oper == 1:
        ftp_download(localPath, remotePath)

    ftp.quit()


def ftp_upload(file_local, file_remote):
    logger.info("----------ftp_upload start----------")

    # # 获取目录下的文件,获得目录列表
    # fileList = ftp.nlst()
    # for name in fileList:
    # print(name)

    # file_remote = "standardized.csv"
    # file_local = "D:\\J2Python\\standardized.csv"
    fp = open(file_local, "rb")
    ftp.storbinary("STOR " + file_remote, fp=fp, blocksize=1024)

    fp.close()
    logger.info("----------ftp_upload start----------")


def ftp_download(file_local, file_remote):
    logger.info("----------ftp_download start----------")

    fp = open(file_local, "wb")
    ftp.retrbinary("RETR %s" % file_remote, fp.write, blocksize=1024)
    fp.close()
    logger.info("----------ftp_download end----------")

Python与MySQL:

try:
    dict = {'model_id': tModelIdList, 'data_range_head': tRangeHeadList, 'data_range_val': tRangeValueList,
            'data_score_head': tScoreHeadList,
            'data_score_val': tScoreValueList, 'gmt_create': tGmtCreateList}
    df = pd.DataFrame(data=dict)
    mysql.saveDataFrame2db(df)
except Exception as e:
    logger.info('mysql.saveDataFrame2db somethin worry')
    logger.error(e)

import logging.config

import pandas as pd
import sqlalchemy
from sqlalchemy import create_engine

logger = logging.getLogger('root')
# # dev
# DB_USER = "#"
# DB_PASS = "#"
# DB_HOST = "#"
# DB_PORT = "#"
# DB_NAME = "#"



# 操作 MySQL 报错 ImportError: No module named 'MySQLdb'
# 执行 pip install MySQL-python 却报错 ImportError: No module named 'ConfigParser'。查了一下,这是由于 MySQL-python 不支持 Python 3
# 于是 找到了一个替代—— PyMySQL。执行 pip install PyMySQL,将数据库连接改为 mysql+pymysql://username:password@server/db
# connect_info = 'mysql+mysqldb://{}:{}@{}:{}/{}?charset=utf8'.format(DB_USER, DB_PASS, DB_HOST, DB_PORT,
# DB_NAME)
connect_info = 'mysql+pymysql://{}:{}@{}:{}/{}?charset=utf8'.format(DB_USER, DB_PASS, DB_HOST, DB_PORT,
                                                                    DB_NAME)

def saveDataFrame2db(dataframe):
    try:
        logger.info("----------save dataFrame to DB is begin----------")

        # 将数据写入mysql的数据库,但需要先通过sqlalchemy.create_engine建立连接,且字符编码设置为utf8,否则有些latin字符不能处理
        con = create_engine(connect_info)

        # pd.io.sql.to_sql(thedataframe, 'tablename', yconnect, schema='databasename', if_exists='append')
        dataframe.to_sql(name='model_scorecard',
                         con=con,
                         if_exists='append',
                         index=False,
                         dtype={
                             'model_id': sqlalchemy.types.BIGINT(),
                             'data_range_head': sqlalchemy.types.VARCHAR(),
                             'data_range_val': sqlalchemy.types.VARCHAR(),
                             'data_score_head': sqlalchemy.types.VARCHAR(),
                             'data_score_val': sqlalchemy.types.VARCHAR(),
                             'gmt_create': sqlalchemy.types.DateTime()
                         })

        logger.info("----------save dataFrame to DB was end----------")
    except Exception as e:
        logger.info('save dataFrame to DB raise a error')
        logger.error(e)

def readDataFrame2db(sql):
    try:
        # 1. 用sqlalchemy构建数据库链接engine
        con = create_engine(connect_info)

        dataframe = pd.read_sql(sql=sql, con=con)

        return dataframe

    except Exception as e:
        logger.error(e)

算法流程:

基于《周志华:机器学习西瓜书》

重复值处理:

  1. drop_duplicates

零元处理:

  1. 去掉0占比过多的维度
  2. 去掉0占比过多的样本
  3. 平均值代替0

异常值处理:

  1. 删除异常值

相关性处理:

  1. del_corrr

归一化处理:

  1. MinMaxScaler

y生成:

  1. 人工/专家生成y_行业数据
  2. 机器生成y

缺失值处理:

  1. dropna
  2. fillna

异常值检测:

  1. 单维度_箱线图
  2. 单维度_3α
  3. 多维度_Isolation Forest

标准化处理:

  1. StandardScaler

连续变量分组:

  1. 模型分组_Ctree
  2. 等距分组

WOE变换:

  1. WOE变换

参数调节:

  1. GridSearchCV

监督学习:

  1. Logistic Regression

综合评价模型:

  1. 熵值法
  2. PCA

x生成-特征提取:

  1. 人工/专家生成特征_业务
  2. 机器生成特征

特征选择:

  1. 专家特征选择
  2. 机器特征选择_RandomForestClassifier

模型评估与选择:

  1. 泛化能力_错误率与精度
  2. 泛化能力_ROC与AUC
  3. 行业

    原文作者:公瑾
    原文地址: https://zhuanlan.zhihu.com/p/53154096
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞