Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 35 additions & 27 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ const (
EnvProcOOMScore = "CODER_PROC_OOM_SCORE"
)

var ErrAgentClosing = xerrors.New("agent is closing")

type Options struct {
Filesystem afero.Fs
LogDir string
Expand Down Expand Up @@ -401,6 +403,7 @@ func (a *agent) runLoop() {
// need to keep retrying up to the hardCtx so that we can send graceful shutdown-related
// messages.
ctx := a.hardCtx
defer a.logger.Info(ctx, "agent main loop exited")
for retrier := retry.New(100*time.Millisecond, 10*time.Second); retrier.Wait(ctx); {
a.logger.Info(ctx, "connecting to coderd")
err := a.run()
Expand Down Expand Up @@ -1348,7 +1351,7 @@ func (a *agent) createOrUpdateNetwork(manifestOK, networkOK *checkpoint) func(co
a.closeMutex.Unlock()
if closing {
_ = network.Close()
return xerrors.New("agent is closing")
return xerrors.Errorf("agent closed while creating tailnet: %w", ErrAgentClosing)
}
} else {
// Update the wireguard IPs if the agent ID changed.
Expand Down Expand Up @@ -1471,7 +1474,7 @@ func (a *agent) trackGoroutine(fn func()) error {
a.closeMutex.Lock()
defer a.closeMutex.Unlock()
if a.closing {
return xerrors.New("track conn goroutine: agent is closing")
return xerrors.Errorf("track conn goroutine: %w", ErrAgentClosing)
}
a.closeWaitGroup.Add(1)
go func() {
Expand Down Expand Up @@ -2152,16 +2155,7 @@ func (a *apiConnRoutineManager) startAgentAPI(
a.eg.Go(func() error {
logger.Debug(ctx, "starting agent routine")
err := f(ctx, a.aAPI)
if xerrors.Is(err, context.Canceled) && ctx.Err() != nil {
logger.Debug(ctx, "swallowing context canceled")
// Don't propagate context canceled errors to the error group, because we don't want the
// graceful context being canceled to halt the work of routines with
// gracefulShutdownBehaviorRemain. Note that we check both that the error is
// context.Canceled and that *our* context is currently canceled, because when Coderd
// unilaterally closes the API connection (for example if the build is outdated), it can
// sometimes show up as context.Canceled in our RPC calls.
return nil
}
err = shouldPropagateError(ctx, logger, err)
logger.Debug(ctx, "routine exited", slog.Error(err))
if err != nil {
return xerrors.Errorf("error in routine %s: %w", name, err)
Expand Down Expand Up @@ -2189,21 +2183,7 @@ func (a *apiConnRoutineManager) startTailnetAPI(
a.eg.Go(func() error {
logger.Debug(ctx, "starting tailnet routine")
err := f(ctx, a.tAPI)
if (xerrors.Is(err, context.Canceled) ||
xerrors.Is(err, io.EOF)) &&
ctx.Err() != nil {
logger.Debug(ctx, "swallowing error because context is canceled", slog.Error(err))
// Don't propagate context canceled errors to the error group, because we don't want the
// graceful context being canceled to halt the work of routines with
// gracefulShutdownBehaviorRemain. Unfortunately, the dRPC library closes the stream
// when context is canceled on an RPC, so canceling the context can also show up as
// io.EOF. Also, when Coderd unilaterally closes the API connection (for example if the
// build is outdated), it can sometimes show up as context.Canceled in our RPC calls.
// We can't reliably distinguish between a context cancelation and a legit EOF, so we
// also check that *our* context is currently canceled. If it is, we can safely ignore
// the error.
return nil
}
err = shouldPropagateError(ctx, logger, err)
logger.Debug(ctx, "routine exited", slog.Error(err))
if err != nil {
return xerrors.Errorf("error in routine %s: %w", name, err)
Expand All @@ -2212,6 +2192,34 @@ func (a *apiConnRoutineManager) startTailnetAPI(
})
}

// shouldPropagateError decides whether an error from an API connection routine should be propagated to the
// apiConnRoutineManager. Its purpose is to prevent errors related to shutting down from propagating to the manager's
// error group, which will tear down the API connection and potentially stop graceful shutdown from succeeding.
func shouldPropagateError(ctx context.Context, logger slog.Logger, err error) error {
if (xerrors.Is(err, context.Canceled) ||
xerrors.Is(err, io.EOF)) &&
ctx.Err() != nil {
logger.Debug(ctx, "swallowing error because context is canceled", slog.Error(err))
// Don't propagate context canceled errors to the error group, because we don't want the
// graceful context being canceled to halt the work of routines with
// gracefulShutdownBehaviorRemain. Unfortunately, the dRPC library closes the stream
// when context is canceled on an RPC, so canceling the context can also show up as
// io.EOF. Also, when Coderd unilaterally closes the API connection (for example if the
// build is outdated), it can sometimes show up as context.Canceled in our RPC calls.
// We can't reliably distinguish between a context cancelation and a legit EOF, so we
// also check that *our* context is currently canceled. If it is, we can safely ignore
// the error.
return nil
}
if xerrors.Is(err, ErrAgentClosing) {
logger.Debug(ctx, "swallowing error because agent is closing")
// This can only be generated when the agent is closing, so we never want it to propagate to other routines.
// (They are signaled to exit via canceled contexts.)
return nil
}
return err
}

func (a *apiConnRoutineManager) wait() error {
return a.eg.Wait()
}
Expand Down
35 changes: 13 additions & 22 deletions coderd/workspaceagentsrpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,16 @@ func (m *agentConnectionMonitor) start(ctx context.Context) {
}

func (m *agentConnectionMonitor) monitor(ctx context.Context) {
reason := "disconnect"
defer func() {
m.logger.Debug(ctx, "agent connection monitor is closing connection",
slog.F("reason", reason))
_ = m.conn.Close(websocket.StatusGoingAway, reason)
m.disconnectedAt = sql.NullTime{
Time: dbtime.Now(),
Valid: true,
}

// If connection closed then context will be canceled, try to
// ensure our final update is sent. By waiting at most the agent
// inactive disconnect timeout we ensure that we don't block but
Expand All @@ -372,13 +381,6 @@ func (m *agentConnectionMonitor) monitor(ctx context.Context) {
finalCtx, cancel := context.WithTimeout(dbauthz.AsSystemRestricted(m.apiCtx), m.disconnectTimeout)
defer cancel()

// Only update timestamp if the disconnect is new.
if !m.disconnectedAt.Valid {
m.disconnectedAt = sql.NullTime{
Time: dbtime.Now(),
Valid: true,
}
}
err := m.updateConnectionTimes(finalCtx)
if err != nil {
// This is a bug with unit tests that cancel the app context and
Expand All @@ -398,12 +400,6 @@ func (m *agentConnectionMonitor) monitor(ctx context.Context) {
AgentID: &m.workspaceAgent.ID,
})
}()
reason := "disconnect"
defer func() {
m.logger.Debug(ctx, "agent connection monitor is closing connection",
slog.F("reason", reason))
_ = m.conn.Close(websocket.StatusGoingAway, reason)
}()

err := m.updateConnectionTimes(ctx)
if err != nil {
Expand Down Expand Up @@ -432,8 +428,7 @@ func (m *agentConnectionMonitor) monitor(ctx context.Context) {
m.logger.Warn(ctx, "connection to agent timed out")
return
}
connectionStatusChanged := m.disconnectedAt.Valid
m.disconnectedAt = sql.NullTime{}

m.lastConnectedAt = sql.NullTime{
Time: dbtime.Now(),
Valid: true,
Expand All @@ -447,13 +442,9 @@ func (m *agentConnectionMonitor) monitor(ctx context.Context) {
}
return
}
if connectionStatusChanged {
m.updater.publishWorkspaceUpdate(ctx, m.workspace.OwnerID, wspubsub.WorkspaceEvent{
Kind: wspubsub.WorkspaceEventKindAgentConnectionUpdate,
WorkspaceID: m.workspaceBuild.WorkspaceID,
AgentID: &m.workspaceAgent.ID,
})
}
// we don't need to publish a workspace update here because we published an update when the workspace first
// connected. Since all we've done is updated lastConnectedAt, the workspace is still connected and hasn't
// changed status. We don't expect to get updates just for the times changing.

ctx, err := dbauthz.WithWorkspaceRBAC(ctx, m.workspace.RBACObject())
if err != nil {
Expand Down
Loading
Loading