Snowflake Import

The snowflake-import command uses a Snowflake STREAM object to track changes (CDC) to a table and read them into a Redis data structure such as hash or JSON. The Snowflake STREAM is created and managed by RIOT-X. The user credentials you provide must have the ability to create a stream in the database and schema specified by the fully qualified object name.

RIOT-X optimizes stream polling by using Snowflake’s SYSTEM$STREAM_HAS_DATA function to check if the stream contains new data before creating expensive temporary tables. This reduces unnecessary processing and improves performance when no changes are available.
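
For orientation, the kind of stream object and data-availability check involved looks roughly like this; an illustrative SQL sketch using the example names from the list below, not the exact statements RIOT-X issues internally:

-- Change stream that RIOT-X creates and manages for the table (illustrative)
CREATE STREAM IF NOT EXISTS SAMPLE_DATABASE.SAMPLE_SCHEMA.DATA_TABLE_changestream
  ON TABLE SAMPLE_DATABASE.SAMPLE_SCHEMA.DATA_TABLE;

-- Inexpensive check run before each poll: returns TRUE only if the stream has new change records
SELECT SYSTEM$STREAM_HAS_DATA('SAMPLE_DATABASE.SAMPLE_SCHEMA.DATA_TABLE_changestream');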

Side effects and limitations
  • Snowflake Stream: A stream named SAMPLE_DATABASE.SAMPLE_SCHEMA.DATA_TABLE_changestream will be created or replaced. The stream follows the naming pattern <DATABASE>.<SCHEMA>.<TABLE>_changestream. For security, this can be created in a different schema than the table you are importing from by specifying --cdc-schema. The stream is automatically managed by RIOT-X and will be recreated if deleted.

  • Offset Tracking: A key named riotx:offset:SAMPLE_DATABASE.SAMPLE_SCHEMA.DATA_TABLE_changestream will be stored in the destination Redis database to track the stream offset. If RIOT-X fails during data copying, it will resume from this offset when restarted. Removing this offset key from Redis will cause RIOT-X to recreate the stream at time "NOW" (see the redis-cli example after this list). With --snapshot INITIAL (default), the stream will include the initial table data plus changes going forward. If you do not want initial table data to be included, specify --snapshot NEVER.

  • Supported Objects: snowflake-import currently works on tables and materialized views.
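
As a quick illustration of the offset behavior described above (assuming the example key name and a redis-cli pointed at the destination Redis database):

# Inspect the offset key RIOT-X stores in the destination database
redis-cli TYPE riotx:offset:SAMPLE_DATABASE.SAMPLE_SCHEMA.DATA_TABLE_changestream

# Deleting it causes RIOT-X to recreate the stream at time "NOW" on the next run
redis-cli DEL riotx:offset:SAMPLE_DATABASE.SAMPLE_SCHEMA.DATA_TABLE_changestream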

The basic usage is:

riotx snowflake-import [TABLE] [OPTIONS] [REDIS COMMAND...]

The recommended minimum permissions for a Snowflake role and user to run this command are:

CREATE OR REPLACE ROLE riotx_cdc
COMMENT = 'minimum cdc role for riotx';

-- replace compute_wh with the name of the warehouse you want to use
GRANT USAGE, OPERATE ON WAREHOUSE compute_wh TO ROLE riotx_cdc;

-- replace tb_101.raw_pos_cdc with the name of a database and schema for RIOT-X to create the stream in
CREATE OR REPLACE SCHEMA tb_101.raw_pos_cdc;
GRANT USAGE ON SCHEMA tb_101.raw_pos_cdc TO ROLE riotx_cdc;

-- replace tb_101 with the name of the database RIOT-X needs to read out of
GRANT USAGE ON DATABASE tb_101 TO ROLE riotx_cdc;

-- replace tb_101.raw_pos with the name of the schema RIOT-X needs to read out of
GRANT USAGE ON SCHEMA tb_101.raw_pos TO ROLE riotx_cdc;

-- replace with the name of the table(s) you want to read from
GRANT SELECT ON TABLE tb_101.raw_pos.incremental_order_header TO ROLE riotx_cdc;
GRANT REFERENCE_USAGE ON TABLE tb_101.raw_pos.incremental_order_header TO ROLE riotx_cdc;
ALTER TABLE tb_101.raw_pos.INCREMENTAL_ORDER_HEADER SET CHANGE_TRACKING = TRUE;

GRANT SELECT ON FUTURE TABLES IN SCHEMA tb_101.raw_pos_cdc TO ROLE riotx_cdc;
GRANT CREATE TABLE ON SCHEMA tb_101.raw_pos_cdc TO ROLE riotx_cdc;
GRANT CREATE STREAM ON SCHEMA tb_101.raw_pos_cdc TO ROLE riotx_cdc;
GRANT SELECT ON FUTURE STREAMS IN SCHEMA tb_101.raw_pos_cdc TO ROLE riotx_cdc;

CREATE OR REPLACE USER riotx_cdc
    DEFAULT_ROLE = 'riotx_cdc'
    DEFAULT_WAREHOUSE = 'compute_wh'
    PASSWORD = '{{PASSWORD}}';

GRANT ROLE riotx_cdc TO USER riotx_cdc;

For the full usage, run:

riotx snowflake-import --help

Example: CDC to Hashes

This command uses the example database, schema, and table names from the minimal role setup above.

riotx snowflake-import \
      tb_101.raw_pos.incremental_order_header \
      --role riotx_cdc \
      --warehouse compute_wh \
      --cdc-schema raw_pos_cdc \
      --jdbc-url "jdbc:snowflake://abcdefg.abc12345.snowflakecomputing.com" \
      --jdbc-user databaseuser \
      --jdbc-pass databasepassword \
      --poll 10s \ (1)
      hset orderheader:#{order_id} (2)
1 Sleep 10 seconds after each CDC import
2 Column name to use as id

The command above imports CDC data from the Snowflake table tb_101.raw_pos.incremental_order_header into Redis hashes in the keyspace orderheader.
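
To spot-check the result, you can scan the keyspace and inspect one hash (a sketch; the order_id value shown is hypothetical):

# List a few keys in the orderheader keyspace
redis-cli --scan --pattern 'orderheader:*' | head -5

# Inspect one imported hash (replace 12345 with an actual order_id)
redis-cli HGETALL orderheader:12345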

You can specify multiple tables to import. If you need to reference table information in the operation, you can use the #table variable, which exposes name, database, and schema fields.
Import Multiple Tables
riotx snowflake-import \
      db1.public.table1 db1.public.table2 \ (1)
      ...
      hset #{#table.name}:#{id} (2)
1 Specify all tables to import
2 Use table name as key prefix

This command would generate keys table1:abc, table2:def, etc.

If you only need a one-time import of data from Snowflake, you can use the db-import command. This command reads all rows returned by your SQL query and writes them to Redis. For more information, see the db-import command.

Example: One-time Import
riotx db-import \
      "SELECT * FROM SAMPLE_DATABASE.SAMPLE_SCHEMA.DATA_TABLE" \
      --jdbc-url "jdbc:snowflake://abcdefg.abc12345.snowflakecomputing.com" \
      --jdbc-driver net.snowflake.client.jdbc.SnowflakeDriver \
      --jdbc-user databaseuser \
      --jdbc-pass databasepassword \
      hset datatable:#{data_id} (1)
1 Column name to use as id

This command performs a one-time import from Snowflake using the db-import command.

RDI Integration

This recipe contains step-by-step instructions for using RIOT-X as an external collector for Redis Data Integration (RDI).

Here is the end-to-end data flow:

Snowflake CDC Architecture

Snowflake

In your Snowflake UI (Snowsight), import this notebook: snowflake-cdc.ipynb. Run the first two steps in the notebook:

init: Set up roles, permissions and schema

populate: Create and populate table

Do not run the other steps yet. These are for later.

RDI

In Redis Insight, use these pipeline config and job definitions:

Pipeline Config
sources:
  riotx:
    type: external
    # Redis Insight requires a connection element but it's not actually used by RDI
    connection: {}

targets:
  target:
    connection:
      type: redis
      host: # Target database hostname
      port: # Target database port
processors:

Job Config
source:
  table: incremental_order_header

output:
  - uses: redis.write
    with:
      connection: target
      data_type: hash
      key:
        expression: concat(['order:', ORDER_ID])
        language: jmespath

RIOT-X

Run this command to start RIOT-X:

riotx snowflake-import \
      -h <target_host> \
      -p <target_port> \
      tb_101.raw_pos.incremental_order_header \
      --role riotx_cdc \
      --warehouse compute_wh \
      --cdc-schema raw_pos_cdc \
      --jdbc-url "jdbc:snowflake://<account>.snowflakecomputing.com" \
      --jdbc-user $JDBC_USER \
      --jdbc-pass $JDBC_PASS

RIOT-X will perform an initial load of the incremental_order_header table, which should translate to 1,000 hashes in the target Redis database.
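
You can verify the load against the target database, for example (assuming the same target host and port passed to RIOT-X):

# Should report roughly 1,000 keys after the initial load
redis-cli -h <target_host> -p <target_port> DBSIZE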

Snowflake Additional Data

Go back to your Snowflake UI and run the last step in the notebook. This will insert 100 rows into the incremental_order_header table, which will be picked up by RIOT-X and written to the target database. You should now have 1,100 hashes in the target Redis database.

Service User

If your Snowflake account does not allow JDBC password authentication, you will have to use key-based authentication.

Key Pair

If you don’t already have a private/public key pair, you need to generate one. Follow the steps at docs.snowflake.com/en/user-guide/key-pair-auth.
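
For reference, the key-generation commands in that guide for an unencrypted key pair are along these lines (check the Snowflake documentation for the currently recommended options):

# Generate a PKCS#8 private key (rsa_key.p8) and the matching public key (rsa_key.pub)
openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8 -nocrypt
openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub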

Use the following command to extract the actual public key for the last step in that guide, Assign the public key to a Snowflake user:

grep -v "BEGIN\|END" rsa_key.pub | tr -d '\n'
JDBC URL

When using key authentication, you need to modify the JDBC URL to:

jdbc:snowflake://<account>.snowflakecomputing.com?private_key_file=<path_to_key>/rsa_key.p8

Performance Tuning

The snowflake-import command can be tuned for higher throughput using several options.

Key Options

Option     Description                                        Default
--fetch    Number of rows to fetch from Snowflake per query   Driver default
--batch    Number of items per Redis write batch              50
--threads  Number of concurrent writer threads                1
--pool     Max number of Redis connections                    8

Benchmark Results

The following benchmarks were performed importing data from a Snowflake table to a local Redis instance. Results may vary based on network latency, Snowflake warehouse size, and Redis configuration.

Table 1. 100K Rows Initial Load

Configuration                                   Peak Throughput  Total Time  Improvement
Defaults (--batch 50)                           12,496/s         9s          baseline
--fetch 1000 --batch 500                        14,238/s         8s          +14%
--fetch 5000 --batch 1000 --threads 4           16,112/s         7s          +29%
--fetch 5000 --batch 500 --threads 4 --pool 16  16,558/s         7s          +33%

Table 2. 1M Rows Initial Load (Optimized Settings)

Redis Command  Peak Throughput  Total Time
hset (hashes)  57,025/s         18s
xadd (stream)  49,979/s         21s

For high-throughput imports, use:

riotx snowflake-import \
      tb_101.raw_pos.incremental_order_header \
      --jdbc-url "jdbc:snowflake://account.snowflakecomputing.com" \
      --jdbc-user user \
      --warehouse COMPUTE_WH \
      --cdc-schema raw_pos_cdc \
      --fetch 5000 \
      --batch 500 \
      --threads 4 \
      --pool 16 \
      hset orderheader:#{order_id}

Tuning Guidelines

  • --fetch: Larger values reduce Snowflake round-trips but increase memory usage. Values between 1000 and 5000 work well.

  • --batch: Larger batches improve Redis throughput, but very large values (2000+) yield diminishing returns.

  • --threads: 4 threads typically provide good parallelism. More threads may not help if the Snowflake query is the bottleneck.

  • --pool: Should be at least equal to --threads to avoid connection contention.

For initial load benchmarks, use --offset-clear to reset the stream offset and get accurate measurements.

RDI End-to-End Benchmarks

When using RIOTX as an RDI collector, the end-to-end throughput includes both RIOTX ingestion and RDI processor stages.

Data Flow
Snowflake Table → RIOTX Collector → Redis Stream → RDI Processor → Target Redis (Hashes)

Test Environment

  • Platform: Docker Desktop Kubernetes (macOS, Apple Silicon)

  • Redis Enterprise: 7.22.0-17

  • Snowflake: Standard account (TPC-H sample data)

  • RIOTX Collector: 1600Mi memory limit

Pipeline Configuration

The following config.yaml was used for the benchmarks:

config.yaml
sources:
  snowflake:
    type: riotx
    connection:
      type: snowflake
      url: "jdbc:snowflake://<account>.snowflakecomputing.com/"
      username: <username>
      privateKey:
        content: "<base64-encoded-private-key>"
      database: RDI_TEST
      schema: PUBLIC
      warehouse: COMPUTE_WH
    tables:
      RDI_TEST.PUBLIC.BENCHMARK_ORDERS_300K: {}
    advanced:
      riotx:
        poll: "10s"          (1)
        snapshot: INITIAL    (2)
        streamPrefix: "data:" (3)
        streamLimit: 200000  (4)
        clearOffset: true    (5)

targets:
  target:
    connection:
      type: redis
      host: target-db.redis.svc.cluster.local
      port: 12000
      password: <password>

processors:
  target_data_type: hash     (6)
1 Poll interval for CDC changes after initial snapshot
2 INITIAL includes existing table data; NEVER starts from current point
3 Prefix for Redis stream keys (e.g., data:riotx.PUBLIC.BENCHMARK_ORDERS_300K)
4 Maximum stream backlog before backpressure throttles ingestion
5 Clear offset on startup to re-process from beginning (for benchmarking)
6 Default data type when no job file matches
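
A quick way to confirm the collector is writing to the expected stream keys (assuming the streamPrefix above and the RDI database connection details) is to scan for them:

redis-cli -p <rdi-port> -a <password> --scan --pattern 'data:*'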

Snowflake Stream Setup

Create a Snowflake stream with SHOW_INITIAL_ROWS to include existing data:

-- Create benchmark table from TPC-H sample data
CREATE OR REPLACE TABLE RDI_TEST.PUBLIC.BENCHMARK_ORDERS_300K AS
SELECT * FROM SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.ORDERS
LIMIT 300000;

-- Create stream for CDC (includes initial rows)
CREATE OR REPLACE STREAM RDI_TEST.PUBLIC.BENCHMARK_ORDERS_300K_STREAM
ON TABLE RDI_TEST.PUBLIC.BENCHMARK_ORDERS_300K
SHOW_INITIAL_ROWS = TRUE;
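
As an optional sanity check before starting the pipeline, you can confirm the stream sees the initial rows; selecting from a stream outside a committed DML statement does not consume it:

SELECT SYSTEM$STREAM_HAS_DATA('RDI_TEST.PUBLIC.BENCHMARK_ORDERS_300K_STREAM');
SELECT COUNT(*) FROM RDI_TEST.PUBLIC.BENCHMARK_ORDERS_300K_STREAM;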

Baseline Results (No Transformations)

With a passthrough pipeline (no job file, using target_data_type: hash):

Table 3. End-to-End Results

Dataset            Records  Duration  Throughput  Notes
TPC-H Customers    150,000  19.5s     7,681/s     200 bytes avg per record
TPC-H Orders       300,000  25s       12,000/s    150 bytes avg per record
TPC-H Orders (1M)  742,000  38s       ~19,500/s   Memory limited at 500MB

Results With Transformations

Adding RDI job transformations impacts throughput. The following test used 9 rename_field operations and a JMESPath key expression:

Job Configuration
source:
  server_name: riotx
  schema: PUBLIC
  table: BENCHMARK_ORDERS_300K
  row_format: full
transform:
  - uses: rename_field
    with:
      from_field: after.O_ORDERKEY
      to_field: after.orderkey
  - uses: rename_field
    with:
      from_field: after.O_CUSTKEY
      to_field: after.custkey
  - uses: rename_field
    with:
      from_field: after.O_ORDERSTATUS
      to_field: after.orderstatus
  - uses: rename_field
    with:
      from_field: after.O_TOTALPRICE
      to_field: after.totalprice
  - uses: rename_field
    with:
      from_field: after.O_ORDERDATE
      to_field: after.orderdate
  - uses: rename_field
    with:
      from_field: after.O_ORDERPRIORITY
      to_field: after.orderpriority
  - uses: rename_field
    with:
      from_field: after.O_CLERK
      to_field: after.clerk
  - uses: rename_field
    with:
      from_field: after.O_SHIPPRIORITY
      to_field: after.shippriority
  - uses: rename_field
    with:
      from_field: after.O_COMMENT
      to_field: after.comment
output:
  - uses: redis.write
    with:
      connection: target
      key:
        expression: "concat(['order:', to_string(after.orderkey)])"
        language: jmespath
      data_type: hash

Table 4. Transformation Impact

Configuration                                Records  Duration  Throughput
No transformations                           300,000  25s       12,000/s
With transformations (9 renames + JMESPath)  300,000  35s       8,571/s

Adding transformations reduced throughput by ~29%. More complex transformations (filters, aggregations) may have additional impact.

Component Analysis

Analysis of the 1M records test shows RIOTX is not the bottleneck:

Table 5. Component Performance

Component        Rate       Notes
RIOTX Collector  30-40K/s   Peak rate before backpressure
RDI Processor    18-20K/s   Bottleneck - limited end-to-end throughput
End-to-End       ~19,500/s  Limited by processor consumption

RIOTX can achieve 50K+ records/s when writing directly to Redis (see Performance Tuning benchmarks above). In the RDI pipeline, backpressure from the streamLimit setting throttles RIOTX when the processor cannot keep up.
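
To watch the backlog that triggers this backpressure, you can poll the stream length (assuming the stream key used in the monitoring commands below):

# A backlog approaching streamLimit (200000) means the processor is falling behind
redis-cli -p <rdi-port> -a <password> XLEN "data:riotx.PUBLIC.BENCHMARK_ORDERS_300K"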

RDI Operator Helm Configuration

The following Helm values enable the RIOTX collector in the RDI operator:

rdi-values.yaml
dataPlane:
  riotxCollector:
    enabled: true
    global:
      image:
        registry: <your-registry>
        repository: redis
        tag: <version>
    riotx:
      resources:
        limits:
          memory: 1600Mi
        requests:
          memory: 512Mi

Monitoring Commands

Monitor benchmark progress with these commands:

# Check stream statistics (entries added, backlog)
redis-cli -p <rdi-port> -a <password> \
  XINFO STREAM "data:riotx.PUBLIC.BENCHMARK_ORDERS_300K"

# Check processor statistics
redis-cli -p <rdi-port> -a <password> \
  HGETALL "statistics:{data:riotx.PUBLIC.BENCHMARK_ORDERS_300K}"

# Check target key count
redis-cli -p <target-port> -a <password> DBSIZE

# Check target memory usage
redis-cli -p <target-port> -a <password> INFO memory

Key Findings

  1. RIOTX scales well: RIOTX achieved 30-40K records/s ingestion rate, limited only by downstream backpressure.

  2. Processor is the bottleneck: RDI processor consumed at 18-20K records/s, which determined end-to-end throughput.

  3. Transformations add overhead: Each transformation operation adds processing time. 9 field renames + JMESPath expression reduced throughput by ~29%.

  4. Backpressure works correctly: The streamLimit setting (200K) prevented unbounded stream growth by throttling RIOTX when backlog grew.

  5. Memory planning: Each record consumes ~700 bytes in Redis hashes. Plan target Redis memory accordingly (e.g., 500MB for ~700K records).

Kubernetes Deployment

RIOT-X can be deployed on Kubernetes for production workloads using the provided deployment configuration.

Prerequisites

Before deploying, ensure you have:

  • A Kubernetes cluster with access to create deployments and secrets

  • Redis instance accessible from the cluster

  • Snowflake credentials and proper role permissions (see above)

  • Docker image riotx/riotx:latest available in your cluster

Configuration

The Kubernetes deployment configuration uses environment variables following the RIOT_REDIS_* and RIOT_* naming conventions.

Download the deployment configuration:

curl -O https://redis.github.io/riotx/_/attachments/snowflake-import-deployment.yaml

Or use the local file: snowflake-import-deployment.yaml

Secrets Setup

Before deploying, create the required secrets with your actual credentials:

# Create Redis credentials secret
kubectl create secret generic redis-credentials \
  --from-literal=username='redis_user' \
  --from-literal=password='redis_password'

# Create Snowflake credentials secret
kubectl create secret generic snowflake-credentials \
  --from-literal=jdbc-url='jdbc:snowflake://company.snowflakecomputing.com' \
  --from-literal=username='snowflake_user' \
  --from-literal=password='snowflake_password'

For key-based authentication, include the private key file path in the JDBC URL:

kubectl create secret generic snowflake-credentials \
  --from-literal=jdbc-url='jdbc:snowflake://company.snowflakecomputing.com?private_key_file=/path/to/key.p8' \
  --from-literal=username='snowflake_user' \
  --from-literal=password=''
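
One way to make the key file itself available to the pod is to store it in a secret and mount it as a volume; a sketch where the secret name and file name are placeholders that must match the private_key_file path in the JDBC URL and a corresponding volume mount in the deployment:

# Store the private key in a secret, then mount it in the deployment at the path used in the JDBC URL
kubectl create secret generic snowflake-private-key \
  --from-file=key.p8=./rsa_key.p8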

Customization

Edit the deployment YAML to customize for your environment:

# Update Redis connection
- name: RIOT_REDIS_URI
  value: "rediss://your-redis-host:12000"  (1)

# Update Snowflake table and settings
- name: RIOT_TABLE
  value: "YOUR_DB.YOUR_SCHEMA.YOUR_TABLE"  (2)
- name: RIOT_ROLE
  value: "your_snowflake_role"  (3)
- name: RIOT_WAREHOUSE
  value: "your_warehouse"  (4)
- name: RIOT_CDC_SCHEMA
  value: "your_cdc_schema"  (5)
- name: RIOT_STREAM_LIMIT
  value: "20000"  (6)
1 Your Redis server URI (TLS enabled)
2 Fully qualified Snowflake table name
3 Snowflake role with CDC permissions
4 Snowflake warehouse to use
5 CDC schema for stream creation
6 Stream processing limit

Deployment

Deploy the application:

kubectl apply -f snowflake-import-deployment.yaml

Monitor the deployment:

# Check deployment status
kubectl get deployments -l app=riotx

# Check pod status
kubectl get pods -l app=riotx

# View logs
kubectl logs -f deployment/riotx-snowflake-import

# Check metrics (if enabled)
kubectl port-forward deployment/riotx-snowflake-import 8080:8080
curl http://localhost:8080/metrics

Production Considerations

For production deployments, consider:

  • Resource Limits: Adjust CPU and memory based on your workload

  • Persistent Storage: Mount volumes for key files if using key-based auth

  • Monitoring: Enable metrics and configure alerting

  • Security: Use proper RBAC and network policies

  • High Availability: The deployment ensures pods are restarted if they fail

  • Auto-scaling: Configure HPA based on metrics if needed

  • Replica Management: Keep replicas=1 for CDC consistency to avoid duplicate processing

Deployment Features

The snowflake-import-deployment.yaml includes:

  • Deployment: Ensures reliable pod management and restart policies

  • Service: Exposes metrics endpoint for monitoring

  • ServiceMonitor: Prometheus integration for metrics collection

  • Secrets: Secure credential management

  • ConfigMap: Non-sensitive configuration management

  • Health Checks: Liveness and readiness probes

  • Init Containers: Dependency checks before startup