Snowflake SQLAlchemy ツールキットでのPythonコネクタの使用

Snowflake SQLAlchemy は、Snowflakeデータベースと SQLAlchemy アプリケーションをつなぐ方言として、Snowflake Connector for Pythonの上で実行されます。

詳細については、 方言 ドキュメントをご参照ください。

このトピックの内容:

前提条件

Snowflake Connector for Python

Snowflake SQLAlchemy の唯一の要件は、Python用Snowflakeコネクタです。ただし、Snowflake SQLAlchemy をインストールするとコネクタが自動的にインストールされるため、コネクタをインストールする必要はありません。

データアナリティクスとウェブアプリケーションフレームワーク(オプション)

Snowflake SQLAlchemy は、Pandas、JupyterおよびPyramidで使用できます。これらは、データアナリティクスおよびウェブアプリケーション用により高次のアプリケーションフレームワークを提供します。ただし、作業環境をゼロから構築することは、特に初心者ユーザーにとっては簡単な作業ではありません。フレームワークをインストールするには、Cコンパイラとツールが必要です。適切なツールとバージョンの選択は、ユーザーがPythonアプリケーションを使用することを妨げる障害になりかねません。

環境を構築する簡単な方法は、Anacondaを使用することです。これにより、データアナリストや学生などのPython以外の専門家を含むすべてのユーザーに完全なプリコンパイルテクノロジースタックが提供されます。Anacondaのインストール手順については、Anacondaのインストールドキュメントをご参照ください。その後、 pip を使用して、Snowflake SQLAlchemy パッケージをAnacondaの上にインストールできます。

詳細については、次のドキュメントをご参照ください。

Snowflake SQLAlchemy のインストール

Snowflake SQLAlchemy パッケージは、 pip を使用してパブリック PyPI リポジトリからインストールできます。

pip install --upgrade snowflake-sqlalchemy 
Copy

pip は、Python用Snowflakeコネクタを含むすべての必要なモジュールを自動的にインストールします。

開発者ノートは、ソースコード GitHub にホストされていることに注意してください。

インストールの確認

  1. 次のPythonサンプルコードを含むファイル(例: validate.py)を作成します。これは、Snowflakeに接続し、Snowflakeバージョンを表示します。

    #!/usr/bin/env python from sqlalchemy import create_engine engine = create_engine( 'snowflake://{user}:{password}@{account_identifier}/'.format( user='<user_login_name>', password='<password>', account_identifier='<account_identifier>', ) ) try: connection = engine.connect() results = connection.execute('select current_version()').fetchone() print(results[0]) finally: connection.close() engine.dispose() 
    Copy
  2. <ユーザーログイン名><パスワード>、および <アカウント識別子> をSnowflakeアカウントおよびユーザーの適切な値に置き換えます。詳細については、 接続パラメーター (このトピック内)をご参照ください。

  3. サンプルコードを実行します。例えば、 validate.py という名前のファイルを作成した場合、

    python validate.py 
    Copy

Snowflakeバージョン(例: 1.6.0)が表示されます。

Snowflake固有のパラメーターと動作

Snowflake SQLAlchemy は、可能な限り、 SQLAlchemy アプリケーションに互換性のある機能を提供します。

SQLAlchemy の使用については、 SQLAlchemy のドキュメントをご参照ください。

ただし、Snowflake SQLAlchemy はSnowflake固有のパラメーターと動作も提供します。これについては、次のセクションで説明します。

接続パラメーター

必須パラメーター

Snowflake SQLAlchemy は、次の接続文字列構文を使用してSnowflakeに接続し、セッションを開始します:

'snowflake://<user_login_name>:<password>@<account_identifier>' 
Copy

条件:

追加の接続パラメーター

オプションで、接続文字列の最後( <アカウント名> の後)に、次の追加情報を含めることができます。

'snowflake://<user_login_name>:<password>@<account_identifier>/<database_name>/<schema_name>?warehouse=<warehouse_name>&role=<role_name>' 
Copy

条件:

  • <データベース名> および <スキーマ名> は、スラッシュ(/)で区切られたSnowflakeセッションの初期データベースとスキーマです。

  • warehouse=<ウェアハウス名> および role=<ロール名>' はセッションの初期ウェアハウスおよびロールであり、パラメーター文字列として指定され、疑問符(?)で区切られています。

注釈

ログイン後、接続文字列で指定された初期データベース、スキーマ、ウェアハウス、およびロールは、セッションに対していつでも変更できます。

プロキシサーバーの構成

プロキシサーバーのパラメーターはサポートされていません。代わりに、サポートされている環境変数を使用してプロキシサーバーを構成します。詳細については、 プロキシサーバーの使用 をご参照ください。

接続文字列の例

次の例では、ユーザー名 testuser1、パスワード 0123456、アカウント識別子 myorganization-myaccount、データベース testdb、スキーマ public、ウェアハウス testwh、およびロール myrolecreate_engine メソッドを呼び出します。

from sqlalchemy import create_engine engine = create_engine( 'snowflake://testuser1:0123456@myorganization-myaccount/testdb/public?warehouse=testwh&role=myrole' ) 
Copy

便宜上、 snowflake.sqlalchemy.URL メソッドを使用して接続文字列を作成し、データベースに接続できます。次の例では、前の例と同じ接続文字列を作成します。

from snowflake.sqlalchemy import URL from sqlalchemy import create_engine engine = create_engine(URL( account = 'myorganization-myaccount', user = 'testuser1', password = '0123456', database = 'testdb', schema = 'public', warehouse = 'testwh', role='myrole', )) 
Copy

接続の開始および終了

engine.connect() を実行して接続を開始します。 engine.execute() の使用は避けてください。

# Avoid this. engine = create_engine(...) engine.execute(<SQL>) engine.dispose() # Do this. engine = create_engine(...) connection = engine.connect() try: connection.execute(<SQL>) finally: connection.close() engine.dispose() 
Copy

注釈

engine.dispose() の前に connection.close() を実行して、接続を終了してください。そうしない場合は、Python GarbageコレクターがSnowflakeとの通信に必要なリソースを削除するため、Pythonコネクタはセッションを適切に終了できなくなります。

明示的なトランザクションを使用する場合は、 SQLAlchemy 内の AUTOCOMMIT 実行オプションを無効にする必要があります。

詳細については、 SQLAlchemy <https://docs.sqlalchemy.org/en/14/core/connections.html#library-level-e-g-emulated-autocommit>`_ の `AUTOCOMMIT 実行オプションをご参照ください。

デフォルトでは、 SQLAlchemy はこのオプションを有効にします。このオプションを有効にすると、 INSERT、 UPDATE、および DELETE ステートメントは、明示的なトランザクション内で実行された場合でも、実行時に自動的にコミットされます。

AUTOCOMMIT を無効にするには、 autocommit=FalseConnection.execution_options() メソッドに渡します。例:

# Disable AUTOCOMMIT if you need to use an explicit transaction. with engine.connect().execution_options(autocommit=False) as connection: try: connection.execute("BEGIN") connection.execute("INSERT INTO test_table VALUES (88888, 'X', 434354)") connection.execute("INSERT INTO test_table VALUES (99999, 'Y', 453654654)") connection.execute("COMMIT") except Exception as e: connection.execute("ROLLBACK") finally: connection.close() engine.dispose() 
Copy

自動インクリメントの動作

値を自動インクリメントするには、 Sequence オブジェクトが必要です。新しいレコードが挿入されるたびに値を自動的にインクリメントするには、主キー列に Sequence オブジェクトを含めます。例:

t = Table('mytable', metadata, Column('id', Integer, Sequence('id_seq'), primary_key=True), Column(...), ... ) 
Copy

オブジェクト名の大文字小文字の処理

Snowflakeは、大文字と小文字を区別しないすべてのオブジェクト名を大文字で保存します。対照的に、 SQLAlchemy はすべての小文字のオブジェクト名が大文字と小文字を区別しないと見なします。Snowflake SQLAlchemy は、スキーマレベルの通信中(つまり、テーブルとインデックスのリフレクション中)に、オブジェクト名の大文字と小文字を変換します。大文字のオブジェクト名を使用する場合、 SQLAlchemy は大文字と小文字が区別されると想定し、名前を引用符で囲みます。この動作により、Snowflakeから受信したデータディクショナリデータとの不一致が発生するため、引用符(例: "TestDb")を使用して大文字と小文字を区別する識別子名を作成しない限り、SQLAlchemy 側では、すべて小文字の名前を使用する必要があります。

インデックスのサポート

インデックスは、Snowflake SqlAlchemy のハイブリッドテーブルでのみサポートされています。制限事項やユースケースの詳細については、 CREATE INDEX 使用上の注意 をリファレンスしてください。インデックスは以下の方法で作成できます。

  • 単一列インデックス

    列に index=True パラメーターをセットするか、 Index オブジェクトを明示的に定義することで、単一の列インデックスを作成することができます。

    hybrid_test_table_1 = HybridTable( "table_name", metadata, Column("column1", Integer, primary_key=True), Column("column2", String, index=True), Index("index_1","column1", "column2") ) metadata.create_all(engine_testaccount) 
    Copy
  • 複数列インデックス

    複数列インデックスの場合、インデックスされるべき列を指定するインデックスオブジェクトを定義します。

    hybrid_test_table_1 = HybridTable( "table_name", metadata, Column("column1", Integer, primary_key=True), Column("column2", String), Index("index_1","column1", "column2") ) metadata.create_all(engine_testaccount) 
    Copy

Numpyデータ型のサポート

Snowflake SQLAlchemy は、 NumPy データ型のバインドとフェッチをサポートしています。バインディングは常にサポートされています。 NumPy データ型のフェッチを有効にするには、接続パラメーターに numpy=True を追加します。

次の NumPy データ型がサポートされています。

  • numpy.int64

  • numpy.float64

  • numpy.datetime64

次の例は、 numpy.datetime64 データのラウンドトリップを示しています。

import numpy as np import pandas as pd engine = create_engine(URL( account = 'myorganization-myaccount', user = 'testuser1', password = 'pass', database = 'db', schema = 'public', warehouse = 'testwh', role='myrole', numpy=True, )) specific_date = np.datetime64('2016-03-04T12:03:05.123456789Z') with engine.connect() as connection: connection.exec_sql_query( "CREATE OR REPLACE TABLE ts_tbl(c1 TIMESTAMP_NTZ)") connection.exec_sql_query( "INSERT INTO ts_tbl(c1) values(%s)", (specific_date,) ) df = pd.read_sql_query("SELECT * FROM ts_tbl", connection) assert df.c1.values[0] == specific_date 
Copy

列メタデータのキャッシュ

SQLAlchemy はランタイム検査 API を提供して、さまざまなオブジェクトに関するランタイム情報を取得します。一般的な使用例の1つは、スキーマカタログを構築するために、スキーマ内のすべてのテーブルとその列メタデータを取得することです。

詳細については、 ランタイム検査 API をご参照ください。SQLAlchemy、 alembic を使用してデータベース スキーマの移行を管理する例。

擬似コードフローは次のとおりです:

inspector = inspect(engine) schema = inspector.default_schema_name for table_name in inspector.get_table_names(schema): column_metadata = inspector.get_columns(table_name, schema) primary_keys = inspector.get_primary_keys(table_name, schema) foreign_keys = inspector.get_foreign_keys(table_name, schema) ... 
Copy

このフローでは、潜在的な問題として、各テーブルでクエリを実行するのにかなり時間がかかることがあります。結果はキャッシュされますが、列のメタデータの取得には高額な費用がかかります。

この問題を軽減するために、Snowflake SQLAlchemy はフラグ cache_column_metadata=True を取り、 get_table_names が呼び出されたときにすべてのテーブルのすべての列メタデータがキャッシュされ、残りの get_columnsget_primary_keys および get_foreign_keys がキャッシュを利用できるようにします。

engine = create_engine(URL( account = 'myorganization-myaccount', user = 'testuser1', password = 'pass', database = 'db', schema = 'public', warehouse = 'testwh', role='myrole', cache_column_metadata=True, )) 
Copy

注釈

すべての列メタデータが Inspector オブジェクトに関連付けられてキャッシュされるため、メモリ使用量が増加します。すべての列メタデータを取得する必要がある場合にのみ、フラグを使用します。

VARIANT 、ARRAY、および OBJECT のサポート

Snowflake SQLAlchemy は、 VARIANTARRAY、および OBJECT データ型の取得をサポートしています。すべての型はPythonで str に変換されるため、 json.loads を使用してネイティブデータ型に変換できます。

この例は、 VARIANTARRAY、および OBJECT データ型の列を含むテーブルを作成する方法を示しています。

from snowflake.sqlalchemy import (VARIANT, ARRAY, OBJECT) ... t = Table('my_semi_structured_datatype_table', metadata, Column('va', VARIANT), Column('ob', OBJECT), Column('ar', ARRAY)) metadata.create_all(engine) 
Copy

VARIANTARRAY、および OBJECT データ型の列を取得してネイティブPythonデータ型に変換するには、次のようにデータを取得して json.loads メソッドを呼び出します。

import json connection = engine.connect() results = connection.execute(select([t])) row = results.fetchone() data_variant = json.loads(row[0]) data_object = json.loads(row[1]) data_array = json.loads(row[2]) 
Copy

構造化データ型のサポート

このモジュールは、Snowflake 構造化データ、特に Iceberg テーブル用のカスタム SQLAlchemy タイプを定義します。MAP、 OBJECT、 ARRAY のタイプでは、 SQLAlchemy モデルに複雑なデータ構造を格納することができます。詳細情報については、Snowflake Structured data types ドキュメントを参照してください。

MAP

MAP タイプはキーと値のペアの集まりを表し、キーと値はそれぞれ異なるタイプを持つことができます。

  • キーのタイプ: TEXTNUMBER のようなキーのタイプ。

  • 値のタイプ: TEXTNUMBER のような値のタイプ。

  • Not Null: NULL 値を許可するかどうか (デフォルト値は False)。

使用例:

IcebergTable( table_name, metadata, Column("id", Integer, primary_key=True), Column("map_col", MAP(NUMBER(10, 0), TEXT(134217728))), external_volume="external_volume", base_location="base_location", ) 
Copy

OBJECT

OBJECT タイプは、名前付きフィールドを持つ半構造化オブジェクトを表します。各フィールドは特定のタイプを持つことができ、各フィールドがNULL許容かどうかを指定することもできます。

  • アイテムのタイプ: フィールド名とそのタイプの辞書。このタイプには、オプションでNull許容 フラグ(Null許容でない場合は True、Null許容の場合は False [デフォルト])を含めることができます。

使用例:

IcebergTable( table_name, metadata, Column("id", Integer, primary_key=True), Column( "object_col", OBJECT(key1=(TEXT(134217728), False), key2=(NUMBER(10, 0), False)), OBJECT(key1=TEXT(1134217728), key2=NUMBER(10, 0)), # Without nullable flag ), external_volume="external_volume", base_location="base_location", ) 
Copy

ARRAY

ARRAY タイプは値の順序付きリストを表し、各要素は同じタイプを持ちます。要素のタイプは配列の作成時に定義されます。

  • 値のタイプ: TEXTNUMBER のような配列の要素のタイプ。

  • Not Null: NULL の値を許可するかどうか(デフォルト値は False)。

使用例:

IcebergTable( table_name, metadata, Column("id", Integer, primary_key=True), Column("array_col", ARRAY(TEXT(134217728))), external_volume="external_volume", base_location="base_location", ) 
Copy

CLUSTER BY のサポート

Snowflake SQLAlchemy は、テーブルの CLUSTER BY パラメーターをサポートしています。パラメーターの詳細については、 CREATE TABLE をご参照ください。

この例では、クラスタリングキーとして idname の2つの列を持つテーブルを作成する方法を示します。

t = Table('myuser', metadata, Column('id', Integer, primary_key=True), Column('name', String), snowflake_clusterby=['id', 'name'], ... ) metadata.create_all(engine) 
Copy

Alembicのサポート

Alembicは、 SQLAlchemy の上にあるデータベース移行ツールです。Snowflake SQLAlchemy は、AlembicがSnowflake SQLAlchemy を認識できるように、次のコードを alembic/env.py に追加することで機能します。

from alembic.ddl.impl import DefaultImpl class SnowflakeImpl(DefaultImpl): __dialect__ = 'snowflake' 
Copy

一般的な使用法については、 Alembicドキュメント をご参照ください。

キーペア認証のサポート

Snowflake SQLAlchemy は、Python用Snowflakeコネクタの機能を利用してキーペア認証をサポートします。秘密キーと公開キーを作成するステップについては、 キーペア認証とキーペアローテーションの使用 をご参照ください。

秘密キーパラメーターは、次のように connect_args を介して渡されます。

... from snowflake.sqlalchemy import URL from sqlalchemy import create_engine from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives import serialization with open("rsa_key.p8", "rb") as key: p_key= serialization.load_pem_private_key( key.read(), password=os.environ['PRIVATE_KEY_PASSPHRASE'].encode(), backend=default_backend() ) pkb = p_key.private_bytes( encoding=serialization.Encoding.DER, format=serialization.PrivateFormat.PKCS8, encryption_algorithm=serialization.NoEncryption()) engine = create_engine(URL( account='abc123', user='testuser1', ), connect_args={ 'private_key': pkb, }, ) 
Copy

PRIVATE_KEY_PASSPHRASE は、秘密キーファイル rsa_key.p8 を復号化するためのパスフレーズです。

snowflake.sqlalchemy.URL メソッドは秘密キーパラメーターをサポートしていません。

マージコマンドのサポート

Snowflake SQLAlchemy は、その MergeInto カスタム式によるアップサートの実行をサポートしています。包括的なドキュメントについては、 MERGE をご参照ください。

次のように使用します。

from sqlalchemy.orm import sessionmaker from sqlalchemy import MetaData, create_engine from snowflake.sqlalchemy import MergeInto engine = create_engine(db.url, echo=False) session = sessionmaker(bind=engine)() connection = engine.connect() meta = MetaData() meta.reflect(bind=session.bind) t1 = meta.tables['t1'] t2 = meta.tables['t2'] merge = MergeInto(target=t1, source=t2, on=t1.c.t1key == t2.c.t2key) merge.when_matched_then_delete().where(t2.c.marked == 1) merge.when_matched_then_update().where(t2.c.isnewstatus == 1).values(val = t2.c.newval, status=t2.c.newstatus) merge.when_matched_then_update().values(val=t2.c.newval) merge.when_not_matched_then_insert().values(val=t2.c.newval, status=t2.c.newstatus) connection.execute(merge) 
Copy

CopyIntoStorage のサポート

Snowflake SQLAlchemy は、カスタム CopyIntoStorage 式を使用して、テーブルとクエリ結果をさまざまなSnowflakeステージ、Azureコンテナー、および AWS バケットに保存することをサポートしています。包括的なドキュメントについては、 COPY INTO <場所> をご参照ください。

次のように使用します。

from sqlalchemy.orm import sessionmaker from sqlalchemy import MetaData, create_engine from snowflake.sqlalchemy import CopyIntoStorage, AWSBucket, CSVFormatter engine = create_engine(db.url, echo=False) session = sessionmaker(bind=engine)() connection = engine.connect() meta = MetaData() meta.reflect(bind=session.bind) users = meta.tables['users'] copy_into = CopyIntoStorage(from_=users, into=AWSBucket.from_uri('s3://my_private_backup').encryption_aws_sse_kms('1234abcd-12ab-34cd-56ef-1234567890ab'), formatter=CSVFormatter().null_if(['null', 'Null'])) connection.execute(copy_into) 
Copy

IcebergテーブルとSnowflakeカタログサポート

Snowflake SQLAlchemy は、さまざまな関連パラメーターとともに、Snowflake カタログでアイスバーグテーブルをサポートしています。Icebergテーブルの詳細情報については、Snowflake CREATEICEBERG ドキュメントを参照してください。

Snowflake SQLAlchemy を使用して Iceberg テーブルを作成するには、 SQLAlchemy Core 構文を使用して、以下のようにテーブルを定義します。

table = IcebergTable( "myuser", metadata, Column("id", Integer, primary_key=True), Column("name", String), external_volume=external_volume_name, base_location="my_iceberg_table", as_query="SELECT * FROM table" ) 
Copy

あるいは、宣言的アプローチを使ってテーブルを定義することもできます。

class MyUser(Base): __tablename__ = "myuser" @classmethod def __table_cls__(cls, name, metadata, *arg, **kw): return IcebergTable(name, metadata, *arg, **kw) __table_args__ = { "external_volume": "my_external_volume", "base_location": "my_iceberg_table", "as_query": "SELECT * FROM table", } id = Column(Integer, primary_key=True) name = Column(String) 
Copy

ハイブリッドテーブル対応

Snowflake SQLAlchemy はインデックス付きのハイブリッドテーブルをサポートしています。詳細情報については、Snowflake CREATE HYBRID TABLE ドキュメントを参照してください。

ハイブリッド・テーブルを作成し、インデックスを追加するには、 SQLAlchemy Core構文を次のように使用します。

table = HybridTable( "myuser", metadata, Column("id", Integer, primary_key=True), Column("name", String), Index("idx_name", "name") 
Copy

あるいは、宣言的アプローチを使ってテーブルを定義することもできます。

class MyUser(Base): __tablename__ = "myuser" @classmethod def __table_cls__(cls, name, metadata, *arg, **kw): return HybridTable(name, metadata, *arg, **kw) __table_args__ = ( Index("idx_name", "name"), ) id = Column(Integer, primary_key=True) name = Column(String) 
Copy

動的テーブルは、サポートします。

Snowflake SQLAlchemy はダイナミックテーブルをサポートしています。詳細情報については、Snowflake CREATE DYNAMIC TABLE ドキュメントを参照してください。

ダイナミック・テーブルを作成するには、 SQLAlchemy Core の構文を次のように使用します。

 dynamic_test_table_1 = DynamicTable( "dynamic_MyUser", metadata, Column("id", Integer), Column("name", String), target_lag=(1, TimeUnit.HOURS), # Additionally you can use SnowflakeKeyword.DOWNSTREAM warehouse='test_wh', refresh_mode=SnowflakeKeyword.FULL as_query="SELECT id, name from MyUser;" ) 
Copy

さらに、 SqlAlchemy select() 構文を使用して、列のないテーブルを定義することもできます。

 dynamic_test_table_1 = DynamicTable( "dynamic_MyUser", metadata, target_lag=(1, TimeUnit.HOURS), warehouse='test_wh', refresh_mode=SnowflakeKeyword.FULL as_query=select(MyUser.id, MyUser.name) ) 
Copy

注釈

  • ダイナミックテーブルのプライマリキーはサポートされていますう。つまり、宣言型テーブルはダイナミック・テーブルをサポートしていません。

  • as_query パラメーターを文字列で使用する場合、列を明示的に定義する必要があります。しかし、SQLAlchemy select() 構文を使う場合、列を明示的に定義する必要はありません。

  • ダイナミック・テーブルへの直接データ挿入はサポートされていません。