SQS DSQ Hdr
Jump to navigation
Jump to search
Example Sqs|Dsq Handler
Steps to create a Lambda function and its Sqs|Dsq handler
Lambda
- add lambda, copy LambdaFunction.js and change name, ex: ProcessProduct_Main.js
'use strict';
const izaraSharedLib = require('@izara_project/izara-shared');
const dynamodbSharedLib = izaraSharedLib.dynamodbSharedLib;
const hash = require('object-hash');
//* require more module that you use
/**
 * Loads the product record for `productId`, builds `dataName` as
 * `<type>_<name>` from that record and publishes it to the
 * OutProductComplete SNS topic.
 *
 * @param {Object} _izContext - request context; only `_izContext.logger` is used directly here
 * @param {string} productId - id of the product to process
 *
 * @returns {Promise<string>} resolves to "product complete" after the message has been published
 * @throws {Error} when no product record is found; any error raised by the
 *   DynamoDB or SNS calls is logged and rethrown unchanged
 */
module.exports.processProduct = async (
  _izContext,
  productId
) => {
  try {
    _izContext.logger.debug('ProcessProduct: ', {
      productId
    });

    //* get item of productId from table
    const productRecord = await dynamodbSharedLib.getItem(
      _izContext,
      {
        //your keyValue
      }
    );
    _izContext.logger.debug("productRecord: ", productRecord);

    //* no record => nothing to build dataName from, so fail loudly
    // NOTE(review): a plain Error is thrown here; if a missing record should
    // not be retried by the queue, swap in the project's no-retry error type.
    if (!productRecord) {
      throw new Error(`ProcessProduct: no product record found for productId ${productId}`);
    }

    //* have record => get type and name from record => dataName = type + "_" + name
    const { type, name } = productRecord;
    const dataName = type + "_" + name;
    _izContext.logger.debug("dataName: ", dataName);

    // send message to SNS OutProductComplete
    // NOTE(review): `snsSharedLib` and `sns` are not among the requires shown
    // at the top of this example; they must be required there before this runs.
    const productRequest = {
      Message: JSON.stringify({
        dataName: dataName,
      }),
      TopicArn: snsSharedLib.snsTopicArn('OutProductComplete')
    };
    _izContext.logger.debug("RequestParams before send to sqs OutProductComplete ", productRequest);
    await sns.publishAsync(_izContext, productRequest);

    return "product complete";
  } catch (err) {
    // log for diagnostics, then let the caller decide how to handle the failure
    _izContext.logger.error('error ProcessProduct: ', err);
    throw err;
  }
};
// Local testing
/**
 * Invokes processProduct once with a hand-built context and a sample
 * productId so the function can be tried out from the command line.
 *
 * @returns {Promise<string>} whatever processProduct resolves to
 */
async function test() {
  const CorrelationIds = require('@izara_project/izara-core-library-correlation-ids');
  const Logger = require('@izara_project/izara-core-library-logger');
  const IntegrationTestDetail = require('@izara_project/izara-core-library-integration-tests/src/IntegrationTests');

  const _izContext = {
    correlationIds: CorrelationIds,
    logger: Logger,
    integrationTestDetail: IntegrationTestDetail
  };
  const productId = "hashOfTypeAndNameEqualsProductId";

  // call the function this module actually exports
  return module.exports.processProduct(
    _izContext,
    productId
  );
}

// Run only when this file is executed directly (node <file>), so that
// requiring the module from the handler does not trigger a test run.
if (require.main === module) {
  test().catch((err) => {
    console.error(err);
    process.exitCode = 1;
  });
}
** To test locally, run `node ProcessProduct_Main.js` in a terminal
- add lambda handler, copy LambdaFunctionHdrSqs.js and change name, ex: ProcessProduct_HdrSqs.js
'use strict';

const izara = require("@izara_project/izara-middleware");
const middleware = izara.middlewareHandler;
const recordHandlerSharedLib = require("@izara_project/izara-shared").recordHandlerSharedLib
const Logger = require('@izara_project/izara-core-library-logger');
// the main function file is named ProcessProduct_Main.js in this example
const processProduct = require('./ProcessProduct_Main')

// validate event properties in body.Message of sqs event
middleware.setValidatorSchema(recordHandlerSharedLib.baseValidatorSchema());
// Schema for record.body.Message: an object that must carry a string `productId`.
const perRecordsValidatorSchema = {
  type: 'object',
  required: ['productId'],
  properties: {
    productId: { type: 'string' },
  },
};
// // set schema for record.body.MessageAttributes
// const messageAttributeValidatorSchema = {
// type: "object",
// required: ['msgAtrrParam1', 'msgAtrrParam2'],
// properties: {
// msgAtrrParam1: {
// type: "string"
// },
// msgAtrrParam2: {
// type: "object"
// }
// }
// };
/**
 * SQS handler for the ProcessProduct queue.
 *
 * Every record is validated against perRecordsValidatorSchema and then handed
 * to recordHandlerSharedLib.recordHandler together with processProduct and
 * the record's productId. The promises returned by recordHandler are gathered
 * and awaited as a group.
 *
 * @param {Object} event - SQS event; `event.Records` is iterated
 * @param {Object} context - Lambda context (not used in this function)
 * @param {Function} callback - Lambda callback (not used in this function)
 * @returns {Promise<Array|undefined>} `event.Records` when every record promise
 *   resolved (handy for local testing), otherwise undefined
 */
module.exports.main = middleware.wrap(async (event, context, callback) => {
  try {
    const pendingRecords = [];

    // map() + Promise.all: records are prepared concurrently, each callback
    // queues its own recordHandler promise
    await Promise.all(event.Records.map(async record => {
      record._izContext.logger.debug('record ReceiveMsgOutHdrSqs', record);

      // validate Message (pass a 4th argument, e.g. messageAttributeValidatorSchema,
      // only when MessageAttributes should be validated too; default is null)
      await recordHandlerSharedLib.validateRecord(
        record,                    // the record being processed
        "ProcessProduct",          // queue name used for retry / dlq
        perRecordsValidatorSchema, // schema for record.Message
      );

      // arguments forwarded to the main function
      const handlerArgs = [record.body.Message.productId];
      record._izContext.logger.debug('passOnProperties in handler', handlerArgs);

      // start processing; the promise is collected, not awaited, at this point
      const pendingRecord = recordHandlerSharedLib.recordHandler(
        record,                        // the record being processed
        processProduct.processProduct, // main function to invoke
        "ProcessProduct",              // queue name used for retry / dlq
        handlerArgs,                   // parameters the main function needs
      );
      record._izContext.logger.debug('after recordPromise in handler');

      pendingRecords.push(pendingRecord);
    }));

    Logger.debug('before Promise.all(recordPromises) in handler');
    try {
      // wait for every record to finish
      await Promise.all(pendingRecords);
      return event.Records; // returned for local testing
    } catch {
      // a failed record is only logged here; the error is not rethrown
      Logger.debug('Promise.all(recordPromises) in handler threw error (at least one record did no resolve');
    }
    Logger.debug('after Promise.all(recordPromises) in handler');
  } catch (err) {
    Logger.error('Unhandled Error, ProcessProduct_HdrSqs: ', err);
    throw (err);
  }
});
- function.yml
For Sqs handler ex: ProcessProduct_HdrSqs
# Lambda triggered by the ProcessProduct queue (handler: ProcessProduct_HdrSqs.main)
ProcessProductSqs:
  handler: src/ProcessProduct_HdrSqs.main
  name: ${self:custom.iz_resourcePrefix}ProcessProductHdrSqs
  events:
    - sqs:
        arn: arn:aws:sqs:${self:custom.iz_region}:${self:custom.iz_accountId}:${self:custom.iz_resourcePrefix}ProcessProduct
        batchSize: 10
        # filterPatterns: # **** needs serverless framework newer than v.2.69.1
        #   - body: {"MessageAttributes": {"callingFlow": {"Value": ["${self:custom.iz_serviceName}ProcessProduct"]} } } # TestFilter is function name of callingflow
        #   - body: {"MessageAttributes": {"callingFlow": {"Value": [{"exists":false}]} } }
  iamRoleStatements:
    - Effect: Allow
      Action:
        - sqs:SendMessage
        - sqs:ReceiveMessage
        - sqs:DeleteMessage
        - sqs:GetQueueAttributes
        - SNS:Publish
        - dynamodb:PutItem
        - dynamodb:GetItem
        - dynamodb:Query
        - dynamodb:DeleteItem
        - dynamodb:UpdateItem
      Resource:
        - arn:aws:dynamodb:${self:custom.iz_region}:${self:custom.iz_accountId}:table/${self:custom.iz_resourcePrefix}Config
        - arn:aws:dynamodb:${self:custom.iz_region}:${self:custom.iz_accountId}:table/${self:custom.iz_resourcePrefix}ProductRecord
        - "arn:aws:sns:${self:custom.iz_region}:${self:custom.iz_accountId}:${self:custom.iz_resourcePrefix}InProcessProduct" # sns msgIn or Out Topicname
        - "arn:aws:sqs:${self:custom.iz_region}:${self:custom.iz_accountId}:${self:custom.iz_resourcePrefix}ProcessProduct" # for retries
        - "arn:aws:sqs:${self:custom.iz_region}:${self:custom.iz_accountId}:${self:custom.iz_resourcePrefix}ProcessProductDLQ"
        - "arn:aws:sns:${self:custom.iz_region}:${self:custom.iz_accountId}:${self:custom.iz_resourcePrefix}OutProductComplete" # sns msgIn or Out Topicname
        # add more.....
- core-function.yml
add role in resource of InitialSetup and InitialSetupTestEnv
resource:
  - ${self:custom.iz_prefixIamRole}ProcessProductSqs${self:custom.iz_suffixIamRole} # eg. PutRolePolicy
  # add more....
in resource
- sns-out.yml
Resources:
  # Outgoing topic: processProduct publishes its result message here
  OutProductComplete:
    Type: AWS::SNS::Topic
    Properties:
      DisplayName: "SNS Message out"
      TopicName: ${self:custom.iz_resourcePrefix}OutProductComplete
- sns-in-sqs.yml
ex: ProcessProduct_HdrSqs
Resources:
  # ProcessProduct_HdrSqs: sns -> sqs -> lambda

  # -------[Create Topic In]
  InProcessProduct:
    Type: AWS::SNS::Topic
    Properties:
      DisplayName: "SNS Message in"
      TopicName: ${self:custom.iz_resourcePrefix}InProcessProduct

  # -------[Topic In] deliver messages of the In topic to the queue
  SubscriptionInProcessProduct:
    Type: AWS::SNS::Subscription
    Properties:
      TopicArn: !Ref InProcessProduct
      Endpoint: "arn:aws:sqs:${self:custom.iz_region}:${self:custom.iz_accountId}:${self:custom.iz_resourcePrefix}ProcessProduct"
      Protocol: "sqs"

  #----[Queue]
  ProcessProduct:
    Type: "AWS::SQS::Queue"
    Properties:
      QueueName: ${self:custom.iz_resourcePrefix}ProcessProduct
      RedrivePolicy:
        deadLetterTargetArn:
          # !GetAtt
          Fn::GetAtt:
            - ProcessProductDLQ
            - Arn
        # a message is moved to the DLQ after this many receives
        maxReceiveCount: 3
      VisibilityTimeout: 120

  #----[QueueDLQ]
  ProcessProductDLQ:
    Type: AWS::SQS::Queue
    Properties:
      QueueName: ${self:custom.iz_resourcePrefix}ProcessProductDLQ

  #----[DataPolicy]
  ProcessProductPolicy:
    Type: AWS::SQS::QueuePolicy
    Properties:
      PolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Sid: "allow-sns-messages"
            Effect: Allow
            Principal: "*"
            Resource:
              # !GetAtt
              Fn::GetAtt:
                - ProcessProduct
                - Arn
            Action: "SQS:SendMessage"
            # Without a condition, Principal "*" lets anyone send to the queue.
            # Only accept messages that arrive through the In topic; add further
            # topic ARNs here if the queue is subscribed to more topics.
            Condition:
              ArnEquals:
                aws:SourceArn:
                  Ref: InProcessProduct
      Queues:
        - Ref: ProcessProduct
- serverless.yml
add sns-out.yml and sns-in-sqs.yml to the resources section