Skip to content

Commit 950f771

Browse files
committed
feat: add inital API for boundary log forwarding to coderd
Add the API changes to support the feature that transmits boundary logs from workspaces to coderd via the agent API, then re-emits them to stderr. Architecture: - Boundary process connects to Unix socket - Boundary batches logs and sends TLV prefixed protobuf ReportBoundaryLogsRequest messages to Agent - Agent proxies messages to coderd via DRPC - coderd re-emits to stderr
1 parent 871ed12 commit 950f771

File tree

14 files changed

+744
-279
lines changed

14 files changed

+744
-279
lines changed

agent/agent.go

Lines changed: 21 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -105,8 +105,8 @@ type Options struct {
105105
}
106106

107107
type Client interface {
108-
ConnectRPC26(ctx context.Context) (
109-
proto.DRPCAgentClient26, tailnetproto.DRPCTailnetClient26, error,
108+
ConnectRPC27(ctx context.Context) (
109+
proto.DRPCAgentClient27, tailnetproto.DRPCTailnetClient26, error,
110110
)
111111
tailnet.DERPMapRewriter
112112
agentsdk.RefreshableSessionTokenProvider
@@ -506,7 +506,7 @@ func (t *trySingleflight) Do(key string, fn func()) {
506506
fn()
507507
}
508508

509-
func (a *agent) reportMetadata(ctx context.Context, aAPI proto.DRPCAgentClient26) error {
509+
func (a *agent) reportMetadata(ctx context.Context, aAPI proto.DRPCAgentClient27) error {
510510
tickerDone := make(chan struct{})
511511
collectDone := make(chan struct{})
512512
ctx, cancel := context.WithCancel(ctx)
@@ -721,7 +721,7 @@ func (a *agent) reportMetadata(ctx context.Context, aAPI proto.DRPCAgentClient26
721721

722722
// reportLifecycle reports the current lifecycle state once. All state
723723
// changes are reported in order.
724-
func (a *agent) reportLifecycle(ctx context.Context, aAPI proto.DRPCAgentClient26) error {
724+
func (a *agent) reportLifecycle(ctx context.Context, aAPI proto.DRPCAgentClient27) error {
725725
for {
726726
select {
727727
case <-a.lifecycleUpdate:
@@ -801,7 +801,7 @@ func (a *agent) setLifecycle(state codersdk.WorkspaceAgentLifecycle) {
801801
}
802802

803803
// reportConnectionsLoop reports connections to the agent for auditing.
804-
func (a *agent) reportConnectionsLoop(ctx context.Context, aAPI proto.DRPCAgentClient26) error {
804+
func (a *agent) reportConnectionsLoop(ctx context.Context, aAPI proto.DRPCAgentClient27) error {
805805
for {
806806
select {
807807
case <-a.reportConnectionsUpdate:
@@ -932,7 +932,7 @@ func (a *agent) reportConnection(id uuid.UUID, connectionType proto.Connection_T
932932
// fetchServiceBannerLoop fetches the service banner on an interval. It will
933933
// not be fetched immediately; the expectation is that it is primed elsewhere
934934
// (and must be done before the session actually starts).
935-
func (a *agent) fetchServiceBannerLoop(ctx context.Context, aAPI proto.DRPCAgentClient26) error {
935+
func (a *agent) fetchServiceBannerLoop(ctx context.Context, aAPI proto.DRPCAgentClient27) error {
936936
ticker := time.NewTicker(a.announcementBannersRefreshInterval)
937937
defer ticker.Stop()
938938
for {
@@ -967,7 +967,7 @@ func (a *agent) run() (retErr error) {
967967
}
968968

969969
// ConnectRPC returns the dRPC connection we use for the Agent and Tailnet v2+ APIs
970-
aAPI, tAPI, err := a.client.ConnectRPC26(a.hardCtx)
970+
aAPI, tAPI, err := a.client.ConnectRPC27(a.hardCtx)
971971
if err != nil {
972972
return err
973973
}
@@ -984,7 +984,7 @@ func (a *agent) run() (retErr error) {
984984
connMan := newAPIConnRoutineManager(a.gracefulCtx, a.hardCtx, a.logger, aAPI, tAPI)
985985

986986
connMan.startAgentAPI("init notification banners", gracefulShutdownBehaviorStop,
987-
func(ctx context.Context, aAPI proto.DRPCAgentClient26) error {
987+
func(ctx context.Context, aAPI proto.DRPCAgentClient27) error {
988988
bannersProto, err := aAPI.GetAnnouncementBanners(ctx, &proto.GetAnnouncementBannersRequest{})
989989
if err != nil {
990990
return xerrors.Errorf("fetch service banner: %w", err)
@@ -1001,7 +1001,7 @@ func (a *agent) run() (retErr error) {
10011001
// sending logs gets gracefulShutdownBehaviorRemain because we want to send logs generated by
10021002
// shutdown scripts.
10031003
connMan.startAgentAPI("send logs", gracefulShutdownBehaviorRemain,
1004-
func(ctx context.Context, aAPI proto.DRPCAgentClient26) error {
1004+
func(ctx context.Context, aAPI proto.DRPCAgentClient27) error {
10051005
err := a.logSender.SendLoop(ctx, aAPI)
10061006
if xerrors.Is(err, agentsdk.ErrLogLimitExceeded) {
10071007
// we don't want this error to tear down the API connection and propagate to the
@@ -1020,7 +1020,7 @@ func (a *agent) run() (retErr error) {
10201020
connMan.startAgentAPI("report metadata", gracefulShutdownBehaviorStop, a.reportMetadata)
10211021

10221022
// resources monitor can cease as soon as we start gracefully shutting down.
1023-
connMan.startAgentAPI("resources monitor", gracefulShutdownBehaviorStop, func(ctx context.Context, aAPI proto.DRPCAgentClient26) error {
1023+
connMan.startAgentAPI("resources monitor", gracefulShutdownBehaviorStop, func(ctx context.Context, aAPI proto.DRPCAgentClient27) error {
10241024
logger := a.logger.Named("resources_monitor")
10251025
clk := quartz.NewReal()
10261026
config, err := aAPI.GetResourcesMonitoringConfiguration(ctx, &proto.GetResourcesMonitoringConfigurationRequest{})
@@ -1067,7 +1067,7 @@ func (a *agent) run() (retErr error) {
10671067
connMan.startAgentAPI("handle manifest", gracefulShutdownBehaviorStop, a.handleManifest(manifestOK))
10681068

10691069
connMan.startAgentAPI("app health reporter", gracefulShutdownBehaviorStop,
1070-
func(ctx context.Context, aAPI proto.DRPCAgentClient26) error {
1070+
func(ctx context.Context, aAPI proto.DRPCAgentClient27) error {
10711071
if err := manifestOK.wait(ctx); err != nil {
10721072
return xerrors.Errorf("no manifest: %w", err)
10731073
}
@@ -1100,7 +1100,7 @@ func (a *agent) run() (retErr error) {
11001100

11011101
connMan.startAgentAPI("fetch service banner loop", gracefulShutdownBehaviorStop, a.fetchServiceBannerLoop)
11021102

1103-
connMan.startAgentAPI("stats report loop", gracefulShutdownBehaviorStop, func(ctx context.Context, aAPI proto.DRPCAgentClient26) error {
1103+
connMan.startAgentAPI("stats report loop", gracefulShutdownBehaviorStop, func(ctx context.Context, aAPI proto.DRPCAgentClient27) error {
11041104
if err := networkOK.wait(ctx); err != nil {
11051105
return xerrors.Errorf("no network: %w", err)
11061106
}
@@ -1115,8 +1115,8 @@ func (a *agent) run() (retErr error) {
11151115
}
11161116

11171117
// handleManifest returns a function that fetches and processes the manifest
1118-
func (a *agent) handleManifest(manifestOK *checkpoint) func(ctx context.Context, aAPI proto.DRPCAgentClient26) error {
1119-
return func(ctx context.Context, aAPI proto.DRPCAgentClient26) error {
1118+
func (a *agent) handleManifest(manifestOK *checkpoint) func(ctx context.Context, aAPI proto.DRPCAgentClient27) error {
1119+
return func(ctx context.Context, aAPI proto.DRPCAgentClient27) error {
11201120
var (
11211121
sentResult = false
11221122
err error
@@ -1279,7 +1279,7 @@ func (a *agent) handleManifest(manifestOK *checkpoint) func(ctx context.Context,
12791279

12801280
func (a *agent) createDevcontainer(
12811281
ctx context.Context,
1282-
aAPI proto.DRPCAgentClient26,
1282+
aAPI proto.DRPCAgentClient27,
12831283
dc codersdk.WorkspaceAgentDevcontainer,
12841284
script codersdk.WorkspaceAgentScript,
12851285
) (err error) {
@@ -1311,8 +1311,8 @@ func (a *agent) createDevcontainer(
13111311

13121312
// createOrUpdateNetwork waits for the manifest to be set using manifestOK, then creates or updates
13131313
// the tailnet using the information in the manifest
1314-
func (a *agent) createOrUpdateNetwork(manifestOK, networkOK *checkpoint) func(context.Context, proto.DRPCAgentClient26) error {
1315-
return func(ctx context.Context, aAPI proto.DRPCAgentClient26) (retErr error) {
1314+
func (a *agent) createOrUpdateNetwork(manifestOK, networkOK *checkpoint) func(context.Context, proto.DRPCAgentClient27) error {
1315+
return func(ctx context.Context, aAPI proto.DRPCAgentClient27) (retErr error) {
13161316
if err := manifestOK.wait(ctx); err != nil {
13171317
return xerrors.Errorf("no manifest: %w", err)
13181318
}
@@ -1401,6 +1401,7 @@ func (a *agent) updateCommandEnv(current []string) (updated []string, err error)
14011401
"CODER_WORKSPACE_NAME": manifest.WorkspaceName,
14021402
"CODER_WORKSPACE_AGENT_NAME": manifest.AgentName,
14031403
"CODER_WORKSPACE_OWNER_NAME": manifest.OwnerName,
1404+
"CODER_WORKSPACE_ID": manifest.WorkspaceID.String(),
14041405

14051406
// Specific Coder subcommands require the agent token exposed!
14061407
"CODER_AGENT_TOKEN": a.client.GetSessionToken(),
@@ -2098,7 +2099,7 @@ const (
20982099

20992100
type apiConnRoutineManager struct {
21002101
logger slog.Logger
2101-
aAPI proto.DRPCAgentClient26
2102+
aAPI proto.DRPCAgentClient27
21022103
tAPI tailnetproto.DRPCTailnetClient24
21032104
eg *errgroup.Group
21042105
stopCtx context.Context
@@ -2107,7 +2108,7 @@ type apiConnRoutineManager struct {
21072108

21082109
func newAPIConnRoutineManager(
21092110
gracefulCtx, hardCtx context.Context, logger slog.Logger,
2110-
aAPI proto.DRPCAgentClient26, tAPI tailnetproto.DRPCTailnetClient24,
2111+
aAPI proto.DRPCAgentClient27, tAPI tailnetproto.DRPCTailnetClient24,
21112112
) *apiConnRoutineManager {
21122113
// routines that remain in operation during graceful shutdown use the remainCtx. They'll still
21132114
// exit if the errgroup hits an error, which usually means a problem with the conn.
@@ -2140,7 +2141,7 @@ func newAPIConnRoutineManager(
21402141
// but for Tailnet.
21412142
func (a *apiConnRoutineManager) startAgentAPI(
21422143
name string, behavior gracefulShutdownBehavior,
2143-
f func(context.Context, proto.DRPCAgentClient26) error,
2144+
f func(context.Context, proto.DRPCAgentClient27) error,
21442145
) {
21452146
logger := a.logger.With(slog.F("name", name))
21462147
var ctx context.Context

agent/agentcontainers/subagent.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -147,12 +147,12 @@ type SubAgentClient interface {
147147
// agent API client.
148148
type subAgentAPIClient struct {
149149
logger slog.Logger
150-
api agentproto.DRPCAgentClient26
150+
api agentproto.DRPCAgentClient27
151151
}
152152

153153
var _ SubAgentClient = (*subAgentAPIClient)(nil)
154154

155-
func NewSubAgentClientFromAPI(logger slog.Logger, agentAPI agentproto.DRPCAgentClient26) SubAgentClient {
155+
func NewSubAgentClientFromAPI(logger slog.Logger, agentAPI agentproto.DRPCAgentClient27) SubAgentClient {
156156
if agentAPI == nil {
157157
panic("developer error: agentAPI cannot be nil")
158158
}

agent/agentcontainers/subagent_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ func TestSubAgentClient_CreateWithDisplayApps(t *testing.T) {
8181

8282
agentAPI := agenttest.NewClient(t, logger, uuid.New(), agentsdk.Manifest{}, statsCh, tailnet.NewCoordinator(logger))
8383

84-
agentClient, _, err := agentAPI.ConnectRPC26(ctx)
84+
agentClient, _, err := agentAPI.ConnectRPC27(ctx)
8585
require.NoError(t, err)
8686

8787
subAgentClient := agentcontainers.NewSubAgentClientFromAPI(logger, agentClient)
@@ -245,7 +245,7 @@ func TestSubAgentClient_CreateWithDisplayApps(t *testing.T) {
245245

246246
agentAPI := agenttest.NewClient(t, logger, uuid.New(), agentsdk.Manifest{}, statsCh, tailnet.NewCoordinator(logger))
247247

248-
agentClient, _, err := agentAPI.ConnectRPC26(ctx)
248+
agentClient, _, err := agentAPI.ConnectRPC27(ctx)
249249
require.NoError(t, err)
250250

251251
subAgentClient := agentcontainers.NewSubAgentClientFromAPI(logger, agentClient)

agent/agenttest/client.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -124,8 +124,8 @@ func (c *Client) Close() {
124124
c.derpMapOnce.Do(func() { close(c.derpMapUpdates) })
125125
}
126126

127-
func (c *Client) ConnectRPC26(ctx context.Context) (
128-
agentproto.DRPCAgentClient26, proto.DRPCTailnetClient26, error,
127+
func (c *Client) ConnectRPC27(ctx context.Context) (
128+
agentproto.DRPCAgentClient27, proto.DRPCTailnetClient26, error,
129129
) {
130130
conn, lis := drpcsdk.MemTransportPipe()
131131
c.LastWorkspaceAgent = func() {
@@ -405,6 +405,10 @@ func (f *FakeAgentAPI) ReportConnection(_ context.Context, req *agentproto.Repor
405405
return &emptypb.Empty{}, nil
406406
}
407407

408+
func (*FakeAgentAPI) ReportBoundaryLogs(_ context.Context, _ *agentproto.ReportBoundaryLogsRequest) (*agentproto.ReportBoundaryLogsResponse, error) {
409+
return &agentproto.ReportBoundaryLogsResponse{}, nil
410+
}
411+
408412
func (f *FakeAgentAPI) GetConnectionReports() []*agentproto.ReportConnectionRequest {
409413
f.Lock()
410414
defer f.Unlock()

agent/boundarylogproxy/proxy.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
// Package boundarylogproxy provides a Unix socket server that receives boundary
2+
// audit logs and forwards them to coderd via the agent API.
3+
package boundarylogproxy
4+
5+
// Server a placeholder for the server that will listen on a Unix socket for
6+
// boundary logs to be forwarded.
7+
type Server struct{}

0 commit comments

Comments
 (0)