Deployment on Snowflake Container Service

This is a step-by-step description of how to get BaseModel up and running on Snowflake Container Service. In this guide, we will use the HM Kaggle dataset (HM Personalized Fashion Recommendations) to model propensity.

Snowflake containers - Prerequisites

The first step is to make sure we fulfil the necessary prerequisites as below:

Prerequisites

  • A username and password for a Snowflake account with access to Snowflake Containers - you can refer to Snowflake's official documentation for help at any time.
  • SnowSQL or the Snowflake plugin for VSCode
  • A login to the Docker image repository in Snowflake - check the documentation for help.

SnowSQL (Recommended)

Install

To install SnowSQL, follow the instructions available at Install SnowSQL.

Configuration

Once the installation is successful, the configuration file for SnowSQL is stored at ~/.snowsql/config. For the purpose of this README, we assume it has the following format:

[connections]
accountname = account_name
username = <user>
password = <password>
rolename = ACCOUNTADMIN
dbname = YOUR_DB
schemaname = YOUR_SCHEMA
warehousename = YOUR_WAREHOUSE_NAME

Other ways of connecting along with more details can be found in the documentation.
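As a quick sanity check of the configuration format above, the file can be parsed with Python's standard configparser module. This is a minimal sketch and not part of SnowSQL or BaseModel: the function name missing_connection_keys and the choice of which keys to require are assumptions made for illustration.

```python
import configparser

# Keys this guide assumes are present in the [connections] section.
EXPECTED_KEYS = ("accountname", "username", "password", "rolename",
                 "dbname", "schemaname", "warehousename")


def missing_connection_keys(config_text: str) -> list[str]:
    """Return the expected keys that are absent from [connections]."""
    # Interpolation is disabled so that '%' characters in passwords are harmless.
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(config_text)
    if not parser.has_section("connections"):
        return list(EXPECTED_KEYS)
    return [key for key in EXPECTED_KEYS
            if not parser.has_option("connections", key)]
```

To check the real file, pass the text of ~/.snowsql/config to the function; an empty list means every key used in this guide is present.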

Snowflake for VSCode (Recommended)

Snowflake for VSCode is a plugin that runs queries directly from *.sql files and displays the command output in VSCode. It can run a single query or a whole file comprising multiple SQL queries. We use it throughout this tutorial, but any other code or text editor will work as well.

More details about the extension and how to use it efficiently can be found here: Read More

Docker login

The Snowflake Docker repository must be added before images can be pushed to it. To log in to the repository in your Snowflake account, use either a Snowpark token or a username and password.

In order to use Snowpark token

  1. Install SnowCLI. The recommended way is to create a Python venv and install the snowflake-cli-labs Python package. Follow the official instructions.
  2. Snowflake CLI uses a global configuration file called config.toml. If the file does not exist, running any snow command for the first time automatically creates an empty config.toml.
  3. Set up a connection to the Snowflake instance. By default, the configuration file is stored at ~/.config/snowflake/config.toml on Linux and ~/Library/Application Support/snowflake/config.toml on macOS. For the purpose of this README, let's assume that this is the content of the config.toml file:
[connections.<connection_name>]
account = "account_name"
user = "<user>"
password = "<password>"
role = "ACCOUNTADMIN"

Read More

  4. Run
snow snowpark registry token --connection <connection_name> --format=JSON | docker login <orgname>-<acctname>.registry.snowflakecomputing.com -u 0sessiontoken --password-stdin

In order to use Username and password

Run the following command and enter your Snowflake password when prompted:

docker login <orgname>-<acctname>.registry.snowflakecomputing.com -u <username>

Setup - One Time setup and configuration of environment

In this section we guide you through preparing the environment for use with BaseModel. You need the ACCOUNTADMIN role or a similar role to go through these steps.

You can read more about roles in the official documentation here

General setup

These initial steps will make sure you have the right compute pool, users, roles and tables created for this scenario.

Below is the sample setup.sql that we have used in our case.

CREATE ROLE monad_role;
GRANT ROLE monad_role TO USER <your_admin_user>;

ALTER USER <your_admin_user> SET DEFAULT_ROLE = monad_role;

CREATE COMPUTE POOL monad_compute_pool_gpu
  MIN_NODES = 1
  MAX_NODES = 1
  INSTANCE_FAMILY = GPU_5
  INITIALLY_SUSPENDED = TRUE
  AUTO_SUSPEND_SECS = 300
  AUTO_RESUME = TRUE;

GRANT USAGE ON COMPUTE POOL monad_compute_pool_gpu TO ROLE monad_role;
GRANT MONITOR ON COMPUTE POOL monad_compute_pool_gpu TO ROLE monad_role;

CREATE OR REPLACE WAREHOUSE monad_warehouse WITH
  WAREHOUSE_SIZE='MEDIUM'
  AUTO_SUSPEND = 180
  AUTO_RESUME = TRUE
  INITIALLY_SUSPENDED= TRUE;

GRANT ALL ON WAREHOUSE monad_warehouse TO ROLE monad_role;
-- Allow the SYSADMIN role to use the warehouse
GRANT ALL ON WAREHOUSE monad_warehouse TO ROLE SYSADMIN;

CREATE DATABASE monad_db;

GRANT OWNERSHIP ON DATABASE monad_db TO ROLE monad_role;
-- Setup Ingress - this needs to be done once per account
-- CREATE SECURITY INTEGRATION snowservices_ingress_oauth
--   TYPE=oauth
--   OAUTH_CLIENT=snowservices_ingress
--   ENABLED=true;
-- create db objects
USE ROLE monad_role;
USE DATABASE monad_db;
USE WAREHOUSE XSMALL_BASE_MODEL;

CREATE SCHEMA data_schema;
USE SCHEMA DATA_SCHEMA;
CREATE OR REPLACE IMAGE REPOSITORY image_repository;

CREATE STAGE stage DIRECTORY = ( ENABLE = true );
CREATE STAGE monad_stage
DIRECTORY = ( ENABLE = true )
ENCRYPTION = (type = 'SNOWFLAKE_SSE');

GRANT ALL ON SCHEMA data_schema TO ROLE ACCOUNTADMIN;
GRANT ALL ON DATABASE monad_db TO ROLE ACCOUNTADMIN;
GRANT ALL ON STAGE stage TO ROLE ACCOUNTADMIN;
GRANT ALL ON STAGE monad_stage TO ROLE ACCOUNTADMIN;
-- Add access to HM_KAGGLE to monad_role
USE ROLE SYSADMIN;
GRANT USAGE ON DATABASE HM_KAGGLE TO ROLE monad_role;
USE ROLE monad_role;


-- -- DEBUGGING
-- SHOW COMPUTE POOLS;
-- DESCRIBE COMPUTE POOL MONAD_COMPUTE_POOL_GPU;
-- ALTER COMPUTE POOL MONAD_COMPUTE_POOL_GPU SUSPEND;
-- ALTER COMPUTE POOL MONAD_COMPUTE_POOL_GPU STOP ALL;
-- ALTER COMPUTE POOL MONAD_COMPUTE_POOL_GPU RESUME;
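
The <your_admin_user> placeholder in the script above has to be replaced before the script is run. A minimal sketch of doing that programmatically follows; the helper name render_setup_sql and the validation rule are illustrative assumptions, not part of BaseModel.

```python
import re

PLACEHOLDER = "<your_admin_user>"


def render_setup_sql(sql_text: str, admin_user: str) -> str:
    """Replace every <your_admin_user> placeholder with a user name."""
    # Assumption: accept only unquoted-identifier characters (letters,
    # digits, underscore, dollar sign; not starting with a digit) so that
    # nothing unexpected is spliced into the SQL text.
    if not re.fullmatch(r"[A-Za-z_][A-Za-z0-9_$]*", admin_user):
        raise ValueError(f"unexpected user name: {admin_user!r}")
    return sql_text.replace(PLACEHOLDER, admin_user)
```

The rendered text can then be saved back to setup.sql and run with SnowSQL or the VSCode plugin.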

📘

Please Note:

This example assumes we have HM_KAGGLE database and tables already present in Snowflake.

  1. Replace the <your_admin_user> placeholders in the setup.sql file with your username.
  2. Run the SQL queries included in the setup.sql file. The script sets up the following resources:
    • monad_role, which is used to manage monad-related resources
    • a compute pool with an A10G GPU
    • a warehouse of size MEDIUM
    • the monad_db database, which holds the stages and the image repository
    • two stages:
      • stage for storing job configs
      • monad_stage for storing all outputs from Monad - this stage is mounted to docker containers later on
    • a docker image repository named image_repository
    • the necessary permissions and roles
  3. Push the docker image to Snowflake's repository. First, get the docker image either by pulling it from a registry you have access to or by loading it. The push looks like this:
    docker push <orgname>-<acctname>.registry.snowflakecomputing.com/${SNOWFLAKE_REPO_PATH}/monad:${DOCKER_TAG}
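
The push target in step 3 is assembled from the registry host, the repository path, and the tag. A minimal sketch of that composition follows; the function name and the example values are hypothetical, and the repository path monad_db/data_schema/image_repository is inferred from the objects created in setup.sql.

```python
def image_reference(orgname: str, acctname: str, repo_path: str,
                    tag: str, image: str = "monad") -> str:
    """Build the full image reference passed to `docker push`."""
    registry = f"{orgname}-{acctname}.registry.snowflakecomputing.com"
    # Strip stray slashes so the parts join with exactly one separator.
    return f"{registry}/{repo_path.strip('/')}/{image}:{tag}"


reference = image_reference(
    "myorg", "myacct", "monad_db/data_schema/image_repository", "latest-dev"
)
print(reference)
```

The same repository path and tag appear again in the image field of the job spec files, so keeping them in one place helps avoid mismatches.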
    

🚧

Important:

In our example, only monad_role is allowed to use the stages and the image repository. To allow other roles to read from or write to a stage, see All Privileges. For the image repository, see Repository Privileges.

Setup event table for the account

To allow logging from Python applications running in Snowflake Containers, an active event table must be present in the Snowflake account.
The setup_event_table.sql file creates such an event table, sets it as active, and grants the necessary permissions to other roles.
The script needs to be run only once per account and is generally idempotent.

  1. Run all commands from setup_event_table.sql. The script in our case looks like this:
USE ROLE ACCOUNTADMIN;
USE WAREHOUSE XSMALL_BASE_MODEL;
CREATE DATABASE IF NOT EXISTS event_db;
CREATE SCHEMA IF NOT EXISTS event_db.event_schema;
USE DATABASE event_db;
USE SCHEMA event_schema;
CREATE EVENT TABLE IF NOT EXISTS event_table;
ALTER ACCOUNT SET EVENT_TABLE = event_db.event_schema.event_table;
GRANT ALL ON DATABASE event_db TO ROLE SYSADMIN;
GRANT USAGE ON DATABASE event_db TO ROLE MONAD_ROLE;
GRANT USAGE ON SCHEMA event_schema TO ROLE MONAD_ROLE;
GRANT ALL ON TABLE event_table TO ROLE MONAD_ROLE;
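
Once the event table is active, container logs can be read back from it with a query. The sketch below only builds the SQL text. The helper name is hypothetical, and the RECORD_TYPE filter, the snow.service.name resource attribute, and the upper-casing of the service name reflect our understanding of Snowflake's event table layout, so verify them against your account before relying on this.

```python
import re


def service_log_query(service_name: str, limit: int = 100,
                      event_table: str = "event_db.event_schema.event_table") -> str:
    """Build a query returning the newest log records of one service."""
    if not re.fullmatch(r"[A-Za-z_][A-Za-z0-9_$]*", service_name):
        raise ValueError(f"unexpected service name: {service_name!r}")
    # Assumption: unquoted service names are stored in upper case.
    name = service_name.upper()
    return (
        "SELECT TIMESTAMP, VALUE\n"
        f"FROM {event_table}\n"
        "WHERE RECORD_TYPE = 'LOG'\n"
        f"  AND RESOURCE_ATTRIBUTES:\"snow.service.name\" = '{name}'\n"
        "ORDER BY TIMESTAMP DESC\n"
        f"LIMIT {int(limit)};"
    )
```

For example, service_log_query("FM_Train") produces a statement that can be pasted into SnowSQL after the pretraining job has started.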

Using Snowflake Containers with BaseModel

In this section we document how to use Snowflake Containers with BaseModel once the setup on the admin account is complete. We do not recommend running these steps with the ACCOUNTADMIN role.

You can read more about roles in the official documentation here

Pretrain Configuration and Foundation Model Training

In this stage, we configure the files needed to run the pretrain phase successfully in Snowflake Containers. The pretrain_configuration spec file is the same as in the HM Kaggle example.

  1. Adjust the paths in the three PUT commands in commands.sql so that they point to the right files. Use absolute paths. The example commands.sql file looks like this:

    USE ROLE monad_role;
    USE DATABASE monad_db;
    USE SCHEMA DATA_SCHEMA;
    USE WAREHOUSE XSMALL_BASE_MODEL;
    -- Upload config spec to monad_stage
    -- UPDATE ME Use absolute path to point to the config file
    PUT file:///home/<USER>/monad/integrations/snowflake/hm_kaggle/pretrain/config.yaml @monad_db.data_schema.monad_stage/configs/hm-kaggle/pretrain/
      AUTO_COMPRESS=FALSE
      OVERWRITE=TRUE;
    -- Upload pretrain script to monad_stage
    -- UPDATE ME Use absolute path to point to the pretrain script
    PUT file:///home/<USER>/monad/integrations/snowflake/hm_kaggle/pretrain/pretrain.py @monad_db.data_schema.monad_stage/scripts/hm-kaggle/pretrain/
      AUTO_COMPRESS=FALSE
      OVERWRITE=TRUE;
    
    -- Upload job spec to stage
    -- UPDATE ME Use absolute path to point to the job spec file
    PUT file:///home/<USER>/monad/integrations/snowflake/hm_kaggle/pretrain/pretrain_job_spec.yaml @monad_db.data_schema.stage/jobs/hm-kaggle/pretrain/
      AUTO_COMPRESS=FALSE
      OVERWRITE=TRUE;
    -- Create a job
    EXECUTE JOB SERVICE
      IN COMPUTE POOL monad_compute_pool_gpu
      FROM @stage
      SPECIFICATION_FILE = '/jobs/hm-kaggle/pretrain/pretrain_job_spec.yaml'
      NAME = FM_Train;
    
  2. Run all commands from commands.sql, which does the following:

  • Sets correct context

  • Uploads pretrain_configuration to the monad_stage stage. As a reminder, the spec that we use here looks like this:

    datasources:
      - type: main_entity_attribute
        main_entity_column: customer_id
        name: customers
        data_location:
          source: snowflake
          connection_params:
            user: ${SNOWFLAKE_USER}
            password: ${SNOWFLAKE_PASSWORD}
            account: ${SNOWFLAKE_ACCOUNT}
            warehouse: ${SNOWFLAKE_WAREHOUSE}
            role: ${SNOWFLAKE_ROLE}
            database: HM_KAGGLE
            db_schema: PRIVATE
          table_name: customers
      - type: event
        main_entity_column: customer_id
        name: transactions
        date_column: t_dat
        text_columns:
          - prod_name
          - detail_desc
        data_location:
          source: snowflake
          connection_params:
            user: ${SNOWFLAKE_USER}
            password: ${SNOWFLAKE_PASSWORD}
            account: ${SNOWFLAKE_ACCOUNT}
            warehouse: ${SNOWFLAKE_WAREHOUSE}
            role: ${SNOWFLAKE_ROLE}
            database: HM_KAGGLE
            db_schema: PRIVATE
          table_name: transactions
          
    
    data_params:
      data_start_date: 2018-09-20 00:00:00
      validation_start_date: 2020-09-01 00:00:00
      check_target_for_next_N_days: 21
    
    loader_params:
      batch_size: 256
      num_workers: 10
    
    training_params:
      learning_rate: 0.0001
      epochs: 3
    
    hidden_dim: 2048
    
    
  • Uploads job_configuration to the stage named stage. In our case, the job spec looks like this:

    spec:
      container:
      - name: main
        image: /monad_db/data_schema/image_repository/monad:latest-dev
        env:
          SNOWFLAKE_WAREHOUSE: monad_warehouse
          SNOWFLAKE_DATABASE: HM_KAGGLE
          SNOWFLAKE_SCHEMA: PUBLIC
        command:
        - python
        args:
        - -m
        - monad.pretrain
        - --config
        - /app/monad_stage/configs/hm-kaggle/pretrain/config.yaml
        - --features-path
        - /app/monad_stage/monad-features/hm-kaggle
        - --resume
        resources:
          limits:
            nvidia.com/gpu: 1
          requests:
            nvidia.com/gpu: 1
        volumeMounts:
        - name: monad-stage
          mountPath: /app/monad_stage
        - name: dev-shm
          mountPath: /dev/shm
      volumes:
      - name: monad-stage
        source: "@monad_stage"
        uid: 1000  # user in docker has uid and gid: 1000
        gid: 1000
      - name: dev-shm
        source: memory
        size: 48Gi  # Out of 128Gi available on the machine
    
    
  • Uploads the pretrain.py file to the stage. In this case, it looks like this:

import argparse
from monad.ui import pretrain
from pathlib import Path



def parse_args() -> argparse.Namespace:
    """
    Returns:
        parsed command-line arguments for the pretraining script
    """
    argument_parser = argparse.ArgumentParser()
    argument_parser.add_argument("--config", type=Path, required=True, help="Path to yaml pretraining config")
    argument_parser.add_argument("--features-path", type=Path, required=True, help="Where to store result features.")
    argument_parser.add_argument("--storage-config", type=Path, required=False)
    rerun_group = argument_parser.add_mutually_exclusive_group()
    rerun_group.add_argument(
        "--resume", action="store_true", help="Whether to resume interrupted training", default=False
    )
    rerun_group.add_argument(
        "--overwrite", action="store_true", help="Whether to overwrite existing results", default=False
    )
    return argument_parser.parse_args()


if __name__ == "__main__":
    params = parse_args()
    pretrain(
        config_path=params.config,
        output_path=params.features_path,
        storage_config_path=params.storage_config,
        resume=params.resume,
        overwrite=params.overwrite,
    )

More details about the configuration file can be found in Snowflake's documentation.

It is worth noting:

  • BaseModel requires a GPU, which must be requested in the job spec file (see the resources entry in the spec).
  • In the job spec file, under spec/container/args, the user provides the arguments of the Python command that runs BaseModel - similar to running it from the CLI.
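
To see how command and args in the job spec correspond to a CLI invocation, the two lists can be joined into the equivalent shell command. This is a minimal sketch using only the standard library; the lists are copied from the pretrain job spec shown earlier.

```python
import shlex

command = ["python"]
args = [
    "-m", "monad.pretrain",
    "--config", "/app/monad_stage/configs/hm-kaggle/pretrain/config.yaml",
    "--features-path", "/app/monad_stage/monad-features/hm-kaggle",
    "--resume",
]

# Joining command and args gives the equivalent single command line.
cli_equivalent = shlex.join(command + args)
print(cli_equivalent)
```

Each YAML list item becomes one argument, so an option and its value (for example --config and its path) are two separate items.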

The final statement in the commands.sql file executes the service, which starts the job.
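
The PUT statements in commands.sql need absolute local paths. A minimal sketch that builds such a statement and rejects relative paths follows; the helper name put_statement is hypothetical, and the sketch assumes POSIX paths without spaces or special characters.

```python
from pathlib import PurePosixPath


def put_statement(local_path: str, stage_location: str) -> str:
    """Build a PUT statement in the style used in commands.sql."""
    path = PurePosixPath(local_path)
    if not path.is_absolute():
        raise ValueError(f"PUT needs an absolute path, got: {local_path}")
    # An absolute POSIX path starts with '/', which yields file:///...
    return (
        f"PUT file://{path} {stage_location}\n"
        "  AUTO_COMPRESS=FALSE\n"
        "  OVERWRITE=TRUE;"
    )


print(put_statement(
    "/home/user/monad/integrations/snowflake/hm_kaggle/pretrain/config.yaml",
    "@monad_db.data_schema.monad_stage/configs/hm-kaggle/pretrain/",
))
```

Generating the statements this way keeps the local path and the stage location in one place when the same upload is repeated for the training and prediction steps.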

Train Downstream Model and Configuration

The process is very similar to Foundation Model training. Once again we have to prepare:

  • commands.sql file
  • job spec file
  • train.py file - python file describing the target function.

The steps are:

  1. Adjust paths in two PUT commands in commands.sql, such that they point to the right files. Use absolute paths.
    USE ROLE monad_role;
    USE DATABASE monad_db;
    USE WAREHOUSE XSMALL_BASE_MODEL;
    USE SCHEMA DATA_SCHEMA;
    -- Upload train script to stage
    -- UPDATE ME Use absolute path to point to the training script
    PUT file:///home/<USER>/monad/integrations/snowflake/hm_kaggle/classification/train.py @monad_db.data_schema.monad_stage/scripts/hm-kaggle/propensity/
      AUTO_COMPRESS=FALSE
      OVERWRITE=TRUE;
    -- Upload job spec to stage
    -- UPDATE ME Use absolute path to point to the job spec file
    PUT file:///home/<USER>/monad/integrations/snowflake/hm_kaggle/classification/train_job_spec.yaml @monad_db.data_schema.stage/jobs/hm-kaggle/propensity/
      AUTO_COMPRESS=FALSE
      OVERWRITE=TRUE;
    -- Create a job
    EXECUTE JOB SERVICE
      IN COMPUTE POOL monad_compute_pool_gpu
      FROM @stage
      SPECIFICATION_FILE = '/jobs/hm-kaggle/propensity/train_job_spec.yaml'
      NAME = Downstream_train;
    
  2. Run all commands from commands.sql, which does the following:
  • Sets correct context

  • Uploads the training script (in our case train.py) to the monad_stage stage:

    import argparse
    from typing import Dict
    
    import torch
    import numpy as np
    from monad.ui.config import MonadTrainingParams
    from monad.ui.module import MultilabelClassificationTask, load_from_foundation_model
    from monad.ui.target_function import Attributes, Events
    
    
    TARGET_NAMES = [
        "Garment Upper body",
        "Underwear",
        "Socks & Tights",
        "Garment Lower body",
        "Accessories",
        "Items",
        "Nightwear",
        "Shoes",
        "Swimwear",
        "Garment Full body",
    ]
    TARGET_ENTITY = "product_group_name"
    
    
    def int_or_float(value):
        try:
            if "." in value:
                return float(value)
            else:
                return int(value)
        except ValueError as exc:
            raise argparse.ArgumentTypeError("Value must be an int or float") from exc
    
    
    def propensity_target_fn(_history: Events, future: Events, _entity: Attributes, _ctx: Dict) -> np.ndarray:
        purchase_target, _ = future["transactions"].groupBy(TARGET_ENTITY).exists(groups=TARGET_NAMES)
        return purchase_target
    
    
    def parse_args():
        parser = argparse.ArgumentParser(description="Train a model for HM Kaggle classification task")
        parser.add_argument("--fm-path", type=str, required=True, help="Path to the features directory")
        parser.add_argument("--checkpoint-dir", type=str, required=True, help="Directory to save model checkpoints")
        parser.add_argument(
            "--learning-rate", type=float, required=False, default=5e-5, help="Learning rate for the model"
        )
        parser.add_argument("--epochs", type=int, required=False, default=5, help="Number of epochs to train the model")
        parser.add_argument(
            "--devices", type=int, nargs="+", default=[0], required=False, help="List of device IDs to use for training"
        )
        parser.add_argument(
            "--limit-train-batches",
            type=int_or_float,
            required=False,
            default=1.0,
            help="Limit on the training batches to use (int or float)",
        )
        parser.add_argument(
            "--limit-val-batches",
            type=int_or_float,
            required=False,
            default=1.0,
            help="Limit on the validation batches to use (int or float)",
        )
    
        return parser.parse_args()
    
    
    def main():
        args = parse_args()
    
        training_params = MonadTrainingParams(
            learning_rate=args.learning_rate,
            checkpoint_dir=args.checkpoint_dir,
            epochs=args.epochs,
            devices=args.devices,
            limit_train_batches=args.limit_train_batches,
            limit_val_batches=args.limit_val_batches,
        )
    
        trainer = load_from_foundation_model(
            args.fm_path, MultilabelClassificationTask(), propensity_target_fn, num_outputs=len(TARGET_NAMES)
        )
    
        trainer.fit(training_params=training_params)
    
    
    if __name__ == "__main__":
        main()
    
    
  • Uploads job_configuration to the stage named stage:

    spec:
      container:
      - name: main
        image: /monad_db/data_schema/image_repository/monad:latest-dev
        env:
          SNOWFLAKE_WAREHOUSE: monad_warehouse
          SNOWFLAKE_DATABASE: HM_KAGGLE
          SNOWFLAKE_SCHEMA: PUBLIC
        command:
        - python
        args:
        - /app/monad_stage/scripts/hm-kaggle/propensity/train.py
        - --fm-path
        - /app/monad_stage/monad-features/hm-kaggle/fm
        - --checkpoint-dir
        - /app/monad_stage/monad-features/hm-kaggle/propensity/checkpoints
        - --epochs
        - 1
        resources:
          limits:
            nvidia.com/gpu: 1
          requests:
            nvidia.com/gpu: 1
        volumeMounts:
        - name: monad-stage
          mountPath: /app/monad_stage
        - name: dev-shm
          mountPath: /dev/shm
      volumes:
      - name: monad-stage
        source: "@monad_stage"
        uid: 1000  # user in docker has uid and gid: 1000
        gid: 1000
      - name: dev-shm
        source: memory
        size: 48Gi  # Out of 128Gi available on the machine
    
    

The final statement in the commands.sql file executes the service, which starts the job.
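
The target function in train.py returns one value per entry in TARGET_NAMES. As an illustration of what such a multilabel target looks like, here is a pure-Python sketch that does not use the monad API. The helper name multi_hot_target and the sample purchases are hypothetical, and we assume that exists(groups=...) yields a 0/1 indicator per group in the given order.

```python
TARGET_NAMES = [
    "Garment Upper body",
    "Underwear",
    "Socks & Tights",
    "Garment Lower body",
    "Accessories",
    "Items",
    "Nightwear",
    "Shoes",
    "Swimwear",
    "Garment Full body",
]


def multi_hot_target(future_groups: list[str]) -> list[int]:
    """Return 1 for every target group bought in the future window."""
    bought = set(future_groups)
    return [1 if name in bought else 0 for name in TARGET_NAMES]


# Hypothetical customer who buys shoes twice and underwear once.
example = multi_hot_target(["Shoes", "Underwear", "Shoes"])
print(example)
```

The vector length equals len(TARGET_NAMES), which is the value passed as num_outputs to load_from_foundation_model in train.py.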

Prediction

The final step is running predictions. Once again we will use three files:

  • commands.sql
  • job spec file
  • predict.py
  1. Adjust the paths in the PUT commands in commands.sql so that the file looks, for example, like this:
USE ROLE monad_role;
USE DATABASE monad_db;
USE WAREHOUSE XSMALL_BASE_MODEL;
USE SCHEMA DATA_SCHEMA;
-- Upload predict script to stage
-- UPDATE ME Use absolute path to point to the predict script
PUT file:///home/<USER>/monad/integrations/snowflake/hm_kaggle/predict/predict.py @monad_db.data_schema.monad_stage/scripts/hm-kaggle/propensity/
  AUTO_COMPRESS=FALSE
  OVERWRITE=TRUE;
-- Upload job spec to stage
-- UPDATE ME Use absolute path to point to the job spec file
PUT file:///home/<USER>/monad/integrations/snowflake/hm_kaggle/predict/predict_job_spec.yaml @monad_db.data_schema.stage/jobs/hm-kaggle/propensity/
  AUTO_COMPRESS=FALSE
  OVERWRITE=TRUE;
-- Create a job
EXECUTE JOB SERVICE
  IN COMPUTE POOL monad_compute_pool_gpu
  FROM @stage
  SPECIFICATION_FILE = '/jobs/hm-kaggle/propensity/predict_job_spec.yaml'
  NAME = Predict;
  2. Run all commands from the commands.sql file - as before, they upload the correct files and execute the job.
  • The job spec should look like this:

    spec:
      container:
      - name: main
        image: /monad_db/data_schema/image_repository/monad:latest-dev
        env:
          SNOWFLAKE_WAREHOUSE: monad_warehouse
          SNOWFLAKE_DATABASE: HM_KAGGLE
          SNOWFLAKE_SCHEMA: PUBLIC
        command:
        - python
        args:
        - /app/monad_stage/scripts/hm-kaggle/propensity/predict.py
        - --save-path
        - /app/monad_stage/monad-features/hm-kaggle/output
        - --checkpoint-dir
        - /app/monad_stage/monad-features/hm-kaggle/propensity/checkpoints
        resources:
          limits:
            nvidia.com/gpu: 1
          requests:
            nvidia.com/gpu: 1
        volumeMounts:
        - name: monad-stage
          mountPath: /app/monad_stage
        - name: dev-shm
          mountPath: /dev/shm
      volumes:
      - name: monad-stage
        source: "@monad_stage"
        uid: 1000  # user in docker has uid and gid: 1000
        gid: 1000
      - name: dev-shm
        source: memory
        size: 48Gi  # Out of 128Gi available on the machine
    
    

    which is very similar to the previously described job configs - it provides the necessary configuration for paths, resources, and the command that executes the predict.py script.

  • Finally, our predict.py:

    import argparse
    from datetime import datetime
    
    from monad.ui.module import load_from_checkpoint
    from monad.ui.config import MonadTestingParams
    
    
    def int_or_float(value):
        try:
            if "." in value:
                return float(value)
            else:
                return int(value)
        except ValueError as exc:
            raise argparse.ArgumentTypeError("Value must be an int or float") from exc
    
    
    def parse_args():
        parser = argparse.ArgumentParser(description="Run predictions for HM Kaggle classification task")
        parser.add_argument(
            "--save-path",
            type=str,
            required=True,
            help="Path to save the prediction",
        )
        parser.add_argument(
            "--checkpoint-dir",
            type=str,
            required=True,
            help="Path to the directory with saved checkpoints",
        )
        parser.add_argument(
            "--limit-test-batches",
            type=int_or_float,
            required=False,
            default=1.0,
            help="Limit on the test batches to use (int or float)",
        )
        return parser.parse_args()
    
    
    def main():
    
        args = parse_args()
        testing_params = MonadTestingParams(
            local_save_location=args.save_path,
            limit_test_batches=args.limit_test_batches,
        )
    
        testing_module = load_from_checkpoint(args.checkpoint_dir, test_start_date=datetime(year=2021, month=9, day=10))
    
        testing_module.predict(testing_params=testing_params)
    
    
    if __name__ == "__main__":
        main()
    
    
    • After the job has run, the predictions are available in the output folder.
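
Both train.py and predict.py use int_or_float as an argparse type for the batch-limit options. The standalone sketch below shows how the same option value is parsed into an int or a float. Reading an int as a batch count and a float as a fraction is an assumption based on the common PyTorch Lightning convention; the scripts themselves do not state it.

```python
import argparse


def int_or_float(value: str):
    """Parse '0.25' as a float and '100' as an int, as in predict.py."""
    try:
        return float(value) if "." in value else int(value)
    except ValueError as exc:
        raise argparse.ArgumentTypeError("Value must be an int or float") from exc


parser = argparse.ArgumentParser()
parser.add_argument("--limit-test-batches", type=int_or_float, default=1.0)

as_fraction = parser.parse_args(["--limit-test-batches", "0.25"])
as_count = parser.parse_args(["--limit-test-batches", "100"])
as_default = parser.parse_args([])
```

In the job spec, the option and its value would be added as two further items in the args list.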

Troubleshooting

At any time, users can check the status of the job and inspect what is going on inside the container, for example using SnowSQL. This is done as follows:

  1. SHOW SERVICE CONTAINERS IN SERVICE <name_of_service>; - this shows the status of the job
  2. SELECT SYSTEM$GET_SERVICE_LOGS('<NAME_OF_SERVICE>',0,'main'); - this shows detailed output from inside the container, e.g. how the training is progressing or, in case of an error, what the error was.
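
The two statements above differ only in the service name, so they can be generated for each job (FM_Train, Downstream_train, Predict). This is a minimal sketch; the helper name troubleshooting_statements is hypothetical, and the defaults for instance and container mirror the values used in this guide.

```python
import re


def troubleshooting_statements(service_name: str, instance: int = 0,
                               container: str = "main") -> list[str]:
    """Return the status and log statements for one job service."""
    if not re.fullmatch(r"[A-Za-z_][A-Za-z0-9_$]*", service_name):
        raise ValueError(f"unexpected service name: {service_name!r}")
    return [
        f"SHOW SERVICE CONTAINERS IN SERVICE {service_name};",
        f"SELECT SYSTEM$GET_SERVICE_LOGS('{service_name}', {int(instance)}, '{container}');",
    ]


for statement in troubleshooting_statements("FM_Train"):
    print(statement)
```

The printed statements can be pasted into SnowSQL or a *.sql file opened with the VSCode plugin.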