Using Postgres Notifications in .NET for Data Change Notifications

PostgreSQL supports asynchronous notifications through the NOTIFY and LISTEN commands. A session subscribes to a channel with LISTEN, and any session can publish to that channel using NOTIFY or the pg_notify function. This is useful if we want to be informed about changes in our data or anything else relevant to our application.

In this example I am going to show how to subscribe to notifications in .NET and listen to insert, update and delete operations on our database tables. It can be done in just a few lines of code using Npgsql, and I think it's worth sharing.

All code can be found in a Git Repository at:

Creating NOTIFY Triggers in Postgres

We start by defining a function notify_trigger() that invokes pg_notify(...) to raise a notification on a given channel. The payload carries a timestamp, the current row version (xmin), the operation, the schema and table name, and the current values of the inserted, updated or deleted row.

CREATE OR REPLACE FUNCTION notify_trigger() 
RETURNS TRIGGER AS $trigger$
DECLARE
  rec RECORD;
  payload TEXT;
  channel_name TEXT;
  payload_items json;
BEGIN

  -- Pick the affected row: NEW for INSERT and UPDATE, OLD for DELETE
  CASE TG_OP
  WHEN 'INSERT','UPDATE' THEN
     rec := NEW;
  WHEN 'DELETE' THEN
     rec := OLD;
  ELSE
    RAISE EXCEPTION 'Unknown TG_OP: "%". Should not occur!', TG_OP;
  END CASE;

  -- Get the Channel Name
  IF TG_ARGV[0] IS NULL THEN
    RAISE EXCEPTION 'A Channel Name is required as first argument';
  END IF;

  channel_name := TG_ARGV[0];

  -- Serialize the affected row to JSON
  payload_items := row_to_json(rec);

  -- Build the payload
  payload := json_build_object(
      'timestamp', CURRENT_TIMESTAMP
    , 'row_version', rec.xmin
    , 'operation', TG_OP
    , 'schema', TG_TABLE_SCHEMA
    , 'table', TG_TABLE_NAME
    , 'payload', payload_items);

  -- Notify the channel
  PERFORM pg_notify(channel_name, payload);

  RETURN rec;
END;
$trigger$ LANGUAGE plpgsql;

We then create a Trigger on a table, for example on gitclub.organization.

CREATE OR REPLACE TRIGGER organization_notify_trigger
AFTER INSERT OR UPDATE OR DELETE ON gitclub.organization
FOR EACH ROW EXECUTE FUNCTION notify_trigger('core_db_event');

This Trigger invokes our notify_trigger(...) function whenever a row has been inserted, updated or deleted.

And that's already it for the Postgres side!

Listening for Notifications in .NET

In our .NET application we start by defining the Notification received from Postgres as a PostgresNotification.

// Licensed under the MIT license. See LICENSE file in the project root for full license information.

namespace GitClub.Infrastructure.Postgres
{
    /// <summary>
    /// A Notification received from Postgres NOTIFY / LISTEN.
    /// </summary>
    public record PostgresNotification
    {
        /// <summary>
        /// Gets or sets the PID.
        /// </summary>
        public required int PID { get; set; }

        /// <summary>
        /// Gets or sets the Channel.
        /// </summary>
        public required string Channel { get; set; }

        /// <summary>
        /// Gets or sets the Payload.
        /// </summary>
        public required string Payload { get; set; }
    }
}

We also want to execute some action when we receive a notification, so we handle a PostgresNotification with an IPostgresNotificationHandler.

// Licensed under the MIT license. See LICENSE file in the project root for full license information.

namespace GitClub.Infrastructure.Postgres
{
    /// <summary>
    /// Handles Notifications received from a Postgres Channel.
    /// </summary>
    public interface IPostgresNotificationHandler
    {
        /// <summary>
        /// Handles a Notification received from a Postgres Channel.
        /// </summary>
        /// <param name="notification">Notification received from Postgres</param>
        /// <param name="cancellationToken">CancellationToken to stop asynchronous processing</param>
        /// <returns>Awaitable ValueTask</returns>
        ValueTask HandleNotificationAsync(PostgresNotification notification, CancellationToken cancellationToken);
    }
}

The default implementation is a LoggingPostgresNotificationHandler, which just logs all notifications received from Postgres.

// Licensed under the MIT license. See LICENSE file in the project root for full license information.

namespace GitClub.Infrastructure.Postgres
{
    /// <summary>
    /// Logs all Notifications received from a Postgres Channel.
    /// </summary>
    public class LoggingPostgresNotificationHandler : IPostgresNotificationHandler
    {
        private readonly ILogger<LoggingPostgresNotificationHandler> _logger;

        public LoggingPostgresNotificationHandler(ILogger<LoggingPostgresNotificationHandler> logger)
        {
            _logger = logger;
        }

        public ValueTask HandleNotificationAsync(PostgresNotification notification, CancellationToken cancellationToken)
        {
            _logger.LogDebug("PostgresNotification (PID = {PID}, Channel = {Channel}, Payload = {Payload})",
                notification.PID, notification.Channel, notification.Payload);

            return ValueTask.CompletedTask;
        }
    }
}
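
The Payload arrives as the JSON text built by notify_trigger(), so a handler that wants to do more than logging has to parse it first. The following is a minimal sketch using System.Text.Json. The ChangeEvent record and the ChangeEventParser class are hypothetical names that are not part of the repository, and the row is kept as a raw JsonElement, because its shape depends on the table.

```csharp
using System;
using System.Text.Json;

// Hypothetical type for illustration. It mirrors some of the keys written by notify_trigger().
public sealed record ChangeEvent(string Operation, string Schema, string Table, JsonElement Row);

public static class ChangeEventParser
{
    // Parses the JSON payload of a notification into a ChangeEvent.
    public static ChangeEvent Parse(string payload)
    {
        using var document = JsonDocument.Parse(payload);

        var root = document.RootElement;

        return new ChangeEvent(
            Operation: GetRequiredString(root, "operation"),
            Schema: GetRequiredString(root, "schema"),
            Table: GetRequiredString(root, "table"),
            // Clone the element, so it stays valid after the JsonDocument is disposed.
            Row: root.GetProperty("payload").Clone());
    }

    // Reads a string property and rejects JSON null values.
    private static string GetRequiredString(JsonElement element, string name)
    {
        return element.GetProperty(name).GetString()
            ?? throw new JsonException($"Property '{name}' must not be null");
    }
}
```

A handler could then call ChangeEventParser.Parse(notification.Payload) and branch on the Operation or Table.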

We can now write the PostgresNotificationService, which is a BackgroundService. The idea is pretty simple: We have two Tasks. One Task is for receiving notifications from Postgres and putting them on a Channel<PostgresNotification>. The other Task is for processing the messages.

We are limiting the number of unprocessed notifications to 10,000 messages.

You can easily configure it to suit your needs.

// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using GitClub.Infrastructure.Postgres;
using Microsoft.Extensions.Options;
using Npgsql;
using System.Threading.Channels;

namespace GitClub.Hosted
{
    /// <summary>
    /// Options to configure the <see cref="PostgresNotificationService"/>.
    /// </summary>
    public class PostgresNotificationServiceOptions
    {
        /// <summary>
        /// Gets or sets the Channel the Service is listening to.
        /// </summary>
        public required string ChannelName { get; set; }

        /// <summary>
        /// Gets or sets the Maximum Capacity of unhandled Notifications. Default is 10,000 notifications. 
        /// </summary>
        public int MaxCapacity { get; set; } = 10_000;
    }

    /// <summary>
    /// This Service waits for Notifications received on a given Postgres Channel name.
    /// </summary>
    public class PostgresNotificationService : BackgroundService
    {
        private readonly ILogger<PostgresNotificationService> _logger;

        private readonly PostgresNotificationServiceOptions _options;
        private readonly IPostgresNotificationHandler _postgresNotificationHandler;
        private readonly NpgsqlDataSource _npgsqlDataSource;

        public PostgresNotificationService(ILogger<PostgresNotificationService> logger, IOptions<PostgresNotificationServiceOptions> options, NpgsqlDataSource npgsqlDataSource, IPostgresNotificationHandler postgresNotificationHandler)
        {
            _logger = logger;
            _options = options.Value;
            _npgsqlDataSource = npgsqlDataSource;
            _postgresNotificationHandler = postgresNotificationHandler;
        }

        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
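            // Notifications are buffered in a bounded Channel. With FullMode.Wait a TryWrite 
            // returns false on a full Channel, so a notification arriving while the buffer 
            // is full is not enqueued.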
            var channel = Channel.CreateBounded<PostgresNotification>(new BoundedChannelOptions(_options.MaxCapacity)
            {
                SingleReader = true,
                SingleWriter = true,
                FullMode = BoundedChannelFullMode.Wait
            });

            // We are running both loops until either of them is stopped or runs dry ...
            await Task
                .WhenAny(SetupPostgresAsync(stoppingToken), ProcessChannelAsync(stoppingToken))
                .ConfigureAwait(false);

            // Initializes the Postgres Listener by issuing a LISTEN Command.
            async Task SetupPostgresAsync(CancellationToken cancellationToken)
            {
                // Open a new Connection, which can be used to issue the LISTEN Command 
                // to the Postgres Database.
                await using var connection = await _npgsqlDataSource
                    .OpenConnectionAsync(cancellationToken)
                    .ConfigureAwait(false);

                // If we receive a message from Postgres, we convert the Event 
                // to a PostgresNotification and put it on the Channel.
                connection.Notification += (sender, x) =>
                {
                    var notification = new PostgresNotification
                    {
                        Channel = x.Channel,
                        PID = x.PID,
                        Payload = x.Payload,
                    };

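                    // TryWrite returns false, if the Channel is full. We log it, so 
                    // dropped notifications don't go unnoticed.
                    if (!channel.Writer.TryWrite(notification))
                    {
                        _logger.LogWarning("Dropped a Notification on Channel {Channel}, because the buffer is full", x.Channel);
                    }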
                };

                // We register to the Notifications on the Channel.
                using (var command = new NpgsqlCommand($"LISTEN {_options.ChannelName}", connection))
                {
                    await command
                        .ExecuteNonQueryAsync(cancellationToken)
                        .ConfigureAwait(false);
                }

                // And now we are putting the Connection into the Wait State,
                // until the Cancellation is requested.
                while (!cancellationToken.IsCancellationRequested)
                {
                    await connection
                        .WaitAsync(cancellationToken)
                        .ConfigureAwait(false);
                }
            }

            // This processes the messages read from the Channel. All we are doing is 
            // basically invoking the handler given to us.
            async Task ProcessChannelAsync(CancellationToken cancellationToken)
            {
                while (!cancellationToken.IsCancellationRequested)
                {
                    try
                    {
                        await foreach (var message in channel.Reader.ReadAllAsync(cancellationToken).ConfigureAwait(false))
                        {
                            await _postgresNotificationHandler.HandleNotificationAsync(message, cancellationToken).ConfigureAwait(false);
                        }
                    }
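                    catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
                    {
                        // The Service is shutting down, so this is not an error.
                        break;
                    }
                    catch (Exception e)
                    {
                        _logger.LogError(e, "An Error Occurred processing the Event");
                    }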
                }
            }
        }
    }
}
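
To get a feeling for what the bounded Channel does when the handler cannot keep up, here is a small sketch that fills a Channel without reading from it. The BackpressureDemo class is a hypothetical name for illustration only. It uses nothing but System.Threading.Channels.

```csharp
using System.Collections.Generic;
using System.Threading.Channels;

public static class BackpressureDemo
{
    // Writes the numbers 0..count-1 with TryWrite while nobody is reading, then
    // returns how many writes were accepted and what is left in the buffer.
    public static (int Accepted, List<int> Buffered) Fill(BoundedChannelFullMode fullMode, int capacity, int count)
    {
        var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(capacity)
        {
            FullMode = fullMode
        });

        var accepted = 0;

        for (var i = 0; i < count; i++)
        {
            if (channel.Writer.TryWrite(i))
            {
                accepted++;
            }
        }

        var buffered = new List<int>();

        // Drain whatever the Channel kept.
        while (channel.Reader.TryRead(out var item))
        {
            buffered.Add(item);
        }

        return (accepted, buffered);
    }
}
```

With BoundedChannelFullMode.Wait, which the service uses, a capacity of 2 and 5 writes leaves the first two items in the buffer and rejects the other three. With BoundedChannelFullMode.DropOldest every TryWrite succeeds and the buffer holds the two most recent items. Which one fits depends on whether older or newer notifications matter more to the application.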

In the Program.cs we can register the PostgresNotificationService like this:

// Hosted Services
builder.Services.AddSingleton<IPostgresNotificationHandler, LoggingPostgresNotificationHandler>();

builder.Services.Configure<PostgresNotificationServiceOptions>(o => o.ChannelName = "core_db_event");
builder.Services.AddHostedService<PostgresNotificationService>();
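
One detail about the ChannelName: it is interpolated into the LISTEN statement as an identifier. An unquoted identifier is folded to lower case by Postgres, while pg_notify takes the channel name as plain text, so a mixed-case name would not match. The following sketch quotes the name before it is used. PostgresIdentifier is a hypothetical helper that is not part of the repository or of Npgsql.

```csharp
using System;

public static class PostgresIdentifier
{
    // Wraps a name in double quotes and doubles embedded double quotes, which
    // is how Postgres expects quoted identifiers to be written.
    public static string Quote(string name)
    {
        if (string.IsNullOrEmpty(name))
        {
            throw new ArgumentException("An identifier must not be empty", nameof(name));
        }

        if (name.IndexOf('\0') >= 0)
        {
            throw new ArgumentException("An identifier must not contain a null character", nameof(name));
        }

        return "\"" + name.Replace("\"", "\"\"") + "\"";
    }
}
```

The command text in the service could then be built as $"LISTEN {PostgresIdentifier.Quote(_options.ChannelName)}".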

And that's it.

Enjoy your incoming notifications!

Conclusion

I think this is a simple way to be notified about events in a Postgres database. The solution we came up with is quite generic and it can be adapted to a lot of scenarios. The code is short and it's going to be easy to extend and maintain.

There are obviously some things missing here that I have no answer for, like ...

What's going to happen if we cannot process incoming notifications fast enough and experience backpressure? Should we wait or drop notifications? Throw an Exception? What happens if we lose the Connection to the Postgres Server and are therefore unsubscribed from Notifications? Should we throw an exception? Log it and reconnect? Does Npgsql automatically reconnect?
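
One possible answer to the connection question is to wrap the listening loop in a retry with a growing delay, so the service opens a fresh connection and issues the LISTEN command again after a failure. The sketch below is an assumption about how this could look, not something the repository contains. RetryPolicy and its parameters are hypothetical names, and listenAsync stands in for a method like SetupPostgresAsync. Keep in mind that Postgres does not deliver notifications to a session that was not listening when they were raised, so anything sent while disconnected is lost either way.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class RetryPolicy
{
    // Runs listenAsync until it completes normally or cancellation is requested.
    // After a failure it waits and tries again, doubling the delay up to maxDelay.
    // Returns the number of attempts that were started.
    public static async Task<int> RunAsync(
        Func<CancellationToken, Task> listenAsync,
        TimeSpan initialDelay,
        TimeSpan maxDelay,
        CancellationToken cancellationToken)
    {
        var attempts = 0;
        var delay = initialDelay;

        while (!cancellationToken.IsCancellationRequested)
        {
            attempts++;

            try
            {
                await listenAsync(cancellationToken).ConfigureAwait(false);

                return attempts;
            }
            catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
            {
                // Shutdown was requested, so we stop without retrying.
                return attempts;
            }
            catch (Exception)
            {
                // A real service would log the exception here before retrying.
            }

            try
            {
                await Task.Delay(delay, cancellationToken).ConfigureAwait(false);
            }
            catch (OperationCanceledException)
            {
                return attempts;
            }

            delay = TimeSpan.FromTicks(Math.Min(delay.Ticks * 2, maxDelay.Ticks));
        }

        return attempts;
    }
}
```

Inside ExecuteAsync this could be called as RetryPolicy.RunAsync(SetupPostgresAsync, TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(30), stoppingToken), with the delays picked to suit the application.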

I would be interested to hear your thoughts on it!