實作 Google Cloud Pub/Sub Pull Subscription

學習如何透過 Golang 來撰寫 Google Cloud Pub/Sub Subscriber

ta-ching chen

3 minute read

 文章目錄

事前準備 

Subscriber 

在 Cloud Pub/Sub 中,訊息的傳遞分成 Pull SubscriptionPush Subscription 兩種,本篇主要說明 Pull Subscription 模式,以及適用的場景。

Pull Subscription 

一樣在進入程式碼前,先來看看如何執行會動的範例

IT’S ALIVE 

$ cd pull
$ GOOGLE_CLOUD_PROJECT="<google project name>" go run pull.go
$ GOOGLE_CLOUD_PROJECT="test-project" go run pull.go
Created subscription: projects/test-project/subscriptions/example-topic
2017/01/04 01:04:22 Got message: 4d27aaba-e62b-49cf-8fd9-e784a99064d5
2017/01/04 01:04:24 Got message: 48b04306-18de-44f2-b1b3-c0e736f52d32
2017/01/04 01:04:25 Got message: d395cd6b-02ef-4e7d-a6ec-a84d0cf27045
2017/01/04 01:04:26 Got message: a56991b0-b664-48d7-8901-de80783f182d
2017/01/04 01:04:27 Got message: d0cebf2f-619c-4642-b51b-d5485fd072c2
2017/01/04 01:04:29 Got message: 46d25e6d-0f6b-4dc9-be88-8133e2a011e1
2017/01/04 01:04:30 Got message: dc517460-dc5b-4f61-ab20-490c186360a9
2017/01/04 01:04:32 Got message: f1ed3359-125b-4931-b631-d251a2109dcd
2017/01/04 01:04:32 Got message: eaf771ca-c8e3-40ef-a89e-429b6bf8f55a
2017/01/04 01:04:34 Got message: fb8b201c-2856-4bed-ac1a-3fc27d51bdac

如果沒有接收到任何訊息的話,請開啟額外視窗並重新執行 publisher.go


Pub/Sub Flow

Pull Subscription 顧名思義就是 Subscriber 定期向 Pub/Sub 服務拉取最新的訊息進行處理。當 Subscriber 起來時會先呼叫 CreateSub,然後透過官方 pubsub package 的 CreateSubscription 向服務建立 subscription。

// pull/pull.go
if err := common.CreateSub(client, topicName, topic, nil); err != nil {
    log.Fatal(err)
}
// common/functions.go
func CreateSub(client *pubsub.Client, name string, topic *pubsub.Topic, pushConfig *pubsub.PushConfig) error {
    ctx := context.Background()
    // [START create_subscription]
    sub, err := client.CreateSubscription(ctx, name, topic, 20*time.Second, pushConfig)
    if err != nil {
        return err
    }
    fmt.Printf("Created subscription: %v\n", sub)
    // [END create_subscription]
    return nil
}

創建成功後,你可以在 GCP PubSub 頁面上看到 Topic 多出 Delivery TypePull 的 Subscriber

Pull Subscriber

接著就透過 Pull 來拉取訊息,並利用 json.Unmarshal 取回剛剛放進去 Message.Data 的 Payload

// pull/pull.go
func pullMsgs(client *pubsub.Client, name string, topic *pubsub.Topic) error {
    ctx := context.Background()
    // get Subscription
    sub := client.Subscription(name)
    // get message iterator
    it, err := sub.Pull(ctx)
    if err != nil {
        return err
    }
    defer it.Stop()

    for i := 0; i < 10; i++ {
        // retrieve message
        msg, err := it.Next()
        if err == iterator.Done {
            break
        }
        if err != nil {
            return err
        }
        session := &common.Session{}
        // retrieve payload we put into Data field
        err = json.Unmarshal(msg.Data, session)
        if err == nil {
            log.Printf("Got message: %s\n", session.SessionID)
        }
        msg.Done(true)
    }
    return nil
}

打包 Docker 映像檔 

接下來就把程式打包成 Docker image 以方便部署,首先我們必須 :

  • 按照 Developer Guide 的 “How the Application Default Credentials work” 申請 certificate json 並重新命名為 crt.json 後放置到 pull/asset 下。
  • (Optional) 把 ca-certificates.crt 也放在 push/asset 的資料夾下,原先已為各位預備好 Ubuntu 14.04 上的 crt,若有疑慮的話可以在 /etc/ssl/certs 下找到。

build.sh 會自動編譯程式、打包 Docker Image,最後會將 Docker Image 上傳至 Google Container Registry

$ cd pull
# build.sh will upload built image to google image registry automatically
$ GOOGLE_CLOUD_PROJECT="<google project name>" ./build.sh

進階閱讀 


Kubernetes (GKE) 

此次我們採用 Google Kubernetes Engine (GKE) 測試 Pull Subscription 的部分,若不熟捻 GKE 可以參考

替換 pull/k8s/pull-deployment.yaml 內的設定

$ kubectl apply -f pull/k8s/pull-deployment.yaml
$ kubectl get pod -l service=pubsub
NAME                               READY     STATUS    RESTARTS   AGE
pubsub-pull-sub-2287215605-d3xd1   1/1       Running   0          3m

最後查看該 Pod 是否有正常收到來自 Cloud Pub/Sub 訊息,若沒有請重新執行 Publisher。

$ kubectl logs -f pubsub-pull-sub-2287215605-d3xd1
2017/01/04 01:53:23 Got message: 2ada0c34-0fcf-46be-b3d7-82a87cf7234a
2017/01/04 01:53:23 Got message: 8b26e9c6-978b-422b-ba3b-6adf16312dea
2017/01/04 01:53:23 Got message: e8d5d81a-aaea-49a3-8a07-a53042440460
2017/01/04 01:53:23 Got message: 353c50a8-6e91-4ba0-949d-f95e473f923f
2017/01/04 01:53:23 Got message: ac2ddf3c-68cf-4417-8301-53a74d212ef7
2017/01/04 01:53:23 Got message: 3758bc3c-c8d0-47a3-a683-56e821418317
2017/01/04 01:53:23 Got message: 09a55b73-a67e-4be8-a309-ac50e528af3e
2017/01/04 01:53:23 Got message: 06211727-6f0e-40bd-b47f-143594410e6c

使用案例 

Pull Subscription 是由 Subscriber 主動向 Cloud Pub/Sub 拉取訊息,較適用於以下幾種場景

  • 自動擴展或縮減的運算叢集(Auto Scaling Cluster)
    • 每次新啟動 Subscriber 可能擁有不同的 IP 位址
    • Pub/Sub 會無法確切知道該推送訊息到哪裡,得由 Subscriber 主動拉取。
  • 不限制網路流量/能源需求者
    • 定期向 Pub/Sub 拉訊息的行為,會使得 Subscriber 不斷消耗能源及網路流量,因此不適合 IoT 相關設備。
  • 沒有 SSL 憑證或是固定 IP 位址者
    • Push Subscription 需要端點(Endpoint)有 HTTPS 支援
    • 沒有 SSL 憑證或域名常更換 IP 位址者,建議採用 Pull Subscription。

參考連結 

相關文章

comments powered by Disqus