0
votes

I'm trying to write data from Google Cloud Pub/Sub to Neo4j using the Go Bolt driver, but the write transaction in the Pub/Sub receiver panics. In the following example I've removed the code that unmarshals the Pub/Sub message and hardcoded test values in the Neo4j write transaction.

package main

import (
    "context"
    "fmt"
    "github.com/neo4j/neo4j-go-driver/neo4j"
    "log"
    "cloud.google.com/go/pubsub"
)

// Package-level Neo4j handles. Because they live at package scope they are
// reachable from every goroutine in the program.
// NOTE(review): neo4j sessions (and the results they produce) are reported
// not to be safe for concurrent use -- confirm before touching these from
// concurrently running callbacks.

// driver is the package-level Neo4j driver handle.
var driver neo4j.Driver

// session is the package-level Neo4j session handle.
var session neo4j.Session

// result is the package-level holder for a statement result.
var result neo4j.Result

func main() {

    ctx := context.Background()
    cctx, _ := context.WithCancel(ctx)

    client, err := pubsub.NewClient(cctx, "projectid")
    if err != nil {
        log.Fatal(err)
    }

    var driver neo4j.Driver
    driver, err = neo4j.NewDriver("bolt://localhost:7687", neo4j.BasicAuth("neo4j", "neo4j", ""), func(c *neo4j.Config) {
        c.Encrypted = false
    })
    if err != nil {
        log.Fatal(err)
    }
    defer driver.Close()

    session, err = driver.Session(neo4j.AccessModeWrite)
    if err != nil {
        log.Fatal(err)
    }
    defer session.Close()


    node, err := session.WriteTransaction(func(transaction neo4j.Transaction) (interface{}, error) {
        result, err = transaction.Run(
            "CREATE (t:Test) SET t.prop = $prop RETURN id(t)",
            map[string]interface{}{"prop": "Test Value 1"})
        if err != nil {
            return nil, err
        }

        if result.Next() {
            return result.Record().GetByIndex(0), nil
        }

        return nil, result.Err()
    })
    if err != nil {
        log.Fatal(err)
    }


    sub := client.Subscription("data")
    err = sub.Receive(cctx, func(ctx context.Context, msg *pubsub.Message) {

        node, err := session.WriteTransaction(func(transaction neo4j.Transaction) (interface{}, error) {
            result, err = transaction.Run(
                "CREATE (t:Test) SET t.prop = $prop RETURN id(t)",
                map[string]interface{}{"prop": "Test Value 2"})
            if err != nil {
                return nil, err
            }

            if result.Next() {
                return result.Record().GetByIndex(0), nil
            }

            return nil, result.Err()
        })
        if err != nil {
            log.Fatal(err)
        }
        fmt.Println(node.(int64))

        msg.Ack()
    }
    if err != nil {
        log.Println(err)
    }
}


Here is the panic in the neo4j write transaction inside the pubsub subscription. The first write transaction runs fine without a panic.


panic: runtime error: invalid memory address or nil pointer dereference
[signal SIGSEGV: segmentation violation code=0x1 addr=0x0 pc=0x5eed97]

goroutine 125 [running]:
log.(*Logger).Output(0x0, 0x2, 0xc0005e4000, 0x66, 0x0, 0x0)
        /usr/local/go/src/log/log.go:153 +0x47
log.(*Logger).Printf(0x0, 0xa2677e, 0x20, 0xc0005e2070, 0x1, 0x1)
        /usr/local/go/src/log/log.go:179 +0x7e
github.com/neo4j/neo4j-go-driver/neo4j.(*internalLogger).Errorf(0xc0001230b0, 0xa2677e, 0x20, 0xc0005e2070, 0x1, 0x1)
        /home/username/go/src/github.com/neo4j/neo4j-go-driver/neo4j/logging_impl.go:83 +0x61
github.com/neo4j/neo4j-go-driver/neo4j.(*statementRunner).id(0xc00037c380, 0x0, 0x0)
        /home/username/go/src/github.com/neo4j/neo4j-go-driver/neo4j/runner.go:75 +0x106
github.com/neo4j/neo4j-go-driver/neo4j.(*neoSession).id(...)
        /home/username/go/src/github.com/neo4j/neo4j-go-driver/neo4j/session_impl.go:119
github.com/neo4j/neo4j-go-driver/neo4j.runTransaction.func1(0x0, 0x0, 0x0, 0x0, 0x0, 0x0)
        /home/username/go/src/github.com/neo4j/neo4j-go-driver/neo4j/session_impl.go:236 +0x118
github.com/neo4j/neo4j-go-driver/neo4j.(*retryLogic).retry(0xc0007aae28, 0xc0007aae58, 0x9, 0x9, 0x7f405cdaf630, 0xc0003da6c0)
        /home/username/go/src/github.com/neo4j/neo4j-go-driver/neo4j/retry.go:66 +0x123
github.com/neo4j/neo4j-go-driver/neo4j.runTransaction(0xc0000b4cd0, 0x0, 0xc0005e2000, 0x0, 0x0, 0x0, 0x10, 0x97f600, 0x430a01, 0xc0005e2000)
        /home/username/go/src/github.com/neo4j/neo4j-go-driver/neo4j/session_impl.go:233 +0x11f
github.com/neo4j/neo4j-go-driver/neo4j.(*neoSession).WriteTransaction(0xc0000b4cd0, 0xc0005e2000, 0x0, 0x0, 0x0, 0xc0004de060, 0xc0002fe580, 0x0, 0xc0003da760)
        /home/username/go/src/github.com/neo4j/neo4j-go-driver/neo4j/session_impl.go:146 +0x63
main.main.func3(0xada300, 0xc00007d5c0, 0xc00038e750)
        /home/username/go/src/project/neo4jimporter/main.go:64 +0x9b
cloud.google.com/go/pubsub.(*Subscription).receive.func3(0xc00002b330, 0xc0001c3070, 0xada300, 0xc00007d5c0, 0xc00038e750)
        /home/username/go/src/cloud.google.com/go/pubsub/subscription.go:729 +0x6a
created by cloud.google.com/go/pubsub.(*Subscription).receive
        /home/username/go/src/cloud.google.com/go/pubsub/subscription.go:727 +0x1fb
exit status 2

What am I doing wrong in the pubsub subscription?

Thanks!

1

1 Answer

0
votes

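I just needed to open the session inside the Pub/Sub receiver callback. Receive can invoke the callback concurrently on multiple goroutines, and Neo4j sessions aren't thread-safe, so each invocation needs its own session.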

err = sub.Receive(cctx, func(ctx context.Context, msg *pubsub.Message) {

    session, err := driver.Session(neo4j.AccessModeWrite)
    if err != nil {
        log.Fatal(err)
    }
    defer session.Close()