Dapr Revisit: PubSub

Published: Jul 26, 2022 by Isaac Johnson

Dapr.io is a framework we have covered multiple times before. However, it has continued to improve and evolve over time.

Today we will explore the pub/sub component, which provides basic message queues via Dapr sidecars. We will leverage Redis, Kafka, GCP Pub/Sub and Azure Service Bus.

We’ll demo this with the Python and Node.js quickstarts and show how easy it is to swap message queue providers.

Installation of Dapr

Since it has been a while since we last played with Dapr.io, let’s check the version of our local client

$ dapr -v
CLI version: 1.5.0
Runtime version: n/a

And we can see the app version in the Helm install as well

$ helm list -n dapr-system
NAME    NAMESPACE       REVISION        UPDATED                                 STATUS          CHART           APP VERSION
dapr    dapr-system     1               2022-06-21 11:58:31.583706874 -0500 CDT deployed        dapr-1.7.4      1.7.4
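The two outputs above can also be compared in a script. This is only a minimal sketch: the helper names `parse_version` and `cli_is_behind` are my own, and it assumes both tools keep printing a plain MAJOR.MINOR.PATCH string somewhere in their output.

```python
import re


def parse_version(text: str) -> tuple[int, int, int]:
    """Extract the first MAJOR.MINOR.PATCH triple from text such as 'dapr-1.7.4'."""
    match = re.search(r"(\d+)\.(\d+)\.(\d+)", text)
    if match is None:
        raise ValueError(f"no semantic version found in {text!r}")
    major, minor, patch = (int(part) for part in match.groups())
    return (major, minor, patch)


def cli_is_behind(cli_version: str, runtime_version: str) -> bool:
    """Return True when the CLI's major.minor is older than the runtime's."""
    return parse_version(cli_version)[:2] < parse_version(runtime_version)[:2]


# Values copied from the `dapr -v` and `helm list` output above.
print(cli_is_behind("CLI version: 1.5.0", "1.7.4"))
```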

Since the CLI binary is at 1.5.0 while the cluster runtime is at 1.7.4, let’s update to the latest

$ wget -q https://raw.githubusercontent.com/dapr/cli/master/install/install.sh -O - | /bin/bash
Getting the latest Dapr CLI...
Your system is linux_amd64

Dapr CLI is detected:
CLI version: 1.5.0
Runtime version: n/a
Reinstalling Dapr CLI - /usr/local/bin/dapr...

Installing v1.8.0 Dapr CLI...
Downloading https://github.com/dapr/cli/releases/download/v1.8.0/dapr_linux_amd64.tar.gz ...
[sudo] password for builder:
dapr installed into /usr/local/bin successfully.
CLI version: 1.8.0
Runtime version: n/a

To get started with Dapr, please visit https://docs.dapr.io/getting-started/

Verification:

$ dapr version
CLI version: 1.8.0
Runtime version: n/a

There is a Helm install method for Kubernetes, but I’ve had the best success using the integrated “-k” method.

First, uninstall the prior release

$ dapr uninstall -k
ℹ️  Removing Dapr from your cluster...
✅  Dapr has been removed successfully

Then install the fresh version

$ dapr init -k
⌛  Making the jump to hyperspace...
ℹ️  Note: To install Dapr using Helm, see here: https://docs.dapr.io/getting-started/install-dapr-kubernetes/#install-with-helm-advanced

ℹ️  Container images will be pulled from Docker Hub
✅  Deploying the Dapr control plane to your cluster...
✅  Success! Dapr has been installed to namespace dapr-system. To verify, run `dapr status -k' in your terminal. To get started, go here: https://aka.ms/dapr-getting-started

We can now see that both our client and the Kubernetes control plane are on 1.8

$ dapr -v
CLI version: 1.8.0
Runtime version: n/a


$ helm list -n dapr-system
NAME    NAMESPACE       REVISION        UPDATED                                 STATUS          CHART           APP VERSION
dapr    dapr-system     1               2022-07-20 20:26:13.8871589 -0500 CDT   deployed        dapr-1.8.1      1.8.1

We want to wait until we see the pods start up in the dapr-system namespace

$ kubectl get pods -n dapr-system
NAME                                     READY   STATUS    RESTARTS   AGE
dapr-dashboard-86554d644d-g6tmb          1/1     Running   0          80s
dapr-sentry-cff79d84f-bbl2p              1/1     Running   0          80s
dapr-placement-server-0                  1/1     Running   0          80s
dapr-operator-6d4ff998-ljdrs             1/1     Running   0          80s
dapr-sidecar-injector-7d886f5686-qjr2p   1/1     Running   0          80s
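If we would rather script the wait than eyeball it, the table above is easy to check. The sketch below assumes the default `kubectl get pods` column layout (NAME, READY, STATUS, ...); the function name `all_pods_ready` is hypothetical.

```python
def all_pods_ready(kubectl_output: str) -> bool:
    """Return True when every pod row shows READY n/n and STATUS Running."""
    lines = [line for line in kubectl_output.strip().splitlines() if line.strip()]
    rows = lines[1:]  # skip the header row
    if not rows:
        return False
    for row in rows:
        columns = row.split()
        ready, status = columns[1], columns[2]
        ready_count, total_count = ready.split("/")
        if ready_count != total_count or status != "Running":
            return False
    return True


# Output copied from the listing above.
SAMPLE = """
NAME                                     READY   STATUS    RESTARTS   AGE
dapr-dashboard-86554d644d-g6tmb          1/1     Running   0          80s
dapr-sentry-cff79d84f-bbl2p              1/1     Running   0          80s
dapr-placement-server-0                  1/1     Running   0          80s
dapr-operator-6d4ff998-ljdrs             1/1     Running   0          80s
dapr-sidecar-injector-7d886f5686-qjr2p   1/1     Running   0          80s
"""

print(all_pods_ready(SAMPLE))
```

In practice one would feed it the text of `kubectl get pods -n dapr-system` in a loop with a short sleep between attempts.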

Adding Redis

Our next step is to add Redis, if we haven’t already


$ helm repo add bitnami https://charts.bitnami.com/bitnami
"bitnami" already exists with the same configuration, skipping
builder@DESKTOP-72D2D9T:~/Workspaces/jekyll-blog$ helm repo update
Hang tight while we grab the latest from your chart repositories...
...Successfully got an update from the "metallb" chart repository
...Successfully got an update from the "actions-runner-controller" chart repository
...Successfully got an update from the "lifen-charts" chart repository
...Successfully got an update from the "cribl" chart repository
...Successfully got an update from the "adwerx" chart repository
...Successfully got an update from the "novum-rgi-helm" chart repository
...Successfully got an update from the "hashicorp" chart repository
...Successfully got an update from the "harbor" chart repository
...Successfully got an update from the "datadog" chart repository
...Successfully got an update from the "jenkins" chart repository
...Successfully got an update from the "argo-cd" chart repository
...Successfully got an update from the "gitlab" chart repository
...Successfully got an update from the "bitnami" chart repository
...Successfully got an update from the "stable" chart repository
Update Complete. ⎈Happy Helming!⎈
builder@DESKTOP-72D2D9T:~/Workspaces/jekyll-blog$ helm install redis bitnami/redis
NAME: redis
LAST DEPLOYED: Wed Jul 20 20:51:30 2022
NAMESPACE: default
STATUS: deployed
REVISION: 1
TEST SUITE: None
NOTES:
CHART NAME: redis
CHART VERSION: 17.0.2
APP VERSION: 7.0.4

** Please be patient while the chart is being deployed **

Redis® can be accessed on the following DNS names from within your cluster:

    redis-master.default.svc.cluster.local for read/write operations (port 6379)
    redis-replicas.default.svc.cluster.local for read-only operations (port 6379)



To get your password run:

    export REDIS_PASSWORD=$(kubectl get secret --namespace default redis -o jsonpath="{.data.redis-password}" | base64 -d)

To connect to your Redis® server:

1. Run a Redis® pod that you can use as a client:

   kubectl run --namespace default redis-client --restart='Never'  --env REDIS_PASSWORD=$REDIS_PASSWORD  --image docker.io/bitnami/redis:7.0.4-debian-11-r2 --command -- sleep infinity

   Use the following command to attach to the pod:

   kubectl exec --tty -i redis-client \
   --namespace default -- bash

2. Connect using the Redis® CLI:
   REDISCLI_AUTH="$REDIS_PASSWORD" redis-cli -h redis-master
   REDISCLI_AUTH="$REDIS_PASSWORD" redis-cli -h redis-replicas

To connect to your database from outside the cluster execute the following commands:

    kubectl port-forward --namespace default svc/redis-master 6379:6379 &
    REDISCLI_AUTH="$REDIS_PASSWORD" redis-cli -h 127.0.0.1 -p 6379

We can fetch the password if we need it

$ kubectl get secret redis -o json | jq .data | sed 's/: ".*/: "*********"/g'
{
  "redis-password": "*********"
}
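Kubernetes stores Secret values base64-encoded, which is why the chart notes pipe the value through `base64 -d`. As a rough sketch, the same decoding can be done on the JSON form of a secret. The function name and the password below are made up for illustration.

```python
import base64
import json


def decode_secret_data(secret_json: str) -> dict[str, str]:
    """Decode every base64 value under the `data` key of a Secret's JSON."""
    secret = json.loads(secret_json)
    return {
        key: base64.b64decode(value).decode("utf-8")
        for key, value in secret.get("data", {}).items()
    }


# Stand-in for `kubectl get secret redis -o json`; the password is not real.
example = json.dumps(
    {"data": {"redis-password": base64.b64encode(b"not-a-real-password").decode("ascii")}}
)

print(decode_secret_data(example))
```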

We can now use Redis for our state store and default pubsub

$ cat redis-state.yaml
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: statestore
  namespace: default
spec:
  type: state.redis
  version: v1
  metadata:
  - name: redisHost
    value: redis-master.default.svc.cluster.local:6379
  - name: redisPassword
    secretKeyRef:
      name: redis
      key: redis-password


$ cat redis-pubsub.yaml
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: pubsub
  namespace: default
spec:
  type: pubsub.redis
  version: v1
  metadata:
  - name: redisHost
    value: redis-master.default.svc.cluster.local:6379
  - name: redisPassword
    secretKeyRef:
      name: redis
      key: redis-password

And apply

$ kubectl apply -f redis-state.yaml
component.dapr.io/statestore created
$ kubectl apply -f redis-pubsub.yaml
component.dapr.io/pubsub created

Using the Dapr quickstart Demo

To leverage this, we’ll use the pubsub hello-world example from the Quickstarts

builder@DESKTOP-QADGF36:~/Workspaces$ git clone https://github.com/dapr/quickstarts.git
Cloning into 'quickstarts'...
remote: Enumerating objects: 13524, done.
remote: Counting objects: 100% (5/5), done.
remote: Compressing objects: 100% (5/5), done.
remote: Total 13524 (delta 1), reused 3 (delta 0), pack-reused 13519
Receiving objects: 100% (13524/13524), 75.09 MiB | 29.28 MiB/s, done.
Resolving deltas: 100% (5191/5191), done.
builder@DESKTOP-QADGF36:~/Workspaces$ cd quickstarts/tutorials/pub-sub/
builder@DESKTOP-QADGF36:~/Workspaces/quickstarts/tutorials/pub-sub$

builder@DESKTOP-72D2D9T:~/Workspaces/quickstarts/tutorials/pub-sub$ kubectl apply -f deploy/
deployment.apps/csharp-subscriber created
deployment.apps/node-subscriber created
deployment.apps/python-subscriber created
service/react-form created
deployment.apps/react-form created
component.dapr.io/pubsub configured

We can now test the react-form

$ kubectl get pods | tail -n 4
csharp-subscriber-66b7c5bcbc-gpt4c                     2/2     Running   0             32s
node-subscriber-74db445b79-88vnf                       2/2     Running   0             32s
python-subscriber-788bdb8749-vb5xh                     2/2     Running   0             32s
react-form-5cb78b6775-ns4gg                            2/2     Running   0             32s
$ kubectl port-forward react-form-5cb78b6775-ns4gg 8080:8080
Forwarding from 127.0.0.1:8080 -> 8080
Forwarding from [::1]:8080 -> 8080
Handling connection for 8080
Handling connection for 8080
Handling connection for 8080
Handling connection for 8080

/content/images/2022/07/dapr-01.png

If we push a message, we can see it reflected in the Node Subscriber log

$ kubectl logs node-subscriber-74db445b79-xmr7s
Defaulted container "node-subscriber" out of: node-subscriber, daprd
Node App listening on port 3000!
A:  My Topic

builder@DESKTOP-72D2D9T:~/Workspaces/quickstarts/tutorials/pub-sub$ kubectl logs python-subscriber-788bdb8749-gzgl2
Defaulted container "python-subscriber" out of: python-subscriber, daprd
 * Serving Flask app 'app' (lazy loading)
 * Environment: production
   WARNING: This is a development server. Do not use it in a production deployment.
   Use a production WSGI server instead.
 * Debug mode: off
 * Running on http://127.0.0.1:5001 (Press CTRL+C to quit)
127.0.0.1 - - [21/Jul/2022 02:15:29] "GET /dapr/config HTTP/1.1" 404 -
127.0.0.1 - - [21/Jul/2022 02:15:29] "GET /dapr/subscribe HTTP/1.1" 200 -
A: {'data': {'message': 'My Topic\n', 'messageType': 'A'}, 'datacontenttype': 'application/json', 'id': 'a12dd81e-1683-4359-bd63-35cacc4e0833', 'pubsubname': 'pubsub', 'source': 'react-form', 'specversion': '1.0', 'topic': 'A', 'traceid': '00-e997f6c2bff8cd6d4b212c895ca86cfb-964fa00d3ad02b75-00', 'traceparent': '00-e997f6c2bff8cd6d4b212c895ca86cfb-964fa00d3ad02b75-00', 'tracestate': '', 'type': 'com.dapr.event.sent'}
Received message "My Topic
" on topic "A"
127.0.0.1 - - [21/Jul/2022 02:17:48] "POST /A HTTP/1.1" 200 -
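The Python log shows that Dapr wraps each message in a CloudEvents envelope, with our payload under `data` and routing details such as `topic` and `pubsubname` alongside it. A minimal sketch of unpacking that envelope follows. The field names are taken from the log line above, the function name is hypothetical, and stripping the trailing newline is my own addition to avoid the broken line seen in the log.

```python
import json


def summarize_event(raw_body: str) -> str:
    """Pull the payload and topic out of a CloudEvents envelope delivered by Dapr."""
    event = json.loads(raw_body)
    message = event["data"]["message"].strip()  # the form sends a trailing newline
    return f'Received message "{message}" on topic "{event["topic"]}"'


# Abridged copy of the envelope printed by the Python subscriber.
SAMPLE_BODY = json.dumps(
    {
        "data": {"message": "My Topic\n", "messageType": "A"},
        "datacontenttype": "application/json",
        "pubsubname": "pubsub",
        "source": "react-form",
        "specversion": "1.0",
        "topic": "A",
        "type": "com.dapr.event.sent",
    }
)

print(summarize_event(SAMPLE_BODY))
```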

Using Datadog, I can leverage “Live Tail” to watch the logs as well

/content/images/2022/07/dapr-11.png

Apache Kafka

We can just as easily switch from Redis to Apache Kafka

First, let’s install Kafka

$ helm install my-release bitnami/kafka
NAME: my-release
LAST DEPLOYED: Wed Jul 20 21:24:32 2022
NAMESPACE: default
STATUS: deployed
REVISION: 1
TEST SUITE: None
NOTES:
CHART NAME: kafka
CHART VERSION: 18.0.3
APP VERSION: 3.2.0

** Please be patient while the chart is being deployed **

Kafka can be accessed by consumers via port 9092 on the following DNS name from within your cluster:

    my-release-kafka.default.svc.cluster.local

Each Kafka broker can be accessed by producers via port 9092 on the following DNS name(s) from within your cluster:

    my-release-kafka-0.my-release-kafka-headless.default.svc.cluster.local:9092

To create a pod that you can use as a Kafka client run the following commands:

    kubectl run my-release-kafka-client --restart='Never' --image docker.io/bitnami/kafka:3.2.0-debian-11-r12 --namespace default --command -- sleep infinity
    kubectl exec --tty -i my-release-kafka-client --namespace default -- bash

    PRODUCER:
        kafka-console-producer.sh \

            --broker-list my-release-kafka-0.my-release-kafka-headless.default.svc.cluster.local:9092 \
            --topic test

    CONSUMER:
        kafka-console-consumer.sh \

            --bootstrap-server my-release-kafka.default.svc.cluster.local:9092 \
            --topic test \
            --from-beginning

Now that it is installed, we can create a new pubsub using kafka.

Because our code has “pubsub” hardcoded

app.get('/dapr/subscribe', (_req, res) => {
    res.json([
        {
            pubsubname: "pubsub",
            topic: "A",
            route: "A"
        },
        {
            pubsubname: "pubsub",
            topic: "B",
            route: "B"
        }
    ]);
});

We need to use the same component name for the swap to take effect.

builder@DESKTOP-72D2D9T:~/Workspaces/jekyll-blog$ cat kafka-pubsub.yaml
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: pubsub
  namespace: default
spec:
  type: pubsub.kafka
  version: v1
  metadata:
  - name: brokers # Required. Kafka broker connection setting
    value: "new-kafka-release.default.svc.cluster.local:9092"
  - name: clientID # Optional. Used as client tracing ID by Kafka brokers.
    value: "my-dapr-app-id"
  - name: authType # Required.
    value: "none"
  - name: version # Optional.
    value: 0.10.2.0
  - name: disableTls # Optional. Disable TLS. This is not safe for production!! You should read the `Mutual TLS` section for how to use TLS.
    value: "true"

Now let’s swap them out

builder@DESKTOP-72D2D9T:~/Workspaces/jekyll-blog$ kubectl delete -f redis-pubsub.yaml
component.dapr.io "pubsub" deleted
builder@DESKTOP-72D2D9T:~/Workspaces/jekyll-blog$ kubectl apply -f kafka-pubsub.yaml
component.dapr.io/pubsub created
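What makes the swap work is that the component name stays `pubsub` while only `spec.type` and its metadata change. The sketch below builds both manifests as plain dictionaries to make that visible. The helper `pubsub_component` is hypothetical, and the Redis password `secretKeyRef` is left out for brevity.

```python
import json


def pubsub_component(
    component_type: str,
    metadata: dict[str, str],
    name: str = "pubsub",
    namespace: str = "default",
) -> dict:
    """Build a Dapr Component manifest as a plain dictionary."""
    return {
        "apiVersion": "dapr.io/v1alpha1",
        "kind": "Component",
        "metadata": {"name": name, "namespace": namespace},
        "spec": {
            "type": component_type,
            "version": "v1",
            "metadata": [{"name": key, "value": value} for key, value in metadata.items()],
        },
    }


redis = pubsub_component(
    "pubsub.redis",
    {"redisHost": "redis-master.default.svc.cluster.local:6379"},
)
kafka = pubsub_component(
    "pubsub.kafka",
    {"brokers": "new-kafka-release.default.svc.cluster.local:9092", "authType": "none"},
)

# Same name and namespace, different backing broker.
print(json.dumps(kafka, indent=2))
```

Because the subscribers only know the name `pubsub`, they need no code change, just a pod restart to pick up the new component.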

A quick note: I got hung up far longer than I expected on crashing Kafka containers. I ended up rabbit-holing down various versions of the Bitnami chart, alternate Kafka charts and even the Confluent community chart.

The cause of the errors was my volume class.

Fixing Storage Issues: Creating a working PVC SC

Two things needed to be done. First, set up the ServiceAccount and RBAC (ClusterRole and ClusterRoleBinding, plus Role and RoleBinding) along with the StorageClass.

$ cat k3s-prenfs.yaml 
apiVersion: v1
kind: ServiceAccount
metadata:
  name: nfs-client-provisioner
  # replace with namespace where provisioner is deployed
  namespace: default
---
kind: ClusterRole
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: nfs-client-provisioner-runner
rules:
  - apiGroups: [""]
    resources: ["persistentvolumes"]
    verbs: ["get", "list", "watch", "create", "delete"]
  - apiGroups: [""]
    resources: ["persistentvolumeclaims"]
    verbs: ["get", "list", "watch", "update"]
  - apiGroups: ["storage.k8s.io"]
    resources: ["storageclasses"]
    verbs: ["get", "list", "watch"]
  - apiGroups: [""]
    resources: ["events"]
    verbs: ["create", "update", "patch"]
---
kind: ClusterRoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: run-nfs-client-provisioner
subjects:
  - kind: ServiceAccount
    name: nfs-client-provisioner
    # replace with namespace where provisioner is deployed
    namespace: default
roleRef:
  kind: ClusterRole
  name: nfs-client-provisioner-runner
  apiGroup: rbac.authorization.k8s.io
---
kind: Role
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: leader-locking-nfs-client-provisioner
  # replace with namespace where provisioner is deployed
  namespace: default
rules:
  - apiGroups: [""]
    resources: ["endpoints"]
    verbs: ["get", "list", "watch", "create", "update", "patch"]
---
kind: RoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: leader-locking-nfs-client-provisioner
  # replace with namespace where provisioner is deployed
  namespace: default
subjects:
  - kind: ServiceAccount
    name: nfs-client-provisioner
    # replace with namespace where provisioner is deployed
    namespace: default
roleRef:
  kind: Role
  name: leader-locking-nfs-client-provisioner
  apiGroup: rbac.authorization.k8s.io
---
apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:
  name: managed-nfs-storage
provisioner: fuseim.pri/ifs # or choose another name, must match deployment's env PROVISIONER_NAME
parameters:
  archiveOnDelete: "false"
  allowVolumeExpansion: "true"
  reclaimPolicy: "Delete"
allowVolumeExpansion: true

The second step is the provisioner deployment. I started by following my own guide from 2020, but Kubernetes 1.20 and later no longer populate selfLink, which breaks the older provisioner image.

Therefore, the manifest that worked used a new container image (gcr.io/k8s-staging-sig-storage/nfs-subdir-external-provisioner:v4.0.0)

apiVersion: apps/v1
kind: Deployment
metadata:
  name: nfs-client-provisioner
  labels:
    app: nfs-client-provisioner
  # replace with namespace where provisioner is deployed
  namespace: default
spec:
  replicas: 1
  strategy:
    type: Recreate
  selector:
    matchLabels:
      app: nfs-client-provisioner
  template:
    metadata:
      labels:
        app: nfs-client-provisioner
    spec:
      serviceAccountName: nfs-client-provisioner
      containers:
        - name: nfs-client-provisioner
          image: gcr.io/k8s-staging-sig-storage/nfs-subdir-external-provisioner:v4.0.0
          volumeMounts:
            - name: nfs-client-root
              mountPath: /persistentvolumes
          env:
            - name: PROVISIONER_NAME
              value: fuseim.pri/ifs
            - name: NFS_SERVER
              value: 192.168.1.129
            - name: NFS_PATH
              value: /volume1/k3snfs77b2
      volumes:
        - name: nfs-client-root
          nfs:
            server: 192.168.1.129
            path: /volume1/k3snfs77b2

Then I swapped the default StorageClass

$ kubectl patch storageclass nfs -p '{"metadata": {"annotations":{"storageclass.kubernetes.io/is-default-class":"false"}}}' && kubectl patch storageclass local-path  -p '{"metadata": {"annotations":{"storageclass.kubernetes.io/is-default-class":"false"}}}' && kubectl patch storageclass managed-nfs-storage -p '{"metadata": {"annotations":{"storageclass.kubernetes.io/is-default-class":"true"}}}'
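The long one-liner above marks two classes as non-default and one as default. As a sketch, the same patches can be generated for any list of class names so that exactly one ends up as the default. The function name is hypothetical. The annotation key is the one used in the command above.

```python
import json

DEFAULT_ANNOTATION = "storageclass.kubernetes.io/is-default-class"


def default_class_patches(storage_classes: list[str], new_default: str) -> dict[str, str]:
    """Return a JSON patch per StorageClass so that only `new_default` is the default."""
    if new_default not in storage_classes:
        raise ValueError(f"{new_default!r} is not in the list of storage classes")
    patches = {}
    for name in storage_classes:
        flag = "true" if name == new_default else "false"
        patches[name] = json.dumps({"metadata": {"annotations": {DEFAULT_ANNOTATION: flag}}})
    return patches


patches = default_class_patches(
    ["nfs", "local-path", "managed-nfs-storage"], "managed-nfs-storage"
)
for name, patch in patches.items():
    # Print the commands rather than running them.
    print(f"kubectl patch storageclass {name} -p '{patch}'")
```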

We need our service name. The easiest way is to see which services are advertising port 9092

$ kubectl get svc | grep 9092
new-kafka-release                                          ClusterIP      10.43.127.27    <none>                      9092/TCP                                                                                                    18m
new-kafka-release-headless                                 ClusterIP      None            <none>                      9092/TCP,9093/TCP                                                                                           18m

Then use it in the component for “brokers”

$ cat dapr.kafka.pubsub.yaml
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: pubsub
  namespace: default
spec:
  type: pubsub.kafka
  version: v1
  metadata:
  - name: brokers # Required. Kafka broker connection setting
    value: "new-kafka-release.default.svc.cluster.local:9092"
  - name: clientID # Optional. Used as client tracing ID by Kafka brokers.
    value: "my-dapr-app-id"
  - name: authType # Required.
    value: "none"
  - name: version # Optional.
    value: 0.10.2.0
  - name: disableTls # Optional. Disable TLS. This is not safe for production!! You should read the `Mutual TLS` section for how to use TLS.
    value: "true"

$ kubectl apply -f dapr.kafka.pubsub.yaml
component.dapr.io/pubsub configured
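The `brokers` value is just the in-cluster DNS name of the non-headless service plus the port. A small sketch of deriving it from the `kubectl get svc` output follows. It assumes the default column order and the `cluster.local` cluster domain, and the function name is hypothetical.

```python
def find_broker_addresses(svc_output: str, port: int = 9092, namespace: str = "default") -> list[str]:
    """List in-cluster addresses of non-headless services exposing `port`."""
    addresses = []
    for line in svc_output.strip().splitlines():
        columns = line.split()
        if len(columns) < 5:
            continue
        name, _svc_type, cluster_ip, _external_ip, ports = columns[:5]
        # Entries look like "9092/TCP" or, for NodePort services, "9092:30092/TCP".
        exposed = {entry.split("/")[0].split(":")[0] for entry in ports.split(",")}
        if cluster_ip != "None" and str(port) in exposed:
            addresses.append(f"{name}.{namespace}.svc.cluster.local:{port}")
    return addresses


# Output copied from the `kubectl get svc | grep 9092` listing above.
SVC_OUTPUT = """
new-kafka-release            ClusterIP   10.43.127.27   <none>   9092/TCP            18m
new-kafka-release-headless   ClusterIP   None           <none>   9092/TCP,9093/TCP   18m
"""

print(find_broker_addresses(SVC_OUTPUT))
```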

Then rotate the pods to use the new sidecar

$ kubectl delete pods -l app=python-subscriber && kubectl delete pods -l app=node-subscriber && kubectl delete pods -l app=react-form
pod "python-subscriber-788bdb8749-vb5xh" deleted
pod "node-subscriber-74db445b79-88vnf" deleted
pod "react-form-5cb78b6775-ns4gg" deleted


$ kubectl get pods | tail -n 4
new-kafka-release-0                                    1/1     Running   1 (18m ago)   25m
python-subscriber-788bdb8749-zbjtg                     2/2     Running   0             2m52s
node-subscriber-74db445b79-6v926                       2/2     Running   0             2m21s
react-form-5cb78b6775-m27x2                            2/2     Running   0             110s

We can try pushing and seeing the result

/content/images/2022/07/dapr-12.png

Using Google Pub/Sub

We can log in on the command line

$ gcloud auth login

/content/images/2022/07/dapr-03.png

We need to create a service account

/content/images/2022/07/dapr-04.png

We give it a name and ID

/content/images/2022/07/dapr-05.png

Then grant access to Pubsub

/content/images/2022/07/dapr-06.png

I’ll allow myself to use this account

/content/images/2022/07/dapr-07.png

We will need some service keys to use it

/content/images/2022/07/dapr-08.png

/content/images/2022/07/dapr-09.png

/content/images/2022/07/dapr-10.png

The downloaded file will contain the parts we need for the Dapr component

$ cat /mnt/c/Users/isaac/Downloads/myanthosproject2-47313fb0a9ef.json
{
  "type": "service_account",
  "project_id": "myanthosproject2",
  "private_key_id": "47313fb0a9ef472ebab439b649a0f9a9455875ac",
  "private_key": "-----BEGIN PRIVATE KEY-----\nMII....

We can now set up the Pubsub component

$ cat gcp-pubsub.yaml
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: pubsub
  namespace: default
spec:
  type: pubsub.gcp.pubsub
  version: v1
  metadata:
  - name: type
    value: service_account
  - name: projectId
    value: myanthosproject2
  - name: privateKeyId
    value: dasdf6768asdf76d76s7df67sda8a97f
  - name: clientEmail
    value: daprpubsub@myanthosproject2.iam.gserviceaccount.com
  - name: clientId
    value: "10022334455667788112233"
  - name: authUri
    value: https://accounts.google.com/o/oauth2/auth
  - name: tokenUri
    value: https://oauth2.googleapis.com/token
  - name: authProviderX509CertUrl
    value: https://www.googleapis.com/oauth2/v1/certs
  - name: clientX509CertUrl
    value: https://www.googleapis.com/robot/v1/metadata/x509/daprpubsub%40myanthosproject2.iam.gserviceaccount.com
  - name: privateKey
    value: |
      -----BEGIN PRIVATE KEY-----
      MIICXQIBAAKBgQCCGuEcuR7ZTuc6L+1gmVlmyTDJNQCZWbBG1a8CADVa1r3X72re
      z+yE8I5Vc22NV/cMNyLYsQZBGC4nlxyWRkqwn+9LO3aQDE1klbubd0XXdoo608bR
      7rYNKjdGMY9wpTU7xpJ83TJfLoXVbuQhQ2kzY9DbimLyg0pIAapFT/52tQIDAQAB
      AoGAUDh0nvpB3Cm7hvQwQEStjCP3Uci9gtB7UWHGE8y/GTCZQ8Cau6Gmq12L1YJj
      hSg86qpAcS3NtmtLYhvZg4r518a41cGJpPtVCwRkTkKFLYrq9JSW6LC4uDBbJtYP
      bjkztO7VSaWbdWmvWis1p8mFckuIfZzlUODPNmOpjLqOKAECQQC97EqJNmRWbR6i
      T4WiwaX8f7glc088qf4BSuscSVkLiZDXNtvO0P+eKGwjRSSNeacSJPvuLw6ANqkR
      E8pOOYIBAkEAr17UKEX4c8sPio2gSzq165cmNWcY5k3DKxtUizcGdaFDsVqMkZhG
      MmCEOQ26E2D4AVk0SMfrwLvOnPaYlD2MtQJAMQQU0IjbKHsNFvLTGIhF4H7N9cxm
      vM9aoNbHCwvbEWE1onWkESJbdmQUVJ35qxGkjshAxN4cLfbtd1zWL44kAQJBAIns
      g+E0vqWbI5rtrg7cmu0x0CwZ7Hxc1k1VzlU7LopfDZUzyPBNGZBPVk5k3F6XWhX9
      aWCAU4seae4LB483JS0CQQCD+6b6Mk0NxhuqLWOs55wRZXKRg1nuGQcY1dcHFMfR
      MKyF9/DsO1tKTog6bzm9f8z2TzVCwAZ6vaGm2NMHxEe1
      -----END PRIVATE KEY-----
  - name: disableEntityManagement
    value: "false"
  - name: enableMessageOrdering
    value: "false"

We can now apply it

$ kubectl get component
NAME         AGE
statestore   8h
pubsub       3h30m

$ kubectl delete component pubsub
component.dapr.io "pubsub" deleted

$ kubectl apply -f gcp-pubsub.yaml
component.dapr.io/pubsub created

Now let’s rotate the pods to make it active

$ kubectl delete pods -l app=python-subscriber && kubectl delete pods -l app=node-subscriber && kubectl delete pods -l app=react-form

Our main problem, and the reason we’ll see pods crash, is that GCP Pub/Sub requires topic names to follow a pattern, which includes being at least 3 characters long

data: {
  errorCode: 'ERR_PUBSUB_PUBLISH_MESSAGE',
  message: 'error when publish to topic A in pubsub pubsub: gcp pubsub error: could not get valid topic A, rpc error: code = InvalidArgument desc = Invalid resource name given (name=projects/myanthosproject2/topics/A). Refer to ht
}
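To catch this before deploying, a topic name can be checked locally. The sketch below encodes the Pub/Sub resource name rules as I understand them: start with a letter, use only letters, digits and `-._~%+`, be 3 to 255 characters long, and not begin with `goog`. Treat it as an approximation and not the service's own validator. The function name is hypothetical.

```python
import re

# First character a letter, then 2 to 254 more allowed characters (3 to 255 total).
_TOPIC_ID = re.compile(r"[A-Za-z][A-Za-z0-9\-._~%+]{2,254}")


def is_valid_topic_id(topic_id: str) -> bool:
    """Approximate check of a GCP Pub/Sub topic ID against the naming rules."""
    if topic_id.startswith("goog"):
        return False
    return _TOPIC_ID.fullmatch(topic_id) is not None


for candidate in ("A", "B", "AAA", "BBB"):
    print(candidate, is_valid_topic_id(candidate))
```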

Allowing different Topics

I’ll make a change to the app code

$ cat app.js
//
// Copyright 2021 The Dapr Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//     http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

const express = require('express');
const bodyParser = require('body-parser');

const app = express();
// Dapr publishes messages with the application/cloudevents+json content-type
app.use(bodyParser.json({ type: 'application/*+json' }));

const port = 3000;

app.get('/dapr/subscribe', (_req, res) => {
    res.json([
        {
            pubsubname: "pubsub",
            topic: "AAA",
            route: "AAA"
        },
        {
            pubsubname: "pubsub",
            topic: "BBB",
            route: "BBB"
        }
    ]);
});

app.post('/AAA', (req, res) => {
    console.log("AAA: ", req.body.data.message);
    res.sendStatus(200);
});

app.post('/BBB', (req, res) => {
    console.log("BBB: ", req.body.data.message);
    res.sendStatus(200);
});

app.listen(port, () => console.log(`Node App listening on port ${port}!`));

Then build and push the container to my local Harbor registry

builder@DESKTOP-QADGF36:~/Workspaces/quickstarts/tutorials/pub-sub/node-subscriber$ docker build -t harbor.freshbrewed.science/freshbrewedprivate/pubsub-node-subscriber:gcp1 .
[+] Building 15.8s (10/10) FINISHED
 => [internal] load build definition from Dockerfile                                                                                                                                                                                                            0.0s
 => => transferring dockerfile: 145B                                                                                                                                                                                                                            0.0s
 => [internal] load .dockerignore                                                                                                                                                                                                                               0.0s
 => => transferring context: 2B                                                                                                                                                                                                                                 0.0s
 => [internal] load metadata for docker.io/library/node:17-alpine                                                                                                                                                                                               7.4s
 => [auth] library/node:pull token for registry-1.docker.io                                                                                                                                                                                                     0.0s
 => [internal] load build context                                                                                                                                                                                                                               0.7s
 => => transferring context: 2.04kB                                                                                                                                                                                                                             0.6s
 => [1/4] FROM docker.io/library/node:17-alpine@sha256:76e638eb0d73ac5f0b76d70df3ce1ddad941ac63595d44092b625e2cd557ddbf                                                                                                                                         3.9s
 => => resolve docker.io/library/node:17-alpine@sha256:76e638eb0d73ac5f0b76d70df3ce1ddad941ac63595d44092b625e2cd557ddbf                                                                                                                                         0.0s
 => => sha256:76e638eb0d73ac5f0b76d70df3ce1ddad941ac63595d44092b625e2cd557ddbf 1.43kB / 1.43kB                                                                                                                                                                  0.0s
 => => sha256:c7bde48048debf58dba50f8d2ba674854bdf7dfc8c43bd468f19a5212facfdbe 1.16kB / 1.16kB                                                                                                                                                                  0.0s
 => => sha256:57488723f0872b65eb586f4fde54d5c25c16cde94da3bde8b338cf2af2aceb1c 6.67kB / 6.67kB                                                                                                                                                                  0.0s
 => => sha256:df9b9388f04ad6279a7410b85cedfdcb2208c0a003da7ab5613af71079148139 2.81MB / 2.81MB                                                                                                                                                                  0.2s
 => => sha256:1bedfac31d6a1e001d4e5d45ea1aba8f53e5f54b5555ce2c415a65a7041b074f 45.89MB / 45.89MB                                                                                                                                                                1.4s
 => => sha256:6463b5f3dbb1d524374fd51f430ea4837e794edd1c508bad449f93a86be57ccb 2.34MB / 2.34MB                                                                                                                                                                  0.4s
 => => extracting sha256:df9b9388f04ad6279a7410b85cedfdcb2208c0a003da7ab5613af71079148139                                                                                                                                                                       0.2s
 => => sha256:885e68a88c76f90ebf7b390469107ac661410a590df8939c237fa720ca91efb3 451B / 451B                                                                                                                                                                      0.3s
 => => extracting sha256:1bedfac31d6a1e001d4e5d45ea1aba8f53e5f54b5555ce2c415a65a7041b074f                                                                                                                                                                       1.9s
 => => extracting sha256:6463b5f3dbb1d524374fd51f430ea4837e794edd1c508bad449f93a86be57ccb                                                                                                                                                                       0.1s
 => => extracting sha256:885e68a88c76f90ebf7b390469107ac661410a590df8939c237fa720ca91efb3                                                                                                                                                                       0.0s
 => [2/4] WORKDIR /usr/src/app                                                                                                                                                                                                                                  1.2s
 => [3/4] COPY . .                                                                                                                                                                                                                                              0.0s
 => [4/4] RUN npm install                                                                                                                                                                                                                                       3.0s
 => exporting to image                                                                                                                                                                                                                                          0.1s
 => => exporting layers                                                                                                                                                                                                                                         0.1s
 => => writing image sha256:f37562184f3a5558b7e35a7355ebfe0f9736103a647bb7d4c9889592d22d182e                                                                                                                                                                    0.0s
 => => naming to harbor.freshbrewed.science/freshbrewedprivate/pubsub-node-subscriber:gcp1

$ docker push harbor.freshbrewed.science/freshbrewedprivate/pubsub-node-subscriber:gcp1
The push refers to repository [harbor.freshbrewed.science/freshbrewedprivate/pubsub-node-subscriber]
9f88422dec6e: Pushed
bffb6c8f713f: Pushed
be03214d51e8: Pushed
e6a74996eabe: Pushed
db2e1fd51a80: Pushed
19ebba8d6369: Pushed
4fc242d58285: Pushed
gcp1: digest: sha256:736c71d7cdf596f6fbe6d11d827009f8f6c3feeb85d94058becee0f7fd87e207 size: 1784

Then I can reference the new image in the subscriber Deployment

$ cat node-subscriber.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: node-subscriber
  labels:
    app: node-subscriber
spec:
  replicas: 1
  selector:
    matchLabels:
      app: node-subscriber
  template:
    metadata:
      labels:
        app: node-subscriber
      annotations:
        dapr.io/enabled: "true"
        dapr.io/app-id: "node-subscriber"
        dapr.io/app-port: "3000"
    spec:
      containers:
      - name: node-subscriber
        image: harbor.freshbrewed.science/freshbrewedprivate/pubsub-node-subscriber:gcp1
        ports:
        - containerPort: 3000
        imagePullPolicy: Always
      imagePullSecrets:
      - name: myharborreg

$ kubectl apply -f node-subscriber.yaml
deployment.apps/node-subscriber configured

I’ll also update the react form:

diff --git a/tutorials/pub-sub/react-form/client/src/MessageForm.js b/tutorials/pub-sub/react-form/client/src/MessageForm.js
index 698b7d8..fb80930 100644
--- a/tutorials/pub-sub/react-form/client/src/MessageForm.js
+++ b/tutorials/pub-sub/react-form/client/src/MessageForm.js
@@ -58,9 +58,9 @@ export class MessageForm extends React.Component {
         <div className="form-group">
           <label>Select Message Type</label>
           <select className="custom-select custom-select-lg mb-3" name="messageType" onChange={this.handleInputChange} value={this.state.messageType}>
-            <option value="A">A</option>
-            <option value="B">B</option>
-            <option value="C">C</option>
+            <option value="AAA">AAA</option>
+            <option value="BBB">BBB</option>
+            <option value="CCC">CCC</option>
           </select>
         </div>
         <div className="form-group">

Then I’ll build and push

builder@DESKTOP-QADGF36:~/Workspaces/quickstarts/tutorials/pub-sub/react-form$ docker build -t harbor.freshbrewed.science/freshbrewedprivate/pubsub-react-form:gcp2 .
[+] Building 54.8s (10/10) FINISHED
 => [internal] load build definition from Dockerfile                                                                                                                                                                                                                                       0.0s
 => => transferring dockerfile: 153B                                                                                                                                                                                                                                                       0.0s
 => [internal] load .dockerignore                                                                                                                                                                                                                                                          0.0s
 => => transferring context: 52B                                                                                                                                                                                                                                                           0.0s
 => [internal] load metadata for docker.io/library/node:17-alpine                                                                                                                                                                                                                          6.4s
 => [auth] library/node:pull token for registry-1.docker.io                                                                                                                                                                                                                                0.0s
 => [internal] load build context                                                                                                                                                                                                                                                          0.0s
 => => transferring context: 1.49MB                                                                                                                                                                                                                                                        0.0s
 => [1/4] FROM docker.io/library/node:17-alpine@sha256:76e638eb0d73ac5f0b76d70df3ce1ddad941ac63595d44092b625e2cd557ddbf                                                                                                                                                                    0.0s
 => CACHED [2/4] WORKDIR /usr/src/app                                                                                                                                                                                                                                                      0.0s
 => [3/4] COPY . .                                                                                                                                                                                                                                                                         0.0s
 => [4/4] RUN npm run build                                                                                                                                                                                                                                                               44.2s
 => exporting to image                                                                                                                                                                                                                                                                     4.0s
 => => exporting layers                                                                                                                                                                                                                                                                    4.0s
 => => writing image sha256:1df258e2f17d571585f606c0939ea9a97281f8d5b48209898dad497ae5642da7                                                                                                                                                                                               0.0s
 => => naming to harbor.freshbrewed.science/freshbrewedprivate/pubsub-react-form:gcp1                                                                                                                                                                                                      0.0s
builder@DESKTOP-QADGF36:~/Workspaces/quickstarts/tutorials/pub-sub/react-form$ docker push harbor.freshbrewed.science/freshbrewedprivate/pubsub-react-form:gcp2
The push refers to repository [harbor.freshbrewed.science/freshbrewedprivate/pubsub-react-form]
c8076356e667: Pushed
f96ac2baf7fb: Pushed
be03214d51e8: Mounted from freshbrewedprivate/pubsub-node-subscriber
e6a74996eabe: Mounted from freshbrewedprivate/pubsub-node-subscriber
db2e1fd51a80: Mounted from freshbrewedprivate/pubsub-node-subscriber
19ebba8d6369: Mounted from freshbrewedprivate/pubsub-node-subscriber
4fc242d58285: Mounted from freshbrewedprivate/pubsub-node-subscriber
gcp1: digest: sha256:da03dfdbde31098ac1c58adbebf0657ce4fea3ba7cc4a0fa920d18eafa336ed9 size: 1787

Now we can point the react-form Deployment at the new image

builder@DESKTOP-QADGF36:~/Workspaces/quickstarts/tutorials/pub-sub/deploy$ git diff react-form.yaml
diff --git a/tutorials/pub-sub/deploy/react-form.yaml b/tutorials/pub-sub/deploy/react-form.yaml
index 66cc2eb..c35cc46 100644
--- a/tutorials/pub-sub/deploy/react-form.yaml
+++ b/tutorials/pub-sub/deploy/react-form.yaml
@@ -36,7 +36,9 @@ spec:
     spec:
       containers:
       - name: react-form
-        image: ghcr.io/dapr/samples/pubsub-react-form:latest
+        image: harbor.freshbrewed.science/freshbrewedprivate/pubsub-react-form:gcp2
         ports:
         - containerPort: 8080
         imagePullPolicy: Always
+      imagePullSecrets:
+      - name: myharborreg

builder@DESKTOP-QADGF36:~/Workspaces/quickstarts/tutorials/pub-sub/deploy$ kubectl apply -f react-form.yaml
service/react-form unchanged
deployment.apps/react-form configured

builder@DESKTOP-QADGF36:~/Workspaces/quickstarts/tutorials/pub-sub/deploy$ kubectl get pods | tail -n1
react-form-7b4c74d7-7nrgs                              2/2     Running   0              40s

I had to do some hacking around, but I did get it to publish and receive messages

/content/images/2022/07/dapr-13.png
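
If you want to test publishing without going through the React form, a message can be posted straight to the Dapr sidecar. The snippet below is a minimal sketch, not what the quickstart itself runs: it assumes the sidecar is listening on the default HTTP port 3500, that the component is named pubsub as in this post, and the payload fields are only illustrative.

```python
import json
import urllib.request

DAPR_HTTP_PORT = 3500   # assumption: default Dapr sidecar HTTP port
PUBSUB_NAME = "pubsub"  # the component name used throughout this post


def build_publish_request(topic: str, payload: dict) -> urllib.request.Request:
    """Build (but do not send) a POST to the sidecar's publish endpoint."""
    url = f"http://localhost:{DAPR_HTTP_PORT}/v1.0/publish/{PUBSUB_NAME}/{topic}"
    body = json.dumps(payload).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Illustrative payload; the field names are hypothetical.
request = build_publish_request("CCC", {"messageType": "CCC", "message": "hello"})
print(request.get_method(), request.full_url)

# From inside a Dapr-enabled pod, the request could then be sent with:
# urllib.request.urlopen(request, timeout=5)
```

Building the request separately from sending it makes it easy to check the URL and body before anything reaches the sidecar.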

And I can also see the subscriptions made by the Node Subscriber in Pubsub

/content/images/2022/07/dapr-14.png

I updated the python subscriber

$ kubectl apply -f ../deploy/python-subscriber.yaml
deployment.apps/python-subscriber configured

/content/images/2022/07/dapr-15.png

and we can see it reflected if we use Topic CCC

/content/images/2022/07/dapr-16.png
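
For reference, with programmatic subscriptions the Dapr sidecar asks the app which topics it wants by calling GET /dapr/subscribe, and the app answers with a JSON list. Here is a rough sketch of building that list for the renamed topics. The helper name is mine, and the exact set of topics each subscriber registers is an assumption, so check the quickstart source for the real values.

```python
import json

PUBSUB_NAME = "pubsub"  # the component name used throughout this post


def build_subscriptions(topics):
    """Return the list a Dapr app would serve from GET /dapr/subscribe.

    Each topic is routed to an endpoint with the same name as the topic.
    """
    return [
        {"pubsubname": PUBSUB_NAME, "topic": topic, "route": topic}
        for topic in topics
    ]


# Assumption: this subscriber listens on the renamed AAA and CCC topics.
print(json.dumps(build_subscriptions(["AAA", "CCC"]), indent=2))
```

Because the topic names live in the app code, renaming them is what forced the image rebuilds above, while swapping the pubsub component did not.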

Azure Service Bus

We can easily create a Service Bus in the Azure Portal

/content/images/2022/07/dapr-17.png

There are a few pricing tiers, but to keep it simple, we’ll use Basic

/content/images/2022/07/dapr-18.png

Then I’ll use defaults for the rest of the values and deploy

/content/images/2022/07/dapr-19.png

Once it is created, I can look up the connection string, which includes the SAS key, in the Shared access policies section

/content/images/2022/07/dapr-20.png

I’ll put that connection string into a Dapr pubsub Component, which looks like this

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: pubsub
  namespace: default
spec:
  type: pubsub.azure.servicebus
  version: v1
  metadata:
  - name: connectionString # Required when not using Azure Authentication.
    value: "Endpoint=sb://daprpubsubdemo1.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=ponT1KjFAsSzErgzYH6d4B20elV/fyedLG9s4CTiRV0="

Now we can try using it

$ kubectl delete component pubsub
component.dapr.io "pubsub" deleted
$ kubectl apply -f dapr.azuresb.pubsub.yaml 
component.dapr.io/pubsub created

Then rotate the pods so the sidecars pick up the new component

$ kubectl delete pods -l app=python-subscriber && kubectl delete pods -l app=node-subscriber && kubectl delete pods -l app=react-form

However, I got an error: the Basic tier does not support Topics


    data: {
      errorCode: 'ERR_PUBSUB_PUBLISH_MESSAGE',
      message: 'error when publish to topic CCC in pubsub pubsub: azure service bus error: could not create topic CCC, PUT https://daprpubsubdemo1.servicebus.windows.net/CCC\n' +
        '--------------------------------------------------------------------------------\n' +
        'RESPONSE 400: 400 Bad Request\n' +
        'ERROR CODE: 400\n' +
        '--------------------------------------------------------------------------------\n' +
        "<Error><Code>400</Code><Detail>SubCode=40000. Cannot operate on type Topic because the namespace 'daprpubsubdemo1' is using 'Basic' tier. TrackingId:500c2e1b-b989-4c8b-a000-9b29e1484921_G53, SystemTracker:daprpubsubdemo1.servicebus.windows.net:CCC, Timestamp:2022-07-27T00:33:12</Detail></Error>\n" +
        '--------------------------------------------------------------------------------\n'
    }

Since the Basic tier only supports queues, I’ll try making a queue instead

/content/images/2022/07/dapr-21.png

I then create a Shared access policy on the queue

/content/images/2022/07/dapr-22.png

which gives me a connection string scoped to that queue

/content/images/2022/07/dapr-23.png

I’ll update the connection string in the Dapr component file and replace the existing component

builder@DESKTOP-QADGF36:~/Workspaces/jekyll-blog$ cat dapr.azuresb.pubsub.yaml 
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: pubsub
  namespace: default
spec:
  type: pubsub.azure.servicebus
  version: v1
  metadata:
  - name: connectionString # Required when not using Azure Authentication.
    value: "Endpoint=sb://daprpubsubdemo1.servicebus.windows.net/;SharedAccessKeyName=MYTEST;SharedAccessKey=4tSzGcZJ2Z0VDOHuBiy7v402LVMepDaBinP5KCIb/QA=;EntityPath=mydaprqueue"

builder@DESKTOP-QADGF36:~/Workspaces/jekyll-blog$ kubectl delete component pubsub
component.dapr.io "pubsub" deleted
builder@DESKTOP-QADGF36:~/Workspaces/jekyll-blog$ kubectl apply -f dapr.azuresb.pubsub.yaml 
component.dapr.io/pubsub created
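
The two connection strings differ in the policy name and in the trailing EntityPath, which ties the second one to a single queue. To check what a string contains before handing it to Dapr, it can be split into its fields. This is a small sketch: the helper names are mine and the key in the example is a placeholder, not a real one.

```python
def parse_connection_string(conn: str) -> dict:
    """Split a Service Bus connection string into its key/value fields."""
    fields = {}
    for segment in conn.split(";"):
        if not segment:
            continue  # tolerate a trailing semicolon
        # Split on the first "=" only: base64 SAS keys often end in "=".
        key, sep, value = segment.partition("=")
        if not sep:
            raise ValueError(f"malformed segment: {segment!r}")
        fields[key] = value
    return fields


def is_entity_scoped(conn: str) -> bool:
    """True when the string is tied to one queue or topic via EntityPath."""
    return "EntityPath" in parse_connection_string(conn)


# Placeholder values only; never commit a real SharedAccessKey.
example = (
    "Endpoint=sb://example.servicebus.windows.net/;"
    "SharedAccessKeyName=MYTEST;"
    "SharedAccessKey=PLACEHOLDERKEY=;"
    "EntityPath=mydaprqueue"
)
print(parse_connection_string(example)["EntityPath"], is_entity_scoped(example))
```

A namespace-level string such as the RootManageSharedAccessKey one has no EntityPath, so is_entity_scoped returns False for it.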

Then rotate again

$ kubectl delete pods -l app=python-subscriber & kubectl delete pods -l app=node-subscriber & kubectl delete pods -l app=react-form &

This time I got a 401 Unauthorized


    data: {
      errorCode: 'ERR_PUBSUB_PUBLISH_MESSAGE',
      message: 'error when publish to topic CCC in pubsub pubsub: azure service bus error: could not create topic CCC, PUT https://daprpubsubdemo1.servicebus.windows.net/CCC\n' +
        '--------------------------------------------------------------------------------\n' +
        'RESPONSE 401: 401 Unauthorized\n' +
        'ERROR CODE: 401\n' +
        '--------------------------------------------------------------------------------\n' +
        '<Error><Code>401</Code><Detail>InvalidSignature: The token has an invalid signature. TrackingId:02718121-5069-42e0-aae4-2fc404505495, SystemTracker:NoSystemTracker, Timestamp:2022-07-27T00:40:11</Detail></Error>\n' +
        '--------------------------------------------------------------------------------\n'
    }

Something that dawned on me later was that ASB was not going to auto-create the Topics for me here; both errors above came from Dapr failing to create Topic CCC.

Once I created the Topics (Queues) directly, it worked

/content/images/2022/07/dapr-24.png

Cleanup

For the queues I don’t need, I can just remove them

/content/images/2022/07/dapr-25.png

For the keys I exposed in this post, I can just regenerate them in the UI

/content/images/2022/07/dapr-26.png

Summary

In this post we examined how Dapr.io can be used to easily swap message queue providers. We used Redis, Kafka, GCP Pub/Sub and Azure Service Bus.

We switched between providers without having to rebuild containers, with one minor exception: GCP Pub/Sub has a minimum Topic name length and our demo used single-letter names, so we had to rename the Topics and rebuild.
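
To catch that kind of problem before deploying, topic names can be checked up front. The sketch below encodes the GCP Pub/Sub naming rules as I understand them from Google's resource name guidelines: start with a letter, 3 to 255 characters, a limited character set, and no "goog" prefix. The function name is mine, and the rules should be verified against the current docs.

```python
import re

# One leading letter plus 2 to 254 more allowed characters = 3 to 255 total.
_TOPIC_ID = re.compile(r"[A-Za-z][A-Za-z0-9._~+%-]{2,254}")


def is_valid_gcp_topic_id(topic_id: str) -> bool:
    """Check a topic ID against the assumed GCP Pub/Sub naming rules."""
    if topic_id.startswith("goog"):
        return False  # names beginning with "goog" are reserved
    return _TOPIC_ID.fullmatch(topic_id) is not None


for name in ["A", "AAA", "CCC"]:
    print(name, is_valid_gcp_topic_id(name))
```

Under these rules the original single-letter topic A is rejected, while the renamed AAA passes.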

There are more choices as well. The Dapr docs list all of the supported pubsub components.

Dapr.io, like Istio (and ASM, which is just GCP’s Istio), provides mTLS via the Dapr sidecars

$ kubectl get configuration daprsystem -n dapr-system -o yaml | tail -n 6
  metric:
    enabled: true
  mtls:
    allowedClockSkew: 15m
    enabled: true
    workloadCertTTL: 24h

While Dapr uses one-year self-signed certs that it creates and updates itself, there are many options for certificate providers

Dapr.io also offers a middleware pipeline that can provide OAuth, which we will explore in a later post.

dapr pubsub kafka redis azureservicebus gcp

Isaac Johnson

Cloud Solutions Architect

Isaac is a CSA and DevOps engineer who focuses on cloud migrations and devops processes. He also is a dad to three wonderful daughters (hence the references to Princess King sprinkled throughout the blog).
