兰州
切换分站
免费发布信息
数据中心运维需要具备哪些知识和能力?
    • 联系人:
    • 电话:**** 点击查看完整号码
      • 公司转让壳提醒您:让你提前汇款,或者价格明显低于市价,均有骗子嫌疑,不要轻易相信。
  • 信息详情

Flink 系列文章

一、Flink 专栏

Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。

  • 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。
  • 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。
  • 3、Flik Table API和SQL基础系列 本部分介绍Flink Table Api和SQL的基本用法,比如Table API和SQL创建库、表用法、查询、窗口函数、catalog等等内容。
  • 4、Flik Table API和SQL提高与应用系列 本部分是table api 和sql的应用部分,和实际的生产应用联系更为密切,以及有一定开发难度的内容。
  • 5、Flink 监控系列 本部分和实际的运维、监控工作相关。

二、Flink 示例专栏

Flink 示例专栏是 Flink 专栏的辅助说明,一般不会介绍知识点的信息,更多的是提供一个一个可以具体使用的示例。本专栏不再分目录,通过链接即可看出介绍的内容。

两专栏的所有文章入口点击:Flink 系列文章汇总索引


本文介绍了表的OrderBy、Offset 和 Fetch、insert操作,以示例形式展示每个操作的结果。

如果需要了解更多内容,可以在本人Flink 专栏中了解更新系统的内容。

本文除了maven依赖外,没有其他依赖。

本文更详细的内容可参考文章:

17、Flink 之Table API: Table API 支持的操作(1)

17、Flink 之Table API: Table API 支持的操作(2)

本专题分为以下几篇文章:

【flink番外篇】9、Flink Table API 支持的操作示例(1)-通过Table API和SQL创建表

【flink番外篇】9、Flink Table API 支持的操作示例(2)- 通过Table API 和 SQL 创建视图

【flink番外篇】9、Flink Table API 支持的操作示例(3)- 通过API查询表和使用窗口函数的查询

【flink番外篇】9、Flink Table API 支持的操作示例(4)- Table API 对表的查询、过滤操作

【flink番外篇】9、Flink Table API 支持的操作示例(5)- 表的列操作

【flink番外篇】9、Flink Table API 支持的操作示例(6)- 表的聚合(group by、Distinct、GroupBy/Over Window Aggregation)操作

【flink番外篇】9、Flink Table API 支持的操作示例(7)- 表的join操作(内联接、外联接以及联接自定义函数等)

【flink番外篇】9、Flink Table API 支持的操作示例(8)- 时态表的join(scala版本)

【flink番外篇】9、Flink Table API 支持的操作示例(9)- 表的union、unionall、intersect、intersectall、minus、minusall和in的操作

【flink番外篇】9、Flink Table API 支持的操作示例(10)- 表的OrderBy、Offset 和 Fetch、insert操作

【flink番外篇】9、Flink Table API 支持的操作示例(11)- Group Windows(tumbling、sliding和session)操作

【flink番外篇】9、Flink Table API 支持的操作示例(12)- Over Windows(有界和无界的over window)操作

【flink番外篇】9、Flink Table API 支持的操作示例(13)- Row-based(map、flatmap、aggregate、group window aggregate等)操作

【flink番外篇】9、Flink Table API 支持的操作示例(14)- 时态表的join(java版本)

【flink番外篇】9、Flink Table API 支持的操作示例(1)-完整版

【flink番外篇】9、Flink Table API 支持的操作示例(2)-完整版

一、maven依赖

本文maven依赖参考文章:【flink番外篇】9、Flink Table API 支持的操作示例(1)-通过Table API和SQL创建表 中的依赖,为节省篇幅不再赘述。

二、表的OrderBy, Offset 和 Fetch操作

在批处理模式下,也即有界情况下,order by 可以单独使用,排序也可以是任意字段,与一般数据库的排序结果一样。 在流模式下,也即无界的情况下,order by需要和fetch一起使用,排序字段需要有时间属性,与一般数据库的排序有点差异。

需要说明的是order by 和offset&fetch都可以在批处理模式和流模式下工作。
  • Order By,和 SQL ORDER BY 子句类似。返回跨所有并行分区的全局有序记录。对于无界表,该操作需要对时间属性进行排序或进行后续的 fetch 操作。
  • Offset & Fetch,和 SQL 的 OFFSET 和 FETCH 子句类似。Offset 操作根据偏移位置来限定(可能是已排序的)结果集。Fetch 操作将(可能已排序的)结果集限制为前 n 行。通常,这两个操作前面都有一个排序操作。对于无界表,offset 操作需要 fetch 操作。

具体结果见下面示例

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.row;

import java.time.Duration;
import java.util.Arrays;
import java.util.List;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.tablesql.TestTableAPIJoinOperationDemo2.Order;
import org.tablesql.TestTableAPIJoinOperationDemo2.User;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * @author alanchan
 *
 */
public class TestTableAPIJoinOperationDemo3 {
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class User {
        private long id;
        private String name;
        private double balance;
        private Long rowtime;
    }

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class Order {
        private long id;
        private long user_id;
        private double amount;
        private Long rowtime;
    }

    final static List<User> userList = Arrays.asList(
            new User(1L, "alan", 18, 1698742358391L), 
            new User(2L, "alan", 19, 1698742359396L), 
            new User(3L, "alan", 25, 1698742360407L),
            new User(4L, "alanchan", 28, 1698742361409L), 
            new User(5L, "alanchan", 29, 1698742362424L)
            );

    final static List<Order> orderList = Arrays.asList(
            new Order(1L, 1, 18, 1698742358391L), 
            new Order(2L, 2, 19, 1698742359396L), 
            new Order(3L, 1, 25, 1698742360407L),
            new Order(4L, 3, 28, 1698742361409L), 
            new Order(5L, 1, 29, 1698742362424L),
            new Order(6L, 4, 49, 1698742362424L)
            );

     // 创建输出表
    final static String sinkSql = "CREATE TABLE sink_table (\n" +
            "  id BIGINT,\n" +
            "  user_id BIGINT,\n" +
            "  amount DOUBLE,\n" +
            "  rowtime BIGINT\n" +
            ") WITH (\n" +
            "  'connector' = 'print'\n" +
            ")";

    /**
     * Order By
     * 和 SQL ORDER BY 子句类似。返回跨所有并行分区的全局有序记录。
     * 对于无界表,该操作需要对时间属性进行排序或进行后续的 fetch 操作。
     * Sort on a non-time-attribute field is not supported.
     * 
     * Offset & Fetch
     * 和 SQL 的 OFFSET 和 FETCH 子句类似。
     * Offset 操作根据偏移位置来限定(可能是已排序的)结果集。
     * Fetch 操作将(可能已排序的)结果集限制为前 n 行。
     * 通常,这两个操作前面都有一个排序操作。对于无界表,offset 操作需要 fetch 操作。
     * 
     * @throws Exception
     */
    static void testOrderByWithUnbounded() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
        env.setParallelism(1);

        DataStream<User> users = env.fromCollection(userList)
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                        .<User>forBoundedOutOfOrderness(Duration.ofSeconds(1))
                        .withTimestampAssigner((user, recordTimestamp) -> user.getRowtime())
                        );
        Table usersTable = tenv.fromDataStream(users, $("id"), $("name"), $("balance"),$("rowtime").rowtime());
        usersTable.printSchema();

        // 从已排序的结果集中返回前3条记录
        Table result = usersTable.orderBy($("rowtime").desc()).fetch(3);

        DataStream<Tuple2<Boolean, Row>> resultDS = tenv.toRetractStream(result, Row.class);
//      resultDS.print();
//      (true,+I[1, alan, 18.0, 2023-10-31T08:52:38.391])
//      (true,+I[2, alan, 19.0, 2023-10-31T08:52:39.396])
//      (true,+I[3, alan, 25.0, 2023-10-31T08:52:40.407])
//      (false,-D[1, alan, 18.0, 2023-10-31T08:52:38.391])
//      (true,+I[4, alanchan, 28.0, 2023-10-31T08:52:41.409])
//      (false,-D[2, alan, 19.0, 2023-10-31T08:52:39.396])
//      (true,+I[5, alanchan, 29.0, 2023-10-31T08:52:42.424])

        // 从已排序的结果集中返回跳过2条记录之后的所有记录
        Table result2 = usersTable.orderBy($("rowtime").desc()).offset(2).fetch(4);

        DataStream<Tuple2<Boolean, Row>> result2DS = tenv.toRetractStream(result2, Row.class);
        result2DS.print();
//      (true,+I[1, alan, 18.0, 2023-10-31T08:52:38.391])
//      (false,-U[1, alan, 18.0, 2023-10-31T08:52:38.391])
//      (true,+U[2, alan, 19.0, 2023-10-31T08:52:39.396])
//      (true,+I[1, alan, 18.0, 2023-10-31T08:52:38.391])
//      (false,-U[2, alan, 19.0, 2023-10-31T08:52:39.396])
//      (true,+U[3, alan, 25.0, 2023-10-31T08:52:40.407])
//      (false,-U[1, alan, 18.0, 2023-10-31T08:52:38.391])
//      (true,+U[2, alan, 19.0, 2023-10-31T08:52:39.396])
//      (true,+I[1, alan, 18.0, 2023-10-31T08:52:38.391])

        env.execute();
    }

    /**
     * 和 SQL ORDER BY 子句类似。返回跨所有并行分区的全局有序记录。
     * 对于无界表,该操作需要对时间属性进行排序或进行后续的 fetch 操作。
     * 这个和一般的查询数据库的结果比较类似
     * 
     * @throws Exception
     */
    static void testOrderByWithBounded() throws Exception {
        EnvironmentSettings env = EnvironmentSettings.newInstance().inBatchMode() .build();
        TableEnvironment tenv = TableEnvironment.create(env);

        Table ordersTable = tenv.fromValues(
                DataTypes.ROW(
                        DataTypes.FIELD("id", DataTypes.BIGINT()),
                        DataTypes.FIELD("user_id", DataTypes.BIGINT()),
                        DataTypes.FIELD("amount", DataTypes.BIGINT()),
                        DataTypes.FIELD("rowtime", DataTypes.BIGINT())
                        ),
                Arrays.asList(
                        row(1L, 1, 18, 1698742358391L), 
                        row(2L, 2, 19, 1698742359396L), 
                        row(3L, 1, 25, 1698742360407L),
                        row(4L, 3, 28, 1698742361409L), 
                        row(5L, 1, 29, 1698742362424L),
                        row(6L, 4, 49, 1698742362424L)
                        ));

        Table left = ordersTable.select($("id"), $("user_id"),$("amount"),$("rowtime"));

        Table orderByResult = left.orderBy($("amount").desc());

        tenv.createTemporaryView("order_union_t", orderByResult);

        Table result = tenv.sqlQuery("select * from order_union_t");

        //输出表
        tenv.executeSql(sinkSql);
//              +I[6, 4, 49.0, 1698742362424]
//              +I[5, 1, 29.0, 1698742362424]
//              +I[4, 3, 28.0, 1698742361409]
//              +I[3, 1, 25.0, 1698742360407]
//              +I[2, 2, 19.0, 1698742359396]
//              +I[1, 1, 18.0, 1698742358391]

        result.executeInsert("sink_table");
    }

    /**
     * @param args
     * @throws Exception 
     */
    public static void main(String[] args) throws Exception {
//      testOrderByWithUnbounded();
        testOrderByWithBounded();
    }

}

三、表的insert操作

和 SQL 查询中的 INSERT INTO 子句类似,该方法执行对已注册的输出表的插入操作。 insertInto() 方法会将 INSERT INTO 转换为一个 TablePipeline。 该数据流可以用 TablePipeline.explain() 来解释,用 TablePipeline.execute() 来执行。

输出表必须已注册在 TableEnvironment中。此外,已注册表的 schema 必须与查询中的 schema 相匹配。

该示例仅仅展示一个方法,运行环境和其他的示例一致,并且本示例仅仅展示的是insert Into,也可以使用execute Insert方法,在其他示例中有展示其使用。

static void testInsert() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

        DataStream<Order> orderA = env.fromCollection(orderList);
        DataStream<Order> orderB = env.fromCollection(
                Arrays.asList(
                        new Order(10L, 1, 18, 1698742358391L), 
                        new Order(16L, 4, 49, 1698742362424L)
                        )
                );

        Table tableA = tenv.fromDataStream(orderA, $("id"), $("user_id"), $("amount"),$("rowtime"));
        Table tableB = tenv.fromDataStream(orderB, $("id"), $("user_id"), $("amount"),$("rowtime"));
        tenv.executeSql(sinkSql);

        tableA.insertInto("sink_table").execute();
        tableB.insertInto("sink_table").execute();
//              +I[1, 1, 18.0, 1698742358391]
//              +I[2, 2, 19.0, 1698742359396]
//              +I[3, 1, 25.0, 1698742360407]
//              +I[4, 3, 28.0, 1698742361409]
//              +I[5, 1, 29.0, 1698742362424]
//              +I[6, 4, 49.0, 1698742362424]
//              +I[10, 1, 18.0, 1698742358391]
//              +I[16, 4, 49.0, 1698742362424]

    }

以上,本文介绍了表的OrderBy、Offset 和 Fetch、insert操作,以示例形式展示每个操作的结果。

如果需要了解更多内容,可以在本人Flink 专栏中了解更新系统的内容。

本文更详细的内容可参考文章:

17、Flink 之Table API: Table API 支持的操作(1)

17、Flink 之Table API: Table API 支持的操作(2)

本专题分为以下几篇文章:

【flink番外篇】9、Flink Table API 支持的操作示例(1)-通过Table API和SQL创建表

【flink番外篇】9、Flink Table API 支持的操作示例(2)- 通过Table API 和 SQL 创建视图

【flink番外篇】9、Flink Table API 支持的操作示例(3)- 通过API查询表和使用窗口函数的查询

【flink番外篇】9、Flink Table API 支持的操作示例(4)- Table API 对表的查询、过滤操作

【flink番外篇】9、Flink Table API 支持的操作示例(5)- 表的列操作

【flink番外篇】9、Flink Table API 支持的操作示例(6)- 表的聚合(group by、Distinct、GroupBy/Over Window Aggregation)操作

【flink番外篇】9、Flink Table API 支持的操作示例(7)- 表的join操作(内联接、外联接以及联接自定义函数等)

【flink番外篇】9、Flink Table API 支持的操作示例(8)- 时态表的join(scala版本)

【flink番外篇】9、Flink Table API 支持的操作示例(9)- 表的union、unionall、intersect、intersectall、minus、minusall和in的操作

【flink番外篇】9、Flink Table API 支持的操作示例(10)- 表的OrderBy、Offset 和 Fetch、insert操作

【flink番外篇】9、Flink Table API 支持的操作示例(11)- Group Windows(tumbling、sliding和session)操作

【flink番外篇】9、Flink Table API 支持的操作示例(12)- Over Windows(有界和无界的over window)操作

【flink番外篇】9、Flink Table API 支持的操作示例(13)- Row-based(map、flatmap、aggregate、group window aggregate等)操作

【flink番外篇】9、Flink Table API 支持的操作示例(14)- 时态表的join(java版本)

【flink番外篇】9、Flink Table API 支持的操作示例(1)-完整版

【flink番外篇】9、Flink Table API 支持的操作示例(2)-完整版


联系我时,请说是在公司转让壳看到的,谢谢!

  • 您可能感兴趣
查看更多
    小贴士:本页信息由用户及第三方发布,真实性、合法性由发布人负责,请仔细甄别。