メッセージングサービスとして GCP の Cloud Pub/Sub を Python で扱ってみた
はじめに
とある理由からメッセージングの機能を使いたく、最終的にGCP で提供されているCloud Pub/Subを選んだため、何か書こうと思います。
AmazonのKinesis、ApacheのKafka、RedisのPub/Subなどと同じような、メッセージング(キューイングみたいな)サービスです。
GCPを他で使う必要があったため、同じGCP上のCloud Pub/Subを使おうとなりました。
使ってみる
トピックとサブスクリプションの作成
トピックとは、メッセージを送る先のプールのようなものです。
サブスクリプションとは、トピックからメッセージを受け取り、管理し、最終的に処理を行うアプリケーション(サブスクライバー)に引き渡す役割があるものです。
まず、Cloud Pub/SubのAPIを有効にして、トピックを作成します。
今回は my-test-topic
とします。
続いて、今作成したトピックに紐づくサブスクリプションを作成します。
名前は my-test-subscription
にして、それ以外はデフォルトにします。
配信タイプは、pull型だとサブスクライバー(処理する側)が定期的にメッセージを受け取り、push型だと特定のエンドポイントにメッセージが届きます。
受信確認を待つ時間は、再送するまでの猶予時間です。
サービスアカウントの作成と権限付与
作成したトピックやサブスクリプションを使えるようにするために、サービスアカウントを作成して権限を付与します。
まずは、サービスアカウントを作成します。
アカウント名と説明だけを入力して、一旦そのまま作成を完了させます。
今回はmy-test-cloud-pubsub
という名前にします。
その後でJSON形式で鍵を生成します。
ダウンロードした鍵は適当な場所に適当な名前で置いておきます。
今回は/home/ohshige/my-test-cloud-pubsub.json
にします。
続いて、サブスクリプションの詳細ページから、右上の情報パネルを表示した上で、今作成したサービスアカウントを選択し「Pub/Sub サブスクライバー」の役割を追加します。
メッセージの公開
試しに作成したトピックにメッセージをパブリッシュしてみます。
作成したトピックを選んで、「メッセージをパブリッシュ」を選択します。
続いて、任意のメッセージと属性を追加して、「公開」します。
これをサブスクライバーで受け取ることができれば確認できます。
サブスクライバーを作る
Pythonでサブスクライバーを作ってみます。
以下はPython 3.7
で試しています。
準備
まずやるべきことは環境変数の設定です。
先程ダウンロードした鍵へのパスを環境変数に与えます。
export GOOGLE_APPLICATION_CREDENTIALS="/home/ohshige/my-test-cloud-pubsub.json"
続いて、適当な環境でgoogle-cloud-pubsubをインストールします。
pip install google-cloud-pubsub
動作確認
とりあえず、サブスクライバーとして動作するか確認してみます。
subscriptionがprojects/ohshige/subscriptions/my-test-subscription
だとすると、以下のようなPythonを実行します。
import time from google.cloud import pubsub_v1 def callback(message): print(f"receive: {message}") message.ack() subscriber = pubsub_v1.SubscriberClient() subscriber.subscribe("projects/ohshige/subscriptions/my-test-subscription", callback=callback) while True: time.sleep(60)
そうすると、各種設定が間違っていなければ以下のような出力が得られます。
receive: Message { data: b'\xe9\x81\xa9\xe5\xbd\x93\xe3\x81\xaa\xe6\x96\x87\xe5\xad\x97\xe5\x88\x97' attributes: { "hoge": "fuga" } }
先程パブリッシュした内容です。
「適当な文字列」という文字列は、b'\xe9\x81〜'
となっていることからバイト列になっていることがわかります。
無限ループになっているので上記の出力がされたあともプログラムは終了しませんが、特に何もおきません。
また、一旦Ctl+Cで停止させてから、再度実行したとしても何も起こりません。
その理由が、callback関数内のmessage.ack()
です。
これは、受け取ったメッセージの処理が完全に完了したことを通知するもので、この結果サブスクリプション側はこのメッセージは再送しなくなります。
確認するために、message.ack()
を消した上で実行し、改めてGCP上でメッセージを公開してみます。
もちろん、そのメッセージは出力されます。
しかし、完了通知はされていないため、しばらくすると改めてメッセージが届きます。
(というはずなのですが、待ってもメッセージは届かず、一旦プログラムを停止してから、再度実行すると同じメッセージが届きます。理由がわからない...)
それっぽい処理
動作確認はできたので、ありえそうなプログラムに変えてみます。
attributesの1つとしてフォーマットが、dataにはそのフォーマットに沿ってエンコードされた文字列が与えられるとして、デコードした結果を表示してみます。
import time import json import yaml from google.cloud import pubsub_v1 def callback(message): print(f"receive: {message}") # データはバイト列なので文字列にするためにデコードする data = message.data.decode() # 属性は辞書型のように扱える attributes = message.attributes # フォーマットに応じてデコードの方法を変える if "format" not in attributes: print(data) elif attributes["format"] == "json": print(json.loads(data)) elif attributes["format"] == "yaml": print(yaml.load(data, Loader=yaml.BaseLoader)) else: pass message.ack() subscriber = pubsub_v1.SubscriberClient() subscriber.subscribe("projects/ohshige/subscriptions/my-test-subscription", callback=callback) while True: time.sleep(60)
YAMLを使いたかったため、pyyaml
をインストールしています。
formatを指定しないパターン
receive: Message { data: b'x:\xe3\x81\x82\xe3\x81\x82\xe3\x81\x82/y:\xe3\x81\x84\xe3\x81\x84\xe3\x81\x84/z:\xe3\x81\x86\xe3\x81\x86\xe3\x81\x86,\xe3\x81\x88\xe3\x81\x88\xe3\x81\x88,\xe3\x81\x8a\xe3...' attributes: {} } x:あああ/y:いいい/z:ううう,えええ,おおお
format=jsonのパターン
receive: Message { data: b'{"x": "\xe3\x81\x82\xe3\x81\x82\xe3\x81\x82", "y": "\xe3\x81\x84\xe3\x81\x84\xe3\x81\x84", "z": ["\xe3\x81\x86\xe3\x81\x86...' attributes: { "format": "json" } } {'x': 'あああ', 'y': 'いいい', 'z': ['ううう', 'えええ', 'おおお']}
format=yamlのパターン
receive: Message { data: b'x: \xe3\x81\x82\xe3\x81\x82\xe3\x81\x82\ny: \xe3\x81\x84\xe3\x81\x84\xe3\x81\x84\nz:\n - \xe3\x81\x86\xe3\x81\x86\xe3\x81\x86\n - \xe3\x81\x88\xe3\x81...' attributes: { "format": "yaml" } } {'x': 'あああ', 'y': 'いいい', 'z': ['ううう', 'えええ', 'おおお']}
jsonやyamlはどうでも良くて、dataやattributesの使い方がわかればと思います。
attributesの使い道がよくわからないのですが、こんな感じを想定しているのでしょうか。
さいごに
本当はパブリッシャーのPythonでの扱い方も書く予定でしたが、画像が多くなってしまったので、サブスクライバーだけにします。
受信確認時間がうまく機能していないように感じたり、使っていて思う部分はあるのですが、とりあえず以上です。