diff --git a/coderd/agentapi/api.go b/coderd/agentapi/api.go
index dbcb8ea024914..f8f72a5d0d6ea 100644
--- a/coderd/agentapi/api.go
+++ b/coderd/agentapi/api.go
@@ -239,6 +239,10 @@ func (a *API) Serve(ctx context.Context, l net.Listener) error {
 		return xerrors.Errorf("create agent API server: %w", err)
 	}
 
+	if err := a.ResourcesMonitoringAPI.InitMonitors(ctx); err != nil {
+		return xerrors.Errorf("initialize resource monitoring: %w", err)
+	}
+
 	return server.Serve(ctx, l)
 }
 
diff --git a/coderd/agentapi/resources_monitoring.go b/coderd/agentapi/resources_monitoring.go
index e5ee97e681a58..db0d523192280 100644
--- a/coderd/agentapi/resources_monitoring.go
+++ b/coderd/agentapi/resources_monitoring.go
@@ -5,6 +5,7 @@ import (
 	"database/sql"
 	"errors"
 	"fmt"
+	"sync"
 	"time"
 
 	"golang.org/x/xerrors"
@@ -33,42 +34,60 @@ type ResourcesMonitoringAPI struct {
 	Debounce time.Duration
 	Config   resourcesmonitor.Config
+
+	// Cache resource monitors at connection start to avoid millions of DB queries per day.
+	memoryMonitor  database.WorkspaceAgentMemoryResourceMonitor
+	volumeMonitors []database.WorkspaceAgentVolumeResourceMonitor
+	monitorsLock   sync.RWMutex
 }
 
-func (a *ResourcesMonitoringAPI) GetResourcesMonitoringConfiguration(ctx context.Context, _ *proto.GetResourcesMonitoringConfigurationRequest) (*proto.GetResourcesMonitoringConfigurationResponse, error) {
-	memoryMonitor, memoryErr := a.Database.FetchMemoryResourceMonitorsByAgentID(ctx, a.AgentID)
-	if memoryErr != nil && !errors.Is(memoryErr, sql.ErrNoRows) {
-		return nil, xerrors.Errorf("failed to fetch memory resource monitor: %w", memoryErr)
+// InitMonitors fetches resource monitors from the database and caches them.
+// It must be called once after creating a ResourcesMonitoringAPI; the context
+// should be the agent's per-RPC connection context. If fetching fails with a
+// real error (not sql.ErrNoRows), the connection should be torn down.
+func (a *ResourcesMonitoringAPI) InitMonitors(ctx context.Context) error {
+	memMon, err := a.Database.FetchMemoryResourceMonitorsByAgentID(ctx, a.AgentID)
+	if err != nil && !errors.Is(err, sql.ErrNoRows) {
+		return xerrors.Errorf("fetch memory resource monitor: %w", err)
+	}
+	// On sql.ErrNoRows, memoryMonitor stays at its zero value (CreatedAt.IsZero() == true).
+	// Otherwise, store the fetched monitor.
+	if err == nil {
+		a.memoryMonitor = memMon
 	}
 
-	volumeMonitors, err := a.Database.FetchVolumesResourceMonitorsByAgentID(ctx, a.AgentID)
+	volMons, err := a.Database.FetchVolumesResourceMonitorsByAgentID(ctx, a.AgentID)
 	if err != nil {
-		return nil, xerrors.Errorf("failed to fetch volume resource monitors: %w", err)
+		return xerrors.Errorf("fetch volume resource monitors: %w", err)
 	}
+	// A zero-length result is valid: an agent can have any number of volume monitors, and zero means none are configured.
+	a.volumeMonitors = volMons
+
+	return nil
+}
+func (a *ResourcesMonitoringAPI) GetResourcesMonitoringConfiguration(_ context.Context, _ *proto.GetResourcesMonitoringConfigurationRequest) (*proto.GetResourcesMonitoringConfigurationResponse, error) {
 	return &proto.GetResourcesMonitoringConfigurationResponse{
 		Config: &proto.GetResourcesMonitoringConfigurationResponse_Config{
 			CollectionIntervalSeconds: int32(a.Config.CollectionInterval.Seconds()),
 			NumDatapoints:             a.Config.NumDatapoints,
 		},
 		Memory: func() *proto.GetResourcesMonitoringConfigurationResponse_Memory {
-			if memoryErr != nil {
+			if a.memoryMonitor.CreatedAt.IsZero() {
 				return nil
 			}
-
 			return &proto.GetResourcesMonitoringConfigurationResponse_Memory{
-				Enabled: memoryMonitor.Enabled,
+				Enabled: a.memoryMonitor.Enabled,
 			}
 		}(),
 		Volumes: func() []*proto.GetResourcesMonitoringConfigurationResponse_Volume {
-			volumes := make([]*proto.GetResourcesMonitoringConfigurationResponse_Volume, 0, len(volumeMonitors))
-			for _, monitor := range volumeMonitors {
+			volumes := make([]*proto.GetResourcesMonitoringConfigurationResponse_Volume, 0, len(a.volumeMonitors))
+			for _, monitor := range a.volumeMonitors {
 				volumes = append(volumes, &proto.GetResourcesMonitoringConfigurationResponse_Volume{
 					Enabled: monitor.Enabled,
 					Path:    monitor.Path,
 				})
 			}
-
 			return volumes
 		}(),
 	}, nil
 }
 
@@ -77,6 +96,10 @@ func (a *ResourcesMonitoringAPI) GetResourcesMonitoringConfiguration(ctx context
 func (a *ResourcesMonitoringAPI) PushResourcesMonitoringUsage(ctx context.Context, req *proto.PushResourcesMonitoringUsageRequest) (*proto.PushResourcesMonitoringUsageResponse, error) {
 	var err error
 
+	// Lock for the entire push operation since calls are sequential from the agent
+	a.monitorsLock.Lock()
+	defer a.monitorsLock.Unlock()
+
 	if memoryErr := a.monitorMemory(ctx, req.Datapoints); memoryErr != nil {
 		err = errors.Join(err, xerrors.Errorf("monitor memory: %w", memoryErr))
 	}
@@ -89,18 +112,7 @@
 }
 
 func (a *ResourcesMonitoringAPI) monitorMemory(ctx context.Context, datapoints []*proto.PushResourcesMonitoringUsageRequest_Datapoint) error {
-	monitor, err := a.Database.FetchMemoryResourceMonitorsByAgentID(ctx, a.AgentID)
-	if err != nil {
-		// It is valid for an agent to not have a memory monitor, so we
-		// do not want to treat it as an error.
-		if errors.Is(err, sql.ErrNoRows) {
-			return nil
-		}
-
-		return xerrors.Errorf("fetch memory resource monitor: %w", err)
-	}
-
-	if !monitor.Enabled {
+	if !a.memoryMonitor.Enabled {
 		return nil
 	}
 
@@ -109,15 +121,15 @@
 		usageDatapoints = append(usageDatapoints, datapoint.Memory)
 	}
 
-	usageStates := resourcesmonitor.CalculateMemoryUsageStates(monitor, usageDatapoints)
+	usageStates := resourcesmonitor.CalculateMemoryUsageStates(a.memoryMonitor, usageDatapoints)
 
-	oldState := monitor.State
+	oldState := a.memoryMonitor.State
 	newState := resourcesmonitor.NextState(a.Config, oldState, usageStates)
 
-	debouncedUntil, shouldNotify := monitor.Debounce(a.Debounce, a.Clock.Now(), oldState, newState)
+	debouncedUntil, shouldNotify := a.memoryMonitor.Debounce(a.Debounce, a.Clock.Now(), oldState, newState)
 
 	//nolint:gocritic // We need to be able to update the resource monitor here.
-	err = a.Database.UpdateMemoryResourceMonitor(dbauthz.AsResourceMonitor(ctx), database.UpdateMemoryResourceMonitorParams{
+	err := a.Database.UpdateMemoryResourceMonitor(dbauthz.AsResourceMonitor(ctx), database.UpdateMemoryResourceMonitorParams{
 		AgentID:   a.AgentID,
 		State:     newState,
 		UpdatedAt: dbtime.Time(a.Clock.Now()),
@@ -127,6 +139,11 @@ func (a *ResourcesMonitoringAPI) monitorMemory(ctx context.Context, datapoints [
 		return xerrors.Errorf("update workspace monitor: %w", err)
 	}
 
+	// Update cached state
+	a.memoryMonitor.State = newState
+	a.memoryMonitor.DebouncedUntil = dbtime.Time(debouncedUntil)
+	a.memoryMonitor.UpdatedAt = dbtime.Time(a.Clock.Now())
+
 	if !shouldNotify {
 		return nil
 	}
@@ -143,7 +160,7 @@ func (a *ResourcesMonitoringAPI) monitorMemory(ctx context.Context, datapoints [
 		notifications.TemplateWorkspaceOutOfMemory,
 		map[string]string{
 			"workspace": workspace.Name,
-			"threshold": fmt.Sprintf("%d%%", monitor.Threshold),
+			"threshold": fmt.Sprintf("%d%%", a.memoryMonitor.Threshold),
 		},
 		map[string]any{
 			// NOTE(DanielleMaywood):
@@ -169,14 +186,9 @@ func (a *ResourcesMonitoringAPI) monitorMemory(ctx context.Context, datapoints [
 }
 
 func (a *ResourcesMonitoringAPI) monitorVolumes(ctx context.Context, datapoints []*proto.PushResourcesMonitoringUsageRequest_Datapoint) error {
-	volumeMonitors, err := a.Database.FetchVolumesResourceMonitorsByAgentID(ctx, a.AgentID)
-	if err != nil {
-		return xerrors.Errorf("get or insert volume monitor: %w", err)
-	}
-
 	outOfDiskVolumes := make([]map[string]any, 0)
 
-	for _, monitor := range volumeMonitors {
+	for i, monitor := range a.volumeMonitors {
 		if !monitor.Enabled {
 			continue
 		}
@@ -219,6 +231,11 @@ func (a *ResourcesMonitoringAPI) monitorVolumes(ctx context.Context, datapoints
 		}); err != nil {
 			return xerrors.Errorf("update workspace monitor: %w", err)
 		}
+
+		// Update cached state
+		a.volumeMonitors[i].State = newState
+		a.volumeMonitors[i].DebouncedUntil = dbtime.Time(debouncedUntil)
+		a.volumeMonitors[i].UpdatedAt = dbtime.Time(a.Clock.Now())
 	}
 
 	if len(outOfDiskVolumes) == 0 {
diff --git a/coderd/agentapi/resources_monitoring_test.go b/coderd/agentapi/resources_monitoring_test.go
index c491d3789355b..7b457dd45331a 100644
--- a/coderd/agentapi/resources_monitoring_test.go
+++ b/coderd/agentapi/resources_monitoring_test.go
@@ -101,6 +101,9 @@ func TestMemoryResourceMonitorDebounce(t *testing.T) {
 		Threshold: 80,
 	})
 
+	// Initialize API to fetch and cache the monitors
+	require.NoError(t, api.InitMonitors(context.Background()))
+
 	// When: The monitor is given a state that will trigger NOK
 	_, err := api.PushResourcesMonitoringUsage(context.Background(), &agentproto.PushResourcesMonitoringUsageRequest{
 		Datapoints: []*agentproto.PushResourcesMonitoringUsageRequest_Datapoint{
@@ -304,6 +307,9 @@ func TestMemoryResourceMonitor(t *testing.T) {
 		Threshold: 80,
 	})
 
+	// Initialize API to fetch and cache the monitors
+	require.NoError(t, api.InitMonitors(context.Background()))
+
 	clock.Set(collectedAt)
 	_, err := api.PushResourcesMonitoringUsage(context.Background(), &agentproto.PushResourcesMonitoringUsageRequest{
 		Datapoints: datapoints,
@@ -337,6 +343,8 @@ func TestMemoryResourceMonitorMissingData(t *testing.T) {
 		State:     database.WorkspaceAgentMonitorStateOK,
 		Threshold: 80,
 	})
+	// Initialize API to fetch and cache the monitors
+	require.NoError(t, api.InitMonitors(context.Background()))
 
 	// When: A datapoint is missing, surrounded by two NOK datapoints.
 	_, err := api.PushResourcesMonitoringUsage(context.Background(), &agentproto.PushResourcesMonitoringUsageRequest{
@@ -387,6 +395,9 @@
 		Threshold: 80,
 	})
 
+	// Initialize API to fetch and cache the monitors
+	require.NoError(t, api.InitMonitors(context.Background()))
+
 	// When: A datapoint is missing, surrounded by two OK datapoints.
 	_, err := api.PushResourcesMonitoringUsage(context.Background(), &agentproto.PushResourcesMonitoringUsageRequest{
 		Datapoints: []*agentproto.PushResourcesMonitoringUsageRequest_Datapoint{
@@ -466,6 +477,9 @@ func TestVolumeResourceMonitorDebounce(t *testing.T) {
 		Threshold: 80,
 	})
 
+	// Initialize API to fetch and cache the monitors
+	require.NoError(t, api.InitMonitors(context.Background()))
+
 	// When:
 	//  - First monitor is in a NOK state
 	//  - Second monitor is in an OK state
@@ -742,6 +756,9 @@ func TestVolumeResourceMonitor(t *testing.T) {
 			Threshold: tt.thresholdPercent,
 		})
 
+		// Initialize API to fetch and cache the monitors
+		require.NoError(t, api.InitMonitors(context.Background()))
+
 		clock.Set(collectedAt)
 		_, err := api.PushResourcesMonitoringUsage(context.Background(), &agentproto.PushResourcesMonitoringUsageRequest{
 			Datapoints: datapoints,
@@ -780,6 +797,9 @@ func TestVolumeResourceMonitorMultiple(t *testing.T) {
 		Threshold: 80,
 	})
 
+	// Initialize API to fetch and cache the monitors
+	require.NoError(t, api.InitMonitors(context.Background()))
+
 	// When: both of them move to a NOK state
 	_, err := api.PushResourcesMonitoringUsage(context.Background(), &agentproto.PushResourcesMonitoringUsageRequest{
 		Datapoints: []*agentproto.PushResourcesMonitoringUsageRequest_Datapoint{
@@ -832,6 +852,9 @@ func TestVolumeResourceMonitorMissingData(t *testing.T) {
 		Threshold: 80,
 	})
 
+	// Initialize API to fetch and cache the monitors
+	require.NoError(t, api.InitMonitors(context.Background()))
+
 	// When: A datapoint is missing, surrounded by two NOK datapoints.
 	_, err := api.PushResourcesMonitoringUsage(context.Background(), &agentproto.PushResourcesMonitoringUsageRequest{
 		Datapoints: []*agentproto.PushResourcesMonitoringUsageRequest_Datapoint{
@@ -891,6 +914,9 @@
 		Threshold: 80,
 	})
 
+	// Initialize API to fetch and cache the monitors
+	require.NoError(t, api.InitMonitors(context.Background()))
+
 	// When: A datapoint is missing, surrounded by two OK datapoints.
 	_, err := api.PushResourcesMonitoringUsage(context.Background(), &agentproto.PushResourcesMonitoringUsageRequest{
 		Datapoints: []*agentproto.PushResourcesMonitoringUsageRequest_Datapoint{
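
Taken together, the diff swaps per-push database reads for a fetch-once cache: InitMonitors loads the monitors when the agent's RPC connection is established, GetResourcesMonitoringConfiguration serves the cached copies, and PushResourcesMonitoringUsage updates the cache in step with the database row. Below is a minimal, self-contained Go sketch of that pattern; the monitor type, the fetch function, and the state values are simplified stand-ins for illustration, not the actual coder/coder types.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// monitor is a simplified stand-in for the database monitor row.
type monitor struct {
	Enabled   bool
	Threshold int32
	State     string
}

type api struct {
	// fetch stands in for the FetchMemoryResourceMonitorsByAgentID query.
	fetch func(ctx context.Context) (monitor, error)

	mu     sync.RWMutex
	cached monitor
}

// initMonitors runs once per connection: one DB round trip up front
// instead of one on every push.
func (a *api) initMonitors(ctx context.Context) error {
	m, err := a.fetch(ctx)
	if err != nil {
		return fmt.Errorf("fetch monitor: %w", err)
	}
	a.mu.Lock()
	defer a.mu.Unlock()
	a.cached = m
	return nil
}

// push evaluates a datapoint against the cached monitor and writes the
// resulting state back to the cache, keeping it coherent with the row
// the real code updates in the database at the same time.
func (a *api) push(usagePercent int32) {
	a.mu.Lock()
	defer a.mu.Unlock()
	if !a.cached.Enabled {
		return
	}
	if usagePercent >= a.cached.Threshold {
		a.cached.State = "NOK"
	} else {
		a.cached.State = "OK"
	}
}

// config serves reads from the cache; RLock allows concurrent readers.
func (a *api) config() monitor {
	a.mu.RLock()
	defer a.mu.RUnlock()
	return a.cached
}

func main() {
	a := &api{fetch: func(context.Context) (monitor, error) {
		return monitor{Enabled: true, Threshold: 80, State: "OK"}, nil
	}}
	if err := a.initMonitors(context.Background()); err != nil {
		panic(err)
	}
	a.push(91)
	fmt.Printf("%+v\n", a.config()) // {Enabled:true Threshold:80 State:NOK}
}
```

The sync.RWMutex plays the role of monitorsLock in the diff: pushes serialize under the write lock, while configuration reads can proceed concurrently under the read lock.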
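One smaller detail worth calling out: the memory monitor is optional per agent, and rather than tracking a separate "found" flag, the diff leaves the cached struct at its zero value when the query returns sql.ErrNoRows and later tests CreatedAt.IsZero(). A tiny sketch of that zero-value sentinel convention, again with a hypothetical stand-in type:

```go
package main

import (
	"fmt"
	"time"
)

// memoryMonitor stands in for the database row type. A row fetched from
// the database always carries a non-zero CreatedAt, so the zero value can
// safely mean "no monitor configured".
type memoryMonitor struct {
	CreatedAt time.Time
	Enabled   bool
}

func describe(m memoryMonitor) string {
	if m.CreatedAt.IsZero() {
		// sql.ErrNoRows left the cache untouched.
		return "no memory monitor configured"
	}
	return fmt.Sprintf("memory monitor enabled=%v", m.Enabled)
}

func main() {
	fmt.Println(describe(memoryMonitor{}))                                     // zero value: absent
	fmt.Println(describe(memoryMonitor{CreatedAt: time.Now(), Enabled: true})) // fetched row
}
```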