Mortgage Refinancing Prediction
Task type: BinaryClassificationTask
Industry: Banking / Financial Services
This recipe predicts which customers are likely to apply for mortgage refinancing in the near future. Early identification lets your team offer competitive rates before the customer shops elsewhere, improving retention and share-of-wallet.
What counts as "refinancing" here? A customer is labeled positive (1) if they submit at least one mortgage refinancing application within the target window. You control the window length and the application types that qualify.
Prerequisites
Before writing a target function you need:
- A trained foundation model built on event data that includes an applications data source (or equivalent) with a column distinguishing application types (e.g., appl_type).
- The monad library installed in your environment (for Python App).
Target Function
The target function tells the model how to label each customer for training. It receives four arguments:
| Argument | Type | Description |
|---|---|---|
| history | Events | All events before the temporal split. |
| future | Events | All events after the temporal split. |
| attributes | Attributes | Static entity attributes. |
| ctx | Dict | Context dictionary containing SPLIT_TIMESTAMP, data mode, etc. |
The function must return one of:
- np.array([1], dtype=np.float32): the customer applied for refinancing
- np.array([0], dtype=np.float32): the customer did not apply
- None: exclude this customer from training (e.g., incomplete observation window)
Full Example
import numpy as np
from datetime import timedelta
from typing import Dict
from monad.ui.target_function import Events, Attributes
from monad.ui.target_function import SPLIT_TIMESTAMP
from monad.ui.target_function import has_incomplete_training_window

# === Configuration ===
TARGET_WINDOW_DAYS = 30  # Prediction horizon in days
APPLICATION_DATA_SOURCE = "applications"  # Data source with application events
APPLICATION_TYPE_COLUMN = "appl_type"  # Column distinguishing application types
REFINANCING_TYPES = ["mortgage_refinancing"]  # Values that count as refinancing

def refinancing_target_fn(
    history: Events,
    future: Events,
    attributes: Attributes,
    ctx: Dict,
) -> np.ndarray | None:
    """Label a customer as likely to refinance (1) or not (0)."""
    # 1. Ensure the training window is long enough
    target_window = timedelta(days=TARGET_WINDOW_DAYS)
    if has_incomplete_training_window(ctx, target_window):
        return None

    # 2. Trim future events to the target window
    future = future.interval_from(ctx[SPLIT_TIMESTAMP], target_window)

    # 3. Filter future applications to refinancing only
    refinancing_applications = future[APPLICATION_DATA_SOURCE].filter(
        by=APPLICATION_TYPE_COLUMN,
        condition=lambda x: x in REFINANCING_TYPES,
    )

    # 4. Apply label
    applied = 1 if refinancing_applications.count() > 0 else 0
    return np.array([applied], dtype=np.float32)
The same function, written with names accessed through the target_function namespace and the configuration defined inside the function body:

def refinancing_target_fn(
    history: target_function.Events,
    future: target_function.Events,
    attributes: target_function.Attributes,
    ctx: Dict,
) -> np.ndarray | None:
    """Label a customer as likely to refinance (1) or not (0)."""
    # === Configuration ===
    TARGET_WINDOW_DAYS = 30  # Prediction horizon in days
    APPLICATION_DATA_SOURCE = "applications"  # Data source with application events
    APPLICATION_TYPE_COLUMN = "appl_type"  # Column distinguishing application types
    REFINANCING_TYPES = ["mortgage_refinancing"]  # Values that count as refinancing

    # 1. Ensure the training window is long enough
    target_window = timedelta(days=TARGET_WINDOW_DAYS)
    if target_function.has_incomplete_training_window(ctx, target_window):
        return None

    # 2. Trim future events to the target window
    future = future.interval_from(ctx[target_function.SPLIT_TIMESTAMP], target_window)

    # 3. Filter future applications to refinancing only
    refinancing_applications = future[APPLICATION_DATA_SOURCE].filter(
        by=APPLICATION_TYPE_COLUMN,
        condition=lambda x: x in REFINANCING_TYPES,
    )

    # 4. Apply label
    applied = 1 if refinancing_applications.count() > 0 else 0
    return np.array([applied], dtype=np.float32)
Step-by-Step Breakdown
① Validate the training window
During training, monad creates multiple temporal splits. Some land too close to the end of the dataset, leaving fewer than 30 days of observable future. has_incomplete_training_window returns True in those cases, and returning None excludes the sample from training. This check is automatically bypassed at test/prediction time.
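The idea behind this check can be sketched in plain Python. This is not monad's implementation: the helper name is_window_incomplete and the data_end value are hypothetical, and the sketch only illustrates the comparison between the split timestamp, the target window, and the last date covered by the data.

```python
from datetime import datetime, timedelta, timezone


def is_window_incomplete(split: datetime, window: timedelta, data_end: datetime) -> bool:
    """Return True when the data ends before the full target window has elapsed."""
    return split + window > data_end


# Hypothetical dataset that ends on 30 June 2024.
data_end = datetime(2024, 6, 30, tzinfo=timezone.utc)
window = timedelta(days=30)

early_split = datetime(2024, 5, 1, tzinfo=timezone.utc)   # 60 days of future available
late_split = datetime(2024, 6, 15, tzinfo=timezone.utc)   # only 15 days of future available

print(is_window_incomplete(early_split, window, data_end))  # False
print(is_window_incomplete(late_split, window, data_end))   # True
```

A sample from the late split would be labeled from a truncated future, so a customer who refinances on day 20 would wrongly look like a negative. Excluding such samples avoids that bias.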
② Trim future events to the target window
future initially contains all events after the split. Narrowing it to exactly 30 days ensures every sample is evaluated against the same horizon.
Tip: A shorter window (e.g., 14 days) gives more actionable predictions but fewer positive samples. A longer window (e.g., 90 days) captures more refinancing events but reduces urgency. Start with 30 days and adjust based on your mortgage cycle.
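To get a feel for this trade-off before training, you can estimate the positive rate at several window lengths. The sketch below uses made-up numbers and a hypothetical helper, positive_rate, and it assumes an application counts when it falls strictly inside the window; it does not call monad.

```python
# Hypothetical data: days between the split and each customer's first
# refinancing application (None = no application observed).
days_to_first_application = [5, 12, 25, 40, 75, None, None, None, None, None]


def positive_rate(days_to_event, window_days):
    """Share of customers with an application inside the window."""
    positives = sum(1 for d in days_to_event if d is not None and d < window_days)
    return positives / len(days_to_event)


for window_days in (14, 30, 90):
    print(window_days, positive_rate(days_to_first_application, window_days))
# 14 0.2
# 30 0.3
# 90 0.5
```

In this toy sample, widening the window from 14 to 90 days raises the share of positives from 20% to 50%, which is the effect the tip describes.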
③ Filter to refinancing applications
refinancing_applications = future[APPLICATION_DATA_SOURCE].filter(
    by=APPLICATION_TYPE_COLUMN,
    condition=lambda x: x in REFINANCING_TYPES,
)
The filter method keeps only events where appl_type matches one of the refinancing types. This discards unrelated applications (e.g., new account openings, credit card requests). Extend REFINANCING_TYPES if your data uses multiple codes for refinancing (e.g., ["mortgage_refinancing", "refi_rate_switch"]).
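Before extending REFINANCING_TYPES, it can help to look at which codes actually occur in your data. The following is a minimal sketch on a hypothetical list of raw appl_type values; how you export those values from your own data source is up to you and is not shown.

```python
from collections import Counter

# Hypothetical raw values of the appl_type column.
raw_application_types = [
    "mortgage_refinancing",
    "credit_card",
    "refi_rate_switch",
    "mortgage_refinancing",
    "new_account",
]

REFINANCING_TYPES = {"mortgage_refinancing", "refi_rate_switch"}

# Frequency of every code, to spot refinancing variants that are not yet listed.
type_counts = Counter(raw_application_types)

# The same membership test that the filter condition applies to each event.
matching = [t for t in raw_application_types if t in REFINANCING_TYPES]

print(type_counts.most_common())
print(len(matching))  # 3
```

The sketch uses a set for REFINANCING_TYPES; the `x in REFINANCING_TYPES` condition behaves the same for a list or a set.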
④ Apply the label
applied = 1 if refinancing_applications.count() > 0 else 0
return np.array([applied], dtype=np.float32)
If at least one refinancing application exists in the window, the customer is labeled positive (1). The result must be a 1-D float32 NumPy array of size 1.
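A small check like the one below can catch a wrong dtype or shape while you develop a target function. The helper is_valid_label is hypothetical and not part of monad; it only encodes the return contract described in this recipe.

```python
import numpy as np


def is_valid_label(label) -> bool:
    """Check the return contract: None, or a 1-D float32 array of size 1 holding 0 or 1."""
    if label is None:
        return True
    return bool(
        isinstance(label, np.ndarray)
        and label.dtype == np.float32
        and label.shape == (1,)
        and label[0] in (0.0, 1.0)
    )


print(is_valid_label(np.array([1], dtype=np.float32)))  # True
print(is_valid_label(np.array([1])))                    # False: integer dtype
print(is_valid_label(1.0))                              # False: not an array
```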
Training
from pathlib import Path
from monad.ui.config import TrainingParams, MetricParams, MetricMonitoringMode
from monad.config.early_stopping import EarlyStopping
from monad.ui.module import load_from_foundation_model, BinaryClassificationTask

module = load_from_foundation_model(
    checkpoint_path=Path("./foundation_model"),
    downstream_task=BinaryClassificationTask(),
    target_fn=refinancing_target_fn,
)

training_params = TrainingParams(
    checkpoint_dir=Path("./<this_model>"),
    learning_rate=1e-4,
    epochs=20,
    devices=[0],
    metrics=[
        MetricParams(alias="auroc", metric_name="AUROC", kwargs={"task": "binary"}),
        MetricParams(alias="auprc", metric_name="AveragePrecision", kwargs={"task": "binary"}),
        MetricParams(alias="recall", metric_name="Recall", kwargs={"task": "binary"}),
        MetricParams(alias="precision", metric_name="Precision", kwargs={"task": "binary"}),
    ],
    metric_to_monitor="val_auroc_0",
    metric_monitoring_mode=MetricMonitoringMode.MAX,
    early_stopping=EarlyStopping(min_delta=1e-4, patience=5),
)

module.fit(training_params, seed=42)
Evaluation
from pathlib import Path
from datetime import datetime, timezone
from monad.ui.module import load_from_checkpoint
from monad.ui.config import TestingParams, MetricParams, OutputType

module = load_from_checkpoint(Path("./<this_model>"))

testing_params = TestingParams(
    prediction_date=datetime(2024, 5, 1, tzinfo=timezone.utc),
    output_type=OutputType.DECODED,
    devices=[0],
    metrics=[
        MetricParams(alias="auroc", metric_name="AUROC"),
        MetricParams(alias="auprc", metric_name="AveragePrecision"),
        MetricParams(alias="recall", metric_name="Recall"),
    ],
)

results = module.test(testing_params)
Prediction
from pathlib import Path
from datetime import datetime, timezone
from monad.ui.module import load_from_checkpoint
from monad.ui.config import TestingParams, OutputType

module = load_from_checkpoint(Path("./<this_model>"))

testing_params = TestingParams(
    local_save_location=Path("./predictions.tsv"),
    output_type=OutputType.DECODED,
    prediction_date=datetime(2024, 6, 1, tzinfo=timezone.utc),
    devices=[0],
)

predictions = module.predict(testing_params)
Variations
Broader product filter
Include multiple refinancing-adjacent application types to cast a wider net:
def refinancing_broad_target_fn(
    history: target_function.Events,
    future: target_function.Events,
    attributes: target_function.Attributes,
    ctx: Dict,
) -> np.ndarray | None:
    # === Configuration ===
    TARGET_WINDOW_DAYS = 30
    APPLICATION_DATA_SOURCE = "applications"
    APPLICATION_TYPE_COLUMN = "appl_type"
    REFINANCING_TYPES = [
        "mortgage_refinancing",
        "rate_renegotiation",
        "mortgage_transfer",
    ]

    target_window = timedelta(days=TARGET_WINDOW_DAYS)
    if target_function.has_incomplete_training_window(ctx, target_window):
        return None

    future = future.interval_from(ctx[target_function.SPLIT_TIMESTAMP], target_window)

    refinancing_applications = future[APPLICATION_DATA_SOURCE].filter(
        by=APPLICATION_TYPE_COLUMN,
        condition=lambda x: x in REFINANCING_TYPES,
    )

    applied = 1 if refinancing_applications.count() > 0 else 0
    return np.array([applied], dtype=np.float32)
Active-mortgage-only filter
Only score customers who currently hold a mortgage:
def refinancing_target_fn(
    history: Events, future: Events, attributes: Attributes, ctx: Dict
) -> np.ndarray | None:
    target_window = timedelta(days=TARGET_WINDOW_DAYS)
    if has_incomplete_training_window(ctx, target_window):
        return None

    future = future.interval_from(ctx[SPLIT_TIMESTAMP], target_window)

    # Exclude customers with no active mortgage
    has_mortgage = history["accounts"].filter(
        by="product_type",
        condition=lambda x: x == "mortgage",
    ).count() > 0
    if not has_mortgage:
        return None

    refinancing_applications = future[APPLICATION_DATA_SOURCE].filter(
        by=APPLICATION_TYPE_COLUMN,
        condition=lambda x: x in REFINANCING_TYPES,
    )

    applied = 1 if refinancing_applications.count() > 0 else 0
    return np.array([applied], dtype=np.float32)
The same variation, written with names accessed through the target_function namespace and the configuration defined inside the function body:

def refinancing_target_fn(
    history: target_function.Events,
    future: target_function.Events,
    attributes: target_function.Attributes,
    ctx: Dict,
) -> np.ndarray | None:
    TARGET_WINDOW_DAYS = 30
    APPLICATION_DATA_SOURCE = "applications"
    APPLICATION_TYPE_COLUMN = "appl_type"
    REFINANCING_TYPES = ["mortgage_refinancing"]

    target_window = timedelta(days=TARGET_WINDOW_DAYS)
    if target_function.has_incomplete_training_window(ctx, target_window):
        return None

    future = future.interval_from(ctx[target_function.SPLIT_TIMESTAMP], target_window)

    # Exclude customers with no active mortgage
    has_mortgage = history["accounts"].filter(
        by="product_type",
        condition=lambda x: x == "mortgage",
    ).count() > 0
    if not has_mortgage:
        return None

    refinancing_applications = future[APPLICATION_DATA_SOURCE].filter(
        by=APPLICATION_TYPE_COLUMN,
        condition=lambda x: x in REFINANCING_TYPES,
    )

    applied = 1 if refinancing_applications.count() > 0 else 0
    return np.array([applied], dtype=np.float32)
Recommended Metrics
| Metric | Why it matters |
|---|---|
| AUROC | Overall ranking quality — how well the model separates refinancers from non-refinancers. |
| AUPRC | More informative than AUROC when refinancing events are rare (imbalanced classes). |
| Recall | Proportion of actual refinancers identified. Prioritize if missing a customer is costly. |
| Precision | Proportion of flagged customers who truly refinance. Prioritize if outreach is expensive. |
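To see why AUPRC can be more telling than AUROC on rare events, the sketch below computes both from scratch on hypothetical scores for 20 customers, of whom only 2 refinance. The functions auroc and average_precision are illustrative stand-ins written for this example (no tied scores assumed in the ranking); they are not the metric implementations monad uses.

```python
def auroc(labels, scores):
    """Probability that a random positive is scored above a random negative (ties count half)."""
    pos = [s for y, s in zip(labels, scores) if y == 1]
    neg = [s for y, s in zip(labels, scores) if y == 0]
    wins = sum((p > n) + 0.5 * (p == n) for p in pos for n in neg)
    return wins / (len(pos) * len(neg))


def average_precision(labels, scores):
    """Mean of the precision values measured at the rank of each positive."""
    ranked = sorted(zip(scores, labels), key=lambda pair: pair[0], reverse=True)
    hits, total = 0, 0.0
    for rank, (_, y) in enumerate(ranked, start=1):
        if y == 1:
            hits += 1
            total += hits / rank
    return total / hits


# Hypothetical data: scores strictly decreasing, positives at ranks 3 and 6.
scores = [1 - i / 20 for i in range(20)]
labels = [0] * 20
labels[2] = 1
labels[5] = 1

print(round(auroc(labels, scores), 3))              # 0.833
print(round(average_precision(labels, scores), 3))  # 0.333
```

An AUROC of 0.833 looks comfortable, yet the average precision of 0.333 shows that most of the top-ranked customers are not refinancers, which is what matters when outreach capacity is limited.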
Production Tips
- Tune the decision threshold. Refinancing is typically a rare event, so the default 0.5 threshold will miss most positives. Lower the threshold (e.g., 0.2) and accept more false positives if early outreach is cheap.
- Align the window with your sales cycle. If your team needs 2 weeks to prepare an offer, use a 45–60 day prediction window so there is time to act.
- Combine with rate sensitivity signals. Pair predictions with external interest rate trends for richer prioritization.
- Retrain after rate changes. Customer behavior shifts significantly when benchmark rates move. Retrain the model promptly after major rate announcements.
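The threshold tip can be checked on a validation set before going live. The sketch below compares precision and recall at 0.5 and 0.2 on hypothetical labels and scores; precision_recall_at is an illustrative helper, and the numbers are invented to show the mechanics rather than to suggest typical values.

```python
def precision_recall_at(labels, scores, threshold):
    """Precision and recall when customers with score >= threshold are flagged."""
    flagged = [y for y, s in zip(labels, scores) if s >= threshold]
    true_positives = sum(flagged)
    total_positives = sum(labels)
    precision = true_positives / len(flagged) if flagged else 0.0
    recall = true_positives / total_positives if total_positives else 0.0
    return precision, recall


# Hypothetical validation data: 3 refinancers (label 1) among 10 customers.
labels = [1, 1, 1, 0, 0, 0, 0, 0, 0, 0]
scores = [0.55, 0.35, 0.25, 0.30, 0.15, 0.10, 0.08, 0.05, 0.03, 0.02]

for threshold in (0.5, 0.2):
    precision, recall = precision_recall_at(labels, scores, threshold)
    print(threshold, round(precision, 2), round(recall, 2))
# 0.5 1.0 0.33
# 0.2 0.75 1.0
```

In this toy example, lowering the threshold from 0.5 to 0.2 recovers every refinancer at the cost of one false positive, a trade that is usually acceptable when outreach is cheap.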