Learn how to implement a Cloud Pub/Sub subscriber in Golang


Prerequisite

Subscriber

Cloud Pub/Sub supports both push and pull message delivery. This post dives into the pull subscription.

Pull Subscription

IT’S ALIVE

$ cd pull
$ GOOGLE_CLOUD_PROJECT="<google project name>" go run pull.go
$ GOOGLE_CLOUD_PROJECT="test-project" go run pull.go
Created subscription: projects/test-project/subscriptions/example-topic
2017/01/04 01:04:22 Got message: 4d27aaba-e62b-49cf-8fd9-e784a99064d5
2017/01/04 01:04:24 Got message: 48b04306-18de-44f2-b1b3-c0e736f52d32
2017/01/04 01:04:25 Got message: d395cd6b-02ef-4e7d-a6ec-a84d0cf27045
2017/01/04 01:04:26 Got message: a56991b0-b664-48d7-8901-de80783f182d
2017/01/04 01:04:27 Got message: d0cebf2f-619c-4642-b51b-d5485fd072c2
2017/01/04 01:04:29 Got message: 46d25e6d-0f6b-4dc9-be88-8133e2a011e1
2017/01/04 01:04:30 Got message: dc517460-dc5b-4f61-ab20-490c186360a9
2017/01/04 01:04:32 Got message: f1ed3359-125b-4931-b631-d251a2109dcd
2017/01/04 01:04:32 Got message: eaf771ca-c8e3-40ef-a89e-429b6bf8f55a
2017/01/04 01:04:34 Got message: fb8b201c-2856-4bed-ac1a-3fc27d51bdac

Pub/Sub Flow

With a pull subscription, the subscriber periodically pulls messages from Pub/Sub instead of waiting for the service to push them.

The subscriber creates a pull subscription when it starts up.

// pull/pull.go
// a nil push config makes this a pull subscription
if err := common.CreateSub(client, topicName, topic, nil); err != nil {
    log.Fatal(err)
}
// common/functions.go
// CreateSub creates a subscription called name on the given topic.
func CreateSub(client *pubsub.Client, name string, topic *pubsub.Topic, pushConfig *pubsub.PushConfig) error {
    ctx := context.Background()
    // [START create_subscription]
    // 20*time.Second is the ack deadline for delivered messages
    sub, err := client.CreateSubscription(ctx, name, topic, 20*time.Second, pushConfig)
    if err != nil {
        return err
    }
    fmt.Printf("Created subscription: %v\n", sub)
    // [END create_subscription]
    return nil
}

Once the subscription has been created successfully, you’ll see a new subscription with the Pull delivery type in the Cloud Pub/Sub console on GCP.
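Because the subscriber creates its subscription every time it starts, a restart will ask Pub/Sub to create a subscription that already exists, and the create call is expected to return an error in that case. One way to make startup repeatable is to check for the subscription first. The sketch below shows only that control flow, using the standard library: the subscriptionAdmin interface, ensureSub, and fakeAdmin are hypothetical names invented for this illustration and are not part of the Pub/Sub client or of this post's repository.

```go
package main

import (
	"errors"
	"fmt"
)

// subscriptionAdmin is a hypothetical interface standing in for the
// subscription calls a real Pub/Sub client would provide.
type subscriptionAdmin interface {
	Exists(name string) (bool, error)
	Create(name string) error
}

// ensureSub creates the subscription only when it does not exist yet,
// so restarting the subscriber does not fail on a duplicate create.
// It reports whether a new subscription was created.
func ensureSub(admin subscriptionAdmin, name string) (bool, error) {
	ok, err := admin.Exists(name)
	if err != nil {
		return false, fmt.Errorf("checking subscription %q: %w", name, err)
	}
	if ok {
		return false, nil
	}
	if err := admin.Create(name); err != nil {
		return false, fmt.Errorf("creating subscription %q: %w", name, err)
	}
	return true, nil
}

// fakeAdmin is an in-memory stand-in used only for this illustration.
type fakeAdmin struct{ subs map[string]bool }

func (f *fakeAdmin) Exists(name string) (bool, error) { return f.subs[name], nil }

func (f *fakeAdmin) Create(name string) error {
	if f.subs[name] {
		return errors.New("subscription already exists")
	}
	f.subs[name] = true
	return nil
}

func main() {
	admin := &fakeAdmin{subs: map[string]bool{}}
	// Simulate two subscriber startups against the same subscription name.
	for i := 0; i < 2; i++ {
		created, err := ensureSub(admin, "example-topic")
		fmt.Println(created, err)
	}
}
```

In a real subscriber, the two interface methods would be backed by whatever existence check and create call the client library version in use provides.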

Pull Subscriber

The subscriber pulls messages from Cloud Pub/Sub until the iterator reports that it is done. This example also stops after 10 messages.

// pull/pull.go
func pullMsgs(client *pubsub.Client, name string, topic *pubsub.Topic) error {
    ctx := context.Background()
    // get Subscription
    sub := client.Subscription(name)
    // get message iterator
    it, err := sub.Pull(ctx)
    if err != nil {
        return err
    }
    // release the iterator's resources when we are finished
    defer it.Stop()

    // handle at most 10 messages
    for i := 0; i < 10; i++ {
        // retrieve message
        msg, err := it.Next()
        if err == iterator.Done {
            break
        }
        if err != nil {
            return err
        }
        session := &common.Session{}
        // retrieve payload we put into Data field
        err = json.Unmarshal(msg.Data, session)
        if err == nil {
            log.Printf("Got message: %s\n", session.SessionID)
        } else {
            // report malformed payloads instead of dropping them silently
            log.Printf("Could not decode message: %v\n", err)
        }
        // acknowledge the message so it is not redelivered
        msg.Done(true)
    }
    return nil
}
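The decoding step inside the loop can be pulled out into a small function that is easy to test without a Pub/Sub connection. The following is a minimal sketch using only the standard library. The session type stands in for common.Session, and its JSON field name (session_id) is an assumption, since the post does not show how the publisher encodes the payload. decodeSession is a hypothetical helper name.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// session stands in for common.Session; the JSON field name is an
// assumption made for this illustration.
type session struct {
	SessionID string `json:"session_id"`
}

// decodeSession parses a message payload and returns the session ID.
// It returns an error for malformed JSON or a missing session ID so the
// caller can decide how to log and acknowledge the message.
func decodeSession(data []byte) (string, error) {
	var s session
	if err := json.Unmarshal(data, &s); err != nil {
		return "", fmt.Errorf("decoding payload: %w", err)
	}
	if s.SessionID == "" {
		return "", errors.New("payload has no session ID")
	}
	return s.SessionID, nil
}

func main() {
	// Sample payloads: one well-formed, one that is not JSON at all.
	payloads := [][]byte{
		[]byte(`{"session_id":"4d27aaba-e62b-49cf-8fd9-e784a99064d5"}`),
		[]byte(`not json`),
	}
	for _, p := range payloads {
		id, err := decodeSession(p)
		if err != nil {
			fmt.Println("skipping message:", err)
			continue
		}
		fmt.Println("Got message:", id)
	}
}
```

Whether a message that fails to decode should still be acknowledged is a design choice: acknowledging it drops the bad payload, while leaving it unacknowledged lets Pub/Sub redeliver it.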

Build Docker Image

  • Follow the instructions in “How the Application Default Credentials work” in the Developer Guide to download certificates.json, rename the file to crt.json, then place it under pull/asset.
$ cd pull
# build.sh uploads the built image to the Google image registry automatically
$ GOOGLE_CLOUD_PROJECT="<google project name>" ./build.sh

Further Readings


Kubernetes on GKE

Replace the configuration in pull/k8s/pull-deployment.yaml

$ kubectl apply -f pull/k8s/pull-deployment.yaml
$ kubectl get pod -l service=pubsub
NAME                               READY     STATUS    RESTARTS   AGE
pubsub-pull-sub-2287215605-d3xd1   1/1       Running   0          3m
$ kubectl logs -f pubsub-pull-sub-2287215605-d3xd1
2017/01/04 01:53:23 Got message: 2ada0c34-0fcf-46be-b3d7-82a87cf7234a
2017/01/04 01:53:23 Got message: 8b26e9c6-978b-422b-ba3b-6adf16312dea
2017/01/04 01:53:23 Got message: e8d5d81a-aaea-49a3-8a07-a53042440460
2017/01/04 01:53:23 Got message: 353c50a8-6e91-4ba0-949d-f95e473f923f
2017/01/04 01:53:23 Got message: ac2ddf3c-68cf-4417-8301-53a74d212ef7
2017/01/04 01:53:23 Got message: 3758bc3c-c8d0-47a3-a683-56e821418317
2017/01/04 01:53:23 Got message: 09a55b73-a67e-4be8-a309-ac50e528af3e
2017/01/04 01:53:23 Got message: 06211727-6f0e-40bd-b47f-143594410e6c


Use Case

A pull subscription is suitable for the following scenarios:

  • Autoscaling cluster
    • A subscriber gets a different IP address every time it starts, so the Pub/Sub service has no stable endpoint to push messages to.
  • Devices without network traffic or power consumption limits
    • The subscriber needs to pull messages from Pub/Sub periodically, so pull is not a good fit for constrained IoT devices.
  • Endpoints without HTTPS support
    • A push subscription requires the endpoint to be an HTTPS server that is accessible from the public web. If you don't have your own SSL certificate, it's better to use a pull subscription.

