3
votes

A ZMQ subscriber socket keeps only the last message in its queue when the CONFLATE option is set to true (zmq_docs). However, it does not seem to be working for me. Typically, in Python, I would do something like the following, and it would work:

context = zmq.Context()
subscriber = context.socket(zmq.SUB)
subscriber.setsockopt(zmq.CONFLATE, 1)
subscriber.connect("tcp://localhost:5555")

The subscriber in the pub/sub pattern below simply ignores the CONFLATE option set to 1. You can tell from the minute and second printed by the subscriber that it falls further and further behind: it is still working through old messages while it computes fib 42. If fib is given a trivial argument instead, you can see that the subscriber is indeed receiving messages from the publisher.

Here is the publisher:

open System
open fszmq
open fszmq.Context
open fszmq.Socket


let funcPublish () =
  use context   = new Context()
  use publisher = pub context
  "tcp://*:5563" |> bind publisher

  while true do
    let tm = System.DateTime.Now
    let t = String.Concat([tm.Minute.ToString(); " "; tm.Second.ToString()])
    t |> s_send publisher
    sleep 1

  EXIT_SUCCESS


[<EntryPoint>]
let main argv = 
    funcPublish ()
    0

And here is the subscriber:

open fszmq
open fszmq.Context
open fszmq.Socket

let rec fib n =
    match n with
    | 1 | 2 -> 1
    | n -> fib(n-1) + fib(n-2)

let funcSubscribe () = 
  use context    = new Context()
  use subscriber = sub context
  "tcp://localhost:5563" |> connect subscriber
  Socket.setOption subscriber (ZMQ.CONFLATE, 1)
  [ ""B ] |> subscribe subscriber


  while true do
    let contents = s_recv subscriber
    fib 42 |> ignore  // deliberately slow work; the result is not needed
    contents |> printfn "%A"

  EXIT_SUCCESS


[<EntryPoint>]
let main argv = 
    funcSubscribe ()
    0

Thanks.

1
I don't know ZMQ, but I noticed one difference between your Python code and your F# code. In Python, you set the CONFLATE option before you connect to the socket, but in F#, you set the CONFLATE option after you connect. What happens if you move the Socket.setOption line to before your connect subscriber line in your F# code? Does that solve your problem? - rmunn
That is exactly what the problem was. Post your comment as an answer and I'll hit the checkmark. - professor bigglesworth
Okay, answer posted. - rmunn

1 Answer

8
votes

I noticed one difference between your Python code and your F# code. In Python, you set the CONFLATE option before you connect to the socket, but in F#, you set the CONFLATE option after you connect.

If you move the Socket.setOption line to before your connect subscriber line in your F# code, I suspect that should solve your problem.

In other words, instead of this:

"tcp://localhost:5563" |> connect subscriber
Socket.setOption subscriber (ZMQ.CONFLATE, 1)

you should probably have this:

Socket.setOption subscriber (ZMQ.CONFLATE, 1)
"tcp://localhost:5563" |> connect subscriber