Snowflake Import
The snowflake-import command uses a Snowflake STREAM object to track changes (CDC) to a table and read them into a Redis data structure such as hash or json.
The Snowflake STREAM is created and managed by RIOT-X. The user credentials you provide must have the ability to create a stream in the database and schema specified by the fully qualified object name.
RIOT-X optimizes stream polling by using Snowflake’s SYSTEM$STREAM_HAS_DATA function to check if the stream contains new data before creating expensive temporary tables. This reduces unnecessary processing and improves performance when no changes are available.
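For reference, you can run the same check yourself against the stream RIOT-X manages (the stream name matches the example described below). The snowsql invocation is only an illustrative way to run the statement; any Snowflake client works:
# Manual check with the snowsql CLI (assumes snowsql is configured);
# the backslash keeps the shell from expanding $STREAM_HAS_DATA as a variable.
snowsql -q "SELECT SYSTEM\$STREAM_HAS_DATA('SAMPLE_DATABASE.SAMPLE_SCHEMA.DATA_TABLE_changestream');"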
- SAMPLE_DATABASE.SAMPLE_SCHEMA.DATA_TABLE_changestream will be created or replaced. For security, this can be created in a different schema than the table you are importing from by specifying --cdc-schema.
- riotx:offset:SAMPLE_DATABASE.SAMPLE_SCHEMA.DATA_TABLE_changestream - this key is stored in the destination Redis database and is used to track the stream offset. If RIOT-X fails in the middle of copying data from the stream, when restarted it will resume copying from this offset. Removing this offset key from Redis will result in RIOT-X recreating the stream at time "NOW" (see the sketch after this list). With --snapshot INITIAL (the default) the stream includes the initial table data plus changes going forward. If you do not want the initial table data to be included, specify --snapshot NEVER.
- snowflake-import currently works on tables and materialized views.
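For example, you can inspect or clear the offset key with redis-cli. This is optional and only needed if you want to force a fresh stream:
# Check whether RIOT-X has recorded a stream offset (key name from the example above)
redis-cli EXISTS riotx:offset:SAMPLE_DATABASE.SAMPLE_SCHEMA.DATA_TABLE_changestream
# Delete the offset to make RIOT-X recreate the stream at time "NOW" on its next start
redis-cli DEL riotx:offset:SAMPLE_DATABASE.SAMPLE_SCHEMA.DATA_TABLE_changestream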
The basic usage is:
riotx snowflake-import [TABLE] [OPTIONS] [REDIS COMMAND...]
The minimum recommended permissions for a Snowflake role and user to run this command are:
CREATE OR REPLACE ROLE riotx_cdc
COMMENT = 'minimum cdc role for riotx';
-- replace compute_wh with the name of the warehouse you want to use
GRANT USAGE, OPERATE ON WAREHOUSE compute_wh TO ROLE riotx_cdc;
-- replace tb_101.raw_pos_cdc with the name of a database and schema for RIOT-X to create the stream in
CREATE OR REPLACE SCHEMA tb_101.raw_pos_cdc;
GRANT USAGE ON SCHEMA tb_101.raw_pos_cdc TO ROLE riotx_cdc;
-- replace tb_101 with the name of the database RIOT-X needs to read out of
GRANT USAGE ON DATABASE tb_101 TO ROLE riotx_cdc;
-- replace tb_101.raw_pos with the name of the schema RIOT-X needs to read out of
GRANT USAGE ON SCHEMA tb_101.raw_pos TO ROLE riotx_cdc;
-- replace with the name of the table(s) you want to read from
GRANT SELECT ON TABLE tb_101.raw_pos.incremental_order_header TO ROLE riotx_cdc;
GRANT REFERENCE_USAGE ON TABLE tb_101.raw_pos.incremental_order_header TO ROLE riotx_cdc;
ALTER TABLE tb_101.raw_pos.INCREMENTAL_ORDER_HEADER SET CHANGE_TRACKING = TRUE;
GRANT SELECT ON FUTURE TABLES IN SCHEMA tb_101.raw_pos_cdc TO ROLE riotx_cdc;
GRANT CREATE TABLE ON SCHEMA tb_101.raw_pos_cdc TO ROLE riotx_cdc;
GRANT CREATE STREAM ON SCHEMA tb_101.raw_pos_cdc TO ROLE riotx_cdc;
GRANT SELECT ON FUTURE STREAMS IN SCHEMA tb_101.raw_pos_cdc TO ROLE riotx_cdc;
CREATE OR REPLACE USER riotx_cdc
DEFAULT_ROLE = 'riotx_cdc'
DEFAULT_WAREHOUSE = 'compute_wh'
PASSWORD = '{{PASSWORD}}';
GRANT ROLE riotx_cdc TO USER riotx_cdc;
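Before running the import you can sanity-check the role and user. The snowsql invocation below is only one illustrative way to run these statements; you can paste them into Snowsight instead:
# List the privileges granted to the role and the roles granted to the user
snowsql -q "SHOW GRANTS TO ROLE riotx_cdc;"
snowsql -q "SHOW GRANTS TO USER riotx_cdc;"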
For the full usage, run:
riotx snowflake-import --help
The following command uses the example database, schema, and table names from the minimal role setup above.
riotx snowflake-import \
tb_101.raw_pos.incremental_order_header \
--role riotx_cdc \
--warehouse compute_wh \
--cdc-schema raw_pos_cdc \
--jdbc-url "jdbc:snowflake://abcdefg.abc12345.snowflakecomputing.com" \
--jdbc-user databaseuser \
--jdbc-pass databasepassword \
--poll 10s \ (1)
hset orderheader:#{order_id} (2)
1 | Sleep 10 seconds after each CDC import |
2 | Column name to use as id |
The command above imports CDC data from the Snowflake table tb_101.raw_pos.incremental_order_header into Redis hashes in the keyspace orderheader.
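To spot-check the result, you can look at a few of the imported hashes with redis-cli; the key pattern comes from the command above and the order id shown is purely illustrative:
# List a few imported keys
redis-cli --scan --pattern 'orderheader:*' | head -5
# Inspect one record (replace 12345 with an order_id from the scan above)
redis-cli HGETALL orderheader:12345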
You can specify multiple tables to import. If you need to reference table information in the operation, you can use the #table variable, which exposes name, database, and schema fields.
riotx snowflake-import \
db1.public.table1 db1.public.table2 \ (1)
...
hset #{#table.name}:#{id} (2)
1 | Specify all tables to import |
2 | Use table name as key prefix |
This command would generate keys table1:abc, table2:def, etc.
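A quick, optional way to confirm that each table landed under its own prefix (the patterns follow the example keys above):
# Count keys written for each source table
redis-cli --scan --pattern 'table1:*' | wc -l
redis-cli --scan --pattern 'table2:*' | wc -l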
If you only need to do a one-time import of data from Snowflake, you can use the db-import command. This command will read all of the rows returned by your SQL query and write them to Redis. For more information, see the db-import command.
riotx db-import \
"SELECT * FROM SAMPLE_DATABASE.SAMPLE_SCHEMA.DATA_TABLE" \
--jdbc-url "jdbc:snowflake://abcdefg.abc12345.snowflakecomputing.com" \
--jdbc-driver net.snowflake.client.jdbc.SnowflakeDriver \
--jdbc-user databaseuser \
--jdbc-pass databasepassword \
hset datatable:#{data_id} (1)
1 | Column name to use as id |
This command performs a one-time import from Snowflake using the db-import command.
RDI Integration
This recipe contains step-by-step instructions for using RIOT-X as an external collector for Redis Data Integration (RDI).
Here is the end-to-end data flow:
Snowflake
In your Snowflake UI (Snowsight), import this notebook: snowflake-cdc.ipynb. Then run the first two steps in the notebook:
init | Set up roles, permissions and schema |
populate | Create and populate table |
Do not run the other steps yet. These are for later.
RDI
In Redis Insight, use the following pipeline config and job definitions:
sources:
riotx:
type: external
# Redis Insight requires a connection element but it's not actually used by RDI
connection: {}
targets:
target:
connection:
type: redis
host: # Target database hostname
port: # Target database port
processors:
source:
table: incremental_order_header
output:
- uses: redis.write
with:
connection: target
data_type: hash
key:
expression: concat(['order:', ORDER_ID])
language: jmespath
RIOT-X
Run this command to start RIOT-X:
riotx snowflake-import \
-h <target_host> \
-p <target_port> \
tb_101.raw_pos.incremental_order_header \
--role riotx_cdc \
--warehouse compute_wh \
--cdc-schema raw_pos_cdc \
--jdbc-url "jdbc:snowflake://<account>.snowflakecomputing.com" \
--jdbc-user $JDBC_USER \
--jdbc-pass $JDBC_PASS
RIOT-X will perform the initial load of the incremental_order_header table, which should translate to 1,000 hashes in the target Redis database.
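To verify, you can count the keys in the target database. The order:* pattern comes from the RDI job definition above; host and port are placeholders:
# Count the hashes produced by the initial load
redis-cli -h <target_host> -p <target_port> --scan --pattern 'order:*' | wc -l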
Snowflake Additional Data
Go back to your Snowflake UI and run the last step in the notebook. This will insert 100 rows into the incremental_order_header table, which will be picked up by RIOT-X and written to the target database. You should now have 1,100 hashes in the target Redis database.
Service User
If your Snowflake account does not let you use JDBC password authentication, you will have to use key-based authentication.
- Key Pair
If you don’t already have a private/public key pair you need to generate one. Follow the steps at docs.snowflake.com/en/user-guide/key-pair-auth.
Use the following command to extract the single-line public key needed for the last step, "Assign the public key to a Snowflake user" (a combined sketch follows this list):
grep -v "BEGIN\|END" rsa_key.pub | tr -d '\n'
- JDBC URL
When using key-based authentication, modify the JDBC URL to:
jdbc:snowflake://<account>.snowflakecomputing.com?private_key_file=<path_to_key>/rsa_key.p8
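Putting the two together, here is a minimal sketch of assigning the public key and running the import with key-based authentication. The snowsql invocation, key path, user name, and reuse of the earlier example command are assumptions; adapt them to your environment, and check whether your setup still requires --jdbc-pass:
# Capture the single-line public key and assign it to the Snowflake user
# (you can paste the same ALTER USER statement into Snowsight instead of using snowsql)
PUB_KEY=$(grep -v "BEGIN\|END" rsa_key.pub | tr -d '\n')
snowsql -q "ALTER USER riotx_cdc SET RSA_PUBLIC_KEY='${PUB_KEY}';"
# Run the import using the key-based JDBC URL from above (key path is illustrative)
riotx snowflake-import \
tb_101.raw_pos.incremental_order_header \
--role riotx_cdc \
--warehouse compute_wh \
--cdc-schema raw_pos_cdc \
--jdbc-url "jdbc:snowflake://<account>.snowflakecomputing.com?private_key_file=/path/to/rsa_key.p8" \
--jdbc-user riotx_cdc \
hset orderheader:#{order_id}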
Kubernetes Deployment
RIOT-X can be deployed on Kubernetes for production workloads using the provided deployment configuration.
Prerequisites
Before deploying, ensure you have:
- A Kubernetes cluster with access to create deployments and secrets
- A Redis instance accessible from the cluster
- Snowflake credentials and proper role permissions (see above)
- The Docker image riotx/riotx:latest available in your cluster
Configuration
The Kubernetes deployment configuration uses environment variables following the RIOT_REDIS_* and RIOT_* naming conventions.
Download the deployment configuration:
curl -O https://redis.github.io/riotx/_/attachments/snowflake-import-deployment.yaml
Or use the local file: snowflake-import-deployment.yaml
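Optionally, you can validate the manifest client-side before creating any resources:
# Validate the manifest without creating anything in the cluster
kubectl apply --dry-run=client -f snowflake-import-deployment.yaml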
Secrets Setup
Before deploying, create the required secrets with your actual credentials:
# Create Redis credentials secret
kubectl create secret generic redis-credentials \
--from-literal=username='redis_user' \
--from-literal=password='redis_password'
# Create Snowflake credentials secret
kubectl create secret generic snowflake-credentials \
--from-literal=jdbc-url='jdbc:snowflake://company.snowflakecomputing.com' \
--from-literal=username='snowflake_user' \
--from-literal=password='snowflake_password'
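You can confirm that both secrets exist before deploying:
# List the secrets the deployment expects
kubectl get secret redis-credentials snowflake-credentials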
For key-based authentication, include the private key file path in the JDBC URL:
kubectl create secret generic snowflake-credentials \
--from-literal=jdbc-url='jdbc:snowflake://company.snowflakecomputing.com?private_key_file=/path/to/key.p8' \
--from-literal=username='snowflake_user' \
--from-literal=password=''
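If you use key-based authentication, the private key file also has to be available inside the pod at the path referenced by private_key_file. One way to do that is to store the key in a secret and mount it in the deployment; the secret name and file paths below are assumptions:
# Store the private key in a secret (name and local path are illustrative)
kubectl create secret generic snowflake-private-key --from-file=key.p8=/local/path/to/rsa_key.p8
You would then add a matching volume and volumeMount to the deployment so the key appears at the path used in the JDBC URL (for example /path/to/key.p8).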
Customization
Edit the deployment YAML to customize for your environment:
# Update Redis connection
- name: RIOT_REDIS_URI
value: "rediss://your-redis-host:12000" (1)
# Update Snowflake table and settings
- name: RIOT_TABLE
value: "YOUR_DB.YOUR_SCHEMA.YOUR_TABLE" (2)
- name: RIOT_ROLE
value: "your_snowflake_role" (3)
- name: RIOT_WAREHOUSE
value: "your_warehouse" (4)
- name: RIOT_CDC_SCHEMA
value: "your_cdc_schema" (5)
- name: RIOT_STREAM_LIMIT
value: "20000" (6)
1 | Your Redis server URI (TLS enabled) |
2 | Fully qualified Snowflake table name |
3 | Snowflake role with CDC permissions |
4 | Snowflake warehouse to use |
5 | CDC schema for stream creation |
6 | Stream processing limit |
Deployment
Deploy the application:
kubectl apply -f snowflake-import-deployment.yaml
Monitor the deployment:
# Check deployment status
kubectl get deployments -l app=riotx
# Check pod status
kubectl get pods -l app=riotx
# View logs
kubectl logs -f deployment/riotx-snowflake-import
# Check metrics (if enabled)
kubectl port-forward deployment/riotx-snowflake-import 8080:8080
curl http://localhost:8080/metrics
Production Considerations
For production deployments, consider:
- Resource Limits: Adjust CPU and memory based on your workload
- Persistent Storage: Mount volumes for key files if using key-based auth
- Monitoring: Enable metrics and configure alerting
- Security: Use proper RBAC and network policies
- High Availability: The deployment ensures pods are restarted if they fail
- Auto-scaling: Configure HPA based on metrics if needed
- Replica Management: Keep replicas=1 for CDC consistency to avoid duplicate processing
The snowflake-import-deployment.yaml includes:
- Deployment: Ensures reliable pod management and restart policies
- Service: Exposes metrics endpoint for monitoring
- ServiceMonitor: Prometheus integration for metrics collection
- Secrets: Secure credential management
- ConfigMap: Non-sensitive configuration management
- Health Checks: Liveness and readiness probes
- Init Containers: Dependency checks before startup