Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions coderd/agentapi/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,10 @@ func (a *API) Serve(ctx context.Context, l net.Listener) error {
return xerrors.Errorf("create agent API server: %w", err)
}

if err := a.ResourcesMonitoringAPI.InitMonitors(ctx); err != nil {
return xerrors.Errorf("initialize resource monitoring: %w", err)
}

return server.Serve(ctx, l)
}

Expand Down
87 changes: 52 additions & 35 deletions coderd/agentapi/resources_monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"database/sql"
"errors"
"fmt"
"sync"
"time"

"golang.org/x/xerrors"
Expand Down Expand Up @@ -33,42 +34,60 @@ type ResourcesMonitoringAPI struct {

Debounce time.Duration
Config resourcesmonitor.Config

// Resource monitors are fetched once by InitMonitors and cached here to avoid millions of DB queries per day.
memoryMonitor database.WorkspaceAgentMemoryResourceMonitor
volumeMonitors []database.WorkspaceAgentVolumeResourceMonitor
monitorsLock sync.RWMutex
}

func (a *ResourcesMonitoringAPI) GetResourcesMonitoringConfiguration(ctx context.Context, _ *proto.GetResourcesMonitoringConfigurationRequest) (*proto.GetResourcesMonitoringConfigurationResponse, error) {
memoryMonitor, memoryErr := a.Database.FetchMemoryResourceMonitorsByAgentID(ctx, a.AgentID)
if memoryErr != nil && !errors.Is(memoryErr, sql.ErrNoRows) {
return nil, xerrors.Errorf("failed to fetch memory resource monitor: %w", memoryErr)
// InitMonitors fetches resource monitors from the database and caches them.
// It must be called once after creating a ResourcesMonitoringAPI. The context should be
// the agent's per-RPC connection context. If fetching fails with a real error (not sql.ErrNoRows), the
// connection should be torn down.
func (a *ResourcesMonitoringAPI) InitMonitors(ctx context.Context) error {
memMon, err := a.Database.FetchMemoryResourceMonitorsByAgentID(ctx, a.AgentID)
if err != nil && !errors.Is(err, sql.ErrNoRows) {
return xerrors.Errorf("fetch memory resource monitor: %w", err)
}
// If sql.ErrNoRows, memoryMonitor stays as zero value (CreatedAt.IsZero() = true).
// Otherwise, store the fetched monitor.
if err == nil {
a.memoryMonitor = memMon
}

volumeMonitors, err := a.Database.FetchVolumesResourceMonitorsByAgentID(ctx, a.AgentID)
volMons, err := a.Database.FetchVolumesResourceMonitorsByAgentID(ctx, a.AgentID)
if err != nil {
return nil, xerrors.Errorf("failed to fetch volume resource monitors: %w", err)
return xerrors.Errorf("fetch volume resource monitors: %w", err)
}
// A zero-length result is valid and means no volume monitors are configured; the DB can hold many volume monitors per agent.
a.volumeMonitors = volMons

return nil
}

func (a *ResourcesMonitoringAPI) GetResourcesMonitoringConfiguration(_ context.Context, _ *proto.GetResourcesMonitoringConfigurationRequest) (*proto.GetResourcesMonitoringConfigurationResponse, error) {
return &proto.GetResourcesMonitoringConfigurationResponse{
Config: &proto.GetResourcesMonitoringConfigurationResponse_Config{
CollectionIntervalSeconds: int32(a.Config.CollectionInterval.Seconds()),
NumDatapoints: a.Config.NumDatapoints,
},
Memory: func() *proto.GetResourcesMonitoringConfigurationResponse_Memory {
if memoryErr != nil {
if a.memoryMonitor.CreatedAt.IsZero() {
return nil
}

return &proto.GetResourcesMonitoringConfigurationResponse_Memory{
Enabled: memoryMonitor.Enabled,
Enabled: a.memoryMonitor.Enabled,
}
}(),
Volumes: func() []*proto.GetResourcesMonitoringConfigurationResponse_Volume {
volumes := make([]*proto.GetResourcesMonitoringConfigurationResponse_Volume, 0, len(volumeMonitors))
for _, monitor := range volumeMonitors {
volumes := make([]*proto.GetResourcesMonitoringConfigurationResponse_Volume, 0, len(a.volumeMonitors))
for _, monitor := range a.volumeMonitors {
volumes = append(volumes, &proto.GetResourcesMonitoringConfigurationResponse_Volume{
Enabled: monitor.Enabled,
Path: monitor.Path,
})
}

return volumes
}(),
}, nil
Expand All @@ -77,6 +96,10 @@ func (a *ResourcesMonitoringAPI) GetResourcesMonitoringConfiguration(ctx context
func (a *ResourcesMonitoringAPI) PushResourcesMonitoringUsage(ctx context.Context, req *proto.PushResourcesMonitoringUsageRequest) (*proto.PushResourcesMonitoringUsageResponse, error) {
var err error

// Lock for the entire push operation since calls are sequential from the agent
a.monitorsLock.Lock()
defer a.monitorsLock.Unlock()

if memoryErr := a.monitorMemory(ctx, req.Datapoints); memoryErr != nil {
err = errors.Join(err, xerrors.Errorf("monitor memory: %w", memoryErr))
}
Expand All @@ -89,18 +112,7 @@ func (a *ResourcesMonitoringAPI) PushResourcesMonitoringUsage(ctx context.Contex
}

func (a *ResourcesMonitoringAPI) monitorMemory(ctx context.Context, datapoints []*proto.PushResourcesMonitoringUsageRequest_Datapoint) error {
monitor, err := a.Database.FetchMemoryResourceMonitorsByAgentID(ctx, a.AgentID)
if err != nil {
// It is valid for an agent to not have a memory monitor, so we
// do not want to treat it as an error.
if errors.Is(err, sql.ErrNoRows) {
return nil
}

return xerrors.Errorf("fetch memory resource monitor: %w", err)
}

if !monitor.Enabled {
if !a.memoryMonitor.Enabled {
return nil
}

Expand All @@ -109,15 +121,15 @@ func (a *ResourcesMonitoringAPI) monitorMemory(ctx context.Context, datapoints [
usageDatapoints = append(usageDatapoints, datapoint.Memory)
}

usageStates := resourcesmonitor.CalculateMemoryUsageStates(monitor, usageDatapoints)
usageStates := resourcesmonitor.CalculateMemoryUsageStates(a.memoryMonitor, usageDatapoints)

oldState := monitor.State
oldState := a.memoryMonitor.State
newState := resourcesmonitor.NextState(a.Config, oldState, usageStates)

debouncedUntil, shouldNotify := monitor.Debounce(a.Debounce, a.Clock.Now(), oldState, newState)
debouncedUntil, shouldNotify := a.memoryMonitor.Debounce(a.Debounce, a.Clock.Now(), oldState, newState)

//nolint:gocritic // We need to be able to update the resource monitor here.
err = a.Database.UpdateMemoryResourceMonitor(dbauthz.AsResourceMonitor(ctx), database.UpdateMemoryResourceMonitorParams{
err := a.Database.UpdateMemoryResourceMonitor(dbauthz.AsResourceMonitor(ctx), database.UpdateMemoryResourceMonitorParams{
AgentID: a.AgentID,
State: newState,
UpdatedAt: dbtime.Time(a.Clock.Now()),
Expand All @@ -127,6 +139,11 @@ func (a *ResourcesMonitoringAPI) monitorMemory(ctx context.Context, datapoints [
return xerrors.Errorf("update workspace monitor: %w", err)
}

// Update cached state
a.memoryMonitor.State = newState
a.memoryMonitor.DebouncedUntil = dbtime.Time(debouncedUntil)
a.memoryMonitor.UpdatedAt = dbtime.Time(a.Clock.Now())

if !shouldNotify {
return nil
}
Expand All @@ -143,7 +160,7 @@ func (a *ResourcesMonitoringAPI) monitorMemory(ctx context.Context, datapoints [
notifications.TemplateWorkspaceOutOfMemory,
map[string]string{
"workspace": workspace.Name,
"threshold": fmt.Sprintf("%d%%", monitor.Threshold),
"threshold": fmt.Sprintf("%d%%", a.memoryMonitor.Threshold),
},
map[string]any{
// NOTE(DanielleMaywood):
Expand All @@ -169,14 +186,9 @@ func (a *ResourcesMonitoringAPI) monitorMemory(ctx context.Context, datapoints [
}

func (a *ResourcesMonitoringAPI) monitorVolumes(ctx context.Context, datapoints []*proto.PushResourcesMonitoringUsageRequest_Datapoint) error {
volumeMonitors, err := a.Database.FetchVolumesResourceMonitorsByAgentID(ctx, a.AgentID)
if err != nil {
return xerrors.Errorf("get or insert volume monitor: %w", err)
}

outOfDiskVolumes := make([]map[string]any, 0)

for _, monitor := range volumeMonitors {
for i, monitor := range a.volumeMonitors {
if !monitor.Enabled {
continue
}
Expand Down Expand Up @@ -219,6 +231,11 @@ func (a *ResourcesMonitoringAPI) monitorVolumes(ctx context.Context, datapoints
}); err != nil {
return xerrors.Errorf("update workspace monitor: %w", err)
}

// Update cached state
a.volumeMonitors[i].State = newState
a.volumeMonitors[i].DebouncedUntil = dbtime.Time(debouncedUntil)
a.volumeMonitors[i].UpdatedAt = dbtime.Time(a.Clock.Now())
}

if len(outOfDiskVolumes) == 0 {
Expand Down
26 changes: 26 additions & 0 deletions coderd/agentapi/resources_monitoring_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,9 @@ func TestMemoryResourceMonitorDebounce(t *testing.T) {
Threshold: 80,
})

// Initialize API to fetch and cache the monitors
require.NoError(t, api.InitMonitors(context.Background()))

// When: The monitor is given a state that will trigger NOK
_, err := api.PushResourcesMonitoringUsage(context.Background(), &agentproto.PushResourcesMonitoringUsageRequest{
Datapoints: []*agentproto.PushResourcesMonitoringUsageRequest_Datapoint{
Expand Down Expand Up @@ -304,6 +307,9 @@ func TestMemoryResourceMonitor(t *testing.T) {
Threshold: 80,
})

// Initialize API to fetch and cache the monitors
require.NoError(t, api.InitMonitors(context.Background()))

clock.Set(collectedAt)
_, err := api.PushResourcesMonitoringUsage(context.Background(), &agentproto.PushResourcesMonitoringUsageRequest{
Datapoints: datapoints,
Expand Down Expand Up @@ -337,6 +343,8 @@ func TestMemoryResourceMonitorMissingData(t *testing.T) {
State: database.WorkspaceAgentMonitorStateOK,
Threshold: 80,
})
// Initialize API to fetch and cache the monitors
require.NoError(t, api.InitMonitors(context.Background()))

// When: A datapoint is missing, surrounded by two NOK datapoints.
_, err := api.PushResourcesMonitoringUsage(context.Background(), &agentproto.PushResourcesMonitoringUsageRequest{
Expand Down Expand Up @@ -387,6 +395,9 @@ func TestMemoryResourceMonitorMissingData(t *testing.T) {
Threshold: 80,
})

// Initialize API to fetch and cache the monitors
require.NoError(t, api.InitMonitors(context.Background()))

// When: A datapoint is missing, surrounded by two OK datapoints.
_, err := api.PushResourcesMonitoringUsage(context.Background(), &agentproto.PushResourcesMonitoringUsageRequest{
Datapoints: []*agentproto.PushResourcesMonitoringUsageRequest_Datapoint{
Expand Down Expand Up @@ -466,6 +477,9 @@ func TestVolumeResourceMonitorDebounce(t *testing.T) {
Threshold: 80,
})

// Initialize API to fetch and cache the monitors
require.NoError(t, api.InitMonitors(context.Background()))

// When:
// - First monitor is in a NOK state
// - Second monitor is in an OK state
Expand Down Expand Up @@ -742,6 +756,9 @@ func TestVolumeResourceMonitor(t *testing.T) {
Threshold: tt.thresholdPercent,
})

// Initialize API to fetch and cache the monitors
require.NoError(t, api.InitMonitors(context.Background()))

clock.Set(collectedAt)
_, err := api.PushResourcesMonitoringUsage(context.Background(), &agentproto.PushResourcesMonitoringUsageRequest{
Datapoints: datapoints,
Expand Down Expand Up @@ -780,6 +797,9 @@ func TestVolumeResourceMonitorMultiple(t *testing.T) {
Threshold: 80,
})

// Initialize API to fetch and cache the monitors
require.NoError(t, api.InitMonitors(context.Background()))

// When: both of them move to a NOK state
_, err := api.PushResourcesMonitoringUsage(context.Background(), &agentproto.PushResourcesMonitoringUsageRequest{
Datapoints: []*agentproto.PushResourcesMonitoringUsageRequest_Datapoint{
Expand Down Expand Up @@ -832,6 +852,9 @@ func TestVolumeResourceMonitorMissingData(t *testing.T) {
Threshold: 80,
})

// Initialize API to fetch and cache the monitors
require.NoError(t, api.InitMonitors(context.Background()))

// When: A datapoint is missing, surrounded by two NOK datapoints.
_, err := api.PushResourcesMonitoringUsage(context.Background(), &agentproto.PushResourcesMonitoringUsageRequest{
Datapoints: []*agentproto.PushResourcesMonitoringUsageRequest_Datapoint{
Expand Down Expand Up @@ -891,6 +914,9 @@ func TestVolumeResourceMonitorMissingData(t *testing.T) {
Threshold: 80,
})

// Initialize API to fetch and cache the monitors
require.NoError(t, api.InitMonitors(context.Background()))

// When: A datapoint is missing, surrounded by two OK datapoints.
_, err := api.PushResourcesMonitoringUsage(context.Background(), &agentproto.PushResourcesMonitoringUsageRequest{
Datapoints: []*agentproto.PushResourcesMonitoringUsageRequest_Datapoint{
Expand Down