Purchase Value, Quantity, and Visit Prediction
Task type: RegressionTask
Industry: Retail / E-commerce
This recipe demonstrates three regression targets from the same transaction data, each answering a different business question:
| Target | Question answered |
|---|---|
| Item quantity | How many items will the customer buy? |
| Purchase value | How much will the customer spend in total? |
| Visit count | How many separate shopping trips will the customer make? |
Each target function produces a single continuous value per customer. Choose the one that matches your use case, or train separate models for each.
Prerequisites
Before writing a target function you need:
- A trained foundation model built on event data that includes a transactions data source with columns for quantity (qty), price (price), and a basket/visit identifier (basket_id).
- The monad library installed in your environment (for Python App).
Target Functions
All three functions share the same signature:
| Argument | Type | Description |
|---|---|---|
| history | Events | All events before the temporal split. |
| future | Events | All events after the temporal split. |
| attributes | Attributes | Static entity attributes. |
| ctx | Dict | Context dictionary containing SPLIT_TIMESTAMP, data mode, etc. |
Each returns one of:
- np.array([value], dtype=np.float32): the continuous value to predict for this customer.
- None: exclude this customer (incomplete window).
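The return contract can be checked before training with a small helper. This is a minimal sketch: check_target_output is a hypothetical name, not part of monad, and it assumes the one-element shape and float32 dtype shown in this recipe are what the task expects.

```python
import numpy as np


def check_target_output(value):
    """Hypothetical helper: True if value is None or a 1-element float32 array."""
    if value is None:
        return True  # None means the customer is excluded
    return (
        isinstance(value, np.ndarray)
        and value.shape == (1,)
        and value.dtype == np.float32
    )


valid = np.array([12.5], dtype=np.float32)
wrong_dtype = np.array([12.5])  # NumPy defaults to float64
wrong_shape = np.array([1.0, 2.0], dtype=np.float32)  # two values, not one

print(check_target_output(valid), check_target_output(wrong_dtype))
```

Running a target function on a few sample customers through a check like this can surface dtype or shape mistakes before a long training run.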
Shared Imports and Configuration
import numpy as np
from datetime import timedelta
from typing import Dict
from monad.ui.target_function import Events, Attributes
from monad.ui.target_function import SPLIT_TIMESTAMP
from monad.ui.target_function import has_incomplete_training_window
# === Configuration ===
TARGET_WINDOW_DAYS = 21
TRANSACTION_DATA_SOURCE = "transactions"
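To make the 21-day target window concrete, the sketch below selects timestamps with plain datetime objects. It does not call monad; in_target_window is a hypothetical helper, and the half-open boundary (start included, end excluded) is an assumption that may differ from how interval_from treats the edges.

```python
from datetime import datetime, timedelta, timezone

TARGET_WINDOW_DAYS = 21


def in_target_window(event_time, split_time, window):
    """Assumed convention: split_time <= event_time < split_time + window."""
    return split_time <= event_time < split_time + window


split = datetime(2024, 5, 1, tzinfo=timezone.utc)
window = timedelta(days=TARGET_WINDOW_DAYS)

event_times = [
    datetime(2024, 4, 30, tzinfo=timezone.utc),  # before the split: history
    datetime(2024, 5, 10, tzinfo=timezone.utc),  # inside the window
    datetime(2024, 5, 22, tzinfo=timezone.utc),  # exactly split + 21 days: excluded here
]

# Keep only the events that fall inside the target window
kept = [t for t in event_times if in_target_window(t, split, window)]
print(kept)
```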
Target 1: Item Quantity
Predict the total number of items a customer will purchase.
def item_quantity_target_fn(
    history: Events,
    future: Events,
    attributes: Attributes,
    ctx: Dict,
) -> np.ndarray | None:
    """Predict total item quantity purchased in the target window."""
    target_window = timedelta(days=TARGET_WINDOW_DAYS)
    if has_incomplete_training_window(ctx, target_window):
        return None
    future = future.interval_from(ctx[SPLIT_TIMESTAMP], target_window)
    # Sum the quantity column across all future transactions
    total_quantity = future[TRANSACTION_DATA_SOURCE]["qty"].events.sum()
    return np.array([total_quantity], dtype=np.float32)
The same function written against the target_function namespace, with the configuration defined inside the function:
def item_quantity_target_fn(
    history: target_function.Events,
    future: target_function.Events,
    attributes: target_function.Attributes,
    ctx: Dict,
) -> np.ndarray | None:
    """Predict total item quantity purchased in the target window."""
    # === Configuration ===
    TARGET_WINDOW_DAYS = 21
    TRANSACTION_DATA_SOURCE = "transactions"
    target_window = timedelta(days=TARGET_WINDOW_DAYS)
    if target_function.has_incomplete_training_window(ctx, target_window):
        return None
    future = future.interval_from(ctx[target_function.SPLIT_TIMESTAMP], target_window)
    # Sum the quantity column across all future transactions
    total_quantity = future[TRANSACTION_DATA_SOURCE]["qty"].events.sum()
    return np.array([total_quantity], dtype=np.float32)
How it works: Accesses the qty column via ["qty"].events (which returns the raw NumPy array), then sums all values. A customer who buys 3 apples and 2 bananas gets a target of 5.
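The apples-and-bananas example can be reproduced with plain NumPy. In this sketch the qty array is a hand-written stand-in for what ["qty"].events would return; the empty-array case assumes a customer with no transactions in the window yields an empty array.

```python
import numpy as np

# Stand-in for future["transactions"]["qty"].events: one entry per line item
qty = np.array([3, 2], dtype=np.float32)  # 3 apples, 2 bananas
target = np.array([qty.sum()], dtype=np.float32)

# Summing an empty array gives 0, so an empty window produces a target of 0
no_purchases = np.array([], dtype=np.float32)
empty_target = np.array([no_purchases.sum()], dtype=np.float32)

print(target, empty_target)
```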
Target 2: Purchase Value (Total Spend)
Predict the total monetary value of a customer's purchases.
def purchase_value_target_fn(
    history: Events,
    future: Events,
    attributes: Attributes,
    ctx: Dict,
) -> np.ndarray | None:
    """Predict total purchase value in the target window."""
    target_window = timedelta(days=TARGET_WINDOW_DAYS)
    if has_incomplete_training_window(ctx, target_window):
        return None
    future = future.interval_from(ctx[SPLIT_TIMESTAMP], target_window)
    # Compute value = price * quantity for each line item, then sum
    quantities = future[TRANSACTION_DATA_SOURCE]["qty"].events
    prices = future[TRANSACTION_DATA_SOURCE]["price"].events
    total_value = np.sum(prices * quantities)
    return np.array([total_value], dtype=np.float32)
The same function written against the target_function namespace, with the configuration defined inside the function:
def purchase_value_target_fn(
    history: target_function.Events,
    future: target_function.Events,
    attributes: target_function.Attributes,
    ctx: Dict,
) -> np.ndarray | None:
    """Predict total purchase value in the target window."""
    # === Configuration ===
    TARGET_WINDOW_DAYS = 21
    TRANSACTION_DATA_SOURCE = "transactions"
    target_window = timedelta(days=TARGET_WINDOW_DAYS)
    if target_function.has_incomplete_training_window(ctx, target_window):
        return None
    future = future.interval_from(ctx[target_function.SPLIT_TIMESTAMP], target_window)
    # Compute value = price * quantity for each line item, then sum
    quantities = future[TRANSACTION_DATA_SOURCE]["qty"].events
    prices = future[TRANSACTION_DATA_SOURCE]["price"].events
    total_value = np.sum(prices * quantities)
    return np.array([total_value], dtype=np.float32)
How it works: Multiplies price by quantity for each transaction line item, then sums across all items. This handles multi-quantity purchases correctly (e.g., 3 items at 5 EUR = 15 EUR).
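A small NumPy sketch shows why the per-line multiplication matters. The arrays are made-up stand-ins for the price and qty columns; summing prices alone would undercount any line item with a quantity above 1.

```python
import numpy as np

# Made-up line items: unit price and quantity per row
prices = np.array([5.0, 2.5, 10.0], dtype=np.float32)
quantities = np.array([3, 4, 1], dtype=np.float32)

line_totals = prices * quantities  # element-wise: 15.0, 10.0, 10.0
total_value = np.sum(line_totals)  # 35.0

# Ignoring quantity treats every line as a single unit
price_only = np.sum(prices)  # 17.5

print(total_value, price_only)
```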
Target 3: Visit Count
Predict how many separate shopping trips a customer will make.
def visit_count_target_fn(
    history: Events,
    future: Events,
    attributes: Attributes,
    ctx: Dict,
) -> np.ndarray | None:
    """Predict the number of distinct store visits in the target window."""
    target_window = timedelta(days=TARGET_WINDOW_DAYS)
    if has_incomplete_training_window(ctx, target_window):
        return None
    future = future.interval_from(ctx[SPLIT_TIMESTAMP], target_window)
    # Count unique basket IDs (each basket = one visit)
    basket_ids = future[TRANSACTION_DATA_SOURCE]["basket_id"].events
    num_visits = len(np.unique(basket_ids))
    return np.array([num_visits], dtype=np.float32)
The same function written against the target_function namespace, with the configuration defined inside the function:
def visit_count_target_fn(
    history: target_function.Events,
    future: target_function.Events,
    attributes: target_function.Attributes,
    ctx: Dict,
) -> np.ndarray | None:
    """Predict the number of distinct store visits in the target window."""
    # === Configuration ===
    TARGET_WINDOW_DAYS = 21
    TRANSACTION_DATA_SOURCE = "transactions"
    target_window = timedelta(days=TARGET_WINDOW_DAYS)
    if target_function.has_incomplete_training_window(ctx, target_window):
        return None
    future = future.interval_from(ctx[target_function.SPLIT_TIMESTAMP], target_window)
    # Count unique basket IDs (each basket = one visit)
    basket_ids = future[TRANSACTION_DATA_SOURCE]["basket_id"].events
    num_visits = len(np.unique(basket_ids))
    return np.array([num_visits], dtype=np.float32)
How it works: Each unique basket_id represents one shopping trip. np.unique() deduplicates the basket IDs, and len() gives the visit count. A customer who made 3 trips (with potentially many items each) gets a target of 3.
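The difference between counting line items and counting visits is easy to see on a toy array. The basket IDs below are invented for illustration and stand in for what ["basket_id"].events would return.

```python
import numpy as np

# Six line items spread over three baskets (invented IDs)
basket_ids = np.array(["b1", "b1", "b1", "b2", "b3", "b3"])

num_line_items = len(basket_ids)  # 6 rows in the transactions source
num_visits = len(np.unique(basket_ids))  # 3 distinct baskets

target = np.array([num_visits], dtype=np.float32)
print(num_line_items, num_visits, target)
```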
Training
Swap in whichever target function matches your use case:
from pathlib import Path
from monad.ui.config import TrainingParams, MetricParams, MetricMonitoringMode
from monad.config.early_stopping import EarlyStopping
from monad.ui.module import load_from_foundation_model, RegressionTask
# Choose one:
target_fn = item_quantity_target_fn
# target_fn = purchase_value_target_fn
# target_fn = visit_count_target_fn
module = load_from_foundation_model(
    checkpoint_path=Path("./foundation_model"),
    downstream_task=RegressionTask(num_targets=1),
    target_fn=target_fn,
)
training_params = TrainingParams(
    checkpoint_dir=Path("./<this_model>"),
    learning_rate=1e-4,
    epochs=20,
    devices=[0],
    metrics=[
        MetricParams(alias="mae", metric_name="MeanAbsoluteError"),
        MetricParams(alias="mse", metric_name="MeanSquaredError"),
        MetricParams(alias="r2", metric_name="R2Score"),
    ],
    metric_to_monitor="val_mae_0",
    metric_monitoring_mode=MetricMonitoringMode.MIN,
    early_stopping=EarlyStopping(min_delta=1e-4, patience=5),
)
module.fit(training_params, seed=42)
Evaluation
from pathlib import Path
from datetime import datetime, timezone
from monad.ui.module import load_from_checkpoint
from monad.ui.config import TestingParams, MetricParams, OutputType
module = load_from_checkpoint(Path("./<this_model>"))
testing_params = TestingParams(
    prediction_date=datetime(2024, 5, 1, tzinfo=timezone.utc),
    output_type=OutputType.DECODED,
    devices=[0],
    metrics=[
        MetricParams(alias="mae", metric_name="MeanAbsoluteError"),
        MetricParams(alias="mse", metric_name="MeanSquaredError"),
        MetricParams(alias="r2", metric_name="R2Score"),
    ],
)
results = module.test(testing_params)
Prediction
from pathlib import Path
from datetime import datetime, timezone
from monad.ui.module import load_from_checkpoint
from monad.ui.config import TestingParams, OutputType
module = load_from_checkpoint(Path("./<this_model>"))
testing_params = TestingParams(
    local_save_location=Path("./predictions.tsv"),
    output_type=OutputType.DECODED,
    prediction_date=datetime(2024, 6, 1, tzinfo=timezone.utc),
    devices=[0],
)
predictions = module.predict(testing_params)
Variations
Category-specific spend
Predict spend in a particular product category. This assumes the transactions data source also has a category column:
def category_spend_target_fn(
    history: Events, future: Events, attributes: Attributes, ctx: Dict
) -> np.ndarray | None:
    target_window = timedelta(days=TARGET_WINDOW_DAYS)
    if has_incomplete_training_window(ctx, target_window):
        return None
    future = future.interval_from(ctx[SPLIT_TIMESTAMP], target_window)
    category_transactions = future[TRANSACTION_DATA_SOURCE].filter(
        by="category",
        condition=lambda x: x == "Electronics",
    )
    quantities = category_transactions["qty"].events
    prices = category_transactions["price"].events
    total_value = np.sum(prices * quantities)
    return np.array([total_value], dtype=np.float32)
The same function written against the target_function namespace, with the configuration defined inside the function:
def category_spend_target_fn(
    history: target_function.Events,
    future: target_function.Events,
    attributes: target_function.Attributes,
    ctx: Dict,
) -> np.ndarray | None:
    TARGET_WINDOW_DAYS = 21
    TRANSACTION_DATA_SOURCE = "transactions"
    target_window = timedelta(days=TARGET_WINDOW_DAYS)
    if target_function.has_incomplete_training_window(ctx, target_window):
        return None
    future = future.interval_from(ctx[target_function.SPLIT_TIMESTAMP], target_window)
    category_transactions = future[TRANSACTION_DATA_SOURCE].filter(
        by="category",
        condition=lambda x: x == "Electronics",
    )
    quantities = category_transactions["qty"].events
    prices = category_transactions["price"].events
    total_value = np.sum(prices * quantities)
    return np.array([total_value], dtype=np.float32)
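Conceptually, the category filter keeps only the rows whose category matches before price and quantity are multiplied. The sketch below expresses that idea with a NumPy boolean mask on invented data; it is an illustration of the intended result, not a description of how the filter method is implemented.

```python
import numpy as np

# Invented line items with a category column
category = np.array(["Electronics", "Grocery", "Electronics", "Toys"])
prices = np.array([200.0, 3.0, 50.0, 20.0], dtype=np.float32)
quantities = np.array([1, 10, 2, 1], dtype=np.float32)

# Boolean mask selecting the Electronics rows only
mask = category == "Electronics"
electronics_spend = np.sum(prices[mask] * quantities[mask])  # 200*1 + 50*2

# Spend across all categories, for comparison
all_spend = np.sum(prices * quantities)  # 200 + 30 + 100 + 20

print(electronics_spend, all_spend)
```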
Exclude zero-activity customers
Skip customers with no historical transactions to focus on active buyers:
def item_quantity_active_only_target_fn(
    history: target_function.Events,
    future: target_function.Events,
    attributes: target_function.Attributes,
    ctx: Dict,
) -> np.ndarray | None:
    # === Configuration ===
    TARGET_WINDOW_DAYS = 21
    TRANSACTION_DATA_SOURCE = "transactions"
    # Exclude customers with no historical transactions
    if history[TRANSACTION_DATA_SOURCE].count() == 0:
        return None
    target_window = timedelta(days=TARGET_WINDOW_DAYS)
    if target_function.has_incomplete_training_window(ctx, target_window):
        return None
    future = future.interval_from(ctx[target_function.SPLIT_TIMESTAMP], target_window)
    total_quantity = future[TRANSACTION_DATA_SOURCE]["qty"].events.sum()
    return np.array([total_quantity], dtype=np.float32)
Recommended Metrics
| Metric | Why it matters |
|---|---|
| MAE | Average error in the same units as the target — easy to interpret (e.g., "off by 2.5 items on average"). |
| RMSE | Penalizes large errors more than MAE, which matters when big misses are costly. The training and evaluation examples report MSE; take its square root to get RMSE. |
| R² Score | Proportion of variance explained. As a rough rule of thumb, values above 0.5 suggest meaningful predictions; the right threshold depends on the use case. |
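The three metrics in the table can be computed by hand on a toy example to see how they relate. This sketch uses plain NumPy with made-up targets and predictions; it is independent of the metric implementations used during training.

```python
import numpy as np

# Made-up true targets and model predictions for four customers
y_true = np.array([2.0, 0.0, 5.0, 3.0])
y_pred = np.array([3.0, 0.0, 4.0, 5.0])

errors = y_pred - y_true  # 1, 0, -1, 2

mae = np.mean(np.abs(errors))  # (1 + 0 + 1 + 2) / 4
mse = np.mean(errors ** 2)  # (1 + 0 + 1 + 4) / 4
rmse = np.sqrt(mse)  # square root of MSE, same units as the target

# R^2 = 1 - residual sum of squares / total sum of squares
ss_res = np.sum(errors ** 2)
ss_tot = np.sum((y_true - y_true.mean()) ** 2)
r2 = 1.0 - ss_res / ss_tot

print(mae, rmse, r2)
```

The single error of 2 contributes more than half of the squared error, which is why RMSE comes out higher than MAE here.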
Production Tips
- Choose the right target for your action. Use visit count for staffing/scheduling, purchase value for revenue forecasting, and item quantity for inventory planning.
- Log-transform heavily skewed targets. Purchase values often have a long tail. Consider np.log1p(total_value) as the target and np.expm1() to reverse it at prediction time.
- Segment by customer type. New customers and loyal customers have very different baseline behaviors. Train separate models or include tenure as a feature.
- Combine with classification. Pair a churn binary model with a spend regression model: first identify who will return, then estimate how much they'll spend.
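The log-transform tip can be sketched as a round trip with NumPy. The spend values below are invented; in a target function, the log1p result would be returned as the target, and expm1 would be applied to the model's output afterwards.

```python
import numpy as np

# Invented long-tailed spend values, including a zero-spend customer
spend = np.array([0.0, 12.0, 40.0, 1250.0], dtype=np.float32)

# Training side: compress the long tail; log1p(0) is 0, so zero spend is safe
log_spend = np.log1p(spend)

# Prediction side: map values from log space back to currency units
recovered = np.expm1(log_spend)

print(log_spend, recovered)
```

Metrics computed on the log-space target are not in currency units, so it can help to also evaluate on the values recovered with expm1.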