This is part three of a series of blog posts on building a modern event-driven system using Memphis.dev.
In our last blog post, we introduced a reference implementation for streaming change data capture (CDC) events from a MongoDB database using Debezium Server and Memphis.dev. At the end of the post, we noted that MongoDB records are serialized as strings in Debezium CDC messages, like so:
{
  "schema" : ...,
  "payload" : {
    "before" : null,
    "after" : "{\"_id\": {\"$oid\": \"645fe9eaf4790c34c8fcc2ed\"},\"creation_timestamp\": {\"$date\": 1684007402978},\"due_date\": {\"$date\": 1684266602978},\"description\": \"buy milk\",\"completed\": false}",
    ...
  }
}
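The minimal sketch below uses only the standard-library json module to show what this double encoding means in practice. Decoding the envelope once leaves "after" as a str, so a second json.loads() call is needed to reach the document's fields. The raw_event value is a trimmed, hypothetical stand-in for a real Debezium message, with the "schema" section omitted.

```python
import json

# Trimmed, hypothetical Debezium envelope ("schema" omitted).
# The "after" field holds a JSON document encoded as a string.
raw_event = (
    '{"payload": {"before": null, '
    '"after": "{\\"_id\\": {\\"$oid\\": \\"645fe9eaf4790c34c8fcc2ed\\"}, '
    '\\"description\\": \\"buy milk\\", \\"completed\\": false}"}}'
)

envelope = json.loads(raw_event)
after = envelope["payload"]["after"]
print(type(after).__name__)  # str: still serialized, so no field access yet

document = json.loads(after)  # second decode yields the actual record
print(document["description"])  # buy milk
```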
We want to use the Schemaverse functionality of Memphis.dev to check messages against an expected schema. Messages that don’t match the schema are routed to a dead letter station so that they don’t impact downstream consumers. If this all sounds like ancient Greek, don’t worry! We’ll explain the details in our next blog post.
To use functionality like Schemaverse, the MongoDB records must be deserialized into JSON documents. In this blog post, we modify our MongoDB CDC pipeline by adding a transformer service that performs this deserialization.
The previous solution consisted of six components:
In this iteration, we are adding two additional components:
The updated architecture looks like this:
The transformer service uses the Memphis.dev Python SDK. Let’s walk through the transformer implementation. The main() function of our transformer first connects to the Memphis.dev broker. The connection details (host, username, and password) and the names of the input and output stations are read from environment variables, following the configuration guidance of the Twelve-Factor App manifesto.
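import asyncio
import json
import os
from memphis import Memphis, MemphisError, MemphisConnectError, MemphisHeaderError

# HOST_KEY, USERNAME_KEY, etc. are module-level constants naming the environment variables.
async def main():
    try:
        print("Waiting on messages...")
        memphis = Memphis()
        await memphis.connect(host=os.environ[HOST_KEY],
                              username=os.environ[USERNAME_KEY],
                              password=os.environ[PASSWORD_KEY])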
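Reading os.environ[...] directly raises a KeyError that names only the first missing variable. If you prefer the service to fail fast with a message listing every missing setting, you can collect the configuration up front with a small helper. This is a sketch only. The variable names below (MEMPHIS_HOST and so on) and the TransformerConfig and load_config names are hypothetical placeholders, not necessarily the values of HOST_KEY and its siblings in the reference implementation.

```python
import os
from dataclasses import dataclass
from typing import Mapping


@dataclass(frozen=True)
class TransformerConfig:
    host: str
    username: str
    password: str
    input_station: str
    output_station: str


# Hypothetical environment variable names; substitute your deployment's names.
ENV_NAMES = {
    "host": "MEMPHIS_HOST",
    "username": "MEMPHIS_USERNAME",
    "password": "MEMPHIS_PASSWORD",
    "input_station": "MEMPHIS_INPUT_STATION",
    "output_station": "MEMPHIS_OUTPUT_STATION",
}


def load_config(environ: Mapping[str, str] = os.environ) -> TransformerConfig:
    """Read all settings at once; unset or empty variables are reported together."""
    missing = [name for name in ENV_NAMES.values() if not environ.get(name)]
    if missing:
        raise RuntimeError("missing environment variables: " + ", ".join(missing))
    return TransformerConfig(**{field: environ[name] for field, name in ENV_NAMES.items()})
```

Passing a plain dict instead of os.environ makes the helper easy to exercise in tests without touching the real process environment.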
Once a connection is established, we create consumer and producer objects. In Memphis.dev, consumers and producers have names. These names appear in the Memphis.dev UI, giving visibility into the system’s operations.
        print("Creating consumer")
        consumer = await memphis.consumer(station_name=os.environ[INPUT_STATION_KEY],
                                          consumer_name="transformer",
                                          consumer_group="")
        print("Creating producer")
        producer = await memphis.producer(station_name=os.environ[OUTPUT_STATION_KEY],
                                          producer_name="transformer")
The consumer API uses the callback design pattern. Each time messages are pulled from the broker, the provided function is called with the list of pulled messages, along with an error value and a context.
        print("Creating handler")
        msg_handler = create_handler(producer)
        print("Setting handler")
        consumer.consume(msg_handler)
After setting up the callback, main() awaits an asyncio.Event that is never set. This keeps main(), and therefore the asyncio event loop, running indefinitely, so the service sits idle until messages are available to pull from the broker and the consumer invokes the handler.
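        await asyncio.Event().wait()
    except (MemphisError, MemphisConnectError) as e:
        # Connection or setup failures end up here.
        print(e)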
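Awaiting an Event that is never set works, but it means the process can only end by being killed. One possible variant, sketched below, waits on an event that is set when SIGTERM or SIGINT arrives. `docker compose stop` sends SIGTERM, so with this variant main() could return normally and release its resources. The sketch assumes a Unix-like host, because loop.add_signal_handler() is not available on Windows event loops. The helper name wait_for_shutdown is our own, not part of the Memphis.dev SDK.

```python
import asyncio
import signal


async def wait_for_shutdown() -> str:
    """Block until SIGTERM or SIGINT arrives, then return the signal's name.

    Unix-only: loop.add_signal_handler() is unsupported on Windows event loops.
    """
    loop = asyncio.get_running_loop()
    stop = asyncio.Event()
    received = []

    def on_signal(sig: signal.Signals) -> None:
        received.append(sig.name)
        stop.set()

    watched = (signal.SIGTERM, signal.SIGINT)
    for sig in watched:
        loop.add_signal_handler(sig, on_signal, sig)
    try:
        await stop.wait()
    finally:
        # Hand the signals back to their default handling once we stop waiting.
        for sig in watched:
            loop.remove_signal_handler(sig)
    return received[0]
```

In main(), `await asyncio.Event().wait()` would become `await wait_for_shutdown()`.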
The create_handler() function takes a producer object and returns a callback function. Because the consumer determines which arguments the callback receives, we can’t pass the producer in directly. Instead, we use the closure pattern so that msg_handler captures the producer when it is created.
When called, msg_handler receives three arguments: a list of messages, an error (if one occurred), and a context dictionary. The handler loops over the messages. For each one, it calls the transform function, sends the result to the output station using the producer, and acknowledges the original message. In Memphis.dev, a message is not considered delivered until the consumer acknowledges it, so messages are not lost if an error occurs during processing.
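def create_handler(producer):
    # msg_handler closes over `producer`, so the consumer only needs to pass
    # the arguments it always supplies: (msgs, error, context).
    async def msg_handler(msgs, error, context):
        if error:
            print(error)
            return
        try:
            for msg in msgs:
                transformed_msg = deserialize_mongodb_cdc_event(msg.get_data())
                await producer.produce(message=transformed_msg)
                # Acknowledge only after the transformed message has been produced.
                await msg.ack()
        except (MemphisError, MemphisConnectError, MemphisHeaderError) as e:
            print(e)
    return msg_handler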
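The SDK needs a running broker, so the sketch below swaps in hypothetical FakeProducer and FakeMessage classes. These are our own stand-ins, not SDK types, and they mimic only the produce(), get_data(), and ack() calls used above. This version of create_handler also takes the transform function as a second parameter, so the closure can be exercised with any function. The sketch shows the produce-then-ack ordering: a message is acknowledged only after its transformed copy has been handed to the producer, so a failure part-way through a batch leaves the remaining messages unacknowledged.

```python
import asyncio


class FakeProducer:
    """Stand-in for a Memphis producer: records what it is asked to send."""

    def __init__(self):
        self.sent = []

    async def produce(self, message):
        self.sent.append(message)


class FakeMessage:
    """Stand-in for a Memphis message: exposes get_data() and ack()."""

    def __init__(self, data):
        self._data = data
        self.acked = False

    def get_data(self):
        return self._data

    async def ack(self):
        self.acked = True


def create_handler(producer, transform):
    # The inner function closes over `producer` and `transform`, so it can be
    # called with only the (msgs, error, context) arguments a consumer supplies.
    async def msg_handler(msgs, error, context):
        if error:
            print(error)
            return
        for msg in msgs:
            await producer.produce(message=transform(msg.get_data()))
            await msg.ack()  # only reached after a successful produce
    return msg_handler


async def demo():
    producer = FakeProducer()
    handler = create_handler(producer, transform=bytes.upper)
    msgs = [FakeMessage(b"a"), FakeMessage(b"b")]
    await handler(msgs, None, {})
    return producer.sent, [m.acked for m in msgs]


print(asyncio.run(demo()))  # ([b'A', b'B'], [True, True])
```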
Now we get to the meat of the service: the message transformer function. Message payloads (returned by the get_data() method) are bytearray objects. We use the Python json library to deserialize each message into a hierarchy of Python collections (list and dict) and primitive types (int, float, str, bool, and None).
def deserialize_mongodb_cdc_event(input_msg):
    # json.loads() accepts str, bytes, or bytearray input.
    obj = json.loads(input_msg)
We expect the object to have a “payload” property whose value is an object. That object in turn has two properties (“before” and “after”), each of which is either None or a string containing a serialized JSON object. We call json.loads() again to deserialize those strings and replace them with the resulting objects.
    if "payload" in obj:
        payload = obj["payload"]
        if "before" in payload:
            before_payload = payload["before"]
            if before_payload is not None:
                payload["before"] = json.loads(before_payload)
        if "after" in payload:
            after_payload = payload["after"]
            if after_payload is not None:
                payload["after"] = json.loads(after_payload)
Lastly, we reserialize the entire record to a JSON string and encode it as a UTF-8 bytearray for transmission to the broker.
    output_s = json.dumps(obj)
    output_msg = bytearray(output_s, "utf-8")
    return output_msg
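To check the transformation without a broker, you can run a condensed variant of our own on a trimmed, hypothetical event. This variant loops over the two field names instead of repeating the branch. It decodes a field only when it is still a string, which also makes it safe to apply twice. Like the original, it relies on json.loads() accepting bytes and bytearray input directly, which Python supports since 3.6.

```python
import json


def deserialize_mongodb_cdc_event(input_msg):
    """Condensed variant: decode the string-encoded "before"/"after" fields."""
    obj = json.loads(input_msg)  # accepts str, bytes, or bytearray
    payload = obj.get("payload")
    if isinstance(payload, dict):
        for field in ("before", "after"):
            # Decode only values that are still serialized strings; None
            # (e.g. "before" on an insert) and already-decoded dicts are kept.
            if isinstance(payload.get(field), str):
                payload[field] = json.loads(payload[field])
    return bytearray(json.dumps(obj), "utf-8")


# Trimmed, hypothetical CDC event ("schema" omitted).
raw = bytearray(
    '{"payload": {"before": null, '
    '"after": "{\\"description\\": \\"buy milk\\", \\"completed\\": false}"}}',
    "utf-8",
)
cleaned = json.loads(deserialize_mongodb_cdc_event(raw))
print(cleaned["payload"]["after"])  # {'description': 'buy milk', 'completed': False}
```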
Hooray! Our objects now look like so:
{
  "schema" : ...,
  "payload" : {
    "before" : null,
    "after" : {
      "_id": { "$oid": "645fe9eaf4790c34c8fcc2ed" },
      "creation_timestamp": { "$date": 1684007402978 },
      "due_date": { "$date": 1684266602978 },
      "description": "buy milk",
      "completed": false
    },
    ...
  }
}
If you followed the 7 steps in the previous blog post, you only need to run three additional steps to start the transformer service and verify that it’s working:
$ docker compose up -d cdc-transformer
[+] Running 3/3
⠿ Container mongodb-debezium-cdc-example-memphis-metadata-1 Hea... 0.5s
⠿ Container mongodb-debezium-cdc-example-memphis-1 Healthy 1.0s
⠿ Container cdc-transformer Started 1.3s
$ docker compose up -d cleaned-printing-consumer
[+] Running 3/3
⠿ Container mongodb-debezium-cdc-example-memphis-metadata-1 Hea... 0.5s
⠿ Container mongodb-debezium-cdc-example-memphis-1 Healthy 1.0s
⠿ Container cleaned-printing-consumer Started 1.3s
When the transformer starts producing messages to Memphis.dev, a second station named “cleaned-todo-cdc-events” will be created. You should see this new station on the Station Overview page in the Memphis.dev UI like so:
The details page for the “cleaned-todo-cdc-events” station should show the transformer attached as a producer, the printing consumer, and the transformed messages:
Congratulations! We’re now ready to tackle validating messages using Schemaverse in our next blog post.
In case you missed parts 1 & 2:
Part 2: Change Data Capture (CDC) for MongoDB with Debezium and Memphis.dev
Part 1: Integrating Debezium Server and Memphis.dev for Streaming Change Data Capture (CDC) Events