Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions chasm/lib/activity/frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,10 @@ func (h *frontendHandler) validateAndPopulateStartRequest(
return nil, err
}

if err := validateOnConflictOptions(req); err != nil {
return nil, err
}

return req, nil
}

Expand Down
25 changes: 25 additions & 0 deletions chasm/lib/activity/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,31 @@ func validateAndNormalizeIDPolicy(req *workflowservice.StartActivityExecutionReq
return nil
}

// validateOnConflictOptions validates the on_conflict_options of a start request:
// - attach_completion_callbacks requires attach_request_id. A completion callback is recorded
// against the request ID (see addCompletionCallbacks, which keys the callback by request ID).
// - attach_request_id requires at least one completion callback or link, since attaching a
// request ID is only meaningful alongside something to attach.
//
// attach_links is independent and may be set on its own.
func validateOnConflictOptions(req *workflowservice.StartActivityExecutionRequest) error {
onConflictOptions := req.GetOnConflictOptions()
if onConflictOptions == nil {
return nil
}
if onConflictOptions.GetAttachCompletionCallbacks() && !onConflictOptions.GetAttachRequestId() {
return serviceerror.NewInvalidArgument(
"on_conflict_options: attach_completion_callbacks requires attach_request_id to be set")
}
if onConflictOptions.GetAttachRequestId() &&
len(req.GetCompletionCallbacks()) == 0 &&
len(req.GetLinks()) == 0 {
return serviceerror.NewInvalidArgument(
"on_conflict_options: attach_request_id requires at least one completion callback or link")
}
return nil
}

func validateBlobSize(
activityID string,
blobSizeViolationTagValue string,
Expand Down
71 changes: 71 additions & 0 deletions chasm/lib/activity/validator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -718,6 +718,77 @@ func TestValidateStartDelay(t *testing.T) {
})
}

func TestValidateOnConflictOptions(t *testing.T) {
t.Run("NilOptions", func(t *testing.T) {
require.NoError(t, validateOnConflictOptions(&workflowservice.StartActivityExecutionRequest{}))
})

t.Run("AttachRequestIdAndCallbacksWithCallback", func(t *testing.T) {
req := &workflowservice.StartActivityExecutionRequest{
CompletionCallbacks: []*commonpb.Callback{{}},
OnConflictOptions: &commonpb.OnConflictOptions{
AttachRequestId: true,
AttachCompletionCallbacks: true,
},
}
require.NoError(t, validateOnConflictOptions(req))
})

t.Run("AttachLinksOnly", func(t *testing.T) {
// Links do not require a request ID; this combination must remain valid.
req := &workflowservice.StartActivityExecutionRequest{
OnConflictOptions: &commonpb.OnConflictOptions{AttachLinks: true},
}
require.NoError(t, validateOnConflictOptions(req))
})

t.Run("AttachRequestIdWithLink", func(t *testing.T) {
// A request ID alongside a link is valid (request ID has something to attach to).
req := &workflowservice.StartActivityExecutionRequest{
Links: []*commonpb.Link{{}},
OnConflictOptions: &commonpb.OnConflictOptions{
AttachRequestId: true,
AttachLinks: true,
},
}
require.NoError(t, validateOnConflictOptions(req))
})

t.Run("AttachCallbacksWithoutRequestId", func(t *testing.T) {
req := &workflowservice.StartActivityExecutionRequest{
CompletionCallbacks: []*commonpb.Callback{{}},
OnConflictOptions: &commonpb.OnConflictOptions{AttachCompletionCallbacks: true},
}
err := validateOnConflictOptions(req)
var invalidArgErr *serviceerror.InvalidArgument
require.ErrorAs(t, err, &invalidArgErr)
require.Contains(t, invalidArgErr.Message, "attach_completion_callbacks requires attach_request_id")
})

t.Run("AttachRequestIdWithoutCallbackOrLink", func(t *testing.T) {
req := &workflowservice.StartActivityExecutionRequest{
OnConflictOptions: &commonpb.OnConflictOptions{AttachRequestId: true},
}
err := validateOnConflictOptions(req)
var invalidArgErr *serviceerror.InvalidArgument
require.ErrorAs(t, err, &invalidArgErr)
require.Contains(t, invalidArgErr.Message, "attach_request_id requires at least one completion callback or link")
})

t.Run("AttachRequestIdAndCallbacksWithoutCallbackProvided", func(t *testing.T) {
req := &workflowservice.StartActivityExecutionRequest{
OnConflictOptions: &commonpb.OnConflictOptions{
AttachRequestId: true,
AttachCompletionCallbacks: true,
},
}
err := validateOnConflictOptions(req)
var invalidArgErr *serviceerror.InvalidArgument
require.ErrorAs(t, err, &invalidArgErr)
require.Contains(t, invalidArgErr.Message, "attach_request_id requires at least one completion callback or link")
})
}

func getDefaultRetrySettings(_ string) retrypolicy.DefaultRetrySettings {
return retrypolicy.DefaultRetrySettings{
InitialInterval: time.Second,
Expand Down
Loading