2
votes

I am trying to configure checkpoints for Flink jobs in GCS. Everything works fine if I run a test job locally (no Docker and no cluster setup), but it fails with an error if I run it using docker-compose or a cluster setup and deploy the fat jar with the jobs via the Flink dashboard.

Any thoughts on this? Thanks!

Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 'gs'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded.
at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:405)
at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:320)
at org.apache.flink.core.fs.Path.getFileSystem(Path.java:298)
at org.apache.flink.runtime.state.filesystem.FsCheckpointStorage.<init>(FsCheckpointStorage.java:61)
at org.apache.flink.runtime.state.filesystem.FsStateBackend.createCheckpointStorage(FsStateBackend.java:441)
at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createCheckpointStorage(RocksDBStateBackend.java:379)
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:247)
... 33 more
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Hadoop is not in the classpath/dependencies.
at org.apache.flink.core.fs.UnsupportedSchemeFactory.create(UnsupportedSchemeFactory.java:64)
at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:401)

Env configuration is like this:

StreamExecutionEnvironment env = applicationContext.getBean(StreamExecutionEnvironment.class);
    CheckpointConfig checkpointConfig = env.getCheckpointConfig();
    checkpointConfig.setFailOnCheckpointingErrors(false);
    checkpointConfig.setCheckpointInterval(10000);
    checkpointConfig.setMinPauseBetweenCheckpoints(5000);
    checkpointConfig.setMaxConcurrentCheckpoints(1);
    checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    RocksDBStateBackend rocksDBStateBackend = new RocksDBStateBackend(
            String.format("gs://checkpoints/%s", jobClass.getSimpleName()), true);
    env.setStateBackend((StateBackend) rocksDBStateBackend);

Here is my core-site.xml file:

<configuration>
<property>
    <name>google.cloud.auth.service.account.enable</name>
    <value>true</value>
</property>
<property>
    <name>google.cloud.auth.service.account.json.keyfile</name>
    <value>${user.dir}/key.json</value>
</property>
<property>
    <name>fs.gs.impl</name>
    <value>com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem</value>
    <description>The FileSystem for gs: (GCS) uris.</description>
</property>
<property>
    <name>fs.AbstractFileSystem.gs.impl</name>
    <value>com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS</value>
    <description>The AbstractFileSystem for gs: (GCS) uris.</description>
</property>
<property>
    <name>fs.gs.application.name.suffix</name>
    <value>-kube-flink</value>
    <description>
        Appended to the user-agent header for API requests to GCS to help identify
        the traffic as coming from Dataproc.
    </description>
</property>
</configuration>
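
Flink's Hadoop file system picks this file up via the Hadoop configuration directory, i.e. the fs.hdfs.hadoopconf option in flink-conf.yaml. A minimal sketch, assuming the file lives under /opt/flink/etc-hadoop (an assumed path):

fs.hdfs.hadoopconf: /opt/flink/etc-hadoop/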

Dependency to gcs-connector:

<dependency>
        <groupId>com.google.cloud.bigdataoss</groupId>
        <artifactId>gcs-connector</artifactId>
        <version>1.9.4-hadoop2</version>
</dependency>

UPDATE:

After some manipulation of the dependencies I've been able to write checkpoints. My current setup is:

<dependency>
        <groupId>com.google.cloud.bigdataoss</groupId>
        <artifactId>gcs-connector</artifactId>
        <version>hadoop2-1.9.5</version>
</dependency>
<dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-statebackend-rocksdb_${scala.version}</artifactId>
        <version>1.5.1</version>
</dependency>

I also switched the Flink image to flink:1.5.2-hadoop28.

Unfortunately I am still not able to read checkpoint data, as my job always fails while restoring state with this error:

java.lang.NoClassDefFoundError: com/google/cloud/hadoop/gcsio/GoogleCloudStorageImpl$6
at com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl.open(GoogleCloudStorageImpl.java:666)
at com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystem.open(GoogleCloudStorageFileSystem.java:323)
at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFSInputStream.<init>(GoogleHadoopFSInputStream.java:136)
at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.open(GoogleHadoopFileSystemBase.java:1102)
at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:787)
at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.open(HadoopFileSystem.java:119)
at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.open(HadoopFileSystem.java:36)
at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.open(SafetyNetWrapperFileSystem.java:80)
at org.apache.flink.runtime.state.filesystem.FileStateHandle.openInputStream(FileStateHandle.java:68)
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.copyStateDataHandleData(RocksDBKeyedStateBackend.java:1005)
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.transferAllDataFromStateHandles(RocksDBKeyedStateBackend.java:988)
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.transferAllStateDataToDirectory(RocksDBKeyedStateBackend.java:974)
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.restoreInstance(RocksDBKeyedStateBackend.java:758)
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.restore(RocksDBKeyedStateBackend.java:732)
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.restore(RocksDBKeyedStateBackend.java:443)
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.restore(RocksDBKeyedStateBackend.java:149)
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:151)
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:123)
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:276)
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:132)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:227)
at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:730)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:295)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
at java.lang.Thread.run(Thread.java:748)

I believe it's going to be the last error...

I have checked the documentation and I am not sure your dependency is correct. What happens if you use this dependency for gcs-connector? <dependency> <groupId>com.google.cloud.bigdataoss</groupId> <artifactId>gcs-connector</artifactId> <version>hadoop2-1.9.5</version> <scope>provided</scope> </dependency> - Philipp Sh
Hello Philipp! Actually I've made some progress in the troubleshooting. I changed the Flink Docker image from flink:1.5.0 to flink:1.5.2-hadoop28, so that exception disappeared. Unfortunately I've got another one, and your proposal didn't help. What I get now is java.lang.ClassNotFoundException: com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.auth.oauth2.TokenResponseException. Seems like a classloading issue. It cannot see the GCS classes from GoogleHadoopFileSystemBase... - Oleksandr Serdiukov
Would you be able to provide the full pom.xml file and update the error message? - Philipp Sh
Hi Philipp! I've added the updated information. If you're able to help with the NoClassDefFoundError it would be great. I appreciate your help. - Oleksandr Serdiukov

3 Answers

7
votes

Finally I found the solution here.

You must build your own image and put the gcs-connector into the lib directory. Otherwise you'll always run into classloading issues (the connector ends up in the user-code classloader instead of the system classloader).

To create a custom Docker image we create the following Dockerfile:

FROM registry.platform.data-artisans.net/trial/v1.0/flink:1.4.2-dap1-scala_2.11

RUN wget -O lib/gcs-connector-latest-hadoop2.jar https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-latest-hadoop2.jar && \
    wget http://ftp.fau.de/apache/flink/flink-1.4.2/flink-1.4.2-bin-hadoop28-scala_2.11.tgz && \
    tar xf flink-1.4.2-bin-hadoop28-scala_2.11.tgz && \
    mv flink-1.4.2/lib/flink-shaded-hadoop2* lib/ && \
    rm -r flink-1.4.2*

RUN mkdir etc-hadoop
COPY <name of key file>.json etc-hadoop/
COPY core-site.xml etc-hadoop/

ENTRYPOINT ["/docker-entrypoint.sh"]
EXPOSE 6123 8081
CMD ["jobmanager"]

The Docker image is based on the Flink image we're providing as part of the dA Platform trial. We add the Google Cloud Storage connector, Flink's shaded Hadoop package, and the service account key together with the configuration file.

To build the custom image, the following files should be in your current directory: core-site.xml, Dockerfile and the key-file (.json).

To finally trigger the build of the custom image, we run the following command:

$ docker build -t flink-1.4.2-gs .

Once the image has been built, we will upload the image to Google’s Container Registry. To configure Docker to properly access the registry, run this command once:

$ gcloud auth configure-docker

Next, we’ll tag and upload the container:

$ docker tag flink-1.4.2-gs:latest eu.gcr.io/<your project id>/flink-1.4.2-gs
$ docker push eu.gcr.io/<your project id>/flink-1.4.2-gs

Once the upload is complete, we need to set the custom image for an Application Manager deployment. Send the following PATCH request:

PATCH /api/v1/deployments/<your AppMgr deployment id>
 spec:
   template:
     spec:
       flinkConfiguration:
         fs.hdfs.hadoopconf: /opt/flink/etc-hadoop/
       artifact:
         flinkImageRegistry: eu.gcr.io
         flinkImageRepository: <your project id>/flink-1.4.2-gs
         flinkImageTag: latest

Alternatively, use the following curl command:

$ curl -X PATCH --header 'Content-Type: application/yaml' --header 'Accept: application/yaml' -d '  spec:
    template:
      spec:
        flinkConfiguration:
          fs.hdfs.hadoopconf: /opt/flink/etc-hadoop/
        artifact:
          flinkImageRegistry: eu.gcr.io
          flinkImageRepository: <your project id>/flink-1.4.2-gs
          flinkImageTag: latest' 'http://localhost:8080/api/v1/deployments/<your AppMgr deployment id>'

With this change in place, you'll be able to checkpoint to Google Cloud Storage. Specify the checkpoint directory using the pattern gs://<your-bucket>/checkpoints. For savepoints, set the state.savepoints.dir Flink configuration option.
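
As a rough illustration, the checkpoint URI from the question would then take this shape (the bucket name is a placeholder, not part of the original setup):

RocksDBStateBackend rocksDBStateBackend = new RocksDBStateBackend(
        "gs://<your-bucket>/checkpoints", true);
env.setStateBackend((StateBackend) rocksDBStateBackend);

and the savepoint location goes into flink-conf.yaml:

state.savepoints.dir: gs://<your-bucket>/savepoints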

0
votes

The problem is the missing implementation for the gs:// scheme, which is the protocol used to connect to GCS. A Java program should be able to run if you add the following dependency:

<dependency>
  <groupId>com.google.cloud</groupId>
  <artifactId>google-cloud-storage</artifactId>
  <version>1.35.0</version>
</dependency>

In this link you will find how to add this dependency for any other programming language.
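
For what it's worth, here is a minimal sketch that exercises that client library directly, outside of Flink (the bucket name is a placeholder; credentials are taken from the environment):

import com.google.cloud.storage.Blob;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;

public class GcsSmokeTest {
    public static void main(String[] args) {
        // Uses Application Default Credentials, e.g. GOOGLE_APPLICATION_CREDENTIALS.
        Storage storage = StorageOptions.getDefaultInstance().getService();
        // List objects in a bucket to verify access ("my-bucket" is a placeholder).
        for (Blob blob : storage.list("my-bucket").iterateAll()) {
            System.out.println(blob.getName());
        }
    }
}

Note that this only verifies access with the google-cloud-storage client; Flink resolves a gs:// checkpoint path through the Hadoop file system, as described in the other answers.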

0
votes

This works for me with the Dockerfile below; the point is to pick the correct dependency versions.

I based this solution on the Flink Kubernetes operator.

1. Dockerfile:
ARG FLINK_VERSION=1.13.1
ARG SCALA_VERSION=2.12
FROM flink:${FLINK_VERSION}-scala_${SCALA_VERSION}-java8

ARG FLINK_HADOOP_VERSION=2.8.3-10.0
ARG GCS_CONNECTOR_VERSION=latest-hadoop2

ARG GCS_CONNECTOR_NAME=gcs-connector-${GCS_CONNECTOR_VERSION}.jar
ARG GCS_CONNECTOR_URI=https://storage.googleapis.com/hadoop-lib/gcs/${GCS_CONNECTOR_NAME}
ARG FLINK_HADOOP_JAR_NAME=flink-shaded-hadoop-2-uber-${FLINK_HADOOP_VERSION}.jar
ARG FLINK_HADOOP_JAR_URI=https://repo.maven.apache.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/${FLINK_HADOOP_VERSION}/${FLINK_HADOOP_JAR_NAME}

#COPY target/lib /opt/flink/lib

RUN echo "Downloading ${GCS_CONNECTOR_URI}" && \
  wget -q -O /opt/flink/lib/${GCS_CONNECTOR_NAME} ${GCS_CONNECTOR_URI}
RUN echo "Downloading ${FLINK_HADOOP_JAR_URI}" && \
  wget -q -O /opt/flink/lib/${FLINK_HADOOP_JAR_NAME} ${FLINK_HADOOP_JAR_URI}

COPY target/play-flink-1.0-SNAPSHOT.jar /opt/flink/usrlib/play-flink-1.0-SNAPSHOT.jar
2. Create the core-site.xml file:
<?xml version="1.0" ?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
    <property>
        <name>fs.AbstractFileSystem.gs.impl</name>
        <value>com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS</value>
        <description>The AbstractFileSystem for gs: uris.</description>
    </property>
    <property>
        <name>fs.gs.project.id</name>
        <value>projectName</value>
        <description>
            Optional. Google Cloud Project ID with access to GCS buckets.
            Required only for list buckets and create bucket operations.
        </description>
    </property>
    <property>
        <name>google.cloud.auth.service.account.enable</name>
        <value>true</value>
        <description>
            Whether to use a service account for GCS authorization.
        </description>
    </property>
</configuration>
3. Create core-site.xml as a ConfigMap: kubectl create configmap hadoop-configmap --from-file core-site.xml

4. Create a secret for the service account: kubectl create secret generic gcp-secret --from-file=key.json=${SERVICE_ACCOUNT_FILE}

5. Load the ConfigMap and the service account secret in job.yaml:

 volumeMounts:
            - name: hadoop-configmap-volume
              mountPath: /etc/hadoop/conf
            - name: google-cloud-key
              mountPath: /etc/gcp/keys
  ....
   volumes:
        - name: google-cloud-key
          secret:
            secretName: gcp-secret
        - name: flink-config-volume
          configMap:
            name: flink-config
            items:
              - key: flink-conf.yaml
                path: flink-conf.yaml
              - key: log4j-console.properties
                path: log4j-console.properties
        - name: hadoop-configmap-volume
          configMap:
              name: hadoop-configmap
              items:
                - key: core-site.xml
                  path: core-site.xml
6. Configure the volume mount path in the Flink configuration: fs.hdfs.hadoopconf: /etc/hadoop/conf
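
For completeness, a sketch of the related flink-conf.yaml entries (the bucket name is a placeholder, and wiring the mounted key into the connector, e.g. via google.cloud.auth.service.account.json.keyfile in core-site.xml, is an assumption on top of this answer):

fs.hdfs.hadoopconf: /etc/hadoop/conf
state.backend: rocksdb
state.checkpoints.dir: gs://<your-bucket>/checkpoints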