I have a function that reads data from a source and sends it to a destination. The source and destination could be anything; let's say for this example that the source is a database (any: MySQL, PostgreSQL, ...) and the destination is a distributed queue (any: ActiveMQ, Kafka). Messages are stored as bytes.
This is the main function. The idea is that it spins up a new goroutine and waits for messages to be returned for further processing.

    type Message []byte

    func (p *ProcessorService) Continue(dictId int) {
        level.Info(p.logger).Log("process", "message", "dictId", dictId)
        retrieved := make(chan Message)
        go func() {
            err := p.src.Read(retrieved, strconv.Itoa(p.dictId))
            if err != nil {
                level.Error(p.logger).Log("process", "read", "message", "err", err)
            }
        }()
        for r := range retrieved {
            go func(message Message) {
                level.Info(p.logger).Log("message", message)
                if len(message) > 0 {
                    if err := p.dst.sendToQ(message); err != nil {
                        level.Error(p.logger).Log("failed", "during", "persist", "err", err)
                    }
                } else {
                    level.Error(p.logger).Log("failed")
                }
            }(r)
        }
    }

And this is the Read function itself:

    func (s *Storage) Read(out chan<- Message, opt ...string) error {
        // I skip some basic database read operations here,
        // but the idea is simple: read data from the table / file row by row.
        for _, value := range dataFromDB {
            message, err := value.row
            if err == nil {
                out <- message
            } else {
                errorf("Unable to get data %v", err)
                out <- make([]byte, 0)
            }
        }
        close(out)
        if err != nil {
            return err
        }
        return nil
    }

As you can see, communication is done via the out chan<- Message channel. My concern is in the Continue function, specifically here:

    for r := range retrieved {
        go func(message Message) {
            // basically here message and r are pointing to the same underlying array
        }(r)
    }

When data is received, r is a byte slice. It is then passed to go func(message Message). Everything is passed by value in Go, so r is passed to the anonymous function as a copy; however, that copy still holds a pointer to the same underlying array. I am curious whether this could be a problem if, while p.dst.sendToQ(message) is executing, the Read function sends something to the out channel and causes the slice data to be overwritten with new information. Should I copy the byte slice r into a new byte slice before passing it to the anonymous function, so that the underlying arrays are different? I tested it, but couldn't actually reproduce this behavior. I'm not sure whether I'm being paranoid or whether I need to worry about it.
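
To make the concern concrete, here is a minimal sketch of the scenario I have in mind. It is not my real code: `sharesBackingArray`, `cloneMessage`, and `collect` are hypothetical helpers, and `collect` assumes a producer that reuses one buffer for every row, which may or may not be what my Read function does.

```go
package main

import "fmt"

type Message []byte

// sharesBackingArray reports whether a write through the original slice
// is visible through a by-value copy of the slice header.
func sharesBackingArray() bool {
	r := Message("hello")
	message := r // copies only the header: pointer, length, capacity
	r[0] = 'j'
	return message[0] == 'j'
}

// cloneMessage (hypothetical helper) returns a copy with its own backing array.
func cloneMessage(m Message) Message {
	return append(Message(nil), m...)
}

// collect simulates a hypothetical producer that reuses a single buffer
// for every row, optionally cloning each message before handing it out.
func collect(rows []string, clone bool) []Message {
	buf := make(Message, 0, 16)
	var got []Message
	for _, row := range rows {
		buf = append(buf[:0], row...) // overwrites the previous row in place
		if clone {
			got = append(got, cloneMessage(buf))
		} else {
			got = append(got, buf)
		}
	}
	return got
}

func main() {
	fmt.Println("copy shares array:", sharesBackingArray())
	for _, clone := range []bool{false, true} {
		msgs := collect([]string{"aaa", "bbb"}, clone)
		fmt.Printf("clone=%v first=%q second=%q\n", clone, msgs[0], msgs[1])
	}
}
```

In this sketch the overwrite only happens because the producer reuses `buf`. If every row arrives in a freshly allocated slice, the copy in `cloneMessage` would be unnecessary, and that is the part I am unsure about for my own Read function.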