Snorkeling with Snowflake
I have been using Snowflake at Toplyne for a long time, and I have been looking for ways to pair it with Ray to unlock the next level of compute power.
The Snowflake Python connector offers a basic API that lets us pull data from Snowflake in a batched manner. To date, I have been using Python multi-threading to patch together basic Snowflake workflows. A basic workflow boots up multiple threads and maps individual batches of Snowflake data onto different threads. This gives us an apparent increase in throughput.
Multithreading is built into Python and works well for a lot of general-purpose tooling. It so happens that we can marry Snowflake’s APIs with multithreading and patch together a workflow (see the sketch below), but this approach offers limited horizontal scalability.
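Here is a rough sketch of that multi-threaded pattern, assuming a reasonably recent snowflake-connector-python (one that exposes Cursor.get_result_batches). The credentials, the table, and the per-batch work are placeholders:

```python
from concurrent.futures import ThreadPoolExecutor

import snowflake.connector

# Placeholder connection parameters and query.
conn = snowflake.connector.connect(
    user="...", password="...", account="...",
    warehouse="...", database="...", schema="...",
)
cur = conn.cursor()
cur.execute("SELECT * FROM events")

# Lightweight handles to the result batches; no rows are fetched yet.
batches = cur.get_result_batches()

def process(batch):
    # Pull one batch's rows into pandas and do some stand-in work on it.
    df = batch.to_pandas()
    return len(df)

# Map each batch onto a thread from the pool.
with ThreadPoolExecutor(max_workers=8) as pool:
    row_counts = list(pool.map(process, batches))

print(sum(row_counts))
```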
Let's see what this workflow will look like in Ray:
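Something like this, sketched against the SnowflakeDatasource we are about to build (it is not a Ray built-in); the connection arguments and the query are placeholders:

```python
import ray

# Read straight from Snowflake into a Ray Dataset. The read is planned
# lazily and the batches land on workers across the cluster.
ds = ray.data.read_datasource(
    SnowflakeDatasource(),
    connection_args={"user": "...", "password": "...", "account": "..."},
    query="SELECT * FROM events",
)

# Per-batch work becomes a map_batches call that Ray schedules for us,
# instead of a hand-rolled thread pool.
ds = ds.map_batches(lambda batch: batch)  # plug real work in here
print(ds.count())
```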
This is what is so exciting about Ray. Simple idioms and maximum compute.
But wait, what is SnowflakeDatasource?
Ray’s Ray Data library provides APIs to load data from different sources. It ships a bunch of general-purpose readers for well-defined data sources; however, there is currently no reader specifically for Snowflake.
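For instance, the built-in readers cover the usual file formats and stores (the paths below are placeholders):

```python
import ray

# Two of the stock Ray Data readers.
ds_parquet = ray.data.read_parquet("s3://my-bucket/events/")
ds_csv = ray.data.read_csv("/data/events.csv")
```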
Fortunately, implementing a data source for Snowflake is pretty straightforward.
How do we go about it though? We should collect some data points first:
- Ray has a guide that walks through the implementation of a custom Mongo datasource.
- Anyscale’s GitHub has a fork of Ray Data that includes an implementation of a SnowflakeDatasource as well.
- Additionally, Ray has documented the block API which is fundamental to Ray’s internal data representation.
The standout observation is that we can map individual Snowflake result batches directly onto Ray’s data blocks, as the short check below shows. With that in hand, we can start on our implementation.
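Concretely, a ResultBatch materializes as a PyArrow table, and an Arrow table is already a valid Ray Data block, so no conversion layer is needed (the batches variable here is the one from the connector sketch above):

```python
import pyarrow as pa

batch = batches[0]        # a Snowflake ResultBatch from get_result_batches()
table = batch.to_arrow()  # fetches this batch's rows as a pyarrow.Table
assert isinstance(table, pa.Table)
# A pyarrow.Table is exactly what Ray Data uses as a block under the hood.
```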
We need to implement two Ray classes, with three methods between them, to get the entire thing going:
- ray.data.datasource.datasource.Datasource
  a) create_reader
- ray.data.datasource.datasource.Reader
  a) estimate_inmemory_data_size
  b) get_read_tasks
Now that we know the which of the Ray Data API, let's dive deeper into the what and the why of each method:

get_read_tasks:
a) Create a Snowflake connection.
b) Execute the query.
c) Fetch the Snowflake ResultBatches.
d) Generate the read tasks.
Each read task fetches its batch of data from Snowflake. Since Ray’s APIs are lazy, the memory footprint of this step stays minimal.

estimate_inmemory_data_size:
Estimate the total size of the table as it will be once loaded into memory. For our use, I’ll infer it from the PyArrow table size.

create_reader:
Create an instance of a Reader that implements the above two methods.
Now that we know what to implement and why, let's get into the how. Most of the description lives in the comments of the code itself.
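What follows is a minimal sketch of that implementation, written against the Datasource/Reader API of Ray 2.x (exact signatures, such as BlockMetadata's fields, have shifted a little between Ray releases), not the exact code from my repo:

```python
from typing import Any, Dict, List, Optional

import snowflake.connector
from ray.data.block import BlockMetadata
from ray.data.datasource import Datasource, Reader, ReadTask


class _SnowflakeReader(Reader):
    """Plans the read: one Ray read task per Snowflake ResultBatch."""

    def __init__(self, connection_args: Dict[str, Any], query: str):
        # Run the query once, up front, and keep only the lightweight
        # ResultBatch handles -- no rows are fetched here.
        conn = snowflake.connector.connect(**connection_args)
        try:
            cur = conn.cursor()
            cur.execute(query)
            self._batches = cur.get_result_batches() or []
        finally:
            conn.close()

    def estimate_inmemory_data_size(self) -> Optional[int]:
        # Use the per-batch uncompressed size reported by the connector as a
        # stand-in for the in-memory (Arrow) table size; None means unknown.
        sizes = [getattr(batch, "uncompressed_size", None) for batch in self._batches]
        if not sizes or any(size is None for size in sizes):
            return None
        return sum(sizes)

    def get_read_tasks(self, parallelism: int) -> List[ReadTask]:
        # One read task per result batch; scheduling is left to Ray.
        tasks = []
        for batch in self._batches:
            metadata = BlockMetadata(
                num_rows=batch.rowcount,
                size_bytes=getattr(batch, "uncompressed_size", None),
                schema=None,
                input_files=None,
                exec_stats=None,
            )
            # Bind `batch` as a default argument. ResultBatch objects are
            # self-contained, so a remote worker can call to_arrow() and pull
            # that batch's rows straight from Snowflake, lazily.
            tasks.append(ReadTask(lambda b=batch: [b.to_arrow()], metadata))
        return tasks


class SnowflakeDatasource(Datasource):
    """A minimal Snowflake datasource for Ray Data."""

    def create_reader(self, connection_args: Dict[str, Any], query: str) -> Reader:
        # `connection_args` is forwarded to snowflake.connector.connect().
        return _SnowflakeReader(connection_args, query)
```

Note that in this sketch, estimate_inmemory_data_size leans on the batch metadata the connector already exposes, so the planning step never has to materialize any Arrow tables.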
Woah 😅, that is smooth.
Now let’s quickly take the data source for a spin and squeeze some juice out of Ray.
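Something along these lines, with placeholder credentials and a toy query:

```python
import ray

ray.init()

# Wire the datasource into a Ray Data pipeline.
ds = ray.data.read_datasource(
    SnowflakeDatasource(),
    connection_args={
        "user": "...", "password": "...", "account": "...",
        "warehouse": "...", "database": "...", "schema": "...",
    },
    query="SELECT * FROM events",
)

# From here on it behaves like any other Ray Dataset.
print(ds.schema())
print(ds.count())
ds.show(5)
```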
That is all!
You have a Snowflake data source. The next time you want to use some Ray goodness on Snowflake, you won’t be left wanting a fast-reading data source.
You already have it right here.
Now please do the cool stuff and show it to me.
- The GitHub repo with my implementation.
- The Anyscale blog that motivated me: https://www.anyscale.com/blog/introducing-the-anyscale-snowflake-connector
- The corresponding Anyscale fork: https://github.com/anyscale/datasets-database/blob/master/python/ray/data/datasource/snowflake_datasource.py