diff --git a/tests/task_queue_stats_test.go b/tests/task_queue_stats_test.go index adb502e788..c64502e529 100644 --- a/tests/task_queue_stats_test.go +++ b/tests/task_queue_stats_test.go @@ -8,7 +8,6 @@ import ( "time" "github.com/google/uuid" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" commandpb "go.temporal.io/api/command/v1" commonpb "go.temporal.io/api/common/v1" @@ -20,6 +19,7 @@ import ( deploymentspb "go.temporal.io/server/api/deployment/v1" "go.temporal.io/server/common" "go.temporal.io/server/common/dynamicconfig" + "go.temporal.io/server/common/testing/await" "go.temporal.io/server/common/testing/parallelsuite" "go.temporal.io/server/common/worker_versioning" "go.temporal.io/server/tests/testcore" @@ -47,6 +47,50 @@ type workflowTasksAndActivitiesPollerParams struct { versioningBehavior enumspb.VersioningBehavior } +// taskQueueStatsContext holds the per-test environment and configuration for task queue stats tests. +type taskQueueStatsContext struct { + testcore.Env + *require.Assertions + tb testing.TB + ctx context.Context + usePriMatcher bool + minPriority int + maxPriority int + defaultPriority int + partitionCount int +} + +func newTaskQueueStatsContext( + ctx context.Context, + t *testing.T, + usePriMatcher bool, + behavior testcore.MatchingBehavior, + extraOpts ...testcore.TestOption, +) *taskQueueStatsContext { + opts := []testcore.TestOption{ + testcore.WithWorkerService("worker-deployment versioning"), + testcore.WithDynamicConfig(dynamicconfig.EnableDeploymentVersions, true), + testcore.WithDynamicConfig(dynamicconfig.FrontendEnableWorkerVersioningWorkflowAPIs, true), + testcore.WithDynamicConfig(dynamicconfig.MatchingUseNewMatcher, usePriMatcher), + testcore.WithDynamicConfig(dynamicconfig.MatchingPriorityLevels, 5), // maxPriority + } + opts = append(opts, behavior.Options()...) + opts = append(opts, extraOpts...) + env := testcore.NewEnv(t, opts...) + behavior.InjectHooks(env) + return &taskQueueStatsContext{ + Env: env, + Assertions: require.New(t), + tb: t, + ctx: ctx, + usePriMatcher: usePriMatcher, + minPriority: 1, + maxPriority: 5, + defaultPriority: 3, + partitionCount: 2, // kept low to reduce test time on CI + } +} + // TaskQueueStatsSuite groups task queue stats tests that are run with different matcher configurations. type TaskQueueStatsSuite struct { parallelsuite.Suite[*TaskQueueStatsSuite] @@ -56,16 +100,24 @@ func TestTaskQueueStats_Pri_Suite(t *testing.T) { parallelsuite.Run(t, &TaskQueueStatsSuite{}, true) // usePriMatcher = true } +func (s *TaskQueueStatsSuite) newTaskQueueStatsContext( + usePriMatcher bool, + behavior testcore.MatchingBehavior, + extraOpts ...testcore.TestOption, +) *taskQueueStatsContext { + return newTaskQueueStatsContext(s.Context(), s.T(), usePriMatcher, behavior, extraOpts...) +} + func (s *TaskQueueStatsSuite) TestDescribeTaskQueue_NonRoot(usePriMatcher bool) { - env := newTaskQueueStatsContext(s.T(), usePriMatcher, testcore.MatchingBehavior{}) - resp, err := env.FrontendClient().DescribeTaskQueue(context.Background(), &workflowservice.DescribeTaskQueueRequest{ + env := s.newTaskQueueStatsContext(usePriMatcher, testcore.MatchingBehavior{}) + resp, err := env.FrontendClient().DescribeTaskQueue(s.Context(), &workflowservice.DescribeTaskQueueRequest{ Namespace: env.Namespace().String(), TaskQueue: &taskqueuepb.TaskQueue{Name: "/_sys/foo/1", Kind: enumspb.TASK_QUEUE_KIND_NORMAL}, }) s.NoError(err) s.NotNil(resp) - _, err = env.FrontendClient().DescribeTaskQueue(context.Background(), + _, err = env.FrontendClient().DescribeTaskQueue(s.Context(), &workflowservice.DescribeTaskQueueRequest{ Namespace: env.Namespace().String(), TaskQueue: &taskqueuepb.TaskQueue{Name: "/_sys/foo/1", Kind: enumspb.TASK_QUEUE_KIND_NORMAL}, @@ -75,7 +127,7 @@ func (s *TaskQueueStatsSuite) TestDescribeTaskQueue_NonRoot(usePriMatcher bool) } func (s *TaskQueueStatsSuite) TestNoTasks_ValidateStats(usePriMatcher bool) { - env := newTaskQueueStatsContext(s.T(), usePriMatcher, testcore.MatchingBehavior{}, + env := s.newTaskQueueStatsContext(usePriMatcher, testcore.MatchingBehavior{}, testcore.WithDynamicConfig(dynamicconfig.MatchingNumTaskqueueReadPartitions, 2), testcore.WithDynamicConfig(dynamicconfig.MatchingNumTaskqueueWritePartitions, 2), testcore.WithDynamicConfig(dynamicconfig.MatchingLongPollExpirationInterval, 10*time.Second), @@ -85,7 +137,7 @@ func (s *TaskQueueStatsSuite) TestNoTasks_ValidateStats(usePriMatcher bool) { } func (s *TaskQueueStatsSuite) TestAddMultipleTasks_ValidateStats_Cached(usePriMatcher bool) { - env := newTaskQueueStatsContext(s.T(), usePriMatcher, testcore.MatchingBehavior{}, + env := s.newTaskQueueStatsContext(usePriMatcher, testcore.MatchingBehavior{}, testcore.WithDynamicConfig(dynamicconfig.MatchingLongPollExpirationInterval, 10*time.Second), testcore.WithDynamicConfig(dynamicconfig.TaskQueueInfoByBuildIdTTL, 1*time.Hour), ) @@ -149,37 +201,42 @@ type TaskQueueStatsVersionSuite struct { parallelsuite.Suite[*TaskQueueStatsVersionSuite] } +func (s *TaskQueueStatsVersionSuite) newTaskQueueStatsContext( + usePriMatcher bool, + behavior testcore.MatchingBehavior, + extraOpts ...testcore.TestOption, +) *taskQueueStatsContext { + return newTaskQueueStatsContext(s.Context(), s.T(), usePriMatcher, behavior, extraOpts...) +} + func (s *TaskQueueStatsVersionSuite) TestMultipleTasks_ValidateStats(usePriMatcher bool, behavior testcore.MatchingBehavior) { - env := newTaskQueueStatsContext(s.T(), usePriMatcher, behavior) + env := s.newTaskQueueStatsContext(usePriMatcher, behavior) env.OverrideDynamicConfig(dynamicconfig.MatchingLongPollExpirationInterval, 10*time.Second) env.OverrideDynamicConfig(dynamicconfig.TaskQueueInfoByBuildIdTTL, 1*time.Millisecond) env.publishConsumeWorkflowTasksValidateStats(4, false) } func (s *TaskQueueStatsVersionSuite) TestCurrentVersionAbsorbsUnversionedBacklog_NoRamping(usePriMatcher bool, behavior testcore.MatchingBehavior) { - env := newTaskQueueStatsContext(s.T(), usePriMatcher, behavior) + env := s.newTaskQueueStatsContext(usePriMatcher, behavior) env.OverrideDynamicConfig(dynamicconfig.MatchingLongPollExpirationInterval, 10*time.Second) env.OverrideDynamicConfig(dynamicconfig.TaskQueueInfoByBuildIdTTL, 1*time.Millisecond) // zero means no TTL - ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) - defer cancel() - deploymentName := testcore.RandomizeStr("deployment") tqName := "tq-" + common.GenerateRandomString(5) currentBuildID := "v1" // Register this version in the task queue - pollerCtx, cancelPoller := context.WithCancel(testcore.NewContext()) - env.createVersionsInTaskQueue(pollerCtx, tqName, deploymentName, currentBuildID) + pollerCtx, cancelPoller := context.WithCancel(s.Context()) + s.createVersionsInTaskQueue(pollerCtx, env, tqName, deploymentName, currentBuildID) // Set current version only (no ramping) - env.setCurrentVersion(deploymentName, currentBuildID) + s.setCurrentVersion(env, deploymentName, currentBuildID) // Stopping the pollers so that we verify the backlog expectations cancelPoller() // Enqueue unversioned backlog unversionedWorkflowCount := 10 * env.partitionCount - env.startUnversionedWorkflows(unversionedWorkflowCount, tqName) + s.startUnversionedWorkflows(env, unversionedWorkflowCount, tqName) // Verify workflow add rate env.validateRates(tqName, enumspb.TASK_QUEUE_TYPE_WORKFLOW, true, false) @@ -189,13 +246,10 @@ func (s *TaskQueueStatsVersionSuite) TestCurrentVersionAbsorbsUnversionedBacklog MaxExtraTasks: 0, } - s.EventuallyWithT(func(c *assert.CollectT) { - a := require.New(c) - + s.Await(func(s *TaskQueueStatsVersionSuite) { // DescribeWorkerDeploymentVersion: current version should also show the full backlog for this task queue. - env.requireWDVTaskQueueStatsRelaxed( - ctx, - a, + s.requireWDVTaskQueueStatsRelaxed( + env, "DescribeWorkerDeploymentVersion[workflow]", tqName, enumspb.TASK_QUEUE_TYPE_WORKFLOW, @@ -205,9 +259,8 @@ func (s *TaskQueueStatsVersionSuite) TestCurrentVersionAbsorbsUnversionedBacklog ) // DescribeTaskQueue Legacy Mode: Since the task queue is part of the current version, the legacy mode should report the total backlog count. - env.requireLegacyTaskQueueStatsRelaxed( - ctx, - a, + s.requireLegacyTaskQueueStatsRelaxed( + env, "DescribeTaskQueue[legacy]", tqName, enumspb.TASK_QUEUE_TYPE_WORKFLOW, @@ -217,7 +270,7 @@ func (s *TaskQueueStatsVersionSuite) TestCurrentVersionAbsorbsUnversionedBacklog // The backlog count for the activity task queue should be equal to the number of activities scheduled since the activity task queue is part of the current version. activitesToSchedule := 10 * env.partitionCount - env.completeWorkflowTasksAndScheduleActivities(tqName, deploymentName, currentBuildID, activitesToSchedule) + s.completeWorkflowTasksAndScheduleActivities(env, tqName, deploymentName, currentBuildID, activitesToSchedule) // Verify activity add rate env.validateRates(tqName, enumspb.TASK_QUEUE_TYPE_ACTIVITY, true, false) @@ -227,14 +280,11 @@ func (s *TaskQueueStatsVersionSuite) TestCurrentVersionAbsorbsUnversionedBacklog MaxExtraTasks: 0, } - s.EventuallyWithT(func(c *assert.CollectT) { - a := require.New(c) - + s.Await(func(s *TaskQueueStatsVersionSuite) { // Since the activity task queue is part of the current version, // the DescribeWorkerDeploymentVersion should report the backlog count for the activity task queue. - env.requireWDVTaskQueueStatsRelaxed( - ctx, - a, + s.requireWDVTaskQueueStatsRelaxed( + env, "DescribeWorkerDeploymentVersion[activity][after-scheduling-activities]", tqName, enumspb.TASK_QUEUE_TYPE_ACTIVITY, // Querying the activity task queue to validate the backlogged activities @@ -244,9 +294,8 @@ func (s *TaskQueueStatsVersionSuite) TestCurrentVersionAbsorbsUnversionedBacklog ) // DescribeTaskQueue Legacy Mode: Since the activity task queue is part of the current version, the legacy mode should report the total backlog count. - env.requireLegacyTaskQueueStatsRelaxed( - ctx, - a, + s.requireLegacyTaskQueueStatsRelaxed( + env, "DescribeTaskQueue[legacy][activity]", tqName, enumspb.TASK_QUEUE_TYPE_ACTIVITY, @@ -256,11 +305,11 @@ func (s *TaskQueueStatsVersionSuite) TestCurrentVersionAbsorbsUnversionedBacklog } func (s *TaskQueueStatsVersionSuite) TestRampingAndCurrentAbsorbUnversionedBacklog(usePriMatcher bool, behavior testcore.MatchingBehavior) { - env := newTaskQueueStatsContext(s.T(), usePriMatcher, behavior) + env := s.newTaskQueueStatsContext(usePriMatcher, behavior) env.OverrideDynamicConfig(dynamicconfig.MatchingLongPollExpirationInterval, 10*time.Second) env.OverrideDynamicConfig(dynamicconfig.TaskQueueInfoByBuildIdTTL, 1*time.Millisecond) // zero means no TTL - ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second) + ctx, cancel := context.WithTimeout(s.Context(), 120*time.Second) defer cancel() tqName := "tq-" + common.GenerateRandomString(5) @@ -269,20 +318,20 @@ func (s *TaskQueueStatsVersionSuite) TestRampingAndCurrentAbsorbUnversionedBackl rampingBuildID := "v2" pollCtx, cancelPoll := context.WithCancel(ctx) - env.createVersionsInTaskQueue(pollCtx, tqName, deploymentName, currentBuildID) - env.createVersionsInTaskQueue(pollCtx, tqName, deploymentName, rampingBuildID) + s.createVersionsInTaskQueue(pollCtx, env, tqName, deploymentName, currentBuildID) + s.createVersionsInTaskQueue(pollCtx, env, tqName, deploymentName, rampingBuildID) cancelPoll() // cancel the pollers so that we can verify the backlog expectations // Set ramping version to 30% rampPercentage := 30 - env.setRampingVersion(deploymentName, rampingBuildID, rampPercentage) + s.setRampingVersion(env, deploymentName, rampingBuildID, rampPercentage) // Set current version - env.setCurrentVersion(deploymentName, currentBuildID) + s.setCurrentVersion(env, deploymentName, currentBuildID) // Enqueue unversioned backlog. unversionedWorkflowCount := 10 * env.partitionCount - env.startUnversionedWorkflows(unversionedWorkflowCount, tqName) + s.startUnversionedWorkflows(env, unversionedWorkflowCount, tqName) // Verify workflow add rate env.validateRates(tqName, enumspb.TASK_QUEUE_TYPE_WORKFLOW, true, false) @@ -303,14 +352,11 @@ func (s *TaskQueueStatsVersionSuite) TestRampingAndCurrentAbsorbUnversionedBackl // Currently only testing the following API's: // - DescribeWorkerDeploymentVersion for the current and ramping versions. // - DescribeTaskQueue Legacy Mode for the current and ramping versions. - s.EventuallyWithT(func(c *assert.CollectT) { - a := require.New(c) - + s.Await(func(s *TaskQueueStatsVersionSuite) { // DescribeWorkerDeploymentVersion: current version should also show only 70% of the unversioned backlog for this task queue // as a ramping version, with ramp set to 30%, exists and absorbs 30% of the unversioned backlog. - env.requireWDVTaskQueueStatsRelaxed( - ctx, - a, + s.requireWDVTaskQueueStatsRelaxed( + env, "DescribeWorkerDeploymentVersion[current][workflow]", tqName, enumspb.TASK_QUEUE_TYPE_WORKFLOW, @@ -320,9 +366,8 @@ func (s *TaskQueueStatsVersionSuite) TestRampingAndCurrentAbsorbUnversionedBackl ) // DescribeWorkerDeploymentVersion: ramping version should show the remaining 30% of the unversioned backlog for this task queue - env.requireWDVTaskQueueStatsRelaxed( - ctx, - a, + s.requireWDVTaskQueueStatsRelaxed( + env, "DescribeWorkerDeploymentVersion[ramping][workflow]", tqName, enumspb.TASK_QUEUE_TYPE_WORKFLOW, @@ -331,9 +376,8 @@ func (s *TaskQueueStatsVersionSuite) TestRampingAndCurrentAbsorbUnversionedBackl rampingExpectation, ) // Since the task queue is part of both the current and ramping versions, the legacy mode should report the total backlog count. - env.requireLegacyTaskQueueStatsRelaxed( - ctx, - a, + s.requireLegacyTaskQueueStatsRelaxed( + env, "DescribeTaskQueue[legacy][workflow]", tqName, enumspb.TASK_QUEUE_TYPE_WORKFLOW, @@ -343,7 +387,7 @@ func (s *TaskQueueStatsVersionSuite) TestRampingAndCurrentAbsorbUnversionedBackl // Here, since the activity task queue is present both in the current and in the ramping version, the backlog count would differ depending on the version described. // Poll with BOTH buildIDs in parallel to drain all workflow tasks (hash distribution splits them between current and ramping) - env.pollWorkflowTasksAndScheduleActivitiesParallel( + s.pollWorkflowTasksAndScheduleActivitiesParallel(env, workflowTasksAndActivitiesPollerParams{ tqName: tqName, deploymentName: deploymentName, @@ -385,13 +429,10 @@ func (s *TaskQueueStatsVersionSuite) TestRampingAndCurrentAbsorbUnversionedBackl MaxExtraTasks: 0, } - s.EventuallyWithT(func(c *assert.CollectT) { - a := require.New(c) - + s.Await(func(s *TaskQueueStatsVersionSuite) { // Validate current version activity stats - env.requireWDVTaskQueueStatsRelaxed( - ctx, - a, + s.requireWDVTaskQueueStatsRelaxed( + env, "DescribeWorkerDeploymentVersion[activity][after-scheduling-activities][current-version]", tqName, enumspb.TASK_QUEUE_TYPE_ACTIVITY, @@ -401,9 +442,8 @@ func (s *TaskQueueStatsVersionSuite) TestRampingAndCurrentAbsorbUnversionedBackl ) // Validate ramping version activity stats - env.requireWDVTaskQueueStatsRelaxed( - ctx, - a, + s.requireWDVTaskQueueStatsRelaxed( + env, "DescribeWorkerDeploymentVersion[activity][after-scheduling-activities][ramping-version]", tqName, enumspb.TASK_QUEUE_TYPE_ACTIVITY, @@ -416,11 +456,11 @@ func (s *TaskQueueStatsVersionSuite) TestRampingAndCurrentAbsorbUnversionedBackl } func (s *TaskQueueStatsVersionSuite) TestCurrentAbsorbsUnversionedBacklog_WhenRampingToUnversioned(usePriMatcher bool, behavior testcore.MatchingBehavior) { - env := newTaskQueueStatsContext(s.T(), usePriMatcher, behavior) + env := s.newTaskQueueStatsContext(usePriMatcher, behavior) env.OverrideDynamicConfig(dynamicconfig.MatchingLongPollExpirationInterval, 10*time.Second) env.OverrideDynamicConfig(dynamicconfig.TaskQueueInfoByBuildIdTTL, 1*time.Millisecond) // zero means no TTL - ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) + ctx, cancel := context.WithTimeout(s.Context(), 60*time.Second) defer cancel() deploymentName := testcore.RandomizeStr("deployment") @@ -428,18 +468,18 @@ func (s *TaskQueueStatsVersionSuite) TestCurrentAbsorbsUnversionedBacklog_WhenRa currentBuildID := "v1" pollCtx, cancelPoll := context.WithCancel(ctx) - env.createVersionsInTaskQueue(pollCtx, tqName, deploymentName, currentBuildID) + s.createVersionsInTaskQueue(pollCtx, env, tqName, deploymentName, currentBuildID) cancelPoll() // cancel the pollers so that we can verify the backlog expectations // Set current version. - env.setCurrentVersion(deploymentName, currentBuildID) + s.setCurrentVersion(env, deploymentName, currentBuildID) rampPercentage := 20 - env.setRampingVersion(deploymentName, "", rampPercentage) + s.setRampingVersion(env, deploymentName, "", rampPercentage) // Enqueue unversioned backlog. unversionedWorkflowCount := 10 * env.partitionCount - env.startUnversionedWorkflows(unversionedWorkflowCount, tqName) + s.startUnversionedWorkflows(env, unversionedWorkflowCount, tqName) // Verify workflow add rate env.validateRates(tqName, enumspb.TASK_QUEUE_TYPE_WORKFLOW, true, false) @@ -453,14 +493,11 @@ func (s *TaskQueueStatsVersionSuite) TestCurrentAbsorbsUnversionedBacklog_WhenRa MaxExtraTasks: 0, } - s.EventuallyWithT(func(c *assert.CollectT) { - a := require.New(c) - + s.Await(func(s *TaskQueueStatsVersionSuite) { // There is no way right now for a user to query stats of the "unversioned" version. All we can do in this case // is to query the current version's stats and see that it is attributed 80% of the unversioned backlog. - env.requireWDVTaskQueueStatsRelaxed( - ctx, - a, + s.requireWDVTaskQueueStatsRelaxed( + env, "DescribeWorkerDeploymentVersion[current][workflow][ramping-to-unversioned]", tqName, enumspb.TASK_QUEUE_TYPE_WORKFLOW, @@ -470,9 +507,8 @@ func (s *TaskQueueStatsVersionSuite) TestCurrentAbsorbsUnversionedBacklog_WhenRa ) // Since the task queue is part of both the current and ramping versions, the legacy mode should report the total backlog count. - env.requireLegacyTaskQueueStatsRelaxed( - ctx, - a, + s.requireLegacyTaskQueueStatsRelaxed( + env, "DescribeTaskQueue[legacy][workflow][ramping-to-unversioned]", tqName, enumspb.TASK_QUEUE_TYPE_WORKFLOW, @@ -482,11 +518,11 @@ func (s *TaskQueueStatsVersionSuite) TestCurrentAbsorbsUnversionedBacklog_WhenRa } func (s *TaskQueueStatsVersionSuite) TestRampingAbsorbsUnversionedBacklog_WhenCurrentIsUnversioned(usePriMatcher bool, behavior testcore.MatchingBehavior) { - env := newTaskQueueStatsContext(s.T(), usePriMatcher, behavior) + env := s.newTaskQueueStatsContext(usePriMatcher, behavior) env.OverrideDynamicConfig(dynamicconfig.MatchingLongPollExpirationInterval, 10*time.Second) env.OverrideDynamicConfig(dynamicconfig.TaskQueueInfoByBuildIdTTL, 1*time.Millisecond) // zero means no TTL - ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) + ctx, cancel := context.WithTimeout(s.Context(), 60*time.Second) defer cancel() deploymentName := testcore.RandomizeStr("deployment") @@ -494,19 +530,19 @@ func (s *TaskQueueStatsVersionSuite) TestRampingAbsorbsUnversionedBacklog_WhenCu rampingBuildID := "v2" pollCtx, cancelPoll := context.WithCancel(ctx) - env.createVersionsInTaskQueue(pollCtx, tqName, deploymentName, rampingBuildID) + s.createVersionsInTaskQueue(pollCtx, env, tqName, deploymentName, rampingBuildID) cancelPoll() // cancel the pollers so that we can verify the backlog expectations // Set current to unversioned (nil current version). - env.setCurrentVersion(deploymentName, "") + s.setCurrentVersion(env, deploymentName, "") // Set ramping to a versioned deployment. rampPercentage := 30 - env.setRampingVersion(deploymentName, rampingBuildID, rampPercentage) + s.setRampingVersion(env, deploymentName, rampingBuildID, rampPercentage) // Enqueue unversioned backlog. unversionedWorkflowCount := 10 * env.partitionCount - env.startUnversionedWorkflows(unversionedWorkflowCount, tqName) + s.startUnversionedWorkflows(env, unversionedWorkflowCount, tqName) // Verify workflow add rate env.validateRates(tqName, enumspb.TASK_QUEUE_TYPE_WORKFLOW, true, false) @@ -520,14 +556,11 @@ func (s *TaskQueueStatsVersionSuite) TestRampingAbsorbsUnversionedBacklog_WhenCu MaxExtraTasks: 0, } - s.EventuallyWithT(func(c *assert.CollectT) { - a := require.New(c) - + s.Await(func(s *TaskQueueStatsVersionSuite) { // We can't query "unversioned" as a WorkerDeploymentVersion, but we can validate that the ramping version // is attributed its ramp share of the unversioned backlog. - env.requireWDVTaskQueueStatsRelaxed( - ctx, - a, + s.requireWDVTaskQueueStatsRelaxed( + env, "DescribeWorkerDeploymentVersion[ramping][workflow][current-unversioned]", tqName, enumspb.TASK_QUEUE_TYPE_WORKFLOW, @@ -537,9 +570,8 @@ func (s *TaskQueueStatsVersionSuite) TestRampingAbsorbsUnversionedBacklog_WhenCu ) // Legacy mode should continue to report the total backlog for the task queue. - env.requireLegacyTaskQueueStatsRelaxed( - ctx, - a, + s.requireLegacyTaskQueueStatsRelaxed( + env, "DescribeTaskQueue[legacy][workflow][current-unversioned]", tqName, enumspb.TASK_QUEUE_TYPE_WORKFLOW, @@ -549,10 +581,7 @@ func (s *TaskQueueStatsVersionSuite) TestRampingAbsorbsUnversionedBacklog_WhenCu } func (s *TaskQueueStatsVersionSuite) TestInactiveVersionDoesNotAbsorbUnversionedBacklog(usePriMatcher bool, behavior testcore.MatchingBehavior) { - env := newTaskQueueStatsContext(s.T(), usePriMatcher, behavior) - ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) - defer cancel() - + env := s.newTaskQueueStatsContext(usePriMatcher, behavior) env.OverrideDynamicConfig(dynamicconfig.MatchingLongPollExpirationInterval, 10*time.Second) env.OverrideDynamicConfig(dynamicconfig.TaskQueueInfoByBuildIdTTL, 1*time.Millisecond) // zero means no TTL @@ -561,24 +590,24 @@ func (s *TaskQueueStatsVersionSuite) TestInactiveVersionDoesNotAbsorbUnversioned currentBuildID := "v1" inactiveBuildID := "v2" - pollCtx, cancelPoll := context.WithCancel(testcore.NewContext()) + pollCtx, cancelPoll := context.WithCancel(s.Context()) - env.createVersionsInTaskQueue(pollCtx, tqName, deploymentName, currentBuildID) - env.createVersionsInTaskQueue(pollCtx, tqName, deploymentName, inactiveBuildID) + s.createVersionsInTaskQueue(pollCtx, env, tqName, deploymentName, currentBuildID) + s.createVersionsInTaskQueue(pollCtx, env, tqName, deploymentName, inactiveBuildID) // Set current version - env.setCurrentVersion(deploymentName, currentBuildID) + s.setCurrentVersion(env, deploymentName, currentBuildID) // Stopping the pollers so that we verify the backlog expectations cancelPoll() // Enqueue unversioned backlog. unversionedWorkflows := 10 * env.partitionCount - env.startUnversionedWorkflows(unversionedWorkflows, tqName) + s.startUnversionedWorkflows(env, unversionedWorkflows, tqName) // Enqueue pinned workflows. pinnedWorkflows := 10 * env.partitionCount - env.startPinnedWorkflows(pinnedWorkflows, tqName, deploymentName, inactiveBuildID) + s.startPinnedWorkflows(env, pinnedWorkflows, tqName, deploymentName, inactiveBuildID) // Verify workflow add rate env.validateRates(tqName, enumspb.TASK_QUEUE_TYPE_WORKFLOW, true, false) @@ -595,13 +624,10 @@ func (s *TaskQueueStatsVersionSuite) TestInactiveVersionDoesNotAbsorbUnversioned // Currently only testing the following API's: // - DescribeWorkerDeploymentVersion // - DescribeTaskQueue Legacy Mode - s.EventuallyWithT(func(c *assert.CollectT) { - a := require.New(c) - + s.Await(func(s *TaskQueueStatsVersionSuite) { // DescribeWorkerDeploymentVersion: current version should should show 100% of the unversioned backlog for this task queue - env.requireWDVTaskQueueStatsRelaxed( - ctx, - a, + s.requireWDVTaskQueueStatsRelaxed( + env, "DescribeWorkerDeploymentVersion[current][workflow]", tqName, enumspb.TASK_QUEUE_TYPE_WORKFLOW, @@ -611,9 +637,8 @@ func (s *TaskQueueStatsVersionSuite) TestInactiveVersionDoesNotAbsorbUnversioned ) // DescribeWorkerDeploymentVersion: inactive version should only show the pinned workflows that are scheduled on it. - env.requireWDVTaskQueueStatsRelaxed( - ctx, - a, + s.requireWDVTaskQueueStatsRelaxed( + env, "DescribeWorkerDeploymentVersion[inactive][workflow]", tqName, enumspb.TASK_QUEUE_TYPE_WORKFLOW, @@ -624,7 +649,7 @@ func (s *TaskQueueStatsVersionSuite) TestInactiveVersionDoesNotAbsorbUnversioned }, 10*time.Second, 200*time.Millisecond) // Polling the workflow tasks and scheduling activities - env.pollWorkflowTasksAndScheduleActivitiesParallel( + s.pollWorkflowTasksAndScheduleActivitiesParallel(env, workflowTasksAndActivitiesPollerParams{ tqName: tqName, deploymentName: deploymentName, @@ -667,13 +692,10 @@ func (s *TaskQueueStatsVersionSuite) TestInactiveVersionDoesNotAbsorbUnversioned MaxExtraTasks: 0, } - s.EventuallyWithT(func(c *assert.CollectT) { - a := require.New(c) - + s.Await(func(s *TaskQueueStatsVersionSuite) { // The activity task queue of the current version should have the backlog count for the activities that were scheduled - env.requireWDVTaskQueueStatsRelaxed( - ctx, - a, + s.requireWDVTaskQueueStatsRelaxed( + env, "DescribeWorkerDeploymentVersion[current][activity]", tqName, enumspb.TASK_QUEUE_TYPE_ACTIVITY, @@ -683,9 +705,8 @@ func (s *TaskQueueStatsVersionSuite) TestInactiveVersionDoesNotAbsorbUnversioned ) // The workflow task queue of the current version should be empty since activities were scheduled - env.requireWDVTaskQueueStatsRelaxed( - ctx, - a, + s.requireWDVTaskQueueStatsRelaxed( + env, "DescribeWorkerDeploymentVersion[current][workflow]", tqName, enumspb.TASK_QUEUE_TYPE_WORKFLOW, @@ -695,9 +716,8 @@ func (s *TaskQueueStatsVersionSuite) TestInactiveVersionDoesNotAbsorbUnversioned ) // The workflow task queue of the inactive version should be empty since activities were scheduled - env.requireWDVTaskQueueStatsRelaxed( - ctx, - a, + s.requireWDVTaskQueueStatsRelaxed( + env, "DescribeWorkerDeploymentVersion[inactive][workflow]", tqName, enumspb.TASK_QUEUE_TYPE_WORKFLOW, @@ -707,9 +727,8 @@ func (s *TaskQueueStatsVersionSuite) TestInactiveVersionDoesNotAbsorbUnversioned ) // The activity task queue of the inactive version should have the backlog count for the activities that were scheduled - env.requireWDVTaskQueueStatsRelaxed( - ctx, - a, + s.requireWDVTaskQueueStatsRelaxed( + env, "DescribeWorkerDeploymentVersion[inactive][activity]", tqName, enumspb.TASK_QUEUE_TYPE_ACTIVITY, @@ -720,49 +739,11 @@ func (s *TaskQueueStatsVersionSuite) TestInactiveVersionDoesNotAbsorbUnversioned }, 10*time.Second, 200*time.Millisecond) } -// taskQueueStatsContext holds the per-test environment and configuration for task queue stats tests. -type taskQueueStatsContext struct { - testcore.Env - usePriMatcher bool - minPriority int - maxPriority int - defaultPriority int - partitionCount int -} - -func newTaskQueueStatsContext( - t *testing.T, - usePriMatcher bool, - behavior testcore.MatchingBehavior, - extraOpts ...testcore.TestOption, -) *taskQueueStatsContext { - opts := []testcore.TestOption{ - testcore.WithWorkerService("worker-deployment versioning"), - testcore.WithDynamicConfig(dynamicconfig.EnableDeploymentVersions, true), - testcore.WithDynamicConfig(dynamicconfig.FrontendEnableWorkerVersioningWorkflowAPIs, true), - testcore.WithDynamicConfig(dynamicconfig.MatchingUseNewMatcher, usePriMatcher), - testcore.WithDynamicConfig(dynamicconfig.MatchingPriorityLevels, 5), // maxPriority - } - opts = append(opts, behavior.Options()...) - opts = append(opts, extraOpts...) - env := testcore.NewEnv(t, opts...) - behavior.InjectHooks(env) - return &taskQueueStatsContext{ - Env: env, - usePriMatcher: usePriMatcher, - minPriority: 1, - maxPriority: 5, - defaultPriority: 3, - partitionCount: 2, // kept low to reduce test time on CI - } -} - // requireWDVTaskQueueStatsRelaxed asserts task queue statistics by allowing for over-counting in multi-partition ramping scenarios. // The production code intentionally uses math.Ceil for both ramping and current percentage // calculations across partitions, which can result in slight over-counting. -func (s *taskQueueStatsContext) requireWDVTaskQueueStatsRelaxed( - ctx context.Context, - a *require.Assertions, +func (s *TaskQueueStatsVersionSuite) requireWDVTaskQueueStatsRelaxed( + env *taskQueueStatsContext, label string, tqName string, tqType enumspb.TaskQueueType, @@ -770,37 +751,36 @@ func (s *taskQueueStatsContext) requireWDVTaskQueueStatsRelaxed( buildID string, expectation taskQueueExpectations, ) { - stats, found, err := s.describeWDVTaskQueueStats(ctx, tqName, tqType, deploymentName, buildID) - a.NoError(err) - a.True(found, "expected %s task queue %s in DescribeWorkerDeploymentVersion response", tqType, tqName) - a.NotNil(stats, "expected %s task queue %s to have stats in DescribeWorkerDeploymentVersion response", tqType, tqName) + stats, found, err := env.describeWDVTaskQueueStats(s.Context(), tqName, tqType, deploymentName, buildID) + s.NoError(err) + s.True(found, "expected %s task queue %s in DescribeWorkerDeploymentVersion response", tqType, tqName) + s.NotNil(stats, "expected %s task queue %s to have stats in DescribeWorkerDeploymentVersion response", tqType, tqName) // Use the existing validateTaskQueueStats with MaxExtraTasks set to numPartitions // to account for ceiling operations across partitions - expectation.MaxExtraTasks = s.partitionCount - validateTaskQueueStats(label, a, stats, expectation) + expectation.MaxExtraTasks = env.partitionCount + validateTaskQueueStats(s.T(), label, stats, expectation) } // requireLegacyTaskQueueStatsRelaxed asserts task queue statistics by allowing for over-counting in multi-partition scenarios. // The production code intentionally uses math.Ceil for both ramping and current percentage calculations across partitions, // which can result in slight over-counting. -func (s *taskQueueStatsContext) requireLegacyTaskQueueStatsRelaxed( - ctx context.Context, - a *require.Assertions, +func (s *TaskQueueStatsVersionSuite) requireLegacyTaskQueueStatsRelaxed( + env *taskQueueStatsContext, label string, tqName string, tqType enumspb.TaskQueueType, expectation taskQueueExpectations, ) { - stats, found, err := s.describeLegacyTaskQueueStats(ctx, tqName, tqType) - a.NoError(err) - a.True(found, "expected %s task queue %s in DescribeTaskQueue response", tqType, tqName) - a.NotNil(stats, "expected %s task queue %s to have stats in DescribeTaskQueue response", tqType, tqName) + stats, found, err := env.describeLegacyTaskQueueStats(s.Context(), tqName, tqType) + s.NoError(err) + s.True(found, "expected %s task queue %s in DescribeTaskQueue response", tqType, tqName) + s.NotNil(stats, "expected %s task queue %s to have stats in DescribeTaskQueue response", tqType, tqName) // Use the existing validateTaskQueueStats with MaxExtraTasks set to numPartitions // to account for ceiling operations across partitions - expectation.MaxExtraTasks = s.partitionCount - validateTaskQueueStats(label, a, stats, expectation) + expectation.MaxExtraTasks = env.partitionCount + validateTaskQueueStats(s.T(), label, stats, expectation) } // Publishes versioned and unversioned entities; with one entity per priority (plus default priority). Multiplied by `sets`. @@ -847,7 +827,7 @@ func (s *taskQueueStatsContext) publishConsumeWorkflowTasksValidateStats(sets in // poll all workflow tasks and enqueue one activity task for each workflow totalAct := s.enqueueActivitiesForEachWorkflow(sets, tqName) - require.Equal(s.T(), total, totalAct, "should have enqueued the same number of activities as workflows") + s.Equal(total, totalAct, "should have enqueued the same number of activities as workflows") // verify workflow dispatch rate and activity add rate if sets > 0 { @@ -888,11 +868,11 @@ func (s *taskQueueStatsContext) publishConsumeWorkflowTasksValidateStats(sets in s.validateAllTaskQueueStats(tqName, expectations, singlePartition) } -func (s *taskQueueStatsContext) startUnversionedWorkflows(count int, tqName string) { +func (s *TaskQueueStatsVersionSuite) startUnversionedWorkflows(env *taskQueueStatsContext, count int, tqName string) { wt := "functional-workflow-current-absorbs-unversioned" workflowType := &commonpb.WorkflowType{Name: wt} request := &workflowservice.StartWorkflowExecutionRequest{ - Namespace: s.Namespace().String(), + Namespace: env.Namespace().String(), WorkflowType: workflowType, TaskQueue: &taskqueuepb.TaskQueue{Name: tqName, Kind: enumspb.TASK_QUEUE_KIND_NORMAL}, WorkflowRunTimeout: durationpb.New(10 * time.Minute), @@ -903,17 +883,17 @@ func (s *taskQueueStatsContext) startUnversionedWorkflows(count int, tqName stri for range count { request.WorkflowId = uuid.NewString() // starting "count" different Unversioned workflows. - _, err := s.FrontendClient().StartWorkflowExecution(testcore.NewContext(), request) - require.NoError(s.T(), err) + _, err := env.FrontendClient().StartWorkflowExecution(s.Context(), request) + s.NoError(err) } } -func (s *taskQueueStatsContext) startPinnedWorkflows(count int, tqName string, deploymentName string, buildID string) { +func (s *TaskQueueStatsVersionSuite) startPinnedWorkflows(env *taskQueueStatsContext, count int, tqName string, deploymentName string, buildID string) { wt := "functional-workflow-pinned" workflowType := &commonpb.WorkflowType{Name: wt} request := &workflowservice.StartWorkflowExecutionRequest{ - Namespace: s.Namespace().String(), + Namespace: env.Namespace().String(), WorkflowType: workflowType, TaskQueue: &taskqueuepb.TaskQueue{Name: tqName, Kind: enumspb.TASK_QUEUE_KIND_NORMAL}, WorkflowRunTimeout: durationpb.New(10 * time.Minute), @@ -934,18 +914,18 @@ func (s *taskQueueStatsContext) startPinnedWorkflows(count int, tqName string, d } for range count { request.WorkflowId = uuid.NewString() // starting "n" different Pinned workflows. - _, err := s.FrontendClient().StartWorkflowExecution(testcore.NewContext(), request) - require.NoError(s.T(), err) + _, err := env.FrontendClient().StartWorkflowExecution(s.Context(), request) + s.NoError(err) } } -func (s *taskQueueStatsContext) pollWorkflowTasksAndScheduleActivitiesParallel(params ...workflowTasksAndActivitiesPollerParams) { +func (s *TaskQueueStatsVersionSuite) pollWorkflowTasksAndScheduleActivitiesParallel(env *taskQueueStatsContext, params ...workflowTasksAndActivitiesPollerParams) { var wg sync.WaitGroup errCh := make(chan error, len(params)) for _, p := range params { wg.Go(func() { - _, err := s.pollWorkflowTasksAndScheduleActivities(p) + _, err := s.pollWorkflowTasksAndScheduleActivities(env, p) errCh <- err }) } @@ -953,18 +933,18 @@ func (s *taskQueueStatsContext) pollWorkflowTasksAndScheduleActivitiesParallel(p wg.Wait() close(errCh) for err := range errCh { - require.NoError(s.T(), err) + s.NoError(err) } } -func (s *taskQueueStatsContext) pollWorkflowTasksAndScheduleActivities(params workflowTasksAndActivitiesPollerParams) (int, error) { +func (s *TaskQueueStatsVersionSuite) pollWorkflowTasksAndScheduleActivities(env *taskQueueStatsContext, params workflowTasksAndActivitiesPollerParams) (int, error) { deploymentOpts := createDeploymentOptions(params.deploymentName, params.buildID) scheduled := 0 emptyPollCount := 0 for i := 0; i < params.maxToSchedule; { - resp, err := s.FrontendClient().PollWorkflowTaskQueue(testcore.NewContext(), &workflowservice.PollWorkflowTaskQueueRequest{ - Namespace: s.Namespace().String(), + resp, err := env.FrontendClient().PollWorkflowTaskQueue(s.Context(), &workflowservice.PollWorkflowTaskQueueRequest{ + Namespace: env.Namespace().String(), TaskQueue: &taskqueuepb.TaskQueue{Name: params.tqName, Kind: enumspb.TASK_QUEUE_KIND_NORMAL}, Identity: params.identity, DeploymentOptions: deploymentOpts, @@ -985,7 +965,7 @@ func (s *taskQueueStatsContext) pollWorkflowTasksAndScheduleActivities(params wo emptyPollCount = 0 respondReq := &workflowservice.RespondWorkflowTaskCompletedRequest{ - Namespace: s.Namespace().String(), + Namespace: env.Namespace().String(), TaskToken: resp.TaskToken, Commands: []*commandpb.Command{ { @@ -1004,7 +984,7 @@ func (s *taskQueueStatsContext) pollWorkflowTasksAndScheduleActivities(params wo VersioningBehavior: params.versioningBehavior, DeploymentOptions: deploymentOpts, } - _, err = s.FrontendClient().RespondWorkflowTaskCompleted(testcore.NewContext(), respondReq) + _, err = env.FrontendClient().RespondWorkflowTaskCompleted(s.Context(), respondReq) if err != nil { return scheduled, err } @@ -1016,7 +996,8 @@ func (s *taskQueueStatsContext) pollWorkflowTasksAndScheduleActivities(params wo return scheduled, nil } -func (s *taskQueueStatsContext) completeWorkflowTasksAndScheduleActivities( +func (s *TaskQueueStatsVersionSuite) completeWorkflowTasksAndScheduleActivities( + env *taskQueueStatsContext, tqName string, deploymentName string, buildID string, @@ -1025,13 +1006,13 @@ func (s *taskQueueStatsContext) completeWorkflowTasksAndScheduleActivities( deploymentOpts := createDeploymentOptions(deploymentName, buildID) for i := 0; i < activityCount; { - resp, err := s.FrontendClient().PollWorkflowTaskQueue(testcore.NewContext(), &workflowservice.PollWorkflowTaskQueueRequest{ - Namespace: s.Namespace().String(), + resp, err := env.FrontendClient().PollWorkflowTaskQueue(s.Context(), &workflowservice.PollWorkflowTaskQueueRequest{ + Namespace: env.Namespace().String(), TaskQueue: &taskqueuepb.TaskQueue{Name: tqName, Kind: enumspb.TASK_QUEUE_KIND_NORMAL}, Identity: "current-version-worker", DeploymentOptions: deploymentOpts, }) - require.NoError(s.T(), err) + s.NoError(err) if resp == nil || resp.GetAttempt() < 1 { fmt.Println("Empty poll! Continuing...") continue @@ -1040,7 +1021,7 @@ func (s *taskQueueStatsContext) completeWorkflowTasksAndScheduleActivities( // Note: Scheduling activities with no VersioningBehaviour only for the purpose of this test so that we can validate // if unversioned activity tasks are considered part of the backlog for the activity task queue in a current version. respondReq := &workflowservice.RespondWorkflowTaskCompletedRequest{ - Namespace: s.Namespace().String(), + Namespace: env.Namespace().String(), TaskToken: resp.TaskToken, Commands: []*commandpb.Command{ { @@ -1060,37 +1041,37 @@ func (s *taskQueueStatsContext) completeWorkflowTasksAndScheduleActivities( DeploymentOptions: deploymentOpts, } - _, err = s.FrontendClient().RespondWorkflowTaskCompleted(testcore.NewContext(), respondReq) - require.NoError(s.T(), err) + _, err = env.FrontendClient().RespondWorkflowTaskCompleted(s.Context(), respondReq) + s.NoError(err) i++ } } // TODO (Shivam): We may have to wait for the propagation status to show completed if we are using async workflows here. -func (s *taskQueueStatsContext) setCurrentVersion(deploymentName, buildID string) { - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) +func (s *TaskQueueStatsVersionSuite) setCurrentVersion(env *taskQueueStatsContext, deploymentName, buildID string) { + ctx, cancel := context.WithTimeout(s.Context(), 10*time.Second) defer cancel() - _, err := s.FrontendClient().SetWorkerDeploymentCurrentVersion(ctx, &workflowservice.SetWorkerDeploymentCurrentVersionRequest{ - Namespace: s.Namespace().String(), + _, err := env.FrontendClient().SetWorkerDeploymentCurrentVersion(ctx, &workflowservice.SetWorkerDeploymentCurrentVersionRequest{ + Namespace: env.Namespace().String(), DeploymentName: deploymentName, BuildId: buildID, }) - require.NoError(s.T(), err) + s.NoError(err) } // TODO (Shivam): We may have to wait for the propagation status to show completed if we are using async workflows here. -func (s *taskQueueStatsContext) setRampingVersion(deploymentName, buildID string, rampPercentage int) { - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) +func (s *TaskQueueStatsVersionSuite) setRampingVersion(env *taskQueueStatsContext, deploymentName, buildID string, rampPercentage int) { + ctx, cancel := context.WithTimeout(s.Context(), 10*time.Second) defer cancel() - _, err := s.FrontendClient().SetWorkerDeploymentRampingVersion(ctx, &workflowservice.SetWorkerDeploymentRampingVersionRequest{ - Namespace: s.Namespace().String(), + _, err := env.FrontendClient().SetWorkerDeploymentRampingVersion(ctx, &workflowservice.SetWorkerDeploymentRampingVersionRequest{ + Namespace: env.Namespace().String(), DeploymentName: deploymentName, BuildId: buildID, Percentage: float32(rampPercentage), }) - require.NoError(s.T(), err) + s.NoError(err) } func (s *taskQueueStatsContext) describeWDVTaskQueueStats( @@ -1175,8 +1156,8 @@ func (s *taskQueueStatsContext) enqueueWorkflows(sets int, tqName string) int { } } - _, err := s.FrontendClient().StartWorkflowExecution(testcore.NewContext(), request) - require.NoError(s.T(), err) + _, err := s.FrontendClient().StartWorkflowExecution(s.ctx, request) + s.NoError(err) total++ } @@ -1187,10 +1168,10 @@ func (s *taskQueueStatsContext) enqueueWorkflows(sets int, tqName string) int { return total } -func (s *taskQueueStatsContext) createVersionsInTaskQueue(ctx context.Context, tqName string, deploymentName string, buildID string) { +func (s *TaskQueueStatsVersionSuite) createVersionsInTaskQueue(ctx context.Context, env *taskQueueStatsContext, tqName string, deploymentName string, buildID string) { go func() { - _, _ = s.FrontendClient().PollWorkflowTaskQueue(ctx, &workflowservice.PollWorkflowTaskQueueRequest{ - Namespace: s.Namespace().String(), + _, _ = env.FrontendClient().PollWorkflowTaskQueue(ctx, &workflowservice.PollWorkflowTaskQueueRequest{ + Namespace: env.Namespace().String(), TaskQueue: &taskqueuepb.TaskQueue{Name: tqName, Kind: enumspb.TASK_QUEUE_KIND_NORMAL}, Identity: "random", DeploymentOptions: createDeploymentOptions(deploymentName, buildID), @@ -1198,8 +1179,8 @@ func (s *taskQueueStatsContext) createVersionsInTaskQueue(ctx context.Context, t }() go func() { - _, _ = s.FrontendClient().PollActivityTaskQueue(ctx, &workflowservice.PollActivityTaskQueueRequest{ - Namespace: s.Namespace().String(), + _, _ = env.FrontendClient().PollActivityTaskQueue(ctx, &workflowservice.PollActivityTaskQueueRequest{ + Namespace: env.Namespace().String(), TaskQueue: &taskqueuepb.TaskQueue{Name: tqName, Kind: enumspb.TASK_QUEUE_KIND_NORMAL}, Identity: "random", DeploymentOptions: createDeploymentOptions(deploymentName, buildID), @@ -1207,17 +1188,16 @@ func (s *taskQueueStatsContext) createVersionsInTaskQueue(ctx context.Context, t }() // Wait for the version to be created. - require.EventuallyWithT(s.T(), func(c *assert.CollectT) { - a := require.New(c) - resp, err := s.FrontendClient().DescribeWorkerDeploymentVersion(ctx, &workflowservice.DescribeWorkerDeploymentVersionRequest{ - Namespace: s.Namespace().String(), + s.Await(func(s *TaskQueueStatsVersionSuite) { + resp, err := env.FrontendClient().DescribeWorkerDeploymentVersion(s.Context(), &workflowservice.DescribeWorkerDeploymentVersionRequest{ + Namespace: env.Namespace().String(), DeploymentVersion: &deploymentpb.WorkerDeploymentVersion{ DeploymentName: deploymentName, BuildId: buildID, }, }) - a.NoError(err) - a.NotNil(resp) + s.NoError(err) + s.NotNil(resp) }, 10*time.Second, 200*time.Millisecond) } @@ -1227,7 +1207,7 @@ func (s *taskQueueStatsContext) createDeploymentInTaskQueue(tqName string) { var wg sync.WaitGroup wg.Go(func() { - _, _ = s.FrontendClient().PollWorkflowTaskQueue(testcore.NewContext(), &workflowservice.PollWorkflowTaskQueueRequest{ + _, _ = s.FrontendClient().PollWorkflowTaskQueue(s.ctx, &workflowservice.PollWorkflowTaskQueueRequest{ Namespace: s.Namespace().String(), TaskQueue: &taskqueuepb.TaskQueue{Name: tqName, Kind: enumspb.TASK_QUEUE_KIND_NORMAL}, Identity: "random", @@ -1236,7 +1216,7 @@ func (s *taskQueueStatsContext) createDeploymentInTaskQueue(tqName string) { }) wg.Go(func() { - _, _ = s.FrontendClient().PollActivityTaskQueue(testcore.NewContext(), &workflowservice.PollActivityTaskQueueRequest{ + _, _ = s.FrontendClient().PollActivityTaskQueue(s.ctx, &workflowservice.PollActivityTaskQueueRequest{ Namespace: s.Namespace().String(), TaskQueue: &taskqueuepb.TaskQueue{Name: tqName, Kind: enumspb.TASK_QUEUE_KIND_NORMAL}, Identity: "random", @@ -1262,8 +1242,8 @@ func (s *taskQueueStatsContext) enqueueActivitiesForEachWorkflow(sets int, tqNam pollReq.DeploymentOptions = deploymentOpts } - resp, err := s.FrontendClient().PollWorkflowTaskQueue(testcore.NewContext(), pollReq) - require.NoError(s.T(), err) + resp, err := s.FrontendClient().PollWorkflowTaskQueue(s.ctx, pollReq) + s.NoError(err) if resp == nil || resp.GetAttempt() < 1 { continue } @@ -1293,8 +1273,8 @@ func (s *taskQueueStatsContext) enqueueActivitiesForEachWorkflow(sets int, tqNam respondReq.VersioningBehavior = enumspb.VERSIONING_BEHAVIOR_PINNED } - _, err = s.FrontendClient().RespondWorkflowTaskCompleted(testcore.NewContext(), respondReq) - require.NoError(s.T(), err) + _, err = s.FrontendClient().RespondWorkflowTaskCompleted(s.ctx, respondReq) + s.NoError(err) i++ total++ @@ -1319,9 +1299,9 @@ func (s *taskQueueStatsContext) pollActivities(count int, tqName string) { } resp, err := s.FrontendClient().PollActivityTaskQueue( - testcore.NewContext(), pollReq, + s.ctx, pollReq, ) - require.NoError(s.T(), err) + s.NoError(err) if resp == nil || resp.GetAttempt() < 1 { continue // poll again on empty responses } @@ -1340,7 +1320,7 @@ func (s *taskQueueStatsContext) validateAllTaskQueueStats( } } -// validateRates verifies TasksAddRate and/or TasksDispatchRate in a dedicated EventuallyWithT block. +// validateRates verifies TasksAddRate and/or TasksDispatchRate in a dedicated await block. // This should be called immediately after the relevant operation (enqueue for add rate, poll for dispatch rate) // to ensure the rate is checked while still fresh (before the 30-second sliding window decays). func (s *taskQueueStatsContext) validateRates( @@ -1349,9 +1329,6 @@ func (s *taskQueueStatsContext) validateRates( expectAddRate bool, expectDispatchRate bool, ) { - ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) - defer cancel() - req := &workflowservice.DescribeTaskQueueRequest{ Namespace: s.Namespace().String(), TaskQueue: &taskqueuepb.TaskQueue{Name: tqName, Kind: enumspb.TASK_QUEUE_KIND_NORMAL}, @@ -1359,21 +1336,20 @@ func (s *taskQueueStatsContext) validateRates( ReportStats: true, } - require.EventuallyWithT(s.T(), func(c *assert.CollectT) { - a := require.New(c) + await.Require(s.Context(), s.tb, func(t *await.T) { label := "validateRates[" + tqType.String() + "]" - resp, err := s.FrontendClient().DescribeTaskQueue(ctx, req) - a.NoError(err) - a.NotNil(resp) - a.NotNil(resp.Stats) + resp, err := s.FrontendClient().DescribeTaskQueue(t.Context(), req) + require.NoError(t, err) + require.NotNil(t, resp) + require.NotNil(t, resp.Stats) if expectAddRate { - a.Greater(resp.Stats.TasksAddRate, float32(0), + require.Greater(t, resp.Stats.TasksAddRate, float32(0), "%s: TasksAddRate should be > 0, got %f", label, resp.Stats.TasksAddRate) } if expectDispatchRate { - a.Greater(resp.Stats.TasksDispatchRate, float32(0), + require.Greater(t, resp.Stats.TasksDispatchRate, float32(0), "%s: TasksDispatchRate should be > 0, got %f", label, resp.Stats.TasksDispatchRate) } }, 5*time.Second, 100*time.Millisecond) @@ -1385,10 +1361,7 @@ func (s *taskQueueStatsContext) validateTaskQueueStatsByType( expectation taskQueueExpectations, singlePartition bool, ) { - ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) - defer cancel() - - s.validateDescribeTaskQueueWithDefaultMode(ctx, tqName, tqType, expectation, singlePartition) + s.validateDescribeTaskQueueWithDefaultMode(tqName, tqType, expectation, singlePartition) // The other two methods report *either* versioned or unversioned stats, so we need to // half the expectations to account for that. @@ -1396,12 +1369,11 @@ func (s *taskQueueStatsContext) validateTaskQueueStatsByType( halfExpectation.BacklogCount /= 2 halfExpectation.MaxExtraTasks /= 2 - s.validateDescribeTaskQueueWithEnhancedMode(ctx, tqName, tqType, halfExpectation) - s.validateDescribeWorkerDeploymentVersion(ctx, tqName, tqType, halfExpectation) + s.validateDescribeTaskQueueWithEnhancedMode(tqName, tqType, halfExpectation) + s.validateDescribeWorkerDeploymentVersion(tqName, tqType, halfExpectation) } func (s *taskQueueStatsContext) validateDescribeTaskQueueWithDefaultMode( - ctx context.Context, tqName string, tqType enumspb.TaskQueueType, expectation taskQueueExpectations, @@ -1414,41 +1386,39 @@ func (s *taskQueueStatsContext) validateDescribeTaskQueueWithDefaultMode( } // test stats are not reported by default (and therefore also not cached) - resp, err := s.FrontendClient().DescribeTaskQueue(ctx, req) - require.NoError(s.T(), err) - require.NotNil(s.T(), resp) - require.Nil(s.T(), resp.Stats, "stats should not be reported by default") + resp, err := s.FrontendClient().DescribeTaskQueue(s.Context(), req) + s.NoError(err) + s.NotNil(resp) + s.Nil(resp.Stats, "stats should not be reported by default") //nolint:staticcheck // SA1019 deprecated - require.Nil(s.T(), resp.TaskQueueStatus, "status should not be reported by default") + s.Nil(resp.TaskQueueStatus, "status should not be reported by default") - require.EventuallyWithT(s.T(), func(c *assert.CollectT) { - a := require.New(c) + await.Require(s.Context(), s.tb, func(t *await.T) { label := "DescribeTaskQueue_DefaultMode[" + tqType.String() + "]" req.ReportStats = true //nolint:staticcheck // SA1019 deprecated req.IncludeTaskQueueStatus = true - resp, err := s.FrontendClient().DescribeTaskQueue(ctx, req) - a.NoError(err) - a.NotNil(resp) + resp, err := s.FrontendClient().DescribeTaskQueue(t.Context(), req) + require.NoError(t, err) + require.NotNil(t, resp) if singlePartition { expected := expectation.BacklogCount / 2 // only reports unversioned //nolint:staticcheck // SA1019 deprecated field actual := resp.TaskQueueStatus.GetBacklogCountHint() - a.EqualValuesf(expected, actual, "%s: backlog hint should be %d, got %d", label, expected, actual) + require.EqualValuesf(t, expected, actual, "%s: backlog hint should be %d, got %d", label, expected, actual) } - validateTaskQueueStats(label, a, resp.Stats, expectation) + validateTaskQueueStats(t, label, resp.Stats, expectation) if s.usePriMatcher && expectation.BacklogCount > 0 { // Per priority stats are only available with the priority matcher and when they've been actively used. - s.validateTaskQueueStatsByPriority(label, a, resp.StatsByPriorityKey, expectation) + s.validateTaskQueueStatsByPriority(t, label, resp.StatsByPriorityKey, expectation) } }, 5*time.Second, 100*time.Millisecond) } func (s *taskQueueStatsContext) validateDescribeTaskQueueWithEnhancedMode( - ctx context.Context, tqName string, tqType enumspb.TaskQueueType, expectation taskQueueExpectations, @@ -1471,45 +1441,42 @@ func (s *taskQueueStatsContext) validateDescribeTaskQueueWithEnhancedMode( } if !expectation.CachedEnabled { // skip if testing caching; as this would pin the result to the cache - resp, err := s.FrontendClient().DescribeTaskQueue(ctx, req) - require.NoError(s.T(), err) - require.NotNil(s.T(), resp) - require.Nil(s.T(), resp.Stats, "stats should not be reported by default") + resp, err := s.FrontendClient().DescribeTaskQueue(s.Context(), req) + s.NoError(err) + s.NotNil(resp) + s.Nil(resp.Stats, "stats should not be reported by default") //nolint:staticcheck // SA1019 deprecated - require.Nil(s.T(), resp.TaskQueueStatus, "status should not be reported") + s.Nil(resp.TaskQueueStatus, "status should not be reported") } - require.EventuallyWithT(s.T(), func(c *assert.CollectT) { - a := require.New(c) - + await.Require(s.Context(), s.tb, func(t *await.T) { req.ReportStats = true - resp, err := s.FrontendClient().DescribeTaskQueue(ctx, req) - a.NoError(err) - a.NotNil(resp) + resp, err := s.FrontendClient().DescribeTaskQueue(t.Context(), req) + require.NoError(t, err) + require.NotNil(t, resp) //nolint:staticcheck // SA1019 deprecated - a.Len(resp.GetVersionsInfo(), 2, "should be 2: 1 default/unversioned + 1 versioned") + require.Len(t, resp.GetVersionsInfo(), 2, "should be 2: 1 default/unversioned + 1 versioned") //nolint:staticcheck // SA1019 deprecated for _, v := range resp.GetVersionsInfo() { - a.Equal(enumspb.BUILD_ID_TASK_REACHABILITY_UNSPECIFIED, v.GetTaskReachability()) + require.Equal(t, enumspb.BUILD_ID_TASK_REACHABILITY_UNSPECIFIED, v.GetTaskReachability()) info := v.GetTypesInfo()[int32(tqType)] - a.NotNil(info, "should have info for task queue type %s", tqType) + require.NotNil(t, info, "should have info for task queue type %s", tqType) if info == nil { return } - a.NotNil(info.Stats, "should have stats for task queue type %s", tqType) + require.NotNil(t, info.Stats, "should have stats for task queue type %s", tqType) if info.Stats == nil { return } - validateTaskQueueStats("DescribeTaskQueue_EnhancedMode["+tqType.String()+"]", a, info.Stats, expectation) + validateTaskQueueStats(t, "DescribeTaskQueue_EnhancedMode["+tqType.String()+"]", info.Stats, expectation) } }, 5*time.Second, 100*time.Millisecond) } func (s *taskQueueStatsContext) validateDescribeWorkerDeploymentVersion( - ctx context.Context, tqName string, tqType enumspb.TaskQueueType, expectation taskQueueExpectations, @@ -1524,43 +1491,41 @@ func (s *taskQueueStatsContext) validateDescribeWorkerDeploymentVersion( } // test stats are not reported by default (and therefore also not cached) - resp, err := s.FrontendClient().DescribeWorkerDeploymentVersion(ctx, req) - require.NoError(s.T(), err) - require.NotNil(s.T(), resp) + resp, err := s.FrontendClient().DescribeWorkerDeploymentVersion(s.Context(), req) + s.NoError(err) + s.NotNil(resp) for _, info := range resp.VersionTaskQueues { - require.Nil(s.T(), info.Stats, "stats should not be reported by default") + s.Nil(info.Stats, "stats should not be reported by default") } - require.EventuallyWithT(s.T(), func(c *assert.CollectT) { - a := require.New(c) - + await.Require(s.Context(), s.tb, func(t *await.T) { req.ReportTaskQueueStats = true - resp, err := s.FrontendClient().DescribeWorkerDeploymentVersion(ctx, req) - a.NoError(err) - a.Len(resp.VersionTaskQueues, 2, "should be 1 task queue for Workflows and 1 for Activities") + resp, err := s.FrontendClient().DescribeWorkerDeploymentVersion(t.Context(), req) + require.NoError(t, err) + require.Len(t, resp.VersionTaskQueues, 2, "should be 1 task queue for Workflows and 1 for Activities") for _, info := range resp.VersionTaskQueues { if info.Name == tqName || info.Type == tqType { label := "DescribeWorkerDeploymentVersion[" + tqType.String() + "]" - validateTaskQueueStats(label, a, info.Stats, expectation) + validateTaskQueueStats(t, label, info.Stats, expectation) if s.usePriMatcher && expectation.BacklogCount > 0 { // Per priority stats are only available with the priority matcher and when they've been actively used. - s.validateTaskQueueStatsByPriority(label, a, info.StatsByPriorityKey, expectation) + s.validateTaskQueueStatsByPriority(t, label, info.StatsByPriorityKey, expectation) } return } } - a.Failf("Task queue %s of type %s not found in response", tqName, tqType) + require.Failf(t, "task queue not found", "Task queue %s of type %s not found in response", tqName, tqType) }, 5*time.Second, 100*time.Millisecond) } func (s *taskQueueStatsContext) validateTaskQueueStatsByPriority( + t require.TestingT, label string, - a *require.Assertions, stats map[int32]*taskqueuepb.TaskQueueStats, taskQueueExpectation taskQueueExpectations, ) { - a.Len(stats, s.maxPriority, "%s: stats should contain %d priorities", label, s.maxPriority) + require.Len(t, stats, s.maxPriority, "%s: stats should contain %d priorities", label, s.maxPriority) // use an abgridged version when caching since the exact stats are difficult to predict if taskQueueExpectation.CachedEnabled { @@ -1569,7 +1534,12 @@ func (s *taskQueueStatsContext) validateTaskQueueStatsByPriority( return } } - a.Failf("%s: should have found at least one non-zero backlog count with any non-zero rate across priorities", label) + require.Failf( + t, + "no non-zero priority stats found", + "%s: should have found at least one non-zero backlog count with any non-zero rate across priorities", + label, + ) } var accBacklogCount int @@ -1580,11 +1550,11 @@ func (s *taskQueueStatsContext) validateTaskQueueStatsByPriority( priExpectation.BacklogCount *= 2 // zero priority translates to default priority 3 } - a.Containsf(stats, i, "%s: stats should contain priority %d", label, i) - validateTaskQueueStats(fmt.Sprintf("%s_Pri[%d]", label, i), a, stats[i], priExpectation) + require.Containsf(t, stats, i, "%s: stats should contain priority %d", label, i) + validateTaskQueueStats(t, fmt.Sprintf("%s_Pri[%d]", label, i), stats[i], priExpectation) accBacklogCount += int(stats[i].ApproximateBacklogCount) } - a.GreaterOrEqualf(taskQueueExpectation.BacklogCount, accBacklogCount, + require.GreaterOrEqualf(t, taskQueueExpectation.BacklogCount, accBacklogCount, "%s: accumulated backlog count from all priorities should be at least %d, got %d", label, taskQueueExpectation.BacklogCount, accBacklogCount) } @@ -1599,38 +1569,38 @@ func (s *taskQueueStatsContext) deploymentOptions(tqName string) *deploymentpb.W } func validateTaskQueueStatsStrict( + t require.TestingT, label string, - a *require.Assertions, stats *taskqueuepb.TaskQueueStats, expectation taskQueueExpectations, ) { - a.Equal(int64(expectation.BacklogCount), stats.ApproximateBacklogCount, + require.Equal(t, int64(expectation.BacklogCount), stats.ApproximateBacklogCount, "%s: ApproximateBacklogCount should be %d, got %d", label, expectation.BacklogCount, stats.ApproximateBacklogCount) - a.Equal(stats.ApproximateBacklogCount == 0, stats.ApproximateBacklogAge.AsDuration() == time.Duration(0), + require.Equal(t, stats.ApproximateBacklogCount == 0, stats.ApproximateBacklogAge.AsDuration() == time.Duration(0), "%s: ApproximateBacklogAge should be 0 when ApproximateBacklogCount is 0, got %s", label, stats.ApproximateBacklogAge.AsDuration()) } func validateTaskQueueStats( + t require.TestingT, label string, - a *require.Assertions, stats *taskqueuepb.TaskQueueStats, expectation taskQueueExpectations, ) { // Actual counter can be greater than the expected due to history retries. We make sure the counter is in // range [expected, expected+maxBacklogExtraTasks] - a.GreaterOrEqual(stats.ApproximateBacklogCount, int64(expectation.BacklogCount), + require.GreaterOrEqual(t, stats.ApproximateBacklogCount, int64(expectation.BacklogCount), "%s: ApproximateBacklogCount should be at least %d, got %d", label, expectation.BacklogCount, stats.ApproximateBacklogCount) maxApproximateBacklogCount := int64(expectation.BacklogCount + expectation.MaxExtraTasks) - a.LessOrEqual(stats.ApproximateBacklogCount, maxApproximateBacklogCount, + require.LessOrEqual(t, stats.ApproximateBacklogCount, maxApproximateBacklogCount, "%s: ApproximateBacklogCount should be at most %d, got %d", label, maxApproximateBacklogCount, stats.ApproximateBacklogCount) - a.Equal(stats.ApproximateBacklogCount == 0, stats.ApproximateBacklogAge.AsDuration() == time.Duration(0), + require.Equal(t, stats.ApproximateBacklogCount == 0, stats.ApproximateBacklogAge.AsDuration() == time.Duration(0), "%s: ApproximateBacklogAge should be 0 when ApproximateBacklogCount is 0, got %s", label, stats.ApproximateBacklogAge.AsDuration()) }