From da5eec8d43d9e725120d9f2f66540a5c566b4d38 Mon Sep 17 00:00:00 2001 From: Spike Curtis Date: Mon, 17 Nov 2025 12:24:50 +0000 Subject: [PATCH] fix: wait for build in task status load generator --- scaletest/taskstatus/client.go | 61 +++---- scaletest/taskstatus/run.go | 95 ++++++++++- scaletest/taskstatus/run_internal_test.go | 188 ++++++++++++++++++++-- 3 files changed, 283 insertions(+), 61 deletions(-) diff --git a/scaletest/taskstatus/client.go b/scaletest/taskstatus/client.go index badc6856f5140..d60f20ab8be07 100644 --- a/scaletest/taskstatus/client.go +++ b/scaletest/taskstatus/client.go @@ -11,14 +11,9 @@ import ( "cdr.dev/slog" "github.com/coder/coder/v2/codersdk" "github.com/coder/coder/v2/codersdk/agentsdk" + "github.com/coder/quartz" ) -// createExternalWorkspaceResult contains the results from creating an external workspace. -type createExternalWorkspaceResult struct { - WorkspaceID uuid.UUID - AgentToken string -} - // client abstracts the details of using codersdk.Client for workspace operations. // This interface allows for easier testing by enabling mock implementations and // provides a cleaner separation of concerns. @@ -27,9 +22,14 @@ type createExternalWorkspaceResult struct { // 1. Create the client with newClient(coderClient) // 2. Configure logging when the io.Writer is available in Run() type client interface { - // createExternalWorkspace creates an external workspace and returns the workspace ID - // and agent token for the first external agent found in the workspace resources. - createExternalWorkspace(ctx context.Context, req codersdk.CreateWorkspaceRequest) (createExternalWorkspaceResult, error) + // CreateUserWorkspace creates a workspace for a user. + CreateUserWorkspace(ctx context.Context, userID string, req codersdk.CreateWorkspaceRequest) (codersdk.Workspace, error) + + // WorkspaceByOwnerAndName retrieves a workspace by owner and name. + WorkspaceByOwnerAndName(ctx context.Context, owner string, name string, params codersdk.WorkspaceOptions) (codersdk.Workspace, error) + + // WorkspaceExternalAgentCredentials retrieves credentials for an external agent. + WorkspaceExternalAgentCredentials(ctx context.Context, workspaceID uuid.UUID, agentName string) (codersdk.ExternalAgentCredentials, error) // watchWorkspace watches for updates to a workspace. watchWorkspace(ctx context.Context, workspaceID uuid.UUID) (<-chan codersdk.Workspace, error) @@ -56,48 +56,28 @@ type appStatusPatcher interface { // codersdk.Client. type sdkClient struct { coderClient *codersdk.Client + clock quartz.Clock + logger slog.Logger } // newClient creates a new client implementation using the provided codersdk.Client. func newClient(coderClient *codersdk.Client) client { return &sdkClient{ coderClient: coderClient, + clock: quartz.NewReal(), } } -func (c *sdkClient) createExternalWorkspace(ctx context.Context, req codersdk.CreateWorkspaceRequest) (createExternalWorkspaceResult, error) { - // Create the workspace - workspace, err := c.coderClient.CreateUserWorkspace(ctx, codersdk.Me, req) - if err != nil { - return createExternalWorkspaceResult{}, err - } - - // Get the workspace with latest build details - workspace, err = c.coderClient.WorkspaceByOwnerAndName(ctx, codersdk.Me, workspace.Name, codersdk.WorkspaceOptions{}) - if err != nil { - return createExternalWorkspaceResult{}, err - } +func (c *sdkClient) CreateUserWorkspace(ctx context.Context, userID string, req codersdk.CreateWorkspaceRequest) (codersdk.Workspace, error) { + return c.coderClient.CreateUserWorkspace(ctx, userID, req) +} - // Find external agents in resources - for _, resource := range workspace.LatestBuild.Resources { - if resource.Type != "coder_external_agent" || len(resource.Agents) == 0 { - continue - } - - // Get credentials for the first agent - agent := resource.Agents[0] - credentials, err := c.coderClient.WorkspaceExternalAgentCredentials(ctx, workspace.ID, agent.Name) - if err != nil { - return createExternalWorkspaceResult{}, err - } - - return createExternalWorkspaceResult{ - WorkspaceID: workspace.ID, - AgentToken: credentials.AgentToken, - }, nil - } +func (c *sdkClient) WorkspaceByOwnerAndName(ctx context.Context, owner string, name string, params codersdk.WorkspaceOptions) (codersdk.Workspace, error) { + return c.coderClient.WorkspaceByOwnerAndName(ctx, owner, name, params) +} - return createExternalWorkspaceResult{}, xerrors.Errorf("no external agent found in workspace") +func (c *sdkClient) WorkspaceExternalAgentCredentials(ctx context.Context, workspaceID uuid.UUID, agentName string) (codersdk.ExternalAgentCredentials, error) { + return c.coderClient.WorkspaceExternalAgentCredentials(ctx, workspaceID, agentName) } func (c *sdkClient) watchWorkspace(ctx context.Context, workspaceID uuid.UUID) (<-chan codersdk.Workspace, error) { @@ -118,6 +98,7 @@ func (c *sdkClient) deleteWorkspace(ctx context.Context, workspaceID uuid.UUID) func (c *sdkClient) initialize(logger slog.Logger) { // Configure the coder client logging + c.logger = logger c.coderClient.SetLogger(logger) c.coderClient.SetLogBodies(true) } diff --git a/scaletest/taskstatus/run.go b/scaletest/taskstatus/run.go index 018ec72afedf0..87f0cbedd3b29 100644 --- a/scaletest/taskstatus/run.go +++ b/scaletest/taskstatus/run.go @@ -23,6 +23,12 @@ import ( const statusUpdatePrefix = "scaletest status update:" +// createExternalWorkspaceResult contains the results from creating an external workspace. +type createExternalWorkspaceResult struct { + workspaceID uuid.UUID + agentToken string +} + type Runner struct { client client patcher appStatusPatcher @@ -65,6 +71,10 @@ func (r *Runner) Run(ctx context.Context, name string, logs io.Writer) error { } }() + // ensure these labels are initialized, so we see the time series right away in prometheus. + r.cfg.Metrics.MissingStatusUpdatesTotal.WithLabelValues(r.cfg.MetricLabelValues...).Add(0) + r.cfg.Metrics.ReportTaskStatusErrorsTotal.WithLabelValues(r.cfg.MetricLabelValues...).Add(0) + logs = loadtestutil.NewSyncWriter(logs) r.logger = slog.Make(sloghuman.Sink(logs)).Leveled(slog.LevelDebug).Named(name) r.client.initialize(r.logger) @@ -74,26 +84,23 @@ func (r *Runner) Run(ctx context.Context, name string, logs io.Writer) error { slog.F("template_id", r.cfg.TemplateID), slog.F("workspace_name", r.cfg.WorkspaceName)) - result, err := r.client.createExternalWorkspace(ctx, codersdk.CreateWorkspaceRequest{ + result, err := r.createExternalWorkspace(ctx, codersdk.CreateWorkspaceRequest{ TemplateID: r.cfg.TemplateID, Name: r.cfg.WorkspaceName, }) if err != nil { + r.cfg.Metrics.ReportTaskStatusErrorsTotal.WithLabelValues(r.cfg.MetricLabelValues...).Inc() return xerrors.Errorf("create external workspace: %w", err) } // Set the workspace ID - r.workspaceID = result.WorkspaceID + r.workspaceID = result.workspaceID r.logger.Info(ctx, "created external workspace", slog.F("workspace_id", r.workspaceID)) // Initialize the patcher with the agent token - r.patcher.initialize(r.logger, result.AgentToken) + r.patcher.initialize(r.logger, result.agentToken) r.logger.Info(ctx, "initialized app status patcher with agent token") - // ensure these labels are initialized, so we see the time series right away in prometheus. - r.cfg.Metrics.MissingStatusUpdatesTotal.WithLabelValues(r.cfg.MetricLabelValues...).Add(0) - r.cfg.Metrics.ReportTaskStatusErrorsTotal.WithLabelValues(r.cfg.MetricLabelValues...).Add(0) - workspaceUpdatesCtx, cancelWorkspaceUpdates := context.WithCancel(ctx) defer cancelWorkspaceUpdates() workspaceUpdatesResult := make(chan error, 1) @@ -257,3 +264,77 @@ func parseStatusMessage(message string) (int, bool) { } return msgNo, true } + +// createExternalWorkspace creates an external workspace and returns the workspace ID +// and agent token for the first external agent found in the workspace resources. +func (r *Runner) createExternalWorkspace(ctx context.Context, req codersdk.CreateWorkspaceRequest) (createExternalWorkspaceResult, error) { + // Create the workspace + workspace, err := r.client.CreateUserWorkspace(ctx, codersdk.Me, req) + if err != nil { + return createExternalWorkspaceResult{}, err + } + + r.logger.Info(ctx, "waiting for workspace build to complete", + slog.F("workspace_name", workspace.Name), + slog.F("workspace_id", workspace.ID)) + + // Poll the workspace until the build is complete + var finalWorkspace codersdk.Workspace + buildComplete := xerrors.New("build complete") // sentinel error + waiter := r.clock.TickerFunc(ctx, 30*time.Second, func() error { + // Get the workspace with latest build details + workspace, err := r.client.WorkspaceByOwnerAndName(ctx, codersdk.Me, workspace.Name, codersdk.WorkspaceOptions{}) + if err != nil { + r.logger.Error(ctx, "failed to poll workspace while waiting for build to complete", slog.Error(err)) + return nil + } + + jobStatus := workspace.LatestBuild.Job.Status + r.logger.Debug(ctx, "checking workspace build status", + slog.F("status", jobStatus), + slog.F("build_id", workspace.LatestBuild.ID)) + + switch jobStatus { + case codersdk.ProvisionerJobSucceeded: + // Build succeeded + r.logger.Info(ctx, "workspace build succeeded") + finalWorkspace = workspace + return buildComplete + case codersdk.ProvisionerJobFailed: + return xerrors.Errorf("workspace build failed: %s", workspace.LatestBuild.Job.Error) + case codersdk.ProvisionerJobCanceled: + return xerrors.Errorf("workspace build was canceled") + case codersdk.ProvisionerJobPending, codersdk.ProvisionerJobRunning, codersdk.ProvisionerJobCanceling: + // Still in progress, continue polling + return nil + default: + return xerrors.Errorf("unexpected job status: %s", jobStatus) + } + }, "createExternalWorkspace") + + err = waiter.Wait() + if err != nil && !xerrors.Is(err, buildComplete) { + return createExternalWorkspaceResult{}, xerrors.Errorf("wait for build completion: %w", err) + } + + // Find external agents in resources + for _, resource := range finalWorkspace.LatestBuild.Resources { + if resource.Type != "coder_external_agent" || len(resource.Agents) == 0 { + continue + } + + // Get credentials for the first agent + agent := resource.Agents[0] + credentials, err := r.client.WorkspaceExternalAgentCredentials(ctx, finalWorkspace.ID, agent.Name) + if err != nil { + return createExternalWorkspaceResult{}, err + } + + return createExternalWorkspaceResult{ + workspaceID: finalWorkspace.ID, + agentToken: credentials.AgentToken, + }, nil + } + + return createExternalWorkspaceResult{}, xerrors.Errorf("no external agent found in workspace") +} diff --git a/scaletest/taskstatus/run_internal_test.go b/scaletest/taskstatus/run_internal_test.go index fe5cdda04746b..7a82d4c6b2ad3 100644 --- a/scaletest/taskstatus/run_internal_test.go +++ b/scaletest/taskstatus/run_internal_test.go @@ -28,13 +28,17 @@ type fakeClient struct { logger slog.Logger // Channels for controlling the behavior - workspaceUpdatesCh chan codersdk.Workspace + workspaceUpdatesCh chan codersdk.Workspace + workspaceByOwnerAndNameStatus chan codersdk.ProvisionerJobStatus + workspaceByOwnerAndNameErrors chan error } func newFakeClient(t *testing.T) *fakeClient { return &fakeClient{ - t: t, - workspaceUpdatesCh: make(chan codersdk.Workspace), + t: t, + workspaceUpdatesCh: make(chan codersdk.Workspace), + workspaceByOwnerAndNameStatus: make(chan codersdk.ProvisionerJobStatus), + workspaceByOwnerAndNameErrors: make(chan error, 1), } } @@ -47,14 +51,62 @@ func (m *fakeClient) watchWorkspace(ctx context.Context, workspaceID uuid.UUID) return m.workspaceUpdatesCh, nil } -const testAgentToken = "test-agent-token" +const ( + testAgentToken = "test-agent-token" + testAgentName = "test-agent" + testWorkspaceName = "test-workspace" +) + +var ( + testWorkspaceID = uuid.UUID{1, 2, 3, 4} + testBuildID = uuid.UUID{5, 6, 7, 8} +) -func (m *fakeClient) createExternalWorkspace(ctx context.Context, req codersdk.CreateWorkspaceRequest) (createExternalWorkspaceResult, error) { - m.logger.Debug(ctx, "called fake CreateExternalWorkspace", slog.F("req", req)) - // Return a fake workspace ID and token for testing - return createExternalWorkspaceResult{ - WorkspaceID: uuid.UUID{1, 2, 3, 4}, // Fake workspace ID - AgentToken: testAgentToken, +func workspaceWithJobStatus(status codersdk.ProvisionerJobStatus) codersdk.Workspace { + return codersdk.Workspace{ + ID: testWorkspaceID, // Fake workspace ID + Name: testWorkspaceName, + LatestBuild: codersdk.WorkspaceBuild{ + ID: testBuildID, + Job: codersdk.ProvisionerJob{ + Status: status, + }, + Resources: []codersdk.WorkspaceResource{ + { + Type: "coder_external_agent", + Agents: []codersdk.WorkspaceAgent{ + { + Name: testAgentName, + }, + }, + }, + }, + }, + } +} + +func (m *fakeClient) CreateUserWorkspace(ctx context.Context, userID string, req codersdk.CreateWorkspaceRequest) (codersdk.Workspace, error) { + m.logger.Debug(ctx, "called fake CreateUserWorkspace", slog.F("user_id", userID), slog.F("req", req)) + return workspaceWithJobStatus(codersdk.ProvisionerJobPending), nil +} + +func (m *fakeClient) WorkspaceByOwnerAndName(ctx context.Context, owner string, name string, params codersdk.WorkspaceOptions) (codersdk.Workspace, error) { + m.logger.Debug(ctx, "called fake WorkspaceByOwnerAndName", slog.F("owner", owner), slog.F("name", name)) + status := <-m.workspaceByOwnerAndNameStatus + var err error + select { + case err = <-m.workspaceByOwnerAndNameErrors: + return codersdk.Workspace{}, err + default: + return workspaceWithJobStatus(status), nil + } +} + +func (m *fakeClient) WorkspaceExternalAgentCredentials(ctx context.Context, workspaceID uuid.UUID, agentName string) (codersdk.ExternalAgentCredentials, error) { + m.logger.Debug(ctx, "called fake WorkspaceExternalAgentCredentials", slog.F("workspace_id", workspaceID), slog.F("agent_name", agentName)) + // Return fake credentials for testing + return codersdk.ExternalAgentCredentials{ + AgentToken: testAgentToken, }, nil } @@ -145,10 +197,12 @@ func TestRunner_Run(t *testing.T) { reportTimes: make(map[int]time.Time), } - tickerTrap := mClock.Trap().TickerFunc("reportTaskStatus") - defer tickerTrap.Close() + reportTickerTrap := mClock.Trap().TickerFunc("reportTaskStatus") + defer reportTickerTrap.Close() sinceTrap := mClock.Trap().Since("watchWorkspaceUpdates") defer sinceTrap.Close() + buildTickerTrap := mClock.Trap().TickerFunc("createExternalWorkspace") + defer buildTickerTrap.Close() // Run the runner in a goroutine runErr := make(chan error, 1) @@ -156,6 +210,12 @@ func TestRunner_Run(t *testing.T) { runErr <- runner.Run(ctx, "test-runner", testutil.NewTestLogWriter(t)) }() + // complete the build + buildTickerTrap.MustWait(ctx).MustRelease(ctx) + w := mClock.Advance(30 * time.Second) + testutil.RequireSend(ctx, t, fClient.workspaceByOwnerAndNameStatus, codersdk.ProvisionerJobSucceeded) + w.MustWait(ctx) + // Wait for the runner to connect and watch workspace connectedWaitGroup.Wait() @@ -163,7 +223,7 @@ func TestRunner_Run(t *testing.T) { close(startReporting) // Wait for the initial TickerFunc call before advancing time, otherwise our ticks will be off. - tickerTrap.MustWait(ctx).MustRelease(ctx) + reportTickerTrap.MustWait(ctx).MustRelease(ctx) // at this point, the patcher must be initialized require.Equal(t, testAgentToken, fPatcher.agentToken) @@ -263,6 +323,8 @@ func TestRunner_RunMissedUpdate(t *testing.T) { defer tickerTrap.Close() sinceTrap := mClock.Trap().Since("watchWorkspaceUpdates") defer sinceTrap.Close() + buildTickerTrap := mClock.Trap().TickerFunc("createExternalWorkspace") + defer buildTickerTrap.Close() // Run the runner in a goroutine runErr := make(chan error, 1) @@ -270,6 +332,12 @@ func TestRunner_RunMissedUpdate(t *testing.T) { runErr <- runner.Run(runCtx, "test-runner", testutil.NewTestLogWriter(t)) }() + // complete the build + buildTickerTrap.MustWait(testCtx).MustRelease(testCtx) + w := mClock.Advance(30 * time.Second) + testutil.RequireSend(testCtx, t, fClient.workspaceByOwnerAndNameStatus, codersdk.ProvisionerJobSucceeded) + w.MustWait(testCtx) + // Wait for the runner to connect and watch workspace connectedWaitGroup.Wait() @@ -378,13 +446,20 @@ func TestRunner_Run_WithErrors(t *testing.T) { tickerTrap := mClock.Trap().TickerFunc("reportTaskStatus") defer tickerTrap.Close() - + buildTickerTrap := mClock.Trap().TickerFunc("createExternalWorkspace") + defer buildTickerTrap.Close() // Run the runner in a goroutine runErr := make(chan error, 1) go func() { runErr <- runner.Run(runCtx, "test-runner", testutil.NewTestLogWriter(t)) }() + // complete the build + buildTickerTrap.MustWait(testCtx).MustRelease(testCtx) + w := mClock.Advance(30 * time.Second) + testutil.RequireSend(testCtx, t, fClient.workspaceByOwnerAndNameStatus, codersdk.ProvisionerJobSucceeded) + w.MustWait(testCtx) + connectedWaitGroup.Wait() close(startReporting) @@ -430,6 +505,91 @@ func TestRunner_Run_WithErrors(t *testing.T) { assert.True(t, reportTaskStatusErrorsFound, "report task status errors metric not found") } +func TestRunner_Run_BuildFailed(t *testing.T) { + t.Parallel() + + testCtx := testutil.Context(t, testutil.WaitShort) + runCtx, cancel := context.WithCancel(testCtx) + defer cancel() + + mClock := quartz.NewMock(t) + fClient := newFakeClient(t) + fPatcher := newFakeAppStatusPatcher(t) + templateID := uuid.UUID{5, 6, 7, 8} + workspaceName := "test-workspace" + appSlug := "test-app" + + reg := prometheus.NewRegistry() + metrics := NewMetrics(reg, "test") + + connectedWaitGroup := &sync.WaitGroup{} + connectedWaitGroup.Add(1) + startReporting := make(chan struct{}) + + cfg := Config{ + TemplateID: templateID, + WorkspaceName: workspaceName, + AppSlug: appSlug, + ConnectedWaitGroup: connectedWaitGroup, + StartReporting: startReporting, + ReportStatusPeriod: 10 * time.Second, + ReportStatusDuration: 35 * time.Second, + Metrics: metrics, + MetricLabelValues: []string{"test"}, + } + runner := &Runner{ + client: fClient, + patcher: fPatcher, + cfg: cfg, + clock: mClock, + reportTimes: make(map[int]time.Time), + } + + buildTickerTrap := mClock.Trap().TickerFunc("createExternalWorkspace") + defer buildTickerTrap.Close() + // Run the runner in a goroutine + runErr := make(chan error, 1) + go func() { + runErr <- runner.Run(runCtx, "test-runner", testutil.NewTestLogWriter(t)) + }() + + // complete the build + buildTickerTrap.MustWait(testCtx).MustRelease(testCtx) + w := mClock.Advance(30 * time.Second) + testutil.RequireSend(testCtx, t, fClient.workspaceByOwnerAndNameStatus, codersdk.ProvisionerJobFailed) + w.MustWait(testCtx) + + connectedWaitGroup.Wait() + + // Wait for the runner to complete + err := testutil.RequireReceive(testCtx, t, runErr) + require.ErrorContains(t, err, "workspace build failed") + + // Verify metrics were updated correctly + metricFamilies, err := reg.Gather() + require.NoError(t, err) + + var missingUpdatesFound bool + var reportTaskStatusErrorsFound bool + for _, mf := range metricFamilies { + switch mf.GetName() { + case "coderd_scaletest_missing_status_updates_total": + missingUpdatesFound = true + require.Len(t, mf.GetMetric(), 1) + counter := mf.GetMetric()[0].GetCounter() + assert.Equal(t, float64(0), counter.GetValue()) + case "coderd_scaletest_report_task_status_errors_total": + reportTaskStatusErrorsFound = true + require.Len(t, mf.GetMetric(), 1) + counter := mf.GetMetric()[0].GetCounter() + assert.Equal(t, float64(1), counter.GetValue()) + } + } + + assert.True(t, missingUpdatesFound, "missing updates metric not found") + assert.True(t, reportTaskStatusErrorsFound, "report task status errors metric not found") +} + func TestParseStatusMessage(t *testing.T) { t.Parallel()