From ec165d2538ccab4c2e9d37c3fe12e7828c9d4551 Mon Sep 17 00:00:00 2001 From: Stanislas Lange Date: Sun, 23 May 2021 13:39:56 +0000 Subject: [PATCH] Go back to redis for job handling? --- job.go | 97 ++++++++++++++----------------------------- job_queue_rabbitmq.go | 18 ++++---- job_queue_redis.go | 8 ++-- main.go | 37 ++++++++++------- 4 files changed, 66 insertions(+), 94 deletions(-) diff --git a/job.go b/job.go index c667685..c642c42 100644 --- a/job.go +++ b/job.go @@ -12,14 +12,8 @@ import ( func (job benchJob) run(ctx context.Context, WarmVMs <-chan runningFirecracker) { log.WithField("job", job).Info("Handling job") - err := q.getQueueForJob(ctx, job) - if err != nil { - log.WithError(err).Fatal("Failed to get status queue") - return - } - - err = q.setjobReceived(ctx, job) - if err != nil { + res, err := q.setjobReceived(ctx, job) + if err != nil && res != 1 { q.setjobFailed(ctx, job) return } @@ -35,74 +29,45 @@ func (job benchJob) run(ctx context.Context, WarmVMs <-chan runningFirecracker) defer vm.shutDown() var reqJSON []byte - switch job.Type { - case "command": - reqJSON, err = json.Marshal(agentExecReq{ - ID: job.ID, - Command: job.Command, - }) - if err != nil { - log.WithError(err).Error("Failed to marshal JSON request") - q.setjobFailed(ctx, job) - return - } - case "code": - reqJSON, err = json.Marshal(agentRunReq{ - ID: job.ID, - Code: job.Code, - }) - if err != nil { - log.WithError(err).Error("Failed to marshal JSON request") - q.setjobFailed(ctx, job) - return - } + reqJSON, err = json.Marshal(agentRunReq{ + ID: job.ID, + Variant: job.Variant, + Code: job.Code, + }) + if err != nil { + log.WithError(err).Error("Failed to marshal JSON request") + q.setjobFailed(ctx, job) + return } - err = q.setjobRunning(ctx, job) - if err != nil { + res, err = q.setjobRunning(ctx, job) + if err != nil && res != 1 { q.setjobFailed(ctx, job) return } var httpRes *http.Response - var res agentExecRes - - switch job.Type { - case "command": - httpRes, err = http.Post("http://"+vm.ip.String()+":8080/exec", "application/json", bytes.NewBuffer(reqJSON)) - if err != nil || httpRes.StatusCode != 200 { - log.WithError(err).Error("Failed to request execution to agent") - q.setjobFailed(ctx, job) - return - } - json.NewDecoder(httpRes.Body).Decode(&res) - log.WithField("result", res).Info("Job execution finished") + var agentRes agentExecRes - err = q.setjobResult(ctx, job, res) - if err != nil { - q.setjobFailed(ctx, job) - } - - case "code": - httpRes, err = http.Post("http://"+vm.ip.String()+":8080/run/c", "application/json", bytes.NewBuffer(reqJSON)) - if err != nil { - log.WithError(err).Error("Failed to request execution to agent") - q.setjobFailed(ctx, job) - return - } - json.NewDecoder(httpRes.Body).Decode(&res) - log.WithField("result", res).Info("Job execution finished") + // FIXME + httpRes, err = http.Post("http://"+vm.ip.String()+":8080/run/python", "application/json", bytes.NewBuffer(reqJSON)) + if err != nil { + log.WithError(err).Error("Failed to request execution to agent") + q.setjobFailed(ctx, job) + return + } + json.NewDecoder(httpRes.Body).Decode(&agentRes) + log.WithField("result", agentRes).Info("Job execution finished") - if httpRes.StatusCode != 200 { - log.WithField("res", res).Error("Failed to compile and run code") - q.setjobFailed(ctx, job) - return - } + if httpRes.StatusCode != 200 { + log.WithField("res", agentRes).Error("Failed to compile and run code") + q.setjobFailed(ctx, job) + return + } - err = q.setjobResult(ctx, job, res) - if err != nil { - q.setjobFailed(ctx, job) - } + res, err = q.setjobResult(ctx, job, agentRes) + if err != nil && res != 1 { + q.setjobFailed(ctx, job) } } diff --git a/job_queue_rabbitmq.go b/job_queue_rabbitmq.go index 6877a24..1282b61 100644 --- a/job_queue_rabbitmq.go +++ b/job_queue_rabbitmq.go @@ -82,11 +82,10 @@ func (q jobQueue) getQueueForJob(ctx context.Context, job benchJob) error { func (q jobQueue) setjobStatus(ctx context.Context, job benchJob, status string) error { log.WithField("status", status).Info("Set job status") jobStatus := &jobStatus{ - ID: job.ID, - Status: status, - Command: job.Command, - StdErr: "", - StdOut: "", + ID: job.ID, + Status: status, + StdErr: "", + StdOut: "", } b, err := json.Marshal(jobStatus) if err != nil { @@ -117,11 +116,10 @@ func (q jobQueue) setjobFailed(ctx context.Context, job benchJob) error { } func (q jobQueue) setjobResult(ctx context.Context, job benchJob, res agentExecRes) error { jobStatus := &jobStatus{ - ID: job.ID, - Status: "done", - Command: job.Command, - StdErr: res.StdErr, - StdOut: res.StdOut, + ID: job.ID, + Status: "done", + StdErr: res.StdErr, + StdOut: res.StdOut, } log.WithField("jobStatus", jobStatus).Info("Set job result") diff --git a/job_queue_redis.go b/job_queue_redis.go index d8508fe..b303c15 100644 --- a/job_queue_redis.go +++ b/job_queue_redis.go @@ -19,7 +19,7 @@ func (q jobQueueRedis) getJob(ctx context.Context) ([]string, error) { } func (q jobQueueRedis) setjobReceived(ctx context.Context, job benchJob) (int64, error) { - result, err := q.redis.RPush(ctx, job.ID, "received").Result() + result, err := q.redis.RPush(ctx, "worker_job_status_"+job.ID, "received").Result() if err != nil { log.WithError(err).Error("Failed to set job status in queue") @@ -32,7 +32,7 @@ func (q jobQueueRedis) setjobReceived(ctx context.Context, job benchJob) (int64, } func (q jobQueueRedis) setjobRunning(ctx context.Context, job benchJob) (int64, error) { - result, err := q.redis.RPush(ctx, job.ID, "running").Result() + result, err := q.redis.RPush(ctx, "worker_job_status_"+job.ID, "running").Result() if err != nil { log.WithError(err).Error("Failed to set job status in queue") @@ -45,7 +45,7 @@ func (q jobQueueRedis) setjobRunning(ctx context.Context, job benchJob) (int64, } func (q jobQueueRedis) setjobFailed(ctx context.Context, job benchJob) (int64, error) { - result, err := q.redis.RPush(ctx, job.ID, "failed").Result() + result, err := q.redis.RPush(ctx, "worker_job_status_"+job.ID, "failed").Result() if err != nil { log.WithError(err).Error("Failed to set job status in queue") @@ -58,7 +58,7 @@ func (q jobQueueRedis) setjobFailed(ctx context.Context, job benchJob) (int64, e } func (q jobQueueRedis) setjobResult(ctx context.Context, job benchJob, res agentExecRes) (int64, error) { - result, err := q.redis.RPush(ctx, job.ID, "done", res.StdOut, res.StdErr).Result() + result, err := q.redis.RPush(ctx, "worker_job_status_"+job.ID, "done", res.StdOut, res.StdErr).Result() if err != nil { log.WithError(err).Error("Failed to set job status in queue") diff --git a/main.go b/main.go index 9f0bf12..b4a0011 100644 --- a/main.go +++ b/main.go @@ -13,13 +13,13 @@ import ( "syscall" firecracker "github.com/firecracker-microvm/firecracker-go-sdk" + "github.com/go-redis/redis/v8" log "github.com/sirupsen/logrus" ) type benchJob struct { ID string `json:"id"` - Type string `json:"type"` - Command string `json:"command"` + Variant string `json:"variant"` Code string `json:"code"` } @@ -29,8 +29,9 @@ type agentExecReq struct { } type agentRunReq struct { - ID string `json:"id"` - Code string `json:"code"` + ID string `json:"id"` + Variant string `json:"variant"` + Code string `json:"code"` } type agentExecRes struct { @@ -46,7 +47,7 @@ type runningFirecracker struct { } var ( - q jobQueue + q jobQueueRedis ) func main() { @@ -54,24 +55,32 @@ func main() { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - WarmVMs := make(chan runningFirecracker, 1) + WarmVMs := make(chan runningFirecracker, 0) go fillVMPool(ctx, WarmVMs) installSignalHandlers() log.SetReportCaller(true) - q = newJobQueue("amqp://admin:admin@localhost:5672/") - defer q.ch.Close() - defer q.conn.Close() + redisClient := redis.NewClient(&redis.Options{ + Addr: "localhost:6378", + }) + + q = jobQueueRedis{redis: redisClient} - log.Info("Waiting for RabbitMQ jobs...") - for d := range q.jobs { - log.Printf("Received a message: %s", d.Body) + fmt.Println("Waiting for jobs on redis job queue") + for { var job benchJob - err := json.Unmarshal([]byte(d.Body), &job) + result, err := q.getJob(ctx) + if err != nil { - log.WithError(err).Error("Received invalid job") + log.WithError(err).Error("Failed to get job from redis queue") + continue + } + err = json.Unmarshal([]byte(result[1]), &job) + if err != nil || job.ID == "" || job.Code == "" || job.Variant == "" { + log.WithError(err).WithField("job", result[1]).Error("Failed to unmarshal job") + q.setjobFailed(ctx, job) continue }