PyData Amsterdam 2024
Scan in NumPyro 😅 … for loops

import numpy as np

def scan(f, init, xs):
    """Pure Python implementation of scan.

    Parameters
    ----------
    f : a Python function to be scanned.
    init : an initial loop carry value.
    xs : the values over which to scan along the leading axis.
    """
    carry = init
    ys = []
    for x in xs:
        carry, y = f(carry, x)
        ys.append(y)
    return carry, np.stack(ys)
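A tiny usage example (not from the talk): a cumulative sum expressed as a scan, where the carry is the running total and the stacked outputs are the partial sums.

carry, ys = scan(lambda total, x: (total + x, total + x), 0, np.array([1, 2, 3]))
assert carry == 6
assert (ys == np.array([1, 3, 6])).all()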
def sum_of_powers_for_loop(phi: float, h: int) -> float:
    "phi -> phi + phi^2 + ... + phi^h"
    return sum(phi**i for i in range(1, h + 1))
assert sum_of_powers_for_loop(2, 0) == 0
assert sum_of_powers_for_loop(2, 1) == 2
assert sum_of_powers_for_loop(2, 2) == 2 + 2**2
assert sum_of_powers_for_loop(2, 3) == 2 + 2**2 + 2**3
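For comparison, here is the same sum written with the pure-Python scan above; sum_of_powers_scan is our name for this sketch, not necessarily the talk's.

def sum_of_powers_scan(phi: float, h: int) -> float:
    "Accumulate phi + phi^2 + ... + phi^h with scan instead of a for loop."
    def step(carry, i):
        carry = carry + phi**i
        return carry, carry
    total, _ = scan(step, 0.0, np.arange(1, h + 1))
    return total

assert sum_of_powers_scan(2, 3) == sum_of_powers_for_loop(2, 3)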
The simple exponential smoothing (local level) recursion:

\[
\begin{align*}
\hat{y}_{t+h|t} &= l_t \\
l_t &= \alpha y_t + (1 - \alpha) l_{t-1}
\end{align*}
\]
import jax.numpy as jnp
import numpyro
import numpyro.distributions as dist
from jax.typing import ArrayLike
from numpyro.contrib.control_flow import scan


def level_model(y: ArrayLike, future: int = 0) -> None:
    # Get time series length
    t_max = y.shape[0]

    # --- Priors ---
    ## Level
    level_smoothing = numpyro.sample(
        "level_smoothing", dist.Beta(concentration1=1, concentration0=1)
    )
    level_init = numpyro.sample("level_init", dist.Normal(loc=0, scale=1))

    ## Noise
    noise = numpyro.sample("noise", dist.HalfNormal(scale=1))

    # --- Transition Function ---
    def transition_fn(carry, t):
        ...

    # --- Run Scan ---
    with numpyro.handlers.condition(data={"pred": y}):
        _, preds = scan(
            transition_fn,
            level_init,
            jnp.arange(t_max + future),
        )

    # --- Forecast ---
    if future > 0:
        return numpyro.deterministic("y_forecast", preds[-future:])
Croston's method is based on the idea of separating the demand size \(z_t\) and the demand interval \(p_t\), and then forecasting each of them separately using simple exponential smoothing.
\[ \hat{y}_{t+h} = \frac{\hat{z}_{t+h}}{\hat{p}_{t+h}} \]
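A minimal sketch of how the two series could be built from an intermittent series y (an assumption about the preprocessing, not the talk's exact code; conventions for the first interval vary):

def croston_decompose(y: np.ndarray) -> tuple[np.ndarray, np.ndarray]:
    """Split an intermittent series into demand sizes z and inverse intervals 1 / p."""
    nonzero_idx = np.flatnonzero(y)
    z = y[nonzero_idx]                    # demand sizes at the demand points
    p = np.diff(nonzero_idx, prepend=-1)  # periods between consecutive demands
    return z, 1.0 / p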
from numpyro.handlers import scope


def croston_model(z: ArrayLike, p_inv: ArrayLike, future: int = 0) -> None:
    # Fit one level model to the demand sizes and one to the inverse intervals.
    z_forecast = scope(level_model, "demand")(z, future)
    p_inv_forecast = scope(level_model, "period_inv")(p_inv, future)

    if future > 0:
        numpyro.deterministic("z_forecast", z_forecast)
        numpyro.deterministic("p_inv_forecast", p_inv_forecast)
        numpyro.deterministic("forecast", z_forecast * p_inv_forecast)
- The TSB method is similar to Croston's method: it constructs two time series from the original one and forecasts each of them separately, so that the final forecast is obtained by combining the two.
- The main difference between the two methods is that TSB uses the demand probability instead of the demand intervals.
🧪 We can modify the TSB model to include zero-inflation by using a zero-inflated negative binomial distribution, as sketched below.
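One way to express such a likelihood in NumPyro is to wrap a negative binomial in a zero-inflated distribution; the helper name and arguments below are our assumptions, not the talk's code.

def zero_inflated_likelihood(mu_t, concentration, gate):
    # gate is the probability of an excess zero on top of the NB zeros.
    return numpyro.sample(
        "pred",
        dist.ZeroInflatedDistribution(
            dist.NegativeBinomial2(mean=mu_t, concentration=concentration),
            gate=gate,
        ),
    )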
def censored_normal(loc, scale, y, censored):
    distribution = dist.Normal(loc=loc, scale=scale)
    # Probability of exceeding the censoring bound y.
    ccdf = 1 - distribution.cdf(y)
    # Censored observations only contribute the exceedance probability.
    numpyro.sample(
        "censored_label",
        dist.Bernoulli(probs=ccdf).mask(censored == 1),
        obs=censored,
    )
    # Non-censored observations contribute the usual Normal likelihood.
    return numpyro.sample("pred", distribution.mask(censored != 1))
- Change the likelihood distribution in a time-series model, e.g. the censored normal above.
- Use a hierarchical structure to regularize the demand elasticity parameters (see the sketch after this list).
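A minimal sketch of a hierarchical (partially pooled) prior over per-item elasticities, using a non-centered parameterization; all names and prior choices here are assumptions for illustration.

def hierarchical_elasticity_prior(n_items: int):
    # Global distribution of elasticities across items.
    elasticity_loc = numpyro.sample("elasticity_loc", dist.Normal(loc=0, scale=1))
    elasticity_scale = numpyro.sample("elasticity_scale", dist.HalfNormal(scale=1))
    with numpyro.plate("item", n_items):
        # Per-item offsets are shrunk toward the global mean.
        offset = numpyro.sample("elasticity_offset", dist.Normal(loc=0, scale=1))
    return numpyro.deterministic(
        "elasticity", elasticity_loc + elasticity_scale * offset
    )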
from pymc_experimental.statespace import structural as st

# Level + trend, with innovations only on the trend (a slowly varying trend)
slow_trend = st.LevelTrendComponent(order=2, innovations_order=[0, 1])
# Deterministic annual seasonality with two Fourier harmonics (e.g. weekly data)
seasonality = st.FrequencySeasonality(name="annual_seasonality", season_length=52, n=2, innovations=False)
# Exogenous regressors with fixed (non-time-varying) coefficients
exog = st.RegressionComponent(name="exog", state_names=X.columns.tolist(), innovations=False)
measurement_error = st.MeasurementError("measurement_error")
# Combine the components and build the state-space model
ss_mod = (slow_trend + seasonality + exog + measurement_error).build()
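A rough sketch of how the built model is typically used inside a PyMC model (assumed workflow; the required prior names depend on the components, so consult the pymc-experimental statespace docs for the exact parameter names):

import pymc as pm

with pm.Model(coords=ss_mod.coords) as model:
    # ... define one prior per parameter expected by the components ...
    ss_mod.build_statespace_graph(y_df)  # y_df: the observed series (assumption)
    idata = pm.sample()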