SQS DSQ Hdr

From Izara Wiki
Jump to navigation Jump to search

Example Sqs|Dsq Handler

Steps to create a Lambda function and its Sqs|Dsq handler

Lambda

  • Add the Lambda: copy LambdaFunction.js and rename it, e.g. ProcessProduct_Main.js
'use strict';

const izaraSharedLib = require('@izara_project/izara-shared');
const dynamodbSharedLib = izaraSharedLib.dynamodbSharedLib;

const hash = require('object-hash');

//* require more module that you use


/**
 * Loads the product record for `productId`, builds `dataName` from the
 * record's `type` and `name`, and publishes it to the OutProductComplete
 * SNS topic.
 *
 * @param {Object} _izContext - request context; only `_izContext.logger` is used here
 * @param {string} productId - id of the product to process
 *
 * @returns {Promise<string>} resolves to the literal "product complete" once the message is published
 * @throws {Error} when no record is found; any other error is logged and rethrown unchanged
 */
module.exports.processProduct = async (
  _izContext,
  productId
) => {

  try {
    _izContext.logger.debug('ProcessProduct: ', {
      productId
    });

    // ... do something

    //* get item of productId from table
    const productRecord = await dynamodbSharedLib.getItem(
      _izContext,
      {
        //your keyValue
      }
    );
    _izContext.logger.debug("productRecord: ", productRecord);

    //* no record => nothing to build dataName from, so fail the invocation
    if (!productRecord) {
      throw new Error(`ProcessProduct: no record found for productId ${productId}`);
    }

    //* have record => dataName = type + "_" + name
    // NOTE(review): assumes type and name are top-level attributes of the record - confirm against the table schema
    const { type, name } = productRecord;
    const dataName = type + "_" + name;
    _izContext.logger.debug("dataName: ", dataName);

    // send message to SNS OutProductComplete
    // NOTE(review): snsSharedLib and sns are not declared in the visible part of this file;
    // they must be required at the top of the file alongside dynamodbSharedLib
    const productRequest = {
      Message: JSON.stringify({
        dataName: dataName,
      }),
      TopicArn: snsSharedLib.snsTopicArn('OutProductComplete')
    };

    _izContext.logger.debug("RequestParams before send to sqs OutProductComplete ", productRequest);
    await sns.publishAsync(_izContext, productRequest);

    return "product complete";

  } catch (err) {
    // log here, then rethrow the original error so the caller sees it
    _izContext.logger.error('error ProcessProduct: ', err);
    throw (err);
  }
};

//Local testing
/**
 * Builds a minimal _izContext from the core libraries and invokes
 * processProduct once with a sample productId.
 *
 * @returns {Promise<string>} whatever processProduct resolves to
 */
async function test() {

  const CorrelationIds = require('@izara_project/izara-core-library-correlation-ids');
  const Logger = require('@izara_project/izara-core-library-logger');
  const IntegrationTestDetail = require('@izara_project/izara-core-library-integration-tests/src/IntegrationTests');
  const _izContext = {
    correlationIds: CorrelationIds,
    logger: Logger,
    integrationTestDetail: IntegrationTestDetail
  };

  const productId = "hashOfTypeAndNameEqualsProductId";

  // invoke the function this module exports
  return module.exports.processProduct(
    _izContext,
    productId
  );
}

// Run only when executed directly (node ProcessProduct_Main.js), so that
// requiring this module from a handler does not trigger the local test.
if (require.main === module) {
  test().catch((err) => {
    console.error('local test failed: ', err);
    process.exitCode = 1;
  });
}

** Run `node ProcessProduct_Main.js` in a terminal to test locally
  • Add the Lambda handler: copy LambdaFunctionHdrSqs.js and rename it, e.g. ProcessProduct_HdrSqs.js
'use strict';

const izara = require("@izara_project/izara-middleware");
const middleware = izara.middlewareHandler;
const recordHandlerSharedLib = require("@izara_project/izara-shared").recordHandlerSharedLib
const Logger = require('@izara_project/izara-core-library-logger');

// main function module; the file created in the Lambda step is ProcessProduct_Main.js
const processProduct = require('./ProcessProduct_Main');

// validate event properties in body.Message of sqs event
middleware.setValidatorSchema(recordHandlerSharedLib.baseValidatorSchema());

// schema for record.body.Message: an object that must carry a string productId
const perRecordsValidatorSchema = {
  type: "object",
  required: [
    'productId'
  ],
  properties: {
    productId: {
      type: 'string',
    }
  }
};

// // set schema for record.body.MessageAttributes
// const messageAttributeValidatorSchema = {
//   type: "object",
//   required: ['msgAtrrParam1', 'msgAtrrParam2'],
//   properties: {
//     msgAtrrParam1: {
//       type: "string"
//     },
//     msgAtrrParam2: {
//       type: "object"
//     }
//   }
// };

/**
 * SQS handler for the ProcessProduct queue.
 *
 * Validates every record in the event, starts
 * recordHandlerSharedLib.recordHandler for each one with processProduct as
 * the main function, then waits for all of the started promises.
 *
 * @param {Object} event - SQS event; event.Records is iterated
 * @param {Object} context - Lambda context (unused here)
 * @param {Function} callback - Lambda callback (unused here)
 *
 * @returns {Promise<Array|undefined>} event.Records when every record promise
 *   resolves; undefined when at least one rejects (the rejection is logged,
 *   not rethrown)
 * @throws rethrows any error raised while validating or starting records
 */
module.exports.main = middleware.wrap(async (event, context, callback) => {

  try {

    const recordPromises = [];

    // loop each record and send to mainFunction
    await Promise.all(event.Records.map(async record => { // promise.all for map() function

      record._izContext.logger.debug('record ReceiveMsgOutHdrSqs', record);

      //validate message (and MessageAttributes)
      await recordHandlerSharedLib.validateRecord(
        record,                             // one record will send to mainFunction
        "ProcessProduct",                   // queue name that need to retry or send to dlq
        perRecordsValidatorSchema,          // schema for record.Message
        // messageAttributeValidatorSchema   // ----- for msgAttr default is null -> do not send this parameter if not want to validate msgAtt
      );

      // arguments (after _izContext) that the main function is invoked with
      const passOnProperties = [record.body.Message.productId];
      record._izContext.logger.debug('passOnProperties in handler', passOnProperties);

      // start the record handler; the promise is collected rather than returned,
      // so that a rejection surfaces in the inner try/catch below and not in this Promise.all
      const recordPromise = recordHandlerSharedLib.recordHandler(
        record,                         // one record will send to mainFunction
        processProduct.processProduct,  // mainFunction that need to invoke.
        "ProcessProduct",               // queue name that need to retry or send to dlq
        passOnProperties,               // all parameters that mainFunction needed.
      );
      record._izContext.logger.debug('after recordPromise in handler');
      recordPromises.push(recordPromise); // push promise to recordPromises
    }));

    Logger.debug('before Promise.all(recordPromises) in handler');
    try {
      // --- main await all promises
      await Promise.all(recordPromises); // await all promises

      return event.Records; // return all for local testing

    } catch (err) {
      // best-effort: do not rethrow, but keep the rejection reason in the log
      Logger.debug('Promise.all(recordPromises) in handler threw error (at least one record did no resolve', err);
    }
    Logger.debug('after Promise.all(recordPromises) in handler');

  } catch (err) {
    Logger.error('Unhandled Error, ProcessProduct_HdrSqs: ', err);
    throw (err);
  }
});
  • function.yml

For the Sqs handler, e.g. ProcessProduct_HdrSqs:

# Lambda function triggered by the ProcessProduct SQS queue
ProcessProductSqs:
  handler: src/ProcessProduct_HdrSqs.main
  name: ${self:custom.iz_resourcePrefix}ProcessProductHdrSqs
  events:
    - sqs:
        arn: arn:aws:sqs:${self:custom.iz_region}:${self:custom.iz_accountId}:${self:custom.iz_resourcePrefix}ProcessProduct
        batchSize: 10
        # filterPatterns: # **** requires a serverless framework version above v2.69.1
        #   - body: {"MessageAttributes": {"callingFlow": {"Value": ["${self:custom.iz_serviceName}ProcessProduct"]} } } # TestFilter is function name of callingflow
        #   - body: {"MessageAttributes": {"callingFlow": {"Value": [{"exists":false}]} } }
  iamRoleStatements:
    - Effect: Allow
      Action:
        - sqs:SendMessage
        - sqs:ReceiveMessage
        - sqs:DeleteMessage
        - sqs:GetQueueAttributes
        - SNS:Publish
        - dynamodb:PutItem
        - dynamodb:GetItem
        - dynamodb:Query
        - dynamodb:DeleteItem
        - dynamodb:UpdateItem
      Resource:
        - arn:aws:dynamodb:${self:custom.iz_region}:${self:custom.iz_accountId}:table/${self:custom.iz_resourcePrefix}Config
        - arn:aws:dynamodb:${self:custom.iz_region}:${self:custom.iz_accountId}:table/${self:custom.iz_resourcePrefix}ProductRecord
        - "arn:aws:sns:${self:custom.iz_region}:${self:custom.iz_accountId}:${self:custom.iz_resourcePrefix}InProcessProduct" #sns msgIn or Out Topicname
        - "arn:aws:sqs:${self:custom.iz_region}:${self:custom.iz_accountId}:${self:custom.iz_resourcePrefix}ProcessProduct" # for retries
        - "arn:aws:sqs:${self:custom.iz_region}:${self:custom.iz_accountId}:${self:custom.iz_resourcePrefix}ProcessProductDLQ"
        - "arn:aws:sns:${self:custom.iz_region}:${self:custom.iz_accountId}:${self:custom.iz_resourcePrefix}OutProductComplete" #sns msgIn or Out Topicname
        # add more.....
  • core-function.yml

Add the function's role to the `resource` list of InitialSetup and InitialSetupTestEnv:

resource:
    - ${self:custom.iz_prefixIamRole}ProcessProductSqs${self:custom.iz_suffixIamRole} # eg. PutRolePolicy
    # add more....

in resource

  • sns-out.yml
Resources:
# ------[Create topic out]
  # Outgoing SNS topic; its name is the resource prefix followed by OutProductComplete
  OutProductComplete:
    Type: AWS::SNS::Topic
    Properties:
      DisplayName: "SNS Message out"
      TopicName: ${self:custom.iz_resourcePrefix}OutProductComplete
  • sns-in-sqs.yml

ex: ProcessProduct_HdrSqs

Resources:
# ProcessProduct_HdrSqs: sns -> sqs -> lambda
# -------[Create Topic In]
  # Incoming SNS topic for this flow
  InProcessProduct:
    Type: AWS::SNS::Topic
    Properties:
      DisplayName: "SNS Message in"
      TopicName: ${self:custom.iz_resourcePrefix}InProcessProduct
  # -------[Topic In]
  # Subscribes the ProcessProduct queue to the InProcessProduct topic
  SubscriptionInProcessProduct:
    Type: AWS::SNS::Subscription
    Properties:
      TopicArn: !Ref InProcessProduct
      Endpoint: "arn:aws:sqs:${self:custom.iz_region}:${self:custom.iz_accountId}:${self:custom.iz_resourcePrefix}ProcessProduct"
      Protocol: "sqs"
#----[Queue]
  # Main queue; its redrive policy points at ProcessProductDLQ
  ProcessProduct:
    Type: "AWS::SQS::Queue"
    Properties:
      QueueName: ${self:custom.iz_resourcePrefix}ProcessProduct
      RedrivePolicy:
        deadLetterTargetArn:
        # !GetAtt
          Fn::GetAtt:
            - ProcessProductDLQ
            - Arn
        # receive-count threshold of the redrive policy (SQS moves the message to the DLQ once it is exceeded)
        maxReceiveCount: 3
      # NOTE(review): presumably should be at least the Lambda timeout - confirm against the function configuration
      VisibilityTimeout: 120
  #----[QueueDLQ]
  # Dead-letter queue referenced by the ProcessProduct redrive policy
  ProcessProductDLQ:
    Type: AWS::SQS::Queue
    Properties:
      QueueName: ${self:custom.iz_resourcePrefix}ProcessProductDLQ
  #----[DataPolicy]
  # Queue policy attached to ProcessProduct that allows SQS:SendMessage
  ProcessProductPolicy:
    Type: AWS::SQS::QueuePolicy
    Properties:
      PolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Sid: "allow-sns-messages"
            Effect: Allow
            # NOTE(review): Principal is "*" and the statement has no Condition, so the sender is
            # not restricted to the InProcessProduct topic - confirm whether an aws:SourceArn
            # condition should be added
            Principal: "*"
            Resource:
            # !GetAtt
              Fn::GetAtt:
                - ProcessProduct
                - Arn
            Action: "SQS:SendMessage"
      Queues:
        - Ref: ProcessProduct
  • serverless.yml

Add sns-out.yml and sns-in-sqs.yml to the `resources` section.