asyncapi: '2.6.0'
id: 'urn:com:materialize:streaming'
info:
title: Materialize Streaming Sources and Sinks
version: '1.0.0'
description: |
AsyncAPI description of Materialize's streaming integration surface. Materialize
is an operational data warehouse that ingests events from external message brokers
and HTTP webhooks (sources) and emits change events back to message brokers
(sinks). This document models the documented Kafka source, Kafka sink, and
Webhook source.
Scope notes:
* Kafka source/sink and the HTTP webhook source are modelled here because they
are bindings AsyncAPI 2.6 supports natively (kafka, http).
* Materialize's SUBSCRIBE statement also streams change rows (mz_timestamp,
mz_diff, columns) but does so exclusively over the PostgreSQL wire protocol
(pgwire) on TCP port 6875. AsyncAPI 2.6 has no standard binding for pgwire,
so SUBSCRIBE is intentionally NOT represented as a channel here. It is
described as a server entry for discoverability only; consumers should treat
it as a SQL streaming protocol, not an AsyncAPI-bindable transport.
* Materialize also supports CDC sources (PostgreSQL, MySQL, SQL Server,
MongoDB) and a load generator. These are out of scope for this document,
      which focuses on the message-broker and webhook surfaces.
Sourced from https://materialize.com/docs/ (CREATE SOURCE / CREATE SINK /
SUBSCRIBE / CREATE CONNECTION pages).
contact:
name: API Evangelist
url: https://apievangelist.com
license:
name: Materialize Documentation
url: https://materialize.com/docs/
x-apis-json:
aid: materialize:materialize-streaming-asyncapi
humanURL: https://materialize.com/docs/
defaultContentType: application/json
servers:
kafka-broker:
url: '{broker_host}:{broker_port}'
protocol: kafka-secure
description: |
Kafka cluster referenced by a Materialize CREATE CONNECTION ... TO KAFKA
object. Used by both Kafka sources (CREATE SOURCE ... FROM KAFKA) and
Kafka sinks (CREATE SINK ... INTO KAFKA). Default broker port is 9092.
Materialize supports PLAINTEXT, SSL, SASL_PLAINTEXT, and SASL_SSL with
PLAIN, SCRAM-SHA-256, or SCRAM-SHA-512 mechanisms.
variables:
broker_host:
default: kafka.example.com
description: Kafka broker hostname configured in CREATE CONNECTION TO KAFKA.
broker_port:
default: '9092'
description: Kafka broker port. Materialize documents 9092 as the default.
security:
- saslScram: []
- saslPlain: []
webhook-https:
url: '{mz_host}'
protocol: https
description: |
Materialize webhook source endpoint. Each webhook source exposes a public
POST URL of the shape https://<mz_host>/api/webhook/<database>/<schema>/<src_name>.
The exact URL is discoverable via the mz_internal.mz_webhook_sources system
catalog table.
variables:
mz_host:
default: <region>.materialize.cloud
description: Materialize region host.
materialize-pgwire:
url: '{mz_host}:6875'
protocol: 'postgres-pgwire'
description: |
INFORMATIONAL ONLY. Materialize's SUBSCRIBE statement streams change rows
over the PostgreSQL wire protocol on TCP 6875. AsyncAPI 2.6 has no standard
binding for pgwire, so no channel is defined against this server. Clients
must use a PostgreSQL-compatible driver (psql, libpq, pgx, JDBC, etc.) and
      issue a SUBSCRIBE statement. Output columns are mz_timestamp (numeric),
      mz_diff (bigint), mz_progressed (boolean, present only WITH (PROGRESS)),
      and the relation columns. See https://materialize.com/docs/sql/subscribe/.
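      Example (a sketch run from psql; my_view is a hypothetical relation):
        BEGIN;
        DECLARE c CURSOR FOR SUBSCRIBE my_view WITH (PROGRESS);
        FETCH 100 c;
      Each fetched row carries mz_timestamp and mz_diff (positive for
      insertions, negative for retractions). A row with mz_progressed = true
      signals that no further changes with a timestamp below its mz_timestamp
      will be emitted.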
variables:
mz_host:
default: <region>.materialize.cloud
description: Materialize region host.
channels:
# ----------------------------------------------------------------------------
  # KAFKA SOURCE: Materialize consumes messages from a Kafka topic.
# From the perspective of this AsyncAPI document, Materialize is the consumer,
# so the application "receives" from this channel.
# ----------------------------------------------------------------------------
kafka/source/{topic}:
description: |
Kafka topic consumed by a Materialize Kafka source.
Syntax (abbreviated):
CREATE SOURCE <src_name>
[IN CLUSTER <cluster_name>]
FROM KAFKA CONNECTION <connection_name> (
TOPIC '<topic>'
[, GROUP ID PREFIX '<group_id_prefix>']
[, START OFFSET (<partition_offset>, ...)]
[, START TIMESTAMP <timestamp>]
)
FORMAT <AVRO|JSON|PROTOBUF|TEXT|BYTES|CSV>
[ENVELOPE <NONE|UPSERT|DEBEZIUM>]
[INCLUDE KEY|PARTITION|OFFSET|TIMESTAMP|HEADERS|HEADER '<k>' AS <n> [BYTES]];
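      Example (a minimal sketch; orders_src, kafka_conn, and the orders topic
      are hypothetical names, and kafka_conn must already exist as a
      CREATE CONNECTION ... TO KAFKA object):
        CREATE SOURCE orders_src
          FROM KAFKA CONNECTION kafka_conn (TOPIC 'orders')
          FORMAT JSON;
      With FORMAT JSON and no ENVELOPE clause, each message becomes one
      insert-only row with a single jsonb column named data.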
servers:
- kafka-broker
parameters:
topic:
description: Kafka topic name supplied via the TOPIC option of CREATE SOURCE.
schema:
type: string
bindings:
kafka:
bindingVersion: '0.4.0'
    publish:
operationId: receiveFromKafkaSource
summary: Materialize ingests messages from the configured Kafka topic.
description: |
        Materialize consumes records from the topic according to the FORMAT and
        ENVELOPE clauses. ENVELOPE NONE (the default) treats records as
        append-only inserts. ENVELOPE UPSERT treats each record as an upsert on
        its key, and a null-value record (tombstone) deletes that key. ENVELOPE
        DEBEZIUM decodes Debezium CDC payloads (Avro only) into inserts,
        updates, and deletes.
bindings:
kafka:
groupId:
type: string
            description: Consumer group id. Materialize generates it; the GROUP ID PREFIX option only sets its prefix.
bindingVersion: '0.4.0'
message:
oneOf:
- $ref: '#/components/messages/KafkaSourceRecordAvro'
- $ref: '#/components/messages/KafkaSourceRecordJson'
- $ref: '#/components/messages/KafkaSourceRecordProtobuf'
- $ref: '#/components/messages/KafkaSourceRecordText'
- $ref: '#/components/messages/KafkaSourceRecordBytes'
# ----------------------------------------------------------------------------
# KAFKA SINK: Materialize publishes change events to a Kafka topic.
# ----------------------------------------------------------------------------
kafka/sink/{topic}:
description: |
Kafka topic produced to by a Materialize Kafka sink.
Syntax (abbreviated):
CREATE SINK <sink_name>
[IN CLUSTER <cluster_name>]
FROM <item_name>
INTO KAFKA CONNECTION <connection_name> (
TOPIC '<topic>'
[, COMPRESSION TYPE = '<none|gzip|snappy|lz4|zstd>']
[, TOPIC PARTITION COUNT = <n>]
[, TOPIC REPLICATION FACTOR = <n>]
[, PARTITION BY = <expr>]
)
[KEY (<col>, ...)]
[HEADERS <column>]
FORMAT <AVRO|JSON|TEXT|BYTES>
ENVELOPE <UPSERT|DEBEZIUM>
[WITH (SNAPSHOT = <true|false>)];
Materialize also maintains an internal progress topic
(default: _materialize-progress-{REGION}-{CONNECTION}) for exactly-once
semantics.
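      Example (a minimal sketch; orders_sink, orders_view, kafka_conn, the id
      column, and the orders-changes topic are hypothetical names):
        CREATE SINK orders_sink
          FROM orders_view
          INTO KAFKA CONNECTION kafka_conn (TOPIC 'orders-changes')
          KEY (id) NOT ENFORCED
          FORMAT JSON
          ENVELOPE UPSERT;
      NOT ENFORCED skips Materialize's validation that id is a unique key of
      orders_view. Use it only when the key is known to be unique.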
servers:
- kafka-broker
parameters:
topic:
description: Kafka topic name supplied via the TOPIC option of CREATE SINK.
schema:
type: string
bindings:
kafka:
bindingVersion: '0.4.0'
    subscribe:
operationId: publishToKafkaSink
summary: Materialize emits change events to the configured Kafka topic.
description: |
ENVELOPE UPSERT emits inserts/updates as keyed records and deletes as
null-value tombstones; a KEY clause is required. ENVELOPE DEBEZIUM wraps
each change in {before, after} fields.
bindings:
kafka:
bindingVersion: '0.4.0'
message:
oneOf:
- $ref: '#/components/messages/KafkaSinkUpsertRecord'
- $ref: '#/components/messages/KafkaSinkDebeziumRecord'
# ----------------------------------------------------------------------------
# WEBHOOK SOURCE: external producers POST events to a Materialize-hosted URL.
# ----------------------------------------------------------------------------
'api/webhook/{database}/{schema}/{src_name}':
description: |
HTTP endpoint exposed by a Materialize webhook source. External producers
POST events to this URL; Materialize stores the body (and optionally
selected headers) as rows.
Syntax (abbreviated):
CREATE SOURCE <src_name>
[IN CLUSTER <cluster_name>]
FROM WEBHOOK
BODY FORMAT <TEXT|JSON|JSON ARRAY|BYTES>
[INCLUDE HEADER '<name>' AS <alias> [BYTES]]
[INCLUDE HEADERS [( NOT '<name>', ... )]]
[CHECK ( WITH (HEADERS, BODY [AS <alias>], SECRET <name> [AS <alias>]) <bool_expr> )];
Documented limits:
* Max body size: 2 MB (413 on exceed).
* Max concurrent requests across all webhook sources: 500 (429 on exceed).
* Duplicate header names: 401 Unauthorized.
* Invalid TEXT or JSON body: 400 Bad Request.
* Without a CHECK clause all requests are accepted.
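      Example (a minimal sketch; my_webhook and the x-event-type header are
      hypothetical names):
        CREATE SOURCE my_webhook
          FROM WEBHOOK
          BODY FORMAT JSON
          INCLUDE HEADER 'x-event-type' AS event_type;
      Producers then POST to
      https://<mz_host>/api/webhook/<database>/<schema>/my_webhook. A single
      JSON object body becomes one row, and the x-event-type header value is
      stored in the event_type column.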
servers:
- webhook-https
parameters:
database:
description: Materialize database that owns the source.
schema:
type: string
schema:
description: Materialize schema that owns the source.
schema:
type: string
src_name:
description: Name of the webhook source.
schema:
type: string
bindings:
http:
bindingVersion: '0.3.0'
publish:
operationId: postWebhookEvent
summary: External producer POSTs an event to a Materialize webhook source.
description: |
Materialize accepts POST requests at
https://<mz_host>/api/webhook/{database}/{schema}/{src_name}. The body is
parsed according to BODY FORMAT. Optional CHECK clauses can validate
requests using BODY, HEADERS, and SECRET references (e.g., HMAC SHA-256
signature verification).
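        Example CHECK for HMAC SHA-256 verification (a sketch; the secret
        my_webhook_secret, the source my_signed_webhook, the x-signature header,
        and base64 signature encoding are assumptions about the producer, not
        Materialize defaults):
          CREATE SECRET my_webhook_secret AS '<shared secret>';
          CREATE SOURCE my_signed_webhook
            FROM WEBHOOK
            BODY FORMAT JSON
            CHECK (
              WITH (HEADERS, BODY AS request_body, SECRET my_webhook_secret)
              constant_time_eq(
                decode(headers->'x-signature', 'base64'),
                hmac(request_body, my_webhook_secret, 'sha256')
              )
            );
        Requests whose signature does not match the HMAC of the body are
        rejected and not stored.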
bindings:
http:
method: POST
bindingVersion: '0.3.0'
message:
oneOf:
- $ref: '#/components/messages/WebhookEventJson'
- $ref: '#/components/messages/WebhookEventJsonArray'
- $ref: '#/components/messages/WebhookEventText'
- $ref: '#/components/messages/WebhookEventBytes'
components:
securitySchemes:
saslScram:
type: scramSha256
      description: SASL/SCRAM-SHA-256 via CREATE CONNECTION secrets. Materialize also supports SCRAM-SHA-512, which AsyncAPI 2.6 models as a separate scramSha512 scheme type.
saslPlain:
type: plain
description: SASL/PLAIN via CREATE CONNECTION secrets.
webhookBasic:
type: httpApiKey
description: |
Optional. Materialize webhook sources do not require any built-in auth.
Producers and Materialize agree on a CHECK clause that may verify a
Basic auth header, an HMAC signature header, an API key header, etc.
name: Authorization
in: header
messages:
# ---- Kafka source messages (one per FORMAT) ----
KafkaSourceRecordAvro:
name: kafkaSourceAvroRecord
title: Kafka source record (FORMAT AVRO)
summary: Avro-encoded record consumed by Materialize via Confluent Schema Registry.
contentType: application/vnd.apache.avro+binary
payload:
type: object
description: |
Avro payload. Materialize derives columns and types from the schema
registered in Confluent Schema Registry under the TopicNameStrategy.
KafkaSourceRecordJson:
name: kafkaSourceJsonRecord
title: Kafka source record (FORMAT JSON)
summary: JSON-encoded record stored as a single jsonb column named data.
contentType: application/json
payload:
type: object
description: Free-form JSON. Materialize ingests it as a single jsonb column.
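      # Illustrative example only: the field names and values are hypothetical.
      # The whole object is stored in the source's single jsonb column (data).
      examples:
        - name: orderCreated
          summary: Hypothetical JSON message consumed by a Kafka source.
          payload:
            order_id: 1001
            status: created
            amount: 42.5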
KafkaSourceRecordProtobuf:
name: kafkaSourceProtobufRecord
title: Kafka source record (FORMAT PROTOBUF)
summary: Protobuf-encoded record decoded via schema registry or inline schema.
contentType: application/x-protobuf
payload:
type: string
format: binary
KafkaSourceRecordText:
name: kafkaSourceTextRecord
title: Kafka source record (FORMAT TEXT)
      summary: UTF-8 text decoded into a single text column.
contentType: text/plain
payload:
type: string
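      # Illustrative example only (hypothetical value): the message's UTF-8
      # text is stored as-is in the source's single text column.
      examples:
        - name: plainTextMessage
          summary: Hypothetical TEXT-formatted Kafka message.
          payload: 'user 42 logged in'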
KafkaSourceRecordBytes:
name: kafkaSourceBytesRecord
title: Kafka source record (FORMAT BYTES)
summary: Raw bytes with no decoding.
contentType: application/octet-stream
payload:
type: string
format: binary
# ---- Kafka sink messages ----
KafkaSinkUpsertRecord:
name: kafkaSinkUpsertRecord
title: Kafka sink record (ENVELOPE UPSERT)
summary: Insert/update keyed record; deletes are emitted as null-value tombstones.
contentType: application/json
payload:
type: object
description: |
Row contents in the configured FORMAT (Avro, JSON, Text, or Bytes).
The Kafka message key is taken from the columns named in the KEY clause.
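      # Illustrative example only: the id, name, and qty columns are
      # hypothetical. The Kafka record key (built from the KEY columns, here id)
      # is carried separately from the value payload shown here.
      examples:
        - name: upsertInsertOrUpdate
          summary: Hypothetical row with id 1 inserted or updated.
          payload:
            id: 1
            name: widget
            qty: 5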
KafkaSinkDebeziumRecord:
name: kafkaSinkDebeziumRecord
title: Kafka sink record (ENVELOPE DEBEZIUM)
summary: Debezium-style change event with before/after fields.
contentType: application/json
payload:
type: object
properties:
before:
            type: ['object', 'null']
description: Row state prior to the change, or null for inserts.
after:
            type: ['object', 'null']
description: Row state after the change, or null for deletes.
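      # Illustrative examples only: the id and qty columns and their values are
      # hypothetical. An insert has a null before, a delete has a null after,
      # and an update carries both row images.
      examples:
        - name: debeziumInsert
          summary: Hypothetical insert of row id 1.
          payload:
            before: null
            after:
              id: 1
              qty: 5
        - name: debeziumUpdate
          summary: Hypothetical update of row id 1 (qty 5 to 7).
          payload:
            before:
              id: 1
              qty: 5
            after:
              id: 1
              qty: 7
        - name: debeziumDelete
          summary: Hypothetical delete of row id 1.
          payload:
            before:
              id: 1
              qty: 7
            after: null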
# ---- Webhook source messages (one per BODY FORMAT) ----
WebhookEventJson:
name: webhookEventJson
title: Webhook event (BODY FORMAT JSON)
summary: |
Single JSON object or newline-delimited JSON. Invalid JSON yields 400.
contentType: application/json
headers:
type: object
additionalProperties:
type: string
payload:
type: object
description: Free-form JSON payload.
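      # Illustrative example only: the header and body fields are hypothetical.
      # Headers are stored only if the source uses INCLUDE HEADER or INCLUDE HEADERS.
      examples:
        - name: jsonEvent
          summary: Hypothetical JSON object posted to a webhook source.
          headers:
            content-type: application/json
          payload:
            event_type: signup
            user_id: 42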
WebhookEventJsonArray:
name: webhookEventJsonArray
title: Webhook event (BODY FORMAT JSON ARRAY)
summary: JSON array; Materialize expands each element to its own row.
contentType: application/json
headers:
type: object
additionalProperties:
type: string
payload:
type: array
items:
type: object
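      # Illustrative example only (hypothetical fields): with BODY FORMAT
      # JSON ARRAY, each of the two elements below becomes its own row.
      examples:
        - name: jsonArrayEvent
          summary: Hypothetical batch of two events in one request.
          headers:
            content-type: application/json
          payload:
            - event_type: signup
              user_id: 42
            - event_type: login
              user_id: 42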
WebhookEventText:
name: webhookEventText
title: Webhook event (BODY FORMAT TEXT)
summary: UTF-8 text body. Invalid UTF-8 yields 400.
contentType: text/plain
headers:
type: object
additionalProperties:
type: string
payload:
type: string
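      # Illustrative example only (hypothetical value): the body must be valid
      # UTF-8 and is stored as text without further parsing.
      examples:
        - name: textEvent
          summary: Hypothetical plain-text webhook body.
          headers:
            content-type: text/plain
          payload: 'deploy finished: build 1234'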
WebhookEventBytes:
name: webhookEventBytes
title: Webhook event (BODY FORMAT BYTES)
summary: Raw request body stored as bytea without parsing.
contentType: application/octet-stream
headers:
type: object
additionalProperties:
type: string
payload:
type: string
format: binary