联 系 我 们
售前咨询
售后咨询
微信关注:星环科技服务号
更多联系方式 >
5.4 WINDOW 窗口
更新时间:6/20/2023, 8:07:30 AM

Slipstream里的窗口(STREAMWINDOW)跟SQL标准的窗口不同,在Slipstream中 STREAMWINDOW 主要作为时间分割的单位。用户的流应用一般会对一定的时间区间做多表关联、聚合或者统计。

Slipstream中窗口切分的方式分为两种:

  • 系统时间(System Time)切分:以流处理引擎处理的时间为基准切分窗口。

  • 事件时间(Event Time)切分:将数据中的某指定个字段作为时间字段切分窗口,详情请参考事件时间

SLIDE WINDOW 滑动窗口

滑动窗口需要由两个量来定义:窗口长度(LENGTH)和滑动间隔(SLIDE)。滑动窗口是指按照一定的 SLIDE 向未来滑动的长度为 LENGTH 的窗口。相邻两个窗口之间可能会有重叠的部分。例如:如果窗口长度为2s,滑动间隔为1s,那么第一个窗口为[0s, 2s),第二个窗口为[1s, 3s),第三个窗口为[2s, 4s),以此类推。

例 33. 系统时间切分滑动窗口
CREATE STREAM s1(id INT, name STRING, ts TIMESTAMP) TBLPROPERTIES("kafka.broker.list"="tw-node127:9092","kafka.zookeeper"="tw-node127:2181");
CREATE STREAM window_stream AS SELECT * FROM s1
  STREAMWINDOW w1 AS (LENGTH '2' SECOND SLIDE '1' SECOND);
复制

以系统时间切分,窗口大小2s (LENGTH '2' SECOND),滑动间隔为1s(SLIDE '1' SECOND)。

TUMBLING WINDOW 跳动窗口

当窗口间隔和滑动间隔相同,滑动窗口就退化为跳动窗口。换句话说,跳动窗口就是滑动窗口 LENGTH = SLIDE 的特例。所以跳动窗口只需要一个时间长度(INTERVAL)即可定义,它既是窗口长度也是滑动间隔。例如:INTERVAL 为2s跳动窗口第一个区间为[0s, 2s),第二个区间为[2s, 4s),第三个区间为[4s, 6s),以此类推。

例 34. 系统时间切分跳动窗口
CREATE STREAM window_stream AS SELECT * FROM s1 STREAMWINDOW w1 AS (INTERVAL '2' SECOND);
复制

创建一个流之后,以2s为间隔(INTERVAL '2' SECOND)的跳动时间窗口对它进行查询,窗口切分的依据是系统时间。

对Window Stream的聚合操作

Slipstream支持对window stream的聚合操作,我们建议先建window stream再做聚合

聚合操作
CREATE STREAM s1(id INT, value INT, ts TIMESTAMP) TBLPROPERTIES("kafka.zookeeper"="tw-node127:2181");

CREATE STREAM ws1 AS SELECT * FROM s1 STREAMWINDOW w1 AS (LENGTH '2' SECOND SLIDE '1' SECOND); --先建window stream

INSERT INTO t1 SELECT id, SUM(value) FROM ws1 GROUP BY id; -- 然后进行聚合操作
复制

事件驱动模式下, 只在 window stream 下支持聚合操作

事件时间窗口切分

您可以通过在 Input stream 的 tblproperties 中指定时间字段(timefield)来实现事件时间切分,使用示例如下:

--输入流:指定 Kafka 源的一些信息
drop stream if exists basic_stream;
create STREAM basic_stream(ftime string,city string, url string,amt int)
row format delimited fields terminated by ','
tblproperties(
"topic"="test",
"kafka.zookeeper"="localhost:2181",
"kafka.broker.list"="localhost:9092",
"timefield"="ftime", --指定事件时间字段
"timeformat"="yyyy-MM-dd HH:mm:ss");

--derive stream:定义计算逻辑,但没有真正开始计算
drop stream if exists window_stream;
create stream window_stream as select city,url,window_time() as window_time,sum(amt) from basic_stream streamwindow w1 as (interval '1' minute)  group by city,url;


--输出表
drop table output_table;
create table output_table(city string, url string,window_time TIMESTAMP,uv_sum int);

--流任务
DROP STREAMJOB streamjob_grouby_job;
create streamjob streamjob_grouby_job as ("insert into output_table  select  * from window_stream")
jobproperties(
"morphling.result.auto.flush"="true", --测试用,生产不建议打开,影响性能
"morphling.job.enable.checkpoint"="true", --打开Checkpoint,保证任务高可用,打开后会保存状态;如果要清除状态,请重建任务,
"streamsql.use.eventtime"="true" --使用事件时间
);

--启动流任务
start streamjob streamjob_grouby_job;

--停止流任务:stop streamjob streamjob_grouby_job;

--往kafka test 发送数据
2016-02-18 15:18:38,shanghai,u14,8
2016-02-18 15:18:39,shanghai,u8,10
2016-02-18 15:18:39,hangzhou,u16,10
2016-02-18 15:18:40,shanghai,u6,10
2016-02-18 15:18:41,shanghai,u8,100
2016-02-18 15:18:42,hangzhou,u17,10

--此时查询输出表是没有结果的,因为还不满1分钟
select * from output_table order by city,url;

--再发送一条19分的数据
2016-02-18 15:19:44,shanghai,u28,10

--此时查询输出表有前一分钟的数据了
select * from output_table order by city,url;
+-----------+------+--------------------------+---------+
|   city    | url  |       window_time        | uv_sum  |
+-----------+------+--------------------------+---------+
| hangzhou  | u16  | 2016-02-18 15:18:59.999  | 10      |
| hangzhou  | u17  | 2016-02-18 15:18:59.999  | 10      |
| shanghai  | u14  | 2016-02-18 15:18:59.999  | 8       |
| shanghai  | u6   | 2016-02-18 15:18:59.999  | 10      |
| shanghai  | u8   | 2016-02-18 15:18:59.999  | 110     |
+-----------+------+--------------------------+---------+

--发送一条过期数据
2016-02-18 15:17:45,shanghai,u1,10

--再发送一条下一个窗口的数据
2016-02-18 15:20:37,shanghai,u2,10

--再查询已经有u28的数据,但是没有 u1 的结果,因为这是一条过期数据
select * from output_table order by city,url;
+-----------+------+--------------------------+---------+
|   city    | url  |       window_time        | uv_sum  |
+-----------+------+--------------------------+---------+
| hangzhou  | u16  | 2016-02-18 15:18:59.999  | 10      |
| hangzhou  | u17  | 2016-02-18 15:18:59.999  | 10      |
| shanghai  | u14  | 2016-02-18 15:18:59.999  | 8       |
| shanghai  | u28  | 2016-02-18 15:19:59.999  | 10      |
| shanghai  | u6   | 2016-02-18 15:18:59.999  | 10      |
| shanghai  | u8   | 2016-02-18 15:18:59.999  | 110     |
+-----------+------+--------------------------+---------+
6 rows selected (0.346 seconds)
复制