You don't need Kafka! Use Postgres for message-queues
January 11, 2023
No, this is not clickbait: you can use PostgreSQL (yes, the database) as a message queue, without any external libraries or plug-ins. Also, you probably don't need Apache Kafka. In this article, I will explain why you probably don't need Kafka and how to use Postgres as a message-queue.
About Apache Kafka
Apache Kafka is an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications. It is widely used for asynchronous communication between microservices. It can handle up to 2 million messages PER SECOND and has a well-structured broker scheme, with topic partitions and data persisted on disk.
Food for thought 🤔
Does your company really need a robust message broker capable of handling 2 million messages per second? Probably not, unless you work for a giant with a huge number of active users. DigitalOcean used MySQL as a message-queue for many years, because it was enough and it saved them money and resources. Now they use RabbitMQ, because they grew a lot in the last few years and MySQL could no longer handle the load. You can do as DigitalOcean did: start small and refactor the system when needed. Imagine you are the CTO of an early-stage but fast-growing startup. You have just decided to use a message-queue to help your system handle the increasing number of requests.
When to use Apache Kafka?
Apache Kafka is an amazing tool, but you have to know where to use it, so that you don't spend too much money on something you might not need. Some use cases for Kafka are:
- Real-time data processing
- Messaging between big (really big) microservices that have a high throughput and are considered "mission critical"
- Log Aggregation
- Tracking user activities across a distributed systems network
If your system works with little data and doesn't have requirements like the ones above, Kafka will be overkill. In these cases you can use RabbitMQ, but RabbitMQ may also be too much. Well, you can use Redis then! But Redis may also be too much. What about using Postgres as your message-queue system?
What is a message-queue?
A message queue is a software engineering component used for communication between processes or between threads within the same process. Message queues provide an asynchronous communication protocol in which the sender and receiver of messages don't need to interact at the same time: messages are held in the queue until the recipient retrieves them.
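To make the definition concrete, here is a minimal in-memory sketch that is not tied to any broker. The InMemoryQueue class and the payment names are hypothetical and exist only for illustration: messages wait in the queue until a receiver asks for them, so sender and receiver never have to run at the same time.

```typescript
// Hypothetical in-memory queue, for illustration only.
class InMemoryQueue<T> {
  private readonly messages: T[] = [];

  // The sender adds a message and returns immediately.
  send(message: T): void {
    this.messages.push(message);
  }

  // The receiver takes the oldest pending message, or undefined if there is none.
  receive(): T | undefined {
    return this.messages.shift();
  }

  // Number of messages still waiting to be received.
  get size(): number {
    return this.messages.length;
  }
}

const paymentQueue = new InMemoryQueue<string>();
paymentQueue.send('payment-1');
paymentQueue.send('payment-2');

// Later, possibly after the sender has finished, the receiver catches up.
const firstMessage = paymentQueue.receive();
const secondMessage = paymentQueue.receive();
const thirdMessage = paymentQueue.receive(); // nothing left in the queue
```

Messages come out in the order they were sent, and an empty queue simply returns nothing instead of blocking the sender.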
Postgres as a message-queue
As mentioned earlier, relational databases, including PostgreSQL, are used by many companies as a message-queue or pub-sub. But, why? What are the benefits?
- Easy to set up
- Can handle a large number of concurrent connections
- Cheaper
- Doesn't introduce more complexity, because you will be using the same technology for data storage and message-queuing
TL;DR: Just use Postgres for everything
Implementation
Now, let's get right to the point. How do we implement a message-queue using Postgres? There are three ways to do this: using LISTEN/NOTIFY, using a table, or combining LISTEN/NOTIFY with a table. For this implementation we are going to use the following technologies:
- Node.js
- TypeScript
- Postgres
- pg (Postgres native driver for Node.js)
Note: Don't worry, we are going to use as little application code as possible to explain the concepts. The focus is on the database and SQL, not on the programming language.
Set up the project
- Enter the project folder and then type in your terminal:
touch docker-compose.yml
- Open the file and paste the following code:
version: '3'
services:
  db:
    image: postgres:13
    restart: always
    container_name: postgres-mq
    ports:
      - '5439:5432'
    environment:
      POSTGRES_USER: postgres-mq
      POSTGRES_PASSWORD: postgres-mq
      POSTGRES_DB: postgres-mq
networks:
  user-network:
    driver: bridge
  proxynet:
    name: custom_network
- Now, run the following command:
touch .dockerignore && echo "node_modules" >> .dockerignore # set the .dockerignore file
- Initialize a JavaScript project:
yarn init -y # you can use npm or pnpm as well = )
- Open the package.json file and replace its contents with the following:
{
  "name": "postgres-mq",
  "version": "1.0.0",
  "main": "index.js",
  "license": "MIT",
  "scripts": {
    "lint": "eslint \"{src,apps,libs,test}/**/*.ts\" --fix",
    "start:dev": "tsnd -r tsconfig-paths/register --exit-child --transpile-only --ignore-watch node_modules src/main.ts",
    "example:pubsub": "tsnd -r tsconfig-paths/register --exit-child --transpile-only --ignore-watch node_modules src/pubsub-test.ts",
    "example:mq": "tsnd -r tsconfig-paths/register --exit-child --transpile-only --ignore-watch node_modules src/mq-test.ts"
  },
  "dependencies": {
    "pg": "^8.8.0"
  },
  "devDependencies": {
    "@types/pg": "^8.6.5",
    "@faker-js/faker": "^7.5.0",
    "@types/node": "^18.7.3",
    "@types/validator": "^13.7.8",
    "@typescript-eslint/eslint-plugin": "^5.0.0",
    "@typescript-eslint/parser": "^5.0.0",
    "dotenv": "^16.0.3",
    "eslint": "^8.0.1",
    "eslint-config-airbnb-base": "^15.0.0",
    "eslint-config-prettier": "^8.3.0",
    "eslint-plugin-import": "^2.25.2",
    "eslint-plugin-prettier": "^4.2.1",
    "jest": "28.0.3",
    "prettier": "^2.3.2",
    "ts-jest": "28.0.1",
    "ts-node": "^10.0.0",
    "ts-node-dev": "^2.0.0",
    "tsconfig-paths": "^4.0.0",
    "typescript": "^4.8.4"
  }
}
- Install the dependencies:
yarn install
- Create a TypeScript configuration file:
touch tsconfig.json
- Open the tsconfig.json file and add some default configurations:
{
  "compilerOptions": {
    "outDir": "dist",
    "module": "commonjs",
    "target": "es2019",
    "esModuleInterop": true,
    "emitDecoratorMetadata": true,
    "experimentalDecorators": true,
    "strict": true,
    "strictPropertyInitialization": false,
    "allowJs": true,
    "sourceMap": true,
    "rootDirs": ["./src", "tests"],
    "baseUrl": "./",
    "paths": {
      "@/tests/*": ["./tests/*"],
      "@/*": ["./src/*"]
    },
  },
  "include": ["src", "tests", "@types"],
  "exclude": ["node_modules"]
}
- Create the src folder and then create the main.ts file:
mkdir src && touch src/main.ts
Start the database container with docker-compose up -d, and we are ready to start the project!
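The connection settings declared in docker-compose.yml (user, password, database, and host port 5439) will be needed by every script in this article. As a hedged sketch, they could be gathered in one place and overridden through environment variables. The loadDbConfig helper below is hypothetical and not part of the project; the variable names PGUSER, PGHOST, PGDATABASE, PGPASSWORD and PGPORT follow the usual Postgres client convention.

```typescript
// Hypothetical helper: connection settings for the database defined in docker-compose.yml.
interface DbConfig {
  user: string;
  host: string;
  database: string;
  password: string;
  port: number;
}

// Build the settings from an environment-like object, falling back to the
// values used in docker-compose.yml when a variable is not set.
function loadDbConfig(env: Record<string, string | undefined>): DbConfig {
  const port = Number(env.PGPORT ?? '5439');
  if (!Number.isInteger(port) || port <= 0) {
    throw new Error(`Invalid PGPORT: ${env.PGPORT}`);
  }
  return {
    user: env.PGUSER ?? 'postgres-mq',
    host: env.PGHOST ?? 'localhost',
    database: env.PGDATABASE ?? 'postgres-mq',
    password: env.PGPASSWORD ?? 'postgres-mq',
    port,
  };
}

// With no overrides, the defaults match the compose file.
const defaultConfig = loadDbConfig({});
// A single variable can be overridden without touching the others.
const overriddenConfig = loadDbConfig({ PGPORT: '5432' });
```

In a Node.js script the argument would typically be process.env, which keeps credentials out of the source files.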
Using LISTEN/NOTIFY (pub/sub)
LISTEN and NOTIFY are Postgres commands that let a session subscribe to a channel and send notifications to it. Together they form a pub/sub mechanism inside the database. Let's see how it works in the diagram below:
The diagram above is a very simplified representation of a payment analysis system. The system has a payment service that receives payments and then sends them to the analysis service. The analysis service is responsible for analyzing the payments and then sending the result to the payments notification service. The process of analyzing a payment may take a long time to be completed, so the notification service needs to know when the payment analysis was completed. The notification service is responsible for sending the payment analysis result to the user.
To make this process work, we need to establish communication between the services, and this is where the LISTEN and NOTIFY commands come in. The payment service will send the payment to the analysis service, and then the analysis service will notify the notification service that the payment was analyzed. So, summing up, the notification service will LISTEN to the analysis service and the analysis service will NOTIFY the notification service.
Code
In our main.ts, let's create a function to connect with the database:
import { Client, Notification } from 'pg';
import { SqlQueries } from './sql-queries';

interface IinitPostgresMQ {
  user: string;
  host: string;
  database: string;
  password: string;
  port: number;
  type?: 'message-queue' | 'pub-sub';
}

/**
 * Create the database client, connect it and, for the message-queue type,
 * create the tables that persist queues and messages.
 */
const initPostgresMQ = ({
  user,
  host,
  database,
  password,
  port,
  type,
}: IinitPostgresMQ) => {
  const dbClient = new Client({
    user,
    host,
    database,
    password,
    port,
  });

  dbClient.connect((err) => {
    if (err) {
      console.error('Error connecting to database', err);
    } else {
      console.log('Connected to database');
    }
  });

  if (type === 'message-queue') {
    // The client queues queries issued before the connection is ready.
    dbClient
      .query(
        `
        ${SqlQueries.CREATE_MESSAGE_QUEUE_TABLE()}
        ${SqlQueries.CREATE_MESSAGE_TABLE()}
        `,
      )
      .catch((err) => console.error('Error creating tables', err));
  }

  return dbClient;
};
The initPostgresMQ function receives the database connection data and returns a Client instance. The Client instance is the connection with the database. Our queries will be stored in a separate file called sql-queries.ts. Let's create it:
touch src/sql-queries.ts
And fill it with some useful queries:
// LISTEN and NOTIFY do not accept bind parameters, so the channel name is
// quoted as an identifier and the payload as a string literal. The literal
// quoting assumes standard_conforming_strings is on (the Postgres default).
const quoteIdentifier = (value: string): string =>
  `"${value.replace(/"/g, '""')}"`;
const quoteLiteral = (value: string): string =>
  `'${value.replace(/'/g, "''")}'`;

export class SqlQueries {
  public static readonly LISTEN = (queueName: string): string =>
    `LISTEN ${quoteIdentifier(queueName)};`;

  public static readonly NOTIFY = (
    queueName: string,
    message: Record<string, unknown>,
  ): string =>
    `NOTIFY ${quoteIdentifier(queueName)}, ${quoteLiteral(
      JSON.stringify(message),
    )};`;

  public static readonly CREATE_MESSAGE_QUEUE_TABLE = (): string =>
    `CREATE TABLE IF NOT EXISTS message_queue (
      id SERIAL PRIMARY KEY,
      queue_name VARCHAR(255) NOT NULL UNIQUE,
      created_at TIMESTAMP NOT NULL DEFAULT NOW()
    );`;

  public static readonly CREATE_MESSAGE_TABLE = (): string =>
    `CREATE TABLE IF NOT EXISTS message(
      id SERIAL PRIMARY KEY,
      msg JSONB NOT NULL,
      delay INTEGER NOT NULL DEFAULT 0,
      retry INTEGER NOT NULL DEFAULT 0,
      consumed BOOLEAN NOT NULL DEFAULT FALSE,
      queue_id INTEGER NOT NULL,
      created_at TIMESTAMP NOT NULL DEFAULT NOW(),
      FOREIGN KEY (queue_id) REFERENCES message_queue(id)
    );`;

  public static readonly CREATE_MESSAGE_QUEUE = (): string =>
    `INSERT INTO message_queue (queue_name) VALUES ($1) ON CONFLICT DO NOTHING;`;

  public static readonly CREATE_MESSAGE =
    (): string => `INSERT INTO message (msg, delay, retry, queue_id) VALUES ($1, $2, $3,
      (SELECT id FROM message_queue WHERE queue_name = $4));`;

  // "OF m" restricts the row lock to the message table, so the shared
  // message_queue row is not locked as well. Oldest messages come first.
  public static readonly GET_QUEUE_UNCONSUMED_MESSAGES = (): string =>
    `SELECT m.id, m.msg, m.delay, m.retry, m.consumed, m.created_at, q.queue_name
      FROM message m INNER JOIN message_queue q ON m.queue_id = q.id
      WHERE q.queue_name = $1 AND m.consumed = FALSE
      ORDER BY m.id
      FOR UPDATE OF m SKIP LOCKED;`;

  public static readonly UPDATE_MESSAGE_CONSUMED_STATUS = (
    consumed: boolean,
  ): string => `UPDATE message SET consumed = ${consumed} WHERE id = $1;`;
}
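The NOTIFY helper above builds the whole statement as a string. A hedged alternative is the built-in pg_notify function, which takes the channel and the payload as ordinary arguments and can therefore be called with bind parameters. The sketch below is not part of the article's SqlQueries class: buildNotify, NotifyQuery and utf8ByteLength are hypothetical names. It also checks the payload size, since in the default configuration a NOTIFY payload must be shorter than 8000 bytes.

```typescript
// Hypothetical helper, not part of the article's SqlQueries class.
interface NotifyQuery {
  text: string;
  values: [string, string];
}

// In the default configuration a NOTIFY payload must be shorter than 8000 bytes.
const MAX_PAYLOAD_BYTES = 7999;

// Count the UTF-8 bytes of a string without relying on Node-specific APIs.
function utf8ByteLength(text: string): number {
  let bytes = 0;
  for (let i = 0; i < text.length; i += 1) {
    const codePoint = text.codePointAt(i) as number;
    if (codePoint > 0xffff) i += 1; // skip the second half of a surrogate pair
    if (codePoint < 0x80) bytes += 1;
    else if (codePoint < 0x800) bytes += 2;
    else if (codePoint < 0x10000) bytes += 3;
    else bytes += 4;
  }
  return bytes;
}

// Build a parameterized call to pg_notify(channel, payload).
function buildNotify(
  channel: string,
  message: Record<string, unknown>,
): NotifyQuery {
  const payload = JSON.stringify(message);
  if (utf8ByteLength(payload) > MAX_PAYLOAD_BYTES) {
    throw new Error('Payload too large for NOTIFY; store it in a table instead');
  }
  return { text: 'SELECT pg_notify($1, $2);', values: [channel, payload] };
}

// Quotes in the message need no escaping because they travel as a parameter.
const notifyQuery = buildNotify('my_test_queue', { foo: "it's fine" });
```

With pg, an object holding text and values can be passed directly to the query method of the client. Note that pg_notify receives the channel as a plain string, so the name is not case-folded the way an unquoted identifier would be.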
Now, back to main.ts, we need to create the functions to listen to and notify events in the database, so let's create a class for this:
[...] // previous code

export default class PostgresMQ {
  private dbClient: Client;

  /**
   * Start the application via the constructor.
   */
  constructor({
    user,
    database,
    host,
    password,
    port,
    type = 'pub-sub',
  }: IinitPostgresMQ) {
    this.dbClient = initPostgresMQ({
      user,
      database,
      host,
      password,
      port,
      type,
    });
  }

  /**
   * Subscribe to a channel.
   * @param queueName - The name of the queue to listen to.
   * @param action - The function to run for each notification received.
   */
  public subscribeTo(queueName: string, action: (...args: any) => void): void {
    console.log('Subscribing to queue', ` "${queueName}" `);
    this.dbClient
      .query(SqlQueries.LISTEN(queueName))
      .catch((err) => console.error('Error subscribing to queue', err));
    this.dbClient.on('notification', (msg: Notification) => {
      action(msg);
    });
  }

  /**
   * Publish a message to the listeners of a queue.
   * @param param0 - the queue name and the message to send.
   */
  public publishMessage({
    queueName,
    message,
  }: {
    queueName: string;
    message: Record<string, unknown>;
  }): void {
    console.log('Adding message to queue', ` "${queueName}" `);
    this.dbClient
      .query(SqlQueries.NOTIFY(queueName, message))
      .catch((err) => console.error('Error publishing message', err));
  }
}
Now, let's see this code in action. Create a file called pubsub-test.ts:
touch src/pubsub-test.ts
And add the following code:
import { Notification } from 'pg';
import PostgresMQ from './main';

const myQueue = new PostgresMQ({
  user: 'postgres-mq',
  database: 'postgres-mq',
  host: 'localhost',
  password: 'postgres-mq',
  port: 5439,
});

/**
 * Greet the channel a notification arrived on.
 * @param channel - the name of the channel.
 */
function sayHi(channel: string) {
  console.log(`Hi, ${channel} !`);
}

myQueue.subscribeTo('my_test_queue', (msg: Notification) => {
  console.log('Message received', {
    channel: msg.channel,
    payload: msg.payload ? JSON.parse(msg.payload) : '<empty-value>',
  });
  sayHi(msg.channel);
});

setInterval(() => {
  myQueue.publishMessage({
    queueName: 'my_test_queue',
    message: {
      foo: 'bar',
      it: 'works',
    },
  });
}, 5000);
This code subscribes to a channel called my_test_queue and sends a message to it every 5 seconds. Let's run it:
yarn example:pubsub
Our pub/sub implementation is done, but Postgres' LISTEN and NOTIFY commands aren't persistent: they are a pure pub/sub mechanism. Pub/sub means that if nobody is subscribed and listening, the events emitted during that time are lost. So, if you need persistent data, this may not be the best option. But hold on! Let's see a better approach for this case.
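The loss described above can be modelled without a database. The sketch below is a hypothetical in-memory stand-in for a notification channel (VolatileChannel is not a Postgres or pg API): a payload is handed only to the listeners registered at the moment it is sent, and nothing is stored for later.

```typescript
// Hypothetical in-memory channel that mimics fire-and-forget delivery.
type ChannelListener = (payload: string) => void;

class VolatileChannel {
  private readonly listeners: ChannelListener[] = [];

  // Register a listener for future notifications.
  listen(listener: ChannelListener): void {
    this.listeners.push(listener);
  }

  // Deliver to the current listeners only and return how many received it.
  // Nothing is kept for listeners that subscribe later.
  notify(payload: string): number {
    this.listeners.forEach((listener) => listener(payload));
    return this.listeners.length;
  }
}

const testChannel = new VolatileChannel();
const receivedPayloads: string[] = [];

// Nobody is listening yet, so this payload is dropped.
const deliveredBeforeListen = testChannel.notify('first');

testChannel.listen((payload) => {
  receivedPayloads.push(payload);
});

// Now there is one listener, so this payload is delivered.
const deliveredAfterListen = testChannel.notify('second');
```

Only the second payload reaches the listener, which is the gap that a table-backed queue is meant to close.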
Making it persistent
Message-queues like RabbitMQ, Kafka, and others are persistent, so if you send a message to a queue and nobody is listening, the message will be stored in the queue until someone consumes it. So far, our implementation is just a pub/sub model. Let's make our PostgresMQ persistent, so that it becomes a true message-queue in a producer/consumer model.
You probably noticed that at the beginning of this article we created a table called message and another called message_queue, but we haven't used them yet. Let's use them now. These tables will store a queue's messages and the created queues, respectively.
Message-queues are different from pub/sub because they follow a producer/consumer model, but what is a producer and what is a consumer? A producer is a service that sends messages to a queue, and a consumer is a service that consumes messages from a queue. In a producer/consumer model, we have two possible approaches:
- PULL-BASED: the consumer pulls messages from the queue.
- PUSH-BASED: the queue pushes (notifies) messages to the consumer.
In this article, we will use the PULL-BASED approach, but you can use the PUSH-BASED approach if you want.
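As a hedged illustration of the pull-based idea, the loop below keeps asking for batches until the queue reports nothing left. The function drainQueue and its two callbacks are hypothetical: fetchBatch stands for whatever query returns pending messages, and handle for the work done on each one.

```typescript
// Hypothetical pull-based consumer loop; fetchBatch and handle are supplied by the caller.
async function drainQueue<T>(
  fetchBatch: () => Promise<T[]>,
  handle: (message: T) => Promise<void>,
): Promise<number> {
  let handled = 0;
  for (;;) {
    // The consumer decides when to ask for more work.
    const batch = await fetchBatch();
    if (batch.length === 0) {
      return handled; // the queue is empty, so stop pulling
    }
    for (const message of batch) {
      await handle(message); // process one message at a time, in order
      handled += 1;
    }
  }
}
```

A consumer could call drainQueue once at startup and then again on a timer, which would also pick up messages stored while it was offline. In a push-based design the same handle function would instead be called from a notification callback.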
First, we need to adapt our PostgresMQ class to use the message and message_queue tables. To do it, let's create two new methods in our class:
[...] // previous code

class PostgresMQ {
  [...] // existent code

  /**
   * Add a message to a queue and notify its listeners.
   * @param param0 - the queue name, the message to send, the number of
   * seconds to wait before consuming the message (delay) and the number of
   * times to retry consuming it (retry).
   */
  public async produceMessage({
    queueName,
    message,
    delay = 0,
    retry = 0,
  }: {
    queueName: string;
    message: Record<string, unknown>;
    delay?: number;
    retry?: number;
  }): Promise<void> {
    await this.dbClient.query(SqlQueries.CREATE_MESSAGE_QUEUE(), [queueName]);
    await this.dbClient.query(SqlQueries.CREATE_MESSAGE(), [
      message,
      delay,
      retry,
      queueName,
    ]);
    this.publishMessage({ queueName, message });
  }

  /**
   * Consume the messages of a queue.
   * @param queueName - the name of the queue to consume from.
   * @param action - the function to run for each consumed message.
   */
  public async consumeMessage(
    queueName: string,
    action: (...args: any) => void,
  ): Promise<void> {
    this.subscribeTo(queueName, async (notification: Notification) => {
      console.log('Message received', {
        channel: notification.channel,
        payload: notification.payload
          ? JSON.parse(notification.payload)
          : '<empty-value>',
      });

      try {
        // Outside an explicit transaction, the row locks taken by
        // FOR UPDATE SKIP LOCKED are released as soon as this SELECT finishes.
        const result = await this.dbClient.query(
          SqlQueries.GET_QUEUE_UNCONSUMED_MESSAGES(),
          [queueName],
        );

        // for...of awaits each update in turn; forEach would ignore the
        // promises returned by an async callback.
        for (const row of result.rows) {
          const { id, msg, delay, created_at } = row;

          if (delay > 0) {
            setTimeout(() => {
              action({ id, created_at, ...msg });
            }, delay * 1000);
          } else {
            action({ id, created_at, ...msg });
          }

          /* Should this row really be updated here? Won't the consumers
          that missed the message be unable to consume it again? */
          await this.dbClient.query(
            SqlQueries.UPDATE_MESSAGE_CONSUMED_STATUS(true),
            [id],
          );
        }
      } catch (err) {
        console.error('Error consuming messages', err);
      }
    });
  }
}

[...] // rest of the code
In the code above, we have created two methods, produceMessage and consumeMessage, that will be used to produce and consume messages, respectively. The produceMessage method adds a message to the message table and publishes it to the queue. The consumeMessage method consumes messages from the message table and executes the action function passed as a parameter.
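One caveat of reading the unconsumed rows and marking them in separate statements is that two consumers may pick up the same message in between. A common way around this is to claim rows in a single statement. The sketch below is an assumption layered on the article's schema, not the author's method: buildClaimQuery and ClaimQuery are hypothetical names, and the batch size parameter is new.

```typescript
// Hypothetical query builder; table and column names follow the article's schema.
interface ClaimQuery {
  text: string;
  values: [string, number];
}

// Mark up to batchSize pending messages as consumed and return them, all in
// one statement. SKIP LOCKED makes concurrent consumers skip rows that
// another consumer is claiming at the same moment.
function buildClaimQuery(queueName: string, batchSize = 10): ClaimQuery {
  if (!Number.isInteger(batchSize) || batchSize < 1) {
    throw new Error('batchSize must be a positive integer');
  }
  const text = `
    UPDATE message
       SET consumed = TRUE
     WHERE id IN (
       SELECT m.id
         FROM message m
         INNER JOIN message_queue q ON m.queue_id = q.id
        WHERE q.queue_name = $1
          AND m.consumed = FALSE
        ORDER BY m.id
        LIMIT $2
        FOR UPDATE OF m SKIP LOCKED
     )
    RETURNING id, msg, delay, created_at;`;
  return { text, values: [queueName, batchSize] };
}

const claimQuery = buildClaimQuery('my_test_queue', 5);
```

The resulting object could be passed to the query method of the pg client, and the rows it returns are the messages this consumer now owns. The trade-off is that a message is marked consumed before its action runs, so a crash in between would lose it unless a retry mechanism is added.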
Now, let's see our message-queue in action. Create a file called mq-test.ts:
touch src/mq-test.ts
And add the following code:
import PostgresMQ from './main';

const myQueue = new PostgresMQ({
  user: 'postgres-mq',
  database: 'postgres-mq',
  host: 'localhost',
  password: 'postgres-mq',
  port: 5439,
  type: 'message-queue',
});

/**
 * Greet a consumed message.
 * @param message - the message taken from the queue.
 */
function sayHi(message: unknown) {
  console.log(`Hi, ${JSON.stringify(message)} !`);
}

setInterval(() => {
  myQueue
    .produceMessage({
      queueName: 'my_test_queue',
      message: {
        foo: 'bar',
        it: 'works',
      },
    })
    .catch((err) => console.error('Error producing message', err));
}, 5000);

myQueue.consumeMessage('my_test_queue', sayHi);
In the code above, we are creating a new instance of our PostgresMQ class and calling the produceMessage method every 5 seconds. The produceMessage method adds a message to the message table and publishes it to the my_test_queue queue. The consumeMessage method consumes messages from the message table and executes the sayHi function passed as a parameter.
Now, let's run our code:
yarn example:mq
This was our Postgres message-queue implementation combining the LISTEN and NOTIFY commands with the message and message_queue tables. We have created a simple producer/consumer model that can be used to send and receive messages.
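The retry value stored with each message is not consulted by consumeMessage yet. As a hedged sketch of how it might be honored, the hypothetical helper below runs an action and tries again up to the given number of extra times when the action throws. It is synchronous, like the action callbacks used in this article.

```typescript
// Hypothetical helper: run an action, retrying up to `retry` extra times on failure.
function runWithRetry<T>(action: () => T, retry: number): T {
  const attempts = Math.max(0, Math.floor(retry)) + 1;
  let lastError: unknown;
  for (let attempt = 1; attempt <= attempts; attempt += 1) {
    try {
      return action(); // success: stop retrying
    } catch (err) {
      lastError = err; // remember the failure and try again if attempts remain
    }
  }
  throw lastError;
}

// A flaky action that fails twice before succeeding.
let flakyCalls = 0;
const flakyResult = runWithRetry(() => {
  flakyCalls += 1;
  if (flakyCalls < 3) {
    throw new Error('temporary failure');
  }
  return 'done';
}, 2);
```

Inside a consumer, the action passed to this helper would wrap the user callback, with the retry column of the row as the second argument.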
Room for improvement
Congratulations, you have reached the end of this article and created a simple message-queue using Postgres. There are still some improvements that can be made to this implementation:
- Add a TTL to the messages.
- Add a TTL to the queues.
- Our producer/consumer model is PULL-BASED, but you can use the PUSH-BASED approach if you want.
- Our producer/consumer model does not broadcast to multiple consumers: currently, only one consumer can consume a message.
- Add a way to delete a queue.
- Add a way to delete a message.
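As a starting point for the first item, a message TTL could be enforced by periodically deleting rows older than a given age. The sketch below is hypothetical (buildPurgeExpiredQuery is not part of the project) and assumes the message table and its created_at column from this article. It uses the Postgres make_interval function to turn a number of seconds into an interval.

```typescript
// Hypothetical query builder for a message TTL, based on the article's message table.
interface PurgeQuery {
  text: string;
  values: [number];
}

// Build a DELETE that removes every message older than ttlSeconds.
function buildPurgeExpiredQuery(ttlSeconds: number): PurgeQuery {
  if (!Number.isFinite(ttlSeconds) || ttlSeconds <= 0) {
    throw new Error('ttlSeconds must be a positive number');
  }
  return {
    text: 'DELETE FROM message WHERE created_at < NOW() - make_interval(secs => $1);',
    values: [ttlSeconds],
  };
}

// Messages older than one hour would be removed.
const purgeQuery = buildPurgeExpiredQuery(3600);
```

The query could be run on a timer from any service that holds a database connection. Whether expired but unconsumed messages should be deleted or moved elsewhere is a design decision this sketch leaves open.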