Snowflake Import

The snowflake-import command uses a Snowflake STREAM object to track changes (CDC) to a table and write them to a Redis data structure such as a hash or JSON document. RIOT-X creates and manages the stream. The user credentials you provide must be able to create a stream in the database and schema specified by the fully qualified object name.

RIOT-X optimizes stream polling by using Snowflake’s SYSTEM$STREAM_HAS_DATA function to check if the stream contains new data before creating expensive temporary tables. This reduces unnecessary processing and improves performance when no changes are available.
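
The check-before-read pattern described above can be sketched in shell. This is an illustration only, not RIOT-X's implementation: the helper names has_data_query and poll_action are hypothetical, and the stream name assumes the <table>_changestream convention combined with a --cdc-schema of raw_pos_cdc. SYSTEM$STREAM_HAS_DATA itself is a standard Snowflake function that takes the stream name as a string.

```shell
#!/usr/bin/env bash
# Illustrative sketch only: RIOT-X runs this check internally over JDBC.

# Print the Snowflake query that reports whether a stream holds unconsumed changes.
# (has_data_query is a hypothetical helper name.)
has_data_query() {
  local stream="$1"
  printf "SELECT SYSTEM\$STREAM_HAS_DATA('%s');\n" "$stream"
}

# Map the TRUE/FALSE result of that query to what one poll cycle would do.
poll_action() {
  case "$1" in
    TRUE|true) echo "read-stream" ;;  # changes pending: pay for the expensive read
    *)         echo "skip" ;;         # nothing new: wait for the next poll interval
  esac
}

has_data_query "tb_101.raw_pos_cdc.incremental_order_header_changestream"
poll_action TRUE
poll_action FALSE
```

Running the printed query by hand in Snowsight is a quick way to see whether a quiet import is idle because the stream is empty.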

Side effects and limitations
  • The stream SAMPLE_DATABASE.SAMPLE_SCHEMA.DATA_TABLE_changestream will be created or replaced. For security, you can create it in a different schema than the table you are importing from by specifying --cdc-schema.

  • riotx:offset:SAMPLE_DATABASE.SAMPLE_SCHEMA.DATA_TABLE_changestream: this key is stored in the destination Redis database and tracks the stream offset. If RIOT-X fails while copying data from the stream, it resumes from this offset when restarted. Removing the offset key from Redis causes RIOT-X to recreate the stream at time "NOW". With --snapshot INITIAL (the default), the stream includes the initial table data plus changes going forward. If you do not want the initial table data, specify --snapshot NEVER.

  • snowflake-import currently works on tables and materialized views
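
The naming of the stream and its offset key in the list above can be sketched as follows. The helpers stream_name and offset_key are hypothetical and only reproduce the names shown here. Whether the schema part of the key follows the table's schema or the --cdc-schema when one is set is not stated, so treat that as an assumption and confirm the real key in your database.

```shell
#!/usr/bin/env bash
# Hypothetical helpers that reproduce the naming shown in the list above.

# Stream name: <database>.<schema>.<table>_changestream
stream_name() {
  printf '%s.%s.%s_changestream\n' "$1" "$2" "$3"
}

# Offset key: riotx:offset:<stream name>
offset_key() {
  printf 'riotx:offset:%s\n' "$(stream_name "$1" "$2" "$3")"
}

key="$(offset_key SAMPLE_DATABASE SAMPLE_SCHEMA DATA_TABLE)"
echo "$key"

# Against a live target database you could then inspect or reset the offset:
#   redis-cli --scan --pattern 'riotx:offset:*'   # list the offset keys that exist
#   redis-cli EXISTS "$key"                       # 1 if an offset is stored
#   redis-cli DEL "$key"                          # RIOT-X recreates the stream on next start
```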

The basic usage is:

riotx snowflake-import [TABLE] [OPTIONS] [REDIS COMMAND...]

The recommended minimal permissions for a Snowflake role and user to run this command are:

CREATE OR REPLACE ROLE riotx_cdc
COMMENT = 'minimum cdc role for riotx';

-- replace compute_wh with the name of the warehouse you want to use
GRANT USAGE, OPERATE ON WAREHOUSE compute_wh TO ROLE riotx_cdc;

-- replace tb_101.raw_pos_cdc with the name of a database and schema for RIOT-X to create the stream in
CREATE OR REPLACE SCHEMA tb_101.raw_pos_cdc;
GRANT USAGE ON SCHEMA tb_101.raw_pos_cdc TO ROLE riotx_cdc;

-- replace tb_101 with the name of the database RIOT-X needs to read out of
GRANT USAGE ON DATABASE tb_101 TO ROLE riotx_cdc;

-- replace tb_101.raw_pos with the name of the schema RIOT-X needs to read out of
GRANT USAGE ON SCHEMA tb_101.raw_pos TO ROLE riotx_cdc;

-- replace with the name of the table(s) you want to read from
GRANT SELECT ON TABLE tb_101.raw_pos.incremental_order_header TO ROLE riotx_cdc;
GRANT REFERENCE_USAGE ON TABLE tb_101.raw_pos.incremental_order_header TO ROLE riotx_cdc;
ALTER TABLE tb_101.raw_pos.INCREMENTAL_ORDER_HEADER SET CHANGE_TRACKING = TRUE;

GRANT SELECT ON FUTURE TABLES IN SCHEMA tb_101.raw_pos_cdc TO ROLE riotx_cdc;
GRANT CREATE TABLE ON SCHEMA tb_101.raw_pos_cdc TO ROLE riotx_cdc;
GRANT CREATE STREAM ON SCHEMA tb_101.raw_pos_cdc TO ROLE riotx_cdc;
GRANT SELECT ON FUTURE STREAMS IN SCHEMA tb_101.raw_pos_cdc TO ROLE riotx_cdc;

CREATE OR REPLACE USER riotx_cdc
    DEFAULT_ROLE = 'riotx_cdc'
    DEFAULT_WAREHOUSE = 'compute_wh'
    PASSWORD = '{{PASSWORD}}';

GRANT ROLE riotx_cdc TO USER riotx_cdc;

For the full usage, run:

riotx snowflake-import --help

Example: CDC to Hashes

This command uses the example database, schema, and table names from the minimal role setup above.

riotx snowflake-import \
      tb_101.raw_pos.incremental_order_header \
      --role riotx_cdc \
      --warehouse compute_wh \
      --cdc-schema raw_pos_cdc \
      --jdbc-url "jdbc:snowflake://abcdefg.abc12345.snowflakecomputing.com" \
      --jdbc-user databaseuser \
      --jdbc-pass databasepassword \
      --poll 10s \ (1)
      hset orderheader:#{order_id} (2)
1 Sleep 10 seconds after each CDC import
2 Column name to use as id

The command above imports CDC data from the Snowflake table tb_101.raw_pos.incremental_order_header into Redis hashes in the keyspace orderheader.

You can specify multiple tables to import. If you need to reference table information in the operation, you can use the #table variable, which exposes name, database, and schema fields.

Import Multiple Tables
riotx snowflake-import \
      db1.public.table1 db1.public.table2 \ (1)
      ...
      hset '#{#table.name}:#{id}' (2)
1 Specify all tables to import
2 Use table name as key prefix

This command would generate keys table1:abc, table2:def, etc.
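
To make the resulting key layout concrete, here is a small shell sketch of what the key expression evaluates to for a given table and row. It is not how RIOT-X evaluates expressions. The helper key_for is hypothetical, and it assumes that #table.name is the unqualified table name, which matches the keys listed above.

```shell
#!/usr/bin/env bash
# Sketch of the key expression: "<table name>:<id column value>".

# Hypothetical helper: derive the key from a fully qualified table and an id value.
key_for() {
  local fqn="$1" id="$2"
  local name="${fqn##*.}"   # text after the last dot: db1.public.table1 -> table1
  printf '%s:%s\n' "$name" "$id"
}

key_for db1.public.table1 abc   # prints table1:abc
key_for db1.public.table2 def   # prints table2:def
```

Prefixing keys with the table name keeps rows from different tables apart even when their id values collide.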

If you only need a one-time import of data from Snowflake, you can use the db-import command. It reads all of the rows returned by your SQL query and writes them to Redis. For more information, see the db-import command.

Example: One-time Import
riotx db-import \
      "SELECT * FROM SAMPLE_DATABASE.SAMPLE_SCHEMA.DATA_TABLE" \
      --jdbc-url "jdbc:snowflake://abcdefg.abc12345.snowflakecomputing.com" \
      --jdbc-driver net.snowflake.client.jdbc.SnowflakeDriver \
      --jdbc-user databaseuser \
      --jdbc-pass databasepassword \
      hset datatable:#{data_id} (1)
1 Column name to use as id

This command performs a one-time import from Snowflake using the db-import command.

RDI Integration

This recipe contains step-by-step instructions for using RIOT-X as an external collector for Redis Data Integration (RDI).

Here is the end-to-end data flow:

Snowflake CDC Architecture

Snowflake

In your Snowflake UI (Snowsight), import this notebook: snowflake-cdc.ipynb. Run the first 2 steps in the notebook:

  • init: Set up roles, permissions and schema

  • populate: Create and populate table

Do not run the other steps yet. They are used later.

RDI

In Redis Insight use these pipeline config and job definitions:

Pipeline Config
sources:
  riotx:
    type: external
    # Redis Insight requires a connection element but it's not actually used by RDI
    connection: {}

targets:
  target:
    connection:
      type: redis
      host: # Target database hostname
      port: # Target database port
processors:

Job Config
source:
  table: incremental_order_header

output:
  - uses: redis.write
    with:
      connection: target
      data_type: hash
      key:
        expression: concat(['order:', ORDER_ID])
        language: jmespath

RIOT-X

Run this command to start RIOT-X:

riotx snowflake-import \
      -h <target_host> -p <target_port> \
      tb_101.raw_pos.incremental_order_header \
      --role riotx_cdc \
      --warehouse compute_wh \
      --cdc-schema raw_pos_cdc \
      --jdbc-url "jdbc:snowflake://<account>.snowflakecomputing.com" \
      --jdbc-user "$JDBC_USER" \
      --jdbc-pass "$JDBC_PASS"

RIOT-X will perform an initial load of the incremental_order_header table, which should result in 1,000 hashes in the target Redis database.

Snowflake Additional Data

Go back to your Snowflake UI and run the last step in the notebook. This inserts 100 rows into the incremental_order_header table, which RIOT-X picks up and writes to the target database. You should now have 1,100 hashes in the target Redis database.
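
One way to check the counts mentioned above is to count the keys that match the prefix from the job's key expression (order:). The sketch below is an assumption-laden helper, not part of RIOT-X or RDI: count_keys is a hypothetical name, and REDIS_CLI is an optional override introduced here for illustration. It counts by pattern instead of using DBSIZE because the target database may also hold other keys, such as the riotx:offset key.

```shell
#!/usr/bin/env bash
# Count keys matching a pattern with SCAN (gentler than KEYS on a busy database).
# count_keys is a hypothetical helper; host, port, and pattern are placeholders.
count_keys() {
  local host="$1" port="$2" pattern="$3"
  "${REDIS_CLI:-redis-cli}" -h "$host" -p "$port" --scan --pattern "$pattern" \
    | wc -l | tr -d ' '
}

# Usage against the target database (not executed here):
#   count_keys <target_host> <target_port> 'order:*'
```

Run it once after the initial load and again after the extra rows are inserted to compare the two counts.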

Service User

If your Snowflake account does not let you use JDBC password authentication, you will have to use key-based authentication.

Key Pair

If you don’t already have a private/public key pair, generate one by following the steps at docs.snowflake.com/en/user-guide/key-pair-auth.

Use the following command to extract the public key body for the last step of that guide, "Assign the public key to a Snowflake user":

grep -v "BEGIN\|END" rsa_key.pub | tr -d '\n'
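
The sketch below walks through that extraction on a throwaway file so the effect is visible without touching a real key. The key body is fake and shortened, and riotx_cdc is simply the example user from the role setup on this page. It filters on the leading dashes of the PEM header and footer, a slightly stricter variant of the command above, since a base64 line could in principle contain the letters "END".

```shell
#!/usr/bin/env bash
# Demonstrate the extraction on a temporary file with a fake, shortened key body.
tmp="$(mktemp)"
cat > "$tmp" <<'EOF'
-----BEGIN PUBLIC KEY-----
MIIBIjANBgkqhkiG9w0BAQEFAAOC
AQ8AMIIBCgKCAQEAexampleonly
-----END PUBLIC KEY-----
EOF

# Drop the PEM header/footer lines and join the remaining base64 lines.
pubkey="$(grep -v '^-----' "$tmp" | tr -d '\n')"
rm -f "$tmp"

echo "$pubkey"
# Statement to run in Snowflake with the extracted value:
printf "ALTER USER riotx_cdc SET RSA_PUBLIC_KEY='%s';\n" "$pubkey"
```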

JDBC URL

When using key authentication you need to modify the JDBC URL to:

jdbc:snowflake://<account>.snowflakecomputing.com?private_key_file=<path_to_key>/rsa_key.p8
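
As a small convenience, the URL can be assembled from its two variable parts. The helper snowflake_jdbc_url is hypothetical, the account identifier is the placeholder used in the earlier examples, and /secrets/rsa_key.p8 is an assumed path.

```shell
#!/usr/bin/env bash
# Hypothetical helper: build the key-pair JDBC URL from an account identifier
# and the path to the private key file.
snowflake_jdbc_url() {
  local account="$1" key_file="$2"
  printf 'jdbc:snowflake://%s.snowflakecomputing.com?private_key_file=%s\n' \
    "$account" "$key_file"
}

url="$(snowflake_jdbc_url abcdefg.abc12345 /secrets/rsa_key.p8)"
echo "$url"
# Quote the URL when passing it on, since "?" is a shell glob character:
#   riotx snowflake-import ... --jdbc-url "$url"
```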

Kubernetes Deployment

RIOT-X can be deployed on Kubernetes for production workloads using the provided deployment configuration.

Prerequisites

Before deploying, ensure you have:

  • A Kubernetes cluster with access to create deployments and secrets

  • Redis instance accessible from the cluster

  • Snowflake credentials and proper role permissions (see above)

  • Docker image riotx/riotx:latest available in your cluster

Configuration

The Kubernetes deployment configuration uses environment variables following the RIOT_REDIS_* and RIOT_* naming conventions.

Download the deployment configuration:

curl -O https://redis.github.io/riotx/_/attachments/snowflake-import-deployment.yaml

Or use the local file: snowflake-import-deployment.yaml

Secrets Setup

Before applying the deployment, create the required secrets with your actual credentials:

# Create Redis credentials secret
kubectl create secret generic redis-credentials \
  --from-literal=username='redis_user' \
  --from-literal=password='redis_password'

# Create Snowflake credentials secret
kubectl create secret generic snowflake-credentials \
  --from-literal=jdbc-url='jdbc:snowflake://company.snowflakecomputing.com' \
  --from-literal=username='snowflake_user' \
  --from-literal=password='snowflake_password'

For key-based authentication, include the private key file path in the JDBC URL and leave the password empty. The path must be valid inside the container:

kubectl create secret generic snowflake-credentials \
  --from-literal=jdbc-url='jdbc:snowflake://company.snowflakecomputing.com?private_key_file=/path/to/key.p8' \
  --from-literal=username='snowflake_user' \
  --from-literal=password=''
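
Because the JDBC driver reads the key file from the pod's filesystem, the file has to reach the container somehow. One common approach, sketched here under assumptions, is to store it in its own secret and mount that secret as a volume. The secret name snowflake-private-key, the helper create_key_secret, and the KUBECTL override are all hypothetical. Whether the provided deployment YAML already mounts such a secret is not covered here, so the volume mount is something you would need to check or add yourself.

```shell
#!/usr/bin/env bash
# Create a secret holding the private key file, using kubectl's
# --from-file=<key>=<path> form so the file appears as rsa_key.p8 when mounted.
create_key_secret() {
  local secret_name="$1" key_path="$2"
  "${KUBECTL:-kubectl}" create secret generic "$secret_name" \
    --from-file="rsa_key.p8=$key_path"
}

# Usage (not executed here):
#   create_key_secret snowflake-private-key ./rsa_key.p8
```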

Customization

Edit the deployment YAML to customize for your environment:

# Update Redis connection
- name: RIOT_REDIS_URI
  value: "rediss://your-redis-host:12000"  (1)

# Update Snowflake table and settings
- name: RIOT_TABLE
  value: "YOUR_DB.YOUR_SCHEMA.YOUR_TABLE"  (2)
- name: RIOT_ROLE
  value: "your_snowflake_role"  (3)
- name: RIOT_WAREHOUSE
  value: "your_warehouse"  (4)
- name: RIOT_CDC_SCHEMA
  value: "your_cdc_schema"  (5)
- name: RIOT_STREAM_LIMIT
  value: "20000"  (6)
1 Your Redis server URI (TLS enabled)
2 Fully qualified Snowflake table name
3 Snowflake role with CDC permissions
4 Snowflake warehouse to use
5 CDC schema for stream creation
6 Stream processing limit
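
The variable names above line up with the command-line options used earlier on this page (--role and RIOT_ROLE, --warehouse and RIOT_WAREHOUSE, --cdc-schema and RIOT_CDC_SCHEMA). That pattern can be sketched as a small helper. The name flag_to_env is hypothetical, and applying the rule to options not listed above is an assumption to verify against the deployment YAML or the output of riotx snowflake-import --help.

```shell
#!/usr/bin/env bash
# Derive the RIOT_* variable name from a long option name.
flag_to_env() {
  local flag="${1#--}"   # strip the leading "--"
  printf 'RIOT_%s\n' "$flag" | tr '[:lower:]' '[:upper:]' | tr '-' '_'
}

flag_to_env --role         # prints RIOT_ROLE
flag_to_env --warehouse    # prints RIOT_WAREHOUSE
flag_to_env --cdc-schema   # prints RIOT_CDC_SCHEMA
```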

Deployment

Deploy the application:

kubectl apply -f snowflake-import-deployment.yaml

Monitor the deployment:

# Check deployment status
kubectl get deployments -l app=riotx

# Check pod status
kubectl get pods -l app=riotx

# View logs
kubectl logs -f deployment/riotx-snowflake-import

# Check metrics (if enabled)
kubectl port-forward deployment/riotx-snowflake-import 8080:8080
curl http://localhost:8080/metrics

Production Considerations

For production deployments, consider:

  • Resource Limits: Adjust CPU and memory based on your workload

  • Persistent Storage: Mount volumes for key files if using key-based auth

  • Monitoring: Enable metrics and configure alerting

  • Security: Use proper RBAC and network policies

  • High Availability: The deployment ensures pods are restarted if they fail

  • Auto-scaling: Configure HPA based on metrics if needed

  • Replica Management: Keep replicas=1 for CDC consistency to avoid duplicate processing
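
The replica recommendation in the list above can be checked with a short guard, for example in a CI step or before enabling an autoscaler. This is a sketch under assumptions: check_single_replica and the KUBECTL override are hypothetical, and the deployment name is the one used in the monitoring commands earlier.

```shell
#!/usr/bin/env bash
# Fail if the deployment is configured with anything other than one replica.
check_single_replica() {
  local deployment="${1:-riotx-snowflake-import}"
  local replicas
  replicas="$("${KUBECTL:-kubectl}" get deployment "$deployment" \
    -o jsonpath='{.spec.replicas}')"
  if [ "$replicas" = "1" ]; then
    echo "ok: $deployment has 1 replica"
  else
    echo "warning: $deployment has $replicas replicas; CDC rows may be processed twice" >&2
    return 1
  fi
}

# Usage (not executed here):
#   check_single_replica riotx-snowflake-import
```

A horizontal autoscaler that raises the replica count would conflict with this guard, so scale CPU and memory instead if the single pod falls behind.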

Deployment Features

The snowflake-import-deployment.yaml includes:

  • Deployment: Ensures reliable pod management and restart policies

  • Service: Exposes metrics endpoint for monitoring

  • ServiceMonitor: Prometheus integration for metrics collection

  • Secrets: Secure credential management

  • ConfigMap: Non-sensitive configuration management

  • Health Checks: Liveness and readiness probes

  • Init Containers: Dependency checks before startup