Guest post originally published on Medium by Hubert Dulay

Gartner States:

“By the end of 2024, 75% of organizations will shift from piloting to operationalizing artificial intelligence (AI), driving a 5 times increase in streaming data and analytics infrastructures.”

https://www.gartner.com/en/newsroom/press-releases/2020-06-22-gartner-identifies-top-10-data-and-analytics-technolo

There are several reasons why data streaming is becoming more popular.

In this post, I'll show you how to quickly build a streaming stack that supports real-time analytics. For a simple setup, we'll use Apache Kafka for event streaming, RisingWave for stream processing, Apache Pinot for real-time analytical serving, and Apache Superset for visualization, all running on a local Kubernetes (kind) cluster.

Choosing The Technologies

Event data preparation can significantly impact the performance of analytical queries. By optimizing the data layout, indexing, and partitioning, you can improve the efficiency of data retrieval and processing. This includes techniques such as data denormalization, columnar storage, and indexing strategies tailored to the analytical workload. Well-prepared data reduces processing time and enables faster insights. Transformation tasks tend to be resource- and compute-intensive, so it's best to complete them as a pre-processing step in the data pipeline, before writing to the data store that serves consumers.
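
As a sketch of what such pre-processing can look like, the hypothetical RisingWave materialized view below (raw_events, event_time, and url are assumed names, not objects created in this post) rolls raw events up into per-URL, per-minute counts before they ever reach the serving store:

-- Pre-aggregate raw events per URL per minute so the OLAP store only
-- receives compact, query-ready rows (illustrative names).
CREATE MATERIALIZED VIEW events_per_url_1min AS
SELECT
    url,
    window_start,
    COUNT(*) AS event_count
FROM TUMBLE(raw_events, event_time, INTERVAL '1 MINUTE')
GROUP BY url, window_start;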

RisingWave has all the capabilities to preprocess the streaming data to optimize it for analytical workloads in an OLAP data store.

Apache Pinot provides columnar storage and indexing that makes analytical queries extremely fast atop petabytes of data. It has the ability to source from real-time streaming platforms like Kafka as well as ingest data from an object store for historical context.
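
To give a flavor of that workload, the query below shows the kind of slice-and-dice aggregation Pinot's columnar storage and indexes are built to serve at low latency. The page_views table and its columns are purely hypothetical and not part of this tutorial.

SELECT country, COUNT(*) AS views, AVG(latency_ms) AS avg_latency_ms
FROM page_views
GROUP BY country
ORDER BY views DESC
LIMIT 10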

If you are familiar with the Lambda Architecture, you’ll know it employs batch and streaming pipelines and merges them together to combine real-time with historical data. Merging historical and real-time data has always been a difficult task. Apache Pinot automatically does this for you. It’s the easiest way to enable a Lambda Architecture.

With RisingWave and Pinot, users can harness the power of both stream processing and OLAP technologies, enabling them to achieve a comprehensive solution. RisingWave excels in efficiently processing pre-computation logic in real-time. Whenever a new tuple is received, RisingWave triggers incremental computation, ensuring that computation results are instantly updated. On the other hand, Pinot excels at efficiently serving analytical workloads. By ingesting the results from RisingWave into Pinot, users can seamlessly issue ad-hoc analytical queries, empowering them to perform interactive analytics with ease.
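
Here is a minimal sketch of that incremental model in RisingWave (the orders table and order_totals view are illustrative only):

CREATE TABLE orders (id INT PRIMARY KEY, amount INT);

-- RisingWave maintains this aggregate incrementally: each newly inserted row
-- updates the result without rescanning history.
CREATE MATERIALIZED VIEW order_totals AS
SELECT COUNT(*) AS order_count, SUM(amount) AS total_amount
FROM orders;

INSERT INTO orders VALUES (1, 50);
-- You may need to run FLUSH; before the new row is reflected.
SELECT * FROM order_totals;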

Getting Started

I'll be using the Homebrew (brew) package manager on macOS. On Windows, you can use the Chocolatey package manager instead. We will be building the solution shown in the diagram below.

Diagram showing the flow of data through RisingWave, Pinot, and Apache Superset

Install the Kubernetes tools

These are the tools you’ll need to begin using k8s locally. We will be using these tools often throughout the post.

brew install kubectl
brew install kind
brew install helm
brew install derailed/k9s/k9s

Before starting, you may need to free up some space in docker by removing unused local volumes. You can do so by running the command below.

docker volume prune

Add charts to your local Helm repository

Adding a chart repository to your local Helm configuration tells Helm where to find charts. In k8s, a Helm chart is a package that contains all the necessary resources to deploy an application to a k8s cluster, including YAML configuration files for deployments, services, secrets, and config maps that define the desired state of your application.

We will be sourcing Helm charts for most of the services we'll deploy into k8s. Below, we add repositories for Apache Kafka (via Bitnami), Apache Pinot, and Apache Superset.

helm repo add bitnami https://charts.bitnami.com/bitnami
helm repo add pinot https://raw.githubusercontent.com/apache/pinot/master/kubernetes/helm
helm repo add superset https://apache.github.io/superset

Starting a Local k8s Cluster

We will first need to start a k8s cluster using kind. Run the command below.

kind create cluster

Use k9s to check the status of the k8s cluster to ensure the kind cluster is running.

k9s

On the top right of the k9s UI, you’ll see a reference to some commands. We’ll use these commands to gain access to pods and view logs throughout this post.

Installing RisingWave

We will be installing RisingWave using a Kubernetes operator instead of Helm. A Kubernetes operator is a software component that automates the management of Kubernetes resources. It is a custom controller that extends the Kubernetes API to create, configure, and manage instances of complex applications on behalf of a Kubernetes user.

Helm and operators are both tools that can be used to manage Kubernetes applications. However, they have different strengths and weaknesses.

Helm is a package manager for Kubernetes. It allows you to package, deploy, and manage applications using a simple, declarative YAML file called a Helm chart. Helm charts can be used to deploy a wide variety of applications, including databases, stateful applications, and microservices.

Operators, by contrast, encode application-specific operational knowledge. They too can manage a wide variety of applications, including databases, stateful applications, and microservices, and they are especially suited to complex applications that would otherwise require a lot of manual intervention, such as Kubernetes itself.

First, open another terminal to invoke Kubernetes commands, then install cert-manager using the command below. cert-manager adds certificates and certificate issuers as resource types in Kubernetes clusters and simplifies the process of obtaining, renewing, and using those certificates.

kubectl apply -f https://github.com/cert-manager/cert-manager/releases/download/v1.12.0/cert-manager.yaml

Check k9s for the cert-manager to ensure that it’s running.


Next, install the RisingWave operator using the command below.

kubectl apply --server-side -f https://github.com/risingwavelabs/risingwave-operator/releases/latest/download/risingwave-operator.yaml

Check k9s for the RisingWave operator.


Next, run the following command to deploy a RisingWave instance with MinIO as the object storage. MinIO is an open-source object storage server. It is a distributed object storage system that is compatible with Amazon S3.

kubectl apply -f https://raw.githubusercontent.com/risingwavelabs/risingwave-operator/main/docs/manifests/risingwave/risingwave-etcd-minio.yaml

You should see a set of pods related to RisingWave in k9s.


Connecting To RisingWave

RisingWave makes it easy to manage streams and data. All you need to interact with RisingWave is Postgres-compatible SQL; no Java or Scala code is needed. You'll therefore need a Postgres client to interface with RisingWave. Run the command below to create a psql client.

kubectl apply -f https://raw.githubusercontent.com/risingwavelabs/risingwave-operator/main/docs/manifests/psql/psql-console.yaml

You should see the psql-console pod available in k9s.


Use the arrows to highlight the psql-console pod, then press s to open a shell in the psql container. From that command line you can run the psql client. Run the command below to connect to RisingWave.

psql -h risingwave-etcd-minio-frontend -p 4567 -d dev -U root

You can now use Postgres commands to interact with RisingWave.


Installing and Publishing Data To Kafka

Open another terminal and run k9s there so that the RisingWave psql console stays available in your current terminal. Since RisingWave is a streaming database, we can source data from streaming platforms like Kafka. Install Kafka using the command below.

helm install kafka bitnami/kafka

Now install a Kafka client to interface with Kafka.

kubectl run kafka-client --restart='Never' --image docker.io/bitnami/kafka:3.4.0-debian-11-r33 --namespace default --command -- sleep infinity

Going back to k9s, you should see two pods related to Kafka.


Just like the psql pod, you can open a shell in the kafka-client pod to run Kafka commands. Let's create a topic called http, then produce some JSON data from httpbin.org.

kafka-topics.sh --bootstrap-server kafka:9092 --create --topic http
curl http://httpbin.org/stream/10 | kafka-console-producer.sh --bootstrap-server kafka:9092 --topic http

Querying From Kafka Using RisingWave

Now that we have a topic in Kafka filled with data, we can create a source in RisingWave that consumes from that topic. Copy the SQL below to create a SOURCE object in RisingWave.

CREATE SOURCE http_events (
    id integer,
    url varchar,
    origin varchar,
    headers JSONB
)
WITH (
    connector='kafka',
    topic='http',
    properties.bootstrap.server='kafka:9092',
    scan.startup.mode='earliest'
)
ROW FORMAT JSON;

Notice the connection information for Kafka and the row format. Run the select statement below; you should get similar output. Notice also the headers JSON column (you may need to pan right).

select * from http_events limit 5;

 id |             url              |    origin     |                                                              headers
----+------------------------------+---------------+--------------------------------------------------------------------------------------------------------------------------------------
  0 | http://httpbin.org/stream/10 | 69.206.123.45 | {"Accept": "*/*", "Host": "httpbin.org", "User-Agent": "curl/7.74.0", "X-Amzn-Trace-Id": "Root=1-64838cc4-69ffd00912b674f3360e6b05"}
  1 | http://httpbin.org/stream/10 | 69.206.123.45 | {"Accept": "*/*", "Host": "httpbin.org", "User-Agent": "curl/7.74.0", "X-Amzn-Trace-Id": "Root=1-64838cc4-69ffd00912b674f3360e6b05"}
  2 | http://httpbin.org/stream/10 | 69.206.123.45 | {"Accept": "*/*", "Host": "httpbin.org", "User-Agent": "curl/7.74.0", "X-Amzn-Trace-Id": "Root=1-64838cc4-69ffd00912b674f3360e6b05"}
  3 | http://httpbin.org/stream/10 | 69.206.123.45 | {"Accept": "*/*", "Host": "httpbin.org", "User-Agent": "curl/7.74.0", "X-Amzn-Trace-Id": "Root=1-64838cc4-69ffd00912b674f3360e6b05"}
  4 | http://httpbin.org/stream/10 | 69.206.123.45 | {"Accept": "*/*", "Host": "httpbin.org", "User-Agent": "curl/7.74.0", "X-Amzn-Trace-Id": "Root=1-64838cc4-69ffd00912b674f3360e6b05"}
(5 rows)

Create a Join

Let's now create a table that holds user information. We will join this information with the http_events data to enrich it.

CREATE TABLE USERS (
    id integer primary key,
    name varchar,
    ip varchar
);

INSERT INTO USERS values (1, 'foo', '69.206.123.45');

--select * from http_events a join USERS b on a.origin=b.ip;

If you execute the select statement that is commented out, you'll notice that the console prints records until the topic is exhausted. This is what is unique about streaming databases: the data is unbounded, and if it never stops arriving, neither does the query.
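
Alternatively, you can let RisingWave maintain the join continuously as a materialized view, whose result is updated incrementally as new events arrive. Here is a sketch using the objects created above (the view name http_events_enriched is my own):

CREATE MATERIALIZED VIEW http_events_enriched AS
SELECT b.id AS id, name, ip, url, headers
FROM http_events a JOIN USERS b ON a.origin = b.ip;

-- The view can then be queried like any other table:
SELECT * FROM http_events_enriched LIMIT 5;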

Write Enriched Data Back to Kafka

First, let’s create another Kafka topic for the output of the enrichment we just performed in RisingWave.

kafka-topics.sh --bootstrap-server kafka:9092 --create --topic http_enrich

Now we can perform a CSAS (CREATE SINK AS SELECT …), which will write the enriched data back to Kafka.

CREATE SINK http_enrich AS
SELECT
    b.id as id,
    name,
    ip,
    url,
    headers
FROM http_events a JOIN USERS b ON a.origin=b.ip
WITH (
    connector='kafka',
    topic='http_enrich',
    properties.bootstrap.server='kafka:9092',
    type='upsert',
    primary_key='id'
);

You can test the output by consuming from the http_enrich topic you created. Run the command below in the Kafka client pod.

kafka-console-consumer.sh \
    --bootstrap-server kafka:9092 \
    --topic http_enrich \
    --from-beginning

Installing Apache Pinot to Serve Real-Time Data

Run the commands below in your macOS terminal to add the Apache Pinot charts (a no-op if you added the repository earlier) and deploy Pinot into k8s.

helm repo add pinot https://raw.githubusercontent.com/apache/pinot/master/kubernetes/helm
helm install pinot pinot/pinot

Next, highlight the pinot-controller-0 pod and press Shift-F. This will port-forward the Pinot controller so that you can open the Pinot console from your browser.


Arrow down to select OK, then open your browser to localhost:9000. You should see the view below.

Screenshot showing Pinot dashboard

Before we create a table, we'll need to adjust the DefaultTenant. A tenant is a logical component defined as a group of server/broker nodes with the same Helix tag. Out of the box, the Pinot broker needs to be associated with a tenant, or else you will get errors when creating a real-time table.

Creating a Table in Pinot with JSON Index

First, we need to create a schema. In the Pinot console, click on “Tables” then “Add Schema.” Then copy and paste the JSON schema below.

Remember that the "headers" field contains JSON data. In this schema, we've defined its data type as STRING and set the max length to 10,000 characters. When we define the table, we will apply a JSON index to this field: Pinot parses the JSON at ingestion and places its values in an index, so at query time it searches the JSON index instead of parsing the JSON string to look for values.

The JSON index is a powerful feature that allows you to balance strict and loose enforcement of your data structure. Having a JSON field provides a way to freely evolve your schema without breaking changes.

Next, we specify a timestamp field, "__ts". This field was not part of the original data in RisingWave; we will populate it automatically by applying an ingestion transform when we define the table.

{
  "schemaName": "http_enrich",
  "dimensionFieldSpecs": [
    {
      "name": "headers",
      "dataType": "STRING",
      "maxLength": 10000
    },
    {
      "name": "id",
      "dataType": "INT"
    },
    {
      "name": "ip",
      "dataType": "STRING"
    },
    {
      "name": "url",
      "dataType": "STRING"
    },
    {
      "name": "name",
      "dataType": "STRING"
    }
  ],
  "dateTimeFieldSpecs": [
    {
      "name": "__ts",
      "dataType": "LONG",
      "format": "EPOCH",
      "granularity": "5:MINUTES"
    }
  ]
}

Next, we need to create a real-time table in Pinot. Pinot has the ability to source real-time events from Kafka as well as from object stores like S3. Streaming table types are called “REALTIME” and batch table types are called “OFFLINE” tables.

Click on “Add Realtime Table”. Scroll to the bottom of the form and copy and paste the JSON below. The console will automatically update the values in the form.

{
  "tableName": "http_enrich",
  "tableType": "REALTIME",
  "tenants": {
    "broker": "DefaultTenant",
    "server": "DefaultTenant",
    "tagOverrideConfig": {}
  },
  "segmentsConfig": {
    "schemaName": "http_enrich",
    "timeColumnName": "__ts",
    "replication": "1",
    "replicasPerPartition": "1",
    "retentionTimeUnit": null,
    "retentionTimeValue": null,
    "completionConfig": null,
    "crypterClassName": null,
    "peerSegmentDownloadScheme": null
  },
  "tableIndexConfig": {
    "loadMode": "MMAP",
    "invertedIndexColumns": [],
    "createInvertedIndexDuringSegmentGeneration": false,
    "rangeIndexColumns": [],
    "sortedColumn": [],
    "bloomFilterColumns": [],
    "bloomFilterConfigs": null,
    "noDictionaryColumns": [],
    "onHeapDictionaryColumns": [],
    "varLengthDictionaryColumns": [],
    "enableDefaultStarTree": false,
    "starTreeIndexConfigs": null,
    "enableDynamicStarTreeCreation": false,
    "segmentPartitionConfig": null,
    "columnMinMaxValueGeneratorMode": null,
    "aggregateMetrics": false,
    "nullHandlingEnabled": false,
    "jsonIndexConfigs": {
      "headers": {
        "maxLevels": 2,
        "excludeArray": false,
        "disableCrossArrayUnnest": true,
        "includePaths": null,
        "excludePaths": null,
        "excludeFields": null
      }
    },
    "streamConfigs": {
      "streamType": "kafka",
      "stream.kafka.topic.name": "http_enrich",
      "stream.kafka.broker.list": "kafka:9092",
      "stream.kafka.consumer.type": "lowlevel",
      "stream.kafka.consumer.prop.auto.offset.reset": "smallest",
      "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
      "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
      "realtime.segment.flush.threshold.rows": "0",
      "realtime.segment.flush.threshold.time": "24h",
      "realtime.segment.flush.threshold.segment.size": "100M"
    }
  },
  "metadata": {},
  "ingestionConfig": {
    "filterConfig": null,
    "transformConfigs": [
      {
        "columnName": "__ts",
        "transformFunction": "now()"
      }
    ]
  },
  "quota": {
    "storage": null,
    "maxQueriesPerSecond": null
  },
  "task": null,
  "routing": {
    "segmentPrunerTypes": null,
    "instanceSelectorType": null
  },
  "query": {
    "timeoutMs": null
  },
  "fieldConfigList": null,
  "upsertConfig": null,
  "tierConfigs": null
}

Let's go over some important sections in the table configuration. First, the JSON index. The snippet below shows that the column to index is the "headers" column and that values more than 2 levels deep are not indexed.

"jsonIndexConfigs": {
"headers": {
"maxLevels": 2,
"excludeArray": false,
"disableCrossArrayUnnest": true,
"includePaths": null,
"excludePaths": null,
"excludeFields": null
}
},

The snippet below shows the Kafka connection information.

"streamConfigs": {
"streamType": "kafka",
"stream.kafka.topic.name": "http_enrich",
"stream.kafka.broker.list": "kafka:9092",
"stream.kafka.consumer.type": "lowlevel",
"stream.kafka.consumer.prop.auto.offset.reset": "smallest",
"stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
"stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
"realtime.segment.flush.threshold.rows": "0",
"realtime.segment.flush.threshold.time": "24h",
"realtime.segment.flush.threshold.segment.size": "100M"
}

And last, let's look at how we populate the "__ts" timestamp column with the current time.

Kafka is a publish-subscribe streaming platform, so the data it holds is kept in a shared form that multiple consumers can each derive into their preferred format. RisingWave preprocessed the data and placed it in Kafka, but we often still need some additional transformations before it is ingested into Pinot.

You can define transformations in the ingestion phase, such as extracting records from nested objects, applying simple transform functions to certain columns, and filtering out unwanted columns, as well as more advanced operations like joining between datasets.

In Pinot, we need to provide a dateTime field so that it can perform temporal analytical operations. We can add this field automatically using the table configuration snippet below. This ingestion transformation sets the __ts field to the current time in milliseconds since epoch.

"ingestionConfig": {
"filterConfig": null,
"transformConfigs": [{
"columnName": "__ts",
"transformFunction": "now()"
}]
},
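
Once the table is ingesting, this column is what makes time-bounded queries possible. Below is a minimal sketch, assuming __ts holds epoch milliseconds (which is what now() produces):

-- Count events per user ingested in the last 5 minutes (300,000 ms).
SELECT name, COUNT(*) AS events
FROM http_enrich
WHERE __ts > now() - 300000
GROUP BY name
LIMIT 10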

Querying JSON data in Pinot

Next, click on the "Query Console" and click on "http_enrich" in the table list. This will automatically run a select against the table.

Screenshot showing Pinot dashboard on Query Console page

Since we used the JSON index, we can now do things like selecting records that match values in the headers JSON field. Copy and paste the SQL below to test.

select * from http_enrich
where JSON_MATCH(headers, '"$.Host"=''httpbin.org''')
limit 10

We can also extract values.

select
    json_extract_scalar(headers, '$.User-Agent', 'STRING', 'null') agent,
    name
from http_enrich
where JSON_MATCH(headers, '"$.Host"=''httpbin.org''')
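
Because headers is stored and indexed as free-form JSON, a key that starts appearing upstream later requires no Pinot schema change before you can query it. For example, if a Referer header began arriving (hypothetical; it is not present in the sample data), you could group by it right away:

SELECT
    json_extract_scalar(headers, '$.Referer', 'STRING', 'none') AS referer,
    COUNT(*) AS events
FROM http_enrich
GROUP BY json_extract_scalar(headers, '$.Referer', 'STRING', 'none')
LIMIT 10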

Installing Superset and Connecting to Pinot

We've already added the Superset Helm chart repository. However, we need to customize the chart so that it installs the Pinot driver, giving Superset the library it needs to connect to Pinot. Create a file called superset-values.yaml with the contents below. Notice that it installs the pinotdb module using pip.

bootstrapScript: |
  #!/bin/bash
  pip install pinotdb

Next, run the helm install command below.

helm install superset superset/superset -f superset-values.yaml

Once complete, go back to k9s and port forward the superset-abcdef1234 pod. Your pod name may be different.


Open your browser to localhost:8088 which will take you to the Superset console. Use admin/admin as the username and password.

Go to the top right and create a connection to a database.

Screenshot showing the Data > Connect database menu option

Choose Apache Pinot from the dropdown.

Screenshot showing supported databases option on dropdown menu

Use the Pinot URL below and test the connection. You should get a good connection.

pinot://pinot-broker:8099/query/sql?controller=pinot-controller:9000

Instead of going through all the steps to build a dashboard in Superset, we'll test using its built-in SQL editor. Start by clicking on the SQL editor in the menu bar at the top of the console. Paste any of the previous Pinot SQL statements from this post and click "Run."


That’s it! You can continue to build datasets, charts, and dashboards for a real-time view of your data.