Getting started with apache-flink

Other topics

Remarks:

This section provides an overview of what apache-flink is, and why a developer might want to use it.

It should also mention any large subjects within apache-flink, and link out to the related topics. Since the Documentation for apache-flink is new, you may need to create initial versions of those related topics.

Overview and requirements

What is Flink

Like Apache Hadoop and Apache Spark, Apache Flink is a community-driven open source framework for distributed Big Data Analytics. Written in Java, Flink has APIs for Scala, Java and Python, allowing for Batch and Real-Time streaming analytics.

Requirements

  • a UNIX-like environment, such as Linux, Mac OS X or Cygwin;
  • Java 6.X or later;
  • [optional] Maven 3.0.4 or later.

Stack

enter image description here

Execution environments

Apache Flink is a data processing system and an alternative to Hadoop’s MapReduce component. It comes with its own runtime rather than building on top of MapReduce. As such, it can work completely independently of the Hadoop ecosystem.

The ExecutionEnvironment is the context in which a program is executed. There are different environments you can use, depending on your needs.

  1. JVM environment: Flink can run on a single Java Virtual Machine, allowing users to test and debug Flink programs directly from their IDE. When using this environment, all you need is the correct maven dependencies.

  2. Local environment: to be able to run a program on a running Flink instance (not from within your IDE), you need to install Flink on your machine. See local setup.

  3. Cluster environment: running Flink in a fully distributed fashion requires a standalone or a yarn cluster. See the cluster setup page or this slideshare for more information. mportant__: the 2.11 in the artifact name is the scala version, be sure to match the one you have on your system.

APIs

Flink can be used for either stream or batch processing. They offer three APIs:

  • DataStream API: stream processing, i.e. transformations (filters, time-windows, aggregations) on unbounded flows of data.
  • DataSet API: batch processing, i.e. transformations on data sets.
  • Table API: a SQL-like expression language (like dataframes in Spark) that can be embedded in both batch and streaming applications.

Building blocks

At the most basic level, Flink is made of source(s), transformations(s) and sink(s).

enter image description here

At the most basic level, a Flink program is made up of:

  • Data source: Incoming data that Flink processes
  • Transformations: The processing step, when Flink modifies incoming data
  • Data sink: Where Flink sends data after processing

Sources and sinks can be local/HDFS files, databases, message queues, etc. There are many third-party connectors already available, or you can easily create your own.

Local runtime setup

  1. ensure you have java 6 or above and that the JAVA_HOME environment variable is set.

  2. download the latest flink binary here:

    wget flink-XXXX.tar.gz
    

    If you don't plan to work with Hadoop, pick the hadoop 1 version. Also, note the scala version you download, so you can add the correct maven dependencies in your programs.

  3. start flink:

    tar xzvf flink-XXXX.tar.gz
    ./flink/bin/start-local.sh
    

    Flink is already configured to run locally. To ensure flink is running, you can inspect the logs in flink/log/ or open the flink jobManager's interface running on http://localhost:8081.

  4. stop flink:

    ./flink/bin/stop-local.sh
    

Flink Environment setup

To run a flink program from your IDE(we can use either Eclipse or Intellij IDEA(preffered)), you need two dependencies:flink-java / flink-scala and flink-clients (as of february 2016). These JARS can be added using Maven and SBT(if you are using scala).

  • Maven
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.1.4</version>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.11</artifactId>
    <version>1.1.4</version>
</dependency>
  • SBT name := " "

     version := "1.0"
     
     scalaVersion := "2.11.8"
     
     libraryDependencies ++= Seq(
       "org.apache.flink" %% "flink-scala" % "1.2.0",
       "org.apache.flink" %% "flink-clients" % "1.2.0"
     )
    

important: the 2.11 in the artifact name is the scala version, be sure to match the one you have on your system.

WordCount - Table API

This example is the same as WordCount, but uses the Table API. See WordCount for details about execution and results.

Maven

To use the Table API, add flink-table as a maven dependency:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table_2.11</artifactId>
    <version>1.1.4</version>
</dependency>

The code

public class WordCountTable{

    public static void main( String[] args ) throws Exception{

        // set up the execution environment
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        final BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment( env );

        // get input data
        DataSource<String> source = env.fromElements(
                "To be, or not to be,--that is the question:--",
                "Whether 'tis nobler in the mind to suffer",
                "The slings and arrows of outrageous fortune",
                "Or to take arms against a sea of troubles"
        );

        // split the sentences into words
        FlatMapOperator<String, String> dataset = source
                .flatMap( ( String value, Collector<String> out ) -> {
                    for( String token : value.toLowerCase().split( "\\W+" ) ){
                        if( token.length() > 0 ){
                            out.collect( token );
                        }
                    }
                } )
                // with lambdas, we need to tell flink what type to expect
                .returns( String.class );

        // create a table named "words" from the dataset
        tableEnv.registerDataSet( "words", dataset, "word" );

        // word count using an sql query
        Table results = tableEnv.sql( "select word, count(*) from words group by word" );
        tableEnv.toDataSet( results, Row.class ).print();
    }
}

Note: For a version using Java < 8, replace the lambda by an anonymous class:

FlatMapOperator<String, String> dataset = source.flatMap( new FlatMapFunction<String, String>(){
        @Override
        public void flatMap( String value, Collector<String> out ) throws Exception{
            for( String token : value.toLowerCase().split( "\\W+" ) ){
                if( token.length() > 0 ){
                    out.collect( token );
                }
            }
        }
    } );

WordCount

Maven

Add the dependencies flink-java and flink-client (as explained in the JVM environment setup example).

The code

public class WordCount{

    public static void main( String[] args ) throws Exception{

        // set up the execution environment
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // input data
        // you can also use env.readTextFile(...) to get words
        DataSet<String> text = env.fromElements(
                "To be, or not to be,--that is the question:--",
                "Whether 'tis nobler in the mind to suffer",
                "The slings and arrows of outrageous fortune",
                "Or to take arms against a sea of troubles,"
        );

        DataSet<Tuple2<String, Integer>> counts =
                // split up the lines in pairs (2-tuples) containing: (word,1)
                text.flatMap( new LineSplitter() )
                        // group by the tuple field "0" and sum up tuple field "1"
                        .groupBy( 0 )
                        .aggregate( Aggregations.SUM, 1 );

        // emit result
        counts.print();
    }   
}

LineSplitter.java:

public class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>>{

    public void flatMap( String value, Collector<Tuple2<String, Integer>> out ){
        // normalize and split the line into words
        String[] tokens = value.toLowerCase().split( "\\W+" );

        // emit the pairs
        for( String token : tokens ){
            if( token.length() > 0 ){
                out.collect( new Tuple2<String, Integer>( token, 1 ) );
            }
        }
    }
}

If you use Java 8, you can replace .flatmap(new LineSplitter()) by a lambda expression:

DataSet<Tuple2<String, Integer>> counts = text
    // split up the lines in pairs (2-tuples) containing: (word,1)
    .flatMap( ( String value, Collector<Tuple2<String, Integer>> out ) -> {
        // normalize and split the line into words
        String[] tokens = value.toLowerCase().split( "\\W+" );

        // emit the pairs
        for( String token : tokens ){
            if( token.length() > 0 ){
                out.collect( new Tuple2<>( token, 1 ) );
            }
        }
    } )
    // group by the tuple field "0" and sum up tuple field "1"
    .groupBy( 0 )
    .aggregate( Aggregations.SUM, 1 );

Execution

From the IDE: simply hit run in your IDE. Flink will create an environment inside the JVM.

From the flink command line: to run the program using a standalone local environment, do the following:

  1. ensure flink is running (flink/bin/start-local.sh);

  2. create a jar file (maven package);

  3. use the flink command-line tool (in the bin folder of your flink installation) to launch the program:

    flink run -c your.package.WordCount target/your-jar.jar
    

    The -c option allows you to specify the class to run. It is not necessary if the jar is executable/defines a main class.

Result

(a,1)
(against,1)
(and,1)
(arms,1)
(arrows,1)
(be,2)
(fortune,1)
(in,1)
(is,1)
(mind,1)
(nobler,1)
(not,1)
(of,2)
(or,2)
(outrageous,1)
(question,1)
(sea,1)
(slings,1)
(suffer,1)
(take,1)
(that,1)
(the,3)
(tis,1)
(to,4)
(troubles,1)
(whether,1)

WordCount - Streaming API

This example is the same as WordCount, but uses the Table API. See WordCount for details about execution and results.

Maven

To use the Streaming API, add flink-streaming as a maven dependency:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>1.1.4</version>
</dependency>

The code

public class WordCountStreaming{

    public static void main( String[] args ) throws Exception{

        // set up the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
        // get input data
        DataStreamSource<String> source = env.fromElements(
                "To be, or not to be,--that is the question:--",
                "Whether 'tis nobler in the mind to suffer",
                "The slings and arrows of outrageous fortune",
                "Or to take arms against a sea of troubles"
        );
        
        source
                // split up the lines in pairs (2-tuples) containing: (word,1)
                .flatMap( ( String value, Collector<Tuple2<String, Integer>> out ) -> {
                    // emit the pairs
                    for( String token : value.toLowerCase().split( "\\W+" ) ){
                        if( token.length() > 0 ){
                            out.collect( new Tuple2<>( token, 1 ) );
                        }
                    }
                } )
                // due to type erasure, we need to specify the return type
                .returns( TupleTypeInfo.getBasicTupleTypeInfo( String.class, Integer.class ) )
                // group by the tuple field "0"
                .keyBy( 0 )
                // sum up tuple on field "1"
                .sum( 1 )
                // print the result
                .print();

        // start the job
        env.execute();
    }
}

Contributors

Topic Id: 5798

Example Ids: 20427,27895,27896,27897,27898,27899

This site is not affiliated with any of the contributors.