Predict Ticket Escalation Channels
Task type: MultilabelClassificationTask
Industry: Customer Support
When a customer escalates a support ticket across multiple channels within hours of opening it, it signals frustration and high churn risk. By predicting which escalation channels a customer is likely to use, support teams can proactively staff the right channels and route tickets to agents trained in multi-channel de-escalation.
What makes this advanced? Multi-channel detection with extra columns: uses .extra["ticket_id"] to link ticket openings with channel-specific escalation events within a 24-hour window.
Prerequisites
Before writing a target function you need:
- A trained foundation model built on event data that includes the relevant data sources.
- The monad library installed in your environment.
- Data source(s): tickets, chat, email, phone, social_media, each with a ticket_id extra column
Target Function
The target function tells monad how to label each entity for training. It receives four arguments:
| Argument | Type | Description |
|---|---|---|
| history | Events | All events before the temporal split. |
| future | Events | All events after the temporal split. |
| attributes | Attributes | Static entity attributes. |
| ctx | Dict | Context dictionary containing SPLIT_TIMESTAMP, data mode, etc. |
For multilabel classification, the function must return one of:
- A 1-D float32 array of size num_labels: binary indicators (0 or 1) per channel.
- None: exclude this entity from training.
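The return contract above can be checked before training with a small helper. This is a minimal sketch that assumes only NumPy; validate_multilabel_target is a hypothetical name used for illustration and is not part of the monad API.

```python
from typing import Optional

import numpy as np


def validate_multilabel_target(target: Optional[np.ndarray], num_labels: int) -> bool:
    """Return True if `target` satisfies the multilabel return contract.

    Hypothetical helper for illustration; not part of the monad API.
    """
    if target is None:
        # None is valid: the entity is excluded from training.
        return True
    return (
        isinstance(target, np.ndarray)
        and target.ndim == 1
        and target.shape[0] == num_labels
        and target.dtype == np.float32
        and bool(np.isin(target, (0.0, 1.0)).all())
    )
```

Running a check like this on a handful of entities can catch shape or dtype mistakes before a long training run starts.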
Full Example
import numpy as np
from datetime import timedelta
from typing import Dict
from monad.ui.target_function import Events, Attributes
from monad.ui.target_function import SPLIT_TIMESTAMP
from monad.ui.target_function import has_incomplete_training_window
from monad.constants import SECONDS_PER_DAY
from monad.targets import DataSourceEvents
# === Configuration ===
TICKETS_DATA_SOURCE = "tickets"
ESCALATION_WINDOW_HOURS = 24
CHANNEL_SOURCES = ["chat", "email", "phone", "social_media"]
def ticket_escalation_target_fn(
    history: Events,
    future: Events,
    attributes: Attributes,
    ctx: Dict,
) -> np.ndarray | None:
    """Predict which channels a ticket will be escalated through within 24h."""

    def is_escalated(events: DataSourceEvents, ticket_id: str, deadline: float) -> bool:
        # Keep the earliest timestamp per ticket; a dict comprehension would keep the last one.
        escalated = {}
        for tid, ts in zip(events.extra["ticket_id"], events.timestamps):
            if tid not in escalated or ts < escalated[tid]:
                escalated[tid] = ts
        escalation_time = escalated.get(ticket_id)
        return escalation_time is not None and escalation_time < deadline

    # 1. Build ticket-to-deadline mapping
    ticket2deadline = {
        ticket_id: ts + SECONDS_PER_DAY
        for ticket_id, ts in zip(
            future[TICKETS_DATA_SOURCE].extra["ticket_id"],
            future[TICKETS_DATA_SOURCE].timestamps,
        )
    }
    if not ticket2deadline:
        return None

    # 2. For each ticket, check each channel
    target = np.zeros(len(CHANNEL_SOURCES), dtype=np.float32)
    for ticket_id, deadline in ticket2deadline.items():
        per_ticket = np.array([
            is_escalated(future[source], ticket_id, deadline)
            for source in CHANNEL_SOURCES
        ])
        target = np.logical_or(target, per_ticket)
    return target.astype(np.float32)
Step-by-Step Breakdown
① Define the escalation check helper
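def is_escalated(events: DataSourceEvents, ticket_id: str, deadline: float) -> bool:
    # Keep the earliest timestamp per ticket; a dict comprehension would keep the last one.
    escalated = {}
    for tid, ts in zip(events.extra["ticket_id"], events.timestamps):
        if tid not in escalated or ts < escalated[tid]:
            escalated[tid] = ts
    escalation_time = escalated.get(ticket_id)
    return escalation_time is not None and escalation_time < deadline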
Builds a dictionary mapping ticket IDs to their earliest timestamp in a given channel. Returns True if the ticket appears in that channel before the 24-hour deadline. Using .extra["ticket_id"] accesses metadata columns not part of the main event schema.
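The earliest-timestamp behaviour matters when a ticket is contacted more than once in the same channel. The sketch below is self-contained and runs without monad: FakeChannelEvents is a hypothetical stand-in that only mimics the .timestamps and .extra attributes used above, and the timestamps are made-up values in seconds.

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class FakeChannelEvents:
    """Hypothetical stand-in for DataSourceEvents, for illustration only."""
    timestamps: List[float]
    extra: Dict[str, List[str]] = field(default_factory=dict)


def earliest_by_ticket(events: FakeChannelEvents) -> Dict[str, float]:
    """Map each ticket ID to its earliest timestamp in the channel."""
    earliest: Dict[str, float] = {}
    for tid, ts in zip(events.extra["ticket_id"], events.timestamps):
        if tid not in earliest or ts < earliest[tid]:
            earliest[tid] = ts
    return earliest


def is_escalated(events: FakeChannelEvents, ticket_id: str, deadline: float) -> bool:
    """True if the ticket's earliest contact in this channel precedes the deadline."""
    escalation_time = earliest_by_ticket(events).get(ticket_id)
    return escalation_time is not None and escalation_time < deadline


# T1 is contacted twice: once inside the 24-hour window and once after it.
chat = FakeChannelEvents(
    timestamps=[3_600.0, 100_000.0, 50_000.0],
    extra={"ticket_id": ["T1", "T1", "T2"]},
)
```

With a deadline of 86,400 seconds, T1 counts as escalated because its first contact at 3,600 seconds is used. A mapping that kept the last timestamp per ticket would store 100,000 for T1 and miss the escalation.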
② Build ticket-to-deadline mapping
ticket2deadline = {
    ticket_id: ts + SECONDS_PER_DAY
    for ticket_id, ts in zip(
        future[TICKETS_DATA_SOURCE].extra["ticket_id"],
        future[TICKETS_DATA_SOURCE].timestamps,
    )
}
Each ticket opening creates a 24-hour escalation window. The deadline is simply the opening timestamp plus one day (86,400 seconds).
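The deadline arithmetic can be verified with the standard library alone. This sketch assumes timestamps are Unix epoch seconds (consistent with adding 86,400 to them) and uses a made-up opening time; WINDOW_SECONDS is an illustrative name.

```python
from datetime import datetime, timedelta, timezone

ESCALATION_WINDOW_HOURS = 24  # same constant as in the configuration block
WINDOW_SECONDS = timedelta(hours=ESCALATION_WINDOW_HOURS).total_seconds()

# Assumed for illustration: timestamps are Unix epoch seconds.
opened_at = datetime(2024, 5, 1, 9, 30, tzinfo=timezone.utc).timestamp()
deadline = opened_at + WINDOW_SECONDS

# Convert back to a datetime to confirm the deadline is exactly one day later.
deadline_dt = datetime.fromtimestamp(deadline, tz=timezone.utc)
```

Deriving the window from ESCALATION_WINDOW_HOURS, rather than hard-coding one day, keeps the window length in a single place if it ever needs to change.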
③ Check each channel for each ticket
for ticket_id, deadline in ticket2deadline.items():
    per_ticket = np.array([
        is_escalated(future[source], ticket_id, deadline)
        for source in CHANNEL_SOURCES
    ])
    target = np.logical_or(target, per_ticket)
For every ticket, each of the four channels is checked for escalation within the deadline. np.logical_or accumulates across all tickets — if any ticket was escalated through a channel, that channel is flagged positive.
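The accumulation step can be tried in isolation. The per-ticket flags below are hypothetical values chosen to show how np.logical_or merges results across tickets.

```python
import numpy as np

CHANNEL_SOURCES = ["chat", "email", "phone", "social_media"]

# Hypothetical per-ticket results: one boolean per channel.
per_ticket_flags = [
    np.array([True, False, False, False]),   # ticket A: chat only
    np.array([False, False, True, False]),   # ticket B: phone only
    np.array([False, False, False, False]),  # ticket C: no escalation
]

target = np.zeros(len(CHANNEL_SOURCES), dtype=np.float32)
for flags in per_ticket_flags:
    # logical_or treats non-zero floats as True and returns a boolean array.
    target = np.logical_or(target, flags)

# Cast back to float32 for the multilabel target.
target = target.astype(np.float32)
```

Stacking the per-ticket arrays and calling np.any along axis 0 gives the same result in one step; the loop form simply mirrors the target function.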
④ Return the binary target vector
The boolean array is cast to float32 as required by MultilabelClassificationTask. A result like [1, 0, 1, 0] means escalation occurred via chat and phone but not email or social media.
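Reading a target vector back into channel names is a matter of zipping it with the class names in the same order. decode_target is a hypothetical helper for illustration, not a monad function.

```python
from typing import List, Sequence

CHANNEL_SOURCES = ["chat", "email", "phone", "social_media"]


def decode_target(target: Sequence[float], class_names: Sequence[str]) -> List[str]:
    """Return the names of the channels flagged positive. Hypothetical helper."""
    if len(target) != len(class_names):
        raise ValueError("target and class_names must have the same length")
    return [name for name, flag in zip(class_names, target) if flag == 1]


escalated_channels = decode_target([1, 0, 1, 0], CHANNEL_SOURCES)
```

The label order is fixed by CHANNEL_SOURCES, so the same list should be passed as class_names when the task is created.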
Training
Once the target function is defined, fine-tune a downstream model:
from pathlib import Path
from monad.ui.config import TrainingParams, MetricParams, MetricMonitoringMode
from monad.config.early_stopping import EarlyStopping
from monad.ui.module import load_from_foundation_model, MultilabelClassificationTask
module = load_from_foundation_model(
    checkpoint_path=Path("./foundation_model"),
    downstream_task=MultilabelClassificationTask(class_names=CHANNEL_SOURCES),
    target_fn=ticket_escalation_target_fn,
)
training_params = TrainingParams(
    checkpoint_dir=Path("./<this_model>"),
    learning_rate=1e-4,
    epochs=20,
    devices=[0],
    metrics=[
        MetricParams(alias="auroc", metric_name="AUROC", kwargs={"task": "multilabel", "num_labels": <num_labels>}),
        MetricParams(alias="auprc", metric_name="AveragePrecision", kwargs={"task": "multilabel", "num_labels": <num_labels>}),
        MetricParams(alias="f1", metric_name="F1Score", kwargs={"task": "multilabel", "num_labels": <num_labels>}),
    ],
    metric_to_monitor="val_auroc_0",
    metric_monitoring_mode=MetricMonitoringMode.MAX,
    early_stopping=EarlyStopping(min_delta=1e-4, patience=5),
)
module.fit(training_params, seed=42)
Evaluation
from pathlib import Path
from datetime import datetime, timezone
from monad.ui.module import load_from_checkpoint
from monad.ui.config import TestingParams, MetricParams, OutputType
module = load_from_checkpoint(Path("./<this_model>"))
testing_params = TestingParams(
    prediction_date=datetime(2024, 5, 1, tzinfo=timezone.utc),
    output_type=OutputType.DECODED,
    devices=[0],
    metrics=[
        MetricParams(alias="auroc", metric_name="AUROC"),
        MetricParams(alias="auprc", metric_name="AveragePrecision"),
        MetricParams(alias="f1", metric_name="F1Score"),
    ],
)
results = module.test(testing_params)
Prediction
from pathlib import Path
from datetime import datetime, timezone
from monad.ui.module import load_from_checkpoint
from monad.ui.config import TestingParams, OutputType
module = load_from_checkpoint(Path("./<this_model>"))
testing_params = TestingParams(
    local_save_location=Path("./predictions.tsv"),
    output_type=OutputType.DECODED,
    prediction_date=datetime(2024, 6, 1, tzinfo=timezone.utc),
    devices=[0],
)
predictions = module.predict(testing_params)
Variation
Pandas-based approach
An alternative implementation using pandas for more readable join logic, especially useful when ticket data spans the split boundary:
def ticket_escalation_pandas_target_fn(
    history: Events,
    future: Events,
    attributes: Attributes,
    ctx: Dict,
) -> np.ndarray | None:
    """Alternative: pandas-based approach for ticket escalation."""
    import pandas as pd

    window = timedelta(hours=24)
    split_time = pd.to_datetime(ctx[SPLIT_TIMESTAMP], unit="s", utc=True)

    # Include tickets opened just before the split (within 24h)
    recent_tickets = history["ticket_opening"].filter(
        by="timestamps",
        condition=lambda ts: pd.to_datetime(ts, unit="s", utc=True) > split_time - window,
    )
    if len(future["ticket_opening"]) + len(recent_tickets) == 0:
        return None

    opens = pd.DataFrame({
        "ticket_id": np.concatenate([
            future["ticket_opening"]["ticket_id"].events,
            recent_tickets["ticket_id"].events,
        ]),
        "opened_at": np.concatenate([
            pd.to_datetime(future["ticket_opening"].timestamps, unit="s", utc=True),
            pd.to_datetime(recent_tickets.timestamps, unit="s", utc=True),
        ]),
    })

    def check_channel(table_name: str) -> int:
        previous = history[table_name].filter(
            by="timestamps",
            condition=lambda ts: pd.to_datetime(ts, unit="s", utc=True) > split_time - window,
        )
        contacts = pd.DataFrame({
            "ticket_id": np.concatenate([
                future[table_name]["ticket_id"].events,
                previous["ticket_id"].events,
            ]),
            "contact_at": np.concatenate([
                pd.to_datetime(future[table_name].timestamps, unit="s", utc=True),
                pd.to_datetime(previous.timestamps, unit="s", utc=True),
            ]),
        })
        joined = contacts.merge(opens, on="ticket_id", how="inner")
        return int((joined["contact_at"] < joined["opened_at"] + window).any())

    return np.array([
        check_channel("chat_contact"),
        check_channel("email_contact"),
        check_channel("phone_contact"),
        check_channel("social_media_contact"),
    ], dtype=np.float32)
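This approach handles tickets that straddle the split boundary by looking back 24 hours into history. It uses pandas merge to join ticket openings with channel contacts, which is more readable for complex temporal joins but slightly slower than the dictionary-based primary approach. Note that this variation assumes data sources named ticket_opening, chat_contact, email_contact, phone_contact, and social_media_contact, and reads the ticket ID via ["ticket_id"].events rather than .extra["ticket_id"]; adjust the names and column access to match your own schema.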
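The join at the heart of this variation can be tried on toy data without monad. The ticket IDs and timestamps below are made up for illustration; only standard pandas calls are used.

```python
import pandas as pd

window = pd.Timedelta(hours=24)

# Hypothetical ticket openings.
opens = pd.DataFrame({
    "ticket_id": ["T1", "T2"],
    "opened_at": pd.to_datetime(["2024-05-01 09:00", "2024-05-01 12:00"], utc=True),
})

# Hypothetical contacts in one channel.
contacts = pd.DataFrame({
    "ticket_id": ["T1", "T2", "T3"],
    "contact_at": pd.to_datetime(
        ["2024-05-01 15:00", "2024-05-03 08:00", "2024-05-01 10:00"], utc=True
    ),
})

# Inner join drops T3, which has a contact but no matching ticket opening.
joined = contacts.merge(opens, on="ticket_id", how="inner")
joined["within_window"] = joined["contact_at"] < joined["opened_at"] + window

# The channel is flagged if any ticket was contacted inside its window.
channel_flag = int(joined["within_window"].any())
```

Here T1 is contacted six hours after opening and falls inside the window, while T2 is contacted almost two days later and does not, so the channel is flagged on the strength of T1 alone.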
Recommended Metrics
| Metric | Why it matters |
|---|---|
| Hamming Loss | Fraction of labels that are incorrectly predicted. |
| Subset Accuracy | Proportion of samples with all labels correct. |
| Macro AUROC | Average ranking quality across all labels. |
| Per-label F1 | Identifies which labels the model struggles with. |
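The first two metrics in the table are simple enough to compute by hand, which helps when sanity-checking reported numbers. The labels and predictions below are hypothetical and already thresholded to 0 or 1.

```python
import numpy as np

# Hypothetical labels and thresholded predictions: 3 entities x 4 channels.
y_true = np.array([[1, 0, 1, 0],
                   [0, 0, 0, 0],
                   [1, 1, 0, 0]])
y_pred = np.array([[1, 0, 0, 0],
                   [0, 0, 0, 0],
                   [1, 1, 0, 1]])

mismatches = y_true != y_pred

# Hamming loss: fraction of individual label slots predicted incorrectly.
hamming_loss = mismatches.mean()

# Subset accuracy: fraction of rows where every label matches.
subset_accuracy = (~mismatches.any(axis=1)).mean()
```

Two of the twelve label slots are wrong, giving a Hamming loss of about 0.17, yet only one of three rows is fully correct, giving a subset accuracy of about 0.33. The gap shows why subset accuracy is the stricter of the two.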
Production Tips
- Adjust the escalation window for your SLA. 24 hours is a common threshold, but your support SLAs may define escalation differently. Align the window with your actual escalation policy.
- Weight channels by cost. Phone escalations are typically the most expensive. Consider training separate models per channel or weighting the loss function by channel cost.
- Handle boundary tickets carefully. Tickets opened shortly before the split timestamp may have escalations that fall after the split. The pandas variation above addresses this by looking back 24 hours into history.
- Monitor channel distribution shifts. As your support infrastructure evolves (e.g., adding a chatbot), the channel mix will change. Retrain periodically to capture these shifts.
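For the cost-weighting tip, one possible starting point is to turn per-channel costs into normalized weights. This is only a sketch: the cost figures are hypothetical placeholders, cost_weights is an illustrative helper, and this document does not say whether or how monad accepts per-label loss weights, so the hand-off to training is left open.

```python
from typing import Dict, List

CHANNEL_SOURCES = ["chat", "email", "phone", "social_media"]

# Hypothetical relative handling costs per escalation; replace with your own figures.
CHANNEL_COST: Dict[str, float] = {
    "chat": 1.0,
    "email": 1.0,
    "phone": 4.0,
    "social_media": 2.0,
}


def cost_weights(class_names: List[str], costs: Dict[str, float]) -> List[float]:
    """Scale costs so the weights average to 1.0, keeping the overall loss scale."""
    raw = [costs[name] for name in class_names]
    mean_cost = sum(raw) / len(raw)
    return [c / mean_cost for c in raw]


weights = cost_weights(CHANNEL_SOURCES, CHANNEL_COST)
```

Normalizing to a mean of 1.0 keeps the weighted loss on roughly the same scale as the unweighted one, so the learning rate does not need retuning just because weights were introduced.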