Detect Subscription Churn from Activity Drop
Task type: BinaryClassificationTask
Industry: Fitness / Wellness
Many fitness subscribers stop working out long before they cancel. Detecting this "silent churn" pattern — where the subscription remains active but engagement drops — gives retention teams a window to intervene with personalized workout plans, trainer outreach, or incentive offers before the member formally cancels.
What makes this advanced? Multi-condition boolean logic — combines three conditions: active subscription verification, subscription continuity check using numpy operations, and activity comparison between past and future periods.
Prerequisites
Before writing a target function you need:
- A trained foundation model built on event data that includes the relevant data sources.
- The monad library installed in your environment.
- Data source(s): `workout_logs`, `subscriptions`
Target Function
The target function tells monad how to label each entity for training. It receives four arguments:
| Argument | Type | Description |
|---|---|---|
| `history` | `Events` | All events before the temporal split. |
| `future` | `Events` | All events after the temporal split. |
| `attributes` | `Attributes` | Static entity attributes. |
| `ctx` | `Dict` | Context dictionary containing `SPLIT_TIMESTAMP`, data mode, etc. |
The function must return one of:
- `np.array([1], dtype=np.float32)`: positive case
- `np.array([0], dtype=np.float32)`: negative case
- `None`: exclude this entity from training
Full Example
```python
import numpy as np
from datetime import timedelta
from typing import Dict

from monad.ui.target_function import Events, Attributes
from monad.ui.target_function import SPLIT_TIMESTAMP
from monad.ui.target_function import has_incomplete_training_window
from monad.constants import SECONDS_PER_DAY

# === Configuration ===
TARGET_WINDOW_DAYS = 30
ACTIVITY_DATA_SOURCE = "workout_logs"
SUBSCRIPTIONS_DATA_SOURCE = "subscriptions"
MIN_PAST_ACTIVITY = 2
MAX_FUTURE_ACTIVITY = 2


def subscription_churn_target_fn(
    history: Events,
    future: Events,
    attributes: Attributes,
    ctx: Dict,
) -> np.ndarray | None:
    """Detect churn risk: active subscription + reduced activity."""
    if has_incomplete_training_window(ctx, timedelta(days=TARGET_WINDOW_DAYS)):
        return None

    split_ts = ctx[SPLIT_TIMESTAMP]

    # 1. Verify active subscription at split point
    has_active = (
        history[SUBSCRIPTIONS_DATA_SOURCE]
        .filter(by="end_date", condition=lambda v: v.astype(float) > split_ts)
        .count()
        > 0
    ) or split_ts in future[SUBSCRIPTIONS_DATA_SOURCE].timestamps
    if not has_active:
        return None

    # 2. Check subscription continuity over next 30 days
    last_end = history[SUBSCRIPTIONS_DATA_SOURCE].max(column="end_date")
    future_starts = future[SUBSCRIPTIONS_DATA_SOURCE].timestamps
    window_end = split_ts + TARGET_WINDOW_DAYS * SECONDS_PER_DAY
    idx = np.searchsorted(future_starts, window_end)
    future_starts = future_starts[:idx]
    future_ends = future[SUBSCRIPTIONS_DATA_SOURCE].get_or_empty(
        "end_date"
    ).events[:idx]
    if future_starts.size > 0:
        maintains = (
            np.all(
                np.concatenate([np.array([last_end]), future_ends[:-1]])
                + SECONDS_PER_DAY
                >= future_starts
            )
            and future_ends[-1] >= window_end
        )
    else:
        maintains = last_end >= window_end

    # 3. Compare activity: past 30 days vs. next 30 days
    next_activity = future[ACTIVITY_DATA_SOURCE].interval_from(
        split_ts, timedelta(days=TARGET_WINDOW_DAYS)
    ).count()
    prev_activity = history[ACTIVITY_DATA_SOURCE].interval_from(
        split_ts - TARGET_WINDOW_DAYS * SECONDS_PER_DAY,
        timedelta(days=TARGET_WINDOW_DAYS),
    ).count()
    activity_reduced = (
        next_activity < MAX_FUTURE_ACTIVITY
        and prev_activity > next_activity
    )

    # 4. Churn risk = maintains subscription but activity dropped
    if maintains and activity_reduced:
        return np.array([1], dtype=np.float32)
    return np.array([0], dtype=np.float32)
```
Step-by-Step Breakdown
① Verify active subscription
```python
has_active = (
    history[SUBSCRIPTIONS_DATA_SOURCE]
    .filter(by="end_date", condition=lambda v: v.astype(float) > split_ts)
    .count()
    > 0
) or split_ts in future[SUBSCRIPTIONS_DATA_SOURCE].timestamps
if not has_active:
    return None
```
The function first checks whether the member has an active subscription at the split point: either a historical subscription whose `end_date` is later than the split timestamp, or a subscription event whose timestamp equals the split timestamp. Members with no active subscription return `None` and are excluded from training.
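To see the check in isolation, the sketch below reproduces the same logic on plain NumPy arrays, without monad. The function name `has_active_subscription` and its array arguments are hypothetical stand-ins for the values read from `history` and `future`, and timestamps are assumed to be Unix seconds.

```python
import numpy as np


def has_active_subscription(
    hist_end_dates: np.ndarray,
    future_start_ts: np.ndarray,
    split_ts: float,
) -> bool:
    """Hypothetical stand-in for step 1, using plain arrays of Unix seconds."""
    # A historical subscription is still active if it ends after the split.
    ends_after_split = bool(np.any(hist_end_dates.astype(float) > split_ts))
    # A subscription that starts exactly at the split also counts.
    starts_at_split = bool(np.any(future_start_ts == split_ts))
    return ends_after_split or starts_at_split


split_ts = 1_700_000_000.0
day = 86_400.0

# Ended 10 days before the split and nothing new: not active.
lapsed = has_active_subscription(
    np.array([split_ts - 10 * day]), np.array([]), split_ts
)
# Ends 5 days after the split: active.
running = has_active_subscription(
    np.array([split_ts + 5 * day]), np.array([]), split_ts
)
# Old period lapsed, new one starts exactly at the split: active.
renewed = has_active_subscription(
    np.array([split_ts - day]), np.array([split_ts]), split_ts
)
print(lapsed, running, renewed)
```

Only the `lapsed` member would be dropped from training in this example.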
② Check subscription continuity
```python
last_end = history[SUBSCRIPTIONS_DATA_SOURCE].max(column="end_date")
future_starts = future[SUBSCRIPTIONS_DATA_SOURCE].timestamps
window_end = split_ts + TARGET_WINDOW_DAYS * SECONDS_PER_DAY
idx = np.searchsorted(future_starts, window_end)
future_starts = future_starts[:idx]
future_ends = future[SUBSCRIPTIONS_DATA_SOURCE].get_or_empty(
    "end_date"
).events[:idx]
if future_starts.size > 0:
    maintains = (
        np.all(
            np.concatenate([np.array([last_end]), future_ends[:-1]])
            + SECONDS_PER_DAY
            >= future_starts
        )
        and future_ends[-1] >= window_end
    )
else:
    maintains = last_end >= window_end
```
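Subscription continuity is verified with NumPy operations. `np.searchsorted` returns the index of the first future subscription that starts at or after `window_end`, so slicing with `[:idx]` keeps only the subscriptions that start inside the 30-day window (this relies on the timestamps being sorted in ascending order). The check then requires that each subscription starts no more than one day after the previous one ends, beginning with the latest historical end date, and that the final subscription in the window ends at or after `window_end`. If no subscription starts inside the window, the latest historical end date must itself reach `window_end`.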
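The continuity rule is easier to follow on concrete numbers. The sketch below applies the same steps to plain arrays; `maintains_subscription` is a hypothetical helper, and `SECONDS_PER_DAY = 86_400` is assumed to match the constant exported by monad.

```python
import numpy as np

SECONDS_PER_DAY = 86_400  # assumption: same value as monad.constants.SECONDS_PER_DAY


def maintains_subscription(last_end, future_starts, future_ends, window_end):
    """Sketch of step 2 on plain arrays (Unix seconds, sorted by start time)."""
    # Keep only subscriptions that start before the window closes.
    idx = np.searchsorted(future_starts, window_end)
    starts, ends = future_starts[:idx], future_ends[:idx]
    if starts.size == 0:
        # No renewal inside the window: the last known period must cover it.
        return bool(last_end >= window_end)
    # End of the period that precedes each renewal.
    previous_ends = np.concatenate([np.array([last_end]), ends[:-1]])
    # Each renewal must begin no later than one day after the previous end.
    no_gaps = np.all(previous_ends + SECONDS_PER_DAY >= starts)
    return bool(no_gaps and ends[-1] >= window_end)


day = SECONDS_PER_DAY
window_end = 30 * day  # split timestamp taken as 0 for readability

# Renewal on the day the old period ends, covering the whole window.
seamless = maintains_subscription(
    10 * day, np.array([10 * day]), np.array([40 * day]), window_end
)
# Five-day gap between the old period and the renewal.
gapped = maintains_subscription(
    10 * day, np.array([15 * day]), np.array([45 * day]), window_end
)
# No renewal, and the old period ends before the window does.
expired = maintains_subscription(
    20 * day, np.array([]), np.array([]), window_end
)
print(seamless, gapped, expired)
```

Only the `seamless` member counts as maintaining the subscription; the other two fail on the gap and on the coverage condition respectively.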
③ Compare activity levels
```python
next_activity = future[ACTIVITY_DATA_SOURCE].interval_from(
    split_ts, timedelta(days=TARGET_WINDOW_DAYS)
).count()
prev_activity = history[ACTIVITY_DATA_SOURCE].interval_from(
    split_ts - TARGET_WINDOW_DAYS * SECONDS_PER_DAY,
    timedelta(days=TARGET_WINDOW_DAYS),
).count()
activity_reduced = (
    next_activity < MAX_FUTURE_ACTIVITY
    and prev_activity > next_activity
)
```
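Workout counts from the 30 days before the split and the 30 days after it are compared. Activity counts as reduced when the future count is below `MAX_FUTURE_ACTIVITY` (fewer than 2 workouts, so 0 or 1) and is also strictly lower than the past count. The second condition separates a genuine decline from consistently low activity: a member with no workouts in either window is not flagged. Note that `MIN_PAST_ACTIVITY` is defined in the configuration but is not referenced by this comparison.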
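The comparison can be tried out on a plain array of workout timestamps. In the sketch below, `count_in_window` and `activity_reduced` are hypothetical helpers, and the windows are treated as half-open intervals (`start <= t < end`), which is an assumption about how `interval_from` bounds its range.

```python
import numpy as np

SECONDS_PER_DAY = 86_400  # assumption: same value as monad.constants.SECONDS_PER_DAY
TARGET_WINDOW_DAYS = 30
MAX_FUTURE_ACTIVITY = 2


def count_in_window(timestamps, start, end):
    """Count events with start <= t < end in a sorted timestamp array."""
    lo = np.searchsorted(timestamps, start, side="left")
    hi = np.searchsorted(timestamps, end, side="left")
    return int(hi - lo)


def activity_reduced(workout_ts, split_ts):
    """Sketch of step 3: compare the 30 days before and after the split."""
    window = TARGET_WINDOW_DAYS * SECONDS_PER_DAY
    prev_activity = count_in_window(workout_ts, split_ts - window, split_ts)
    next_activity = count_in_window(workout_ts, split_ts, split_ts + window)
    return next_activity < MAX_FUTURE_ACTIVITY and prev_activity > next_activity


day = SECONDS_PER_DAY
split_ts = 100 * day

# Five workouts before the split, one after: a clear drop.
dropped = activity_reduced(np.array([75, 80, 85, 90, 95, 110]) * day, split_ts)
# One workout before, three after: activity went up.
ramping_up = activity_reduced(np.array([80, 110, 120, 125]) * day, split_ts)
# No workouts in either window: low, but not a decline.
always_idle = activity_reduced(np.array([10, 20]) * day, split_ts)
print(dropped, ramping_up, always_idle)
```

The `always_idle` case shows why the second condition matters: a member who was already inactive is not counted as having reduced activity.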
④ Combine conditions
```python
if maintains and activity_reduced:
    return np.array([1], dtype=np.float32)
return np.array([0], dtype=np.float32)
```
The final label requires both conditions: the member maintains their subscription (they have not formally churned) but their workout activity has significantly dropped. This combination identifies the "silent churn" pattern.
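Putting the three boolean conditions together gives three possible outcomes. The sketch below enumerates them with a hypothetical helper, `silent_churn_label`, that takes the already computed flags; it leaves out the incomplete-training-window check, which also returns `None` in the full example.

```python
import numpy as np


def silent_churn_label(has_active, maintains, activity_reduced):
    """Hypothetical helper: map the three flags to a training label."""
    if not has_active:
        # No active subscription at the split: exclude from training.
        return None
    positive = maintains and activity_reduced
    return np.array([1.0 if positive else 0.0], dtype=np.float32)


# Print the label for every combination of the three flags.
for has_active in (False, True):
    for maintains in (False, True):
        for reduced in (False, True):
            label = silent_churn_label(has_active, maintains, reduced)
            print(has_active, maintains, reduced, "->", label)
```

Only one of the eight combinations is labeled positive: an active subscription that is maintained through the window while activity drops.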
Training
Once the target function is defined, fine-tune a downstream model:
```python
from pathlib import Path

from monad.ui.config import TrainingParams, MetricParams, MetricMonitoringMode
from monad.config.early_stopping import EarlyStopping
from monad.ui.module import load_from_foundation_model, BinaryClassificationTask

module = load_from_foundation_model(
    checkpoint_path=Path("./foundation_model"),
    downstream_task=BinaryClassificationTask(),
    target_fn=subscription_churn_target_fn,
)

training_params = TrainingParams(
    checkpoint_dir=Path("./<this_model>"),
    learning_rate=1e-4,
    epochs=20,
    devices=[0],
    metrics=[
        MetricParams(alias="auroc", metric_name="AUROC", kwargs={"task": "binary"}),
        MetricParams(alias="auprc", metric_name="AveragePrecision", kwargs={"task": "binary"}),
        MetricParams(alias="recall", metric_name="Recall", kwargs={"task": "binary"}),
        MetricParams(alias="precision", metric_name="Precision", kwargs={"task": "binary"}),
    ],
    metric_to_monitor="val_auroc_0",
    metric_monitoring_mode=MetricMonitoringMode.MAX,
    early_stopping=EarlyStopping(min_delta=1e-4, patience=5),
)

module.fit(training_params, seed=42)
```
Evaluation
```python
from pathlib import Path
from datetime import datetime, timezone

from monad.ui.module import load_from_checkpoint
from monad.ui.config import TestingParams, MetricParams, OutputType

module = load_from_checkpoint(Path("./<this_model>"))

testing_params = TestingParams(
    prediction_date=datetime(2024, 5, 1, tzinfo=timezone.utc),
    output_type=OutputType.DECODED,
    devices=[0],
    metrics=[
        MetricParams(alias="auroc", metric_name="AUROC"),
        MetricParams(alias="auprc", metric_name="AveragePrecision"),
        MetricParams(alias="recall", metric_name="Recall"),
    ],
)

results = module.test(testing_params)
```
Prediction
```python
from pathlib import Path
from datetime import datetime, timezone

from monad.ui.module import load_from_checkpoint
from monad.ui.config import TestingParams, OutputType

module = load_from_checkpoint(Path("./<this_model>"))

testing_params = TestingParams(
    local_save_location=Path("./predictions.tsv"),
    output_type=OutputType.DECODED,
    prediction_date=datetime(2024, 6, 1, tzinfo=timezone.utc),
    devices=[0],
)

predictions = module.predict(testing_params)
```
Recommended Metrics
| Metric | Why it matters |
|---|---|
| AUROC | Measures overall ranking quality. |
| AUPRC | More informative when the positive class is rare. |
| Recall | Proportion of actual positives caught. |
| Precision | Proportion of predicted positives that are correct. |
| F1 Score | Harmonic mean of precision and recall. |
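Precision, recall, and F1 all depend on the score threshold used to turn predicted probabilities into churn flags. The sketch below computes the three from labels and scores with plain NumPy; the function name, the sample data, and the 0.5 threshold are illustrative assumptions, not values produced by monad.

```python
import numpy as np


def precision_recall_f1(y_true, y_score, threshold=0.5):
    """Compute precision, recall, and F1 at a given score threshold."""
    y_true = np.asarray(y_true).astype(bool)
    y_pred = np.asarray(y_score) >= threshold
    tp = int(np.sum(y_pred & y_true))    # flagged and truly at risk
    fp = int(np.sum(y_pred & ~y_true))   # flagged but not at risk
    fn = int(np.sum(~y_pred & y_true))   # at risk but missed
    precision = tp / (tp + fp) if tp + fp else 0.0
    recall = tp / (tp + fn) if tp + fn else 0.0
    denom = precision + recall
    f1 = 2 * precision * recall / denom if denom else 0.0
    return precision, recall, f1


# Made-up labels and scores for eight members.
y_true = [1, 0, 0, 1, 0, 0, 0, 1]
y_score = [0.9, 0.2, 0.6, 0.4, 0.1, 0.3, 0.7, 0.8]

precision, recall, f1 = precision_recall_f1(y_true, y_score, threshold=0.5)
print(f"precision={precision:.3f} recall={recall:.3f} f1={f1:.3f}")
```

Lowering the threshold generally trades precision for recall, which may suit retention campaigns where a missed at-risk member costs more than an unnecessary outreach.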
Production Tips
- Account for seasonal workout patterns. Activity naturally drops during holidays, bad weather, or vacation seasons. Consider using year-over-year comparisons instead of simple period-over-period to reduce false positives.
- Differentiate workout types. A member who switches from gym visits to home workouts may appear to have reduced activity if only gym check-ins are tracked. Include all available activity sources.
- Adjust the activity threshold by membership tier. Premium members may have different baseline activity levels than basic subscribers. Consider per-tier thresholds or adding membership tier as a feature.
- Validate against actual cancellation data. Cross-reference predicted churn risks with members who eventually cancelled to confirm the activity drop precedes cancellation by a useful intervention window.
- Monitor subscription continuity logic carefully. Different subscription billing models (monthly, annual, auto-renew) create different event patterns. Ensure the continuity check handles your specific billing system correctly.
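The year-over-year idea from the first tip can be sketched on plain timestamp arrays. The helpers below are hypothetical, the year is approximated as 365 days (leap years ignored), and the approach assumes at least a year of workout history is available for each member.

```python
import numpy as np

SECONDS_PER_DAY = 86_400
TARGET_WINDOW_DAYS = 30
DAYS_PER_YEAR = 365  # approximation: ignores leap years


def count_in_window(timestamps, start, end):
    """Count events with start <= t < end in a sorted timestamp array."""
    lo = np.searchsorted(timestamps, start, side="left")
    hi = np.searchsorted(timestamps, end, side="left")
    return int(hi - lo)


def reduced_year_over_year(workout_ts, split_ts, max_future_activity=2):
    """Compare the next 30 days with the same 30 days one year earlier."""
    window = TARGET_WINDOW_DAYS * SECONDS_PER_DAY
    year = DAYS_PER_YEAR * SECONDS_PER_DAY
    next_activity = count_in_window(workout_ts, split_ts, split_ts + window)
    last_year_activity = count_in_window(
        workout_ts, split_ts - year, split_ts - year + window
    )
    return next_activity < max_future_activity and last_year_activity > next_activity


day = SECONDS_PER_DAY
split_ts = 400 * day  # last year's matching window is days 35 to 65

# Active in this season last year, silent now: flagged.
quit_member = reduced_year_over_year(
    np.array([40, 45, 50, 380, 390]) * day, split_ts
)
# Also idle in this season last year: treated as a seasonal pause.
seasonal_member = reduced_year_over_year(
    np.array([375, 380, 385, 390, 395]) * day, split_ts
)
print(quit_member, seasonal_member)
```

The seasonal member would be flagged by the period-over-period comparison used in the main example, since they were active in the 30 days before the split; the year-over-year variant does not flag them.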