Flink UDF Functions

UDF Functions in Flink

UDF functions: Flink provides many function interfaces for users to implement, such as MapFunction, FlatMapFunction, and FilterFunction. This post uses FilterFunction as the example.

Defining an implementation class of FilterFunction

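Background: a group of travelers is visiting Beijing. As they pass through the security check, we need to print the records of travelers whose body temperature is abnormal. The normal range is 36.3 to 37.2 °C.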

The pom file:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>flnk-deep-study</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>

        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.9.0</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>1.9.0</version>
            <scope>provided</scope>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>1.9.0</version>
        </dependency>

    </dependencies>


    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <!-- Resource file copy plugin -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-resources-plugin</artifactId>
                <version>2.7</version>
                <configuration>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <!-- Java compiler plugin -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.2</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.7</version>
            </plugin>
        </plugins>
    </build>

</project>

Main class:

package com.liuzhuo.udf;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MyFilterFunction {

    public static void main(String[] args) throws Exception {

        // 1) Get the execution environment
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2) Read data from a socket: connect to port 9000 on localhost
        DataStreamSource<String> streamSource = environment.socketTextStream("localhost", 9000);

        // 3) Apply the filter
        streamSource.filter(new UserFliterFunction())
                .print();

        environment.execute("Custom filter function");

    }
}
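
The main class above refers to UserFliterFunction, whose source is not shown in this post. Presumably it implements FilterFunction<String> and its filter method returns true for abnormal temperatures. The sketch below is not that class: it isolates the likely check in a plain static method so it can run without Flink on the classpath. The names TemperatureCheck and isAbnormal, and the decision to drop malformed lines instead of failing, are assumptions made for illustration.

```java
// Hypothetical helper: the check that a FilterFunction<String> such as
// UserFliterFunction would presumably perform inside its filter method.
final class TemperatureCheck {

    static final double MIN_NORMAL = 36.3;
    static final double MAX_NORMAL = 37.2;

    private TemperatureCheck() {}

    /** Returns true when a "name,temperature,location" line carries an abnormal temperature. */
    static boolean isAbnormal(String line) {
        if (line == null || line.isEmpty()) {
            return false; // nothing to report for empty input
        }
        String[] fields = line.split(",");
        if (fields.length < 2) {
            return false; // assumption: lines without a temperature field are dropped
        }
        double temp;
        try {
            temp = Double.parseDouble(fields[1].trim());
        } catch (NumberFormatException e) {
            return false; // assumption: non-numeric temperatures are dropped as well
        }
        return temp < MIN_NORMAL || temp > MAX_NORMAL;
    }

    public static void main(String[] args) {
        // Sample lines are made up for illustration.
        System.out.println(isAbnormal("gakki,36.5,Beijing"));    // false
        System.out.println(isAbnormal("zhangsan,38.5,Beijing")); // true
    }
}
```

Inside a FilterFunction<String>, the filter method could then simply return TemperatureCheck.isAbnormal(value).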

If running it produces a class-not-found exception like this:

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/flink/streaming/api/environment/StreamExecutionEnvironment
    at com.liuzhuo.udf.MyFilterFunction.main(MyFilterFunction.java:11)
Caused by: java.lang.ClassNotFoundException: org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:355)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    ... 1 more

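The cause is that the pom declares flink-streaming-java with provided scope, so the IDE leaves it off the runtime classpath. Open the Run menu in IDEA and choose Edit Configurations; the usual fix there is to enable the option that includes dependencies with "Provided" scope.

Run it again and it works.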


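Open a terminal and run: nc -lk 9000

This command listens on local port 9000. The Flink job connects to that port, and every line typed into the terminal is sent to the job.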

Now type lines in the format: name,temperature,location

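Explanation:

When gakki's temperature is entered and it is normal, nothing is printed. When Zhang San's temperature is entered and it is outside the normal range, that record is printed.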
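
To make that behavior concrete, here is a small sketch that applies the same rule to two sample lines using plain Java streams instead of a running Flink job. The input values, the class name SampleRun, and the method printedLines are all made up for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

final class SampleRun {

    // Same rule as the job: keep only lines whose temperature is outside 36.3 to 37.2.
    static boolean isAbnormal(String line) {
        double temp = Double.parseDouble(line.split(",")[1]);
        return temp < 36.3 || temp > 37.2;
    }

    // Returns the lines that would survive the filter and reach print().
    static List<String> printedLines(List<String> input) {
        return input.stream()
                .filter(SampleRun::isAbnormal)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Hypothetical input typed into the nc terminal.
        List<String> typed = Arrays.asList(
                "gakki,36.5,Beijing",
                "zhangsan,38.5,Beijing");
        printedLines(typed).forEach(System.out::println);
        // prints: zhangsan,38.5,Beijing
    }
}
```

In the real job, print() may also prefix each line with a subtask index when the parallelism is greater than 1.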

Implementing it with an anonymous class

Modify the main class:

package com.liuzhuo.udf;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MyFilterFunction {

    public static void main(String[] args) throws Exception {

        // 1) Get the execution environment
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2) Read data from a socket: connect to port 9000 on localhost
        DataStreamSource<String> streamSource = environment.socketTextStream("localhost", 9000);

        // 3) Apply the filter
        streamSource.filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String value) throws Exception {
                if (value == null || "".equals(value)) {
                    return false;
                }
                // Split on commas: name,temperature,location
                String[] text = value.split(",");
                String tempStr = text[1];
                // Parse as double so the boundary values 36.3 and 37.2 count as normal
                double temp = Double.parseDouble(tempStr);
                if (temp >= 36.3 && temp <= 37.2) {
                    return false;
                } else {
                    // Keep travelers whose temperature is abnormal
                    return true;
                }
            }
        }).print();

        environment.execute("Custom filter function");
    }
}
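
One detail in the range check deserves a closer look. The literals 36.3 and 37.2 are doubles, so the type used to parse the temperature matters: a float value is widened to double before the comparison, and the float nearest to 36.3 is slightly below the double 36.3, while the float nearest to 37.2 is slightly above the double 37.2. The sketch below compares both parsing choices on the boundary values. The class and method names are hypothetical.

```java
final class BoundaryCheck {

    // Parses as float; the value is widened to double for the comparison.
    static boolean normalAsFloat(String s) {
        float temp = Float.parseFloat(s);
        return temp >= 36.3 && temp <= 37.2;
    }

    // Parses as double; the value and the literals share the same precision.
    static boolean normalAsDouble(String s) {
        double temp = Double.parseDouble(s);
        return temp >= 36.3 && temp <= 37.2;
    }

    public static void main(String[] args) {
        for (String s : new String[] {"36.3", "36.5", "37.2"}) {
            System.out.println(s + " float=" + normalAsFloat(s) + " double=" + normalAsDouble(s));
        }
        // prints:
        // 36.3 float=false double=true
        // 36.5 float=true double=true
        // 37.2 float=false double=true
    }
}
```

With float parsing, a traveler at exactly 36.3 or 37.2 would be reported as abnormal, so parsing with Double.parseDouble is the safer choice when the bounds are meant to be inclusive.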

Lambda expression

Main class:

package com.liuzhuo.udf;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MyFilterFunction {

    public static void main(String[] args) throws Exception {

        // 1) Get the execution environment
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2) Read data from a socket: connect to port 9000 on localhost
        DataStreamSource<String> streamSource = environment.socketTextStream("localhost", 9000);

        // 3) Apply the filter
        streamSource.filter(value -> {
            if (value == null || "".equals(value)) {
                return false;
            }
            // Split on commas: name,temperature,location
            String[] text = value.split(",");
            String tempStr = text[1];
            // Parse as double so the boundary values 36.3 and 37.2 count as normal
            double temp = Double.parseDouble(tempStr);
            if (temp >= 36.3 && temp <= 37.2) {
                return false;
            } else {
                // Keep travelers whose temperature is abnormal
                return true;
            }
        }).print();

        environment.execute("Custom filter function");
    }
}


  When reposting, please credit: 解忧杂货店, Flink UDF Functions
