How to never lose a message

Have you ever gotten a call in the middle of the night saying, “Infrastructure looks OK, but some service is not consuming data / messages keep getting redelivered. Please figure it out”?

Messages that keep getting redelivered are also called “poison messages”.

Poison messages = messages that a consumer repeatedly fails to process (possibly due to a consumer failure), so they are never handled completely and acknowledged, and the broker keeps redelivering them to the same consumer.

Example: a message on some queue is pushed to, or pulled by, a consumer, and that consumer, for some reason, fails to handle it. The cause can be a bug, an unknown schema, a resource issue, etc.

In RabbitMQ, for example, quorum queues keep track of the number of unsuccessful delivery attempts and expose it in the “x-delivery-count” header that is included with any redelivered message. It is possible to set a delivery limit for a queue using a policy argument, delivery-limit. When a message has been returned more times than the limit, it will be dropped or dead-lettered (if a DLX is configured).
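For instance, here is a minimal sketch of declaring such a queue with the pika Python client (the queue name and the limit of 3 are arbitrary choices for illustration; the same limit can also be applied via a policy, as described above):

```python
import pika

# Connect to a local RabbitMQ node (adjust host/credentials as needed).
connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel()

# Declare a quorum queue with a delivery limit.
# After 3 failed delivery attempts the message is dropped,
# or dead-lettered if a DLX is configured for the queue.
channel.queue_declare(
    queue="orders",  # hypothetical queue name
    durable=True,    # quorum queues must be durable
    arguments={
        "x-queue-type": "quorum",
        "x-delivery-limit": 3,
    },
)

connection.close()
```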

Any developer who uses a queue or messaging bus knows that poison messages must be taken care of, and it’s the developer’s responsibility to do so.

In RabbitMQ, the most common, simple solution would be to enable a DLX (dead letter exchange) with a dead-letter queue, but it doesn’t end there.

Recovering a poison message is just the first part; the developer must also understand what caused the behavior and mitigate the issue.


Solutions

 

While there is the classic solution of committing/acknowledging a message as soon as it arrives, that’s not a good option for use cases that require messages to be acknowledged only once they have been fully handled.

Other approaches – 

How to handle unacknowledged messages in RabbitMQ

  1. Turn on DLX
  2. Configure the DLX
  3. Set a routing key
  4. Build a dedicated consumer for the dead-letter queue bound to the DLX (sketched below)
  5. Consume the unacknowledged messages
  6. Fix the code/events

dead letter exchange cloudAMQP
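Below is a minimal sketch of steps 1–5 using the pika Python client; the exchange, queue, and routing-key names are hypothetical:

```python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel()

# 1-2. Declare the dead letter exchange and a queue to hold dead-lettered messages.
channel.exchange_declare(exchange="dlx", exchange_type="direct")
channel.queue_declare(queue="orders.dead-letter", durable=True)

# 3. Bind the dead-letter queue with a routing key.
channel.queue_bind(queue="orders.dead-letter", exchange="dlx", routing_key="orders")

# Point the main queue at the DLX: rejected/expired messages end up there.
channel.queue_declare(
    queue="orders",
    durable=True,
    arguments={
        "x-dead-letter-exchange": "dlx",
        "x-dead-letter-routing-key": "orders",
    },
)

# 4-5. A dedicated consumer drains the dead-letter queue so the
# messages can be inspected and the code/events fixed.
def on_dead_letter(ch, method, properties, body):
    print("dead-lettered message:", body)
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(queue="orders.dead-letter", on_message_callback=on_dead_letter)
channel.start_consuming()
```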

How to handle unacknowledged messages in Apache Kafka

There is no out-of-the-box redelivery/recovery of such messages.

  1. Ensure the code logs and tracks exceptions, and export those signals to PagerDuty/Datadog/New Relic/etc…
  2. If the topic’s retention is too short, the message will be gone before the developer gets a chance to debug it. In most cases the message will not be unique and the loss can be tolerated, but in others, like transactions / atomic requests, it cannot. To mitigate this, a wrapper that provides that kind of safety net should be built. A great example of such a wrapper, which provides that ability and more, is Wix’s GreyHound. Definitely worth taking a look.
  3. Other approaches use a cache DB of some kind to persist the message while it is being processed, before the offset is committed. A sketch of the common dead-letter-topic pattern follows below.

handling unacknowledged messages in Apache Kafka
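Since there is no built-in dead-letter queue in Kafka, a common do-it-yourself pattern is to log the failure, park the raw message in a dedicated dead-letter topic, and only then commit the offset so the partition is not blocked. A minimal sketch with the kafka-python client (the topic names events and events.dlq are hypothetical):

```python
from kafka import KafkaConsumer, KafkaProducer

consumer = KafkaConsumer(
    "events",                      # hypothetical source topic
    bootstrap_servers="localhost:9092",
    group_id="events-processor",
    enable_auto_commit=False,      # commit only after we have dealt with the message
)
producer = KafkaProducer(bootstrap_servers="localhost:9092")

def process(message_value: bytes) -> None:
    ...  # business logic; may raise on a bug, an unknown schema, etc.

for msg in consumer:
    try:
        process(msg.value)
    except Exception:
        # Log/alert here (PagerDuty, Datadog, New Relic, ...), then park the
        # raw message in a dead-letter topic so it can be debugged later.
        producer.send("events.dlq", value=msg.value)
        producer.flush()
    # Commit the offset either way so the partition keeps moving.
    consumer.commit()
```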


Unacknowledged messages in Memphis.dev

 

When we started to refine our approach, understand the needs, validate the experience, and craft the value it would bring to our users, three key objectives led our process –

  1. The user’s entry point to the issue.
  2. Quickly understanding the problematic consumer and the root cause of the issue.
  3. Fixing the code. The developer must be able to debug the fix with a similar message to ensure it works.

three key objectives to Memphis DLS

1 – Define the trigger

In the Memphis broker, at the SDK level, we use a parameter called “maxMsgDeliveries.”
Once this threshold is reached, the broker stops redelivering the unacknowledged message to the same consumer group.

Defining the trigger for a poison message
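As a rough illustration only – the exact connection arguments and snake_case parameter names below are assumptions and should be verified against the Memphis Python SDK docs – setting this threshold from a consumer might look like this:

```python
import asyncio
from memphis import Memphis  # assumption: the memphis-py package

async def main():
    memphis = Memphis()
    # Connection arguments are illustrative placeholders, not real credentials.
    await memphis.connect(host="localhost", username="root", connection_token="***")

    # The key idea: cap redeliveries per consumer group.
    # "maxMsgDeliveries" in the Node SDK; assumed snake_case here.
    consumer = await memphis.consumer(
        station_name="orders",            # hypothetical station
        consumer_name="orders-consumer",
        consumer_group="orders-cg",
        max_msg_deliveries=2,             # after 2 failed deliveries, stop redelivering to this CG
    )

asyncio.run(main())
```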

2 – Notification

  • The Memphis broker detects when “maxMsgDeliveries” (2 in this example) is crossed, per station, per consumer group.
  • It persists the time_sent and payload of the redelivered message to the Memphis file-based store for 3 hours.
  • It marks the message as poisoned by the specific consumer.
  • It creates an alert.

Notification mechanism

Poison message overview in Memphis station

3 – Identify the consumer group which didn’t acknowledge the message

Instead of digging through logs and multiple consumer groups, we wanted to narrow the search down as much as possible, so a simple click on “real-time tracing” leads to a graph screen showing the CGs that passed the redelivery threshold.

Identifying the consumer group which didn’t acknowledge the message

4 – Fix and Debug

After the developer understands what went wrong and creates a fix, and before pushing code that will probably need more adjustments as new messages arrive, we created the “Resend” mechanism, which will push the unacknowledged message as many times as needed (until ACK) to the consumer group that was not able to acknowledge it in the first place.

“Resend” mechanism schema

The unacknowledged message will be retrieved from Memphis’ internal DB, ingested into a dedicated station (per station, per CG, and only upon request), and then pushed to the unacknowledged CG – WITHOUT ANY CODE CHANGE, using the same already-configured emit.

That’s it. No need to create a persistence process, cache DB, DLQ, or messy logic. You can be sure no message is lost.

Here is Why You Need a Message Broker

Among the open-source projects my college buddies (and my future co-founders of memphis.dev) and I built, you can find “Makhela”, a Hebrew word for choir.
For the sake of simplicity – We will use “Choir”.

“Choir” was an open-source OSINT (open-source intelligence) project, written in Python, focused on gathering context-based connections between social profiles using AI models like LDA and topic modeling. The goal was to explain what the world, and the high-ranking influencers in a specific domain, are discussing, with a focus on what’s going on at the margins. For the proof-of-concept/MVP we used a single data source that was fairly easy to integrate – Twitter.

The graph below was the “brain” behind “Choir”. The brain autonomously grows and analyzes new vertices and edges based on incremental changes in the corpus and freshly ingested data.

Each vertex represents a profile, a persona, and each edge shows (a) who connects to whom and (b) similar color = similar topic.

 

Makhela graph

Purple = Topic 1
Blue = Topic 2
Yellow = Marginal topic

After a reasonable amount of research, dev time, and a lot of troubleshooting and debugging, things started to look good.

Among the issues we needed to solve were:

  • Understand the connection between profiles
  • Build a ranking algorithm for adding more influencers
  • Transform the schema of incoming data to a shape the analysis side knows how to handle
  • Near real-time is crucial – Enrich each tweet with external data
  • Adaptivity to “Twitter” rate limit
  • Each upstream or schema change crashed the analysis functions
  • Sync between collection and analysis, which were two different components
  • Infrastructure
  • Scale

As with any startup or early-stage project, we built “Choir” as an MVP, working solely with “Twitter”, and it looked like this –

Makhela Architecture 1
Makhela collector

The “Collector” is a monolithic, Python-written application that collects and refines the data for analysis and visualization, in batches, on a static schedule every couple of hours.

However, as the collected data and its complexity grew, problems started to arise. Each batch-processing and analysis cycle took hours for no good reason given the volume of collected data (hundreds of megabytes at most!). More on the rest of the challenges in the next sections.

Fast forward a few months later, users started to use “Choir”!!!
Not just using, but engaging, paying, and raising feature requests.
Any creator’s dream!

But then it hit us. 

(a) “Twitter” is not the center of the universe, and we need to expand “Choir” to more sources.

(b) Any minor change in the code breaks the entire pipeline.

(c) A monolith is a death sentence for a data-driven app, performance-wise.

As with every eager-to-run project that’s starting to get good traction, fueling that growth and user base is your number 1, 2, and 3 priority, and the last thing you want to do at this point is go back and rebuild your framework. You want to keep the momentum.

With that spirit in mind, we said, “Let’s add more data sources and refactor in the future.” A big mistake indeed.


Challenges in scaling a data-driven application

 

  1. Each new data source requires a different schema transformation
  2. Each schema change causes a chain reaction downstream through the rest of the stages in the pipeline
  3. Incremental collection. While you can wait for an entire batch collection to finish and then save it to the DB, applications often crash. Imagine a very slow collection that crashes on the very last record.
  4. In a monolithic architecture, it’s hard to scale out the specific functions that require more power
  5. Analysis functions often require modifications, upgrades, and new algorithms to get better results, and those changes frequently use or require different keys from the collectors.

While there is no quick fix, what we can do is build a framework to support such requirements.


Solutions

 

Option 1 – Duplicate the entire existing process to another source, for example, “Facebook”.

Architecture alternative
In addition to duplicating the collector, we needed to –

  • Maintain two different schemas (Nightmare)
  • Entirely different analysis functions. The connections between profiles on Facebook and “Twitter” are different and require different objective relationships.
  • The analyzer should analyze the data in a joined manner, not per source; therefore, any minor change in source X directly affects the analyzer and often crashes it.
  • Double maintenance

And the list goes on…
As a result, it can’t scale.

Option 2 – Here it comes. Using a message broker!

I want to draw a baseline: a message broker is not the solution itself but a supporting framework, a tool that enables branched, growing, data-driven architectures.

What is a message broker?

“A message broker is an architectural pattern for message validation, transformation, and routing. It mediates communication among applications, minimizing the mutual awareness that applications should have of each other in order to be able to exchange messages, effectively implementing decoupling.” – Wikipedia.

Firstly, let’s translate it to something we can grasp better.

A message broker is a temporary data store. Why temporary? Because each piece of data within it is removed after a certain time, defined by the user. The pieces of data inside the message broker are called “messages,” and each message usually weighs from a few bytes to a few megabytes.
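For example, in Apache Kafka that retention period is a per-topic setting; here is a minimal sketch using the kafka-python admin client (the topic name and the 3-day retention are arbitrary choices for illustration):

```python
from kafka.admin import KafkaAdminClient, NewTopic

admin = KafkaAdminClient(bootstrap_servers="localhost:9092")

# Messages in this topic are deleted after ~3 days, as defined by the user.
topic = NewTopic(
    name="events",              # hypothetical topic name
    num_partitions=3,
    replication_factor=1,
    topic_configs={"retention.ms": str(3 * 24 * 60 * 60 * 1000)},
)
admin.create_topics([topic])
```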

Around the message broker, we can find producers and consumers.

Producer = The “thing” that pushes the messages into the message broker.
Consumer = The “thing” that consumes the messages from the message broker.

“Thing” here means a system/service/application/IoT device/any entity that connects to the message broker and exchanges data.

*Small note*: the same service/system/app can act as both a producer and a consumer at the same time.
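To make the terminology concrete, here is a minimal sketch with the pika Python client and a hypothetical queue named “events” (any broker would do; only the vocabulary changes):

```python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel()
channel.queue_declare(queue="events")

# Producer: the "thing" that pushes messages into the broker.
channel.basic_publish(exchange="", routing_key="events", body=b"hello from the producer")

# Consumer: the "thing" that consumes messages from the broker.
def handle(ch, method, properties, body):
    print("consumed:", body)
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(queue="events", on_message_callback=handle)
channel.start_consuming()
```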

Message queues come from the same family, but there are crucial differences between a broker and a queue.

  1. An MQ uses publish/subscribe terminology: the MQ itself pushes the data to the consumers, rather than the other way around (with a broker, the consumer pulls data).
  2. Ordering is guaranteed: messages are pushed in the order they were received. Some systems require this.
  3. The ratio between a publisher (producer) and subscribers is 1:1. Having said that, modern implementations can go beyond this with features like exchanges and more.

Famous message brokers/queues include Apache Kafka, RabbitMQ, Apache Pulsar, and our own Memphis.dev. Kafka use cases span from event streaming to real-time data processing. One might consider using Memphis.dev instead of Kafka for the ease of deployment and developer friendliness it provides.

Still with me? Awesome!

Now, let’s understand how using a message broker helped “Choir” scale.


Instead of doing things like this –

Makhela Architecture 1
By decoupling the app into smaller microservices and orchestrating the flow using a message broker, it turned into this –

Message broker architecture

 

Starting from the top-left corner, each piece of data (tweet/post) inserted into the system automatically triggers the entire process and flows between the different stages.

  1. Collection. The three collectors search each new profile added to the community in parallel. If another data source/social network is needed, it is developed on the side and, once ready, starts listening for incoming events. This allows an unlimited number of sources, the ability to work on a specific source without disrupting the others, micro-scaling each source individually for better performance, and more.
  2. Transformation. Once the collection is complete, the results are pushed to the next stage, “schema transformation,” where the schema transformation service transforms the events’ schemas into a shape the analysis functions can interpret. It provides a “single source of truth” for schema management, so in case of an upstream change, all that is needed is to reach out to this one service and debug the issue. In a more robust design, it can also integrate with an external schema registry to make maintenance even more effortless. (A minimal sketch of such a service appears after this list.)

Schema change

  3. Analysis. Each event reaches the analysis functions already transformed, in a shape they can interpret. In “Choir” we used several AI models; scaling them as a whole was impossible, so moving to per-event analysis definitely helped.
  4. Save. Creates an abstraction between “Choir” and the type of database, plus the ability to group several insertions into a single batch instead of one request per event.
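To illustrate, the transformation stage can be as small as a service that consumes raw events from one topic/station and produces normalized ones to the next. The sketch below uses the kafka-python client with hypothetical topic names (raw-tweets, normalized-events) and an invented to_common_schema mapping; the same shape applies to any broker:

```python
import json
from kafka import KafkaConsumer, KafkaProducer

consumer = KafkaConsumer(
    "raw-tweets",                   # hypothetical topic fed by the collectors
    bootstrap_servers="localhost:9092",
    group_id="schema-transformer",
)
producer = KafkaProducer(bootstrap_servers="localhost:9092")

def to_common_schema(raw: dict) -> dict:
    # Single source of truth for the schema mapping: when an upstream field
    # changes, this is the only place that needs to be touched.
    return {
        "profile_id": raw.get("user", {}).get("id"),
        "text": raw.get("full_text") or raw.get("text", ""),
        "source": "twitter",
    }

for msg in consumer:
    raw_event = json.loads(msg.value)
    normalized = to_common_schema(raw_event)
    # Push the normalized event to the next stage (analysis).
    producer.send("normalized-events", value=json.dumps(normalized).encode("utf-8"))
```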

The main reason behind my writing is to emphasize the importance of implementing a message broker pattern and technology as early as possible to avoid painful refactoring in the future. Message brokers, by default, enable you to build scalable architectures because they remove the tight coupling constraints.

Yes, your roadmap and new features are important; yes, there will be a learning curve; yes, it might look like overkill for your stage. But when it comes to a data-driven use case, the need for scale will quickly reveal itself in performance, agility, feature additions, modifications, and more. Bad design decisions or the lack of a proper framework will burn out your resources. It is better to build agile foundations, not necessarily enterprise-grade, before you reach the phase where you are overwhelmed by users and feature requests.

To conclude, the effort of getting over a message broker’s entry barrier is definitely worth your time.