Issue while backtesting using custom data with Zipline #2858

Open
vikaslamba99 opened this issue Oct 6, 2024 · 0 comments

Dear Zipline Maintainers,

Before I tell you about my issue, let me describe my environment:

Environment

Operating System: macOS High Sierra
Python Version: python 3.9
How did you install Zipline: conda

Now that you know a little about me, let me tell you about the issue I am having:

Description of Issue

While following 'Machine Learning 4 Trading' by Stefan Jansen, I am trying to run a backtest with Zipline. I have successfully ingested my custom data.
In the pipeline, I assign the custom data ('returns') and the factor data ('DEMA', Double Exponential Moving Average) to pipeline columns and then run a linear regression on them.
However, when the pipeline runs, the pipeline columns contain only NaN values, even though the source 'returns' and 'DEMA' frames are correctly populated and their dates align.

The NaN values appear to be the cause of this error: "ValueError: Found array with 0 sample(s) (shape=(0, 1)) while a minimum of 1 is required by StandardScaler."

Here is how you can reproduce this issue on your machine:

Reproduction Steps

Here is the snippet of the code I am using:

START = pd.Timestamp('2017-01-01').normalize()
END = pd.Timestamp('2020-12-31').normalize()
EARLIEST_START = pd.Timestamp('2017-03-03').normalize()

common_dates = returns.index.get_level_values('Date').intersection(dema21.index.get_level_values('Date'))
returns_aligned = returns[returns.index.get_level_values('Date').isin(common_dates)]
dema21_aligned = dema21[dema21.index.get_level_values('Date').isin(common_dates)]

print('..............', returns_aligned)
print('..............', dema21_aligned)

Here is the output of the above print statements:

.............. returns
Date sid
2017-03-03 0 0.011678
1 0.014784
8 0.024059
14 -0.012773
15 0.005901
... ...
2020-12-31 6715 0.001511
6718 -0.000102
6733 0.005090
6753 0.005517
6755 0.007488

[867468 rows x 1 columns]
.............. dema21
Date sid
2017-03-03 0 49.301678
1 35.406822
8 45.047879
14 142.457072
15 32.793468
... ...
2020-12-31 6715 387.504155
6718 87.496984
6733 43.274391
6753 27.739478
6755 161.174313

[867468 rows x 1 columns]
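
One thing that may be worth checking at this point: both frames printed above are in long format, with a (Date, sid) MultiIndex and a single value column. The DataFrameLoader docstring describes its baseline argument as a frame with a DatetimeIndex and one column per sid, so a long frame might not match any requested date or asset and could come back as all NaN. Below is a minimal sketch of the reshape, assuming that docstring applies to the installed version; `to_wide` is a hypothetical helper and the values are made up.

```python
import pandas as pd

def to_wide(long_frame: pd.DataFrame, value_col: str) -> pd.DataFrame:
    """Pivot a (Date, sid)-indexed frame into dates x sids (hypothetical helper)."""
    wide = long_frame[value_col].unstack('sid')
    wide.index = pd.DatetimeIndex(wide.index)
    return wide.sort_index()

# Toy stand-in for `returns_aligned`; the numbers are illustrative only.
idx = pd.MultiIndex.from_product(
    [pd.to_datetime(['2017-03-03', '2017-03-06']), [0, 1, 8]],
    names=['Date', 'sid'],
)
returns_long = pd.DataFrame(
    {'returns': [0.01, 0.02, 0.03, 0.04, 0.05, 0.06]}, index=idx
)

returns_wide = to_wide(returns_long, 'returns')
print(returns_wide.shape)  # (2, 3): one row per date, one column per sid
```

The wide frames would then be the ones passed to DataFrameLoader. Depending on the Zipline version, the index may also need to be timezone-aware (UTC) to match the pipeline's dates; that is an assumption to verify against the installed version.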

I have defined MyDataSet as:

class MyDataSet(DataSet):
    returns = Column(dtype=float)
    dema_21 = Column(dtype=float)
    domain = US_EQUITIES

column_returns_frame = pd.DataFrame(
    data=returns_aligned,
)

loaders = {
    MyDataSet.returns: DataFrameLoader(MyDataSet.returns, returns_aligned),
    MyDataSet.dema_21: DataFrameLoader(MyDataSet.dema_21, dema21_aligned),
}

def load_creator(column):
    return loaders[column]

returns_loader = load_creator(MyDataSet.returns)
dema21_loader = load_creator(MyDataSet.dema_21)

class LinearModel(CustomFactor):
    """Obtain model predictions"""
    train_on_weekday = [0, 2, 4]
    print('\n\n Boston Boston:, \n\n\n Here are the assets in LinearModel at top, class decl. : \n', len(assets))
    print("Returns data before compute method call:", np.isnan(returns).sum())

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._scaler = StandardScaler()
        self._model = SGDRegressor(penalty='l2')
        self._trained = False

    def set_returns_factor(self, returns_factor):
        """Method to set the returns factor after initialization, in ser_returns."""
        self.returns_factor = returns_factor.flatten()

    def _train_model(self, today, returns, inputs):
        scaler = self._scaler
        model = self._model

        outcome = returns.flatten()
        features = np.dstack(inputs)
        print("Returns data before loading, in _train_model:", np.isnan(returns).sum())  # Check NaN values
        print(f"============= Step 1 - Inputs before reshaping - flattening and shifting: {np.dstack(inputs).shape}\n", np.dstack(outcome).flatten())
        n_days, n_stocks, n_features = features.shape
        print(f"============= Step 1 - outcome before reshaping, in _train_model: {n_days}, {n_stocks}, {n_features}", )
        features = features.reshape(-1, n_features)
        features = features[~np.isnan(outcome)]
        outcome = outcome[~np.isnan(outcome)]
        outcome = outcome[np.all(~np.isnan(features), axis=1)]
        features = features[np.all(~np.isnan(features), axis=1)]

        features = scaler.fit_transform(features)
        start = time()
        model.fit(X=features, y=outcome)
        self._trained = True

    def _maybe_train_model(self, today, returns, inputs):
        if (today.weekday() in self.train_on_weekday) or not self._trained:
            self._train_model(today, returns, inputs)

    def compute(self, today, assets, out, returns, *inputs):
        print("Returns data before loading............. :", np.isnan(returns).sum())  # Check NaN values
        for i, input_data in enumerate(inputs):
            print(f"+++++++++++++++++++++ Input {i} for {today}: {input_data}")
        self._maybe_train_model(today, returns, inputs)

        X = np.dstack(inputs)[-1]
        missing = np.any(np.isnan(X), axis=1)
        X[missing, :] = 0
        X = self._scaler.transform(X)
        preds = self._model.predict(X)
        out[:] = np.where(missing, np.nan, preds)

The first print statement in the 'compute' method of the LinearModel class above (print("Returns data before loading............. :", np.isnan(returns).sum())) outputs:
"Returns data before loading............. : 898"
which suggests that the return values for all 898 assets arrive as NaN.

Likewise, for the following lines of code:
for i, input_data in enumerate(inputs):
    print(f"+++++++++++++++++++++ Input {i} for {today}: {input_data}")

the output is:
+++++++++++++++++++++ Input 0 for 2017-03-03 00:00:00: [[nan nan nan nan nan nan nan nan nan nan nan nan nan nan nan nan nan nan
nan nan nan nan nan nan nan nan nan nan nan nan nan nan nan nan nan nan
nan nan nan nan nan nan nan nan ...................................................................
nan nan nan nan nan nan nan nan nan nan nan nan nan nan nan nan]]
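
The shape reported in the error message can be traced through the filtering steps in _train_model. The following NumPy-only sketch assumes a single feature input with window_length=1 and uses 5 assets instead of 898 for brevity; the arrays are synthetic stand-ins for what compute receives.

```python
import numpy as np

n_assets = 5  # stand-in for the 898 assets in the issue
returns = np.full((1, n_assets), np.nan)    # what compute() receives as `returns`
inputs = (np.full((1, n_assets), np.nan),)  # the single DEMA21 window

outcome = returns.flatten()
features = np.dstack(inputs)                 # shape (1, n_assets, 1)
n_days, n_stocks, n_features = features.shape
features = features.reshape(-1, n_features)  # shape (n_assets, 1)

# Same two-stage filter as in _train_model: drop rows with a NaN outcome,
# then drop rows with any NaN feature.
keep = ~np.isnan(outcome)
features, outcome = features[keep], outcome[keep]
keep = np.all(~np.isnan(features), axis=1)
features, outcome = features[keep], outcome[keep]

print(features.shape)  # (0, 1)
```

With every value NaN, no rows survive the filter, which leaves the (0, 1) array that StandardScaler rejects.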

The code for the make_pipeline function is as follows:

def make_pipeline(window_length=21, n_forward_days=5):
    asset_filter = StaticAssets(assets)
    pipeline_columns = OrderedDict()

    print('+++++++++++++++', type(MyDataSet.returns))
    print('+++++++++++++++', type(MyDataSet.dema_21))
    pipeline_columns['Returns'] = MyDataSet.returns.latest
    pipeline_columns['DEMA21'] = MyDataSet.dema_21.latest

    pipeline_columns['predictions'] = LinearModel(inputs=pipeline_columns.values(), window_length=1, mask=asset_filter)
    return Pipeline(columns=pipeline_columns, screen=asset_filter)

The debug print statements in the function above confirm that both MyDataSet.returns and MyDataSet.dema_21 are of type <class 'zipline.pipeline.data.dataset.BoundColumn'>.
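
For reference, the order of pipeline_columns.values() determines how the arrays arrive in compute: with the signature compute(self, today, assets, out, returns, *inputs), the first input binds to returns and the remaining ones are collected into inputs. Here is a plain-Python sketch of that binding, using a hypothetical stand-in function and placeholder strings rather than Zipline's engine:

```python
from collections import OrderedDict

def compute(today, assets, out, returns, *inputs):
    """Stand-in with the same positional layout as LinearModel.compute."""
    return returns, inputs

columns = OrderedDict()
columns['Returns'] = 'returns_window'  # placeholder for MyDataSet.returns.latest
columns['DEMA21'] = 'dema21_window'    # placeholder for MyDataSet.dema_21.latest

returns, inputs = compute('today', 'assets', 'out', *columns.values())
print(returns)  # returns_window
print(inputs)   # ('dema21_window',)
```

Note also that make_pipeline accepts window_length and n_forward_days, but the LinearModel call hard-codes window_length=1, so each training step sees a single row per asset.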

My initialize function is as follows:

def initialize(context):
    context.initial_data_check = False
    context.pipeline_created = False
    context.actual_start = EARLIEST_START  # The date when actual trading logic should start
    context.assets = assets

    set_slippage(slippage.FixedBasisPointsSlippage(basis_points=0, volume_limit=0.1))
    set_commission(commission.PerShare(cost=0.00165, min_trade_cost=0))

    schedule_function(
        rebalance,
        date_rule=date_rules.month_start(),
        time_rule=time_rules.market_open(hours=1)
    )

    ml_pipeline = make_pipeline(n_forward_days=N_FORWARD_DAYS,
                                window_length=TRAINING_PERIOD)

    attach_pipeline(ml_pipeline, 'ml_model')
    print("Pipeline 'ml_model' attached successfully.")  # Debug message

Finally, the algorithm is run with the following code:

results = run_algorithm(
    custom_loader=loaders,
    start=EARLIEST_START,
    end=END,
    initialize=initialize,
    before_trading_start=before_trading_start,
    analyze=analyze,
    capital_base=CAPITAL_BASE,
    data_frequency='daily',
    bundle='us_stocks',
    trading_calendar=TRADING_CALENDAR,
)

The full stack trace for the error is:

KeyError Traceback (most recent call last)
File ~/anaconda3/envs/testnew/lib/python3.9/site-packages/zipline/algorithm.py:2254, in TradingAlgorithm._pipeline_output(self, pipeline, chunks, name)
2253 try:
-> 2254 data = self._pipeline_cache.get(name, today)
2255 except KeyError:
2256 # Calculate the next block.

File ~/anaconda3/envs/testnew/lib/python3.9/site-packages/zipline/utils/cache.py:147, in ExpiringCache.get(self, key, dt)
146 try:
--> 147 return self._cache[key].unwrap(dt)
148 except Expired as exc:

KeyError: 'ml_model'

During handling of the above exception, another exception occurred:

ValueError Traceback (most recent call last)
Cell In[47], line 2
1 # Run the algorithm
----> 2 results = run_algorithm(
3 custom_loader = loaders,
4 start=EARLIEST_START,
5 end=END,
6 initialize=initialize,
7 before_trading_start=before_trading_start,
8 analyze=analyze,
9 capital_base=CAPITAL_BASE,
10 data_frequency='daily',
11 bundle='us_stocks',
12 trading_calendar=TRADING_CALENDAR,
13 )

File ~/anaconda3/envs/testnew/lib/python3.9/site-packages/zipline/utils/run_algo.py:397, in run_algorithm(start, end, initialize, capital_base, handle_data, before_trading_start, analyze, data_frequency, bundle, bundle_timestamp, trading_calendar, metrics_set, benchmark_returns, default_extension, extensions, strict_extensions, environ, custom_loader, blotter)
393 load_extensions(default_extension, extensions, strict_extensions, environ)
395 benchmark_spec = BenchmarkSpec.from_returns(benchmark_returns)
--> 397 return _run(
398 handle_data=handle_data,
399 initialize=initialize,
400 before_trading_start=before_trading_start,
401 analyze=analyze,
402 algofile=None,
403 algotext=None,
404 defines=(),
405 data_frequency=data_frequency,
406 capital_base=capital_base,
407 bundle=bundle,
408 bundle_timestamp=bundle_timestamp,
409 start=start,
410 end=end,
411 output=os.devnull,
412 trading_calendar=trading_calendar,
413 print_algo=False,
414 metrics_set=metrics_set,
415 local_namespace=False,
416 environ=environ,
417 blotter=blotter,
418 custom_loader=custom_loader,
419 benchmark_spec=benchmark_spec,
420 )

File ~/anaconda3/envs/testnew/lib/python3.9/site-packages/zipline/utils/run_algo.py:200, in _run(handle_data, initialize, before_trading_start, analyze, algofile, algotext, defines, data_frequency, capital_base, bundle, bundle_timestamp, start, end, output, trading_calendar, print_algo, metrics_set, local_namespace, environ, blotter, custom_loader, benchmark_spec)
197 raise _RunAlgoError(str(e))
199 try:
--> 200 perf = TradingAlgorithm(
201 namespace=namespace,
202 data_portal=data,
203 get_pipeline_loader=choose_loader,
204 trading_calendar=trading_calendar,
205 sim_params=SimulationParameters(
206 start_session=start,
207 end_session=end,
208 trading_calendar=trading_calendar,
209 capital_base=capital_base,
210 data_frequency=data_frequency,
211 ),
212 metrics_set=metrics_set,
213 blotter=blotter,
214 benchmark_returns=benchmark_returns,
215 benchmark_sid=benchmark_sid,
216 **{
217 "initialize": initialize,
218 "handle_data": handle_data,
219 "before_trading_start": before_trading_start,
220 "analyze": analyze,
221 }
222 if algotext is None
223 else {
224 "algo_filename": getattr(algofile, "name", ""),
225 "script": algotext,
226 },
227 ).run()
228 except NoBenchmark:
229 raise _RunAlgoError(
230 (
231 "No benchmark_spec was provided, and"
(...)
239 ),
240 )

File ~/anaconda3/envs/testnew/lib/python3.9/site-packages/zipline/algorithm.py:625, in TradingAlgorithm.run(self, data_portal)
623 try:
624 perfs = []
--> 625 for perf in self.get_generator():
626 perfs.append(perf)
628 # convert perf dict to pandas dataframe

File ~/anaconda3/envs/testnew/lib/python3.9/site-packages/zipline/gens/tradesimulation.py:243, in AlgorithmSimulator.transform(self)
241 self.simulation_dt = dt
242 algo.on_dt_changed(dt)
--> 243 algo.before_trading_start(self.current_data)
244 elif action == MINUTE_END:
245 minute_msg = self._get_minute_message(
246 dt,
247 algo,
248 metrics_tracker,
249 )

File ~/anaconda3/envs/testnew/lib/python3.9/site-packages/zipline/algorithm.py:427, in TradingAlgorithm.before_trading_start(self, data)
426 def before_trading_start(self, data):
--> 427 self.compute_eager_pipelines()
429 if self._before_trading_start is None:
430 return

File ~/anaconda3/envs/testnew/lib/python3.9/site-packages/zipline/algorithm.py:594, in TradingAlgorithm.compute_eager_pipelines(self)
592 for name, pipe in self._pipelines.items():
593 if pipe.eager:
--> 594 self.pipeline_output(name)

File ~/anaconda3/envs/testnew/lib/python3.9/site-packages/zipline/utils/api_support.py:107, in require_initialized.<locals>.decorator.<locals>.wrapped_method(self, *args, **kwargs)
105 if not self.initialized:
106 raise exception
--> 107 return method(self, *args, **kwargs)

File ~/anaconda3/envs/testnew/lib/python3.9/site-packages/zipline/algorithm.py:2247, in TradingAlgorithm.pipeline_output(self, name)
2242 except KeyError as exc:
2243 raise NoSuchPipeline(
2244 name=name,
2245 valid=list(self._pipelines.keys()),
2246 ) from exc
-> 2247 return self._pipeline_output(pipe, chunks, name)

File ~/anaconda3/envs/testnew/lib/python3.9/site-packages/zipline/algorithm.py:2257, in TradingAlgorithm._pipeline_output(self, pipeline, chunks, name)
2254 data = self._pipeline_cache.get(name, today)
2255 except KeyError:
2256 # Calculate the next block.
-> 2257 data, valid_until = self.run_pipeline(
2258 pipeline,
2259 today,
2260 next(chunks),
2261 )
2262 self._pipeline_cache.set(name, data, valid_until)
2264 # Now that we have a cached result, try to return the data for today.

File ~/anaconda3/envs/testnew/lib/python3.9/site-packages/zipline/algorithm.py:2303, in TradingAlgorithm.run_pipeline(self, pipeline, start_session, chunksize)
2298 end_loc = min(start_date_loc + chunksize, sessions.get_loc(sim_end_session))
2300 end_session = sessions[end_loc]
2302 return (
-> 2303 self.engine.run_pipeline(pipeline, start_session, end_session),
2304 end_session,
2305 )

File ~/anaconda3/envs/testnew/lib/python3.9/site-packages/zipline/pipeline/engine.py:363, in SimplePipelineEngine.run_pipeline(self, pipeline, start_date, end_date, hooks)
361 hooks = self._resolve_hooks(hooks)
362 with hooks.running_pipeline(pipeline, start_date, end_date):
--> 363 return self._run_pipeline_impl(
364 pipeline,
365 start_date,
366 end_date,
367 hooks,
368 )

File ~/anaconda3/envs/testnew/lib/python3.9/site-packages/zipline/pipeline/engine.py:412, in SimplePipelineEngine._run_pipeline_impl(self, pipeline, start_date, end_date, hooks)
409 execution_order = plan.execution_order(workspace, refcounts)
411 with hooks.computing_chunk(execution_order, start_date, end_date):
--> 412 results = self.compute_chunk(
413 graph=plan,
414 dates=dates,
415 sids=sids,
416 workspace=workspace,
417 refcounts=refcounts,
418 execution_order=execution_order,
419 hooks=hooks,
420 )
422 return self._to_narrow(
423 plan.outputs,
424 results,
(...)
427 sids,
428 )

File ~/anaconda3/envs/testnew/lib/python3.9/site-packages/zipline/pipeline/engine.py:680, in SimplePipelineEngine.compute_chunk(self, graph, dates, sids, workspace, refcounts, execution_order, hooks)
678 else:
679 with hooks.computing_term(term):
--> 680 workspace[term] = term._compute(
681 self._inputs_for_term(
682 term,
683 workspace,
684 graph,
685 domain,
686 refcounts,
687 ),
688 mask_dates,
689 sids,
690 mask,
691 )
692 if term.ndim == 2:
693 assert workspace[term].shape == mask.shape

File ~/anaconda3/envs/testnew/lib/python3.9/site-packages/zipline/pipeline/mixins.py:225, in CustomTermMixin._compute(self, windows, dates, assets, mask)
222 out_row = out[idx][out_mask]
223 inputs = format_inputs(windows, inputs_mask)
--> 225 compute(date, masked_assets, out_row, *inputs, **params)
226 out[idx][out_mask] = out_row
227 return out

Cell In[45], line 81, in LinearModel.compute(self, today, assets, out, returns, *inputs)
79 for i, input_data in enumerate(inputs):
80 print(f"+++++++++++++++++++++ Input {i} for {today}: {input_data}")
---> 81 self._maybe_train_model(today, returns, inputs)
83 # Predict most recent feature values
84 X = np.dstack(inputs)[-1]

Cell In[45], line 71, in LinearModel._maybe_train_model(self, today, returns, inputs)
67 def _maybe_train_model(self, today, returns, inputs):
68 # print('============= Inside LinearModel - inside maybe train model.')
69 # Train the model if required
70 if (today.weekday() in self.train_on_weekday) or not self._trained:
---> 71 self._train_model(today, returns, inputs)

Cell In[45], line 57, in LinearModel._train_model(self, today, returns, inputs)
54 print(f"---------------Final outcome pre: {outcome[:5]}")
55 # print(f" 222 Training model on {today}. Features shape after filtering: {features.shape}")
---> 57 features = scaler.fit_transform(features)
58 print(f"---------------Final features: {features[:5]}")
59 print(f"---------------Final outcome: {outcome[:5]}")

File ~/anaconda3/envs/testnew/lib/python3.9/site-packages/sklearn/base.py:867, in TransformerMixin.fit_transform(self, X, y, **fit_params)
863 # non-optimized default implementation; override when a better
864 # method is possible for a given clustering algorithm
865 if y is None:
866 # fit method of arity 1 (unsupervised transformation)
--> 867 return self.fit(X, **fit_params).transform(X)
868 else:
869 # fit method of arity 2 (supervised transformation)
870 return self.fit(X, y, **fit_params).transform(X)

File ~/anaconda3/envs/testnew/lib/python3.9/site-packages/sklearn/preprocessing/_data.py:809, in StandardScaler.fit(self, X, y, sample_weight)
807 # Reset internal state before fitting
808 self._reset()
--> 809 return self.partial_fit(X, y, sample_weight)

File ~/anaconda3/envs/testnew/lib/python3.9/site-packages/sklearn/preprocessing/_data.py:844, in StandardScaler.partial_fit(self, X, y, sample_weight)
812 """Online computation of mean and std on X for later scaling.
813
814 All of X is processed as a single batch. This is intended for cases
(...)
841 Fitted scaler.
842 """
843 first_call = not hasattr(self, "n_samples_seen_")
--> 844 X = self._validate_data(
845 X,
846 accept_sparse=("csr", "csc"),
847 dtype=FLOAT_DTYPES,
848 force_all_finite="allow-nan",
849 reset=first_call,
850 )
851 n_features = X.shape[1]
853 if sample_weight is not None:

File ~/anaconda3/envs/testnew/lib/python3.9/site-packages/sklearn/base.py:577, in BaseEstimator._validate_data(self, X, y, reset, validate_separately, **check_params)
575 raise ValueError("Validation should be done on X, y or both.")
576 elif not no_val_X and no_val_y:
--> 577 X = check_array(X, input_name="X", **check_params)
578 out = X
579 elif no_val_X and not no_val_y:

File ~/anaconda3/envs/testnew/lib/python3.9/site-packages/sklearn/utils/validation.py:909, in check_array(array, accept_sparse, accept_large_sparse, dtype, order, copy, force_all_finite, ensure_2d, allow_nd, ensure_min_samples, ensure_min_features, estimator, input_name)
907 n_samples = _num_samples(array)
908 if n_samples < ensure_min_samples:
--> 909 raise ValueError(
910 "Found array with %d sample(s) (shape=%s) while a"
911 " minimum of %d is required%s."
912 % (n_samples, array.shape, ensure_min_samples, context)
913 )
915 if ensure_min_features > 0 and array.ndim == 2:
916 n_features = array.shape[1]

ValueError: Found array with 0 sample(s) (shape=(0, 1)) while a minimum of 1 is required by StandardScaler.
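
Independently of the root cause, a guard in front of the scaler would turn this into a more descriptive failure. The sketch below assumes the same NaN filtering as _train_model, collapsed into one mask; `clean_training_data` is a hypothetical helper and not part of Zipline or scikit-learn.

```python
import numpy as np

def clean_training_data(features, outcome):
    """Drop rows with NaN in outcome or features; raise if nothing is left."""
    keep = ~np.isnan(outcome) & np.all(~np.isnan(features), axis=1)
    if not keep.any():
        raise ValueError(
            f"No usable training rows: {np.isnan(outcome).sum()} of "
            f"{outcome.size} outcomes are NaN; check the loader output."
        )
    return features[keep], outcome[keep]

# Illustrative data: only the first row is complete.
X = np.array([[1.0], [np.nan], [3.0]])
y = np.array([0.1, 0.2, np.nan])
X_clean, y_clean = clean_training_data(X, y)
print(X_clean.shape, y_clean.shape)  # (1, 1) (1,)
```

Calling this before scaler.fit_transform would report how many outcomes were NaN instead of surfacing as a shape error deep inside scikit-learn.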

What steps have you taken to resolve this already?

I have tried changing the approach in the code.
I have tried debugging.
I have tried the approach mentioned here - #2754
I have also tried the suggestions here - #911

Can anyone point out what I am missing here and provide a short example? Any help would be greatly appreciated.

Sincerely,
$ viklam

@vikaslamba99 vikaslamba99 changed the title Issue while backtesting using custom data with Zipline as suggested in Machine Learning for Trading Issue while backtesting using custom data with Zipline Oct 7, 2024