Play by Play: NodeJS App Development; in this short tutorial, you'll learn how to interact with Apache Kafka for distributed event processing.

You've probably heard the term ETL (extraction, transformation, and loading). In plain English, when you have a system that needs to send data to a second system, you first have to extract the data. In many cases (usually older systems) you can extract the data directly from the database, but modern systems are more likely to give you a set of APIs. In the past few years, I've seen very complex extractions that depend on SSIS or plain scripts such as Bash or PowerShell to get the data out of the source. The problem is that these extractions can get hairy and ugly real quick, especially for enterprise-level apps.

Extraction is not the most difficult part. You also have to think about massaging the data, or transforming it into a form that's consumable for the receiving system. These transformations focus on how the data will be consumed and whether the receiving system can digest it as is or needs it changed. And finally there's the loading part: how often the data has to be made available to the receiving system. I've seen homemade systems that run thousands of ETLs, and the folks who built them over many years of the company's existence went as far as adding a queue manager to control the load on their tooling and source servers. It's very ugly, and if you ever find yourself debugging a poorly documented ETL script that is a few thousand lines long and was written years ago, you'll see how miserable it can make you.

Knowing everything that I know today, I would argue that it's time to evolve and explore new ways of distributing data among different systems, stepping away from the common ETL approaches. The solution is not to make your ETL leaner, or to rewrite it or document it better; instead, you have to take a few steps back and rethink the whole approach. ETLs, in their traditional form, are not real-time. You always depend on a process that has to run and send the data from one system to another, which leads to lots of timing issues and race conditions. What happens if the source system is late producing the data that multiple other systems consume? I can hear you saying that you have a file watcher, but complex scenarios still require human intervention, such as when the file arrives empty or the data is not in the expected form.

Going back to the basics: how can we rethink the entire ETL idea, given that we simply cannot eliminate the need for it? That's what the remainder of this article focuses on, using Apache Kafka, a distributed data streaming platform used for real-time data pipelines, integration, stream processing, and more. We'll learn how to move away from scheduled, batch-based ETLs and shift towards a more real-time, streaming-based data architecture. The key differences are outlined here:

  1. Real-time data streaming: Kafka operates as a high-throughput, real-time messaging system. Traditional ETL processes typically batch process data at set intervals. In contrast, Kafka handles data as it arrives, which removes most of the delay inherent in batch processing.

  2. Decoupling data sources and destinations: Kafka acts as a central hub for data streams. It decouples data producers from consumers, allowing them to operate independently. Producers write data to Kafka topics, and consumers read this data at their own pace. This separation means that the transform and load steps do not need to be as tightly integrated as they are in traditional ETLs.

  3. Built-in Scalability: Kafka is designed to handle large volumes of data efficiently, and it scales by adding more Kafka brokers (servers) to the cluster. This kind of scalability is harder to achieve in a traditional ETL setup.

  4. Stream Processing: Kafka integrates with stream processing tools like Kafka Streams and ksqlDB. These tools allow for transforming data in real time as it flows through Kafka.

  5. Fault tolerance and reliability: Kafka's distributed nature and replication model provide built-in fault tolerance and data durability.

  6. Simplified Architecture: by using Kafka, organizations can streamline their data architectures. Data can be continuously ingested into Kafka from various sources and then streamed to different destinations.
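
To make the decoupling in point 2 a bit more concrete, here's a toy in-memory sketch. It is not Kafka and not how Kafka is implemented; it only models the idea of an append-only log where each consumer group keeps its own read position. The names `TopicLog`, `append`, and `poll` are made up for this illustration.

```javascript
// Toy model of a topic: an append-only log plus one read offset per
// consumer group. Illustration only, not a real Kafka client.
class TopicLog {
  constructor() {
    this.records = [];         // append-only list of messages
    this.offsets = new Map();  // groupId -> index of the next record to read
  }

  // Producers only append; they never know who reads.
  append(value) {
    this.records.push(value);
    return this.records.length - 1; // offset of the new record
  }

  // Each group reads from its own offset, at its own pace.
  poll(groupId, maxRecords = 10) {
    const start = this.offsets.get(groupId) ?? 0;
    const batch = this.records.slice(start, start + maxRecords);
    this.offsets.set(groupId, start + batch.length);
    return batch;
  }
}

const topic = new TopicLog();
['order-1', 'order-2', 'order-3'].forEach((v) => topic.append(v));

const billingFirst = topic.poll('billing', 2); // a slow consumer reads two records
const analyticsAll = topic.poll('analytics');  // another group reads all three
const billingRest = topic.poll('billing');     // billing resumes where it left off
console.log(billingFirst, analyticsAll, billingRest);
```

The producer side never waits for either reader, and a slow reader does not hold back a fast one. That independence is what lets the transform and load steps evolve separately.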

I hope I was able to make my case and convince you to give this a try. Now, let's get our hands dirty and see how we can set this up. I am going to use Docker to quickly spin up my environment so I can focus on creating a proof of concept rather than on environment setup. You'll need Docker Desktop and NodeJS installed on your machine to replicate the following steps. The scripts below also use the kafkajs and cassandra-driver packages from npm.


docker network create kafka-network

docker run -d --name=zookeeper --network=kafka-network -e ZOOKEEPER_CLIENT_PORT=2181 confluentinc/cp-zookeeper

docker run -d --name=kafka --network=kafka-network -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092 -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 -p 9092:9092 confluentinc/cp-kafka

docker run --name cassandra --network=kafka-network -p 9042:9042 -d cassandra:latest

Above, we first create a network so that our containers can see each other. We then start ZooKeeper, which manages and coordinates the Kafka broker nodes in a cluster. It handles tasks like maintaining configuration information, electing leaders, and managing the distributed state of the Kafka cluster. (Kafka 4.0 and later no longer use ZooKeeper, so these commands apply to image versions that still support ZooKeeper mode.) We're also setting up Cassandra in case you choose to take it a step further and capture the received messages in a database.
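
One practical detail about the setup above: the broker advertises itself as `kafka:9092`, so the hostname `kafka` has to resolve wherever your Node script runs (for example, inside a container attached to `kafka-network`). Here's a small sketch that keeps the broker list configurable instead of hard-coding it. The `KAFKA_BROKERS` variable name and the `resolveBrokers` helper are my own assumptions, not something Kafka or KafkaJS defines.

```javascript
// Resolve broker addresses from a hypothetical KAFKA_BROKERS variable
// (comma-separated host:port pairs), falling back to the address the
// Kafka container advertises in the docker run command above.
function resolveBrokers(env = process.env) {
  const raw = env.KAFKA_BROKERS || 'kafka:9092';
  return raw
    .split(',')
    .map((entry) => entry.trim())
    .filter((entry) => entry.length > 0);
}

// Example with two made-up broker hostnames.
console.log(resolveBrokers({ KAFKA_BROKERS: 'broker-a:9092, broker-b:9092' }));
// With no variable set, the default from this article is used.
console.log(resolveBrokers({}));
```

The returned array can be passed as the `brokers` option when constructing the KafkaJS client.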

const { Kafka } = require('kafkajs')

const kafka = new Kafka({
    clientId: 'my-app',
    brokers: ['kafka:9092']
})

const consumer = kafka.consumer({ groupId: 'test-group' })

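const run = async () => {
    await consumer.connect()
    // fromBeginning: true starts at the earliest offset when the group
    // has no committed offset yet
    await consumer.subscribe({ topic: 'test2', fromBeginning: true })

    await consumer.run({
        // called once for every message received
        eachMessage: async ({ topic, partition, message }) => {
            console.log({
                value: message.value.toString(),
            })
        },
    })
}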

run().catch(console.error)
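
The consumer above assumes every message has a value and just prints it. Remember the "empty file" and "wrong shape" problems from the ETL world? The same thing can show up here, so below is a minimal sketch of a defensive decoder. It assumes producers send JSON, which is my assumption and not what the example producer in this article does; `decodeValue` is a hypothetical helper name.

```javascript
// Decode a Kafka message value defensively. In KafkaJS, message.value is a
// Buffer, or null for a record without a value (a tombstone).
function decodeValue(message) {
  if (message.value === null || message.value === undefined) {
    return { ok: false, reason: 'empty' };
  }
  const text = message.value.toString('utf8');
  try {
    // Assumption: producers send JSON-encoded values.
    return { ok: true, data: JSON.parse(text) };
  } catch (err) {
    // Keep the raw text so the bad message can be logged or parked.
    return { ok: false, reason: 'invalid-json', raw: text };
  }
}

console.log(decodeValue({ value: Buffer.from('{"id": 1}') }));
console.log(decodeValue({ value: null }));
console.log(decodeValue({ value: Buffer.from('Hello') }));
```

Inside `eachMessage` you could call `decodeValue(message)` and decide what to do with the failures, instead of letting one malformed message throw.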

This is what my consumer looks like. Now let's take a look at my producer:

const { Kafka } = require('kafkajs')

const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['kafka:9092']
})

const producer = kafka.producer()

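const run = async () => {
  await producer.connect()
  // send a single message to the topic the consumer subscribes to
  await producer.send({
    topic: 'test2',
    messages: [
      { value: 'Hello from the second producer' },
    ],
  })

  // close the connection once the message has been sent
  await producer.disconnect()
}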

run().catch(console.error)
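
The producer above sends a plain string. In a real pipeline you'd more likely send structured records, so here is a small sketch of turning an object into the `{ key, value }` shape that the `messages` array accepts. The record fields (`id`, `status`) and the `toKafkaMessage` helper are hypothetical and only here for illustration.

```javascript
// Turn a plain record into a { key, value } message object.
// The `id` and `status` fields are made-up example data.
function toKafkaMessage(record) {
  return {
    // With the default partitioner, messages with the same key go to the
    // same partition, which keeps their relative order.
    key: String(record.id),
    // Values are sent as strings or Buffers, so serialize the object.
    value: JSON.stringify(record),
  };
}

const messages = [
  { id: 42, status: 'created' },
  { id: 42, status: 'shipped' },
].map(toKafkaMessage);

console.log(messages);
```

The resulting array can be passed as `messages` in `producer.send()`. Keying by an entity id is a common choice when consumers care about the order of events for the same entity.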

There you have it! Now let's also persist the data:

const { Kafka } = require('kafkajs')
const cassandra = require('cassandra-driver');

const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['kafka:9092']
})

const producer = kafka.producer()
const consumer = kafka.consumer({ groupId: 'test-group' })

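// The 'myspace' keyspace and its 'messages' table must already exist;
// 'datacenter1' is the default data center name of the cassandra image.
const client = new cassandra.Client({
  contactPoints: ['cassandra'],
  localDataCenter: 'datacenter1',
  keyspace: 'myspace'
});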

const runProducer = async () => {
  await producer.connect()
  await producer.send({
    topic: 'test2',
    messages: [
      { value: 'Hello KafkaJS user!' },
    ],
  })
  // release the producer connection once the message is sent
  await producer.disconnect()
}

const runConsumer = async () => {
  await consumer.connect()
  await consumer.subscribe({ topic: 'test2', fromBeginning: true })

  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      // store each message value as a row; the prepared statement binds
      // the value to the ? placeholder
      const query = 'INSERT INTO messages (message) VALUES (?)';
      const params = [message.value.toString()];
      await client.execute(query, params, { prepare: true });
    },
  })
}

const run = async () => {
  await Promise.all([
    runProducer(),
    runConsumer()
  ])
}

run().catch(console.error)
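
The script above expects the `myspace` keyspace and a `messages` table to be there already. Here's a minimal sketch of the CQL that would create them. The replication settings and the `text` column used as the primary key are my assumptions for a single local node, not something this article's setup dictates; `schemaStatements` is a hypothetical helper.

```javascript
// Build the CQL statements needed before the script above can run.
// Replication settings and column type are assumptions for one local node.
function schemaStatements(keyspace = 'myspace', table = 'messages') {
  return [
    `CREATE KEYSPACE IF NOT EXISTS ${keyspace} ` +
      `WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}`,
    `CREATE TABLE IF NOT EXISTS ${keyspace}.${table} (message text PRIMARY KEY)`,
  ];
}

// Print the statements so they can be pasted into cqlsh.
schemaStatements().forEach((cql) => console.log(cql + ';'));
```

You can run these in cqlsh inside the cassandra container, or execute them one at a time with `client.execute(cql)` on a client created without the `keyspace` option, since that keyspace does not exist yet. Keep in mind that with `message` as the primary key, inserting the same text twice overwrites the row rather than adding a second one.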

As you can see, this is purely a proof of concept and we're only scratching the surface here. You're welcome to play with the idea and think about how you can step away from your traditional ETL and move to real-time data streaming between systems.
