實作 Google Cloud Pub/Sub Pull Subscription
學習如何透過 Golang 來撰寫 Google Cloud Pub/Sub Subscriber
文章目錄
在 Cloud Pub/Sub 中,訊息的傳遞分成 Pull Subscription
及 Push Subscription
兩種,本篇主要說明 Pull Subscription
模式,以及適用的場景。
一樣在進入程式碼前,先來看看如何執行會動的範例
$ cd pull
$ GOOGLE_CLOUD_PROJECT="<google project name>" go run pull.go
$ GOOGLE_CLOUD_PROJECT="test-project" go run pull.go
Created subscription: projects/test-project/subscriptions/example-topic
2017/01/04 01:04:22 Got message: 4d27aaba-e62b-49cf-8fd9-e784a99064d5
2017/01/04 01:04:24 Got message: 48b04306-18de-44f2-b1b3-c0e736f52d32
2017/01/04 01:04:25 Got message: d395cd6b-02ef-4e7d-a6ec-a84d0cf27045
2017/01/04 01:04:26 Got message: a56991b0-b664-48d7-8901-de80783f182d
2017/01/04 01:04:27 Got message: d0cebf2f-619c-4642-b51b-d5485fd072c2
2017/01/04 01:04:29 Got message: 46d25e6d-0f6b-4dc9-be88-8133e2a011e1
2017/01/04 01:04:30 Got message: dc517460-dc5b-4f61-ab20-490c186360a9
2017/01/04 01:04:32 Got message: f1ed3359-125b-4931-b631-d251a2109dcd
2017/01/04 01:04:32 Got message: eaf771ca-c8e3-40ef-a89e-429b6bf8f55a
2017/01/04 01:04:34 Got message: fb8b201c-2856-4bed-ac1a-3fc27d51bdac
如果沒有接收到任何訊息的話,請開啟額外視窗並重新執行 publisher.go
。
Pull Subscription
顧名思義就是 Subscriber 定期向 Pub/Sub 服務拉取最新的訊息進行處理。當 Subscriber 起來時會先呼叫 CreateSub
,然後透過官方 pubsub package 的 CreateSubscription
向服務建立 subscription。
- 完整程式碼請見 pullSubscription.go
// pull/pull.go
if err := common.CreateSub(client, topicName, topic, nil); err != nil {
log.Fatal(err)
}
- 完整程式碼請見 Functions
// common/functions.go
func CreateSub(client *pubsub.Client, name string, topic *pubsub.Topic, pushConfig *pubsub.PushConfig) error {
ctx := context.Background()
// [START create_subscription]
sub, err := client.CreateSubscription(ctx, name, topic, 20*time.Second, pushConfig)
if err != nil {
return err
}
fmt.Printf("Created subscription: %v\n", sub)
// [END create_subscription]
return nil
}
創建成功後,你可以在 GCP PubSub 頁面上看到 Topic 多出 Delivery Type
為 Pull
的 Subscriber
接著就透過 Pull
來拉取訊息,並利用 json.Unmarshal
取回剛剛放進去 Message.Data
的 Payload
// pull/pull.go
func pullMsgs(client *pubsub.Client, name string, topic *pubsub.Topic) error {
ctx := context.Background()
// get Subscription
sub := client.Subscription(name)
// get message iterator
it, err := sub.Pull(ctx)
if err != nil {
return err
}
defer it.Stop()
for i := 0; i < 10; i++ {
// retrieve message
msg, err := it.Next()
if err == iterator.Done {
break
}
if err != nil {
return err
}
session := &common.Session{}
// retrieve payload we put into Data field
err = json.Unmarshal(msg.Data, session)
if err == nil {
log.Printf("Got message: %s\n", session.SessionID)
}
msg.Done(true)
}
return nil
}
接下來就把程式打包成 Docker image 以方便部署,首先我們必須 :
- 按照 Developer Guide 的 “How the Application Default Credentials work” 申請 certificate json 並重新命名為 crt.json 後放置到 pull/asset 下。
- (Optional) 把
ca-certificates.crt
也放在 push/asset 的資料夾下,原先已為各位預備好 Ubuntu 14.04 上的 crt,若有疑慮的話可以在/etc/ssl/certs
下找到。
build.sh
會自動編譯程式、打包 Docker Image,最後會將 Docker Image 上傳至 Google Container Registry。
$ cd pull
# build.sh will upload built image to google image registry automatically
$ GOOGLE_CLOUD_PROJECT="<google project name>" ./build.sh
此次我們採用 Google Kubernetes Engine (GKE) 測試 Pull Subscription 的部分,若不熟捻 GKE 可以參考
替換 pull/k8s/pull-deployment.yaml 內的設定
$ kubectl apply -f pull/k8s/pull-deployment.yaml
$ kubectl get pod -l service=pubsub
NAME READY STATUS RESTARTS AGE
pubsub-pull-sub-2287215605-d3xd1 1/1 Running 0 3m
最後查看該 Pod 是否有正常收到來自 Cloud Pub/Sub 訊息,若沒有請重新執行 Publisher。
$ kubectl logs -f pubsub-pull-sub-2287215605-d3xd1
2017/01/04 01:53:23 Got message: 2ada0c34-0fcf-46be-b3d7-82a87cf7234a
2017/01/04 01:53:23 Got message: 8b26e9c6-978b-422b-ba3b-6adf16312dea
2017/01/04 01:53:23 Got message: e8d5d81a-aaea-49a3-8a07-a53042440460
2017/01/04 01:53:23 Got message: 353c50a8-6e91-4ba0-949d-f95e473f923f
2017/01/04 01:53:23 Got message: ac2ddf3c-68cf-4417-8301-53a74d212ef7
2017/01/04 01:53:23 Got message: 3758bc3c-c8d0-47a3-a683-56e821418317
2017/01/04 01:53:23 Got message: 09a55b73-a67e-4be8-a309-ac50e528af3e
2017/01/04 01:53:23 Got message: 06211727-6f0e-40bd-b47f-143594410e6c
Pull Subscription 是由 Subscriber 主動
向 Cloud Pub/Sub 拉取訊息,較適用於以下幾種場景
- 自動擴展或縮減的運算叢集(Auto Scaling Cluster)
- 每次新啟動 Subscriber 可能擁有不同的 IP 位址
- Pub/Sub 會無法確切知道該推送訊息到哪裡,得由 Subscriber 主動拉取。
- 不限制網路流量/能源需求者
- 定期向 Pub/Sub 拉訊息的行為,會使得 Subscriber 不斷消耗能源及網路流量,因此不適合 IoT 相關設備。
- 沒有 SSL 憑證或是固定 IP 位址者
- Push Subscription 需要端點(Endpoint)有 HTTPS 支援
- 沒有 SSL 憑證或域名常更換 IP 位址者,建議採用 Pull Subscription。
- https://cloud.google.com/pubsub/docs/subscriber
- https://cloud.google.com/container-engine/docs/clusters/operations
相關文章
- 實作 Google Cloud Pub/Sub Push Subscription
- 實作 Google Cloud Pub/Sub Publisher
- 初探 Google Cloud Pub/Sub
- 在 Kubernetes 內取得使用者 IP - HTTP Loadbalancer
- 從 Google Kubernetes Engine 移除節點
文章內容的轉載、重製、發佈,請註明出處: https://tachingchen.com/tw/
Twitter
Google+
Facebook
Reddit
LinkedIn
StumbleUpon
Pinterest
Email