Spark、Python、Golang、C++生成TFrecord格式数据

引言

上一篇文章TensorFlow Estimator 模型从训练到部署,介绍了使用了Estimator API模型的训练和部署流程,并通过Python客户端请求TensorFlow serving服务。

这篇文章算是做一些补充,上一篇的数据集使用的是公开数据集,而在实际应用中都需要我们自己构建特征数据集,所以应该考虑什么保存什么格式数据能够加快训练。相比于csv格式数据,我们应该倾向于选择tfrecords格式数据。

另外上一篇中只使用了Python客户端请求服务,而实际的线上业务基本都不会考虑Python,所以需要实现多种语言去请求TensorFlow服务。

一、tfrecords格式数据介绍

tfrecords 格式是TensorFlow原生支持的一种高效保存特征数据的格式,使用该格式保存数据能够加快数据的读取。

该格式通过protocol buffer定义,protocol buffer支持跨语言的数据结构序列化,通常用于构建grpc服务,用于定义服务请求和响应的数据结构。

这里TensorFlow使用proto文件定义tfrecords数据格式,tfrecords文件的每一条记录都是一个序列化的example结构。

message BytesList {   repeated bytes value = 1; } message FloatList {   repeated float value = 1 [packed = true]; } message Int64List {   repeated int64 value = 1 [packed = true];  message Feature {   oneof kind {     BytesList bytes_list = 1;     FloatList float_list = 2;     Int64List int64_list = 3;   } };  message Features {   map<string, Feature> feature = 1; };  message FeatureList {   repeated Feature feature = 1; };  message FeatureLists {   map<string, FeatureList> feature_list = 1; };  message Example {   Features features = 1; };

上面是example数据结构的proto文件,example格式通过嵌套定义,这样看起来可能不太直观,后面会通过代码进行构建tfrecords文件。

proto文件定义了tf.Example的数据结构,通过不同的protoc命令生成对应语言的.pb文件之后,便可以根据.pb文件生成tfrecords格式文件了。

通常构建tfrecords格式数据是通过python API,但是在大数据场景下需要spark处理原始特征数据并保存下来,通过spark直接生成tfrecords格式相比于通过python将spark中间结果转换为tfrecords更人性化。

所幸TensorFlow提供了相关的支持项目,其链接在这,spark-tensorflow-connector,通过mvn构建jar包便可直接使用。或者可以访问https://mvnrepository.com/artifact/org.tensorflow/spark-tensorflow-connector获取构建好的jar包。

二、 spark生成tfrecords文件

  • 首先,启动spark时添加依赖,使用–jars 参数添加jar包
# 1. scala
spark-shell --jars spark-tensorflow-connector_2.11-1.14.0.jar

# 2. pyspark
pyspark --jars spark-tensorflow-connector_2.11-1.14.0.jar
  • 生成tfrecords
  • DataFrame格式

TensorFlow提供的结构支持将spark DataFrame结构数据直接保存为tfrecords格式数据。

// save to tfrecords train.write.mode("overwrite").format("tfrecords").option("recordType", "Example").save(savedPath)
  1. rdd格式

如果原始数据是rdd格式,在保存时要先将rdd转换为DataFrame格式再进行保存。

scala

import org.apache.commons.io.FileUtils
import org.apache.spark.sql.{ DataFrame, Row }
import org.apache.spark.sql.catalyst.expressions.GenericRow
import org.apache.spark.sql.types._

// TFrecord schema fieldTypeMap = map(...)
val trainRdd = train.rdd.cache()
var fields = List[StructField]()
for ((field, curType) <- ) {
    if (curType == "int") {
        fields = fields :+ StructField(field, IntegerType)
    } else if (curType == "long") {
        fields = fields :+ StructField(field, LongType)
    } else if (curType == "str") {
        fields = fields :+ StructField(field, StringType)
    } else if (curType == "double") {
        fields = fields :+ StructField(field, DoubleType)
    } else if (curType == "double_list") {
        fields = fields :+ StructField(field, ArrayType(DoubleType))
    } else if (curType == "str_list") {
        fields = fields :+ StructField(field, ArrayType(StringType))
    }
}
val schema = StructType(fields)
val train_df = spark.createDataFrame(trainRdd, schema)
train_df.write.mode("overwrite").format("tfrecords").option("recordType", "Example").save(savedPath)

python

from pyspark.sql.types import *

path = "test-output.tfrecord"

fields = [StructField("id", IntegerType()), StructField("IntegerCol", IntegerType()),
          StructField("LongCol", LongType()), StructField("FloatCol", FloatType()),
          StructField("DoubleCol", DoubleType()), StructField("VectorCol", ArrayType(DoubleType(), True)),
          StructField("StringCol", StringType())]
schema = StructType(fields)
test_rows = [[11, 1, 23, 10.0, 14.0, [1.0, 2.0], "r1"], [21, 2, 24, 12.0, 15.0, [2.0, 2.0], "r2"]]
rdd = spark.sparkContext.parallelize(test_rows)
df = spark.createDataFrame(rdd, schema)
df.write.format("tfrecords").option("recordType", "Example").save(path)

三、Python生成tf.Example

使用tf.dataset API训练的模型,在模型部署之后,在请求TensorFlow serving服务时还需要客户端构建同样的tf.Example格式数据构建grpc请求。

本节将展示如何通过python客户端生成tf.Example数据,并请求TensorFlow服务。

# python 伪码 
feature_dict = {}
serialized_strings = []

for i in range(sample_len): # 样本数
    feature_dict = {}
    for j in range(feat_len): # 特征数
        name =  # 特征名
        val  =  # 特征值
        # str 类型特征
        feature_dict[name] = tf.train.Feature(bytes_list=tf.train.BytesList(value=[bytes(val, encoding='utf-8')]))
        # int 类型特征
        feature_dict[name] = tf.train.Feature(int64_list=tf.train.Int64List(value=[val]))
        # float 类型特征
        feature_dict[name] = tf.train.Feature(float_list=tf.train.FloatList(value=[val]))
    example_proto = tf.train.Example(features=tf.train.Features(feature=feature_dict))
    serialized = example_proto.SerializeToString()
    serialized_strings.append(serialized
request = predict_pb2.PredictRequest()
request.inputs['examples'].CopyFrom(tf.contrib.util.make_tensor_proto(data, shape=[size]))

四、Golang生成tf.Example

Python构建example格式数据很简单,也有直接生成的pb文件可以使用,而其他语言要想调用TensorFlow服务需要先生成本语言的pb文件。

生成pb文件的命令如下所示:

mkdir -p tensorflow tensorflow_serving
protoc -I generate_golang_files/ generate_golang_files/*.proto --go_out=plugins=grpc:tensorflow_serving
protoc -I generate_golang_files/ generate_golang_files/tensorflow/core/framework/* --go_out=plugins=grpc:.
protoc -I generate_golang_files/ generate_golang_files/tensorflow/core/example/*.proto --go_out=plugins=grpc:.

plugins=grpc: 后面是pb文件的指定输出地址

下面是Golang生成tf.Example格式的代码

import (
    "encoding/json"
    "fmt"
    "github.com/golang/protobuf/proto"
    "golang.org/x/net/context"
    "google.golang.org/grpc"
    "io/ioutil"
    "math"
    "reflect"
    "strings"
    "sync"
    example "tensorflow/core/example"
    tf_core_framework "tensorflow/core/framework"
    pb "tensorflow_serving"
    "time"
)

type WideDeepFeature struct {
    ...
}

var marshalBytes [][]byte
for i := 0; i < len(featArr); i++ {
    singleWDFeat := WideDeepFeature{}
    err := json.Unmarshal([]byte(featArr[i]), &singleWDFeat)
    if err != nil {
        fmt.Println("json unmarshal error.", err)
    }
    featureMap := make(map[string]*example.Feature)
    value := reflect.ValueOf(singleWDFeat)
    typeOfFeat := reflect.TypeOf(singleWDFeat)
    fieldsNum := value.NumField()
    for i := 0; i < fieldsNum; i++ {
        field := value.Field(i)
        fieldInfo := typeOfFeat.Field(i)
        featKey := fieldInfo.Tag.Get("json")
        switch field.Kind() {
        case reflect.String:
            featVal := []byte(field.Interface().(string))
            featureMap[featKey] = &example.Feature{
                Kind: &example.Feature_BytesList{
                    BytesList: &example.BytesList{
                        Value: [][]byte{featVal}}}}
        case reflect.Int64:
            featVal := field.Interface().(int64)
            featureMap[featKey] = &example.Feature{
                Kind: &example.Feature_Int64List{
                    Int64List: &example.Int64List{
                        Value: []int64{featVal}}}}
        case reflect.Int32:
            featVal := int64(field.Interface().(int32))
            featureMap[featKey] = &example.Feature{
                Kind: &example.Feature_Int64List{
                    Int64List: &example.Int64List{
                        Value: []int64{featVal}}}}
        case reflect.Float32:
            featVal := field.Interface().(float32)
            featureMap[featKey] = &example.Feature{
                Kind: &example.Feature_FloatList{
                    FloatList: &example.FloatList{
                        Value: []float32{featVal}}}}
        case reflect.Slice:
            featVal := field.Interface().([]float32)
            featureMap[featKey] = &example.Feature{
                Kind: &example.Feature_FloatList{
                    FloatList: &example.FloatList{
                        Value: featVal}}}
        }
    }
    features := example.Features{Feature: featureMap}
    tmpExample := example.Example{Features: &features}
    exampleStr, _ := proto.Marshal(&tmpExample)
    marshalBytes = append(marshalBytes, []byte(exampleStr))
}

// create tensor tensorProto := tf_core_framework.TensorProto{
    Dtype: tf_core_framework.DataType_DT_STRING,
    TensorShape: &tf_core_framework.TensorShapeProto{
        Dim: []*tf_core_framework.TensorShapeProto_Dim{
            &tf_core_framework.TensorShapeProto_Dim{
                Size: int64(len(marshalBytes)),
            },
        },
    },
    StringVal: marshalBytes,
}

// request request := &pb.PredictRequest{
        ModelSpec: &pb.ModelSpec{
            Name:          "sm_model",
            SignatureName: "predict",
            //Version: &google_protobuf.Int64Value{             // Value: int64(1),             //},         },
        Inputs: map[string]*tf_core_framework.TensorProto{
            "examples": &tensorProto,
        },
    }

// predict resp, err := client.Predict(ctx, request)

五、C++生成tf.Example

c++ 生成tf.Example格式数据的伪码如下 :

tensorflow::Example example;
tensorflow::Features* features = example.mutable_features();
google::protobuf::Map<std::string, tensorflow::Feature >& feature_map = *features->mutable_feature();
tensorflow::TensorProto example_proto;
for () { // 样本数     for () { // 特征数         float value  ;// 特征值         std::string fname =  ;// 特征名         tensorflow::Feature _feature; // create a Feature         tensorflow::FloatList* fl = _feature.mutable_float_list(); 
        fl->add_value(value);
        feature_map[fname] = _feature;
    }
    std::string example_str = "";
    example.SerializeToString(&example_str);
    example_proto.set_dtype(tensorflow::DataType::DT_STRING);
    example_proto.add_string_val(example_str); 
}
example_proto.mutable_tensor_shape()->add_dim()->set_size(PREDICT_CNT);

// request tensorflow::serving::PredictRequest* request_pb = static_cast<tensorflow::serving::PredictRequest*>(request);
request_pb->mutable_model_spec()->set_name("");
google::protobuf::Map<std::string, tensorflow::TensorProto>& inputs = *request_pb->mutable_inputs();
inputs["examples"] = example_proto;

总结

本文通过多种语言构建tf.Example格式数据,能够在生成模型训练数据和在线模型预测的场景中得到应用,其核心还是proto文件定义的数据结构,搭配代码进行理解之后希望能对读者有所启发。

    原文作者:yone
    原文地址: https://zhuanlan.zhihu.com/p/76498588
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞