Materialize Streaming Sources and Sinks (AsyncAPI)

AsyncAPI 2.6 description of Materialize's event-driven integration surface: Kafka sources (CREATE SOURCE ... FROM KAFKA), Kafka sinks (CREATE SINK ... INTO KAFKA), and HTTP webhook sources (CREATE SOURCE ... FROM WEBHOOK). SUBSCRIBE appears in the spec only as an informational pgwire server entry; AsyncAPI 2.6 has no standard binding for the PostgreSQL wire protocol, so SUBSCRIBE is intentionally not modelled as a channel.

AsyncAPI Specification

materialize-asyncapi.yml
asyncapi: '2.6.0'
id: 'urn:com:materialize:streaming'
info:
  title: Materialize Streaming Sources and Sinks
  version: '1.0.0'
  description: |
    AsyncAPI description of Materialize's streaming integration surface. Materialize
    is an operational data warehouse that ingests events from external message brokers
    and HTTP webhooks (sources) and emits change events back to message brokers
    (sinks). This document models the documented Kafka source, Kafka sink, and
    Webhook source.

    Scope notes:
      * Kafka source/sink and the HTTP webhook source are modelled here because they
        are bindings AsyncAPI 2.6 supports natively (kafka, http).
      * Materialize's SUBSCRIBE statement also streams change rows (mz_timestamp,
        mz_diff, columns) but does so exclusively over the PostgreSQL wire protocol
        (pgwire) on TCP port 6875. AsyncAPI 2.6 has no standard binding for pgwire,
        so SUBSCRIBE is intentionally NOT represented as a channel here. It is
        described as a server entry for discoverability only; consumers should treat
        it as a SQL streaming protocol, not an AsyncAPI-bindable transport.
      * Materialize also supports CDC sources (PostgreSQL, MySQL, SQL Server,
        MongoDB) and a load generator. These are out of scope for this document,
        which focuses on the message-broker and webhook surfaces.
    Sourced from https://materialize.com/docs/ (CREATE SOURCE / CREATE SINK /
    SUBSCRIBE / CREATE CONNECTION pages).
  contact:
    name: API Evangelist
    url: https://apievangelist.com
  license:
    name: Materialize Documentation
    url: https://materialize.com/docs/
  x-apis-json:
    aid: materialize:materialize-streaming-asyncapi
    humanURL: https://materialize.com/docs/

defaultContentType: application/json

servers:
  kafka-broker:
    url: '{broker_host}:{broker_port}'
    protocol: kafka-secure
    description: |
      Kafka cluster referenced by a Materialize CREATE CONNECTION ... TO KAFKA
      object. Used by both Kafka sources (CREATE SOURCE ... FROM KAFKA) and
      Kafka sinks (CREATE SINK ... INTO KAFKA). Default broker port is 9092.
      Materialize supports PLAINTEXT, SSL, SASL_PLAINTEXT, and SASL_SSL with
      PLAIN, SCRAM-SHA-256, or SCRAM-SHA-512 mechanisms.
    variables:
      broker_host:
        default: kafka.example.com
        description: Kafka broker hostname configured in CREATE CONNECTION TO KAFKA.
      broker_port:
        default: '9092'
        description: Kafka broker port. Materialize documents 9092 as the default.
    security:
      - saslScram: []
      - saslPlain: []

  webhook-https:
    url: '{mz_host}'
    protocol: https
    description: |
      Materialize webhook source endpoint. Each webhook source exposes a public
      POST URL of the shape https://<mz_host>/api/webhook/<database>/<schema>/<src_name>.
      The exact URL is discoverable via the mz_internal.mz_webhook_sources system
      catalog table.
    variables:
      mz_host:
        default: <region>.materialize.cloud
        description: Materialize region host.

  materialize-pgwire:
    url: '{mz_host}:6875'
    protocol: 'postgres-pgwire'
    description: |
      INFORMATIONAL ONLY. Materialize's SUBSCRIBE statement streams change rows
      over the PostgreSQL wire protocol on TCP 6875. AsyncAPI 2.6 has no standard
      binding for pgwire, so no channel is defined against this server. Clients
      must use a PostgreSQL-compatible driver (psql, libpq, pgx, JDBC, etc.) and
      issue a SUBSCRIBE statement. Output columns are mz_timestamp (numeric),
      mz_diff (bigint), optional mz_progressed (bool when PROGRESS is set), and
      the relation columns. See https://materialize.com/docs/sql/subscribe/.
    variables:
      mz_host:
        default: <region>.materialize.cloud
        description: Materialize region host.

channels:

  # ----------------------------------------------------------------------------
  # KAFKA SOURCE: Materialize consumes messages from a Kafka topic.
  # Materialize is the application described by this document and acts as the
  # Kafka consumer on this channel.
  # ----------------------------------------------------------------------------
  kafka/source/{topic}:
    description: |
      Kafka topic consumed by a Materialize Kafka source.

      Syntax (abbreviated):
        CREATE SOURCE <src_name>
          [IN CLUSTER <cluster_name>]
          FROM KAFKA CONNECTION <connection_name> (
            TOPIC '<topic>'
            [, GROUP ID PREFIX '<group_id_prefix>']
            [, START OFFSET (<partition_offset>, ...)]
            [, START TIMESTAMP <timestamp>]
          )
          FORMAT <AVRO|JSON|PROTOBUF|TEXT|BYTES|CSV>
          [ENVELOPE <NONE|UPSERT|DEBEZIUM>]
          [INCLUDE KEY|PARTITION|OFFSET|TIMESTAMP|HEADERS|HEADER '<k>' AS <n> [BYTES]];
    servers:
      - kafka-broker
    parameters:
      topic:
        description: Kafka topic name supplied via the TOPIC option of CREATE SOURCE.
        schema:
          type: string
    bindings:
      kafka:
        bindingVersion: '0.4.0'
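    # AsyncAPI 2.x: `publish` describes messages the application (Materialize)
    # consumes from the channel, matching the webhook source channel.
    publish: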
      operationId: receiveFromKafkaSource
      summary: Materialize ingests messages from the configured Kafka topic.
      description: |
        Materialize consumes records from the topic according to the FORMAT and
        ENVELOPE clauses. ENVELOPE NONE treats records as insert-only;
        ENVELOPE UPSERT applies key/value upserts and tombstones; ENVELOPE
        DEBEZIUM decodes Debezium CDC payloads (Avro only) into inserts,
        updates, and deletes.
      bindings:
        kafka:
          groupId:
            type: string
            description: Optional consumer group id prefix (GROUP ID PREFIX).
          bindingVersion: '0.4.0'
      message:
        oneOf:
          - $ref: '#/components/messages/KafkaSourceRecordAvro'
          - $ref: '#/components/messages/KafkaSourceRecordJson'
          - $ref: '#/components/messages/KafkaSourceRecordProtobuf'
          - $ref: '#/components/messages/KafkaSourceRecordText'
          - $ref: '#/components/messages/KafkaSourceRecordBytes'

  # ----------------------------------------------------------------------------
  # KAFKA SINK: Materialize publishes change events to a Kafka topic.
  # ----------------------------------------------------------------------------
  kafka/sink/{topic}:
    description: |
      Kafka topic produced to by a Materialize Kafka sink.

      Syntax (abbreviated):
        CREATE SINK <sink_name>
          [IN CLUSTER <cluster_name>]
          FROM <item_name>
          INTO KAFKA CONNECTION <connection_name> (
            TOPIC '<topic>'
            [, COMPRESSION TYPE = '<none|gzip|snappy|lz4|zstd>']
            [, TOPIC PARTITION COUNT = <n>]
            [, TOPIC REPLICATION FACTOR = <n>]
            [, PARTITION BY = <expr>]
          )
          [KEY (<col>, ...)]
          [HEADERS <column>]
          FORMAT <AVRO|JSON|TEXT|BYTES>
          ENVELOPE <UPSERT|DEBEZIUM>
          [WITH (SNAPSHOT = <true|false>)];

      Materialize also maintains an internal progress topic
      (default: _materialize-progress-{REGION}-{CONNECTION}) for exactly-once
      semantics.
    servers:
      - kafka-broker
    parameters:
      topic:
        description: Kafka topic name supplied via the TOPIC option of CREATE SINK.
        schema:
          type: string
    bindings:
      kafka:
        bindingVersion: '0.4.0'
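    # AsyncAPI 2.x: `subscribe` describes messages the application (Materialize)
    # produces to the channel.
    subscribe: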
      operationId: publishToKafkaSink
      summary: Materialize emits change events to the configured Kafka topic.
      description: |
        ENVELOPE UPSERT emits inserts/updates as keyed records and deletes as
        null-value tombstones; a KEY clause is required. ENVELOPE DEBEZIUM wraps
        each change in {before, after} fields.
      bindings:
        kafka:
          bindingVersion: '0.4.0'
      message:
        oneOf:
          - $ref: '#/components/messages/KafkaSinkUpsertRecord'
          - $ref: '#/components/messages/KafkaSinkDebeziumRecord'

  # ----------------------------------------------------------------------------
  # WEBHOOK SOURCE: external producers POST events to a Materialize-hosted URL.
  # ----------------------------------------------------------------------------
  'api/webhook/{database}/{schema}/{src_name}':
    description: |
      HTTP endpoint exposed by a Materialize webhook source. External producers
      POST events to this URL; Materialize stores the body (and optionally
      selected headers) as rows.

      Syntax (abbreviated):
        CREATE SOURCE <src_name>
          [IN CLUSTER <cluster_name>]
          FROM WEBHOOK
          BODY FORMAT <TEXT|JSON|JSON ARRAY|BYTES>
          [INCLUDE HEADER '<name>' AS <alias> [BYTES]]
          [INCLUDE HEADERS [( NOT '<name>', ... )]]
          [CHECK ( WITH (HEADERS, BODY [AS <alias>], SECRET <name> [AS <alias>]) <bool_expr> )];

      Documented limits:
        * Max body size: 2 MB (413 on exceed).
        * Max concurrent requests across all webhook sources: 500 (429 on exceed).
        * Duplicate header names: 401 Unauthorized.
        * Invalid TEXT or JSON body: 400 Bad Request.
        * Without a CHECK clause all requests are accepted.
    servers:
      - webhook-https
    parameters:
      database:
        description: Materialize database that owns the source.
        schema:
          type: string
      schema:
        description: Materialize schema that owns the source.
        schema:
          type: string
      src_name:
        description: Name of the webhook source.
        schema:
          type: string
    bindings:
      http:
        bindingVersion: '0.3.0'
    publish:
      operationId: postWebhookEvent
      summary: External producer POSTs an event to a Materialize webhook source.
      description: |
        Materialize accepts POST requests at
        https://<mz_host>/api/webhook/{database}/{schema}/{src_name}. The body is
        parsed according to BODY FORMAT. Optional CHECK clauses can validate
        requests using BODY, HEADERS, and SECRET references (e.g., HMAC SHA-256
        signature verification).
      bindings:
        http:
          type: request
          method: POST
          bindingVersion: '0.3.0'
      message:
        oneOf:
          - $ref: '#/components/messages/WebhookEventJson'
          - $ref: '#/components/messages/WebhookEventJsonArray'
          - $ref: '#/components/messages/WebhookEventText'
          - $ref: '#/components/messages/WebhookEventBytes'

components:

  securitySchemes:
    saslScram:
      type: scramSha256
      description: SASL/SCRAM-SHA-256 (or SCRAM-SHA-512) via CREATE CONNECTION secrets.
    saslPlain:
      type: plain
      description: SASL/PLAIN via CREATE CONNECTION secrets.
    webhookBasic:
      type: httpApiKey
      description: |
        Optional. Materialize webhook sources do not require any built-in auth.
        Producers and Materialize agree on a CHECK clause that may verify a
        Basic auth header, an HMAC signature header, an API key header, etc.
      name: Authorization
      in: header

  messages:

    # ---- Kafka source messages (one per FORMAT) ----
    KafkaSourceRecordAvro:
      name: kafkaSourceAvroRecord
      title: Kafka source record (FORMAT AVRO)
      summary: Avro-encoded record consumed by Materialize via Confluent Schema Registry.
      contentType: application/vnd.apache.avro+binary
      payload:
        type: object
        description: |
          Avro payload. Materialize derives columns and types from the schema
          registered in Confluent Schema Registry under the TopicNameStrategy.
    KafkaSourceRecordJson:
      name: kafkaSourceJsonRecord
      title: Kafka source record (FORMAT JSON)
      summary: JSON-encoded record stored as a single jsonb column named data.
      contentType: application/json
      payload:
        type: object
        description: Free-form JSON. Materialize ingests it as a single jsonb column.
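      # Illustrative example only, not taken from the Materialize docs. The field
      # names (order_id, status, amount) are hypothetical. A record like this
      # would be ingested whole into the source's single jsonb column.
      examples:
        - name: hypotheticalOrderEvent
          summary: Hypothetical JSON record as it might appear on the topic.
          payload:
            order_id: 1001
            status: shipped
            amount: 42.5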
    KafkaSourceRecordProtobuf:
      name: kafkaSourceProtobufRecord
      title: Kafka source record (FORMAT PROTOBUF)
      summary: Protobuf-encoded record decoded via schema registry or inline schema.
      contentType: application/x-protobuf
      payload:
        type: string
        format: binary
    KafkaSourceRecordText:
      name: kafkaSourceTextRecord
      title: Kafka source record (FORMAT TEXT)
      summary: UTF-8 text with newline delimiters.
      contentType: text/plain
      payload:
        type: string
    KafkaSourceRecordBytes:
      name: kafkaSourceBytesRecord
      title: Kafka source record (FORMAT BYTES)
      summary: Raw bytes with no decoding.
      contentType: application/octet-stream
      payload:
        type: string
        format: binary

    # ---- Kafka sink messages ----
    KafkaSinkUpsertRecord:
      name: kafkaSinkUpsertRecord
      title: Kafka sink record (ENVELOPE UPSERT)
      summary: Insert/update keyed record; deletes are emitted as null-value tombstones.
      contentType: application/json
      payload:
        type: object
        description: |
          Row contents in the configured FORMAT (Avro, JSON, Text, or Bytes).
          The Kafka message key is taken from the columns named in the KEY clause.
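      # Illustrative example only. The columns (id, status) are hypothetical and
      # assume FORMAT JSON with KEY (id). The example shows the record value for
      # an insert or update; a delete would instead be a record with the same
      # key and a null value (tombstone).
      examples:
        - name: hypotheticalUpsertValue
          summary: Hypothetical record value for an inserted or updated row.
          payload:
            id: 7
            status: active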

    KafkaSinkDebeziumRecord:
      name: kafkaSinkDebeziumRecord
      title: Kafka sink record (ENVELOPE DEBEZIUM)
      summary: Debezium-style change event with before/after fields.
      contentType: application/json
      payload:
        type: object
        properties:
          before:
            type: [object, 'null']
            description: Row state prior to the change, or null for inserts.
          after:
            type: [object, 'null']
            description: Row state after the change, or null for deletes.
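      # Illustrative examples only. The columns (id, status) are hypothetical and
      # assume FORMAT JSON. They sketch how the before/after fields described
      # above would look for an insert, an update, and a delete.
      examples:
        - name: hypotheticalInsert
          summary: Insert, so before is null.
          payload:
            before: null
            after:
              id: 7
              status: active
        - name: hypotheticalUpdate
          summary: Update, so both before and after are populated.
          payload:
            before:
              id: 7
              status: active
            after:
              id: 7
              status: suspended
        - name: hypotheticalDelete
          summary: Delete, so after is null.
          payload:
            before:
              id: 7
              status: suspended
            after: null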

    # ---- Webhook source messages (one per BODY FORMAT) ----
    WebhookEventJson:
      name: webhookEventJson
      title: Webhook event (BODY FORMAT JSON)
      summary: |
        Single JSON object or newline-delimited JSON. Invalid JSON yields 400.
      contentType: application/json
      headers:
        type: object
        additionalProperties:
          type: string
      payload:
        type: object
        description: Free-form JSON payload.
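      # Illustrative example only. The header name (x-event-type) and the body
      # fields (order_id, status) are hypothetical; any JSON object a producer
      # POSTs would take this general shape.
      examples:
        - name: hypotheticalWebhookJson
          summary: Hypothetical single-object POST body with one custom header.
          headers:
            x-event-type: order.created
          payload:
            order_id: 1001
            status: created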

    WebhookEventJsonArray:
      name: webhookEventJsonArray
      title: Webhook event (BODY FORMAT JSON ARRAY)
      summary: JSON array; Materialize expands each element to its own row.
      contentType: application/json
      headers:
        type: object
        additionalProperties:
          type: string
      payload:
        type: array
        items:
          type: object
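      # Illustrative example only. The body fields (order_id, status) are
      # hypothetical. With BODY FORMAT JSON ARRAY, a two-element array like
      # this one would be expanded into two rows.
      examples:
        - name: hypotheticalWebhookJsonArray
          summary: Hypothetical array body that would yield two rows.
          payload:
            - order_id: 1001
              status: created
            - order_id: 1002
              status: created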

    WebhookEventText:
      name: webhookEventText
      title: Webhook event (BODY FORMAT TEXT)
      summary: UTF-8 text body. Invalid UTF-8 yields 400.
      contentType: text/plain
      headers:
        type: object
        additionalProperties:
          type: string
      payload:
        type: string

    WebhookEventBytes:
      name: webhookEventBytes
      title: Webhook event (BODY FORMAT BYTES)
      summary: Raw request body stored as bytea without parsing.
      contentType: application/octet-stream
      headers:
        type: object
        additionalProperties:
          type: string
      payload:
        type: string
        format: binary