API Reference

DataSourceEvents

class monad.targets.DataSourceEvents

Contains the main entity's events from a data source defined in the pretraining phase.


Properties


DataSourceEvents.timestamps

Timestamps of the events.

Returns

numpy.ndarray



Methods


DataSourceEvents.__getitem__(self, column_name)

def target_fn(history: Events, future: Events, attributes: Attributes, ctx: Dict) -> np.ndarray:
    ...

    # get the price feature from the transactions data source
    # note: this is a ModalityEvents object, not the raw feature values
    price = attributes["transactions"]["price"]

    ...
Parameters

column_name: str | list[str]
Name of the feature, or list of feature names, from the data source.

Returns

ModalityEvents



DataSourceEvents.__len__(self)

Returns the length of the DataSourceEvents object, i.e., the number of events.

def target_fn(history: Events, future: Events, attributes: Attributes, ctx: Dict) -> np.ndarray:
    ...

    no_events = len(future["transactions"])

    ...
Returns

int



DataSourceEvents.count(self)

Calculates the number of events.

def target_fn(history: Events, future: Events, attributes: Attributes, ctx: Dict) -> np.ndarray:
    ...

    churn = 0 if future['product_buy'].count() > 0 else 1

    ...
Returns

int
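Both len() and count() are documented above as returning the number of events. The sketch below illustrates the churn pattern from the example on an empty and a non-empty future window. StandInEvents and churn_label are hypothetical names introduced only for this illustration; they are not part of monad and only mimic the documented behavior.

```python
from typing import Any, Dict, List


class StandInEvents:
    """Hypothetical stand-in for DataSourceEvents, for illustration only."""

    def __init__(self, rows: List[Dict[str, Any]]) -> None:
        self._rows = list(rows)

    def __len__(self) -> int:
        # Number of events, as documented for DataSourceEvents.__len__.
        return len(self._rows)

    def count(self) -> int:
        # Number of events, as documented for DataSourceEvents.count.
        return len(self._rows)


def churn_label(future: Dict[str, StandInEvents]) -> int:
    # 1 means "churned": no purchase events in the future window.
    return 0 if future["product_buy"].count() > 0 else 1


active = {"product_buy": StandInEvents([{"product_id": "prd_1"}])}
silent = {"product_buy": StandInEvents([])}

print(churn_label(active))  # 0
print(churn_label(silent))  # 1
```

In this stand-in, len(events) and events.count() are interchangeable; which one to use in a target function is a matter of style.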



DataSourceEvents.sum(self, column)

Calculates the sum of feature values.

def target_fn(history: Events, future: Events, attributes: Attributes, ctx: Dict) -> np.ndarray:
    ...
    
    # sum up total transaction spend from future events
    total_spend = future["transactions"].sum(column="value")
    
    ...
Parameters

column: str
Name of the feature from the data source.

Returns

float



DataSourceEvents.mean(self, column)

Calculates the mean of feature values.

def target_fn(history: Events, future: Events, attributes: Attributes, ctx: Dict) -> np.ndarray:
    ...
    
    # calculate average transaction price from future events
    avg_price = future["transactions"].mean(column="price")
    
    ...
Parameters

column: str
Name of the feature from the data source.

Returns

float



DataSourceEvents.min(self, column)

Calculates the minimum of feature values.

def target_fn(history: Events, future: Events, attributes: Attributes, ctx: Dict) -> np.ndarray:
    ...
    
    # find minimum transaction price from future events
    min_price = future["transactions"].min(column="price")
    
    ...
Parameters

column: str
Name of the feature from the data source.

Returns

float



DataSourceEvents.max(self, column)

Calculates the maximum of feature values.

def target_fn(history: Events, future: Events, attributes: Attributes, ctx: Dict) -> np.ndarray:
    ...
    
    # find maximum transaction price from future events
    max_price = future["transactions"].max(column="price")
    
    ...
Parameters

column: str
Name of the feature from the data source.

Returns

float
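The four aggregations above (sum, mean, min, max) share the same signature, so they can be combined into one summary. This reference does not state what they return for a window with no events, so the sketch below guards that case explicitly with count(). StandInEvents and price_summary are hypothetical names, not monad code, and the 0.0 fallback is an assumption chosen for the illustration.

```python
from typing import Any, Dict, List


class StandInEvents:
    """Hypothetical stand-in for DataSourceEvents, for illustration only."""

    def __init__(self, rows: List[Dict[str, Any]]) -> None:
        self._rows = list(rows)

    def _values(self, column: str) -> List[float]:
        return [float(row[column]) for row in self._rows]

    def count(self) -> int:
        return len(self._rows)

    def sum(self, column: str) -> float:
        return sum(self._values(column))

    def mean(self, column: str) -> float:
        values = self._values(column)
        return sum(values) / len(values)

    def min(self, column: str) -> float:
        return min(self._values(column))

    def max(self, column: str) -> float:
        return max(self._values(column))


def price_summary(events: StandInEvents) -> Dict[str, float]:
    # Guard the empty window: the behavior of the real methods on an
    # empty data source is not stated in this reference.
    if events.count() == 0:
        return {"sum": 0.0, "mean": 0.0, "min": 0.0, "max": 0.0}
    return {
        "sum": events.sum(column="price"),
        "mean": events.mean(column="price"),
        "min": events.min(column="price"),
        "max": events.max(column="price"),
    }


transactions = StandInEvents([{"price": 10.0}, {"price": 30.0}, {"price": 20.0}])
summary = price_summary(transactions)
print(summary)  # {'sum': 60.0, 'mean': 20.0, 'min': 10.0, 'max': 30.0}
```

Whether 0.0 is a sensible fallback depends on the target being modeled; for a minimum price, for example, a sentinel value may be more appropriate.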



DataSourceEvents.apply(self, func, target)

Applies a function to each value of the target column.

def target_fn(history: Events, future: Events, attributes: Attributes, ctx: Dict) -> np.ndarray:
    ...
    
    # apply lowercase transformation to brand values in future transactions
    trans_modified = future["transactions"].apply(lambda x: x.lower(), target="brand")
    
    ...
Parameters

func: Callable[[Any], Any]
Function to apply.


target: str
Name of the target column.

Returns

DataSourceEvents with column target transformed.
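To make the apply semantics concrete, the sketch below transforms one column value by value and leaves the other columns untouched. StandInEvents and its column helper are hypothetical, not part of monad. The sketch assumes a new object is returned and the original is left unmodified, which this reference does not confirm.

```python
from typing import Any, Callable, Dict, List


class StandInEvents:
    """Hypothetical stand-in for DataSourceEvents, for illustration only."""

    def __init__(self, rows: List[Dict[str, Any]]) -> None:
        self._rows = [dict(row) for row in rows]

    def apply(self, func: Callable[[Any], Any], target: str) -> "StandInEvents":
        # Assumption: a new object is returned; only `target` changes.
        return StandInEvents(
            [{**row, target: func(row[target])} for row in self._rows]
        )

    def column(self, name: str) -> List[Any]:
        # Hypothetical helper used only to inspect the result.
        return [row[name] for row in self._rows]


transactions = StandInEvents(
    [
        {"brand": "Nike", "price": 120.0},
        {"brand": "ADIDAS", "price": 80.0},
    ]
)

lowered = transactions.apply(lambda x: x.lower(), target="brand")

print(lowered.column("brand"))       # ['nike', 'adidas']
print(lowered.column("price"))       # [120.0, 80.0]
print(transactions.column("brand"))  # ['Nike', 'ADIDAS']
```

Normalizing values this way before filter or groupBy avoids treating "Nike" and "nike" as different brands.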



DataSourceEvents.filter(self, by, condition)

Filters events, keeping only those for which condition returns True when called on the value of column by.

def target_fn(history: Events, future: Events, attributes: Attributes, ctx: Dict) -> np.ndarray:
    ...
    
    # filter transactions to include only products from the campaign
    PRODUCTS_IN_CAMPAIGN = ["prd_1", "prd_2", "prd_3", "prd_4", "prd_5", "prd_6", "prd_7"]
    
    trans_filtered = (
        future["transactions"]
        .filter(by="product_id", condition=lambda x: x in PRODUCTS_IN_CAMPAIGN)
    )
    
    ...
Parameters

by: str
Column to check condition against.


condition: Callable[[Any], bool]
Filtering condition.

Returns

DataSourceEvents with filtered events.
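Because filter returns DataSourceEvents, its result can be passed straight to an aggregation such as count or sum. The sketch below shows that chaining on a stand-in class. StandInEvents is hypothetical and not part of monad; the column names product_id and value follow the examples in this reference.

```python
from typing import Any, Callable, Dict, List


class StandInEvents:
    """Hypothetical stand-in for DataSourceEvents, for illustration only."""

    def __init__(self, rows: List[Dict[str, Any]]) -> None:
        self._rows = list(rows)

    def filter(self, by: str, condition: Callable[[Any], bool]) -> "StandInEvents":
        # Keep only the events whose `by` value satisfies the condition.
        return StandInEvents([row for row in self._rows if condition(row[by])])

    def count(self) -> int:
        return len(self._rows)

    def sum(self, column: str) -> float:
        return sum(float(row[column]) for row in self._rows)


# A frozenset gives constant-time membership checks inside the lambda.
PRODUCTS_IN_CAMPAIGN = frozenset({"prd_1", "prd_2", "prd_3"})

transactions = StandInEvents(
    [
        {"product_id": "prd_1", "value": 10.0},
        {"product_id": "prd_9", "value": 99.0},
        {"product_id": "prd_3", "value": 5.5},
    ]
)

campaign = transactions.filter(
    by="product_id", condition=lambda x: x in PRODUCTS_IN_CAMPAIGN
)

print(campaign.count())              # 2
print(campaign.sum(column="value"))  # 15.5
```

The condition is called once per event, so a set or frozenset is usually a better container than a list when the campaign contains many products.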



DataSourceEvents.groupBy(self, by)

Groups the events by the values of the column (or list of columns) passed as by.

def target_fn(history: Events, future: Events, attributes: Attributes, ctx: Dict) -> np.ndarray:
    ...
    
    # check whether purchases exist for the Nike or Adidas brands in future transactions
    target = (
        future["transactions"]
        .groupBy("brand")
        .exists(groups=["Nike", "Adidas"])
    )
    
    ...
Parameters

by: str | list[str]
Columns to group by.

Returns

An EventsGroupBy proxy object (see this article).
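The groupBy and exists chain from the example can be pictured as follows. This is a sketch on stand-in classes, not the monad implementation: StandInEvents and StandInGroupBy are hypothetical, only a single grouping column is handled, and exists is assumed to return one 0/1 flag per requested group, in the order given. Check the EventsGroupBy documentation for the actual return type.

```python
from collections import defaultdict
from typing import Any, Dict, List


class StandInGroupBy:
    """Hypothetical stand-in for the EventsGroupBy proxy object."""

    def __init__(self, groups: Dict[Any, List[Dict[str, Any]]]) -> None:
        self._groups = groups

    def exists(self, groups: List[Any]) -> List[int]:
        # Assumption: one flag per requested group, 1 if it has any events.
        return [1 if self._groups.get(name) else 0 for name in groups]


class StandInEvents:
    """Hypothetical stand-in for DataSourceEvents, for illustration only."""

    def __init__(self, rows: List[Dict[str, Any]]) -> None:
        self._rows = list(rows)

    def groupBy(self, by: str) -> StandInGroupBy:
        # Bucket the events by the value found in column `by`.
        grouped: Dict[Any, List[Dict[str, Any]]] = defaultdict(list)
        for row in self._rows:
            grouped[row[by]].append(row)
        return StandInGroupBy(dict(grouped))


transactions = StandInEvents(
    [
        {"brand": "Nike", "price": 120.0},
        {"brand": "Puma", "price": 60.0},
        {"brand": "Nike", "price": 95.0},
    ]
)

target = transactions.groupBy("brand").exists(groups=["Nike", "Adidas"])
print(target)  # [1, 0]
```

Under these assumptions, the result lines up with the groups argument, which makes it a natural multi-label target: one position per brand of interest.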