-
Notifications
You must be signed in to change notification settings - Fork 1.1k
feat(agent): add agent unit manager #20715
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
14 commits
Select commit
Hold shift + click to select a range
3a5c3ca
feat(agent): add agent unit manager
SasSwart c4bd0d3
linter fixes
SasSwart 9c59bb6
rename consumer to unit
SasSwart 95e66a3
remove defunct test status types in the agent unit manager
SasSwart e55e3c8
remove low value tests
SasSwart e64eff5
tidy up variable declaration
SasSwart 97e5225
ensure we catch indirect cycles
SasSwart 0b1b903
deduplicate agent fecthing logic
SasSwart c9166be
properly utilize unit sentinel errors
SasSwart 62cb948
Provide proper type information to unit status constants
SasSwart 838999d
review nits
SasSwart 04bc903
Simplify the unit manager
SasSwart 117e138
make unit manager errors more consistent
SasSwart 029fd18
Review notes
SasSwart File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Some comments aren't visible on the classic Files Changed page.
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,255 @@ | ||
| package unit | ||
|
|
||
| import ( | ||
| "errors" | ||
| "sync" | ||
|
|
||
| "golang.org/x/xerrors" | ||
|
|
||
| "github.com/coder/coder/v2/coderd/util/slice" | ||
| ) | ||
|
|
||
| var ( | ||
| ErrUnitNotFound = xerrors.New("unit not found") | ||
| ErrUnitAlreadyRegistered = xerrors.New("unit already registered") | ||
| ErrCannotUpdateOtherUnit = xerrors.New("cannot update other unit's status") | ||
| ErrDependenciesNotSatisfied = xerrors.New("unit dependencies not satisfied") | ||
| ErrSameStatusAlreadySet = xerrors.New("same status already set") | ||
| ErrCycleDetected = xerrors.New("cycle detected") | ||
| ErrFailedToAddDependency = xerrors.New("failed to add dependency") | ||
| ) | ||
|
|
||
| // Status represents the status of a unit. | ||
| type Status string | ||
|
|
||
| // Status constants for dependency tracking. | ||
| const ( | ||
| StatusNotRegistered Status = "" | ||
| StatusPending Status = "pending" | ||
| StatusStarted Status = "started" | ||
| StatusComplete Status = "completed" | ||
| ) | ||
|
|
||
| // ID provides a type narrowed representation of the unique identifier of a unit. | ||
| type ID string | ||
|
|
||
| // Unit represents a point-in-time snapshot of a vertex in the dependency graph. | ||
| // Units may depend on other units, or be depended on by other units. The unit struct | ||
| // is not aware of updates made to the dependency graph after it is initialized and should | ||
| // not be cached. | ||
| type Unit struct { | ||
| id ID | ||
| status Status | ||
| // ready is true if all dependencies are satisfied. | ||
| // It does not have an accessor method on Unit, because a unit cannot know whether it is ready. | ||
| // Only the Manager can calculate whether a unit is ready based on knowledge of the dependency graph. | ||
| // To discourage use of an outdated readiness value, only the Manager should set and return this field. | ||
| ready bool | ||
| } | ||
|
|
||
| func (u Unit) ID() ID { | ||
| return u.id | ||
| } | ||
|
|
||
| func (u Unit) Status() Status { | ||
| return u.status | ||
| } | ||
|
|
||
| // Dependency represents a dependency relationship between units. | ||
| type Dependency struct { | ||
| Unit ID | ||
| DependsOn ID | ||
| RequiredStatus Status | ||
| CurrentStatus Status | ||
| IsSatisfied bool | ||
| } | ||
|
|
||
| // Manager provides reactive dependency tracking over a Graph. | ||
| // It manages Unit registration, dependency relationships, and status updates | ||
| // with automatic recalculation of readiness when dependencies are satisfied. | ||
| type Manager struct { | ||
| mu sync.RWMutex | ||
|
|
||
| // The underlying graph that stores dependency relationships | ||
| graph *Graph[Status, ID] | ||
|
|
||
| // Store vertex instances for each unit to ensure consistent references | ||
| units map[ID]Unit | ||
| } | ||
|
|
||
| // NewManager creates a new Manager instance. | ||
| func NewManager() *Manager { | ||
| return &Manager{ | ||
| graph: &Graph[Status, ID]{}, | ||
| units: make(map[ID]Unit), | ||
| } | ||
| } | ||
|
|
||
| // Register adds a unit to the manager if it is not already registered. | ||
| // If a Unit is already registered (per the ID field), it is not updated. | ||
| func (m *Manager) Register(id ID) error { | ||
| m.mu.Lock() | ||
| defer m.mu.Unlock() | ||
|
|
||
| if m.registered(id) { | ||
| return xerrors.Errorf("registering unit %q: %w", id, ErrUnitAlreadyRegistered) | ||
| } | ||
|
|
||
| m.units[id] = Unit{ | ||
| id: id, | ||
| status: StatusPending, | ||
| ready: true, | ||
| } | ||
|
|
||
| return nil | ||
| } | ||
|
|
||
| // registered checks if a unit is registered in the manager. | ||
| func (m *Manager) registered(id ID) bool { | ||
| return m.units[id].status != StatusNotRegistered | ||
| } | ||
|
|
||
| // Unit fetches a unit from the manager. If the unit does not exist, | ||
| // it returns the Unit zero-value as a placeholder unit, because | ||
| // units may depend on other units that have not yet been created. | ||
| func (m *Manager) Unit(id ID) Unit { | ||
| m.mu.RLock() | ||
| defer m.mu.RUnlock() | ||
|
|
||
| return m.units[id] | ||
| } | ||
|
|
||
| func (m *Manager) IsReady(id ID) bool { | ||
| m.mu.RLock() | ||
| defer m.mu.RUnlock() | ||
|
|
||
| if !m.registered(id) { | ||
SasSwart marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| return false | ||
| } | ||
|
|
||
| return m.units[id].ready | ||
| } | ||
|
|
||
| // AddDependency adds a dependency relationship between units. | ||
| // The unit depends on the dependsOn unit reaching the requiredStatus. | ||
| func (m *Manager) AddDependency(unit ID, dependsOn ID, requiredStatus Status) error { | ||
| m.mu.Lock() | ||
| defer m.mu.Unlock() | ||
|
|
||
| if !m.registered(unit) { | ||
| return xerrors.Errorf("checking registration for unit %q: %w", unit, ErrUnitNotFound) | ||
| } | ||
|
|
||
| // Add the dependency edge to the graph | ||
| // The edge goes from unit to dependsOn, representing the dependency | ||
| err := m.graph.AddEdge(unit, dependsOn, requiredStatus) | ||
| if err != nil { | ||
| return xerrors.Errorf("adding edge for unit %q: %w", unit, errors.Join(ErrFailedToAddDependency, err)) | ||
| } | ||
|
|
||
| // Recalculate readiness for the unit since it now has a new dependency | ||
| m.recalculateReadinessUnsafe(unit) | ||
|
|
||
| return nil | ||
| } | ||
|
|
||
| // UpdateStatus updates a unit's status and recalculates readiness for affected dependents. | ||
| func (m *Manager) UpdateStatus(unit ID, newStatus Status) error { | ||
| m.mu.Lock() | ||
| defer m.mu.Unlock() | ||
|
|
||
| if !m.registered(unit) { | ||
| return xerrors.Errorf("checking registration for unit %q: %w", unit, ErrUnitNotFound) | ||
| } | ||
|
|
||
| u := m.units[unit] | ||
| if u.status == newStatus { | ||
| return xerrors.Errorf("checking status for unit %q: %w", unit, ErrSameStatusAlreadySet) | ||
| } | ||
|
|
||
| u.status = newStatus | ||
| m.units[unit] = u | ||
|
|
||
| // Get all units that depend on this one (reverse adjacent vertices) | ||
| dependents := m.graph.GetReverseAdjacentVertices(unit) | ||
|
|
||
| // Recalculate readiness for all dependents | ||
| for _, dependent := range dependents { | ||
| m.recalculateReadinessUnsafe(dependent.From) | ||
| } | ||
|
|
||
| return nil | ||
| } | ||
|
|
||
| // recalculateReadinessUnsafe recalculates the readiness state for a unit. | ||
| // This method assumes the caller holds the write lock. | ||
| func (m *Manager) recalculateReadinessUnsafe(unit ID) { | ||
| u := m.units[unit] | ||
| dependencies := m.graph.GetForwardAdjacentVertices(unit) | ||
|
|
||
| allSatisfied := true | ||
| for _, dependency := range dependencies { | ||
| requiredStatus := dependency.Edge | ||
| dependsOnUnit := m.units[dependency.To] | ||
| if dependsOnUnit.status != requiredStatus { | ||
| allSatisfied = false | ||
| break | ||
| } | ||
| } | ||
|
|
||
| u.ready = allSatisfied | ||
SasSwart marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| m.units[unit] = u | ||
| } | ||
|
|
||
| // GetGraph returns the underlying graph for visualization and debugging. | ||
| // This should be used carefully as it exposes the internal graph structure. | ||
| func (m *Manager) GetGraph() *Graph[Status, ID] { | ||
| return m.graph | ||
| } | ||
|
|
||
| // GetAllDependencies returns all dependencies for a unit, both satisfied and unsatisfied. | ||
| func (m *Manager) GetAllDependencies(unit ID) ([]Dependency, error) { | ||
SasSwart marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| m.mu.RLock() | ||
| defer m.mu.RUnlock() | ||
|
|
||
| if !m.registered(unit) { | ||
| return nil, xerrors.Errorf("checking registration for unit %q: %w", unit, ErrUnitNotFound) | ||
| } | ||
|
|
||
| dependencies := m.graph.GetForwardAdjacentVertices(unit) | ||
|
|
||
| var allDependencies []Dependency | ||
|
|
||
| for _, dependency := range dependencies { | ||
| dependsOnUnit := m.units[dependency.To] | ||
| requiredStatus := dependency.Edge | ||
| allDependencies = append(allDependencies, Dependency{ | ||
| Unit: unit, | ||
| DependsOn: dependsOnUnit.id, | ||
| RequiredStatus: requiredStatus, | ||
| CurrentStatus: dependsOnUnit.status, | ||
| IsSatisfied: dependsOnUnit.status == requiredStatus, | ||
| }) | ||
| } | ||
|
|
||
| return allDependencies, nil | ||
| } | ||
|
|
||
| // GetUnmetDependencies returns a list of unsatisfied dependencies for a unit. | ||
| func (m *Manager) GetUnmetDependencies(unit ID) ([]Dependency, error) { | ||
| allDependencies, err := m.GetAllDependencies(unit) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
|
|
||
| var unmetDependencies []Dependency = slice.Filter(allDependencies, func(dependency Dependency) bool { | ||
| return !dependency.IsSatisfied | ||
| }) | ||
|
|
||
| return unmetDependencies, nil | ||
| } | ||
|
|
||
| // ExportDOT exports the dependency graph to DOT format for visualization. | ||
| func (m *Manager) ExportDOT(name string) (string, error) { | ||
| return m.graph.ToDOT(name) | ||
| } | ||
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.