Learn how to implement Cloud Pub/Sub Subscriber in Golang

3 minute read

Prerequisite

Subscriber

Cloud Pub/Sub supports push and pull message delivery. In this post, we would dive into Push Subscrption.

Push Subscription

Pub/Sub Flow

Cloud Pub/Sub supports push and pull message delivery. In this post, we would dive into the Push Subscrption.

Register for the push endpoint on GCP PubSub.

Create Push Subscription Endpoint

Cloud Pub/Sub push messages to the enpdoint we registered before automatically.

Here we create a RESTful API route / to handle the requests from Pub/Sub.

// push/httpServer.go
http.HandleFunc("/", handler)
// You cann apply for free ssl cert at https://www.sslforfree.com/
err := http.ListenAndServeTLS("0.0.0.0:443", "/certificate.crt", "/private.key", nil)
if err != nil {
    log.Fatal("ListenAndServe: ", err)
}

Define the pushRequest struct for messages come from Pub/Sub.

type pushRequest struct {
    Message struct {
        Attributes map[string]string
        Data       []byte
        ID         string `json:"message_id"`
    }
    Subscription string
}
func handler(w http.ResponseWriter, r *http.Request) {
    msg := &pushRequest{}
    if err := json.NewDecoder(r.Body).Decode(msg); err != nil {
        http.Error(w, fmt.Sprintf("Could not decode body: %v", err), http.StatusBadRequest)
        return
    }
    session := &common.Session{}
    if e := json.Unmarshal(msg.Message.Data, session); e != nil {
        log.Printf("Decode Error: %v. Wrong payload format: %v", e, session)
    } else {
        log.Printf("sessionid: %s timestamp: %d\n", session.SessionID, session.TimeStamp)
    }
}

Build Docker Image

  • Follow the instruction of “How the Application Default Credentials work” on Developer Guide to download the certificates.json, rename the file into crt.json then place it under the pull/asset.
$ cd push
# build.sh will upload built image to google image registry automatically
$ GOOGLE_CLOUD_PROJECT="<google project name>" ./build.sh

Further Readings


Kubernetes ON GKE

Create the Kubernetes Service,and point the domain name to the EXTERNAL-IP of service.

$ kubectl apply -f push/k8s/push-svc.yaml
$ kubectl get svc
NAME               CLUSTER-IP     EXTERNAL-IP       PORT(S)            AGE
pubsub-service     10.3.246.101   x.x.x.x           443/TCP            39s

Replace the configuration in push/k8s/push-deployment.yaml

$ kubectl apply -f push/k8s/push-deployment.yaml
$ kubectl get pod -l service=pubsub
NAME                               READY     STATUS    RESTARTS   AGE
pubsub-push-sub-3887215605-d9xbc   1/1       Running   0          3m
$ kubectl logs -f pubsub-push-sub-3887215605-d9xbc
2017/01/05 17:13:27 sessionid: 096341fa-d1ee-4c21-bcd6-ae5be94f3be5 timestamp: 1483636407
2017/01/05 17:13:28 sessionid: 3a882651-043c-477a-b3b3-c67247dfa02c timestamp: 1483636408
2017/01/05 17:13:29 sessionid: 84d1bd85-f880-4479-ac76-cfed0aea8a4e timestamp: 1483636409
2017/01/05 17:13:31 sessionid: 57bc6c95-3170-4914-aebc-23f6e16a6de6 timestamp: 1483636410
2017/01/05 17:13:32 sessionid: 06211727-6f0e-40bd-b47f-143594410e6c timestamp: 1483636411
2017/01/05 17:13:33 sessionid: 3758bc3c-c8d0-47a3-a683-56e821418317 timestamp: 1483636413

TroubleShooting

  • Not received any messages from Pub/Sub
    • Make sure that SSL certificate is signed for the right domain.

Use Case

Push Subscription is suitable for the following scenario:

  • The device with network traffic/power consumption limitation.
    • Subscriber is triggered only when messages are pushed the the endpoint, so it consumes very little network traffic and lower power consumption.
  • Time-sensitive service
    • Messages are pushed to endpoint with nearly zero delay, so its suitable for the time-sensitive service.

Further Readings

Reference


comments powered by Disqus