Toan Le

Creating Kafka brokers on multiple nodes with KRaft mode

2023-03-16

Apache Kafka is a popular distributed streaming platform that allows users to publish and subscribe to streams of records, similar to a message queue or enterprise messaging system. In this tutorial, we will be setting up Kafka brokers on multiple nodes using KRaft (Kafka Raft metadata mode, which removes the ZooKeeper dependency).

Step 1: Use Terraform to deploy server on Azure and download Kafka

Follow my previous tutorial how to deploy VM using Terraform How to Deploy Azure VM with Terraform.

# Provider requirements: pin the AzureRM provider so `terraform init`
# resolves a known version.
# NOTE(review): "=3.0.0" pins the very first 3.x release; "~> 3.0" would
# allow compatible 3.x updates — confirm which is intended.
terraform {
  required_providers {
    azurerm = {
      source  = "hashicorp/azurerm"
      version = "=3.0.0"
    }
  }
}

# Azure Resource Manager provider. The empty `features {}` block is
# mandatory even when no feature flags are customized.
provider "azurerm" {
  features {}
}

# Resource group that holds every resource in this tutorial, so a single
# `terraform destroy` (or deleting the group) cleans everything up.
resource "azurerm_resource_group" "kafka-rg" {
  name     = "kafka-rg"
  location = "Southeast Asia"
  tags = {
    environment = "dev"
  }
}

# Virtual network for the Kafka cluster (10.0.0.0/16).
resource "azurerm_virtual_network" "kafka-vn" {
  name                = "kafka-network"
  resource_group_name = azurerm_resource_group.kafka-rg.name
  location            = azurerm_resource_group.kafka-rg.location
  address_space       = ["10.0.0.0/16"]
  tags = {
    environment = "dev"
  }
}

# Single /24 subnet hosting all three Kafka VMs; their NICs receive
# dynamic private IPs from 10.0.0.0/24 (the addresses referenced later
# in controller.quorum.voters).
resource "azurerm_subnet" "kafka-subnet" {
  name                 = "kafka-subnet"
  resource_group_name  = azurerm_resource_group.kafka-rg.name
  virtual_network_name = azurerm_virtual_network.kafka-vn.name
  address_prefixes     = ["10.0.0.0/24"]
}

# Network security group for the Kafka subnet; the actual allow rules are
# defined separately as azurerm_network_security_rule resources and the
# group is attached to the subnet via the association resource below.
resource "azurerm_network_security_group" "kafka-sg" {
  name                = "kafka-sg"
  resource_group_name = azurerm_resource_group.kafka-rg.name
  location            = azurerm_resource_group.kafka-rg.location

  tags = {
    environment = "dev"
  }
}

# Inbound rule for the Kafka VMs.
# NOTE(review): the original rule allowed every protocol to every
# destination port from any source (it only filtered on the *source*
# port range 1024-65535, which any client trivially satisfies) — i.e.
# the VMs were fully exposed to the internet. Restrict it to TCP on
# SSH (22), the Kafka broker listener (9092), and the KRaft controller
# listener (9093).
resource "azurerm_network_security_rule" "kafka-inbound-rule" {
  name                        = "kafka-inbound-rule"
  priority                    = 100
  direction                   = "Inbound"
  access                      = "Allow"
  protocol                    = "Tcp"
  source_port_range           = "*"
  destination_port_ranges     = ["22", "9092", "9093"]
  source_address_prefix       = "*"
  destination_address_prefix  = "*"
  resource_group_name         = azurerm_resource_group.kafka-rg.name
  network_security_group_name = azurerm_network_security_group.kafka-sg.name
}


# Allow all outbound TCP traffic (apt package installs, the Kafka
# download, and inter-broker/controller traffic). NSG priorities are
# evaluated per direction, so this priority 100 does not clash with the
# inbound rule's priority 100.
resource "azurerm_network_security_rule" "kafka-outbound-rule" {
  name                        = "kafka-outbound-rule"
  priority                    = 100
  direction                   = "Outbound"
  access                      = "Allow"
  protocol                    = "Tcp"
  source_port_range           = "*"
  destination_port_range      = "*"
  source_address_prefix       = "*"
  destination_address_prefix  = "*"
  resource_group_name         = azurerm_resource_group.kafka-rg.name
  network_security_group_name = azurerm_network_security_group.kafka-sg.name
}

# Attach the NSG to the subnet so the rules above apply to all three
# Kafka VMs at once (instead of per-NIC associations).
resource "azurerm_subnet_network_security_group_association" "kafka-sga" {
  subnet_id                 = azurerm_subnet.kafka-subnet.id
  network_security_group_id = azurerm_network_security_group.kafka-sg.id
}

# Public IP for broker 1. "Dynamic" means Azure assigns the address only
# when the IP is attached to a running NIC — hence the depends_on in the
# data source that reads it back at the bottom of this file.
resource "azurerm_public_ip" "kafka1-ip" {
  name                = "kafka1-ip"
  resource_group_name = azurerm_resource_group.kafka-rg.name
  location            = azurerm_resource_group.kafka-rg.location
  allocation_method   = "Dynamic"

  tags = {
    environment = "dev"
  }
}

# NIC for broker 1: dynamic private IP on the kafka subnet plus the
# public IP above for SSH access.
resource "azurerm_network_interface" "kafka1-nic" {
  name                = "kafka1-nic"
  location            = azurerm_resource_group.kafka-rg.location
  resource_group_name = azurerm_resource_group.kafka-rg.name

  ip_configuration {
    name                          = "internal"
    subnet_id                     = azurerm_subnet.kafka-subnet.id
    private_ip_address_allocation = "Dynamic"
    public_ip_address_id          = azurerm_public_ip.kafka1-ip.id
  }

  tags = {
    environment = "dev"
  }
}

# VM for Kafka broker 1. SSH key auth only (no admin_password, so
# password authentication is disabled by default).
# NOTE(review): Standard_B1ms has 2 GiB RAM — enough for this tutorial,
# but undersized for any real Kafka workload.
resource "azurerm_linux_virtual_machine" "kafka1-vm" {
  name                = "kafka1-vm"
  resource_group_name = azurerm_resource_group.kafka-rg.name
  location            = azurerm_resource_group.kafka-rg.location
  size                = "Standard_B1ms"
  admin_username      = "adminuser"
  network_interface_ids = [
    azurerm_network_interface.kafka1-nic.id,
  ]

  admin_ssh_key {
    username   = "adminuser"
    public_key = file("~/.ssh/mtcazurekey.pub")
  }

  os_disk {
    caching              = "ReadWrite"
    storage_account_type = "Standard_LRS"
  }

  # The original "UbuntuServer" 18.04-LTS image is end-of-life and has
  # been retired from the Azure marketplace, so `terraform apply` can no
  # longer find it. Use the supported Ubuntu 22.04 LTS (jammy) image.
  source_image_reference {
    publisher = "Canonical"
    offer     = "0001-com-ubuntu-server-jammy"
    sku       = "22_04-lts-gen2"
    version   = "latest"
  }
}


# Public IP for broker 2 (dynamic allocation — the address exists only
# once the attached VM is running; see the data source at the bottom).
resource "azurerm_public_ip" "kafka2-ip" {
  name                = "kafka2-ip"
  resource_group_name = azurerm_resource_group.kafka-rg.name
  location            = azurerm_resource_group.kafka-rg.location
  allocation_method   = "Dynamic"

  tags = {
    environment = "dev"
  }
}

# NIC for broker 2: dynamic private IP on the kafka subnet plus the
# public IP above for SSH access.
resource "azurerm_network_interface" "kafka2-nic" {
  name                = "kafka2-nic"
  location            = azurerm_resource_group.kafka-rg.location
  resource_group_name = azurerm_resource_group.kafka-rg.name

  ip_configuration {
    name                          = "internal"
    subnet_id                     = azurerm_subnet.kafka-subnet.id
    private_ip_address_allocation = "Dynamic"
    public_ip_address_id          = azurerm_public_ip.kafka2-ip.id
  }

  tags = {
    environment = "dev"
  }
}

# VM for Kafka broker 2 (same shape as broker 1; SSH key auth only).
resource "azurerm_linux_virtual_machine" "kafka2-vm" {
  name                = "kafka2-vm"
  resource_group_name = azurerm_resource_group.kafka-rg.name
  location            = azurerm_resource_group.kafka-rg.location
  size                = "Standard_B1ms"
  admin_username      = "adminuser"
  network_interface_ids = [
    azurerm_network_interface.kafka2-nic.id,
  ]

  admin_ssh_key {
    username   = "adminuser"
    public_key = file("~/.ssh/mtcazurekey.pub")
  }

  os_disk {
    caching              = "ReadWrite"
    storage_account_type = "Standard_LRS"
  }

  # The original "UbuntuServer" 18.04-LTS image is end-of-life and has
  # been retired from the Azure marketplace, so `terraform apply` can no
  # longer find it. Use the supported Ubuntu 22.04 LTS (jammy) image.
  source_image_reference {
    publisher = "Canonical"
    offer     = "0001-com-ubuntu-server-jammy"
    sku       = "22_04-lts-gen2"
    version   = "latest"
  }
}

# Public IP for broker 3 (dynamic allocation — the address exists only
# once the attached VM is running; see the data source at the bottom).
resource "azurerm_public_ip" "kafka3-ip" {
  name                = "kafka3-ip"
  resource_group_name = azurerm_resource_group.kafka-rg.name
  location            = azurerm_resource_group.kafka-rg.location
  allocation_method   = "Dynamic"

  tags = {
    environment = "dev"
  }
}

# NIC for broker 3: dynamic private IP on the kafka subnet plus the
# public IP above for SSH access.
resource "azurerm_network_interface" "kafka3-nic" {
  name                = "kafka3-nic"
  location            = azurerm_resource_group.kafka-rg.location
  resource_group_name = azurerm_resource_group.kafka-rg.name

  ip_configuration {
    name                          = "internal"
    subnet_id                     = azurerm_subnet.kafka-subnet.id
    private_ip_address_allocation = "Dynamic"
    public_ip_address_id          = azurerm_public_ip.kafka3-ip.id
  }

  tags = {
    environment = "dev"
  }
}

# VM for Kafka broker 3 (same shape as broker 1; SSH key auth only).
resource "azurerm_linux_virtual_machine" "kafka3-vm" {
  name                = "kafka3-vm"
  resource_group_name = azurerm_resource_group.kafka-rg.name
  location            = azurerm_resource_group.kafka-rg.location
  size                = "Standard_B1ms"
  admin_username      = "adminuser"
  network_interface_ids = [
    azurerm_network_interface.kafka3-nic.id,
  ]

  admin_ssh_key {
    username   = "adminuser"
    public_key = file("~/.ssh/mtcazurekey.pub")
  }

  os_disk {
    caching              = "ReadWrite"
    storage_account_type = "Standard_LRS"
  }

  # The original "UbuntuServer" 18.04-LTS image is end-of-life and has
  # been retired from the Azure marketplace, so `terraform apply` can no
  # longer find it. Use the supported Ubuntu 22.04 LTS (jammy) image.
  source_image_reference {
    publisher = "Canonical"
    offer     = "0001-com-ubuntu-server-jammy"
    sku       = "22_04-lts-gen2"
    version   = "latest"
  }
}

# Read back each dynamically-allocated public IP AFTER its VM exists —
# Dynamic IPs are only assigned once attached to a running NIC, so
# without depends_on the data source could read an empty address.
data "azurerm_public_ip" "kafka1-ip-data" {
  name                = azurerm_public_ip.kafka1-ip.name
  resource_group_name = azurerm_resource_group.kafka-rg.name
  depends_on          = [azurerm_linux_virtual_machine.kafka1-vm]
}

# "kafka1-vm: <public ip>" — the address to SSH into broker 1.
output "public_ip_address_kafka1" {
  value = "${azurerm_linux_virtual_machine.kafka1-vm.name}: ${data.azurerm_public_ip.kafka1-ip-data.ip_address}"
}

data "azurerm_public_ip" "kafka2-ip-data" {
  name                = azurerm_public_ip.kafka2-ip.name
  resource_group_name = azurerm_resource_group.kafka-rg.name
  depends_on          = [azurerm_linux_virtual_machine.kafka2-vm]
}

# "kafka2-vm: <public ip>" — the address to SSH into broker 2.
output "public_ip_address_kafka2" {
  value = "${azurerm_linux_virtual_machine.kafka2-vm.name}: ${data.azurerm_public_ip.kafka2-ip-data.ip_address}"
}

data "azurerm_public_ip" "kafka3-ip-data" {
  name                = azurerm_public_ip.kafka3-ip.name
  resource_group_name = azurerm_resource_group.kafka-rg.name
  depends_on          = [azurerm_linux_virtual_machine.kafka3-vm]
}

# "kafka3-vm: <public ip>" — the address to SSH into broker 3.
output "public_ip_address_kafka3" {
  value = "${azurerm_linux_virtual_machine.kafka3-vm.name}: ${data.azurerm_public_ip.kafka3-ip-data.ip_address}"
}

# Informational outputs: which subscription and identity Terraform is
# deploying with (handy sanity check when multiple Azure accounts are
# configured locally).
data "azurerm_subscription" "current" {
}

output "current_subscription_display_name" {
  value = data.azurerm_subscription.current.display_name
}

data "azurerm_client_config" "current" {
}

output "account_id" {
  value = data.azurerm_client_config.current.client_id
}

Download kafka

# Install a Java runtime (Kafka requires it) and curl, then download and
# unpack Kafka 3.4.0 (Scala 2.13 build) into ~/kafka.
sudo apt-get update -y
sudo apt-get install -y curl default-jre
# Create the directories as the current user. (The original used
# `sudo mkdir` + `chmod -R 777`, which leaves root-owned, world-writable
# directories in $HOME — unnecessary and unsafe.)
mkdir -p ~/Downloads ~/kafka
# -f: fail on HTTP errors instead of saving an error page; -L: follow redirects.
curl -fL "https://downloads.apache.org/kafka/3.4.0/kafka_2.13-3.4.0.tgz" -o ~/Downloads/kafka.tgz
# --strip 1 drops the top-level kafka_2.13-3.4.0/ directory from the archive.
tar -xvzf ~/Downloads/kafka.tgz -C ~/kafka --strip 1

Step 2: Configure Kafka

Please change the local IP addresses below to match your VMs.

server1
nano ~/kafka/config/kraft/server.properties

node.id=1
controller.quorum.voters=1@10.0.0.5:9093,2@10.0.0.4:9093,3@10.0.0.6:9093

server2
nano ~/kafka/config/kraft/server.properties

node.id=2
controller.quorum.voters=1@10.0.0.5:9093,2@10.0.0.4:9093,3@10.0.0.6:9093

server3
nano ~/kafka/config/kraft/server.properties

node.id=3
controller.quorum.voters=1@10.0.0.5:9093,2@10.0.0.4:9093,3@10.0.0.6:9093

Step 3: Kafka cluster id creation and log directory setup

The cluster ID has to be the same on all servers.

# Generate a cluster ID once, then format the log directory on EVERY
# node with that same ID before first startup.
~/kafka/bin/kafka-storage.sh random-uuid
~/kafka/bin/kafka-storage.sh format -t <uuid> -c ~/kafka/config/kraft/server.properties

Replace `<uuid>` with the UUID generated by the previous command.

Step 4: Starting the kafka servers

To start the Kafka brokers, run the following command on each VM:

~/kafka/bin/kafka-server-start.sh ~/kafka/config/kraft/server.properties

Step 5: Create a kafka topic

~/kafka/bin/kafka-topics.sh --create --topic topic-test --partitions 3 --replication-factor 3 --bootstrap-server localhost:9092

List and describe topic

~/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
~/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic topic-test

Copyright (c) 2024