
[Model] Distributed version of the model giving Arrow Capacity error #295

Closed
iamyihwa opened this issue Jan 9, 2024 · 6 comments

iamyihwa commented Jan 9, 2024

What happened + What you expected to happen

  1. I am training a large dataset with the spark distributed version of the model, and am getting a pyarrow error that says 'PythonException: An exception was thrown from a UDF: 'pyarrow.lib.ArrowCapacityError: array cannot contain more than 2147483646 bytes, have 2745777856'.'

I wonder whether this is due to the target transformation routine, which uses pandas, or whether distributed mlforecast uses pyarrow underneath and is hitting a size limit.

  2. Additionally, the log says it is training with only 1 worker, when in theory there are 8 workers available.
2024-01-09 07:27:30,244 INFO XGBoost-PySpark: _fit Running xgboost-2.0.3 on 1 workers with
	booster params: {'objective': 'reg:squarederror', 'device': 'cpu', 'nthread': 1}
	train_call_kwargs_params: {'verbose_eval': True, 'num_boost_round': 100}
	dmatrix_kwargs: {'nthread': 1, 'missing': nan}
  3. Here is the error message.
---------------------------------------------------------------------------
PythonException                           Traceback (most recent call last)
<command-820427589813878> in <module>
     11  #   date_features=[week_of_month],
     12 )
---> 13 fcst.fit(
     14     spark_series,
     15   #  static_features=['embedding_x', 'embedding_y'],

/local_disk0/.ephemeral_nfs/envs/pythonEnv-a70a3c46-5e29-436c-8d76-bed107dbdff0/lib/python3.8/site-packages/mlforecast/distributed/forecast.py in fit(self, df, id_col, time_col, target_col, static_features, dropna, keep_last_n)
    412             Forecast object with series values and trained models.
    413         """
--> 414         return self._fit(
    415             df,
    416             id_col=id_col,

/local_disk0/.ephemeral_nfs/envs/pythonEnv-a70a3c46-5e29-436c-8d76-bed107dbdff0/lib/python3.8/site-packages/mlforecast/distributed/forecast.py in _fit(self, data, id_col, time_col, target_col, static_features, dropna, keep_last_n, window_info)
    357             train_data = featurizer.transform(prep)[target_col, "features"]
    358             for name, model in self.models.items():
--> 359                 trained_model = model._pre_fit(target_col).fit(train_data)
    360                 self.models_[name] = model.extract_local_model(trained_model)
    361         elif DASK_INSTALLED and isinstance(data, dd.DataFrame):

/databricks/python_shell/dbruntime/MLWorkloadsInstrumentation/_pyspark.py in patched_method(self, *args, **kwargs)
     28             call_succeeded = False
     29             try:
---> 30                 result = original_method(self, *args, **kwargs)
     31                 call_succeeded = True
     32                 return result

/databricks/spark/python/pyspark/ml/base.py in fit(self, dataset, params)
    159                 return self.copy(params)._fit(dataset)
    160             else:
--> 161                 return self._fit(dataset)
    162         else:
    163             raise TypeError("Params must be either a param map or a list/tuple of param maps, "

/local_disk0/.ephemeral_nfs/envs/pythonEnv-a70a3c46-5e29-436c-8d76-bed107dbdff0/lib/python3.8/site-packages/xgboost/spark/core.py in _fit(self, dataset)
   1134             dmatrix_kwargs,
   1135         )
-> 1136         (config, booster) = _run_job()
   1137         get_logger("XGBoost-PySpark").info("Finished xgboost training!")
   1138 

/local_disk0/.ephemeral_nfs/envs/pythonEnv-a70a3c46-5e29-436c-8d76-bed107dbdff0/lib/python3.8/site-packages/xgboost/spark/core.py in _run_job()
   1112         def _run_job() -> Tuple[str, str]:
   1113             rdd = (
-> 1114                 dataset.mapInPandas(
   1115                     _train_booster,  # type: ignore
   1116                     schema="config string, booster string",

/databricks/spark/python/pyspark/sql/dataframe.py in rdd(self)
     88         """
     89         if self._lazy_rdd is None:
---> 90             jrdd = self._jdf.javaToPython()
     91             self._lazy_rdd = RDD(jrdd, self.sql_ctx._sc, BatchedSerializer(PickleSerializer()))
     92         return self._lazy_rdd

/databricks/spark/python/lib/py4j-0.10.9.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1302 
   1303         answer = self.gateway_client.send_command(command)
-> 1304         return_value = get_return_value(
   1305             answer, self.gateway_client, self.target_id, self.name)
   1306 

/databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
    121                 # Hide where the exception came from that shows a non-Pythonic
    122                 # JVM exception message.
--> 123                 raise converted from None
    124             else:
    125                 raise

PythonException: An exception was thrown from a UDF: 'pyarrow.lib.ArrowCapacityError: array cannot contain more than 2147483646 bytes, have 2745777856'. Full traceback below:
Traceback (most recent call last):
  File "pyarrow/array.pxi", line 1044, in pyarrow.lib.Array.from_pandas
  File "pyarrow/array.pxi", line 316, in pyarrow.lib.array
  File "pyarrow/array.pxi", line 83, in pyarrow.lib._ndarray_to_array
  File "pyarrow/error.pxi", line 125, in pyarrow.lib.check_status
pyarrow.lib.ArrowCapacityError: array cannot contain more than 2147483646 bytes, have 2745777856

Thank you in advance!!!

Versions / Dependencies

0.11.5

Reproduction script

spark = (
    SparkSession
    .builder    
    .config("spark.jars.packages", "com.microsoft.azure:synapseml_2.12:1.0.1")    #  "com.microsoft.azure:synapseml_2.12:0.10.2") com.microsoft.azure:synapseml_2.12:0.11.3
    .config("spark.jars.repositories", "https://mmlspark.azureedge.net/maven")
    .getOrCreate()
)


numPartitions = 8
spark_series = result_df.repartitionByRange(numPartitions, 'unique_id')

from mlforecast.distributed.models.spark.xgb import SparkXGBForecast
from mlforecast.distributed import DistributedMLForecast
from window_ops.expanding import expanding_mean
import pandas as pd
from window_ops.rolling import  rolling_mean
from mlforecast.target_transforms import BaseTargetTransform
from mlforecast.target_transforms import Differences


class Mean_Scaler(BaseTargetTransform):
    """Scales each serie to be normalized by its mean + 1."""
    ## Target transform DeepAR Approach https://arxiv.org/pdf/1704.04110.pdf , target transform  https://nixtla.github.io/mlforecast/docs/how-to-guides/target_transforms_guide.html
    def fit_transform(self, df: pd.DataFrame) -> pd.DataFrame:
        self.stats_ = df.groupby(self.id_col)[self.target_col].agg(['mean'])
        df = df.merge(self.stats_, on=self.id_col)
        df[self.target_col] = (df[self.target_col]) / (df['mean'] + 1)
        df = df.drop(columns=['mean'])
        return df

    def inverse_transform(self, df: pd.DataFrame) -> pd.DataFrame:
        df = df.merge(self.stats_, on=self.id_col)
        for col in df.columns.drop([self.id_col, self.time_col, 'mean']):
            df[col] = df[col] * (df['mean'] + 1)
        df = df.drop(columns=['mean'])
        return df



models = [SparkXGBForecast()]  # SparkXGBRegressor()
fcst = DistributedMLForecast(
    models,
    freq='W-SAT',
    lags=[1, 52],
    lag_transforms={
        1: [expanding_mean],
        4: [(rolling_mean, 4)],
    },
    target_transforms=[Differences([1]), Mean_Scaler()],
)
fcst.fit(
    spark_series,
 )

Issue Severity

None

@iamyihwa iamyihwa added the bug label Jan 9, 2024
iamyihwa (Author) commented Jan 9, 2024

Regarding the number of workers: setting the num_workers parameter to the number of available workers fixed it (per the xgboost documentation).
models = [SparkXGBForecast(num_workers=8)]

jmoralez (Member) commented Jan 9, 2024

Hey @iamyihwa, thanks for the great report. Do you have very long ids? This answer suggests that it may be an issue with large strings. This seems to be coming from the fit step, so I don't think it's related to your transformation. Increasing the number of partitions may help.
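
For illustration, a minimal sketch of that repartitioning step, reusing result_df and the 'unique_id' column from the reproduction script above (the partition count here is a hypothetical starting point to tune):

# repartition into more, smaller partitions so each Arrow batch stays
# well under the ~2 GB per-array limit that triggers ArrowCapacityError
numPartitions = 64  # hypothetical value; increase until the error disappears
spark_series = result_df.repartitionByRange(numPartitions, 'unique_id')
fcst.fit(spark_series)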

iamyihwa (Author) commented Jan 9, 2024

Thanks @jmoralez ! It worked!!

As you suggested, I increased the number of partitions, and I didn't have that error anymore!

Now the training runs seamlessly and very fast; however, getting the forecasted results takes a very long time. (Training took 10 minutes with 8 workers, but getting a glimpse of the forecasted results with .take() has been running for more than 10 minutes and is still counting.)

horizon = 52 
preds_sf = fcst.predict(h = horizon)
preds_sf.take(5)

What would be the best way to train a very large dataset in mlforecast / neuralforecast, @jmoralez?
Sorry, it is a bit of a different topic from the issue title, but I would love to hear your input on this!

jmoralez (Member) commented

The lag transformations can take a long time if you have very long series. Can you try using the built-in ones? They should be significantly faster and also support multithreading, so try also setting num_threads to the number of cores in your machine in the forecast constructor.
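
As an illustration only, and assuming an mlforecast release that ships the built-in transforms in mlforecast.lag_transforms, the constructor from the reproduction script could be rewritten roughly like this (the num_threads value is a hypothetical placeholder):

from mlforecast.distributed import DistributedMLForecast
from mlforecast.lag_transforms import ExpandingMean, RollingMean  # built-in lag transforms
from mlforecast.target_transforms import Differences

fcst = DistributedMLForecast(
    models,
    freq='W-SAT',
    lags=[1, 52],
    lag_transforms={
        1: [ExpandingMean()],
        4: [RollingMean(window_size=4)],
    },
    target_transforms=[Differences([1]), Mean_Scaler()],
    num_threads=8,  # hypothetical value; set to the number of cores per machine
)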

Also I don't think spark is able to know it can take the first five rows from a single partition, so you can try saving the result first and then getting a subset, otherwise the whole computation will run only to return 5 rows.
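
For example (a sketch only; the output path is hypothetical), writing the predictions out once and then sampling from the saved table avoids recomputing the full forecast just to show a few rows:

preds_sf = fcst.predict(h=52)
# persist the full forecast once, then read a small sample back
preds_sf.write.mode('overwrite').parquet('/tmp/preds_sf')  # hypothetical path
spark.read.parquet('/tmp/preds_sf').limit(5).show()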

jmoralez (Member) commented

#301 should also make the predict step faster for distributed.

A contributor commented

This issue has been automatically closed because it has been awaiting a response for too long. When you have time to work with the maintainers to resolve this issue, please post a new comment and it will be re-opened. If the issue has been locked for editing by the time you return to it, please open a new issue and reference this one.
