Wednesday, October 11, 2017

Real-time activity tracking with Kafka

Source: https://unsplash.com/photos/D4g64YUacLA

Baby Steps

When LinkedIn's member base started to grow, the site's functionality grew more complex by the day. In 2010, they decided to invest in redesigning the infrastructure to meet the blooming need of scaling their multiple data pipelines without much hassle. As a result, Kafka, a single, distributed pub-sub platform, was born to handle real-time data streams across those pipelines. The very next year, Kafka was open-sourced under the Apache Software Foundation, and it has been used at huge production scale ever since.

What is Apache Kafka?

Kafka is a fast, scalable, durable, fault-tolerant pub-sub messaging system. It’s written in Scala and Java, and uses Apache Zookeeper for reliable distributed coordination. Kafka provides four core APIs, namely Producer, Consumer, Streams, and Connector.

Source: http://kafka.apache.org/documentation.html

Kafka is composed of a few building blocks. A topic is a named feed that represents a stream of records in Kafka. Each topic is split into one or more partitions, which are physical separations of the topic; each partition is an ordered, immutable sequence of records. A partition works as a commit log: Kafka appends each record to it and assigns the record a sequential integer called the offset. Producers publish records to topics, while consumers subscribe to topics and consume the records. The servers that maintain the published data are called brokers, and one or more brokers form a cluster.
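To make these building blocks concrete, here is a minimal, self-contained sketch using the plain Kafka Java client, independent of UltraESB-X. The broker address, topic name, and record key are assumptions for illustration, and the poll(Duration) call assumes a reasonably recent client version.

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public final class KafkaBasics {

        public static void main(String[] args) {
            // Producer: publishes a keyed record to a topic on the broker
            Properties producerProps = new Properties();
            producerProps.put("bootstrap.servers", "localhost:9092");  // assumed single-node broker
            producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
                // Records with the same key always land in the same partition of the topic
                producer.send(new ProducerRecord<>("news-reads", "news1", "read"));
            }

            // Consumer: subscribes to the topic and reads records in offset order
            Properties consumerProps = new Properties();
            consumerProps.put("bootstrap.servers", "localhost:9092");
            consumerProps.put("group.id", "news-stats");        // consumer group id
            consumerProps.put("auto.offset.reset", "earliest"); // start from the beginning of the log
            consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
                consumer.subscribe(Collections.singletonList("news-reads"));
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("partition=%d offset=%d key=%s value=%s%n",
                            record.partition(), record.offset(), record.key(), record.value());
                }
            }
        }
    }

Each printed record carries the partition it was appended to and its offset within that partition, which is exactly the commit-log bookkeeping described above.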

Real world use cases of Kafka

Lately, fast data has been overtaking big data as the household name, as companies struggle to process real-time data streams. Since Kafka is capable of handling real-time data feeds with high throughput, low latency, and guaranteed reliability, more than a third of the Fortune 500 companies now use Kafka in production.

In its earlier days, LinkedIn used Kafka for online and offline real-time event consumption, traditional messaging use cases, gathering system health metrics, activity tracking, and feeding the data streams into their Hadoop grid. Today, however, Kafka is a critical part of LinkedIn's central data pipeline, handling over 1.4 trillion messages a day, and a whole ecosystem has been built around it.

At last year's Kafka Summit, impressive facts were revealed about the "Four Comma Club": alongside LinkedIn and Microsoft, Netflix was one of the companies processing over 1 trillion messages a day using Kafka. That is a testament to the scalability this distributed messaging system offers. Netflix uses Kafka in its Keystone pipeline for complex event processing and real-time monitoring.

Uber uses Apache Kafka in its core infrastructure for online and near real-time event processing. The list goes on and on, from traditional messaging, application monitoring, and activity tracking, to recommendation and decision engines, custom preferences and personalization, fraud detection, complex event processing, and ingesting data into Spark and Hadoop.

Why Kafka?

Handling the enormous volumes of real-time data generated by systems such as the IoT has raised pivotal challenges for enterprise giants. Earlier technologies and tools were not equipped to tackle the problems caused by the scale and speed of these systems, which gave rise to a growing need for real-time analytics alongside traditional big data analytics. Apache Kafka, being a fast, scalable, durable, fault-tolerant pub-sub data streaming platform, is well placed to address many of these business problems.

Messaging with Kafka in UltraESB-X

In this article, we are going to discuss integrating Apache Kafka with UltraESB-X using UltraStudio, for real-time messaging. If you want to know more about UltraESB-X, this post would provide a good starting point. UltraStudio provides a graphical IDE to build, test and deploy integration projects without any fuss.

Use Case

Hogwarts has been in deep waters lately. After Dolores Umbridge took over the headmastership, not even a pixie could flutter without her knowing. She made sure that the school's webmaster, Prof. Quirinus Quirrell, is also dancing under her Imperius Curse. All he has to do is monitor the web activity on the news articles and find out which ones are gaining momentum. The original website looks as follows, with 0 read counts.

Original website view with 0 read counts

UltraESB-X takes care of the magic

Quirrell delegates this task to one of the Slytherin students, Gregory Goyle. As the solution, Goyle decides to use UltraESB-X to integrate the web server with a Kafka server to provide real-time activity tracking. When a reader requests the full news article as shown below, the request is also used to populate an internal statistics engine that processes read counts.

Website view with updated read counts

Let's see, step by step, how UltraESB-X takes care of the magic. Here is the final view of the website with the desired functionality.

Let’s break down the requirements into 3 key segments.

  1. Retrieve the full news article and record the impression
  2. Persist the records, passing data into Hadoop or data warehousing systems for real-time processing and reporting
  3. Update the read count back on the server

Prerequisites

You can access the complete source for the website, with these functionalities, from here; the integration part will be covered throughout this article. Before going into the details, make sure that you have the latest version of UltraStudio (17.07.1). This tutorial assumes that you have Kafka and ZooKeeper installed and started, and that there is a Kafka topic we can work with; if you are starting fresh, simply follow the first 3 steps of the Kafka quick-start guide.
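If you'd rather create the topic programmatically than with the quick-start CLI scripts, a minimal sketch using Kafka's Java AdminClient could look like this; the topic name and broker address are assumptions, so match them to your own setup.

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.NewTopic;

    public final class CreateNewsTopic {

        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");  // assumed single-node broker

            try (AdminClient admin = AdminClient.create(props)) {
                // 1 partition, replication factor 1: enough for a local single-node setup
                NewTopic topic = new NewTopic("hogwarts-news-reads", 1, (short) 1);
                admin.createTopics(Collections.singletonList(topic)).all().get();  // block until created
            }
        }
    }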

Before delving into this simple integration, create a new project and have it ready for action. Your integration flow configuration files should reside in /src/main/conf.

Step-by-step Walkthrough

Segment 1:

First, let's look at recording news article reads. For brevity, only the required configuration parameters are shown.

Step 1: Create a new integration flow.

Step 2: Add an NIO HTTP Ingress Connector from the connectors list and configure it as follows. This connector will be triggered when a news article’s “Read full article” button is clicked.

NIO HTTP Ingress Connector properties

Step 3: Add an Extract HTTP Query Parameter processing element and configure it as follows. When the “Read full article” button of a news item is clicked, the unique ID of that news item is sent as a query parameter (e.g. http://localhost:8280/hogwarts/news?id=news1) from the website to the back-end, to retrieve the full article from a back-end database, queue, file server or some other storage. This processor extracts the unique ID from the request URL and assigns it to a variable named key.

Extract HTTP Query Parameter properties

Step 4: Add a String Payload Setter processing element and configure it as follows. This processor serves two important purposes:

  • Including the unique ID of the news article in the message as metadata for identification in further processing. This is done by composing an XML payload, with the value of the key variable (which we injected into the flow in the previous step) added under an id element of the root message element.
  • Mocking the retrieval of the full article from a news store, by setting the value of the article element to static text for the sake of simplicity.

String Payload Setter properties

The response is now ready to be sent back to the website with the full article content, but for activity tracking, we are first going to persist the message in a Kafka topic.

Step 5: Add a Kafka Egress Connector from the connectors list and configure it as follows.

NOTE: Bootstrap Servers and Topic Name should be changed according to your configuration. I configured mine with the default server details for a single node Kafka cluster and the topic I had created.

Kafka Egress Connector properties

NOTE: The final processing element is used to enable CORS, and is thus not part of the integration logic.

Step 6: Add an Add New Transport Header processing element and configure it as follows.

Add New Transport Header properties

Now we are all done with the first segment, and the final integration flow for Segment 1 should look like this.

Complete integration flow for Segment 1

Segment 2:

Next, the messages stored in the Kafka topic are ready to be ingested into Spark, Storm, or any other stream processing engine. To keep things easy to follow, we'll settle for processing the messages ourselves and injecting the read updates of the news articles into an internal statistics engine. For that, we need a custom processing element that manipulates this statistics engine and updates it with new impressions.
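Before we get to that custom processing element, here is a quick aside: the same per-article read counting could also be expressed directly with the Kafka Streams API. This is only a hypothetical sketch for comparison, not part of the UltraESB-X flow; the topic names and broker address are assumptions, and it presumes each record's key is the news ID.

    import java.util.Properties;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.Produced;

    public final class NewsReadCounter {

        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "news-read-counter");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

            StreamsBuilder builder = new StreamsBuilder();
            KStream<String, String> reads = builder.stream("news-reads");  // one record per article read
            reads.groupByKey()                                             // group by news ID
                 .count()                                                  // running read count per article
                 .toStream()
                 .to("news-read-counts", Produced.with(Serdes.String(), Serdes.Long()));

            new KafkaStreams(builder.build(), props).start();
        }
    }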

Writing a custom processing element is easy in UltraStudio; if you need to know more, go through this. Here, we extract the newsId value from the input and update the statistics engine in the “Impression Injector” processing element as follows.

Impression Injector custom processing element logic
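In essence, the Impression Injector just bumps a per-article counter. Here is a minimal plain-Java sketch of that idea, assuming a simple in-memory, map-backed statistics engine; the class and method names are hypothetical and this is not the actual UltraESB-X processing element API.

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;
    import java.util.concurrent.atomic.AtomicLong;

    // Hypothetical in-memory statistics engine behind the Impression Injector sketch
    public final class StatisticsEngine {

        private static final ConcurrentMap<String, AtomicLong> READ_COUNTS = new ConcurrentHashMap<>();

        // Called with the news ID extracted from each Kafka record
        public static void recordImpression(String newsId) {
            READ_COUNTS.computeIfAbsent(newsId, id -> new AtomicLong()).incrementAndGet();
        }

        // Read back in Segment 3 to answer read-count queries
        public static long readCount(String newsId) {
            AtomicLong count = READ_COUNTS.get(newsId);
            return count == null ? 0L : count.get();
        }
    }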

For this custom processing element to become available in the flow, the project should be built. After that, we are ready to create the integration flow for the second segment.

Step 1: Create a new integration flow.

Step 2: Add a Kafka Ingress Connector from the connectors list and configure it as follows. This connector is triggered whenever a new record is published to the Kafka topic we published to in the first segment. Whenever it sees a new record, the message is consumed and injected into the integration flow for further processing.

Kafka Ingress Connector properties

Step 3: Add an XPath String Extractor processing element and configure it as follows. If you remember, we inserted the unique ID under an id element of the payload for further processing. This processor extracts the value of that element and assigns it to a variable named id.

XPath String Extractor properties
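The extraction itself is plain XPath over the payload composed in Segment 1. Assuming the payload is shaped like the sample string below (the exact element names depend on your Step 4 configuration in Segment 1), an equivalent stand-alone Java extraction would be:

    import java.io.StringReader;
    import javax.xml.xpath.XPath;
    import javax.xml.xpath.XPathExpressionException;
    import javax.xml.xpath.XPathFactory;
    import org.xml.sax.InputSource;

    public final class NewsIdExtractor {

        // Sample payload shaped like the one composed in Segment 1 (assumed structure)
        private static final String SAMPLE =
                "<message><id>news1</id><article>Full article text goes here.</article></message>";

        public static void main(String[] args) throws XPathExpressionException {
            XPath xpath = XPathFactory.newInstance().newXPath();
            // The XPath String Extractor is configured with a similar expression
            String id = xpath.evaluate("/message/id/text()", new InputSource(new StringReader(SAMPLE)));
            System.out.println("Extracted news id: " + id);  // prints news1
        }
    }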

Step 4: Add the Impression Injector custom processing element and configure it as follows. This processor reports a new impression for the news article with the unique ID, which we extracted and assigned to the variable id, to the Statistics Engine for analytics.

Impression Injector properties

We have the second segment ready and here’s a preview of the final integration flow for this segment.

Complete integration flow for Segment 2

Segment 3:

Now we have injected the impression records into the Statistics Engine, but the read actions still don't get reflected on the website, and that should be fixed. For that, we are going to create another back-end service which the website can call to retrieve the read counts. We need another custom processing element, “Read Count to Payload Setter”, to retrieve the read counts from the Statistics Engine as follows.

Read Count to Payload Setter custom processing element logic
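Conceptually, this element looks up the count in the Statistics Engine and writes it into the outgoing payload. A minimal sketch, reusing the hypothetical StatisticsEngine from the Segment 2 sketch and assuming a simple JSON response format (the real payload format depends on your configuration), could look like this:

    // Hypothetical payload construction for the Read Count to Payload Setter sketch;
    // the actual element sets the message payload through the UltraESB-X API.
    public final class ReadCountPayloads {

        // Returns a body such as {"id":"news1","readCount":42}
        public static String buildPayload(String newsId) {
            long count = StatisticsEngine.readCount(newsId);  // StatisticsEngine from the Segment 2 sketch
            return String.format("{\"id\":\"%s\",\"readCount\":%d}", newsId, count);
        }
    }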

With the project rebuilt so that our second custom processing element is available for retrieving read counts, we are all geared up.

Step 1: Create a new integration flow.

Step 2: Add an NIO HTTP Ingress Connector from the connectors list and configure it as follows. This connector is used to retrieve the read counts for each news article from the back-end Statistics Engine.

NIO HTTP Ingress Connector properties

Step 3: Add an Extract HTTP Query Parameter processing element and configure it as follows. We need to update the read count value next to the “Read Full Article” button when the user reads the full article and closes the popup. So, on this action, the website makes a back-end call with the same unique ID which we used to retrieve the article in the first place, set as a query parameter (e.g. http://localhost:8280/hogwarts/stats?id=news1). This processor extracts the unique news ID from the request URL and assigns it to a variable named key.

Extract HTTP Query Parameter properties

Step 4: Add a Read Count to Payload Setter processing element and configure it as follows. Using the custom processing element, the read count for the queried news article is retrieved from the Statistics Engine and set as the response payload.

Read Count to Payload Setter properties

NOTE: Just as we did in Segment 1, we use one final processing element to enable CORS.

Step 5: Add an Add New Transport Header processing element and configure it as follows.

Add New Transport Header properties

Aaaand… that was the third segment, the home stretch; all the integration flows are good to go. Here's how the final flow for Segment 3 should look in UltraStudio.

Complete integration flow for Segment 3

You can test the flows you created within UltraStudio. Create a new “UltraESB-X Server” run configuration and start the server. Serve the website with a simple HTTP server (Python provides a built-in one; I use serve), then visit http://localhost:5000 in your favourite browser to see if it's working.

You can also try out this exercise on UltraStudio as a preconfigured sample project.
