How To Use WebSockets With AWS Serverless

In this guide, we will see how to use WebSockets with the Serverless Framework on AWS and Node.js. By the end, we will have an application in which a user can create a chat room and other users can join that room to chat with each other. I have kept the procedure simple to follow, and at the end of this post, you will also get a link to the GitHub repository for the code.

Project Setup

First, create a new project folder, then run the commands below in its root to initialize the project and install the required dependency.

npm init
npm i aws-sdk --save

Create a folder named src at the root of the project and inside the src folder, create three more folders with index.js files in each folder:

  1. connectionHandler: This folder will contain the file with code to handle the connect and disconnect events of WebSockets.
  2. manageRoom: This folder will contain the file with code to create/join the chat room.
  3. sendMessage: This folder will contain the file with code to emit the message to all connected users in a particular room if any user in the room sends a message.

By now, our project structure should look like this:

src/
  connectionHandler/
    index.js
  manageRoom/
    index.js
  sendMessage/
    index.js
package.json

Sample project structure.

Now we have the basic project setup done and we are ready to move to the next step, which is creating the serverless.yml file. 

What Is a serverless.yml File?

In simple terms, the serverless.yml file is a template that describes the resources we want to create in our AWS account. We can define different types of resources in it and set the permissions each one needs.

In this project, the main use of serverless.yml will be to create the Lambda functions and to set up the DynamoDB table with different permissions.

Defining Configuration and Permissions Block in the serverless.yml File

service: serverless-chat

provider:
  name: aws
  runtime: nodejs12.x
  websocketsApiName: custom-websockets-api-name
  websocketsApiRouteSelectionExpression: $request.body.action
  environment:
    DYNAMO_TABLE_NAME: connections
  iamRoleStatements:
    - Effect: Allow
      Action:
        - dynamodb:Query
        - dynamodb:Scan
        - dynamodb:GetItem
        - dynamodb:PutItem
        - dynamodb:DeleteItem
        - dynamodb:UpdateItem
        - lambda:InvokeFunction
      Resource: "arn:aws:dynamodb:${opt:region, self:provider.region}:*:table/${self:provider.environment.DYNAMO_TABLE_NAME}"

This is the first part of our serverless.yml file. Let’s break it down into parts.

  • service: This is the name of our service. The Serverless Framework uses it to name the CloudFormation stack that will be created in the AWS account.
  • provider: We define configuration, environment variables, permissions, and roles in this block. In this code, we set things like the version of Node.js we want to use in our AWS environment.
  • websocketsApiRouteSelectionExpression: This is the custom route selection expression. API Gateway evaluates it against each incoming message to pick a route, so to emit a custom event from our WebSocket client, we pass the event name in the action property of the payload.
  • Action: This block lists all the permissions we want to give our Lambda functions so that they can perform different operations on the DynamoDB table.
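
To make the route selection expression more concrete, the following sketch mimics, in plain JavaScript, how a message is matched to a route when the expression is $request.body.action. It only illustrates the behaviour and is not API Gateway's actual implementation; selectRoute and its parameters are hypothetical names introduced here.

```javascript
// Hypothetical helper that imitates route selection for the expression
// $request.body.action. API Gateway does this for us; nothing like this
// needs to be deployed.
function selectRoute(rawMessage, routeKeys) {
    let action;
    try {
        // The expression reads the "action" property of the JSON payload.
        action = JSON.parse(rawMessage).action;
    } catch (e) {
        // Messages that are not valid JSON cannot match a custom route.
        return '$default';
    }
    // Fall back to $default when no custom route matches the action value.
    return routeKeys.includes(action) ? action : '$default';
}

const routes = ['sendmessage', 'manageroom'];
console.log(selectRoute('{"action":"manageroom","roomid":"test room"}', routes));
console.log(selectRoute('{"action":"unknown"}', routes));
```

Because this project defines no $default route, a message whose action matches neither sendmessage nor manageroom is not handled by any of our functions.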

Defining Functions Block In the serverless.yml File

functions:
  connectionHandler:
    handler: src/connectionHandler/index.connectionHandler
    events:
      - websocket:
          route: $connect
      - websocket:
          route: $disconnect

  sendMessage:
    handler: src/sendMessage/index.sendMessage
    events:
      - websocket:
          route: sendmessage

  manageRoom:
    handler: src/manageRoom/index.manageRoom
    events:
      - websocket:
          route: manageroom

This is where we will define all our Lambda functions to be created. Let’s break it down a little bit for a better understanding:

  • connectionHandler: This is the Lambda function that will be called when any user connects to or disconnects from our WebSocket server. API Gateway predefines three routes: $connect, $disconnect, and $default.
  • $connect/$disconnect: When the user connects to our WebSocket server, $connect is the default event that gets called, and when the user disconnects, the $disconnect event gets called.
  • sendMessage: This function will be called if the user sends sendmessage as the value of the action property in the request payload. It handles sending messages to all connected users in a particular room.
  • manageRoom: This function is used for creating/joining a room according to the room id.
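
The handler values above follow the Serverless Framework convention of a file path followed by a dot and the name of the exported function. As a small illustration, the sketch below splits such a string into its two parts; parseHandler is a hypothetical helper written for this guide and is not part of the framework.

```javascript
// Hypothetical helper: splits "path/to/file.exportName" at the last dot.
function parseHandler(handler) {
    const lastDot = handler.lastIndexOf('.');
    return {
        file: handler.slice(0, lastDot) + '.js',   // module that is loaded
        exportName: handler.slice(lastDot + 1),    // function exported from that module
    };
}

console.log(parseHandler('src/sendMessage/index.sendMessage'));
```

This is why each index.js file in this guide exports a function whose name matches the last part of its handler string.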

Defining Resources Block in the serverless.yml File

resources:
  Resources:
    UsersDynamoDbTable:
      Type: AWS::DynamoDB::Table
      DeletionPolicy: Retain
      Properties:
        AttributeDefinitions:
          - AttributeName: connectionId
            AttributeType: S
        KeySchema:
          - AttributeName: connectionId
            KeyType: HASH
        ProvisionedThroughput:
          ReadCapacityUnits: 1
          WriteCapacityUnits: 1
        TableName: ${self:provider.environment.DYNAMO_TABLE_NAME}

This is the resources block of our serverless.yml file. In this block, we define all the resources we want to create automatically in the AWS account. Here we are creating a new DynamoDB table with connectionId as its hash key (also called the partition key), which plays the role of a primary key if you come from an SQL background.

Connecting and Disconnecting Users

Let’s start working on the Lambda function to connect or disconnect WebSocket clients. We are using the connectionHandler function to handle this functionality. It will look something like this:

const AWS = require('aws-sdk');

const ddb = new AWS.DynamoDB.DocumentClient({ apiVersion: '2012-08-10', region: process.env.AWS_REGION });

exports.connectionHandler = async event => {
    const connectionId = event.requestContext.connectionId;
    const eventType = event.requestContext.eventType
    if (eventType === 'DISCONNECT') {
        try {
            await ddb.delete({ TableName: process.env.DYNAMO_TABLE_NAME, Key: { connectionId } }).promise();
            return { statusCode: 200, body: 'Disconnected' };
        }
        catch (e) {
            return { statusCode: 500, body: 'Could not clear the connection.' };
        }
    }
    else if (eventType === "CONNECT") {
        const putParams = {
            TableName: process.env.DYNAMO_TABLE_NAME,
            Item: {
                connectionId
            }
        };

        try {
            await ddb.put(putParams).promise();
        } catch (err) {
            return { statusCode: 500, body: 'Failed to connect: ' + JSON.stringify(err) };
        }

        return { statusCode: 200, body: 'Connected.' };
    }
};

We will go through each part of the function in detail, so let’s start with the first part: handling the connected users.

Connecting Users

else if (eventType === "CONNECT") {
        const putParams = {
            TableName: process.env.DYNAMO_TABLE_NAME,
            Item: {
                connectionId
            }
        };

        try {
            await ddb.put(putParams).promise();
        } catch (err) {
            return { statusCode: 500, body: 'Failed to connect: ' + JSON.stringify(err) };
        }

        return { statusCode: 200, body: 'Connected.' };
    }

Here we check whether the event was triggered by a user connecting through the API Gateway WebSocket URL. If so, we take the connectionId from the event.requestContext object and create a new entry in the DynamoDB table with that value. This is just a simple insert operation on the DynamoDB table.

What Is .promise()?

If you are wondering why we are using .promise() here, it is because we want to keep the code clean by using async/await instead of callbacks. To use async/await, the function call must return a JavaScript promise, which is what the .promise() call gives us. Most of the functions in the AWS SDK support .promise(), which returns the result in a promise instead of passing it to a callback.
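
The difference between the two styles can be seen in a small, self-contained sketch. It does not call AWS at all: fakePut is a hypothetical stand-in for an SDK request object that offers both a callback-based send() and a promise(), and saveConnection is likewise a made-up name.

```javascript
// Hypothetical stand-in for an SDK request; no AWS call is made.
function fakePut(params) {
    const run = callback =>
        setImmediate(() => callback(null, { saved: params.Item.connectionId }));
    return {
        // Callback style: the result arrives in an (err, data) callback.
        send: callback => run(callback),
        // Promise style: wraps the same work so it can be awaited.
        promise: () => new Promise((resolve, reject) =>
            run((err, data) => (err ? reject(err) : resolve(data)))),
    };
}

// With callbacks, follow-up logic nests inside the callback.
fakePut({ Item: { connectionId: 'abc' } }).send((err, data) => {
    if (err) return console.error(err);
    console.log('callback style:', data.saved);
});

// With .promise(), the same call reads top to bottom.
async function saveConnection(connectionId) {
    const data = await fakePut({ Item: { connectionId } }).promise();
    return data.saved;
}

saveConnection('abc').then(saved => console.log('promise style:', saved));
```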

Disconnecting Users

if (eventType === 'DISCONNECT') {
        try {
            await ddb.delete({ TableName: process.env.DYNAMO_TABLE_NAME, Key: { connectionId } }).promise();
            return { statusCode: 200, body: 'Disconnected' };
        }
        catch (e) {
            return { statusCode: 500, body: 'Could not clear the connection.' };
        }
    }

Here we are checking to see if the user was disconnected from the WebSocket server. If the user was disconnected, then connectionId is used to remove that user entry from the DynamoDB table.
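
Because the handler only needs put and delete, its logic can be tried locally without touching AWS. The sketch below is one way to do that, under the assumption that the DynamoDB client is passed in rather than created at module level; makeConnectionHandler and makeFakeTable are hypothetical names, and the fake table is a plain in-memory Map.

```javascript
// Builds the handler around any object that exposes put/delete with the
// DocumentClient-style `method(params).promise()` shape.
function makeConnectionHandler(ddb, tableName) {
    return async event => {
        const { connectionId, eventType } = event.requestContext;
        if (eventType === 'CONNECT') {
            await ddb.put({ TableName: tableName, Item: { connectionId } }).promise();
            return { statusCode: 200, body: 'Connected.' };
        }
        if (eventType === 'DISCONNECT') {
            await ddb.delete({ TableName: tableName, Key: { connectionId } }).promise();
            return { statusCode: 200, body: 'Disconnected' };
        }
        return { statusCode: 400, body: 'Unsupported event type.' };
    };
}

// Hypothetical in-memory stand-in for the DynamoDB table.
function makeFakeTable() {
    const rows = new Map();
    return {
        rows,
        put: ({ Item }) => ({
            promise: async () => { rows.set(Item.connectionId, Item); return {}; },
        }),
        delete: ({ Key }) => ({
            promise: async () => { rows.delete(Key.connectionId); return {}; },
        }),
    };
}

const table = makeFakeTable();
const handler = makeConnectionHandler(table, 'connections');
handler({ requestContext: { connectionId: 'abc', eventType: 'CONNECT' } })
    .then(response => console.log(response.body, 'rows stored:', table.rows.size));
```

In the deployed function, the same factory could be given the real DocumentClient instance and process.env.DYNAMO_TABLE_NAME.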

Creating and Joining Chat Rooms

The next step is to set up a Lambda function to allow users to create or join a room. The code of the function will look something like this:

const AWS = require('aws-sdk');

const ddb = new AWS.DynamoDB.DocumentClient({ apiVersion: '2012-08-10', region: process.env.AWS_REGION });

exports.manageRoom = async event => {
    const body = JSON.parse(event.body)
    if (!body.roomid) return { statusCode: 400, body: 'Room id is required.' };

    const params = {
        TableName: process.env.DYNAMO_TABLE_NAME,
        Key: {
            connectionId: event.requestContext.connectionId,
        },
        ExpressionAttributeValues: {
            ":roomid": body.roomid,
        },
        UpdateExpression: "SET roomid = :roomid",
        ReturnValues: "ALL_NEW"
    };

    const data = await ddb.update(params).promise();
    if (data.Attributes) {
        return { statusCode: 200, body: 'Room joined.' };
    } else {
        return { statusCode: 400, body: 'Some error has occurred.' };
    }
};

Let’s break the code into parts for a better understanding.

Getting and Checking the Room ID

const body = JSON.parse(event.body)
if (!body.roomid) return { statusCode: 400, body: 'Room id is required.' };

Here we are getting the request body and parsing it as JSON data, and we are also checking whether roomid is present in the body object, because roomid is required if the user is trying to create/join a chat room.
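
One thing the check above does not cover is a body that is not valid JSON, in which case JSON.parse throws. A minimal sketch of a more defensive version follows; parseRoomRequest is a hypothetical helper and the error messages are only examples.

```javascript
// Hypothetical helper: returns { roomid } on success or { error } on failure.
function parseRoomRequest(rawBody) {
    let body;
    try {
        body = JSON.parse(rawBody);
    } catch (e) {
        return { error: 'Request body must be valid JSON.' };
    }
    // JSON.parse can also return null or a primitive, so check the type first.
    if (body === null || typeof body !== 'object' || !body.roomid) {
        return { error: 'Room id is required.' };
    }
    return { roomid: body.roomid };
}

console.log(parseRoomRequest('{"action":"manageroom","roomid":"test room"}'));
console.log(parseRoomRequest('{"action":"manageroom"}'));
console.log(parseRoomRequest('not json'));
```

The handler could then return an error response whenever the result contains an error property and continue with roomid otherwise.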

Creating/Joining the Chat Room

const params = {
        TableName: process.env.DYNAMO_TABLE_NAME,
        Key: {
            connectionId: event.requestContext.connectionId,
        },
        ExpressionAttributeValues: {
            ":roomid": body.roomid,
        },
        UpdateExpression: "SET roomid = :roomid",
        ReturnValues: "ALL_NEW"
    };

    const data = await ddb.update(params).promise();
    if (data.Attributes) {
        return { statusCode: 200, body: 'Room joined.' };
    } else {
        return { statusCode: 400, body: 'Some error has occurred.' };
    }

Here we update the entry in the DynamoDB table that matches the connectionId and set its roomid attribute to the value passed by the user in the request body. For example, if connectionId is #f!41fg and the roomid passed by the user is test-chat-room, then this code sets roomid to test-chat-room in the item whose connectionId is #f!41fg.

Sending a Message to All Connected Users in the Chat Room

The final part of our project is a Lambda function that sends a message to all connected users in a chat room whenever any user in that room sends one. The function starts by parsing the request body and then performs the steps covered in the following sections:

const AWS = require('aws-sdk');

const ddb = new AWS.DynamoDB.DocumentClient({ apiVersion: '2012-08-10', region: process.env.AWS_REGION });

exports.sendMessage = async event => {
    // The payload carries the roomid and the message to deliver.
    const body = JSON.parse(event.body);

    // 1. Look up every connectionId stored for body.roomid.
    // 2. Create an ApiGatewayManagementApi client for this WebSocket API.
    // 3. Post body.message to each connection and wait for all posts to finish.
};

Let’s break down this function into different parts for better understanding.

Getting All Connection IDs According to the Room ID

let connectionData;
    try {
        const params = {
            TableName: process.env.DYNAMO_TABLE_NAME,
            FilterExpression: '#roomid = :roomid',
            ExpressionAttributeNames: {
                '#roomid': 'roomid',
            },
            ExpressionAttributeValues: {
                ':roomid': body.roomid
            },
        }

        connectionData = await ddb.scan(params).promise();
    } catch (e) {
        return { statusCode: 500, body: 'Could not send the message.' };
    }

When a user sends a message in a chat room, they must include the roomid. We use that roomid to find the connectionIds of all the users associated with it. In the above code, we use the roomid to find the matching records in the DynamoDB table and store them in a variable called connectionData.
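
One caveat worth knowing: a single Scan call returns at most 1 MB of data, and when more items remain, the response includes a LastEvaluatedKey. For a small table this does not matter, but a sketch of how the lookup could follow those pages is shown below. The scanRoom function is a hypothetical helper, and ddb is assumed to be any object with the DocumentClient-style scan(params).promise() shape; the example runs against a small in-memory fake rather than DynamoDB.

```javascript
// Hypothetical helper: collects every item for a room across all scan pages.
async function scanRoom(ddb, tableName, roomid) {
    const items = [];
    let startKey;
    do {
        const page = await ddb.scan({
            TableName: tableName,
            FilterExpression: '#roomid = :roomid',
            ExpressionAttributeNames: { '#roomid': 'roomid' },
            ExpressionAttributeValues: { ':roomid': roomid },
            ExclusiveStartKey: startKey,   // undefined on the first request
        }).promise();
        items.push(...page.Items);
        startKey = page.LastEvaluatedKey;  // absent once the last page is reached
    } while (startKey);
    return items;
}

// In-memory fake that serves two pages, for illustration only.
const fakeDdb = {
    scan: params => ({
        promise: async () => (params.ExclusiveStartKey
            ? { Items: [{ connectionId: 'b', roomid: 'test room' }] }
            : {
                Items: [{ connectionId: 'a', roomid: 'test room' }],
                LastEvaluatedKey: { connectionId: 'a' },
            }),
    }),
};

scanRoom(fakeDdb, 'connections', 'test room')
    .then(items => console.log(items.map(item => item.connectionId)));
```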

Sending the Message to All Connected Users

const apiGatewayMng = new AWS.ApiGatewayManagementApi({
        apiVersion: '2018-11-29',
        endpoint: event.requestContext.domainName + '/' + event.requestContext.stage
});

    const postCalls = connectionData.Items.map(async ({ connectionId }) => {
        try {
            await apiGatewayMng.postToConnection({ ConnectionId: connectionId, Data: body.message }).promise();
        } catch (e) {
            if (e.statusCode === 410) {
                await ddb.delete({ TableName: process.env.DYNAMO_TABLE_NAME, Key: { connectionId } }).promise();
            } else {
                throw e;
            }
        }
    });

    try {
        await Promise.all(postCalls);
    } catch (e) {
        return { statusCode: 500, body: 'Could not send the message.' };
    }

    return { statusCode: 200, body: 'Message sent.' };
};

This code sends a message to the other users in a chat room whenever any member of the room sends one. Let’s go through it in detail:

Use of ApiGatewayManagementApi

const apiGatewayMng = new AWS.ApiGatewayManagementApi({
        apiVersion: '2018-11-29',
        endpoint: event.requestContext.domainName + '/' + event.requestContext.stage
 });

ApiGatewayManagementApi is used to send data to clients connected to a WebSocket API. Here we create an instance of this class so that we can use its methods. The endpoint is built from the domain name and stage found in the event data of our Lambda function, so messages go out through the same API that received the request.
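
As a quick illustration of the endpoint value, the sketch below joins the two requestContext fields with a slash. The buildManagementEndpoint helper and the example domain name are made up for this guide; the result has the same host and stage that appear in the wss:// URL printed after deployment.

```javascript
// Hypothetical helper: joins the API's domain name and stage with a slash.
function buildManagementEndpoint(requestContext) {
    return requestContext.domainName + '/' + requestContext.stage;
}

// Example requestContext with made-up values for illustration.
const exampleContext = {
    domainName: 'abc123.execute-api.us-east-1.amazonaws.com',
    stage: 'dev',
};
console.log(buildManagementEndpoint(exampleContext));
```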

Send the Message in a Chat Room

const postCalls = connectionData.Items.map(async ({ connectionId }) => {
        try {
            await apiGatewayMng.postToConnection({ ConnectionId: connectionId, Data: body.message }).promise();
        } catch (e) {
            if (e.statusCode === 410) {
                await ddb.delete({ TableName: process.env.DYNAMO_TABLE_NAME, Key: { connectionId } }).promise();
            } else {
                throw e;
            }
        }
 });

If you are not familiar with JavaScript, this code might seem confusing. What we are doing here is mapping over all the items in connectionData. If you remember, connectionData holds the connectionIds of the users who are in the chat room.

  • postToConnection is the method we use to send a message to a connected user in the chat room, identified by that user’s connectionId.
  • The Data property is the data that we want to send to the connected socket.
  • If postToConnection fails with status code 410, the connection no longer exists, so we delete its entry from the DynamoDB table.
  • postCalls holds the collection of pending JavaScript promises, one per user in the chat room, each posting the message to that user’s connectionId.

Using postCalls To Resolve All the Promises

try {
       await Promise.all(postCalls);
    } catch (e) {
        return { statusCode: 500, body: 'Could not send the message.' };
 }

We pass postCalls, which is a collection of pending promises, into a function called Promise.all(). This function takes an iterable of promises and returns a single promise that resolves with an array of results once every promise in it has resolved, or rejects as soon as any one of them rejects. In simpler words, awaiting Promise.all() waits until the message has been sent to all the users in the chat room.
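
Note that Promise.all() rejects as soon as any one promise rejects, so a single failed send makes the whole handler return a 500 even if other users received the message. If a per-connection summary is preferred, Promise.allSettled() is one alternative, assuming the Node.js runtime supports it (it was added in Node.js 12.9). The sketch below uses plain promises in place of real postToConnection calls, and summarizeSends is a hypothetical helper.

```javascript
// Hypothetical helper: waits for every send and counts the outcomes.
async function summarizeSends(postCalls) {
    const results = await Promise.allSettled(postCalls);
    const failed = results.filter(result => result.status === 'rejected').length;
    return { sent: results.length - failed, failed };
}

// Stand-ins for postToConnection calls: two succeed, one fails.
const fakePostCalls = [
    Promise.resolve(),
    Promise.reject(new Error('send failed')),
    Promise.resolve(),
];

summarizeSends(fakePostCalls).then(summary => console.log(summary));
```

Whether a partial failure should still count as a successful request is a design choice; the original handler treats any failure other than a 410 as an error.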

Testing the Code

We need to run sls deploy to deploy our code to AWS, and then we will get a URL that looks something like this:

wss://{YOUR-API-ID}.execute-api.{YOUR-REGION}.amazonaws.com/dev

To test the application:

  • Install the npm package wscat by running this command: npm install wscat -g
  • Run this command in the terminal: wscat -c {your API Gateway URL} (without the {}).

We are now connected to our WebSocket server.

Now let’s create a new room named test room by sending this data: {"action":"manageroom","roomid":"test room"}.

After sending this data, we can go to our DynamoDB table and check that the entry for our connectionId now has a roomid.

Let’s repeat this same process for another user by opening a new terminal window and running the same process. After repeating this process from another terminal window, check the DynamoDB table. If it has another entry with the same test room value as roomid, your manage room code is working perfectly.

It’s time to send our first message by sending this data: {"action":"sendmessage","roomid":"test room","message":"Hi there!"}.
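
When the payloads are typed by hand, a mistyped quote or a missing brace makes the JSON invalid. If you prefer to generate them, a minimal sketch in Node.js could look like this; both function names are hypothetical.

```javascript
// Hypothetical helpers that build the two payloads used in this guide.
function manageRoomPayload(roomid) {
    return JSON.stringify({ action: 'manageroom', roomid });
}

function sendMessagePayload(roomid, message) {
    return JSON.stringify({ action: 'sendmessage', roomid, message });
}

console.log(manageRoomPayload('test room'));
console.log(sendMessagePayload('test room', 'Hi there!'));
```

The printed strings can be pasted into the wscat prompt as they are.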

Congratulations! You have successfully posted your first message. Now when any of the connected users send any message, it will be shown to all the users in that chat room.

