Using the Go SDK for Apache Beam, I'm trying to create a view of a PCollection using a side input.
But I'm getting this weird error:
Failed to execute job: on ctx= making side input 0:
singleton side input Singleton for int ill-defined
exit status 1
Here is the code I'm using:
// A PCollection of key/value pairs
pairedWithOne := beam.ParDo(s, func(r models.Review) (string, int) {
	return r.DoRecommend, 1
}, col)
// A PCollection of ints (demo)
pcollInts := beam.CreateList(s, [3]int{
	1, 2, 3,
})
// A PCollection of key/value pairs
summed := stats.SumPerKey(s, pairedWithOne)
// Here is where I'd like to use my side input.
mapped := beam.ParDo(s, func(k string, v int, side int, emit func(ratio models.RecommendRatio)) {
	var ratio = models.RecommendRatio{
		DoRecommend: k,
		NumVotes:    v,
	}
	emit(ratio)
}, summed, beam.SideInput{Input: pcollInts})
I found this example in the SDK's source comments on GitHub:
// Side Inputs
//
// While a ParDo processes elements from a single "main input" PCollection, it
// can take additional "side input" PCollections. These SideInput along with
// the DoFn parameter form express styles of accessing PCollection computed by
// earlier pipeline operations, passed in to the ParDo transform using SideInput
// options, and their contents accessible to each of the DoFn operations. For
// example:
//
//     words := ...
//     cutoff := ...  // Singleton PCollection<int>
//     smallWords := beam.ParDo(s, func (word string, cutoff int, emit func(string)) {
//         if len(word) < cutoff {
//             emit(word)
//         }
//     }, words, beam.SideInput{Input: cutoff})
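A minimal plain-Go sketch, not Beam code, of what the error message may be pointing at. It assumes that a side input declared as a plain `int` is read as a singleton, which is only well-defined when the PCollection holds exactly one element, while a `[]int` parameter would accept any number of elements. The helpers `asSingleton` and `asSlice` are hypothetical names used for illustration only and are not part of the Beam SDK.

```go
package main

import "fmt"

// asSingleton is a hypothetical helper (not a Beam API). It models a side
// input declared as a plain int: exactly one value must be present.
func asSingleton(values []int) (int, error) {
	if len(values) != 1 {
		return 0, fmt.Errorf("singleton side input ill-defined: got %d values, want 1", len(values))
	}
	return values[0], nil
}

// asSlice is a hypothetical helper (not a Beam API). It models a side input
// declared as []int: every value is handed over, however many there are.
func asSlice(values []int) []int {
	return append([]int(nil), values...)
}

func main() {
	three := []int{1, 2, 3} // mirrors the three-element demo collection

	// Reading three values as a singleton fails.
	if _, err := asSingleton(three); err != nil {
		fmt.Println("singleton form:", err)
	}

	// Reading the same values as a slice succeeds.
	fmt.Println("slice form:", asSlice(three))

	// A one-element collection is a valid singleton.
	if v, err := asSingleton([]int{3}); err == nil {
		fmt.Println("singleton form with one element:", v)
	}
}
```

If this reading is right, two things to try in the pipeline above would be declaring the DoFn parameter as `side []int`, or building `pcollInts` from a single value.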
Update: It seems the Impulse(scope) function plays a role here, but I cannot figure out what. From the GoDoc:
Impulse emits a single empty []byte into the global window. The resulting PCollection is a singleton of type []byte.
The purpose of Impulse is to trigger another transform, such as ones that take all information as side inputs.
In case it helps, here are my structs:
type Review struct {
	Date        time.Time `csv:"date" json:"date"`
	DoRecommend string    `csv:"doRecommend" json:"doRecommend"`
	NumHelpful  int       `csv:"numHelpful" json:"numHelpful"`
	Rating      int       `csv:"rating" json:"rating"`
	Text        string    `csv:"text" json:"text"`
	Title       string    `csv:"title" json:"title"`
	Username    string    `csv:"username" json:"username"`
}
type RecommendRatio struct {
	DoRecommend string `json:"doRecommend"`
	NumVotes    int    `json:"numVotes"`
}
Is there a solution for this?
Thanks.