Cloud Workflows × Momento Cacheで実現するシンプルな変数共有

Welcome file

Cloud Workflowsの制約

Google CloudのWorkflowsは、さまざまなGoogle Cloudサービスを組み合わせたワークフローを、フルマネージドかつサーバーレスで簡単に構築できる便利なサービスです。しかし、利用時にはいくつかの制約があり、その中の一つがWorkflows内で使用される変数、引数、イベントの合計サイズです。このサイズには512KBの上限が設定されており、例えば、各ステップ間でデータを共有するために変数を多用すると、容易にこの制限に達してしまいます。
Workflows リソースの上限

この課題を解決するために、今回はMomento社が提供するMomento Cacheを活用し、Workflows内での一時データの管理をしてみたいと思います。Momento Cacheを利用することで、Workflowsの制限を回避しながら柔軟かつスケーラブルなデータ管理が可能になります。この記事では、その具体的な実装方法とメリットを詳しく解説していきます。
*Workflowsのみでは難しいので、Cloud Run functionsを介した実装となっています

Momento Cacheについて

Momento Cacheは、Redis互換のシンプルで使いやすい完全マネージド型のサーバーレスキャッシュサービスです。いわゆる「なんちゃってサーバーレス」とは違い、複雑な設定が一切不要で、スケーラブルに利用できます。料金体系も完全な従量課金制なので、使った分だけ支払えばOKです。加えて充実の無料枠もあります。

また、主要な環境向けのSDKがそろっており、ドキュメントやサンプルコード等も充実しているため、簡単に導入して利用を始めることができます。

本記事で実装するもの

本記事では、音声データを文字起こしした後に、文字起こし後のデータを整形、複数のLLM(Gemini 1.5 flash, pro, Gemini 2.0 flash)で要約を行ってみるWorkflowsを構築していきます。

準備

Momento

Cacheの作成とAPI Keyの発行をしていきます。
ログインしたら左のメニューのCachesに移動し、右上のCreate Cacheを押します。Cache Nameにworkflows-exampleと入力してCreateを押します。

続いて、左のメニューのAPI KeysからGenerate Api Keyページに移動します。

Type of key: Fine-Grained Access Key
Expiration: 7 days

と入力。

Permission Type: Cache
Cache Name: workflows-example
Role Type: readwrite

でAdd Permissionを押します。
一番下にあるGenerate Api Keyを押すと、API Keyの情報が出てくるのでメモをしておきます。

ここで、Momentoを使う上で注意しないといけない点が一点あります。現状、Momentoでは一度発行したAPI Keyを無効化する手段が提供されていません。そのため、有効期限なし(No Expiration)のAPI Keyが漏洩した場合、目も当てられないような状況になってしまいます。したがって、通常のAPI Key以上に厳重な管理が求められます。

Secret Managerに作成したAPI Keyを格納します。

gcloud secrets create momento-api-key \
    --project=$PROJECT_ID \
    --replication-policy="automatic"

API_KEY=""
echo -n $API_KEY | gcloud secrets versions add momento-api-key \
    --project=$PROJECT_ID --data-file=-

サービスアカウントの作成

Workflows + Cloud Run functions用にサービスアカウントの作成と必要なロールの割り当てを行っていきます(横着して1つのサービスアカウントで行っています)。

# プロジェクトID等を設定
PROJECT_ID=""
SERVICE_ACCOUNT_NAME=""

# サービスアカウントの作成
gcloud iam service-accounts create $SERVICE_ACCOUNT_NAME \
    --project=$PROJECT_ID \
    --display-name="$SERVICE_ACCOUNT_NAME"

# サービスアカウントのメールアドレスを取得
SERVICE_ACCOUNT_EMAIL="$SERVICE_ACCOUNT_NAME@$PROJECT_ID.iam.gserviceaccount.com"

# 必要なロールをサービスアカウントに付与

# Speech-To-Text API 用のロール
gcloud projects add-iam-policy-binding $PROJECT_ID \
    --member="serviceAccount:$SERVICE_ACCOUNT_EMAIL" \
    --role="roles/speech.editor"

# Cloud Storage の読み取りアクセス用のロール
gcloud projects add-iam-policy-binding $PROJECT_ID \
    --member="serviceAccount:$SERVICE_ACCOUNT_EMAIL" \
    --role="roles/storage.objectViewer"

# Vertex AI 用のロール
gcloud projects add-iam-policy-binding $PROJECT_ID \
    --member="serviceAccount:$SERVICE_ACCOUNT_EMAIL" \
    --role="roles/aiplatform.user"

# Cloud Run function用のロール
gcloud projects add-iam-policy-binding $PROJECT_ID \
    --member="serviceAccount:$SERVICE_ACCOUNT_EMAIL" \
    --role="roles/run.invoker"

# ログ用のロール
gcloud projects add-iam-policy-binding $PROJECT_ID \
    --member="serviceAccount:$SERVICE_ACCOUNT_EMAIL" \
    --role="roles/logging.logWriter"

# Secret Mangerから値を取得する用のロール
gcloud secrets add-iam-policy-binding momento-api-key \
    --member="serviceAccount:$SERVICE_ACCOUNT_EMAIL" \
    --role="roles/secretmanager.secretAccessor"

Cloud Storageと音声ファイルの準備

Cloud StorageのBucketの作成と、テスト用のmp3ファイルをアップロードします。

BUCKET_NAME="gs://"

gcloud storage buckets create $BUCKET_NAME --location=us-central1 --project=$PROJECT_ID
gcloud storage cp sample.mp3 $BUCKET_NAME --project=$PROJECT_ID

Cloud Run functionsの作成

2つのCloud Run functionsを作成します。1つ目は文字起こしデータの整形をするfunction1、2つ目はLLMを利用して文字起こしデータを要約するfunction2です。

func1/
  main.py
  momento_client.py
  requirements.txt
func2/
  main.py
  momento_client.py
  requirements.txt

momento_client.pyはMomento Cacheを利用するための簡易クラスを定義したものとなっています。

from datetime import timedelta
from momento import CacheClient, Configurations, CredentialProvider
from momento.responses import CacheGet, CacheSet

class MomentoCacheClient:
  def __init__(self, api_key, ttl_seconds=3600):
    self.momento_api_key = CredentialProvider.from_string(api_key)
    self.ttl = timedelta(seconds=ttl_seconds)
    self.client = CacheClient.create(
      configuration=Configurations.Laptop.v1(),
      credential_provider=self.momento_api_key,
      default_ttl=self.ttl
    )

  def set(self, cache_name, key, value, ttl=None):
    resp = self.client.set(cache_name, key, value, ttl)
    match resp:
      case CacheSet.Success():
        print("Value set successfully.")
      case CacheSet.Error() as error:
        print(f"Error setting value: {error.message}")
      case _:
        print("Unreachable")

  def get(self, cache_name, key):
    resp = self.client.get(cache_name, key)
    match resp:
      case CacheGet.Hit():
        print("Value is " + resp.value_string)
        return resp.value_string
      case CacheGet.Miss():
        print("Key not found.")
        return None
      case CacheGet.Error() as error:
        print(f"Error getting key: {error.message}")
        return None
      case _:
        print("Unreachable")
        return None

それぞれの、main.py、requirements.txtは以下のようになっています。

func1/requirements.txt

Flask
google-cloud-speech
google-cloud-secret-manager
protobuf
momento

func1/main.py

import os
import re
import json

from google.cloud import speech_v2
from google.protobuf.json_format import MessageToJson
from google.cloud import secretmanager
from flask import Flask, request, jsonify

from momento_client import MomentoCacheClient

PROJECT_ID = 'YOUR PROJECT ID'
CACHE_NAME = 'workflows-example'

app = Flask(__name__)

def get_secret(secret_name):
  client = secretmanager.SecretManagerServiceClient()
  name = f"projects/{PROJECT_ID}/secrets/{secret_name}/versions/latest"

  response = client.access_secret_version(request={"name": name})
  return response.payload.data.decode("UTF-8")

def fetch_transcription_data(operation_name):
  client = speech_v2.SpeechClient(client_options={"api_endpoint": "asia-northeast1-speech.googleapis.com"})
  operation = client.get_operation(request={'name': operation_name})

  response = json.loads(MessageToJson(operation))['response']
  return response


def formatted_sentences_with_timecode(response):
  # 最初の結果キーを取得
  key, *_ = response['results']
  full_transcript = ''
  all_words = []

  # 全ての結果から転写とワードを抽出
  for result in response['results'][key]['transcript']['results']:
    full_transcript += result['alternatives'][0]['transcript']
    all_words.extend(result['alternatives'][0]['words'])

  # 文を分割し、空の文を除去
  sentences = re.split(r'([。!?.!?])', full_transcript.replace(" ", ""))
  sentences = [''.join(i) for i in zip(sentences[0::2], sentences[1::2] + [''])]
  sentences = [s for s in sentences if s]

  sentences_with_timecode = []
  start_time = None
  sentence_idx = 0
  current_length = 0
  # 最初の文から句読点を除去
  cleaned_sentence = re.sub(r'[、。!?,.!?]', '', sentences[0])

  if 'startOffset' not in all_words[0]:
    all_words[0]['startOffset'] = '0.0s'

  # 各ワードを処理
  for word in all_words:
    start_offset = float(word['startOffset'][:-1])
    end_offset = float(word['endOffset'][:-1])
    if start_time is None:
      start_time = start_offset

    word_length = len(word['word'])
    # 現在の文が完成したかチェック
    if current_length + word_length >= len(cleaned_sentence):
      end_time = end_offset
      overshoot = 0
      # ワードが文の境界を超える場合、終了時間を調整
      if current_length + word_length > len(cleaned_sentence):
        overshoot = current_length + word_length - len(cleaned_sentence)
        ratio = 1 - overshoot / word_length
        end_time = start_offset + (end_offset - start_offset) * ratio

      # 文とそのタイムコードを追加
      sentences_with_timecode.append({
        'sentence': sentences[sentence_idx],
        'start_time': start_time,
        'end_time': end_time
      })

      # 次の文へ移動
      sentence_idx += 1
      if sentence_idx >= len(sentences):
        break

      # 次の文の処理準備
      start_time = end_time
      current_length = overshoot
      cleaned_sentence = re.sub(r'[、。!?,.!?]', '', sentences[sentence_idx])
    else:
      current_length += word_length

  return sentences_with_timecode

@app.route("/", methods=["POST"])
def generate_summary():
  data = request.get_json()
  operation_name = data["operation_name"]
  workflow_execution_id = data["workflow_execution_id"]

  transcription_response = fetch_transcription_data(operation_name)
  sentences_with_timecode = formatted_sentences_with_timecode(transcription_response)

  # Momentoにタイムコード付き文字起こしデータを保存
  momento_client = MomentoCacheClient(get_secret('momento-api-key'), 10800)
  momento_client.set(CACHE_NAME, f'{workflow_execution_id}/transcript', json.dumps(sentences_with_timecode))

  return jsonify({"message": "success"}), 200


app.run(host="0.0.0.0", port=int(os.environ.get("PORT", 8080)))

Speech-To-Textの結果を受け取り、一文ごとにタイムコードをつけて分割したデータをMomento Cacheに格納しています。

func2/requirements.txt

Flask
google-genai
google-cloud-secret-manager
momento

func2/main.py

import os
import json

from google import genai
from google.genai import types
from google.cloud import secretmanager
from flask import Flask, request, jsonify

from momento_client import MomentoCacheClient

PROJECT_ID = 'YOUR PROJECT ID'
CACHE_NAME = 'workflows-example'

app = Flask(__name__)

def get_secret(secret_name):
  client = secretmanager.SecretManagerServiceClient()
  name = f"projects/{PROJECT_ID}/secrets/{secret_name}/versions/latest"

  response = client.access_secret_version(request={"name": name})
  return response.payload.data.decode("UTF-8")

def create_summary(contents, model_name):
  # タイムコード付きの文を使用して要約を作成
  client = genai.Client(
    vertexai=True, project=PROJECT_ID, location='us-central1'
  )

  response = client.models.generate_content(
    model='gemini-2.0-flash-exp',
    contents=contents,
    config=types.GenerateContentConfig(
      system_instruction='受け取ったタイムコード付きの文を150文字程度に要約してください。',
    )
  )

  return response.text

@app.route("/", methods=["POST"])
def generate_summary():
  data = request.get_json()
  workflow_execution_id = data["workflow_execution_id"]
  model_name = data["model_name"]

  # Momentoから文字起こし結果を取得
  momento_client = MomentoCacheClient(get_secret('momento-api-key'), 10800)
  transcript = json.loads(momento_client.get(CACHE_NAME, f'{workflow_execution_id}/transcript'))

  contents = "\n".join("[{start_time}-{end_time}] {sentence}".format(**item) for item in transcript)
  summary = create_summary(contents, model_name)

  return jsonify({"message": "success", 'summary': summary}), 200


app.run(host="0.0.0.0", port=int(os.environ.get("PORT", 8080)))

func1/main.pyで処理されたデータを、Momento Cacheから取り出し、指定されたモデルを使用して要約文を作成します。

それぞれ、func1、func2のデプロイをしていきます。

# func1/
gcloud functions deploy format_transcript \
    --project=$PROJECT_ID \
    --runtime python310 \
    --trigger-http \
    --entry-point app \
    --timeout 3600s \
    --memory 256MB \
    --no-allow-unauthenticated \ 
    --service-account $SERVICE_ACCOUNT_EMAIL
# func2/
gcloud functions deploy create_summary \
    --project=$PROJECT_ID \
    --runtime python310 \
    --trigger-http \
    --entry-point app \
    --timeout 3600s \
    --memory 256MB \
    --no-allow-unauthenticated \ 
    --service-account $SERVICE_ACCOUNT_EMAIL

Workflowsの作成

Google Cloudのワークフロー/ワークフロー画面に行き、[+作成]ボタンを押します。

ワークフロー名: workflows-example
サービスアカウント: 先ほど作成したサービスアカウント

を設定し、その他のデフォルトのまま[次へ]を押します。

詳細な説明は省きますが、冒頭で述べたワークフローを実行するYAMLです。このYAMLを貼り付けた後、[デプロイ]ボタンを押してください。
GCSのURIを受け取り、Speech-To-Textで文字起こしを行い、データを整形した後、複数のモデルを利用して並列に要約を作成するWorkflowsです。

main:
    params: [input]
    steps:
        - initialize:
            assign:
               - workflow_execution_id: ${sys.get_env("GOOGLE_CLOUD_WORKFLOW_EXECUTION_ID")}
               - project_id: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
               - results: {}
    
        # Speech-To-Text 非同期処理の開始
        - initialize_transcription:
            call: http.post
            args:
                url: ${"https://asia-northeast1-speech.googleapis.com/v2/projects/" + project_id + "/locations/asia-northeast1/recognizers/_:batchRecognize"}
                auth:
                    type: OAuth2
                body:
                    config:
                        features:
                            enableWordConfidence: true
                            enableWordTimeOffsets: true
                            enableAutomaticPunctuation: true
                            enableSpokenPunctuation: true
                        model: "long"
                        languageCodes: ["ja-JP"]
                        autoDecodingConfig: {}
                    recognitionOutputConfig:
                        inlineResponseConfig: {}
                    files:
                        - uri: ${input.gcs_uri}
            result: transcription_operation
    
        # Speech-To-Text 非同期処理結果の確認
        - poll_transcription:
            try:
                steps:
                - sleep_for_wait_transcription:
                    call: sys.sleep
                    args:
                        seconds: 10
                    next: check_operation
                - check_operation:
                    call: http.get
                    args:
                        url: ${"https://asia-northeast1-speech.googleapis.com/v2/" + transcription_operation.body.name + "?fields=done,name,metadata"}
                        auth:
                          type: OAuth2
                    result: operation_status
                # "done" が operation_status.body に存在するか確認
                - check_field_exists:
                    switch:
                        - condition: ${"done" in operation_status.body}
                          next: evaluate_transcription_done
                    next: poll_transcription
                # done が true かどうかを確認
                - evaluate_transcription_done:
                    switch:
                        - condition: ${operation_status.body.done != true}
                          next: poll_transcription
            except:
                as: error
                steps:
                - handleError:
                    raise: ${error}
    
        # 文字起こしデータを整形する
        - format_transcript:
            call: http.post
            args:
              url: ${"https://us-central1-" + project_id + ".cloudfunctions.net/format_transcript"}
              auth:
                type: OIDC
                audience: ${"https://us-central1-" + project_id + ".cloudfunctions.net/format_transcript"}
              body:
                operation_name: ${transcription_operation.body.name}
                workflow_execution_id: ${workflow_execution_id}

        # 要約を作成する
        - create_summary:
            parallel:
              shared: [results]
              for:
                value: model_name
                in: ["gemini-1.5-flash", "gemini-1.5-pro", "gemini-2.0-flash-exp"]
                steps:
                  - send_request:
                      call: http.post
                      args:
                        url: ${"https://us-central1-" + project_id + ".cloudfunctions.net/create_summary"}
                        auth:
                          type: OIDC
                          audience: ${"https://us-central1-" + project_id + ".cloudfunctions.net/create_summary"}
                        body:
                          workflow_execution_id: ${workflow_execution_id}
                          model_name: ${model_name}
                      result: temp_result
                  - assign_result:
                      assign:
                        - results[model_name]: ${temp_result.body.summary}

        - finalize:
            return: ${results}

Workflowsの実行

ワークフローの詳細画面に移動し、上部の[実行]ボタンをクリックしてください。
次に、入力として以下のJSONをコピーし、貼り付けて、左下の実行ボタンを押してください。

{
  "gcs_uri":"gs://YOUR-BUCKET/sample.mp3"
}

しばらくすると、処理が完了し、要約された文章が表示されるはずです。

この結果から、func1で整形されたデータがMomento Cacheを介してfunc2で利用されていることを確認できます。 ここまでの一連の作業で、Momento Cacheに関する設定がほとんど不要であることにお気づきいただけたかと思います。 また、SDKが提供されており、その利用方法もシンプルであるため、特別な苦労なく手軽に変数共有を実現できることを実感していただけたのではないでしょうか。

まとめ

Google Cloud Workflowsでは、ステップ間での大規模なデータ連携が必要な場合、変数のサイズ制限が実務上の課題となることがあります。特に、音声の文字起こし結果のような、Workflowsの変数容量を超える大規模データを扱う際には、効果的な解決策が必要です。

この課題に対して、Momento Cacheは1つの解決策となるのがお分かりいただけたかと思います。Workflowsだけでは難しい大容量データの受け渡しも、Momento Cacheを組み合わせることで実現できます。例えば、大規模な文字起こしデータを一時的にキャッシュに保管し、後続の処理で必要に応じて取り出して利用することが可能です。

今回は、Workflowsにおけるステップ間の変数共有にMomento Cacheを活用しました。Momento CacheはRedis互換であるため、様々な場面での応用が期待できます。導入、管理、コストの各面から見ても優れたプロダクトだと感じているので、今後も積極的に活用していきたいと考えています。

Previous Post