引言
上一篇文章TensorFlow Estimator 模型从训练到部署,介绍了使用了Estimator API模型的训练和部署流程,并通过Python客户端请求TensorFlow serving服务。
这篇文章算是做一些补充,上一篇的数据集使用的是公开数据集,而在实际应用中都需要我们自己构建特征数据集,所以应该考虑什么保存什么格式数据能够加快训练。相比于csv格式数据,我们应该倾向于选择tfrecords格式数据。
另外上一篇中只使用了Python客户端请求服务,而实际的线上业务基本都不会考虑Python,所以需要实现多种语言去请求TensorFlow服务。
一、tfrecords格式数据介绍
tfrecords 格式是TensorFlow原生支持的一种高效保存特征数据的格式,使用该格式保存数据能够加快数据的读取。
该格式通过protocol buffer定义,protocol buffer支持跨语言的数据结构序列化,通常用于构建grpc服务,用于定义服务请求和响应的数据结构。
这里TensorFlow使用proto文件定义tfrecords数据格式,tfrecords文件的每一条记录都是一个序列化的example结构。
message BytesList { repeated bytes value = 1; } message FloatList { repeated float value = 1 [packed = true]; } message Int64List { repeated int64 value = 1 [packed = true]; message Feature { oneof kind { BytesList bytes_list = 1; FloatList float_list = 2; Int64List int64_list = 3; } }; message Features { map<string, Feature> feature = 1; }; message FeatureList { repeated Feature feature = 1; }; message FeatureLists { map<string, FeatureList> feature_list = 1; }; message Example { Features features = 1; };
上面是example数据结构的proto文件,example格式通过嵌套定义,这样看起来可能不太直观,后面会通过代码进行构建tfrecords文件。
proto文件定义了tf.Example的数据结构,通过不同的protoc命令生成对应语言的.pb文件之后,便可以根据.pb文件生成tfrecords格式文件了。
通常构建tfrecords格式数据是通过python API,但是在大数据场景下需要spark处理原始特征数据并保存下来,通过spark直接生成tfrecords格式相比于通过python将spark中间结果转换为tfrecords更人性化。
所幸TensorFlow提供了相关的支持项目,其链接在这,spark-tensorflow-connector,通过mvn构建jar包便可直接使用。或者可以访问https://mvnrepository.com/artifact/org.tensorflow/spark-tensorflow-connector获取构建好的jar包。
二、 spark生成tfrecords文件
- 首先,启动spark时添加依赖,使用–jars 参数添加jar包
# 1. scala
spark-shell --jars spark-tensorflow-connector_2.11-1.14.0.jar
# 2. pyspark
pyspark --jars spark-tensorflow-connector_2.11-1.14.0.jar
- 生成tfrecords
- DataFrame格式
TensorFlow提供的结构支持将spark DataFrame结构数据直接保存为tfrecords格式数据。
// save to tfrecords train.write.mode("overwrite").format("tfrecords").option("recordType", "Example").save(savedPath)
- rdd格式
如果原始数据是rdd格式,在保存时要先将rdd转换为DataFrame格式再进行保存。
scala
import org.apache.commons.io.FileUtils
import org.apache.spark.sql.{ DataFrame, Row }
import org.apache.spark.sql.catalyst.expressions.GenericRow
import org.apache.spark.sql.types._
// TFrecord schema fieldTypeMap = map(...)
val trainRdd = train.rdd.cache()
var fields = List[StructField]()
for ((field, curType) <- ) {
if (curType == "int") {
fields = fields :+ StructField(field, IntegerType)
} else if (curType == "long") {
fields = fields :+ StructField(field, LongType)
} else if (curType == "str") {
fields = fields :+ StructField(field, StringType)
} else if (curType == "double") {
fields = fields :+ StructField(field, DoubleType)
} else if (curType == "double_list") {
fields = fields :+ StructField(field, ArrayType(DoubleType))
} else if (curType == "str_list") {
fields = fields :+ StructField(field, ArrayType(StringType))
}
}
val schema = StructType(fields)
val train_df = spark.createDataFrame(trainRdd, schema)
train_df.write.mode("overwrite").format("tfrecords").option("recordType", "Example").save(savedPath)
python
from pyspark.sql.types import *
path = "test-output.tfrecord"
fields = [StructField("id", IntegerType()), StructField("IntegerCol", IntegerType()),
StructField("LongCol", LongType()), StructField("FloatCol", FloatType()),
StructField("DoubleCol", DoubleType()), StructField("VectorCol", ArrayType(DoubleType(), True)),
StructField("StringCol", StringType())]
schema = StructType(fields)
test_rows = [[11, 1, 23, 10.0, 14.0, [1.0, 2.0], "r1"], [21, 2, 24, 12.0, 15.0, [2.0, 2.0], "r2"]]
rdd = spark.sparkContext.parallelize(test_rows)
df = spark.createDataFrame(rdd, schema)
df.write.format("tfrecords").option("recordType", "Example").save(path)
三、Python生成tf.Example
使用tf.dataset API训练的模型,在模型部署之后,在请求TensorFlow serving服务时还需要客户端构建同样的tf.Example格式数据构建grpc请求。
本节将展示如何通过python客户端生成tf.Example数据,并请求TensorFlow服务。
# python 伪码
feature_dict = {}
serialized_strings = []
for i in range(sample_len): # 样本数
feature_dict = {}
for j in range(feat_len): # 特征数
name = # 特征名
val = # 特征值
# str 类型特征
feature_dict[name] = tf.train.Feature(bytes_list=tf.train.BytesList(value=[bytes(val, encoding='utf-8')]))
# int 类型特征
feature_dict[name] = tf.train.Feature(int64_list=tf.train.Int64List(value=[val]))
# float 类型特征
feature_dict[name] = tf.train.Feature(float_list=tf.train.FloatList(value=[val]))
example_proto = tf.train.Example(features=tf.train.Features(feature=feature_dict))
serialized = example_proto.SerializeToString()
serialized_strings.append(serialized
request = predict_pb2.PredictRequest()
request.inputs['examples'].CopyFrom(tf.contrib.util.make_tensor_proto(data, shape=[size]))
四、Golang生成tf.Example
Python构建example格式数据很简单,也有直接生成的pb文件可以使用,而其他语言要想调用TensorFlow服务需要先生成本语言的pb文件。
生成pb文件的命令如下所示:
mkdir -p tensorflow tensorflow_serving
protoc -I generate_golang_files/ generate_golang_files/*.proto --go_out=plugins=grpc:tensorflow_serving
protoc -I generate_golang_files/ generate_golang_files/tensorflow/core/framework/* --go_out=plugins=grpc:.
protoc -I generate_golang_files/ generate_golang_files/tensorflow/core/example/*.proto --go_out=plugins=grpc:.
plugins=grpc: 后面是pb文件的指定输出地址
下面是Golang生成tf.Example格式的代码
import (
"encoding/json"
"fmt"
"github.com/golang/protobuf/proto"
"golang.org/x/net/context"
"google.golang.org/grpc"
"io/ioutil"
"math"
"reflect"
"strings"
"sync"
example "tensorflow/core/example"
tf_core_framework "tensorflow/core/framework"
pb "tensorflow_serving"
"time"
)
type WideDeepFeature struct {
...
}
var marshalBytes [][]byte
for i := 0; i < len(featArr); i++ {
singleWDFeat := WideDeepFeature{}
err := json.Unmarshal([]byte(featArr[i]), &singleWDFeat)
if err != nil {
fmt.Println("json unmarshal error.", err)
}
featureMap := make(map[string]*example.Feature)
value := reflect.ValueOf(singleWDFeat)
typeOfFeat := reflect.TypeOf(singleWDFeat)
fieldsNum := value.NumField()
for i := 0; i < fieldsNum; i++ {
field := value.Field(i)
fieldInfo := typeOfFeat.Field(i)
featKey := fieldInfo.Tag.Get("json")
switch field.Kind() {
case reflect.String:
featVal := []byte(field.Interface().(string))
featureMap[featKey] = &example.Feature{
Kind: &example.Feature_BytesList{
BytesList: &example.BytesList{
Value: [][]byte{featVal}}}}
case reflect.Int64:
featVal := field.Interface().(int64)
featureMap[featKey] = &example.Feature{
Kind: &example.Feature_Int64List{
Int64List: &example.Int64List{
Value: []int64{featVal}}}}
case reflect.Int32:
featVal := int64(field.Interface().(int32))
featureMap[featKey] = &example.Feature{
Kind: &example.Feature_Int64List{
Int64List: &example.Int64List{
Value: []int64{featVal}}}}
case reflect.Float32:
featVal := field.Interface().(float32)
featureMap[featKey] = &example.Feature{
Kind: &example.Feature_FloatList{
FloatList: &example.FloatList{
Value: []float32{featVal}}}}
case reflect.Slice:
featVal := field.Interface().([]float32)
featureMap[featKey] = &example.Feature{
Kind: &example.Feature_FloatList{
FloatList: &example.FloatList{
Value: featVal}}}
}
}
features := example.Features{Feature: featureMap}
tmpExample := example.Example{Features: &features}
exampleStr, _ := proto.Marshal(&tmpExample)
marshalBytes = append(marshalBytes, []byte(exampleStr))
}
// create tensor tensorProto := tf_core_framework.TensorProto{
Dtype: tf_core_framework.DataType_DT_STRING,
TensorShape: &tf_core_framework.TensorShapeProto{
Dim: []*tf_core_framework.TensorShapeProto_Dim{
&tf_core_framework.TensorShapeProto_Dim{
Size: int64(len(marshalBytes)),
},
},
},
StringVal: marshalBytes,
}
// request request := &pb.PredictRequest{
ModelSpec: &pb.ModelSpec{
Name: "sm_model",
SignatureName: "predict",
//Version: &google_protobuf.Int64Value{ // Value: int64(1), //}, },
Inputs: map[string]*tf_core_framework.TensorProto{
"examples": &tensorProto,
},
}
// predict resp, err := client.Predict(ctx, request)
五、C++生成tf.Example
c++ 生成tf.Example格式数据的伪码如下 :
tensorflow::Example example;
tensorflow::Features* features = example.mutable_features();
google::protobuf::Map<std::string, tensorflow::Feature >& feature_map = *features->mutable_feature();
tensorflow::TensorProto example_proto;
for () { // 样本数 for () { // 特征数 float value ;// 特征值 std::string fname = ;// 特征名 tensorflow::Feature _feature; // create a Feature tensorflow::FloatList* fl = _feature.mutable_float_list();
fl->add_value(value);
feature_map[fname] = _feature;
}
std::string example_str = "";
example.SerializeToString(&example_str);
example_proto.set_dtype(tensorflow::DataType::DT_STRING);
example_proto.add_string_val(example_str);
}
example_proto.mutable_tensor_shape()->add_dim()->set_size(PREDICT_CNT);
// request tensorflow::serving::PredictRequest* request_pb = static_cast<tensorflow::serving::PredictRequest*>(request);
request_pb->mutable_model_spec()->set_name("");
google::protobuf::Map<std::string, tensorflow::TensorProto>& inputs = *request_pb->mutable_inputs();
inputs["examples"] = example_proto;
总结
本文通过多种语言构建tf.Example格式数据,能够在生成模型训练数据和在线模型预测的场景中得到应用,其核心还是proto文件定义的数据结构,搭配代码进行理解之后希望能对读者有所启发。