0
votes

filter (which uses halt inside) terminates other branch even if it has some side-effects:

scala> val p = Process("1","2", "3")
scala> val p1 = p.filter(_ => true).map(_ + "p1").observe(io.stdOutLines)
scala> val p2 = p.filter(_ => false).map(_ + "p2").observe(io.stdOutLines)
scala> (p1 yip p2).run.run
1p1

scala> val p2 = p.filter(_ => true).map(_ + "p1").observe(io.stdOutLines)
scala> (p1 yip p2).run.run
1p1
1p2
2p1
2p2
3p1
3p2

Seems logical as there is no value to be returned to yip after that filter. But what about side-effects, specified with observe?

My current solution is to use flatMap to specify default value:

scala> val p1 = p.map(_ + "p1").flatMap(x => Process.emit(x).observe(io.stdOutLines))

scala> val p2 = p.map(_ + "p2").flatMap(x => Process.emit(""))

scala> (p1 yip p2).run.run
1p1
2p1
3p1

But maybe there is a way to use filter?

P.S. merge combinator executes side-effects for other branch (as it doesn't require value to be returned), but it doesn't wait for other branch if one halts (even if it has side-effects).

2
Not sure if I understand the issue? You would like to run p1 even when p2 terminates ? - Pavel Chlupacek
exactly, as I still need p1 for side-effects. filter in any branch terminates both branches by default. - dk14

2 Answers

0
votes

To run the effects even after p2 terminates there needs to be clear default behaviour. So there are probably these solutions:

  1. define p2 to supply default value after being terminated
  2. use either wye to get left and rights if we don't really need tuples

perhaps the (1) is closer to question and code will looks like:

val p = Process("1","2", "3")
val p1 = p.filter(_ => true).map(_ + "p1").observe(io.stdOutLines)
val p2 = p.filter(_ => false).map(_ + "p2")
         .observe(io.stdOutLines).map(Some(_)) ++ emit(None).repeat
// alternativelly
// val p2 = p.map { v =>  if (pred(v)) right(v) else left(v) }
//          .observeO(o.stdOutLines).flatMap { _.toOption }
//          ++ emit(None).repeat             

(p1 yip p2).run.run
0
votes

Actually it should be just something like that:

in.map(emit).flatMap{ p =>
  val p1 = p.map(_ + "p1").filter(_ => true).observe(out)
  val p2 = p.map(_ + "p2").filter(_ => false).observe(out)
  p1 merge p2
}.run.run

It makes all side effects being in order as filter can't get more than one value (produced by emit)