Deployment on Snowflake Container Service
This is a step-by-step description of how to get BaseModel up and running on Snowflake Container Service. In this guide, we will be using the HM Kaggle dataset (HM Personalized Fashion Recommendations) to model propensity.
Snowflake containers - Prerequisites
The first step is to make sure we fulfil the necessary prerequisites listed below:
Prerequisites
- A username and password for a Snowflake account with access to Snowflake Containers - at any time you can refer to Snowflake's official documentation for help.
- SnowSQL or the Snowflake plugin for VSCode
- Login to the Docker image repository in Snowflake - check the documentation for help.
SnowSQL (Recommended)
Install
To install SnowSQL, follow the instructions available at Install SnowSQL.
Configuration
Once the installation is successful, the configuration file for SnowSQL will be stored at ~/.snowsql/config
and for the purpose of this README we assume it has the following format:
[connections]
accountname = account_name
username = <user>
password = <password>
rolename = ACCOUNTADMIN
dbname = YOUR_DB
schemaname = YOUR_SCHEMA
warehousename = YOUR_WAREHOUSE_NAME
Other ways of connecting along with more details can be found in the documentation.
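A quick way to confirm the connection works is to run a trivial query through SnowSQL (or the VSCode plugin described below); it simply echoes the session context resolved from the configuration above:
-- Sanity check: shows the account, user, role and warehouse picked up from the connection config
SELECT CURRENT_ACCOUNT(), CURRENT_USER(), CURRENT_ROLE(), CURRENT_WAREHOUSE();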
Snowflake for VSCode (Recommended)
Snowflake for VSCode is a plugin that allows running queries directly from *.sql
files and displaying the command output directly in VSCode. A single query or a whole file consisting of multiple SQL queries can be run using the plugin. For the purpose of this tutorial we will be using it to accomplish our tasks, but any other code/text editor will work as well.
More details about the extension and how to use it efficiently can be found here: Read More
Docker login
Logging in to the Snowflake Docker image repository is necessary before pushing images to it. To log in to the repository in the Snowflake account, use either a Snowpark token or a username and password.
In order to use a Snowpark token:
- Install Snowflake CLI. The recommended way is to create a Python venv and install the snowflake-cli-labs Python package. Follow the official instructions.
- Snowflake CLI uses a global configuration file called config.toml. If the file does not exist, running any snow command for the first time automatically creates an empty config.toml.
- Set up the connection to the Snowflake instance. The configuration file is stored by default at ~/.config/snowflake/config.toml on Linux and ~/Library/Application Support/snowflake/config.toml on macOS. For the purpose of this README, let's assume that this is the content of the config.toml file:
[connections]
account = account_name
user = <user>
password = <password>
rolename = ACCOUNTADMIN
- Run:
snow snowpark registry token --connection <connection_name> --format=JSON | docker login <orgname>-<acctname>.registry.snowflakecomputing.com -u 0sessiontoken --password-stdin
In order to use a username and password:
Run the following and provide your Snowflake password when prompted:
docker login <orgname>-<acctname>.registry.snowflakecomputing.com -u <username>
Setup - one-time setup and configuration of the environment
In this section we guide you through preparing the environment for use with BaseModel. You need the ACCOUNTADMIN or a similar role to go through these steps.
You can read more about roles in the official documentation here
General setup
These initial steps will make sure you have the right compute pool, users, roles and tables created for this scenario.
Below is the sample setup.sql that we have used in our case.
CREATE ROLE monad_role;
GRANT ROLE monad_role TO USER <your_admin_user>;
ALTER USER <your_admin_user> SET DEFAULT_ROLE = monad_role;
CREATE COMPUTE POOL monad_compute_pool_gpu
MIN_NODES = 1
MAX_NODES = 1
INSTANCE_FAMILY = GPU_5
INITIALLY_SUSPENDED = TRUE
AUTO_SUSPEND_SECS = 300
AUTO_RESUME = TRUE;
GRANT USAGE ON COMPUTE POOL monad_compute_pool_gpu TO ROLE monad_role;
GRANT MONITOR ON COMPUTE POOL monad_compute_pool_gpu TO ROLE monad_role;
CREATE OR REPLACE WAREHOUSE monad_warehouse WITH
WAREHOUSE_SIZE='MEDIUM'
AUTO_SUSPEND = 180
AUTO_RESUME = TRUE
INITIALLY_SUSPENDED= TRUE;
GRANT ALL ON WAREHOUSE monad_warehouse TO ROLE monad_role;
-- Allow the SYSADMIN role to use the warehouse
GRANT ALL ON WAREHOUSE monad_warehouse TO ROLE SYSADMIN;
CREATE DATABASE monad_db;
GRANT OWNERSHIP ON DATABASE monad_db TO ROLE monad_role;
-- Setup Ingress - this needs to be done once per account
-- CREATE SECURITY INTEGRATION snowservices_ingress_oauth
-- TYPE=oauth
-- OAUTH_CLIENT=snowservices_ingress
-- ENABLED=true;
-- create db objects
USE ROLE monad_role;
USE DATABASE monad_db;
USE WAREHOUSE XSMALL_BASE_MODEL;
CREATE SCHEMA data_schema;
USE SCHEMA DATA_SCHEMA;
CREATE OR REPLACE IMAGE REPOSITORY image_repository;
CREATE STAGE stage DIRECTORY = ( ENABLE = true );
CREATE STAGE monad_stage
DIRECTORY = ( ENABLE = true )
ENCRYPTION = (type = 'SNOWFLAKE_SSE');
GRANT ALL ON SCHEMA data_schema TO ROLE ACCOUNTADMIN;
GRANT ALL ON DATABASE monad_db TO ROLE ACCOUNTADMIN;
GRANT ALL ON STAGE stage TO ROLE ACCOUNTADMIN;
GRANT ALL ON STAGE monad_stage TO ROLE ACCOUNTADMIN;
-- Add access to HM_KAGGLE to monad_role
USE ROLE SYSADMIN;
GRANT USAGE ON DATABASE HM_KAGGLE TO ROLE monad_role;
USE ROLE monad_role;
-- -- DEBUGGING
-- SHOW COMPUTE POOLS;
-- DESCRIBE COMPUTE POOL MONAD_COMPUTE_POOL_GPU;
-- ALTER COMPUTE POOL MONAD_COMPUTE_POOL_GPU SUSPEND;
-- ALTER COMPUTE POOL MONAD_COMPUTE_POOL_GPU STOP ALL;
-- ALTER COMPUTE POOL MONAD_COMPUTE_POOL_GPU RESUME;
Please note:
This example assumes the HM_KAGGLE database and tables are already present in Snowflake.
- Replace the <username> placeholders in the setup.sql file with your username.
- Run the SQL queries included in the setup.sql file. The script sets up the following resources:
  - monad_role, which is used to manage Monad-related resources
  - a compute pool with an A10G GPU
  - a warehouse of size MEDIUM
  - the monad_db database, which holds the file stages and the image repository
  - stages: stage for storing job configs, and monad_stage for storing all outputs from Monad – this stage is mounted into the Docker containers later on
  - a Docker image repository named image_repository
  - the necessary permissions and role grants
- Push the Docker image to Snowflake's repository. First you need to obtain the Docker image, either by pulling it from a registry you have access to or by loading it, and tag it with the repository URL. The push would look like this:
docker push <orgname>-<acctname>.registry.snowflakecomputing.com/${SNOWFLAKE_REPO_PATH}/monad:${DOCKER_TAG}
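After the push completes, you can optionally confirm from SQL that the image landed in the repository created by setup.sql (a sketch; run it with a role that can use the repository, and expect the tag you pushed as DOCKER_TAG):
-- List the repositories in the schema and the images inside ours
SHOW IMAGE REPOSITORIES IN SCHEMA monad_db.data_schema;
SHOW IMAGES IN IMAGE REPOSITORY monad_db.data_schema.image_repository;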
Important:
In our example only the monad_role is allowed to use the stages and the image repository. To allow other roles Read/Write on a stage, see All Privileges. For the image repository, see Repository Privileges.
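For example, such grants could look roughly like this (analyst_role is a hypothetical role name used only for illustration; adjust the privileges to what the role actually needs):
-- Hypothetical example: let analyst_role read/write the stage and pull images from the repository
GRANT USAGE ON DATABASE monad_db TO ROLE analyst_role;
GRANT USAGE ON SCHEMA monad_db.data_schema TO ROLE analyst_role;
GRANT READ, WRITE ON STAGE monad_db.data_schema.monad_stage TO ROLE analyst_role;
GRANT READ ON IMAGE REPOSITORY monad_db.data_schema.image_repository TO ROLE analyst_role;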
Setup event table for the account
To allow logging from Python applications running in Snowflake Containers, an active event table must be present in the Snowflake account. The setup_event_table.sql file creates such an event table, sets it as active and grants the necessary permissions to other roles.
The script needs to be run only once per account and is generally idempotent.
- Run all commands from setup_event_table.sql. The script in our case looks like this:
USE ROLE ACCOUNTADMIN;
USE WAREHOUSE XSMALL_BASE_MODEL;
CREATE DATABASE IF NOT EXISTS event_db;
CREATE SCHEMA IF NOT EXISTS event_db.event_schema;
USE DATABASE event_db;
USE SCHEMA event_schema;
CREATE EVENT TABLE IF NOT EXISTS event_table;
ALTER ACCOUNT SET EVENT_TABLE = event_db.event_schema.event_table;
GRANT ALL ON DATABASE event_db TO ROLE SYSADMIN;
GRANT USAGE ON DATABASE event_db TO ROLE MONAD_ROLE;
GRANT USAGE ON SCHEMA event_schema TO ROLE MONAD_ROLE;
GRANT ALL ON TABLE event_table TO ROLE MONAD_ROLE;
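Once a container job has produced some logs, you can verify they are reaching the event table with a query along these lines (column names follow the standard Snowflake event table schema):
-- Show the most recent log records emitted by container services
SELECT TIMESTAMP, RESOURCE_ATTRIBUTES, VALUE
FROM event_db.event_schema.event_table
WHERE RECORD_TYPE = 'LOG'
ORDER BY TIMESTAMP DESC
LIMIT 20;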
Using Snowflake Containers with BaseModel
In this section we document how to use Snowflake Containers with BaseModel once all the setup on the admin account is completed. We do not recommend running these steps using the ACCOUNTADMIN role.
You can read more about roles in the official documentation here
Pretrain Configuration and Foundation Model Training
In this stage, we will configure the files necessary to successfully run the pretrain phase in Snowflake Containers. The details of the pretrain_configuration spec file are the same as in the HM Kaggle example.
- Adjust paths in the three PUT commands in commands.sql, such that they point to the right files. Use absolute paths. The example commands.sql file looks like this:

USE ROLE monad_role;
USE DATABASE monad_db;
USE SCHEMA DATA_SCHEMA;
USE WAREHOUSE XSMALL_BASE_MODEL;

-- Upload config spec to monad_stage
-- UPDATE ME Use absolute path to point to the config file
PUT file:///home/<USER>/monad/integrations/snowflake/hm_kaggle/pretrain/config.yaml @monad_db.data_schema.monad_stage/configs/hm-kaggle/pretrain/
AUTO_COMPRESS=FALSE
OVERWRITE=TRUE;

-- Upload job spec to Stage
-- UPDATE ME Use absolute path to point to the job spec file
PUT file:///home/<USER>/monad/integrations/snowflake/hm_kaggle/pretrain/pretrain.py @monad_db.data_schema.monad_stage/scripts/hm-kaggle/pretrain/
AUTO_COMPRESS=FALSE
OVERWRITE=TRUE;

PUT file:///home/<USER>/monad/integrations/snowflake/hm_kaggle/pretrain/pretrain_job_spec.yaml @monad_db.data_schema.stage/jobs/hm-kaggle/pretrain/
AUTO_COMPRESS=FALSE
OVERWRITE=TRUE;

-- Create a job
EXECUTE SERVICE
IN COMPUTE POOL monad_compute_pool_gpu
FROM @stage
SPEC='/jobs/hm-kaggle/pretrain/pretrain_job_spec.yaml';

-- get the job status
-- SELECT SYSTEM$GET_JOB_STATUS('<job-query-id>');

-- -- get logs
-- SELECT SYSTEM$GET_JOB_LOGS('<job-query-id>', 'main');

-- cancel query
-- SELECT SYSTEM$CANCEL_QUERY('<job-query-id>');
- Run all commands from commands.sql, which does the following:
- Sets the correct context
- Uploads pretrain_configuration to the monad_stage stage - as a reminder, the spec that we use here looks like this:

datasources:
  - type: main_entity_attribute
    main_entity_column: customer_id
    name: customers
    data_location:
      source: snowflake
      connection_params:
        user: ${SNOWFLAKE_USER}
        password: ${SNOWFLAKE_PASSWORD}
        account: ${SNOWFLAKE_ACCOUNT}
        warehouse: ${SNOWFLAKE_WAREHOUSE}
        role: ${SNOWFLAKE_ROLE}
        database: HM_KAGGLE
        db_schema: PRIVATE
        table_name: customers
  - type: event
    main_entity_column: customer_id
    name: transactions
    date_column: t_dat
    text_columns:
      - prod_name
      - detail_desc
    data_location:
      source: snowflake
      connection_params:
        user: ${SNOWFLAKE_USER}
        password: ${SNOWFLAKE_PASSWORD}
        account: ${SNOWFLAKE_ACCOUNT}
        warehouse: ${SNOWFLAKE_WAREHOUSE}
        role: ${SNOWFLAKE_ROLE}
        database: HM_KAGGLE
        db_schema: PRIVATE
        table_name: transactions
data_params:
  data_start_date: 2018-09-20 00:00:00
  validation_start_date: 2020-09-01 00:00:00
  check_target_for_next_N_days: 21
loader_params:
  batch_size: 256
  num_workers: 10
training_params:
  learning_rate: 0.0001
  epochs: 3
  hidden_dim: 2048
- Uploads job_configuration to the stage stage. In our case, the job spec used looks like this:

spec:
  container:
    - name: main
      image: /monad_db/data_schema/image_repository/monad:latest-dev
      env:
        SNOWFLAKE_WAREHOUSE: monad_warehouse
        SNOWFLAKE_DATABASE: HM_KAGGLE
        SNOWFLAKE_SCHEMA: PUBLIC
      command:
        - python
      args:
        - -m
        - monad.pretrain
        - --config
        - /app/monad_stage/configs/hm-kaggle/pretrain/config.yaml
        - --features-path
        - /app/monad_stage/monad-features/hm-kaggle
        - --resume
      resources:
        limits:
          nvidia.com/gpu: 1
        requests:
          nvidia.com/gpu: 1
      volumeMounts:
        - name: monad-stage
          mountPath: /app/monad_stage
        - name: dev-shm
          mountPath: /dev/shm
  volumes:
    - name: monad-stage
      source: "@monad_stage"
      uid: 1000 # user in docker has uid and gid: 1000
      gid: 1000
    - name: dev-shm
      source: memory
      size: 48Gi # Out of 128Gi available on the machine
- Uploads the pretrain.py file to the stage stage. In this case, it looks like this:
import argparse
from monad.ui import pretrain
from pathlib import Path


def parse_args() -> argparse.Namespace:
    """
    Returns:
        argument parser to be used when executing script
    """
    argument_parser = argparse.ArgumentParser()
    argument_parser.add_argument("--config", type=Path, required=True, help="Path to yaml pretraining config")
    argument_parser.add_argument("--features-path", type=Path, required=True, help="Where to store result features.")
    argument_parser.add_argument("--storage-config", type=Path, required=False)
    rerun_group = argument_parser.add_mutually_exclusive_group()
    rerun_group.add_argument(
        "--resume", action="store_true", help="Whether to resume interrupted training", default=False
    )
    rerun_group.add_argument(
        "--overwrite", action="store_true", help="Whether to overwrite existing results", default=False
    )
    return argument_parser.parse_args()


if __name__ == "__main__":
    params = parse_args()
    pretrain(
        config_path=params.config,
        output_path=params.features_path,
        storage_config_path=params.storage_config,
        resume=params.resume,
        overwrite=params.overwrite,
    )
More details about the configuration file can be found in Snowflake's documentation.
It is worth noting:
- BaseModel requires a GPU, and it needs to be requested in this config file.
- In the specification config YAML, under spec/container/args, the user needs to provide the python command to run BaseModel with the parameters they want - similar to running it from the CLI.
The final step in the commands.sql file is executing the service, which starts the job.
Train Downstream Model and Configuration
The process will look very similar to Foundation Model training. Once again we have to prepare:
- the commands.sql file
- the job spec file
- the train.py file - a Python file describing the target function.
The steps are:
- Adjust paths in the two PUT commands in commands.sql, such that they point to the right files. Use absolute paths.

USE ROLE monad_role;
USE DATABASE monad_db;
USE WAREHOUSE XSMALL_BASE_MODEL;
USE SCHEMA DATA_SCHEMA;

-- Upload train script to stage
-- UPDATE ME Use absolute path to point to the training script
PUT file:///home/<USER>/monad/integrations/snowflake/hm_kaggle/classification/train.py @monad_db.data_schema.monad_stage/scripts/hm-kaggle/propensity/
AUTO_COMPRESS=FALSE
OVERWRITE=TRUE;

-- Upload job spec to Stage
-- UPDATE ME Use absolute path to point to the job spec file
PUT file:///home/<USER>/monad/integrations/snowflake/hm_kaggle/classification/train_job_spec.yaml @monad_db.data_schema.stage/jobs/hm-kaggle/propensity/
AUTO_COMPRESS=FALSE
OVERWRITE=TRUE;

-- Create a job
EXECUTE SERVICE
IN COMPUTE POOL monad_compute_pool_gpu
FROM @stage
SPEC='/jobs/hm-kaggle/propensity/train_job_spec.yaml';

-- get the job status
-- SELECT SYSTEM$GET_JOB_STATUS('<job-query-id>');

-- -- get logs
-- SELECT SYSTEM$GET_JOB_LOGS('<job-query-id>', 'main');

-- cancel query
-- SELECT SYSTEM$CANCEL_QUERY('01afe1dd-0102-111f-0002-31e20004dbbe');
- Run all commands from commands.sql, which does the following:
- Sets the correct context
- Uploads the training script (in our case train.py) to the monad_stage stage:

import argparse
from typing import Dict

import torch
import numpy as np

from monad.ui.config import MonadTrainingParams
from monad.ui.module import MultilabelClassificationTask, load_from_foundation_model
from monad.ui.target_function import Attributes, Events

TARGET_NAMES = [
    "Garment Upper body",
    "Underwear",
    "Socks & Tights",
    "Garment Lower body",
    "Accessories",
    "Items",
    "Nightwear",
    "Shoes",
    "Swimwear",
    "Garment Full body",
]
TARGET_ENTITY = "product_group_name"


def int_or_float(value):
    try:
        if "." in value:
            return float(value)
        else:
            return int(value)
    except ValueError as exc:
        raise argparse.ArgumentTypeError("Value must be an int or float") from exc


def propensity_target_fn(_history: Events, future: Events, _entity: Attributes, _ctx: Dict) -> np.ndarray:
    purchase_target, _ = future["transactions"].groupBy(TARGET_ENTITY).exists(groups=TARGET_NAMES)
    return purchase_target


def parse_args():
    parser = argparse.ArgumentParser(description="Train a model for HM Kaggle classification task")
    parser.add_argument("--fm-path", type=str, required=True, help="Path to the features directory")
    parser.add_argument("--checkpoint-dir", type=str, required=True, help="Directory to save model checkpoints")
    parser.add_argument(
        "--learning-rate", type=float, required=False, default=5e-5, help="Learning rate for the model"
    )
    parser.add_argument("--epochs", type=int, required=False, default=5, help="Number of epochs to train the model")
    parser.add_argument(
        "--devices", type=int, nargs="+", default=[0], required=False, help="List of device IDs to use for training"
    )
    parser.add_argument(
        "--limit-train-batches",
        type=int_or_float,
        required=False,
        default=1.0,
        help="Limit on the number (int) or fraction (float) of training batches",
    )
    parser.add_argument(
        "--limit-val-batches",
        type=int_or_float,
        required=False,
        default=1.0,
        help="Limit on the number (int) or fraction (float) of validation batches",
    )
    return parser.parse_args()


def main():
    args = parse_args()
    training_params = MonadTrainingParams(
        learning_rate=args.learning_rate,
        checkpoint_dir=args.checkpoint_dir,
        epochs=args.epochs,
        devices=args.devices,
        limit_train_batches=args.limit_train_batches,
        limit_val_batches=args.limit_val_batches,
    )
    trainer = load_from_foundation_model(
        args.fm_path, MultilabelClassificationTask(), propensity_target_fn, num_outputs=len(TARGET_NAMES)
    )
    trainer.fit(training_params=training_params)


if __name__ == "__main__":
    main()
- Uploads job_configuration to the stage stage:

spec:
  container:
    - name: main
      image: /monad_db/data_schema/image_repository/monad:latest-dev
      env:
        SNOWFLAKE_WAREHOUSE: monad_warehouse
        SNOWFLAKE_DATABASE: HM_KAGGLE
        SNOWFLAKE_SCHEMA: PUBLIC
      command:
        - python
      args:
        - /app/monad_stage/scripts/hm-kaggle/propensity/train.py
        - --fm-path
        - /app/monad_stage/monad-features/hm-kaggle/fm
        - --checkpoint-dir
        - /app/monad_stage/monad-features/hm-kaggle/propensity/checkpoints
        - --epochs
        - 1
      resources:
        limits:
          nvidia.com/gpu: 1
        requests:
          nvidia.com/gpu: 1
      volumeMounts:
        - name: monad-stage
          mountPath: /app/monad_stage
        - name: dev-shm
          mountPath: /dev/shm
  volumes:
    - name: monad-stage
      source: "@monad_stage"
      uid: 1000 # user in docker has uid and gid: 1000
      gid: 1000
    - name: dev-shm
      source: memory
      size: 48Gi # Out of 128Gi available on the machine
The final step in the commands.sql file is executing the service, which starts the job.
Prediction
The final step is running predictions. Once again we will be using three files:
- commands.sql
- the job spec file
- predict.py

- Adjust the paths in the PUT commands in commands.sql so that it looks, for example, like this:
USE ROLE monad_role;
USE DATABASE monad_db;
USE WAREHOUSE XSMALL_BASE_MODEL;
USE SCHEMA DATA_SCHEMA;
-- Upload predict script to stage
-- UPDATE ME Use absolute path to point to the prediction script
PUT file:///home/<USER>/monad/integrations/snowflake/hm_kaggle/predict/predict.py @monad_db.data_schema.monad_stage/scripts/hm-kaggle/propensity/
AUTO_COMPRESS=FALSE
OVERWRITE=TRUE;
-- Upload job spec to Stage
-- UPDATE ME Use absolute path to point to the job spec file
PUT file:///home/<USER>/monad/integrations/snowflake/hm_kaggle/predict/predict_job_spec.yaml @monad_db.data_schema.stage/jobs/hm-kaggle/propensity/
AUTO_COMPRESS=FALSE
OVERWRITE=TRUE;
-- Create a job
EXECUTE SERVICE
IN COMPUTE POOL monad_compute_pool_gpu
FROM @stage
SPEC='/jobs/hm-kaggle/propensity/predict_job_spec.yaml';
-- get the job status
-- SELECT SYSTEM$GET_JOB_STATUS('<job-query-id>');
-- -- get logs
-- SELECT SYSTEM$GET_JOB_LOGS('<job-query-id>', 'main');
-- cancel query
-- SELECT SYSTEM$CANCEL_QUERY('01afe1dd-0102-111f-0002-31e20004dbbe');
- Run all commands from the commands.sql file - as before, they upload the relevant files and execute the job.
- The job spec should look like this:

spec:
  container:
    - name: main
      image: /monad_db/data_schema/image_repository/monad:latest-dev
      env:
        SNOWFLAKE_WAREHOUSE: monad_warehouse
        SNOWFLAKE_DATABASE: HM_KAGGLE
        SNOWFLAKE_SCHEMA: PUBLIC
      command:
        - python
      args:
        - /app/monad_stage/scripts/hm-kaggle/propensity/predict.py
        - --save-path
        - /app/monad_stage/monad_features/hm-kaggle/output
        - --checkpoint-dir
        - /app/monad_stage/monad-features/hm-kaggle/propensity/checkpoints
      resources:
        limits:
          nvidia.com/gpu: 1
        requests:
          nvidia.com/gpu: 1
      volumeMounts:
        - name: monad-stage
          mountPath: /app/monad_stage
        - name: dev-shm
          mountPath: /dev/shm
  volumes:
    - name: monad-stage
      source: "@monad_stage"
      uid: 1000 # user in docker has uid and gid: 1000
      gid: 1000
    - name: dev-shm
      source: memory
      size: 48Gi # Out of 128Gi available on the machine
This is very similar to the previously described job specs - it provides the necessary configuration for paths, resources and the command that executes the predict.py script.
- Finally, our predict.py:
import argparse
from datetime import datetime

from monad.ui.module import load_from_checkpoint
from monad.ui.config import MonadTestingParams


def int_or_float(value):
    try:
        if "." in value:
            return float(value)
        else:
            return int(value)
    except ValueError as exc:
        raise argparse.ArgumentTypeError("Value must be an int or float") from exc


def parse_args():
    parser = argparse.ArgumentParser(description="Run predictions for the HM Kaggle propensity task")
    parser.add_argument(
        "--save-path",
        type=str,
        required=True,
        help="Path to save the predictions",
    )
    parser.add_argument(
        "--checkpoint-dir",
        type=str,
        required=True,
        help="Path to the checkpoints directory",
    )
    parser.add_argument(
        "--limit-test-batches",
        type=int_or_float,
        required=False,
        default=1.0,
        help="Limit on the number (int) or fraction (float) of test batches",
    )
    return parser.parse_args()


def main():
    args = parse_args()
    testing_params = MonadTestingParams(
        save_path=args.save_path,
        limit_test_batches=args.limit_test_batches,
    )
    testing_module = load_from_checkpoint(args.checkpoint_dir, test_start_date=datetime(year=2021, month=9, day=10))
    testing_module.predict(testing_params=testing_params)


if __name__ == "__main__":
    main()
- After the job has run, the predictions will be available in the output folder on the stage; they can be inspected and downloaded, for example as sketched below.
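As a sketch of how to retrieve them, you can list and download the prediction files from the stage via SnowSQL; the path below assumes the --save-path used in the predict job spec above, and GET requires a client such as SnowSQL:
-- Inspect the prediction files written by the job
LIST @monad_db.data_schema.monad_stage/monad_features/hm-kaggle/output/;
-- Download them to a local directory (adjust the target path)
GET @monad_db.data_schema.monad_stage/monad_features/hm-kaggle/output/ file:///tmp/hm_kaggle_predictions/;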
Troubleshooting
At any time, using for example SnowSQL, users can check the status of the job as well as have a look at what is going on inside the container. This is achieved in the following way:

SELECT SYSTEM$GET_JOB_STATUS('<job-query-id>');
- this should give you the status of the job

SELECT SYSTEM$GET_JOB_LOGS('<job-query-id>', 'main');
- this should give the detailed logs from inside the container, e.g. how the training is doing or, in case of an error, what the error was.
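Beyond the job status and logs, a few additional checks can help narrow down problems (a sketch; adjust the object names to your setup):
-- State of the compute pool (e.g. ACTIVE, IDLE, SUSPENDED) and its node counts
DESCRIBE COMPUTE POOL monad_compute_pool_gpu;
-- Services and jobs currently placed on the pool
SHOW SERVICES IN COMPUTE POOL monad_compute_pool_gpu;
-- Application logs also land in the account event table configured earlier (see setup_event_table.sql)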