sql_raw
component_type_dropdown::[]
Executes an arbitrary SQL query for each message.
Introduced in version 3.65.0.
-
Common
-
Advanced
# Common configuration fields, showing default values
output:
label: ""
sql_raw:
driver: "" # No default (required)
dsn: "clickhouse://username:password@host1:9000,host2:9000/database?dial_timeout=200ms&max_execution_time=60" # No default (required)
query: INSERT INTO mytable (column1, column2, column3) VALUES (?, ?, ?); # No default (optional)
args_mapping: root = [ this.cat.meow, this.doc.woofs[0] ] # No default (optional)
queries: [] # No default (optional)
max_in_flight: 64
batching:
count: 0
byte_size: 0
period: ""
check: ""
# All configuration fields, showing default values
output:
label: ""
sql_raw:
driver: "" # No default (required)
dsn: "clickhouse://username:password@host1:9000,host2:9000/database?dial_timeout=200ms&max_execution_time=60" # No default (required)
query: INSERT INTO mytable (column1, column2, column3) VALUES (?, ?, ?); # No default (optional)
unsafe_dynamic_query: false
args_mapping: root = [ this.cat.meow, this.doc.woofs[0] ] # No default (optional)
queries: [] # No default (optional)
max_in_flight: 64
init_files: [] # No default (optional)
init_statement: | # No default (optional)
CREATE TABLE IF NOT EXISTS some_table (
column1 varchar(50) not null,
column2 integer,
column3 varchar(50),
primary key (column1)
) WITHOUT ROWID;
conn_max_idle_time: "" # No default (optional)
conn_max_life_time: "" # No default (optional)
conn_max_idle: 2
conn_max_open: 0 # No default (optional)
batching:
count: 0
byte_size: 0
period: ""
check: ""
processors: [] # No default (optional)
For some scenarios where you might use this output, see Examples.
Fields
driver
A database driver to use.
Type: string
Options:
clickhouse
, gocosmos
, mysql
, mssql
, oracle
, postgres
, snowflake
, spanner
, sqlite
, trino
dsn
A Data Source Name to identify the target database.
Drivers
The following is a list of supported drivers, their placeholder style, and their respective DSN formats:
Driver | Data Source Name Format |
---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Type: string
# Examples
dsn: "clickhouse://username:password@host1:9000,host2:9000/database?dial_timeout=200ms&max_execution_time=60"
dsn: foouser:foopassword@tcp(localhost:3306)/foodb
dsn: postgres://foouser:foopass@localhost:5432/foodb?sslmode=disable
dsn: oracle://foouser:foopass@localhost:1521/service_name
query
The query to execute.
You must include the correct placeholders for the specified database driver. Some drivers use question marks (?
), whereas others expect incrementing dollar signs ($1
, $2
, and so on) or colons (:1
, :2
, and so on).
Driver | Placeholder Style |
---|---|
|
Dollar sign ( |
|
Colon ( |
|
Question mark ( |
|
Question mark ( |
|
Colon ( |
|
Dollar sign ( |
|
Question mark ( |
|
Question mark ( |
|
Question mark ( |
|
Question mark ( |
Type: string
# Examples
query: INSERT INTO mytable (column1, column2, column3) VALUES (?, ?, ?); # No default (optional)
unsafe_dynamic_query
Whether to enable interpolation functions in the query. Great care should be made to ensure your queries are defended against injection attacks.
Type: bool
Default: false
args_mapping
An optional Bloblang mapping that includes the same number of values in an array as the placeholder arguments in the query
field.
Type: bloblang
# Examples
args_mapping: root = [ this.cat.meow, this.doc.woofs[0] ]
args_mapping: root = [ meta("user.id") ]
queries
A list of database statements to run in addition to your main query
. If you specify multiple queries, they are executed within a single transaction. For more information, see Examples.
Type: array
queries[].query
The query to execute.
You must include the correct placeholders for the specified database driver. Some drivers use question marks (?
), whereas others expect incrementing dollar signs ($1
, $2
, and so on) or colons (:1
, :2
, and so on).
Driver | Placeholder Style |
---|---|
|
Dollar sign ( |
|
Colon ( |
|
Question mark ( |
|
Question mark ( |
|
Colon ( |
|
Dollar sign ( |
|
Question mark ( |
|
Question mark ( |
|
Question mark ( |
|
Question mark ( |
Type: string
queries[].args_mapping
An optional Bloblang mapping that includes the same number of values in an array as the placeholder arguments in the query
field.
Type: bloblang
# Examples
args_mapping: root = [ this.cat.meow, this.doc.woofs[0] ]
args_mapping: root = [ meta("user.id") ]
max_in_flight
The maximum number of database statements to execute in parallel.
Type: int
Default: 64
init_files
An optional list of file paths containing SQL statements to execute immediately upon the first connection to the target database. This is a useful way to initialise tables before processing data. Glob patterns are supported, including super globs (double star).
Care should be taken to ensure that the statements are idempotent, and therefore would not cause issues when run multiple times after service restarts. If both init_statement
and init_files
are specified the init_statement
is executed after the init_files
.
If a statement fails for any reason a warning log will be emitted but the operation of this component will not be stopped.
Type: array
Requires version 4.10.0 or newer
# Examples
init_files:
- ./init/*.sql
init_files:
- ./foo.sql
- ./bar.sql
init_statement
An optional SQL statement to execute immediately upon the first connection to the target database. This is a useful way to initialise tables before processing data. Care should be taken to ensure that the statement is idempotent, and therefore would not cause issues when run multiple times after service restarts.
If both init_statement
and init_files
are specified the init_statement
is executed after the init_files
.
If the statement fails for any reason a warning log will be emitted but the operation of this component will not be stopped.
Type: string
Requires version 4.10.0 or newer
# Examples
init_statement: |2
CREATE TABLE IF NOT EXISTS some_table (
foo varchar(50) not null,
bar integer,
baz varchar(50),
primary key (foo)
) WITHOUT ROWID;
conn_max_idle_time
An optional maximum amount of time a connection may be idle. Expired connections may be closed lazily before reuse. If value ⇐ 0
, connections are not closed due to a connections idle time.
Type: string
conn_max_life_time
An optional maximum amount of time a connection may be reused. Expired connections may be closed lazily before reuse. If value ⇐ 0
, connections are not closed due to a connections age.
Type: string
conn_max_idle
An optional maximum number of connections in the idle connection pool. If conn_max_open is greater than 0 but less than the new conn_max_idle, then the new conn_max_idle will be reduced to match the conn_max_open limit. If value ⇐ 0
, no idle connections are retained. The default max idle connections is currently 2. This may change in a future release.
Type: int
Default: 2
conn_max_open
An optional maximum number of open connections to the database. If conn_max_idle is greater than 0 and the new conn_max_open is less than conn_max_idle, then conn_max_idle will be reduced to match the new conn_max_open limit. If value ⇐ 0
, then there is no limit on the number of open connections. The default is 0 (unlimited).
Type: int
batching
Allows you to configure a batching policy.
Type: object
# Examples
batching:
byte_size: 5000
count: 0
period: 1s
batching:
count: 10
period: 1s
batching:
check: this.contains("END BATCH")
count: 0
period: 1m
batching.count
A number of messages at which the batch should be flushed. If 0
disables count based batching.
Type: int
Default: 0
batching.byte_size
An amount of bytes at which the batch should be flushed. If 0
disables size based batching.
Type: int
Default: 0
batching.period
A period in which an incomplete batch should be flushed regardless of its size.
Type: string
Default: ""
# Examples
period: 1s
period: 1m
period: 500ms
batching.check
A Bloblang query that should return a boolean value indicating whether a message should end a batch.
Type: string
Default: ""
# Examples
check: this.type == "end_of_transaction"
batching.processors
A list of processors to apply to a batch as it is flushed. This allows you to aggregate and archive the batch however you see fit. Please note that all resulting messages are flushed as a single batch, therefore splitting the batch into smaller batches using these processors is a no-op.
Type: array
# Examples
processors:
- archive:
format: concatenate
processors:
- archive:
format: lines
processors:
- archive:
format: json_array
Examples
-
Table insert (MySQL)
-
Create tables dynamically (PostgreSQL)
Insert rows into a database by populating the column ID, name, and topic with values extracted from messages and metadata:
output:
sql_raw:
driver: mysql
dsn: user:password@tcp(localhost:3306)/foodb
query: INSERT INTO mytable (id, name, topic) VALUES (?, ?, ?); # No default (optional)
args_mapping: |
root = [
this.user.id,
this.user.name,
meta("kafka_topic"),
]
Execute multiple queries in a single transaction to create output tables and insert a record into each one.
output:
processors:
- mapping: |
root = this
# Prevent SQL injection when using unsafe_dynamic_query
meta table_name = "\"" + metadata("table_name").replace_all("\"", "\"\"") + "\""
sql_raw:
driver: postgres
dsn: postgres://localhost/postgres
unsafe_dynamic_query: true
queries:
- query: |
CREATE TABLE IF NOT EXISTS ${!metadata("table_name")} (id varchar primary key, document jsonb);
- query: |
INSERT INTO ${!metadata("table_name")} (id, document) VALUES ($1, $2)
ON CONFLICT (id) DO UPDATE SET document = EXCLUDED.document;
args_mapping: |
root = [ this.id, this.document.string() ]