forked from confluentinc/examples
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathstatements.sql
37 lines (29 loc) · 2.98 KB
/
statements.sql
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
--The STREAM and TABLE names are prefixed with `ksql_` so that this demo can run concurrently
--with the Kafka Streams Music Demo Java application without conflicting names
--Source stream over the play-events Kafka topic, a feed of song plays generated by KafkaMusicExampleDriver
--No columns are declared here, so the schema is presumably resolved from the topic's registered Avro schema
CREATE STREAM ksql_playevents
  WITH (
    KAFKA_TOPIC = 'play-events',
    VALUE_FORMAT = 'AVRO'
  );
--Keep only the play events whose duration is at least 30 seconds
--DURATION is compared against 30000 (presumably milliseconds); the comparison is inclusive
--so that a play lasting exactly 30 seconds is accepted, as ">= 30 seconds" requires
CREATE STREAM ksql_playevents_min_duration AS
  SELECT *
  FROM ksql_playevents
  WHERE DURATION >= 30000;
--Table over the song-feed Kafka topic, which contains every song available in the streaming service
--(generated by KafkaMusicExampleDriver). The key is declared as a BIGINT ROWKEY and the KEY property
--names the ID value column
CREATE TABLE ksql_song (
    ROWKEY BIGINT KEY
  )
  WITH (
    KAFKA_TOPIC = 'song-feed',
    VALUE_FORMAT = 'AVRO',
    KEY = 'ID'
  );
--Enrich every qualifying play with its song details; the result is used later for charting
--The LEFT JOIN keeps a play even when no song row matches its SONG_ID
--KEYCOL is a constant (1) added to each row so that a global view across multiple partitions
--can be built on it later (https://github.com/confluentinc/ksql/issues/1053)
CREATE STREAM ksql_songplays AS
  SELECT
    plays.SONG_ID AS ID,
    ALBUM,
    ARTIST,
    NAME,
    GENRE,
    DURATION,
    1 AS KEYCOL
  FROM ksql_playevents_min_duration AS plays
  LEFT JOIN ksql_song AS songs
    ON plays.SONG_ID = songs.ID;
--Count the plays of each song within tumbling windows of 30 seconds
CREATE TABLE ksql_songplaycounts30 AS
  SELECT
    ID,
    NAME,
    GENRE,
    KEYCOL,
    COUNT(*) AS COUNT
  FROM ksql_songplays
  WINDOW TUMBLING (SIZE 30 SECOND)
  GROUP BY
    ID,
    NAME,
    GENRE,
    KEYCOL;
--Convert TABLE to STREAM: declare a stream over the KSQL_SONGPLAYCOUNTS30 topic
--(presumably the topic backing the ksql_songplaycounts30 table) with the same five columns
CREATE STREAM ksql_songplaycounts30stream (
    ID BIGINT,
    NAME VARCHAR,
    GENRE VARCHAR,
    KEYCOL INT,
    COUNT BIGINT
  )
  WITH (
    KAFKA_TOPIC = 'KSQL_SONGPLAYCOUNTS30',
    VALUE_FORMAT = 'AVRO'
  );
--Repartition the 30-second counts on the constant KEYCOL field so that all rows land in a single partition
--NOTE(review): the purpose of the ROWTIME IS NOT NULL predicate is not stated here - confirm before changing it
CREATE STREAM ksql_songplaycounts30streampart AS
  SELECT *
  FROM ksql_songplaycounts30stream
  WHERE ROWTIME IS NOT NULL
  PARTITION BY KEYCOL;
--Count the plays of each song over all time (no window is applied)
CREATE TABLE ksql_songplaycounts AS
  SELECT
    ID,
    NAME,
    GENRE,
    KEYCOL,
    COUNT(*) AS COUNT
  FROM ksql_songplays
  GROUP BY
    ID,
    NAME,
    GENRE,
    KEYCOL;
--Convert TABLE to STREAM: declare a stream over the KSQL_SONGPLAYCOUNTS topic
--(presumably the topic backing the ksql_songplaycounts table) with the same five columns
CREATE STREAM ksql_songplaycountsstream (
    ID BIGINT,
    NAME VARCHAR,
    GENRE VARCHAR,
    KEYCOL INT,
    COUNT BIGINT
  )
  WITH (
    KAFKA_TOPIC = 'KSQL_SONGPLAYCOUNTS',
    VALUE_FORMAT = 'AVRO'
  );
--Repartition the all-time counts on the constant KEYCOL field so that all rows land in a single partition
--NOTE(review): the purpose of the ROWTIME IS NOT NULL predicate is not stated here - confirm before changing it
CREATE STREAM ksql_songplaycountsstreampart AS
  SELECT *
  FROM ksql_songplaycountsstream
  WHERE ROWTIME IS NOT NULL
  PARTITION BY KEYCOL;
--Top five song play counts for all time, computed from ksql_songplaycountsstreampart
--At this time, `TOPK` cannot sort by one column while returning the value of another
--(https://github.com/confluentinc/ksql/issues/403), so the result holds only the counts,
--not the names of the songs those counts belong to
CREATE TABLE ksql_top5 AS
  SELECT
    KEYCOL,
    TOPK(COUNT, 5)
  FROM ksql_songplaycountsstreampart
  GROUP BY KEYCOL;
--Top five song play counts for each genre, computed from the all-time counts in ksql_songplaycountsstreampart
--No WINDOW clause is applied here, and only the counts are selected, not the song names
CREATE TABLE ksql_top5bygenre AS
  SELECT
    GENRE,
    TOPK(COUNT, 5)
  FROM ksql_songplaycountsstreampart
  GROUP BY GENRE;