I am getting the following error log several times while running a Dataflow pipeline:
Refusing to split <at position ShufflePosition(base64:AAAAAtxW0XoAAQ) of shuffle range [ShufflePosition(base64:AAAAAgD_AP8A_wD_AAE), ShufflePosition(base64:AAAAAtxW0XsAAQ))> at ShufflePosition(base64:AAAAAtxW0XsAAQ): proposed split position out of range
The job ultimately fails. JobId: 2018-04-02_00_19_15-14115706867296503746
I am using the Java implementation of Apache Beam 2.2.0. The pipeline uses a reshuffle with a void key as an intermediate step to avoid fusion, and it appears to have failed at that step.
The last error log is:
Workflow failed. Causes: S83:Reshuffle2/GroupByKey/Read+Reshuffle2/GroupByKey/GroupByWindow+Reshuffle2/ExpandIterable+Drop key2/Values/Map+WriteTrackerTableToBQ/PrepareWrite/ParDo(Anonymous)+WriteTrackerTableToBQ/BatchLoads/rewindowIntoGlobal/Window.Assign+WriteTrackerTableToBQ/BatchLoads/WriteBundlesToFiles+WriteTrackerTableToBQ/BatchLoads/ReifyResults/View.AsIterable/View.CreatePCollectionView/ParDo(ToIsmRecordForGlobalWindow)+WriteTrackerTableToBQ/BatchLoads/GroupByDestination/Reify+WriteTrackerTableToBQ/BatchLoads/GroupByDestination/Write failed., A work item was attempted 4 times without success. Each time the worker eventually lost contact with the service.
At a high level, the Dataflow pipeline is:
// Read from Datastore
PCollection<Entity> entities =
    pipeline.apply("ReadFromDatastore",
        DatastoreIO.v1().read().withProjectId(options.getProject())
            .withQuery(query).withNamespace(options.getNamespace()));

// Convert each entity to a BigQuery TableRow
PCollection<TableRow> tableRow =
    entities.apply("ConvertToTableRow", ParDo.of(new ProcessEntityFn()));

// Apply a timestamp to each TableRow element, then window into one-day calendar windows
PCollection<TableRow> tableRowWindowTemp =
    tableRow.apply("tableAddTimestamp", ParDo.of(new ApplyTimestampFn()))
        .apply("tableApplyWindow",
            Window.<TableRow>into(CalendarWindows.days(1).withTimeZone(
                DateTimeZone.forID(options.getTimeZone()))));

// Apply a reshuffle with a void key to avoid fusion
PCollection<TableRow> tableRowWindow =
    tableRowWindowTemp.apply("Add void key", WithKeys.<Void, TableRow>of((Void) null))
        .apply("Reshuffle", Reshuffle.<Void, TableRow>of())
        .apply("Drop key", Values.<TableRow>create());

// Write the windowed output to per-day BigQuery partitions
tableRowWindow.apply("WriteTableToBQ",
    BigQueryIO.writeTableRows()
        .withSchema(BigqueryHelper.getSchema())
        .to(TableRefPartition.perDay(options.getProject(),
            options.getBigQueryDataset(), options.getTableName()))
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));