diff --git a/lambda.js b/lambda.js index 9882de501..77cc22f58 100644 --- a/lambda.js +++ b/lambda.js @@ -107,27 +107,7 @@ exports.handler = async (event, context) => { const job = jobs[event.command]; // behavior and arguments documented here: // https://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/Lambda.html#invoke-property - const result = await job(event, function dispatcher( - dataToSend, - callback - ) { - const lambda = new AWS.Lambda(); - return lambda.invoke( - { - FunctionName: functionName, - InvocationType: "Event", //asynchronous - Payload: JSON.stringify(dataToSend) - }, - function(err, dataReceived) { - if (err) { - console.error("Failed to invoke Lambda job: ", err); - } - if (callback) { - callback(err, dataReceived); - } - } - ); - }); + const result = await job(event, context); return result; } else { console.error("Unfound command sent as a Lambda event: " + event.command); diff --git a/src/workers/job-processes.js b/src/workers/job-processes.js index 6592bc8a4..5f67d3167 100644 --- a/src/workers/job-processes.js +++ b/src/workers/job-processes.js @@ -60,6 +60,7 @@ export const invokeJobFunction = async job => { }; export async function processJobs() { + // DEPRECATED -- switch to job dispatchers. See src/extensions/job-runners/README.md setupUserNotificationObservers(); console.log("Running processJobs"); // eslint-disable-next-line no-constant-condition @@ -77,14 +78,14 @@ export async function processJobs() { const twoMinutesAgo = new Date(new Date() - 1000 * 60 * 2); // clear out stuck jobs - await clearOldJobs(twoMinutesAgo); + await clearOldJobs({ delay: twoMinutesAgo }); } catch (ex) { log.error(ex); } } } -export async function checkMessageQueue() { +export async function checkMessageQueue(event, contextVars) { if (!process.env.TWILIO_SQS_QUEUE_URL) { return; } @@ -92,8 +93,19 @@ export async function checkMessageQueue() { console.log("checking if messages are in message queue"); while (true) { try { - await sleep(10000); - processSqsMessages(); + if (process.env.DEBUG) { + await sleep(10000); + } + await processSqsMessages(); + if ( + contextVars && + typeof contextVars.remainingMilliseconds === "function" + ) { + if (contextVars.remainingMilliseconds() < 5000) { + // rather than get caught half-way through a message batch, let's bail + return; + } + } } catch (ex) { log.error(ex); } @@ -232,7 +244,7 @@ export async function handleIncomingMessages() { } } -export async function runDatabaseMigrations(event, dispatcher, eventCallback) { +export async function runDatabaseMigrations(event, context, eventCallback) { console.log("inside runDatabaseMigrations1"); console.log("inside runDatabaseMigrations2", event); await r.k.migrate.latest(); @@ -243,11 +255,7 @@ export async function runDatabaseMigrations(event, dispatcher, eventCallback) { return "completed migrations runDatabaseMigrations"; } -export async function databaseMigrationChange( - event, - dispatcher, - eventCallback -) { +export async function databaseMigrationChange(event, context, eventCallback) { console.log("inside databaseMigrationChange", event); if (event.up) { await r.k.migrate.up(); @@ -282,26 +290,22 @@ const syncProcessMap = { clearOldJobs }; -export async function dispatchProcesses(event, dispatcher, eventCallback) { +export async function dispatchProcesses(event, context, eventCallback) { const toDispatch = event.processes || (JOBS_SAME_PROCESS ? syncProcessMap : processMap); - for (let p in toDispatch) { - if (p in processMap) { - // / not using dispatcher, but another interesting model would be - // / to dispatch processes to other lambda invocations - // dispatcher({'command': p}) - console.log("process", p); - toDispatch[p]() - .then() - .catch(err => { + await Promise.all( + Object.keys(toDispatch) + .filter(p => p in processMap) + .map(p => + toDispatch[p](event, context).catch(err => { console.error("Process Error", p, err); - }); - } - } + }) + ) + ); return "completed"; } -export async function ping(event, dispatcher) { +export async function ping(event, context) { return "pong"; } diff --git a/src/workers/jobs.js b/src/workers/jobs.js index eba884916..1e5d0ec1b 100644 --- a/src/workers/jobs.js +++ b/src/workers/jobs.js @@ -9,7 +9,7 @@ import { } from "../server/models"; import telemetry from "../server/telemetry"; import { log, gunzip, zipToTimeZone, convertOffsetsToStrings } from "../lib"; -import { updateJob } from "./lib"; +import { sleep, updateJob } from "./lib"; import serviceMap from "../server/api/lib/services"; import twilio from "../server/api/lib/twilio"; import { @@ -173,33 +173,36 @@ export async function processSqsMessages() { const p = new Promise((resolve, reject) => { sqs.receiveMessage(params, async (err, data) => { if (err) { - console.log(err, err.stack); + console.log("processSqsMessages Error", err, err.stack); reject(err); - } else if (data.Messages) { - console.log(data); - for (let i = 0; i < data.Messages.length; i++) { - const message = data.Messages[i]; - const body = message.Body; - console.log("processing sqs queue:", body); - const twilioMessage = JSON.parse(body); - - await serviceMap.twilio.handleIncomingMessage(twilioMessage); - - sqs.deleteMessage( - { - QueueUrl: process.env.TWILIO_SQS_QUEUE_URL, - ReceiptHandle: message.ReceiptHandle - }, - (delMessageErr, delMessageData) => { - if (delMessageErr) { - console.log(delMessageErr, delMessageErr.stack); // an error occurred - } else { - console.log(delMessageData); // successful response - } + } else { + if (!data.Messages || !data.Messages.length) { + // Since we are likely in a while(true) loop let's avoid racing + await sleep(10000); + resolve(); + } else { + console.log("processSqsMessages", data.Messages.length); + for (let i = 0; i < data.Messages.length; i++) { + const message = data.Messages[i]; + const body = message.Body; + if (process.env.DEBUG) { + console.log("processSqsMessages message body", body); } - ); + const twilioMessage = JSON.parse(body); + await serviceMap.twilio.handleIncomingMessage(twilioMessage); + const delMessageData = await sqs + .deleteMessage({ + QueueUrl: process.env.TWILIO_SQS_QUEUE_URL, + ReceiptHandle: message.ReceiptHandle + }) + .promise() + .catch(reject); + if (process.env.DEBUG) { + console.log("processSqsMessages deleteresult", delMessageData); + } + } + resolve(); } - resolve(); } }); }); @@ -1152,10 +1155,10 @@ export async function fixOrgless() { } // if } // function -export async function clearOldJobs(delay) { +export async function clearOldJobs(event) { // to clear out old stuck jobs const twoHoursAgo = new Date(new Date() - 1000 * 60 * 60 * 2); - delay = delay || twoHoursAgo; + const delay = (event && event.delay) || twoHoursAgo; return await r .knex("job_request") .where({ assigned: true })