Getting Started with Flink

A Brief Introduction to Flink

Apache Flink is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams. Flink runs in all common cluster environments and performs computations at in-memory speed and at any scale.

Official site: https://flink.apache.org/zh/flink-architecture.html

A Getting-Started Example in Java

Counting the occurrences of each word, the classic getting-started example:

Running Locally

Create a Maven project in IDEA:

The pom file:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>flnk-deep-study</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>

        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.9.0</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>1.9.0</version>
            <scope>provided</scope>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>1.9.0</version>
        </dependency>

    </dependencies>


    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <!-- Resource copying plugin -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-resources-plugin</artifactId>
                <version>2.7</version>
                <configuration>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <!-- Java compiler plugin -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.2</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.7</version>
            </plugin>
        </plugins>
    </build>

</project>
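One detail worth noting: flink-streaming-java is declared with provided scope above. That keeps it out of the fat jar at packaging time, but when running from IDEA you then need to tick "Include dependencies with 'Provided' scope" in the run configuration (or temporarily drop the scope). The bounded example below only uses the DataSet API from flink-java, so it is not affected.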

The Java main class:

package com.liuzhuo;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.ReduceOperator;

public class Bounded {

    public static void main(String[] args) throws Exception {


        // 1) Obtain the execution environment for bounded (batch) processing
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // 2) Read the data from a file
        DataSource<String> input = env.readTextFile("/Users/liuzhuo/projects/flink/flnk-deep-study/src/main/resources/input");

        // 3) Verify that the file was read
        //input.print();

        // 4) Apply a chain of operators
        ReduceOperator<WordCount> reduce = input
                .flatMap((FlatMapFunction<String, String>) (line, collector) -> {
                    // Split each line on whitespace and emit the individual words
                    String[] words = line.split("\\s+");
                    for (String word : words) {
                        collector.collect(word);
                    }
                })
                // The lambda's generic type is erased, so declare the output type explicitly
                .returns(String.class)
                // Drop empty strings
                .filter(word -> !word.isEmpty())
                // Wrap each word in a WordCount POJO with an initial count of 1
                .map(word -> new WordCount(word, 1))
                // Group by the POJO field "word" and sum the counts per word
                .groupBy("word")
                .reduce((value1, value2) -> new WordCount(value1.word, value1.count + value2.count));

        reduce.print();
        //reduce.setParallelism(1).writeAsText("/Users/liuzhuo/projects/flink/flnk-deep-study/src/main/resources/output/result.txt");

        // Execute the job (not needed here: print() already triggers execution)
        //env.execute("word count task");

    }

    public static class WordCount {
        public String word;
        public int count;

        public WordCount() {
        }

        public WordCount(String word, int count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return "WordCount{" +
                    "word='" + word + '\'' +
                    ", count=" + count +
                    '}';
        }
    }
}
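A side note on the WordCount class: groupBy("word") can reference the field by name because WordCount satisfies Flink's POJO rules, a public class with a public no-argument constructor and public (or getter/setter-accessible) fields.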

Run the main method:
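Assuming the input file contains a handful of repeated words, the console prints one WordCount per distinct word, formatted by the toString() above; the actual words and counts depend entirely on your input file:

WordCount{word='hello', count=3}
WordCount{word='flink', count=2}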


Standalone Mode

Download the Flink distribution from the official site; I am using version 1.7.2 here.

PS: The major dividing line between Flink versions is 1.9.0. Before 1.9.0, the APIs for bounded and unbounded streams were separate, two sets of APIs: DataSet for bounded streams and DataStream for unbounded streams. From 1.9.0 onward, functionality from Alibaba's Blink was merged in, beginning to unify the bounded- and unbounded-stream APIs into a single set.


Extract the Flink tgz package:

tar -zxvf flink-1.7.2-bin-hadoop24-scala_2.11.tgz

bin: executable scripts

conf: configuration files

log: log files


Start the official Flink pseudo-distributed (local) cluster:

From the bin directory, run: ./start-cluster.sh

Verify that it started successfully: jps
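If the cluster came up, jps should list one JobManager and one TaskManager process; for a 1.7.x standalone cluster they show up under names similar to the following (the PIDs will differ):

12301 StandaloneSessionClusterEntrypoint
12302 TaskManagerRunner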


Open the Flink web dashboard in a browser:

http://localhost:8081/

Modify the program:

        ....
        //reduce.print();
        // setParallelism(1) makes the sink write a single file instead of a directory of per-task files
        reduce.setParallelism(1).writeAsText("/Users/liuzhuo/projects/flink/flnk-deep-study/src/main/resources/output/result.txt");

        // Execute the job (required here: unlike print(), writeAsText() does not trigger execution on its own)
        env.execute("word count task");

Compile and package the program:

Run the Maven clean and package goals:
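From the command line this is simply (the same goals can also be run from IDEA's Maven panel):

mvn clean package

The assembly plugin configured in the pom then produces a jar-with-dependencies under target/.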

Upload the jar to the Flink cluster:
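As an alternative to uploading through the web UI, the job can be submitted with the flink CLI from the distribution directory. A minimal sketch, assuming the assembly jar produced by this pom (adjust the path to wherever your jar actually is); note that the pom above builds against Flink 1.9.0 while this cluster is 1.7.2, so for cluster submission the Flink dependency versions should match the cluster:

./bin/flink run -c com.liuzhuo.Bounded /path/to/flnk-deep-study-1.0-SNAPSHOT-jar-with-dependencies.jar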



Details:

So far we have only run a bounded stream, reading all of the words from a file. In the next post we will walk through an unbounded-stream program. Let's go!

