Using Slack to track the SQS dead-letter queue

AWS SQS plays a significant role in modern application architecture, especially in a serverless environment. When working with SQS, you can often see that messages have not been read; the reason could be a bug in your code, a temporary resource constraint, an API budget exceeded, or dependencies in messages that need to be processed. Most of the time, you would like to know what these messages are if they fail many times, then find out why and fix the problems. This is where the SQS dead-letter queue comes into play.


However, monitoring an undelivered (slang: “dead”) message can be challenging. One of the most common approaches is to configure CloudWatch to send alarms, but people often face two problems:

  1. There are no details about bounced messages. CloudWatch only shows that there are messages in the dead letter queue, without giving details, to find more information, DevOps often needs to use other tools, such as the AWS CLI.
  2. There is no way to reproduce the undelivered messages, that is, the system is not able to return the undelivered message to SQS, at least not so easy. You can use the AWS CLI to get them back, but again, this makes troubleshooting already annoying issues even more frustrating.

The above problems can be solved with Slack and Lambda as shown below.

The SQS dead-letter queue is an event trigger for a lambda function that sends notifications to Slack, then Slack passes the user’s actions back to the lambda function, and finally pushes messages back to SQS.

Part 1. Slack

First, we need to create an application in Slack, and then write the application so that it accepts user input and interacts with Lambda.

Go to api.slack.com, to create an app if you don’t already have one.

When the app is ready, we can create an inbound web hook that the lambda function will send notifications to. Save the webhook URL for later.

Creating interactivity, the request URL is where Slack sends user actions, the endpoint of the Lambda API gateway.

Part 2. Lambda

I am using a serverless framework to manage lambda functions. We will have two lambda functions:

  1. Monitoring function. The function’s event source is the SQS dead-letter queue, so when a message appears in the dead-letter queue, this function is triggered and then forwards the message to Slack.
  2. Command function. This function is responsible for listening to Slack actions, that is, clicking a button sends a message back to the original SQS.

The Serverless.yml file shows how these two functions are configured

service: slack-sqs-monitor
frameworkVersion: "2.9.0"
provider:
  name: aws
  versionFunctions: false
  runtime: nodejs12.x
  region: ap-southeast-2
  iamRoleStatements:
    - Effect: "Allow"
      Action:
        # You should only give least permissions to your functions.
        - "sqs:*"
      Resource:
        - arn:aws:sqs:ap-southeast-2:xxxxxxxx:sqs.fifo # The original SQS arn
        - arn:aws:sqs:ap-southeast-2:xxxxxxxx:deadletter.fifo # The dead letter queue arn

plugins:
  - serverless-webpack
  - serverless-domain-manager

custom:
  customDomain:
    rest:
      domainName: labs.mianio.com
      basePath: sqs-command
      createRoute53Record: true
      securityPolicy: tls_1_2
  webpack:
    webpackConfig: "webpack.config.js"
    packager: "yarn"

functions:
  monitor:
    handler: functions/monitor.handler
    desciption: The function has the dead letter queue as the event, and forward the event to Slack
    tags:
      name: Monitor
    environment:
#     This is the webhook URL from the previous step
      SLACK_ENDPOINT: https://hooks.slack.com/services/XXXXXXX/XXXXXX/XXXXXXXXX
    events:
      - sqs:
#       Dead letter queue ARN
          arn: arn:aws:sqs:ap-southeast-2:xxxxxxxx:deadletter.fifo

  command:
    handler: functions/command.handler
    tags:
      name: Command
      desciption: The function handles Slack action and place the message back to the queue
    environment:
    # Credentials should be retrieved from Parameter Store 
      SLACK_SIGNING_SECRET: ${ssm:/deadletter/slack/signing-secret~true}
      SLACK_OAUTH_TOKEN: ${ssm:/deadletter/slack/oauth-token~true}
    events:
      - http:
          path: slack
          method: post
          cors: true

  • The monitoring function has SLACK_ENDPOINT as an environment variable that will be used for publishing to Slack.
  • The command function is behind the API gateway, the endpoint is the request URL for Slack interactivity.

The function decomposes events from the dead-letter queue and creates a Slack payload to send. See api.slack.com/block-kitto learn more about Slack development blocks.

Monitoring function

import middy from "@middy/core";
import axios from "axios";
import doNotWaitForEmptyEventLoop from "@middy/do-not-wait-for-empty-event-loop";
export const monitor = async (event: any): Promise<any> => {
  const records = event.Records;
  await Promise.all(
    records.map((record: any) => {
      const messageGroupId = record?.attributes?.MessageGroupId;
      const messageDeduplicationId = record?.attributes?.MessageDeduplicationId;
      const approximateReceiveCount =
        record?.attributes?.ApproximateReceiveCount;

      return axios({
        method: "post",
        url: process.env.SLACK_ENDPOINT,
        data: {
          blocks: [
            {
              type: "section",
              text: {
                type: "mrkdwn",
                text: `*Messsge ID*: ${record.messageId}`,
              },
            },
            {
              type: "section",
              text: {
                type: "mrkdwn",
                text: `*Message Group Id*: ${messageGroupId}`,
              },
            },
            {
              type: "section",
              text: {
                type: "mrkdwn",
                text: `*Message Deduplication Id*: ${messageDeduplicationId}`,
              },
            },
            {
              type: "section",
              text: {
                type: "mrkdwn",
                text: `*Approximate Receive Count*: ${approximateReceiveCount}`,
              },
            },
            {
              type: "section",
              text: {
                type: "mrkdwn",
                text: record.body,
              },
            },
            {
              type: "actions",
              elements: [
                {
                  type: "button",
                  style: "primary",
                  text: {
                    type: "plain_text",
                    text: "Send back",
                  },
                  action_id: "sendback",
                  value: record.body,
                },
              ],
            },
          ],
        },
        headers: {
          "Content-type": "application/json; charset=utf-8",
        },
      });
    })
  );
  return;
};

export const salesforceDeadLetterMonitor = middy(monitor).use(
  doNotWaitForEmptyEventLoop()
);

Every time a message enters the dead letter queue, it is redirected to the Slack channel, where you can not only understand why the message is not being sent, but also act.

Part 3. Sending back

An exciting detail of the project is the ability to send undeliverable messages back to SQS for rework. When you click the green Send back button, Slack launches a POST request to the previously defined action URL, which is the API gateway endpoint.

Command function that sends undelivered message back to SQS

import { APIGatewayEvent } from "aws-lambda";
import AWS from "aws-sdk";
import qs from "qs";
import axios from "axios";
import middy from "@middy/core";
import doNotWaitForEmptyEventLoop from "@middy/do-not-wait-for-empty-event-loop";
import httpHeaderNormalizer from "@middy/http-header-normalizer";
import httpEventNormalizer from "@middy/http-event-normalizer";
import httpErrorHandler from "@middy/http-error-handler";
import { slackVerifier } from "../../middlewares/slack/verify";

const sqs = new AWS.SQS({ region: "ap-southeast-2" });
const command = async (event: APIGatewayEvent) => {
  if (!event.body) return { statusCode: 200 };
  const requestBody: any = qs.parse(event.body);
  const payload: any = JSON.parse(requestBody.payload);
  let response;

  const action = payload.actions[0];
  if (action.action_id === "sendback") {
    try {
      const sqsPayload = payload.message.blocks.find(
        (block: any) => block.block_id === "payload"
      );
      if (sqsPayload?.text?.text && action?.value) {
        const payload = JSON.parse(sqsPayload.text.text);

        await putBack(payload.jobName, payload.jobData, action.value);

        response = {
          payload: {
            attachments: [
              {
                color: "good",
                text: "Job was sent back",
              },
            ],
            response_type: "in_channel",
          },
        };
      }
    } catch (error) {
      console.error(error);
    }
  }

  if (payload.response_url) {
    await axios({
      method: "post",
      url: payload.response_url,
      data: response.payload,
      headers: {
        "Content-type": "application/json; charset=utf-8",
        Authorization: `Bearer ${process.env.SLACK_OAUTH_TOKEN}`,
      },
    });
  } else if (response && !payload.response_url && response.payload) {
    return {
      body: JSON.stringify(response.payload),
      statusCode: 200,
    };
  } else {
    return {
      statusCode: 200,
    };
  }
};

const putBack = async (name: string, data: any, workerUrl: string) => {
  const params: any = {
    MessageBody: JSON.stringify({ jobName: name, jobData: data }),
    QueueUrl: workerUrl,
  };
  return new Promise((resolve: Function, reject: Function): any => {
    sqs.sendMessage(params, (err: any, data: any): any => {
      if (err) {
        reject(err);
      } else {
        resolve(data);
      }
    });
  });
};

export const handler = middy(command)
  .use(doNotWaitForEmptyEventLoop())
  .use(httpEventNormalizer())
  .use(httpHeaderNormalizer())
  .use(slackVerifier())
  .use(httpErrorHandler());

This function is pretty simple:

  • SlackVerifier function. It verifies that the POST request is from Slack.

verifier.ts

import crypto from 'crypto';
import qs from 'qs';

export const slackVerifier = () => {
  return {
    before: async (handler: any) => {
      const slackSignature =
        handler.event.headers && handler.event.headers['x-slack-signature'];
      const timestamp =
        handler.event.headers &&
        handler.event.headers['x-slack-request-timestamp'];
      const time = Math.floor(new Date().getTime() / 1000);
      if (Math.abs(time - timestamp) > 300) {
        //  The request timestamp is more than five minutes from local time.
        // It could be a replay attack, so let's ignore it.

        return {
          statusCode: 401,
          body: JSON.stringify('Too old'),
        };
      }
      const body = handler.event.body;
      const sigBasestring = `v0:${timestamp}:${body}`;

      const hash = crypto
        .createHmac('sha256', process.env.SLACK_SIGNING_SECRET)
        .update(sigBasestring, 'utf8')
        .digest('hex');

      const mySignature = `v0=${hash}`;
      if (
        !crypto.timingSafeEqual(
          Buffer.from(mySignature, 'utf8'),
          Buffer.from(slackSignature, 'utf8')
        )
      ) {
        return {
          statusCode: 401,
          body: JSON.stringify('Invalid Signature'),
        };
      }
      return;
    },

    onError: (handler: any) => {
      return handler.callback(null, handler.error);
    },
  };
};
view raw

The SLACKSIGNINGSECRET environment variable is a variable from the Slack configuration page that is injected from the Serverless.yml definitions of the serverless environment.

I highly recommend that you keep your credentials in a safe place; it is good practice to extract them during deployment and keep them out of source.

SLACK_SIGNING_SECRET: ${ssm:/deadletter/slack/signing-secret~true}

  • SendBack function. It receives the payload from the Slack POST request and sends it back to SQS.

The payload that Slack sends to the command function looks like this.

It contains the response_url, which should be used to send a response back to Slack to confirm the action.

Code sends [полезную нагрузку ответа] back to Slack with the Bearer token.

await axios({
    method: "post",
    url: payload.response_url,
    data: response.payload,
    headers: {
       "Content-type": "application/json; charset=utf-8",
       Authorization: `Bearer ${process.env.SLACK_OAUTH_TOKEN}`
    }
});

SLACK_OAUTH_TOKEN is injected from environment variables during deployment. You can get its value on the Slack configuration page:

It’s fun to create such a project, and more importantly, it makes DevOps’s life a little easier. Little by little, these little amenities create a comfortable working environment in general.

Have you thought about starting a new life and learning from the new year? Until the end of this year, you can still grab a course with a good discount. And if you use a promo code HABR – you can add another 10% to the discount on the banner.

image

Similar Posts

Leave a Reply