# Training Logistic Regression with FTRL on Spark on Angel

FTRL (Follow-the-Regularized-Leader) is an optimization algorithm widely used in online learning. FTRL is easy to employ in Spark-on-Angel, and you can train a model with billions or even tens of billions of dimensions once you have enough machines.

If you are not familiar with programming on Spark-on-Angel, please first refer to the Programming Guide for Spark-on-Angel.

## Using the FTRL Optimizer

```scala
import com.tencent.angel.ml.matrix.RowType
import com.tencent.angel.spark.ml.online_learning.FTRL

// allocate a FTRL optimizer with (lambda1, lambda2, alpha, beta)
val optim = new FTRL(lambda1, lambda2, alpha, beta)
// initialize the model
optim.init(dim)
```

The FTRL optimizer has four hyper-parameters: lambda1, lambda2, alpha, and beta. We allocate a FTRL optimizer with these four hyper-parameters. The next step is to initialize the FTRL model. FTRL maintains three vectors: z, n, and w. In the above code, optim.init(dim) allocates a sparse distributed matrix with 3 rows and dim columns to hold them.
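For intuition, the sketch below shows the standard per-coordinate FTRL-Proximal update that these vectors and hyper-parameters drive. It is a local illustration only, with untuned, illustrative hyper-parameter values; the actual implementation applies this logic to the distributed z and n vectors stored on Angel's parameter servers.

```scala
// illustrative (untuned) hyper-parameter values
val lambda1 = 0.1; val lambda2 = 0.1; val alpha = 0.1; val beta = 1.0

// Per-coordinate FTRL-Proximal update, local sketch for intuition only.
// z accumulates adjusted gradients, n accumulates squared gradients,
// and w is derived lazily from z and n.
def weight(z: Double, n: Double): Double =
  if (math.abs(z) <= lambda1) 0.0 // L1 term keeps small coordinates at exactly zero
  else -(z - math.signum(z) * lambda1) / ((beta + math.sqrt(n)) / alpha + lambda2)

// given gradient g for one coordinate, return the updated (z, n)
def update(z: Double, n: Double, g: Double): (Double, Double) = {
  val sigma = (math.sqrt(n + g * g) - math.sqrt(n)) / alpha
  (z + g - sigma * weight(z, n), n + g * g)
}
```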

## Set the Dimension

In the online learning scenario, feature indices can range over (Long.MinValue, Long.MaxValue) and are usually generated by a hash function. In Spark-on-Angel, you can set dim=-1 when your feature indices range over (Long.MinValue, Long.MaxValue). If the feature indices range over [0, n), you can set dim=n.
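For example, passing the dimension through init as shown earlier, the two cases look like:

```scala
// feature indices produced by hashing into the full Long range
optim.init(-1)

// feature indices already in [0, n), e.g. n = 10000000
optim.init(10000000L)
```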

## Training with Spark

### Loading Data

Use the RDD interface to load the data and parse each line into a labeled vector:

```scala
val data = sc.textFile(input).repartition(partNum)
  // parse each line into a (feature vector, label) pair
  .map(s => (DataLoader.parseLongDouble(s, dim), DataLoader.parseLabel(s, false)))
  .map { f =>
    // attach the label to its vector
    f._1.setY(f._2)
    f._1
  }
  // append a bias (intercept) term to every point
  .map(point => DataLoader.appendBias(point))
```

### Training the Model

```scala
val size = data.count()
for (epoch <- 1 to numEpoch) {
  val totalLoss = data.mapPartitions { iterator =>
    // group each partition into mini-batches and run one FTRL step per batch
    val loss = iterator.map(f => (f.getX, f.getY))
      .sliding(batchSize, batchSize)
      .map(f => optim.optimize(f.toArray, calcGradientLoss))
      .sum
    Iterator.single(loss)
  }.sum()
  println(s"epoch=$epoch loss=${totalLoss / size}")
}
```
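calcGradientLoss is defined in the linked example code and computes the logistic loss and gradient for a batch. The name and signature below are illustrative only, not Angel's API, but they show the underlying math for labels in {-1, +1}:

```scala
// Hypothetical per-example logistic loss and gradient scale; illustration only.
// The gradient with respect to the weights is gradScale * x.
def logisticGradLoss(dot: Double, y: Double): (Double, Double) = {
  val margin = -y * dot // dot = w · x
  // numerically stable log(1 + exp(margin))
  val loss =
    if (margin > 0) margin + math.log1p(math.exp(-margin))
    else math.log1p(math.exp(margin))
  val gradScale = -y / (1.0 + math.exp(-margin))
  (gradScale, loss)
}
```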

### Saving the Model

```scala
val output = "hdfs://xxx"
// fetch the trained weight vector w
val weight = optim.weight
// write the model to the output path
optim.saveWeight(output)
```

The complete example code can be found at https://github.com/Angel-ML/angel/blob/master/spark-on-angel/examples/src/main/scala/com/tencent/angel/spark/examples/cluster/FTRLExample.scala