4
votes

I am relatively new to Apache Flink and I am trying to create a simple project that produces a file to an AWS S3 bucket. Based on the documentation it looks like I am required to have Hadoop installed in order to do this.

How do I set up my local environment to allow me to test this capability? I have installed Apache Flink as well as Hadoop locally. I have added the necessary changes to the core-site.xml configuration for Hadoop and also added my HADOOP_CONF path to my flink-conf.yaml configuration. When I try to submit my job locally through the Flink UI, I always get this error:

2016-12-29 16:03:49,861 INFO  org.apache.flink.util.NetUtils                                - Unable to allocate on port 6123, due to error: Address already in use
2016-12-29 16:03:49,862 ERROR org.apache.flink.runtime.jobmanager.JobManager                - Failed to run JobManager.
java.lang.RuntimeException: Unable to do further retries starting the actor system
    at org.apache.flink.runtime.jobmanager.JobManager$.retryOnBindException(JobManager.scala:2203)
    at org.apache.flink.runtime.jobmanager.JobManager$.runJobManager(JobManager.scala:2143)
    at org.apache.flink.runtime.jobmanager.JobManager$.main(JobManager.scala:2040)
    at org.apache.flink.runtime.jobmanager.JobManager.main(JobManager.scala)

I am assuming that I am missing something with how my environment is set up. Is it possible to do this locally? Any help would be appreciated.

4
Check if port 6123 is in use. If it's not, then disable your firewall/iptables. - Ani Menon
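
The "Address already in use" line in the log means something was already bound to port 6123 when the JobManager started, for example a JobManager still running from an earlier start. Here is a minimal sketch for checking this from Java with only the standard library. The class and method names are made up for illustration, and 6123 is simply the port from the log above.

```java
import java.io.IOException;
import java.net.ServerSocket;

// Hypothetical helper, not part of Flink: reports whether a local TCP port can be bound.
class PortCheck {

    // Tries to bind the port on all local interfaces. A failed bind means the port is
    // taken (or, for ports below 1024, that the process lacks permission to bind it).
    static boolean isPortFree(int port) {
        try (ServerSocket ignored = new ServerSocket(port)) {
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // 6123 is the port shown in the log; pass another port as the first argument.
        int port = args.length > 0 ? Integer.parseInt(args[0]) : 6123;
        System.out.println("Port " + port + (isPortFree(port) ? " is free" : " is already in use"));
    }
}
```

If the port is reported as in use, stopping the process that holds it (or stopping the already running local Flink cluster) before starting a new one should make the bind error go away.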

4 Answers

4
votes

While you need the Hadoop libraries, you do not have to have Hadoop installed to run locally and write to S3. I happened to try this out by writing Parquet output, based on an Avro schema and a generated SpecificRecord, to S3. I am running a version of the following code locally through SBT and IntelliJ IDEA. Needed parts:

1) Have the following file specifying the needed Hadoop properties. (Note: defining the AWS access key/secret key in a file is not recommended. It is better to run on an EC2 instance that has a proper IAM role to read/write to your S3 bucket, but the keys are needed for local testing.)

<configuration>
    <property>
        <name>fs.s3.impl</name>
        <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
    </property>

    <!-- Comma separated list of local directories used to buffer
         large results prior to transmitting them to S3. -->
    <property>
        <name>fs.s3a.buffer.dir</name>
        <value>/tmp</value>
    </property>

    <!-- set your AWS ID using key defined in org.apache.hadoop.fs.s3a.Constants -->
    <property>
        <name>fs.s3a.access.key</name>
        <value>YOUR_ACCESS_KEY</value>
    </property>

    <!-- set your AWS secret key -->
    <property>
        <name>fs.s3a.secret.key</name>
        <value>YOUR_SECRET_KEY</value>
    </property>
</configuration>

2) Imports:

import com.uebercomputing.eventrecord.EventOnlyRecord

import org.apache.flink.api.scala.hadoop.mapreduce.HadoopOutputFormat
import org.apache.flink.api.scala.{ExecutionEnvironment, _}

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
import org.apache.hadoop.conf.{Configuration => HadoopConfiguration}
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.Job

import org.apache.parquet.avro.AvroParquetOutputFormat

3) Flink code uses HadoopOutputFormat with above configuration:

    val events: DataSet[(Void, EventOnlyRecord)] = ...

    val hadoopConfig = getHadoopConfiguration(hadoopConfigFile)

    val outputFormat = new AvroParquetOutputFormat[EventOnlyRecord]
    val outputJob = Job.getInstance

    //Note: AvroParquetOutputFormat extends FileOutputFormat[Void,T]
    //so key is Void, value of type T - EventOnlyRecord in this case
    val hadoopOutputFormat = new HadoopOutputFormat[Void, EventOnlyRecord](
      outputFormat,
      outputJob
    )

    val outputConfig = outputJob.getConfiguration
    outputConfig.addResource(hadoopConfig)
    val outputPath = new Path("s3://<bucket>/<dir-prefix>")
    FileOutputFormat.setOutputPath(outputJob, outputPath)
    AvroParquetOutputFormat.setSchema(outputJob, EventOnlyRecord.getClassSchema)

    events.output(hadoopOutputFormat)

    env.execute

    ...

    def getHadoopConfiguration(hadoopConfigPath: String): HadoopConfiguration = {
      val hadoopConfig = new HadoopConfiguration()
      hadoopConfig.addResource(new Path(hadoopConfigPath))
      hadoopConfig
    }

4) Build dependencies and versions used:

    val awsSdkVersion = "1.7.4"
    val hadoopVersion = "2.7.3"
    val flinkVersion = "1.1.4"

    val flinkDependencies = Seq(
      ("org.apache.flink" %% "flink-scala" % flinkVersion),
      ("org.apache.flink" %% "flink-hadoop-compatibility" % flinkVersion)
    )

    val providedFlinkDependencies = flinkDependencies.map(_ % "provided")

    val serializationDependencies = Seq(
      ("org.apache.avro" % "avro" % "1.7.7"),
      ("org.apache.avro" % "avro-mapred" % "1.7.7").classifier("hadoop2"),
      ("org.apache.parquet" % "parquet-avro" % "1.8.1")
    )

    val s3Dependencies = Seq(
      ("com.amazonaws" % "aws-java-sdk" % awsSdkVersion),
      ("org.apache.hadoop" % "hadoop-aws" % hadoopVersion)
    )

Edit for using writeAsText to S3:

1) Create a Hadoop configuration directory (will reference this as hadoop-conf-dir) with a file core-site.xml in it.

For example:

mkdir /home/<user>/hadoop-config
cd /home/<user>/hadoop-config
vi core-site.xml

#content of core-site.xml 
<configuration>
    <property>
        <name>fs.s3.impl</name>
        <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
    </property>

    <!-- Comma separated list of local directories used to buffer
         large results prior to transmitting them to S3. -->
    <property>
        <name>fs.s3a.buffer.dir</name>
        <value>/tmp</value>
    </property>

    <!-- set your AWS ID using key defined in org.apache.hadoop.fs.s3a.Constants -->
    <property>
        <name>fs.s3a.access.key</name>
        <value>YOUR_ACCESS_KEY</value>
    </property>

    <!-- set your AWS secret key -->
    <property>
        <name>fs.s3a.secret.key</name>
        <value>YOUR_SECRET_KEY</value>
    </property>
</configuration>

2) Create a directory (will reference this as flink-conf-dir) with a file flink-conf.yaml in it.

For example:

mkdir /home/<user>/flink-config
cd /home/<user>/flink-config
vi flink-conf.yaml

//content of flink-conf.yaml - continuing earlier example
fs.hdfs.hadoopconf: /home/<user>/hadoop-config

3) Edit your IntelliJ Run configuration used to run your S3 Flink job - Run - Edit configurations - and add the following environment variable:

FLINK_CONF_DIR and set it to your flink-conf-dir

Continuing the example above:
FLINK_CONF_DIR=/home/<user>/flink-config

4) Run the code with that environment variable set:

events.writeAsText("s3://<bucket>/<prefix-dir>")

env.execute
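
Steps 1) and 2) above can also be scripted instead of done by hand. This is a minimal sketch using only the Java standard library. The class `LocalS3ConfigWriter` and its methods are hypothetical helpers, and reading the credentials from the `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY` environment variables is an assumption of this sketch, not something the steps above require.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helper: generates the two config files from steps 1) and 2).
class LocalS3ConfigWriter {

    private static String property(String name, String value) {
        return "    <property>\n"
             + "        <name>" + name + "</name>\n"
             + "        <value>" + value + "</value>\n"
             + "    </property>\n";
    }

    // Same four properties as the core-site.xml above. Values are not XML-escaped,
    // so this assumes the keys contain no '<' or '&'.
    static String coreSiteXml(String accessKey, String secretKey) {
        return "<configuration>\n"
             + property("fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
             + property("fs.s3a.buffer.dir", "/tmp")
             + property("fs.s3a.access.key", accessKey)
             + property("fs.s3a.secret.key", secretKey)
             + "</configuration>\n";
    }

    // Single-line flink-conf.yaml pointing Flink at the Hadoop config directory.
    static String flinkConfYaml(Path hadoopConfDir) {
        return "fs.hdfs.hadoopconf: " + hadoopConfDir + "\n";
    }

    static void write(Path hadoopConfDir, Path flinkConfDir, String accessKey, String secretKey)
            throws IOException {
        Files.createDirectories(hadoopConfDir);
        Files.createDirectories(flinkConfDir);
        Files.write(hadoopConfDir.resolve("core-site.xml"),
                coreSiteXml(accessKey, secretKey).getBytes(StandardCharsets.UTF_8));
        Files.write(flinkConfDir.resolve("flink-conf.yaml"),
                flinkConfYaml(hadoopConfDir.toAbsolutePath()).getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 2) {
            System.err.println("usage: LocalS3ConfigWriter <hadoop-conf-dir> <flink-conf-dir>");
            return;
        }
        // Assumption: credentials come from the environment; placeholders are used otherwise.
        String accessKey = System.getenv().getOrDefault("AWS_ACCESS_KEY_ID", "YOUR_ACCESS_KEY");
        String secretKey = System.getenv().getOrDefault("AWS_SECRET_ACCESS_KEY", "YOUR_SECRET_KEY");
        write(Paths.get(args[0]), Paths.get(args[1]), accessKey, secretKey);
        // Value to use for the environment variable in the run configuration.
        System.out.println("FLINK_CONF_DIR=" + Paths.get(args[1]).toAbsolutePath());
    }
}
```

Running it with the two directories from the example (hadoop-config and flink-config under your home directory) prints the FLINK_CONF_DIR value to paste into the IntelliJ run configuration from step 3).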
1
votes

I had to do the following to run my flink job locally which sinks to S3:

1- Added flink-s3-fs-hadoop-1.9.1.jar to my flink/plugins/flink-s3-fs-hadoop directory

2- Modified flink/conf/flink-conf.yaml to include:

s3.access-key: AWS_ACCESS_KEY
s3.secret-key: AWS_SECRET_KEY
fs.hdfs.hadoopconf: /etc/hadoop-config

I have the core-site.xml file in the hadoop-config folder, but it does not include any configuration, so fs.hdfs.hadoopconf might not be needed.

0
votes

In sbt I just need to add the S3 library dependencies to use it like a local file system

SBT file:

"org.apache.flink" % "flink-s3-fs-hadoop" % flinkVersion.value

Reading example:

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> text = env.readTextFile("s3://etl-data-ia/test/fileStreamTest.csv");
        text.print();
        env.execute("test");
    }
0
votes

Based on this link: https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/filesystems/s3/#hadooppresto-s3-file-systems-plugins

To use flink-s3-fs-hadoop plugin you should copy the respective JAR file from the opt directory to the plugins directory of your Flink distribution before starting Flink.

Another way I know is to enable it via the environment variable ENABLE_BUILT_IN_PLUGINS="flink-s3-fs-hadoop-[flink-version].jar"

For example: flink-s3-fs-hadoop-1.12.2.jar

Either way, we have to define the S3 configuration in the flink-conf.yaml file.

For example, if you set s3.connection.maximum there, Flink will internally translate this back to fs.s3a.connection.maximum. There is no need to pass configuration parameters using Hadoop’s XML configuration files.

s3.endpoint: <end-point>
s3.path.style.access : true
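
To make the key translation mentioned above concrete, the sketch below illustrates only the prefix rewrite that the documentation describes for s3.connection.maximum. It is not Flink's actual code, the class and method names are invented for illustration, and keys written with dashes (such as s3.access-key) are deliberately not modeled.

```java
// Illustration only: rewrites an "s3."-prefixed key to the Hadoop "fs.s3a." namespace.
class S3KeyTranslationSketch {

    static final String FLINK_PREFIX = "s3.";
    static final String HADOOP_PREFIX = "fs.s3a.";

    // Keys that do not start with "s3." are returned unchanged.
    static String toHadoopKey(String flinkKey) {
        if (flinkKey.startsWith(FLINK_PREFIX)) {
            return HADOOP_PREFIX + flinkKey.substring(FLINK_PREFIX.length());
        }
        return flinkKey;
    }

    public static void main(String[] args) {
        String[] keys = {"s3.connection.maximum", "s3.endpoint", "s3.path.style.access"};
        for (String key : keys) {
            System.out.println(key + " -> " + toHadoopKey(key));
        }
    }
}
```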

As for AWS credentials, they must be provided either in environment variables or configured in flink-conf.yaml:

s3.endpoint: <end-point>
s3.path.style.access : true
s3.access-key: <key>
s3.secret-key: <value>
s3.region: <region>

Once it's all set, you can read from S3 as @EyalP mentioned, or write to S3 (e.g. with a DataSet):

dataset.map(new MapToJsonString())
                .writeAsText("s3://....",
                        FileSystem.WriteMode.OVERWRITE);

If you would like to test it locally (without a real AWS account), I'd suggest you check out LocalStack. It fully supports various AWS services (S3 included). If you go with that, the AWS credentials are not necessary (they may be provided empty) and the endpoint will be LocalStack itself.