Kafka

Publish telemetry data to Apache Kafka

This guide shows how to configure gNMIc Operator to send telemetry data to Apache Kafka.

Prerequisites

  • A running Kubernetes cluster with gNMIc Operator installed
  • Kafka deployed in your cluster ( using Strimzi or Confluent Operator)

Deploy Kafka with Strimzi

# install Strimzi operator
kubectl create namespace kafka
kubectl apply -f 'https://strimzi.io/install/latest?namespace=kafka' -n kafka

# create a Kafka cluster
cat <<EOF | kubectl apply -f -
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
  namespace: kafka
spec:
  kafka:
    replicas: 3
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
    storage:
      type: ephemeral
  zookeeper:
    replicas: 3
    storage:
      type: ephemeral
EOF

This creates a bootstrap Service named my-cluster-kafka-bootstrap with port 9092.

Kafka Output with Service Reference

Create an Output that references the Kafka bootstrap Service:

apiVersion: operator.gnmic.dev/v1alpha1
kind: Output
metadata:
  name: kafka-output
  labels:
    type: kafka
spec:
  type: kafka
  serviceRef:
    name: my-cluster-kafka-bootstrap
    namespace: kafka
    port: "9092"
  config:
    topic: telemetry
    encoding: json

The operator resolves the service to my-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092.

Complete Example

1. Create a Target

apiVersion: operator.gnmic.dev/v1alpha1
kind: Target
metadata:
  name: switch1
  labels:
    role: leaf
spec:
  address: switch1.example.com:57400
  profile: default

2. Create a TargetProfile

apiVersion: operator.gnmic.dev/v1alpha1
kind: TargetProfile
metadata:
  name: default
spec:
  credentialsRef: device-credentials

3. Create a Subscription

apiVersion: operator.gnmic.dev/v1alpha1
kind: Subscription
metadata:
  name: interface-counters
  labels:
    type: streaming
spec:
  paths:
    - /interfaces/interface/state/counters
  mode: STREAM/SAMPLE
  sampleInterval: 10s

4. Create the Kafka Output

apiVersion: operator.gnmic.dev/v1alpha1
kind: Output
metadata:
  name: kafka-output
  labels:
    type: kafka
spec:
  type: kafka
  serviceRef:
    name: my-cluster-kafka-bootstrap
    namespace: kafka
    port: "9092"
  config:
    topic: network-telemetry
    encoding: json
    num-workers: 4
    timeout: 10s
    recovery-wait-time: 5s

5. Create the Pipeline

apiVersion: operator.gnmic.dev/v1alpha1
kind: Pipeline
metadata:
  name: kafka-pipeline
spec:
  clusterRef: my-cluster
  enabled: true
  targetSelectors:
    - matchLabels:
        role: leaf
  subscriptionSelectors:
    - matchLabels:
        type: streaming
  outputs:
    outputSelectors:
      - matchLabels:
          type: kafka

Using Service Selector for Multiple Brokers

If you want to discover multiple Kafka broker services:

apiVersion: operator.gnmic.dev/v1alpha1
kind: Output
metadata:
  name: kafka-output
spec:
  type: kafka
  serviceSelector:
    matchLabels:
      strimzi.io/cluster: my-cluster
      strimzi.io/kind: Kafka
    namespace: kafka
    port: "9092"
  config:
    topic: telemetry

Kafka with SASL Authentication

apiVersion: operator.gnmic.dev/v1alpha1
kind: Output
metadata:
  name: kafka-secure
spec:
  type: kafka
  serviceRef:
    name: my-cluster-kafka-bootstrap
    namespace: kafka
    port: "9092"
  config:
    topic: telemetry
    sasl:
      mechanism: SCRAM-SHA-512
      user: kafka-user
      password: kafka-password

Verify Messages

Consume messages from the topic to verify:

# using Strimzi kafka-console-consumer
kubectl -n kafka run kafka-consumer -ti --rm=true --restart=Never \
  --image=quay.io/strimzi/kafka:latest-kafka-3.5.0 \
  -- bin/kafka-console-consumer.sh \
  --bootstrap-server my-cluster-kafka-bootstrap:9092 \
  --topic network-telemetry \
  --from-beginning