Predict Weekly Category Purchases
Task type: MultilabelClassificationTask
Industry: Retail
Understanding category purchase patterns at weekly granularity enables precision marketing — weekly promotional calendars, personalized flyers, and dynamic coupon scheduling. Rather than a single aggregate prediction, this recipe produces a week-by-week category forecast that marketing teams can act on sequentially.
What makes this advanced? Rolling 12-week windows — iterates through 12 consecutive weekly windows using
split_history_future, stacks per-week results into a single output vector.
Prerequisites
Before writing a target function you need:
- A trained foundation model built on event data that includes the relevant data sources.
- The monad library installed in your environment.
- Data source(s):
purchases_eventswith acategorycolumn
Target Function
The target function tells monad how to label each entity for training. It receives four arguments:
| Argument | Type | Description |
|---|---|---|
history |
Events |
All events before the temporal split. |
future |
Events |
All events after the temporal split. |
attributes |
Attributes |
Static entity attributes. |
ctx |
Dict |
Context dictionary containing SPLIT_TIMESTAMP, data mode, etc. |
For multilabel classification, the function must return one of:
- A 1-D
float32array of sizenum_labels(categories x weeks = 60) — binary indicators per category per week. None— exclude this entity from training.
Full Example
import numpy as np
from datetime import timedelta
from typing import Dict
from monad.ui.target_function import Events, Attributes
from monad.ui.target_function import SPLIT_TIMESTAMP
from monad.ui.target_function import has_incomplete_training_window
from monad.constants import SECONDS_PER_DAY, SplitPointInclusion
# === Configuration ===
NUM_WEEKS = 12
PURCHASE_DATA_SOURCE = "purchases_events"
TARGET_CATEGORIES = ["t-shirt", "jeans", "jacket", "socks", "shoes"]
def weekly_category_target_fn(
history: Events,
future: Events,
attributes: Attributes,
ctx: Dict,
) -> np.ndarray | None:
"""Predict category purchases per week over 12 weeks."""
target_window_days = NUM_WEEKS * 7
if has_incomplete_training_window(ctx, timedelta(days=target_window_days)):
return None
split_ts = ctx[SPLIT_TIMESTAMP]
future = future.interval_from(split_ts, timedelta(days=target_window_days))
# Iterate through each week
purchase_target = np.array([], dtype=np.float32)
for week in range(1, NUM_WEEKS + 1):
week_duration = 7 * SECONDS_PER_DAY
week_purchases, _ = next(
future[PURCHASE_DATA_SOURCE].split_history_future(
split_ts + week * week_duration,
history_limit=week_duration,
include=SplitPointInclusion.FUTURE,
)
)
week_result, _ = week_purchases.groupBy("category").exists(
TARGET_CATEGORIES
)
purchase_target = np.hstack([purchase_target, week_result])
return purchase_target
Step-by-Step Breakdown
① Validate the full 12-week window
target_window_days = NUM_WEEKS * 7
if has_incomplete_training_window(ctx, timedelta(days=target_window_days)):
return None
The function needs 84 days (12 weeks) of future data. Samples without enough future coverage are excluded to avoid partial labels.
② Trim future events to the target window
split_ts = ctx[SPLIT_TIMESTAMP]
future = future.interval_from(split_ts, timedelta(days=target_window_days))
Restricts future events to exactly 12 weeks, ensuring consistent labeling across all split timestamps.
③ Iterate through weekly windows
for week in range(1, NUM_WEEKS + 1):
week_duration = 7 * SECONDS_PER_DAY
week_purchases, _ = next(
future[PURCHASE_DATA_SOURCE].split_history_future(
split_ts + week * week_duration,
history_limit=week_duration,
include=SplitPointInclusion.FUTURE,
)
)
split_history_future is used to isolate each 7-day window. The split point advances by one week each iteration, and history_limit=week_duration looks back exactly one week from that point. This effectively slices the future into 12 non-overlapping weekly buckets.
④ Stack per-week binary vectors
week_result, _ = week_purchases.groupBy("category").exists(
TARGET_CATEGORIES
)
purchase_target = np.hstack([purchase_target, week_result])
For each week, groupBy("category").exists() produces a binary vector indicating which categories had at least one purchase. np.hstack concatenates all 12 weekly vectors into a single flat array of length 60 (5 categories x 12 weeks).
Training
Once the target function is defined, fine-tune a downstream model:
from pathlib import Path
from monad.ui.config import TrainingParams, MetricParams, MetricMonitoringMode
from monad.config.early_stopping import EarlyStopping
from monad.ui.module import load_from_foundation_model, MultilabelClassificationTask
module = load_from_foundation_model(
checkpoint_path=Path("./foundation_model"),
downstream_task=MultilabelClassificationTask(
class_names=[f"w{w}_{cat}" for w in range(NUM_WEEKS) for cat in TARGET_CATEGORIES],
),
target_fn=weekly_category_target_fn,
)
training_params = TrainingParams(
checkpoint_dir=Path("./<this_model>"),
learning_rate=1e-4,
epochs=20,
devices=[0],
metrics=[
MetricParams(alias="auroc", metric_name="AUROC", kwargs={"task": "multilabel", "num_labels": <num_labels>}),
MetricParams(alias="auprc", metric_name="AveragePrecision", kwargs={"task": "multilabel", "num_labels": <num_labels>}),
MetricParams(alias="f1", metric_name="F1Score", kwargs={"task": "multilabel", "num_labels": <num_labels>}),
],
metric_to_monitor="val_auroc_0",
metric_monitoring_mode=MetricMonitoringMode.MAX,
early_stopping=EarlyStopping(min_delta=1e-4, patience=5),
)
module.fit(training_params, seed=42)
Evaluation
from pathlib import Path
from datetime import datetime, timezone
from monad.ui.module import load_from_checkpoint
from monad.ui.config import TestingParams, MetricParams, OutputType
module = load_from_checkpoint(Path("./<this_model>"))
testing_params = TestingParams(
prediction_date=datetime(2024, 5, 1, tzinfo=timezone.utc),
output_type=OutputType.DECODED,
devices=[0],
metrics=[
MetricParams(alias="auroc", metric_name="AUROC"),
MetricParams(alias="auprc", metric_name="AveragePrecision"),
MetricParams(alias="f1", metric_name="F1Score"),
],
)
results = module.test(testing_params)
Prediction
from pathlib import Path
from datetime import datetime, timezone
from monad.ui.module import load_from_checkpoint
from monad.ui.config import TestingParams, OutputType
module = load_from_checkpoint(Path("./<this_model>"))
testing_params = TestingParams(
local_save_location=Path("./predictions.tsv"),
output_type=OutputType.DECODED,
prediction_date=datetime(2024, 6, 1, tzinfo=timezone.utc),
devices=[0],
)
predictions = module.predict(testing_params)
Recommended Metrics
| Metric | Why it matters |
|---|---|
| Hamming Loss | Fraction of labels that are incorrectly predicted. |
| Subset Accuracy | Proportion of samples with all labels correct. |
| Macro AUROC | Average ranking quality across all labels. |
| Per-label F1 | Identifies which labels the model struggles with. |
Production Tips
- Reshape predictions for downstream use. The flat 60-element output can be reshaped to
(12, 5)for week-by-week analysis:predictions.reshape(NUM_WEEKS, len(TARGET_CATEGORIES)). - Weight near-term weeks more heavily. Weeks 1-4 are more actionable than weeks 9-12. Consider training separate models for short-term and long-term horizons.
- Monitor sparsity. Most customers will not purchase from all categories every week. If the label matrix is very sparse, consider reducing the number of weeks or categories.
- Use weekly predictions for campaign scheduling. Feed week-specific predictions into your marketing automation platform to trigger category-level promotions at the right time.
- Validate calendar alignment. Ensure that weeks align with your promotional calendar (Monday-Sunday vs Sunday-Saturday) to match business reporting.