You don't need Kafka! Use Postgres for message-queues

January 11, 2023

No, it's not a click bait, you can use PostgreSQL (yes, the database) as a message queue, and without any external libraries and plug-ins. Also, you probably don't need Apache Kafka. In this article, I will explain why you probably don't need Kafka and how to use Postgres as a message-queue.

 

About Apache Kafka

Apache Kafka is an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications. Apache Kafka is very used for asynchronous communication between microservices. It can handle up to 2 million messages PER SECOND and has a well-structured broker scheme, with topic partitions and data persistence in the disk.

 

To think 🤔

Do your company really need a robust message broker, that is capable to handle 2 million messages per second? Probably not, unless you work for a giant that has a huge number of active users. Digital Ocean had used MySQL as a message-queue for many years, because this was enough and enabled them to save money and resources. Now, they use RabbitMQ, because they grew too much in the last few years and MySQL wasn't being able to handle the request anymore. You can do as Digital Ocean did, start small and refactor the system when needed. Imagine you are the CTO of an early-stage startup, but that is fastly growing. You have just decided to use a message-queue to help your system to handle the increasing number of requests.

Early-stage startup system

 

When to use Apache Kafka?

Apache Kafka is an amazing tool, but you have to know where to use it, to not spend too much money with something you might not need. So, some use cases for Kafka usage, are:

  • Real-time data processing
  • Messaging between big (really big) microservices that have a high throughput and are considered "mission critical"
  • Log Aggregation
  • Tracking user activities across a distributed systems network

If your system works with little data and hasn't requirements like the ones above, using Kafka will be overkill. In this cases you can use RabbitMQ, but RabbitMQ may also be too much, well, you can use Redis then! But Redis may also bee too much. What about using Postgres as your message-queue system?

 

What is a message-queue?

A message queue is a software engineering component used for communication between processes or between threads within the same process. Message queues provide an asynchronous communication protocol in which the sender and receiver of messages don't need to interact at the same time - messages are held in queue until the recipient retrieves them

Postgres as a message-queue

As mentioned earlier, relational databases, including PostgreSQL, are used by many companies as a message-queue or pub-sub. But, why? What are the benefits?

  • Easy to setup
  • Can handle a large number of concurrent connections
  • Cheaper
  • Doesn't introduce more complexity, because you will be using the same technology for data storage and message-queuing

TLDR; Just use postgres for everything

Implementation

Now, let's go right to the point. How to implement a message-queue using Postgres? There are three ways to do this, using LISTEN/NOTIFY, using a table and combining LISTEN/NOTIFY with a table. For this implementation we are going to use the following technologies:

  • Node.js
  • Typescript
  • Postgres
  • pg (Postgres native driver for Node.js)

OBS.: Don't worry, we are going to use the minimum amount of programming language possible to explain the concepts, the focus is on the database and SQL, not on the programming language.

Setup the project

  1. Enter in the project folder and then type in your terminal:
touch docker-compose.yml
  1. Open the file and paste the following code:
docker-compose.yml
version: '3'

services:
  db:
    image: postgres:13
    restart: always
    container_name: postgres-mq
    ports:
      - '5439:5432'
    environment:
      POSTGRES_USER: postgres-mq
      POSTGRES_PASSWORD: postgres-mq
      POSTGRES_DB: postgres-mq
networks:
  user-network:
    driver: bridge
  proxynet:
    name: custom_network
  1. Now, run the following command:
touch .dockerignore && echo "node_modules" >> .dockerignore # set the .dockerignore file
  1. Initialize a JavaScript project:
yarn init -y # you can use npm or pnpm as well = )
  1. Open the package.json file and override it with the following content:
package.json
{
  "name": "postgres-mq",
  "version": "1.0.0",
  "main": "index.js",
  "license": "MIT",
  "scripts": {
    "lint": "eslint \"{src,apps,libs,test}/**/*.ts\" --fix",
    "start:dev": "tsnd -r tsconfig-paths/register --exit-child --transpile-only --ignore-watch node_modules src/main.ts",
    "example:pubsub": "tsnd -r tsconfig-paths/register --exit-child --transpile-only --ignore-watch node_modules src/pubsub-test.ts",
    "example:mq": "tsnd -r tsconfig-paths/register --exit-child --transpile-only --ignore-watch node_modules src/mq-test.ts"
  },
  "dependencies": {
    "pg": "^8.8.0"
  },
  "devDependencies": {
    "@types/pg": "^8.6.5",
    "@faker-js/faker": "^7.5.0",
    "@types/node": "^18.7.3",
    "@types/validator": "^13.7.8",
    "@typescript-eslint/eslint-plugin": "^5.0.0",
    "@typescript-eslint/parser": "^5.0.0",
    "dotenv": "^16.0.3",
    "eslint": "^8.0.1",
    "eslint-config-airbnb-base": "^15.0.0",
    "eslint-config-prettier": "^8.3.0",
    "eslint-plugin-import": "^2.25.2",
    "eslint-plugin-prettier": "^4.2.1",
    "jest": "28.0.3",
    "prettier": "^2.3.2",
    "ts-jest": "28.0.1",
    "ts-node": "^10.0.0",
    "ts-node-dev": "^2.0.0",
    "tsconfig-paths": "^4.0.0",
    "typescript": "^4.8.4"
  }
}
  1. Install the dependencies:
yarn install
  1. Create a typescript configuration file:
touch tsconfig.json
  1. Open the tsconfig.json file and add some default configurations:
tsconfig.json
{
  "compilerOptions": {
    "outDir": "dist",
    "module": "commonjs",
    "target": "es2019",
    "esModuleInterop": true,
    "emitDecoratorMetadata": true,
    "experimentalDecorators": true,
    "strict": true,
    "strictPropertyInitialization": false,
    "allowJs": true,
    "sourceMap": true,
    "rootDirs": ["./src", "tests"],
    "baseUrl": "./",
    "paths": {
      "@/tests/*": ["./tests/*"],
      "@/*": ["./src/*"]
    },
  },
  "include": ["src", "tests", "@types"],
  "exclude": ["node_modules"]
}
  1. Create the src folder and then create the main.ts file:
mkdir src && touch src/main.ts

Now we are ready to start the project!

Using LISTEN/NOTIFY (pub/sub)

LISTEN and NOTIFY are Postgres' functions that allow us to listen to events and notify them. It is a pub/sub mechanism inside the database. Let's see how it works in the diagram below:

Postgres message-queue architecture diagram

The diagram above is a very simplified representation of a payment analysis system. The system has a payment service that receives payments and then sends them to the analysis service. The analysis service is responsible for analyzing the payments and then sending the result to the payments notification service. The process of analyzing a payment may take a long time to be completed, so the notification service needs to know when the payment analysis was completed. The notification service is responsible for sending the payment analysis result to the user.

To make this process work, we need to stablish a communication between the services, and this is where the LISTEN and NOTIFY functions come in. The payment service will send the payment to the analysis service, and then the analysis service will notify the notification service that the payment was analyzed. So, summing up, the notification service will LISTEN to the analysis service and the analysis service will NOTIFY the notification service.

Code

In our main.ts, let's create a function to connect with the database:

main.ts
import { Client, Notification } from 'pg';
import { SqlQueries } from './sql-queries';

interface IinitPostgresMQ {
  user: string;
  host: string;
  database: string;
  password: string;
  port: number;
  type?: 'message-queue' | 'pub-sub';
}

/**
 * Start the application.
 */
const initPostgresMQ = ({
  user,
  host,
  database,
  password,
  port,
  type,
}: IinitPostgresMQ) => {
  const dbClient = new Client({
    user,
    host,
    database,
    password,
    port,
  });

  dbClient.connect((err) => {
    if (err) {
      console.log('Error connecting to database', err);
    } else {
      console.log('Connected to database');
    }
  });

  if (type === 'message-queue') {
    dbClient.query(
      `
      ${SqlQueries.CREATE_MESSAGE_QUEUE_TABLE()}
      ${SqlQueries.CREATE_MESSAGE_TABLE()}
      `,
    );
  }

  return dbClient;
};

The initPostgresMQ function receives the database connection data and returns a Client instance. The Client instance is the connection with the database. Our queries will be stored in a separated file called sql-queries.ts. Let's create it:

touch src/sql-queries.ts

And fill it with some useful queries

sql-queries.ts
export class SqlQueries {
  public static readonly LISTEN = (queueName: string): string =>
    `LISTEN ${queueName};`;
  public static readonly NOTIFY = (
    queueName: string,
    message: Record<string, unknown>,
  ): string => `NOTIFY ${queueName}, '${JSON.stringify(message)}';`;
  public static readonly CREATE_MESSAGE_QUEUE_TABLE = (): string =>
    `CREATE TABLE IF NOT EXISTS message_queue (
      id SERIAL PRIMARY KEY,
      queue_name VARCHAR(255) NOT NULL UNIQUE,
      created_at TIMESTAMP NOT NULL DEFAULT NOW()
    );`;
  public static readonly CREATE_MESSAGE_TABLE = (): string =>
    `CREATE TABLE IF NOT EXISTS message(
      id SERIAL PRIMARY KEY,
      msg JSONB NOT NULL,
      delay INTEGER NOT NULL DEFAULT 0,
      retry INTEGER NOT NULL DEFAULT 0,
      consumed BOOLEAN NOT NULL DEFAULT FALSE,
      queue_id INTEGER NOT NULL,
      created_at TIMESTAMP NOT NULL DEFAULT NOW(),
      FOREIGN KEY (queue_id) REFERENCES message_queue(id)
    );`;
  public static readonly CREATE_MESSAGE_QUEUE = (): string =>
    `INSERT INTO message_queue (queue_name) VALUES ($1) ON CONFLICT DO NOTHING;`;
  public static readonly CREATE_MESSAGE =
    (): string => `INSERT INTO message (msg, delay, retry, queue_id) VALUES ($1, $2, $3,
    (SELECT id FROM message_queue WHERE queue_name = $4));`;
  public static readonly GET_QUEUE_UNCONSUMED_MESSAGES = (): string =>
    `SELECT m.id, m.msg, m.delay, m.retry, m.consumed, m.created_at, q.queue_name
        FROM message m INNER JOIN message_queue q ON m.queue_id = q.id
        WHERE q.queue_name = $1 AND m.consumed = FALSE
        FOR UPDATE SKIP LOCKED;`;
  public static readonly UPDATE_MESSAGE_CONSUMED_STATUS = (
    consumed: boolean,
  ): string => `UPDATE message SET consumed = ${consumed} WHERE id = $1;`;
}

Now, back to main.ts, we need to create the functions to listen and notify the events in the database, so let's create a class for this:

main.ts
[...] // previous code

export default class PostgresMQ {
  private dbClient: Client;
  /**
   * Start the application via the constructor.
   */
  constructor({
    user,
    database,
    host,
    password,
    port,
    type = 'pub-sub',
  }: IinitPostgresMQ) {
    this.dbClient = initPostgresMQ({
      user,
      database,
      host,
      password,
      port,
      type,
    });
  }

  /**
   * Subscribe to a channel.
   * @param queueName - The name of the queue to listen to.
   */
  public subscribeTo(queueName: string, action: (...args: any) => void): void {
    console.log('Subscribing to queue', ` "${queueName}" `);
    this.dbClient.query(SqlQueries.LISTEN(queueName));

    this.dbClient.on('notification', (msg: Notification) => {
      action(msg);
    });
  }

  /**
   * Add a message to a queue.
   * @param param0 - the queue name and the message to send.
   */
  public publishMessage({
    queueName,
    message,
  }: {
    queueName: string;
    message: Record<string, unknown>;
  }): void {
    console.log('Adding message to queue', ` "${queueName}" `);
    this.dbClient.query(SqlQueries.NOTIFY(queueName, message));
  }
}

Now, let's see this code in action. Create a file called pubsub-test.ts:

touch src/pubsub-test.ts

And add the following code:

pubsub-test.ts
import { Notification } from 'pg';
import PostgresMQ from './main';

const myQueue = new PostgresMQ({
  user: 'postgres-mq',
  database: 'postgres-mq',
  host: 'localhost',
  password: 'postgres-mq',
  port: 5439,
});

/**
 *
 * @param _
 */
function sayHi(_: string) {
  console.log(`Hi, ${_} !`);
}

myQueue.subscribeTo('my_test_queue', (msg: Notification) => {
  console.log('Message received', {
    channel: msg.channel,
    payload: msg.payload ? JSON.parse(msg.payload) : '<empty-value>',
  });
  sayHi(msg.channel);
});

setInterval(() => {
  myQueue.publishMessage({
    queueName: 'my_test_queue',
    message: {
      foo: 'bar',
      it: 'works',
    },
  });
}, 5000);

This code will create a queue called my_test_queue and will send a message to this queue every 5 seconds. Let's run it:

yarn example:pubsub

Our pub/sub implementation is done, but the Postgres' LISTEN and NOTIFY functions aren't persistent, they are more like a pub/sub mechanism. Pub/sub means that if nobody is subscribed listening, the events emmited in this time interval were lost. So, in this case, if you need persistent data, maybe it is not the best option, but hold still! Let's see a better approach for this case.

Making it persistent

Message-queues like RabbitMQ, Kafka, and others, are persistent, so if you send a message to a queue and nobody is listening, the message will be stored in the queue until someone consumes it. So far, our implementation is just a pub/sub model, let's make our PostgresMQ persistent to be a true message-queue in a producer/consumer model.

You probably noticed that at the beginning of this article, we created a table called message and another called message_queue, but we didn't use them yet. Let's use them now. These tables will be used to store a queue's messages and the created queues, respectively.

Message-queues are different from pub/sub because they follow a producer/consumer model, but what is a producer and what is a consumer? A producer is a service that sends messages to a queue, and a consumer is a service that consumes messages from a queue. In a producer/consumer model, we have two possible approaches:

  • PULL-BASED: the consumer pulls messages from the queue.
  • PUSH-BASED: the queue pushes (notifies) messages to the consumer.

In this article, we will use the PULL-BASED approach, but you can use the PUSH-BASED approach if you want.

First, we need to adapt our PostgresMQ class to use the message and message_queue tables. To do it, let's create two new methods in our class:

main.ts
[...] // previous code
class PostgresMQ {
  [...] // existent code

  /**
   * Add a message to a queue.
   * @param param0 - the queue name and the message to send.
   * @param param1 - the number of seconds to wait before consuming the message.
   * @param param2 - the number of times to retry consuming the message.
   * @param param3 - the number of seconds to wait before retrying to consume the message.
   */
  public async produceMessage({
    queueName,
    message,
    delay = 0,
    retry = 0,
  }: {
    queueName: string;
    message: Record<string, unknown>;
    delay?: number;
    retry?: number;
  }): Promise<void> {
    await this.dbClient.query(SqlQueries.CREATE_MESSAGE_QUEUE(), [queueName]);
    await this.dbClient.query(SqlQueries.CREATE_MESSAGE(), [
      message,
      delay,
      retry,
      queueName,
    ]);
    this.publishMessage({ queueName, message });
  }

  /**
   * Consume a message from a queue.
   * @param queueName - the name of the queue to consume from.
   */
  public async consumeMessage(
    queueName: string,
    action: (...args: any) => void,
  ): Promise<void> {
    this.subscribeTo(queueName, async (msg: Notification) => {
      console.log('Message received', {
        channel: msg.channel,
        payload: msg.payload ? JSON.parse(msg.payload) : '<empty-value>',
      });

      const rows = await this.dbClient.query(
        SqlQueries.GET_QUEUE_UNCONSUMED_MESSAGES(),
        [queueName],
      );

      if (rows.rowCount > 0) {
        rows.rows.forEach(async (row) => {
          const { id, msg, delay, created_at } = row;
          if (delay > 0) {
            setTimeout(() => {
              action({
                id,
                created_at,
                ...msg,
              });
            }, delay * 1000);

            await this.dbClient.query(
              SqlQueries.UPDATE_MESSAGE_CONSUMED_STATUS(true),
              [id],
            );

            return;
          }
          action({
            id,
            created_at,
            ...msg,
          });

          /* essa linha deve mesmo ser atualizada? os consumers que perderem a
          mensagem não vão conseguir consumir novamente? */
          await this.dbClient.query(
            SqlQueries.UPDATE_MESSAGE_CONSUMED_STATUS(true),
            [id],
          );
        });
      }
    });
  }
}

[...] // rest of the code

In the code above, we have created two methods: produceMessage and consumeMessage, that will be used to produce and consume messages, respectively. The produceMessage method will add a message to the message table and will publish the message to the queue. The consumeMessage method will consume a message from the message table and will execute the action function passed as a parameter.

Now, let's see our message-queue in action. Create a file called mq-test.ts:

touch src/mq-test.ts

And add the following code:

pubsub-test.ts
import PostgresMQ from './main';

const myQueue = new PostgresMQ({
  user: 'postgres-mq',
  database: 'postgres-mq',
  host: 'localhost',
  password: 'postgres-mq',
  port: 5439,
  type: 'message-queue',
});

/**
 *
 * @param _
 */
function sayHi(_: unknown) {
  console.log(`Hi, ${JSON.stringify(_)} !`);
}

setInterval(() => {
  myQueue.produceMessage({
    queueName: 'my_test_queue',
    message: {
      foo: 'bar',
      it: 'works',
    },
  });
}, 5000);

myQueue.consumeMessage('my_test_queue', sayHi);

In the code above, we are creating a new instance of our PostgresMQ class and we are calling the produceMessage method every 5 seconds. The produceMessage method will add a message to the message table and will publish the message to the my_test_queue queue. The consumeMessage method will consume a message from the message table and will execute the sayHi function passed as a parameter.

Now, let's run our code:

yarn example:mq

This was our Postgres message-queue implementation combining the LISTEN and NOTIFY commands with the message and message_queue tables. We have created a simple producer/consumer model that can be used to send and receive messages.

To improve

Congratulations, you have reached the end of this article and created a simple message-queue using Postgres. There are still some improvements that can be made to this implementation:

  • Add a TTL to the messages.
  • Add a TTL to the queues.
  • Our producer/consumer model is PULL-BASED, but you can use the PUSH-BASED approach if you want.
  • Our producer/consumer model is not broadcasting to multiple consumers, currently only one consumer can consume a message.
  • Add a way to delete a queue.
  • Add a way to delete a message.

Sources