1
votes

Let's say I've got a channel out, created with (chan). I need to take the values that are put into the channel and add them up. The number of values is undetermined (so I cannot use a traditional loop with an end case around (<! out)), and the values come from external IO. I'm using a fixed timeout with alts!, but that doesn't seem like the best way to approach the problem. So far, I've got the following (which I got from https://gist.github.com/schaueho/5726a96641693dce3e47):

(go-loop
      [[v ch] (alts! [out (timeout 1000)])
       acc 0]
      (if-not v
        (do (close! out)
            (deliver p acc))
        (do
          (>! task-ch (as/progress-tick))
          (recur (alts! [out (timeout 1000)]) (+ acc v)))))

The problem I've got is that a timeout of 1000 is sometimes not enough and causes the go-loop to exit prematurely (as it may take more than 1000ms for the IO operation to complete and put the val in the out channel). I do not think increasing the timeout value is such a good idea as it may cause me to wait longer than necessary.

What is the best way to guarantee all reads from the out channel and exit out correctly from the loop?

Update:

Why am I using a timeout? Because the number of values being put into the channel is not fixed, which means I cannot create an exit case. Without an exit case, the go-loop will park indefinitely on (<! out), waiting for values to be put into the out channel. If you have a solution that doesn't need the timeout, that'd be really awesome.

How do I know I've read the last value? I don't. That's the problem. That's why I'm using timeout and alts! to exit the go-loop.

What do you want to do with the result? Simple addition for now. However, that's not the important bit.

Update Final:

I figured out a way to get the number of values I'd be dealing with. So I modified my logic to make use of that. I'm still going to use the timeout and alts! to prevent any locking.

(go-loop
     [[v _] (alts! [out (timeout 1000)])
      i 0
      acc 0]
      (if (and v (not= n i))
        (do
          (>! task-ch (as/progress-tick))
          (recur (alts! [out (timeout 1000)]) (inc i) (+ acc v)))
        (do (close! out)
            (deliver p* (if (= n i) acc nil)))))
2
a. Why do you even need the timeout? b. How do you know if you've read the last value? c. What do you want to do with the result? - orestis
alts! takes a vector of channels and returns the value from the first channel operation that completes. If the out channel delivers a value before the timeout, I can continue with the loop. If the timeout channel wins, it means I can exit the loop. I haven't figured out another way to exit the loop, hence my question. - Lordking
We are missing an important piece of info here. If you don’t know how many values are coming in, how do you know when you are done? - orestis
That's exactly what I'm asking. I don't know how many values are coming in. That is why I cannot use an exit case to determine when I'm done and have resorted to using timeout with alts! to exit the reader loop. If it were n values, I could just use (dotimes [v (<! out) i 1] ...) and use i to determine that I've reached the end. - Lordking
Will throw my hat into the ring and say that you're approaching this fundamentally incorrectly. You cannot sum an infinite stream. Any time one needs to use a timeout, it's 95% of the time indicative of an error in architecture. Time-based exits are notoriously unreliable and usually a band-aid on a poor design choice. - Josh

2 Answers

2
votes

I think your problem sits a bit higher up in your design and is not specific to core.async:

On one hand, you have an undetermined number of values coming in on a channel: there could be 0, there could be 10, there could be 1,000,000.

On the other hand, you want to read all of them, do some calculation and then return. This is impossible unless there is some other signal that you can use to say "I think I'm done now".

If that signal is the timing of the values, then your approach of using alts! is the correct one, although I believe the code can be simplified a bit.

Update: Do you have access to the "upstream" IO? Can you put a sentinel value (e.g. something like ::closed) to the channel when the IO operation finishes?
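A minimal sketch of the sentinel idea, assuming you control the producer. The names produce! and sum-until-sentinel are hypothetical, and produce! only stands in for the real IO code; the progress tick and the promise from the question are left out to keep it short.

```clojure
(require '[clojure.core.async :refer [chan go go-loop >! <! <!!]])

;; Hypothetical producer standing in for the external IO.
;; After the last real value it puts the sentinel ::closed on the channel.
(defn produce! [out values]
  (go
    (doseq [v values]
      (>! out v))
    (>! out ::closed)))

;; Sums values from `out` until the sentinel arrives.
;; A nil (channel closed by someone else) is treated as the end as well.
;; Returns the go-loop's channel, which yields the final sum.
(defn sum-until-sentinel [out]
  (go-loop [acc 0]
    (let [v (<! out)]
      (if (or (nil? v) (= v ::closed))
        acc
        (recur (+ acc v))))))

(let [out (chan)]
  (produce! out [1 2 3 4])
  (println (<!! (sum-until-sentinel out)))) ; prints 10
```

No timeout is involved, so the reader can neither exit early nor wait longer than the IO actually takes.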

2
votes

The 'best' way is to wait either for a special batch-ending message on out or for out to be closed by the sender to mark the end of the inputs.

Either way, the solution rests with the sender communicating something about the inputs.
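As a rough illustration of the "closed by the sender" variant: once the sender calls close! on out, (<! out) returns nil after the buffered values are drained, and that nil can serve as the exit case. The names produce! and sum-until-closed are hypothetical, and produce! is only a stand-in for the real sender.

```clojure
(require '[clojure.core.async :refer [chan go go-loop >! <! <!! close!]])

;; Hypothetical sender: puts all its values, then closes the channel
;; to signal that no more inputs will arrive.
(defn produce! [out values]
  (go
    (doseq [v values]
      (>! out v))
    (close! out)))

;; Sums values until `out` is closed; a take from a closed, drained
;; channel returns nil, which ends the loop.
;; Returns the go-loop's channel, which yields the final sum.
(defn sum-until-closed [out]
  (go-loop [acc 0]
    (let [v (<! out)]
      (if (nil? v)
        acc
        (recur (+ acc v))))))

(let [out (chan)]
  (produce! out [1 2 3 4])
  (println (<!! (sum-until-closed out)))) ; prints 10
```

For a plain sum, clojure.core.async/reduce called as (reduce + 0 out) has the same shape: it returns a channel that delivers the result once out is closed.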