Adding an MLeap Spark Transformer
This section discusses how to add MLeap support for an existing Spark transformer or a custom transformer.
Implementing MLeap support for an existing Spark transformer is a six-step process:
- Build the core model - the actual logic of the model. Examples live in the core packages:
  - feature
  - clustering
  - classification
  - regression
- Build an MLeap transformer around the model - this takes in the data frame and outputs a new one with the transformed fields (see Transformer)
- Implement Bundle.ML serialization for the MLeap model/transformer (see the example op serializers)
- Implement Bundle.ML serialization for the Spark model/transformer (see the example Spark ops)
- Make sure to add your MLeap/Spark serialization Ops to the reference.conf files (MLeap, Spark, Extending)
- Add tests for everything
Create a case class that implements the `createTransformFunc` or `transform` method of the Spark transformer. The case class should have an `apply` method that executes the transformer.

Naming convention: the case class name should end with `Model`.

Code attribution: if you are borrowing the model code from Spark, make sure to attribute the source code with a `@SparkCode` url annotation.

Note: Spark transformers that have inputs/outputs of `Array[T]` should be implemented with `Seq[T]` instead.
```scala
package ml.combust.mleap.core.feature

import org.apache.spark.ml.linalg.{Vector, Vectors}

/** Class for a one hot encoder model.
  *
  * One hot encoders are used to vectorize nominal features
  * in preparation for models such as linear regression or
  * logistic regression where binary and not multinomial features
  * are supported in the feature vector.
  *
  * @param size size of the output one hot vectors
  */
case class OneHotEncoderModel(size: Int,
                              dropLast: Boolean = true) extends Serializable {
  private val oneValue = Array(1.0)
  private val emptyIndices = Array[Int]()
  private val emptyValues = Array[Double]()

  /** Turn a labeled feature into a one hot vector.
    *
    * @param label label to convert to a vector
    * @return one hot vector representation of label
    */
  def apply(label: Double): Vector = {
    val labelInt = label.toInt

    if(label != labelInt) {
      throw new IllegalArgumentException(s"invalid label: $label, must be integer")
    }

    if(label < size) {
      Vectors.sparse(size, Array(labelInt), oneValue)
    } else {
      Vectors.sparse(size, emptyIndices, emptyValues)
    }
  }
}
```
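For example, with a vector size of 5, the behavior follows directly from the `apply` method above:

```scala
val encoder = OneHotEncoderModel(size = 5)

encoder(2.0) // Vectors.sparse(5, Array(2), Array(1.0))
encoder(7.0) // label out of range: Vectors.sparse(5, Array(), Array()), i.e. all zeros
encoder(2.5) // throws IllegalArgumentException: labels must be integers
```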
```scala
package ml.combust.mleap.core.feature

/** Class for string indexer model.
  *
  * String indexer converts a string into an integer representation.
  *
  * @param labels list of labels that can be indexed
  */
case class StringIndexerModel(labels: Seq[String]) extends Serializable {
  private val stringToIndex: Map[String, Int] = labels.zipWithIndex.toMap

  /** Convert a string into its integer representation.
    *
    * @param value label to index
    * @return index of label
    */
  def apply(value: String): Double = stringToIndex(value)

  /** Create a [[ml.combust.mleap.core.feature.ReverseStringIndexerModel]] from this model.
    *
    * @return reverse string indexer of this string indexer
    */
  def toReverse: ReverseStringIndexerModel = ReverseStringIndexerModel(labels)
}
```
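Likewise for the string indexer, indices follow the order of the supplied labels:

```scala
val indexer = StringIndexerModel(labels = Seq("fiction", "non-fiction", "poetry"))

indexer("non-fiction") // 1.0
indexer("poetry")      // 2.0
```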
The MLeap transformer allows you to execute your transformer on a LeapFrame. This process actually operates on something called a `TransformBuilder`, which is an abstraction over the transformation process that allows you to transform any data structure.

All transformers inherit from `Transformer` and must supply a unique identifier string called `uid`. Transformers must also supply an implementation of the `#transform` method. For simple feature transformers that have only one input and one output value, you can inherit from `FeatureTransformer` and implement the `#exec` value, which is an MLeap `UserDefinedFunction`. For more complex transformers, such as `RandomForestClassifier`, additional work will need to be done to implement the `#transform` method.
```scala
package ml.combust.mleap.runtime.transformer.feature

import ml.combust.mleap.core.feature.OneHotEncoderModel
import ml.combust.mleap.runtime.function.UserDefinedFunction
import ml.combust.mleap.runtime.transformer.{FeatureTransformer, Transformer}

/**
  * Created by hollinwilkins on 5/10/16.
  */
case class OneHotEncoder(override val uid: String = Transformer.uniqueName("one_hot_encoder"),
                         override val inputCol: String,
                         override val outputCol: String,
                         model: OneHotEncoderModel) extends FeatureTransformer {
  override val exec: UserDefinedFunction = (value: Double) => model(value)
}
```
```scala
package ml.combust.mleap.runtime.transformer.feature

import ml.combust.mleap.core.feature.StringIndexerModel
import ml.combust.mleap.runtime.function.UserDefinedFunction
import ml.combust.mleap.runtime.transformer.{FeatureTransformer, Transformer}

/**
  * Created by hwilkins on 10/22/15.
  */
case class StringIndexer(override val uid: String = Transformer.uniqueName("string_indexer"),
                         override val inputCol: String,
                         override val outputCol: String,
                         model: StringIndexerModel) extends FeatureTransformer {
  override val exec: UserDefinedFunction = (value: Any) => model(value.toString)
}
```
```scala
package ml.combust.mleap.example.transformer

import ml.combust.mleap.runtime.function.UserDefinedFunction
import ml.combust.mleap.runtime.transformer.Transformer
// note: the exact import path for TransformBuilder depends on your MLeap version
import ml.combust.mleap.runtime.transformer.builder.TransformBuilder

import scala.util.Try

/** Simple feature transformer with multiple inputs.
  *
  * input1 is a double
  * input2 is a string
  *
  * the output is the double appended to the string
  */
case class ExampleTransformer(override val uid: String = Transformer.uniqueName("example"),
                              inputCol1: String,
                              inputCol2: String,
                              outputCol: String) extends Transformer {
  override def transform[TB <: TransformBuilder[TB]](builder: TB): Try[TB] = {
    // the Scala function is implicitly converted to a UserDefinedFunction
    builder.withOutput(outputCol, inputCol1, inputCol2) {
      (v1: Double, v2: String) => s"$v2.$v1"
    }
  }
}
```
In the examples above, we are making use of MLeap's `UserDefinedFunction` and the implicit conversion from Scala functions. This is a powerful tool for quickly writing data frame transformations. All data types supported by MLeap can be used in a user defined function (see the sketch after the table below); `MleapReflection` provides the conversion mechanism from Scala to MLeap data types.
The supported types are:
| Scala Type | MLeap Type | Notes |
|---|---|---|
| `Boolean` | `BooleanType` | |
| `String` | `StringType` | |
| `Int` | `IntType` | |
| `Long` | `LongType` | |
| `Double` | `DoubleType` | |
| `Seq[A]` | `ListType` | |
| `linalg.Vector` | `TensorType` | 1-dimensional `Double` tensor |
| `Any` | `AnyType` | mostly needed internally by MLeap |
| Custom | `CustomType` | Bundle.ML and MLeap support custom types |
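As a minimal sketch of the implicit conversion (the function name and argument types here are illustrative, assuming the conversions cover two-argument functions as in the `ExampleTransformer` above):

```scala
import ml.combust.mleap.runtime.function.UserDefinedFunction

// illustrative only: the Scala function is implicitly converted to a
// UserDefinedFunction; MleapReflection derives the MLeap types
// (StringType, ListType, StringType) from the function signature
val summarize: UserDefinedFunction =
  (label: String, values: Seq[Double]) => s"$label: ${values.sum}"
```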
Serialization classes for MLeap transformers are located in `mleap-runtime` (`ml.combust.mleap.bundle`) and should have a name ending with `Op`. Example ops are:
- `StringIndexerOp`
- `LinearRegressionOp`
- `KMeansOp`

The Op extends `ml.combust.bundle.op.OpNode`, which requires you to specify a Context, a Transformer, and a Transformer Model.

Hint: if you're using IntelliJ, you can quickly select members to implement using a shortcut (Command + I by default).

Note: if implementing a vanilla Spark transformer, make sure to add the `opName` to `ml.combust.bundle.dsl.Bundle.BuiltinOps`.
```scala
package ml.combust.mleap.bundle.ops.feature

import ml.combust.bundle.BundleContext
import ml.combust.bundle.dsl._
import ml.combust.bundle.op.{OpModel, OpNode}
import ml.combust.mleap.core.feature.OneHotEncoderModel
import ml.combust.mleap.runtime.MleapContext
import ml.combust.mleap.runtime.transformer.feature.OneHotEncoder

/**
  * Created by hollinwilkins on 10/24/16.
  */
class OneHotEncoderOp extends OpNode[MleapContext, OneHotEncoder, OneHotEncoderModel] {
  override val Model: OpModel[MleapContext, OneHotEncoderModel] = new OpModel[MleapContext, OneHotEncoderModel] {
    override val klazz: Class[OneHotEncoderModel] = classOf[OneHotEncoderModel]

    override def opName: String = Bundle.BuiltinOps.feature.one_hot_encoder

    override def store(model: Model, obj: OneHotEncoderModel)
                      (implicit context: BundleContext[MleapContext]): Model = {
      model.withAttr("size", Value.long(obj.size)).
        withAttr("drop_last", Value.boolean(obj.dropLast))
    }

    override def load(model: Model)
                     (implicit context: BundleContext[MleapContext]): OneHotEncoderModel = {
      OneHotEncoderModel(size = model.value("size").getLong.toInt,
        dropLast = model.value("drop_last").getBoolean)
    }
  }

  override val klazz: Class[OneHotEncoder] = classOf[OneHotEncoder]

  override def name(node: OneHotEncoder): String = node.uid

  override def model(node: OneHotEncoder): OneHotEncoderModel = node.model

  override def load(node: Node, model: OneHotEncoderModel)
                   (implicit context: BundleContext[MleapContext]): OneHotEncoder = {
    OneHotEncoder(uid = node.name,
      inputCol = node.shape.standardInput.name,
      outputCol = node.shape.standardOutput.name,
      model = model)
  }

  override def shape(node: OneHotEncoder): Shape = Shape().withStandardIO(node.inputCol, node.outputCol)
}
```
```scala
package ml.combust.mleap.bundle.ops.feature

import ml.combust.bundle.BundleContext
import ml.combust.mleap.core.feature.StringIndexerModel
import ml.combust.mleap.runtime.transformer.feature.StringIndexer
import ml.combust.bundle.op.{OpModel, OpNode}
import ml.combust.bundle.dsl._
import ml.combust.mleap.runtime.MleapContext

/**
  * Created by hollinwilkins on 8/22/16.
  */
class StringIndexerOp extends OpNode[MleapContext, StringIndexer, StringIndexerModel] {
  override val Model: OpModel[MleapContext, StringIndexerModel] = new OpModel[MleapContext, StringIndexerModel] {
    override val klazz: Class[StringIndexerModel] = classOf[StringIndexerModel]

    override def opName: String = Bundle.BuiltinOps.feature.string_indexer

    override def store(model: Model, obj: StringIndexerModel)
                      (implicit context: BundleContext[MleapContext]): Model = {
      model.withAttr("labels", Value.stringList(obj.labels))
    }

    override def load(model: Model)
                     (implicit context: BundleContext[MleapContext]): StringIndexerModel = {
      StringIndexerModel(labels = model.value("labels").getStringList)
    }
  }

  override val klazz: Class[StringIndexer] = classOf[StringIndexer]

  override def name(node: StringIndexer): String = node.uid

  override def model(node: StringIndexer): StringIndexerModel = node.model

  override def load(node: Node, model: StringIndexerModel)
                   (implicit context: BundleContext[MleapContext]): StringIndexer = {
    StringIndexer(uid = node.name,
      inputCol = node.shape.standardInput.name,
      outputCol = node.shape.standardOutput.name,
      model = model)
  }

  override def shape(node: StringIndexer): Shape = Shape().withStandardIO(node.inputCol, node.outputCol)
}
```
Serialization classes for vanilla Spark transformers are located in `mleap-spark` and should be named the same as in `mleap-runtime`.

The Op also extends `OpNode`, but now you have to specify the `SparkBundleContext`, the Spark Transformer, and the Spark Transformer Model (often the same class as the Spark Transformer).

Notes:
- The `opName` should be the same as the op name from the `mleap-runtime` serialization.
- The `load` method of `Model` should just set the `uid` to `""`.
```scala
package org.apache.spark.ml.bundle.ops.feature

import ml.combust.bundle.BundleContext
import ml.combust.bundle.op.{OpModel, OpNode}
import ml.combust.bundle.dsl._
import org.apache.spark.ml.bundle.SparkBundleContext
import org.apache.spark.ml.feature.StringIndexerModel

/**
  * Created by hollinwilkins on 8/21/16.
  */
class StringIndexerOp extends OpNode[SparkBundleContext, StringIndexerModel, StringIndexerModel] {
  override val Model: OpModel[SparkBundleContext, StringIndexerModel] = new OpModel[SparkBundleContext, StringIndexerModel] {
    override val klazz: Class[StringIndexerModel] = classOf[StringIndexerModel]

    override def opName: String = Bundle.BuiltinOps.feature.string_indexer

    override def store(model: Model, obj: StringIndexerModel)
                      (implicit context: BundleContext[SparkBundleContext]): Model = {
      model.withAttr("labels", Value.stringList(obj.labels))
    }

    override def load(model: Model)
                     (implicit context: BundleContext[SparkBundleContext]): StringIndexerModel = {
      new StringIndexerModel(uid = "", labels = model.value("labels").getStringList.toArray)
    }
  }

  override val klazz: Class[StringIndexerModel] = classOf[StringIndexerModel]

  override def name(node: StringIndexerModel): String = node.uid

  override def model(node: StringIndexerModel): StringIndexerModel = node

  override def load(node: Node, model: StringIndexerModel)
                   (implicit context: BundleContext[SparkBundleContext]): StringIndexerModel = {
    new StringIndexerModel(uid = node.name, labels = model.labels).
      setInputCol(node.shape.standardInput.name).
      setOutputCol(node.shape.standardOutput.name)
  }

  override def shape(node: StringIndexerModel): Shape = Shape().withStandardIO(node.getInputCol, node.getOutputCol)
}
```
The `reference.conf` file is what lets Bundle.ML know about all the different transformers that can be serialized to a bundle file. If you implement a transformer, you will have to let Bundle.ML know about it before you can start serializing/deserializing with it. In addition, if you are implementing custom data types, you will also need to let Bundle.ML know about those.

For MLeap transformers that should be part of the core feature offering (any vanilla Spark transformers fall into this category), make sure to add the OpNode to the `reference.conf` file in `mleap-runtime`.

For vanilla Spark transformers, make sure to add your OpNode to the `reference.conf` file in the `mleap-spark` module.

If you are implementing custom transformers, you will want to append your OpNodes to the existing `reference.conf` files. See the `reference.conf` in `mleap-spark-extension` for a good example of this.
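For a rough idea of what a registration entry looks like - the registry key below is an assumption and differs between MLeap versions, so mirror the entries already present in the `reference.conf` you are appending to:

```
# hypothetical sketch: the key name is an assumption; copy the pattern
# used by the existing entries in the module's reference.conf
ml.combust.mleap.registry.default.ops += "ml.combust.mleap.bundle.ops.feature.OneHotEncoderOp"
```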
Tests should use `FunSpec` and should test the `apply` method of the transformer's model. Example tests can be found in the `mleap-core` module.
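As a rough sketch (the spec class name here is illustrative, not taken from the MLeap codebase), a `FunSpec` test for the `OneHotEncoderModel` shown earlier might look like:

```scala
package ml.combust.mleap.core.feature

import org.apache.spark.ml.linalg.Vectors
import org.scalatest.FunSpec

// illustrative spec exercising the apply method of the core model
class OneHotEncoderModelSpec extends FunSpec {
  describe("OneHotEncoderModel") {
    val encoder = OneHotEncoderModel(size = 5)

    it("encodes an integer label as a one hot vector") {
      assert(encoder(3.0) == Vectors.sparse(5, Array(3), Array(1.0)))
    }

    it("throws an IllegalArgumentException for non-integer labels") {
      intercept[IllegalArgumentException] { encoder(3.5) }
    }
  }
}
```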
The `mleap-spark-extension` module provides several extension transformers to Spark. The extensions include:
- SVM - support vector machine ML wrapper around the MLlib model
- OneVsRest - custom OneVsRest model that allows users to output the winning probability in addition to the winning label
- OneHotEncoder - functions just like the Spark OneHotEncoder, except it uses an estimator and model so the transformation does not rely on metadata in the data frame to execute the model transformer
In addition to these extensions, if we find other useful transformers to implement for Spark, we will place them here.