GeekFactory

int128.hatenablog.com

App Engineで動く並列処理フレームワーク ElShard

App Engineで大量のデータを並列処理するフレームワーク ElShard を作っています。batch addとdeleteのサンプルができたので、とりあえずまとめてみます。

ElShardは、入力リストを分割して処理して集約する考え方に基づいています。並列処理はApp EngineのTaskQueueで実現しています。タスク間では10kBのペイロードしかやり取りできないため、データをやり取りする用途には適していません。そのため、実際のデータはDatastoreに格納しておき、キーをタスク間でやり取りすることになります。

現段階では集約をどうやって実現するか未定ですが、キーでソートされるというDatastoreの性質をうまく利用できる気がします。Matcher APIが使えるといいなぁ。

リストを処理する

ElShardでは、InputとTaskの2種類のタスクを組み合わせてジョブネットのようなものを構成します。

Inputは、WebブラウザのHTTPリクエストを受けてリスト(List)を生成するタスクです。そこで生成されたリストはSplitterによって適切な大きさに分割されます。デフォルトは分割無しです。

分割されたリストはそれぞれTaskに渡されます。Taskがリストを返した場合はSplitterで分割されて後続のTaskに渡されます。何も返さなければ終了です。

Input, Task はインタフェースが定義されています。

public interface Input {
  public abstract List<String> input(Context context) throws Exception;
}
public interface Task {
  public abstract List<String> run(List<String> input, Context context) throws Exception;
}

タスクは TaskChainController をextendsして定義します。下記はフォームに入力されたテキストを1行ずつDatastoreに登録するサンプルです。まあ、こういうのは一気に登録した方が速い気がしますが(汗

public class AddController extends TaskChainController {
  @Override
  protected void configure(Configuration configuration) {
    configuration.setSplitter(new SizeSplitter(50));
  }

  @Override
  public List<String> input(Context context) throws Exception {
    String userId = authenticationService.getUserId();
    context.addParameter("user", userId);
    return Arrays.asList(asString("books").split("\n"));
  }

  @Override
  public List<String> run(List<String> input, Context context) {
    String user = asString("user");

    for(String line : input) {
      String[] parts = line.split("\t");
      if(parts.length < 1) {
        continue;
      }
      Book book = new Book();
      book.setTitle(parts[0]);
      if(parts.length > 1) {
        book.setAuthor(parts[1]);
      }
      book.setUser(user);
      bookService.add(book);
    }

    return null;
  }
}
https://github.com/int128/elshard/blob/master/src/demo/org/hidetake/elshard/demo/controller/demo/internal/AddController.java

Datastoreのエンティティを処理する

エンティティを並列処理するには、Keyを取得してリストをばらまく方法と、カーソルで逐次処理する方法があると思います。後者は AppEngineで大量のエンティティを処理するパターン - GeekFactory で説明した方法です。

QueryとProcessorのインタフェースは下記のように定義されています。

  public abstract ModelQuery<M> query(Context context);
  public abstract void process(List<M> input, Context context);

下記はbatch deleteを行うサンプルです。まあ、この例も一気に削除した方が速いですが(汗 エンティティの中身を見て操作する場合は重宝すると思います。

public class DeleteController extends QueryProcessorController<Book> {
  @Override
  protected void configure(QueryProcessorConfiguration configuration) {
  }

  @Override
  public ModelQuery<Book> query(Context context) {
    return Datastore.query(Book.class);
  }

  @Override
  public void process(List<Book> input, Context context) {
    List<Key> keys = new ArrayList<Key>(input.size());
    for(Book entity : input) {
      keys.add(entity.getKey());
    }
    Datastore.delete(keys);
  }
}
https://github.com/int128/elshard/blob/master/src/demo/org/hidetake/elshard/demo/controller/demo/internal/DeleteController.java

ソースコードなど

Google Codeは準備中。

ソースコードgithubで公開しています。

デモサイトはこちら。ページングしていないので見るだけでCPU食います。お手柔らかに・・・w

ちなみにElShardはElastic Sharding Frameworkの略です。そんな名前で大丈夫か?w