diff --git a/cli/exp_scaletest.go b/cli/exp_scaletest.go index 419b1955477b9..80fdadaf50647 100644 --- a/cli/exp_scaletest.go +++ b/cli/exp_scaletest.go @@ -48,6 +48,8 @@ import ( const scaletestTracerName = "coder_scaletest" +var BypassHeader = map[string][]string{codersdk.BypassRatelimitHeader: {"true"}} + func (r *RootCmd) scaletestCmd() *serpent.Command { cmd := &serpent.Command{ Use: "scaletest", @@ -690,15 +692,6 @@ func (r *RootCmd) scaletestCreateWorkspaces() *serpent.Command { return err } - client.HTTPClient = &http.Client{ - Transport: &codersdk.HeaderTransport{ - Transport: http.DefaultTransport, - Header: map[string][]string{ - codersdk.BypassRatelimitHeader: {"true"}, - }, - }, - } - if count <= 0 { return xerrors.Errorf("--count is required and must be greater than 0") } @@ -810,7 +803,13 @@ func (r *RootCmd) scaletestCreateWorkspaces() *serpent.Command { return xerrors.Errorf("validate config: %w", err) } - var runner harness.Runnable = createworkspaces.NewRunner(client, config) + // use an independent client for each Runner, so they don't reuse TCP connections. This can lead to + // requests being unbalanced among Coder instances. + runnerClient, err := loadtestutil.DupClientCopyingHeaders(client, BypassHeader) + if err != nil { + return xerrors.Errorf("create runner client: %w", err) + } + var runner harness.Runnable = createworkspaces.NewRunner(runnerClient, config) if tracingEnabled { runner = &runnableTraceWrapper{ tracer: tracer, @@ -1011,15 +1010,6 @@ func (r *RootCmd) scaletestWorkspaceUpdates() *serpent.Command { return err } - client.HTTPClient = &http.Client{ - Transport: &codersdk.HeaderTransport{ - Transport: http.DefaultTransport, - Header: map[string][]string{ - codersdk.BypassRatelimitHeader: {"true"}, - }, - }, - } - if workspaceCount <= 0 { return xerrors.Errorf("--workspace-count must be greater than 0") } @@ -1158,7 +1148,14 @@ func (r *RootCmd) scaletestWorkspaceUpdates() *serpent.Command { for i, config := range configs { name := fmt.Sprintf("workspaceupdates-%dw", config.WorkspaceCount) id := strconv.Itoa(i) - var runner harness.Runnable = workspaceupdates.NewRunner(client, config) + + // use an independent client for each Runner, so they don't reuse TCP connections. This can lead to + // requests being unbalanced among Coder instances. + runnerClient, err := loadtestutil.DupClientCopyingHeaders(client, BypassHeader) + if err != nil { + return xerrors.Errorf("create runner client: %w", err) + } + var runner harness.Runnable = workspaceupdates.NewRunner(runnerClient, config) if tracingEnabled { runner = &runnableTraceWrapper{ tracer: tracer, @@ -1315,16 +1312,6 @@ func (r *RootCmd) scaletestWorkspaceTraffic() *serpent.Command { prometheusSrvClose := ServeHandler(ctx, logger, promhttp.HandlerFor(reg, promhttp.HandlerOpts{}), prometheusFlags.Address, "prometheus") defer prometheusSrvClose() - // Bypass rate limiting - client.HTTPClient = &http.Client{ - Transport: &codersdk.HeaderTransport{ - Transport: http.DefaultTransport, - Header: map[string][]string{ - codersdk.BypassRatelimitHeader: {"true"}, - }, - }, - } - workspaces, err := targetFlags.getTargetedWorkspaces(ctx, client, me.OrganizationIDs, inv.Stdout) if err != nil { return err @@ -1421,7 +1408,13 @@ func (r *RootCmd) scaletestWorkspaceTraffic() *serpent.Command { if err := config.Validate(); err != nil { return xerrors.Errorf("validate config: %w", err) } - var runner harness.Runnable = workspacetraffic.NewRunner(client, config) + // use an independent client for each Runner, so they don't reuse TCP connections. This can lead to + // requests being unbalanced among Coder instances. + runnerClient, err := loadtestutil.DupClientCopyingHeaders(client, BypassHeader) + if err != nil { + return xerrors.Errorf("create runner client: %w", err) + } + var runner harness.Runnable = workspacetraffic.NewRunner(runnerClient, config) if tracingEnabled { runner = &runnableTraceWrapper{ tracer: tracer, @@ -1609,9 +1602,13 @@ func (r *RootCmd) scaletestDashboard() *serpent.Command { return xerrors.Errorf("create token for user: %w", err) } - userClient := codersdk.New(client.URL, - codersdk.WithSessionToken(userTokResp.Key), - ) + // use an independent client for each Runner, so they don't reuse TCP connections. This can lead to + // requests being unbalanced among Coder instances. + userClient, err := loadtestutil.DupClientCopyingHeaders(client, BypassHeader) + if err != nil { + return xerrors.Errorf("create runner client: %w", err) + } + codersdk.WithSessionToken(userTokResp.Key)(userClient) config := dashboard.Config{ Interval: interval, @@ -1758,15 +1755,6 @@ func (r *RootCmd) scaletestAutostart() *serpent.Command { return err } - client.HTTPClient = &http.Client{ - Transport: &codersdk.HeaderTransport{ - Transport: http.DefaultTransport, - Header: map[string][]string{ - codersdk.BypassRatelimitHeader: {"true"}, - }, - }, - } - if workspaceCount <= 0 { return xerrors.Errorf("--workspace-count must be greater than zero") } @@ -1832,7 +1820,13 @@ func (r *RootCmd) scaletestAutostart() *serpent.Command { if err := config.Validate(); err != nil { return xerrors.Errorf("validate config: %w", err) } - var runner harness.Runnable = autostart.NewRunner(client, config) + // use an independent client for each Runner, so they don't reuse TCP connections. This can lead to + // requests being unbalanced among Coder instances. + runnerClient, err := loadtestutil.DupClientCopyingHeaders(client, BypassHeader) + if err != nil { + return xerrors.Errorf("create runner client: %w", err) + } + var runner harness.Runnable = autostart.NewRunner(runnerClient, config) if tracingEnabled { runner = &runnableTraceWrapper{ tracer: tracer, diff --git a/cli/exp_scaletest_dynamicparameters.go b/cli/exp_scaletest_dynamicparameters.go index 31b6766ac6acf..ba34ff27ea98b 100644 --- a/cli/exp_scaletest_dynamicparameters.go +++ b/cli/exp_scaletest_dynamicparameters.go @@ -4,18 +4,18 @@ package cli import ( "fmt" - "net/http" "time" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" "golang.org/x/xerrors" + "github.com/coder/coder/v2/scaletest/loadtestutil" + "cdr.dev/slog" "cdr.dev/slog/sloggers/sloghuman" "github.com/coder/serpent" - "github.com/coder/coder/v2/codersdk" "github.com/coder/coder/v2/scaletest/dynamicparameters" "github.com/coder/coder/v2/scaletest/harness" ) @@ -72,15 +72,6 @@ func (r *RootCmd) scaletestDynamicParameters() *serpent.Command { return err } - client.HTTPClient = &http.Client{ - Transport: &codersdk.HeaderTransport{ - Transport: http.DefaultTransport, - Header: map[string][]string{ - codersdk.BypassRatelimitHeader: {"true"}, - }, - }, - } - reg := prometheus.NewRegistry() metrics := dynamicparameters.NewMetrics(reg, "concurrent_evaluations") @@ -122,7 +113,13 @@ func (r *RootCmd) scaletestDynamicParameters() *serpent.Command { Metrics: metrics, MetricLabelValues: []string{fmt.Sprintf("%d", part.ConcurrentEvaluations)}, } - var runner harness.Runnable = dynamicparameters.NewRunner(client, cfg) + // use an independent client for each Runner, so they don't reuse TCP connections. This can lead to + // requests being unbalanced among Coder instances. + runnerClient, err := loadtestutil.DupClientCopyingHeaders(client, BypassHeader) + if err != nil { + return xerrors.Errorf("create runner client: %w", err) + } + var runner harness.Runnable = dynamicparameters.NewRunner(runnerClient, cfg) if tracingEnabled { runner = &runnableTraceWrapper{ tracer: tracer, diff --git a/cli/exp_scaletest_notifications.go b/cli/exp_scaletest_notifications.go index 074343e10b3cc..6d9987bf61dff 100644 --- a/cli/exp_scaletest_notifications.go +++ b/cli/exp_scaletest_notifications.go @@ -18,6 +18,8 @@ import ( "github.com/prometheus/client_golang/prometheus/promhttp" "golang.org/x/xerrors" + "github.com/coder/coder/v2/scaletest/loadtestutil" + "cdr.dev/slog" notificationsLib "github.com/coder/coder/v2/coderd/notifications" @@ -66,15 +68,6 @@ func (r *RootCmd) scaletestNotifications() *serpent.Command { return err } - client.HTTPClient = &http.Client{ - Transport: &codersdk.HeaderTransport{ - Transport: http.DefaultTransport, - Header: map[string][]string{ - codersdk.BypassRatelimitHeader: {"true"}, - }, - }, - } - if userCount <= 0 { return xerrors.Errorf("--user-count must be greater than 0") } @@ -206,7 +199,13 @@ func (r *RootCmd) scaletestNotifications() *serpent.Command { for i, config := range configs { id := strconv.Itoa(i) name := fmt.Sprintf("notifications-%s", id) - var runner harness.Runnable = notifications.NewRunner(client, config) + // use an independent client for each Runner, so they don't reuse TCP connections. This can lead to + // requests being unbalanced among Coder instances. + runnerClient, err := loadtestutil.DupClientCopyingHeaders(client, BypassHeader) + if err != nil { + return xerrors.Errorf("create runner client: %w", err) + } + var runner harness.Runnable = notifications.NewRunner(runnerClient, config) if tracingEnabled { runner = &runnableTraceWrapper{ tracer: tracer, diff --git a/cli/exp_scaletest_prebuilds.go b/cli/exp_scaletest_prebuilds.go index f8cee15514b8a..34fee3b9eba11 100644 --- a/cli/exp_scaletest_prebuilds.go +++ b/cli/exp_scaletest_prebuilds.go @@ -4,7 +4,6 @@ package cli import ( "fmt" - "net/http" "os/signal" "strconv" "sync" @@ -14,6 +13,8 @@ import ( "github.com/prometheus/client_golang/prometheus/promhttp" "golang.org/x/xerrors" + "github.com/coder/coder/v2/scaletest/loadtestutil" + "github.com/coder/coder/v2/codersdk" "github.com/coder/coder/v2/scaletest/harness" "github.com/coder/coder/v2/scaletest/prebuilds" @@ -56,15 +57,6 @@ func (r *RootCmd) scaletestPrebuilds() *serpent.Command { return err } - client.HTTPClient = &http.Client{ - Transport: &codersdk.HeaderTransport{ - Transport: http.DefaultTransport, - Header: map[string][]string{ - codersdk.BypassRatelimitHeader: {"true"}, - }, - }, - } - if numTemplates <= 0 { return xerrors.Errorf("--num-templates must be greater than 0") } @@ -140,7 +132,13 @@ func (r *RootCmd) scaletestPrebuilds() *serpent.Command { return xerrors.Errorf("validate config: %w", err) } - var runner harness.Runnable = prebuilds.NewRunner(client, cfg) + // use an independent client for each Runner, so they don't reuse TCP connections. This can lead to + // requests being unbalanced among Coder instances. + runnerClient, err := loadtestutil.DupClientCopyingHeaders(client, BypassHeader) + if err != nil { + return xerrors.Errorf("create runner client: %w", err) + } + var runner harness.Runnable = prebuilds.NewRunner(runnerClient, cfg) if tracingEnabled { runner = &runnableTraceWrapper{ tracer: tracer, diff --git a/cli/exp_scaletest_taskstatus.go b/cli/exp_scaletest_taskstatus.go index 8621d7d2ae798..c4ecad14d061a 100644 --- a/cli/exp_scaletest_taskstatus.go +++ b/cli/exp_scaletest_taskstatus.go @@ -14,6 +14,8 @@ import ( "github.com/prometheus/client_golang/prometheus/promhttp" "golang.org/x/xerrors" + "github.com/coder/coder/v2/scaletest/loadtestutil" + "cdr.dev/slog" "cdr.dev/slog/sloggers/sloghuman" "github.com/coder/serpent" @@ -143,7 +145,13 @@ After all runners connect, it waits for the baseline duration before triggering return xerrors.Errorf("validate config for runner %d: %w", i, err) } - var runner harness.Runnable = taskstatus.NewRunner(client, cfg) + // use an independent client for each Runner, so they don't reuse TCP connections. This can lead to + // requests being unbalanced among Coder instances. + runnerClient, err := loadtestutil.DupClientCopyingHeaders(client, BypassHeader) + if err != nil { + return xerrors.Errorf("create runner client: %w", err) + } + var runner harness.Runnable = taskstatus.NewRunner(runnerClient, cfg) if tracingEnabled { runner = &runnableTraceWrapper{ tracer: tracer, diff --git a/scaletest/loadtestutil/client.go b/scaletest/loadtestutil/client.go new file mode 100644 index 0000000000000..797139c6ad696 --- /dev/null +++ b/scaletest/loadtestutil/client.go @@ -0,0 +1,45 @@ +package loadtestutil + +import ( + "maps" + "net/http" + + "golang.org/x/xerrors" + + "github.com/coder/coder/v2/codersdk" +) + +// DupClientCopyingHeaders duplicates the Client, but with an independent underlying HTTP transport, so that it will not +// share connections with the client being duplicated. It copies any headers already on the existing transport as +// [codersdk.HeaderTransport] and add the headers in the argument. +func DupClientCopyingHeaders(client *codersdk.Client, header http.Header) (*codersdk.Client, error) { + nc := codersdk.New(client.URL, codersdk.WithLogger(client.Logger())) + nc.SessionTokenProvider = client.SessionTokenProvider + newHeader, t, err := extractHeaderAndInnerTransport(client.HTTPClient.Transport) + if err != nil { + return nil, xerrors.Errorf("extract headers: %w", err) + } + maps.Copy(newHeader, header) + + nc.HTTPClient.Transport = &codersdk.HeaderTransport{ + Transport: t.Clone(), + Header: newHeader, + } + return nc, nil +} + +func extractHeaderAndInnerTransport(rt http.RoundTripper) (http.Header, *http.Transport, error) { + if t, ok := rt.(*http.Transport); ok { + // base case + return make(http.Header), t, nil + } + if ht, ok := rt.(*codersdk.HeaderTransport); ok { + headers, t, err := extractHeaderAndInnerTransport(ht.Transport) + if err != nil { + return nil, nil, err + } + maps.Copy(headers, ht.Header) + return headers, t, nil + } + return nil, nil, xerrors.New("round tripper is neither HeaderTransport nor Transport") +} diff --git a/scaletest/loadtestutil/client_test.go b/scaletest/loadtestutil/client_test.go new file mode 100644 index 0000000000000..e1379d33ea6b2 --- /dev/null +++ b/scaletest/loadtestutil/client_test.go @@ -0,0 +1,50 @@ +package loadtestutil_test + +import ( + "net/http" + "net/url" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/coder/coder/v2/codersdk" + "github.com/coder/coder/v2/scaletest/loadtestutil" +) + +func TestDupClientCopyingHeaders(t *testing.T) { + t.Parallel() + httpClient := &http.Client{ + Transport: &codersdk.HeaderTransport{ + Transport: &codersdk.HeaderTransport{ + Transport: http.DefaultTransport, + Header: map[string][]string{ + "X-Coder-Test": {"foo"}, + "X-Coder-Test3": {"socks"}, + }, + }, + Header: map[string][]string{ + "X-Coder-Test": {"bar"}, + "X-Coder-Test2": {"baz"}, + }, + }, + } + serverURL, err := url.Parse("http://coder.example.com") + require.NoError(t, err) + sdkClient := codersdk.New(serverURL, + codersdk.WithSessionToken("test-token"), codersdk.WithHTTPClient(httpClient)) + + dup, err := loadtestutil.DupClientCopyingHeaders(sdkClient, map[string][]string{ + "X-Coder-Test3": {"clocks"}, + "X-Coder-Test4": {"bears"}, + }) + require.NoError(t, err) + require.Equal(t, "http://coder.example.com", dup.URL.String()) + require.Equal(t, "test-token", dup.SessionToken()) + ht, ok := dup.HTTPClient.Transport.(*codersdk.HeaderTransport) + require.True(t, ok) + require.Equal(t, "bar", ht.Header.Get("X-Coder-Test")) + require.Equal(t, "baz", ht.Header.Get("X-Coder-Test2")) + require.Equal(t, "clocks", ht.Header.Get("X-Coder-Test3")) + require.Equal(t, "bears", ht.Header.Get("X-Coder-Test4")) + require.NotEqual(t, http.DefaultTransport, ht.Transport) +}