Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NodeJS Convert Aggregate to Promise #103

Open
matthew-valenti opened this issue Jul 14, 2020 · 1 comment
Open

NodeJS Convert Aggregate to Promise #103

matthew-valenti opened this issue Jul 14, 2020 · 1 comment

Comments

@matthew-valenti
Copy link

matthew-valenti commented Jul 14, 2020

I'd like to use the kinesis-aggregation in our existing NodeJS project that uses promises everywhere. Is there a way to convert the aggregate function to a promise. Here's a simple stub. It doesn't work -- the await never completes. I've tried various permutation of this but none have worked.

const aggregatePromise = (records, onReady) => {
  return new Promise((resolve, reject) => {
    aggregate(records, (aggregatedRecord, callback) => {
      onReady(aggregatedRecord, callback);
    }, () => resolve(), (err) => reject(err));
  });
};

const onReady= (aggregatedRecord, callback) => {
   // do stuff including kinesis.putRecord
};

await aggregatePromise(records, onReady);
@stevegoossens
Copy link
Contributor

stevegoossens commented Dec 8, 2020

I think what you need to do is to use the encodedRecordHandler (per-record callback) to build up a list of aggregate record, and then use the afterPutAggregationRecords (final callback) to Promise.resolve the resulting list. I made the errorCallback re-throw, which should return the Promise.reject.

I'm using this in TypeScript, so you'll have to adjust your code slightly (probably just remove the types) if you are coding in JS.

aggregate-promise.ts

import {aggregate as aggregateWithCallbacks} from 'aws-kinesis-agg';

export const aggregate = async (records: any[]): Promise<any[]> => {
  return new Promise((resolve, reject) => {
    const listOfAggregatedRecords: any[] = [];

    const encodedRecordHandler = (params: any, callback: Function) => {
      listOfAggregatedRecords.push(params);
      callback();
    }
  
    const afterPutAggregationRecords = () => {
      resolve(listOfAggregatedRecords);
    }
  
    const errorCallback = (error: Error, encoded: any) => {
      throw error;
    }

    try {
      aggregateWithCallbacks(
        records,
        encodedRecordHandler,
        afterPutAggregationRecords,
        errorCallback
      );
    } catch (error) {
      reject(error);
    }
  })
}

and here is a simple unit test which passes

aggregate-promise.spec.ts

import { aggregate } from './aggregate-promise';

describe('Aggregate promise', () => {

  describe('with valid records', () => {
    const data1 = {
      some: 'value1'
    }
    const data2 = {
      some: 'value2'
    }
    const inputRecords = [
      {
        approximateArrivalTimestamp: 1,
        data: JSON.stringify(data1),
        kinesisSchemaVersion: '1.0',
        partitionKey: 'part1',
        sequenceNumber: 'seq1',
      },
      {
        approximateArrivalTimestamp: 2,
        data: JSON.stringify(data2),
        kinesisSchemaVersion: '1.0',
        partitionKey: 'part1',
        sequenceNumber: 'seq2',
      },
    ]
  
    it('returns a collection containing one aggregated record', async () => {
      const aggregatedRecords = await aggregate(inputRecords);
      expect(aggregatedRecords).toHaveLength(1);
    })
  
    it('returns aggregated record with correct length data ', async () => {
      const aggregatedRecords = await aggregate(inputRecords);
      expect(aggregatedRecords[0]).toHaveProperty('data');
      expect(aggregatedRecords[0].data).toHaveLength(73);
    })
  
    it('returns aggregated record with correct data type ', async () => {
      const aggregatedRecords = await aggregate(inputRecords);
      expect(aggregatedRecords[0].data).toBeInstanceOf
    })
  
    it('returns aggregated record with correct partition key', async () => {
      const aggregatedRecords = await aggregate(inputRecords);
      expect(aggregatedRecords[0]).toHaveProperty('partitionKey');
      expect(aggregatedRecords[0].partitionKey).toEqual('part1');
    })
  })

  describe('with blank partitionKey', () => {
    const data1 = {
      some: 'value1'
    }
    const data2 = {
      some: 'value2'
    }
    const inputRecords = [
      {
        approximateArrivalTimestamp: 1,
        data: JSON.stringify(data1),
        kinesisSchemaVersion: '1.0',
        partitionKey: 'part1',
        sequenceNumber: 'seq1',
      },
      {
        approximateArrivalTimestamp: 2,
        data: JSON.stringify(data2),
        kinesisSchemaVersion: '1.0',
        partitionKey: '',
        sequenceNumber: 'seq2',
      },
    ]
  
    it('rejects with error', async () => {
      try {
        const aggregatedRecords = await aggregate(inputRecords);
      } catch (error) {
        expect(error.message).toEqual('record.partitionKey field is mandatory');
      }
    })
  })
})
 PASS  src/aggregation/aggregate-promise.spec.ts
  Aggregate promise
    with valid records
      ✓ returns a collection containing one aggregated record (9 ms)
      ✓ returns aggregated record with correct length data  (2 ms)
      ✓ returns aggregated record with correct data type  (1 ms)
      ✓ returns aggregated record with correct partition key (1 ms)
    with blank partitionKey
      ✓ rejects with error

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants