Shared Pydantic schemas as the basis for Kafka/Avro messages in SQuaRE Roundtable

Abstract

Many SQuaRE applications in Roundtable, notably the Squarebot Slack bot, use Kafka for sharing event-driven messages. Those Kafka messages are encoded in Avro, and those Avro schemas are shared between applications at runtime with the Confluent Schema Registry. This existing system lacks a story for sharing schemas between applications during development. In SQR-075 we described a monorepo architecture for publishing an application’s Pydantic schemas in a Python library that an app’s clients could use. This technote describes how shared Pydantic schemas can also support the development of Kafka consumers.

Background

SQuaRE’s Roundtable Kubernetes cluster for internal observatory services includes a Kafka cluster. One use of this Kafka cluster is passing messages between applications. The advantage of this approach over direct API calls between applications is that message buffering is built into Kafka. In a real-time event-driven system, like the Squarebot Slack bot, individual applications don’t need to maintain their own internal queue systems. As well, Kafka topics can be partitioned so that multiple instances of a consumer application can tackle an event queue in parallel.

Another benefit of using Kafka for message passing between services is that schema management is built in. Messages are encoded in Avro, and the corresponding schemas are stored centrally in the Confluent Schema Registry. We created a Python library, Kafkit, that encodes and decodes Avro messages in conjunction with the Schema Registry. With this schema management system, the format of any message is well known, even if the schema of the messages in a given topic evolves over time. Further, the manner in which a schema can evolve is regulated through the Schema Registry. Specifically, Roundtable applications use “forward compatibility” so that a message sent by a newer producer can still be read by older consumers that rely on the older schema.

This schema information could be used much more powerfully during application development, though. Modern SQuaRE Python applications use type annotations and static type checking to prevent bugs and improve development speed through in-editor documentation. Pydantic is a Python library for creating data models with type annotations that also validate and normalize data at runtime. We make extensive use of Pydantic in our web APIs to declare the schemas of HTTP request and response bodies. It makes sense, then, to also use Pydantic to represent data within our Kafka-based applications. This technote explores this approach, including the specific questions of:

  • How can Pydantic models be transformed into Avro schemas?

  • How can Pydantic models sync to the Confluent Schema Registry with Kafkit?

  • How can Pydantic models be shared between Python-based Kafka producers and consumers?

Converting between Pydantic models and Avro schemas

At present, two Python packages provide the tools to convert between Pydantic models and Avro schemas: Dataclasses Avro Schema and pydantic-avro. Overall Dataclasses Avro Schema appears to be more actively maintained.

As a brief demonstration and comparison of the two packages, the following code is a Pydantic model based on a Slack message, typical of the messages that Squarebot produces. This code sample uses Dataclasses Avro Schema. Note how the base model is dataclasses_avroschema.avrodantic.AvroBaseModel rather than pydantic.BaseModel. AvroBaseModel provides an avro_schema method that generates an Avro schema from the Pydantic model. pydantic-avro works similarly, but with a pydantic_avro.base.AvroBase base class.

demo_dataclasses_avroschema.py
"""Demo Pydantic models based on Squarebot schemas."""

from __future__ import annotations

import json
from enum import Enum
from pathlib import Path
from typing import Optional

from dataclasses_avroschema.avrodantic import AvroBaseModel
from pydantic import Field


class SlackMessageType(str, Enum):
    app_mention = "app_mention"
    message = "message"


class SlackChannelType(str, Enum):
    channel = "channel"  # public channel
    group = "group"  # private channel
    im = "im"  # direct message
    mpim = "mpim"  # multi-persion direct message


class SquarebotMessage(AvroBaseModel):
    """Model for a Slack message produced by Squarebot."""

    type: SlackMessageType = Field(description="The Slack message type.")

    channel: str = Field(
        description=(
            "ID of the channel where the message was sent "
            "(e.g., C0LAN2Q65)."
        )
    )

    channel_type: SlackChannelType = Field(
        description="The type of channel (public, direct im, etc.)"
    )

    user: Optional[str] = Field(
        description="The ID of the user that sent the message (e.g., U061F7AUR)."
    )

    text: str = Field(description="Content of the message.")

    ts: str = Field(description="Timestamp of the message.")

    event_ts: str = Field(description="When the event was dispatched.")

    class Meta:
        """Metadata for the model."""

        namespace = "squarebot"
        schema_name = "message"


if __name__ == "__main__":
    avro_schema = json.loads(SquarebotMessage.avro_schema())
    p = Path("squarebot_message.dataclasses.avsc")
    p.write_text(json.dumps(avro_schema, indent=2, sort_keys=True))

This is the corresponding Avro schema generated by Dataclasses Avro Schema:

Avro schema generated by dataclasses-avroschema.
{
  "doc": "Model for a Slack message produced by Squarebot.",
  "fields": [
    {
      "name": "type",
      "type": {
        "name": "SlackMessageType",
        "symbols": [
          "app_mention",
          "message"
        ],
        "type": "enum"
      }
    },
    {
      "name": "channel",
      "type": "string"
    },
    {
      "name": "channel_type",
      "type": {
        "name": "SlackChannelType",
        "symbols": [
          "channel",
          "group",
          "im",
          "mpim"
        ],
        "type": "enum"
      }
    },
    {
      "default": null,
      "name": "user",
      "type": [
        "null",
        "string"
      ]
    },
    {
      "name": "text",
      "type": "string"
    },
    {
      "name": "ts",
      "type": "string"
    },
    {
      "name": "event_ts",
      "type": "string"
    }
  ],
  "name": "message",
  "namespace": "squarebot",
  "type": "record"
}

For comparison, this is the equivalent Avro schema generated by pydantic-avro:

Avro schema generated by pydantic-avro.
{
  "fields": [
    {
      "doc": "The Slack message type.",
      "name": "type",
      "type": {
        "name": "SlackMessageType",
        "symbols": [
          "app_mention",
          "message"
        ],
        "type": "enum"
      }
    },
    {
      "doc": "ID of the channel where the message was sent (e.g., C0LAN2Q65).",
      "name": "channel",
      "type": "string"
    },
    {
      "doc": "The type of channel (public, direct im, etc..)",
      "name": "channel_type",
      "type": {
        "name": "SlackChannelType",
        "symbols": [
          "channel",
          "group",
          "im",
          "mpim"
        ],
        "type": "enum"
      }
    },
    {
      "default": null,
      "doc": "The ID of the user that sent the message (eg U061F7AUR).",
      "name": "user",
      "type": [
        "null",
        "string"
      ]
    },
    {
      "doc": "Content of the message.",
      "name": "text",
      "type": "string"
    },
    {
      "doc": "Timestamp of the message.",
      "name": "ts",
      "type": "string"
    },
    {
      "doc": "When the event was dispatched.",
      "name": "event_ts",
      "type": "string"
    }
  ],
  "name": "SquarebotMessage",
  "namespace": "SquarebotMessage",
  "type": "record"
}

Both packages generate correct Avro schemas with equivalent field definitions. In particular, both handle the Optional type annotation on the user field (added here for testing) and the Enum types on the type and channel_type fields.

The packages differ in their handling of documentation. Dataclasses Avro Schema includes the class’s documentation on the top-level record in the schema, but omits documentation on individual fields. pydantic-avro does the inverse.

Another area where the packages differ is that pydantic-avro makes the schema’s namespace the same as the Python class’s name, whereas Dataclasses Avro Schema omits setting a namespace by default. Both use the class’s name for the schema’s name by default. However, notice how the namespace and schema_name fields in the Pydantic model’s Meta class give Dataclasses Avro Schema control over the Avro schema’s fully-qualified name.

We will adopt Dataclasses Avro Schema since the package is more actively maintained and has a larger feature set, but switching to pydantic-avro would be a simple change if desired.
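
For reference, a minimal sketch of the equivalent pydantic-avro setup, assuming the pydantic_avro.base.AvroBase base class mentioned above (only a subset of the fields is repeated here):

"""Abbreviated demo of generating an Avro schema with pydantic-avro."""

import json
from typing import Optional

from pydantic import Field
from pydantic_avro.base import AvroBase


class SquarebotMessage(AvroBase):
    """Model for a Slack message produced by Squarebot."""

    # The field definitions are the same as in the earlier example; only the
    # base class changes.
    user: Optional[str] = Field(
        description="The ID of the user that sent the message (e.g., U061F7AUR)."
    )
    text: str = Field(description="Content of the message.")
    ts: str = Field(description="Timestamp of the message.")


if __name__ == "__main__":
    # In pydantic-avro, avro_schema() returns the schema as a dict.
    print(json.dumps(SquarebotMessage.avro_schema(), indent=2, sort_keys=True))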

Kafkit handling of Pydantic models

In this operational model, Kafka message schemas are treated entirely as Pydantic models within an application’s codebase. Only within Kafkit are these Pydantic models converted into Avro schemas, both for storage in the Schema Registry and for encoding/decoding messages. This design ensures that the Avro conversion is a detail that applications do not need to be concerned with.

Analogy to the RecordNameSchemaManager

To integrate Pydantic models into Kafkit, we can follow the pattern of Kafkit’s RecordNameSchemaManager. The RecordNameSchemaManager is designed around the operational concept that a Kafka producer stores all of its schemas as Avro schema (JSON) files within the application package. RecordNameSchemaManager synchronizes those schemas with the Schema Registry following the RecordNameStrategy naming convention where the fully-qualified name of the schema is the subject name in the Schema Registry.[1] Then the producer application can serialize messages with the RecordNameSchemaManager.serialize method, which takes the dataset and fully-qualified name of the schema that the dataset is encoded with.
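
As a rough sketch of this workflow (the method names follow the description above; consult the Kafkit documentation for the exact constructor and registration signatures):

from pathlib import Path

from kafkit.registry.manager import RecordNameSchemaManager


async def produce_example(registry) -> bytes:
    """Serialize a message the way a current producer application does.

    ``registry`` is a Kafkit RegistryApi client connected to the Schema
    Registry; the schemas directory holds the application's .avsc files.
    """
    schema_manager = RecordNameSchemaManager(registry=registry, root=Path("schemas"))

    # Synchronize the local Avro schema files with the Schema Registry.
    await schema_manager.register_schemas()

    # Serialize a dataset against the schema identified by its fully-qualified
    # name, which is also the subject name under RecordNameStrategy.
    return await schema_manager.serialize(
        data={"text": "Hello", "ts": "1651945811.0"}, name="squarebot.message"
    )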

The PydanticSchemaManager

The model for the RecordNameSchemaManager can be translated into the Pydantic model case with a new class in Kafkit, PydanticSchemaManager. Like its predecessor, PydanticSchemaManager should have a method to register Pydantic schemas with the Schema Registry. It should also have a serialize method, which takes a Pydantic model instance and returns a serialized Avro message (in this case the dataset and the schema are one and the same, because a Pydantic model instance carries its own schema). Unlike RecordNameSchemaManager, the PydanticSchemaManager can also have a deserialize method, which takes a serialized Avro message and returns a Pydantic model instance. This enables both producing and consuming applications to use Pydantic models for type checking and validation (how Pydantic models are shared between codebases is discussed in Sharing Pydantic models between applications).

Registering Pydantic models

Like the RecordNameSchemaManager, the PydanticSchemaManager should register all of the schemas on application start-up. This ensures that schemas are pre-validated by the Schema Registry (particularly for issues of schema evolution compatibility) and that the schema IDs are cached for later use. As a convenience, the PydanticSchemaManager’s serialize and deserialize methods could also automatically register or look up schema IDs if they are not registered or cached.

The simplest design for a registration API is a method that takes a collection of Pydantic model classes:

class PydanticSchemaManager:
    async def register(self, models: Iterable[Type[AvroBaseModel]]) -> None:
        ...

An alternative design, in which individual AvroBaseModel subclasses are registered with a class decorator at the point where they are declared, would not work because these classes are shared between applications through a model library, where an instance of PydanticSchemaManager is not available.

The PydanticSchemaManager follows the RecordNameStrategy naming convention, where the fully-qualified name of the schema is the subject name in the Schema Registry. With Dataclasses Avro Schema, the fully-qualified name of the schema is set through the namespace and schema_name fields in the Pydantic model’s Meta class:

class SquarebotMessage(AvroBaseModel):
    class Meta:
        namespace = "squarebot"
        schema_name = "message"

    user: Optional[str]
    channel_type: ChannelType
    channel_id: str
    message_type: MessageType
    message_id: str
    message: str

The RecordNameSchemaManager also has an optional suffix argument on its constructor, which can be used to add a suffix to the subject name. This is useful when deploying an application in testing so that the test schemas don’t interfere with the compatibility of the production schemas. The PydanticSchemaManager can also support this pattern.
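
Putting this together, a sketch of application start-up code under the proposed design (the constructor arguments, registry and suffix, are assumptions about the new class, and SquarebotMessage is the model defined earlier):

from kafkit.registry.aiohttp import RegistryApi


async def setup_schema_manager(http_session, registry_url: str) -> PydanticSchemaManager:
    """Register the application's Pydantic models at start-up (sketch)."""
    registry = RegistryApi(session=http_session, url=registry_url)

    # A suffix such as "_dev" keeps test subjects separate from production
    # subjects, mirroring the RecordNameSchemaManager behavior.
    schema_manager = PydanticSchemaManager(registry=registry, suffix="_dev")

    # Pre-register all models so that schema compatibility is validated and
    # schema IDs are cached before any messages are produced or consumed.
    await schema_manager.register([SquarebotMessage])
    return schema_manager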

Serializing Pydantic datasets

The PydanticSchemaManager’s serialize method is straightforward to implement. It can use the serialize method of the AvroBaseModel class to serialize the dataset into an Avro message, in conjunction with Kafkit’s existing function for adding the schema ID to the encoded message’s magic byte header:

class PydanticSchemaManager:
    async def serialize(self, model: AvroBaseModel) -> bytes:
        ...
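
A minimal sketch of what serialize could do internally, assuming the Confluent wire format (a zero magic byte, the 4-byte big-endian schema ID, then the Avro-encoded body) and a hypothetical _get_schema_id helper that registers or looks up the model’s schema as described above:

import struct

from dataclasses_avroschema.avrodantic import AvroBaseModel


class PydanticSchemaManager:
    async def serialize(self, model: AvroBaseModel) -> bytes:
        # Register the model's schema, or retrieve its cached ID, so the
        # message can carry the Schema Registry's schema ID.
        schema_id = await self._get_schema_id(type(model))  # hypothetical helper

        # AvroBaseModel.serialize() returns the Avro binary encoding of the
        # instance's data, using the schema generated from the model.
        body = model.serialize()

        # Confluent wire format: magic byte 0 + 4-byte big-endian schema ID.
        return struct.pack(">bI", 0, schema_id) + body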

Deserializing into a Pydantic model

The PydanticSchemaManager’s deserialize method will use the schema ID in the magic byte header to look up the schema in the Schema Registry. Based on the fully-qualified name of that Avro schema, the PydanticSchemaManager will deserialize the message into the appropriate Pydantic model class:

class PydanticSchemaManager:
    async def deserialize(self, message: bytes) -> AvroBaseModel:
        ...
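
A corresponding sketch of deserialize; the registry lookup, the _decode_avro helper, and the _models mapping from fully-qualified schema names to registered model classes are assumptions about the proposed design:

import struct

from dataclasses_avroschema.avrodantic import AvroBaseModel


class PydanticSchemaManager:
    async def deserialize(self, message: bytes) -> AvroBaseModel:
        # Unpack the Confluent wire format header to recover the schema ID.
        magic, schema_id = struct.unpack(">bI", message[:5])
        if magic != 0:
            raise ValueError("Unrecognized message framing (bad magic byte).")

        # Look up the writer's Avro schema in the Schema Registry (cached).
        schema = await self._get_schema_by_id(schema_id)  # hypothetical helper

        # Decode the Avro body into a Python dict using the writer schema
        # (e.g., with fastavro's schemaless reader).
        data = self._decode_avro(message[5:], schema)  # hypothetical helper

        # Select the registered model class by the schema's fully-qualified
        # name, then validate and normalize the data through Pydantic.
        model_class = self._models[f"{schema['namespace']}.{schema['name']}"]
        return model_class.parse_obj(data)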

Notice that this deserialization method can only guarantee that the returned object is a subclass of AvroBaseModel. The application would need to assert the type of the returned object to operate on the message through a type-safe interface. To simplify type checking in a consumer application, it would be useful to have a deserialization method that, instead of returning a generic AvroBaseModel, triggers a callback function associated with the Pydantic model class. In practice, such a routing system would not work on the PydanticSchemaManager itself, since the routing decision depends on information beyond the message value alone: namely the Kafka topic name, the parsed contents of the message value, and the parsed contents of the message key (both of which would be Pydantic types). A higher-level Kafka consumer class is a better place to implement routing, and this is discussed in A Kafka consumer class with Pydantic model-based routing.
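
For illustration, without such routing a consumer has to narrow the type itself; SquarebotMessage is the model from earlier, and SquarebotReaction is a hypothetical second model:

async def handle(raw: bytes, schema_manager: PydanticSchemaManager) -> None:
    model = await schema_manager.deserialize(raw)
    if isinstance(model, SquarebotMessage):
        # The type checker now knows the exact model, so attribute access
        # such as model.text is type-safe.
        print(model.text)
    elif isinstance(model, SquarebotReaction):
        ...
    else:
        raise TypeError(f"Unexpected message model: {type(model)!r}")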

Sharing Pydantic models between applications

In the model described by this technote, both the producer and consumer applications would use the same Pydantic models. This differs from our previous approach, where the producer would retain “original” copies of the Avro schemas and consumers would retrieve those schemas at runtime from the Schema Registry. In SQR-075 we described a vertical monorepo development approach for sharing a library of Pydantic models between applications. The vertical monorepo is a perfect fit for Pydantic-driven Kafka applications, as it is for our Pydantic/FastAPI-based REST web services.

Any Kafka producer application would adopt the vertical monorepo architecture, where all Pydantic models are defined in a Python library package hosted in the same repository as the producer application. This library package is published to PyPI and becomes a dependency of any consumer application.

If the schemas are developed with “forward” compatibility, the producer application can introduce changes to the schemas without breaking the consumer applications. Once the consumers update their dependency on the shared library package, they can start making use of new fields in the schema.
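
For example, adding an optional field with a default is a forward-compatible change; the thread_ts field below is purely illustrative. Consumers still running the previous version of the model simply ignore the new field until they upgrade the shared library.

class SquarebotMessage(AvroBaseModel):
    """Model for a Slack message produced by Squarebot (a later version)."""

    # ...existing fields unchanged...
    text: str = Field(description="Content of the message.")

    # New optional field with a default: messages produced with this schema
    # can still be read by consumers holding the older schema.
    thread_ts: Optional[str] = Field(
        default=None,
        description="Timestamp of the parent message, if the message is threaded.",
    )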

Although it is not currently common among SQuaRE applications, it is possible that multiple applications might produce messages with a common schema. In this case, the Pydantic model could either be defined in a separate library package that is shared between the producer applications, or a specific producer application could be designated as the owner of that Pydantic model so that the model would be published from its repository.

A Kafka consumer class with Pydantic model-based routing

As discussed in Deserializing into a Pydantic model, consumer applications would be easier to develop if the consumer class automatically routed messages to the appropriate callback function based on the Pydantic model types of the message’s key and value. Such a router would prevent consumer applications from having to write a series of isinstance checks to determine how to interact with message data.

This consumer would essentially wrap the Kafka consumer class from existing libraries (like aiokafka.AIOKafkaConsumer) and include a PydanticSchemaManager for deserializing message keys and values. Ideally the consumer should be implemented in a sans-I/O style so that it can be used with any asyncio-based Kafka library, including a mock consumer for testing.

class PydanticAIOKafkaConsumer:
    def __init__(
        self,
        *,
        schema_manager: PydanticSchemaManager,
        consumer: AIOKafkaConsumer,
    ) -> None:
        ...

    async def start(self) -> None:
        """Start the consumer."""
        ...

    async def register_models(self, models: Iterable[Type[AvroBaseModel]]) -> None:
        """Pre-register Pydantic models with the schema manager."""
        ...

    async def add_route(
        self,
        callback: Callable[[str, AvroBaseModel, AvroBaseModel], None],
        topics: Sequence[str],
        key_models: Sequence[Type[AvroBaseModel]],
        value_models: Sequence[Type[AvroBaseModel]],
    ) -> None:
        ...

Routing design

The add_route method takes a callback function, a list of Kafka topics, and lists of Pydantic model classes for the message key and value. This provides a simple selector for routing messages to that callback function: if the topic name is in the topics list, the key’s model is in the key_models list, and the value’s model is in the value_models list, then the callback function is called. Multiple callback functions could meet the same criteria; in this case, all matching callback functions would be called in the order they were registered.

The PydanticAIOKafkaConsumer should also have a set of default callbacks that are called when a message doesn’t match any of the registered routes, or when a message can’t be deserialized into any of the registered models, so that error logging is configurable.
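
A minimal sketch of how this routing could work inside PydanticAIOKafkaConsumer; the _Route structure, the _routes and _default_routes attributes, and the _handle_message method are assumptions about the proposed design:

from dataclasses import dataclass
from typing import Callable, Sequence, Type

from dataclasses_avroschema.avrodantic import AvroBaseModel


@dataclass
class _Route:
    """A registered callback and its matching criteria."""

    callback: Callable[[str, AvroBaseModel, AvroBaseModel], None]
    topics: Sequence[str]
    key_models: Sequence[Type[AvroBaseModel]]
    value_models: Sequence[Type[AvroBaseModel]]

    def matches(self, topic: str, key: AvroBaseModel, value: AvroBaseModel) -> bool:
        return (
            topic in self.topics
            and type(key) in self.key_models
            and type(value) in self.value_models
        )


class PydanticAIOKafkaConsumer:
    async def _handle_message(self, message) -> None:
        """Deserialize a raw Kafka message and dispatch it to matching routes."""
        key = await self._schema_manager.deserialize(message.key)
        value = await self._schema_manager.deserialize(message.value)

        matched = [r for r in self._routes if r.matches(message.topic, key, value)]
        if not matched:
            # Fall back to the default callbacks for messages that don't match
            # any registered route (e.g., to log an error).
            matched = self._default_routes

        # Call every matching callback in registration order.
        for route in matched:
            route.callback(message.topic, key, value)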