6
votes

I'm working on .net core web application. I want to listen my PostgreSQL database. And if there are any changes on table, I have to got it.

So according to my research, I have to use SignalR Core. I did some example application with SignalR like chat app but none of them listen database. I couldn't find any example for this.

-Does It have to be trigger on PostgreSQL database?

-Does It have to be listener on code side?

-How can I use SignalR Core?

Please show me a way.

Thanks a lot.

2
Check out .net core running services, basically running service will be the one that keep checking for changes on DB (make some table with one column changeOccured, and keep checking that table every few sec is easiest way), if changes occurs then you will use SignalR to notify clientsVeljko89

2 Answers

11
votes

This example is work asp.net core 3.0+. Full code is below.

Step 1. Create a trigger on PostgreSql for listening actions

 create trigger any_after_alarm_speed after
 insert
 or
 delete
 or
 update
 on
 public.alarm_speed for each row execute procedure alarm_speedf();

Step 2. Create Procedur on Postgresql

CREATE OR REPLACE FUNCTION public.alarm_speedf()
RETURNS trigger
LANGUAGE plpgsql
AS $function$
BEGIN
IF TG_OP = 'INSERT' then
PERFORM pg_notify('notifyalarmspeed', format('INSERT %s %s', NEW.alarm_speed_id, 
NEW.alarm_speed_date));
ELSIF TG_OP = 'UPDATE' then
PERFORM pg_notify('notifyalarmspeed', format('UPDATE %s %s', OLD.alarm_speed_id, 
OLD.alarm_speed_date));
ELSIF TG_OP = 'DELETE' then
PERFORM pg_notify('notifyalarmspeed', format('DELETE %s %s', OLD.alarm_speed_id, 
OLD.alarm_speed_date));
END IF;
RETURN NULL;
END;
$function$;

Step 3. Create Hub

  public class speedalarmhub : Hub
    {

        private IMemoryCache _cache;
       `private IHubContext<speedalarmhub> _hubContext;
         public speedalarmhub(IMemoryCache cache, IHubContext<speedalarmhub> hubContext)
        {
            _cache = cache;
            _hubContext = hubContext; 
        }

        public async Task SendMessage()
        {
            if (!_cache.TryGetValue("SpeedAlarm", out string response))
            {
                SpeedListener speedlist = new SpeedListener(_hubContext,_cache);
                speedlist.ListenForAlarmNotifications();
                string jsonspeedalarm = speedlist.GetAlarmList();
                _cache.Set("SpeedAlarm", jsonspeedalarm);
                await Clients.All.SendAsync("ReceiveMessage", _cache.Get("SpeedAlarm").ToString());
            }
            else
            {
                await Clients.All.SendAsync("ReceiveMessage", _cache.Get("SpeedAlarm").ToString());
            }
        }

    }

Step 4. Create Listener Controller

 public class SpeedListener :Controller
{
    private IHubContext<speedalarmhub> _hubContext;
    private IMemoryCache _cache;
    public SpeedListener(IHubContext<speedalarmhub> hubContext,IMemoryCache cache)
    {
        _hubContext = hubContext;
        _cache = cache; 
    }
    static string GetConnectionString()
    {
        var csb = new NpgsqlConnectionStringBuilder
        {
            Host = "yourip",
            Database = "yourdatabase",
            Username = "yourusername",
            Password = "yourpassword",
            Port = 5432,
            KeepAlive = 30
        };
        return csb.ConnectionString;
    }
    public void ListenForAlarmNotifications()
    {
        NpgsqlConnection conn = new NpgsqlConnection(GetConnectionString());
        conn.StateChange += conn_StateChange;
        conn.Open();
        var listenCommand = conn.CreateCommand();
        listenCommand.CommandText = $"listen notifyalarmspeed;";
        listenCommand.ExecuteNonQuery();
        conn.Notification += PostgresNotificationReceived;
        _hubContext.Clients.All.SendAsync(this.GetAlarmList());
        while (true)
        {
            conn.Wait();
        }
    }
    private void PostgresNotificationReceived(object sender, NpgsqlNotificationEventArgs e)
    {

        string actionName = e.Payload.ToString();
        string actionType = "";
        if (actionName.Contains("DELETE"))
        {
            actionType = "Delete";
        }
        if (actionName.Contains("UPDATE"))
        {
            actionType = "Update";
        }
        if (actionName.Contains("INSERT"))
        {
            actionType = "Insert";
        }
        _hubContext.Clients.All.SendAsync("ReceiveMessage", this.GetAlarmList());
    }
    public string GetAlarmList()
    {
        var AlarmList = new List<AlarmSpeedViewModel>();
        using (NpgsqlCommand sqlCmd = new NpgsqlCommand())
        {
            sqlCmd.CommandType = CommandType.StoredProcedure;
            sqlCmd.CommandText = "sp_alarm_speed_process_get";
            NpgsqlConnection conn = new NpgsqlConnection(GetConnectionString());
            conn.Open();
            sqlCmd.Connection = conn;
            using (NpgsqlDataReader reader = sqlCmd.ExecuteReader())
            {
                while (reader.Read())
                {
                    AlarmSpeedViewModel model = new AlarmSpeedViewModel();
                    model.alarm_speed_id = reader.GetInt32(0);
                  // you must fill  your model items
                    AlarmList.Add(model);
                }
                reader.Close();
                conn.Close();
            }



        }
        _cache.Set("SpeedAlarm", SerializeObjectToJson(AlarmList));
        return _cache.Get("SpeedAlarm").ToString();
    }
    public String SerializeObjectToJson(Object alarmspeed)
    {
        try
        {
            var jss = new JavaScriptSerializer();
            return  jss.Serialize(alarmspeed);
        }
        catch (Exception) { return null; }
    }
    private void conn_StateChange(object sender, System.Data.StateChangeEventArgs e)
    {

        _hubContext.Clients.All.SendAsync("Current State: " + e.CurrentState.ToString() + " Original State: " + e.OriginalState.ToString(), "connection state changed");
    }
}

Step 5 Calling Hub

<script src="~/lib/signalr.js"></script>

<script type="text/javascript">
// Start the connection.
var connection = new signalR.HubConnectionBuilder()
    .withUrl('/speedalarmhub')
    .build();


connection.on('ReceiveMessage', function (message) {

            var encodedMsg = message;
            // Add the message to the page.

});
// Transport fallback functionality is now built into start.
connection.start()
    .then(function () {

        console.log('connection started');
        connection.invoke('SendMessage');
    })
    .catch(error => {
        console.error(error.message);
    });

Step 6. Add below code Configuration Services at Startup

public void ConfigureServices(IServiceCollection services)
    {
        services.AddControllersWithViews();
        services.AddSignalR();
        services.AddMemoryCache();
    }

Step 7. add below code in Configure method

app.UseEndpoints(endpoints =>
        {
            endpoints.MapControllerRoute(
                name: "default",
                pattern: "{controller=Home}/{action=Index}/{id?}");
              endpoints.MapHub<speedalarmhub>("/speedalarmhub");
        });
3
votes

I want to listen my PostgreSQL database. And if there are any changes on table, I have to got it.

You can create a trigger associated with your specified table, and use the function pg_notify(text, text) to send a notification, like below.

Function

CREATE OR REPLACE FUNCTION mytestfunc() RETURNS TRIGGER AS $$
BEGIN
IF TG_OP = 'INSERT' then
PERFORM pg_notify('notifytesttable', 'new record inserted');
ELSIF TG_OP = 'UPDATE' then
PERFORM pg_notify('notifytesttable', 'updated');
ELSIF TG_OP = 'DELETE' then
PERFORM pg_notify('notifytesttable', 'deleted');
END IF;
RETURN NULL;
END;
$$ LANGUAGE plpgsql;

Trigger

CREATE TRIGGER any_after_testtable
AFTER INSERT OR DELETE OR UPDATE
ON testtable
FOR EACH ROW
EXECUTE PROCEDURE mytestfunc();

In your client application code, you can listen and receive notifications from PostgreSQL.

conn.Open();
conn.Notification += Conn_Notification;

using (var cmd = new NpgsqlCommand("LISTEN notifytesttable", conn))
{
    cmd.ExecuteNonQuery();
}

In Notification event handler, you can call SignalR hub method to push notifications to SignalR clients.

private static void Conn_Notification(object sender, NpgsqlNotificationEventArgs e)
{
    var notification_payload = e.Payload;

    //code logic here

    //call hub method to push PostgreSQL notifications that you received to SignalR client users

}

Test result

enter image description here

For detailed information about PostgreSQL LISTEN and NOTIFY features, you can check following links.

https://www.postgresql.org/docs/current/sql-notify.html

https://www.npgsql.org/doc/wait.html#processing-of-notifications