From bd70fafc29f579ab076be98b996ba6076852cd33 Mon Sep 17 00:00:00 2001 From: Meggielqk <126552073+Meggielqk@users.noreply.github.com> Date: Mon, 11 Aug 2025 17:29:59 +0800 Subject: [PATCH 1/4] update(snowflake): Separate Streaming connector and sink --- en_US/data-integration/snowflake.md | 169 +++++++++++++++++----------- 1 file changed, 103 insertions(+), 66 deletions(-) diff --git a/en_US/data-integration/snowflake.md b/en_US/data-integration/snowflake.md index 0a14c5e16..c5f8f0433 100644 --- a/en_US/data-integration/snowflake.md +++ b/en_US/data-integration/snowflake.md @@ -366,14 +366,9 @@ This includes: ALTER USER snowpipeuser SET DEFAULT_ROLE = snowpipe; ``` -## Create a Connector +## Create a Snowflake Connector for Aggregated Mode -Before configuring the Snowflake Sink, you must create a connector in EMQX to establish the connection with your Snowflake environment. The connector supports two modes of communication: - -- **Aggregated mode**: Uses ODBC (via DSN) to connect through a stage. -- **Streaming mode**: Uses HTTPS and the Snowpipe Streaming REST API (AWS-only). - -> The required fields in the form depend on which upload mode (`aggregated` or `streaming`) you plan to use in the Sink. +If you plan to use the aggregated upload mode in your Snowflake Sink, you need to create a Snowflake Connector to establish the connection with your Snowflake environment. This connector uses ODBC (via DSN) to connect through a stage. 1. Go to the Dashboard **Integration** -> **Connector** page. @@ -385,14 +380,10 @@ Before configuring the Snowflake Sink, you must create a connector in EMQX to es 5. Enter the connection information. - :::: tabs - - ::: tab Aggregated Mode (ODBC-based) + - **Server Host**: The server host is the Snowflake endpoint URL, typically in the format `-.snowflakecomputing.com`. You need to replace `-` with the subdomain specific to your Snowflake instance. 
- **Account**: Enter your Snowflake Organization ID and Snowflake account name separated by a dash (`-`), which is part of the URL you use to access the Snowflake platform and can be found in your Snowflake console. - - **Server Host**: The server host is the Snowflake endpoint URL, typically in the format `-.snowflakecomputing.com`. You need to replace `-` with the subdomain specific to your Snowflake instance. - - **Data Source Name (DSN)**: Enter `snowflake`, which corresponds to the DSN configured in the `.odbc.ini` file during ODBC driver setup. - **Username**: Enter `snowpipeuser`, as defined during the previous setup process. @@ -400,45 +391,27 @@ Before configuring the Snowflake Sink, you must create a connector in EMQX to es - **Password**: The password for authenticating with Snowflake via ODBC using username/password authentication. This field is optional: - You may enter the password here, e.g., `Snowpipeuser99`, as defined during the previous setup process; - + - Or configure it in `/etc/odbc.ini`; - If using key-pair authentication instead, leave this field blank. - + ::: tip Use either Password or Private Key for authentication, not both. If neither is configured here, ensure the appropriate credentials are set in `/etc/odbc.ini`. ::: - + - **Private Key Path**: The absolute file path to the private RSA key used for authenticating with Snowflake via ODBC. This path must be consistent across all nodes of the cluster. For example: `/etc/emqx/certs/snowflake_rsa_key.private.pem`. - - - **Private Key Password**: The password used to decrypt the private RSA key file, if the key is encrypted. Leave this field blank if the key was generated without encryption (i.e., with the `-nocrypt` option in OpenSSL). - - - **Proxy**: Configuration settings for connecting to Snowflake through an HTTP proxy server. HTTPS proxies are **not** supported. By default, no proxy is used. 
To enable proxy support, select the `Enable Proxy` and provide the following: - - - **Proxy Host**: The hostname or IP address of the proxy server. - - **Proxy Port**: The port number used by the proxy server. - - ::: - - ::: tab Streaming Mode (Snowpipe Streaming API) - - **Account**: Enter your Snowflake Organization ID and Snowflake account name separated by a dash (`-`), which is part of the URL you use to access the Snowflake platform and can be found in your Snowflake console. - - **Server Host**: The server host is the Snowflake endpoint URL, typically in the format `-.snowflakecomputing.com`. You need to replace `-` with the subdomain specific to your Snowflake instance. - - **Username**: (Optional) Enter the Snowflake user with a registered RSA public key if you configured it in `odbc.ini` (e.g. `snowpipeuser`). - - **Private Key Path**: The absolute file path to the private RSA key. EMQX uses this key to sign JWT tokens to authenticate itself with the Snowflake API. This path must be consistent across all nodes of the cluster. For example: - `/etc/emqx/certs/snowflake_rsa_key.private.pem`. - **Private Key Password**: The password used to decrypt the private RSA key file, if the key is encrypted. Leave this field blank if the key was generated without encryption (i.e., with the `-nocrypt` option in OpenSSL). + - **Proxy**: Configuration settings for connecting to Snowflake through an HTTP proxy server. HTTPS proxies are **not** supported. By default, no proxy is used. To enable proxy support, select the `Enable Proxy` and provide the following: + - **Proxy Host**: The hostname or IP address of the proxy server. - **Proxy Port**: The port number used by the proxy server. - ::: - - :::: - 6. If you want to establish an encrypted connection, click the **Enable TLS** toggle switch. For more information about TLS connection, see [TLS for External Resource Access](../network/overview.md/#tls-for-external-resource-access). 
TLS must be enabled for streaming mode, as communication is over HTTPS. 7. Advanced settings (optional): See [Advanced Settings](#advanced-settings). @@ -449,9 +422,9 @@ Before configuring the Snowflake Sink, you must create a connector in EMQX to es You have now completed the connector creation and can proceed to create a rule and Sink to specify how the data will be written into Snowflake. -## Create a Rule with Snowflake Sink +## Create a Rule with Snowflake Sink for Aggregated Mode -This section demonstrates how to create a rule in EMQX to process messages (e.g., from the source MQTT topic `t/#`) and write the processed results to Snowflake through the configured Sink. EMQX supports both streaming and aggregated upload modes. +This section demonstrates how to create a rule in EMQX to process messages (e.g., from the source MQTT topic `t/#`) and write the processed results to Snowflake through the configured Sink using the aggregated upload mode. This method groups the results of multiple rule triggers into a single file (e.g., a CSV file) and uploads it to Snowflake, reducing the number of files and improving write efficiency. 1. Go to the Dashboard **Integration** -> **Rules** page. @@ -475,25 +448,19 @@ This section demonstrates how to create a rule in EMQX to process messages (e.g. ::: ::: tip - + For Snowflake integration, it is important that the selected fields exactly match the number of columns and their names of the table defined in Snowflake, so avoid adding extra fields or selecting from `*`. - + ::: -4. Add an action, select `Snowflake` from the **Action Type** dropdown list, keep the action dropdown as the default `create action` option, or choose a previously created Snowflake action from the action dropdown. Here, create a new Sink and add it to the rule. +4. 
Add an action, select `Snowflake` from the **Action Type** dropdown list, keep the **Action** dropdown as the default `Create Action` option, or choose a previously created Snowflake action from the action dropdown. Here, create a new Sink and add it to the rule. 5. Enter the Sink's name (for example, `snowflake_sink`) and a brief description. -6. Select the `my-snowflake` connector created earlier from the connector dropdown. You can also click the create button next to the dropdown to quickly create a new connector in the pop-up box. The required configuration parameters can be found in [Create a Connector](#create-a-connector). - -7. Select the **Upload Mode** and configure upload settings: The `Aggregated Upload` is by default selected. Configure the settings based on the selected upload mode. - - :::: tabs - - ::: tab Aggregated Upload +6. Select the `my-snowflake` connector created earlier from the **Connectors** dropdown. You can also click the create button next to the dropdown to quickly create a new connector in the pop-up box. The required configuration parameters can be found in [Create a Snowflake Connector for Aggregated Mode](#create-a-snowflake-connector-for-aggregated-mode). - This method groups the results of multiple rule triggers into a single file (e.g., a CSV file) and uploads it to Snowflake, reducing the number of files and improving write efficiency. +7. Configure the settings for the aggregated upload mode. - **Database Name**: Enter `testdatabase`. This is the Snowflake database that was created for storing EMQX data. - **Schema**: Enter `public`, the schema within the `testdatabase` where the data table is located. @@ -503,45 +470,115 @@ This section demonstrates how to create a rule in EMQX to process messages (e.g. - **Private Key**: The RSA private key used by the pipe user to securely access the Snowflake pipe. 
You can provide the key in one of two formats: - **Plain Text**: Paste the full PEM-formatted private key content directly as a string. - **File Path**: Specify the path to the private key file, starting with `file://`. The file path must be consistent across all nodes in the cluster and accessible by the EMQX application user. For example, `file:///etc/emqx/certs/snowflake_rsa_key.private.pem`. + - **Private Key Password**: The password used to decrypt the private RSA key file, if the key is encrypted. Leave this field blank if the key was generated without encryption (i.e., with the `-nocrypt` option in OpenSSL). + - **Aggregation Upload Format**: Currently, only `csv` is supported. Data will be staged to Snowflake in comma-separated CSV format. + - **Column Order**: Select the order of the columns from the dropdown list based on your desired arrangement. The generated CSV file will be sorted first by the selected columns, with unselected columns sorted alphabetically afterward. + - **Max Records**: Set the maximum number of records before aggregation is triggered. For example, you can set it to `1000` to upload after collecting 1000 records. When the maximum number of records is reached, the aggregation of a single file will be completed and uploaded, resetting the time interval. + - **Time Interval**: Set the time interval (in seconds) at which aggregation occurs. For example, if set to `60`, data will be uploaded every 60 seconds even if the maximum number of records hasn’t been reached, resetting the maximum number of records. - **Proxy**: Configuration settings for connecting to Snowflake through an HTTP proxy server. HTTPS proxies are **not** supported. By default, no proxy is used. To enable proxy support, select the `Enable Proxy` and provide the following: - **Proxy Host**: The hostname or IP address of the proxy server. - **Proxy Port**: The port number used by the proxy server. - Additional aggregation options: +8. 
**Fallback Actions (optional)**: If you want to improve reliability in case of message delivery failure, you can define one or more fallback actions. These actions will be triggered if the primary Sink fails to process a message. See [Fallback Actions](./data-bridges.md#fallback-actions) for more details. - - **Aggregation Type**: Currently, only `csv` is supported. Data will be staged to Snowflake in comma-separated CSV format. - - **Column Order**: Select the order of the columns from the dropdown list based on your desired arrangement. The generated CSV file will be sorted first by the selected columns, with unselected columns sorted alphabetically afterward. - - **Max Records**: Set the maximum number of records before aggregation is triggered. For example, you can set it to `1000` to upload after collecting 1000 records. When the maximum number of records is reached, the aggregation of a single file will be completed and uploaded, resetting the time interval. - - **Time Interval**: Set the time interval (in seconds) at which aggregation occurs. For example, if set to `60`, data will be uploaded every 60 seconds even if the maximum number of records hasn’t been reached, resetting the maximum number of records. +9. Expand **Advanced Settings** and configure the advanced setting options as needed (optional). For more details, refer to [Advanced Settings](#advanced-settings). - ::: +10. Use the default values for the remaining settings. Click the **Create** button to complete the Sink creation. After successful creation, the page will return to the rule creation, and the new Sink will be added to the rule actions. - ::: tab Streaming +11. Back on the rule creation page, click the **Create** button to complete the entire rule creation process. - This mode enables real-time ingestion using the Snowpipe Streaming API. +You have now successfully created the rule. 
You can see the newly created rule on the **Rules** page and the new Snowflake Sink on the **Actions (Sink)** tab. - - **Database Name**: Enter `testdatabase`. This is the Snowflake database that was created for storing EMQX data. +You can also click **Integration** -> **Flow Designer** to view the topology. The topology visually shows how messages under the topic `t/#` are written into the Snowflake after being parsed by the rule `my_rule`. - - **Schema**: Enter `public`, the schema within the `testdatabase` where the data table is located. +## Create a Snowflake Streaming Connector - - **Pipe**: Enter `emqxstreaming`, the name of the Snowflake Streaming pipe created using a SQL statement. The name must match exactly as defined in Snowflake. +If you plan to use the streaming upload mode in your Snowflake Sink, you need to create a Snowflake Streaming Connector to establish the connection with your Snowflake environment. This connector uses HTTPS and the Snowpipe Streaming REST API (AWS-only). - - **Pipe User**: Enter `snowpipeuser`, the Snowflake user with privileges to operate the streaming pipe. +1. Go to the Dashboard **Integration** -> **Connector** page. - - **Private Key**: The RSA private key used by the pipe user to sign JWTs for Snowflake Streaming API authentication. You can provide the key in one of two formats: - - **Plain Text**: Paste the full PEM-formatted private key content directly as a string. - - **File Path**: Specify the path to the private key file, starting with `file://`. The file path must be consistent across all nodes in the cluster and accessible by the EMQX application user. For example, `file:///etc/emqx/certs/snowflake_rsa_key.private.pem`. - - - **Private Key Password**: Optional, only if your key is encrypted. +2. Click the **Create** button in the top right corner. +3. Select **Snowflake Streaming** as the connector type and click next. + +4. Enter the connector name, a combination of upper and lowercase letters and numbers. 
Here, enter `my-snowflake-streaming`.

5. Enter the connection information.

   - **Server Host**: The server host is the Snowflake endpoint URL, typically in the format `<Organization ID>-<Account Name>.snowflakecomputing.com`. You need to replace `<Organization ID>-<Account Name>` with the values specific to your Snowflake instance.
   - **Account**: Enter your Snowflake Organization ID and Snowflake account name separated by a dash (`-`), which is part of the URL you use to access the Snowflake platform and can be found in your Snowflake console.
   - **Pipe User**: The name of a Snowflake user account that has a role with permissions to operate the target Pipe, for example, `snowpipeuser`. The role must have at least the `OPERATE` and `MONITOR` privileges.
   - **Private Key Path**: The absolute file path to the private RSA key. EMQX uses this key to sign JWT tokens to authenticate itself with the Snowflake API. This path must be consistent across all nodes of the cluster. For example: `/etc/emqx/certs/snowflake_rsa_key.private.pem`.
   - **Private Key Password**: The password used to decrypt the private RSA key file, if the key is encrypted. Leave this field blank if the key was generated without encryption (i.e., with the `-nocrypt` option in OpenSSL).
   - **Proxy**: Configuration settings for connecting to Snowflake through an HTTP proxy server. HTTPS proxies are **not** supported. By default, no proxy is used. To enable proxy support, select **Enable Proxy** and provide the following:
     - **Proxy Host**: The hostname or IP address of the proxy server.
     - **Proxy Port**: The port number used by the proxy server.

6. If you want to establish an encrypted connection, click the **Enable TLS** toggle switch. TLS must be enabled for streaming mode, as communication is over HTTPS. For more information about TLS connections, see [TLS for External Resource Access](../network/overview.md/#tls-for-external-resource-access).

7. Advanced settings (optional): See [Advanced Settings](#advanced-settings).

8.
Before clicking **Create**, you can click **Test Connectivity** to test whether the connector can connect to Snowflake.

9. Click the **Create** button at the bottom to complete the connector creation.

You have now completed the connector creation and can proceed to create a rule and Sink to specify how the data will be written into Snowflake.

## Create a Rule with Snowflake Sink for Streaming Mode

This section demonstrates how to create a rule in EMQX to process messages (e.g., from the source MQTT topic `t/#`) and write the processed results to Snowflake through the configured Sink using the streaming mode. This mode enables real-time ingestion using the Snowpipe Streaming API.

1. Go to the Dashboard **Integration** -> **Rules** page.

2. Click the **Create** button in the top right corner.

3. Enter the rule ID `my_rule`, and input the following rule SQL in the SQL editor:

   ```sql
   SELECT
     clientid,
     unix_ts_to_rfc3339(publish_received_at, 'millisecond') as publish_received_at,
     topic,
     payload
   FROM
     "t/#"
   ```

   ::: tip

   If you are new to SQL, you can click **SQL Examples** and **Enable Debug** to learn and test the rule SQL results.

   :::

   ::: tip

   For Snowflake integration, the selected fields must exactly match the columns of the table defined in Snowflake, in both number and name, so avoid adding extra fields or selecting `*`.

   :::

4. Add an action, select `Snowflake Streaming` from the **Action Type** dropdown list, keep the **Action** dropdown as the default `Create Action` option, or choose a previously created Snowflake action from the action dropdown. Here, create a new Sink and add it to the rule.

5. Enter the Sink's name (for example, `snowflake_sink_streaming`) and a brief description.

6. Select the `my-snowflake-streaming` connector created earlier from the connector dropdown.
You can also click the create button next to the dropdown to quickly create a new connector in the pop-up box. The required configuration parameters can be found in [Create a Snowflake Streaming Connector](#create-a-snowflake-streaming-connector). + +7. Configure the settings for the streaming upload mode. + + - **Database Name**: Enter `testdatabase`. This is the Snowflake database that was created for storing EMQX data. + + - **Schema**: Enter `public`, the schema within the `testdatabase` where the data table is located. + + - **Pipe**: Enter `emqxstreaming`, the name of the Snowflake Streaming pipe created using a SQL statement. The name must match exactly as defined in Snowflake. + + - **HTTP Pipelining**: The maximum number of HTTP requests that can be sent without waiting for responses. Default: `100`. + + - **Connect Timeout**: The time limit for establishing a connection to Snowflake before the attempt is aborted. Default: `15` seconds. + + - **Connection Pool Size**: The maximum number of concurrent connections EMQX can maintain to Snowflake for this Sink. Default: `8`. - :::: + - **Max Inactive**: The maximum amount of time an idle connection can remain open before being closed. Default: `10` seconds. 8. **Fallback Actions (optional)**: If you want to improve reliability in case of message delivery failure, you can define one or more fallback actions. These actions will be triggered if the primary Sink fails to process a message. See [Fallback Actions](./data-bridges.md#fallback-actions) for more details. 
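Both connector modes reference an RSA private key and mention the OpenSSL `-nocrypt` option. As a sketch of that key setup (the file names are examples; adjust paths and ownership to your deployment), an unencrypted key pair in the PKCS#8 PEM format used for Snowflake key-pair authentication can be generated with OpenSSL:

```shell
# Generate an unencrypted 2048-bit RSA private key in PKCS#8 PEM format.
# The -nocrypt flag corresponds to leaving Private Key Password blank above.
openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -nocrypt \
  -out snowflake_rsa_key.private.pem

# Derive the matching public key from the private key.
openssl rsa -in snowflake_rsa_key.private.pem -pubout \
  -out snowflake_rsa_key.public.pem
```

The contents of the public key file (without the `BEGIN`/`END` header lines) are what you register on the Snowflake user, e.g. the `snowpipeuser` created earlier, via `ALTER USER ... SET RSA_PUBLIC_KEY`; the private key file is what the connector fields above point to.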
From 02336f35696af7ab25b69985ecb0f364b7cc3c56 Mon Sep 17 00:00:00 2001 From: Meggielqk <126552073+Meggielqk@users.noreply.github.com> Date: Tue, 12 Aug 2025 11:15:08 +0800 Subject: [PATCH 2/4] Update zh file and reorganize sections --- en_US/data-integration/snowflake.md | 148 +++++++++++----------------- zh_CN/data-integration/snowflake.md | 135 +++++++++++++------------ 2 files changed, 128 insertions(+), 155 deletions(-) diff --git a/en_US/data-integration/snowflake.md b/en_US/data-integration/snowflake.md index c5f8f0433..6724c5a16 100644 --- a/en_US/data-integration/snowflake.md +++ b/en_US/data-integration/snowflake.md @@ -422,76 +422,6 @@ If you plan to use the aggregated upload mode in your Snowflake Sink, you need t You have now completed the connector creation and can proceed to create a rule and Sink to specify how the data will be written into Snowflake. -## Create a Rule with Snowflake Sink for Aggregated Mode - -This section demonstrates how to create a rule in EMQX to process messages (e.g., from the source MQTT topic `t/#`) and write the processed results to Snowflake through the configured Sink using the aggregated upload mode. This method groups the results of multiple rule triggers into a single file (e.g., a CSV file) and uploads it to Snowflake, reducing the number of files and improving write efficiency. - -1. Go to the Dashboard **Integration** -> **Rules** page. - -2. Click the **Create** button in the top right corner. - -3. Enter the rule ID `my_rule`, and input the following rule SQL in the SQL editor: - - ```sql - SELECT - clientid, - unix_ts_to_rfc3339(publish_received_at, 'millisecond') as publish_received_at, - topic, - payload - FROM - "t/#" - ``` - - ::: tip - - If you are new to SQL, you can click **SQL Examples** and **Enable Debug** to learn and test the rule SQL results. 
- - ::: - ::: tip - - For Snowflake integration, it is important that the selected fields exactly match the number of columns and their names of the table defined in Snowflake, so avoid adding extra fields or selecting from `*`. - - ::: - - -4. Add an action, select `Snowflake` from the **Action Type** dropdown list, keep the **Action** dropdown as the default `Create Action` option, or choose a previously created Snowflake action from the action dropdown. Here, create a new Sink and add it to the rule. - -5. Enter the Sink's name (for example, `snowflake_sink`) and a brief description. - -6. Select the `my-snowflake` connector created earlier from the **Connectors** dropdown. You can also click the create button next to the dropdown to quickly create a new connector in the pop-up box. The required configuration parameters can be found in [Create a Snowflake Connector for Aggregated Mode](#create-a-snowflake-connector-for-aggregated-mode). - -7. Configure the settings for the aggregated upload mode. - - - **Database Name**: Enter `testdatabase`. This is the Snowflake database that was created for storing EMQX data. - - **Schema**: Enter `public`, the schema within the `testdatabase` where the data table is located. - - **Stage**: Enter `emqx`, the stage created in Snowflake for holding the data before loading it into the table. - - **Pipe**: Enter `emqx`, the pipe automating the loading process from the stage to the table. - - **Pipe User**: Enter `snowpipeuser`, the Snowflake user with the appropriate permissions to manage the pipe. - - **Private Key**: The RSA private key used by the pipe user to securely access the Snowflake pipe. You can provide the key in one of two formats: - - **Plain Text**: Paste the full PEM-formatted private key content directly as a string. - - **File Path**: Specify the path to the private key file, starting with `file://`. The file path must be consistent across all nodes in the cluster and accessible by the EMQX application user. 
For example, `file:///etc/emqx/certs/snowflake_rsa_key.private.pem`. - - **Private Key Password**: The password used to decrypt the private RSA key file, if the key is encrypted. Leave this field blank if the key was generated without encryption (i.e., with the `-nocrypt` option in OpenSSL). - - - **Aggregation Upload Format**: Currently, only `csv` is supported. Data will be staged to Snowflake in comma-separated CSV format. - - **Column Order**: Select the order of the columns from the dropdown list based on your desired arrangement. The generated CSV file will be sorted first by the selected columns, with unselected columns sorted alphabetically afterward. - - **Max Records**: Set the maximum number of records before aggregation is triggered. For example, you can set it to `1000` to upload after collecting 1000 records. When the maximum number of records is reached, the aggregation of a single file will be completed and uploaded, resetting the time interval. - - **Time Interval**: Set the time interval (in seconds) at which aggregation occurs. For example, if set to `60`, data will be uploaded every 60 seconds even if the maximum number of records hasn’t been reached, resetting the maximum number of records. - - **Proxy**: Configuration settings for connecting to Snowflake through an HTTP proxy server. HTTPS proxies are **not** supported. By default, no proxy is used. To enable proxy support, select the `Enable Proxy` and provide the following: - - **Proxy Host**: The hostname or IP address of the proxy server. - - **Proxy Port**: The port number used by the proxy server. - -8. **Fallback Actions (optional)**: If you want to improve reliability in case of message delivery failure, you can define one or more fallback actions. These actions will be triggered if the primary Sink fails to process a message. See [Fallback Actions](./data-bridges.md#fallback-actions) for more details. - -9. 
Expand **Advanced Settings** and configure the advanced setting options as needed (optional). For more details, refer to [Advanced Settings](#advanced-settings). - -10. Use the default values for the remaining settings. Click the **Create** button to complete the Sink creation. After successful creation, the page will return to the rule creation, and the new Sink will be added to the rule actions. - -11. Back on the rule creation page, click the **Create** button to complete the entire rule creation process. - -You have now successfully created the rule. You can see the newly created rule on the **Rules** page and the new Snowflake Sink on the **Actions (Sink)** tab. - -You can also click **Integration** -> **Flow Designer** to view the topology. The topology visually shows how messages under the topic `t/#` are written into the Snowflake after being parsed by the rule `my_rule`. - ## Create a Snowflake Streaming Connector If you plan to use the streaming upload mode in your Snowflake Sink, you need to create a Snowflake Streaming Connector to establish the connection with your Snowflake environment. This connector uses HTTPS and the Snowpipe Streaming REST API (AWS-only). @@ -526,9 +456,9 @@ If you plan to use the streaming upload mode in your Snowflake Sink, you need to You have now completed the connector creation and can proceed to create a rule and Sink to specify how the data will be written into Snowflake. -## Create a Rule with Snowflake Sink for Streaming Mode +## Create a Rule with Snowflake Sink -This section demonstrates how to create a rule in EMQX to process messages (e.g., from the source MQTT topic `t/#`) and write the processed results to Snowflake through the configured Sink using the streaming mode. This mode enables real-time ingestion using the Snowpipe Streaming API. 
This section demonstrates how to create a rule in EMQX to process messages (e.g., from the source MQTT topic `t/#`) and write the processed results to Snowflake through a configured Sink.

@@ -558,39 +488,73 @@

1. Go to the Dashboard **Integration** -> **Rules** page.

2. Click the **Create** button in the top right corner.

3. Enter the rule ID `my_rule`, and input the following rule SQL in the SQL editor:

   ```sql
   SELECT
     clientid,
     unix_ts_to_rfc3339(publish_received_at, 'millisecond') as publish_received_at,
     topic,
     payload
   FROM
     "t/#"
   ```

   ::: tip

   If you are new to SQL, you can click **SQL Examples** and **Enable Debug** to learn and test the rule SQL results.

   :::

   ::: tip

   For Snowflake integration, the selected fields must exactly match the columns of the table defined in Snowflake, in both number and name, so avoid adding extra fields or selecting `*`.

   :::

-4. Add an action, select `Snowflake Streaming` from the **Action Type** dropdown list, keep the **Action** dropdown as the default `Create Action` option, or choose a previously created Snowflake action from the action dropdown. Here, create a new Sink and add it to the rule.
+4. Add an action to the rule by configuring a Sink.
+   - If you want to write the rule processing results to Snowflake using the aggregated upload mode, refer to [Add Snowflake Sink with Aggregated Upload Mode](#add-snowflake-sink-with-aggregated-upload-mode).
+   - If you want to write the rule processing results to Snowflake using the streaming upload mode, refer to [Add Snowflake Sink with Streaming Upload Mode](#add-snowflake-sink-with-streaming-upload-mode).
+5. After the action is added, you will see the newly added Sink appear under the **Action Outputs** section. Click the **Save** button on the **Create Rule** page to complete the entire rule creation process.
+
+You have now successfully created the rule. You can see the newly created rule on the **Rules** page and the new Snowflake Sink on the **Actions (Sink)** tab.
-5. Enter the Sink's name (for example, `snowflake_sink_streaming`) and a brief description.
+You can also click **Integration** -> **Flow Designer** to view the topology. The topology visually shows how messages under the topic `t/#` are written into Snowflake after being parsed by the rule `my_rule`.
-6. Select the `my-snowflake-streaming` connector created earlier from the connector dropdown. You can also click the create button next to the dropdown to quickly create a new connector in the pop-up box.
The required configuration parameters can be found in [Create a Snowflake Streaming Connector](#create-a-snowflake-streaming-connector). +### Add Snowflake Sink with Aggregated Upload Mode -7. Configure the settings for the streaming upload mode. +This section demonstrates how to add a Sink to the rule to write the processed results to Snowflake using the aggregated upload mode. This mode consolidates the results of multiple rule triggers into a single file (e.g., a CSV file) and uploads it to Snowflake, thereby reducing the number of files and enhancing write efficiency. - - **Database Name**: Enter `testdatabase`. This is the Snowflake database that was created for storing EMQX data. +1. On the **Create Rule** page, click **Add Action** under the **Action Outputs** section to add an action to the rule. + +2. Select `Snowflake` from the **Action Type** dropdown list, keep the **Action** dropdown as the default `Create Action` option, or choose a previously created Snowflake action from the action dropdown. Here, create a new Sink and add it to the rule. + +3. Enter the Sink's name (for example, `snowflake_sink`) and a brief description. + +4. Select the `my-snowflake` connector created earlier from the **Connectors** dropdown. You can also click the create button next to the dropdown to quickly create a new connector in the pop-up box. The required configuration parameters can be found in [Create a Snowflake Connector for Aggregated Mode](#create-a-snowflake-connector-for-aggregated-mode). + +5. Configure the settings for the aggregated upload mode. + + - **Database Name**: Enter `testdatabase`. This is the Snowflake database that was created for storing EMQX data. - **Schema**: Enter `public`, the schema within the `testdatabase` where the data table is located. + - **Stage**: Enter `emqx`, the stage created in Snowflake for holding the data before loading it into the table. 
+ - **Pipe**: Enter `emqx`, the pipe automating the loading process from the stage to the table.
+ - **Pipe User**: Enter `snowpipeuser`, the Snowflake user with the appropriate permissions to manage the pipe.
+ - **Private Key**: The RSA private key used by the pipe user to securely access the Snowflake pipe. You can provide the key in one of two formats:
+ - **Plain Text**: Paste the full PEM-formatted private key content directly as a string.
+ - **File Path**: Specify the path to the private key file, starting with `file://`. The file path must be consistent across all nodes in the cluster and accessible by the EMQX application user. For example, `file:///etc/emqx/certs/snowflake_rsa_key.private.pem`.
+ - **Private Key Password**: The password used to decrypt the private RSA key file, if the key is encrypted. Leave this field blank if the key was generated without encryption (i.e., with the `-nocrypt` option in OpenSSL).
 
-   - **Pipe**: Enter `emqxstreaming`, the name of the Snowflake Streaming pipe created using a SQL statement. The name must match exactly as defined in Snowflake.
+ - **Aggregation Upload Format**: Currently, only `csv` is supported. Data will be staged to Snowflake in comma-separated CSV format.
+ - **Column Order**: Select the order of the columns from the dropdown list based on your desired arrangement. The generated CSV file will be sorted first by the selected columns, with unselected columns sorted alphabetically afterward.
+ - **Max Records**: Set the maximum number of records before aggregation is triggered. For example, you can set it to `1000` to upload after collecting 1000 records. Once the maximum number of records is reached, the aggregated file is finalized and uploaded, and the time interval is reset.
+ - **Time Interval**: Set the time interval (in seconds) at which aggregation occurs. 
For example, if set to `60`, data will be uploaded every 60 seconds even if the maximum number of records hasn’t been reached, and the record count is reset.
+ - **Proxy**: Configuration settings for connecting to Snowflake through an HTTP proxy server. HTTPS proxies are **not** supported. By default, no proxy is used. To enable proxy support, select `Enable Proxy` and provide the following:
+ - **Proxy Host**: The hostname or IP address of the proxy server.
+ - **Proxy Port**: The port number used by the proxy server.
 
-   - **HTTP Pipelining**: The maximum number of HTTP requests that can be sent without waiting for responses. Default: `100`.
-
-   - **Connect Timeout**: The time limit for establishing a connection to Snowflake before the attempt is aborted. Default: `15` seconds.
-
-   - **Connection Pool Size**: The maximum number of concurrent connections EMQX can maintain to Snowflake for this Sink. Default: `8`.
-
-   - **Max Inactive**: The maximum amount of time an idle connection can remain open before being closed. Default: `10` seconds.
-
-8. **Fallback Actions (optional)**: If you want to improve reliability in case of message delivery failure, you can define one or more fallback actions. These actions will be triggered if the primary Sink fails to process a message. See [Fallback Actions](./data-bridges.md#fallback-actions) for more details.
+6. **Fallback Actions (optional)**: If you want to improve reliability in case of message delivery failure, you can define one or more fallback actions. These actions will be triggered if the primary Sink fails to process a message. See [Fallback Actions](./data-bridges.md#fallback-actions) for more details.
 
-9. Expand **Advanced Settings** and configure the advanced setting options as needed (optional). For more details, refer to [Advanced Settings](#advanced-settings).
+7. Expand **Advanced Settings** and configure the advanced setting options as needed (optional). 
For more details, refer to [Advanced Settings](#advanced-settings). -10. Use the default values for the remaining settings. Click the **Create** button to complete the Sink creation. After successful creation, the page will return to the rule creation, and the new Sink will be added to the rule actions. +8. Click the **Create** button to complete the Sink creation. After successful creation, the page will return to the rule creation, and the new Sink will be added to the rule actions. -11. Back on the rule creation page, click the **Create** button to complete the entire rule creation process. +### Add Snowflake Sink with Streaming Upload Mode -You have now successfully created the rule. You can see the newly created rule on the **Rules** page and the new Snowflake Sink on the **Actions (Sink)** tab. +This section demonstrates how to add a Sink to the rule to write the processed results to Snowflake using the Streaming upload mode. This mode enables real-time ingestion using the Snowpipe Streaming API. -You can also click **Integration** -> **Flow Designer** to view the topology. The topology visually shows how messages under the topic `t/#` are written into the Snowflake after being parsed by the rule `my_rule`. +1. On the **Create Rule** page, click **Add Action** under the **Action Outputs** section to add an action to the rule. +2. Select `Snowflake Streaming` from the **Action Type** dropdown list, keep the **Action** dropdown as the default `Create Action` option, or choose a previously created Snowflake action from the action dropdown. Here, create a new Sink and add it to the rule. +3. Enter the Sink's name (for example, `snowflake_sink_streaming`) and a brief description. +4. Select the `my-snowflake-streaming` connector created earlier from the connector dropdown. You can also click the create button next to the dropdown to quickly create a new connector in the pop-up box. 
The required configuration parameters can be found in [Create a Snowflake Streaming Connector](#create-a-snowflake-streaming-connector). +5. Configure the settings for the streaming upload mode. + - **Database Name**: Enter `testdatabase`. This is the Snowflake database that was created for storing EMQX data. + - **Schema**: Enter `public`, the schema within the `testdatabase` where the data table is located. + - **Pipe**: Enter `emqxstreaming`, the name of the Snowflake Streaming pipe created using a SQL statement. The name must match exactly as defined in Snowflake. + - **HTTP Pipelining**: The maximum number of HTTP requests that can be sent without waiting for responses. Default: `100`. + - **Connect Timeout**: The time limit for establishing a connection to Snowflake before the attempt is aborted. Default: `15` seconds. + - **Connection Pool Size**: The maximum number of concurrent connections EMQX can maintain to Snowflake for this Sink. Default: `8`. + - **Max Inactive**: The maximum amount of time an idle connection can remain open before being closed. Default: `10` seconds. +6. **Fallback Actions (optional)**: If you want to improve reliability in case of message delivery failure, you can define one or more fallback actions. These actions will be triggered if the primary Sink fails to process a message. See [Fallback Actions](./data-bridges.md#fallback-actions) for more details. +7. Expand **Advanced Settings** and configure the advanced setting options as needed (optional). For more details, refer to [Advanced Settings](#advanced-settings). +8. Click the **Create** button to complete the Sink creation. After successful creation, the page will return to the rule creation, and the new Sink will be added to the rule actions. 
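Before testing the rule, make sure the `emqxstreaming` pipe referenced in the streaming Sink settings actually exists in Snowflake. The following is a rough sketch of the SQL statement that creates it — the target table name `emqx` and the exact `DATA_SOURCE` syntax are assumptions here, so verify them against your own setup and Snowflake's `CREATE PIPE` documentation for Snowpipe Streaming:

```sql
-- Sketch only: create a Snowpipe Streaming pipe named "emqxstreaming".
-- The target table "emqx" in testdatabase.public is assumed from the
-- earlier setup steps; adjust names to match your environment.
USE DATABASE testdatabase;

CREATE PIPE public.emqxstreaming
  AS COPY INTO testdatabase.public.emqx
  FROM TABLE (DATA_SOURCE(TYPE => 'STREAMING'))
  MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE;
```

The value entered in the **Pipe** field of the Sink must match this pipe name exactly.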
## Test the Rule diff --git a/zh_CN/data-integration/snowflake.md b/zh_CN/data-integration/snowflake.md index 620aa1843..905105ab0 100644 --- a/zh_CN/data-integration/snowflake.md +++ b/zh_CN/data-integration/snowflake.md @@ -312,14 +312,9 @@ openssl rsa -in snowflake_rsa_key.private.pem -pubout -out snowflake_rsa_key.pub ALTER USER snowpipeuser SET DEFAULT_ROLE = snowpipe; ``` -## 创建连接器 +## 创建 Snowflake 连接器(聚合模式) -在配置 Snowflake Sink 之前,你需要先在 EMQX 中创建一个连接器,以建立与 Snowflake 环境的连接。连接器支持两种通信模式: - -- **聚合模式**:通过 ODBC(使用 DSN)连接到 Snowflake 存储区。 -- **流式模式**:通过 HTTPS 和 Snowpipe Streaming REST API(仅支持 AWS)进行连接。 - -> 表单中所需的字段取决于你在 Sink 中选择的上传模式(`聚合上传` 或 `流式`)。 +如果您计划在 Snowflake Sink 中使用聚合上传模式,则需要创建一个 Snowflake 连接器,以建立与 Snowflake 环境的连接。该连接器通过 ODBC(使用 DSN)连接到 Snowflake 的存储区(Stage)。 1. 进入 Dashboard **集成** -> **连接器**页面。 @@ -331,14 +326,10 @@ openssl rsa -in snowflake_rsa_key.private.pem -pubout -out snowflake_rsa_key.pub 5. 输入连接信息: - :::: tabs - - ::: tab 聚合模式(基于 ODBC) + - **服务器地址**:服务器地址为 Snowflake 的端点 URL,通常格式为 `<你的 Snowflake 组织 ID>-<你的 Snowflake 账户名>.snowflakecomputing.com`。您需要用自己 Snowflake 实例的子域替换 `<你的 Snowflake 组织 ID>-<你的 Snowflake 账户名称>`。 - **账户**:输入您的 Snowflake 组织 ID 和账户名,用连字符(`-`)分隔,可以在 Snowflake 控制台中找到该信息,通常也是您访问 Snowflake 平台的 URL 中的一部分。 - - **服务器地址**:服务器地址为 Snowflake 的端点 URL,通常格式为 `<你的 Snowflake 组织 ID>-<你的 Snowflake 账户名>.snowflakecomputing.com`。您需要用自己 Snowflake 实例的子域替换 `<你的 Snowflake 组织 ID>-<你的 Snowflake 账户名称>`。 - - **数据源名称**:输入 `snowflake`,与您在 ODBC 驱动设置中配置的 `.odbc.ini` 文件中的 DSN 名称相对应。 - **用户名**:输入 `snowpipeuser`,这是之前设置过程中定义的用户名。 @@ -346,11 +337,11 @@ openssl rsa -in snowflake_rsa_key.private.pem -pubout -out snowflake_rsa_key.pub - **密码**:输入用于通过用户名和密码进行 ODBC 连接认证。此字段为可选项,用户可以选择: - 在此处填写密码,例如: `Snowpipeuser99`,这是之前设置过程中定义的密码。 - + - 或在系统的 `/etc/odbc.ini` 文件中配置; - 如果使用密钥对认证(Key-pair authentication),则无需提供密码。 - + ::: tip 使用密码或私钥进行身份验证,而不是两者兼用。如果此处未配置这两种方式,请确保在 `/etc/odbc.ini` 中设置了适当的凭证。 @@ -362,27 +353,43 @@ openssl rsa -in snowflake_rsa_key.private.pem -pubout -out 
snowflake_rsa_key.pub
    - **私钥密码**:用于解密 RSA 私钥文件的密码(如果该私钥已加密)。如果私钥是在未加密的情况下生成的(例如使用 OpenSSL 的 `-nocrypt` 选项),则此字段应留空。
    - **代理**:用于通过 HTTP 代理服务器连接到 Snowflake 的配置。**不支持** HTTPS 代理。默认情况下不使用代理。若需启用代理支持,请选择`开启代理`并填写以下信息:
-
+
      - **代理主机**:代理服务器的主机名或 IP 地址。
      - **代理端口**:代理服务器使用的端口号。
+
+6. 如果您想建立一个加密连接,单击**启用 TLS** 切换按钮。有关 TLS 连接的更多信息,请参见[启用 TLS 加密访问外部资源](../network/overview.md/#tls-for-external-resource-access)。流式模式必须启用 TLS,因为通信是通过 HTTPS 进行的。
 
-   :::
+7. 高级配置(可选),请参考[高级设置](#高级设置)。
+
+8. 在点击**创建**之前,可以点击**测试连接**来测试连接器是否能够连接到 Snowflake。
+
+9. 点击页面底部的**创建**按钮,完成连接器创建。
+
+现在,您已经成功创建了连接器,可以继续创建规则,以指定如何将数据写入 Snowflake。
+
+## 创建 Snowflake Streaming 连接器
+
+如果您计划在 Snowflake Sink 中使用流式上传模式,则需要创建一个 Snowflake Streaming 连接器,以建立与 Snowflake 环境的连接。该连接器通过 HTTPS 和 Snowpipe Streaming REST API(仅支持 AWS)进行连接。
+
+1. 进入 Dashboard **集成** -> **连接器**页面。
+
+2. 点击右上角的**创建**按钮。
+
+3. 选择 **Snowflake Streaming** 作为连接器类型,然后点击下一步。
+
+4. 输入连接器名称,由大小写字母和数字组成。这里输入 `my-snowflake-streaming`。
+
+5. 输入连接信息:
 
-   ::: tab 流式模式 (Snowpipe Streaming API)
-
-   - **账户**:输入您的 Snowflake 组织 ID 和账户名,用连字符(`-`)分隔,可以在 Snowflake 控制台中找到该信息,通常也是您访问 Snowflake 平台的 URL 中的一部分。
    - **服务器地址**:服务器地址为 Snowflake 的端点 URL,通常格式为 `<你的 Snowflake 组织 ID>-<你的 Snowflake 账户名>.snowflakecomputing.com`。您需要用自己 Snowflake 实例的子域替换 `<你的 Snowflake 组织 ID>-<你的 Snowflake 账户名称>`。
+   - **账户**:输入您的 Snowflake 组织 ID 和账户名,用连字符(`-`)分隔,可以在 Snowflake 控制台中找到该信息,通常也是您访问 Snowflake 平台的 URL 中的一部分。
    - **用户名**:(可选)如果您在 `odbc.ini` 中已配置,在此输入绑定了 RSA 公钥的 Snowflake 用户名(如 `snowpipeuser`)。
    - **私钥路径**: RSA 私钥的绝对文件路径。EMQX 使用此密钥签发 JWT 令牌,用于向 Snowflake API 进行身份认证。此路径必须在集群的所有节点上保持一致。例如:`/etc/emqx/certs/snowflake_rsa_key.private.pem`。
    - **私钥密码**:用于解密 RSA 私钥文件的密码(如果该私钥已加密)。如果私钥是在未加密的情况下生成的(例如使用 OpenSSL 的 `-nocrypt` 选项),则此字段应留空。
    - **代理**:用于通过 HTTP 代理服务器连接到 Snowflake 的配置。**不支持** HTTPS 代理。默认情况下不使用代理。若需启用代理支持,请选择`开启代理`并填写以下信息:
      - **代理主机**:代理服务器的主机名或 IP 地址。
      - **代理端口**:代理服务器使用的端口号。
-
-   :::
-
-   ::::
-
 6. 
如果您想建立一个加密连接,单击**启用 TLS** 切换按钮。有关 TLS 连接的更多信息,请参见[启用 TLS 加密访问外部资源](../network/overview.md/#tls-for-external-resource-access)。流式模式必须启用 TLS,因为通信是通过 HTTPS 进行的。 7. 高级配置(可选),请参考[高级设置](#高级设置)。 @@ -391,11 +398,11 @@ openssl rsa -in snowflake_rsa_key.private.pem -pubout -out snowflake_rsa_key.pub 9. 点击页面底部的**创建**按钮,完成连接器创建。 -现在,您已经成功创建了连接器,可以继续创建规则和 Sink,以指定如何将数据写入 Snowflake。 +现在,您已经成功创建了连接器,可以继续创建规则,以指定如何将数据写入 Snowflake。 -## 使用 Snowflake Sink 创建规则 +## 创建 Snowflake Sink 规则 -本节将演示如何在 EMQX 中创建规则以处理消息(例如,来自源 MQTT 主题 `t/#`),并通过配置好的 Sink 将处理结果写入 Snowflake。EMQX 支持流式和聚合两种上传模式。 +本节演示如何在 EMQX 中创建规则,以处理消息(例如,来自源 MQTT 主题 `t/#`),并通过已配置的 Sink 将规则处理结果写入 Snowflake。 1. 进入 Dashboard **集成** -> **规则**页面。 @@ -425,19 +432,28 @@ openssl rsa -in snowflake_rsa_key.private.pem -pubout -out snowflake_rsa_key.pub ::: -4. 点击**添加动作**按钮,从**动作类型**下拉列表中选择 `Snowflake`,将动作下拉选项保留为默认的 `创建动作`,或从下拉列表中选择之前创建的 Snowflake 动作。此示例将创建一个新的 Sink 并将其添加到规则中。 +4. 为规则添加动作输出。 + + - 如果您想要使用聚合上传模式将规则处理结果写入 Snowflake,参考[添加使用聚合上传模式的 Snowflake Sink](#添加使用聚合上传模式的-snowflake-sink)。 + - 如果您想要使用流式上传模式将规则处理结果写入 Snowflake,参考[添加使用流式上传模式的 Snowflake Sink](#添加使用流式上传模式的-snowflake-sink)。 + +5. 动作添加完成后,您可以看到新添加的 Sink 出现在**动作输出**栏下。点击**创建规则**页面上的**保存**按钮完成整个规则创建过程。 -5. 输入 Sink 的名称(例如 `snowflake_sink`)和简短描述。 +现在,您已成功创建了规则。您可以在**规则**页面看到新创建的规则,并在**动作 (Sink)** 标签页中查看新创建的 Snowflake Sink。 + +您还可以点击**集成** -> **Flow 设计器**来查看拓扑图,拓扑图可视化显示了主题 `t/#` 下的消息在经过规则 `my_rule` 解析后如何写入 Snowflake。 + +### 添加使用聚合上传模式的 Snowflake Sink -6. 从连接器下拉列表中选择之前创建的 `my-snowflake` 连接器。您也可以点击下拉列表旁的创建按钮,在弹出的对话框中快速创建新的连接器。所需的配置参数请参考[创建连接器](#创建连接器)。 +本节演示了为规则添加一个使用聚合上传模式的 Sink,将规则处理结果写入 Snowflake。该模式会将多次规则触发的结果合并到一个文件(例如 CSV 文件)中,再上传至 Snowflake,从而减少文件数量并提升写入效率。 -7. 选择**上传方式**并配置相关参数。默认选择为 `聚合上传`。根据所选上传模式配置以下选项。 +1. 在**创建规则**页面右侧点击**添加动作**按钮,从**动作类型**下拉列表中选择 `Snowflake`,将**动作**下拉选项保留为默认的`创建动作`,或从下拉列表中选择之前创建的 Snowflake 动作。此示例将创建一个新的 Sink 并将其添加到规则中。 - :::: tabs +2. 输入 Sink 的名称(例如 `snowflake_sink`)和简短描述。 - ::: tab 聚合上传 +3. 
从连接器下拉列表中选择之前创建的 `my-snowflake` 连接器。您也可以点击下拉列表旁的创建按钮,在弹出的对话框中快速创建新的连接器。所需的配置参数请参考[创建 Snowflake 连接器(聚合模式)](#创建-snowflake-连接器聚合模式)。 - 此方式将多个规则触发的结果分组到单个文件(例如 CSV 文件)中,并上传到 Snowflake,从而减少文件数量并提高写入效率。 +4. 配置以下 Sink 选项: - **数据库名字**:输入 `testdatabase`,这是为存储 EMQX 数据而创建的 Snowflake 数据库。 - **模式**:输入 `public`,这是 `testdatabase` 中的数据表所在的模式 (Schema) 名称。 @@ -447,49 +463,42 @@ openssl rsa -in snowflake_rsa_key.private.pem -pubout -out snowflake_rsa_key.pub - **私钥**:管道用户用于安全访问 Snowflake 管道的 RSA 私钥。您可以通过以下两种方式之一提供该密钥: - **明文内容**:直接粘贴完整的 PEM 格式私钥内容,作为字符串填写。 - **文件路径**:指定私钥文件的路径,路径需以 `file://` 开头。例如:`file:///etc/emqx/certs/snowflake_rsa_key.private.pem`。该路径在集群所有节点上必须保持一致,并确保 EMQX 应用用户具备读取权限。 - - **代理**:用于通过 HTTP 代理服务器连接到 Snowflake 的配置。**不支持** HTTPS 代理。默认情况下不使用代理。若需启用代理支持,请选择`开启代理`并填写以下信息: - - **代理主机**:代理服务器的主机名或 IP 地址。 - - **代理端口**:代理服务器使用的端口号。 - - 配置其他聚合参数: + - **私钥密码**:用于解密 RSA 私钥文件的密码(如果该私钥已加密)。如果私钥是在未加密的情况下生成的(例如使用 OpenSSL 的 `-nocrypt` 选项),则此字段应留空。 - - **聚合类型**:目前仅支持 `csv`。数据将以逗号分隔的 CSV 格式存储到 Snowflake。 + - **聚合上传文件格式**:目前仅支持 `csv`。数据将以逗号分隔的 CSV 格式存储到 Snowflake。 - **列排序**:从下拉列表中选择列的顺序,生成的 CSV 文件将首先按选定的列排序,未选定的列将按字母顺序排序。 - **最大记录数**:设置触发聚合前的最大记录数。例如,您可以设置为 `1000`,在收集 1000 条记录后触发上传。当达到最大记录数时,单个文件的聚合将完成并上传,重置时间间隔。 - **时间间隔**:设置触发聚合的时间间隔(秒)。例如,如果设置为 `60`,即使未达到最大记录数,也将在 60 秒后上传数据,并重置记录数。 - - ::: - - ::: tab 流式 - - 此模式使用 Snowpipe Streaming API 实现实时写入。 - - - **数据库名字**:输入 `testdatabase`,这是为存储 EMQX 数据而创建的 Snowflake 数据库。 - - **模式**:输入 `public`,这是 `testdatabase` 中的数据表所在的模式 (Schema) 名称。 - - **管道**:输入 `emqxstreaming`,该名称需与在 Snowflake 中创建的流式管道名称完全一致。 - - **管道用户**:输入 `snowpipeuser`,该用户需具备操作该流式 Pipe 的权限。 - - **私钥**:管道用户用于签署 JWT,以进行 Snowflake Streaming API 认证的 RSA 私钥。您可以通过以下两种格式之一提供该密钥: - - **明文内容**:直接粘贴完整的 PEM 格式私钥内容作为字符串填写。 - - **文件路径**:指定私钥文件的路径,路径需以 `file://` 开头。例如:`file:///etc/emqx/certs/snowflake_rsa_key.private.pem`。该路径必须在集群的所有节点上一致,并确保 EMQX 应用用户具备读取权限。 - **代理**:用于通过 HTTP 代理服务器连接到 Snowflake 的配置。**不支持** HTTPS 代理。默认情况下不使用代理。若需启用代理支持,请选择`开启代理`并填写以下信息: - **代理主机**:代理服务器的主机名或 IP 地址。 - 
**代理端口**:代理服务器使用的端口号。 - ::: - - :::: +5. **备选动作(可选)**:如果您希望在消息投递失败时提升系统的可靠性,可以为 Sink 配置一个或多个备选动作。当 Sink 无法成功处理消息时,这些备选动作将被触发。更多信息请参见:[备选动作](./data-bridges.md#备选动作)。 -8. **备选动作(可选)**:如果您希望在消息投递失败时提升系统的可靠性,可以为 Sink 配置一个或多个备选动作。当 Sink 无法成功处理消息时,这些备选动作将被触发。更多信息请参见:[备选动作](./data-bridges.md#备选动作)。 +6. 展开**高级设置**,根据需要配置高级设置选项(可选)。更多详细信息请参考[高级设置](#高级设置)。 -9. 展开**高级设置**,根据需要配置高级设置选项(可选)。更多详细信息请参考[高级设置](#高级设置)。 +7. 点击**添加**按钮完成 Sink 创建。成功创建后,页面将返回到规则创建页面,并将新创建的 Sink 添加到规则动作中。 -10. 其余设置保持默认值,点击**创建**按钮完成 Sink 创建。成功创建后,页面将返回到规则创建页面,并将新创建的 Sink 添加到规则动作中。 +### 添加使用流式上传模式的 Snowflake Sink -11. 返回规则创建页面,点击**创建**按钮完成整个规则创建过程。 +本节演示了为规则添加一个使用流式上传模式的 Sink,将规则处理结果写入 Snowflake。此模式使用 Snowpipe Streaming API 实现实时写入。 -现在,您已成功创建了规则。您可以在**规则**页面看到新创建的规则,并在**动作 (Sink)** 标签页中查看新创建的 Snowflake Sink。 +1. 在**创建规则**页面右侧点击**添加动作**按钮,从**动作类型**下拉列表中选择 `Snowflake-Streaming`,将**动作**下拉选项保留为默认的`创建动作`,或从下拉列表中选择之前创建的 Snowflake 动作。此示例将创建一个新的 Sink 并将其添加到规则中。 +2. 输入 Sink 的名称(例如 `snowflake_sink_streaming`)和简短描述。 +3. 从连接器下拉列表中选择之前创建的 `my-snowflake-streaming` 连接器。您也可以点击下拉列表旁的创建按钮,在弹出的对话框中快速创建新的连接器。所需的配置参数请参考[创建 Snowflake Streaming 连接器](#创建-snowflake-streaming-连接器)。 +4. 配置以下 Sink 选项: -您还可以点击**集成** -> **Flow 设计器**来查看拓扑图,拓扑图可视化显示了主题 `t/#` 下的消息在经过规则 `my_rule` 解析后如何写入 Snowflake。 + - **数据库名字**:输入 `testdatabase`,这是为存储 EMQX 数据而创建的 Snowflake 数据库。 + - **模式**:输入 `public`,这是 `testdatabase` 中的数据表所在的模式 (Schema) 名称。 + - **管道**:输入 `emqxstreaming`,该名称需与在 Snowflake 中创建的流式管道名称完全一致。 + - **HTTP 流水线**:在等待响应之前可以发送的最大 HTTP 请求数。默认值:`100`。 + - **连接超时**:建立与 Snowflake 的连接的最长等待时间,超过此时间将中止连接尝试。默认值:`15` 秒。 + - **连接池大小**:EMQX 为此 Sink 与 Snowflake 保持的最大并发连接数。默认值:`8`。 + - **最大空闲时间**:空闲连接在被关闭前允许保持打开状态的最长时间。默认值:`10` 秒。 + +5. **备选动作(可选)**:如果您希望在消息投递失败时提升系统的可靠性,可以为 Sink 配置一个或多个备选动作。当 Sink 无法成功处理消息时,这些备选动作将被触发。更多信息请参见:[备选动作](./data-bridges.md#备选动作)。 +6. 展开**高级设置**,根据需要配置高级设置选项(可选)。更多详细信息请参考[高级设置](#高级设置)。 +7. 
其余设置保持默认值,点击**创建**按钮完成 Sink 创建。成功创建后,页面将返回到规则创建页面,并将新创建的 Sink 添加到规则动作中。 ## 测试规则 From c6c73fa995a57fbbefe640d68ca4c54e2a8641bc Mon Sep 17 00:00:00 2001 From: Meggielqk <126552073+Meggielqk@users.noreply.github.com> Date: Tue, 12 Aug 2025 11:19:58 +0800 Subject: [PATCH 3/4] Update en_US/data-integration/snowflake.md Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- en_US/data-integration/snowflake.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/en_US/data-integration/snowflake.md b/en_US/data-integration/snowflake.md index 6724c5a16..a3e6f2e8d 100644 --- a/en_US/data-integration/snowflake.md +++ b/en_US/data-integration/snowflake.md @@ -490,7 +490,7 @@ This section demonstrates how to create a rule in EMQX to process messages (e.g. 4. Add action to the rule by configuring a Sink. - If you want to write the rule processing results to Snowflake using the aggregated upload mode, refer to [Add Snowflake Sink with Aggregated Upload Mode](#add-snowflake-sink-with-aggregated-upload-mode). - - If you want to write the rule processing results to Snowflake using the streaming upload mode, refer to [Add Snowflake Sink with Streaming Upload Mode](#add-snowflake-sink-with-aggregated-upload-mode). + - If you want to write the rule processing results to Snowflake using the streaming upload mode, refer to [Add Snowflake Sink with Streaming Upload Mode](#add-snowflake-sink-with-streaming-upload-mode). 5. After the action is added, you will see the newly added Sink appear under the **Action Outputs** section. Click the **Save** button on the **Create Rule** page to complete the entire rule creation process. You have now successfully created the rule. You can see the newly created rule on the **Rules** page and the new Snowflake Sink on the **Actions (Sink)** tab. 
From 1bbea94495ce8ca3fa8af9e3dfcf92542e5d4c2a Mon Sep 17 00:00:00 2001 From: Meggielqk <126552073+Meggielqk@users.noreply.github.com> Date: Thu, 21 Aug 2025 11:51:26 +0800 Subject: [PATCH 4/4] Update `Private Key Path` for Streaming connector --- en_US/data-integration/snowflake.md | 11 +---------- zh_CN/data-integration/snowflake.md | 2 +- 2 files changed, 2 insertions(+), 11 deletions(-) diff --git a/en_US/data-integration/snowflake.md b/en_US/data-integration/snowflake.md index a3e6f2e8d..96265741d 100644 --- a/en_US/data-integration/snowflake.md +++ b/en_US/data-integration/snowflake.md @@ -427,31 +427,22 @@ You have now completed the connector creation and can proceed to create a rule a If you plan to use the streaming upload mode in your Snowflake Sink, you need to create a Snowflake Streaming Connector to establish the connection with your Snowflake environment. This connector uses HTTPS and the Snowpipe Streaming REST API (AWS-only). 1. Go to the Dashboard **Integration** -> **Connector** page. - 2. Click the **Create** button in the top right corner. - 3. Select **Snowflake Streaming** as the connector type and click next. - 4. Enter the connector name, a combination of upper and lowercase letters and numbers. Here, enter `my-snowflake-streaming`. - 5. Enter the connection information. - **Server Host**: The server host is the Snowflake endpoint URL, typically in the format `-.snowflakecomputing.com`. You need to replace `-` with the subdomain specific to your Snowflake instance. - **Account**: Enter your Snowflake Organization ID and Snowflake account name separated by a dash (`-`), which is part of the URL you use to access the Snowflake platform and can be found in your Snowflake console. - **Pipe User**: The name of a Snowflake user account that has a role with permissions to operate the target Pipe, for example, `snowpipeuser`. The role must have at least the `OPERATE` and `MONITOR` privileges. 
- - **Private Key Path**: The absolute file path to the private RSA key. EMQX uses this key to sign JWT tokens to authenticate itself with the Snowflake API. This path must be consistent across all nodes of the cluster. For example: - `/etc/emqx/certs/snowflake_rsa_key.private.pem`. + - **Private Key Path**: The RSA private key used by EMQX to sign JWT tokens to authenticate itself with the Snowflake API. You can paste the full PEM-formatted private key content directly as a string or specify the path to the private key file, starting with `file://`. For example: `file:///etc/emqx/certs/snowflake_rsa_key.private.pem`.
   - **Private Key Password**: The password used to decrypt the private RSA key file, if the key is encrypted. Leave this field blank if the key was generated without encryption (i.e., with the `-nocrypt` option in OpenSSL).
   - **Proxy**: Configuration settings for connecting to Snowflake through an HTTP proxy server. HTTPS proxies are **not** supported. By default, no proxy is used. To enable proxy support, select `Enable Proxy` and provide the following:
     - **Proxy Host**: The hostname or IP address of the proxy server.
     - **Proxy Port**: The port number used by the proxy server.
 6. If you want to establish an encrypted connection, click the **Enable TLS** toggle switch. For more information about TLS connection, see [TLS for External Resource Access](../network/overview.md/#tls-for-external-resource-access). TLS must be enabled for streaming mode, as communication is over HTTPS.
 7. Advanced settings (optional): See [Advanced Settings](#advanced-settings).
 8. Before clicking **Create**, you can click **Test Connectivity** to test if the connector can connect to Snowflake.
 9. Click the **Create** button at the bottom to complete the connector creation.
 
 You have now completed the connector creation and can proceed to create a rule and Sink to specify how the data will be written into Snowflake. 
diff --git a/zh_CN/data-integration/snowflake.md b/zh_CN/data-integration/snowflake.md
index 905105ab0..df50500c1 100644
--- a/zh_CN/data-integration/snowflake.md
+++ b/zh_CN/data-integration/snowflake.md
@@ -384,7 +384,7 @@
    - **服务器地址**:服务器地址为 Snowflake 的端点 URL,通常格式为 `<你的 Snowflake 组织 ID>-<你的 Snowflake 账户名>.snowflakecomputing.com`。您需要用自己 Snowflake 实例的子域替换 `<你的 Snowflake 组织 ID>-<你的 Snowflake 账户名称>`。
    - **账户**:输入您的 Snowflake 组织 ID 和账户名,用连字符(`-`)分隔,可以在 Snowflake 控制台中找到该信息,通常也是您访问 Snowflake 平台的 URL 中的一部分。
    - **用户名**:(可选)如果您在 `odbc.ini` 中已配置,在此输入绑定了 RSA 公钥的 Snowflake 用户名(如 `snowpipeuser`)。
-   - **私钥路径**: RSA 私钥的绝对文件路径。EMQX 使用此密钥签发 JWT 令牌,用于向 Snowflake API 进行身份认证。此路径必须在集群的所有节点上保持一致。例如:`/etc/emqx/certs/snowflake_rsa_key.private.pem`。
+   - **私钥路径**: EMQX 使用此 RSA 私钥签发 JWT 令牌,用于向 Snowflake API 进行身份认证。您可以直接将完整的 PEM 格式私钥内容粘贴为字符串,或指定私钥文件的路径,路径需以 `file://` 开头,例如:`file:///etc/emqx/certs/snowflake_rsa_key.private.pem`。
    - **私钥密码**:用于解密 RSA 私钥文件的密码(如果该私钥已加密)。如果私钥是在未加密的情况下生成的(例如使用 OpenSSL 的 `-nocrypt` 选项),则此字段应留空。
    - **代理**:用于通过 HTTP 代理服务器连接到 Snowflake 的配置。**不支持** HTTPS 代理。默认情况下不使用代理。若需启用代理支持,请选择`开启代理`并填写以下信息:
      - **代理主机**:代理服务器的主机名或 IP 地址。