diff --git a/pyspark/bigdl/examples/keras/README.md b/pyspark/bigdl/examples/keras/README.md index af4ece12bc1..a27886f3409 100644 --- a/pyspark/bigdl/examples/keras/README.md +++ b/pyspark/bigdl/examples/keras/README.md @@ -50,4 +50,4 @@ ${SPARK_HOME}/bin/spark-submit \ ``` * ```--batchSize``` an option that can be used to set batch size. * ```--max_epoch``` an option that can be used to set how many epochs for which the model is to be trained. -* ```--optimizerVersion``` an option that can be used to set DistriOptimizer version, the default value is "optimizerV2". \ No newline at end of file +* ```--optimizerVersion``` an option that can be used to set DistriOptimizer version, the default value is "optimizerV1". diff --git a/pyspark/bigdl/examples/keras/imdb_cnn_lstm.py b/pyspark/bigdl/examples/keras/imdb_cnn_lstm.py index 9cba23b1805..9a7b91f21f6 100644 --- a/pyspark/bigdl/examples/keras/imdb_cnn_lstm.py +++ b/pyspark/bigdl/examples/keras/imdb_cnn_lstm.py @@ -65,7 +65,7 @@ def build_keras_model(): parser = OptionParser() parser.add_option("-b", "--batchSize", type=int, dest="batchSize", default="32") parser.add_option("-m", "--max_epoch", type=int, dest="max_epoch", default="2") - parser.add_option("--optimizerVersion", dest="optimizerVersion", default="optimizerV2") + parser.add_option("--optimizerVersion", dest="optimizerVersion", default="optimizerV1") (options, args) = parser.parse_args(sys.argv) keras_model = build_keras_model() @@ -103,4 +103,4 @@ def build_keras_model(): trigger=EveryEpoch(), val_method=[Top1Accuracy()] ) - optimizer.optimize() \ No newline at end of file + optimizer.optimize() diff --git a/pyspark/bigdl/examples/keras/mnist_cnn.py b/pyspark/bigdl/examples/keras/mnist_cnn.py index 3bf787c6fe4..309116c588f 100644 --- a/pyspark/bigdl/examples/keras/mnist_cnn.py +++ b/pyspark/bigdl/examples/keras/mnist_cnn.py @@ -78,7 +78,7 @@ def build_keras_model(): parser.add_option("-b", "--batchSize", type=int, dest="batchSize", default="128") parser.add_option("-m", "--max_epoch", type=int, dest="max_epoch", default="12") parser.add_option("-d", "--dataPath", dest="dataPath", default="/tmp/mnist") - parser.add_option("--optimizerVersion", dest="optimizerVersion", default="optimizerV2") + parser.add_option("--optimizerVersion", dest="optimizerVersion", default="optimizerV1") (options, args) = parser.parse_args(sys.argv) keras_model = build_keras_model() diff --git a/pyspark/bigdl/models/lenet/lenet5.py b/pyspark/bigdl/models/lenet/lenet5.py index 6a1e0fec765..cb4a940d5ed 100644 --- a/pyspark/bigdl/models/lenet/lenet5.py +++ b/pyspark/bigdl/models/lenet/lenet5.py @@ -60,7 +60,7 @@ def build_model(class_num): parser.add_option("-t", "--endTriggerType", dest="endTriggerType", default="epoch") parser.add_option("-n", "--endTriggerNum", type=int, dest="endTriggerNum", default="20") parser.add_option("-d", "--dataPath", dest="dataPath", default="/tmp/mnist") - parser.add_option("--optimizerVersion", dest="optimizerVersion", default="optimizerV2") + parser.add_option("--optimizerVersion", dest="optimizerVersion", default="optimizerV1") (options, args) = parser.parse_args(sys.argv) diff --git a/pyspark/bigdl/models/textclassifier/textclassifier.py b/pyspark/bigdl/models/textclassifier/textclassifier.py index dfa731f6680..c4d0c5d6e12 100644 --- a/pyspark/bigdl/models/textclassifier/textclassifier.py +++ b/pyspark/bigdl/models/textclassifier/textclassifier.py @@ -164,7 +164,7 @@ def train(sc, data_path, parser.add_option("--model", dest="model_type", default="cnn") parser.add_option("-p", "--p", dest="p", default="0.0") parser.add_option("-d", "--data_path", dest="data_path", default="/tmp/news20/") - parser.add_option("--optimizerVersion", dest="optimizerVersion", default="optimizerV2") + parser.add_option("--optimizerVersion", dest="optimizerVersion", default="optimizerV1") (options, args) = parser.parse_args(sys.argv) if options.action == "train": diff --git a/spark/dl/src/main/scala/com/intel/analytics/bigdl/dlframes/DLClassifier.scala b/spark/dl/src/main/scala/com/intel/analytics/bigdl/dlframes/DLClassifier.scala deleted file mode 100644 index e57ee03d168..00000000000 --- a/spark/dl/src/main/scala/com/intel/analytics/bigdl/dlframes/DLClassifier.scala +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Copyright 2016 The BigDL Authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.intel.analytics.bigdl.dlframes - -import com.intel.analytics.bigdl.tensor.Tensor -import com.intel.analytics.bigdl.tensor.TensorNumericMath.TensorNumeric -import com.intel.analytics.bigdl.{Criterion, Module} -import org.apache.spark.ml.adapter.SchemaUtils -import org.apache.spark.ml.param.ParamMap -import org.apache.spark.ml.util.Identifiable -import org.apache.spark.sql.types._ - -import scala.reflect.ClassTag - -/** - * [[DLClassifier]] is a specialized [[DLEstimator]] that simplifies the data format for - * classification tasks. It only supports label column of DoubleType. - * and the fitted [[DLClassifierModel]] will have the prediction column of DoubleType. - * - * @param model BigDL module to be optimized - * @param criterion BigDL criterion method - * @param featureSize The size (Tensor dimensions) of the feature data. - */ -@deprecated("`DLClassifier` is deprecated." + - "com.intel.analytics.bigdl.dlframes is deprecated in BigDL 0.11, " + - "and will be removed in future releases", "0.10.0") -class DLClassifier[T: ClassTag]( - @transient override val model: Module[T], - override val criterion : Criterion[T], - override val featureSize : Array[Int], - override val uid: String = Identifiable.randomUID("dlClassifier") - )(implicit ev: TensorNumeric[T]) - extends DLEstimator[T](model, criterion, featureSize, Array(1)) { - - override protected def wrapBigDLModel( - m: Module[T], featureSize: Array[Int]): DLClassifierModel[T] = { - val dlModel = new DLClassifierModel[T](m, featureSize) - copyValues(dlModel.setParent(this)).asInstanceOf[DLClassifierModel[T]] - } - - override def transformSchema(schema : StructType): StructType = { - validateParams(schema) - SchemaUtils.appendColumn(schema, $(predictionCol), DoubleType) - } - - override def copy(extra: ParamMap): DLClassifier[T] = { - copyValues(new DLClassifier(model, criterion, featureSize), extra) - } -} - -/** - * [[DLClassifierModel]] is a specialized [[DLModel]] for classification tasks. - * The prediction column will have the datatype of Double. - * - * @param model BigDL module to be optimized - * @param featureSize The size (Tensor dimensions) of the feature data. - */ -@deprecated("`DLClassifierModel` is deprecated." + - "com.intel.analytics.bigdl.dlframes is deprecated in BigDL 0.11, " + - "and will be removed in future releases", "0.10.0") -class DLClassifierModel[T: ClassTag]( - @transient override val model: Module[T], - featureSize : Array[Int], - override val uid: String = "DLClassifierModel" - )(implicit ev: TensorNumeric[T]) extends DLModel[T](model, featureSize) { - - protected override def outputToPrediction(output: Tensor[T]): Any = { - if (output.size().deep == Array(1).deep) { - val raw = ev.toType[Double](output.toArray().head) - if (raw > 0.5) 1.0 else 0.0 - } else { - ev.toType[Double](output.max(1)._2.valueAt(1)) - } - } - - override def transformSchema(schema : StructType): StructType = { - validateDataType(schema, $(featuresCol)) - SchemaUtils.appendColumn(schema, $(predictionCol), DoubleType) - } -} - diff --git a/spark/dl/src/main/scala/com/intel/analytics/bigdl/dlframes/DLEstimator.scala b/spark/dl/src/main/scala/com/intel/analytics/bigdl/dlframes/DLEstimator.scala deleted file mode 100644 index 9441be959f5..00000000000 --- a/spark/dl/src/main/scala/com/intel/analytics/bigdl/dlframes/DLEstimator.scala +++ /dev/null @@ -1,446 +0,0 @@ -/* - * Copyright 2016 The BigDL Authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.intel.analytics.bigdl.dlframes - -import com.intel.analytics.bigdl.{Criterion, Module} -import com.intel.analytics.bigdl.dataset._ -import com.intel.analytics.bigdl.models.utils.ModelBroadcast -import com.intel.analytics.bigdl.optim._ -import com.intel.analytics.bigdl.tensor.TensorNumericMath.TensorNumeric -import com.intel.analytics.bigdl.tensor.{Storage, Tensor} -import com.intel.analytics.bigdl.utils.T -import com.intel.analytics.bigdl.visualization.{TrainSummary, ValidationSummary} -import org.apache.spark.ml.adapter.{HasFeaturesCol, HasPredictionCol, SchemaUtils} -import org.apache.spark.ml.{DLEstimatorBase, DLTransformerBase, VectorCompatibility} -import org.apache.spark.ml.param.{IntParam, ParamMap, ParamValidators, _} -import org.apache.spark.rdd.RDD -import org.apache.spark.sql.types._ -import org.apache.spark.sql.{DataFrame, Row} - -import scala.reflect.ClassTag - -private[dlframes] trait HasBatchSize extends Params { - - final val batchSize: Param[Int] = new Param[Int](this, "batchSize", "batchSize") - - def getBatchSize: Int = $(batchSize) -} - -/** - * Common trait for DLEstimator and DLModel - */ -private[dlframes] trait DLParams[@specialized(Float, Double) T] extends HasFeaturesCol - with HasPredictionCol with VectorCompatibility with HasBatchSize { - - /** - * When to stop the training, passed in a [[Trigger]]. E.g. Trigger.maxIterations - */ - final val endWhen = new Param[Trigger](this, "endWhen", "Trigger to stop the training") - - def getEndWhen: Trigger = $(endWhen) - - /** - * learning rate for the optimizer in the DLEstimator. - * Default: 0.001 - */ - final val learningRate = new DoubleParam( - this, "learningRate", "learningRate", ParamValidators.gt(0)) - - def getLearningRate: Double = $(learningRate) - - /** - * learning rate decay for each iteration. - * Default: 0 - */ - final val learningRateDecay = new DoubleParam(this, "learningRateDecay", "learningRateDecay") - - def getLearningRateDecay: Double = $(learningRateDecay) - - /** - * Number of max Epoch for the training, an epoch refers to a traverse over the training data - * Default: 50 - */ - final val maxEpoch = new IntParam(this, "maxEpoch", "number of max Epoch", ParamValidators.gt(0)) - - def getMaxEpoch: Int = $(maxEpoch) - - /** - * optimization method to be used. BigDL supports many optimization methods like Adam, - * SGD and LBFGS. Refer to package com.intel.analytics.bigdl.optim for all the options. - * Default: SGD - */ - final val optimMethod = new Param[OptimMethod[T]](this, "optimMethod", "optimMethod") - - def getOptimMethod: OptimMethod[T] = $(optimMethod) - - setDefault(batchSize -> 1) - - /** - * Validate if feature and label columns are of supported data types. - * Default: 0 - */ - protected def validateDataType(schema: StructType, colName: String): Unit = { - val dataTypes = Seq( - new ArrayType(DoubleType, false), - new ArrayType(DoubleType, true), - new ArrayType(FloatType, false), - new ArrayType(FloatType, true), - DoubleType, - FloatType - ) ++ validVectorTypes - - // TODO use SchemaUtils.checkColumnTypes after convert to 2.0 - val actualDataType = schema(colName).dataType - require(dataTypes.exists(actualDataType.equals), - s"Column $colName must be of type equal to one of the following types: " + - s"${dataTypes.mkString("[", ", ", "]")} but was actually of type $actualDataType.") - } - - /** - * Get conversion function to extract data from original DataFrame - * Default: 0 - */ - protected def getConvertFunc(colType: DataType): (Row, Int) => Seq[AnyVal] = { - colType match { - case ArrayType(DoubleType, false) => - (row: Row, index: Int) => row.getSeq[Double](index) - case ArrayType(DoubleType, true) => - (row: Row, index: Int) => row.getSeq[Double](index) - case ArrayType(FloatType, false) => - (row: Row, index: Int) => row.getSeq[Float](index) - case ArrayType(FloatType, true) => - (row: Row, index: Int) => row.getSeq[Float](index) - case DoubleType => - (row: Row, index: Int) => Seq[Double](row.getDouble(index)) - case FloatType => - (row: Row, index: Int) => Seq[Float](row.getFloat(index)) - case _ => - if (colType.typeName.contains("vector")) { - (row: Row, index: Int) => getVectorSeq(row, colType, index) - } else { - throw new IllegalArgumentException( - s"$colType is not a supported (Unexpected path).") - } - } - } -} - - -/** - * [[DLEstimator]] helps to train a BigDL Model with the Spark ML Estimator/Transfomer pattern, - * thus Spark users can conveniently fit BigDL into Spark ML pipeline. - * - * [[DLEstimator]] supports feature and label data in the format of - * Array[Double], Array[Float], org.apache.spark.mllib.linalg.{Vector, VectorUDT}, - * org.apache.spark.ml.linalg.{Vector, VectorUDT}, Double and Float. - * - * User should specify the feature data dimensions and label data dimensions via the constructor - * parameters featureSize and labelSize respectively. Internally the feature and label data are - * converted to BigDL tensors, to further train a BigDL model efficiently. - * - * For details usage, please refer to examples in package - * com.intel.analytics.bigdl.example.MLPipeline - * - * @param model BigDL module to be optimized - * @param criterion BigDL criterion method - * @param featureSize The size (Tensor dimensions) of the feature data. e.g. an image may be with - * width * height = 28 * 28, featureSize = Array(28, 28). - * @param labelSize The size (Tensor dimensions) of the label data. - */ -@deprecated("`DLEstimator` is deprecated." + - "com.intel.analytics.bigdl.dlframes is deprecated in BigDL 0.11, " + - "and will be removed in future releases", "0.10.0") -class DLEstimator[@specialized(Float, Double) T: ClassTag]( - @transient val model: Module[T], - val criterion : Criterion[T], - val featureSize : Array[Int], - val labelSize : Array[Int], - override val uid: String = "DLEstimator")(implicit ev: TensorNumeric[T]) - extends DLEstimatorBase[DLEstimator[T], DLModel[T]] with DLParams[T] { - - def setFeaturesCol(featuresColName: String): this.type = set(featuresCol, featuresColName) - - def setLabelCol(labelColName : String) : this.type = set(labelCol, labelColName) - - def setPredictionCol(value: String): this.type = set(predictionCol, value) - - def setBatchSize(value: Int): this.type = set(batchSize, value) - - def setEndWhen(trigger: Trigger): this.type = set(endWhen, trigger) - - def setLearningRate(value: Double): this.type = set(learningRate, value) - setDefault(learningRate -> 1e-3) - - def setLearningRateDecay(value: Double): this.type = set(learningRateDecay, value) - setDefault(learningRateDecay -> 0.0) - - def setMaxEpoch(value: Int): this.type = set(maxEpoch, value) - setDefault(maxEpoch -> 50) - - def setOptimMethod(value: OptimMethod[T]): this.type = set(optimMethod, value) - set(optimMethod, new SGD[T]) - - @transient private var trainSummary: Option[TrainSummary] = None - - def getTrainSummary: Option[TrainSummary] = trainSummary - - /** - * Statistics (LearningRate, Loss, Throughput, Parameters) collected during training for the - * training data, which can be used for visualization via Tensorboard. - * Use setTrainSummary to enable train logger. Then the log will be saved to - * logDir/appName/train as specified by the parameters of TrainSummary. - * - * Default: Not enabled - */ - def setTrainSummary(value: TrainSummary): this.type = { - this.trainSummary = Some(value) - this - } - - @transient private var validationSummary: Option[ValidationSummary] = None - - /** - * Statistics (LearningRate, Loss, Throughput, Parameters) collected during training for the - * validation data if validation data is set, which can be used for visualization via - * Tensorboard. Use setValidationSummary to enable validation logger. Then the log will be - * saved to logDir/appName/ as specified by the parameters of validationSummary. - * - * Default: None - */ - def getValidationSummary: Option[ValidationSummary] = validationSummary - - /** - * Enable validation Summary - */ - def setValidationSummary(value: ValidationSummary): this.type = { - this.validationSummary = Some(value) - this - } - - @transient private var validationTrigger: Option[Trigger] = None - @transient private var validationDF: DataFrame = _ - @transient private var validationMethods: Array[ValidationMethod[T]] = _ - @transient private var validationBatchSize: Int = 0 - /** - * Set a validate evaluation during training - * - * @param trigger how often to evaluation validation set - * @param validationDF validate data set - * @param vMethods a set of validation method [[ValidationMethod]] - * @param batchSize batch size for validation - * @return this optimizer - */ - def setValidation(trigger: Trigger, validationDF: DataFrame, - vMethods : Array[ValidationMethod[T]], batchSize: Int) - : this.type = { - this.validationTrigger = Some(trigger) - this.validationDF = validationDF - this.validationMethods = vMethods - this.validationBatchSize = batchSize - this - } - - protected def validateParams(schema : StructType): Unit = { - validateDataType(schema, $(featuresCol)) - validateDataType(schema, $(labelCol)) - if(isSet(endWhen) && isSet(maxEpoch)) { - throw new IllegalArgumentException(s"endWhen and maxEpoch cannot be both set") - } - if (validationTrigger.isEmpty && validationSummary.isDefined) { - throw new IllegalArgumentException( - s"validationSummary is only valid if validation data is set.") - } - } - - override def transformSchema(schema : StructType): StructType = { - validateParams(schema) - SchemaUtils.appendColumn(schema, $(predictionCol), ArrayType(DoubleType, false)) - } - - protected override def internalFit(dataFrame: DataFrame): DLModel[T] = { - val localFeatureCol = $(featuresCol) - val localLabelCol = $(labelCol) - - def getSamples(dataFrame: DataFrame): RDD[Sample[T]] = { - val featureType = dataFrame.schema(localFeatureCol).dataType - val featureColIndex = dataFrame.schema.fieldIndex(localFeatureCol) - val labelType = dataFrame.schema(localLabelCol).dataType - val labelColIndex = dataFrame.schema.fieldIndex(localLabelCol) - - val featureFunc = getConvertFunc(featureType) - val labelFunc = getConvertFunc(labelType) - - val featureAndLabel: RDD[(Seq[AnyVal], Seq[AnyVal])] = dataFrame.rdd.map { row => - val features = featureFunc(row, featureColIndex) - val labels = labelFunc(row, labelColIndex) - (features, labels) - } - - val samples = featureAndLabel.map { case (f, l) => - // convert feature and label data type to the same type with model - // TODO: investigate to reduce memory consumption during conversion. - val feature = f.head match { - case dd: Double => f.asInstanceOf[Seq[Double]].map(ev.fromType(_)) - case ff: Float => f.asInstanceOf[Seq[Float]].map(ev.fromType(_)) - } - val label = l.head match { - case dd: Double => l.asInstanceOf[Seq[Double]].map(ev.fromType(_)) - case ff: Float => l.asInstanceOf[Seq[Float]].map(ev.fromType(_)) - } - (feature, label) - }.map { case (feature, label) => - Sample(Tensor(feature.toArray, featureSize), Tensor(label.toArray, labelSize)) - } - samples - } - - val trainingSamples = getSamples(dataFrame) - val state = T("learningRate" -> $(learningRate), "learningRateDecay" -> $(learningRateDecay)) - val endTrigger = if (isSet(endWhen)) $(endWhen) else Trigger.maxEpoch($(maxEpoch)) - val optimizer = Optimizer(model, trainingSamples, criterion, $(batchSize)) - .setState(state) - .setOptimMethod($(optimMethod)) - .setEndWhen(endTrigger) - - if (validationTrigger.isDefined) { - val validationSamples = getSamples(validationDF) - optimizer.setValidation( - validationTrigger.get, - validationSamples, - validationMethods, - validationBatchSize) - if (this.validationSummary.isDefined) { - optimizer.setValidationSummary(this.validationSummary.get) - } - } - - if (this.trainSummary.isDefined) { - optimizer.setTrainSummary(this.trainSummary.get) - } - - val optimizedModel = optimizer.optimize() - wrapBigDLModel(optimizedModel, featureSize) - } - - /** - * sub classes can extend the method and return required model for different transform tasks - */ - protected def wrapBigDLModel(m: Module[T], featureSize: Array[Int]): DLModel[T] = { - val dlModel = new DLModel[T](m, featureSize) - copyValues(dlModel.setParent(this)) - } - - override def copy(extra: ParamMap): DLEstimator[T] = { - copyValues(new DLEstimator(model, criterion, featureSize, labelSize), extra) - } -} - -/** - * [[DLModel]] helps embed a BigDL model into a Spark Transformer, thus Spark users can - * conveniently merge BigDL into Spark ML pipeline. - * [[DLModel]] supports feature data in the format of - * Array[Double], Array[Float], org.apache.spark.mllib.linalg.{Vector, VectorUDT}, - * org.apache.spark.ml.linalg.{Vector, VectorUDT}, Double and Float. - * Internally [[DLModel]] use features column as storage of the feature data, and create - * Tensors according to the constructor parameter featureSize. - * - * [[DLModel]] is compatible with both spark 1.5-plus and 2.0 by extending ML Transformer. - * @param model trainned BigDL models to use in prediction. - * @param featureSize The size (Tensor dimensions) of the feature data. (e.g. an image may be with - * featureSize = 28 * 28). - */ -@deprecated("`DLModel` is deprecated." + - "com.intel.analytics.bigdl.dlframes is deprecated in BigDL 0.11, " + - "and will be removed in future releases", "0.10.0") -class DLModel[@specialized(Float, Double) T: ClassTag]( - @transient val model: Module[T], - var featureSize : Array[Int], - override val uid: String = "DLModel" - )(implicit ev: TensorNumeric[T]) - extends DLTransformerBase[DLModel[T]] with DLParams[T] with HasBatchSize { - - def setFeaturesCol(featuresColName: String): this.type = set(featuresCol, featuresColName) - - def setPredictionCol(value: String): this.type = set(predictionCol, value) - - def setFeatureSize(value: Array[Int]): this.type = { - this.featureSize = value - this - } - - def setBatchSize(value: Int): this.type = set(batchSize, value) - - def getFeatureSize: Array[Int] = this.featureSize - - /** - * Perform a prediction on featureCol, and write result to the predictionCol. - */ - protected override def internalTransform(dataFrame: DataFrame): DataFrame = { - val featureType = dataFrame.schema($(featuresCol)).dataType - val featureColIndex = dataFrame.schema.fieldIndex($(featuresCol)) - val featureFunc = getConvertFunc(featureType) - val sc = dataFrame.sqlContext.sparkContext - val modelBroadCast = ModelBroadcast[T]().broadcast(sc, model.evaluate()) - val localBatchSize = $(batchSize) - val transformerBC = sc.broadcast(SampleToMiniBatch[T](localBatchSize)) - - val resultRDD = dataFrame.rdd.mapPartitions { rowIter => - val localModel = modelBroadCast.value() - val transformer = transformerBC.value.cloneTransformer() - rowIter.grouped(localBatchSize).flatMap { rowBatch => - val samples = rowBatch.map { row => - val features = featureFunc(row, featureColIndex) - val featureBuffer = features.head match { - case dd: Double => features.asInstanceOf[Seq[Double]].map(ev.fromType(_)) - case ff: Float => features.asInstanceOf[Seq[Float]].map(ev.fromType(_)) - } - Sample(Tensor(featureBuffer.toArray, featureSize)) - }.toIterator - val predictions = transformer(samples).flatMap { batch => - val batchResult = localModel.forward(batch.getInput()) - batchResult.toTensor.split(1).map(outputToPrediction) - } - rowBatch.toIterator.zip(predictions).map { case (row, predict) => - Row.fromSeq(row.toSeq ++ Seq(predict)) - } - } - } - - val resultSchema = transformSchema(dataFrame.schema) - dataFrame.sqlContext.createDataFrame(resultRDD, resultSchema) - } - - protected def outputToPrediction(output: Tensor[T]): Any = { - output.clone().storage().array().map(ev.toType[Double]) - } - - override def transformSchema(schema : StructType): StructType = { - validateDataType(schema, $(featuresCol)) - SchemaUtils.appendColumn(schema, $(predictionCol), ArrayType(DoubleType, false)) - } - - override def copy(extra: ParamMap): DLModel[T] = { - val copied = new DLModel(model, featureSize, uid).setParent(parent) - copyValues(copied, extra) - } -} - -// TODO, add save/load -object DLModel { - - -} - diff --git a/spark/dl/src/main/scala/com/intel/analytics/bigdl/dlframes/DLImageReader.scala b/spark/dl/src/main/scala/com/intel/analytics/bigdl/dlframes/DLImageReader.scala deleted file mode 100644 index 2df4112b3d7..00000000000 --- a/spark/dl/src/main/scala/com/intel/analytics/bigdl/dlframes/DLImageReader.scala +++ /dev/null @@ -1,144 +0,0 @@ -/* - * Copyright 2016 The BigDL Authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.intel.analytics.bigdl.dlframes - -import com.intel.analytics.bigdl.tensor.{Storage, Tensor} -import com.intel.analytics.bigdl.transform.vision.image.{BytesToMat, ImageFeature, ImageFrame} -import org.apache.spark.SparkContext -import org.apache.spark.sql.types._ -import org.apache.spark.sql.{DataFrame, Row, SQLContext} -import org.opencv.core.CvType -import scala.language.existentials - -/** - * Definition for image data in a DataFrame - */ -object DLImageSchema { - - /** - * Schema for the image column in a DataFrame. Image data is saved in an array of Bytes. - * The format is compatible with Spark Image format in v2.3 - */ - val byteSchema = StructType( - StructField("origin", StringType, true) :: - StructField("height", IntegerType, false) :: - StructField("width", IntegerType, false) :: - StructField("nChannels", IntegerType, false) :: - // OpenCV-compatible type: CV_8UC3, CV_32FC3 in most cases - StructField("mode", IntegerType, false) :: - // Bytes in OpenCV-compatible order: row-wise BGR in most cases - StructField("data", BinaryType, false) :: Nil) - - /** - * Schema for the image column in a DataFrame. Image data is saved in an array of Floats. - */ - val floatSchema = StructType( - StructField("origin", StringType, true) :: - StructField("height", IntegerType, false) :: - StructField("width", IntegerType, false) :: - StructField("nChannels", IntegerType, false) :: - // OpenCV-compatible type: CV_8UC3, CV_32FC3 in most cases - StructField("mode", IntegerType, false) :: - // floats in OpenCV-compatible order: row-wise BGR in most cases - StructField("data", new ArrayType(FloatType, false), false) :: Nil) - - private[dlframes] def imf2Row(imf: ImageFeature): Row = { - val (mode, data) = if (imf.contains(ImageFeature.imageTensor)) { - val floatData = imf(ImageFeature.imageTensor).asInstanceOf[Tensor[Float]].storage().array() - val cvType = imf.getChannel() match { - case 1 => CvType.CV_32FC1 - case 3 => CvType.CV_32FC3 - case 4 => CvType.CV_32FC4 - case other => throw new IllegalArgumentException(s"Unsupported number of channels:" + - s" $other in ${imf.uri()}. Only 1, 3 and 4 are supported.") - } - (cvType, floatData) - } else if (imf.contains(ImageFeature.bytes)) { - val bytesData = imf.bytes() - val cvType = imf.getChannel() match { - case 1 => CvType.CV_8UC1 - case 3 => CvType.CV_8UC3 - case 4 => CvType.CV_8UC4 - case other => throw new IllegalArgumentException(s"Unsupported number of channels:" + - s" $other in ${imf.uri()}. Only 1, 3 and 4 are supported.") - } - (cvType, bytesData) - } else { - throw new IllegalArgumentException(s"ImageFeature should have imageTensor or bytes.") - } - - Row( - imf.uri(), - imf.getHeight(), - imf.getWidth(), - imf.getChannel(), - mode, - data - ) - } - - private[dlframes] def row2IMF(row: Row): ImageFeature = { - val (origin, h, w, c) = (row.getString(0), row.getInt(1), row.getInt(2), row.getInt(3)) - val imf = ImageFeature() - imf.update(ImageFeature.uri, origin) - imf.update(ImageFeature.size, (h, w, c)) - val storageType = row.getInt(4) - storageType match { - case CvType.CV_8UC3 | CvType.CV_8UC1 | CvType.CV_8UC4 => - imf.update(ImageFeature.bytes, row.getAs[Array[Byte]](5)) - BytesToMat().transform(imf) - case CvType.CV_32FC3 | CvType.CV_32FC1 | CvType.CV_32FC4 => - val data = row.getSeq[Float](5).toArray - val size = Array(h, w, c) - val ten = Tensor(Storage(data)).resize(size) - imf.update(ImageFeature.imageTensor, ten) - case _ => - throw new IllegalArgumentException(s"Unsupported data type in imageColumn: $storageType") - } - imf - } -} - -/** - * Primary DataFrame-based image loading interface, defining API to read images into DataFrame. - */ -object DLImageReader { - - /** - * DataFrame with a single column of images named "image" (nullable) - */ - private val imageColumnSchema = - StructType(StructField("image", DLImageSchema.byteSchema, true) :: Nil) - - /** - * Read the directory of images into DataFrame from the local or remote source. - * - * @param path Directory to the input data files, the path can be comma separated paths as the - * list of inputs. Wildcards path are supported similarly to sc.binaryFiles(path). - * @param sc SparkContext to be used. - * @param minPartitions Number of the DataFrame partitions, - * if omitted uses defaultParallelism instead - * @return DataFrame with a single column "image" of images; - * see DLImageSchema for the details - */ - def readImages(path: String, sc: SparkContext, minPartitions: Int = 1): DataFrame = { - val imageFrame = ImageFrame.read(path, sc, minPartitions) - val rowRDD = imageFrame.toDistributed().rdd.map { imf => - Row(DLImageSchema.imf2Row(imf)) - } - SQLContext.getOrCreate(sc).createDataFrame(rowRDD, imageColumnSchema) - } -} diff --git a/spark/dl/src/main/scala/com/intel/analytics/bigdl/dlframes/DLImageTransformer.scala b/spark/dl/src/main/scala/com/intel/analytics/bigdl/dlframes/DLImageTransformer.scala deleted file mode 100644 index fa728e0eb17..00000000000 --- a/spark/dl/src/main/scala/com/intel/analytics/bigdl/dlframes/DLImageTransformer.scala +++ /dev/null @@ -1,96 +0,0 @@ -/* - * Copyright 2016 The BigDL Authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.intel.analytics.bigdl.dlframes - -import com.intel.analytics.bigdl.dataset.Transformer -import com.intel.analytics.bigdl.transform.vision.image.{FeatureTransformer, ImageFeature, MatToTensor} -import org.apache.spark.ml.DLTransformerBase -import org.apache.spark.ml.adapter.{HasInputCol, HasOutputCol, SchemaUtils} -import org.apache.spark.ml.util.Identifiable -import org.apache.spark.sql.types._ -import org.apache.spark.sql.{DataFrame, Row} - -/** - * Provides DataFrame-based API for image pre-processing and feature transformation. - * DLImageTransformer follows the Spark Transformer API pattern and can be used as one stage - * in Spark ML pipeline. - * - * The input column can be either DLImageSchema.byteSchema or DLImageSchema.floatSchema. If - * using DLImageReader, the default format is DLImageSchema.byteSchema - * The output column is always DLImageSchema.floatSchema. - * - * @param transformer Single or a sequence of BigDL FeatureTransformers to be used. E.g. - * Resize(256, 256) -> CenterCrop(224, 224) -> - * ChannelNormalize(123, 117, 104, 1, 1, 1) -> MatToTensor() - */ -class DLImageTransformer ( - val transformer: Transformer[ImageFeature, ImageFeature], - override val uid: String) - extends DLTransformerBase with HasInputCol with HasOutputCol { - - def this(transformer: FeatureTransformer) = - this(transformer, Identifiable.randomUID("DLImageTransformer")) - - setDefault(inputCol -> "image") - def setInputCol(value: String): this.type = set(inputCol, value) - - setDefault(outputCol -> "output") - def setOutputCol(value: String): this.type = set(outputCol, value) - - protected def validateInputType(inputType: DataType): Unit = { - val validTypes = Array(DLImageSchema.floatSchema, DLImageSchema.byteSchema) - - require(validTypes.exists(t => SchemaUtils.sameType(inputType, t)), - s"Bad input type: $inputType. Requires ${validTypes.mkString(", ")}") - } - - override def transformSchema(schema: StructType): StructType = { - val inputType = schema($(inputCol)).dataType - validateInputType(inputType) - if (schema.fieldNames.contains($(outputCol))) { - throw new IllegalArgumentException(s"Output column ${$(outputCol)} already exists.") - } - - val outputFields = schema.fields :+ - StructField($(outputCol), DLImageSchema.floatSchema, nullable = false) - StructType(outputFields) - } - - protected override def internalTransform(dataFrame: DataFrame): DataFrame = { - transformSchema(dataFrame.schema, logging = true) - val sc = dataFrame.sqlContext.sparkContext - val localTransformer = this.transformer - val transformerBC = sc.broadcast(localTransformer) - val toTensorBC = sc.broadcast(MatToTensor[Float](shareBuffer = true)) - - val inputColIndex = dataFrame.schema.fieldIndex($(inputCol)) - val resultRDD = dataFrame.rdd.mapPartitions { rowIter => - val localTransformer = transformerBC.value.cloneTransformer() - val toTensorTransformer = toTensorBC.value.cloneTransformer().asInstanceOf[MatToTensor[Float]] - rowIter.map { row => - val imf = DLImageSchema.row2IMF(row.getAs[Row](inputColIndex)) - val output = localTransformer.apply(Iterator(imf)).toArray.head - if (!output.contains(ImageFeature.imageTensor)) { - toTensorTransformer.transform(output) - } - Row.fromSeq(row.toSeq ++ Seq(DLImageSchema.imf2Row(output))) - } - } - - val resultSchema = transformSchema(dataFrame.schema) - dataFrame.sqlContext.createDataFrame(resultRDD, resultSchema) - } -} diff --git a/spark/dl/src/main/scala/com/intel/analytics/bigdl/dlframes/SharedParamsAdapter.scala b/spark/dl/src/main/scala/com/intel/analytics/bigdl/dlframes/SharedParamsAdapter.scala deleted file mode 100644 index c7737f74de0..00000000000 --- a/spark/dl/src/main/scala/com/intel/analytics/bigdl/dlframes/SharedParamsAdapter.scala +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Copyright 2016 The BigDL Authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.ml.adapter - -import org.apache.spark.sql.types.{DataType, StructField, StructType} - - -trait HasPredictionCol extends org.apache.spark.ml.param.shared.HasPredictionCol - -trait HasFeaturesCol extends org.apache.spark.ml.param.shared.HasFeaturesCol - -trait HasInputCol extends org.apache.spark.ml.param.shared.HasInputCol - -trait HasOutputCol extends org.apache.spark.ml.param.shared.HasOutputCol - -object SchemaUtils { - - /** - * Appends a new column to the input schema. This fails if the given output column already exists - * @param schema input schema - * @param colName new column name. If this column name is an empty string "", this method returns - * the input schema unchanged. This allows users to disable output columns. - * @param dataType new column data type - * @return new schema with the input column appended - */ - def appendColumn( - schema: StructType, - colName: String, - dataType: DataType, - nullable: Boolean = false): StructType = { - - val colSF = StructField(colName, dataType, nullable) - require(!schema.fieldNames.contains(colSF.name), s"Column ${colSF.name} already exists.") - StructType(schema.fields :+ colSF) - } - - def sameType(a: DataType, b: DataType): Boolean = a.sameType(b) - -} diff --git a/spark/dl/src/main/scala/com/intel/analytics/bigdl/example/treeLSTMSentiment/README.md b/spark/dl/src/main/scala/com/intel/analytics/bigdl/example/treeLSTMSentiment/README.md index 5290117f7e3..35bdb816eef 100644 --- a/spark/dl/src/main/scala/com/intel/analytics/bigdl/example/treeLSTMSentiment/README.md +++ b/spark/dl/src/main/scala/com/intel/analytics/bigdl/example/treeLSTMSentiment/README.md @@ -65,6 +65,6 @@ Next just run the following command to run the code: --regRate # number of L2 regularization rate, default is 1e-4 --p # number of dropout probability rate, default is 0.5 --epoch # number of epochs, default is 5 - --optimizerVersion # option to set DistriOptimizer version, default is "optimizerV2" + --optimizerVersion # option to set DistriOptimizer version, default is "optimizerV1" ``` diff --git a/spark/dl/src/main/scala/com/intel/analytics/bigdl/models/resnet/README.md b/spark/dl/src/main/scala/com/intel/analytics/bigdl/models/resnet/README.md index 3f31135810d..c4ea1a76ba8 100644 --- a/spark/dl/src/main/scala/com/intel/analytics/bigdl/models/resnet/README.md +++ b/spark/dl/src/main/scala/com/intel/analytics/bigdl/models/resnet/README.md @@ -73,7 +73,7 @@ We support Local and Spark versions of training. Users can define env OptimizerV1 case "optimizerv2" => OptimizerV2 case optimizerVersion => throw new IllegalArgumentException(s"Unknown type $optimizerVersion") diff --git a/spark/dl/src/test/scala/com/intel/analytics/bigdl/optim/LoggerFilterSpec.scala b/spark/dl/src/test/scala/com/intel/analytics/bigdl/optim/LoggerFilterSpec.scala index 1a144882873..c1106071201 100644 --- a/spark/dl/src/test/scala/com/intel/analytics/bigdl/optim/LoggerFilterSpec.scala +++ b/spark/dl/src/test/scala/com/intel/analytics/bigdl/optim/LoggerFilterSpec.scala @@ -121,7 +121,7 @@ class LoggerFilterSpec extends FlatSpec with BeforeAndAfter with Matchers { } { - val pattern = s".*INFO.*DistriOptimizerV2.* - " + "" + + val pattern = s".*INFO.*DistriOptimizer.* - " + "" + s"\\[Epoch 1 100/100\\]\\[Iteration 2\\]\\[Wall Clock .*\\] " + s"Epoch finished. Wall clock time is .*ms" diff --git a/spark/dl/src/test/scala/com/intel/analytics/bigdl/optim/OptimizerSpec.scala b/spark/dl/src/test/scala/com/intel/analytics/bigdl/optim/OptimizerSpec.scala index 41ad905ef5c..98a6a2a7e17 100644 --- a/spark/dl/src/test/scala/com/intel/analytics/bigdl/optim/OptimizerSpec.scala +++ b/spark/dl/src/test/scala/com/intel/analytics/bigdl/optim/OptimizerSpec.scala @@ -285,7 +285,7 @@ class OptimizerSpec extends FlatSpec with Matchers with BeforeAndAfter { val model = Linear[Float](4, 3) val criterion = ClassNLLCriterion[Float]() val res = Optimizer(model, ds, criterion) - res.isInstanceOf[DistriOptimizerV2[Float]] should be(true) + res.isInstanceOf[DistriOptimizer[Float]] should be(true) res.isInstanceOf[LocalOptimizer[Float]] should be(false) } diff --git a/spark/dl/src/test/scala/com/intel/analytics/bigdl/optim/dlframes/DLClassifierSpec.scala b/spark/dl/src/test/scala/com/intel/analytics/bigdl/optim/dlframes/DLClassifierSpec.scala deleted file mode 100644 index 2d9eb786a2c..00000000000 --- a/spark/dl/src/test/scala/com/intel/analytics/bigdl/optim/dlframes/DLClassifierSpec.scala +++ /dev/null @@ -1,247 +0,0 @@ -/* - * Copyright 2016 The BigDL Authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.intel.analytics.bigdl.dlframes - -import com.intel.analytics.bigdl.models.lenet.LeNet5 -import com.intel.analytics.bigdl.nn._ -import com.intel.analytics.bigdl.optim.{Adam, LBFGS, Loss, Trigger} -import com.intel.analytics.bigdl.tensor.Tensor -import com.intel.analytics.bigdl.tensor.TensorNumericMath.TensorNumeric.NumericFloat -import com.intel.analytics.bigdl.utils.Engine -import com.intel.analytics.bigdl.utils.RandomGenerator.RNG -import com.intel.analytics.bigdl.visualization.ValidationSummary -import org.apache.log4j.{Level, Logger} -import org.apache.spark.ml.feature.MinMaxScaler -import org.apache.spark.SparkContext -import org.apache.spark.ml._ -import org.apache.spark.mllib.linalg.Vectors -import org.apache.spark.sql.{DataFrame, SQLContext} -import org.scalatest.{BeforeAndAfter, FlatSpec, Matchers} - -import scala.collection.mutable.ArrayBuffer -import scala.util.Random - -class DLClassifierSpec extends FlatSpec with Matchers with BeforeAndAfter { - var sc : SparkContext = _ - var sqlContext : SQLContext = _ - var smallData: Seq[(Array[Double], Double)] = _ - val nRecords = 100 - val maxEpoch = 20 - - before { - val conf = Engine.createSparkConf().setAppName("Test DLEstimator").setMaster("local[1]") - sc = SparkContext.getOrCreate(conf) - sqlContext = new SQLContext(sc) - Random.setSeed(42) - RNG.setSeed(42) - smallData = DLEstimatorSpec.generateTestInput( - nRecords, Array(1.0, 2.0, 3.0, 4.0, 5.0, 6.0), -1.0, 42L) - Engine.init - } - - after{ - if (sc != null) { - sc.stop() - } - } - - "An DLClassifier" should "has correct default params" in { - val model = Linear[Float](10, 1) - val criterion = ClassNLLCriterion[Float]() - val estimator = new DLClassifier[Float](model, criterion, Array(10)) - assert(estimator.getFeaturesCol == "features") - assert(estimator.getLabelCol == "label") - assert(estimator.getMaxEpoch == 50) - assert(estimator.getBatchSize == 1) - assert(estimator.getLearningRate == 1e-3) - assert(estimator.getLearningRateDecay == 0) - } - - "An DLClassifier" should "get reasonale accuracy" in { - val model = new Sequential().add(Linear[Float](6, 2)).add(LogSoftMax[Float]) - val criterion = ClassNLLCriterion[Float]() - val classifier = new DLClassifier[Float](model, criterion, Array(6)) - .setOptimMethod(new LBFGS[Float]()) - .setLearningRate(0.1) - .setBatchSize(nRecords) - .setMaxEpoch(maxEpoch) - val data = sc.parallelize(smallData) - val df = sqlContext.createDataFrame(data).toDF("features", "label") - - val dlModel = classifier.fit(df) - dlModel.isInstanceOf[DLClassifierModel[_]] should be(true) - assert(dlModel.transform(df).where("prediction=label").count() > nRecords * 0.8) - } - - "An DLClassifier" should "support different FEATURE types" in { - val model = new Sequential().add(Linear[Float](6, 2)).add(LogSoftMax[Float]) - val criterion = ClassNLLCriterion[Float]() - val classifier = new DLClassifier[Float](model, criterion, Array(6)) - .setLearningRate(0.1) - .setBatchSize(2) - .setEndWhen(Trigger.maxIteration(2)) - - Array( - sqlContext.createDataFrame(sc.parallelize(smallData.map(p => (p._1, p._2)))) - .toDF("features", "label"), // Array[Double] - sqlContext.createDataFrame(sc.parallelize(smallData.map(p => (p._1.map(_.toFloat), p._2)))) - .toDF("features", "label"), // Array[Float] - sqlContext.createDataFrame(sc.parallelize(smallData.map(p => (Vectors.dense(p._1), p._2)))) - .toDF("features", "label") // MLlib Vector - // TODO: add ML Vector when ut for Spark 2.0+ is ready - ).foreach { df => - val dlModel = classifier.fit(df) - dlModel.transform(df).collect() - } - } - - "An DLClassifier" should "support scalar FEATURE" in { - val model = new Sequential().add(Linear[Float](1, 2)).add(LogSoftMax[Float]) - val criterion = ClassNLLCriterion[Float]() - val classifier = new DLClassifier[Float](model, criterion, Array(1)) - .setLearningRate(0.1) - .setBatchSize(2) - .setEndWhen(Trigger.maxIteration(2)) - - Array( - sqlContext.createDataFrame(sc.parallelize(smallData.map(p => (p._1.head.toFloat, p._2)))) - .toDF("features", "label"), // Float - sqlContext.createDataFrame(sc.parallelize(smallData.map(p => (p._1.head, p._2)))) - .toDF("features", "label") // Double - // TODO: add ML Vector when ut for Spark 2.0+ is ready - ).foreach { df => - val dlModel = classifier.fit(df) - dlModel.transform(df).collect() - } - } - - "An DLClassifier" should "support binary classification" in { - val model = new Sequential().add(Linear[Float](6, 1)).add(Sigmoid[Float]) - val criterion = BCECriterion[Float]() - val classifier = new DLClassifier[Float](model, criterion, Array(6)) - .setLearningRate(0.1) - .setBatchSize(2) - .setEndWhen(Trigger.maxIteration(10)) - - Array( - sqlContext.createDataFrame(sc.parallelize( - smallData.map(p => (p._1.map(_.toFloat), p._2.toFloat - 1)))) - .toDF("features", "label"), // Float - sqlContext.createDataFrame(sc.parallelize(smallData.map(p => (p._1, p._2 - 1)))) - .toDF("features", "label") // Double - // TODO: add ML Vector when ut for Spark 2.0+ is ready - ).foreach { df => - val dlModel = classifier.fit(df) - val result = dlModel.transform(df).collect() - val accuracy = result.count(v => v.get(1) == v.get(2)).toDouble / smallData.size - accuracy should be > math.max(smallData.count(_._2 == 1), - smallData.count(_._2 == 2)).toDouble / smallData.size - } - } - - "An DLClassifier" should "fit with adam and LBFGS" in { - val model = new Sequential().add(Linear[Float](6, 2)).add(LogSoftMax[Float]) - val criterion = ClassNLLCriterion[Float]() - Seq(new LBFGS[Float], new Adam[Float]).foreach { optimMethod => - val classifier = new DLClassifier[Float](model, criterion, Array(6)) - .setBatchSize(nRecords) - .setMaxEpoch(2) - .setOptimMethod(optimMethod) - .setLearningRate(0.1) - val data = sc.parallelize(smallData) - val df = sqlContext.createDataFrame(data).toDF("features", "label") - val dlModel = classifier.fit(df) - dlModel.isInstanceOf[DLClassifierModel[_]] should be(true) - } - } - - "An DLClassifier" should "supports validation data and summary" in { - val data = sc.parallelize(smallData) - val df = sqlContext.createDataFrame(data).toDF("features", "label") - - val logdir = com.google.common.io.Files.createTempDir() - val model = new Sequential().add(Linear[Float](6, 2)).add(LogSoftMax[Float]) - val criterion = ClassNLLCriterion[Float]() - val classifier = new DLClassifier[Float](model, criterion, Array(6)) - .setBatchSize(nRecords) - .setEndWhen(Trigger.maxIteration(5)) - .setOptimMethod(new Adam[Float]) - .setLearningRate(0.1) - .setValidation(Trigger.severalIteration(1), df, Array(new Loss[Float]()), 2) - .setValidationSummary(ValidationSummary(logdir.getPath, "DLEstimatorValidation")) - - classifier.fit(df) - val validationSummary = classifier.getValidationSummary.get - val losses = validationSummary.readScalar("Loss") - validationSummary.close() - logdir.deleteOnExit() - } - - "An DLClassifier" should "get the same classification result with BigDL model" in { - Logger.getLogger("org").setLevel(Level.WARN) - Logger.getLogger("akka").setLevel(Level.WARN) - - val model = LeNet5(10) - - // init - val valTrans = new DLClassifierModel[Float](model, Array(28, 28)) - .setBatchSize(4) - - val tensorBuffer = new ArrayBuffer[Data]() - // generate test data with BigDL - val input = Tensor[Float](10, 28, 28).apply1(e => Random.nextFloat()) - val target = model.forward(input).toTensor[Float] - - // test against DLClassifierModel - val inputArr = input.storage().array() - val targetArr = target.max(2)._2.squeeze().storage().array() - (0 until 10).foreach(i => - tensorBuffer.append( - Data(targetArr(i), inputArr.slice(i * 28 * 28, (i + 1) * 28 * 28).map(_.toDouble)))) - val rowRDD = sc.parallelize(tensorBuffer) - val testData = sqlContext.createDataFrame(rowRDD) - assert(valTrans.transform(testData).where("prediction=label").count() == testData.count()) - tensorBuffer.clear() - } - - "An DLClassifier" should "works in ML pipeline" in { - var appSparkVersion = org.apache.spark.SPARK_VERSION - if (appSparkVersion.trim.startsWith("1")) { - val data = sc.parallelize( - smallData.map(p => (org.apache.spark.mllib.linalg.Vectors.dense(p._1), p._2))) - val df: DataFrame = sqlContext.createDataFrame(data).toDF("features", "label") - - val scaler = new MinMaxScaler().setInputCol("features").setOutputCol("scaled") - .setMax(1).setMin(-1) - val model = new Sequential().add(Linear[Float](6, 2)).add(LogSoftMax[Float]) - val criterion = ClassNLLCriterion[Float]() - val estimator = new DLClassifier[Float](model, criterion, Array(6)) - .setBatchSize(nRecords) - .setOptimMethod(new LBFGS[Float]()) - .setLearningRate(0.1) - .setMaxEpoch(maxEpoch) - .setFeaturesCol("scaled") - val pipeline = new Pipeline().setStages(Array(scaler, estimator)) - - val pipelineModel = pipeline.fit(df) - pipelineModel.isInstanceOf[PipelineModel] should be(true) - assert(pipelineModel.transform(df).where("prediction=label").count() > nRecords * 0.8) - } - } -} - -private case class Data(label: Double, features: Array[Double]) diff --git a/spark/dl/src/test/scala/com/intel/analytics/bigdl/optim/dlframes/DLEstimatorSpec.scala b/spark/dl/src/test/scala/com/intel/analytics/bigdl/optim/dlframes/DLEstimatorSpec.scala deleted file mode 100644 index 2126f556c14..00000000000 --- a/spark/dl/src/test/scala/com/intel/analytics/bigdl/optim/dlframes/DLEstimatorSpec.scala +++ /dev/null @@ -1,348 +0,0 @@ -/* - * Copyright 2016 The BigDL Authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.intel.analytics.bigdl.dlframes - -import com.intel.analytics.bigdl.nn._ -import com.intel.analytics.bigdl.optim.{LBFGS, Loss, Trigger} -import com.intel.analytics.bigdl.tensor.Tensor -import com.intel.analytics.bigdl.tensor.TensorNumericMath.TensorNumeric.NumericFloat -import com.intel.analytics.bigdl.utils.Engine -import com.intel.analytics.bigdl.utils.RandomGenerator.RNG -import com.intel.analytics.bigdl.visualization.{TrainSummary, ValidationSummary} -import org.apache.spark.SparkContext -import org.apache.spark.ml.feature.MinMaxScaler -import org.apache.spark.ml.{Pipeline, PipelineModel} -import org.apache.spark.mllib.linalg.Vectors -import org.apache.spark.sql.{DataFrame, Row, SQLContext} -import org.scalatest.{BeforeAndAfter, FlatSpec, Matchers} - -import scala.util.Random - -class DLEstimatorSpec extends FlatSpec with Matchers with BeforeAndAfter { - val model = new Sequential[Float]() - var sc : SparkContext = _ - var sqlContext : SQLContext = _ - var smallData: Seq[(Array[Double], Double)] = _ - val nRecords = 100 - val maxEpoch = 20 - - before { - Random.setSeed(42) - RNG.setSeed(42) - val conf = Engine.createSparkConf().setAppName("Test DLEstimator").setMaster("local[1]") - sc = SparkContext.getOrCreate(conf) - sqlContext = new SQLContext(sc) - smallData = DLEstimatorSpec.generateTestInput( - nRecords, Array(1.0, 2.0, 3.0, 4.0, 5.0, 6.0), -1.0, 42L) - Engine.init - } - - after{ - if (sc != null) { - sc.stop() - } - } - - "An DLEstimator" should "has correct default params" in { - val model = Linear[Float](10, 1) - val criterion = ClassNLLCriterion[Float]() - val estimator = new DLEstimator[Float](model, criterion, Array(10), Array(1)) - assert(estimator.getFeaturesCol == "features") - assert(estimator.getLabelCol == "label") - assert(estimator.getMaxEpoch == 50) - assert(estimator.getBatchSize == 1) - assert(estimator.getLearningRate == 1e-3) - assert(estimator.getLearningRateDecay == 0) - - } - - "An DLEstimator" should "get reasonable accuracy" in { - val model = new Sequential().add(Linear[Float](6, 2)).add(LogSoftMax[Float]) - val criterion = ClassNLLCriterion[Float]() - val estimator = new DLEstimator[Float](model, criterion, Array(6), Array(1)) - .setBatchSize(nRecords) - .setOptimMethod(new LBFGS[Float]()) - .setLearningRate(0.1) - .setMaxEpoch(maxEpoch) - val data = sc.parallelize(smallData) - val df = sqlContext.createDataFrame(data).toDF("features", "label") - - val dlModel = estimator.fit(df) - dlModel.isInstanceOf[DLModel[_]] should be(true) - val correct = dlModel.transform(df).select("label", "prediction").rdd.filter { - case Row(label: Double, prediction: Seq[_]) => - label == prediction.indexOf(prediction.asInstanceOf[Seq[Double]].max) + 1 - }.count() - assert(correct > nRecords * 0.8) - } - - "An DLEstimator" should "support different FEATURE types" in { - val model = new Sequential().add(Linear[Float](6, 2)).add(LogSoftMax[Float]) - val criterion = ClassNLLCriterion[Float]() - val estimator = new DLEstimator[Float](model, criterion, Array(6), Array(1)) - .setBatchSize(2) - // intentionally set low since this only validates data format compatibility - .setEndWhen(Trigger.maxIteration(1)) - - Array( - sqlContext.createDataFrame(sc.parallelize(smallData.map(p => (p._1, p._2)))) - .toDF("features", "label"), // Array[Double] - sqlContext.createDataFrame(sc.parallelize(smallData.map(p => (p._1.map(_.toFloat), p._2)))) - .toDF("features", "label"), // Array[Float] - sqlContext.createDataFrame(sc.parallelize(smallData.map(p => (Vectors.dense(p._1), p._2)))) - .toDF("features", "label") // MLlib Vector - // TODO: add ML Vector when ut for Spark 2.0+ is ready - ).foreach { df => - val dlModel = estimator.fit(df) - dlModel.transform(df).collect() - } - } - - "An DLEstimator" should "support scalar FEATURE types" in { - val model = new Sequential().add(Linear[Float](1, 2)).add(LogSoftMax[Float]) - val criterion = ClassNLLCriterion[Float]() - val estimator = new DLEstimator[Float](model, criterion, Array(1), Array(1)) - .setBatchSize(2) - // intentionally set low since this only validates data format compatibility - .setEndWhen(Trigger.maxIteration(1)) - - Array( - sqlContext.createDataFrame(sc.parallelize(smallData.map(p => (p._1.head.toFloat, p._2)))) - .toDF("features", "label"), // Float - sqlContext.createDataFrame(sc.parallelize(smallData.map(p => (p._1.head, p._2)))) - .toDF("features", "label") // Double - // TODO: add ML Vector when ut for Spark 2.0+ is ready - ).foreach { df => - val dlModel = estimator.fit(df) - dlModel.transform(df).collect() - } - } - - "An DLEstimator" should "support different LABEL types" in { - val model = new Sequential().add(Linear[Float](6, 2)).add(LogSoftMax[Float]) - val criterion = MultiLabelSoftMarginCriterion[Float]() - val estimator = new DLEstimator[Float](model, criterion, Array(6), Array(2)) - // intentionally set low since this only validates data format compatibitliy - .setEndWhen(Trigger.maxIteration(1)) - .setBatchSize(2) - - Array( - sqlContext.createDataFrame(sc.parallelize(smallData.map(p => (p._1, Array(p._2, p._2))))) - .toDF("features", "label"), // Array[Double] - sqlContext.createDataFrame(sc.parallelize(smallData.map(p => (p._1, - Array(p._2.toFloat, p._2.toFloat))))).toDF("features", "label"), // Array[Float] - sqlContext.createDataFrame(sc.parallelize(smallData.map(p => (p._1, - Vectors.dense(p._2, p._2))))).toDF("features", "label") // MLlib Vector - // TODO: add ML Vector when ut for Spark 2.0+ is ready - ).foreach { df => - val dlModel = estimator.fit(df) - dlModel.transform(df).collect() - } - } - - "An DLEstimator" should "support scalar LABEL types" in { - val model = new Sequential().add(Linear[Float](6, 2)).add(LogSoftMax[Float]) - val criterion = ClassNLLCriterion[Float]() - val estimator = new DLEstimator[Float](model, criterion, Array(6), Array(1)) - // intentionally set low since this only validates data format compatibitliy - .setEndWhen(Trigger.maxIteration(1)) - .setBatchSize(2) - - Array( - sqlContext.createDataFrame(sc.parallelize(smallData.map(p => (p._1, p._2.toFloat)))) - .toDF("features", "label"), // Float - sqlContext.createDataFrame(sc.parallelize(smallData.map(p => (p._1, p._2)))) - .toDF("features", "label") // Double - // TODO: add ML Vector when ut for Spark 2.0+ is ready - ).foreach { df => - val dlModel = estimator.fit(df) - dlModel.transform(df).collect() - } - } - - "An DLEstimator" should "work with tensor data" in { - - val model = Linear[Float](10, 1) - val criterion = ClassNLLCriterion[Float]() - val estimator = new DLEstimator[Float](model, criterion, Array(10), Array(1)) - .setMaxEpoch(1) - .setBatchSize(nRecords) - - val featureData = Array.tabulate(100)(_ => Tensor(10)) - val labelData = Array.tabulate(100)(_ => Tensor(1).fill(1.0f)) - val miniBatch = sc.parallelize( - featureData.zip(labelData).map(v => - MinibatchData(v._1.storage.array, v._2.storage.array)) - ) - val trainingDF: DataFrame = sqlContext.createDataFrame(miniBatch).toDF("features", "label") - - val dlModel = estimator.fit(trainingDF) - dlModel.transform(trainingDF).collect() - } - - "An DLEstimator" should "support different batchSize" in { - val model = new Sequential().add(Linear[Float](6, 2)).add(LogSoftMax[Float]) - val criterion = ClassNLLCriterion[Float]() - val estimator = new DLEstimator[Float](model, criterion, Array(6), Array(1)) - .setBatchSize(51) - .setMaxEpoch(maxEpoch) - val data = sc.parallelize(smallData) - val df: DataFrame = sqlContext.createDataFrame(data).toDF("features", "label") - - val dlModel = estimator.fit(df) - dlModel.isInstanceOf[DLModel[_]] should be(true) - dlModel.transform(df).count() - } - - "An DLModel" should "support transform with different batchSize" in { - val model = new Sequential().add(Linear[Float](6, 2)).add(LogSoftMax[Float]) - val criterion = ClassNLLCriterion[Float]() - val estimator = new DLEstimator[Float](model, criterion, Array(6), Array(1)) - .setBatchSize(nRecords) - .setMaxEpoch(maxEpoch) - val data = sc.parallelize(smallData) - val df: DataFrame = sqlContext.createDataFrame(data).toDF("features", "label") - val dlModel = estimator.fit(df) - assert(df.count() == dlModel.setBatchSize(51).transform(df).count()) - } - - "An DLEstimator" should "throws exception without correct inputs" in { - val model = Linear[Float](10, 1) - val criterion = ClassNLLCriterion[Float]() - val inputs = Array[String]("Feature data", "Label data") - var estimator = new DLEstimator[Float](model, criterion, Array(10), Array(2, 1)). - setFeaturesCol(inputs(0)).setLabelCol(inputs(1)) - - val featureData = Tensor(2, 10) - val labelData = Tensor(2, 1) - val miniBatch = sc.parallelize(Seq( - MinibatchData[Float](featureData.storage().array(), labelData.storage().array()) - )) - var df: DataFrame = sqlContext.createDataFrame(miniBatch).toDF(inputs: _*) - // Spark 1.6 and 2.0 throws different exception here - intercept[Exception] { - estimator.fit(df) - } - } - - "An DLEstimator" should "supports training summary" in { - val model = new Sequential().add(Linear[Float](6, 2)).add(LogSoftMax[Float]) - val criterion = ClassNLLCriterion[Float]() - val logdir = com.google.common.io.Files.createTempDir() - val estimator = new DLEstimator[Float](model, criterion, Array(6), Array(1)) - .setBatchSize(nRecords) - .setMaxEpoch(5) - .setTrainSummary(TrainSummary(logdir.getPath, "DLEstimatorTrain")) - val data = sc.parallelize(smallData) - val df = sqlContext.createDataFrame(data).toDF("features", "label") - - val dlModel = estimator.fit(df) - val trainSummary = estimator.getTrainSummary.get - val losses = trainSummary.readScalar("Loss") - assert(losses.length == 5) - trainSummary.close() - logdir.deleteOnExit() - } - - "An DLEstimator" should "supports validation data and summary" in { - val model = new Sequential().add(Linear[Float](6, 2)).add(LogSoftMax[Float]) - val criterion = ClassNLLCriterion[Float]() - val logdir = com.google.common.io.Files.createTempDir() - val data = sc.parallelize(smallData) - val df = sqlContext.createDataFrame(data).toDF("features", "label") - val estimator = new DLEstimator[Float](model, criterion, Array(6), Array(1)) - .setBatchSize(4) - .setEndWhen(Trigger.maxIteration(5)) - .setValidation(Trigger.severalIteration(1), df, Array(new Loss[Float]()), 2) - .setValidationSummary(ValidationSummary(logdir.getPath, "DLEstimatorValidation")) - - val dlModel = estimator.fit(df) - val validationSummary = estimator.getValidationSummary.get - val losses = validationSummary.readScalar("Loss") - assert(losses.length == 5) - validationSummary.close() - logdir.deleteOnExit() - } - - "An DLEstimator" should "throws exception when EndWhen and MaxEpoch are set" in { - val model = new Sequential().add(Linear[Float](6, 2)).add(LogSoftMax[Float]) - val criterion = ClassNLLCriterion[Float]() - val logdir = com.google.common.io.Files.createTempDir() - - val data = sc.parallelize(smallData) - val df = sqlContext.createDataFrame(data).toDF("features", "label") - val estimator = new DLEstimator[Float](model, criterion, Array(6), Array(1)) - .setBatchSize(4) - .setEndWhen(Trigger.maxIteration(5)) - .setMaxEpoch(5) - - intercept[Exception] { - estimator.fit(df) - } - } - - "An DLEstimator" should "works in ML pipeline" in { - var appSparkVersion = org.apache.spark.SPARK_VERSION - if (appSparkVersion.trim.startsWith("1")) { - val data = sc.parallelize( - smallData.map(p => (org.apache.spark.mllib.linalg.Vectors.dense(p._1), p._2))) - val df: DataFrame = sqlContext.createDataFrame(data).toDF("features", "label") - - val scaler = new MinMaxScaler().setInputCol("features").setOutputCol("scaled") - .setMax(1).setMin(-1) - val model = new Sequential().add(Linear[Float](6, 2)).add(LogSoftMax[Float]) - val criterion = ClassNLLCriterion[Float]() - val estimator = new DLEstimator[Float](model, criterion, Array(6), Array(1)) - .setOptimMethod(new LBFGS[Float]()) - .setLearningRate(0.1) - .setBatchSize(nRecords) - .setMaxEpoch(maxEpoch) - .setFeaturesCol("scaled") - val pipeline = new Pipeline().setStages(Array(scaler, estimator)) - - val pipelineModel = pipeline.fit(df) - pipelineModel.isInstanceOf[PipelineModel] should be(true) - val correct = pipelineModel.transform(df).select("label", "prediction").rdd.filter { - case Row(label: Double, prediction: Seq[_]) => - label == prediction.indexOf(prediction.asInstanceOf[Seq[Double]].max) + 1 - }.count() - assert(correct > nRecords * 0.8) - } - } -} - -private case class MinibatchData[T](featureData : Array[T], labelData : Array[T]) - -object DLEstimatorSpec { - // Generate noisy input of the form Y = signum(x.dot(weights) + intercept + noise) - def generateTestInput( - numRecords: Int, - weight: Array[Double], - intercept: Double, - seed: Long): Seq[(Array[Double], Double)] = { - val rnd = new Random(seed) - val data = (1 to numRecords) - .map( i => Array.tabulate(weight.length)(index => rnd.nextDouble() * 2 - 1)) - .map { record => - val y = record.zip(weight).map(t => t._1 * t._2).sum - +intercept + 0.01 * rnd.nextGaussian() - val label = if (y > 0) 2.0 else 1.0 - (record, label) - } - data - } -} diff --git a/spark/dl/src/test/scala/com/intel/analytics/bigdl/optim/dlframes/DLImageReaderSpec.scala b/spark/dl/src/test/scala/com/intel/analytics/bigdl/optim/dlframes/DLImageReaderSpec.scala deleted file mode 100644 index e6ba6d1e98e..00000000000 --- a/spark/dl/src/test/scala/com/intel/analytics/bigdl/optim/dlframes/DLImageReaderSpec.scala +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Copyright 2016 The BigDL Authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.intel.analytics.bigdl.dlframes - -import com.intel.analytics.bigdl.transform.vision.image.{ImageFrame, MatToTensor} -import com.intel.analytics.bigdl.transform.vision.image.augmentation.Resize -import com.intel.analytics.bigdl.utils.Engine -import com.intel.analytics.bigdl.utils.RandomGenerator.RNG -import org.apache.spark.SparkContext -import org.apache.spark.sql.{Row, SQLContext} -import org.opencv.core.CvType -import org.scalatest.{BeforeAndAfter, FlatSpec, Matchers} - -import scala.util.Random - -class DLImageReaderSpec extends FlatSpec with Matchers with BeforeAndAfter { - - var sc : SparkContext = _ - var sQLContext: SQLContext = _ - val pascalResource = getClass.getClassLoader.getResource("pascal/") - private val imageNetResource = getClass.getClassLoader.getResource("imagenet/") - - before { - val conf = Engine.createSparkConf().setAppName("Test DLImageReader").setMaster("local[1]") - sc = SparkContext.getOrCreate(conf) - sQLContext = new SQLContext(sc) - - Random.setSeed(42) - RNG.setSeed(42) - - Engine.init - } - - after{ - if (sc != null) { - sc.stop() - } - } - - "DLImageReader" should "has correct result for pascal" in { - val imageDF = DLImageReader.readImages(pascalResource.getFile, sc) - assert(imageDF.count() == 1) - val r = imageDF.head().getAs[Row](0) - assert(r.getString(0).endsWith("000025.jpg")) - assert(r.getInt(1) == 375) - assert(r.getInt(2) == 500) - assert(r.getInt(3) == 3) - assert(r.getInt(4) == CvType.CV_8UC3) - assert(r.getAs[Array[Byte]](5).length == 95959) - } - - "DLImageReader" should "has correct result for imageNet" in { - val imageDirectory = imageNetResource + "n02110063/" - val imageDF = DLImageReader.readImages(imageDirectory, sc) - assert(imageDF.count() == 3) - val expectedRows = Seq( - (imageDirectory + "n02110063_8651.JPEG", 99, 129, 3, CvType.CV_8UC3), - (imageDirectory + "n02110063_11239.JPEG", 333, 500, 3, CvType.CV_8UC3), - (imageDirectory + "n02110063_15462.JPEG", 332, 500, 3, CvType.CV_8UC3) - ) - val actualRows = imageDF.rdd.collect().map(r => r.getAs[Row](0)).map { r => - (r.getString(0), r.getInt(1), r.getInt(2), r.getInt(3), r.getInt(4)) - } - assert (expectedRows.toSet == actualRows.toSet) - } - - "DLImageReader" should "has correct result for imageNet with channel 1 and 4" in { - val imageDirectory = imageNetResource + "n99999999/" - val imageDF = DLImageReader.readImages(imageDirectory, sc) - assert(imageDF.count() == 3) - val expectedRows = Seq( - (imageDirectory + "n02105855_2933.JPEG", 189, 213, 4, CvType.CV_8UC4), - (imageDirectory + "n02105855_test1.bmp", 527, 556, 1, CvType.CV_8UC1), - (imageDirectory + "n03000134_4970.JPEG", 480, 640, 3, CvType.CV_8UC3) - ) - val actualRows = imageDF.rdd.collect().map(r => r.getAs[Row](0)).map { r => - (r.getString(0), r.getInt(1), r.getInt(2), r.getInt(3), r.getInt(4)) - } - assert (expectedRows.toSet == actualRows.toSet) - } - - "DLImageReader" should "read recursively by wildcard path" in { - val imageDF = DLImageReader.readImages(imageNetResource.getFile + "*", sc) - assert(imageDF.count() == 11) - } - - "DLImageReader" should "read from multiple path" in { - val imageDirectory1 = imageNetResource + "n02110063/" - val imageDirectory2 = imageNetResource + "n99999999/" - val imageDF = DLImageReader.readImages(imageDirectory1 + "," + imageDirectory2, sc) - assert(imageDF.count() == 6) - } - - "read gray scale image" should "work" in { - val resource = getClass().getClassLoader().getResource("gray/gray.bmp") - val df = DLImageReader.readImages(resource.getFile, sc) - assert(df.count() == 1) - val r = df.head().getAs[Row](0) - assert(r.getString(0).endsWith("gray.bmp")) - assert(r.getInt(1) == 50) - assert(r.getInt(2) == 50) - assert(r.getInt(3) == 1) - assert(r.getInt(4) == CvType.CV_8UC1) - } -} diff --git a/spark/dl/src/test/scala/com/intel/analytics/bigdl/optim/dlframes/DLImageTransformerSpec.scala b/spark/dl/src/test/scala/com/intel/analytics/bigdl/optim/dlframes/DLImageTransformerSpec.scala deleted file mode 100644 index 4c2fd5cdcfd..00000000000 --- a/spark/dl/src/test/scala/com/intel/analytics/bigdl/optim/dlframes/DLImageTransformerSpec.scala +++ /dev/null @@ -1,138 +0,0 @@ -/* - * Copyright 2016 The BigDL Authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.intel.analytics.bigdl.dlframes - -import com.intel.analytics.bigdl.transform.vision.image.{ImageFrame, ImageFrameToSample, MatToTensor} -import com.intel.analytics.bigdl.transform.vision.image.augmentation.{CenterCrop, ChannelNormalize, Resize} -import com.intel.analytics.bigdl.utils.Engine -import com.intel.analytics.bigdl.utils.RandomGenerator.RNG -import org.apache.spark.SparkContext -import org.apache.spark.sql.{Row, SQLContext} -import org.opencv.core.CvType -import org.scalatest.{BeforeAndAfter, FlatSpec, Matchers} -import com.intel.analytics.bigdl.tensor.TensorNumericMath.TensorNumeric.NumericFloat - -import scala.util.Random - -class DLImageTransformerSpec extends FlatSpec with Matchers with BeforeAndAfter { - private var sc : SparkContext = _ - private var sqlContext : SQLContext = _ - private val pascalResource = getClass.getClassLoader.getResource("pascal/") - private val imageNetResource = getClass.getClassLoader.getResource("imagenet/") - - before { - val conf = Engine.createSparkConf().setAppName("Test DLImageTransfomer").setMaster("local[1]") - sc = SparkContext.getOrCreate(conf) - sqlContext = new SQLContext(sc) - Random.setSeed(42) - RNG.setSeed(42) - Engine.init - } - - after{ - if (sc != null) { - sc.stop() - } - } - - "DLTransformer" should "setters work" in { - val transformer = Resize(256, 256) -> CenterCrop(224, 224) -> - ChannelNormalize(123, 117, 104, 1, 1, 1) -> MatToTensor() - val trans = new DLImageTransformer(transformer) - .setInputCol("image1") - .setOutputCol("features1") - assert(trans.getInputCol == "image1") - assert(trans.getOutputCol == "features1") - } - - "DLTransformer" should "has correct result with pascal images" in { - val imageDF = DLImageReader.readImages(pascalResource.getFile, sc) - assert(imageDF.count() == 1) - val transformer = Resize(256, 256) -> CenterCrop(224, 224) -> - ChannelNormalize(123, 117, 104, 1, 1, 1) -> MatToTensor() - val transformedDF = new DLImageTransformer(transformer) - .setInputCol("image") - .setOutputCol("features") - .transform(imageDF) - val r = transformedDF.select("features").rdd.first().getAs[Row](0) - assert(r.getString(0).endsWith("pascal/000025.jpg")) - assert(r.getInt(1) == 224) - assert(r.getInt(2) == 224) - assert(r.getInt(3) == 3) - assert(r.getInt(4) == CvType.CV_32FC3) - assert(r.getSeq[Float](5).take(6).toArray.deep == Array(-30, -50, -69, -84, -46, -25).deep) - } - - "DLTransformer" should "has correct result without MatToTensor" in { - val imageDF = DLImageReader.readImages(pascalResource.getFile, sc) - assert(imageDF.count() == 1) - val transformer = Resize(256, 256) -> CenterCrop(224, 224) -> - ChannelNormalize(123, 117, 104, 1, 1, 1) - val transformedDF = new DLImageTransformer(transformer) - .setInputCol("image") - .setOutputCol("features") - .transform(imageDF) - val r = transformedDF.select("features").rdd.first().getAs[Row](0) - assert(r.getString(0).endsWith("pascal/000025.jpg")) - assert(r.getInt(1) == 224) - assert(r.getInt(2) == 224) - assert(r.getInt(3) == 3) - assert(r.getInt(4) == CvType.CV_32FC3) - assert(r.getSeq[Float](5).take(6).toArray.deep == Array(-30, -50, -69, -84, -46, -25).deep) - } - - "DLTransformer" should "ensure imf2Row and Row2Imf reversible" in { - val imageDF = DLImageReader.readImages(pascalResource.getFile, sc) - assert(imageDF.count() == 1) - val transformer = Resize(256, 256) -> CenterCrop(224, 224) -> - ChannelNormalize(123, 117, 104, 1, 1, 1) -> MatToTensor() - val transformedDF = new DLImageTransformer(transformer) - .setInputCol("image") - .setOutputCol("features") - .transform(imageDF) - val r = transformedDF.select("features").rdd.first().getAs[Row](0) - val convertedR = DLImageSchema.imf2Row(DLImageSchema.row2IMF(r)) - - assert(r.getSeq[Float](5).toArray.deep == convertedR.getAs[Array[Float]](5).deep) - } - - "DLTransformer" should "transform gray scale image" in { - val resource = getClass().getClassLoader().getResource("gray/gray.bmp") - val df = DLImageReader.readImages(resource.getFile, sc) - val dlTransformer = new DLImageTransformer(Resize(28, 28) -> MatToTensor[Float]()) - .setInputCol("image") - .setOutputCol("features") - val r = dlTransformer.transform(df).select("features").rdd.first().getAs[Row](0) - assert(r.getString(0).endsWith("gray.bmp")) - assert(r.getInt(1) == 28) - assert(r.getInt(2) == 28) - assert(r.getInt(3) == 1) - assert(r.getInt(4) == CvType.CV_32FC1) - } - - "DLTransformer" should "report error with same input and output columns" in { - val resource = getClass().getClassLoader().getResource("gray/gray.bmp") - val df = DLImageReader.readImages(resource.getFile, sc) - val dlTransformer = new DLImageTransformer(Resize(28, 28) -> MatToTensor[Float]()) - .setInputCol("image") - .setOutputCol("image") - intercept[IllegalArgumentException] { - val transformed = dlTransformer.transform(df) - } - } - -}