
Can we write data directly into a Snowflake table without using a Snowflake internal stage, using Python?

Writing to a stage first, then transforming the data, and then loading it into the table seems like an unnecessary extra step. Can it be done in one step, the way a JDBC connection works with an RDBMS?


3 Answers


The absolute fastest way to load data into Snowflake is from a file on either an internal or an external stage. Period. All connectors can insert data with standard INSERT commands, but this will not perform as well. That said, many of the Snowflake drivers now transparently use PUT/COPY commands to load large data into Snowflake via an internal stage. If this is what you are after, you can use the write_pandas function to load data from a pandas DataFrame into Snowflake with a single call. Behind the scenes, it executes the PUT and COPY INTO for you.

https://docs.snowflake.com/en/user-guide/python-connector-api.html#label-python-connector-api-write-pandas

I highly recommend this pattern over INSERT commands in any driver. I would also recommend that transforms be done AFTER loading into Snowflake, not before.
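A minimal sketch of the write_pandas pattern, assuming snowflake-connector-python is installed with its pandas extra and the target table already exists. load_dataframe and snowflake_column_names are hypothetical helper names; the keyword arguments are whatever you normally pass to snowflake.connector.connect (account, user, password, warehouse, database, schema).

```python
# Sketch: load a pandas DataFrame in one call with write_pandas, which runs
# PUT + COPY INTO under the hood. Assumes: pip install "snowflake-connector-python[pandas]"
# and an existing target table. Helper names and connection values are placeholders.


def snowflake_column_names(columns):
    """Upper-case column names so they match unquoted Snowflake identifiers.

    write_pandas quotes identifiers by default (quote_identifiers=True), so a
    lower-case DataFrame column "id" would not match a column created as ID.
    """
    return [str(c).upper() for c in columns]


def load_dataframe(df, table_name, **connect_kwargs):
    """Write df into table_name and return the number of rows loaded."""
    # Imported lazily so the helpers can be imported without the connector installed.
    import snowflake.connector
    from snowflake.connector.pandas_tools import write_pandas

    df = df.copy()  # do not rename the caller's DataFrame in place
    df.columns = snowflake_column_names(df.columns)

    conn = snowflake.connector.connect(**connect_kwargs)
    try:
        # write_pandas returns (success, number_of_chunks, number_of_rows, output)
        success, _nchunks, nrows, _output = write_pandas(conn, df, table_name)
        if not success:
            raise RuntimeError(f"write_pandas reported a failure for {table_name}")
        return nrows
    finally:
        conn.close()
```

Upper-casing the columns is one design choice; passing quote_identifiers=False to write_pandas is another way to get the same matching, and recent connector versions also accept auto_create_table=True if the table does not exist yet.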


If you are having issues with large datasets, try using Dask instead: repartition your dataframe and write it out as CSV chunks. Then you can wrap your PUT operation in dask.delayed, run it on multiple threads, and finally use a COPY command. With large CSV files your connection can time out, so it is better to upload them in chunks.

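    import glob

    import dask
    import dask.dataframe as dd

    # Split the large CSV into 100 partitions and write one CSV file per partition
    df = dd.read_csv('YOUR Large CSV')
    df = df.repartition(npartitions=100)
    df.to_csv('folder_name/export_*.csv', index=False)  # '*' becomes the partition number

    lst_files = glob.glob('folder_name/*.csv')

    # conxn: your SQLAlchemy engine or Snowflake object exposing .execute()
    # (e.g. a SQLAlchemy 1.x engine or a Snowflake cursor)
    # staging_area_path, folder_name: your stage and the target folder inside it
    @dask.delayed
    def run_put_query(conxn, csv_file):
        conxn.execute(
            f"PUT file://{csv_file} @{staging_area_path}/{folder_name} overwrite=true")

    delayed_objects = []
    for csv_file in lst_files:
        res = run_put_query(conxn=conxn, csv_file=csv_file)
        delayed_objects.append(res)

    # Run the PUT commands on 10 threads, then load the staged files with COPY INTO
    output = dask.compute(*delayed_objects, scheduler='threads', num_workers=10)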
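As an alternative to scheduling one PUT per file with Dask, a single PUT can take a wildcard path plus a PARALLEL option that sets the number of upload threads, and one COPY INTO then loads everything under the stage prefix. This sketch assumes a snowflake.connector connection; put_sql, copy_sql and upload_and_copy are hypothetical helpers, the stage, prefix and table names are placeholders, and the file format assumes one header line per chunk file (Dask's to_csv writes a header in each file by default).

```python
# Sketch: upload many local CSV chunks with one PUT, then load them with one COPY INTO.
# Assumptions (placeholders): stage name, stage prefix, target table, CSV with a header row.


def put_sql(local_glob, stage, prefix, parallel=10):
    """Build a PUT command that uploads every file matching local_glob."""
    return (
        f"PUT file://{local_glob} @{stage}/{prefix} "
        f"PARALLEL = {parallel} AUTO_COMPRESS = TRUE OVERWRITE = TRUE"
    )


def copy_sql(table, stage, prefix):
    """Build a COPY INTO command for the CSV files under @stage/prefix."""
    return (
        f"COPY INTO {table} FROM @{stage}/{prefix} "
        "FILE_FORMAT = (TYPE = CSV SKIP_HEADER = 1)"
    )


def upload_and_copy(conn, local_glob, stage, prefix, table):
    """Run PUT, then COPY INTO, on a DB-API style connection; return COPY's result rows."""
    cur = conn.cursor()
    try:
        cur.execute(put_sql(local_glob, stage, prefix))
        cur.execute(copy_sql(table, stage, prefix))
        return cur.fetchall()  # COPY INTO reports its status per loaded file
    finally:
        cur.close()
```

Letting PUT handle the parallelism keeps the upload inside one connector call, so no thread pool or shared connection object is needed on the Python side.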

Java:

Load the driver class:

    Class.forName("net.snowflake.client.jdbc.SnowflakeDriver");

Maven:

Add the following dependency:

    <dependency>
      <groupId>net.snowflake</groupId>
      <artifactId>snowflake-jdbc</artifactId>
      <version>{version}</version>
    </dependency>

Spring:

application.yml:

    spring:
      datasource:
        hikari:
          maximumPoolSize: 4 # Specify maximum pool size
          minimumIdle: 1 # Specify minimum pool size
          driver-class-name: net.snowflake.client.jdbc.SnowflakeDriver

Python:

    import pyodbc

    # pyodbc connection string
    conn = pyodbc.connect("Driver={SnowflakeDSIIDriver}; Server=XXX.us-east-2.snowflakecomputing.com; Database=VAQUARKHAN_DB; schema=public; UID=username; PWD=password")

    #  Cursor
    cus=conn.cursor()

    # Execute SQL statement to get current datetime and store result in cursor
    cus.execute("select current_date;")

    # Display the content of cursor
    row = cus.fetchone()

    print(row)
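The same pyodbc connection can also write directly, which is what the question asks for: a parameterized INSERT sent with executemany, with no explicit stage. A minimal sketch, where insert_rows is a hypothetical helper and the table and column names are placeholders; expect it to be much slower than the stage-based routes above for large volumes.

```python
import sqlite3


def insert_rows(conn, table, columns, rows):
    """Insert an iterable of tuples into table with one executemany call.

    Works with any DB-API connection that uses "?" placeholders, such as pyodbc.
    table and columns are interpolated as-is, so they must come from trusted code.
    """
    placeholders = ", ".join("?" for _ in columns)
    sql = f"INSERT INTO {table} ({', '.join(columns)}) VALUES ({placeholders})"
    cur = conn.cursor()
    try:
        cur.executemany(sql, list(rows))
        conn.commit()  # pyodbc connections do not autocommit by default
    finally:
        cur.close()


if __name__ == "__main__":
    # Local stand-in: sqlite3 also uses "?" placeholders, so the helper can be
    # tried without a Snowflake account.
    demo = sqlite3.connect(":memory:")
    demo.execute("CREATE TABLE people (id INTEGER, name TEXT)")
    insert_rows(demo, "people", ["id", "name"], [(1, "a"), (2, "b")])
    print(demo.execute("SELECT COUNT(*) FROM people").fetchone())  # (2,)
```

With the Snowflake ODBC driver you would pass the conn object from the snippet above instead of the sqlite3 stand-in.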

Apache Spark:

    <dependency>
         <groupId>net.snowflake</groupId>
         <artifactId>spark-snowflake_2.11</artifactId>
         <version>2.5.9-spark_2.4</version>
    </dependency>

Code (Scala, in a Databricks notebook):

    import org.apache.spark.sql.DataFrame

    // Use the Databricks secrets utility (dbutils) to get Snowflake credentials.
    val user = dbutils.secrets.get("data-warehouse", "<snowflake-user>")
    val password = dbutils.secrets.get("data-warehouse", "<snowflake-password>")

    val options = Map(
      "sfUrl" -> "<snowflake-url>",
      "sfUser" -> user,
      "sfPassword" -> password,
      "sfDatabase" -> "<snowflake-database>",
      "sfSchema" -> "<snowflake-schema>",
      "sfWarehouse" -> "<snowflake-warehouse>"
    )

    // Generate a simple dataset containing five values and write the dataset to Snowflake.
    spark.range(5).write
      .format("snowflake")
      .options(options)
      .option("dbtable", "<snowflake-table>")
      .save()

    // Read the data written by the previous cell back.
    val df: DataFrame = spark.read
      .format("snowflake")
      .options(options)
      .option("dbtable", "<snowflake-table>")
      .load()

    display(df)
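A PySpark version of the same round trip, for readers working in Python. It assumes the Snowflake Spark connector is installed on the cluster and uses the connector's full source name net.snowflake.spark.snowflake (on Databricks the short name "snowflake" used above also works); snowflake_options and write_and_read are hypothetical helper names and all argument values are placeholders.

```python
# Sketch: write five rows to Snowflake from PySpark and read them back.
# Assumes the Snowflake Spark connector (and its JDBC driver) are on the cluster.

SNOWFLAKE_SOURCE = "net.snowflake.spark.snowflake"


def snowflake_options(url, user, password, database, schema, warehouse):
    """Connection options understood by the Snowflake Spark connector."""
    return {
        "sfUrl": url,
        "sfUser": user,
        "sfPassword": password,
        "sfDatabase": database,
        "sfSchema": schema,
        "sfWarehouse": warehouse,
    }


def write_and_read(spark, options, table):
    """Write spark.range(5) to table, then return the table as a DataFrame."""
    # Default save mode: fails if the table already exists, like the Scala example.
    (spark.range(5).write
        .format(SNOWFLAKE_SOURCE)
        .options(**options)
        .option("dbtable", table)
        .save())
    return (spark.read
            .format(SNOWFLAKE_SOURCE)
            .options(**options)
            .option("dbtable", table)
            .load())
```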

The fastest way to load data into Snowflake is still from a file.