-- Input stream backed by Kafka topic test. Every message is one comma-separated
-- record of the form ftime,city,url,amt. The ftime column is registered as the
-- time field and is read with the pattern yyyy-MM-dd HH:mm:ss.
drop stream if exists basic_stream;
create stream basic_stream (
  ftime string,
  city  string,
  url   string,
  amt   int
)
row format delimited fields terminated by ','
tblproperties (
  "topic" = "test",
  "kafka.zookeeper" = "localhost:2181",
  "kafka.broker.list" = "localhost:9092",
  "timefield" = "ftime",
  "timeformat" = "yyyy-MM-dd HH:mm:ss"
);
-- Derived stream: sum of amt per city and url over one-minute windows
-- (streamwindow w1), with the value of window_time() exposed as column
-- window_time.
drop stream if exists window_stream;
create stream window_stream as
select
  city,
  url,
  window_time() as window_time,
  sum(amt)
from basic_stream
  streamwindow w1 as (interval '1' minute)
group by
  city,
  url;
-- Sink table for the windowed sums. Its column order follows the select list
-- of window_stream, because the stream job fills it with a positional select *.
-- The drop uses if exists, like the drop stream statements earlier in this
-- script, so the script can be run on an environment where the table has not
-- been created yet.
-- NOTE(review): uv_sum is declared int while it receives sum(amt) - confirm
-- that int is wide enough for the expected totals.
drop table if exists output_table;
create table output_table (
  city        string,
  url         string,
  window_time timestamp,
  uv_sum      int
);
-- Stream job that runs the insert from window_stream into output_table.
-- Three job properties are switched on: result auto flush, checkpointing and
-- use of event time. Their exact semantics are defined by the engine.
-- Any previous definition of the job is dropped first, then the job is
-- created and started.
drop streamjob streamjob_grouby_job;
create streamjob streamjob_grouby_job as (
  "insert into output_table select * from window_stream"
)
jobproperties (
  "morphling.result.auto.flush" = "true",
  "morphling.job.enable.checkpoint" = "true",
  "streamsql.use.eventtime" = "true"
);
start streamjob streamjob_grouby_job;
2016-02-18 15:18:38,shanghai,u14,8
2016-02-18 15:18:39,shanghai,u8,10
2016-02-18 15:18:39,hangzhou,u16,10
2016-02-18 15:18:40,shanghai,u6,10
2016-02-18 15:18:41,shanghai,u8,100
2016-02-18 15:18:42,hangzhou,u17,10
-- List the sink table after the first batch of records, ordered by city then url.
select city, url, window_time, uv_sum
from output_table
order by city, url;
2016-02-18 15:19:44,shanghai,u28,10
-- List the sink table again after the 15:19:44 record, ordered by city then url.
select city, url, window_time, uv_sum
from output_table
order by city, url;
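+----------+-----+-------------------------+--------+
| city     | url | window_time             | uv_sum |
+----------+-----+-------------------------+--------+
| hangzhou | u16 | 2016-02-18 15:18:59.999 | 10     |
| hangzhou | u17 | 2016-02-18 15:18:59.999 | 10     |
| shanghai | u14 | 2016-02-18 15:18:59.999 | 8      |
| shanghai | u6  | 2016-02-18 15:18:59.999 | 10     |
| shanghai | u8  | 2016-02-18 15:18:59.999 | 110    |
+----------+-----+-------------------------+--------+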
2016-02-18 15:17:45,shanghai,u1,10
2016-02-18 15:20:37,shanghai,u2,10
-- List the sink table after the 15:17:45 and 15:20:37 records, ordered by city then url.
select city, url, window_time, uv_sum
from output_table
order by city, url;
+----------+-----+-------------------------+--------+
| city     | url | window_time             | uv_sum |
+----------+-----+-------------------------+--------+
| hangzhou | u16 | 2016-02-18 15:18:59.999 | 10     |
| hangzhou | u17 | 2016-02-18 15:18:59.999 | 10     |
| shanghai | u14 | 2016-02-18 15:18:59.999 | 8      |
| shanghai | u28 | 2016-02-18 15:19:59.999 | 10     |
| shanghai | u6  | 2016-02-18 15:18:59.999 | 10     |
| shanghai | u8  | 2016-02-18 15:18:59.999 | 110    |
+----------+-----+-------------------------+--------+
6 rows selected (0.346 seconds)