Refactor sending out of messages

This commit is contained in:
Daniel Petri Rocha 2022-12-18 19:43:14 +01:00
parent acd315435a
commit e11d2e119c
5 changed files with 125 additions and 92 deletions

View File

@ -1,13 +1,13 @@
const config = require('../../config');
const { level } = config.log;
const {level} = config.log;
const trace = level.toLowerCase() === 'trace';
const debug = trace || level.toLowerCase() === 'debug';
const date = () => new Date().toISOString();
const parse = (messages) => {
return messages.map(message => {
return messages.map((message) => {
if (typeof message === 'object') return JSON.stringify(message);
return message;

View File

@ -0,0 +1,88 @@
const config = require('../../config');
const EXPORT_STATUSES = Object.freeze({
COLLECTING: 'COLLECTING',
PROCESSING: 'PROCESSING',
});
class PresAnnStatusMsg {
constructor(exportJob, status = EXPORT_STATUSES.COLLECTING) {
this.message = {
envelope: {
name: config.log.msgName,
routing: {
sender: exportJob.module,
},
timestamp: (new Date()).getTime(),
},
core: {
header: {
name: config.log.msgName,
meetingId: exportJob.parentMeetingId,
userId: '',
},
body: {
presId: exportJob.presId,
pageNumber: 1,
totalPages: JSON.parse(exportJob.pages).length,
status,
error: false,
},
},
};
}
build = (pageNumber = 1) => {
this.message.core.body.pageNumber = pageNumber;
this.message.envelope.timestamp = (new Date()).getTime();
const event = JSON.stringify(this.message);
this.message.core.body.error = false;
return event;
};
setError = (error = true) => {
this.message.core.body.error = error;
};
setStatus = (status) => {
this.message.core.body.status = status;
};
static get EXPORT_STATUSES() {
return EXPORT_STATUSES;
}
};
class NewPresAnnFileAvailableMsg {
constructor(exportJob, link) {
this.message = {
envelope: {
name: config.notifier.msgName,
routing: {
sender: exportJob.module,
},
timestamp: (new Date()).getTime(),
},
core: {
header: {
name: config.notifier.msgName,
meetingId: exportJob.parentMeetingId,
userId: '',
},
body: {
fileURI: link,
presId: exportJob.presId,
},
},
};
}
build = () => {
return JSON.stringify(this.message);
};
};
module.exports = {
PresAnnStatusMsg,
NewPresAnnFileAvailableMsg,
};

View File

@ -8,6 +8,7 @@ const redis = require('redis');
const sanitize = require('sanitize-filename');
const stream = require('stream');
const WorkerStarter = require('../lib/utils/worker-starter');
const {PresAnnStatusMsg} = require('../lib/utils/message-builder');
const {workerData} = require('worker_threads');
const {promisify} = require('util');
@ -55,29 +56,7 @@ async function collectAnnotationsFromRedis() {
const pdfFile = `${presFile}.pdf`;
// Message to display conversion progress toast
const statusUpdate = {
envelope: {
name: config.log.msgName,
routing: {
sender: exportJob.module,
},
timestamp: (new Date()).getTime(),
},
core: {
header: {
name: config.log.msgName,
meetingId: exportJob.parentMeetingId,
userId: '',
},
body: {
presId: exportJob.presId,
pageNumber: 1,
totalPages: pages.length,
status: 'COLLECTING',
error: false,
},
},
};
const statusUpdate = new PresAnnStatusMsg(exportJob);
if (fs.existsSync(pdfFile)) {
for (const p of pages) {
@ -103,35 +82,32 @@ async function collectAnnotationsFromRedis() {
try {
cp.spawnSync(config.shared.pdftocairo, extract_png_from_pdf, {shell: false});
} catch (error) {
const error_reason = `PDFtoCairo failed extracting slide ${pageNumber}`;
logger.error(`${error_reason} in job ${jobId}: ${error.message}`);
statusUpdate.core.body.status = error_reason;
statusUpdate.core.body.error = true;
logger.error(`PDFtoCairo failed extracting slide ${pageNumber} in job ${jobId}: ${error.message}`);
statusUpdate.setError();
}
statusUpdate.core.body.pageNumber = pageNumber;
statusUpdate.envelope.timestamp = (new Date()).getTime();
await client.publish(config.redis.channels.publish, JSON.stringify(statusUpdate));
statusUpdate.core.body.error = false;
await client.publish(config.redis.channels.publish, statusUpdate.build(pageNumber));
}
// If PNG file already available
} else if (fs.existsSync(`${presFile}.png`)) {
fs.copyFileSync(`${presFile}.png`, path.join(dropbox, 'slide1.png'));
await client.publish(config.redis.channels.publish, JSON.stringify(statusUpdate));
// If JPEG file available
} else if (fs.existsSync(`${presFile}.jpeg`)) {
fs.copyFileSync(`${presFile}.jpeg`, path.join(dropbox, 'slide1.jpeg'));
await client.publish(config.redis.channels.publish, JSON.stringify(statusUpdate));
} else {
statusUpdate.core.body.error = true;
await client.publish(config.redis.channels.publish, JSON.stringify(statusUpdate));
client.disconnect();
return logger.error(`Presentation file missing for job ${exportJob.jobId}`);
if (fs.existsSync(`${presFile}.png`)) {
// PNG file available
fs.copyFileSync(`${presFile}.png`, path.join(dropbox, 'slide1.png'));
} else if (fs.existsSync(`${presFile}.jpeg`)) {
// JPEG file available
fs.copyFileSync(`${presFile}.jpeg`, path.join(dropbox, 'slide1.jpeg'));
await client.publish(config.redis.channels.publish, statusUpdate.build());
} else {
await client.publish(config.redis.channels.publish, statusUpdate.build());
client.disconnect();
return logger.error(`No PDF, PNG or JPEG file available for job ${jobId}`);
}
await client.publish(config.redis.channels.publish, statusUpdate.build());
}
client.disconnect();
const process = new WorkerStarter({jobId, statusUpdate});
const process = new WorkerStarter({jobId});
process.process();
}

View File

@ -5,6 +5,7 @@ const FormData = require('form-data');
const redis = require('redis');
const axios = require('axios').default;
const path = require('path');
const {NewPresAnnFileAvailableMsg} = require('../lib/utils/message-builder');
const {workerData} = require('worker_threads');
const [jobType, jobId, filename] = [workerData.jobType, workerData.jobId, workerData.filename];
@ -31,30 +32,10 @@ async function notifyMeetingActor() {
exportJob.parentMeetingId, exportJob.parentMeetingId,
exportJob.presId, 'pdf', jobId, filename);
const notification = {
envelope: {
name: config.notifier.msgName,
routing: {
sender: exportJob.module,
},
timestamp: (new Date()).getTime(),
},
core: {
header: {
name: config.notifier.msgName,
meetingId: exportJob.parentMeetingId,
userId: '',
},
body: {
fileURI: link,
presId: exportJob.presId,
},
},
};
const notification = new NewPresAnnFileAvailableMsg(exportJob, link);
logger.info(`Annotated PDF available at ${link}`);
await client.publish(config.redis.channels.publish,
JSON.stringify(notification));
await client.publish(config.redis.channels.publish, notification.build());
client.disconnect();
}

View File

@ -10,14 +10,16 @@ const sanitize = require('sanitize-filename');
const {getStrokePoints, getStrokeOutlinePoints} = require('perfect-freehand');
const probe = require('probe-image-size');
const redis = require('redis');
const {PresAnnStatusMsg} = require('../lib/utils/message-builder');
const [jobId, statusUpdate] = [workerData.jobId, workerData.statusUpdate];
const jobId = workerData.jobId;
const logger = new Logger('presAnn Process Worker');
logger.info('Processing PDF for job ' + jobId);
statusUpdate.core.body.status = 'PROCESSING';
const dropbox = path.join(config.shared.presAnnDropboxDir, jobId);
const job = fs.readFileSync(path.join(dropbox, 'job'));
const exportJob = JSON.parse(job);
const statusUpdate = new PresAnnStatusMsg(exportJob, PresAnnStatusMsg.EXPORT_STATUSES.PROCESSING);
// General utilities for rendering SVGs resembling Tldraw as much as possible
function align_to_pango(alignment) {
@ -157,10 +159,8 @@ function render_textbox(textColor, font, fontSize, textAlign, text, id, textBoxW
try {
cp.spawnSync(config.shared.imagemagick, commands, {shell: false});
} catch (error) {
const error_reason = 'ImageMagick failed to render textbox';
logger.error(`${error_reason} in job ${jobId}: ${error.message}`);
statusUpdate.core.body.status = error_reason;
statusUpdate.core.body.error = true;
logger.error(`ImageMagick failed to render textbox in job ${jobId}: ${error.message}`);
statusUpdate.setError();
}
}
@ -787,17 +787,13 @@ async function process_presentation_annotations() {
client.on('error', (err) => logger.info('Redis Client Error', err));
// 1. Get the job
const job = fs.readFileSync(path.join(dropbox, 'job'));
const exportJob = JSON.parse(job);
// 2. Get the annotations
// Get the annotations
const annotations = fs.readFileSync(path.join(dropbox, 'whiteboard'));
const whiteboard = JSON.parse(annotations);
const pages = JSON.parse(whiteboard.pages);
const ghostScriptInput = [];
// 3. Convert annotations to SVG
// Convert annotations to SVG
for (const currentSlide of pages) {
const bgImagePath = path.join(dropbox, `slide${currentSlide.page}`);
const svgBackgroundSlide = path.join(exportJob.presLocation,
@ -866,15 +862,11 @@ async function process_presentation_annotations() {
cp.spawnSync(config.shared.cairosvg, convertAnnotatedSlide, {shell: false});
} catch (error) {
logger.error(`Processing slide ${currentSlide.page} failed for job ${jobId}: ${error.message}`);
statusUpdate.core.body.error = true;
statusUpdate.setError();
}
statusUpdate.core.body.pageNumber = currentSlide.page;
statusUpdate.envelope.timestamp = (new Date()).getTime();
await client.publish(config.redis.channels.publish, JSON.stringify(statusUpdate));
await client.publish(config.redis.channels.publish, statusUpdate.build(currentSlide.page));
ghostScriptInput.push(PDFfile);
statusUpdate.core.body.error = false;
}
// Create PDF output directory if it doesn't exist
@ -896,11 +888,7 @@ async function process_presentation_annotations() {
try {
cp.spawnSync(config.shared.ghostscript, mergePDFs, {shell: false});
} catch (error) {
const error_reason = 'GhostScript failed to merge PDFs';
logger.error(`${error_reason} in job ${jobId}: ${error.message}`);
statusUpdate.core.body.status = error_reason;
statusUpdate.core.body.error = true;
await client.publish(config.redis.channels.publish, JSON.stringify(statusUpdate));
return logger.error(`GhostScript failed to merge PDFs in job ${jobId}: ${error.message}`);
}
// Launch Notifier Worker depending on job type