Predict Ticket Escalation Channels

Task type: MultilabelClassificationTask
Industry: Customer Support

When a customer escalates a support ticket across multiple channels within hours of opening it, it signals frustration and high churn risk. By predicting which escalation channels a customer is likely to use, support teams can proactively staff the right channels and route tickets to agents trained in multi-channel de-escalation.

What makes this advanced? Multi-channel detection with extra columns — uses .extra["ticket_id"] to link ticket openings with channel-specific escalation events within a 24-hour window.


Prerequisites

Before writing a target function you need:

  • A trained foundation model built on event data that includes the relevant data sources.
  • The monad library installed in your environment.
  • Data source(s): tickets, chat, email, phone, social_media, each with a ticket_id extra column.

Target Function

The target function tells monad how to label each entity for training. It receives four arguments:

Argument      Type          Description
history       Events        All events before the temporal split.
future        Events        All events after the temporal split.
attributes    Attributes    Static entity attributes.
ctx           Dict          Context dictionary containing SPLIT_TIMESTAMP, data mode, etc.

For multilabel classification, the function must return one of:

  • A 1-D float32 array of size num_labels — binary indicators (0 or 1) per channel.
  • None, to exclude this entity from training.
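
The return contract above can be checked with a small helper before training starts. This is a minimal sketch, not part of monad: check_multilabel_target is a hypothetical name, NUM_LABELS is assumed to be 4 (one label per channel), and the checks only mirror the two allowed return values listed above.

```python
import numpy as np

NUM_LABELS = 4  # assumption: one label per channel (chat, email, phone, social_media)


def check_multilabel_target(target, num_labels: int = NUM_LABELS) -> bool:
    """Return True if `target` is an allowed multilabel return value."""
    if target is None:  # None means "exclude this entity from training"
        return True
    return (
        isinstance(target, np.ndarray)
        and target.ndim == 1
        and target.shape[0] == num_labels
        and target.dtype == np.float32
        and bool(np.isin(target, (0.0, 1.0)).all())
    )


valid = np.array([1, 0, 1, 0], dtype=np.float32)
wrong_dtype = np.array([True, False, True, False])

print(check_multilabel_target(valid))        # True
print(check_multilabel_target(None))         # True
print(check_multilabel_target(wrong_dtype))  # False
```

Running a check like this on a handful of entities can surface dtype or shape mistakes before a long training run.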

Full Example

Python
import numpy as np
from datetime import timedelta
from typing import Dict

from monad.ui.target_function import Events, Attributes
from monad.ui.target_function import SPLIT_TIMESTAMP
from monad.ui.target_function import has_incomplete_training_window

from monad.constants import SECONDS_PER_DAY
from monad.targets import DataSourceEvents

# === Configuration ===
TICKETS_DATA_SOURCE = "tickets"
ESCALATION_WINDOW_HOURS = 24
CHANNEL_SOURCES = ["chat", "email", "phone", "social_media"]

def ticket_escalation_target_fn(
    history: Events,
    future: Events,
    attributes: Attributes,
    ctx: Dict,
) -> np.ndarray | None:
    """Predict which channels a ticket will be escalated through within 24h."""

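    def is_escalated(events: DataSourceEvents, ticket_id: str, deadline: float) -> bool:
        # Keep the earliest timestamp per ticket ID; a dict comprehension
        # would silently keep the last one when a ticket repeats.
        earliest = {}
        for tid, ts in zip(events.extra["ticket_id"], events.timestamps):
            if tid not in earliest or ts < earliest[tid]:
                earliest[tid] = ts
        escalation_time = earliest.get(ticket_id)
        return escalation_time is not None and escalation_time < deadline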

    # 1. Build ticket-to-deadline mapping
    ticket2deadline = {
        ticket_id: ts + SECONDS_PER_DAY
        for ticket_id, ts in zip(
            future[TICKETS_DATA_SOURCE].extra["ticket_id"],
            future[TICKETS_DATA_SOURCE].timestamps,
        )
    }

    if not ticket2deadline:
        return None

    # 2. For each ticket, check each channel
    target = np.zeros(len(CHANNEL_SOURCES), dtype=np.float32)
    for ticket_id, deadline in ticket2deadline.items():
        per_ticket = np.array([
            is_escalated(future[source], ticket_id, deadline)
            for source in CHANNEL_SOURCES
        ])
        target = np.logical_or(target, per_ticket)

    return target.astype(np.float32)

Step-by-Step Breakdown

① Define the escalation check helper

Python
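def is_escalated(events: DataSourceEvents, ticket_id: str, deadline: float) -> bool:
    # Keep the earliest timestamp per ticket ID; a dict comprehension
    # would silently keep the last one when a ticket repeats.
    earliest = {}
    for tid, ts in zip(events.extra["ticket_id"], events.timestamps):
        if tid not in earliest or ts < earliest[tid]:
            earliest[tid] = ts
    escalation_time = earliest.get(ticket_id)
    return escalation_time is not None and escalation_time < deadline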

Builds a dictionary mapping ticket IDs to their earliest timestamp in a given channel. Returns True if the ticket appears in that channel before the 24-hour deadline. Using .extra["ticket_id"] accesses metadata columns not part of the main event schema.
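
When a ticket appears more than once in a channel, it matters which timestamp ends up in the dictionary: a plain dict built from zip keeps the last value for each key, not the earliest. The sketch below illustrates the difference with toy lists standing in for events.extra["ticket_id"] and events.timestamps; earliest_by_ticket is a hypothetical helper, not a monad function.

```python
from typing import Dict, Iterable


def earliest_by_ticket(ticket_ids: Iterable[str], timestamps: Iterable[float]) -> Dict[str, float]:
    """Map each ticket ID to the smallest timestamp seen for it."""
    earliest: Dict[str, float] = {}
    for tid, ts in zip(ticket_ids, timestamps):
        if tid not in earliest or ts < earliest[tid]:
            earliest[tid] = ts
    return earliest


# Toy stand-ins for events.extra["ticket_id"] and events.timestamps.
ticket_ids = ["T1", "T2", "T1"]
timestamps = [100.0, 150.0, 900.0]

last_wins = dict(zip(ticket_ids, timestamps))
first_wins = earliest_by_ticket(ticket_ids, timestamps)

deadline = 500.0
print(last_wins["T1"] < deadline)   # False: the later event at 900.0 overwrote 100.0
print(first_wins["T1"] < deadline)  # True: the earliest event at 100.0 is kept
```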

② Build ticket-to-deadline mapping

Python
ticket2deadline = {
    ticket_id: ts + SECONDS_PER_DAY
    for ticket_id, ts in zip(
        future[TICKETS_DATA_SOURCE].extra["ticket_id"],
        future[TICKETS_DATA_SOURCE].timestamps,
    )
}

Each ticket opening creates a 24-hour escalation window. The deadline is simply the opening timestamp plus one day (86,400 seconds).
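
The deadline arithmetic can be sketched on its own, assuming timestamps are Unix seconds (as the SECONDS_PER_DAY offset suggests). deadline_for and SECONDS_PER_HOUR are illustrative names introduced here; the window length is passed in hours so that it can follow a configuration constant.

```python
from datetime import datetime, timezone

SECONDS_PER_HOUR = 3600
ESCALATION_WINDOW_HOURS = 24  # same value as the configuration constant above


def deadline_for(opened_at: float, window_hours: int = ESCALATION_WINDOW_HOURS) -> float:
    """Return the Unix timestamp at which the escalation window closes."""
    return opened_at + window_hours * SECONDS_PER_HOUR


opened = datetime(2024, 5, 1, 9, 0, tzinfo=timezone.utc).timestamp()
closes = deadline_for(opened)

print(closes - opened)  # 86400.0
print(datetime.fromtimestamp(closes, tz=timezone.utc).isoformat())
# 2024-05-02T09:00:00+00:00
```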

③ Check each channel for each ticket

Python
for ticket_id, deadline in ticket2deadline.items():
    per_ticket = np.array([
        is_escalated(future[source], ticket_id, deadline)
        for source in CHANNEL_SOURCES
    ])
    target = np.logical_or(target, per_ticket)

For every ticket, each of the four channels is checked for escalation within the deadline. np.logical_or accumulates across all tickets — if any ticket was escalated through a channel, that channel is flagged positive.
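
The accumulation can be seen in isolation with toy per-ticket results. This is only an illustration of the np.logical_or step; the three boolean vectors are invented values, not output from monad.

```python
import numpy as np

CHANNEL_SOURCES = ["chat", "email", "phone", "social_media"]

# Toy per-ticket results: one boolean per channel, in CHANNEL_SOURCES order.
per_ticket_flags = [
    np.array([True, False, False, False]),   # ticket A: chat only
    np.array([False, False, True, False]),   # ticket B: phone only
    np.array([False, False, False, False]),  # ticket C: no escalation
]

target = np.zeros(len(CHANNEL_SOURCES), dtype=bool)
for flags in per_ticket_flags:
    # A channel stays flagged once any ticket has set it.
    target = np.logical_or(target, flags)

print(target.astype(np.float32))  # [1. 0. 1. 0.]
```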

④ Return the binary target vector

Python
return target.astype(np.float32)

The boolean array is cast to float32 as required by MultilabelClassificationTask. A result like [1, 0, 1, 0] means escalation occurred via chat and phone but not email or social media.
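
For debugging, it can help to translate a target vector back into channel names. A minimal sketch, assuming the vector follows the order of CHANNEL_SOURCES; decode_channels is a hypothetical helper introduced here.

```python
from typing import List, Sequence

CHANNEL_SOURCES = ["chat", "email", "phone", "social_media"]


def decode_channels(target: Sequence[float], names: Sequence[str] = CHANNEL_SOURCES) -> List[str]:
    """Return the channel names whose indicator is 1."""
    if len(target) != len(names):
        raise ValueError("target length must match the number of channels")
    return [name for name, flag in zip(names, target) if flag == 1]


print(decode_channels([1, 0, 1, 0]))  # ['chat', 'phone']
```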


Training

Once the target function is defined, fine-tune a downstream model:

Python
from pathlib import Path
from monad.ui.config import TrainingParams, MetricParams, MetricMonitoringMode
from monad.config.early_stopping import EarlyStopping

from monad.ui.module import load_from_foundation_model, MultilabelClassificationTask

module = load_from_foundation_model(
    checkpoint_path=Path("./foundation_model"),
    downstream_task=MultilabelClassificationTask(class_names=CHANNEL_SOURCES),
    target_fn=ticket_escalation_target_fn,
)

training_params = TrainingParams(
    checkpoint_dir=Path("./<this_model>"),
    learning_rate=1e-4,
    epochs=20,
    devices=[0],
    metrics=[
        MetricParams(alias="auroc", metric_name="AUROC", kwargs={"task": "multilabel", "num_labels": len(CHANNEL_SOURCES)}),
        MetricParams(alias="auprc", metric_name="AveragePrecision", kwargs={"task": "multilabel", "num_labels": len(CHANNEL_SOURCES)}),
        MetricParams(alias="f1", metric_name="F1Score", kwargs={"task": "multilabel", "num_labels": len(CHANNEL_SOURCES)}),
    ],
    metric_to_monitor="val_auroc_0",
    metric_monitoring_mode=MetricMonitoringMode.MAX,
    early_stopping=EarlyStopping(min_delta=1e-4, patience=5),
)

module.fit(training_params, seed=42)

Evaluation

Python
from pathlib import Path
from datetime import datetime, timezone
from monad.ui.module import load_from_checkpoint
from monad.ui.config import TestingParams, MetricParams, OutputType

module = load_from_checkpoint(Path("./<this_model>"))

testing_params = TestingParams(
    prediction_date=datetime(2024, 5, 1, tzinfo=timezone.utc),
    output_type=OutputType.DECODED,
    devices=[0],
    metrics=[
        MetricParams(alias="auroc", metric_name="AUROC"),
        MetricParams(alias="auprc", metric_name="AveragePrecision"),
        MetricParams(alias="f1", metric_name="F1Score"),
    ],
)

results = module.test(testing_params)

Prediction

Python
from pathlib import Path
from datetime import datetime, timezone
from monad.ui.module import load_from_checkpoint
from monad.ui.config import TestingParams, OutputType

module = load_from_checkpoint(Path("./<this_model>"))

testing_params = TestingParams(
    local_save_location=Path("./predictions.tsv"),
    output_type=OutputType.DECODED,
    prediction_date=datetime(2024, 6, 1, tzinfo=timezone.utc),
    devices=[0],
)

predictions = module.predict(testing_params)

Variation

Pandas-based approach

An alternative implementation using pandas for more readable join logic, especially useful when ticket data spans the split boundary:

Python
def ticket_escalation_pandas_target_fn(
    history: Events,
    future: Events,
    attributes: Attributes,
    ctx: Dict,
) -> np.ndarray | None:
    """Alternative: pandas-based approach for ticket escalation."""
    import pandas as pd

    window = timedelta(hours=24)
    split_time = pd.to_datetime(ctx[SPLIT_TIMESTAMP], unit="s", utc=True)

    # Include tickets opened just before the split (within 24h)
    recent_tickets = history["ticket_opening"].filter(
        by="timestamps",
        condition=lambda ts: pd.to_datetime(ts, unit="s", utc=True) > split_time - window,
    )

    if len(future["ticket_opening"]) + len(recent_tickets) == 0:
        return None

    opens = pd.DataFrame({
        "ticket_id": np.concatenate([
            future["ticket_opening"]["ticket_id"].events,
            recent_tickets["ticket_id"].events,
        ]),
        "opened_at": np.concatenate([
            pd.to_datetime(future["ticket_opening"].timestamps, unit="s", utc=True),
            pd.to_datetime(recent_tickets.timestamps, unit="s", utc=True),
        ]),
    })

    def check_channel(table_name: str) -> int:
        previous = history[table_name].filter(
            by="timestamps",
            condition=lambda ts: pd.to_datetime(ts, unit="s", utc=True) > split_time - window,
        )
        contacts = pd.DataFrame({
            "ticket_id": np.concatenate([
                future[table_name]["ticket_id"].events,
                previous["ticket_id"].events,
            ]),
            "contact_at": np.concatenate([
                pd.to_datetime(future[table_name].timestamps, unit="s", utc=True),
                pd.to_datetime(previous.timestamps, unit="s", utc=True),
            ]),
        })
        joined = contacts.merge(opens, on="ticket_id", how="inner")
        return int((joined["contact_at"] < joined["opened_at"] + window).any())

    return np.array([
        check_channel("chat_contact"),
        check_channel("email_contact"),
        check_channel("phone_contact"),
        check_channel("social_media_contact"),
    ], dtype=np.float32)

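This approach handles tickets that straddle the split boundary by looking back 24 hours into history. It uses a pandas merge to join ticket openings with channel contacts, which is more readable for complex temporal joins but slightly slower than the dictionary-based primary approach. Note that this variation assumes data sources named ticket_opening, chat_contact, email_contact, phone_contact, and social_media_contact, and reads ticket_id with ["ticket_id"].events instead of .extra["ticket_id"]; adapt the names and access pattern to your own schema.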
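
The join itself can be tried on toy data without monad. The sketch below assumes two small hand-written DataFrames whose column names mirror the variation above (ticket_id, opened_at, contact_at); all timestamps are invented for illustration.

```python
import pandas as pd

window = pd.Timedelta(hours=24)
split_time = pd.Timestamp("2024-05-01T00:00:00Z")

# Toy ticket openings: T1 opened 6h before the split, T2 opened 2h after it.
opens = pd.DataFrame({
    "ticket_id": ["T1", "T2"],
    "opened_at": [split_time - pd.Timedelta(hours=6), split_time + pd.Timedelta(hours=2)],
})

# Toy phone contacts: T1 is contacted 9h after opening (3h after the split),
# T2 is contacted 30h after opening, which is outside the window.
contacts = pd.DataFrame({
    "ticket_id": ["T1", "T2"],
    "contact_at": [split_time + pd.Timedelta(hours=3), split_time + pd.Timedelta(hours=32)],
})

joined = contacts.merge(opens, on="ticket_id", how="inner")
joined["within_window"] = joined["contact_at"] < joined["opened_at"] + window

print(joined["within_window"].tolist())      # [True, False]
print(int(joined["within_window"].any()))    # 1
```

Ticket T1 is the boundary case: it was opened before the split, so it is only found because the openings table includes the look-back period.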


Metric             Why it matters
Hamming Loss       Fraction of labels that are incorrectly predicted.
Subset Accuracy    Proportion of samples with all labels correct.
Macro AUROC        Average ranking quality across all labels.
Per-label F1       Identifies which labels the model struggles with.
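
The first two metrics follow directly from their definitions. A minimal sketch on toy arrays, assuming predictions have already been thresholded to 0 or 1; the values are invented and the calculation uses plain NumPy rather than any monad metric class.

```python
import numpy as np

# Toy labels and thresholded predictions: rows are entities, columns are channels.
y_true = np.array([[1, 0, 1, 0],
                   [0, 0, 0, 1],
                   [1, 1, 0, 0]])
y_pred = np.array([[1, 0, 0, 0],
                   [0, 0, 0, 1],
                   [1, 1, 0, 1]])

mismatches = y_true != y_pred

hamming_loss = mismatches.mean()                    # share of wrong label cells
subset_accuracy = (~mismatches.any(axis=1)).mean()  # share of fully correct rows

print(round(float(hamming_loss), 4))     # 0.1667
print(round(float(subset_accuracy), 4))  # 0.3333
```

Here 2 of the 12 label cells are wrong, yet only 1 of the 3 entities is fully correct, which shows how much stricter subset accuracy is than Hamming loss.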

Production Tips

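  1. Adjust the escalation window for your SLA. 24 hours is a common threshold, but your support SLAs may define escalation differently. Align the window with your actual escalation policy. In the primary example the deadline is computed from SECONDS_PER_DAY, so changing ESCALATION_WINDOW_HOURS alone has no effect; update the deadline calculation as well.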
  2. Weight channels by cost. Phone escalations are typically the most expensive. Consider training separate models per channel or weighting the loss function by channel cost.
  3. Handle boundary tickets carefully. Tickets opened shortly before the split timestamp may have escalations that fall after the split. The pandas variation above addresses this by looking back 24 hours into history.
  4. Monitor channel distribution shifts. As your support infrastructure evolves (e.g., adding a chatbot), the channel mix will change. Retrain periodically to capture these shifts.
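
As an illustration of tip 2, the sketch below shows what weighting a binary cross-entropy loss by channel cost means numerically. It is plain NumPy and makes no claim about how monad configures its loss; cost_weighted_bce and the CHANNEL_COST values are hypothetical and should be replaced with your own figures.

```python
import numpy as np

CHANNEL_SOURCES = ["chat", "email", "phone", "social_media"]
# Hypothetical relative costs per channel, in CHANNEL_SOURCES order.
CHANNEL_COST = np.array([1.0, 1.0, 3.0, 2.0])


def cost_weighted_bce(y_true: np.ndarray, y_prob: np.ndarray, weights: np.ndarray) -> float:
    """Mean binary cross-entropy with a per-channel weight on each label."""
    eps = 1e-7
    p = np.clip(y_prob, eps, 1.0 - eps)  # avoid log(0)
    per_label = -(y_true * np.log(p) + (1.0 - y_true) * np.log(1.0 - p))
    return float((per_label * weights).sum() / (weights.sum() * len(y_true)))


y_true = np.array([[1.0, 0.0, 1.0, 0.0]])
missed_phone = np.array([[0.9, 0.1, 0.2, 0.1]])  # low probability on the phone label
missed_chat = np.array([[0.2, 0.1, 0.9, 0.1]])   # low probability on the chat label

# Missing the costly phone label is penalized more than missing chat.
print(cost_weighted_bce(y_true, missed_phone, CHANNEL_COST) >
      cost_weighted_bce(y_true, missed_chat, CHANNEL_COST))  # True
```

With uniform weights the two mistakes would score the same, so the cost vector is what shifts the model's attention toward the expensive channel.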