メッセージングサービスとして GCP の Cloud Pub/Sub を Python で扱ってみた

はじめに

とある理由からメッセージングの機能を使いたく、最終的にGCP で提供されているCloud Pub/Subを選んだため、何か書こうと思います。

cloud.google.com

AmazonKinesisApacheのKafka、RedisのPub/Subなどと同じような、メッセージング(キューイングみたいな)サービスです。
GCPを他で使う必要があったため、同じGCP上のCloud Pub/Subを使おうとなりました。

使ってみる

トピックとサブスクリプションの作成

トピックとは、メッセージを送る先のプールのようなものです。
サブスクリプションとは、トピックからメッセージを受け取り、管理し、最終的に処理を行うアプリケーション(サブスクライバー)に引き渡す役割があるものです。

まず、Cloud Pub/SubのAPIを有効にして、トピックを作成します。 今回は my-test-topic とします。

f:id:ohshige:20190316174142p:plain:w300

続いて、今作成したトピックに紐づくサブスクリプションを作成します。

f:id:ohshige:20190316174205p:plain:w500

名前は my-test-subscription にして、それ以外はデフォルトにします。 
配信タイプは、pull型だとサブスクライバー(処理する側)が定期的にメッセージを受け取り、push型だと特定のエンドポイントにメッセージが届きます。
受信確認を待つ時間は、再送するまでの猶予時間です。

f:id:ohshige:20190316174221p:plain:w500

サービスアカウントの作成と権限付与

作成したトピックやサブスクリプションを使えるようにするために、サービスアカウントを作成して権限を付与します。

まずは、サービスアカウントを作成します。
アカウント名と説明だけを入力して、一旦そのまま作成を完了させます。
今回はmy-test-cloud-pubsubという名前にします。

f:id:ohshige:20190316182519p:plain:w500

その後でJSON形式で鍵を生成します。

f:id:ohshige:20190316182534p:plain:w500

f:id:ohshige:20190316182545p:plain:w300

ダウンロードした鍵は適当な場所に適当な名前で置いておきます。
今回は/home/ohshige/my-test-cloud-pubsub.jsonにします。

続いて、サブスクリプションの詳細ページから、右上の情報パネルを表示した上で、今作成したサービスアカウントを選択し「Pub/Sub サブスクライバー」の役割を追加します。 f:id:ohshige:20190316183205p:plain

メッセージの公開

試しに作成したトピックにメッセージをパブリッシュしてみます。

作成したトピックを選んで、「メッセージをパブリッシュ」を選択します。

f:id:ohshige:20190316175129p:plain:w500

続いて、任意のメッセージと属性を追加して、「公開」します。

f:id:ohshige:20190316175252p:plain:w300

これをサブスクライバーで受け取ることができれば確認できます。

サブスクライバーを作る

Pythonでサブスクライバーを作ってみます。
以下はPython 3.7で試しています。

準備

まずやるべきことは環境変数の設定です。
先程ダウンロードした鍵へのパスを環境変数に与えます。

export GOOGLE_APPLICATION_CREDENTIALS="/home/ohshige/my-test-cloud-pubsub.json"

続いて、適当な環境でgoogle-cloud-pubsubをインストールします。

pip install google-cloud-pubsub

動作確認

とりあえず、サブスクライバーとして動作するか確認してみます。
subscriptionがprojects/ohshige/subscriptions/my-test-subscriptionだとすると、以下のようなPythonを実行します。

import time
from google.cloud import pubsub_v1

def callback(message):
    print(f"receive: {message}")
    message.ack()

subscriber = pubsub_v1.SubscriberClient()
subscriber.subscribe("projects/ohshige/subscriptions/my-test-subscription", callback=callback)

while True:
    time.sleep(60)

そうすると、各種設定が間違っていなければ以下のような出力が得られます。

receive: Message {
  data: b'\xe9\x81\xa9\xe5\xbd\x93\xe3\x81\xaa\xe6\x96\x87\xe5\xad\x97\xe5\x88\x97'
  attributes: {
    "hoge": "fuga"
  }
}

先程パブリッシュした内容です。
「適当な文字列」という文字列は、b'\xe9\x81〜'となっていることからバイト列になっていることがわかります。

無限ループになっているので上記の出力がされたあともプログラムは終了しませんが、特に何もおきません。
また、一旦Ctl+Cで停止させてから、再度実行したとしても何も起こりません。
その理由が、callback関数内のmessage.ack()です。
これは、受け取ったメッセージの処理が完全に完了したことを通知するもので、この結果サブスクリプション側はこのメッセージは再送しなくなります。
確認するために、message.ack()を消した上で実行し、改めてGCP上でメッセージを公開してみます。
もちろん、そのメッセージは出力されます。
しかし、完了通知はされていないため、しばらくすると改めてメッセージが届きます。
(というはずなのですが、待ってもメッセージは届かず、一旦プログラムを停止してから、再度実行すると同じメッセージが届きます。理由がわからない...)

それっぽい処理

動作確認はできたので、ありえそうなプログラムに変えてみます。

attributesの1つとしてフォーマットが、dataにはそのフォーマットに沿ってエンコードされた文字列が与えられるとして、デコードした結果を表示してみます。

import time
import json
import yaml
from google.cloud import pubsub_v1

def callback(message):
    print(f"receive: {message}")

    # データはバイト列なので文字列にするためにデコードする
    data = message.data.decode()
    # 属性は辞書型のように扱える
    attributes = message.attributes

    # フォーマットに応じてデコードの方法を変える
    if "format" not in attributes:
        print(data)
    elif attributes["format"] == "json":
        print(json.loads(data))
    elif attributes["format"] == "yaml":
        print(yaml.load(data, Loader=yaml.BaseLoader))
    else:
        pass

    message.ack()

subscriber = pubsub_v1.SubscriberClient()
subscriber.subscribe("projects/ohshige/subscriptions/my-test-subscription", callback=callback)

while True:
    time.sleep(60)

YAMLを使いたかったため、pyyamlをインストールしています。

formatを指定しないパターン

f:id:ohshige:20190321151543p:plain:w500

receive: Message {
  data: b'x:\xe3\x81\x82\xe3\x81\x82\xe3\x81\x82/y:\xe3\x81\x84\xe3\x81\x84\xe3\x81\x84/z:\xe3\x81\x86\xe3\x81\x86\xe3\x81\x86,\xe3\x81\x88\xe3\x81\x88\xe3\x81\x88,\xe3\x81\x8a\xe3...'
  attributes: {}
}
x:あああ/y:いいい/z:ううう,えええ,おおお

format=jsonのパターン

f:id:ohshige:20190321151732p:plain:w500

receive: Message {
  data: b'{"x": "\xe3\x81\x82\xe3\x81\x82\xe3\x81\x82", "y": "\xe3\x81\x84\xe3\x81\x84\xe3\x81\x84", "z": ["\xe3\x81\x86\xe3\x81\x86...'
  attributes: {
    "format": "json"
  }
}
{'x': 'あああ', 'y': 'いいい', 'z': ['ううう', 'えええ', 'おおお']}

format=yamlのパターン

f:id:ohshige:20190321151825p:plain:w500

receive: Message {
  data: b'x: \xe3\x81\x82\xe3\x81\x82\xe3\x81\x82\ny: \xe3\x81\x84\xe3\x81\x84\xe3\x81\x84\nz:\n - \xe3\x81\x86\xe3\x81\x86\xe3\x81\x86\n - \xe3\x81\x88\xe3\x81...'
  attributes: {
    "format": "yaml"
  }
}
{'x': 'あああ', 'y': 'いいい', 'z': ['ううう', 'えええ', 'おおお']}

jsonyamlはどうでも良くて、dataやattributesの使い方がわかればと思います。
attributesの使い道がよくわからないのですが、こんな感じを想定しているのでしょうか。

さいごに

本当はパブリッシャーのPythonでの扱い方も書く予定でしたが、画像が多くなってしまったので、サブスクライバーだけにします。

受信確認時間がうまく機能していないように感じたり、使っていて思う部分はあるのですが、とりあえず以上です。