Using Postgres Notifications in .NET for Data Change Notifications

PostgreSQL has a feature called NOTIFY and LISTEN. We can send asynchronous notifications to subscribers using the pg_notify function. This is useful, if we want to be informed about changes in our data or anything relevant to our application.

In this example I am going to show how to subscribe to notifications in .NET and listen to insert, update and delete operations on our database tables. It can be done in just a few lines of code using Npgsql, and I think it's worth sharing.

All code can be found in a Git Repository at:

Table of contents

Trying it out

If you want to try it out, before looking at the details, here is an example.

Clone the Git Repository

Start by cloning the npgsql-examples repository by running:

git clone https://github.com/bytefish/npgsql-examples.git

Start the PostgreSQL database

Then switch to notify-listen/docker and start the Postgres instance using:

docker compose up

It starts a local PostgreSQL 16 instance listening localhost:5432 using username postgres and password password.

Start the .NET Application

Start the .NET application.

The connection string, which is given in the appsettings.json, already points to the local Postgres instance.

Using psql to update the data

Switch over to a console and start psql to update some data:

> psql -h localhost -U postgres -W
Password:

psql (15.2, server 16.6 (Debian 16.6-1.pgdg120+1))
WARNING: psql major version 15, server major version 16.
         Some psql features might not work.
WARNING: Console code page (437) differs from Windows code page (1252)
         8-bit characters might not work correctly. See psql reference
         page "Notes for Windows users" for details.
Type "help" for help.

postgres=# SELECT * FROM gitclub.repository;
 repository_id |      name      | organization_id | last_edited_by
---------------+----------------+-----------------+----------------
             1 | Tooling        |               1 |              1
             2 | Specifications |               1 |              1
(2 rows)

postgres=# UPDATE gitclub.repository SET name = 'New Tooling' WHERE repository_id = 1;
UPDATE 1

Enjoying the received Notification

And in the console output we will see the incoming notification generated by the PostgreSQL NOTIFY command:

dbug: Microsoft.Extensions.Hosting.Internal.Host[2]
      Hosting started
dbug: NpgsqlNotifyListen.Infrastructure.LoggingPostgresNotificationHandler[0]
      PostgresNotification (PID = 137, Channel = core_db_event, Payload = {"timestamp" : "2025-01-25T08:38:47.781178+00:00", "row_version" : "752", "operation" : "UPDATE", "schema" : "gitclub", "table" : "repository", "payload" : {"repository_id":1,"name":"New Tooling","organization_id":1,"last_edited_by":1}}

Creating NOTIFY Triggers in Postgres

We start by defining a function notify_trigger(), that invokes pg_notify(...) to raise a notification on a given channel. We are going to send some information, such as a Timestamp, the current row version, and the current values of the inserted, updated or deleted row.

CREATE OR REPLACE FUNCTION notify_trigger() 
RETURNS TRIGGER AS $trigger$
DECLARE
  rec RECORD;
  payload TEXT;
  channel_name TEXT;
  payload_items json;
BEGIN

  -- Get the Operation
  CASE TG_OP
  WHEN 'INSERT','UPDATE' THEN
     rec := NEW;
  WHEN 'DELETE' THEN
     rec := OLD;
  ELSE
    RAISE EXCEPTION 'Unknown TG_OP: "%". Should not occur!', TG_OP;
  END CASE;

  -- Get the Channel Name
  IF TG_ARGV[0] IS NULL THEN
    RAISE EXCEPTION 'A Channel Name is required as first argument';
  END IF;

  channel_name := TG_ARGV[0];

  -- Get the payload
  payload_items := row_to_json(rec);

  -- Build the payload
  payload := json_build_object(
      'timestamp', CURRENT_TIMESTAMP
    , 'row_version', rec.xmin
    , 'operation', TG_OP
    , 'schema', TG_TABLE_SCHEMA
    , 'table', TG_TABLE_NAME
    , 'payload', payload_items);

  -- Notify the channel
  PERFORM pg_notify(channel_name, payload);

  RETURN rec;
END;
$trigger$ LANGUAGE plpgsql;

We then create a Trigger on a table, for example on gitclub.organization.

CREATE OR REPLACE TRIGGER organization_notify_trigger
AFTER INSERT OR UPDATE OR DELETE ON gitclub.organization
FOR EACH ROW EXECUTE PROCEDURE notify_trigger('core_db_event');

This Trigger invokes our notify_trigger(...) method, whenever a row has been inserted, updated or deleted.

Oh, and that's it already for the Postgres-side!

Listening for Notifications in .NET

In our .NET application we start by defining the Notification received from Postgres as a PostgresNotification.

// Licensed under the MIT license. See LICENSE file in the project root for full license information.

namespace GitClub.Infrastructure.Postgres
{
    /// <summary>
    /// A Notification received from Postgres NOTIFY / LISTEN.
    /// </summary>
    public record PostgresNotification
    {
        /// <summary>
        /// Gets or sets the PID.
        /// </summary>
        public required int PID { get; set; }

        /// <summary>
        /// Gets or sets the Channel.
        /// </summary>
        public required string Channel { get; set; }

        /// <summary>
        /// Gets or sets the Payload.
        /// </summary>
        public required string Payload { get; set; }
    }
}

And we want to execute some action, when we receive a notification. It sounds like we should handle a PostgresNotification with something like an IPostgresNotificationHandler.

// Licensed under the MIT license. See LICENSE file in the project root for full license information.

namespace GitClub.Infrastructure.Postgres
{
    /// <summary>
    /// Handles Notifications received from a Postgres Channel.
    /// </summary>
    public interface IPostgresNotificationHandler
    {
        /// <summary>
        /// Handles a Notification received from a Postgres Channel.
        /// </summary>
        /// <param name="notification">Notification received from Postgres</param>
        /// <param name="cancellationToken">CancellationToken to stop asynchronous processing</param>
        /// <returns>Awaitable ValueTask</returns>
        ValueTask HandleNotificationAsync(PostgresNotification notification, CancellationToken cancellationToken);
    }
}

The default implementation is a LoggingPostgresNotificationHandler, which just logs all notifications received from Postgres.

// Licensed under the MIT license. See LICENSE file in the project root for full license information.

namespace GitClub.Infrastructure.Postgres
{
    /// <summary>
    /// Logs all Notifications received from a Postgres Channel.
    /// </summary>
    public class LoggingPostgresNotificationHandler : IPostgresNotificationHandler
    {
        private readonly ILogger<LoggingPostgresNotificationHandler> _logger;

        public LoggingPostgresNotificationHandler(ILogger<LoggingPostgresNotificationHandler> logger)
        {
            _logger = logger;
        }

        public ValueTask HandleNotificationAsync(PostgresNotification notification, CancellationToken cancellationToken)
        {
            _logger.LogDebug("PostgresNotification (PID = {PID}, Channel = {Channel}, Payload = {Payload}",
                notification.PID, notification.Channel, notification.Payload);

            return ValueTask.CompletedTask;
        }
    }
}

We can now write the PostgresNotificationService, which is a BackgroundService. The idea is pretty simple: We have two Tasks. One Task is for receiving notifications from Postgres and putting them on a Channel<PostgresNotification>. The other Task is for processing the messages.

We are limiting the number of unprocessed notifications to 10,000 messages.

You can easily configure it to suit your needs.

// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using GitClub.Infrastructure.Postgres;
using Microsoft.Extensions.Options;
using Npgsql;
using System.Threading.Channels;

namespace GitClub.Hosted
{
    /// <summary>
    /// Options to configure the <see cref="PostgresNotificationService"/>.
    /// </summary>
    public class PostgresNotificationServiceOptions
    {
        /// <summary>
        /// Gets or sets the Channel the Service is listening to.
        /// </summary>
        public required string ChannelName { get; set; }

        /// <summary>
        /// Gets or sets the Maximum Capacity of unhandled Notifications. Default is 10,000 notifications. 
        /// </summary>
        public int MaxCapacity { get; set; } = 10_000;
    }

    /// <summary>
    /// This Service waits for Notifications received on a given Postgres Channel name.
    /// </summary>
    public class PostgresNotificationService : BackgroundService
    {
        private readonly ILogger<PostgresNotificationService> _logger;

        private readonly PostgresNotificationServiceOptions _options;
        private readonly IPostgresNotificationHandler _postgresNotificationHandler;
        private readonly NpgsqlDataSource _npgsqlDataSource;

        public PostgresNotificationService(ILogger<PostgresNotificationService> logger, IOptions<PostgresNotificationServiceOptions> options, NpgsqlDataSource npgsqlDataSource, IPostgresNotificationHandler postgresNotificationHandler)
        {
            _logger = logger;
            _options = options.Value;
            _npgsqlDataSource = npgsqlDataSource;
            _postgresNotificationHandler = postgresNotificationHandler;
        }

        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            // Now this could be a horrible thing to do when we experience backpressure... 
            var channel = Channel.CreateBounded<PostgresNotification>(new BoundedChannelOptions(_options.MaxCapacity)
            {
                SingleReader = true,
                SingleWriter = true,
                FullMode = BoundedChannelFullMode.Wait
            });

            // We are running both loops until either of them is stopped or runs dry ...
            await Task
                .WhenAny(SetupPostgresAsync(stoppingToken), ProcessChannelAsync(stoppingToken))
                .ConfigureAwait(false);

            // Initializes the Postgres Listener by issueing a LISTEN Command.
            async Task SetupPostgresAsync(CancellationToken cancellationToken)
            {
                // Open a new Connection, which can be used to issue the LISTEN Command 
                // to the Postgres Database.
                using var connection = await _npgsqlDataSource
                    .OpenConnectionAsync(cancellationToken)
                    .ConfigureAwait(false);

                // If we receive a message from Postgres, we convert the Event 
                // to a PostgresNotification and put it on the Channel.
                connection.Notification += (sender, x) =>
                {
                    var notification = new PostgresNotification
                    {
                        Channel = x.Channel,
                        PID = x.PID,
                        Payload = x.Payload,
                    };

                    channel.Writer.TryWrite(notification);
                };

                // We register to the Notifications on the Channel.
                using (var command = new NpgsqlCommand($"LISTEN {_options.ChannelName}", connection))
                {
                    await command
                        .ExecuteNonQueryAsync(cancellationToken)
                        .ConfigureAwait(false);
                }

                // And now we are putting the Connection into the Wait State,
                // until the Cancellation is requested.
                while (!cancellationToken.IsCancellationRequested)
                {
                    await connection
                        .WaitAsync(cancellationToken)
                        .ConfigureAwait(false);
                }
            }

            // This Processes the Messages received by the Channel, so we can process 
            // the messages. All we are doing is basically ivoking the handler given 
            // to us.
            async Task ProcessChannelAsync(CancellationToken cancellationToken)
            {
                while (!cancellationToken.IsCancellationRequested)
                {
                    try
                    {
                        await foreach (var message in channel.Reader.ReadAllAsync(cancellationToken).ConfigureAwait(false))
                        {
                            await _postgresNotificationHandler.HandleNotificationAsync(message, cancellationToken).ConfigureAwait(false);
                        }
                    }
                    catch (Exception e)
                    {
                        _logger.LogError(e, "An Error Occured processing the Event");
                    }
                }
            }
        }
    }
}

In the Program.cs we can register the NpgsqlDataSource and the PostgresNotificationService as ...

// Database
builder.Services.AddSingleton<NpgsqlDataSource>((sp) =>
{
    var connectionString = builder.Configuration.GetConnectionString("ApplicationDatabase");

    if (connectionString == null)
    {
        throw new InvalidOperationException("No ConnectionString named 'ApplicationDatabase' was found");
    }

    // Since version 7.0, NpgsqlDataSource is the recommended way to use Npgsql. When using NpsgqlDataSource,
    // NodaTime currently has to be configured twice - once at the EF level, and once at the underlying ADO.NET
    // level (there are plans to improve this):
    var dataSourceBuilder = new NpgsqlDataSourceBuilder(connectionString);

    return dataSourceBuilder.Build();
});

// Notification Handler
builder.Services.AddSingleton<IPostgresNotificationHandler, LoggingPostgresNotificationHandler>();

// Add the Background Service processing the Notifications
builder.Services.Configure<PostgresNotificationServiceOptions>(o => o.ChannelName = "core_db_event");
builder.Services.AddHostedService<PostgresNotificationService>();

And that's it.

Enjoy your incoming notifications!

Conclusion

I think this is a simple way to be notified about events in a Postgres database. The solution we came up with is quite generic and it can be adapted to a lot of scenarios. The code is short and it's going to be easy to extend and maintain.

There are obviously some things missing here and I have no answer for, like ...

What's going to happen, if we cannot process incoming notifications and experience backpressure? Should we wait or drop notifications? Throw an Exception? What happens, if we lose the Connection to the Postgres Server, thus being unsubscribed from Notifications? Should we throw an exception? Log it and reconnect? Does Npgsql automatically reconnect?

I would be interested to hear your thoughts on it!