diff --git a/deploy/lambda-scheduled-event.json b/deploy/lambda-scheduled-event.json index 6783fdb0d..80172f5c8 100644 --- a/deploy/lambda-scheduled-event.json +++ b/deploy/lambda-scheduled-event.json @@ -1,4 +1,4 @@ { - "command": "processJobsAndMessages", + "command": "dispatchProcesses", "description": "see docs/DEPLOYING_AWS_LAMBDA.md" } diff --git a/docs/DEPLOYING_AWS_LAMBDA.md b/docs/DEPLOYING_AWS_LAMBDA.md index 5d5414122..07b7198de 100644 --- a/docs/DEPLOYING_AWS_LAMBDA.md +++ b/docs/DEPLOYING_AWS_LAMBDA.md @@ -54,7 +54,6 @@ NOTES: ``` claudia add-scheduled-event --name spoke-job-runner --schedule 'rate(5 minutes)' --event ./deploy/lambda-scheduled-event.json - ``` ## How this works diff --git a/lambda.js b/lambda.js index 89846f82b..01620ffe3 100644 --- a/lambda.js +++ b/lambda.js @@ -2,35 +2,37 @@ const AWS = require('aws-sdk') const awsServerlessExpress = require('aws-serverless-express') const app = require('./build/server/server/index') -const jobs = require('./build/server/workers/jobs') +const jobs = require('./build/server/workers/job-processes') const server = awsServerlessExpress.createServer(app.default) exports.handler = (event, context) => { - if (event.command) { + if (!event.command) { + // default web server stuff + return awsServerlessExpress.proxy(server, event, context) + } else { + // handle a custom command sent as an event const functionName = context.functionName if (event.command in jobs) { const job = jobs[event.command] // behavior and arguments documented here: // https://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/Lambda.html#invoke-property - job({'event': event, - 'dispatcher': function(dataToSend, callback) { - const lambda = AWS.Lambda() - return lambda.invoke({ - FunctionName: functionName, - InvocationType: "Event", //asynchronous - Payload: JSON.stringify(dataToSend) - }, function(err, dataReceived) { - if (err) { - console.error('Failed to invoke Lambda job: ', err) - } - if (callback) { - callback(err, dataReceived) - } - }) - }}) + job(event, + function dispatcher(dataToSend, callback) { + const lambda = AWS.Lambda() + return lambda.invoke({ + FunctionName: functionName, + InvocationType: "Event", //asynchronous + Payload: JSON.stringify(dataToSend) + }, function(err, dataReceived) { + if (err) { + console.error('Failed to invoke Lambda job: ', err) + } + if (callback) { + callback(err, dataReceived) + } + }) + }) } else { console.error('Unfound command sent as a Lambda event: ' + event.command) } - } else { - return awsServerlessExpress.proxy(server, event, context) } } diff --git a/src/server/notifications.js b/src/server/notifications.js index b21786e45..69c85f892 100644 --- a/src/server/notifications.js +++ b/src/server/notifications.js @@ -119,7 +119,12 @@ const setupNewAssignmentNotification = () => ( }) ) +let notificationObserversSetup = false + export const setupUserNotificationObservers = () => { - setupIncomingReplyNotification() - setupNewAssignmentNotification() + if (!notificationObserversSetup) { + notificationObserversSetup = true + setupIncomingReplyNotification() + setupNewAssignmentNotification() + } } diff --git a/src/workers/incoming-message-handler.js b/src/workers/incoming-message-handler.js index bf3fd9f8f..bc388d83f 100644 --- a/src/workers/incoming-message-handler.js +++ b/src/workers/incoming-message-handler.js @@ -1,127 +1,3 @@ -import nexmo from '../server/api/lib/nexmo' -import twilio from '../server/api/lib/twilio' -import { saveNewIncomingMessage, getLastMessage } from '../server/api/lib/message-sending' -import { r } from '../server/models' -import { log } from '../lib' +import { handleIncomingMessages } from './job-processes' -const serviceDefault = 'twilio' -async function sleep(ms = 0) { - return new Promise(fn => setTimeout(fn, ms)) -} - -async function handleIncomingMessageParts() { - const serviceMap = { nexmo, twilio } - const messageParts = await r.table('pending_message_part') - const messagePartsByService = [ - {'group': 'nexmo', - 'reduction': messageParts.filter((m) => (m.service == 'nexmo')) - }, - {'group': 'twilio', - 'reduction': messageParts.filter((m) => (m.service == 'twilio')) - }, - ] - const serviceLength = messagePartsByService.length - for (let index = 0; index < serviceLength; index++) { - const serviceParts = messagePartsByService[index] - const allParts = serviceParts.reduction - const allPartsCount = allParts.length - if (allPartsCount == 0) { - continue - } - const service = serviceMap[serviceParts.group] - const convertMessageParts = service.convertMessagePartsToMessage - const messagesToSave = [] - let messagePartsToDelete = [] - const concatMessageParts = {} - for (let i = 0; i < allPartsCount; i++) { - const part = allParts[i] - const serviceMessageId = part.service_id - const savedCount = await r.table('message') - .getAll(serviceMessageId, { index: 'service_id' }) - .count() - const lastMessage = await getLastMessage({ - contactNumber: part.contact_number, - service: serviceDefault - }) - const duplicateMessageToSaveExists = !!messagesToSave.find((message) => message.service_id === serviceMessageId) - if (!lastMessage) { - log.info('Received message part with no thread to attach to', part) - messagePartsToDelete.push(part) - } else if (savedCount > 0) { - log.info(`Found already saved message matching part service message ID ${part.service_id}`) - messagePartsToDelete.push(part) - } else if (duplicateMessageToSaveExists) { - log.info(`Found duplicate message to be saved matching part service message ID ${part.service_id}`) - messagePartsToDelete.push(part) - } else { - const parentId = part.parent_id - if (!parentId) { - messagesToSave.push(await convertMessageParts([part])) - messagePartsToDelete.push(part) - } else { - if (part.service !== 'nexmo') { - throw new Error('should not have a parent ID for twilio') - } - const groupKey = [parentId, part.contact_number, part.user_number] - const serviceMessage = JSON.parse(part.service_message) - if (!concatMessageParts.hasOwnProperty(groupKey)) { - const partCount = parseInt(serviceMessage['concat-total'], 10) - concatMessageParts[groupKey] = Array(partCount).fill(null) - } - - const partIndex = parseInt(serviceMessage['concat-part'], 10) - 1 - if (concatMessageParts[groupKey][partIndex] !== null) { - messagePartsToDelete.push(part) - } else { - concatMessageParts[groupKey][partIndex] = part - } - } - } - } - - const keys = Object.keys(concatMessageParts) - const keyCount = keys.length - - for (let i = 0; i < keyCount; i++) { - const groupKey = keys[i] - const messageParts = concatMessageParts[groupKey] - - if (messageParts.filter((part) => part === null).length === 0) { - messagePartsToDelete = messagePartsToDelete.concat(messageParts) - const message = await convertMessageParts(messageParts) - messagesToSave.push(message) - } - } - - const messageCount = messagesToSave.length - for (let i = 0; i < messageCount; i++) { - log.info('Saving message with service message ID', messagesToSave[i].service_id) - await saveNewIncomingMessage(messagesToSave[i]) - } - - const messageIdsToDelete = messagePartsToDelete.map((m) => m.id) - log.info('Deleting message parts', messageIdsToDelete) - await r.table('pending_message_part') - .getAll(...messageIdsToDelete) - .delete() - } -} -(async () => { - while (true) { - try { - const countPendingMessagePart = await r.knex('pending_message_part') - .count('id AS total').then( total => { - let totalCount = 0 - totalCount = total[0].total - return totalCount - }) - - await sleep(500) - if(countPendingMessagePart > 0) { - await handleIncomingMessageParts() - } - } catch (ex) { - log.error(ex) - } - } -})() +handleIncomingMessages() diff --git a/src/workers/job-handler.js b/src/workers/job-handler.js index 54ef96150..811905c6d 100644 --- a/src/workers/job-handler.js +++ b/src/workers/job-handler.js @@ -1,54 +1,3 @@ -import { r } from '../server/models' -import { sleep, getNextJob, updateJob, log } from './lib' -import { exportCampaign, uploadContacts, assignTexters, createInteractionSteps, sendMessages } from './jobs' -import { setupUserNotificationObservers } from '../server/notifications' - -function jobMap() { - return { - export: exportCampaign, - upload_contacts: uploadContacts, - assign_texters: assignTexters, - create_interaction_steps: createInteractionSteps, - // usually dispatched in three separate processes, not here - send_messages: sendMessages - } -} - -const JOBS_SAME_PROCESS = !!process.env.JOBS_SAME_PROCESS; //keep semi - -export const processJobs = async () => { - // eslint-disable-next-line no-constant-condition - while (true) { - try { - if (!JOBS_SAME_PROCESS) { - // if this is running as a 'cron-like' task to finish all jobs - // and then close, then no need to wait around for the next - await sleep(1000) - } - const job = await getNextJob() - if (job) { - await (jobMap()[job.job_type])(job) - await r.table('job_request') - .get(job.id) - .delete() - } else if (JOBS_SAME_PROCESS) { - break // all finished, so just complete - } - - var twoMinutesAgo = new Date(new Date() - 1000 * 60 * 2) - // delete jobs that are older than 2 minutes - // to clear out stuck jobs - await r.knex('job_request') - .where({ assigned: true }) - .where('updated_at', '<', twoMinutesAgo) - .delete() - } catch (ex) { - log.error(ex) - } - } -} - -if (require.main === module) { - processJobs() -} +import { processJobs } from './job-processes' +processJobs() diff --git a/src/workers/job-processes.js b/src/workers/job-processes.js new file mode 100644 index 000000000..af0660f1e --- /dev/null +++ b/src/workers/job-processes.js @@ -0,0 +1,122 @@ +import { r } from '../server/models' +import { sleep, getNextJob, updateJob, log } from './lib' +import { exportCampaign, uploadContacts, assignTexters, createInteractionSteps, sendMessages, handleIncomingMessageParts } from './jobs' +import { setupUserNotificationObservers } from '../server/notifications' + + +/* Two process models are supported in this file. + The main in both cases is to process jobs and send/receive messages + on separate loop(s) from the web server. + * job processing (e.g. contact loading) shouldn't delay text message processing + + The two process models: + * Run the 'scripts' in dev-tools/Procfile.dev + */ + +const jobMap = { + "export": exportCampaign, + "upload_contacts": uploadContacts, + "assign_texters": assignTexters, + "create_interaction_steps": createInteractionSteps, + // usually dispatched in three separate processes, not here + "send_messages": sendMessages +} + +export async function processJobs() { + setupUserNotificationObservers() + // eslint-disable-next-line no-constant-condition + while (true) { + try { + await sleep(1000) + const job = await getNextJob() + if (job) { + await (jobMap[job.job_type])(job) + await r.table('job_request') + .get(job.id) + .delete() + } + + var twoMinutesAgo = new Date(new Date() - 1000 * 60 * 2) + // delete jobs that are older than 2 minutes + // to clear out stuck jobs + await r.knex('job_request') + .where({ assigned: true }) + .where('updated_at', '<', twoMinutesAgo) + .delete() + } catch (ex) { + log.error(ex) + } + } +} + +const messageSenderCreator = (subQuery) => { + return async () => { + setupUserNotificationObservers() + // eslint-disable-next-line no-constant-condition + while (true) { + try { + await sleep(1100) + await sendMessages(subQuery) + } catch (ex) { + log.error(ex) + } + } + } +} + +export const messageSender01 = messageSenderCreator(function(mQuery) { + return mQuery.where(r.knex.raw("(contact_number LIKE '%0' OR contact_number LIKE '%1')")) +}) + +export const messageSender234 = messageSenderCreator(function(mQuery) { + return mQuery.where(r.knex.raw("(contact_number LIKE '%2' OR contact_number LIKE '%3' or contact_number LIKE '%4')")) +}) + +export const messageSender56 = messageSenderCreator(function(mQuery) { + return mQuery.where(r.knex.raw("(contact_number LIKE '%5' OR contact_number LIKE '%6')")) +}) + +export const messageSender789 = messageSenderCreator(function(mQuery) { + return mQuery.where(r.knex.raw("(contact_number LIKE '%7' OR contact_number LIKE '%8' or contact_number LIKE '%9')")) +}) + +export async function handleIncomingMessages() { + setupUserNotificationObservers() + // eslint-disable-next-line no-constant-condition + while (true) { + try { + const countPendingMessagePart = await r.knex('pending_message_part') + .count('id AS total').then( total => { + let totalCount = 0 + totalCount = total[0].total + return totalCount + }) + + await sleep(500) + if(countPendingMessagePart > 0) { + await handleIncomingMessageParts() + } + } catch (ex) { + log.error(ex) + } + } +} + + +const processMap = { + 'processJobs': processJobs, + 'messageSender01': messageSender01, + 'messageSender234': messageSender234, + 'messageSender56': messageSender56, + 'messageSender789': messageSender789, + 'handleIncomingMessages': handleIncomingMessages +} + +export async function dispatchProcesses(event, dispatcher) { + const toDispatch = event.processes || processMap + for (let p in toDispatch) { + if (p in processMap) { + dispatcher(p) + } + } +} diff --git a/src/workers/jobs.js b/src/workers/jobs.js index 076acfc78..bf6cab3be 100644 --- a/src/workers/jobs.js +++ b/src/workers/jobs.js @@ -429,8 +429,99 @@ export async function sendMessages(queryFunc, defaultStatus) { } } -export async function processJobsAndMessages(eventData) { - // TODO - // run through jobs like job-handler with getNextJob - // separate process? cycle through messages and dispatch +export async function handleIncomingMessageParts() { + const messageParts = await r.table('pending_message_part') + const messagePartsByService = [ + {'group': 'nexmo', + 'reduction': messageParts.filter((m) => (m.service == 'nexmo')) + }, + {'group': 'twilio', + 'reduction': messageParts.filter((m) => (m.service == 'twilio')) + }, + ] + const serviceLength = messagePartsByService.length + for (let index = 0; index < serviceLength; index++) { + const serviceParts = messagePartsByService[index] + const allParts = serviceParts.reduction + const allPartsCount = allParts.length + if (allPartsCount == 0) { + continue + } + const service = serviceMap[serviceParts.group] + const convertMessageParts = service.convertMessagePartsToMessage + const messagesToSave = [] + let messagePartsToDelete = [] + const concatMessageParts = {} + for (let i = 0; i < allPartsCount; i++) { + const part = allParts[i] + const serviceMessageId = part.service_id + const savedCount = await r.table('message') + .getAll(serviceMessageId, { index: 'service_id' }) + .count() + const lastMessage = await getLastMessage({ + contactNumber: part.contact_number, + service: process.env.DEFAULT_SERVICE || 'twilio' + }) + const duplicateMessageToSaveExists = !!messagesToSave.find((message) => message.service_id === serviceMessageId) + if (!lastMessage) { + log.info('Received message part with no thread to attach to', part) + messagePartsToDelete.push(part) + } else if (savedCount > 0) { + log.info(`Found already saved message matching part service message ID ${part.service_id}`) + messagePartsToDelete.push(part) + } else if (duplicateMessageToSaveExists) { + log.info(`Found duplicate message to be saved matching part service message ID ${part.service_id}`) + messagePartsToDelete.push(part) + } else { + const parentId = part.parent_id + if (!parentId) { + messagesToSave.push(await convertMessageParts([part])) + messagePartsToDelete.push(part) + } else { + if (part.service !== 'nexmo') { + throw new Error('should not have a parent ID for twilio') + } + const groupKey = [parentId, part.contact_number, part.user_number] + const serviceMessage = JSON.parse(part.service_message) + if (!concatMessageParts.hasOwnProperty(groupKey)) { + const partCount = parseInt(serviceMessage['concat-total'], 10) + concatMessageParts[groupKey] = Array(partCount).fill(null) + } + + const partIndex = parseInt(serviceMessage['concat-part'], 10) - 1 + if (concatMessageParts[groupKey][partIndex] !== null) { + messagePartsToDelete.push(part) + } else { + concatMessageParts[groupKey][partIndex] = part + } + } + } + } + + const keys = Object.keys(concatMessageParts) + const keyCount = keys.length + + for (let i = 0; i < keyCount; i++) { + const groupKey = keys[i] + const messageParts = concatMessageParts[groupKey] + + if (messageParts.filter((part) => part === null).length === 0) { + messagePartsToDelete = messagePartsToDelete.concat(messageParts) + const message = await convertMessageParts(messageParts) + messagesToSave.push(message) + } + } + + const messageCount = messagesToSave.length + for (let i = 0; i < messageCount; i++) { + log.info('Saving message with service message ID', messagesToSave[i].service_id) + await saveNewIncomingMessage(messagesToSave[i]) + } + + const messageIdsToDelete = messagePartsToDelete.map((m) => m.id) + log.info('Deleting message parts', messageIdsToDelete) + await r.table('pending_message_part') + .getAll(...messageIdsToDelete) + .delete() + } } diff --git a/src/workers/message-sender-01.js b/src/workers/message-sender-01.js index e67243df9..20dd6b7a7 100644 --- a/src/workers/message-sender-01.js +++ b/src/workers/message-sender-01.js @@ -1,22 +1,3 @@ -import { r } from '../server/models' -import { log } from '../lib' -import { sleep } from './lib' -import { sendMessages } from './jobs' +import { messageSender01 } from './job-processes' -async function sendMyMessages() { - return sendMessages(function(mQuery) { - return mQuery.where(r.knex.raw("(contact_number LIKE '%0' OR contact_number LIKE '%1')")) - }) -} - -(async () => { - // eslint-disable-next-line no-constant-condition - while (true) { - try { - await sleep(1100) - await sendMyMessages() - } catch (ex) { - log.error(ex) - } - } -})() +messageSender01() diff --git a/src/workers/message-sender-234.js b/src/workers/message-sender-234.js index 715f4c6c8..fca6c5708 100644 --- a/src/workers/message-sender-234.js +++ b/src/workers/message-sender-234.js @@ -1,22 +1,3 @@ -import { r } from '../server/models' -import { log } from '../lib' -import { sleep } from './lib' -import { sendMessages } from './jobs' +import { messageSender234 } from './job-processes' -async function sendMyMessages() { - return sendMessages(function(mQuery) { - return mQuery.where(r.knex.raw("(contact_number LIKE '%2' OR contact_number LIKE '%3' or contact_number LIKE '%4')")) - }) -} - -(async () => { - // eslint-disable-next-line no-constant-condition - while (true) { - try { - await sleep(1100) - await sendMyMessages() - } catch (ex) { - log.error(ex) - } - } -})() +messageSender234() diff --git a/src/workers/message-sender-56.js b/src/workers/message-sender-56.js index aeea65877..60805193c 100644 --- a/src/workers/message-sender-56.js +++ b/src/workers/message-sender-56.js @@ -1,22 +1,3 @@ -import { r } from '../server/models' -import { log } from '../lib' -import { sleep } from './lib' -import { sendMessages } from './jobs' +import { messageSender56 } from './job-processes' -async function sendMyMessages() { - return sendMessages(function(mQuery) { - return mQuery.where(r.knex.raw("(contact_number LIKE '%5' OR contact_number LIKE '%6')")) - }) -} - -(async () => { - // eslint-disable-next-line no-constant-condition - while (true) { - try { - await sleep(1100) - await sendMyMessages() - } catch (ex) { - log.error(ex) - } - } -})() +messageSender56() diff --git a/src/workers/message-sender-789.js b/src/workers/message-sender-789.js index 836c88522..4b6803ea0 100644 --- a/src/workers/message-sender-789.js +++ b/src/workers/message-sender-789.js @@ -1,22 +1,3 @@ -import { r } from '../server/models' -import { log } from '../lib' -import { sleep } from './lib' -import { sendMessages } from './jobs' +import { messageSender789 } from './job-processes' -async function sendMyMessages() { - return sendMessages(function(mQuery) { - return mQuery.where(r.knex.raw("(contact_number LIKE '%7' OR contact_number LIKE '%8' or contact_number LIKE '%9')")) - }) -} - -(async () => { - // eslint-disable-next-line no-constant-condition - while (true) { - try { - await sleep(1100) - await sendMyMessages() - } catch (ex) { - log.error(ex) - } - } -})() +messageSender789()