Published: Nov 15, 2022 by Isaac Johnson
Today we will explore Aiven.io and their managed Kafka and Kafka MirrorMaker services. Aiven integrates with five clouds: AWS, GCP, Azure, DigitalOcean and UpCloud, and has VPC private endpoints in AWS, GCP and Azure.
I was interested to fire up a trial account and check out some of their SaaS offerings, such as Kafka and Kafka Connect. To be fair, that is just a subset: to date they offer Kafka, Postgres, MySQL, Elasticsearch (now OpenSearch), Apache Cassandra, Redis, InfluxDB, Grafana, M3DB, M3 Aggregator, ClickHouse and Apache Flink.
Today we'll set up Kafka, show how to use it, and integrate it with MirrorMaker. We'll also set up a local Kafka cluster in Kubernetes and (attempt to) connect it with MirrorMaker. Lastly, we'll check out logs and metrics and see how we can integrate with Datadog.
Company Profile
Aiven.io is a Finnish company based in Helsinki whose founders started out managing infrastructure for companies such as F-Secure and Nokia. To date they support 11 open source products in over 100 regions, with integration in five clouds (Azure, AWS, GCP, DigitalOcean and UpCloud).
Their pricing model is satisfyingly simple: just a "Plan" with machine classes. You can add support as well, but at its base you pay like a PaaS.
While small compared to, say, Amazon or Microsoft, as of 2021 it was valued at over US$1B and is one of Finland's most valuable growth companies. A fun fact noted on the Finnish Wikipedia page is that they were founded via Twitter in 2016, where they got their first customer, a Mexican trucking company. According to Forbes, they are growing 100-147% year over year in headcount and revenue, with over 700 customers including Comcast and Toyota.
Aiven setup
FYI: This link will get you $600 in credit compared to the standard $300 (which is what I got). Enjoy.
We can sign up on their website using federated credentials (in my case Google) or a username/password. You don't need a credit card to sign up.
Once signed up, you'll create your first project and then be able to create services.
In my case, the first thing I wanted to create was a Kafka cluster.
When setting up Kafka, make sure to enable the REST API; we'll need it for some of the syncing we plan to do later.
Also, a few of the issues I encountered later in this post were merely because I neglected to allow topic auto-creation. This is a setting (false by default) you can change in the "advanced configuration" at the bottom of the Kafka service's Overview page.
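If you'd rather use the CLI, the same toggle can be flipped with the Aiven client (which we install later in this post); a sketch, assuming the service name that appears later:

$ avn service update kafka-226f19f8 -c kafka.auto_create_topics_enable=true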
Setting up Kafka in Kubernetes
In order to have something with which to sync, we'll also want to set up a local Kafka and (attempt to) expose it externally for MirrorMaker.
I used a Kafka cluster I had already installed with Helm:
$ helm status new-kafka-release
NAME: new-kafka-release
LAST DEPLOYED: Tue Jul 26 15:46:57 2022
NAMESPACE: default
STATUS: deployed
REVISION: 1
TEST SUITE: None
NOTES:
CHART NAME: kafka
CHART VERSION: 18.0.3
APP VERSION: 3.2.0
** Please be patient while the chart is being deployed **
Kafka can be accessed by consumers via port 9092 on the following DNS name from within your cluster:
new-kafka-release.default.svc.cluster.local
Each Kafka broker can be accessed by producers via port 9092 on the following DNS name(s) from within your cluster:
new-kafka-release-0.new-kafka-release-headless.default.svc.cluster.local:9092
To create a pod that you can use as a Kafka client run the following commands:
kubectl run new-kafka-release-client --restart='Never' --image docker.io/bitnami/kafka:3.2.0-debian-11-r12 --namespace default --command -- sleep infinity
kubectl exec --tty -i new-kafka-release-client --namespace default -- bash
PRODUCER:
kafka-console-producer.sh \
--broker-list new-kafka-release-0.new-kafka-release-headless.default.svc.cluster.local:9092 \
--topic test
CONSUMER:
kafka-console-consumer.sh \
--bootstrap-server new-kafka-release.default.svc.cluster.local:9092 \
--topic test \
--from-beginning
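Note: if topic auto-creation is disabled on the broker, you may want to create the test topic explicitly before producing; a sketch using the same client pod:

    kubectl exec --tty -i new-kafka-release-client --namespace default -- kafka-topics.sh \
      --bootstrap-server new-kafka-release.default.svc.cluster.local:9092 \
      --create --topic test --partitions 1 --replication-factor 1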
We can test this using a producer and consumer in a container, as detailed above in the Helm output (note: you can always get that output again with helm status <release name>).
builder@DESKTOP-QADGF36:~$ kubectl exec --tty -i new-kafka-release-client --namespace default -- bash
I have no name!@new-kafka-release-client:/$ kafka-console-consumer.sh --bootstrap-server harbor.freshbrewed.science:9092 --topic test --from-beginning
1
2
3
;
4
5
6
This
Is
A
Test
Hi
Fresh
Brewed!
Here
are
some
Testing the Producer and Consumer together
Externalizing our Kubernetes Chart
Below are all the ways I tried to externalize Apache Kafka; perhaps you'll spot some key flaw in my patterns. In the end I could hit the 'external' endpoint, but only from within my cluster, which leads me to believe clients were still being redirected to local IPs.
Try 1: Creating a quick external LB Service
To create the external LoadBalancer, I based it on the existing service:
$ kubectl get svc new-kafka-release -o yaml
apiVersion: v1
kind: Service
metadata:
  annotations:
    meta.helm.sh/release-name: new-kafka-release
    meta.helm.sh/release-namespace: default
  creationTimestamp: "2022-07-26T20:46:59Z"
  labels:
    app.kubernetes.io/component: kafka
    app.kubernetes.io/instance: new-kafka-release
    app.kubernetes.io/managed-by: Helm
    app.kubernetes.io/name: kafka
    helm.sh/chart: kafka-18.0.3
  name: new-kafka-release
  namespace: default
  resourceVersion: "167487"
  uid: dbd17254-c2a0-4d0d-8f17-eb9559499ca9
spec:
  clusterIP: 10.43.127.27
  clusterIPs:
  - 10.43.127.27
  internalTrafficPolicy: Cluster
  ipFamilies:
  - IPv4
  ipFamilyPolicy: SingleStack
  ports:
  - name: tcp-client
    port: 9092
    protocol: TCP
    targetPort: kafka-client
  selector:
    app.kubernetes.io/component: kafka
    app.kubernetes.io/instance: new-kafka-release
    app.kubernetes.io/name: kafka
  sessionAffinity: None
  type: ClusterIP
status:
  loadBalancer: {}
To create the external LoadBalancer Service (saved as kafka.ingress.yaml, despite the name):
$ cat kafka.ingress.yaml
apiVersion: v1
kind: Service
metadata:
  name: new-kafka-release-ext
  namespace: default
spec:
  ports:
  - name: tcp-client
    port: 9092
    protocol: TCP
    targetPort: kafka-client
  selector:
    app.kubernetes.io/component: kafka
    app.kubernetes.io/instance: new-kafka-release
    app.kubernetes.io/name: kafka
  sessionAffinity: None
  type: LoadBalancer
$ kubectl apply -f kafka.ingress.yaml
service/new-kafka-release-ext created
I could see it was now covering 9092 on all the nodes:
$ kubectl get svc
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
...snip...
new-kafka-release-ext LoadBalancer 10.43.33.168 192.168.1.38,192.168.1.57,192.168.1.77 9092:31280/TCP 3s
I exposed port 9092 on my router so harbor.freshbrewed.science could route traffic.
I found I could not consume from the outside URL:
I have no name!@new-kafka-release-client:/$ kafka-console-consumer.sh --bootstrap-server harbor.freshbrewed.science:9092 --topic test --from-beginning
[2022-11-17 22:07:35,973] WARN [Consumer clientId=console-consumer, groupId=console-consumer-47115] Bootstrap broker harbor.freshbrewed.science:9092 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient)
[2022-11-17 22:07:56,333] WARN [Consumer clientId=console-consumer, groupId=console-consumer-47115] Bootstrap broker harbor.freshbrewed.science:9092 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient)
^CProcessed a total of 0 messages
When I dug in, I realized it was because the bootstrap server wasn't aware of its externalized URL, so it redirected clients to an internal 'svc.cluster.local' address (which works inside Kubernetes, but not so hot outside the cluster).
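Under the hood this is the classic advertised.listeners problem: Kafka hands every client a metadata list built from what each broker advertises, and clients reconnect to those addresses no matter which hostname they originally dialed. A quick way to check what a broker actually advertises (a sketch, run from the client pod):

$ kafka-broker-api-versions.sh --bootstrap-server new-kafka-release.default.svc.cluster.local:9092 | head -n 1

The first line prints the host:port each broker advertises; if that's a *.svc.cluster.local name, external clients get redirected right back inside the cluster.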
Try 2: Update the Helm Chart for Kubernetes Kafka
On my next pass, I tried to tease out the right values for the chart:
$ cat kafka-helm-values2.yaml
autoCreateTopicsEnable: true
global:
  storageClass: managed-nfs-storage
logsDirs: /bitnami/kafka/data/logs
You'll see I set a few things with --set below; that was mostly because I was trying different permutations of externalAccess settings.
Install:
$ helm install -f kafka-helm-values2.yaml next-kafka-release bitnami/kafka --version 18.4.2 --set rbac.create=true,externalAccess.enabled=true,externalAccess.service.type=LoadBalancer,externalAccess.autoDiscovery.enabled=true
NAME: next-kafka-release
LAST DEPLOYED: Sat Nov 19 09:45:42 2022
NAMESPACE: default
STATUS: deployed
REVISION: 1
TEST SUITE: None
NOTES:
CHART NAME: kafka
CHART VERSION: 18.4.2
APP VERSION: 3.2.1
---------------------------------------------------------------------------------------------
WARNING
By specifying "serviceType=LoadBalancer" and not configuring the authentication
you have most likely exposed the Kafka service externally without any
authentication mechanism.
For security reasons, we strongly suggest that you switch to "ClusterIP" or
"NodePort". As alternative, you can also configure the Kafka authentication.
---------------------------------------------------------------------------------------------
** Please be patient while the chart is being deployed **
Kafka can be accessed by consumers via port 9092 on the following DNS name from within your cluster:
next-kafka-release.default.svc.cluster.local
Each Kafka broker can be accessed by producers via port 9092 on the following DNS name(s) from within your cluster:
next-kafka-release-0.next-kafka-release-headless.default.svc.cluster.local:9092
To create a pod that you can use as a Kafka client run the following commands:
kubectl run next-kafka-release-client --restart='Never' --image docker.io/bitnami/kafka:3.2.1-debian-11-r16 --namespace default --command -- sleep infinity
kubectl exec --tty -i next-kafka-release-client --namespace default -- bash
PRODUCER:
kafka-console-producer.sh \
--broker-list next-kafka-release-0.next-kafka-release-headless.default.svc.cluster.local:9092 \
--topic test
CONSUMER:
kafka-console-consumer.sh \
--bootstrap-server next-kafka-release.default.svc.cluster.local:9092 \
--topic test \
--from-beginning
To connect to your Kafka server from outside the cluster, follow the instructions below:
NOTE: It may take a few minutes for the LoadBalancer IPs to be available.
Watch the status with: 'kubectl get svc --namespace default -l "app.kubernetes.io/name=kafka,app.kubernetes.io/instance=next-kafka-release,app.kubernetes.io/component=kafka,pod" -w'
Kafka Brokers domain: You will have a different external IP for each Kafka broker. You can get the list of external IPs using the command below:
echo "$(kubectl get svc --namespace default -l "app.kubernetes.io/name=kafka,app.kubernetes.io/instance=next-kafka-release,app.kubernetes.io/component=kafka,pod" -o jsonpath='{.items[*].status.loadBalancer.ingress[0].ip}' | tr ' ' '\n')"
Kafka Brokers port: 9094
I can run the command to see the primary external IP:
$ echo "$(kubectl get svc --namespace default -l "app.kubernetes.io/name=kafka,app.kubernetes.io/instance=next-kafka-release,app.kubernetes.io/component=kafka,pod" -o jsonpath='{.items[*].status.loadBalancer.ingress[0].ip}' | tr ' ' '\n')"
192.168.1.38
I used that IP on the router. That said, I can see from the services that we are exposing 9094 on all the nodes:
$ kubectl get svc | grep kafka
new-kafka-release-zookeeper-headless    ClusterIP      None            <none>                                   2181/TCP,2888/TCP,3888/TCP   115d
new-kafka-release                       ClusterIP      10.43.127.27    <none>                                   9092/TCP                     115d
new-kafka-release-headless              ClusterIP      None            <none>                                   9092/TCP,9093/TCP            115d
new-kafka-release-zookeeper             ClusterIP      10.43.101.205   <none>                                   2181/TCP,2888/TCP,3888/TCP   115d
next-kafka-release-zookeeper-headless   ClusterIP      None            <none>                                   2181/TCP,2888/TCP,3888/TCP   153m
next-kafka-release-headless             ClusterIP      None            <none>                                   9092/TCP,9093/TCP            153m
next-kafka-release                      ClusterIP      10.43.1.8       <none>                                   9092/TCP                     153m
next-kafka-release-zookeeper            ClusterIP      10.43.73.64     <none>                                   2181/TCP,2888/TCP,3888/TCP   153m
next-kafka-release-0-external           LoadBalancer   10.43.100.162   192.168.1.38,192.168.1.57,192.168.1.77   9094:30054/TCP               153m
Local Testing:
$ kubectl run next-kafka-release-client --restart='Never' --image docker.io/bitnami/kafka:3.2.1-debian-11-r16 --namespace default --command -- sleep infinity
pod/next-kafka-release-client created
$ kubectl exec --tty -i next-kafka-release-client --namespace default -- bash
I have no name!@next-kafka-release-client:/$ kafka-console-producer.sh --broker-list next-kafka-release-0.next-kafka-release-headless.default.svc.cluster.local:9092 --topic test
>a
[2022-11-19 18:22:08,241] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 4 : {test=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2022-11-19 18:22:08,344] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 5 : {test=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2022-11-19 18:22:08,449] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 6 : {test=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
>fresh
>topic
>is
>updated
>
And I can test from the consumer side:
$ kubectl exec --tty -i next-kafka-release-client --namespace default -- bash
I have no name!@next-kafka-release-client:/$ kafka-console-consumer.sh --bootstrap-server next-kafka-release.default.svc.cluster.local:9092 --topic test --from-beginning
a
fresh
topic
is
updated
From outside my network, however, the external endpoint did not work:
builder@DESKTOP-72D2D9T:~/Workspaces/jekyll-blog$ ~/confluent-7.0.1/bin/kafka-console-consumer --bootstrap-server harbor.freshbrewed.science:9094 --topic test --from-beginning
But if I'm in my network, it worked:
$ ~/confluent-7.0.1/bin/kafka-console-consumer --bootstrap-server 192.168.1.77:9094 --topic test --from-beginning
a
fresh
topic
is
updated
$ ~/confluent-7.0.1/bin/kafka-console-consumer --bootstrap-server harbor.freshbrewed.science:9094 --topic test --from-beginning
a
fresh
topic
is
updated
^CProcessed a total of 5 messages
Local MirrorMaker
I decided to pivot. I knew that my Aiven cluster worked, as I could send messages via the UI:
I knew the MirrorMaker tool suite would exist in the testing container, so I saved some config files locally. The image has no editor and I'm not in sudoers, so I couldn't install vim; hence the cat heredoc redirection to /tmp.
I have no name!@new-kafka-release-client:/$ cat <<'EOF' >> /tmp/source-kafka.config
> bootstrap.servers=harbor.freshbrewed.science:9092
> group.id=example-mirrormaker-group
> exclude.internal.topics=true
> client.id=mirror_maker_consumer
> EOF
I have no name!@new-kafka-release-client:/$ cat /tmp/source-kafka.config
bootstrap.servers=harbor.freshbrewed.science:9092
group.id=example-mirrormaker-group
exclude.internal.topics=true
client.id=mirror_maker_consumer
I believed I would need to add the ca.pem to use later. My quick trick was to base64-encode the file locally (e.g. cat /mnt/c/Users/isaac/Downloads/ca.pem | base64 -w 0) and then decode it in the container, as below.
I have no name!@new-kafka-release-client:/$ which base64
/usr/bin/base64
I have no name!@new-kafka-release-client:/$ echo LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSUVRVENDQXFtZ0F3SUJBZ0lVUGUzeGk0eUhUTVlIM2JpV1lGMFNNOHQ4bXFFd0RRWUpLb1pJaHZjTkFRRU0KQlFBd09qRTRNRFlHQTFVRUF3d3ZNRE0yTjJFd1pEVXRPV1V4TXkwME1XUmlMVGxoTmpVdE0yVmxNakUwTXpGbApNbVF3SUZCeWIycGxZM1FnUTBFd0hoY05Nakl4TVRFMk1qQTFNRFV3V2hjTk16SXhNVEV6TWpBMU1EVXdXakE2Ck1UZ3dOZ1lEVlFRRERDOHdNelkzWVRCa05TMDVaVEV6TFRReFpHSXRPV0UyTlMwelpXVXlNVFF6TVdVeVpEQWcKVUhKdmFtVmpkQ0JEUVRDQ0FhSXdEUVlKS29aSWh2Y05BUUVCQlFBRGdnR1BBRENDQVlvQ2dnR0JBTUVpVW9LbApZcFRRVDhNb3FhRUdVNFBLNmpPSjltTnJ1RERuZzg0bXN1Y0dCeGplUkVBVUI3OThvRm84d2FYaVArd3p0TmpBCnVPSnRENWpzdEhlNEF1UE1aelAyVTFSZW1TZTdKdTVhWVJubEVJVnV1VVFpeExEVkxSNGJsV28vazVOZXpmR2QKNTN2LzJCSGg4ZEdJWFZlZkRSb1JOZVlKZ2VtaStQWkt6VCs0WnJNcUhDc0VQZyswY0doVklSUE5mZEdjQk1CdwpKS2IySlFBazhPNG1GaXJ5d3JXenNRalk4NFNVamErd3QvSEcyc1FlZ3hFa0VrOEhYSXpKYVRucmF1RTBlZFY1CmRpMjJiWkp3d2hwdjBnbStDMzQ1cUU3MjhyNXVzVktlR28zK2V0YTFrczB6VEQvVnFSSWEyQnR2TlJ4bVB1NUEKZUNLaGNuWlQxYWF4WDdFMUhxbndYYVdGSTE4VzBrcGRLQzRIVDRXYVVxK2lEZDcxN2lWTmVMa20rNm5DdVR5Sgo5bkVBaGJ5MVR5cTY1dzM4cFdTZXVKRVVzY3cvV0hmSCt4OU1TRWloMVdnV2R6ZnNrNzBuZ3ROQ3ZSbGN0U1M0CkxlaEdiazNDNk1aQTdJbUpzem5XbEhSV0sxdzBuQyt6TjZOc1BIbnBNQVM5QUJzM1NqVTNhdXYvUVFJREFRQUIKb3o4d1BUQWRCZ05WSFE0RUZnUVVLMUU3WGk1alZJYWRBem8yU3JOOU84OU9EN3d3RHdZRFZSMFRCQWd3QmdFQgovd0lCQURBTEJnTlZIUThFQkFNQ0FRWXdEUVlKS29aSWh2Y05BUUVNQlFBRGdnR0JBSzNlR1ZqQVlzcUlIcFRSClFER09pbnkybWhJUENxR3pPY0ZoWlVtbml3MnBPMThZQmJsQ0Fwelg4ZlJvd3NRemV1UU8vWERQVGloWHVPbFUKVDNhdFppZ2JVaW1tNmorMElHY1R0NWhHUTFnT0ZXbHM0ZzhYR0xyRzFPYXp6aDFYUG5DTVFremNWQUg2M0Y4OApvUFNHays3dnc0eS9tbVluK09LdW1YcURucW1JWjN3QngrMTBncjRycm4rTlllWElZOTkwd3pQS2lnMXM5QW9aCkVtZFlndldUMUFMSFNhWkQwMlE1RHBUM0oreDYyd1o5S3lUYTY2SmZNc3BiaHROeTk0UHNXbmdDd0lkRUtLMXkKVlV3ZjFYamszZnBMMkFyYThqUkJRVzVIb214aFd4VGcyVmE0MXcwUjc3QnhjVFNQWmw5dVRoY2NPRFJRb2N4RgpkVEtlTTU2K1RFMFJxTFpJNnlSZC9xSi9DeHN4MGdtelVFRTZnT1JMTWxaWk43MXZsaURtUmRzakFVcDZXdCtaCkV5OHdFQmd6L0FCWXBOQkNESXcrN08zQ2kwY2hPTUZCam1YMHNsQ1dEamdZRm5LRmlhN3NFNDFUbG9PMlRoQXcKc3ZEQ0tPMUdzK2JjZGxPZEpmY1g5SGkzS0tkUTJrSHpqNnJkVXZkOWpZMWt1bUlob0E9PQotLS0tLUVORCBDRVJUSUZJQ0FURS0tLS0tCg== | base64 --decode > /tmp/ca.pem
I have no name!@new-kafka-release-client:/$ cat /tmp/ca.pem
-----BEGIN CERTIFICATE-----
MIIEQTCCAqmgAwIBAgIUPe3xi4yHTMYH3biWYF0SM8t8mqEwDQYJKoZIhvcNAQEM
BQAwOjE4MDYGA1UEAwwvMDM2N2EwZDUtOWUxMy00MWRiLTlhNjUtM2VlMjE0MzFl
MmQwIFByb2plY3QgQ0EwHhcNMjIxMTE2MjA1MDUwWhcNMzIxMTEzMjA1MDUwWjA6
MTgwNgYDVQQDDC8wMzY3YTBkNS05ZTEzLTQxZGItOWE2NS0zZWUyMTQzMWUyZDAg
UHJvamVjdCBDQTCCAaIwDQYJKoZIhvcNAQEBBQADggGPADCCAYoCggGBAMEiUoKl
YpTQT8MoqaEGU4PK6jOJ9mNruDDng84msucGBxjeREAUB798oFo8waXiP+wztNjA
uOJtD5jstHe4AuPMZzP2U1RemSe7Ju5aYRnlEIVuuUQixLDVLR4blWo/k5NezfGd
53v/2BHh8dGIXVefDRoRNeYJgemi+PZKzT+4ZrMqHCsEPg+0cGhVIRPNfdGcBMBw
JKb2JQAk8O4mFirywrWzsQjY84SUja+wt/HG2sQegxEkEk8HXIzJaTnrauE0edV5
di22bZJwwhpv0gm+C345qE728r5usVKeGo3+eta1ks0zTD/VqRIa2BtvNRxmPu5A
eCKhcnZT1aaxX7E1HqnwXaWFI18W0kpdKC4HT4WaUq+iDd717iVNeLkm+6nCuTyJ
9nEAhby1Tyq65w38pWSeuJEUscw/WHfH+x9MSEih1WgWdzfsk70ngtNCvRlctSS4
LehGbk3C6MZA7ImJsznWlHRWK1w0nC+zN6NsPHnpMAS9ABs3SjU3auv/QQIDAQAB
oz8wPTAdBgNVHQ4EFgQUK1E7Xi5jVIadAzo2SrN9O89OD7wwDwYDVR0TBAgwBgEB
/wIBADALBgNVHQ8EBAMCAQYwDQYJKoZIhvcNAQEMBQADggGBAK3eGVjAYsqIHpTR
QDGOiny2mhIPCqGzOcFhZUmniw2pO18YBblCApzX8fRowsQzeuQO/XDPTihXuOlU
T3atZigbUimm6j+0IGcTt5hGQ1gOFWls4g8XGLrG1Oazzh1XPnCMQkzcVAH63F88
oPSGk+7vw4y/mmYn+OKumXqDnqmIZ3wBx+10gr4rrn+NYeXIY990wzPKig1s9AoZ
EmdYgvWT1ALHSaZD02Q5DpT3J+x62wZ9KyTa66JfMspbhtNy94PsWngCwIdEKK1y
VUwf1Xjk3fpL2Ara8jRBQW5HomxhWxTg2Va41w0R77BxcTSPZl9uThccODRQocxF
dTKeM56+TE0RqLZI6yRd/qJ/Cxsx0gmzUEE6gORLMlZZN71vliDmRdsjAUp6Wt+Z
Ey8wEBgz/ABYpNBCDIw+7O3Ci0chOMFBjmX0slCWDjgYFnKFia7sE41TloO2ThAw
svDCKO1Gs+bcdlOdJfcX9Hi3KKdQ2kHzj6rdUvd9jY1kumIhoA==
-----END CERTIFICATE-----
I have no name!@new-kafka-release-client:/$ cat <<'EOF' >> /tmp/mirror-eventhub.config
> bootstrap.servers=kafka-226f19f8-isaac-1040.aivencloud.com:12009
> client.id=mirror_maker_producer
> sasl.mechanism=SASL_MECHANISM
> sasl.plain.username=avnadmin
> sasl.plain.password=AVNS_******************
> security.protocol=SASL_SSL
> ssl.cafile="/tmp/ca.pem"
> EOF
I have no name!@new-kafka-release-client:/$ cat /tmp/mirror-eventhub.config
bootstrap.servers=kafka-226f19f8-isaac-1040.aivencloud.com:12009
client.id=mirror_maker_producer
sasl.mechanism=SASL_MECHANISM
sasl.plain.username=avnadmin
sasl.plain.password=AVNS_******************
security.protocol=SASL_SSL
ssl.cafile="/tmp/ca.pem"
Unfortunately, I got SSL errors trying to use MirrorMaker:
$ kafka-mirror-maker.sh --consumer.config /tmp/source-kafka.config --num.streams 1 --producer.config /tmp/mirror-eventhub.config --whitelist=".*"
I then tried using just a JAAS config:
$ cat <<'EOF' >> /tmp/mirror-eventhub2.config
bootstrap.servers=kafka-226f19f8-isaac-1040.aivencloud.com:12009
sasl.mechanism=SCRAM-SHA-256
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="avnadmin" password="AVNS_***********";
EOF
$ kafka-mirror-maker.sh --consumer.config /tmp/source-kafka.config --num.streams 1 --producer.config /tmp/mirror-eventhub2.config --whitelist=".*"
This also errored:
[2022-11-20 19:19:49,043] WARN This tool is deprecated and may be removed in a future major release. (kafka.tools.MirrorMaker$)
WARNING: The default partition assignment strategy of the mirror maker will change from 'range' to 'roundrobin' in an upcoming release (so that better load balancing can be achieved). If you prefer to make this switch in advance of that release add the following to the corresponding config: 'partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor'
[2022-11-20 19:19:49,845] WARN [Consumer clientId=example-mirrormaker-group-0, groupId=example-mirrormaker-group] Connection to node -1 (harbor.freshbrewed.science/73.242.50.46:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2022-11-20 19:19:49,852] WARN [Consumer clientId=example-mirrormaker-group-0, groupId=example-mirrormaker-group] Bootstrap broker harbor.freshbrewed.science:9092 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient)
[2022-11-20 19:19:49,939] WARN [Consumer clientId=example-mirrormaker-group-0, groupId=example-mirrormaker-group] Connection to node -1 (harbor.freshbrewed.science/73.242.50.46:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2022-11-20 19:19:49,939] WARN [Consumer clientId=example-mirrormaker-group-0, groupId=example-mirrormaker-group] Bootstrap broker harbor.freshbrewed.science:9092 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient)
[2022-11-20 19:19:50,090] WARN [Consumer clientId=example-mirrormaker-group-0, groupId=example-mirrormaker-group] Connection to node -1 (harbor.freshbrewed.science/73.242.50.46:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2022-11-20 19:19:50,091] WARN [Consumer clientId=example-mirrormaker-group-0, groupId=example-mirrormaker-group] Bootstrap broker harbor.freshbrewed.science:9092 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient)
[2022-11-20 19:19:50,091] ERROR [Producer clientId=producer-1] Connection to node -1 (kafka-226f19f8-isaac-1040.aivencloud.com/34.71.243.224:12009) failed authentication due to: SSL handshake failed (org.apache.kafka.clients.NetworkClient)
[2022-11-20 19:19:50,093] WARN [Producer clientId=producer-1] Bootstrap broker kafka-226f19f8-isaac-1040.aivencloud.com:12009 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient)
...
I tried yet a third idea: perhaps I could bypass the SSL checks by setting ssl.client.auth to none and blanking ssl.endpoint.identification.algorithm.
I have no name!@new-kafka-release-client:/$ cat <<'EOF' >> /tmp/mirror-eventhub3.config
> bootstrap.servers=kafka-226f19f8-isaac-1040.aivencloud.com:12009
> sasl.mechanism=SCRAM-SHA-256
> security.protocol=SASL_SSL
> ssl.client.auth=none
> ssl.endpoint.identification.algorithm=
> sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="avnadmin" password="AVNS_********************";
> EOF
I have no name!@new-kafka-release-client:/$ kafka-mirror-maker.sh --consumer.config /tmp/source-kafka.config --num.streams 1 --producer.config /tmp/mirror-eventhub3.config --whitelist=".*"
[2022-11-20 19:19:49,043] WARN This tool is deprecated and may be removed in a future major release. (kafka.tools.MirrorMaker$)
WARNING: The default partition assignment strategy of the mirror maker will change from 'range' to 'roundrobin' in an upcoming release (so that better load balancing can be achieved). If you prefer to make this switch in advance of that release add the following to the corresponding config: 'partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor'
[2022-11-20 19:19:49,845] WARN [Consumer clientId=example-mirrormaker-group-0, groupId=example-mirrormaker-group] Connection to node -1 (harbor.freshbrewed.science/73.242.50.46:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2022-11-20 19:19:49,852] WARN [Consumer clientId=example-mirrormaker-group-0, groupId=example-mirrormaker-group] Bootstrap broker harbor.freshbrewed.science:9092 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient)
[2022-11-20 19:19:49,939] WARN [Consumer clientId=example-mirrormaker-group-0, groupId=example-mirrormaker-group] Connection to node -1 (harbor.freshbrewed.science/73.242.50.46:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2022-11-20 19:19:49,939] WARN [Consumer clientId=example-mirrormaker-group-0, groupId=example-mirrormaker-group] Bootstrap broker harbor.freshbrewed.science:9092 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient)
[2022-11-20 19:19:50,090] WARN [Consumer clientId=example-mirrormaker-group-0, groupId=example-mirrormaker-group] Connection to node -1 (harbor.freshbrewed.science/73.242.50.46:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2022-11-20 19:19:50,091] WARN [Consumer clientId=example-mirrormaker-group-0, groupId=example-mirrormaker-group] Bootstrap broker harbor.freshbrewed.science:9092 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient)
[2022-11-20 19:19:50,091] ERROR [Producer clientId=producer-1] Connection to node -1 (kafka-226f19f8-isaac-1040.aivencloud.com/34.71.243.224:12009) failed authentication due to: SSL handshake failed (org.apache.kafka.clients.NetworkClient)
[2022-11-20 19:19:50,093] WARN [Producer clientId=producer-1] Bootstrap broker kafka-226f19f8-isaac-1040.aivencloud.com:12009 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient)
...
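In hindsight, those producer configs likely had a few strikes against them: sasl.mechanism=SASL_MECHANISM was a leftover placeholder, sasl.plain.username/sasl.plain.password/ssl.cafile are kafka-python-style options that the Java client tooling simply ignores, and blanking ssl.endpoint.identification.algorithm only skips the hostname check, not the trust check. For the Java clients, the CA belongs in a truststore and the credentials in a sasl.jaas.config line; a sketch, assuming keytool ships with the image's JRE:

$ keytool -importcert -noprompt -alias aiven-ca -file /tmp/ca.pem \
    -keystore /tmp/client.truststore.jks -storetype JKS -storepass changeit
# then, in the producer config, replace ssl.cafile with:
#   ssl.truststore.location=/tmp/client.truststore.jks
#   ssl.truststore.password=changeit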
Pivot: switch to Ubuntu
I decided that I would follow the guide in Quick Connect.
First, I made a long-running Ubuntu pod:
$ cat ubuntu.pod.yaml
apiVersion: v1
kind: Pod
metadata:
  name: ubuntu
  labels:
    app: ubuntu
spec:
  containers:
  - name: ubuntu
    image: ubuntu:latest
    command: ["/bin/sleep", "3650d"]
    imagePullPolicy: IfNotPresent
  restartPolicy: Always
$ kubectl apply -f ubuntu.pod.yaml
pod/ubuntu created
Then I loaded a default JDK and common tools:
root@ubuntu:/# apt install default-jdk software-properties-common
Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
The following additional packages will be...
I added the Python 3 PPA:
root@ubuntu:/# add-apt-repository ppa:deadsnakes/ppa
Repository: 'deb https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu/ jammy main'
Description:
This PPA contains more recent Python versions packaged for Ubuntu.
Disclaimer: there's no guarantee of timely updates in case of security problems or other issues. If you want to use them in a security-or-otherwise-critical environment (say, on a production server), you do so at your own risk.
Update Note
===========
Please use this repository instead of ppa:fkrull/deadsnakes.
Reporting Issues
================
Issues can be reported in the master issue tracker at:
https://github.com/deadsnakes/issues/issues
...
Then I installed Python 3.8:
root@ubuntu:/# apt update
Hit:1 http://archive.ubuntu.com/ubuntu jammy InRelease
Hit:2 http://archive.ubuntu.com/ubuntu jammy-updates InRelease
Hit:3 http://security.ubuntu.com/ubuntu jammy-security InRelease
Hit:4 http://archive.ubuntu.com/ubuntu jammy-backports InRelease
Hit:5 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy InRelease
Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
11 packages can be upgraded. Run 'apt list --upgradable' to see them.
root@ubuntu:/# apt install python3.8
Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
The following additional packages will be installed:
bzip2 file libgdbm-compat4 libgdbm6 libmagic-mgc libmagic1 libperl5.34 libpython3.8-minimal libpython3.8-stdlib mailcap mime-support netbase perl perl-base perl-modules-5.34 python3.8-minimal tzdata
Suggested packages:
bzip2-doc gdbm-l10n perl-doc libterm-readline-gnu-perl | libterm-readline-perl-perl make libtap-harness-archive-perl python3.8-venv binutils binfmt-support
The following NEW packages will be installed:
bzip2 file libgdbm-compat4 libgdbm6 libmagic-mgc libmagic1 libperl5.34 libpython3.8-minimal libpython3.8-stdlib mailcap mime-support netbase perl perl-modules-5.34 python3.8 python3.8-minimal tzdata
The following packages will be upgraded:
perl-base
1 upgraded, 17 newly installed, 0 to remove and 10 not upgraded.
Need to get 15.7 MB of archives.
After this operation, 78.6 MB of additional disk space will be used.
Do you want to continue? [Y/n]
I then needed to add pip:
$ apt install python3-pip
To handle auth, the guide then had me add the Aiven client via pip:
root@ubuntu:/# python3 -m pip install aiven-client
Collecting aiven-client
Downloading aiven_client-2.16.0-py3-none-any.whl (73 kB)
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 74.0/74.0 KB 1.8 MB/s eta 0:00:00
Collecting requests>=2.2.1
Downloading requests-2.28.1-py3-none-any.whl (62 kB)
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 62.8/62.8 KB 6.3 MB/s eta 0:00:00
Collecting certifi>=2017.4.17
Downloading certifi-2022.9.24-py3-none-any.whl (161 kB)
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 161.1/161.1 KB 12.4 MB/s eta 0:00:00
Collecting idna<4,>=2.5
Downloading idna-3.4-py3-none-any.whl (61 kB)
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 61.5/61.5 KB 13.5 MB/s eta 0:00:00
Collecting charset-normalizer<3,>=2
Downloading charset_normalizer-2.1.1-py3-none-any.whl (39 kB)
Collecting urllib3<1.27,>=1.21.1
Downloading urllib3-1.26.12-py2.py3-none-any.whl (140 kB)
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 140.4/140.4 KB 22.9 MB/s eta 0:00:00
Installing collected packages: urllib3, idna, charset-normalizer, certifi, requests, aiven-client
Successfully installed aiven-client-2.16.0 certifi-2022.9.24 charset-normalizer-2.1.1 idna-3.4 requests-2.28.1 urllib3-1.26.12
WARNING: Running pip as the 'root' user can result in broken permissions and conflicting behaviour with the system package manager. It is recommended to use a virtual environment instead: https://pip.pypa.io/warnings/venv
root@ubuntu:/# python3 -m aiven.client user login isaac.johnson@gmail.com
isaac.johnson@gmail.com's Aiven password:
ERROR command failed: Error: {"errors":[{"message":"Authentication failed","status":403}],"message":"Authentication failed"}
I got authentication errors until I moved off federated login to a direct username/password:
root@ubuntu:/# python3 -m aiven.client user login isaac.johnson@gmail.com
isaac.johnson@gmail.com's Aiven password:
INFO Aiven credentials written to: /root/.config/aiven/aiven-credentials.json
INFO Default project set as 'isaac-1040' (change with 'avn project switch <project>')
root@ubuntu:/#
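While logged in, it's handy to confirm the service host and port straight from the client; a sketch, using the same --format trick the tool suggests elsewhere:

root@ubuntu:/# python3 -m aiven.client service get kafka-226f19f8 --format '{service_uri}'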
Now that we are logged in, we need to create a keystore into which to save our auth creds:
root@ubuntu:/# python3 -m aiven.client service user-kafka-java-creds kafka-226f19f8 --username avnadmin -d ~/ --password safePassword123
Downloaded to directory '/root/': CA certificate, certificate, key
To get the user passwords type:
avn service user-list --format '{username} {password}' --project isaac-1040 kafka-226f19f8
Certificate was added to keystore
Lastly, we can test:
root@ubuntu:/# kafka_2.13-3.3.1/bin/kafka-console-producer.sh --broker-list kafka-226f19f8-isaac-1040.aivencloud.com:11998 --topic aiventest --producer.config ~/configuration.properties
[2022-11-20 14:07:57,380] ERROR Modification time of key store could not be obtained: client.keystore.p12 (org.apache.kafka.common.security.ssl.DefaultSslEngineFactory)
java.nio.file.NoSuchFileException: client.keystore.p12
at java.base/sun.nio.fs.UnixException.translateToIOException(UnixException.java:92)
at java.base/sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:111)
at java.base/sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:116)
at java.base/sun.nio.fs.UnixFileAttributeViews$Basic.readAttributes(UnixFileAttributeViews.java:55)
at java.base/sun.nio.fs.UnixFileSystemProvider.readAttributes(UnixFileSystemProvider.java:149)
at java.base/sun.nio.fs.LinuxFileSystemProvider.readAttributes(LinuxFileSystemProvider.java:99)
at java.base/java.nio.file.Files.readAttributes(Files.java:1764)
at java.base/java.nio.file.Files.getLastModifiedTime(Files.java:2315)
at org.apache.kafka.common.security.ssl.DefaultSslEngineFactory$FileBasedStore.lastModifiedMs(DefaultSslEngineFactory.java:381)
at org.apache.kafka.common.security.ssl.DefaultSslEngineFactory$FileBasedStore.<init>(DefaultSslEngineFactory.java:346)
at org.apache.kafka.common.security.ssl.DefaultSslEngineFactory.createKeystore(DefaultSslEngineFactory.java:297)
at org.apache.kafka.common.security.ssl.DefaultSslEngineFactory.configure(DefaultSslEngineFactory.java:161)
at org.apache.kafka.common.security.ssl.SslFactory.instantiateSslEngineFactory(SslFactory.java:140)
at org.apache.kafka.common.security.ssl.SslFactory.configure(SslFactory.java:97)
at org.apache.kafka.common.network.SslChannelBuilder.configure(SslChannelBuilder.java:73)
at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:192)
at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:81)
at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:105)
at org.apache.kafka.clients.producer.KafkaProducer.newSender(KafkaProducer.java:514)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:457)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:291)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:318)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:303)
at kafka.tools.ConsoleProducer$.main(ConsoleProducer.scala:44)
at kafka.tools.ConsoleProducer.main(ConsoleProducer.scala)
org.apache.kafka.common.KafkaException: Failed to construct kafka producer
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:468)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:291)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:318)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:303)
at kafka.tools.ConsoleProducer$.main(ConsoleProducer.scala:44)
at kafka.tools.ConsoleProducer.main(ConsoleProducer.scala)
Caused by: org.apache.kafka.common.KafkaException: Failed to load SSL keystore client.keystore.p12 of type PKCS12
at org.apache.kafka.common.security.ssl.DefaultSslEngineFactory$FileBasedStore.load(DefaultSslEngineFactory.java:375)
at org.apache.kafka.common.security.ssl.DefaultSslEngineFactory$FileBasedStore.<init>(DefaultSslEngineFactory.java:347)
at org.apache.kafka.common.security.ssl.DefaultSslEngineFactory.createKeystore(DefaultSslEngineFactory.java:297)
at org.apache.kafka.common.security.ssl.DefaultSslEngineFactory.configure(DefaultSslEngineFactory.java:161)
at org.apache.kafka.common.security.ssl.SslFactory.instantiateSslEngineFactory(SslFactory.java:140)
at org.apache.kafka.common.security.ssl.SslFactory.configure(SslFactory.java:97)
at org.apache.kafka.common.network.SslChannelBuilder.configure(SslChannelBuilder.java:73)
at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:192)
at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:81)
at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:105)
at org.apache.kafka.clients.producer.KafkaProducer.newSender(KafkaProducer.java:514)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:457)
... 5 more
Caused by: java.nio.file.NoSuchFileException: client.keystore.p12
at java.base/sun.nio.fs.UnixException.translateToIOException(UnixException.java:92)
at java.base/sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:111)
at java.base/sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:116)
at java.base/sun.nio.fs.UnixFileSystemProvider.newByteChannel(UnixFileSystemProvider.java:219)
at java.base/java.nio.file.Files.newByteChannel(Files.java:371)
at java.base/java.nio.file.Files.newByteChannel(Files.java:422)
at java.base/java.nio.file.spi.FileSystemProvider.newInputStream(FileSystemProvider.java:420)
at java.base/java.nio.file.Files.newInputStream(Files.java:156)
at org.apache.kafka.common.security.ssl.DefaultSslEngineFactory$FileBasedStore.load(DefaultSslEngineFactory.java:368)
... 16 more
root@ubuntu:/#
To get past the errors, I just needed to ensure password login was set up.
I created a client secret in a fresh java keystore:
$ python3 -m aiven.client service user-kafka-java-creds kafka-226f19f8 --username avnadmin -d ~/ --password safePassword123
Then made a configuration that would use it:
root@ubuntu:~# cat configuration.properties
security.protocol=SSL
ssl.protocol=TLS
ssl.keystore.type=PKCS12
ssl.keystore.location=client.keystore.p12
ssl.keystore.password=safePassword123
ssl.key.password=safePassword123
ssl.truststore.location=client.truststore.jks
ssl.truststore.password=safePassword123
ssl.truststore.type=JKS
I could then use the Kafka binaries to test:
$ wget https://downloads.apache.org/kafka/3.3.1/kafka_2.13-3.3.1.tgz
$ tar -xzvf kafka_2.13-3.3.1.tgz
Note: since the file lists ssl.truststore.location=client.truststore.jks (a relative path), you'll want to be in ~ when you run MirrorMaker or the producer.
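Alternatively, a sketch to make the file location-independent by rewriting the two relative keystore paths to absolute ones:

root@ubuntu:~# sed -i 's|=client\.|=/root/client.|' configuration.properties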
Here we can see it producing just fine now
I now had a way to hit both my internal (Kubernetes) and external (Aiven.io) Kafka clusters:
root@ubuntu:~# /kafka_2.13-3.3.1/bin/kafka-console-producer.sh --broker-list kafka-226f19f8-isaac-1040.aivencloud.com:11998 --topic aiventest --producer.config ~/configuration.properties
>This
S>till
>Works
root@ubuntu:~# /kafka_2.13-3.3.1/bin/kafka-console-producer.sh --broker-list harbor.freshbrewed.science:9094 --topic test
>So
>Does
>This
>^Croot@ubuntu:~#
MirrorMaker setup and use
Next, I’ll create two local configs for MirrorMaker
root@ubuntu:~# cat localk8s.properties
bootstrap.servers=harbor.freshbrewed.science:9094
acks=1
batch.size=5
client.id=mirror_maker_producer
root@ubuntu:~# cat aiven.properties
bootstrap.servers=kafka-226f19f8-isaac-1040.aivencloud.com:11998
client.id=mirror_maker_consumer
group.id=mirror_maker_consumer
exclude.internal.topics=true
security.protocol=SSL
ssl.protocol=TLS
ssl.keystore.type=PKCS12
ssl.keystore.location=client.keystore.p12
ssl.keystore.password=safePassword123
ssl.key.password=safePassword123
ssl.truststore.location=client.truststore.jks
ssl.truststore.password=safePassword123
ssl.truststore.type=JKS
In the end, I tried both directions:
root@ubuntu:~# /kafka_2.13-3.3.1/bin/kafka-mirror-maker.sh --consumer.config aiven.properties --num.streams 1 --producer.config localk8s.properties --whitelist=".*"
[2022-11-21 07:03:25,499] WARN This tool is deprecated and may be removed in a future major release. (kafka.tools.MirrorMaker$)
WARNING: The default partition assignment strategy of the mirror maker will change from 'range' to 'roundrobin' in an upcoming release (so that better load balancing can be achieved). If you prefer to make this switch in advance of that release add the following to the corresponding config: 'partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor'
root@ubuntu:~# /kafka_2.13-3.3.1/bin/kafka-mirror-maker.sh --producer.config aiven.properties --num.streams 1 --consumer.config localk8s.properties --whitelist=".*"
[2022-11-21 07:13:48,101] WARN This tool is deprecated and may be removed in a future major release. (kafka.tools.MirrorMaker$)
WARNING: The default partition assignment strategy of the mirror maker will change from 'range' to 'roundrobin' in an upcoming release (so that better load balancing can be achieved). If you prefer to make this switch in advance of that release add the following to the corresponding config: 'partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor'
[2022-11-21 07:13:48,511] WARN These configurations '[group.id, exclude.internal.topics]' were supplied but are not used yet. (org.apache.kafka.clients.producer.ProducerConfig)
[2022-11-21 07:13:48,689] WARN These configurations '[acks, batch.size]' were supplied but are not used yet. (org.apache.kafka.clients.consumer.ConsumerConfig)
Neither direction seemed to replicate topics.
To rule out my local cluster, I tried with an Upstash Kafka instance. That too failed to sync (in either direction).
… a short while later …
Once I had sorted out the topic auto-creation setting in the Kafka cluster (which we covered in the setup earlier), I was able to use the local MirrorMaker:
root@ubuntu:~# cat aiven.properties
bootstrap.servers=kafka-226f19f8-isaac-1040.aivencloud.com:11998
client.id=mirror_maker_producer
group.id=mirror_maker_producer
acks=1
batch.size=5
exclude.internal.topics=true
security.protocol=SSL
ssl.protocol=TLS
ssl.keystore.type=PKCS12
ssl.keystore.location=client.keystore.p12
ssl.keystore.password=safePassword123
ssl.key.password=safePassword123
ssl.truststore.location=client.truststore.jks
ssl.truststore.password=safePassword123
ssl.truststore.type=JKS
root@ubuntu:~# cat localk8s.properties
bootstrap.servers=192.168.1.38:9094
acks=1
batch.size=5
client.id=mirror_maker_consumer
group.id=mirror_maker_consumer
I fired up the MirrorMaker instance and, in another shell in the existing client container, started pushing messages into my local Kafka:
root@ubuntu:~# /kafka_2.13-3.3.1/bin/kafka-mirror-maker.sh --producer.config aiven.properties --num.streams 1 --consumer.config localk8s.properties --whitelist=".*"
[2022-11-22 06:31:52,052] WARN This tool is deprecated and may be removed in a future major release. (kafka.tools.MirrorMaker$)
WARNING: The default partition assignment strategy of the mirror maker will change from 'range' to 'roundrobin' in an upcoming release
...
I have no name!@new-kafka-release-client:/$ kafka-console-producer.sh --broker-list harbor.freshbrewed.science:9094 --topic test
>asdf
>asdf
>asdf
...
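Besides the web console, you can verify from the command line; a sketch that consumes the mirrored topic from Aiven, reusing the SSL configuration.properties from earlier (run from ~ so the relative keystore paths resolve):

root@ubuntu:~# /kafka_2.13-3.3.1/bin/kafka-console-consumer.sh \
    --bootstrap-server kafka-226f19f8-isaac-1040.aivencloud.com:11998 \
    --topic test --from-beginning \
    --consumer.config ~/configuration.properties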
I can now see them in Aiven.io Kafka
Here is a quick demo
Using MirrorMaker in Aiven
First, I set up the config for Upstash in Integration Endpoints,
then added an integration. You can see from the top how many attempts I've made to get MirrorMaker to work.
I selected the cluster I just added
and gave it a name.
We can now go to Replication Flows to create a replication flow.
Let's first attempt to pull topics from Upstash into Aiven.
I can see MirrorMaker kick in
But yet again, nothing synced; I could not get either MirrorMaker direction to actually work.
Testing bi-directionally with Upstash.io
^Croot@ubuntu:~# cat upstash.properties
bootstrap.servers=well-octopus-6904-us1-kafka.upstash.io:9092
client.id=mirror_maker_consumer
group.id=mirror_maker_consumer
sasl.mechanism=SCRAM-SHA-256
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
username="**********************************************************" \
password="******************************************==";
[2022-11-21 07:46:41,134] ERROR include list must be specified (kafka.tools.MirrorMaker$)
root@ubuntu:~# /kafka_2.13-3.3.1/bin/kafka-mirror-maker.sh --producer.config localk8s.properties --num.streams 1 --consumer.config upstash.properties --whitelist="aiventest"
[2022-11-21 07:46:55,204] WARN This tool is deprecated and may be removed in a future major release. (kafka.tools.MirrorMaker$)
WARNING: The default partition assignment strategy of the mirror maker will change from 'range' to 'roundrobin' in an upcoming release (so that better load balancing can be achieved). If you prefer to make this switch in advance of that release add the following to the corresponding config: 'partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor'
[2022-11-21 07:46:55,456] WARN These configurations '[group.id]' were supplied but are not used yet. (org.apache.kafka.clients.producer.ProducerConfig)
root@ubuntu:~# /kafka_2.13-3.3.1/bin/kafka-mirror-maker.sh --producer.config upstash.properties --num.streams 1 --consumer.config localk8s.properties --whitelist="aiventest"
[2022-11-21 07:47:35,348] WARN This tool is deprecated and may be removed in a future major release. (kafka.tools.MirrorMaker$)
WARNING: The default partition assignment strategy of the mirror maker will change from 'range' to 'roundrobin' in an upcoming release (so that better load balancing can be achieved). If you prefer to make this switch in advance of that release add the following to the corresponding config: 'partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor'
[2022-11-21 07:47:35,720] WARN These configurations '[group.id]' were supplied but are not used yet. (org.apache.kafka.clients.producer.ProducerConfig)
[2022-11-21 07:47:35,934] WARN These configurations '[acks, batch.size]' were supplied but are not used yet. (org.apache.kafka.clients.consumer.ConsumerConfig)
Neither worked.
Even reloading and resetting the password didn't seem to help; I kept getting errors in MirrorMaker.
Azure Event Hub
I'll try setting up a Kafka endpoint in one of my favourite clouds, Azure. In Azure terminology, the "Event Hubs (Namespace)" is akin to the cluster and the "Event Hub" (singular) is the topic.
Create an Azure Event Hub
Create a Topic
Then add as an endpoint to Aiven.io
Add as an integration (Cluster for Replication)
We can pick the Az Event Hub
and give it a name
At first I was worried by the errors I saw about it failing to resolve Aiven.io's Kafka cluster (a .local address).
When I set it to replicate from Aiven.io to Azure Event Hub, we finally saw some positive results!
And we can see that it synced in both directions (as we set up in our flows).
Create a topic in AzEH
We can test the syncing by using the Confluent CLI to produce to and consume from the topic, then see it replicated from Azure to Aiven via MirrorMaker.
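For reference, pointing a plain Kafka client at Event Hubs looks roughly like this; a sketch with placeholder values (Event Hubs speaks the Kafka protocol on port 9093 and takes the namespace connection string as a SASL/PLAIN password, with the literal username $ConnectionString):

bootstrap.servers=<NAMESPACE>.servicebus.windows.net:9093
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://<NAMESPACE>.servicebus.windows.net/;SharedAccessKeyName=<KEYNAME>;SharedAccessKey=<KEY>";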
As we can see, the only limit now, with regards to Event Hub, is that we top out at 10 event hubs (topics) per namespace. This is a fixed limit in the Basic and Standard tiers of Azure Event Hubs, but the Premium SKU allows 100 per PU and the Dedicated tier 1000.
Logging and Metrics
We can see project-level events in "Event Logs" in Aiven
and our individual services capture logs as well.
We can view basic metrics from MirrorMaker, for example:
and Kafka
However, if we want more, it’s pretty easy to plug in Datadog here as well
In Datadog, go to Organization Settings and add a new API key under Access.
When you click create, you'll get your Key and Key ID; we'll need that Key.
In Aiven.io, from Integration Endpoints, select Datadog
For this we’ll use that Key we created
and I can now see it as an option
In our service, we need to first add this to the Integrations
Picking Datadog for Metrics
We should now be sending service metrics to DD,
which we can now query
and of course, the metrics we query can be made into dashboards and alerts.
Perhaps we want to alert when our Kafka project hits high loads, with warnings going to Teams but alerts hitting up PagerDuty.
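Datadog monitor messages support conditional template blocks, so a single monitor can route warnings and alerts differently; a sketch with hypothetical handle names:

{{#is_warning}}@teams-kafka-warnings Aiven Kafka load is elevated{{/is_warning}}
{{#is_alert}}@pagerduty-aiven-kafka Aiven Kafka load is critical{{/is_alert}}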
Summary
We've explored a lot today with Aiven.io Kafka. We set up Kafka clusters in Aiven.io, in Azure Event Hub, locally in Kubernetes, and (while I didn't detail it) in Upstash as well.
We worked on configuring MirrorMaker locally and in Aiven and managed to get it to connect bi-directionally with Azure Event Hub. We also set up and tested our local MirrorMaker to sync from a Helm-installed Kafka cluster through to our Aiven.io one.
To be honest, we just scratched the surface here. I will be adding a follow-up post to explore Kafka Connect, more sink options, and more service integrations, such as Google Logging Workspaces.
My only issue, and it's really quite minor, is the cost. I put in the referral link that gets us both more spend, but I like very cheap things (as I'm just a blogger). Thus, the lack of a free tier might be the only thing holding me back from using Aiven.io more.
Blog Art
Another fun AI-generated image from Midjourney was used in today's post; here it is full size: Franz Kafka holding a red crab (which is itself Kafkaesque):