Distributed Kafka with Aiven

Published: Nov 22, 2022 by Isaac Johnson

Today we will explore Aiven.io and their managed Kafka and Kafka MirrorMaker services. Aiven integrates with five clouds (AWS, GCP, Azure, DigitalOcean and UpCloud) and has VPC private endpoints in AWS, GCP and Azure.

I was interested to fire up a Trial account and check out some of their SaaS offerings such as Kafka and Kafka Connect. But to be fair, this is just a subset. To date, they offer Kafka, Postgres, MySQL, Elasticsearch (now OpenSearch), Apache Cassandra, Redis, InfluxDB, Grafana, M3DB, M3 Aggregator, ClickHouse and Apache Flink.

Today we’ll set up Kafka, show how to use it and integrate it with MirrorMaker. We’ll also set up a local Kafka cluster in Kubernetes and (attempt to) connect it with MirrorMaker. Lastly, we’ll check out logs and metrics and how we can integrate with Datadog.

Company Profile

Aiven.io is a Finnish company based out of Helsinki whose founders started out managing infrastructure for companies such as F-Secure and Nokia. To date they support 11 Open Source products in over 100 regions and integration in five clouds (Azure, AWS, GCP, DigitalOcean and UpCloud).

Their pricing model is satisfyingly simple: just a “Plan” with machine classes. You can add Support as well, but at its base, you pay like a PaaS.

/content/images/2022/11/aivenkafka-60.png

While small compared to, say, Amazon and Microsoft, as of 2021 it was valued at over US$1B and was one of Finland’s most valuable growth companies. A fun fact noted on the Finnish Wikipedia page is that they were founded in 2016 and got their first customer, a Mexican truck company, via Twitter. According to Forbes, they are growing at 100-147% year over year in headcount and revenue and have over 700 customers, including Comcast and Toyota.

Aiven setup

FYI: This link will get you $600 in credit compared to the standard $300 (which is what I got). Enjoy.

We can sign up on their website using federated creds (in my case Google) or a username/password. You don’t need a CC to sign up.

/content/images/2022/11/aivenkafka-01.png

Once signed up, you’ll create your first project and then be able to create services.

/content/images/2022/11/aivenkafka-14.png

In my case, the first thing I wanted to create was a Kafka Cluster

/content/images/2022/11/aivenkafka-02.png

In setting up Kafka, for some of the syncing we plan to do later, make sure to enable the REST API

/content/images/2022/11/aivenkafka-03.png

Also, a few of the issues I encountered later in this blog were simply because I neglected to allow topic auto-creation. This setting (false by default) can be changed under “advanced configuration” at the bottom of the Overview tab on the Kafka details page.

/content/images/2022/11/aivenkafka-61.png
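The same setting can probably be flipped from the Aiven CLI rather than the console. Below is a minimal sketch that only builds the `avn service update` command line; the service name is a placeholder, and the `kafka.auto_create_topics_enable` key is my understanding of how the console option is named in the CLI, so verify it against your own service before relying on it.

```python
import shlex
from typing import List


def auto_create_topics_command(service_name: str, enable: bool = True) -> List[str]:
    """Build the avn argv that toggles topic auto-creation on a Kafka service.

    service_name is whatever your Aiven Kafka service is called (placeholder here).
    """
    value = "true" if enable else "false"
    return [
        "avn", "service", "update", service_name,
        "-c", "kafka.auto_create_topics_enable=" + value,
    ]


def as_shell(argv: List[str]) -> str:
    """Render the argv as a copy-pasteable shell command."""
    return shlex.join(argv)
```

Calling `as_shell(auto_create_topics_command("my-kafka-service"))` gives a one-liner to paste into a shell where `avn` is already logged in; passing the list to `subprocess.run` would work as well.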

Setting up Kafka in Kubernetes

In order to have something with which to sync, we’ll also want to set up a local Kafka cluster and (attempt to) expose it externally for MirrorMaker

I used a Kafka I already installed with Helm

$ helm status new-kafka-release
NAME: new-kafka-release
LAST DEPLOYED: Tue Jul 26 15:46:57 2022
NAMESPACE: default
STATUS: deployed
REVISION: 1
TEST SUITE: None
NOTES:
CHART NAME: kafka
CHART VERSION: 18.0.3
APP VERSION: 3.2.0

** Please be patient while the chart is being deployed **

Kafka can be accessed by consumers via port 9092 on the following DNS name from within your cluster:

    new-kafka-release.default.svc.cluster.local

Each Kafka broker can be accessed by producers via port 9092 on the following DNS name(s) from within your cluster:

    new-kafka-release-0.new-kafka-release-headless.default.svc.cluster.local:9092

To create a pod that you can use as a Kafka client run the following commands:

    kubectl run new-kafka-release-client --restart='Never' --image docker.io/bitnami/kafka:3.2.0-debian-11-r12 --namespace default --command -- sleep infinity
    kubectl exec --tty -i new-kafka-release-client --namespace default -- bash

    PRODUCER:
        kafka-console-producer.sh \

            --broker-list new-kafka-release-0.new-kafka-release-headless.default.svc.cluster.local:9092 \
            --topic test

    CONSUMER:
        kafka-console-consumer.sh \

            --bootstrap-server new-kafka-release.default.svc.cluster.local:9092 \
            --topic test \
            --from-beginning

We can test this using a producer and consumer in a container, as detailed above in the Helm output (note: you can always get that output again with helm status (release name))

builder@DESKTOP-QADGF36:~$ kubectl exec --tty -i new-kafka-release-client --namespace default -- bash
I have no name!@new-kafka-release-client:/$ kafka-console-consumer.sh --bootstrap-server harbor.freshbrewed.science:9092 --topic test --from-beginning
1
2
3
;
4
5
6
This
Is
A
Test
Hi
Fresh
Brewed!
Here
are
some

Testing the Producer and Consumer together

Externalizing our Kubernetes Chart

Below are all the ways I tried to externalize Apache Kafka. Perhaps you’ll see some key flaw in my patterns. In the end I could hit the ‘external’ endpoint, but only from within my cluster, which leads me to believe there are still more redirects to local IPs

Try 1: Creating a quick external LB Service

To create the external LoadBalancer, I based it on the existing service

$ kubectl get svc new-kafka-release -o yaml
apiVersion: v1
kind: Service
metadata:
  annotations:
    meta.helm.sh/release-name: new-kafka-release
    meta.helm.sh/release-namespace: default
  creationTimestamp: "2022-07-26T20:46:59Z"
  labels:
    app.kubernetes.io/component: kafka
    app.kubernetes.io/instance: new-kafka-release
    app.kubernetes.io/managed-by: Helm
    app.kubernetes.io/name: kafka
    helm.sh/chart: kafka-18.0.3
  name: new-kafka-release
  namespace: default
  resourceVersion: "167487"
  uid: dbd17254-c2a0-4d0d-8f17-eb9559499ca9
spec:
  clusterIP: 10.43.127.27
  clusterIPs:
  - 10.43.127.27
  internalTrafficPolicy: Cluster
  ipFamilies:
  - IPv4
  ipFamilyPolicy: SingleStack
  ports:
  - name: tcp-client
    port: 9092
    protocol: TCP
    targetPort: kafka-client
  selector:
    app.kubernetes.io/component: kafka
    app.kubernetes.io/instance: new-kafka-release
    app.kubernetes.io/name: kafka
  sessionAffinity: None
  type: ClusterIP
status:
  loadBalancer: {}

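To expose it externally, I created a second Service of type LoadBalancer (despite the file name, this is a Service, not an Ingress)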

$ cat kafka.ingress.yaml
apiVersion: v1
kind: Service
metadata:
  name: new-kafka-release-ext
  namespace: default
spec:
  ports:
  - name: tcp-client
    port: 9092
    protocol: TCP
    targetPort: kafka-client
  selector:
    app.kubernetes.io/component: kafka
    app.kubernetes.io/instance: new-kafka-release
    app.kubernetes.io/name: kafka
  sessionAffinity: None
  type: LoadBalancer
$ kubectl apply -f kafka.ingress.yaml
service/new-kafka-release-ext created

I could see it was now covering 9092 on all the nodes

$ kubectl get svc
NAME                                                       TYPE           CLUSTER-IP      EXTERNAL-IP                              PORT(S)                                                                                                     AGE
...snip...
new-kafka-release-ext                                      LoadBalancer   10.43.33.168    192.168.1.38,192.168.1.57,192.168.1.77   9092:31280/TCP                                                                                              3s

I exposed port 9092 on my router so harbor.freshbrewed.science could route traffic.

I found I could not consume from the outside URL

I have no name!@new-kafka-release-client:/$ kafka-console-consumer.sh --bootstrap-server harbor.freshbrewed.science:9092 --topic test --from-beginning
[2022-11-17 22:07:35,973] WARN [Consumer clientId=console-consumer, groupId=console-consumer-47115] Bootstrap broker harbor.freshbrewed.science:9092 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient)
[2022-11-17 22:07:56,333] WARN [Consumer clientId=console-consumer, groupId=console-consumer-47115] Bootstrap broker harbor.freshbrewed.science:9092 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient)
^CProcessed a total of 0 messages

When I dug in, I realized it was because the bootstrap server wasn’t aware of its externalized URL, so it redirected clients to an internal ‘svc.local’ address (which works inside k8s, but not so hot outside the cluster)
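A Kafka client only uses the bootstrap address for its first metadata request; after that it connects to whatever host and port each broker advertises. A quick way to see which of those addresses is actually reachable from a given machine is a plain TCP probe. This is a minimal sketch using only the standard library; the function names are mine, and it checks reachability only, not whether Kafka answers correctly.

```python
import socket
from typing import List, Tuple


def can_connect(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers DNS resolution failures, refused connections and timeouts.
        return False


def unreachable_brokers(
    advertised: List[Tuple[str, int]], timeout: float = 3.0
) -> List[Tuple[str, int]]:
    """Return the advertised (host, port) pairs this machine cannot reach."""
    return [hp for hp in advertised if not can_connect(hp[0], hp[1], timeout)]
```

From outside the cluster, I would expect `can_connect("harbor.freshbrewed.science", 9092)` to succeed while `can_connect("new-kafka-release-0.new-kafka-release-headless.default.svc.cluster.local", 9092)` fails, which would match the disconnect warnings above.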

Try 2: Update the Helm Chart for Kubernetes Kafka

On my next pass, I tried to tease out the right values for the right chart

$ cat kafka-helm-values2.yaml
autoCreateTopicsEnable: true
global:
  storageClass: managed-nfs-storage
logsDirs: /bitnami/kafka/data/logs

You’ll see I set a few things with “--set” below and that was mostly because I was trying different permutations of externalAccess settings.

Install:

$ helm install -f kafka-helm-values2.yaml next-kafka-release bitnami/kafka --version 18.4.2 --set rbac.create=true,externalAccess.enabled=true,externalAccess.service.type=LoadBalancer,externalAccess.autoDiscovery.enabled=true
NAME: next-kafka-release
LAST DEPLOYED: Sat Nov 19 09:45:42 2022
NAMESPACE: default
STATUS: deployed
REVISION: 1
TEST SUITE: None
NOTES:
CHART NAME: kafka
CHART VERSION: 18.4.2
APP VERSION: 3.2.1
---------------------------------------------------------------------------------------------
 WARNING

    By specifying "serviceType=LoadBalancer" and not configuring the authentication
    you have most likely exposed the Kafka service externally without any
    authentication mechanism.

    For security reasons, we strongly suggest that you switch to "ClusterIP" or
    "NodePort". As alternative, you can also configure the Kafka authentication.

---------------------------------------------------------------------------------------------

** Please be patient while the chart is being deployed **

Kafka can be accessed by consumers via port 9092 on the following DNS name from within your cluster:

    next-kafka-release.default.svc.cluster.local

Each Kafka broker can be accessed by producers via port 9092 on the following DNS name(s) from within your cluster:

    next-kafka-release-0.next-kafka-release-headless.default.svc.cluster.local:9092

To create a pod that you can use as a Kafka client run the following commands:

    kubectl run next-kafka-release-client --restart='Never' --image docker.io/bitnami/kafka:3.2.1-debian-11-r16 --namespace default --command -- sleep infinity
    kubectl exec --tty -i next-kafka-release-client --namespace default -- bash

    PRODUCER:
        kafka-console-producer.sh \

            --broker-list next-kafka-release-0.next-kafka-release-headless.default.svc.cluster.local:9092 \
            --topic test

    CONSUMER:
        kafka-console-consumer.sh \

            --bootstrap-server next-kafka-release.default.svc.cluster.local:9092 \
            --topic test \
            --from-beginning

To connect to your Kafka server from outside the cluster, follow the instructions below:

  NOTE: It may take a few minutes for the LoadBalancer IPs to be available.
        Watch the status with: 'kubectl get svc --namespace default -l "app.kubernetes.io/name=kafka,app.kubernetes.io/instance=next-kafka-release,app.kubernetes.io/component=kafka,pod" -w'

    Kafka Brokers domain: You will have a different external IP for each Kafka broker. You can get the list of external IPs using the command below:

        echo "$(kubectl get svc --namespace default -l "app.kubernetes.io/name=kafka,app.kubernetes.io/instance=next-kafka-release,app.kubernetes.io/component=kafka,pod" -o jsonpath='{.items[*].status.loadBalancer.ingress[0].ip}' | tr ' ' '\n')"

    Kafka Brokers port: 9094

I can run the command to see the primary external IP

$ echo "$(kubectl get svc --namespace default -l "app.kubernetes.io/name=kafka,app.kubernetes.io/instance=next-kafka-release,app.kubernetes.io/component=kafka,pod" -o jsonpath='{.items[*].status.loadBalancer.ingress[0].ip}' | tr ' ' '\n')"
192.168.1.38

Which I used on the router.

That said, I can see from the services that we are exposing 9094 on all the nodes

$ kubectl get svc | grep kafka
new-kafka-release-zookeeper-headless    ClusterIP      None            <none>                                   2181/TCP,2888/TCP,3888/TCP   115d
new-kafka-release                       ClusterIP      10.43.127.27    <none>                                   9092/TCP                     115d
new-kafka-release-headless              ClusterIP      None            <none>                                   9092/TCP,9093/TCP            115d
new-kafka-release-zookeeper             ClusterIP      10.43.101.205   <none>                                   2181/TCP,2888/TCP,3888/TCP   115d
next-kafka-release-zookeeper-headless   ClusterIP      None            <none>                                   2181/TCP,2888/TCP,3888/TCP   153m
next-kafka-release-headless             ClusterIP      None            <none>                                   9092/TCP,9093/TCP            153m
next-kafka-release                      ClusterIP      10.43.1.8       <none>                                   9092/TCP                     153m
next-kafka-release-zookeeper            ClusterIP      10.43.73.64     <none>                                   2181/TCP,2888/TCP,3888/TCP   153m
next-kafka-release-0-external           LoadBalancer   10.43.100.162   192.168.1.38,192.168.1.57,192.168.1.77   9094:30054/TCP               153m

Local Testing:

$ kubectl run next-kafka-release-client --restart='Never' --image docker.io/bitnami/kafka:3.2.1-debian-11-r16 --namespace default --command -- sleep infinity
pod/next-kafka-release-client created
$ kubectl exec --tty -i next-kafka-release-client --namespace default -- bash
I have no name!@next-kafka-release-client:/$  kafka-console-producer.sh --broker-list next-kafka-release-0.next-kafka-release-headless.default.svc.cluster.local:9092  --topic test
>a
[2022-11-19 18:22:08,241] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 4 : {test=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2022-11-19 18:22:08,344] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 5 : {test=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2022-11-19 18:22:08,449] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 6 : {test=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
>fresh
>topic
>is
>updated
>

And I can test from the consumer side

$ kubectl exec --tty -i next-kafka-release-client --namespace default -- bash     

I have no name!@next-kafka-release-client:/$ kafka-console-consumer.sh --bootstrap-server next-kafka-release.default.svc.cluster.local:9092 --topic test --from-beginning
a
fresh
topic
is
updated

The external IP did not work, however.

builder@DESKTOP-72D2D9T:~/Workspaces/jekyll-blog$ ~/confluent-7.0.1/bin/kafka-console-consumer --bootstrap-server harbor.freshbrewed.science:9094 --topic test --from-beginning


But if I’m in my network

$ ~/confluent-7.0.1/bin/kafka-console-consumer --bootstrap-server 192.168.1.77:9094 --topic test --from-beginning
a
fresh
topic
is
updated

$ ~/confluent-7.0.1/bin/kafka-console-consumer --bootstrap-server harbor.freshbrewed.science:9094 --topic test --from-beginning
a
fresh
topic
is
updated
^CProcessed a total of 5 messages

Local Mirror Maker

I decided to pivot. I knew that my Aiven cluster worked, as I could send messages via the UI:

/content/images/2022/11/aivenkafka-15.png

In the testing container, I knew the MirrorMaker tool suite would exist, so I saved some config files there. The image has no editor and I had no sudo rights to install vim, hence the cat heredoc redirection into /tmp

I have no name!@new-kafka-release-client:/$ cat <<'EOF' >> /tmp/source-kafka.config
> bootstrap.servers=harbor.freshbrewed.science:9092
> group.id=example-mirrormaker-group
> exclude.internal.topics=true
> client.id=mirror_maker_consumer
> EOF
I have no name!@new-kafka-release-client:/$ cat /tmp/source-kafka.config
bootstrap.servers=harbor.freshbrewed.science:9092
group.id=example-mirrormaker-group
exclude.internal.topics=true
client.id=mirror_maker_consumer

I believed I would need the ca.pem later. My quick trick was to base64-encode the file locally (e.g. cat /mnt/c/Users/isaac/Downloads/ca.pem | base64 -w 0), then paste the output into the command below
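The base64 trick works because the encoded file becomes a single line of plain ASCII that survives copy and paste into a terminal. The same round trip can be sketched in Python, which helps when the target has Python but no `base64` binary; the helper names here are illustrative only.

```python
import base64


def encode_file_to_base64(path: str) -> str:
    """Read a file and return its contents as one line of base64 (like `base64 -w 0`)."""
    with open(path, "rb") as fh:
        return base64.b64encode(fh.read()).decode("ascii")


def decode_base64_to_file(encoded: str, path: str) -> None:
    """Decode a base64 string and write the original bytes to path."""
    with open(path, "wb") as fh:
        fh.write(base64.b64decode(encoded))
```

Run `encode_file_to_base64` on the machine that has the downloaded ca.pem, paste the resulting string into the container, and call `decode_base64_to_file` there to recreate the file byte for byte.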

I have no name!@new-kafka-release-client:/$ which base64
/usr/bin/base64
I have no name!@new-kafka-release-client:/$ echo LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSUVRVENDQXFtZ0F3SUJBZ0lVUGUzeGk0eUhUTVlIM2JpV1lGMFNNOHQ4bXFFd0RRWUpLb1pJaHZjTkFRRU0KQlFBd09qRTRNRFlHQTFVRUF3d3ZNRE0yTjJFd1pEVXRPV1V4TXkwME1XUmlMVGxoTmpVdE0yVmxNakUwTXpGbApNbVF3SUZCeWIycGxZM1FnUTBFd0hoY05Nakl4TVRFMk1qQTFNRFV3V2hjTk16SXhNVEV6TWpBMU1EVXdXakE2Ck1UZ3dOZ1lEVlFRRERDOHdNelkzWVRCa05TMDVaVEV6TFRReFpHSXRPV0UyTlMwelpXVXlNVFF6TVdVeVpEQWcKVUhKdmFtVmpkQ0JEUVRDQ0FhSXdEUVlKS29aSWh2Y05BUUVCQlFBRGdnR1BBRENDQVlvQ2dnR0JBTUVpVW9LbApZcFRRVDhNb3FhRUdVNFBLNmpPSjltTnJ1RERuZzg0bXN1Y0dCeGplUkVBVUI3OThvRm84d2FYaVArd3p0TmpBCnVPSnRENWpzdEhlNEF1UE1aelAyVTFSZW1TZTdKdTVhWVJubEVJVnV1VVFpeExEVkxSNGJsV28vazVOZXpmR2QKNTN2LzJCSGg4ZEdJWFZlZkRSb1JOZVlKZ2VtaStQWkt6VCs0WnJNcUhDc0VQZyswY0doVklSUE5mZEdjQk1CdwpKS2IySlFBazhPNG1GaXJ5d3JXenNRalk4NFNVamErd3QvSEcyc1FlZ3hFa0VrOEhYSXpKYVRucmF1RTBlZFY1CmRpMjJiWkp3d2hwdjBnbStDMzQ1cUU3MjhyNXVzVktlR28zK2V0YTFrczB6VEQvVnFSSWEyQnR2TlJ4bVB1NUEKZUNLaGNuWlQxYWF4WDdFMUhxbndYYVdGSTE4VzBrcGRLQzRIVDRXYVVxK2lEZDcxN2lWTmVMa20rNm5DdVR5Sgo5bkVBaGJ5MVR5cTY1dzM4cFdTZXVKRVVzY3cvV0hmSCt4OU1TRWloMVdnV2R6ZnNrNzBuZ3ROQ3ZSbGN0U1M0CkxlaEdiazNDNk1aQTdJbUpzem5XbEhSV0sxdzBuQyt6TjZOc1BIbnBNQVM5QUJzM1NqVTNhdXYvUVFJREFRQUIKb3o4d1BUQWRCZ05WSFE0RUZnUVVLMUU3WGk1alZJYWRBem8yU3JOOU84OU9EN3d3RHdZRFZSMFRCQWd3QmdFQgovd0lCQURBTEJnTlZIUThFQkFNQ0FRWXdEUVlKS29aSWh2Y05BUUVNQlFBRGdnR0JBSzNlR1ZqQVlzcUlIcFRSClFER09pbnkybWhJUENxR3pPY0ZoWlVtbml3MnBPMThZQmJsQ0Fwelg4ZlJvd3NRemV1UU8vWERQVGloWHVPbFUKVDNhdFppZ2JVaW1tNmorMElHY1R0NWhHUTFnT0ZXbHM0ZzhYR0xyRzFPYXp6aDFYUG5DTVFremNWQUg2M0Y4OApvUFNHays3dnc0eS9tbVluK09LdW1YcURucW1JWjN3QngrMTBncjRycm4rTlllWElZOTkwd3pQS2lnMXM5QW9aCkVtZFlndldUMUFMSFNhWkQwMlE1RHBUM0oreDYyd1o5S3lUYTY2SmZNc3BiaHROeTk0UHNXbmdDd0lkRUtLMXkKVlV3ZjFYamszZnBMMkFyYThqUkJRVzVIb214aFd4VGcyVmE0MXcwUjc3QnhjVFNQWmw5dVRoY2NPRFJRb2N4RgpkVEtlTTU2K1RFMFJxTFpJNnlSZC9xSi9DeHN4MGdtelVFRTZnT1JMTWxaWk43MXZsaURtUmRzakFVcDZXdCtaCkV5OHdFQmd6L0FCWXBOQkNESXcrN08zQ2kwY2hPTUZCam1YMHNsQ1dEamdZRm5LRmlhN3NFNDFUbG9PMlRoQXcKc3ZEQ0t
PMUdzK2JjZGxPZEpmY1g5SGkzS0tkUTJrSHpqNnJkVXZkOWpZMWt1bUlob0E9PQotLS0tLUVORCBDRVJUSUZJQ0FURS0tLS0tCg== | base64 --decode > /tmp/ca.pem
I have no name!@new-kafka-release-client:/$ cat /tmp/ca.pem
-----BEGIN CERTIFICATE-----
MIIEQTCCAqmgAwIBAgIUPe3xi4yHTMYH3biWYF0SM8t8mqEwDQYJKoZIhvcNAQEM
BQAwOjE4MDYGA1UEAwwvMDM2N2EwZDUtOWUxMy00MWRiLTlhNjUtM2VlMjE0MzFl
MmQwIFByb2plY3QgQ0EwHhcNMjIxMTE2MjA1MDUwWhcNMzIxMTEzMjA1MDUwWjA6
MTgwNgYDVQQDDC8wMzY3YTBkNS05ZTEzLTQxZGItOWE2NS0zZWUyMTQzMWUyZDAg
UHJvamVjdCBDQTCCAaIwDQYJKoZIhvcNAQEBBQADggGPADCCAYoCggGBAMEiUoKl
YpTQT8MoqaEGU4PK6jOJ9mNruDDng84msucGBxjeREAUB798oFo8waXiP+wztNjA
uOJtD5jstHe4AuPMZzP2U1RemSe7Ju5aYRnlEIVuuUQixLDVLR4blWo/k5NezfGd
53v/2BHh8dGIXVefDRoRNeYJgemi+PZKzT+4ZrMqHCsEPg+0cGhVIRPNfdGcBMBw
JKb2JQAk8O4mFirywrWzsQjY84SUja+wt/HG2sQegxEkEk8HXIzJaTnrauE0edV5
di22bZJwwhpv0gm+C345qE728r5usVKeGo3+eta1ks0zTD/VqRIa2BtvNRxmPu5A
eCKhcnZT1aaxX7E1HqnwXaWFI18W0kpdKC4HT4WaUq+iDd717iVNeLkm+6nCuTyJ
9nEAhby1Tyq65w38pWSeuJEUscw/WHfH+x9MSEih1WgWdzfsk70ngtNCvRlctSS4
LehGbk3C6MZA7ImJsznWlHRWK1w0nC+zN6NsPHnpMAS9ABs3SjU3auv/QQIDAQAB
oz8wPTAdBgNVHQ4EFgQUK1E7Xi5jVIadAzo2SrN9O89OD7wwDwYDVR0TBAgwBgEB
/wIBADALBgNVHQ8EBAMCAQYwDQYJKoZIhvcNAQEMBQADggGBAK3eGVjAYsqIHpTR
QDGOiny2mhIPCqGzOcFhZUmniw2pO18YBblCApzX8fRowsQzeuQO/XDPTihXuOlU
T3atZigbUimm6j+0IGcTt5hGQ1gOFWls4g8XGLrG1Oazzh1XPnCMQkzcVAH63F88
oPSGk+7vw4y/mmYn+OKumXqDnqmIZ3wBx+10gr4rrn+NYeXIY990wzPKig1s9AoZ
EmdYgvWT1ALHSaZD02Q5DpT3J+x62wZ9KyTa66JfMspbhtNy94PsWngCwIdEKK1y
VUwf1Xjk3fpL2Ara8jRBQW5HomxhWxTg2Va41w0R77BxcTSPZl9uThccODRQocxF
dTKeM56+TE0RqLZI6yRd/qJ/Cxsx0gmzUEE6gORLMlZZN71vliDmRdsjAUp6Wt+Z
Ey8wEBgz/ABYpNBCDIw+7O3Ci0chOMFBjmX0slCWDjgYFnKFia7sE41TloO2ThAw
svDCKO1Gs+bcdlOdJfcX9Hi3KKdQ2kHzj6rdUvd9jY1kumIhoA==
-----END CERTIFICATE-----
I have no name!@new-kafka-release-client:/$  cat <<'EOF' >> /tmp/mirror-eventhub.config
> bootstrap.servers=kafka-226f19f8-isaac-1040.aivencloud.com:12009
> client.id=mirror_maker_producer
> sasl.mechanism=SASL_MECHANISM
> sasl.plain.username=avnadmin
> sasl.plain.password=AVNS_******************
> security.protocol=SASL_SSL
> ssl.cafile="/tmp/ca.pem"
> EOF
I have no name!@new-kafka-release-client:/$ cat /tmp/mirror-eventhub.config
bootstrap.servers=kafka-226f19f8-isaac-1040.aivencloud.com:12009
client.id=mirror_maker_producer
sasl.mechanism=SASL_MECHANISM
sasl.plain.username=avnadmin
sasl.plain.password=AVNS_******************
security.protocol=SASL_SSL
ssl.cafile="/tmp/ca.pem"

Unfortunately I got SSL errors trying to use mirror maker

$ kafka-mirror-maker.sh --consumer.config /tmp/source-kafka.config --num.streams 1 --producer.config /tmp/mirror-eventhub.config --whitelist=".*"

I then tried using just a JAAS config:

$ cat <<'EOF' >> /tmp/mirror-eventhub2.config
bootstrap.servers=kafka-226f19f8-isaac-1040.aivencloud.com:12009
sasl.mechanism=SCRAM-SHA-256
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="avnadmin" password="AVNS_***********";
EOF

$ kafka-mirror-maker.sh --consumer.config /tmp/source-kafka.config --num.streams 1 --producer.config /tmp/mirror-eventhub2.config --whitelist=".*"

This also errored

[2022-11-20 19:19:49,043] WARN This tool is deprecated and may be removed in a future major release. (kafka.tools.MirrorMaker$)
WARNING: The default partition assignment strategy of the mirror maker will change from 'range' to 'roundrobin' in an upcoming release (so that better load balancing can be achieved). If you prefer to make this switch in advance of that release add the following to the corresponding config: 'partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor'
[2022-11-20 19:19:49,845] WARN [Consumer clientId=example-mirrormaker-group-0, groupId=example-mirrormaker-group] Connection to node -1 (harbor.freshbrewed.science/73.242.50.46:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2022-11-20 19:19:49,852] WARN [Consumer clientId=example-mirrormaker-group-0, groupId=example-mirrormaker-group] Bootstrap broker harbor.freshbrewed.science:9092 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient)
[2022-11-20 19:19:49,939] WARN [Consumer clientId=example-mirrormaker-group-0, groupId=example-mirrormaker-group] Connection to node -1 (harbor.freshbrewed.science/73.242.50.46:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2022-11-20 19:19:49,939] WARN [Consumer clientId=example-mirrormaker-group-0, groupId=example-mirrormaker-group] Bootstrap broker harbor.freshbrewed.science:9092 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient)
[2022-11-20 19:19:50,090] WARN [Consumer clientId=example-mirrormaker-group-0, groupId=example-mirrormaker-group] Connection to node -1 (harbor.freshbrewed.science/73.242.50.46:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2022-11-20 19:19:50,091] WARN [Consumer clientId=example-mirrormaker-group-0, groupId=example-mirrormaker-group] Bootstrap broker harbor.freshbrewed.science:9092 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient)
[2022-11-20 19:19:50,091] ERROR [Producer clientId=producer-1] Connection to node -1 (kafka-226f19f8-isaac-1040.aivencloud.com/34.71.243.224:12009) failed authentication due to: SSL handshake failed (org.apache.kafka.clients.NetworkClient)
[2022-11-20 19:19:50,093] WARN [Producer clientId=producer-1] Bootstrap broker kafka-226f19f8-isaac-1040.aivencloud.com:12009 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient)
...

I tried a third idea: perhaps I could bypass the SSL checks by setting ssl.client.auth to none and clearing ssl.endpoint.identification.algorithm.

I have no name!@new-kafka-release-client:/$ cat <<'EOF' >> /tmp/mirror-eventhub3.config
> bootstrap.servers=kafka-226f19f8-isaac-1040.aivencloud.com:12009
> sasl.mechanism=SCRAM-SHA-256
> security.protocol=SASL_SSL
> ssl.client.auth=none
> ssl.endpoint.identification.algorithm=
> sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="avnadmin" password="AVNS_********************";
> EOF
I have no name!@new-kafka-release-client:/$ kafka-mirror-maker.sh --consumer.config /tmp/source-kafka.config --num.streams 1 --producer.config /tmp/mirror-eventhub3.config --whitelist=".*"
[2022-11-20 19:19:49,043] WARN This tool is deprecated and may be removed in a future major release. (kafka.tools.MirrorMaker$)
WARNING: The default partition assignment strategy of the mirror maker will change from 'range' to 'roundrobin' in an upcoming release (so that better load balancing can be achieved). If you prefer to make this switch in advance of that release add the following to the corresponding config: 'partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor'
[2022-11-20 19:19:49,845] WARN [Consumer clientId=example-mirrormaker-group-0, groupId=example-mirrormaker-group] Connection to node -1 (harbor.freshbrewed.science/73.242.50.46:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2022-11-20 19:19:49,852] WARN [Consumer clientId=example-mirrormaker-group-0, groupId=example-mirrormaker-group] Bootstrap broker harbor.freshbrewed.science:9092 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient)
[2022-11-20 19:19:49,939] WARN [Consumer clientId=example-mirrormaker-group-0, groupId=example-mirrormaker-group] Connection to node -1 (harbor.freshbrewed.science/73.242.50.46:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2022-11-20 19:19:49,939] WARN [Consumer clientId=example-mirrormaker-group-0, groupId=example-mirrormaker-group] Bootstrap broker harbor.freshbrewed.science:9092 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient)
[2022-11-20 19:19:50,090] WARN [Consumer clientId=example-mirrormaker-group-0, groupId=example-mirrormaker-group] Connection to node -1 (harbor.freshbrewed.science/73.242.50.46:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2022-11-20 19:19:50,091] WARN [Consumer clientId=example-mirrormaker-group-0, groupId=example-mirrormaker-group] Bootstrap broker harbor.freshbrewed.science:9092 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient)
[2022-11-20 19:19:50,091] ERROR [Producer clientId=producer-1] Connection to node -1 (kafka-226f19f8-isaac-1040.aivencloud.com/34.71.243.224:12009) failed authentication due to: SSL handshake failed (org.apache.kafka.clients.NetworkClient)
[2022-11-20 19:19:50,093] WARN [Producer clientId=producer-1] Bootstrap broker kafka-226f19f8-isaac-1040.aivencloud.com:12009 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient)
...
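A possible explanation for the repeated "SSL handshake failed" is that none of the three producer configs gives the Java client a truststore containing the Aiven project CA. As far as I know, `ssl.cafile` and `sasl.plain.username` are not Java client properties, `SASL_MECHANISM` looks like an unfilled placeholder, and `ssl.client.auth` is a broker-side setting. The sketch below renders a Java-style properties file that points at the CA as a PEM truststore. It is untested against Aiven and rests on several assumptions: Kafka clients 2.7 or newer (PEM truststores), that the port in use is the SASL listener, and that the chosen mechanism is one the service accepts.

```python
# Login modules shipped with the Apache Kafka Java client, keyed by SASL mechanism.
LOGIN_MODULES = {
    "PLAIN": "org.apache.kafka.common.security.plain.PlainLoginModule",
    "SCRAM-SHA-256": "org.apache.kafka.common.security.scram.ScramLoginModule",
    "SCRAM-SHA-512": "org.apache.kafka.common.security.scram.ScramLoginModule",
}


def render_sasl_ssl_properties(
    bootstrap: str,
    username: str,
    password: str,
    ca_pem_path: str,
    mechanism: str = "SCRAM-SHA-256",
) -> str:
    """Render Java client properties for SASL_SSL with a PEM truststore.

    All argument values are supplied by the caller; nothing here is Aiven-specific.
    """
    if mechanism not in LOGIN_MODULES:
        raise ValueError("unsupported SASL mechanism: " + mechanism)
    if '"' in username or '"' in password:
        # Keep the sketch simple: quoting inside the JAAS string is not handled.
        raise ValueError("double quotes in credentials are not supported here")
    jaas = '{} required username="{}" password="{}";'.format(
        LOGIN_MODULES[mechanism], username, password
    )
    lines = [
        "bootstrap.servers=" + bootstrap,
        "security.protocol=SASL_SSL",
        "sasl.mechanism=" + mechanism,
        "sasl.jaas.config=" + jaas,
        # Assumption: client is Kafka 2.7+, which can read a PEM file as truststore.
        "ssl.truststore.type=PEM",
        "ssl.truststore.location=" + ca_pem_path,
    ]
    return "\n".join(lines) + "\n"
```

Writing the returned text to a file and passing it as `--producer.config` would be the next thing to try. The consumer side fails independently in the logs above, because harbor.freshbrewed.science:9092 is not reachable from the client.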

Pivot: switch to Ubuntu

I decided that I would follow the quick connect guide

/content/images/2022/11/aivenkafka-16.png

First, I made a long running Ubuntu pod

$ cat ubuntu.pod.yaml
apiVersion: v1
kind: Pod
metadata:
  name: ubuntu
  labels:
    app: ubuntu
spec:
  containers:
  - name: ubuntu
    image: ubuntu:latest
    command: ["/bin/sleep", "3650d"]
    imagePullPolicy: IfNotPresent
  restartPolicy: Always


$ kubectl apply -f ubuntu.pod.yaml
pod/ubuntu created

Then loaded a default JDK and common tools

root@ubuntu:/# apt install default-jdk software-properties-common
Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
The following additional packages will be...

I added the deadsnakes PPA, which carries additional Python 3 versions

root@ubuntu:/# add-apt-repository ppa:deadsnakes/ppa

Repository: 'deb https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu/ jammy main'
Description:
This PPA contains more recent Python versions packaged for Ubuntu.

Disclaimer: there's no guarantee of timely updates in case of security problems or other issues. If you want to use them in a security-or-otherwise-critical environment (say, on a production server), you do so at your own risk.

Update Note
===========
Please use this repository instead of ppa:fkrull/deadsnakes.

Reporting Issues
================

Issues can be reported in the master issue tracker at:
https://github.com/deadsnakes/issues/issues
...

Then installed Python3.8

root@ubuntu:/# apt update
Hit:1 http://archive.ubuntu.com/ubuntu jammy InRelease
Hit:2 http://archive.ubuntu.com/ubuntu jammy-updates InRelease
Hit:3 http://security.ubuntu.com/ubuntu jammy-security InRelease
Hit:4 http://archive.ubuntu.com/ubuntu jammy-backports InRelease
Hit:5 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy InRelease
Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
11 packages can be upgraded. Run 'apt list --upgradable' to see them.


root@ubuntu:/# apt install python3.8
Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
The following additional packages will be installed:
  bzip2 file libgdbm-compat4 libgdbm6 libmagic-mgc libmagic1 libperl5.34 libpython3.8-minimal libpython3.8-stdlib mailcap mime-support netbase perl perl-base perl-modules-5.34 python3.8-minimal tzdata
Suggested packages:
  bzip2-doc gdbm-l10n perl-doc libterm-readline-gnu-perl | libterm-readline-perl-perl make libtap-harness-archive-perl python3.8-venv binutils binfmt-support
The following NEW packages will be installed:
  bzip2 file libgdbm-compat4 libgdbm6 libmagic-mgc libmagic1 libperl5.34 libpython3.8-minimal libpython3.8-stdlib mailcap mime-support netbase perl perl-modules-5.34 python3.8 python3.8-minimal tzdata
The following packages will be upgraded:
  perl-base
1 upgraded, 17 newly installed, 0 to remove and 10 not upgraded.
Need to get 15.7 MB of archives.
After this operation, 78.6 MB of additional disk space will be used.
Do you want to continue? [Y/n]

I then needed to add pip

$ apt install python3-pip

To handle the Auth, the guide then had me add the Aiven client via PIP

root@ubuntu:/# python3 -m pip install aiven-client
Collecting aiven-client
  Downloading aiven_client-2.16.0-py3-none-any.whl (73 kB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 74.0/74.0 KB 1.8 MB/s eta 0:00:00
Collecting requests>=2.2.1
  Downloading requests-2.28.1-py3-none-any.whl (62 kB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 62.8/62.8 KB 6.3 MB/s eta 0:00:00
Collecting certifi>=2017.4.17
  Downloading certifi-2022.9.24-py3-none-any.whl (161 kB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 161.1/161.1 KB 12.4 MB/s eta 0:00:00
Collecting idna<4,>=2.5
  Downloading idna-3.4-py3-none-any.whl (61 kB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 61.5/61.5 KB 13.5 MB/s eta 0:00:00
Collecting charset-normalizer<3,>=2
  Downloading charset_normalizer-2.1.1-py3-none-any.whl (39 kB)
Collecting urllib3<1.27,>=1.21.1
  Downloading urllib3-1.26.12-py2.py3-none-any.whl (140 kB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 140.4/140.4 KB 22.9 MB/s eta 0:00:00
Installing collected packages: urllib3, idna, charset-normalizer, certifi, requests, aiven-client
Successfully installed aiven-client-2.16.0 certifi-2022.9.24 charset-normalizer-2.1.1 idna-3.4 requests-2.28.1 urllib3-1.26.12
WARNING: Running pip as the 'root' user can result in broken permissions and conflicting behaviour with the system package manager. It is recommended to use a virtual environment instead: https://pip.pypa.io/warnings/venv
root@ubuntu:/# python3 -m aiven.client user login isaac.johnson@gmail.com
isaac.johnson@gmail.com's Aiven password:
ERROR   command failed: Error: {"errors":[{"message":"Authentication failed","status":403}],"message":"Authentication failed"}

I got auth errors until I switched from federated login to a direct username/password

/content/images/2022/11/aivenkafka-20.png

root@ubuntu:/# python3 -m aiven.client user login isaac.johnson@gmail.com
isaac.johnson@gmail.com's Aiven password:
INFO    Aiven credentials written to: /root/.config/aiven/aiven-credentials.json
INFO    Default project set as 'isaac-1040' (change with 'avn project switch <project>')
root@ubuntu:/#

Now that we are logged in, we need to create a keystore into which to save our auth creds:

root@ubuntu:/# python3 -m aiven.client service user-kafka-java-creds kafka-226f19f8 --username avnadmin -d ~/ --password safePassword123
Downloaded to directory '/root/': CA certificate, certificate, key

To get the user passwords type:
avn service user-list --format '{username} {password}' --project isaac-1040 kafka-226f19f8
Certificate was added to keystore

Lastly, we can test

root@ubuntu:/# kafka_2.13-3.3.1/bin/kafka-console-producer.sh --broker-list kafka-226f19f8-isaac-1040.aivencloud.com:11998 --topic aiventest --producer.config ~/configuration.properties
[2022-11-20 14:07:57,380] ERROR Modification time of key store could not be obtained: client.keystore.p12 (org.apache.kafka.common.security.ssl.DefaultSslEngineFactory)
java.nio.file.NoSuchFileException: client.keystore.p12
        at java.base/sun.nio.fs.UnixException.translateToIOException(UnixException.java:92)
        at java.base/sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:111)
        at java.base/sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:116)
        at java.base/sun.nio.fs.UnixFileAttributeViews$Basic.readAttributes(UnixFileAttributeViews.java:55)
        at java.base/sun.nio.fs.UnixFileSystemProvider.readAttributes(UnixFileSystemProvider.java:149)
        at java.base/sun.nio.fs.LinuxFileSystemProvider.readAttributes(LinuxFileSystemProvider.java:99)
        at java.base/java.nio.file.Files.readAttributes(Files.java:1764)
        at java.base/java.nio.file.Files.getLastModifiedTime(Files.java:2315)
        at org.apache.kafka.common.security.ssl.DefaultSslEngineFactory$FileBasedStore.lastModifiedMs(DefaultSslEngineFactory.java:381)
        at org.apache.kafka.common.security.ssl.DefaultSslEngineFactory$FileBasedStore.<init>(DefaultSslEngineFactory.java:346)
        at org.apache.kafka.common.security.ssl.DefaultSslEngineFactory.createKeystore(DefaultSslEngineFactory.java:297)
        at org.apache.kafka.common.security.ssl.DefaultSslEngineFactory.configure(DefaultSslEngineFactory.java:161)
        at org.apache.kafka.common.security.ssl.SslFactory.instantiateSslEngineFactory(SslFactory.java:140)
        at org.apache.kafka.common.security.ssl.SslFactory.configure(SslFactory.java:97)
        at org.apache.kafka.common.network.SslChannelBuilder.configure(SslChannelBuilder.java:73)
        at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:192)
        at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:81)
        at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:105)
        at org.apache.kafka.clients.producer.KafkaProducer.newSender(KafkaProducer.java:514)
        at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:457)
        at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:291)
        at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:318)
        at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:303)
        at kafka.tools.ConsoleProducer$.main(ConsoleProducer.scala:44)
        at kafka.tools.ConsoleProducer.main(ConsoleProducer.scala)
org.apache.kafka.common.KafkaException: Failed to construct kafka producer
        at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:468)
        at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:291)
        at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:318)
        at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:303)
        at kafka.tools.ConsoleProducer$.main(ConsoleProducer.scala:44)
        at kafka.tools.ConsoleProducer.main(ConsoleProducer.scala)
Caused by: org.apache.kafka.common.KafkaException: Failed to load SSL keystore client.keystore.p12 of type PKCS12
        at org.apache.kafka.common.security.ssl.DefaultSslEngineFactory$FileBasedStore.load(DefaultSslEngineFactory.java:375)
        at org.apache.kafka.common.security.ssl.DefaultSslEngineFactory$FileBasedStore.<init>(DefaultSslEngineFactory.java:347)
        at org.apache.kafka.common.security.ssl.DefaultSslEngineFactory.createKeystore(DefaultSslEngineFactory.java:297)
        at org.apache.kafka.common.security.ssl.DefaultSslEngineFactory.configure(DefaultSslEngineFactory.java:161)
        at org.apache.kafka.common.security.ssl.SslFactory.instantiateSslEngineFactory(SslFactory.java:140)
        at org.apache.kafka.common.security.ssl.SslFactory.configure(SslFactory.java:97)
        at org.apache.kafka.common.network.SslChannelBuilder.configure(SslChannelBuilder.java:73)
        at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:192)
        at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:81)
        at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:105)
        at org.apache.kafka.clients.producer.KafkaProducer.newSender(KafkaProducer.java:514)
        at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:457)
        ... 5 more
Caused by: java.nio.file.NoSuchFileException: client.keystore.p12
        at java.base/sun.nio.fs.UnixException.translateToIOException(UnixException.java:92)
        at java.base/sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:111)
        at java.base/sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:116)
        at java.base/sun.nio.fs.UnixFileSystemProvider.newByteChannel(UnixFileSystemProvider.java:219)
        at java.base/java.nio.file.Files.newByteChannel(Files.java:371)
        at java.base/java.nio.file.Files.newByteChannel(Files.java:422)
        at java.base/java.nio.file.spi.FileSystemProvider.newInputStream(FileSystemProvider.java:420)
        at java.base/java.nio.file.Files.newInputStream(Files.java:156)
        at org.apache.kafka.common.security.ssl.DefaultSslEngineFactory$FileBasedStore.load(DefaultSslEngineFactory.java:368)
        ... 16 more
root@ubuntu:/#

To get past the errors, I just needed to ensure password login was set up

/content/images/2022/11/aivenkafka-20.png

I created a client secret in a fresh java keystore:

$ python3 -m aiven.client service user-kafka-java-creds kafka-226f19f8 --username avnadmin -d ~/ --password safePassword123

Then made a configuration that would use it:

root@ubuntu:~# cat configuration.properties
security.protocol=SSL
ssl.protocol=TLS
ssl.keystore.type=PKCS12
ssl.keystore.location=client.keystore.p12
ssl.keystore.password=safePassword123
ssl.key.password=safePassword123
ssl.truststore.location=client.truststore.jks
ssl.truststore.password=safePassword123
ssl.truststore.type=JKS

I could then use the kafka binaries to test

$ wget https://downloads.apache.org/kafka/3.3.1/kafka_2.13-3.3.1.tgz
$ tar -xzvf kafka_2.13-3.3.1.tgz

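Note: since the file uses relative paths (ssl.keystore.location=client.keystore.p12 and ssl.truststore.location=client.truststore.jks), you’ll want to be in ~ when you run MirrorMaker or the producer. Running from / while the stores sat in /root is what triggered the NoSuchFileException shown earlier.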
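If you would rather not depend on the working directory, one option is to rewrite the store locations as absolute paths before handing the file to the Kafka tools. Here is a minimal Python sketch of that idea, assuming the stores live in /root as they did for me; the helper names are my own and not part of any Kafka or Aiven tooling.

```python
import os
import posixpath


def parse_properties(text):
    """Parse simple key=value lines, skipping blanks and # comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


STORE_KEYS = ("ssl.keystore.location", "ssl.truststore.location")


def absolutize_store_paths(props, base_dir):
    """Return a copy of props with relative store locations made absolute."""
    fixed = dict(props)
    for key in STORE_KEYS:
        if key in fixed and not posixpath.isabs(fixed[key]):
            fixed[key] = posixpath.join(base_dir, fixed[key])
    return fixed


def missing_stores(props):
    """List configured store files that do not exist on disk."""
    return [props[k] for k in STORE_KEYS
            if k in props and not os.path.isfile(props[k])]


if __name__ == "__main__":
    sample = (
        "ssl.keystore.location=client.keystore.p12\n"
        "ssl.truststore.location=client.truststore.jks\n"
    )
    # /root is an assumption: it is where -d ~/ placed the files for me.
    fixed = absolutize_store_paths(parse_properties(sample), "/root")
    for key, value in fixed.items():
        print(f"{key}={value}")
    print("missing:", missing_stores(fixed))
```

Running missing_stores before starting a producer gives a much shorter error than the Java stack trace when a keystore is not where the config says it is.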

Here we can see it producing just fine now

I now have a way to hit both my internal and external aiven.io Kafka clusters

root@ubuntu:~# /kafka_2.13-3.3.1/bin/kafka-console-producer.sh --broker-list kafka-226f19f8-isaac-1040.aivencloud.com:11998 --topic aiventest --producer.config ~/configuration.properties
>This
>Still
>Works
root@ubuntu:~# /kafka_2.13-3.3.1/bin/kafka-console-producer.sh --broker-list harbor.freshbrewed.science:9094 --topic test
>So
>Does
>This
>^Croot@ubuntu:~#

Mirror Maker setup and use

Next, I’ll create two local configs for MirrorMaker

root@ubuntu:~# cat localk8s.properties
bootstrap.servers=harbor.freshbrewed.science:9094
acks=1
batch.size=5
client.id=mirror_maker_producer


root@ubuntu:~# cat aiven.properties
bootstrap.servers=kafka-226f19f8-isaac-1040.aivencloud.com:11998
client.id=mirror_maker_consumer
group.id=mirror_maker_consumer
exclude.internal.topics=true
security.protocol=SSL
ssl.protocol=TLS
ssl.keystore.type=PKCS12
ssl.keystore.location=client.keystore.p12
ssl.keystore.password=safePassword123
ssl.key.password=safePassword123
ssl.truststore.location=client.truststore.jks
ssl.truststore.password=safePassword123
ssl.truststore.type=JKS

In the end, I tried both directions

root@ubuntu:~# /kafka_2.13-3.3.1/bin/kafka-mirror-maker.sh --consumer.config aiven.properties --num.streams 1 --producer.config localk8s.properties --whitelist=".*"
[2022-11-21 07:03:25,499] WARN This tool is deprecated and may be removed in a future major release. (kafka.tools.MirrorMaker$)
WARNING: The default partition assignment strategy of the mirror maker will change from 'range' to 'roundrobin' in an upcoming release (so that better load balancing can be achieved). If you prefer to make this switch in advance of that release add the following to the corresponding config: 'partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor'

root@ubuntu:~# /kafka_2.13-3.3.1/bin/kafka-mirror-maker.sh --producer.config aiven.properties --num.streams 1 --consumer.config localk8s.properties --whitelist=".*"
[2022-11-21 07:13:48,101] WARN This tool is deprecated and may be removed in a future major release. (kafka.tools.MirrorMaker$)
WARNING: The default partition assignment strategy of the mirror maker will change from 'range' to 'roundrobin' in an upcoming release (so that better load balancing can be achieved). If you prefer to make this switch in advance of that release add the following to the corresponding config: 'partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor'
[2022-11-21 07:13:48,511] WARN These configurations '[group.id, exclude.internal.topics]' were supplied but are not used yet. (org.apache.kafka.clients.producer.ProducerConfig)
[2022-11-21 07:13:48,689] WARN These configurations '[acks, batch.size]' were supplied but are not used yet. (org.apache.kafka.clients.consumer.ConsumerConfig)
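Those last two WARN lines come from reusing one properties file in both roles: group.id and exclude.internal.topics are consumer settings, while acks and batch.size are producer settings. As far as I can tell the clients simply ignore the extras, but a quick lint makes it obvious which file is meant for which side. Below is a small Python sketch; the key lists are deliberately incomplete and only cover settings from this post plus a few well-known neighbours, and the function name is my own.

```python
# Deliberately small lists: settings seen in this post plus a few
# well-known producer-only and consumer-only client settings.
PRODUCER_ONLY = {"acks", "batch.size", "linger.ms", "compression.type"}
CONSUMER_ONLY = {"group.id", "exclude.internal.topics", "auto.offset.reset"}


def misplaced_keys(props, role):
    """Return the keys in props that belong to the other client role."""
    if role == "producer":
        wrong = CONSUMER_ONLY
    elif role == "consumer":
        wrong = PRODUCER_ONLY
    else:
        raise ValueError("role must be 'producer' or 'consumer'")
    return sorted(key for key in props if key in wrong)


if __name__ == "__main__":
    aiven = {
        "bootstrap.servers": "kafka-226f19f8-isaac-1040.aivencloud.com:11998",
        "client.id": "mirror_maker_consumer",
        "group.id": "mirror_maker_consumer",
        "exclude.internal.topics": "true",
    }
    localk8s = {
        "bootstrap.servers": "harbor.freshbrewed.science:9094",
        "acks": "1",
        "batch.size": "5",
        "client.id": "mirror_maker_producer",
    }
    # Mirrors the second run above, where the roles were swapped.
    print("aiven as producer:", misplaced_keys(aiven, "producer"))
    print("localk8s as consumer:", misplaced_keys(localk8s, "consumer"))
```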

Neither seemed to replicate topics

/content/images/2022/11/aivenkafka-17.png

To rule out my local cluster, I tried with an Upstash Kafka instance. That too failed to sync (in either direction)

/content/images/2022/11/aivenkafka-18.png

… a short while later …

Once I had solved the Auto Topic create setting in the Kafka cluster (which we covered in Setup earlier), I was able to use the local MirrorMaker

root@ubuntu:~# cat aiven.properties
bootstrap.servers=kafka-226f19f8-isaac-1040.aivencloud.com:11998
client.id=mirror_maker_producer
group.id=mirror_maker_producer
acks=1
batch.size=5
exclude.internal.topics=true
security.protocol=SSL
ssl.protocol=TLS
ssl.keystore.type=PKCS12
ssl.keystore.location=client.keystore.p12
ssl.keystore.password=safePassword123
ssl.key.password=safePassword123
ssl.truststore.location=client.truststore.jks
ssl.truststore.password=safePassword123
ssl.truststore.type=JKS

root@ubuntu:~# cat localk8s.properties
bootstrap.servers=192.168.1.38:9094
acks=1
batch.size=5
client.id=mirror_maker_consumer
group.id=mirror_maker_consumer

I fired up the MM instance and, in another shell using the existing Confluent container, I then started to push messages into my local Kafka

/content/images/2022/11/aivenkafka-62.png

root@ubuntu:~# /kafka_2.13-3.3.1/bin/kafka-mirror-maker.sh --producer.config aiven.properties --num.streams 1 --consumer.config localk8s.properties --whitelist=".*"
[2022-11-22 06:31:52,052] WARN This tool is deprecated and may be removed in a future major release. (kafka.tools.MirrorMaker$)
WARNING: The default partition assignment strategy of the mirror maker will change from 'range' to 'roundrobin' in an upcoming release 
...


I have no name!@new-kafka-release-client:/$ kafka-console-producer.sh --broker-list harbor.freshbrewed.science:9094 --topic test
>asdf
>asdf
>asdf
...
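Since I kept swapping which cluster was the consumer (source) and which was the producer (target), it helped me to think of the MirrorMaker invocation as a small function. Here is a rough Python sketch that only assembles the argument list using the same flags as above; the function name and default script path are my own assumptions, and nothing is executed.

```python
import shlex


def mirror_maker_command(consumer_config, producer_config, include,
                         num_streams=1,
                         script="/kafka_2.13-3.3.1/bin/kafka-mirror-maker.sh"):
    """Build the argv for the legacy kafka-mirror-maker.sh tool.

    consumer_config points at the SOURCE cluster, producer_config at the
    TARGET cluster. include is the topic regex passed to --whitelist.
    """
    if not include:
        # The tool itself refuses to start without an include list.
        raise ValueError("include list must be specified")
    if num_streams < 1:
        raise ValueError("num_streams must be at least 1")
    return [
        script,
        "--producer.config", producer_config,
        "--num.streams", str(num_streams),
        "--consumer.config", consumer_config,
        f"--whitelist={include}",
    ]


if __name__ == "__main__":
    # Local Kubernetes cluster -> Aiven, as in the working run above.
    cmd = mirror_maker_command("localk8s.properties", "aiven.properties", ".*")
    print(shlex.join(cmd))
    # To actually run it: subprocess.run(cmd, check=True, cwd="/root")
```

Passing cwd to subprocess.run is one way to keep the relative keystore paths in the properties files resolving correctly.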

I can now see them in Aiven.io Kafka

/content/images/2022/11/aivenkafka-63.png

Here is a quick demo

Using MirrorMaker in Aiven

First, I setup the config for Upstash in Integration Endpoints

/content/images/2022/11/aivenkafka-19.png

Then I add an integration. You can see from the top how many attempts I’ve made to get MirrorMaker to work

/content/images/2022/11/aivenkafka-24.png

I selected the cluster I just added

/content/images/2022/11/aivenkafka-25.png

and give it a name

/content/images/2022/11/aivenkafka-26.png

We can now go to Replication Flows to create a replication flow

/content/images/2022/11/aivenkafka-27.png

Let’s first attempt to pull topics from the Upstash into Aiven

/content/images/2022/11/aivenkafka-28.png

I can see MirrorMaker kick in

/content/images/2022/11/aivenkafka-29.png

But yet again, nothing synced

/content/images/2022/11/aivenkafka-30.png

However, I cannot get either MirrorMaker to actually work.

Testing bi-directionally with Upstash.io

^Croot@ubuntu:~# cat upstash.properties
bootstrap.servers=well-octopus-6904-us1-kafka.upstash.io:9092
client.id=mirror_maker_consumer
group.id=mirror_maker_consumer
sasl.mechanism=SCRAM-SHA-256
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
  username="**********************************************************" \
  password="******************************************==";
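To avoid pasting credentials into a file by hand, the same properties can be generated from environment variables. This is a minimal Python sketch that emits the JAAS entry on a single line instead of using backslash continuations; UPSTASH_USERNAME and UPSTASH_PASSWORD are hypothetical variable names, and the helper functions are mine, not part of Kafka or Upstash.

```python
import os


def scram_jaas_config(username, password):
    """Build the single-line sasl.jaas.config value for SCRAM auth."""
    for value in (username, password):
        # Keep it simple: refuse values that would need escaping.
        if '"' in value or "\n" in value:
            raise ValueError("credentials must not contain quotes or newlines")
    return (
        "org.apache.kafka.common.security.scram.ScramLoginModule required "
        f'username="{username}" password="{password}";'
    )


def sasl_scram_properties(bootstrap, username, password, client_id, group_id):
    """Return client properties text for a SASL_SSL + SCRAM-SHA-256 cluster."""
    lines = [
        f"bootstrap.servers={bootstrap}",
        f"client.id={client_id}",
        f"group.id={group_id}",
        "sasl.mechanism=SCRAM-SHA-256",
        "security.protocol=SASL_SSL",
        f"sasl.jaas.config={scram_jaas_config(username, password)}",
    ]
    return "\n".join(lines) + "\n"


if __name__ == "__main__":
    print(sasl_scram_properties(
        "well-octopus-6904-us1-kafka.upstash.io:9092",
        os.environ.get("UPSTASH_USERNAME", "changeme"),
        os.environ.get("UPSTASH_PASSWORD", "changeme"),
        client_id="mirror_maker_consumer",
        group_id="mirror_maker_consumer",
    ))
```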

[2022-11-21 07:46:41,134] ERROR include list must be specified (kafka.tools.MirrorMaker$)
root@ubuntu:~# /kafka_2.13-3.3.1/bin/kafka-mirror-maker.sh --producer.config localk8s.properties --num.streams 1 --consumer.config upstash.properties --whitelist="aiventest"
[2022-11-21 07:46:55,204] WARN This tool is deprecated and may be removed in a future major release. (kafka.tools.MirrorMaker$)
WARNING: The default partition assignment strategy of the mirror maker will change from 'range' to 'roundrobin' in an upcoming release (so that better load balancing can be achieved). If you prefer to make this switch in advance of that release add the following to the corresponding config: 'partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor'
[2022-11-21 07:46:55,456] WARN These configurations '[group.id]' were supplied but are not used yet. (org.apache.kafka.clients.producer.ProducerConfig)



root@ubuntu:~# /kafka_2.13-3.3.1/bin/kafka-mirror-maker.sh --producer.config upstash.properties --num.streams 1 --consumer.config localk8s.properties --whitelist="aiventest"
[2022-11-21 07:47:35,348] WARN This tool is deprecated and may be removed in a future major release. (kafka.tools.MirrorMaker$)
WARNING: The default partition assignment strategy of the mirror maker will change from 'range' to 'roundrobin' in an upcoming release (so that better load balancing can be achieved). If you prefer to make this switch in advance of that release add the following to the corresponding config: 'partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor'
[2022-11-21 07:47:35,720] WARN These configurations '[group.id]' were supplied but are not used yet. (org.apache.kafka.clients.producer.ProducerConfig)
[2022-11-21 07:47:35,934] WARN These configurations '[acks, batch.size]' were supplied but are not used yet. (org.apache.kafka.clients.consumer.ConsumerConfig)

Neither worked

/content/images/2022/11/aivenkafka-31.png

Even reloading and resetting the password didn’t seem to work. I kept getting errors in MirrorMaker

/content/images/2022/11/aivenkafka-32.png

Azure Event Hub

I’ll try setting up a Kafka Endpoint in one of my favourite clouds, Azure. In Azure terminology, the “Event Hubs (Namespace)” is akin to the Cluster and the “Event Hub” (singular) is the Topic.
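For reference, an Event Hubs namespace exposes its Kafka endpoint on port 9093 using SASL_SSL with the PLAIN mechanism, where the username is the literal string $ConnectionString and the password is the namespace connection string. Here is a minimal Python sketch that produces matching client properties; the namespace name and connection string in the example are placeholders, not values from my setup, and the function name is my own.

```python
def event_hubs_kafka_properties(namespace, connection_string):
    """Return Kafka client properties text for an Event Hubs namespace."""
    if '"' in connection_string or "\n" in connection_string:
        raise ValueError("connection string must not contain quotes or newlines")
    jaas = (
        "org.apache.kafka.common.security.plain.PlainLoginModule required "
        'username="$ConnectionString" '
        f'password="{connection_string}";'
    )
    lines = [
        # The namespace plays the role of the Kafka cluster.
        f"bootstrap.servers={namespace}.servicebus.windows.net:9093",
        "security.protocol=SASL_SSL",
        "sasl.mechanism=PLAIN",
        f"sasl.jaas.config={jaas}",
    ]
    return "\n".join(lines) + "\n"


if __name__ == "__main__":
    # Placeholder values for illustration only.
    print(event_hubs_kafka_properties(
        "my-namespace",
        "Endpoint=sb://my-namespace.servicebus.windows.net/;"
        "SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=changeme",
    ))
```

These are the same details the Aiven integration endpoint form needs: the bootstrap server, the security protocol, the SASL mechanism and the credentials.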

Create an Azure Event Hub

/content/images/2022/11/aivenkafka-36.png

Create a Topic

/content/images/2022/11/aivenkafka-35.png

Then add as an endpoint to Aiven.io

/content/images/2022/11/aivenkafka-34.png

Add as an integration (Cluster for Replication)

/content/images/2022/11/aivenkafka-37.png

We can pick the Az Event Hub

/content/images/2022/11/aivenkafka-38.png

and give it a name

/content/images/2022/11/aivenkafka-39.png

At first I was worried about the errors I saw about it not resolving Aiven.io’s Kafka cluster (a .local address)

/content/images/2022/11/aivenkafka-40.png

When I set it to replicate from Aiven.io to Azure Event Hub

/content/images/2022/11/aivenkafka-41.png

We finally saw some positive results!

/content/images/2022/11/aivenkafka-42.png

And we can see that it synced in both directions (as we set up in our flows)

/content/images/2022/11/aivenkafka-43.png

Create a topic in AzEH

/content/images/2022/11/aivenkafka-44.png

We can test the syncing by using a confluent CLI to Produce and Consume from the topic then see it replicated to Aiven from Azure via MirrorMaker

/content/images/2022/11/aivenkafka-45.png

As we can see, the only limit now with regard to Event Hubs is that we top out at 10 topics (Event Hubs) per namespace. This is a fixed limit in the Basic and Standard tiers of Azure Event Hubs, but if we moved to the Premium SKU it would be 100 per PU (processing unit), or 1000 in the Dedicated tier.

Logging and Metrics

We can see project level events in “Event Logs” in Aiven

/content/images/2022/11/aivenkafka-46.png

and our individual services also capture logs as well

/content/images/2022/11/aivenkafka-47.png

We can view basic metrics, from MirrorMaker, for example:

/content/images/2022/11/aivenkafka-48.png

and Kafka

/content/images/2022/11/aivenkafka-49.png

However, if we want more, it’s pretty easy to plug in Datadog here as well

In Datadog, go to Organization Settings and Add a new API key under Access

/content/images/2022/11/aivenkafka-51.png

When you click create, you’ll get your Key and Key ID - we’ll need that KEY

/content/images/2022/11/aivenkafka-52.png

In Aiven.io, from Integration Endpoints, select Datadog

/content/images/2022/11/aivenkafka-50.png

For this we’ll use that Key we created

/content/images/2022/11/aivenkafka-53.png

and I can now see it as an option

/content/images/2022/11/aivenkafka-54.png

In our service, we need to first add this to the Integrations

/content/images/2022/11/aivenkafka-55.png

Picking Datadog for Metrics

/content/images/2022/11/aivenkafka-56.png

We should now be sending service metrics to DD

/content/images/2022/11/aivenkafka-57.png

which we can now query

/content/images/2022/11/aivenkafka-58.png

and of course, metrics we query we can make into dashboards and alerts.

Perhaps we want to alert when our Kafka project hits high loads, with warnings going to Teams but alerts hitting up PagerDuty.
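The same monitor can be expressed as a payload for the Datadog monitors API, which makes the warning versus alert routing explicit. Below is a rough Python sketch that only builds the payload. The metric (system.load.1), the tag filter, the thresholds and the @teams- and @pagerduty- handles are all assumptions for illustration; substitute whatever your Aiven integration actually reports and whatever notification handles exist in your Datadog org.

```python
import json


def load_monitor(tag_filter, warning, critical, warn_handle, alert_handle):
    """Build a Datadog metric-alert payload with split notifications."""
    if warning >= critical:
        raise ValueError("warning threshold must be below critical")
    # The comparison value in the query must match the critical threshold.
    query = f"avg(last_5m):avg:system.load.1{{{tag_filter}}} > {critical}"
    message = (
        "{{#is_warning}}Kafka load is elevated " + warn_handle + "{{/is_warning}}\n"
        "{{#is_alert}}Kafka load is critical " + alert_handle + "{{/is_alert}}"
    )
    return {
        "name": "Aiven Kafka high load",
        "type": "metric alert",
        "query": query,
        "message": message,
        "options": {"thresholds": {"critical": critical, "warning": warning}},
    }


if __name__ == "__main__":
    payload = load_monitor(
        tag_filter="aiven-project:isaac-1040",  # hypothetical tag
        warning=2,
        critical=4,
        warn_handle="@teams-kafka-warnings",    # hypothetical handle
        alert_handle="@pagerduty-kafka",        # hypothetical handle
    )
    print(json.dumps(payload, indent=2))
```

The is_warning and is_alert blocks are what send each severity to a different destination from a single monitor.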

/content/images/2022/11/aivenkafka-59.png

Summary

We’ve explored a lot today with Aiven.io Kafka. We set up Kafka clusters in Aiven.io, Azure Event Hub, locally in Kubernetes, and while I didn’t detail it out, in Upstash as well.

We worked on configuring MirrorMaker locally and in Aiven and managed to get it to connect bi-directionally with Azure Event Hub. We also set up and tested our local MirrorMaker to sync from a Helm-installed Kafka cluster through to our Aiven.io one.

To be honest, we just scratched the surface on this. I will be adding a follow-up blog to explore Kafka Connect, more Sink options and more service integrations, such as Google Logging Workspaces.

My only issue, and it’s really quite minor, is the cost. I put in the referral link that gets both of us more credit, but I like very cheap things (as I’m just a blogger). Thus, the lack of a free tier might be the only thing that holds me back from using Aiven.io more.

Blog Art

Another fun AI-generated image from Midjourney was used in today’s post; here it is full size - Franz Kafka holding a Red Crab (which itself is Kafkaesque):

/content/images/2022/11/isaacjohnson_Franz_Kafka_holding_a_bright_glowing_red_crab_dyna_0ec8d629-a4f1-4db6-afb3-66195982248d.png

kafka aiven kubernetes


Isaac Johnson

Cloud Solutions Architect

Isaac is a CSA and DevOps engineer who focuses on cloud migrations and devops processes. He also is a dad to three wonderful daughters (hence the references to Princess King sprinkled throughout the blog).
