0 votes

We have an API that acts as a proxy between clients and Google Pub/Sub: it basically retrieves a JSON body and publishes it to the topic. From there it is processed by Dataflow, which stores it in BigQuery. We also use a transform UDF to, for instance, convert a field value to upper case; it parses the incoming JSON and produces a new one.

The problem is the following. The number of bytes sent to the destination table is much smaller than the number sent to the deadletter table, and in 99% of cases the error message says that the sent JSON is invalid. And that's true: the payloadstring column contains distorted JSONs; they can be truncated, concatenated with other messages, or even both. I've added logs on the API side to see where the messages get corrupted, but the JSON bodies are valid both when the API receives them and when it sends them.

How can I debug this problem? Is there any chance that Pub/Sub or Dataflow corrupts messages? If so, what can I do to fix it?

UPD. By the way, we use the Google-provided template called "Pub/Sub Topic to BigQuery".

UPD2. The API is written in Go, and the way we send the message is simply by calling
res := p.topic.Publish(ctx, &pubsub.Message{Data: msg})

The res variable is then used for error logging. p here is a custom struct.
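
For completeness, here is a self-contained sketch of that publish path (publishJSON and the proxy struct are mocks, just like the message below). The comments call out one property of the client library worth keeping in mind: Publish is asynchronous, and the message must not be modified after publishing.

package main

import (
	"context"
	"fmt"

	"cloud.google.com/go/pubsub"
)

// proxy mocks our custom struct p.
type proxy struct {
	topic *pubsub.Topic
}

// publishJSON mocks the publish path.
func (p *proxy) publishJSON(ctx context.Context, msg []byte) error {
	// Publish is asynchronous: the client batches messages and may still
	// read Data after this call returns, so the sketch defensively copies
	// msg in case the caller's buffer gets reused (HTTP frameworks that
	// recycle request-body buffers are one such caller).
	data := make([]byte, len(msg))
	copy(data, msg)

	res := p.topic.Publish(ctx, &pubsub.Message{Data: data})

	// Get blocks until the server acknowledges the message; this is where
	// we do the error logging.
	if _, err := res.Get(ctx); err != nil {
		return fmt.Errorf("pubsub publish: %w", err)
	}
	return nil
}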

The message we send is a JSON object with 15 fields; to keep things concise, I'll mock both it and the UDF. Message:

 {"MessageName":"Name","MessageTimestamp":123123123",...}

UDF:

function transform(inJson) {
  var obj;
  try {
    obj = JSON.parse(inJson);
  } catch (error) {
    throw 'parse JSON error: ' + error;
  }

  if (Object.keys(obj).length !== 15) {
    throw "Message is invalid";
  }

  if (!(obj.hasOwnProperty('MessageName') && typeof obj.MessageName === 'string' && obj.MessageName.length > 0)) {
    throw "MessageName is absent or invalid";
  }
  /*
     other fields check
  */

  obj.MessageName = obj.MessageName.toUpperCase();
  /*
     other fields transform
  */

  return JSON.stringify(obj);
}

UPD3:

Besides being corrupted, I've noticed that every single message is duplicated at least once, and the duplicates are often truncated. The problem started several days ago, when there was a massive increase in the number of messages; the volume is back to normal now, but the error is still there. The problem was seen before as well, but it was a much rarer case.

It seems that you're using this template, is that correct? It would be very useful if you could include the code snippet from your API where you're sending the messages to Pub/Sub, a sample message, and your UDF. – Noe Romero
@NoeRomero thank you for your reply. I've added the snippets you asked for. – artur dinanmitt

2 Answers

0 votes

The behavior you describe suggests that the data is corrupted before it gets to Pub/Sub or Dataflow.

0 votes

I have performed a test, sending JSON messages containing 15 fields, and both your UDF function and the Dataflow template work fine, since I was able to insert the data into BigQuery.

Based on that, it seems your messages are already corrupted before getting to Pub/Sub. I suggest you check your messages once they arrive in Pub/Sub to see if they have the correct format.
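
As a concrete way to do that check, you could attach a separate debug subscription to the same topic and log every payload that fails to parse. This is only a sketch; my-project and debug-sub are placeholders:

package main

import (
	"context"
	"encoding/json"
	"log"

	"cloud.google.com/go/pubsub"
)

func main() {
	ctx := context.Background()

	client, err := pubsub.NewClient(ctx, "my-project")
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// A throwaway subscription attached to the same topic the API publishes to.
	sub := client.Subscription("debug-sub")
	err = sub.Receive(ctx, func(ctx context.Context, m *pubsub.Message) {
		if !json.Valid(m.Data) {
			// The payload was already broken when Pub/Sub delivered it,
			// so the corruption happened before Dataflow.
			log.Printf("invalid JSON, id=%s: %q", m.ID, m.Data)
		}
		m.Ack()
	})
	if err != nil {
		log.Fatal(err)
	}
}

If the distorted payloads show up here, the corruption happens at or before publish time, and the template and UDF are not the culprit.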

Please note that the message schema is required to match the BigQuery table schema.