GeekFactory

int128.hatenablog.com

47,000件のbatch putを16秒で処理

以前に 大量のエンティティを処理するデザインパターン - GeekFactory を紹介しましたが、シングルスレッドのバッチ処理なのでスループットが頭打ちになる問題がありました。コンカレントに処理する方法を思いついたので実装してみました。

シングルスレッドではこんな流れでした。

  1. S3QueryResultListでn件のエンティティを取得する。
  2. エンティティをバッチ処理する。
  3. t秒以内であれば上記を繰り返す。
  4. 次のタスクにカーソルを渡す。

ここで、エンティティを取得するタスク(Splitter)とエンティティをバッチ処理するタスク(Mapper)を分けてみます。

  • Splitterタスク
    1. S3QueryResultListでn件のエンティティを取得する。
    2. エンティティをmemcacheに入れて*1、Mapperタスクに渡す。
    3. t秒以内であれば上記を繰り返す。
  • Mapperタスク
    1. memcacheからエンティティを取得する。消失していたら再クエリを投げる。
    2. エンティティをバッチ処理する。
    3. 終わり。

SplitterタスクとMapperタスクは並列に実行されるため、Mapperの内容に関係なく安定したスループットを確保できます。Producer-Consumerですね。

実験

nullプロパティを削除する(missingにする)ため、getしたエンティティをそのままputします。対象は47,110件のエンティティです。

以下の条件を設定しました。

  • Splitterタスクは、500件ごとにMapperタスクを生成する。
  • Splitterタスクは、10秒ごとに新しいタスクに生まれ変わる。
  • Mapperタスクは、Datastore.put()で500件のエンティティを更新する。

結果として、約16秒で処理されました。

管理コンソールを確認したところ、同時に3インスタンスが動いていました。今回は単純なbatch putを試しましたが、もっと複雑な処理を行うとタスクの粒度が大きくなるのでスケールアウトするかもしれません。

11-18 09:12AM 43.078 enqueueFirstMapperTask: Queued the first mapper task
11-18 09:12AM 43.085 enqueueSplitterTask: Queued the splitter task of offset 500
11-18 09:12AM 59.277 runMapper: The mapper task finished. Processed entities: 47110
11-18 09:12AM 59.422 runMapper: The mapper task finished. Processed entities: 47000

ソースコードはこちらからどうぞ。URLが間違っていたので修正しました(11/19 13:17)。

参考資料(メモ紙)

*1:エンティティはSerializableである必要があります。