From 973514036898ca8ccd19581054b86e35ecda89be Mon Sep 17 00:00:00 2001 From: Stanislas Lange Date: Wed, 12 May 2021 18:29:22 +0000 Subject: [PATCH] wip --- src/routes/try.route.ts | 86 +++++++++++++++++++++++++++++------------ 1 file changed, 62 insertions(+), 24 deletions(-) diff --git a/src/routes/try.route.ts b/src/routes/try.route.ts index 2368316..cf8fdbb 100644 --- a/src/routes/try.route.ts +++ b/src/routes/try.route.ts @@ -24,16 +24,18 @@ module.exports = function (app: express.Application) { try { const conn = await amqp.connect(amqpEndpint, option); const chan = await conn.createChannel(); - const chan_status = await conn.createChannel(); const queue = process.env.RABBITMQ_QUEUE || 'jobs'; // Prepare status channel/queue/exchange/whatever + const chan_status = await conn.createChannel(); chan_status.assertExchange('job_status', 'direct', { durable: false, }); + const status_queue = await chan_status.assertQueue('job_status', { durable: true, }); + chan_status.bindQueue(status_queue.queue, 'job_status', job.id); // Send job to RabbitMQ @@ -49,32 +51,68 @@ module.exports = function (app: express.Application) { } ); - // Wait for job to be processed by getting status from queue - await chan_status.consume(status_queue.queue, (msg) => { - if (msg != null) { - console.log(msg.content.toString()); - chan_status.ack(msg); - const parsedRes = JSON.parse(msg.content.toString()); - if (parsedRes.status == 'done') { - res.status(200).json({ - message: 'Code successfully sent to message broker', - id: job.id, - stderr: parsedRes.stderr, - stdout: parsedRes.stdout, - }); - return; - } - if (parsedRes.status == 'failed') { - res.status(500).json({ - message: - 'Failed to send code your code for execution', - }); - return; + await chan.close(); + + let msg = await chan_status.get(status_queue.queue, {}); + if (msg != false) { + console.log(msg.content.toString()); + chan_status.ack(msg); + let parsedRes = JSON.parse(msg.content.toString()); + while (parsedRes.status != 'done') { + msg = await chan_status.get(status_queue.queue, {}); + if (msg != false) { + console.log(msg.content.toString()); + chan_status.ack(msg); + parsedRes = JSON.parse(msg.content.toString()); } } - }); - await chan.close(); + await chan_status.unbindQueue( + status_queue.queue, + 'job_status', + job.id + ); + + res.status(200).json({ + message: 'Code successfully sent to message broker', + id: job.id, + stderr: parsedRes.stderr, + stdout: parsedRes.stdout, + }); + } + // chan_status.prefetch(1) + // Wait for job to be processed by getting status from queue + // await chan_status.consume(status_queue.queue, async (msg) => { + // if (msg != null) { + // console.log(msg.content.toString()); + // chan_status.ack(msg); + // // await new Promise((r) => setTimeout(r, 1000)); + // // setTimeout(function () {}, 1000); + // const parsedRes = JSON.parse(msg.content.toString()); + // if (parsedRes.status == 'done') { + // res.status(200).json({ + // message: 'Code successfully sent to message broker', + // id: job.id, + // stderr: parsedRes.stderr, + // stdout: parsedRes.stdout, + // }); + // await chan_status.close(); + // return; + // } + // if (parsedRes.status == 'failed') { + // res.status(500).json({ + // message: + // 'Failed to send code your code for execution', + // }); + // await chan_status.close(); + + // return; + // } + // } + // await new Promise((r) => setTimeout(r, 5000)); + // // setTimeout(function () {}, 1000); + // }); + await chan_status.close(); await conn.close(); } catch (e) {