Published: Apr 15, 2021 by Isaac Johnson
We’ve covered pubsub and secrets in detail. Dapr can also provide service-to-service discovery and routing, much like Istio. Dapr’s binding component lets us route to external services for input and output. This can include pubsub to Kafka, posting to topics, or consuming from feeds. We will explore this with Azure Event Grid Topics, AWS SNS and consuming a Twitter query.
Service to service connections
We can port forward to one service, for instance our Perl service:
builder@DESKTOP-72D2D9T:~/Workspaces/dapr-quickstarts$ kubectl get pods | grep perl
perl-subscriber-7b4457c4bf-4zghq 2/2 Running 0 6d
builder@DESKTOP-72D2D9T:~/Workspaces/dapr-quickstarts$ kubectl port-forward perl-subscriber-7b4457c4bf-4zghq 3500:3500
Forwarding from 127.0.0.1:3500 -> 3500
Forwarding from [::1]:3500 -> 3500
Handling connection for 3500
Then, in another session, we can hit a different service altogether, say the Python subscriber.
$ kubectl get pods -l app=python-subscriber
NAME READY STATUS RESTARTS AGE
python-subscriber-6fd5dc7f8c-46v2t 2/2 Running 0 12d
$ kubectl describe pod python-subscriber-6fd5dc7f8c-46v2t | grep app-id
Annotations: dapr.io/app-id: python-subscriber
Since every Dapr subscriber exposes a “/dapr/subscribe” endpoint for pub-sub, let’s try that:
builder@DESKTOP-72D2D9T:~$ curl http://localhost:3500/v1.0/invoke/python-subscriber/method/dapr/subscribe
[{"pubsubname":"pubsub","route":"A","topic":"A"},{"pubsubname":"pubsub","route":"C","topic":"C"}]
What that shows is the URL pattern invoke/$theDaprApp/method/$DaprAppEndpoint.
We could even encapsulate that within our code.
my %dispatch = (
'/dapr/subscribe' => \&resp_subscribe,
'/hello' => \&resp_hello,
'/A' => \&resp_A,
'/B' => \&resp_B,
'/C' => \&resp_C,
'/D' => \&resp_D,
'/PYC' => \&resp_PythonCheck,
# ...
);
…
sub resp_PythonCheck {
my $cgi = shift; # CGI.pm object
return if !ref $cgi;
my $methodsURL = "http://localhost:3500/v1.0/invoke";
print $cgi->header('application/json');
# invoke the python subscriber through the local Dapr sidecar
my $cmd = "curl -H 'Content-Type: application/json' $methodsURL/python-subscriber/method/dapr/subscribe";
print STDERR "\ncmd: $cmd\n";
my $rc =`$cmd`;
print STDERR "\n$rc\n";
print STDERR "\n";
}
I built and pushed that as idjohnson/dapr-perl:v18
Validation
$ curl -X POST http://localhost:8080/PYC -H 'Content-Type: application/json'
$ kubectl logs perl-subscriber-bdddff584-c7ktg perl-subscriber
running on 8080
MyWebServer: You can connect to your server at http://localhost:8080/
cmd: curl -H 'Content-Type: application/json' http://localhost:3500/v1.0/invoke/python-subscriber/method/dapr/subscribe
% Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed
100 98 100 98 0 0 4900 0 --:--:-- --:--:-- --:--:-- 4900
[{"pubsubname":"pubsub","route":"A","topic":"A"},{"pubsubname":"pubsub","route":"C","topic":"C"}]
We can see the response from the Python subscriber at the end of the Perl service’s log above.
So what does this mean? We can use Dapr to abstract away the service name or even the implementation details for RESTful services. Merely setting the app-id and knowing the endpoint is sufficient to call the method through the Dapr sidecars.
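As a rough sketch of the same idea outside of Perl, the invoke pattern can be wrapped in a small Python helper. This assumes the sidecar is reachable on localhost:3500 as in the port-forward above. The names invoke_url and invoke are made up for illustration and are not part of any Dapr SDK.

```python
import json
import urllib.request
from urllib.parse import quote

DAPR_HTTP_PORT = 3500  # sidecar port used in the port-forward above


def invoke_url(app_id: str, method: str, port: int = DAPR_HTTP_PORT) -> str:
    """Build the sidecar URL: /v1.0/invoke/<app-id>/method/<endpoint>."""
    # Hypothetical helper; only the URL pattern itself comes from the post.
    return (
        f"http://localhost:{port}/v1.0/invoke/"
        f"{quote(app_id, safe='')}/method/{method.lstrip('/')}"
    )


def invoke(app_id: str, method: str, timeout: float = 5.0):
    """GET the endpoint through the local sidecar and decode the JSON reply."""
    with urllib.request.urlopen(invoke_url(app_id, method), timeout=timeout) as resp:
        return json.loads(resp.read().decode("utf-8"))
```

With the port-forward running, invoke("python-subscriber", "dapr/subscribe") should return the same subscription list the curl call printed. The caller only needs the app-id and the endpoint, never the pod name or service address.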
Because our invocation is going to localhost:3500, we are entrusting Dapr with the routing. This is similar to service meshes like Linkerd, Istio or Consul. While we can engage with Dapr via HTTP or gRPC, the communication between sidecars is done entirely with gRPC (Google’s open source Remote Procedure Call framework). gRPC by itself does not encrypt traffic; Dapr’s documentation describes mutual TLS between sidecars issued through its Sentry service, so check whether that is enabled in your cluster. If you need mesh-wide mTLS beyond Dapr-to-Dapr calls, it’s better to dig into a service mesh solution such as Istio or Consul.
Dapr Bindings
Let’s create an Azure Event Grid Topic to test Dapr bindings
First create a resource group:
$ az group create -n idjeventgridrg --location centralus
In the Event Grid Topics blade, we can make the Event Grid topic with default values:
From there we can get the topic access keys, e.g. 3nkpme1TF1VVDi5PI91may6aD2owm9HsEe5X6E8UPyA=
Looking at the binding spec, we can see that an output binding just needs the topic endpoint and access key
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: <name>
spec:
type: bindings.azure.eventgrid
version: v1
metadata:
# Required Input Binding Metadata
- name: tenantId
value: "[AzureTenantId]"
- name: subscriptionId
value: "[AzureSubscriptionId]"
- name: clientId
value: "[ClientId]"
- name: clientSecret
value: "[ClientSecret]"
- name: subscriberEndpoint
value: "[SubscriberEndpoint]"
- name: handshakePort
value: [HandshakePort]
- name: scope
value: "[Scope]"
# Optional Input Binding Metadata
- name: eventSubscriptionName
value: "[EventSubscriptionName]"
# Required Output Binding Metadata
- name: accessKey
value: "[AccessKey]"
- name: topicEndpoint
value: "[TopicEndpoint]"
The topic endpoint is on the main page for the topic: https://mytopicname.eastus-1.eventgrid.azure.net/api/events
We can also easily retrieve it from the CLI
$ az eventgrid topic list -o table
Endpoint InputSchema Location MetricResourceId Name ProvisioningState PublicNetworkAccess ResourceGroup
----------------------------------------------------------- --------------- ---------- ------------------------------------ ----------- ------------------- --------------------- ---------------
https://mytopicname.eastus-1.eventgrid.azure.net/api/events EventGridSchema eastus 0e925654-14cc-42ef-b8ca-1c01bd231f69 mytopicname Succeeded Enabled idjeventgridrg
Next, let’s update our deployment to use the binding:
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: azegtopic
spec:
type: bindings.azure.eventgrid
version: v1
metadata:
- name: accessKey
value: "3nkpme1TF1VVDi5PI91may6aD2owm9HsEe5X6E8UPyA="
- name: topicEndpoint
value: "https://mytopicname.eastus-1.eventgrid.azure.net/api/events"
Let’s first just apply this binding
$ cat testbinding.yml
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: azegtopic
spec:
type: bindings.azure.eventgrid
version: v1
metadata:
- name: accessKey
value: "3nkpme1TF1VVDi5PI91may6aD2owm9HsEe5X6E8UPyA="
- name: topicEndpoint
value: "https://mytopicname.eastus-1.eventgrid.azure.net/api/events"
$ kubectl get nodes
NAME STATUS ROLES AGE VERSION
isaac-macbookair Ready master 109d v1.19.5+k3s2
isaac-macbookpro Ready <none> 109d v1.19.5+k3s2
$ kubectl apply -f testbinding.yml
component.dapr.io/azegtopic created
Validation
Now let’s test an event to the topic
I’ll bounce the Perl pod just in case the Dapr sidecar has issues (good sanity check). I would expect the bindings to be immediately available without needing to cycle a pod
$ kubectl get pods | grep perl
perl-subscriber-bdddff584-c7ktg 2/2 Running 0 11h
$ kubectl delete pod perl-subscriber-bdddff584-c7ktg
pod "perl-subscriber-bdddff584-c7ktg" deleted
$ kubectl get pods | grep perl
perl-subscriber-bdddff584-j5bvn 2/2 Running 0 45s
Now let’s port forward and send an event
$ kubectl port-forward perl-subscriber-bdddff584-j5bvn 3500:3500
Forwarding from 127.0.0.1:3500 -> 3500
Forwarding from [::1]:3500 -> 3500
Handling connection for 3500
Handling connection for 3500
Handling connection for 3500
Handling connection for 3500
The event payload
$ cat eventpayload.json | jq
{
"data": {
"subject": "/dapr/myevent",
"eventType": "Microsoft.Storage.BlobCreated",
"eventTime": "2021-03-26T18:41:00.9584103Z",
"id": "831e1650-001e-001b-66ab-eeb76e069631",
"data": {
"api": "PutBlockList",
"clientRequestId": "6d79dbfb-0e37-4fc4-981f-442c9ca65760",
"requestId": "831e1650-001e-001b-66ab-eeb76e000000",
"eTag": "0x8D4BCC2E4835CD0",
"contentType": "application/octet-stream",
"contentLength": 524288,
"blobType": "BlockBlob",
"url": "https://oc2d2817345i60006.blob.core.windows.net/oc2d2817345i200097container/oc2d2817345i20002296blob",
"sequencer": "00000000000004420000000000028963",
"storageDiagnostics": {
"batchId": "b68529f3-68cd-4744-baa4-3c0498ec19f0"
}
},
"dataVersion": "",
"metadataVersion": "1"
},
"operation": "create"
}
And sending the payload
$ curl -X POST -H 'Content-Type: application/json' http://localhost:3500/v1.0/bindings/azegtopic -d @eventpayload.json
I then tweaked the payload a bit (different id and time) and sent that:
$ cat eventpayload2.json | jq
{
"data": [
{
"subject": "/dapr/myevent",
"eventType": "Microsoft.Storage.BlobCreated",
"eventTime": "2021-03-26T19:45:00.9584103Z",
"id": "831e1650-ffff-001b-66ab-eeb76e069631",
"data": {
"api": "PutBlockList",
"clientRequestId": "6d79dbfb-0e37-4fc4-981f-442c9ca65760",
"requestId": "831e1650-001e-001b-66ab-eeb76e000000",
"eTag": "0x8D4BCC2E4835CD0",
"contentType": "application/octet-stream",
"contentLength": 524288,
"blobType": "BlockBlob",
"url": "https://oc2d2817345i60006.blob.core.windows.net/oc2d2817345i200097container/oc2d2817345i20002296blob",
"sequencer": "00000000000004420000000000028963",
"storageDiagnostics": {
"batchId": "b68529f3-68cd-4744-baa4-3c0498ec19f0"
}
},
"dataVersion": "",
"metadataVersion": "1"
}
],
"operation": "create"
}
$ curl -X POST -H 'Content-Type: application/json' http://localhost:3500/v1.0/bindings/azegtopic -d @eventpayload2.json
Pushing a few times, I can see this reflected in metrics for unmatched events:
To use the Event Grid topic inbound would require properly exposing Dapr externally for the purpose of a subscription. Since I don’t want to do that, I’ll point you to the documentation if you want to set up Dapr behind a proper TLS NGINX ingress: https://docs.dapr.io/reference/components-reference/supported-bindings/eventgrid/
I perhaps made this more complicated than need be by accepting the default Event Grid schema
$ az eventgrid topic show -g idjeventgridrg -n mytopicname | jq .inputSchema
"EventGridSchema"
As such, a basic “hello” message would be rejected for violating the schema
$ curl -X POST -H 'Content-Type: application/json' http://localhost:3500/v1.0/bindings/azegtopic -d '{ "data": { "message": "Hi!" }, "operation": "create" }'
{"errorCode":"ERR_INVOKE_OUTPUT_BINDING","message":"error when invoke output binding azegtopic: {\r\n \"error\": {\r\n \"code\": \"BadRequest\",\r\n \"message\": \"This resource is configured to receive event in 'EventGridEvent' schema. The JSON received does not conform to the expected schema. Token Expected: StartArray, Actual Token Received: StartObject. Report '0cef821f-048c-4d45-865e-2a152f98953f:0:4/15/2021 2:42:45 PM (UTC)' to our forums for assistance or raise a support ticket.\",\r\n \"details\": [{\r\n \"code\": \"InputJsonInvalid\",\r\n \"message\": \"This resource is configured to receive event in 'EventGridEvent' schema. The JSON received does not conform to the expected schema. Token Expected: StartArray, Actual Token Received: StartObject. Report '0cef821f-048c-4d45-865e-2a152f98953f:0:4/15/2021 2:42:45 PM (UTC)' to our forums for assistance or raise a support ticket.\"\r\n }]\r\n }\r\n}"}
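The error above boils down to "expected an array, got an object". A minimal Python sketch of building a request body that should satisfy that check is shown here, using the same field names as the earlier eventpayload files. The function names and the "Dapr.Hello" event type are hypothetical, and I have not confirmed which fields Event Grid treats as strictly required.

```python
import json
import uuid
from datetime import datetime, timezone


def make_event(subject: str, event_type: str, data: dict, data_version: str = "1.0") -> dict:
    """Build one event using the field names from the payloads shown earlier."""
    return {
        "id": str(uuid.uuid4()),
        "subject": subject,
        "eventType": event_type,
        "eventTime": datetime.now(timezone.utc).isoformat(),
        "data": data,
        "dataVersion": data_version,
    }


def binding_request(events) -> str:
    """Wrap one or more events in the Dapr output-binding envelope.

    The "data" value is always a JSON array, since the error message
    says the topic expected StartArray rather than StartObject.
    """
    if isinstance(events, dict):
        events = [events]
    return json.dumps({"data": list(events), "operation": "create"})
```

For example, binding_request(make_event("/dapr/myevent", "Dapr.Hello", {"message": "Hi!"})) produces a body that could be posted to http://localhost:3500/v1.0/bindings/azegtopic in place of the inline JSON.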
We can create a new topic with a more basic schema
We can then create a new binding
$ cat testbinding2.yml
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: azegtopic2
spec:
type: bindings.azure.eventgrid
version: v1
metadata:
- name: accessKey
value: "JYLTFtN89DQeVtl8Xm0a5R0PkKIRZ8N6tapZsmD0sDk="
- name: topicEndpoint
value: "https://mybasictopic.centralus-1.eventgrid.azure.net/api/events"
$ kubectl apply -f testbinding2.yml
component.dapr.io/azegtopic2 created
Testing proved I did need to bounce the pod; before the restart, the sidecar did not know about the new binding:
$ curl -X POST -H 'Content-Type: application/json' http://localhost:3500/v1.0/bindings/azegtopic2 -d @eventpayload3.json
{"errorCode":"ERR_INVOKE_OUTPUT_BINDING","message":"error when invoke output binding azegtopic2: couldn't find output binding azegtopic2"}
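If a service wanted to detect this situation itself, a small check on the sidecar's reply would do. This sketch assumes error replies always look like the two JSON documents shown in this post (an errorCode plus a message). The helper names are hypothetical, and matching on the message text is a guess that may not hold across Dapr versions.

```python
import json


def dapr_error(body: str):
    """Return (errorCode, message) if body is a Dapr error document, else None."""
    try:
        doc = json.loads(body)
    except ValueError:
        # Empty or non-JSON body: not an error document as far as we can tell.
        return None
    if isinstance(doc, dict) and "errorCode" in doc:
        return doc["errorCode"], doc.get("message", "")
    return None


def binding_not_loaded(body: str) -> bool:
    """True when the sidecar reports it cannot find the named output binding."""
    err = dapr_error(body)
    return err is not None and "couldn't find output binding" in err[1]
```

A True result here is the hint that the component was applied after the sidecar started and the pod needs to be cycled.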
We can use a simple payload:
$ cat eventpayload3.json
{
"data":
[
{
"subject": "/dapr/myevent",
"id": "831e1650-001e-001b-66ab-eeb76e069631"
}
],
"operation": "create"
}
Invocation
$ curl -X POST -H 'Content-Type: application/json' http://localhost:3500/v1.0/bindings/azegtopic2 -d @eventpayload3.json
And see that reflected in the metrics page for the topic
We can fire another
$ cat eventpayload4.json
{
"data":
[
{
"subject": "/dapr/myevent2",
"id": "831e1650-001e-ffff-66ab-eeb76e069631"
}
],
"operation": "create"
}
$ curl -X POST -H 'Content-Type: application/json' http://localhost:3500/v1.0/bindings/azegtopic2 -d @eventpayload4.json
And see that reflected
We could subscribe to the topic or even set up a basic alert rule to trigger when we see events if, perhaps, this Event Grid topic were a problem queue. Clicking “New alert rule” would let us specify that:
Cleanup
I don’t plan to keep those Event Grid Topics around, so we can easily delete them.
$ kubectl get components
NAME AGE
pubsub 13d
azurekeyvault 7d12h
awssecretstore 7d2h
azegtopic 62m
azegtopic2 22m
$ kubectl delete component azegtopic && kubectl delete component azegtopic2
component.dapr.io "azegtopic" deleted
component.dapr.io "azegtopic2" deleted
Cleanup in Azure
$ az eventgrid topic list -o table
Endpoint InputSchema Location MetricResourceId Name ProvisioningState PublicNetworkAccess ResourceGroup
--------------------------------------------------------------- ----------------- ---------- ------------------------------------ ------------ ------------------- --------------------- ---------------
https://mytopicname.eastus-1.eventgrid.azure.net/api/events EventGridSchema eastus 0e925654-14cc-42ef-b8ca-1c01bd231f69 mytopicname Succeeded Enabled idjeventgridrg
https://mybasictopic.centralus-1.eventgrid.azure.net/api/events CustomEventSchema centralus b8762a68-2f6b-451f-a608-e82f207b3f24 mybasictopic Succeeded Enabled idjeventgridrg
Delete both topics, then list again to verify they are gone
$ az eventgrid topic delete -g idjeventgridrg -n mytopicname
$ az eventgrid topic delete -g idjeventgridrg -n mybasictopic
$ az eventgrid topic list -o table
We can also delete the resource group in Azure to be sure
AWS SNS
If we just want a simple notification system, AWS SNS would work pretty well too. The docs for the SNS binding are fairly straightforward.
I already have a handy utility SNS Topic called NotifyMe
Let’s rotate the Access Key on the last IAM user we used for secrets management, arn:aws:iam::095928337644:user/daprIamUser
Now with a secret
We just need the region, topic ARN and access key/secret to use it
$ cat bindingsns.yml
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: snsnotify
spec:
type: bindings.aws.sns
version: v1
metadata:
- name: topicArn
value: "arn:aws:sns:us-east-1:095928337644:NotifyMe"
- name: region
value: us-east-1
- name: accessKey
value: "AKIARMVOGITWH53M7SGF"
- name: secretKey
value: " *******masked************"
- name: sessionToken
value: ""
$ kubectl apply -f bindingsns.yml
component.dapr.io/snsnotify created
We can now rotate a pod and try hitting the endpoint
$ kubectl port-forward perl-subscriber-bdddff584-qpc64 3500:3500
Forwarding from 127.0.0.1:3500 -> 3500
Forwarding from [::1]:3500 -> 3500
Handling connection for 3500
Handling connection for 3500
$ curl -X POST -H 'Content-Type: application/json' http://localhost:3500/v1.0/bindings/snsnotify -d '{ "data": { "message": "Hi!" }, "operation": "create" }'
And now we see that reflected… almost immediately I got an email:
And of course we could put a subject in there within the payload
$ curl -X POST -H 'Content-Type: application/json' http://localhost:3500/v1.0/bindings/snsnotify -d '{ "data": { "message": "Hi!", "subject": "sample subject" }, "operation": "create" }'
And this could easily be handled in code if we wanted:
sub resp_SNSNotify {
my $cgi = shift; # CGI.pm object
return if !ref $cgi;
my $methodsURL = "http://localhost:3500/v1.0/bindings";
print $cgi->header('application/json');
my $cmd = "curl -X POST -H 'Content-Type: application/json' $methodsURL/snsnotify -d '{ \"data\": { \"message\": \"Hi!\", \"subject\": \"sample subject\" }, \"operation\": \"create\" }'";
print STDERR "\ncmd: $cmd\n";
my $rc =`$cmd`;
print STDERR "\n$rc\n";
print STDERR "\n";
}
The value here, just like the secrets engine, is that my microservice has no idea about AWS credentials or even topic name. In fact, we could parameterize that and pass it in as a setting in the deployment yaml.
The microservice simply assumes, via the Dapr sidecar, that there is a binding called “snsnotify”, and Dapr handles all the heavy lifting of connecting to AWS. Additionally, if we were to automatically rotate the secret for the IAM user, we would merely need to change the binding once.
This allows our services to focus on their work, not communication protocols. I could just as easily drop AWS SNS for another supported binding such as GCP pubsub or Twitter. As long as the binding could accept that data payload, it would just work seamlessly in the pod.
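To make the parameterization idea concrete, here is a minimal Python sketch where the binding name comes from an environment variable that the deployment yaml could set. NOTIFY_BINDING is a name I made up for illustration. DAPR_HTTP_PORT is assumed to be available in the app container, and both fall back to the values used in this post.

```python
import json
import os
import urllib.request


def binding_url(name=None, port=None) -> str:
    """Resolve the sidecar bindings URL from arguments or the environment."""
    # NOTIFY_BINDING is a hypothetical setting; "snsnotify" matches the post.
    name = name or os.environ.get("NOTIFY_BINDING", "snsnotify")
    port = port or os.environ.get("DAPR_HTTP_PORT", "3500")
    return f"http://localhost:{port}/v1.0/bindings/{name}"


def notify_request(message: str, subject=None, name=None) -> urllib.request.Request:
    """Build the POST for a 'create' operation without sending it."""
    data = {"message": message}
    if subject is not None:
        data["subject"] = subject
    body = json.dumps({"data": data, "operation": "create"}).encode("utf-8")
    return urllib.request.Request(
        binding_url(name),
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def send(req: urllib.request.Request, timeout: float = 5.0) -> int:
    """Send the request through the sidecar and return the HTTP status."""
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        return resp.status
```

Calling send(notify_request("Hi!", subject="sample subject")) mirrors the curl call above. Building the body with json.dumps also avoids the quote escaping that the shelled-out curl command needs.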
Still, we can prove the idea:
$ kubectl delete component snsnotify
component.dapr.io "snsnotify" deleted
Create the Twitter binding, but use the other binding’s name
$ cat binding-twitter.yml
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: snsnotify
spec:
type: bindings.twitter
version: v1
metadata:
- name: consumerKey
value: " ************************"
- name: consumerSecret
value: " ************************"
- name: accessToken
value: " ************************"
- name: accessSecret
value: " ************************"
$ kubectl apply -f binding-twitter.yml
component.dapr.io/snsnotify created
Delete the pod to rotate and port-forward
$ kubectl port-forward perl-subscriber-bdddff584-4qq9n 3500:3500
Forwarding from 127.0.0.1:3500 -> 3500
Forwarding from [::1]:3500 -> 3500
Handling connection for 3500
And then a quick test of someone I follow on Twitter
$ curl -X POST -H 'Content-Type: application/json' http://localhost:3500/v1.0/bindings/snsnotify -d '{ "data": "", "metadata": { "query": "from:MKBHD", "result":"recent" }, "operation": "get" }'
[{"coordinates":null,"created_at":"Thu Apr 15 16:37:06 +0000 2021","current_user_retweet":null,"entities":{"hashtags":[],"media":null,"urls":[],"user_mentions":[{"indices":[0,12],"id":15779865,"id_str":"15779865","name":"Candice Poon","screen_name":"candicepoon"}]},"favorite_count":194,"favorited":false,"filter_level":"","id":1382734741191987202,"id_str":"1382734741191987202","in_reply_to_screen_name":"candicepoon","in_reply_to_status_id":1382734074733764614,"in_reply_to_status_id_str":"1382734074733764614","in_reply_to_user_id":15779865,"in_reply_to_user_id_str":"15779865","lang":"en","possibly_sensitive":false,"quote_count":0,"reply_count":0,"retweet_count":0,"retweeted":false,"retweeted_status":null,"source":"\u003ca href=\"https://tapbots.com/software/tweetbot/mac\" rel=\"nofollow\"\u003eTweetbot for Mac\u003c/a\u003e","scopes":null,"text":"@candicepoon That was an option??!","full_text":"","display_text_range":[0,0],"place":null,"truncated":false,"user":{"contributors_enabled":false,"created_at":"Thu Apr 09 00:50:00 +0000 2009","default_profile":false,"default_profile_image":false,"description":"Web Video Producer | ⋈ | Pro Ultimate Frisbee Player | Host of @WVFRM","email":"","entities":{"url":{"hashtags":null,"media":null,"urls":[{"indices":[0,23],"display_url":"youtube.com/MKBHD","expanded_url":"http://youtube.com/MKBHD","url":"https://t.co/zGcF5QxPBI"}],"user_mentions":null},"description":{"hashtags":null,"media":null,"urls":[],"user_mentions":null}},"favourites_count":26928,"follow_request_sent":false,"following":true,"followers_count":5035732,"friends_count":409,"geo_enabled":true,"id":29873662,"id_str":"29873662","is_translator":false,"lang":"","listed_count":11933,"location":"NYC","name":"Marques Brownle
And I would expect to see the pull count increase over time
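The raw response is a lot to read on a terminal. As a rough sketch, the query body and a small summary of the reply could be handled in Python like this. It only relies on the "created_at" and "text" fields visible in the output above, and the function names are hypothetical.

```python
import json


def twitter_query_body(query: str, result: str = "recent") -> str:
    """Build the 'get' request body used in the curl call above."""
    return json.dumps(
        {"data": "", "metadata": {"query": query, "result": result}, "operation": "get"}
    )


def tweet_texts(response_body: str):
    """Return (created_at, text) pairs from the binding's JSON array reply."""
    tweets = json.loads(response_body)
    # .get() keeps this tolerant of entries that lack either field.
    return [(t.get("created_at", ""), t.get("text", "")) for t in tweets]
```

Posting twitter_query_body("from:MKBHD") to the binding endpoint and passing the reply to tweet_texts would reduce the wall of JSON to a list of timestamps and tweet text.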
Summary
We explored using Dapr to route between services. We then explored several published bindings including Azure Event Grid Topics, AWS Simple Notification Service (SNS) and Twitter. In all cases, the bindings were accessible from a Dapr app without having to modify the deployment of the app. However, we did need to rotate the pod to make them live.
Using Dapr we can interact with external services and route to internal services with minimal work. We take the burden of service mesh implementation out of the picture so we can focus on our microservice work. I definitely plan to continue using the SNS binding as a low-cost notification system from pods.