Target Examples

Ready-to-use target functions for each task type. Every example follows the same structure: guard against invalid entities, scope a time window, compute the label, and return.

For the full guide on events, attributes, aggregations, and filtering, see Target Function.
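
The four steps named above (guard, scope, compute, return) can be sketched without the library. The snippet below is a minimal stand-in, assuming events are plain `(timestamp, price)` tuples and that the window is half-open; `toy_spend_target` and its parameters are hypothetical and are not part of the monad API.

```python
from datetime import datetime, timedelta

# Hypothetical stand-in: each event is a (timestamp, price) tuple.
# It mirrors the guard / window / label / return shape, not the monad API.
def toy_spend_target(history, future, split_time,
                     window=timedelta(days=30), min_history=2):
    # 1. Guard: skip entities with too little history.
    if len(history) < min_history:
        return None
    # 2. Scope: keep future events inside [split_time, split_time + window).
    #    The half-open interval is an assumption made for this sketch.
    window_end = split_time + window
    in_window = [price for ts, price in future if split_time <= ts < window_end]
    # 3. Compute the label and 4. return it.
    return sum(in_window)

split = datetime(2024, 1, 1)
history = [(datetime(2023, 12, 1), 20.0), (datetime(2023, 12, 15), 5.0)]
future = [(datetime(2024, 1, 10), 30.0), (datetime(2024, 3, 1), 99.0)]

label = toy_spend_target(history, future, split)         # only the January event counts
excluded = toy_spend_target(history[:1], future, split)  # too little history
```

Returning `None` in the guard step is what excludes the entity, which is the same convention the examples on this page use.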


Binary Classification

Task: BinaryClassificationTask
Returns: np.ndarray with 0 or 1 (or None to exclude)

Churn prediction

Will the customer make zero transactions in the next 30 days?

Python
from datetime import timedelta
import numpy as np
from monad.ui.target_function import Events, Attributes, has_incomplete_training_window, SPLIT_TIMESTAMP

def churn_target(history: Events, future: Events, entity: Attributes, ctx: dict):
    # Need some history to be a meaningful prediction
    if history["transactions"].count() < 2:
        return None
    if has_incomplete_training_window(ctx, timedelta(days=30)):
        return None

    future_30d = future.interval_from(ctx[SPLIT_TIMESTAMP], timedelta(days=30))
    return np.array([1 if future_30d["transactions"].count() == 0 else 0],
                    dtype=np.float32)

For a full walkthrough with variations and production tips, see the Churn Prediction recipe.
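
The incomplete-window guard matters most for churn: a customer whose 30-day window runs past the end of the data would look like a churner only because the future has not been recorded yet. The sketch below illustrates that idea under the assumption, inferred from the function name, that the check compares the end of the label window with the last observed timestamp. `window_is_incomplete` and `data_end` are hypothetical names, not the library's implementation.

```python
from datetime import datetime, timedelta

# Hypothetical helper illustrating the idea; not the library's implementation.
def window_is_incomplete(split_time, data_end, window):
    # The label window starting at split_time is fully observed only if
    # it ends on or before the last timestamp available in the data.
    return split_time + window > data_end

data_end = datetime(2024, 6, 30)
window = timedelta(days=30)

early_split = window_is_incomplete(datetime(2024, 5, 1), data_end, window)
late_split = window_is_incomplete(datetime(2024, 6, 15), data_end, window)
```

Here the May split has a fully observed window, while the mid-June split would need data through mid-July and would be excluded.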

High-value purchase propensity

Will the customer spend more than a threshold (100 in this example) in the next 14 days?

Python
def high_value_target(history: Events, future: Events, entity: Attributes, ctx: dict):
    if history["transactions"].count() == 0:
        return None
    if has_incomplete_training_window(ctx, timedelta(days=14)):
        return None

    future_14d = future.interval_from(ctx[SPLIT_TIMESTAMP], timedelta(days=14))
    total_spend = future_14d["transactions"].sum("price")
    return np.array([1 if total_spend > 100 else 0], dtype=np.float32)

Multi-Class Classification

Task: MulticlassClassificationTask(class_names=CLASS_NAMES)
Returns: np.ndarray of class scores (or None)

Favorite brand prediction

Which single brand will the customer purchase most in the next 21 days?

Python
CLASS_NAMES = ["BrandA", "BrandB", "BrandC", "BrandD"]

def favourite_brand_target(history: Events, future: Events, entity: Attributes, ctx: dict):
    if has_incomplete_training_window(ctx, timedelta(days=21)):
        return None

    future_21d = future.interval_from(ctx[SPLIT_TIMESTAMP], timedelta(days=21))
    if future_21d["transactions"].count() == 0:
        return None

    counts, names = future_21d["transactions"].groupBy("brand").count(groups=CLASS_NAMES)
    if counts.sum() == 0:
        return None

    # Index of the most-purchased brand
    return np.array([int(np.argmax(counts))], dtype=np.float32)

For a full walkthrough with variations and production tips, see the Favorite Brand recipe.
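
One detail worth knowing about the example above: when two brands tie, `np.argmax` returns the first index that holds the maximum, so the order of `CLASS_NAMES` decides the winner. The plain-Python sketch below reproduces that tie-breaking rule on made-up counts; `most_purchased_index` is a hypothetical helper.

```python
CLASS_NAMES = ["BrandA", "BrandB", "BrandC", "BrandD"]

def most_purchased_index(counts):
    # max() returns the first maximal element it encounters, which matches
    # the first-occurrence rule of np.argmax.
    return max(range(len(counts)), key=lambda i: counts[i])

counts = [2, 5, 5, 0]  # made-up data: BrandB and BrandC tie on 5 purchases
winner = most_purchased_index(counts)
winner_name = CLASS_NAMES[winner]
```

If ties should be excluded rather than resolved by position, one option is to return `None` when the maximum count occurs more than once.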


Multilabel Classification

Task: MultilabelClassificationTask(class_names=TARGET_NAMES)
Returns: np.ndarray of 0/1 per class (or None)

Category propensity

Which product categories will the customer buy from in the next 21 days?

Python
TARGET_NAMES = ["Electronics", "Fashion", "Home", "Sports", "Beauty"]

def category_propensity_target(history: Events, future: Events, entity: Attributes, ctx: dict):
    if has_incomplete_training_window(ctx, timedelta(days=21)):
        return None

    future_21d = future.interval_from(ctx[SPLIT_TIMESTAMP], timedelta(days=21))
    purchase_flags, _ = future_21d["transactions"].groupBy("category").exists(groups=TARGET_NAMES)

    # Exclude customers who bought nothing
    if purchase_flags.sum() == 0:
        return None

    return purchase_flags

For a full walkthrough with variations and production tips, see the Category Propensity recipe.
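
To make the shape of the returned label concrete, here is a minimal sketch of a 0/1 vector with one position per class. It assumes the flags follow the order of `TARGET_NAMES` and that categories outside that list are ignored; `multi_hot` is a hypothetical helper, not a library function.

```python
TARGET_NAMES = ["Electronics", "Fashion", "Home", "Sports", "Beauty"]

def multi_hot(purchased_categories, class_names):
    # One flag per class, in class_names order. Repeated purchases still
    # yield a single 1, and categories outside class_names are dropped.
    seen = set(purchased_categories)
    return [1.0 if name in seen else 0.0 for name in class_names]

# Made-up purchases: "Home" twice, "Fashion" once, and an untracked category.
flags = multi_hot(["Home", "Fashion", "Home", "Toys"], TARGET_NAMES)
```

A customer who bought only untracked categories would get an all-zero vector, which is the case the `purchase_flags.sum() == 0` check above excludes.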

Propensity with joined attributes

When category names live in a joined table, use get_qualified_column_name to resolve the column:

Python
from monad.ui.target_function import get_qualified_column_name

TARGET_ENTITY = get_qualified_column_name("department_name", ["articles"])
TARGET_NAMES = ["Denim Trousers", "Swimwear", "Trousers", "Jersey Basic", "Knitwear"]

def joined_propensity_target(history: Events, future: Events, entity: Attributes, ctx: dict):
    if has_incomplete_training_window(ctx, timedelta(days=21)):
        return None

    future_21d = future.interval_from(ctx[SPLIT_TIMESTAMP], timedelta(days=21))
    purchase_flags, _ = future_21d["transactions"].groupBy(TARGET_ENTITY).exists(groups=TARGET_NAMES)

    if purchase_flags.sum() == 0:
        return None

    return purchase_flags

Regression

Task: RegressionTask(num_targets=1, max_value=...)
Returns: np.ndarray of float (or None)

Future spend (LTV)

How much will the customer spend in the next 30 days?

Python
def spend_target(history: Events, future: Events, entity: Attributes, ctx: dict):
    if history["transactions"].count() < 2:
        return None
    if has_incomplete_training_window(ctx, timedelta(days=30)):
        return None

    future_30d = future.interval_from(ctx[SPLIT_TIMESTAMP], timedelta(days=30))
    return np.array([future_30d["transactions"].sum("price")], dtype=np.float32)

For a full walkthrough with variations and production tips, see the Customer Spend recipe.

Item count

How many items will the customer purchase in the next 30 days?

Python
def item_count_target(history: Events, future: Events, entity: Attributes, ctx: dict):
    if history["transactions"].count() == 0:
        return None
    if has_incomplete_training_window(ctx, timedelta(days=30)):
        return None

    future_30d = future.interval_from(ctx[SPLIT_TIMESTAMP], timedelta(days=30))
    return np.array([future_30d["transactions"].count()], dtype=np.float32)

Recommendation

Task: RecommendationTask (high-cardinality catalogs) or OneHotRecommendationTask (low-cardinality)
Returns: Sketch (or None)

Next-basket recommendation

Which products will the customer buy next? Setting gamma=0 targets only the very next basket.

Python
from monad.ui.target_function import sketch, sequential_decay, Sketch

def next_basket_target(history: Events, future: Events, entity: Attributes, ctx: dict) -> Sketch:
    if future["transactions"].count() == 0:
        return None

    items = future["transactions"]["product_id"]
    weights = sequential_decay(future["transactions"], gamma=0)
    return sketch(items, weights)

For a full walkthrough with variations and production tips, see the Next-Basket Recommendation recipe.
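
One way to build intuition for `gamma` is to assume that the k-th future basket receives weight `gamma ** k`, with k = 0 for the next basket. This is an assumption consistent with the statement that gamma=0 keeps only the very next basket; the exact formula used by `sequential_decay` is not given here, and `assumed_sequential_weights` is a hypothetical helper.

```python
def assumed_sequential_weights(num_baskets, gamma):
    # ASSUMPTION: basket k (k = 0 is the next basket) gets weight gamma ** k.
    # Python evaluates 0 ** 0 as 1, so gamma=0 keeps only the next basket.
    return [gamma ** k for k in range(num_baskets)]

next_only = assumed_sequential_weights(3, gamma=0)    # only the next basket
equal = assumed_sequential_weights(3, gamma=1)        # every basket counts equally
halved = assumed_sequential_weights(3, gamma=0.5)     # each basket counts half the previous
```

Under this reading, values between 0 and 1 trade off focus on the immediate next purchase against coverage of later baskets.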

Broad recommendation with time decay

Recommend products weighted by recency, considering all future interactions.

Python
from monad.targets.recommendation import time_decay

def broad_reco_target(history: Events, future: Events, entity: Attributes, ctx: dict) -> Sketch:
    if future["transactions"].count() == 0:
        return None

    items = future["transactions"]["product_id"]
    weights = time_decay(future["transactions"], daily_decay=0.1)
    return sketch(items, weights)

Acquisition-focused with loss masking

Only recommend products the customer has not purchased before. Loss masking prevents historical items from contributing to the training loss.

Python
from monad.ui.target_function import sketch_filtering_mask

def acquisition_target(history: Events, future: Events, entity: Attributes, ctx: dict):
    # Filter future to new products only
    known = set(history["transactions"]["product_id"].events)
    future_txns = future["transactions"].filter("product_id", lambda x: x not in known)

    if future_txns.count() == 0:
        return None

    items = future_txns["product_id"]
    weights = sequential_decay(future_txns, gamma=1)
    target_sketch = sketch(items, weights)

    # Mask historical items out of loss calculation
    mask = sketch_filtering_mask(history["transactions"]["product_id"])
    return target_sketch, mask

For a full walkthrough with variations and production tips, see the Acquisition Recommendation recipe.

Note

Loss masking applies to OneHotRecommendationTask only. See Filtering Masks for more detail.
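
The general idea behind loss masking can be shown with a toy calculation: masked items are left out when the per-item losses are averaged, so the model is neither rewarded nor penalized for them. This is a conceptual sketch with made-up numbers; how the library applies the mask internally is not described here, and `masked_mean_loss` is a hypothetical helper.

```python
def masked_mean_loss(per_item_loss, masked_items):
    # Average the loss over items that are NOT masked; masked items
    # contribute nothing to the result.
    kept = [loss for item, loss in per_item_loss.items()
            if item not in masked_items]
    return sum(kept) / len(kept) if kept else 0.0

per_item_loss = {"p1": 0.9, "p2": 0.1, "p3": 0.5}  # made-up per-item losses
history_items = {"p1"}                             # already purchased, so masked
loss = masked_mean_loss(per_item_loss, history_items)
unmasked = masked_mean_loss(per_item_loss, set())
```

With `p1` masked, its large loss no longer dominates the average, which is the effect wanted when previously purchased items should not shape training.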

Hybrid sketch from multiple event sources

When purchase data is sparse, combine it with weaker signals like product views. See Sketch Arithmetic for how combining and scaling sketches works.

Python
def hybrid_target(history: Events, future: Events, entity: Attributes, ctx: dict) -> Sketch:
    if future["transactions"].count() == 0 and future["views"].count() == 0:
        return None

    purchase_sketch = sketch(
        future["transactions"]["product_id"],
        sequential_decay(future["transactions"], gamma=0),
    )
    view_sketch = sketch(
        future["views"]["product_id"],
        sequential_decay(future["views"], gamma=0),
    )

    # Purchases weighted 10x more than views
    return 10 * purchase_sketch + view_sketch
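
The arithmetic in the last line is easier to follow on an exact stand-in. The sketch below treats a sketch as a plain mapping from item to weight, which is an assumption made for illustration only: real sketches may be compact, approximate structures, and `scale` is a hypothetical helper.

```python
from collections import Counter

def scale(weights, factor):
    # Multiply every item weight by a scalar, like `10 * purchase_sketch`.
    return Counter({item: w * factor for item, w in weights.items()})

purchases = Counter({"p1": 1.0})          # made-up data: one purchased product
views = Counter({"p1": 1.0, "p2": 1.0})   # made-up data: two viewed products

# Counter addition sums weights item by item, like adding two sketches.
hybrid = scale(purchases, 10) + views
```

The purchased product ends up with weight 11 and the view-only product with weight 1, so views fill in sparse targets without outweighing purchases.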