KSQL is a SQL streaming engine for Apache Kafka

Shubham Dangare
2 min readNov 27, 2019

It provides an easy-to-use, yet powerful interactive SQL interface for stream processing on Kafka, without the need to write code in a programming language such as Java or Python. KSQL is scalable, elastic, fault-tolerant, and it supports a wide range of streaming operations, including data filtering, transformations, aggregations, joins, windowing, and sessionization.

But what is streaming ??

In-stream processing, data are continuously processed as new data become available for analyzing. These data are processed sequentially as an unbounded stream and may be pulled in by a “listening” analytics system as record in key-value pair .

Few key features of ksql processing

  1. Per record stream processing with millisecond latency
  2. Data filtering
  3. Data transformation and conversions
  4. Data enrichment with join
  5. Data manipulation with scalar functions
  6. Data analysis with stateful processing, aggregation and windowing operation

Client application can use the Kafka Streams API for Stream processing on Kafka topic data and underneath Kafka Stream API are Kafka producer and consumer

KSQL queries do stream processing which is an abstraction of Kafka stream API which can consume stream data that are structured eg Avro, JSON, DELIMITED

Now let’s take a look at how we can query in ksql

  1. Start you confluent
  2. Open KSQL CLI with help of <confluent-home>/bin/ksql
  3. Create a STREAM pageviews_original from the Kafka topic pageviews, specifying the value_format of DELIMITED. Describe the new STREAM. Notice that KSQL created additional columns called ROWTIME, which corresponds to the Kafka message timestamp, and ROWKEY, which corresponds to the Kafka message key

ksql> CREATE STREAM pageviews_original (viewtime bigint, userid varchar, pageid varchar) WITH (kafka_topic=’pageviews’,value_format=’DELIMITED’);

  1. Create a TABLE users_original from the Kafka topic users, specifying the value_format of JSON. Describe the new TABLE.

ksql> CREATE TABLE users_original (registertime bigint, gender varchar,regionid varchar, userid varchar) WITH (kafka_topic=’users’,value_format=’JSON’);

  1. Show stream and table using

SHOW STREAMS;

SHOW TABLES;

KStream vs KTable

A Stream is a sequence of structured data, once an event was introduced into a

stream it is immutable, meaning that it can’t be updated or deleted

A Table, on the other hand, represents the current situation based on the events

coming from a stream and they are mutable.

  1. Create a persistent query by using the CREATE STREAM keywords to precede the SELECT statement

ksql> CREATE STREAM pageviews_female AS SELECT users_original.userid AS userid, pageid, regionid, gender FROM pageviews_original LEFT JOIN users_original ON pageviews_original.userid = users_original.userid WHERE gender = ‘FEMALE’;

  1. Write KSQL to output topic

CREATE STREAM pageviews_female_like_89 WITH (kafka_topic=’pageviews_enriched_r8_r9', value_format=’DELIMITED’) AS SELECT * FROM pageviews_female WHERE regionid LIKE ‘%_8’ OR regionid LIKE ‘%_9’

--

--