diff --git a/scaletest/taskstatus/client.go b/scaletest/taskstatus/client.go
new file mode 100644
index 0000000000000..f4271f4f31a33
--- /dev/null
+++ b/scaletest/taskstatus/client.go
@@ -0,0 +1,148 @@
+package taskstatus
+
+import (
+	"context"
+	"net/http"
+	"net/url"
+
+	"github.com/google/uuid"
+	"golang.org/x/xerrors"
+
+	"cdr.dev/slog"
+	"github.com/coder/coder/v2/codersdk"
+	"github.com/coder/coder/v2/codersdk/agentsdk"
+)
+
+// createExternalWorkspaceResult contains the results from creating an external workspace.
+type createExternalWorkspaceResult struct {
+	WorkspaceID uuid.UUID
+	AgentToken  string
+}
+
+// client abstracts the details of using codersdk.Client for workspace operations.
+// This interface allows for easier testing by enabling mock implementations and
+// provides a cleaner separation of concerns.
+//
+// The interface is designed to be initialized in two phases:
+//  1. Create the client with newClient(coderClient)
+//  2. Configure logging when the io.Writer is available in Run()
+type client interface {
+	// createExternalWorkspace creates an external workspace and returns the workspace ID
+	// and agent token for the first external agent found in the workspace resources.
+	createExternalWorkspace(ctx context.Context, req codersdk.CreateWorkspaceRequest) (createExternalWorkspaceResult, error)
+
+	// watchWorkspace watches for updates to a workspace.
+	watchWorkspace(ctx context.Context, workspaceID uuid.UUID) (<-chan codersdk.Workspace, error)
+
+	// initialize sets up the client with the provided logger, which is only available after Run() is called.
+	initialize(logger slog.Logger)
+}
+
+// appStatusPatcher abstracts the details of using agentsdk.Client for updating app status.
+// This interface is separate from client because it requires an agent token, which is only
+// available after creating an external workspace.
+type appStatusPatcher interface {
+	// patchAppStatus updates the status of a workspace app.
+	patchAppStatus(ctx context.Context, req agentsdk.PatchAppStatus) error
+
+	// initialize sets up the patcher with the provided logger and agent token.
+	initialize(logger slog.Logger, agentToken string)
+}
+
+// sdkClient is the concrete implementation of the client interface using
+// codersdk.Client.
+type sdkClient struct {
+	coderClient *codersdk.Client
+}
+
+// newClient creates a new client implementation using the provided codersdk.Client.
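+//
+// A minimal construction sketch (the URL and token names here are
+// illustrative; any authenticated *codersdk.Client works):
+//
+//	coderClient := codersdk.New(serverURL)
+//	coderClient.SetSessionToken(sessionToken)
+//	c := newClient(coderClient)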
+func newClient(coderClient *codersdk.Client) client {
+	return &sdkClient{
+		coderClient: coderClient,
+	}
+}
+
+func (c *sdkClient) createExternalWorkspace(ctx context.Context, req codersdk.CreateWorkspaceRequest) (createExternalWorkspaceResult, error) {
+	// Create the workspace
+	workspace, err := c.coderClient.CreateUserWorkspace(ctx, codersdk.Me, req)
+	if err != nil {
+		return createExternalWorkspaceResult{}, err
+	}
+
+	// Get the workspace with latest build details
+	workspace, err = c.coderClient.WorkspaceByOwnerAndName(ctx, codersdk.Me, workspace.Name, codersdk.WorkspaceOptions{})
+	if err != nil {
+		return createExternalWorkspaceResult{}, err
+	}
+
+	// Find external agents in resources
+	for _, resource := range workspace.LatestBuild.Resources {
+		if resource.Type != "coder_external_agent" || len(resource.Agents) == 0 {
+			continue
+		}
+
+		// Get credentials for the first agent
+		agent := resource.Agents[0]
+		credentials, err := c.coderClient.WorkspaceExternalAgentCredentials(ctx, workspace.ID, agent.Name)
+		if err != nil {
+			return createExternalWorkspaceResult{}, err
+		}
+
+		return createExternalWorkspaceResult{
+			WorkspaceID: workspace.ID,
+			AgentToken:  credentials.AgentToken,
+		}, nil
+	}
+
+	return createExternalWorkspaceResult{}, xerrors.New("no external agent found in workspace")
+}
+
+func (c *sdkClient) watchWorkspace(ctx context.Context, workspaceID uuid.UUID) (<-chan codersdk.Workspace, error) {
+	return c.coderClient.WatchWorkspace(ctx, workspaceID)
+}
+
+func (c *sdkClient) initialize(logger slog.Logger) {
+	// Configure the coder client logging
+	c.coderClient.SetLogger(logger)
+	c.coderClient.SetLogBodies(true)
+}
+
+// sdkAppStatusPatcher is the concrete implementation of the appStatusPatcher interface
+// using agentsdk.Client.
+type sdkAppStatusPatcher struct {
+	agentClient *agentsdk.Client
+	url         *url.URL
+	httpClient  *http.Client
+}
+
+// newAppStatusPatcher creates a new appStatusPatcher implementation.
+func newAppStatusPatcher(client *codersdk.Client) appStatusPatcher {
+	return &sdkAppStatusPatcher{
+		url:        client.URL,
+		httpClient: client.HTTPClient,
+	}
+}
+
+func (p *sdkAppStatusPatcher) patchAppStatus(ctx context.Context, req agentsdk.PatchAppStatus) error {
+	if p.agentClient == nil {
+		panic("agentClient not initialized - call initialize first")
+	}
+	return p.agentClient.PatchAppStatus(ctx, req)
+}
+
+func (p *sdkAppStatusPatcher) initialize(logger slog.Logger, agentToken string) {
+	// Create and configure the agent client with the provided token
+	p.agentClient = agentsdk.New(
+		p.url,
+		agentsdk.WithFixedToken(agentToken),
+		codersdk.WithHTTPClient(p.httpClient),
+		codersdk.WithLogger(logger),
+		codersdk.WithLogBodies(),
+	)
+}
+
+// Ensure sdkClient implements the client interface.
+var _ client = (*sdkClient)(nil)
+
+// Ensure sdkAppStatusPatcher implements the appStatusPatcher interface.
+var _ appStatusPatcher = (*sdkAppStatusPatcher)(nil)
diff --git a/scaletest/taskstatus/config.go b/scaletest/taskstatus/config.go
new file mode 100644
index 0000000000000..1c3f26cfabfa1
--- /dev/null
+++ b/scaletest/taskstatus/config.go
@@ -0,0 +1,73 @@
+package taskstatus
+
+import (
+	"sync"
+	"time"
+
+	"github.com/google/uuid"
+	"golang.org/x/xerrors"
+)
+
+type Config struct {
+	// TemplateID is the template ID to use for creating the external workspace.
+	TemplateID uuid.UUID `json:"template_id"`
+
+	// WorkspaceName is the name for the external workspace to create.
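+	//
+	// Workspace names are unique per owner, so when many runners share an
+	// owner, each runner needs a distinct name (e.g. "taskstatus-0",
+	// "taskstatus-1"; the naming scheme is illustrative).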
+	WorkspaceName string `json:"workspace_name"`
+
+	// AppSlug is the slug of the app designated as the AI Agent.
+	AppSlug string `json:"app_slug"`
+
+	// When the runner has connected to the watch-ws endpoint, it will call Done once on this wait group. Used to
+	// coordinate multiple runners from the higher layer.
+	ConnectedWaitGroup *sync.WaitGroup `json:"-"`
+
+	// We read from this channel before starting to report task statuses. Used to coordinate multiple runners from
+	// the higher layer.
+	StartReporting chan struct{} `json:"-"`
+
+	// Time between reporting task statuses.
+	ReportStatusPeriod time.Duration `json:"report_status_period"`
+
+	// Total time to report task statuses, starting from when we successfully read from the StartReporting channel.
+	ReportStatusDuration time.Duration `json:"report_status_duration"`
+
+	Metrics           *Metrics `json:"-"`
+	MetricLabelValues []string `json:"metric_label_values"`
+}
+
+func (c *Config) Validate() error {
+	if c.TemplateID == uuid.Nil {
+		return xerrors.Errorf("validate template_id: must not be nil")
+	}
+
+	if c.WorkspaceName == "" {
+		return xerrors.Errorf("validate workspace_name: must not be empty")
+	}
+
+	if c.AppSlug == "" {
+		return xerrors.Errorf("validate app_slug: must not be empty")
+	}
+
+	if c.ConnectedWaitGroup == nil {
+		return xerrors.Errorf("validate connected_wait_group: must not be nil")
+	}
+
+	if c.StartReporting == nil {
+		return xerrors.Errorf("validate start_reporting: must not be nil")
+	}
+
+	if c.ReportStatusPeriod <= 0 {
+		return xerrors.Errorf("validate report_status_period: must be greater than zero")
+	}
+
+	if c.ReportStatusDuration <= 0 {
+		return xerrors.Errorf("validate report_status_duration: must be greater than zero")
+	}
+
+	if c.Metrics == nil {
+		return xerrors.Errorf("validate metrics: must not be nil")
+	}
+
+	return nil
+}
diff --git a/scaletest/taskstatus/metrics.go b/scaletest/taskstatus/metrics.go
new file mode 100644
index 0000000000000..1b312a41a3338
--- /dev/null
+++ b/scaletest/taskstatus/metrics.go
@@ -0,0 +1,36 @@
+package taskstatus
+
+import "github.com/prometheus/client_golang/prometheus"
+
+type Metrics struct {
+	TaskStatusToWorkspaceUpdateLatencySeconds prometheus.HistogramVec
+	MissingStatusUpdatesTotal                 prometheus.CounterVec
+	ReportTaskStatusErrorsTotal               prometheus.CounterVec
+}
+
+func NewMetrics(reg prometheus.Registerer, labelNames ...string) *Metrics {
+	m := &Metrics{
+		TaskStatusToWorkspaceUpdateLatencySeconds: *prometheus.NewHistogramVec(prometheus.HistogramOpts{
+			Namespace: "coderd",
+			Subsystem: "scaletest",
+			Name:      "task_status_to_workspace_update_latency_seconds",
+			Help:      "Time in seconds between reporting a task status and receiving the workspace update.",
+		}, labelNames),
+		MissingStatusUpdatesTotal: *prometheus.NewCounterVec(prometheus.CounterOpts{
+			Namespace: "coderd",
+			Subsystem: "scaletest",
+			Name:      "missing_status_updates_total",
+			Help:      "Total number of missing status updates.",
+		}, labelNames),
+		ReportTaskStatusErrorsTotal: *prometheus.NewCounterVec(prometheus.CounterOpts{
+			Namespace: "coderd",
+			Subsystem: "scaletest",
+			Name:      "report_task_status_errors_total",
+			Help:      "Total number of errors when reporting task status.",
+		}, labelNames),
+	}
+	reg.MustRegister(m.TaskStatusToWorkspaceUpdateLatencySeconds)
+	reg.MustRegister(m.MissingStatusUpdatesTotal)
+	reg.MustRegister(m.ReportTaskStatusErrorsTotal)
+	return m
+}
diff --git a/scaletest/taskstatus/run.go b/scaletest/taskstatus/run.go
new file mode 100644
index 0000000000000..59b11237d6c9e
--- /dev/null
+++ b/scaletest/taskstatus/run.go
@@ -0,0 +1,232 @@
+package taskstatus
+
+import (
+	"context"
+	"io"
+	"strconv"
+	"strings"
+	"sync"
+	"time"
+
+	"github.com/google/uuid"
+	"golang.org/x/xerrors"
+
+	"cdr.dev/slog"
+	"cdr.dev/slog/sloggers/sloghuman"
+
+	"github.com/coder/coder/v2/codersdk"
+	"github.com/coder/coder/v2/codersdk/agentsdk"
+	"github.com/coder/coder/v2/scaletest/harness"
+	"github.com/coder/coder/v2/scaletest/loadtestutil"
+	"github.com/coder/quartz"
+)
+
+const statusUpdatePrefix = "scaletest status update:"
+
+type Runner struct {
+	client  client
+	patcher appStatusPatcher
+	cfg     Config
+
+	logger slog.Logger
+
+	// workspaceID is set after creating the external workspace
+	workspaceID uuid.UUID
+
+	mu            sync.Mutex
+	reportTimes   map[int]time.Time
+	doneReporting bool
+
+	// testing only
+	clock quartz.Clock
+}
+
+var _ harness.Runnable = &Runner{}
+
+// NewRunner creates a new Runner with the provided codersdk.Client and configuration.
+func NewRunner(coderClient *codersdk.Client, cfg Config) *Runner {
+	return &Runner{
+		client:      newClient(coderClient),
+		patcher:     newAppStatusPatcher(coderClient),
+		cfg:         cfg,
+		clock:       quartz.NewReal(),
+		reportTimes: make(map[int]time.Time),
+	}
+}
+
+func (r *Runner) Run(ctx context.Context, name string, logs io.Writer) error {
+	shouldMarkConnectedDone := true
+	defer func() {
+		if shouldMarkConnectedDone {
+			r.cfg.ConnectedWaitGroup.Done()
+		}
+	}()
+
+	logs = loadtestutil.NewSyncWriter(logs)
+	r.logger = slog.Make(sloghuman.Sink(logs)).Leveled(slog.LevelDebug).Named(name)
+	r.client.initialize(r.logger)
+
+	// Create the external workspace
+	r.logger.Info(ctx, "creating external workspace",
+		slog.F("template_id", r.cfg.TemplateID),
+		slog.F("workspace_name", r.cfg.WorkspaceName))
+
+	result, err := r.client.createExternalWorkspace(ctx, codersdk.CreateWorkspaceRequest{
+		TemplateID: r.cfg.TemplateID,
+		Name:       r.cfg.WorkspaceName,
+	})
+	if err != nil {
+		return xerrors.Errorf("create external workspace: %w", err)
+	}
+
+	// Set the workspace ID
+	r.workspaceID = result.WorkspaceID
+	r.logger.Info(ctx, "created external workspace", slog.F("workspace_id", r.workspaceID))
+
+	// Initialize the patcher with the agent token
+	r.patcher.initialize(r.logger, result.AgentToken)
+	r.logger.Info(ctx, "initialized app status patcher with agent token")
+
+	// Ensure these labels are initialized, so we see the time series right away in Prometheus.
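+	// Calling Add(0) on a CounterVec child creates the labeled time series
+	// immediately, so scrapes expose an explicit 0 sample instead of no
+	// series at all until the first real increment.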
+	r.cfg.Metrics.MissingStatusUpdatesTotal.WithLabelValues(r.cfg.MetricLabelValues...).Add(0)
+	r.cfg.Metrics.ReportTaskStatusErrorsTotal.WithLabelValues(r.cfg.MetricLabelValues...).Add(0)
+
+	workspaceUpdatesCtx, cancelWorkspaceUpdates := context.WithCancel(ctx)
+	defer cancelWorkspaceUpdates()
+	workspaceUpdatesResult := make(chan error, 1)
+	shouldMarkConnectedDone = false // we are passing this responsibility to the watchWorkspaceUpdates goroutine
+	go func() {
+		workspaceUpdatesResult <- r.watchWorkspaceUpdates(workspaceUpdatesCtx)
+	}()
+
+	err = r.reportTaskStatus(ctx)
+	if err != nil {
+		return xerrors.Errorf("report task status: %w", err)
+	}
+
+	err = <-workspaceUpdatesResult
+	if err != nil {
+		return xerrors.Errorf("watch workspace: %w", err)
+	}
+	return nil
+}
+
+func (r *Runner) watchWorkspaceUpdates(ctx context.Context) error {
+	shouldMarkConnectedDone := true
+	defer func() {
+		if shouldMarkConnectedDone {
+			r.cfg.ConnectedWaitGroup.Done()
+		}
+	}()
+	updates, err := r.client.watchWorkspace(ctx, r.workspaceID)
+	if err != nil {
+		return xerrors.Errorf("watch workspace: %w", err)
+	}
+	shouldMarkConnectedDone = false
+	r.cfg.ConnectedWaitGroup.Done()
+	defer func() {
+		r.mu.Lock()
+		defer r.mu.Unlock()
+		r.cfg.Metrics.MissingStatusUpdatesTotal.
+			WithLabelValues(r.cfg.MetricLabelValues...).
+			Add(float64(len(r.reportTimes)))
+	}()
+	for {
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		case workspace := <-updates:
+			if workspace.LatestAppStatus == nil {
+				continue
+			}
+			msgNo, ok := parseStatusMessage(workspace.LatestAppStatus.Message)
+			if !ok {
+				continue
+			}
+
+			r.mu.Lock()
+			reportTime, ok := r.reportTimes[msgNo]
+			delete(r.reportTimes, msgNo)
+			allDone := r.doneReporting && len(r.reportTimes) == 0
+			r.mu.Unlock()
+
+			if !ok {
+				return xerrors.Errorf("report time not found for message %d", msgNo)
+			}
+			latency := r.clock.Since(reportTime, "watchWorkspaceUpdates")
+			r.cfg.Metrics.TaskStatusToWorkspaceUpdateLatencySeconds.
+				WithLabelValues(r.cfg.MetricLabelValues...).
+				Observe(latency.Seconds())
+			if allDone {
+				return nil
+			}
+		}
+	}
+}
+
+func (r *Runner) reportTaskStatus(ctx context.Context) error {
+	defer func() {
+		r.mu.Lock()
+		defer r.mu.Unlock()
+		r.doneReporting = true
+	}()
+
+	select {
+	case <-ctx.Done():
+		return ctx.Err()
+	case <-r.cfg.StartReporting:
+		r.logger.Info(ctx, "starting to report task status")
+	}
+	startedReporting := r.clock.Now("reportTaskStatus", "startedReporting")
+	msgNo := 0
+
+	done := xerrors.New("done reporting task status") // sentinel error
+	waiter := r.clock.TickerFunc(ctx, r.cfg.ReportStatusPeriod, func() error {
+		r.mu.Lock()
+		now := r.clock.Now("reportTaskStatus", "tick")
+		r.reportTimes[msgNo] = now
+		// It's important that we set doneReporting along with a final report, since the watchWorkspaceUpdates
+		// goroutine needs an update to wake up and check whether we're done. We could introduce a secondary
+		// signaling channel, but it would add a lot of complexity and be hard to test. We expect the tick period
+		// to be much smaller than the report status duration, so one extra tick is not a big deal.
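+		// For example, with ReportStatusPeriod=10s and ReportStatusDuration=35s,
+		// ticks fire at t=10s, 20s, 30s and 40s; the t=40s report is the final
+		// one, because it is the first tick after the 35s window has elapsed.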
+		if now.After(startedReporting.Add(r.cfg.ReportStatusDuration)) {
+			r.doneReporting = true
+		}
+		r.mu.Unlock()
+
+		err := r.patcher.patchAppStatus(ctx, agentsdk.PatchAppStatus{
+			AppSlug: r.cfg.AppSlug,
+			Message: statusUpdatePrefix + strconv.Itoa(msgNo),
+			State:   codersdk.WorkspaceAppStatusStateWorking,
+			URI:     "https://example.com/example-status/",
+		})
+		if err != nil {
+			r.logger.Error(ctx, "failed to report task status", slog.Error(err))
+			r.cfg.Metrics.ReportTaskStatusErrorsTotal.WithLabelValues(r.cfg.MetricLabelValues...).Inc()
+		}
+		msgNo++
+		// note that it's safe to read r.doneReporting here without a lock because we're the only goroutine that
+		// sets it.
+		if r.doneReporting {
+			return done // causes the ticker to exit due to the sentinel error
+		}
+		return nil
+	}, "reportTaskStatus")
+	err := waiter.Wait()
+	if xerrors.Is(err, done) {
+		return nil
+	}
+	return err
+}
+
+func parseStatusMessage(message string) (int, bool) {
+	if !strings.HasPrefix(message, statusUpdatePrefix) {
+		return 0, false
+	}
+	message = strings.TrimPrefix(message, statusUpdatePrefix)
+	msgNo, err := strconv.Atoi(message)
+	if err != nil {
+		return 0, false
+	}
+	return msgNo, true
+}
diff --git a/scaletest/taskstatus/run_internal_test.go b/scaletest/taskstatus/run_internal_test.go
new file mode 100644
index 0000000000000..3821b794aeb2c
--- /dev/null
+++ b/scaletest/taskstatus/run_internal_test.go
@@ -0,0 +1,482 @@
+package taskstatus
+
+import (
+	"context"
+	"fmt"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/google/uuid"
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+	"golang.org/x/xerrors"
+
+	"cdr.dev/slog"
+	"github.com/coder/quartz"
+
+	"github.com/coder/coder/v2/codersdk"
+	"github.com/coder/coder/v2/codersdk/agentsdk"
+	"github.com/coder/coder/v2/testutil"
+)
+
+// fakeClient implements the client interface for testing
+type fakeClient struct {
+	t      *testing.T
+	logger slog.Logger
+
+	// Channels for controlling the behavior
+	workspaceUpdatesCh chan codersdk.Workspace
+}
+
+func newFakeClient(t *testing.T) *fakeClient {
+	return &fakeClient{
+		t:                  t,
+		workspaceUpdatesCh: make(chan codersdk.Workspace),
+	}
+}
+
+func (m *fakeClient) initialize(logger slog.Logger) {
+	m.logger = logger
+}
+
+func (m *fakeClient) watchWorkspace(ctx context.Context, workspaceID uuid.UUID) (<-chan codersdk.Workspace, error) {
+	m.logger.Debug(ctx, "called fake WatchWorkspace", slog.F("workspace_id", workspaceID.String()))
+	return m.workspaceUpdatesCh, nil
+}
+
+const testAgentToken = "test-agent-token"
+
+func (m *fakeClient) createExternalWorkspace(ctx context.Context, req codersdk.CreateWorkspaceRequest) (createExternalWorkspaceResult, error) {
+	m.logger.Debug(ctx, "called fake CreateExternalWorkspace", slog.F("req", req))
+	// Return a fake workspace ID and token for testing
+	return createExternalWorkspaceResult{
+		WorkspaceID: uuid.UUID{1, 2, 3, 4}, // Fake workspace ID
+		AgentToken:  testAgentToken,
+	}, nil
+}
+
+// fakeAppStatusPatcher implements the appStatusPatcher interface for testing
+type fakeAppStatusPatcher struct {
+	t          *testing.T
+	logger     slog.Logger
+	agentToken string
+
+	// Channels for controlling the behavior
+	patchStatusCalls  chan agentsdk.PatchAppStatus
+	patchStatusErrors chan error
+}
+
+func newFakeAppStatusPatcher(t *testing.T) *fakeAppStatusPatcher {
+	return &fakeAppStatusPatcher{
+		t:                 t,
+		patchStatusCalls:  make(chan agentsdk.PatchAppStatus),
+		patchStatusErrors: make(chan error, 1),
+	}
+}
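+
+// The fake's channels are deliberately asymmetric: patchStatusCalls is
+// unbuffered so the test rendezvouses with every patch call, while
+// patchStatusErrors is buffered (capacity 1) so a test can queue one failure
+// ahead of the call it should affect.
+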
+func (p *fakeAppStatusPatcher) initialize(logger slog.Logger, agentToken string) {
+	p.logger = logger
+	p.agentToken = agentToken
+}
+
+func (p *fakeAppStatusPatcher) patchAppStatus(ctx context.Context, req agentsdk.PatchAppStatus) error {
+	assert.NotEmpty(p.t, p.agentToken)
+	p.logger.Debug(ctx, "called fake PatchAppStatus", slog.F("req", req))
+	// Send the request to the channel so tests can verify it
+	select {
+	case p.patchStatusCalls <- req:
+	case <-ctx.Done():
+		return ctx.Err()
+	}
+
+	// Check if there's an error to return
+	select {
+	case err := <-p.patchStatusErrors:
+		return err
+	default:
+		return nil
+	}
+}
+
+func TestRunner_Run(t *testing.T) {
+	t.Parallel()
+
+	ctx := testutil.Context(t, testutil.WaitShort)
+
+	mClock := quartz.NewMock(t)
+	fClient := newFakeClient(t)
+	fPatcher := newFakeAppStatusPatcher(t)
+	templateID := uuid.UUID{5, 6, 7, 8}
+	workspaceName := "test-workspace"
+	appSlug := "test-app"
+
+	reg := prometheus.NewRegistry()
+	metrics := NewMetrics(reg, "test")
+
+	connectedWaitGroup := &sync.WaitGroup{}
+	connectedWaitGroup.Add(1)
+	startReporting := make(chan struct{})
+
+	cfg := Config{
+		TemplateID:           templateID,
+		WorkspaceName:        workspaceName,
+		AppSlug:              appSlug,
+		ConnectedWaitGroup:   connectedWaitGroup,
+		StartReporting:       startReporting,
+		ReportStatusPeriod:   10 * time.Second,
+		ReportStatusDuration: 35 * time.Second,
+		Metrics:              metrics,
+		MetricLabelValues:    []string{"test"},
+	}
+	runner := &Runner{
+		client:      fClient,
+		patcher:     fPatcher,
+		cfg:         cfg,
+		clock:       mClock,
+		reportTimes: make(map[int]time.Time),
+	}
+
+	tickerTrap := mClock.Trap().TickerFunc("reportTaskStatus")
+	defer tickerTrap.Close()
+	sinceTrap := mClock.Trap().Since("watchWorkspaceUpdates")
+	defer sinceTrap.Close()
+
+	// Run the runner in a goroutine
+	runErr := make(chan error, 1)
+	go func() {
+		runErr <- runner.Run(ctx, "test-runner", testutil.NewTestLogWriter(t))
+	}()
+
+	// Wait for the runner to connect and watch workspace
+	connectedWaitGroup.Wait()
+
+	// Signal to start reporting
+	close(startReporting)
+
+	// Wait for the initial TickerFunc call before advancing time, otherwise our ticks will be off.
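+	// (quartz traps intercept the named mock-clock call: MustWait blocks until
+	// the code under test reaches the call, and MustRelease lets it proceed,
+	// so clock advances can be sequenced deterministically.)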
+	tickerTrap.MustWait(ctx).MustRelease(ctx)
+
+	// at this point, the patcher must be initialized
+	require.Equal(t, testAgentToken, fPatcher.agentToken)
+
+	updateDelay := time.Duration(0)
+	for i := 0; i < 4; i++ {
+		tickWaiter := mClock.Advance((10 * time.Second) - updateDelay)
+
+		patchCall := testutil.RequireReceive(ctx, t, fPatcher.patchStatusCalls)
+		require.Equal(t, appSlug, patchCall.AppSlug)
+		require.Equal(t, fmt.Sprintf("scaletest status update:%d", i), patchCall.Message)
+		require.Equal(t, codersdk.WorkspaceAppStatusStateWorking, patchCall.State)
+		tickWaiter.MustWait(ctx)
+
+		// Send workspace update 1, 2, 3, or 4 seconds after the report
+		updateDelay = time.Duration(i+1) * time.Second
+		mClock.Advance(updateDelay)
+
+		workspace := codersdk.Workspace{
+			LatestAppStatus: &codersdk.WorkspaceAppStatus{
+				Message: fmt.Sprintf("scaletest status update:%d", i),
+			},
+		}
+		testutil.RequireSend(ctx, t, fClient.workspaceUpdatesCh, workspace)
+		sinceTrap.MustWait(ctx).MustRelease(ctx)
+	}
+
+	// Wait for the runner to complete
+	err := testutil.RequireReceive(ctx, t, runErr)
+	require.NoError(t, err)
+
+	// Verify metrics were updated correctly
+	metricFamilies, err := reg.Gather()
+	require.NoError(t, err)
+
+	var latencyMetricFound bool
+	var missingUpdatesFound bool
+	for _, mf := range metricFamilies {
+		switch mf.GetName() {
+		case "coderd_scaletest_task_status_to_workspace_update_latency_seconds":
+			latencyMetricFound = true
+			require.Len(t, mf.GetMetric(), 1)
+			hist := mf.GetMetric()[0].GetHistogram()
+			assert.Equal(t, uint64(4), hist.GetSampleCount())
+		case "coderd_scaletest_missing_status_updates_total":
+			missingUpdatesFound = true
+			require.Len(t, mf.GetMetric(), 1)
+			counter := mf.GetMetric()[0].GetCounter()
+			assert.Equal(t, float64(0), counter.GetValue())
+		}
+	}
+	assert.True(t, latencyMetricFound, "latency metric not found")
+	assert.True(t, missingUpdatesFound, "missing updates metric not found")
+}
+
+func TestRunner_RunMissedUpdate(t *testing.T) {
+	t.Parallel()
+
+	testCtx := testutil.Context(t, testutil.WaitShort)
+	runCtx, cancel := context.WithCancel(testCtx)
+	defer cancel()
+
+	mClock := quartz.NewMock(t)
+	fClient := newFakeClient(t)
+	fPatcher := newFakeAppStatusPatcher(t)
+	templateID := uuid.UUID{5, 6, 7, 8}
+	workspaceName := "test-workspace"
+	appSlug := "test-app"
+
+	reg := prometheus.NewRegistry()
+	metrics := NewMetrics(reg, "test")
+
+	connectedWaitGroup := &sync.WaitGroup{}
+	connectedWaitGroup.Add(1)
+	startReporting := make(chan struct{})
+
+	cfg := Config{
+		TemplateID:           templateID,
+		WorkspaceName:        workspaceName,
+		AppSlug:              appSlug,
+		ConnectedWaitGroup:   connectedWaitGroup,
+		StartReporting:       startReporting,
+		ReportStatusPeriod:   10 * time.Second,
+		ReportStatusDuration: 35 * time.Second,
+		Metrics:              metrics,
+		MetricLabelValues:    []string{"test"},
+	}
+	runner := &Runner{
+		client:      fClient,
+		patcher:     fPatcher,
+		cfg:         cfg,
+		clock:       mClock,
+		reportTimes: make(map[int]time.Time),
+	}
+
+	tickerTrap := mClock.Trap().TickerFunc("reportTaskStatus")
+	defer tickerTrap.Close()
+	sinceTrap := mClock.Trap().Since("watchWorkspaceUpdates")
+	defer sinceTrap.Close()
+
+	// Run the runner in a goroutine
+	runErr := make(chan error, 1)
+	go func() {
+		runErr <- runner.Run(runCtx, "test-runner", testutil.NewTestLogWriter(t))
+	}()
+
+	// Wait for the runner to connect and watch workspace
+	connectedWaitGroup.Wait()
+
+	// Signal to start reporting
+	close(startReporting)
+
+	// Wait for the initial TickerFunc call before advancing time, otherwise our ticks will be off.
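+	// (This test mirrors TestRunner_Run, except the update for message 2 is
+	// never delivered: the runner is cancelled at the end, and the withheld
+	// report must show up in the missing-status-updates counter.)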
+	tickerTrap.MustWait(testCtx).MustRelease(testCtx)
+
+	updateDelay := time.Duration(0)
+	for i := 0; i < 4; i++ {
+		tickWaiter := mClock.Advance((10 * time.Second) - updateDelay)
+		patchCall := testutil.RequireReceive(testCtx, t, fPatcher.patchStatusCalls)
+		require.Equal(t, appSlug, patchCall.AppSlug)
+		require.Equal(t, fmt.Sprintf("scaletest status update:%d", i), patchCall.Message)
+		require.Equal(t, codersdk.WorkspaceAppStatusStateWorking, patchCall.State)
+		tickWaiter.MustWait(testCtx)
+
+		// Send workspace update 1, 2, 3, or 4 seconds after the report
+		updateDelay = time.Duration(i+1) * time.Second
+		mClock.Advance(updateDelay)
+
+		workspace := codersdk.Workspace{
+			LatestAppStatus: &codersdk.WorkspaceAppStatus{
+				Message: fmt.Sprintf("scaletest status update:%d", i),
+			},
+		}
+		if i != 2 {
+			// skip the third update, to test that we report missed updates and still complete.
+			testutil.RequireSend(testCtx, t, fClient.workspaceUpdatesCh, workspace)
+			sinceTrap.MustWait(testCtx).MustRelease(testCtx)
+		}
+	}
+
+	// Cancel the run context to simulate the runner being killed.
+	cancel()
+
+	// Wait for the runner to complete
+	err := testutil.RequireReceive(testCtx, t, runErr)
+	require.ErrorIs(t, err, context.Canceled)
+
+	// Verify metrics were updated correctly
+	metricFamilies, err := reg.Gather()
+	require.NoError(t, err)
+
+	// Check that metrics were recorded
+	var latencyMetricFound bool
+	var missingUpdatesFound bool
+	for _, mf := range metricFamilies {
+		switch mf.GetName() {
+		case "coderd_scaletest_task_status_to_workspace_update_latency_seconds":
+			latencyMetricFound = true
+			require.Len(t, mf.GetMetric(), 1)
+			hist := mf.GetMetric()[0].GetHistogram()
+			assert.Equal(t, uint64(3), hist.GetSampleCount())
+		case "coderd_scaletest_missing_status_updates_total":
+			missingUpdatesFound = true
+			require.Len(t, mf.GetMetric(), 1)
+			counter := mf.GetMetric()[0].GetCounter()
+			assert.Equal(t, float64(1), counter.GetValue())
+		}
+	}
+	assert.True(t, latencyMetricFound, "latency metric not found")
+	assert.True(t, missingUpdatesFound, "missing updates metric not found")
+}
+
+func TestRunner_Run_WithErrors(t *testing.T) {
+	t.Parallel()
+
+	testCtx := testutil.Context(t, testutil.WaitShort)
+	runCtx, cancel := context.WithCancel(testCtx)
+	defer cancel()
+
+	mClock := quartz.NewMock(t)
+	fClient := newFakeClient(t)
+	fPatcher := newFakeAppStatusPatcher(t)
+	templateID := uuid.UUID{5, 6, 7, 8}
+	workspaceName := "test-workspace"
+	appSlug := "test-app"
+
+	reg := prometheus.NewRegistry()
+	metrics := NewMetrics(reg, "test")
+
+	connectedWaitGroup := &sync.WaitGroup{}
+	connectedWaitGroup.Add(1)
+	startReporting := make(chan struct{})
+
+	cfg := Config{
+		TemplateID:           templateID,
+		WorkspaceName:        workspaceName,
+		AppSlug:              appSlug,
+		ConnectedWaitGroup:   connectedWaitGroup,
+		StartReporting:       startReporting,
+		ReportStatusPeriod:   10 * time.Second,
+		ReportStatusDuration: 35 * time.Second,
+		Metrics:              metrics,
+		MetricLabelValues:    []string{"test"},
+	}
+	runner := &Runner{
+		client:      fClient,
+		patcher:     fPatcher,
+		cfg:         cfg,
+		clock:       mClock,
+		reportTimes: make(map[int]time.Time),
+	}
+
+	tickerTrap := mClock.Trap().TickerFunc("reportTaskStatus")
+	defer tickerTrap.Close()
+
+	// Run the runner in a goroutine
+	runErr := make(chan error, 1)
+	go func() {
+		runErr <- runner.Run(runCtx, "test-runner", testutil.NewTestLogWriter(t))
+	}()
+
+	connectedWaitGroup.Wait()
+	close(startReporting)
+
+	// Wait for the initial TickerFunc call before advancing time, otherwise our ticks will be off.
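+	// (Every patch call below is made to fail and no workspace updates are
+	// ever delivered, so all four reports should be counted both as report
+	// errors and, once the runner is cancelled, as missing status updates.)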
+	tickerTrap.MustWait(testCtx).MustRelease(testCtx)
+
+	for i := 0; i < 4; i++ {
+		tickWaiter := mClock.Advance(10 * time.Second)
+		testutil.RequireSend(testCtx, t, fPatcher.patchStatusErrors, xerrors.New("a bad thing happened"))
+		_ = testutil.RequireReceive(testCtx, t, fPatcher.patchStatusCalls)
+		tickWaiter.MustWait(testCtx)
+	}
+
+	// Cancel the run context to simulate the runner being killed.
+	cancel()
+
+	// Wait for the runner to complete
+	err := testutil.RequireReceive(testCtx, t, runErr)
+	require.ErrorIs(t, err, context.Canceled)
+
+	// Verify metrics were updated correctly
+	metricFamilies, err := reg.Gather()
+	require.NoError(t, err)
+
+	var missingUpdatesFound bool
+	var reportTaskStatusErrorsFound bool
+	for _, mf := range metricFamilies {
+		switch mf.GetName() {
+		case "coderd_scaletest_missing_status_updates_total":
+			missingUpdatesFound = true
+			require.Len(t, mf.GetMetric(), 1)
+			counter := mf.GetMetric()[0].GetCounter()
+			assert.Equal(t, float64(4), counter.GetValue())
+		case "coderd_scaletest_report_task_status_errors_total":
+			reportTaskStatusErrorsFound = true
+			require.Len(t, mf.GetMetric(), 1)
+			counter := mf.GetMetric()[0].GetCounter()
+			assert.Equal(t, float64(4), counter.GetValue())
+		}
+	}
+
+	assert.True(t, missingUpdatesFound, "missing updates metric not found")
+	assert.True(t, reportTaskStatusErrorsFound, "report task status errors metric not found")
+}
+
+func TestParseStatusMessage(t *testing.T) {
+	t.Parallel()
+
+	tests := []struct {
+		name    string
+		message string
+		wantNum int
+		wantOk  bool
+	}{
+		{
+			name:    "valid message",
+			message: "scaletest status update:42",
+			wantNum: 42,
+			wantOk:  true,
+		},
+		{
+			name:    "valid message zero",
+			message: "scaletest status update:0",
+			wantNum: 0,
+			wantOk:  true,
+		},
+		{
+			name:    "invalid prefix",
+			message: "wrong prefix:42",
+			wantNum: 0,
+			wantOk:  false,
+		},
+		{
+			name:    "invalid number",
+			message: "scaletest status update:abc",
+			wantNum: 0,
+			wantOk:  false,
+		},
+		{
+			name:    "empty message",
+			message: "",
+			wantNum: 0,
+			wantOk:  false,
+		},
+		{
+			name:    "missing number",
+			message: "scaletest status update:",
+			wantNum: 0,
+			wantOk:  false,
+		},
+	}
+
+	for _, tt := range tests {
+		tt := tt
+		t.Run(tt.name, func(t *testing.T) {
+			t.Parallel()
+			gotNum, gotOk := parseStatusMessage(tt.message)
+			assert.Equal(t, tt.wantNum, gotNum)
+			assert.Equal(t, tt.wantOk, gotOk)
+		})
+	}
+}
diff --git a/testutil/logger.go b/testutil/logger.go
index 88b6e20bada51..051d7c9daf1ab 100644
--- a/testutil/logger.go
+++ b/testutil/logger.go
@@ -2,7 +2,9 @@ package testutil
 
 import (
 	"context"
+	"io"
 	"strings"
+	"sync"
 	"testing"
 
 	"github.com/hashicorp/yamux"
@@ -51,3 +53,31 @@ func isQueryCanceledError(err error) bool {
 	}
 	return false
 }
+
+type testLogWriter struct {
+	t        testing.TB
+	mu       sync.Mutex
+	testOver bool
+}
+
+// NewTestLogWriter returns an io.Writer that forwards writes to t.Logf and
+// silently drops them once the test has finished, so goroutines that outlive
+// the test cannot panic the test framework by logging after completion.
+func NewTestLogWriter(t testing.TB) io.Writer {
+	w := &testLogWriter{t: t}
+	t.Cleanup(func() {
+		w.mu.Lock()
+		defer w.mu.Unlock()
+		w.testOver = true
+	})
+	return w
+}
+
+func (w *testLogWriter) Write(p []byte) (n int, err error) {
+	n = len(p)
+	w.mu.Lock()
+	defer w.mu.Unlock()
+	if w.testOver {
+		return n, nil
+	}
+	w.t.Logf("%q", string(p))
+
+	return n, nil
+}
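+
+// A typical use, as in the tests above: hand the writer to code that logs
+// from its own goroutine and may keep writing briefly after the test ends.
+//
+//	go func() {
+//		runErr <- runner.Run(ctx, "test-runner", testutil.NewTestLogWriter(t))
+//	}()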