blog.bouzuya.net

2021-09-07 ES のことを考えている (2)

今日も ES: Event Sourcing のことを考えている。結論なくだらだら考えたことをメモする。

昨日 (2021-09-06) はリポジトリの実装の外にイベントを登場させること決めた。集約などがイベントを認識することになる。改めて昨日のソースコード (アプリケーションサービスにおけるイメージ) を見る。

let mut aggregate = repository.find_by_id(aggregate_id);
aggregate.update();
repository.save(aggregate);
  • このアプリケーションサービスにおけるイメージはイベントが出てきていない
  • Aggregate::update は変更 (=新規に追加された Event) を保持する (集約がイベントを認識している)
  • Repository::saveAggregate から新規に追加された Event を得てそれを保存する

Event を登場させると決めたので、無理に Event を隠す必要はなくなった。そこであえて露出させた形で書いてみる。

// Repository::find_by_id(&self, aggregate_id: AggregateId) -> Aggregate
let mut aggregate = {
  let committed_events = event_store.find_by_id(aggregate_id);
  Aggregate::from_events(committed_events)
};

aggregate.update(); // add uncommitted events

// Repository::save(&self, aggregate: Aggregate)
event_store.save(aggregate.id(), aggregate.uncommitted_events());
  • uncommitted_events は『実践ドメイン駆動設計 (IDDD) 』では changes となっていたもの
  • Aggregate はコミット済みのイベントと未コミットのイベントを持っている
  • Aggregateupdate というコマンドを受けて、新しい状態と未コミットのイベント (変更) を生み出す

いろいろ気になる点がある。メモをしつつひとまず IDD と比較する。

  • 排他制御を導入・考慮するとどうなる?
  • スナップショットを導入・考慮するとどうなる?
  • イベントは集約 ID 単位なのか?
  • IDDD の付録 A との比較

IDDD の付録 A (一部改変) との比較

var stream = _eventStore.LoadEventStream(customerId);
var customer = new Customer(stream.Events);
customer.LockCustomer(reason);
_eventStore.AppendToStream(customerId, stream.Version, customer.Changes);
  • 復元時にイベントからどのように ID を取得するのだろうか
  • イベントはどのように ID 単位だと分かるのだろうか (文脈的に customerId で絞られていそうだけど……)
  • イベントのうち CustomerCreated はどう扱われるのか
  • Version がある (streamVersionEvents を持っている)

排他制御の観点と Event の順序の観点がほしいので version を持たせてみる。

// Repository::find_by_id(&self, aggregate_id: AggregateId) -> Aggregate
let mut aggregate = {
  let committed_events = event_store.find_by_id(aggregate_id);
  Aggregate::from_events(committed_events)
};

aggregate.update(); // add uncommitted events

// Repository::save(&self, aggregate: Aggregate)
event_store.save(
  aggregate.id(),
  aggregate.committed_version(),
  aggregate.uncommitted_events()
);

うーん……。見通しが悪い。

  • ほしいのは更新前のバージョンでおそらく更新が進んでいないかを確認するのに使う (排他制御)
  • イベントが外側に出てきてしまうなら無理に状態を更新していく必要はないような気がする
// Repository::find_by_id(&self, aggregate_id: AggregateId) -> Aggregate
let aggregate = {
  let committed_events = event_store.find_by_id(aggregate_id);
  Aggregate::from_events(committed_events)
};

let uncommitted_events = aggregate.update();

// Repository::save(&self, aggregate: Aggregate)
event_store.save(
  aggregate.id(),
  aggregate.version(),
  uncommitted_events,
);
  • 試しに aggregate を immutable にして uncommitted_events を持たないようにしてみた
  • 状態更新するメソッドは状態を更新する代わりに追加されたイベントを返す
    • 初期状態と追加されたイベントを持っていれば更新後の状態を復元できる
      • おそらくスナップショットの導入時に役立つ
  • Aggregate の複数操作を連続して実行することができなくなっている

長くなってきたので区切ろう。以下メモ。

  • Version がある (streamVersionEvents を持っている)
  • スナップショットを導入・考慮するとどうなる?
    • スナップショットを導入すると State はそこまでの commited_events を保持しているわけではなくなる
  • イベントは集約 ID 単位なのか?
    • 集約を横断したイベントはないのか?
  • 復元時にイベントからどのように ID を取得するのだろうか
    • イベントのうち CustomerCreated はどう扱われるのか
    • イベントはどのように ID 単位だと分かるのだろうか
  • 状態とイベントの関係の整理をしたい
    • 状態をイベントの積み重ねたものとする
    • イベントなしに状態が変化しているとイベントが欠落しているので状態を復元できなくなる
    • 状態から次の状態へはイベントが必要になる
  • テーブルの構造がほしい
  • Redux 的だ

『モデリングの学び方:座談会』 https://modeling-how-to-learn.connpass.com/event/223326/ に参加した (を観た) 。


今日のコミット。