Skip to content

Commit 00b043e

Browse files
authored
refactor: rework rabbitmq job handling & structs (#6)
1 parent e9e5880 commit 00b043e

File tree

3 files changed

+101
-106
lines changed

3 files changed

+101
-106
lines changed

job.go

Lines changed: 29 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,7 @@ import (
1212
func (job benchJob) run(ctx context.Context, WarmVMs <-chan runningFirecracker) {
1313
log.WithField("job", job).Info("Handling job")
1414

15-
err := q.getQueueForJob(ctx, job)
16-
if err != nil {
17-
log.WithError(err).Fatal("Failed to get status queue")
18-
return
19-
}
20-
21-
err = q.setjobReceived(ctx, job)
15+
err := q.setjobReceived(ctx, job)
2216
if err != nil {
2317
q.setjobFailed(ctx, job)
2418
return
@@ -35,27 +29,16 @@ func (job benchJob) run(ctx context.Context, WarmVMs <-chan runningFirecracker)
3529
defer vm.shutDown()
3630

3731
var reqJSON []byte
38-
switch job.Type {
39-
case "command":
40-
reqJSON, err = json.Marshal(agentExecReq{
41-
ID: job.ID,
42-
Command: job.Command,
43-
})
44-
if err != nil {
45-
log.WithError(err).Error("Failed to marshal JSON request")
46-
q.setjobFailed(ctx, job)
47-
return
48-
}
49-
case "code":
50-
reqJSON, err = json.Marshal(agentRunReq{
51-
ID: job.ID,
52-
Code: job.Code,
53-
})
54-
if err != nil {
55-
log.WithError(err).Error("Failed to marshal JSON request")
56-
q.setjobFailed(ctx, job)
57-
return
58-
}
32+
33+
reqJSON, err = json.Marshal(agentRunReq{
34+
ID: job.ID,
35+
Variant: job.Variant,
36+
Code: job.Code,
37+
})
38+
if err != nil {
39+
log.WithError(err).Error("Failed to marshal JSON request")
40+
q.setjobFailed(ctx, job)
41+
return
5942
}
6043

6144
err = q.setjobRunning(ctx, job)
@@ -65,44 +48,26 @@ func (job benchJob) run(ctx context.Context, WarmVMs <-chan runningFirecracker)
6548
}
6649

6750
var httpRes *http.Response
68-
var res agentExecRes
69-
70-
switch job.Type {
71-
case "command":
72-
httpRes, err = http.Post("http://"+vm.ip.String()+":8080/exec", "application/json", bytes.NewBuffer(reqJSON))
73-
if err != nil || httpRes.StatusCode != 200 {
74-
log.WithError(err).Error("Failed to request execution to agent")
75-
q.setjobFailed(ctx, job)
76-
return
77-
}
78-
json.NewDecoder(httpRes.Body).Decode(&res)
79-
log.WithField("result", res).Info("Job execution finished")
80-
81-
err = q.setjobResult(ctx, job, res)
82-
if err != nil {
83-
q.setjobFailed(ctx, job)
84-
}
51+
var agentRes agentExecRes
8552

86-
case "code":
87-
httpRes, err = http.Post("http://"+vm.ip.String()+":8080/run/c", "application/json", bytes.NewBuffer(reqJSON))
88-
if err != nil {
89-
log.WithError(err).Error("Failed to request execution to agent")
90-
q.setjobFailed(ctx, job)
91-
return
92-
}
93-
json.NewDecoder(httpRes.Body).Decode(&res)
94-
log.WithField("result", res).Info("Job execution finished")
95-
96-
if httpRes.StatusCode != 200 {
97-
log.WithField("res", res).Error("Failed to compile and run code")
98-
q.setjobFailed(ctx, job)
99-
return
100-
}
53+
// FIXME
54+
httpRes, err = http.Post("http://"+vm.ip.String()+":8080/run/python", "application/json", bytes.NewBuffer(reqJSON))
55+
if err != nil {
56+
log.WithError(err).Error("Failed to request execution to agent")
57+
q.setjobFailed(ctx, job)
58+
return
59+
}
60+
json.NewDecoder(httpRes.Body).Decode(&agentRes)
61+
log.WithField("result", agentRes).Info("Job execution finished")
62+
if httpRes.StatusCode != 200 {
63+
log.WithField("res", agentRes).Error("Failed to compile and run code")
64+
q.setjobFailed(ctx, job)
65+
return
66+
}
10167

102-
err = q.setjobResult(ctx, job, res)
103-
if err != nil {
104-
q.setjobFailed(ctx, job)
105-
}
68+
err = q.setjobResult(ctx, job, agentRes)
69+
if err != nil {
70+
q.setjobFailed(ctx, job)
10671
}
10772

10873
}

job_queue_rabbitmq.go

Lines changed: 56 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,10 @@ type jobQueue struct {
1616
}
1717

1818
type jobStatus struct {
19-
ID string `json:"id"`
20-
Status string `json:"status"`
21-
Command string `json:"command"`
22-
StdErr string `json:"stderr"`
23-
StdOut string `json:"stdout"`
19+
ID string `json:"id"`
20+
Status string `json:"status"`
21+
StdErr string `json:"stderr"`
22+
StdOut string `json:"stdout"`
2423
}
2524

2625
func newJobQueue(endpoint string) jobQueue {
@@ -34,18 +33,40 @@ func newJobQueue(endpoint string) jobQueue {
3433
log.WithError(err).Fatal("Failed to open a channel")
3534
}
3635

36+
err = ch.ExchangeDeclare(
37+
"jobs_ex", // name
38+
"direct", // type
39+
true, // durable
40+
false, // auto-deleted
41+
false, // internal
42+
false, // no-wait
43+
nil, // arguments
44+
)
45+
if err != nil {
46+
log.WithError(err).Fatal("Failed to declare an exchange")
47+
}
48+
3749
jobsQ, err := ch.QueueDeclare(
38-
"jobs", // name
39-
true, // durable
40-
false, // delete when unused
41-
false, // exclusive
42-
false, // no-wait
43-
nil, // arguments
50+
"jobs_q", // name
51+
true, // durable
52+
false, // delete when unused
53+
false, // exclusive
54+
false, // no-wait
55+
nil, // arguments
4456
)
4557
if err != nil {
4658
log.WithError(err).Fatal("Failed to declare a queue")
4759
}
4860

61+
err = ch.QueueBind(
62+
jobsQ.Name, // queue name
63+
"jobs_rk", // routing key
64+
"jobs_ex", // exchange
65+
false,
66+
nil)
67+
if err != nil {
68+
log.WithError(err).Fatal("Failed to bind a queue")
69+
}
4970
jobs, err := ch.Consume(
5071
jobsQ.Name, // queue
5172
"", // consumer
@@ -67,36 +88,35 @@ func newJobQueue(endpoint string) jobQueue {
6788
}
6889
}
6990

70-
func (q jobQueue) getQueueForJob(ctx context.Context, job benchJob) error {
91+
func (q jobQueue) getQueueForJob(ctx context.Context) error {
7192
return q.ch.ExchangeDeclare(
72-
"job_status", // name
73-
"direct", // type
74-
false, // durable
75-
false, // auto-deleted
76-
false, // internal
77-
false, // no-wait
78-
nil, // arguments
93+
"jobs_status_ex", // name
94+
"direct", // type
95+
false, // durable
96+
false, // auto-deleted
97+
false, // internal
98+
false, // no-wait
99+
nil, // arguments
79100
)
80101
}
81102

82103
func (q jobQueue) setjobStatus(ctx context.Context, job benchJob, status string) error {
83104
log.WithField("status", status).Info("Set job status")
84105
jobStatus := &jobStatus{
85-
ID: job.ID,
86-
Status: status,
87-
Command: job.Command,
88-
StdErr: "",
89-
StdOut: "",
106+
ID: job.ID,
107+
Status: status,
108+
StdErr: "",
109+
StdOut: "",
90110
}
91111
b, err := json.Marshal(jobStatus)
92112
if err != nil {
93113
return err
94114
}
95115
err = q.ch.Publish(
96-
"job_status", // exchange
97-
job.ID, // routing key
98-
false, // mandatory
99-
false, // immediate
116+
"jobs_status_ex", // exchange
117+
"jobs_status_rk", // routing key
118+
false, // mandatory
119+
false, // immediate
100120
amqp.Publishing{
101121
ContentType: "text/plain",
102122
Body: b,
@@ -117,11 +137,10 @@ func (q jobQueue) setjobFailed(ctx context.Context, job benchJob) error {
117137
}
118138
func (q jobQueue) setjobResult(ctx context.Context, job benchJob, res agentExecRes) error {
119139
jobStatus := &jobStatus{
120-
ID: job.ID,
121-
Status: "done",
122-
Command: job.Command,
123-
StdErr: res.StdErr,
124-
StdOut: res.StdOut,
140+
ID: job.ID,
141+
Status: "done",
142+
StdErr: res.StdErr,
143+
StdOut: res.StdOut,
125144
}
126145
log.WithField("jobStatus", jobStatus).Info("Set job result")
127146

@@ -130,10 +149,10 @@ func (q jobQueue) setjobResult(ctx context.Context, job benchJob, res agentExecR
130149
return err
131150
}
132151
err = q.ch.Publish(
133-
"job_status", // exchange
134-
job.ID, // routing key
135-
false, // mandatory
136-
false, // immediate
152+
"jobs_status_ex", // exchange
153+
"jobs_status_rk", // routing key
154+
false, // mandatory
155+
false, // immediate
137156
amqp.Publishing{
138157
ContentType: "text/plain",
139158
Body: b,

main.go

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,13 @@ import (
1313
"syscall"
1414

1515
firecracker "github.com/firecracker-microvm/firecracker-go-sdk"
16+
"github.com/sirupsen/logrus"
1617
log "github.com/sirupsen/logrus"
1718
)
1819

1920
type benchJob struct {
2021
ID string `json:"id"`
21-
Type string `json:"type"`
22-
Command string `json:"command"`
22+
Variant string `json:"variant"`
2323
Code string `json:"code"`
2424
}
2525

@@ -29,8 +29,9 @@ type agentExecReq struct {
2929
}
3030

3131
type agentRunReq struct {
32-
ID string `json:"id"`
33-
Code string `json:"code"`
32+
ID string `json:"id"`
33+
Variant string `json:"variant"`
34+
Code string `json:"code"`
3435
}
3536

3637
type agentExecRes struct {
@@ -60,10 +61,20 @@ func main() {
6061
installSignalHandlers()
6162
log.SetReportCaller(true)
6263

63-
q = newJobQueue("amqp://admin:admin@localhost:5672/")
64+
rabbitMQURL := os.Getenv("RABBITMQ_URL")
65+
if len(rabbitMQURL) == 0 {
66+
logrus.Fatal("Missing RABBITMQ_URL env variable")
67+
}
68+
q = newJobQueue(rabbitMQURL)
6469
defer q.ch.Close()
6570
defer q.conn.Close()
6671

72+
err := q.getQueueForJob(ctx)
73+
if err != nil {
74+
log.WithError(err).Fatal("Failed to get status queue")
75+
return
76+
}
77+
6778
log.Info("Waiting for RabbitMQ jobs...")
6879
for d := range q.jobs {
6980
log.Printf("Received a message: %s", d.Body)

0 commit comments

Comments
 (0)