From bcf46d23b94ea757690533bd3e82538564235e61 Mon Sep 17 00:00:00 2001 From: Alexey Zimarev Date: Fri, 27 Mar 2026 07:58:43 +0100 Subject: [PATCH 1/2] feat(command): align Result with .NET Eventuous, add Change type with event type names MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace Result.NewEvents ([]any) with Result.Changes ([]Change) where Change pairs each event with its registered TypeMap name. Both Service and AggregateService now accept *codec.TypeMap so they can resolve type names when building the result — matching the .NET Result.Ok shape (state, changes, globalPosition). StreamVersion is kept in Go and will be added to .NET later. Standardize booking sample HTTP API to return the same JSON envelope from all command endpoints, enabling UI portability between Go and .NET backends. Co-Authored-By: Claude Opus 4.6 (1M context) --- core/command/aggservice.go | 20 ++++++++++---- core/command/aggservice_test.go | 22 +++++++-------- core/command/result.go | 8 +++++- core/command/service.go | 15 ++++++++--- core/command/service_test.go | 18 ++++++------- kurrentdb/e2e_test.go | 6 ++--- samples/booking/domain/events.go | 5 ++++ samples/booking/domain/state.go | 20 +++++++------- samples/booking/httpapi/api.go | 42 +++++++++++++++++++++-------- samples/booking/httpapi/api_test.go | 23 +++++++++++----- samples/booking/main.go | 11 ++++---- 11 files changed, 124 insertions(+), 66 deletions(-) diff --git a/core/command/aggservice.go b/core/command/aggservice.go index bde4ae3..ba0856b 100644 --- a/core/command/aggservice.go +++ b/core/command/aggservice.go @@ -9,6 +9,7 @@ import ( eventuous "github.com/eventuous/eventuous-go/core" "github.com/eventuous/eventuous-go/core/aggregate" + "github.com/eventuous/eventuous-go/core/codec" "github.com/eventuous/eventuous-go/core/store" ) @@ -17,6 +18,7 @@ import ( type AggregateService[S any] struct { reader store.EventReader writer store.EventWriter + typeMap *codec.TypeMap fold func(S, any) S zero S handlers map[reflect.Type]untypedAggHandler[S] @@ -33,12 +35,14 @@ type untypedAggHandler[S any] struct { func NewAggregateService[S any]( reader store.EventReader, writer store.EventWriter, + typeMap *codec.TypeMap, fold func(S, any) S, zero S, ) *AggregateService[S] { return &AggregateService[S]{ reader: reader, writer: writer, + typeMap: typeMap, fold: fold, zero: zero, handlers: make(map[reflect.Type]untypedAggHandler[S]), @@ -97,11 +101,11 @@ func (svc *AggregateService[S]) Handle(ctx context.Context, command any) (*Resul } // Step 7: If no changes, return current state (no-op). - changes := agg.Changes() - if len(changes) == 0 { + rawChanges := agg.Changes() + if len(rawChanges) == 0 { return &Result[S]{ State: agg.State(), - NewEvents: nil, + Changes: nil, StreamVersion: agg.OriginalVersion(), }, nil } @@ -112,10 +116,16 @@ func (svc *AggregateService[S]) Handle(ctx context.Context, command any) (*Resul return nil, err } - // Step 9: Return result. + // Step 9: Build typed changes and return result. + changes := make([]Change, len(rawChanges)) + for i, e := range rawChanges { + typeName, _ := svc.typeMap.TypeName(e) + changes[i] = Change{Event: e, EventType: typeName} + } + return &Result[S]{ State: agg.State(), - NewEvents: changes, + Changes: changes, GlobalPosition: appendResult.GlobalPosition, StreamVersion: appendResult.NextExpectedVersion, }, nil diff --git a/core/command/aggservice_test.go b/core/command/aggservice_test.go index 9201300..c513b6a 100644 --- a/core/command/aggservice_test.go +++ b/core/command/aggservice_test.go @@ -16,7 +16,7 @@ import ( ) func newAggService(s *memstore.Store) *command.AggregateService[testdomain.BookingState] { - return command.NewAggregateService[testdomain.BookingState](s, s, testdomain.BookingFold, testdomain.BookingState{}) + return command.NewAggregateService[testdomain.BookingState](s, s, testdomain.NewTypeMap(), testdomain.BookingFold, testdomain.BookingState{}) } // aggStreamName returns the stream name as the AggregateService generates it. @@ -79,8 +79,8 @@ func TestAggService_OnNew_Success(t *testing.T) { if result == nil { t.Fatal("expected non-nil result") } - if len(result.NewEvents) != 1 { - t.Fatalf("expected 1 new event, got %d", len(result.NewEvents)) + if len(result.Changes) != 1 { + t.Fatalf("expected 1 new event, got %d", len(result.Changes)) } if result.State.RoomID != "room-42" { t.Errorf("expected RoomID=room-42, got %s", result.State.RoomID) @@ -127,8 +127,8 @@ func TestAggService_OnExisting_Success(t *testing.T) { if result.State.AmountPaid != 100.0 { t.Errorf("expected AmountPaid=100.0, got %f", result.State.AmountPaid) } - if len(result.NewEvents) != 1 { - t.Fatalf("expected 1 new event, got %d", len(result.NewEvents)) + if len(result.Changes) != 1 { + t.Fatalf("expected 1 new event, got %d", len(result.Changes)) } } @@ -164,8 +164,8 @@ func TestAggService_OnAny_Works(t *testing.T) { if result.State.RoomID != "room-5" { t.Errorf("expected RoomID=room-5, got %s", result.State.RoomID) } - if len(result.NewEvents) != 1 { - t.Fatalf("expected 1 new event, got %d", len(result.NewEvents)) + if len(result.Changes) != 1 { + t.Fatalf("expected 1 new event, got %d", len(result.Changes)) } }) @@ -183,8 +183,8 @@ func TestAggService_OnAny_Works(t *testing.T) { if err != nil { t.Fatalf("unexpected error: %v", err) } - if len(result.NewEvents) != 1 { - t.Fatalf("expected 1 new event, got %d", len(result.NewEvents)) + if len(result.Changes) != 1 { + t.Fatalf("expected 1 new event, got %d", len(result.Changes)) } }) } @@ -225,8 +225,8 @@ func TestAggService_NoOp(t *testing.T) { if err != nil { t.Fatalf("unexpected error: %v", err) } - if len(result.NewEvents) != 0 { - t.Errorf("expected 0 new events, got %d", len(result.NewEvents)) + if len(result.Changes) != 0 { + t.Errorf("expected 0 new events, got %d", len(result.Changes)) } // Verify nothing was appended. exists, _ := s.StreamExists(context.Background(), aggStreamName("noop-agg-booking")) diff --git a/core/command/result.go b/core/command/result.go index c46a081..d641d45 100644 --- a/core/command/result.go +++ b/core/command/result.go @@ -3,10 +3,16 @@ package command +// Change pairs a domain event with its registered type name. +type Change struct { + Event any + EventType string +} + // Result of a handled command. type Result[S any] struct { State S - NewEvents []any + Changes []Change GlobalPosition uint64 StreamVersion int64 } diff --git a/core/command/service.go b/core/command/service.go index 1fc48c6..053d058 100644 --- a/core/command/service.go +++ b/core/command/service.go @@ -8,6 +8,7 @@ import ( "reflect" eventuous "github.com/eventuous/eventuous-go/core" + "github.com/eventuous/eventuous-go/core/codec" "github.com/eventuous/eventuous-go/core/store" "github.com/google/uuid" ) @@ -21,6 +22,7 @@ type CommandHandler[S any] interface { type Service[S any] struct { reader store.EventReader writer store.EventWriter + typeMap *codec.TypeMap fold func(S, any) S zero S handlers map[reflect.Type]untypedHandler[S] @@ -30,12 +32,14 @@ type Service[S any] struct { func New[S any]( reader store.EventReader, writer store.EventWriter, + typeMap *codec.TypeMap, fold func(S, any) S, zero S, ) *Service[S] { return &Service[S]{ reader: reader, writer: writer, + typeMap: typeMap, fold: fold, zero: zero, handlers: make(map[reflect.Type]untypedHandler[S]), @@ -79,7 +83,7 @@ func (svc *Service[S]) Handle(ctx context.Context, command any) (*Result[S], err if len(newEvents) == 0 { return &Result[S]{ State: state, - NewEvents: nil, + Changes: nil, StreamVersion: int64(version), }, nil } @@ -98,15 +102,18 @@ func (svc *Service[S]) Handle(ctx context.Context, command any) (*Result[S], err return nil, err } - // Step 7: Fold new events into state for the result. - for _, e := range newEvents { + // Step 7: Build changes and fold new events into state. + changes := make([]Change, len(newEvents)) + for i, e := range newEvents { + typeName, _ := svc.typeMap.TypeName(e) + changes[i] = Change{Event: e, EventType: typeName} state = svc.fold(state, e) } // Step 8: Return Result[S]. return &Result[S]{ State: state, - NewEvents: newEvents, + Changes: changes, GlobalPosition: appendResult.GlobalPosition, StreamVersion: appendResult.NextExpectedVersion, }, nil diff --git a/core/command/service_test.go b/core/command/service_test.go index 406a246..9f5e5e5 100644 --- a/core/command/service_test.go +++ b/core/command/service_test.go @@ -17,7 +17,7 @@ import ( ) func newService(s *memstore.Store) *command.Service[testdomain.BookingState] { - return command.New[testdomain.BookingState](s, s, testdomain.BookingFold, testdomain.BookingState{}) + return command.New[testdomain.BookingState](s, s, testdomain.NewTypeMap(), testdomain.BookingFold, testdomain.BookingState{}) } // seedEvents directly appends raw events to the memstore for test setup. @@ -82,8 +82,8 @@ func TestService_OnNew_Success(t *testing.T) { if result == nil { t.Fatal("expected non-nil result") } - if len(result.NewEvents) != 1 { - t.Fatalf("expected 1 new event, got %d", len(result.NewEvents)) + if len(result.Changes) != 1 { + t.Fatalf("expected 1 new event, got %d", len(result.Changes)) } if result.State.RoomID != "room-42" { t.Errorf("expected RoomID=room-42, got %s", result.State.RoomID) @@ -126,8 +126,8 @@ func TestService_OnExisting_Success(t *testing.T) { if result.State.AmountPaid != 100.0 { t.Errorf("expected AmountPaid=100.0, got %f", result.State.AmountPaid) } - if len(result.NewEvents) != 1 { - t.Fatalf("expected 1 new event, got %d", len(result.NewEvents)) + if len(result.Changes) != 1 { + t.Fatalf("expected 1 new event, got %d", len(result.Changes)) } } @@ -176,8 +176,8 @@ func TestService_OnAny_ExistingStream(t *testing.T) { if err != nil { t.Fatalf("unexpected error: %v", err) } - if len(result.NewEvents) != 1 { - t.Fatalf("expected 1 new event, got %d", len(result.NewEvents)) + if len(result.Changes) != 1 { + t.Fatalf("expected 1 new event, got %d", len(result.Changes)) } } @@ -216,8 +216,8 @@ func TestService_NoOp(t *testing.T) { if err != nil { t.Fatalf("unexpected error: %v", err) } - if len(result.NewEvents) != 0 { - t.Errorf("expected 0 new events, got %d", len(result.NewEvents)) + if len(result.Changes) != 0 { + t.Errorf("expected 0 new events, got %d", len(result.Changes)) } // Verify nothing was appended. exists, _ := s.StreamExists(context.Background(), testdomain.BookingStream("noop-booking")) diff --git a/kurrentdb/e2e_test.go b/kurrentdb/e2e_test.go index 193e633..87677da 100644 --- a/kurrentdb/e2e_test.go +++ b/kurrentdb/e2e_test.go @@ -29,7 +29,7 @@ func TestEndToEnd(t *testing.T) { bookingID := "e2e-booking-" + uuid.New().String()[:8] // 2. Create functional command service with the booking fold. - svc := command.New[testdomain.BookingState](store, store, testdomain.BookingFold, testdomain.BookingState{}) + svc := command.New[testdomain.BookingState](store, store, testdomain.NewTypeMap(), testdomain.BookingFold, testdomain.BookingState{}) // Register BookRoom handler (stream must be new). command.On(svc, command.Handler[testdomain.BookRoom, testdomain.BookingState]{ @@ -84,8 +84,8 @@ func TestEndToEnd(t *testing.T) { if !result.State.Active { t.Error("expected Active=true after BookRoom") } - if len(result.NewEvents) != 1 { - t.Errorf("expected 1 new event from BookRoom, got %d", len(result.NewEvents)) + if len(result.Changes) != 1 { + t.Errorf("expected 1 new event from BookRoom, got %d", len(result.Changes)) } // 4. Handle RecordPayment command. diff --git a/samples/booking/domain/events.go b/samples/booking/domain/events.go index c2f6e45..dfb2970 100644 --- a/samples/booking/domain/events.go +++ b/samples/booking/domain/events.go @@ -43,6 +43,11 @@ func NewCodec() codec.Codec { return codec.NewJSON(NewTypeMap()) } +// NewCodecFromTypeMap creates a JSON codec from an existing TypeMap. +func NewCodecFromTypeMap(tm *codec.TypeMap) codec.Codec { + return codec.NewJSON(tm) +} + func mustRegister[E any](tm *codec.TypeMap, name string) { if err := codec.Register[E](tm, name); err != nil { panic(err) diff --git a/samples/booking/domain/state.go b/samples/booking/domain/state.go index ef76762..f3c9ad8 100644 --- a/samples/booking/domain/state.go +++ b/samples/booking/domain/state.go @@ -5,16 +5,16 @@ package domain // BookingState is the write-side state reconstructed by folding events. type BookingState struct { - ID string - GuestID string - RoomID string - CheckIn string - CheckOut string - Price float64 - Outstanding float64 - Currency string - Paid bool - Cancelled bool + ID string `json:"id"` + GuestID string `json:"guestId"` + RoomID string `json:"roomId"` + CheckIn string `json:"checkIn"` + CheckOut string `json:"checkOut"` + Price float64 `json:"price"` + Outstanding float64 `json:"outstanding"` + Currency string `json:"currency"` + Paid bool `json:"paid"` + Cancelled bool `json:"cancelled"` } // BookingFold is the fold function used by the command service to reconstruct state. diff --git a/samples/booking/httpapi/api.go b/samples/booking/httpapi/api.go index 6ec4d42..faae493 100644 --- a/samples/booking/httpapi/api.go +++ b/samples/booking/httpapi/api.go @@ -60,10 +60,7 @@ func handleBookRoom(svc command.CommandHandler[domain.BookingState]) http.Handle return } - writeJSON(w, http.StatusCreated, map[string]any{ - "bookingId": bookingID, - "streamVersion": result.StreamVersion, - }) + writeJSON(w, http.StatusCreated, newCommandResponse(result)) } } @@ -93,11 +90,7 @@ func handleRecordPayment(svc command.CommandHandler[domain.BookingState]) http.H return } - writeJSON(w, http.StatusOK, map[string]any{ - "bookingId": bookingID, - "outstanding": result.State.Outstanding, - "paid": result.State.Paid, - }) + writeJSON(w, http.StatusOK, newCommandResponse(result)) } } @@ -114,7 +107,7 @@ func handleCancelBooking(svc command.CommandHandler[domain.BookingState]) http.H return } - _, err := svc.Handle(r.Context(), domain.CancelBooking{ + result, err := svc.Handle(r.Context(), domain.CancelBooking{ BookingID: bookingID, Reason: req.Reason, }) @@ -123,7 +116,7 @@ func handleCancelBooking(svc command.CommandHandler[domain.BookingState]) http.H return } - w.WriteHeader(http.StatusOK) + writeJSON(w, http.StatusOK, newCommandResponse(result)) } } @@ -150,6 +143,33 @@ func handleGetGuestBookings(rm *readmodel.BookingReadModel) http.HandlerFunc { } } +// commandResponse is the standard JSON envelope for command results, +// matching the Eventuous .NET Result.Ok shape. +type commandResponse struct { + State any `json:"state"` + Changes []changeResponse `json:"changes"` + GlobalPosition uint64 `json:"globalPosition"` + StreamVersion int64 `json:"streamVersion"` +} + +type changeResponse struct { + Event any `json:"event"` + EventType string `json:"eventType"` +} + +func newCommandResponse[S any](result *command.Result[S]) commandResponse { + changes := make([]changeResponse, len(result.Changes)) + for i, c := range result.Changes { + changes[i] = changeResponse{Event: c.Event, EventType: c.EventType} + } + return commandResponse{ + State: result.State, + Changes: changes, + GlobalPosition: result.GlobalPosition, + StreamVersion: result.StreamVersion, + } +} + // writeJSON encodes v as JSON and writes it with the given status code. func writeJSON(w http.ResponseWriter, status int, v any) { w.Header().Set("Content-Type", "application/json") diff --git a/samples/booking/httpapi/api_test.go b/samples/booking/httpapi/api_test.go index 1dfa11f..c9b2d9e 100644 --- a/samples/booking/httpapi/api_test.go +++ b/samples/booking/httpapi/api_test.go @@ -21,7 +21,7 @@ import ( func setupRouter(t *testing.T) (*http.ServeMux, *readmodel.BookingReadModel) { t.Helper() store := memstore.New() - svc := command.New[domain.BookingState](store, store, domain.BookingFold, domain.BookingState{}) + svc := command.New[domain.BookingState](store, store, domain.NewTypeMap(), domain.BookingFold, domain.BookingState{}) command.On(svc, command.Handler[domain.BookRoom, domain.BookingState]{ Expected: eventuous.IsNew, Stream: func(cmd domain.BookRoom) eventuous.StreamName { return domain.BookingStream(cmd.BookingID) }, @@ -61,8 +61,16 @@ func TestBookRoom(t *testing.T) { if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil { t.Fatalf("invalid JSON response: %v", err) } - if resp["bookingId"] == nil || resp["bookingId"] == "" { - t.Error("expected bookingId in response") + state, ok := resp["state"].(map[string]any) + if !ok { + t.Fatal("expected state object in response") + } + if state["id"] == nil || state["id"] == "" { + t.Error("expected id in state") + } + changes, ok := resp["changes"].([]any) + if !ok || len(changes) != 1 { + t.Errorf("expected 1 change, got %v", resp["changes"]) } } @@ -78,7 +86,7 @@ func TestRecordPayment(t *testing.T) { var bookResp map[string]any json.Unmarshal(w.Body.Bytes(), &bookResp) - bookingID := bookResp["bookingId"].(string) + bookingID := bookResp["state"].(map[string]any)["id"].(string) // Record payment. payBody := `{"amount":200,"currency":"USD","paymentId":"p1"}` @@ -93,8 +101,9 @@ func TestRecordPayment(t *testing.T) { var payResp map[string]any json.Unmarshal(w.Body.Bytes(), &payResp) - if payResp["outstanding"].(float64) != 300 { - t.Errorf("expected outstanding=300, got %v", payResp["outstanding"]) + payState := payResp["state"].(map[string]any) + if payState["outstanding"].(float64) != 300 { + t.Errorf("expected outstanding=300, got %v", payState["outstanding"]) } } @@ -110,7 +119,7 @@ func TestCancelBooking(t *testing.T) { var bookResp map[string]any json.Unmarshal(w.Body.Bytes(), &bookResp) - bookingID := bookResp["bookingId"].(string) + bookingID := bookResp["state"].(map[string]any)["id"].(string) // Cancel booking. cancelBody := `{"reason":"changed plans"}` diff --git a/samples/booking/main.go b/samples/booking/main.go index 6eb1491..6934dd9 100644 --- a/samples/booking/main.go +++ b/samples/booking/main.go @@ -40,14 +40,15 @@ func main() { log.Fatalf("failed to create KurrentDB client: %v", err) } - // 3. Codec. - codec := domain.NewCodec() + // 3. Type map and codec. + typeMap := domain.NewTypeMap() + jsonCodec := domain.NewCodecFromTypeMap(typeMap) // 4. Event store. - store := kdb.NewStore(client, codec) + store := kdb.NewStore(client, jsonCodec) // 5. Command service. - svc := command.New[domain.BookingState](store, store, domain.BookingFold, domain.BookingState{}) + svc := command.New[domain.BookingState](store, store, typeMap, domain.BookingFold, domain.BookingState{}) command.On(svc, command.Handler[domain.BookRoom, domain.BookingState]{ Expected: eventuous.IsNew, Stream: func(cmd domain.BookRoom) eventuous.StreamName { return domain.BookingStream(cmd.BookingID) }, @@ -72,7 +73,7 @@ func main() { defer stop() // 8. Catch-up subscription. - sub := kdb.NewCatchUp(client, codec, "booking-projections", + sub := kdb.NewCatchUp(client, jsonCodec, "booking-projections", kdb.WithHandler(rm), kdb.WithMiddleware(subscription.WithLogging(slog.Default())), ) From 3821c02ce276defd9d7addec04a678543a5a1ac3 Mon Sep 17 00:00:00 2001 From: Alexey Zimarev Date: Fri, 27 Mar 2026 10:20:21 +0100 Subject: [PATCH 2/2] fix(command): guard nil TypeMap and propagate type lookup errors Add nil check on TypeMap in constructors (panic on misconfiguration). Return error from Handle when TypeName lookup fails instead of silently producing empty eventType strings. Co-Authored-By: Claude Opus 4.6 (1M context) --- core/command/aggservice.go | 10 +++++++++- core/command/service.go | 10 +++++++++- 2 files changed, 18 insertions(+), 2 deletions(-) diff --git a/core/command/aggservice.go b/core/command/aggservice.go index ba0856b..43e7c98 100644 --- a/core/command/aggservice.go +++ b/core/command/aggservice.go @@ -5,6 +5,7 @@ package command import ( "context" + "fmt" "reflect" eventuous "github.com/eventuous/eventuous-go/core" @@ -32,6 +33,7 @@ type untypedAggHandler[S any] struct { } // NewAggregateService creates an aggregate-based command service. +// Panics if reader, writer, or typeMap is nil. func NewAggregateService[S any]( reader store.EventReader, writer store.EventWriter, @@ -39,6 +41,9 @@ func NewAggregateService[S any]( fold func(S, any) S, zero S, ) *AggregateService[S] { + if typeMap == nil { + panic("command: typeMap must not be nil") + } return &AggregateService[S]{ reader: reader, writer: writer, @@ -119,7 +124,10 @@ func (svc *AggregateService[S]) Handle(ctx context.Context, command any) (*Resul // Step 9: Build typed changes and return result. changes := make([]Change, len(rawChanges)) for i, e := range rawChanges { - typeName, _ := svc.typeMap.TypeName(e) + typeName, err := svc.typeMap.TypeName(e) + if err != nil { + return nil, fmt.Errorf("command: resolving event type: %w", err) + } changes[i] = Change{Event: e, EventType: typeName} } diff --git a/core/command/service.go b/core/command/service.go index 053d058..3d58242 100644 --- a/core/command/service.go +++ b/core/command/service.go @@ -5,6 +5,7 @@ package command import ( "context" + "fmt" "reflect" eventuous "github.com/eventuous/eventuous-go/core" @@ -29,6 +30,7 @@ type Service[S any] struct { } // New creates a functional command service. +// Panics if reader, writer, or typeMap is nil. func New[S any]( reader store.EventReader, writer store.EventWriter, @@ -36,6 +38,9 @@ func New[S any]( fold func(S, any) S, zero S, ) *Service[S] { + if typeMap == nil { + panic("command: typeMap must not be nil") + } return &Service[S]{ reader: reader, writer: writer, @@ -105,7 +110,10 @@ func (svc *Service[S]) Handle(ctx context.Context, command any) (*Result[S], err // Step 7: Build changes and fold new events into state. changes := make([]Change, len(newEvents)) for i, e := range newEvents { - typeName, _ := svc.typeMap.TypeName(e) + typeName, err := svc.typeMap.TypeName(e) + if err != nil { + return nil, fmt.Errorf("command: resolving event type: %w", err) + } changes[i] = Change{Event: e, EventType: typeName} state = svc.fold(state, e) }