Rate limitation refers to restricting the number of times a specific action can be performed within a certain time frame. For example, an API might have rate limitations restricting user or app requests within a given period. This helps prevent server overload, ensures fair usage, and maintains system stability and security.
Rate limitation is also a challenge for the apps that encounter it, as it requires them to "slow down" or pause their requests. Here's a typical scenario:
To effectively operate within rate limitations, apps often incorporate strategies like:
A queue serves as an excellent “sidekick” or tool for helping services manage rate limitations due to its ability to handle tasks systematically. However, while it offers significant benefits, it’s not a standalone solution for this purpose.
In constructing a robust architecture, the service or app used to interact with an external API subject to rate limitations often handles tasks asynchronously. This service is typically initiated by tasks derived from a queue. When the service encounters a rate limit, it can easily return the job to the main queue, or assign it to a separate queue designated for delayed tasks, and revisit it after a specific waiting period, say X seconds.
This reliance on a queue system is highly advantageous, primarily because of its temporary nature and ordering. However, the queue alone cannot fully address rate limitations; it requires additional features or help from the service itself to effectively handle these constraints.
const amqp = require('amqplib');
const axios = require('axios');
// Function to make API requests, simulating rate limitations
// Issue a GET request to `url` and log the response body.
// Failures are reported to stderr rather than rethrown, so a bad
// call never crashes the consumer loop that invokes this helper.
async function makeAPICall(url) {
  let response;
  try {
    response = await axios.get(url);
  } catch (error) {
    console.error('API Error:', error.message);
    return;
  }
  console.log('API Response:', response.data);
}
// Connect to RabbitMQ server
// Start the RabbitMQ consumer: connect, ensure the queue exists, and
// process incoming tasks one at a time.
//
// Each message is expected to be JSON of the shape
// { url, delayInSeconds }. The delay simulates backing off to respect
// an external rate limit before the API call is made; the message is
// only acked afterwards, so a crash before completion lets the broker
// redeliver it.
async function connect() {
  try {
    const connection = await amqp.connect('amqp://localhost');
    const channel = await connection.createChannel();
    const queue = 'rateLimitedQueue';
    // assertQueue returns a promise; await it so consumption cannot
    // start before the queue is guaranteed to exist (the original
    // fired it without awaiting).
    await channel.assertQueue(queue, { durable: true });
    // Consume messages from the queue
    channel.consume(queue, async msg => {
      // amqplib delivers null when the consumer is cancelled by the broker.
      if (msg === null) return;
      try {
        const { url, delayInSeconds } = JSON.parse(msg.content.toString());
        // Simulating rate limitation
        await new Promise(resolve => setTimeout(resolve, delayInSeconds * 1000));
        await makeAPICall(url); // Make the API call
        channel.ack(msg); // Acknowledge message processing completion
      } catch (error) {
        // A malformed payload would otherwise reject unhandled and leave
        // the message unacked forever; reject it without requeueing so a
        // poison message cannot loop indefinitely.
        console.error('Message processing error:', error.message);
        channel.nack(msg, false, false);
      }
    });
  } catch (error) {
    console.error('RabbitMQ Connection Error:', error.message);
  }
}
// Function to send a message to the queue
// Publish a { url, delayInSeconds } task onto the rate-limited queue.
//
// Fixes vs. original: `assertQueue` is awaited before publishing, and
// the channel/connection are closed when done — the original opened a
// fresh connection on every call and never closed it (a leak).
async function addToQueue(url, delayInSeconds) {
  let connection;
  try {
    connection = await amqp.connect('amqp://localhost');
    const channel = await connection.createChannel();
    const queue = 'rateLimitedQueue';
    await channel.assertQueue(queue, { durable: true });
    const message = JSON.stringify({ url, delayInSeconds });
    // persistent: true pairs with the durable queue so the task
    // survives a broker restart.
    channel.sendToQueue(queue, Buffer.from(message), { persistent: true });
    console.log('Task added to the queue');
    await channel.close();
  } catch (error) {
    console.error('RabbitMQ Error:', error.message);
  } finally {
    // Always release the connection, even if publishing failed.
    if (connection) await connection.close();
  }
}
// Usage example: enqueue one task, then start the consumer.
// NOTE(review): both calls return floating promises; each handles its
// own errors internally via try/catch, so no .catch() is attached here.
addToQueue('https://api.example.com/data', 5); // Add an API call with a delay of 5 seconds
// Start the consumer so queued tasks are processed.
connect();
const { Kafka } = require('kafkajs');
const axios = require('axios');
// Function to make API requests, simulating rate limitations
// Fetch `url` and print the response body. Any request failure is
// logged to stderr instead of being propagated, keeping the Kafka
// consumer loop alive.
async function makeAPICall(url) {
  try {
    const { data } = await axios.get(url);
    console.log('API Response:', data);
  } catch (error) {
    console.error('API Error:', error.message);
  }
}
// Kafka client configuration plus a producer instance shared by the
// publishing helper below.
const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['localhost:9092'], // Replace with your Kafka broker address(es)
});
const producer = kafka.producer();
// Connect to Kafka and send messages
// Send a single message to the given Kafka topic.
//
// Fix vs. original: `disconnect` now runs in a finally block, so a
// failed `send` no longer leaves the producer connection open.
// NOTE(review): connecting per message is kept for example simplicity;
// a real service would hold one long-lived producer connection.
async function produceToKafka(topic, message) {
  await producer.connect();
  try {
    await producer.send({
      topic,
      messages: [{ value: message }],
    });
  } finally {
    await producer.disconnect();
  }
}
// Consumer-group member that processes the rate-limited API tasks.
const consumer = kafka.consumer({ groupId: 'my-group' });

// Subscribe to `topic` and handle each message: wait out the configured
// delay (simulating a rate-limit back-off), then call the API.
//
// Fix vs. original: a malformed JSON payload threw inside eachMessage
// and crashed the whole consumer run loop; it is now caught and logged
// so one poison message cannot stop consumption.
async function consumeFromKafka(topic) {
  await consumer.connect();
  await consumer.subscribe({ topic });
  await consumer.run({
    eachMessage: async ({ message }) => {
      try {
        const { url, delayInSeconds } = JSON.parse(message.value.toString());
        // Simulating rate limitation
        await new Promise(resolve => setTimeout(resolve, delayInSeconds * 1000));
        await makeAPICall(url); // Make the API call
      } catch (error) {
        console.error('Message processing error:', error.message);
      }
    },
  });
}
// Convenience wrapper: serialise the task as JSON and hand it to the
// shared producer helper.
async function addToKafka(topic, url, delayInSeconds) {
  const payload = JSON.stringify({ url, delayInSeconds });
  await produceToKafka(topic, payload);
  console.log('Message added to Kafka topic');
}
// Start consuming messages from the Kafka topic.
const kafkaTopic = 'rateLimitedTopic';
// Fix vs. original: both calls returned floating promises, so a broker
// connection failure surfaced as an unhandled rejection; attach
// explicit rejection handlers.
consumeFromKafka(kafkaTopic).catch(error =>
  console.error('Kafka consumer error:', error.message)
);
// Usage example - Adding messages to Kafka topic
addToKafka(kafkaTopic, 'https://api.example.com/data', 5) // Add an API call with a delay of 5 seconds
  .catch(error => console.error('Kafka producer error:', error.message));
Both approaches are legitimate, yet they require your service to incorporate a "sleep" mechanism of its own.
With Memphis, you can offload the delay from the client to the queue using a simple feature made
just for that purpose and called “Delayed Messages”. Delayed messages allow you to send a received message back to the broker when your consumer application requires extra processing time.
What sets apart Memphis’ implementation is the consumer’s capability to control this delay independently and atomically.
Within the station, the count of unconsumed messages doesn’t impact the consumption of delayed messages. For instance, if a 60-second delay is necessary, it precisely configures the invisibility time for that specific message.
As long as `maxMsgDeliveries` hasn't hit its limit, the consumer will activate `message.delay(delayInMilliseconds)`, bypassing the message. Instead of immediately reprocessing the same message, the broker will retain it for the specified duration. Once `delayInMilliseconds` has passed, the broker will halt the primary message flow and reintroduce the delayed message into circulation.
const { memphis } = require('memphis-dev');
// Function to make API requests, simulating rate limitations
// Attempt the API call described by a Memphis message. On success the
// message is acked; on failure it is handed back to the broker with a
// 60-second delay so it is retried later rather than immediately.
async function makeAPICall(message) {
  try {
    const url = message.getDataAsJson()['url'];
    const response = await axios.get(url);
    console.log('API Response:', response.data);
    message.ack();
  } catch (error) {
    console.error('API Error:', error.message);
    console.log("Delaying message for 1 minute");
    message.delay(60000);
  }
}
// Bootstrap: connect to Memphis, create a consumer, and route every
// incoming message through makeAPICall.
//
// Fixes vs. original:
//  - the 'message' handler used `await` inside a non-async arrow
//    function, which is a syntax error — the handler is now async;
//  - makeAPICall takes a single `message` argument, but was called as
//    makeAPICall(url, message) with `url` undefined — the stray
//    argument is removed;
//  - the 'error' handler silently swallowed errors — they are now
//    logged.
(async function () {
  let memphisConnection;
  try {
    memphisConnection = await memphis.connect({
      host: '<broker-hostname>',
      username: '<application-type username>',
      password: '<password>'
    });
    const consumer = await memphisConnection.consumer({
      stationName: '<station-name>',
      consumerName: '<consumer-name>',
      consumerGroup: ''
    });
    consumer.setContext({ key: "value" });
    consumer.on('message', async (message, context) => {
      await makeAPICall(message);
    });
    consumer.on('error', (error) => {
      console.error('Consumer error:', error.message);
    });
  } catch (ex) {
    console.log(ex);
    if (memphisConnection) memphisConnection.close();
  }
})();
Understanding and adhering to rate limitations is crucial for app developers working with APIs. It involves managing request frequency, handling errors when limits are reached, implementing backoff strategies to prevent overloading the API servers, and utilizing rate limit information provided by the API to optimize app performance — and now you know how to do it with a queue as well!