
Distributed Tensorflow, Keras and PyTorch on Apache Spark/Flink & Ray


liangs6212/analytics-zoo

 
 



Building Large-Scale AI Applications for Distributed Big Data


BigDL makes it easy for data scientists and data engineers to build end-to-end, distributed AI applications. The BigDL 2.0 release combines the original BigDL and Analytics Zoo projects, providing the following features:

  • DLlib: distributed deep learning library for Apache Spark (i.e., the original BigDL framework with Keras-style API and Spark ML pipeline support)

  • Orca: seamlessly scale out TensorFlow and PyTorch pipelines for distributed Big Data

  • RayOnSpark: run Ray programs directly on Big Data clusters

  • Chronos: scalable time series analysis using AutoML

  • PPML: privacy preserving big data analysis and machine learning (experimental)

For more information, you may read the docs.


Installing

You can use BigDL on Google Colab without any installation. BigDL also includes a set of notebooks that you can directly open and run in Colab.

To install BigDL, we recommend using conda environments.

conda create -n my_env python=3.7  # install Python and pip into the new environment; 3.7 is an example version
conda activate my_env
pip install bigdl

To install the latest nightly build, use pip install --pre --upgrade bigdl; see the Python and Scala user guides for more details.
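
After installing, it can help to confirm that the package is importable from the active environment. The following is a minimal sketch using only the Python standard library; the helper name is_installed is hypothetical, and "bigdl" is the top-level package name implied by the pip command above.

```python
import importlib.util


def is_installed(module_name):
    """Return True if a top-level module or package can be found on sys.path."""
    return importlib.util.find_spec(module_name) is not None


# "bigdl" is the package installed by `pip install bigdl`
print("bigdl importable:", is_installed("bigdl"))
```

If this prints False, the most likely cause is that pip installed into a different interpreter than the one in the activated conda environment.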

Getting Started with DLlib

DLlib is a distributed deep learning library for Apache Spark; with DLlib, users can write distributed deep learning applications as standard Spark programs (using either Scala or Python APIs).

First, call initNNContext at the beginning of the code:

import com.intel.analytics.bigdl.dllib.NNContext
val sc = NNContext.initNNContext()

Then, define the BigDL model using Keras-style API:

val input = Input[Float](inputShape = Shape(10))  
val dense = Dense[Float](12).inputs(input)  
val output = Activation[Float]("softmax").inputs(dense)  
val model = Model(input, output)
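
To make the shape of this model concrete, here is a rough plain-Python sketch of the forward pass it describes: a 10-dimensional input, a fully connected layer with 12 outputs, and a softmax. The random weights are purely illustrative and are not BigDL's initialization; the function names are hypothetical.

```python
import math
import random


def dense(x, weights, bias):
    """Fully connected layer: y[j] = sum_i x[i] * weights[i][j] + bias[j]."""
    return [
        sum(xi * row[j] for xi, row in zip(x, weights)) + bias[j]
        for j in range(len(bias))
    ]


def softmax(z):
    """Numerically stable softmax: subtract the max before exponentiating."""
    m = max(z)
    exps = [math.exp(v - m) for v in z]
    total = sum(exps)
    return [e / total for e in exps]


rng = random.Random(0)
in_dim, out_dim = 10, 12  # mirrors Shape(10) and Dense(12) above
weights = [[rng.uniform(-0.1, 0.1) for _ in range(out_dim)] for _ in range(in_dim)]
bias = [0.0] * out_dim

x = [rng.random() for _ in range(in_dim)]
probs = softmax(dense(x, weights, bias))
print(len(probs), "class probabilities, summing to", round(sum(probs), 6))
```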

After that, use NNEstimator to train, predict with, and evaluate the model using Spark DataFrames and ML pipelines:

val trainingDF = spark.read.parquet("train_data")
val validationDF = spark.read.parquet("val_data")
val scaler = new MinMaxScaler().setInputCol("in").setOutputCol("value")
val estimator = NNEstimator(model, CrossEntropyCriterion())  
        .setBatchSize(size).setOptimMethod(new Adam()).setMaxEpoch(epoch)
val pipeline = new Pipeline().setStages(Array(scaler, estimator))

val pipelineModel = pipeline.fit(trainingDF)  
val predictions = pipelineModel.transform(validationDF)
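
The scaler stage in this pipeline is fitted on the training data and then applied unchanged to the validation data. As a rough illustration of that fit/transform split, here is a single-feature min-max scaler in plain Python. Spark's MinMaxScaler works on vector columns and rescales each feature independently; the helper names below are hypothetical.

```python
def fit_min_max(train_values):
    """Learn the statistics (min and max) from the training data only."""
    return min(train_values), max(train_values)


def transform_min_max(values, stats, lower=0.0, upper=1.0):
    """Rescale values linearly so the fitted [min, max] maps to [lower, upper]."""
    lo, hi = stats
    if hi == lo:
        # A constant feature has no range; map it to the midpoint
        return [0.5 * (lower + upper) for _ in values]
    return [(v - lo) / (hi - lo) * (upper - lower) + lower for v in values]


stats = fit_min_max([2.0, 4.0, 6.0, 10.0])
print(transform_min_max([2.0, 6.0, 12.0], stats))  # → [0.0, 0.5, 1.25]
```

Note that a validation value outside the training range (12.0 here) lands outside [0, 1] in this sketch, because the statistics come from the training data alone.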

See the NNframes and Keras API user guides for more details.

Getting Started with Orca

Most AI projects start with a Python notebook running on a single laptop; scaling that notebook to handle larger datasets in a distributed fashion usually takes considerable effort. The Orca library seamlessly scales out your single-node TensorFlow or PyTorch notebook across large clusters (so as to process distributed Big Data).

First, initialize Orca Context:

from bigdl.orca import init_orca_context

# cluster_mode can be "local", "k8s" or "yarn"
sc = init_orca_context(cluster_mode="yarn", cores=4, memory="10g", num_nodes=2) 
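
As a quick sanity check on the resources requested above, the following sketch works out the cluster-wide totals. It assumes that cores and memory are per-node settings, which is how these parameters are usually described but is not stated in this README; the helper names are hypothetical.

```python
def parse_memory_gb(memory):
    """Parse a size such as "10g" into gigabytes (this sketch handles only the "g" suffix)."""
    if not memory.endswith("g"):
        raise ValueError("this sketch only understands sizes like '10g'")
    return int(memory[:-1])


def cluster_totals(cores, memory, num_nodes):
    """Total resources, ASSUMING cores and memory are requested per node."""
    return {
        "total_cores": cores * num_nodes,
        "total_memory_gb": parse_memory_gb(memory) * num_nodes,
    }


print(cluster_totals(cores=4, memory="10g", num_nodes=2))
# → {'total_cores': 8, 'total_memory_gb': 20}
```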

Next, perform data-parallel processing in Orca (supporting standard Spark DataFrames, TensorFlow Dataset, PyTorch DataLoader, Pandas, Pillow, etc.):

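from pyspark.sql.functions import array
from bigdl.orca import OrcaContext

# Reuse the SparkSession that backs the Orca context created above
spark = OrcaContext.get_spark_session()

# file_path points at the Parquet dataset; wrap each scalar feature in a one-element array
df = spark.read.parquet(file_path)
df = df.withColumn('user', array('user')) \
       .withColumn('item', array('item'))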
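
The array('user') call replaces the scalar user column with a one-element array column, presumably so that each feature matches an input of shape [1]. A plain-Python sketch of the same row-level transformation, with made-up sample rows and a hypothetical helper name:

```python
# Made-up rows standing in for the DataFrame contents
rows = [
    {"user": 1, "item": 10, "label": 0},
    {"user": 2, "item": 20, "label": 1},
]


def wrap_in_array(row, columns):
    """Wrap the named scalar columns in one-element lists; leave the rest untouched."""
    return {k: ([v] if k in columns else v) for k, v in row.items()}


wrapped = [wrap_in_array(r, {"user", "item"}) for r in rows]
print(wrapped[0])  # → {'user': [1], 'item': [10], 'label': 0}
```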

Finally, use sklearn-style Estimator APIs in Orca to perform distributed TensorFlow, PyTorch or Keras training and inference:

from tensorflow import keras
from bigdl.orca.learn.tf.estimator import Estimator

user = keras.layers.Input(shape=[1])  
item = keras.layers.Input(shape=[1])  
feat = keras.layers.concatenate([user, item], axis=1)  
predictions = keras.layers.Dense(2, activation='softmax')(feat)  
model = keras.models.Model(inputs=[user, item], outputs=predictions)  
model.compile(optimizer='rmsprop',  
              loss='sparse_categorical_crossentropy',  
              metrics=['accuracy'])

est = Estimator.from_keras(keras_model=model)  
est.fit(data=df,  
        batch_size=64,  
        epochs=4,  
        feature_cols=['user', 'item'],  
        label_cols=['label'])
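
For intuition about what distributed training means here, the following is a generic sketch of synchronous data-parallel SGD on a toy one-parameter model. It is not Orca's implementation: each "worker" is just a slice of the batch, and all names are hypothetical. The point it illustrates is that averaging the gradients of equal-sized shards gives the same update as the full-batch gradient.

```python
def mse_gradient(w, batch):
    """Gradient of mean squared error for the 1-D model y_hat = w * x."""
    return sum(2.0 * (w * x - y) * x for x, y in batch) / len(batch)


def data_parallel_step(w, batch, num_workers, lr):
    """One SGD step; assumes len(batch) is divisible by num_workers."""
    shard_size = len(batch) // num_workers
    shards = [batch[i * shard_size:(i + 1) * shard_size] for i in range(num_workers)]
    # In a real system each shard's gradient is computed on a different worker
    grads = [mse_gradient(w, shard) for shard in shards]
    avg_grad = sum(grads) / num_workers
    return w - lr * avg_grad


data = [(float(x), 3.0 * x) for x in range(1, 9)]  # samples of y = 3x
w = 0.0
for _ in range(200):
    w = data_parallel_step(w, data, num_workers=2, lr=0.01)
print(round(w, 3))  # → 3.0
```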

See the TensorFlow and PyTorch quickstarts, as well as the documentation website, for more details.

Getting Started with RayOnSpark

Ray is an open source distributed framework for emerging AI applications. RayOnSpark allows users to directly run Ray programs on existing Big Data clusters, and directly write Ray code inline with their Spark code (so as to process the in-memory Spark RDDs or DataFrames).

from bigdl.orca import init_orca_context

# cluster_mode can be "local", "k8s" or "yarn"
sc = init_orca_context(cluster_mode="yarn", cores=4, memory="10g", num_nodes=2, init_ray_on_spark=True) 

import ray

@ray.remote
class Counter(object):
    def __init__(self):
        self.n = 0

    def increment(self):
        self.n += 1
        return self.n

counters = [Counter.remote() for i in range(5)]
print(ray.get([c.increment.remote() for c in counters]))
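
Each Ray actor created above holds its own state, so incrementing five fresh counters once each yields [1, 1, 1, 1, 1]. The same behaviour can be sketched locally without Ray using ordinary objects; LocalCounter is a hypothetical stand-in for the remote Counter.

```python
class LocalCounter:
    """Plain-Python stand-in for the Ray actor: one independent count per instance."""

    def __init__(self):
        self.n = 0

    def increment(self):
        self.n += 1
        return self.n


counters = [LocalCounter() for _ in range(5)]
first_round = [c.increment() for c in counters]
second_round = [c.increment() for c in counters]
print(first_round)   # → [1, 1, 1, 1, 1]
print(second_round)  # → [2, 2, 2, 2, 2]
```

With Ray, the instances live in separate worker processes and the calls return futures that ray.get resolves, but the per-instance state behaves the same way.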

See the RayOnSpark user guide and quickstart for more details.

Getting Started with Chronos

Time series prediction takes observations from previous time steps as input and predicts the values at future time steps. The Chronos library makes it easy to build end-to-end time series analysis by applying AutoML to extremely large-scale time series prediction.
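
The framing of past observations as input and future values as targets can be sketched with a sliding window. This is a generic illustration in plain Python, not Chronos code; the names make_windows, lookback and horizon are assumptions made for the example.

```python
def make_windows(series, lookback, horizon):
    """Split a series into (past, future) pairs using a sliding window."""
    samples = []
    for start in range(len(series) - lookback - horizon + 1):
        past = series[start:start + lookback]
        future = series[start + lookback:start + lookback + horizon]
        samples.append((past, future))
    return samples


series = [10, 11, 13, 12, 15, 16]
for past, future in make_windows(series, lookback=3, horizon=1):
    print(past, "->", future)
# [10, 11, 13] -> [12]
# [11, 13, 12] -> [15]
# [13, 12, 15] -> [16]
```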

To train a time series model with AutoML, first initialize Orca Context:

from bigdl.orca import init_orca_context

# cluster_mode can be "local", "k8s" or "yarn"
sc = init_orca_context(cluster_mode="yarn", cores=4, memory="10g", num_nodes=2, init_ray_on_spark=True)

Next, create an AutoTSTrainer.

from bigdl.chronos.autots.deprecated.forecast import AutoTSTrainer

trainer = AutoTSTrainer(dt_col="datetime", target_col="value")

Finally, call fit on AutoTSTrainer, which applies AutoML to find the best model and hyper-parameters; it returns a TSPipeline which can be used for prediction or evaluation.

# Train a pipeline with AutoML support
ts_pipeline = trainer.fit(train_df, validation_df)

# Predict with the returned TSPipeline
ts_pipeline.predict(test_df)
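
As a toy illustration of what searching for the best hyper-parameters means, the sketch below tries several lookback windows for a naive moving-average forecaster and keeps the one with the lowest validation error. This is not Chronos's search algorithm or model family; every name here is hypothetical and the data is made up.

```python
def moving_average_forecast(history, lookback):
    """Predict the next value as the mean of the last `lookback` observations."""
    window = history[-lookback:]
    return sum(window) / len(window)


def validation_mse(train, validation, lookback):
    """Walk forward through the validation set, forecasting one step at a time."""
    history = list(train)
    squared_errors = []
    for actual in validation:
        predicted = moving_average_forecast(history, lookback)
        squared_errors.append((predicted - actual) ** 2)
        history.append(actual)  # reveal the true value before the next step
    return sum(squared_errors) / len(squared_errors)


def search_lookback(train, validation, candidates):
    """Exhaustively score each candidate and return the best one with all scores."""
    scores = {k: validation_mse(train, validation, k) for k in candidates}
    best = min(scores, key=scores.get)
    return best, scores


train = [1.0, 2.0, 3.0, 4.0, 5.0, 6.0]
validation = [7.0, 8.0, 9.0]
best, scores = search_lookback(train, validation, candidates=[1, 2, 3])
print("best lookback:", best, "scores:", scores)
```

On this steadily rising series the shortest window wins, since longer averages lag further behind the trend.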

See the Chronos user guide and example for more details.

PPML (Privacy Preserving Machine Learning)

BigDL PPML provides a Trusted Cluster Environment for protecting the end-to-end Big Data AI pipeline. It combines various low-level hardware and software security technologies (e.g., Intel SGX, LibOS such as Graphene and Occlum, Federated Learning, etc.), and allows users to run unmodified Big Data analysis and ML/DL programs (such as Apache Spark, Apache Flink, TensorFlow, PyTorch, etc.) in a secure fashion on a private or public cloud.

See the PPML user guide for more details.

More information

Citing BigDL

If you've found BigDL useful for your project, you may cite the paper as follows:

@inproceedings{SOCC2019_BIGDL,
  title={BigDL: A Distributed Deep Learning Framework for Big Data},
  author={Dai, Jason (Jinquan) and Wang, Yiheng and Qiu, Xin and Ding, Ding and Zhang, Yao and Wang, Yanzhang and Jia, Xianyan and Zhang, Li (Cherry) and Wan, Yan and Li, Zhichao and Wang, Jiao and Huang, Shengsheng and Wu, Zhongyuan and Wang, Yang and Yang, Yuhao and She, Bowen and Shi, Dongjie and Lu, Qi and Huang, Kai and Song, Guoqiong},
  booktitle={Proceedings of the ACM Symposium on Cloud Computing},
  publisher={Association for Computing Machinery},
  pages={50--60},
  year={2019},
  series={SoCC'19},
  doi={10.1145/3357223.3362707},
  url={https://arxiv.org/pdf/1804.05839.pdf}
}

