# Avro2TF
Deep learning has been successfully applied to multiple AI systems at LinkedIn related to recommendation and search. One of the important lessons we have learned along the way is that good deep learning platforms make our modeling engineers more efficient and productive. Avro2TF is part of this effort to reduce the complexity of data processing and improve the velocity of advanced modeling.

Beyond deep learning, LinkedIn has been at the forefront of machine learning innovation for years. We run many different ML approaches that consume large amounts of data every day, and efficiency and accuracy are the most important measures of these approaches. To effectively support deep learning at LinkedIn, we first need to address data processing. Most of the datasets used by our ML algorithms (e.g., LinkedIn's large-scale personalization engine Photon-ML) are in Avro format. Each record in an Avro dataset is essentially a sparse vector and can be easily consumed by most modern classifiers. However, this format cannot be used directly by TensorFlow, the leading deep learning package, because the sparse vector representation does not match the Tensor format. We believe this is not just a LinkedIn problem: many companies have vast amounts of ML data in a similar sparse vector format, and the Tensor format is still relatively new to many of them. Avro2TF bridges this gap by providing a scalable, Spark-based transformation and extension mechanism that efficiently converts such data into TF records that can be readily consumed by TensorFlow. With this technology, developers can improve their productivity by focusing on model building rather than data conversion.
## Compatibility Notes

We recommend running Avro2TF with Scala 2.11 or above.
## Build

### How to build

Avro2TF is built using Gradle. To build it, run:

```bash
./gradlew build
```

This automatically runs the tests. To build without running them, run:

```bash
./gradlew build -x test
```
## Usage

Avro2TF reads raw input data in any format supported by Spark and generates tensorized training data in Avro or TFRecord format.

Avro2TF exposes a JSON or HOCON config in which users specify the tensors that a modeler wants to use in training. For each tensor, the user specifies two kinds of information:
- What existing features are used to construct the tensor.
- The expected name, dtype, and shape of the tensor.
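For example, a single tensor entry combining these two pieces might look like the following sketch (the column name `review` and tensor name `reviewText` are illustrative, not part of any required schema):

```json
{
  "inputFeatureInfo": {
    "columnExpr": "review"
  },
  "outputTensorInfo": {
    "name": "reviewText",
    "dtype": "string",
    "shape": []
  }
}
```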
### Use Avro2TF in a Gradle project

- Specify the repository hosting the Avro2TF artifacts:

```groovy
repositories {
    maven {
        url "https://dl.bintray.com/linkedin/maven"
    }
}
```

- Declare the Avro2TF dependency:

```groovy
dependencies {
    compile 'com.linkedin.avro2tf:avro2tf:2.0.1'
}
```
## Input Data Requirements

We support all data formats that Spark can read, including the most popular formats, Avro and ORC. Categorical/sparse features must be represented in the NTV (name-term-value) format; see the example after the list below.

A single categorical/sparse feature has the type Array[NTV], which we treat as a special primitive type. The supported input primitive types are therefore:
- long
- float
- double
- String
- bytes (for multimedia data such as image, audio, and video)
- boolean
- Array[NTV]
Arrays of primitive types of any rank are supported.
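For illustration, a single Array[NTV] feature might be serialized as follows; the feature name and terms here are made up:

```json
[
  {"name": "userSkills", "term": "machine_learning", "value": 1.0},
  {"name": "userSkills", "term": "spark", "value": 0.5}
]
```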
## Supported Data Types of Output Tensor

In Avro2TF, the supported data types (dtype) of output tensors are: int, long, float, double, string, boolean, and bytes. A sparseVector tensor has two fields: indices and values.

The table below lists the corresponding data types after loading tensors serialized by Avro2TF into TensorFlow. In TensorFlow, bytes is represented as tf.string, which users can later decode into images, audio, etc. The sparseVector type is converted to tf.SparseTensor.
| Data type of serialized tensor in Avro2TF | Data type of deserialized tensor in TensorFlow |
|---|---|
| int | tf.int32 |
| long | tf.int64 |
| float | tf.float32 |
| double | tf.float64 |
| string | tf.string |
| bytes | tf.string |
| boolean | tf.bool |
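For example, a serialized sparseVector tensor holding a vector with non-zero entries at positions 0, 5, and 7 would carry its two fields as shown below (the values are illustrative); after loading, it becomes a tf.SparseTensor:

```json
{
  "indices": [0, 5, 7],
  "values": [1.0, 0.5, 2.0]
}
```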
## Avro2TF Configuration

The table below lists all available configuration options and explains each in detail.
| Name | Required? | Default Value | Meaning |
|---|---|---|---|
| features | yes | / | A list of tensor configs. Each config includes inputFeatureInfo and outputTensorInfo. Must not be empty. |
| labels | no | None | A list of tensor configs. |
| inputFeatureInfo | no | {"columnExpr": ${outputTensorInfo.name}} | Specifies the input features used to construct a tensor. |
| columnExpr | no | None | A Spark SQL column expression. If neither columnExpr nor columnConfig is given, columnExpr defaults to {"columnExpr": ${outputTensorInfo.name}}. |
| columnConfig | no | None | Only for extracting NTV features. A user should not specify both columnExpr and columnConfig. |
| transformConfig | no | None | Specifies transformations applied to input features; available transformers: hashing and tokenization. |
| hashInfo | no | None | Specifies hashing-related information. |
| hashBucketSize | yes | / | The bucket size of the hash function. |
| numHashFunctions | no | 1 | The number of hash functions used (only the salt differs). |
| combiner | no | SUM | How to merge the values of repeated indices in a sparse vector (AVG, SUM, MAX). |
| tokenization | no | None | Tokenization-related configs. |
| removeStopWords | no | false | Whether to remove stop words during tokenization. |
| outputTensorInfo | yes | / | Information about the expected output tensor. |
| name | yes | / | Name of the output tensor. |
| dtype | yes | / | The expected dtype of the output tensor. |
| shape | no | [] | The expected shape of the output tensor. Examples: [] is a scalar; [-1] is a 1D array of any length; [6] is a 1D array of size 6; [2, 3] is a matrix with 2 rows and 3 columns. |
| isSparse | no | false | Whether the output tensor is a sparse tensor. |
| isDocumentFeature | no | true | Whether the output tensor is a document feature (true) or a query feature (false). |
## Avro2TF Config Example

Suppose your input data has the following schema:

```json
{
"type": "record",
"name": "topLevelRecord",
"fields": [
{
"type": [
"int",
"null"
],
"name": "label"
},
{
"type": [
"string",
"null"
],
"name": "review"
},
{
"type": [
{
"type": "array",
"items": [
{
"type": "record",
"name": "words",
"namespace": "topLevelRecord",
"fields": [
{
"type": [
"string",
"null"
],
"name": "name"
},
{
"type": [
"string",
"null"
],
"name": "term"
},
{
"type": [
"float",
"null"
],
"name": "value"
}
]
},
"null"
]
},
"null"
],
"name": "words"
},
{
"type": [
{
"type": "array",
"items": [
{
"type": "record",
"name": "wideFeatures",
"namespace": "topLevelRecord",
"fields": [
{
"type": [
"string",
"null"
],
"name": "name"
},
{
"type": [
"string",
"null"
],
"name": "term"
},
{
"type": [
"float",
"null"
],
"name": "value"
}
]
},
"null"
]
},
"null"
],
"name": "wideFeatures"
}
]
}
```
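For concreteness, a record conforming to this schema might look like this (all values are made up):

```json
{
  "label": 1,
  "review": "great phone with an excellent camera",
  "words": [
    {"name": "words", "term": "great", "value": 1.0},
    {"name": "words", "term": "phone", "value": 1.0}
  ],
  "wideFeatures": [
    {"name": "wideFeatures", "term": "price", "value": 0.5}
  ]
}
```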
An example Avro2TF config looks like this:

```json
{
"features": [
{
"inputFeatureInfo": {
"columnExpr": "words.term[0]"
},
"outputTensorInfo": {
"name": "firstWord",
"dtype": "long",
"shape": []
}
},
{
"inputFeatureInfo": {
"columnExpr": "review",
"transformConfig": {
"tokenization": {
"removeStopWords": true
}
}
},
"outputTensorInfo": {
"name": "wordSeq",
"dtype": "long",
"shape": [
-1
]
}
},
{
"inputFeatureInfo": {
"columnExpr": "review",
"transformConfig": {
"hashInfo": {
"hashBucketSize": 1000,
"numHashFunctions": 4
}
}
},
"outputTensorInfo": {
"name": "wordSeq_hashed",
"dtype": "long",
"shape": [
4
]
}
},
{
"inputFeatureInfo": {
"columnExpr": "words.term"
},
"outputTensorInfo": {
"name": "words_term",
"dtype": "long",
"shape": [
-1
]
}
},
{
"inputFeatureInfo": {
"columnConfig": {
"words": {
"whitelist": [
"*"
]
},
"wideFeatures": {
"blacklist": [
"wideFeatures"
]
}
}
},
"outputTensorInfo": {
"name": "words_wideFeatures_sparse",
"dtype": "float",
"shape": [],
"isSparse" : true
}
},
{
"inputFeatureInfo": {
"columnConfig": {
"words": {
"whitelist": [
"*"
]
},
"wideFeatures": {
"blacklist": [
"wideFeatures"
]
}
}
},
"outputTensorInfo": {
"name": "words_wideFeatures_dense",
"dtype": "float",
"shape": [],
"isSparse" : false
}
},
{
"inputFeatureInfo": {
"columnConfig": {
"words": {
"whitelist": [
"*"
]
},
"wideFeatures": {
"blacklist": [
"wideFeatures"
]
}
},
"transformConfig": {
"hashInfo": {
"hashBucketSize": 100,
"combiner": "AVG"
}
}
},
"outputTensorInfo": {
"name": "words_wideFeatures_hash",
"dtype": "float",
"shape": [],
"isSparse" : true
}
}
],
"labels": [
{
"inputFeatureInfo": {
"columnExpr": "label"
},
"outputTensorInfo": {
"name": "response",
"dtype": "int",
"shape": []
}
}
]
}
```
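To make the expected output concrete, here is a rough sketch of part of a single tensorized record produced from the config above, using the Avro output format (only some of the configured tensors are shown). The actual integer ids depend on the feature lists Avro2TF generates, so all values below are made up:

```json
{
  "response": 1,
  "firstWord": 42,
  "wordSeq": [12, 7, 256, 3],
  "wordSeq_hashed": [871, 13, 402, 95],
  "words_wideFeatures_sparse": {
    "indices": [3, 17],
    "values": [1.0, 0.5]
  }
}
```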
## Avro2TF Job Parameters

| Parameter | Required? | Meaning |
|---|---|---|
| --input-paths | yes | A comma-separated list of input paths. |
| --working-dir | yes | The path to the working directory where the output is saved. |
| --input-date-range | no | The input date range, in the format yyyymmdd-yyyymmdd. |
| --input-days-range | no | The input days range, in the format startOffset-endOffset. |
| --num-output-files | no | The number of output files; defaults to -1. |
| --min-parts | no | The minimum number of partitions for input data; if the input has fewer, it is repartitioned. |
| --shuffle | no | Whether to shuffle the converted training data; defaults to true. |
| --external-feature-list-path | no | The path to user-supplied feature mapping files. |
| --avro2tf-config-path | yes | The path to the Avro2TF configuration in JSON format. |
| --execution-mode | no | Whether to prepare training, validation, or test data. |
| --enable-cache | no | Whether to cache the intermediate Spark DataFrame result; defaults to false. |
| --skip-conversion | no | Whether to skip the conversion step; defaults to false. |
| --output-format | no | The output format of the tensorized data, e.g. Avro or TFRecord. |
| --extra-columns-to-keep | no | A comma-separated list of extra column names to keep. |
| --tensors-sharing-feature-lists | no | Groups of output tensor names, with groups separated by semicolons and tensors within a group separated by commas. Tensors in the same group share the same feature list. |
| --enable-filter-zero | no | Whether to filter zeros from all sparse vector output; defaults to false. |
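For reference, a hypothetical launch command might look like the sketch below. The jar path and main class are assumptions made for illustration; check your build output and the official tutorial for the exact values:

```bash
# All paths, the jar name, and the main class below are assumptions for illustration.
spark-submit \
  --class com.linkedin.avro2tf.jobs.Avro2TF \
  build/libs/avro2tf-2.0.1.jar \
  --input-paths /data/input/trainingData \
  --working-dir /data/avro2tf/working-dir \
  --avro2tf-config-path /configs/avro2tf_config.json \
  --output-format tfrecord
```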
## Avro2TF Examples
Please take a look at our Avro2TF Official Tutorial! :)