Skip to content

Commit

Permalink
move job processes around so they can be called in a library (and dis…
Browse files Browse the repository at this point in the history
…patched from lambda)
  • Loading branch information
schuyler1d committed Aug 18, 2017
1 parent bb9174b commit 8b63fd7
Show file tree
Hide file tree
Showing 12 changed files with 259 additions and 291 deletions.
2 changes: 1 addition & 1 deletion deploy/lambda-scheduled-event.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"command": "processJobsAndMessages",
"command": "dispatchProcesses",
"description": "see docs/DEPLOYING_AWS_LAMBDA.md"
}
1 change: 0 additions & 1 deletion docs/DEPLOYING_AWS_LAMBDA.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ NOTES:

```
claudia add-scheduled-event --name spoke-job-runner --schedule 'rate(5 minutes)' --event ./deploy/lambda-scheduled-event.json
```

## How this works
Expand Down
42 changes: 22 additions & 20 deletions lambda.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,35 +2,37 @@
const AWS = require('aws-sdk')
const awsServerlessExpress = require('aws-serverless-express')
const app = require('./build/server/server/index')
const jobs = require('./build/server/workers/jobs')
const jobs = require('./build/server/workers/job-processes')
const server = awsServerlessExpress.createServer(app.default)
exports.handler = (event, context) => {
if (event.command) {
if (!event.command) {
// default web server stuff
return awsServerlessExpress.proxy(server, event, context)
} else {
// handle a custom command sent as an event
const functionName = context.functionName
if (event.command in jobs) {
const job = jobs[event.command]
// behavior and arguments documented here:
// https://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/Lambda.html#invoke-property
job({'event': event,
'dispatcher': function(dataToSend, callback) {
const lambda = AWS.Lambda()
return lambda.invoke({
FunctionName: functionName,
InvocationType: "Event", //asynchronous
Payload: JSON.stringify(dataToSend)
}, function(err, dataReceived) {
if (err) {
console.error('Failed to invoke Lambda job: ', err)
}
if (callback) {
callback(err, dataReceived)
}
})
}})
job(event,
function dispatcher(dataToSend, callback) {
const lambda = AWS.Lambda()
return lambda.invoke({
FunctionName: functionName,
InvocationType: "Event", //asynchronous
Payload: JSON.stringify(dataToSend)
}, function(err, dataReceived) {
if (err) {
console.error('Failed to invoke Lambda job: ', err)
}
if (callback) {
callback(err, dataReceived)
}
})
})
} else {
console.error('Unfound command sent as a Lambda event: ' + event.command)
}
} else {
return awsServerlessExpress.proxy(server, event, context)
}
}
9 changes: 7 additions & 2 deletions src/server/notifications.js
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,12 @@ const setupNewAssignmentNotification = () => (
})
)

let notificationObserversSetup = false

export const setupUserNotificationObservers = () => {
setupIncomingReplyNotification()
setupNewAssignmentNotification()
if (!notificationObserversSetup) {
notificationObserversSetup = true
setupIncomingReplyNotification()
setupNewAssignmentNotification()
}
}
128 changes: 2 additions & 126 deletions src/workers/incoming-message-handler.js
Original file line number Diff line number Diff line change
@@ -1,127 +1,3 @@
import nexmo from '../server/api/lib/nexmo'
import twilio from '../server/api/lib/twilio'
import { saveNewIncomingMessage, getLastMessage } from '../server/api/lib/message-sending'
import { r } from '../server/models'
import { log } from '../lib'
import { handleIncomingMessages } from './job-processes'

const serviceDefault = 'twilio'
async function sleep(ms = 0) {
return new Promise(fn => setTimeout(fn, ms))
}

async function handleIncomingMessageParts() {
const serviceMap = { nexmo, twilio }
const messageParts = await r.table('pending_message_part')
const messagePartsByService = [
{'group': 'nexmo',
'reduction': messageParts.filter((m) => (m.service == 'nexmo'))
},
{'group': 'twilio',
'reduction': messageParts.filter((m) => (m.service == 'twilio'))
},
]
const serviceLength = messagePartsByService.length
for (let index = 0; index < serviceLength; index++) {
const serviceParts = messagePartsByService[index]
const allParts = serviceParts.reduction
const allPartsCount = allParts.length
if (allPartsCount == 0) {
continue
}
const service = serviceMap[serviceParts.group]
const convertMessageParts = service.convertMessagePartsToMessage
const messagesToSave = []
let messagePartsToDelete = []
const concatMessageParts = {}
for (let i = 0; i < allPartsCount; i++) {
const part = allParts[i]
const serviceMessageId = part.service_id
const savedCount = await r.table('message')
.getAll(serviceMessageId, { index: 'service_id' })
.count()
const lastMessage = await getLastMessage({
contactNumber: part.contact_number,
service: serviceDefault
})
const duplicateMessageToSaveExists = !!messagesToSave.find((message) => message.service_id === serviceMessageId)
if (!lastMessage) {
log.info('Received message part with no thread to attach to', part)
messagePartsToDelete.push(part)
} else if (savedCount > 0) {
log.info(`Found already saved message matching part service message ID ${part.service_id}`)
messagePartsToDelete.push(part)
} else if (duplicateMessageToSaveExists) {
log.info(`Found duplicate message to be saved matching part service message ID ${part.service_id}`)
messagePartsToDelete.push(part)
} else {
const parentId = part.parent_id
if (!parentId) {
messagesToSave.push(await convertMessageParts([part]))
messagePartsToDelete.push(part)
} else {
if (part.service !== 'nexmo') {
throw new Error('should not have a parent ID for twilio')
}
const groupKey = [parentId, part.contact_number, part.user_number]
const serviceMessage = JSON.parse(part.service_message)
if (!concatMessageParts.hasOwnProperty(groupKey)) {
const partCount = parseInt(serviceMessage['concat-total'], 10)
concatMessageParts[groupKey] = Array(partCount).fill(null)
}

const partIndex = parseInt(serviceMessage['concat-part'], 10) - 1
if (concatMessageParts[groupKey][partIndex] !== null) {
messagePartsToDelete.push(part)
} else {
concatMessageParts[groupKey][partIndex] = part
}
}
}
}

const keys = Object.keys(concatMessageParts)
const keyCount = keys.length

for (let i = 0; i < keyCount; i++) {
const groupKey = keys[i]
const messageParts = concatMessageParts[groupKey]

if (messageParts.filter((part) => part === null).length === 0) {
messagePartsToDelete = messagePartsToDelete.concat(messageParts)
const message = await convertMessageParts(messageParts)
messagesToSave.push(message)
}
}

const messageCount = messagesToSave.length
for (let i = 0; i < messageCount; i++) {
log.info('Saving message with service message ID', messagesToSave[i].service_id)
await saveNewIncomingMessage(messagesToSave[i])
}

const messageIdsToDelete = messagePartsToDelete.map((m) => m.id)
log.info('Deleting message parts', messageIdsToDelete)
await r.table('pending_message_part')
.getAll(...messageIdsToDelete)
.delete()
}
}
(async () => {
while (true) {
try {
const countPendingMessagePart = await r.knex('pending_message_part')
.count('id AS total').then( total => {
let totalCount = 0
totalCount = total[0].total
return totalCount
})

await sleep(500)
if(countPendingMessagePart > 0) {
await handleIncomingMessageParts()
}
} catch (ex) {
log.error(ex)
}
}
})()
handleIncomingMessages()
55 changes: 2 additions & 53 deletions src/workers/job-handler.js
Original file line number Diff line number Diff line change
@@ -1,54 +1,3 @@
import { r } from '../server/models'
import { sleep, getNextJob, updateJob, log } from './lib'
import { exportCampaign, uploadContacts, assignTexters, createInteractionSteps, sendMessages } from './jobs'
import { setupUserNotificationObservers } from '../server/notifications'

function jobMap() {
return {
export: exportCampaign,
upload_contacts: uploadContacts,
assign_texters: assignTexters,
create_interaction_steps: createInteractionSteps,
// usually dispatched in three separate processes, not here
send_messages: sendMessages
}
}

const JOBS_SAME_PROCESS = !!process.env.JOBS_SAME_PROCESS; //keep semi

export const processJobs = async () => {
// eslint-disable-next-line no-constant-condition
while (true) {
try {
if (!JOBS_SAME_PROCESS) {
// if this is running as a 'cron-like' task to finish all jobs
// and then close, then no need to wait around for the next
await sleep(1000)
}
const job = await getNextJob()
if (job) {
await (jobMap()[job.job_type])(job)
await r.table('job_request')
.get(job.id)
.delete()
} else if (JOBS_SAME_PROCESS) {
break // all finished, so just complete
}

var twoMinutesAgo = new Date(new Date() - 1000 * 60 * 2)
// delete jobs that are older than 2 minutes
// to clear out stuck jobs
await r.knex('job_request')
.where({ assigned: true })
.where('updated_at', '<', twoMinutesAgo)
.delete()
} catch (ex) {
log.error(ex)
}
}
}

if (require.main === module) {
processJobs()
}
import { processJobs } from './job-processes'

processJobs()
122 changes: 122 additions & 0 deletions src/workers/job-processes.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
import { r } from '../server/models'
import { sleep, getNextJob, updateJob, log } from './lib'
import { exportCampaign, uploadContacts, assignTexters, createInteractionSteps, sendMessages, handleIncomingMessageParts } from './jobs'
import { setupUserNotificationObservers } from '../server/notifications'


/* Two process models are supported in this file.
The main in both cases is to process jobs and send/receive messages
on separate loop(s) from the web server.
* job processing (e.g. contact loading) shouldn't delay text message processing
The two process models:
* Run the 'scripts' in dev-tools/Procfile.dev
*/

const jobMap = {
"export": exportCampaign,
"upload_contacts": uploadContacts,
"assign_texters": assignTexters,
"create_interaction_steps": createInteractionSteps,
// usually dispatched in three separate processes, not here
"send_messages": sendMessages
}

export async function processJobs() {
setupUserNotificationObservers()
// eslint-disable-next-line no-constant-condition
while (true) {
try {
await sleep(1000)
const job = await getNextJob()
if (job) {
await (jobMap[job.job_type])(job)
await r.table('job_request')
.get(job.id)
.delete()
}

var twoMinutesAgo = new Date(new Date() - 1000 * 60 * 2)
// delete jobs that are older than 2 minutes
// to clear out stuck jobs
await r.knex('job_request')
.where({ assigned: true })
.where('updated_at', '<', twoMinutesAgo)
.delete()
} catch (ex) {
log.error(ex)
}
}
}

const messageSenderCreator = (subQuery) => {
return async () => {
setupUserNotificationObservers()
// eslint-disable-next-line no-constant-condition
while (true) {
try {
await sleep(1100)
await sendMessages(subQuery)
} catch (ex) {
log.error(ex)
}
}
}
}

export const messageSender01 = messageSenderCreator(function(mQuery) {
return mQuery.where(r.knex.raw("(contact_number LIKE '%0' OR contact_number LIKE '%1')"))
})

export const messageSender234 = messageSenderCreator(function(mQuery) {
return mQuery.where(r.knex.raw("(contact_number LIKE '%2' OR contact_number LIKE '%3' or contact_number LIKE '%4')"))
})

export const messageSender56 = messageSenderCreator(function(mQuery) {
return mQuery.where(r.knex.raw("(contact_number LIKE '%5' OR contact_number LIKE '%6')"))
})

export const messageSender789 = messageSenderCreator(function(mQuery) {
return mQuery.where(r.knex.raw("(contact_number LIKE '%7' OR contact_number LIKE '%8' or contact_number LIKE '%9')"))
})

export async function handleIncomingMessages() {
setupUserNotificationObservers()
// eslint-disable-next-line no-constant-condition
while (true) {
try {
const countPendingMessagePart = await r.knex('pending_message_part')
.count('id AS total').then( total => {
let totalCount = 0
totalCount = total[0].total
return totalCount
})

await sleep(500)
if(countPendingMessagePart > 0) {
await handleIncomingMessageParts()
}
} catch (ex) {
log.error(ex)
}
}
}


const processMap = {
'processJobs': processJobs,
'messageSender01': messageSender01,
'messageSender234': messageSender234,
'messageSender56': messageSender56,
'messageSender789': messageSender789,
'handleIncomingMessages': handleIncomingMessages
}

export async function dispatchProcesses(event, dispatcher) {
const toDispatch = event.processes || processMap
for (let p in toDispatch) {
if (p in processMap) {
dispatcher(p)
}
}
}
Loading

0 comments on commit 8b63fd7

Please sign in to comment.