Target Examples
Ready-to-use target functions for each task type. Most examples follow the same structure: guard against invalid entities, scope a time window, compute the label, and return it. The recommendation examples skip the window step and use all future events.
For the full guide on events, attributes, aggregations, and filtering, see Target Function.
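The guard, scope, compute, return structure described above can be sketched without the library at all. The following is a minimal plain-Python stand-in, not monad code: events are reduced to lists of timestamps, `churn_label` and `data_end` are hypothetical names, the window is assumed to be half-open, and the second guard is only an assumption about what a check like has_incomplete_training_window protects against.

```python
from datetime import datetime, timedelta
from typing import List, Optional


def churn_label(
    history: List[datetime],
    future: List[datetime],
    split: datetime,
    data_end: datetime,  # hypothetical: timestamp of the last available event
    window: timedelta = timedelta(days=30),
) -> Optional[int]:
    # Guard: too little history for a meaningful prediction.
    if len(history) < 2:
        return None
    # Guard (assumed semantics): the label window runs past the end of the data.
    if split + window > data_end:
        return None
    # Scope: keep only future events inside [split, split + window).
    in_window = [t for t in future if split <= t < split + window]
    # Compute and return: 1 means no activity in the window, i.e. churn.
    return 1 if not in_window else 0
```

Returning None rather than 0 in the guards matters: a 0 would be a real "did not churn" label, while None drops the entity from training.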
Binary Classification
Task: BinaryClassificationTask
Returns: np.ndarray with 0 or 1 (or None to exclude)
Churn prediction
Will the customer make zero transactions in the next 30 days?
from datetime import timedelta

import numpy as np
from monad.ui.target_function import Events, Attributes, has_incomplete_training_window, SPLIT_TIMESTAMP

def churn_target(history: Events, future: Events, entity: Attributes, ctx: dict):
    # Require some history for a meaningful prediction
    if history["transactions"].count() < 2:
        return None
    if has_incomplete_training_window(ctx, timedelta(days=30)):
        return None
    future_30d = future.interval_from(ctx[SPLIT_TIMESTAMP], timedelta(days=30))
    return np.array([1 if future_30d["transactions"].count() == 0 else 0],
                    dtype=np.float32)
For a full walkthrough with variations and production tips, see the Churn Prediction recipe.
High-value purchase propensity
Will the customer spend more than a threshold in the next 14 days?
def high_value_target(history: Events, future: Events, entity: Attributes, ctx: dict):
    if history["transactions"].count() == 0:
        return None
    if has_incomplete_training_window(ctx, timedelta(days=14)):
        return None
    future_14d = future.interval_from(ctx[SPLIT_TIMESTAMP], timedelta(days=14))
    total_spend = future_14d["transactions"].sum("price")
    return np.array([1 if total_spend > 100 else 0], dtype=np.float32)
Multi-Class Classification
Task: MulticlassClassificationTask(class_names=CLASS_NAMES)
Returns: np.ndarray of class scores (or None)
Favorite brand prediction
Which single brand will the customer purchase most in the next 21 days?
CLASS_NAMES = ["BrandA", "BrandB", "BrandC", "BrandD"]

def favourite_brand_target(history: Events, future: Events, entity: Attributes, ctx: dict):
    if has_incomplete_training_window(ctx, timedelta(days=21)):
        return None
    future_21d = future.interval_from(ctx[SPLIT_TIMESTAMP], timedelta(days=21))
    if future_21d["transactions"].count() == 0:
        return None
    counts, names = future_21d["transactions"].groupBy("brand").count(groups=CLASS_NAMES)
    if counts.sum() == 0:
        return None
    # Index of the most-purchased brand
    return np.array([int(np.argmax(counts))], dtype=np.float32)
For a full walkthrough with variations and production tips, see the Favorite Brand recipe.
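One detail of the argmax step is worth seeing in isolation: when two brands tie, the earlier entry in the class list wins, because np.argmax returns the first maximum. The sketch below reproduces that behavior in plain Python; `most_purchased_index` is a hypothetical helper and the brand lists are made-up inputs, not library output.

```python
from typing import List, Optional

CLASS_NAMES = ["BrandA", "BrandB", "BrandC", "BrandD"]


def most_purchased_index(brands: List[str], class_names: List[str]) -> Optional[int]:
    # Count purchases per known class; brands outside class_names are ignored.
    counts = [sum(1 for b in brands if b == name) for name in class_names]
    # No purchase of any known brand: exclude the entity.
    if sum(counts) == 0:
        return None
    # list.index returns the first maximum, matching np.argmax on ties.
    return counts.index(max(counts))


# "Other" is not a known class, so BrandB wins with two purchases.
winner = most_purchased_index(["BrandB", "BrandC", "BrandB", "Other"], CLASS_NAMES)
# BrandA and BrandC tie at one purchase each; the earlier class wins.
tie = most_purchased_index(["BrandC", "BrandA"], CLASS_NAMES)
```

If tie order matters for the business question, it may be worth ordering the class list deliberately or excluding tied entities.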
Multilabel Classification
Task: MultilabelClassificationTask(class_names=TARGET_NAMES)
Returns: np.ndarray of 0/1 per class (or None)
Category propensity
Which product categories will the customer buy from in the next 21 days?
TARGET_NAMES = ["Electronics", "Fashion", "Home", "Sports", "Beauty"]

def category_propensity_target(history: Events, future: Events, entity: Attributes, ctx: dict):
    if has_incomplete_training_window(ctx, timedelta(days=21)):
        return None
    future_21d = future.interval_from(ctx[SPLIT_TIMESTAMP], timedelta(days=21))
    purchase_flags, _ = future_21d["transactions"].groupBy("category").exists(groups=TARGET_NAMES)
    # Exclude customers who bought nothing
    if purchase_flags.sum() == 0:
        return None
    return purchase_flags
For a full walkthrough with variations and production tips, see the Category Propensity recipe.
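To make the shape of the multilabel target concrete, here is a plain-Python sketch of the same idea: one 0/1 flag per entry in the class list, in class-list order. `category_flags` is a hypothetical stand-in for the grouped exists call, written under the assumption that categories outside the list are simply ignored.

```python
from typing import List, Optional

TARGET_NAMES = ["Electronics", "Fashion", "Home", "Sports", "Beauty"]


def category_flags(categories: List[str], target_names: List[str]) -> Optional[List[float]]:
    seen = set(categories)
    # One flag per target class, aligned with the order of target_names.
    flags = [1.0 if name in seen else 0.0 for name in target_names]
    # Exclude customers who bought nothing from the listed categories.
    return flags if any(flags) else None


# Repeat purchases still yield a single flag; "Toys" is not a target class.
flags = category_flags(["Home", "Home", "Toys", "Beauty"], TARGET_NAMES)
```

Unlike the multi-class case, several flags can be set at once, so the vector is not a one-hot encoding.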
Propensity with joined attributes
When category names live in a joined table, use get_qualified_column_name to resolve the column:
from monad.ui.target_function import get_qualified_column_name

TARGET_ENTITY = get_qualified_column_name("department_name", ["articles"])
TARGET_NAMES = ["Denim Trousers", "Swimwear", "Trousers", "Jersey Basic", "Knitwear"]

def joined_propensity_target(history: Events, future: Events, entity: Attributes, ctx: dict):
    if has_incomplete_training_window(ctx, timedelta(days=21)):
        return None
    future_21d = future.interval_from(ctx[SPLIT_TIMESTAMP], timedelta(days=21))
    purchase_flags, _ = future_21d["transactions"].groupBy(TARGET_ENTITY).exists(groups=TARGET_NAMES)
    if purchase_flags.sum() == 0:
        return None
    return purchase_flags
Regression
Task: RegressionTask(num_targets=1, max_value=...)
Returns: np.ndarray of float (or None)
Future spend (LTV)
How much will the customer spend in the next 30 days?
def spend_target(history: Events, future: Events, entity: Attributes, ctx: dict):
    if history["transactions"].count() < 2:
        return None
    if has_incomplete_training_window(ctx, timedelta(days=30)):
        return None
    future_30d = future.interval_from(ctx[SPLIT_TIMESTAMP], timedelta(days=30))
    return np.array([future_30d["transactions"].sum("price")], dtype=np.float32)
For a full walkthrough with variations and production tips, see the Customer Spend recipe.
Item count
How many items will the customer purchase in the next 30 days?
def item_count_target(history: Events, future: Events, entity: Attributes, ctx: dict):
    if history["transactions"].count() == 0:
        return None
    if has_incomplete_training_window(ctx, timedelta(days=30)):
        return None
    future_30d = future.interval_from(ctx[SPLIT_TIMESTAMP], timedelta(days=30))
    return np.array([future_30d["transactions"].count()], dtype=np.float32)
Recommendation
Task: RecommendationTask (high-cardinality catalogs) or OneHotRecommendationTask (low-cardinality)
Returns: Sketch (or None)
Next-basket recommendation
Which products will the customer buy next? Using gamma=0 targets only the very next basket.
from monad.ui.target_function import sketch, sequential_decay, Sketch

def next_basket_target(history: Events, future: Events, entity: Attributes, ctx: dict) -> Sketch:
    if future["transactions"].count() == 0:
        return None
    items = future["transactions"]["product_id"]
    weights = sequential_decay(future["transactions"], gamma=0)
    return sketch(items, weights)
For a full walkthrough with variations and production tips, see the Next-Basket Recommendation recipe.
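The effect of gamma can be illustrated with a small model of sequential weighting. This is only one plausible reading, not monad's documented formula: it assumes events in the k-th future basket get weight gamma ** k, which would explain why gamma=0 keeps only the very next basket. `sequential_weights` and the basket identifiers are hypothetical.

```python
from typing import Dict, List


def sequential_weights(basket_ids: List[str], gamma: float) -> List[float]:
    # Number baskets 0, 1, 2, ... in order of first appearance.
    order: Dict[str, int] = {}
    for basket in basket_ids:
        order.setdefault(basket, len(order))
    # Assumed rule: every event in the k-th basket gets weight gamma ** k.
    # In Python, 0.0 ** 0 == 1.0, so gamma=0 keeps exactly the first basket.
    return [float(gamma) ** order[basket] for basket in basket_ids]


baskets = ["b1", "b1", "b2", "b3"]
next_only = sequential_weights(baskets, gamma=0)    # only basket b1 counts
all_equal = sequential_weights(baskets, gamma=1)    # every basket counts equally
decaying = sequential_weights(baskets, gamma=0.5)   # later baskets count less
```

Under this reading, values between 0 and 1 trade off "what comes next" against "what comes eventually".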
Broad recommendation with time decay
Recommend products weighted by recency, considering all future interactions.
from monad.targets.recommendation import time_decay

def broad_reco_target(history: Events, future: Events, entity: Attributes, ctx: dict) -> Sketch:
    if future["transactions"].count() == 0:
        return None
    items = future["transactions"]["product_id"]
    weights = time_decay(future["transactions"], daily_decay=0.1)
    return sketch(items, weights)
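For intuition about recency weighting, the sketch below models a daily decay in plain Python. The formula is an assumption made for illustration, not taken from the library: each event is weighted by (1 - daily_decay) raised to the number of days elapsed since the split point. `time_decay_weights` is a hypothetical name.

```python
from datetime import datetime, timedelta
from typing import List


def time_decay_weights(
    timestamps: List[datetime], split: datetime, daily_decay: float
) -> List[float]:
    weights = []
    for t in timestamps:
        # Fractional days between the split point and the event.
        days = (t - split).total_seconds() / 86400.0
        # Assumed rule: lose a fraction `daily_decay` of the weight per day.
        weights.append((1.0 - daily_decay) ** days)
    return weights


split = datetime(2024, 1, 1)
events = [split, split + timedelta(days=1), split + timedelta(days=2)]
weights = time_decay_weights(events, split, daily_decay=0.1)
```

Under this assumption, an event one week after the split still keeps roughly half of its weight at daily_decay=0.1, so distant interactions contribute but do not dominate.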
Acquisition-focused with loss masking
Recommend only products the customer has not purchased before. Loss masking prevents historical items from contributing to the training loss.
from monad.ui.target_function import sketch_filtering_mask

def acquisition_target(history: Events, future: Events, entity: Attributes, ctx: dict):
    # Filter future to new products only
    known = set(history["transactions"]["product_id"].events)
    future_txns = future["transactions"].filter("product_id", lambda x: x not in known)
    if future_txns.count() == 0:
        return None
    items = future_txns["product_id"]
    weights = sequential_decay(future_txns, gamma=1)
    target_sketch = sketch(items, weights)
    # Mask historical items out of loss calculation
    mask = sketch_filtering_mask(history["transactions"]["product_id"])
    return target_sketch, mask
For a full walkthrough with variations and production tips, see the Acquisition Recommendation recipe.
Note
Loss masking applies to OneHotRecommendationTask only. See Filtering Masks for more detail.
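The filtering step in the acquisition example is ordinary set membership. A plain-Python sketch of the same logic, with a hypothetical helper name and made-up product identifiers:

```python
from typing import List


def new_items_only(history_items: List[str], future_items: List[str]) -> List[str]:
    # Build the set once so each membership test is O(1).
    known = set(history_items)
    # Keep order and repeats; drop anything already purchased.
    return [item for item in future_items if item not in known]


fresh = new_items_only(["a", "b"], ["b", "c", "a", "d", "c"])
```

An empty result corresponds to the `return None` branch in the target function: a customer who only repurchases known products has no acquisition target.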
Hybrid sketch from multiple event sources
When purchase data is sparse, combine it with weaker signals like product views. See Sketch Arithmetic for how combining and scaling sketches works.
def hybrid_target(history: Events, future: Events, entity: Attributes, ctx: dict) -> Sketch:
    if future["transactions"].count() == 0 and future["views"].count() == 0:
        return None
    purchase_sketch = sketch(
        future["transactions"]["product_id"],
        sequential_decay(future["transactions"], gamma=0),
    )
    view_sketch = sketch(
        future["views"]["product_id"],
        sequential_decay(future["views"], gamma=0),
    )
    # Purchases weighted 10x more than views
    return 10 * purchase_sketch + view_sketch
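The scaling and addition in the last line can be pictured with a much simpler stand-in. The sketch below treats a "sketch" as a per-item weight table built with collections.Counter; this is an illustrative assumption only, since the library's Sketch type is a different data structure. `weighted_counts` and `combine` are hypothetical helpers.

```python
from collections import Counter
from typing import List


def weighted_counts(items: List[str], weights: List[float]) -> Counter:
    # Accumulate the weight of every occurrence of each item.
    totals: Counter = Counter()
    for item, weight in zip(items, weights):
        totals[item] += weight
    return totals


def combine(purchases: Counter, views: Counter, purchase_scale: float = 10.0) -> Counter:
    # Linear combination: purchase_scale * purchases + views.
    combined: Counter = Counter()
    for item, weight in purchases.items():
        combined[item] += purchase_scale * weight
    for item, weight in views.items():
        combined[item] += weight
    return combined


purchases = weighted_counts(["p1"], [1.0])
views = weighted_counts(["p1", "p2"], [1.0, 1.0])
hybrid = combine(purchases, views)
```

In this toy model, an item that was both viewed and purchased ends up with weight 11, while a view-only item gets 1, which is the kind of ranking the 10x factor is meant to produce.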