Real-time Big Data Analytics Solution – 1

Creating a real-time data analytics dashboard is definitely a challenge, especially in a big data environment. It involves several tricky steps: real-time data ingestion, real-time data cleaning/merging, and real-time data visualization. We evaluated a satisfactory solution for a near-real-time data streaming/analyzing environment using some cutting-edge techniques. This series of articles explains the steps behind the demo. The demo site was created on our dedicated web servers and is free for public access.

Before moving into the details, let's introduce our data flow architecture first:

  1. The source data we are using comes from Spotify, the pioneer of online music streaming services. It provides an API interface for developers to create mobile or web applications. Those API services can be accessed after obtaining application credentials as well as access tokens. After reading the API data, our solution sends the data into a Kafka cluster as a stream. Kafka is used as a 'streaming hub': it stores the real-time stream and communicates with Spark Streaming for data processing: looking up historical data, merging/joining with other data, etc.
  2. The Spotify API (see Spotify for Developers: https://developer.spotify.com/ ) is not a real-time API source, so we simulated real-time access by scheduling the API calls with Airflow; a minimal DAG sketch is shown below. (Another demo of ours uses an airplane-status API, which is a near-real-time source.)
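A minimal sketch of that scheduling, assuming Airflow 2.4+ (use schedule_interval on older versions); the DAG name, interval, module path, and artist ids are illustrative, and update_artist_popularity is the producer function shown in step 5:

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator

# hypothetical module holding the step-5 producer function
from spotify_producer import update_artist_popularity

# illustrative list of Spotify artist ids to poll
ARTIST_IDS = ["<artist-id-1>", "<artist-id-2>"]

with DAG(
    dag_id="spotify_artist_popularity",   # illustrative name
    start_date=datetime(2023, 1, 1),
    schedule=timedelta(minutes=5),        # near-real-time simulation: poll every 5 minutes
    catchup=False,
) as dag:
    PythonOperator(
        task_id="update_artist_popularity",
        python_callable=update_artist_popularity,
        op_kwargs={"ids": ARTIST_IDS},
    )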
  3. We are using ClickHouse (CH) as the OLAP layer for storage (many times faster than other data warehouses on the same hardware/network). Creating the Kafka topics in this solution involves a few tips; it depends on the Kafka message format as well as the ClickHouse Kafka table engine. The ClickHouse Kafka engine can synchronize a Kafka stream without any programming, but a few conditions must be met to use this feature:
    • Use a dictionary ({key: value}) data type in the Kafka message
    • Use three ClickHouse database objects to integrate with Kafka: a table with the Kafka engine (commonly called the queue table), a materialized view (acting as a trigger that syncs records from the queue table to the target table), and a common storage table. Refer to the ClickHouse documentation on the Kafka table engine: https://clickhouse.com/docs/en/engines/table-engines/integrations/kafka
  4. For example, the 'Artists' API in Spotify returns artist information; besides the artist's name and images, it also carries the artist's follower count and popularity. Assuming we want to track changes in an artist's followers and popularity, we created the following Kafka and ClickHouse objects to handle the pipeline:
    • Create a Kafka topic, e.g. 'topic-artist' (optional: sending to a non-existent topic from kafka-python creates it automatically, provided the broker allows topic auto-creation)
    • Create two 'target' tables in ClickHouse: dim_artists and fact_artist_popularity. The fact table looks like this:
-- target table
CREATE TABLE fact_artist_popularity
(
    artist_id String,
    followers UInt64,
    popularity UInt8,
    updated_time DateTime('America/New_York')
)
ENGINE = MergeTree()
ORDER BY (artist_id, updated_time);

-- topic queue table
CREATE TABLE queue_artist_popularity
(
    artist_id String,
    followers UInt64,
    popularity UInt8,
    updated_time DateTime('America/New_York')
)
ENGINE = Kafka()
SETTINGS
    kafka_broker_list = 'xxx.xxx.x.xxx:9092',
    kafka_topic_list = 'topic-artist',
    kafka_group_name = 'group2',
    kafka_format = 'JSONEachRow';

-- Create Materialized view
CREATE MATERIALIZED VIEW vw_fact_artist_popularity TO fact_artist_popularity
AS
SELECT
    artist_id,
    followers,
    popularity,
    updated_time
FROM queue_artist_popularity; 

5. Once the CH objects above are ready, the following sample Python script can send messages to the Kafka topic:

from kafka import KafkaProducer
from datetime import datetime
import json

# a minimal send_message helper wrapping KafkaProducer
# (broker address as in the queue table settings)
producer = KafkaProducer(bootstrap_servers="xxx.xxx.x.xxx:9092")

def send_message(topic, key, value):
    producer.send(topic, key=key, value=value)
    producer.flush()

def update_artist_popularity(ids):
    ....

    # `response` holds the Spotify Artists API response at this point
    artists = json.loads(response.text)["artists"]
    for _a in artists:
        _value = dict()
        _value.update({"artist_id": _a["id"]})
        _value.update({"followers": _a["followers"]["total"]})
        _value.update({"popularity": _a["popularity"]})
        _value.update({"updated_time": datetime.strftime(datetime.now(), "%Y-%m-%d %H:%M:%S")})

        # key each message by artist id; Kafka keys and values must be bytes
        send_message("topic-artist", _a["id"].encode("utf-8"), json.dumps(_value).encode("utf-8"))

    return

Once the script above runs, the target CH table (fact_artist_popularity) is populated automatically. This is a super cool feature of ClickHouse's Kafka integration.

6. To connect to CH from Python, there is a CH Python driver that can be installed with: pip install clickhouse-connect (refer: https://clickhouse.com/docs/en/integrations/python)

import clickhouse_connect as cc
.....
# create a client with host/credentials, then run queries through it
client = cc.get_client(host="xxx.xxx.x.xxx", username="default", password="password here")
result = client.query('select <columns> from <table>')

for r in result.result_rows:
    print(r[0])
....
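For example, against this demo's fact table, the latest followers/popularity per artist can be pulled like this (the query shape is illustrative; argMax picks the value belonging to the most recent updated_time per artist):

# latest followers/popularity reading per artist from fact_artist_popularity
result = client.query("""
    SELECT
        artist_id,
        argMax(followers, updated_time)  AS followers,
        argMax(popularity, updated_time) AS popularity
    FROM fact_artist_popularity
    GROUP BY artist_id
""")
for artist_id, followers, popularity in result.result_rows:
    print(artist_id, followers, popularity)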

7. Kafka topic sharing. We are using multiple dimension and fact tables in CH to handle Artists, Albums, Tracks, Artist Popularities, etc. Even though we used the Kafka table engine to integrate Kafka topics with CH tables, we still have to consider how many topics to create in the Kafka hub. We utilized two mechanisms here: upserts for dimension tables and shared topics for fact tables. One of the CH solutions for handling UPSERT is the ReplacingMergeTree engine (refer: https://clickhouse.com/docs/en/engines/table-engines/mergetree-family/replacingmergetree). A shared topic means multiple tables (dimension or fact) can share a single Kafka topic as long as the Kafka consumer groups are set up correctly. Take the following two tables, dim_artists and fact_artist_popularity, as an example.

Since both tables' attributes can be obtained from the Spotify Artist API, we created a single Kafka topic, topic-artist, and used two different Kafka consumer groups to split the two attribute sets between the two tables. Sample code:

CREATE TABLE spotify.queue_artists
(
    artist_id String,
....
)
ENGINE = Kafka()
SETTINGS
    kafka_broker_list = 'xxx.xxx.x.xxx:9092',
    kafka_topic_list = 'topic-artist',
    kafka_group_name = 'group1',
    kafka_format = 'JSONEachRow';
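
On the upsert side, here is a minimal sketch of the dim_artists target table and its materialized view, created through the clickhouse-connect client from step 6. The column list and view name are illustrative; ReplacingMergeTree uses updated_time as the version column and keeps only the newest row per artist_id when parts merge:

import clickhouse_connect as cc

client = cc.get_client(host="xxx.xxx.x.xxx", username="default", password="password here")

# Dimension table: ReplacingMergeTree deduplicates rows sharing the ORDER BY key
# (artist_id), keeping the row with the highest updated_time (upsert-like behavior).
client.command("""
    CREATE TABLE IF NOT EXISTS spotify.dim_artists
    (
        artist_id String,
        name String,                               -- illustrative column
        updated_time DateTime('America/New_York')
    )
    ENGINE = ReplacingMergeTree(updated_time)
    ORDER BY artist_id
""")

# Materialized view syncing the shared-topic queue table into the dimension table;
# assumes queue_artists exposes a matching `name` column.
client.command("""
    CREATE MATERIALIZED VIEW IF NOT EXISTS spotify.vw_dim_artists TO spotify.dim_artists
    AS SELECT artist_id, name, updated_time FROM spotify.queue_artists
""")

Note that deduplication happens during background merges, so a query that needs the exact latest values can read with FINAL (e.g. SELECT ... FROM spotify.dim_artists FINAL).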