Spark コネクタを使用して StarRocks からデータを読み取る
StarRocks は、Apache Spark™ 用に自社開発したコネクタである StarRocks Connector for Apache Spark™(以下、Spark コネクタ)を提供しており、Spark を使用して StarRocks テーブルからデータを読み取ることができます。Spark を使用して、StarRocks から読み取ったデータに対して複雑な処理や機械学習を行うことができます。
Spark コネクタは、Spark SQL、Spark DataFrame、Spark RDD の3つの読み取り方法をサポートしています。
Spark SQL を使用して StarRocks テーブルに一時ビューを作成し、その一時ビューを使用して StarRocks テーブルから直接データを読み取ることができます。
また、StarRocks テーブルを Spark DataFrame または Spark RDD にマッピングし、そこからデータを読み取ることもできます。Spark DataFrame の使用を推奨します。
注意
StarRocks テーブルの SELECT 権限を持つユーザーのみが、このテーブルからデータを読み取ることができます。ユーザーに権限を付与するには、GRANT の指示に従ってください。
使用上の注意
- データを読み取る前に StarRocks でデータをフィルタリングすることで、転 送されるデータ量を削減できます。
- データ読み取りのオーバーヘッドが大きい場合は、適切なテーブル設計とフィルタ条件を使用して、Spark が一度に過剰なデータを読み取らないようにすることができます。これにより、ディスクとネットワーク接続への I/O 負荷を軽減し、通常のクエリが適切に実行できるようにします。
バージョン要件
| Spark コネクタ | Spark | StarRocks | Java | Scala |
|---|---|---|---|---|
| 1.1.2 | 3.2, 3.3, 3.4, 3.5 | 2.5 以降 | 8 | 2.12 |
| 1.1.1 | 3.2, 3.3, 3.4 | 2.5 以降 | 8 | 2.12 |
| 1.1.0 | 3.2, 3.3, 3.4 | 2.5 以降 | 8 | 2.12 |
| 1.0.0 | 3.x | 1.18 以降 | 8 | 2.12 |
| 1.0.0 | 2.x | 1.18 以降 | 8 | 2.11 |
注意
- 異なるコネクタバージョン間の動作変更については、Upgrade Spark connector を参照してください。
- バージョン 1.1.1 以降、コネクタは MySQL JDBC ドライバを提供していないため、Spark のクラスパスに手動でドライバをインポートする必要があります。ドライバは Maven Central で見つけることができます。
- バージョン 1.0.0 では、Spark コネクタは StarRocks からのデータ読み取りのみをサポートしています。バージョン 1.1.0 以降、Spark コネクタは StarRocks からのデータ読み取りと書き込みの両方をサポートしています。
- バージョン 1.0.0 とバージョン 1.1.0 では、パラメータとデータ型マッピングが異なります。Upgrade Spark connector を参照してください。
- 一般的な場合、バージョン 1.0.0 に新しい機能は追加されません。できるだけ早く Spark コネクタをアップグレードすることをお勧めします。
Spark コネクタの入手
ビジネスニーズに合った Spark コネクタ .jar パッケージを入手するには、次の方法のいずれかを使用します。
- コンパイル済みパッケージをダウンロードする。
- Maven を使用して Spark コネクタに必要な依存関係を追加する。(この方法は Spark コネクタ 1.1.0 以降でのみサポートされています。)
- 手動でパッケージをコンパイルする。
Spark コネクタ 1.1.0 以降
Spark コネクタ .jar パッケージは次の形式で命名されています。
starrocks-spark-connector-${spark_version}_${scala_version}-${connector_version}.jar
たとえば、Spark 3.2 と Scala 2.12 で Spark コネクタ 1.1.0 を使用したい場合、starrocks-spark-connector-3.2_2.12-1.1.0.jar を選択できます。
注意
通常、最新の Spark コネクタバージョンは、最新の 3 つの Spark バージョンで使用できます。
コンパイル済みパッケージをダウンロードする
さまざまなバージョンの Spark コネクタ .jar パッケージは Maven Central Repository で入手できます。
Maven 依存関係を追加する
Spark コネクタに必要な依存関係を次のように設定します。
注意
spark_version、scala_version、およびconnector_versionを使用する Spark バージョン、Scala バージョン、および Spark コネクタバージョンに置き換える必要があります。
<dependency>
<groupId>com.starrocks</groupId>
<artifactId>starrocks-spark-connector-${spark_version}_${scala_version}</artifactId>
<version>${connector_version}</version>
</dependency>
たとえば、Spark 3.2 と Scala 2.12 で Spark コネクタ 1.1.0 を使用したい場合、依存関係を次のように設定します。
<dependency>
<groupId>com.starrocks</groupId>
<artifactId>starrocks-spark-connector-3.2_2.12</artifactId>
<version>1.1.0</version>
</dependency>
手動でパッケージをコンパイルする
-
Spark コネクタのコード をダウンロードします。
-
次のコマンドを使用して Spark コネクタをコンパイルします。
注意
spark_versionを使用する Spark バージョンに置き換える必要があります。sh build.sh <spark_version>たとえば、Spark 3.2 で Spark コネクタを使用したい場合、次のように Spark コネクタをコンパイルします。
sh build.sh 3.2 -
target/パスに移動し、コンパイル後に生成されたstarrocks-spark-connector-3.2_2.12-1.1.0-SNAPSHOT.jarのような Spark コネクタ .jar パッケージを確認します。注意
公式にリリースされていない Spark コネクタバージョンを使用している場合、生成された Spark コネクタ .jar パッケージの名前には
SNAPSHOTがサフィックスとして含まれます。
Spark コネクタ 1.0.0
コンパイル済みパッケージをダウンロードする
手動でパッケージをコンパイルする
-
Spark コネクタのコード をダウンロードします。
注意
spark-1.0に切り替える必要があります 。 -
Spark コネクタをコンパイルするために、次のいずれかの操作を行います。
-
Spark 2.x を使用している場合、次のコマンドを実行します。デフォルトで Spark 2.3.4 に適合するように Spark コネクタをコンパイルします。
sh build.sh 2 -
Spark 3.x を使用している場合、次のコマンドを実行します。デフォルトで Spark 3.1.2 に適合するように Spark コネクタをコンパイルします。
sh build.sh 3
-
-
output/パスに移動し、コンパイル後に生成されたstarrocks-spark2_2.11-1.0.0.jarファイルを確認します。その後、ファイルを Spark のクラスパスにコピーします。- Spark クラスターが
Localモードで実行されている場合、ファイルをjars/パスに配置します。 - Spark クラスターが
Yarnモードで実行され ている場合、ファイルを事前展開パッケージに配置します。
- Spark クラスターが
指定された場所にファイルを配置した後にのみ、Spark コネクタを使用して StarRocks からデータを読み取ることができます。
パラメータ
このセクションでは、Spark コネクタを使用して StarRocks からデータを読み取る際に設定する必要があるパラメータについて説明します。
共通パラメータ
次のパラメータは、Spark SQL、Spark DataFrame、Spark RDD の3つの読み取り方法すべてに適用されます。
| パラメータ | デフォルト値 | 説明 |
|---|---|---|
| starrocks.fenodes | None | StarRocks クラスター内の FE の HTTP URL。形式 <fe_host>:<fe_http_port>。複数の URL を指定する場合は、カンマ (,) で区切る必要があります。 |
| starrocks.table.identifier | None | StarRocks テーブルの名前。形式: <database_name>.<table_name>。 |
| starrocks.request.retries | 3 | Spark が StarRocks に読み取りリクエストを送信する際の最大リトライ回数。 |
| starrocks.request.connect.timeout.ms | 30000 | StarRocks に送信された読み取りリクエストがタイムアウトするまでの最大時間。 |
| starrocks.request.read.timeout.ms | 30000 | StarRocks に送信されたリクエストの読み取りがタイムアウトするまでの最大時間。 |
| starrocks.request.query.timeout.s | 3600 | StarRocks からデータをクエリする際の最大タイムアウト時間。デフォルトのタイムアウト期間は 1 時間です。-1 はタイムアウト期間が指定されていないことを意味します。 |
| starrocks.request.tablet.size | Integer.MAX_VALUE | 各 Spark RDD パーティションにグループ化される StarRocks タブレットの数。このパラメータの値が小さいほど、生成される Spark RDD パーティションの数が多くなります。Spark の並行性が高くなる一方で、StarRocks への負荷も大きくなります。 |
| starrocks.batch.size | 4096 | 一度に BEs から読み取ることができる最大行数。このパラメータの値を増やすことで、Spark と StarRocks 間で確立される接続数を減らし、ネットワーク遅延による余分な時間オーバーヘッドを軽減できます。 |
| starrocks.exec.mem.limit | 2147483648 | クエリごとに許可される最大メモリ量。単位: バイト。デフォルトのメモリ制限は 2 GB です。 |
| starrocks.deserialize.arrow.async | false | Arrow メモリ形式を Spark コネクタの反復に必要な RowBatches に非同期で変換することをサポートするかどうかを指定します。 |
| starrocks.deserialize.queue.size | 64 | Arrow メモリ形式を RowBatches に非同期で変換するタスクを保持する内部キューのサイズ。このパラメータは starrocks.deserialize.arrow.async が true に設定されている場合に有効です。 |
| starrocks.filter.query | None | StarRocks 上でデータをフィルタ リングするための条件。複数のフィルタ条件を指定する場合は、and で結合する必要があります。StarRocks は指定されたフィルタ条件に基づいて StarRocks テーブルからデータをフィルタリングし、その後 Spark によってデータが読み取られます。 |
| starrocks.timezone | JVM のデフォルトタイムゾーン | 1.1.1 以降でサポートされています。StarRocks の DATETIME を Spark の TimestampType に変換するために使用されるタイムゾーン。デフォルトは ZoneId#systemDefault() によって返される JVM のタイムゾーンです。形式は Asia/Shanghai のようなタイムゾーン名、または +08:00 のようなゾーンオフセットです。 |
Spark SQL および Spark DataFrame 用のパラメータ
次のパラメータは、Spark SQL および Spark DataFrame の読み取り方法にのみ適用されます。
| パラメータ | デフォルト値 | 説明 |
|---|---|---|
| starrocks.fe.http.url | None | FE の HTTP IP アドレス。このパラメータは Spark コネクタ 1.1.0 以降でサポートされています。このパラメータは starrocks.fenodes と同等です。どちらか一方を設定するだけで済みます。Spark コネクタ 1.1.0 以降では、starrocks.fe.http.url を使用することをお勧めします。starrocks.fenodes は廃止される可能性があります。 |
| starrocks.fe.jdbc.url | None | FE の MySQL サーバーに接続するために使用されるアドレス。形式: jdbc:mysql://<fe_host>:<fe_query_port>。注意 Spark コネクタ 1.1.0 以降では、このパラメータは必須です。 |
| user | None | StarRocks クラスターアカウントのユーザー名。ユーザーは StarRocks テーブルに対する SELECT 権限 を持っている必要があります。 |
| starrocks.user | None | StarRocks クラスターアカウントのユーザー名。このパラメータは Spark コネクタ 1.1.0 以降でサポートされています。このパラメータは user と同等です。どちらか一方を設定するだけで済みます。Spark コネクタ 1.1.0 以降では、starrocks.user を使用することをお勧めします。user は廃止される可能性があります。 |
| password | None | StarRocks クラスターアカウントのパスワード。 |
| starrocks.password | None | StarRocks クラスターアカウントのパスワード。このパラメータは Spark コネクタ 1.1.0 以降でサポートされています。このパラメータは password と同等です。どちらか一方を設定するだけで済みます。Spark コネクタ 1.1.0 以降では、starrocks.password を使用することをお勧めします。password は廃止される可能性があります。 |
| starrocks.filter.query.in.max.count | 100 | プレディケートプッシュダウン中に IN 式でサポートされる最大値数。IN 式で指定された値の数がこの制限を超える場合、IN 式で指定されたフィルタ条件は Spark で処理されます。 |
Spark RDD 用のパラメータ
次のパラメータは、Spark RDD の読み取り方法にのみ適用されます。
| パラメータ | デフォルト値 | 説明 |
|---|---|---|
| starrocks.request.auth.user | None | StarRocks クラスターアカウントのユーザー名。 |
| starrocks.request.auth.password | None | StarRocks クラスターアカウントのパスワード。 |
| starrocks.read.field | None | データを読み取りたい StarRocks テーブルのカラム。複数のカラムを指定する場合は、カンマ (,) で区切る必要があります。 |
StarRocks と Spark 間のデータ型マッピング
Spark コネクタ 1.1.0 以降
| StarRocks データ型 | Spark データ型 |
|---|---|
| BOOLEAN | DataTypes.BooleanType |
| TINYINT | DataTypes.ByteType |
| SMALLINT | DataTypes.ShortType |
| INT | DataTypes.IntegerType |
| BIGINT | DataTypes.LongType |
| LARGEINT | DataTypes.StringType |
| FLOAT | DataTypes.FloatType |
| DOUBLE | DataTypes.DoubleType |
| DECIMAL | DecimalType |
| CHAR | DataTypes.StringType |
| VARCHAR | DataTypes.StringType |
| STRING | DataTypes.StringType |
| DATE | DataTypes.DateType |
| DATETIME | DataTypes.TimestampType |
| JSON | DataTypes.StringType 注意: このデータ型マッピングは Spark コネクタ v1.1.2 以降でサポートされ、StarRocks バージョン 2.5.13、3.0.3、3.1.0 以降が必要です。 |
| ARRAY | サポートされていないデータ型 |
| HLL | サポートされていないデータ型 |
| BITMAP | サポートされていないデータ型 |
Spark コネクタ 1.0.0
| StarRocks データ型 | Spark データ型 |
|---|---|
| BOOLEAN | DataTypes.BooleanType |
| TINYINT | DataTypes.ByteType |
| SMALLINT | DataTypes.ShortType |
| INT | DataTypes.IntegerType |
| BIGINT | DataTypes.LongType |
| LARGEINT | DataTypes.StringType |
| FLOAT | DataTypes.FloatType |
| DOUBLE | DataTypes.DoubleType |
| DECIMAL | DecimalType |
| CHAR | DataTypes.StringType |
| VARCHAR | DataTypes.StringType |
| DATE | DataTypes.StringType |
| DATETIME | DataTypes.StringType |
| ARRAY | サポートされていないデータ型 |
| HLL | サポートされていないデータ型 |
| BITMAP | サポートされていないデータ型 |
StarRocks が使用する基盤ストレージエンジンの処理ロジックは、DATE および DATETIME データ型を直接使用する場合に期待される時間範囲をカバーできません。そのため、Spark コネクタは StarRocks の DATE および DATETIME データ型を Spark の STRING データ型にマッピングし、StarRocks から読み取った日付と時刻データに一致する読みやすい文字列テキストを生成します。
Spark コネクタのアップグレード
バージョン 1.0.0 からバージョン 1.1.0 へのアップグレード
-
バージョン 1.1.1 以降、Spark コネクタは MySQL の公式 JDBC ドライバである
mysql-connector-javaを提供してい ません。これはmysql-connector-javaが使用する GPL ライセンスの制限によるものです。しかし、Spark コネクタはテーブルメタデータに接続するためにmysql-connector-javaを必要とするため、ドライバを Spark のクラスパスに手動で追加する必要があります。ドライバは MySQL サイト または Maven Central で見つけることができます。 -
バージョン 1.1.0 では、Spark コネクタは StarRocks にアクセスしてより詳細なテーブル情報を取得するために JDBC を使用します。そのため、
starrocks.fe.jdbc.urlを設定する必要があります。 -
バージョン 1.1.0 では、一部のパラメータがリネームされています。現在、古いパラメータと新しいパラメータの両方が保持されています。等価なパラメータのペアのうち、どちらか一方を設定するだけで済みますが、古いパラメータは廃止される可能性があるため、新しいパラメータを使用することをお勧めします。
starrocks.fenodesはstarrocks.fe.http.urlにリネームされました。userはstarrocks.userにリネームされました。passwordはstarrocks.passwordにリネームされました。
-
バージョン 1.1.0 では、Spark 3.x に基づいて一部のデータ型のマッピングが調整されています。
- StarRocks の
DATEは Spark のDataTypes.DateType(元はDataTypes.StringType)にマッピングされます。 - StarRocks の
DATETIMEは Spark のDataTypes.TimestampType(元はDataTypes.StringType)にマッピングされます。
- StarRocks の
例
次の例では、StarRocks クラスターに test という名前のデータベースを作成し、ユーザー root の権限を持っていると仮定します。例のパラメータ設定は Spark コネクタ 1.1.0 に基づいています。
ネットワーク設定
Spark が配置されているマシンが、StarRocks クラスターの FE ノードに http_port(デフォルト: 8030)および query_port(デフォルト: 9030)を介してアクセスでき、BE ノードに be_port(デフォルト: 9060)を介してアクセスできることを確認してください。
データ例
サンプルテーブルを準備するには、次の手順を実行します。
-
testデータベースに移動し、score_boardという名前のテーブルを作成します。MySQL [test]> CREATE TABLE `score_board`
(
`id` int(11) NOT NULL COMMENT "",
`name` varchar(65533) NULL DEFAULT "" COMMENT "",
`score` int(11) NOT NULL DEFAULT "0" COMMENT ""
)
ENGINE=OLAP
PRIMARY KEY(`id`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`id`)
PROPERTIES (
"replication_num" = "3"
); -
score_boardテーブルにデータを挿入します。MySQL [test]> INSERT INTO score_board
VALUES
(1, 'Bob', 21),
(2, 'Stan', 21),
(3, 'Sam', 22),
(4, 'Tony', 22),
(5, 'Alice', 22),
(6, 'Lucy', 23),
(7, 'Polly', 23),
(8, 'Tom', 23),
(9, 'Rose', 24),
(10, 'Jerry', 24),
(11, 'Jason', 24),
(12, 'Lily', 25),
(13, 'Stephen', 25),
(14, 'David', 25),
(15, 'Eddie', 26),
(16, 'Kate', 27),
(17, 'Cathy', 27),
(18, 'Judy', 27),
(19, 'Julia', 28),
(20, 'Robert', 28),
(21, 'Jack', 29); -
score_boardテーブルをクエリします。MySQL [test]> SELECT * FROM score_board;
+------+---------+-------+
| id | name | score |
+------+---------+-------+
| 1 | Bob | 21 |
| 2 | Stan | 21 |
| 3 | Sam | 22 |
| 4 | Tony | 22 |
| 5 | Alice | 22 |
| 6 | Lucy | 23 |
| 7 | Polly | 23 |
| 8 | Tom | 23 |
| 9 | Rose | 24 |
| 10 | Jerry | 24 |
| 11 | Jason | 24 |
| 12 | Lily | 25 |
| 13 | Stephen | 25 |
| 14 | David | 25 |
| 15 | Eddie | 26 |
| 16 | Kate | 27 |
| 17 | Cathy | 27 |
| 18 | Judy | 27 |
| 19 | Julia | 28 |
| 20 | Robert | 28 |
| 21 | Jack | 29 |
+------+---------+-------+
21 rows in set (0.01 sec)