實作 Google Cloud Pub/Sub Push Subscription

學習如何透過 Golang 來撰寫 Google Cloud Pub/Sub Subscriber

ta-ching chen

3 minute read

 文章目錄

Prerequisite 

Subscriber 

在 Cloud Pub/Sub 中,訊息的傳遞分成 Pull SubscriptionPush Subscription 兩種,本篇主要說明 Push Subscription 模式,以及適用的場景。

Push Subscription 

Pub/Sub Flow

Push Subscription 是 Cloud Pub/Sub 將收到的訊息透過 RESTful API 的方式即時推送到 Subscriber 端。

首先至 GCP PubSub 頁面 註冊 Push Endpoint,不讓 Subscriber 自己註冊的原因在於 Endpoint 位置為固定的,若自行註冊相同 Endpoint 則彼此會衝突。

Create Push Subscription Endpoint

Push Subscription 是被動等待訊息推送過來,我們先撰寫個簡單的 HTTP Server 以供服務。

建立 RESTful API route /,並指定該 route 對應的 handler 來處理請求。

// push/httpServer.go
http.HandleFunc("/", handler)

必須指定 SSL 憑證的 crt 以及 key 的位置,才能正常服務 HTTPS 請求。

// You cann apply for free ssl cert at https://www.sslforfree.com/
err := http.ListenAndServeTLS("0.0.0.0:443", "/certificate.crt", "/private.key", nil)
if err != nil {
    log.Fatal("ListenAndServe: ", err)
}

透過 RESTful API 傳送過來訊息含有 Pub/Sub 的訊息元資料(MetaData),因此我們需要先定義 pushRequest 格式

type pushRequest struct {
    Message struct {
        Attributes map[string]string
        Data       []byte
        ID         string `json:"message_id"`
    }
    Subscription string
}

Request handler 會利用上面定義好的格式,將推送過來的訊息轉成 object,由於 Payload 放在 Message.Data 裡,將其取出做 json.Unmarshal 就能得到我們真的所需要的訊息了!

func handler(w http.ResponseWriter, r *http.Request) {
    msg := &pushRequest{}
    if err := json.NewDecoder(r.Body).Decode(msg); err != nil {
        http.Error(w, fmt.Sprintf("Could not decode body: %v", err), http.StatusBadRequest)
        return
    }
    session := &common.Session{}
    if e := json.Unmarshal(msg.Message.Data, session); e != nil {
        log.Printf("Decode Error: %v. Wrong payload format: %v", e, session)
    } else {
        log.Printf("sessionid: %s timestamp: %d\n", session.SessionID, session.TimeStamp)
    }
}

Build Docker Image 

接下來就把程式打包成 Docker image 以方便部署,首先我們必須 :

  • 把 SSL 憑證的 certificate.crtprivate.key 放在 push/asset 的資料夾下
  • (Optional) 把 ca-certificates.crt 也放在 push/asset 的資料夾下,原先已為各位預備好 Ubuntu 14.04 上的 crt,若有疑慮的話可以在 /etc/ssl/certs 下找到。

build.sh 會自動編譯程式、打包 Docker Image,最後會將 Docker Image 上傳至 Google Container Registry

$ cd push
# build.sh will upload built image to google image registry automatically
$ GOOGLE_CLOUD_PROJECT="<google project name>" ./build.sh

Further Readings 


Kubernetes (GKE) 

此次我們採用 Google Kubernetes Engine (GKE) 測試 Push Subscription 的部分,若不熟捻 GKE 可以參考

首先建立 Kubernetes 的 Service,將域名指到該 Service 的 EXTERNAL-IP 位址。

$ kubectl apply -f push/k8s/push-svc.yaml
$ kubectl get svc
NAME               CLUSTER-IP     EXTERNAL-IP       PORT(S)            AGE
pubsub-service     10.3.246.101   x.x.x.x           443/TCP            39s

替換 push/k8s/push-deployment.yaml 內的設定

$ kubectl apply -f push/k8s/push-deployment.yaml
$ kubectl get pod -l service=pubsub
NAME                               READY     STATUS    RESTARTS   AGE
pubsub-push-sub-3887215605-d9xbc   1/1       Running   0          3m

最後查看該 Pod 是否有正常收到來自 Cloud Pub/Sub 訊息,若沒有請重新執行 Publisher。

$ kubectl logs -f pubsub-push-sub-3887215605-d9xbc
2017/01/05 17:13:27 sessionid: 096341fa-d1ee-4c21-bcd6-ae5be94f3be5 timestamp: 1483636407
2017/01/05 17:13:28 sessionid: 3a882651-043c-477a-b3b3-c67247dfa02c timestamp: 1483636408
2017/01/05 17:13:29 sessionid: 84d1bd85-f880-4479-ac76-cfed0aea8a4e timestamp: 1483636409
2017/01/05 17:13:31 sessionid: 57bc6c95-3170-4914-aebc-23f6e16a6de6 timestamp: 1483636410
2017/01/05 17:13:32 sessionid: 06211727-6f0e-40bd-b47f-143594410e6c timestamp: 1483636411
2017/01/05 17:13:33 sessionid: 3758bc3c-c8d0-47a3-a683-56e821418317 timestamp: 1483636413

TroubleShooting 

  • 一直沒有收到來自 Cloud Pub/Sub 的 Push Request
    • 記得確認 SSL 憑證是否合法,比方說核發給錯誤域名、自己簽署都會導致無法運作

使用案例 

Push Subscription 是由 Subscriber 被動接受來自 Cloud Pub/Sub 的訊息,較適用於以下幾種場景

  • 網路流量/能源需求受限制者
    • 由於是有訊息來才會觸發行為,Subscriber 能較為節省能源及網路流量,適合 IoT 相關設備。
  • 要求即時性的服務
    • 當 Pub/Sub 一收到訊息會立即推送到 Endpoint,非常適合即時性高的服務。

參考連結 

相關文章

comments powered by Disqus