PostgreSQL has a feature called NOTIFY and LISTEN. We can send asynchronous notifications to subscribers using
the pg_notify
function. This is useful, if we want to be informed about changes in our data or anything relevant
to our application.
In this example I am going to show how to subscribe to notifications in .NET and listen to insert, update and delete operations on our database tables. It can be done in just a few lines of code using Npgsql, and I think it's worth sharing.
All code can be found in a Git Repository at:
Table of contents
Trying it out
If you want to try it out, before looking at the details, here is an example.
Clone the Git Repository
Start by cloning the npgsql-examples repository by running:
git clone https://github.com/bytefish/npgsql-examples.git
Start the PostgreSQL database
Then switch to notify-listen/docker
and start the Postgres instance using:
docker compose up
It starts a local PostgreSQL 16 instance listening localhost:5432
using username postgres
and password password
.
Start the .NET Application
Start the .NET application.
The connection string, which is given in the appsettings.json
, already points to the local Postgres instance.
Using psql to update the data
Switch over to a console and start psql
to update some data:
> psql -h localhost -U postgres -W
Password:
psql (15.2, server 16.6 (Debian 16.6-1.pgdg120+1))
WARNING: psql major version 15, server major version 16.
Some psql features might not work.
WARNING: Console code page (437) differs from Windows code page (1252)
8-bit characters might not work correctly. See psql reference
page "Notes for Windows users" for details.
Type "help" for help.
postgres=# SELECT * FROM gitclub.repository;
repository_id | name | organization_id | last_edited_by
---------------+----------------+-----------------+----------------
1 | Tooling | 1 | 1
2 | Specifications | 1 | 1
(2 rows)
postgres=# UPDATE gitclub.repository SET name = 'New Tooling' WHERE repository_id = 1;
UPDATE 1
Enjoying the received Notification
And in the console output we will see the incoming notification generated by the PostgreSQL NOTIFY
command:
dbug: Microsoft.Extensions.Hosting.Internal.Host[2]
Hosting started
dbug: NpgsqlNotifyListen.Infrastructure.LoggingPostgresNotificationHandler[0]
PostgresNotification (PID = 137, Channel = core_db_event, Payload = {"timestamp" : "2025-01-25T08:38:47.781178+00:00", "row_version" : "752", "operation" : "UPDATE", "schema" : "gitclub", "table" : "repository", "payload" : {"repository_id":1,"name":"New Tooling","organization_id":1,"last_edited_by":1}}
Creating NOTIFY Triggers in Postgres
We start by defining a function notify_trigger()
, that invokes pg_notify(...)
to raise a notification on a
given channel. We are going to send some information, such as a Timestamp, the current row version, and the
current values of the inserted, updated or deleted row.
CREATE OR REPLACE FUNCTION notify_trigger()
RETURNS TRIGGER AS $trigger$
DECLARE
rec RECORD;
payload TEXT;
channel_name TEXT;
payload_items json;
BEGIN
-- Get the Operation
CASE TG_OP
WHEN 'INSERT','UPDATE' THEN
rec := NEW;
WHEN 'DELETE' THEN
rec := OLD;
ELSE
RAISE EXCEPTION 'Unknown TG_OP: "%". Should not occur!', TG_OP;
END CASE;
-- Get the Channel Name
IF TG_ARGV[0] IS NULL THEN
RAISE EXCEPTION 'A Channel Name is required as first argument';
END IF;
channel_name := TG_ARGV[0];
-- Get the payload
payload_items := row_to_json(rec);
-- Build the payload
payload := json_build_object(
'timestamp', CURRENT_TIMESTAMP
, 'row_version', rec.xmin
, 'operation', TG_OP
, 'schema', TG_TABLE_SCHEMA
, 'table', TG_TABLE_NAME
, 'payload', payload_items);
-- Notify the channel
PERFORM pg_notify(channel_name, payload);
RETURN rec;
END;
$trigger$ LANGUAGE plpgsql;
We then create a Trigger on a table, for example on gitclub.organization
.
CREATE OR REPLACE TRIGGER organization_notify_trigger
AFTER INSERT OR UPDATE OR DELETE ON gitclub.organization
FOR EACH ROW EXECUTE PROCEDURE notify_trigger('core_db_event');
This Trigger invokes our notify_trigger(...)
method, whenever a row has been inserted, updated or deleted.
Oh, and that's it already for the Postgres-side!
Listening for Notifications in .NET
In our .NET application we start by defining the Notification received from Postgres as a PostgresNotification
.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
namespace GitClub.Infrastructure.Postgres
{
/// <summary>
/// A Notification received from Postgres NOTIFY / LISTEN.
/// </summary>
public record PostgresNotification
{
/// <summary>
/// Gets or sets the PID.
/// </summary>
public required int PID { get; set; }
/// <summary>
/// Gets or sets the Channel.
/// </summary>
public required string Channel { get; set; }
/// <summary>
/// Gets or sets the Payload.
/// </summary>
public required string Payload { get; set; }
}
}
And we want to execute some action, when we receive a notification. It sounds like we should handle a PostgresNotification
with something like an IPostgresNotificationHandler
.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
namespace GitClub.Infrastructure.Postgres
{
/// <summary>
/// Handles Notifications received from a Postgres Channel.
/// </summary>
public interface IPostgresNotificationHandler
{
/// <summary>
/// Handles a Notification received from a Postgres Channel.
/// </summary>
/// <param name="notification">Notification received from Postgres</param>
/// <param name="cancellationToken">CancellationToken to stop asynchronous processing</param>
/// <returns>Awaitable ValueTask</returns>
ValueTask HandleNotificationAsync(PostgresNotification notification, CancellationToken cancellationToken);
}
}
The default implementation is a LoggingPostgresNotificationHandler
, which just logs all notifications received from Postgres.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
namespace GitClub.Infrastructure.Postgres
{
/// <summary>
/// Logs all Notifications received from a Postgres Channel.
/// </summary>
public class LoggingPostgresNotificationHandler : IPostgresNotificationHandler
{
private readonly ILogger<LoggingPostgresNotificationHandler> _logger;
public LoggingPostgresNotificationHandler(ILogger<LoggingPostgresNotificationHandler> logger)
{
_logger = logger;
}
public ValueTask HandleNotificationAsync(PostgresNotification notification, CancellationToken cancellationToken)
{
_logger.LogDebug("PostgresNotification (PID = {PID}, Channel = {Channel}, Payload = {Payload}",
notification.PID, notification.Channel, notification.Payload);
return ValueTask.CompletedTask;
}
}
}
We can now write the PostgresNotificationService
, which is a BackgroundService
. The idea is pretty simple: We have two Tasks
. One
Task
is for receiving notifications from Postgres and putting them on a Channel<PostgresNotification>
. The other Task
is for processing
the messages.
We are limiting the number of unprocessed notifications to 10,000
messages.
You can easily configure it to suit your needs.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
using GitClub.Infrastructure.Postgres;
using Microsoft.Extensions.Options;
using Npgsql;
using System.Threading.Channels;
namespace GitClub.Hosted
{
/// <summary>
/// Options to configure the <see cref="PostgresNotificationService"/>.
/// </summary>
public class PostgresNotificationServiceOptions
{
/// <summary>
/// Gets or sets the Channel the Service is listening to.
/// </summary>
public required string ChannelName { get; set; }
/// <summary>
/// Gets or sets the Maximum Capacity of unhandled Notifications. Default is 10,000 notifications.
/// </summary>
public int MaxCapacity { get; set; } = 10_000;
}
/// <summary>
/// This Service waits for Notifications received on a given Postgres Channel name.
/// </summary>
public class PostgresNotificationService : BackgroundService
{
private readonly ILogger<PostgresNotificationService> _logger;
private readonly PostgresNotificationServiceOptions _options;
private readonly IPostgresNotificationHandler _postgresNotificationHandler;
private readonly NpgsqlDataSource _npgsqlDataSource;
public PostgresNotificationService(ILogger<PostgresNotificationService> logger, IOptions<PostgresNotificationServiceOptions> options, NpgsqlDataSource npgsqlDataSource, IPostgresNotificationHandler postgresNotificationHandler)
{
_logger = logger;
_options = options.Value;
_npgsqlDataSource = npgsqlDataSource;
_postgresNotificationHandler = postgresNotificationHandler;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
// Now this could be a horrible thing to do when we experience backpressure...
var channel = Channel.CreateBounded<PostgresNotification>(new BoundedChannelOptions(_options.MaxCapacity)
{
SingleReader = true,
SingleWriter = true,
FullMode = BoundedChannelFullMode.Wait
});
// We are running both loops until either of them is stopped or runs dry ...
await Task
.WhenAny(SetupPostgresAsync(stoppingToken), ProcessChannelAsync(stoppingToken))
.ConfigureAwait(false);
// Initializes the Postgres Listener by issueing a LISTEN Command.
async Task SetupPostgresAsync(CancellationToken cancellationToken)
{
// Open a new Connection, which can be used to issue the LISTEN Command
// to the Postgres Database.
using var connection = await _npgsqlDataSource
.OpenConnectionAsync(cancellationToken)
.ConfigureAwait(false);
// If we receive a message from Postgres, we convert the Event
// to a PostgresNotification and put it on the Channel.
connection.Notification += (sender, x) =>
{
var notification = new PostgresNotification
{
Channel = x.Channel,
PID = x.PID,
Payload = x.Payload,
};
channel.Writer.TryWrite(notification);
};
// We register to the Notifications on the Channel.
using (var command = new NpgsqlCommand($"LISTEN {_options.ChannelName}", connection))
{
await command
.ExecuteNonQueryAsync(cancellationToken)
.ConfigureAwait(false);
}
// And now we are putting the Connection into the Wait State,
// until the Cancellation is requested.
while (!cancellationToken.IsCancellationRequested)
{
await connection
.WaitAsync(cancellationToken)
.ConfigureAwait(false);
}
}
// This Processes the Messages received by the Channel, so we can process
// the messages. All we are doing is basically ivoking the handler given
// to us.
async Task ProcessChannelAsync(CancellationToken cancellationToken)
{
while (!cancellationToken.IsCancellationRequested)
{
try
{
await foreach (var message in channel.Reader.ReadAllAsync(cancellationToken).ConfigureAwait(false))
{
await _postgresNotificationHandler.HandleNotificationAsync(message, cancellationToken).ConfigureAwait(false);
}
}
catch (Exception e)
{
_logger.LogError(e, "An Error Occured processing the Event");
}
}
}
}
}
}
In the Program.cs
we can register the NpgsqlDataSource
and the PostgresNotificationService
as ...
// Database
builder.Services.AddSingleton<NpgsqlDataSource>((sp) =>
{
var connectionString = builder.Configuration.GetConnectionString("ApplicationDatabase");
if (connectionString == null)
{
throw new InvalidOperationException("No ConnectionString named 'ApplicationDatabase' was found");
}
// Since version 7.0, NpgsqlDataSource is the recommended way to use Npgsql. When using NpsgqlDataSource,
// NodaTime currently has to be configured twice - once at the EF level, and once at the underlying ADO.NET
// level (there are plans to improve this):
var dataSourceBuilder = new NpgsqlDataSourceBuilder(connectionString);
return dataSourceBuilder.Build();
});
// Notification Handler
builder.Services.AddSingleton<IPostgresNotificationHandler, LoggingPostgresNotificationHandler>();
// Add the Background Service processing the Notifications
builder.Services.Configure<PostgresNotificationServiceOptions>(o => o.ChannelName = "core_db_event");
builder.Services.AddHostedService<PostgresNotificationService>();
And that's it.
Enjoy your incoming notifications!
Conclusion
I think this is a simple way to be notified about events in a Postgres database. The solution we came up with is quite generic and it can be adapted to a lot of scenarios. The code is short and it's going to be easy to extend and maintain.
There are obviously some things missing here and I have no answer for, like ...
What's going to happen, if we cannot process incoming notifications and experience backpressure? Should we wait or drop notifications? Throw an Exception? What happens, if we lose the Connection to the Postgres Server, thus being unsubscribed from Notifications? Should we throw an exception? Log it and reconnect? Does Npgsql automatically reconnect?
I would be interested to hear your thoughts on it!