
Commit bd4b0c6

feat: add task status reporting load generator runner
1 parent dc27761 commit bd4b0c6

6 files changed: 837 additions & 0 deletions

scaletest/taskstatus/client.go

Lines changed: 71 additions & 0 deletions
@@ -0,0 +1,71 @@
package taskstatus

import (
    "context"

    "github.com/google/uuid"

    "cdr.dev/slog"
    "github.com/coder/coder/v2/codersdk"
    "github.com/coder/coder/v2/codersdk/agentsdk"
)

// client abstracts the details of using codersdk.Client and agentsdk.Client
// for the taskstatus runner. This interface allows for easier testing by enabling
// mock implementations and provides a cleaner separation of concerns.
//
// The interface is designed to be initialized in two phases:
//  1. Create the client with newClient(coderClient)
//  2. Configure logging when the io.Writer is available in Run()
type client interface {
    // WatchWorkspace watches for updates to a workspace.
    WatchWorkspace(ctx context.Context, workspaceID uuid.UUID) (<-chan codersdk.Workspace, error)

    // PatchAppStatus updates the status of a workspace app.
    PatchAppStatus(ctx context.Context, req agentsdk.PatchAppStatus) error

    // initialize sets up the client with the provided logger, which is only available after Run() is called.
    initialize(logger slog.Logger)
}

// sdkClient is the concrete implementation of the client interface using
// codersdk.Client and agentsdk.Client.
type sdkClient struct {
    coderClient *codersdk.Client
    agentClient *agentsdk.Client
}

// newClient creates a new client implementation using the provided codersdk.Client.
func newClient(coderClient *codersdk.Client) client {
    return &sdkClient{
        coderClient: coderClient,
    }
}

func (c *sdkClient) WatchWorkspace(ctx context.Context, workspaceID uuid.UUID) (<-chan codersdk.Workspace, error) {
    return c.coderClient.WatchWorkspace(ctx, workspaceID)
}

func (c *sdkClient) PatchAppStatus(ctx context.Context, req agentsdk.PatchAppStatus) error {
    if c.agentClient == nil {
        panic("agentClient not initialized - call initialize first")
    }
    return c.agentClient.PatchAppStatus(ctx, req)
}

func (c *sdkClient) initialize(logger slog.Logger) {
    // Configure the coder client logging.
    c.coderClient.SetLogger(logger)
    c.coderClient.SetLogBodies(true)

    // Create and configure the agent client with the same logging settings.
    c.agentClient = agentsdk.New(
        c.coderClient.URL,
        agentsdk.WithFixedToken(c.coderClient.SessionTokenProvider.GetSessionToken()),
        codersdk.WithLogger(logger),
        codersdk.WithLogBodies(),
    )
}

// Ensure sdkClient implements the client interface.
var _ client = (*sdkClient)(nil)
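
Note: because the runner depends only on this unexported client interface, it can be exercised against a test double. A minimal sketch of such a fake (the fakeClient name and channel-based recording are assumptions, not part of this commit):

package taskstatus

import (
    "context"

    "github.com/google/uuid"

    "cdr.dev/slog"
    "github.com/coder/coder/v2/codersdk"
    "github.com/coder/coder/v2/codersdk/agentsdk"
)

// fakeClient is a hypothetical test double for the client interface.
type fakeClient struct {
    updates  chan codersdk.Workspace      // workspace updates pushed by the test
    statuses chan agentsdk.PatchAppStatus // reported statuses recorded for assertions
}

func (f *fakeClient) WatchWorkspace(_ context.Context, _ uuid.UUID) (<-chan codersdk.Workspace, error) {
    return f.updates, nil
}

func (f *fakeClient) PatchAppStatus(ctx context.Context, req agentsdk.PatchAppStatus) error {
    select {
    case f.statuses <- req:
        return nil
    case <-ctx.Done():
        return ctx.Err()
    }
}

func (*fakeClient) initialize(slog.Logger) {}

var _ client = (*fakeClient)(nil)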

scaletest/taskstatus/config.go

Lines changed: 69 additions & 0 deletions
@@ -0,0 +1,69 @@
package taskstatus

import (
    "sync"
    "time"

    "github.com/google/uuid"
    "golang.org/x/xerrors"
)

// Config is the configuration for a single taskstatus Runner.
type Config struct {
    // AgentID is the workspace agent ID to which to connect.
    AgentID uuid.UUID `json:"agent_id"`

    // WorkspaceID is the workspace ID to watch.
    WorkspaceID uuid.UUID `json:"workspace_id"`

    // AppSlug is the slug of the app designated as the AI Agent.
    AppSlug string `json:"app_slug"`

    // When the runner has connected to the watch-ws endpoint, it will call Done once on this wait group. Used to
    // coordinate multiple runners from the higher layer.
    ConnectedWaitGroup *sync.WaitGroup `json:"-"`

    // We read on this channel before starting to report task statuses. Used to coordinate multiple runners from the
    // higher layer.
    StartReporting chan struct{} `json:"-"`

    // ReportStatusPeriod is the time between reporting task statuses.
    ReportStatusPeriod time.Duration `json:"report_status_period"`

    // ReportStatusDuration is the total time to report task statuses, starting from when we successfully read from
    // the StartReporting channel.
    ReportStatusDuration time.Duration `json:"report_status_duration"`

    Metrics           *Metrics `json:"-"`
    MetricLabelValues []string `json:"metric_label_values"`
}

// Validate checks that the configuration is complete.
func (c *Config) Validate() error {
    if c.AgentID == uuid.Nil {
        return xerrors.Errorf("validate agent_id: must not be nil")
    }

    if c.AppSlug == "" {
        return xerrors.Errorf("validate app_slug: must not be empty")
    }

    if c.ConnectedWaitGroup == nil {
        return xerrors.Errorf("validate connected_wait_group: must not be nil")
    }

    if c.StartReporting == nil {
        return xerrors.Errorf("validate start_reporting: must not be nil")
    }

    if c.ReportStatusPeriod <= 0 {
        return xerrors.Errorf("validate report_status_period: must be greater than zero")
    }

    if c.ReportStatusDuration <= 0 {
        return xerrors.Errorf("validate report_status_duration: must be greater than zero")
    }

    if c.Metrics == nil {
        return xerrors.Errorf("validate metrics: must not be nil")
    }

    return nil
}
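
Note: ConnectedWaitGroup and StartReporting exist so a higher layer can hold every runner until all of them have connected to watch-ws, then release them at once. A hypothetical orchestration sketch (startAllRunners and its shape are assumptions, not part of this commit):

package taskstatus

import "sync"

// startAllRunners is a hypothetical helper illustrating the intended use of
// ConnectedWaitGroup and StartReporting: every runner blocks after its
// watch-ws connection is up, and a single close releases them together.
func startAllRunners(base Config, runnerCount int) []Config {
    var connected sync.WaitGroup
    startReporting := make(chan struct{})

    cfgs := make([]Config, runnerCount)
    for i := range cfgs {
        connected.Add(1)
        cfg := base // in practice AgentID, WorkspaceID, and AppSlug would differ per runner
        cfg.ConnectedWaitGroup = &connected
        cfg.StartReporting = startReporting
        cfgs[i] = cfg
    }

    go func() {
        // Each runner calls Done once it is connected to watch-ws.
        connected.Wait()
        // Closing the channel unblocks every runner's status reporting at once.
        close(startReporting)
    }()
    return cfgs
}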

scaletest/taskstatus/metrics.go

Lines changed: 36 additions & 0 deletions
@@ -0,0 +1,36 @@
package taskstatus

import "github.com/prometheus/client_golang/prometheus"

type Metrics struct {
    TaskStatusToWorkspaceUpdateLatencySeconds prometheus.HistogramVec
    MissingStatusUpdatesTotal                 prometheus.CounterVec
    ReportTaskStatusErrorsTotal               prometheus.CounterVec
}

func NewMetrics(reg prometheus.Registerer, labelNames ...string) *Metrics {
    m := &Metrics{
        TaskStatusToWorkspaceUpdateLatencySeconds: *prometheus.NewHistogramVec(prometheus.HistogramOpts{
            Namespace: "coderd",
            Subsystem: "scaletest",
            Name:      "task_status_to_workspace_update_latency_seconds",
            Help:      "Time in seconds between reporting a task status and receiving the workspace update.",
        }, labelNames),
        MissingStatusUpdatesTotal: *prometheus.NewCounterVec(prometheus.CounterOpts{
            Namespace: "coderd",
            Subsystem: "scaletest",
            Name:      "missing_status_updates_total",
            Help:      "Total number of missing status updates.",
        }, labelNames),
        ReportTaskStatusErrorsTotal: *prometheus.NewCounterVec(prometheus.CounterOpts{
            Namespace: "coderd",
            Subsystem: "scaletest",
            Name:      "report_task_status_errors_total",
            Help:      "Total number of errors when reporting task status.",
        }, labelNames),
    }
    reg.MustRegister(m.TaskStatusToWorkspaceUpdateLatencySeconds)
    reg.MustRegister(m.MissingStatusUpdatesTotal)
    reg.MustRegister(m.ReportTaskStatusErrorsTotal)
    return m
}
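
Note: the caller owns the Prometheus registry and chooses the label names; each runner then supplies matching values via Config.MetricLabelValues. A usage sketch, where the "template" label name and "ai-task" value are assumptions:

package taskstatus

import "github.com/prometheus/client_golang/prometheus"

// newExampleMetrics is a hypothetical wiring of Metrics into a registry.
func newExampleMetrics() (*Metrics, *prometheus.Registry) {
    reg := prometheus.NewRegistry()
    m := NewMetrics(reg, "template")

    // Each runner's Config.MetricLabelValues must match the label names above,
    // e.g. []string{"ai-task"}.
    m.ReportTaskStatusErrorsTotal.WithLabelValues("ai-task").Add(0)
    return m, reg
}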

scaletest/taskstatus/run.go

Lines changed: 190 additions & 0 deletions
@@ -0,0 +1,190 @@
package taskstatus

import (
    "context"
    "io"
    "strconv"
    "strings"
    "sync"
    "time"

    "golang.org/x/xerrors"

    "cdr.dev/slog"
    "cdr.dev/slog/sloggers/sloghuman"

    "github.com/coder/coder/v2/codersdk"
    "github.com/coder/coder/v2/codersdk/agentsdk"
    "github.com/coder/coder/v2/scaletest/harness"
    "github.com/coder/coder/v2/scaletest/loadtestutil"
    "github.com/coder/quartz"
)

const statusUpdatePrefix = "scaletest status update:"

// Runner reports task statuses for a workspace app and measures how long each
// status takes to appear in the corresponding workspace updates.
type Runner struct {
    client client
    cfg    Config

    logger slog.Logger

    mu            sync.Mutex
    reportTimes   map[int]time.Time
    doneReporting bool

    // testing only
    clock quartz.Clock
}

var _ harness.Runnable = &Runner{}

// NewRunner creates a new Runner with the provided codersdk.Client and configuration.
func NewRunner(coderClient *codersdk.Client, cfg Config) *Runner {
    return &Runner{
        client:      newClient(coderClient),
        cfg:         cfg,
        clock:       quartz.NewReal(),
        reportTimes: make(map[int]time.Time),
    }
}

// Run watches workspace updates in the background while periodically reporting
// task statuses, and returns once both halves have finished.
func (r *Runner) Run(ctx context.Context, name string, logs io.Writer) error {
    logs = loadtestutil.NewSyncWriter(logs)
    r.logger = slog.Make(sloghuman.Sink(logs)).Leveled(slog.LevelDebug).Named(name)
    r.client.initialize(r.logger)

    // Ensure these labels are initialized, so we see the time series right away in Prometheus.
    r.cfg.Metrics.MissingStatusUpdatesTotal.WithLabelValues(r.cfg.MetricLabelValues...).Add(0)
    r.cfg.Metrics.ReportTaskStatusErrorsTotal.WithLabelValues(r.cfg.MetricLabelValues...).Add(0)

    workspaceUpdatesCtx, cancelWorkspaceUpdates := context.WithCancel(ctx)
    defer cancelWorkspaceUpdates()
    workspaceUpdatesResult := make(chan error, 1)
    go func() {
        workspaceUpdatesResult <- r.watchWorkspaceUpdates(workspaceUpdatesCtx)
    }()

    err := r.reportTaskStatus(ctx)
    if err != nil {
        return xerrors.Errorf("report task status: %w", err)
    }

    err = <-workspaceUpdatesResult
    if err != nil {
        return xerrors.Errorf("watch workspace: %w", err)
    }
    return nil
}

// watchWorkspaceUpdates consumes workspace updates, matches each scaletest
// status message back to the time it was reported, and records the latency.
func (r *Runner) watchWorkspaceUpdates(ctx context.Context) error {
    updates, err := r.client.WatchWorkspace(ctx, r.cfg.WorkspaceID)
    if err != nil {
        return xerrors.Errorf("watch workspace: %w", err)
    }
    r.cfg.ConnectedWaitGroup.Done()
    defer func() {
        r.mu.Lock()
        defer r.mu.Unlock()
        r.cfg.Metrics.MissingStatusUpdatesTotal.
            WithLabelValues(r.cfg.MetricLabelValues...).
            Add(float64(len(r.reportTimes)))
    }()
    for {
        select {
        case <-ctx.Done():
            return ctx.Err()
        case workspace := <-updates:
            if workspace.LatestAppStatus == nil {
                continue
            }
            msgNo, ok := parseStatusMessage(workspace.LatestAppStatus.Message)
            if !ok {
                continue
            }

            r.mu.Lock()
            reportTime, ok := r.reportTimes[msgNo]
            delete(r.reportTimes, msgNo)
            allDone := r.doneReporting && len(r.reportTimes) == 0
            r.mu.Unlock()

            if !ok {
                return xerrors.Errorf("report time not found for message %d", msgNo)
            }
            latency := r.clock.Since(reportTime, "watchWorkspaceUpdates")
            r.cfg.Metrics.TaskStatusToWorkspaceUpdateLatencySeconds.
                WithLabelValues(r.cfg.MetricLabelValues...).
                Observe(latency.Seconds())
            if allDone {
                return nil
            }
        }
    }
}

// reportTaskStatus waits for the StartReporting signal, then reports a numbered
// task status every ReportStatusPeriod until ReportStatusDuration has elapsed.
func (r *Runner) reportTaskStatus(ctx context.Context) error {
    defer func() {
        r.mu.Lock()
        defer r.mu.Unlock()
        r.doneReporting = true
    }()

    select {
    case <-ctx.Done():
        return ctx.Err()
    case <-r.cfg.StartReporting:
        r.logger.Info(ctx, "starting to report task status")
    }
    startedReporting := r.clock.Now("reportTaskStatus", "startedReporting")
    msgNo := 0

    done := xerrors.New("done reporting task status") // sentinel error
    waiter := r.clock.TickerFunc(ctx, r.cfg.ReportStatusPeriod, func() error {
        r.mu.Lock()
        now := r.clock.Now("reportTaskStatus", "tick")
        r.reportTimes[msgNo] = now
        // It's important that we set doneReporting along with a final report, since the watchWorkspaceUpdates
        // goroutine needs an update to wake up and check whether we're done. We could introduce a secondary
        // signaling channel, but it adds a lot of complexity and would be hard to test. We expect the tick period
        // to be much smaller than the report status duration, so one extra tick is not a big deal.
        if now.After(startedReporting.Add(r.cfg.ReportStatusDuration)) {
            r.doneReporting = true
        }
        r.mu.Unlock()

        err := r.client.PatchAppStatus(ctx, agentsdk.PatchAppStatus{
            AppSlug: r.cfg.AppSlug,
            Message: statusUpdatePrefix + strconv.Itoa(msgNo),
            State:   codersdk.WorkspaceAppStatusStateWorking,
            URI:     "https://example.com/example-status/",
        })
        if err != nil {
            r.logger.Error(ctx, "failed to report task status", slog.Error(err))
            r.cfg.Metrics.ReportTaskStatusErrorsTotal.WithLabelValues(r.cfg.MetricLabelValues...).Inc()
        }
        msgNo++
        // Note that it's safe to read r.doneReporting here without a lock because we're the only goroutine that
        // sets it.
        if r.doneReporting {
            return done // causes the ticker to exit due to the sentinel error
        }
        return nil
    }, "reportTaskStatus")
    err := waiter.Wait()
    if xerrors.Is(err, done) {
        return nil
    }
    return err
}

// parseStatusMessage extracts the message number from a status message written
// by reportTaskStatus; it returns false for messages that lack the prefix.
func parseStatusMessage(message string) (int, bool) {
    if !strings.HasPrefix(message, statusUpdatePrefix) {
        return 0, false
    }
    message = strings.TrimPrefix(message, statusUpdatePrefix)
    msgNo, err := strconv.Atoi(message)
    if err != nil {
        return 0, false
    }
    return msgNo, true
}
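
Note: statusUpdatePrefix plus the message number is what reportTaskStatus sends and what parseStatusMessage later recovers from workspace updates; anything without the prefix is ignored. A hypothetical round-trip test, not part of this commit:

package taskstatus

import (
    "strconv"
    "testing"
)

// TestParseStatusMessage is a hypothetical round-trip check of the status
// message format used above.
func TestParseStatusMessage(t *testing.T) {
    msgNo, ok := parseStatusMessage(statusUpdatePrefix + strconv.Itoa(42))
    if !ok || msgNo != 42 {
        t.Fatalf("expected (42, true), got (%d, %v)", msgNo, ok)
    }
    // Messages without the scaletest prefix (e.g. from a real agent) are ignored.
    if _, ok := parseStatusMessage("unrelated status"); ok {
        t.Fatal("expected unrelated message to be rejected")
    }
}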
