重庆分公司,新征程启航
为企业提供网站建设、域名注册、服务器等服务
本篇内容主要讲解“flink streaming sql怎么使用”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“flink streaming sql怎么使用”吧!
我们提供的服务有:网站设计、成都网站设计、微信公众号开发、网站优化、网站认证、阿尔山ssl等。为成百上千家企事业单位解决了网站和推广的问题。提供周到的售前咨询和贴心的售后服务,是有科学管理、有技术的阿尔山网站制作公司
SQL,Structured Query Language:结构化查询语言,作为一个通用、流行的查询语言,不仅仅是在传统的数据库,在大数据领域也变得越来越流行,hive、spark、kafka、flink等大数据组件都支持sql的查询,使用sql可以让一些不懂这些组件原理的人,轻松的来操作,大大的降低了使用的门槛,今天我们先来简单的讲讲在flink的流处理中如何使用sql.
在flink的流处理中,要使用sql,需要首先构造一个StreamTableEnvironment对象,方法比较简单。
sql中用到的catalog、table、function等都需要注册到StreamTableEnvironment才能使用。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
接下来要将相应的表的信息注册到StreamTableEnvironment对象中,有以下几种方式可以选择.
以下的代码是基于flink 1.10.0版本进行讲解的,各个版本略有不同。
//使用flink的二元组,这个时候需要自定义字段名称
Tuple2 tuple2 = Tuple2.of("jack", 10);
//构造一个Tuple的DataStream
DataStream> tupleStream = env.fromElements(tuple2);
// 注册到StreamTableEnvironment,并且指定对应的字段名
tableEnv.createTemporaryView("usersTuple", tupleStream, "name,age");
//执行一个sql 查询. 然后返回一个table对象
Table table = tableEnv.sqlQuery("select name,age from usersTuple");
// 将table对象转成flink的DataStream,以便后续操作,我们这里将其输出
tableEnv.toAppendStream(table, Row.class).print();
flink中提供的元组Tuple是有限制的,最多到Tuple25,所以如果我们有更多的字段,可以选择使用flink中的Row对象.
//使用Row
Row row = new Row(2);
row.setField(0, "zhangsan");
row.setField(1, 20);
DataStream rowDataStream = env.fromElements(row);
tableEnv.createTemporaryView("usersRow", rowDataStream, "name,age");
Table tableRow = tableEnv.sqlQuery("select name,age from usersRow");
tableEnv.toAppendStream(tableRow, Row.class).print();
首先定一个pojo类
public static class User{
private String name;
private int age;
public String getName(){
return name;
}
public void setName(String name){
this.name = name;
}
public int getAge(){
return age;
}
public void setAge(int age){
this.age = age;
}
}
定义这个pojo类是要符合flink的序列化规则,是有一定要求的,具体的可以参考【1】:
User user = new User();
user.setName("Tom");
user.setAge(20);
DataStream userDataStream = env.fromElements(user);
tableEnv.createTemporaryView("usersPojo", userDataStream);
Table tablePojo = tableEnv.sqlQuery("select name,age from usersPojo");
tableEnv.toAppendStream(tablePojo, Row.class).print();
如果使用的是java pojo类型的DataStream,就不用声明字段名称了,flink会自动解析pojo类中的字段名称和类型来作为table的字段和类型。
//连接外部系统,比如文件,kafka等
Schema schema = new Schema()
.field("name", DataTypes.STRING())
.field("age", DataTypes.INT());
tableEnv.connect(new FileSystem().path("...."))
.withFormat(new Csv())
.withSchema(schema)
.createTemporaryTable("usersFile");
Table tableFile = tableEnv.sqlQuery("select name,age from usersFile");
tableEnv.toAppendStream(tableFile, Row.class).print();
使用外部存储的时候需要指定以下对象:
到此,相信大家对“flink streaming sql怎么使用”有了更深的了解,不妨来实际操作一番吧!这里是创新互联网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!