
I am getting the following error log several times while running a Dataflow pipeline:

Refusing to split <at position ShufflePosition(base64:AAAAAtxW0XoAAQ) of shuffle range [ShufflePosition(base64:AAAAAgD_AP8A_wD_AAE), ShufflePosition(base64:AAAAAtxW0XsAAQ))> at ShufflePosition(base64:AAAAAtxW0XsAAQ): proposed split position out of range

The job ultimately fails. JobId: 2018-04-02_00_19_15-14115706867296503746

I am using the Java SDK of Apache Beam 2.2.0. The pipeline uses a reshuffle with a void key as an intermediate step to avoid fusion, and it seems to have failed at that step.

The last error log is:

Workflow failed. Causes: S83:Reshuffle2/GroupByKey/Read+Reshuffle2/GroupByKey/GroupByWindow+Reshuffle2/ExpandIterable+Drop key2/Values/Map+WriteTrackerTableToBQ/PrepareWrite/ParDo(Anonymous)+WriteTrackerTableToBQ/BatchLoads/rewindowIntoGlobal/Window.Assign+WriteTrackerTableToBQ/BatchLoads/WriteBundlesToFiles+WriteTrackerTableToBQ/BatchLoads/ReifyResults/View.AsIterable/View.CreatePCollectionView/ParDo(ToIsmRecordForGlobalWindow)+WriteTrackerTableToBQ/BatchLoads/GroupByDestination/Reify+WriteTrackerTableToBQ/BatchLoads/GroupByDestination/Write failed., A work item was attempted 4 times without success. Each time the worker eventually lost contact with the service. 

The Dataflow pipeline, at a high level, is:

// Read from datastore
PCollection<Entity> entities =
        pipeline.apply("ReadFromDatastore",
                DatastoreIO.v1().read().withProjectId(options.getProject())
                        .withQuery(query).withNamespace(options.getNamespace()));

// Apply processing to convert it to BigQuery TableRow
PCollection<TableRow> tableRow =
        entities.apply("ConvertToTableRow", ParDo.of(new ProcessEntityFn()));

// Apply a timestamp to each TableRow element, then window into one-day calendar windows
PCollection<TableRow> tableRowWindowTemp =
        tableRow.apply("tableAddTimestamp", ParDo.of(new ApplyTimestampFn())).apply(
                "tableApplyWindow",
                Window.<TableRow> into(CalendarWindows.days(1).withTimeZone(
                        DateTimeZone.forID(options.getTimeZone()))));

// Apply reshuffle with void key to avoid fusion
PCollection<TableRow> tableRowWindow =
        tableRowWindowTemp.apply("Add void key", WithKeys.<Void, TableRow> of((Void) null))
                .apply("Reshuffle", Reshuffle.<Void, TableRow> of())
                .apply("Drop key", Values.<TableRow> create());

// Write windowed output to BigQuery partitions
tableRowWindow.apply(
        "WriteTableToBQ",
        BigQueryIO
                .writeTableRows()
                .withSchema(BigqueryHelper.getSchema())
                .to(TableRefPartition.perDay(options.getProject(),
                        options.getBigQueryDataset(), options.getTableName()))
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));
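
For reference, newer Beam SDKs also offer Reshuffle.viaRandomKey(), which wraps the same add-key/reshuffle/drop-key pattern in a single transform; I am not sure whether it is already available in 2.2.0, so the snippet below is only a sketch of that alternative, not what the failing job actually runs:

// Hypothetical alternative to the manual void-key reshuffle above
// (requires a Beam SDK version where Reshuffle.viaRandomKey() is available)
PCollection<TableRow> tableRowWindow =
        tableRowWindowTemp.apply("Reshuffle", Reshuffle.<TableRow> viaRandomKey());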
Please attach some code snippet. – Minato
I have added the dataflow code snippet. – Ashley Thomas

1 Answer


The "refusing to split" errors should not cause the job to fail. Based on the job error messages you posted, I think we already have the fix for this in production. Can you try launching your job again and see whether the same errors persist?