Skip to main content

Working with Event Sourcing, CQRS and Web Sockets on AWS

Introduction

The WebSocket API is an advanced technology that makes it possible to open a two-way interactive communication session between the user’s browser and a server. With this API, you can send messages to a server and receive event-driven responses without having to poll the server for a reply.

We normally use WebSockets when we want to get live updates of a particular object without having to constantly poll for new updates. In this scenario, WebSockets can be helpful as they reduce the required number of API calls and in turn the cost of our infrastructure, and if the number of request is very high, the API might apply throttling.

In this article, we will review how we could use WebSockets for EverntSourcing+CQRS. You can follow the same approach for other architectures where you need to notify the clients when changes happen in a certain dataset.

Architecture

In Fenergo we are using EventSourcing and CQRS primarily. We have commands which issue events that are then projected into a state database. Then, we have queries that allow us to efficiently query the read models that we store in the state database.

In some scenarios, we might want to be notified in the frontend every time there is a change in a particular object, so we can refresh the UI with the latest data. Examples of these are chats, dashboards, push notifications, etc.

When we are using Event Sourcing and CQRS, we can leverage the capabilities of the projections for notifying our clients through WebSockets when the data changes.

In this architecture, from the client perspective, it only requires creating a new WebSocket connection and reacting when there are new events coming through the socket.

If we are using DynamoDB as our state database, an alternative to this could be using Streams on DynamoDB. In this case, when the data changes, DynamoDB streams invoke a lambda, which will notify the corresponding sockets. The disadvantage of this approach is the fact that it won’t work with other data stores (ElasticSearch, Neptune, etc). For this reason, notifying after projecting the data is more generic.

Regarding the data we send though the WebSocket, ideally we would be sending the model we want to render in the frontend, so we can directly use the models we get through the WebSocket. However, this might not be suitable for some scenarios, as we may need to apply permissions or data aggregation for the DTOs we serve in our queries. If this is the case, we can just send a notification to our clients telling them that the data has changed, so they fetch latest from the Query API.

Security

When we think about sending information from a data protection and security perspective, the data we send though the socket must be encrypted. So to achieve this we only use secure WebSocket connections (wss) and have also implemented authentication. This ensures only authenticated users can receive updates from the WebSocket.

The authentication on WebSockets works in a similar to HTTP APIs, so when requesting a new connection, we send an access token. The token is is then validated by the authorizer in the API Gateway. The mechanism for passing the token in a WebSocket connection request is a little different. For a http request, we normally use the header authorization, but we cannot use custom headers in WebSockets when we are working with javascript so we need to look at alternatives, such as sending the token in the socket protocol or as a query parameter.

WebSockets on AWS

In AWS,  the API Gateway provides built-in support for Websockets, and the service will automatically manage the communication between clients and server. The approach we have taken is to create two lambdas:

- The first for creating new connections

- The Second for closing connections.

(Optionally, you could also create other lambdas for sending data.)

Once you get a request for creating a new connection, you need to store the connection in a data store. Also when a connection is closed, you need to remove it from the store. This store will be used to monitor which connections our application needs to notify when there is a new update. If we try to send data to a connection and the connection is GONE, we can delete the connection from the data store. For performance reasons we store the WebSocket connections in MemoryDB, but any data stores would work.

Another recommended strategy is to create your own subdomain (for example wss://websockets.my-domain.com) and create a new endpoint in API Gateway for WebSockets.

Here you have examples of WebSockets applications in javascript and csharp.

WebSockets JavaScript clients

In order to implement your javascript client, you can use the class WebSocket.

If you are using react hooks, our code would look similar to this example:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
const [socket, setSocket] = (useState < WebSocket) | (undefined > undefined);

useEffect(() => {
  if (!socket) {
    const webSocket = new WebSocket(
      "wss://websockets.my-domain.com/notifications",
      accessToken
    );

    webSocket.addEventListener("open", async () => {
      await refreshMyData();
    });

    webSocket.addEventListener("message", async (event) => {
      if (event.data === "DataUpdated") {
        await refreshMyData();
      }
    });

    setSocket(webSocket);
  }
  const fiveMinutesInMilliseconds = 300_000;
  const interval = setInterval(() => {
    if (socket && socket.readyState === socket.OPEN) {
      socket.send("PING");
    }
  }, fiveMinutesInMilliseconds);
  return () => {
    if (interval) {
      clearInterval(interval);
    }

    if (socket) {
      socket.close();
    }
  };
}, []);

In this example, we fetch the latest data with the method refreshMyData() every time there is an update. 

Depending on the volumes / expected volumes of traffic, you should be aware of AWS API Gateway quotas. AWS API Gateway would close any connections that have been idle for more than 10 minutes. If you don’t want this to happen, you should ping the WebSocket from time to time in order to keep it alive. In the example above, we just create an interval that pings the socket every 5 minutes which will keep the connection open. 

Finally, when you unload the component in the UI, you should clear the ping interval (if you are using any) and close the connection.

Testing Web Sockets

In order to mock the socket in unit tests, you can use the npm package jest-websocket-mock to verify it is working as expected.

You can also implement integration tests where you subscribe to a particular object with a web socket and issue commands to check if you are receiving updates.

For end-to-end tests, you don’t need to do anything specific. However, if you are using Cypress, I spotted an issue with Cypress when we are working with WebSockets, as it closes the before receiving the handshake response.

If you are facing this issue, you need to bypass the WebSockets endpoint in your cypress configuration, so cypress won’t proxy these requests. For that, we would add this custom configuration in the file cypress.config.ts.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import { defineConfig } from "cypress";

export default defineConfig({
  setupNodeEvents(on, config) {
    on("before:browser:launch", (browser = {}, launchOptions) => {
      launchOptions.args = launchOptions.args.map((arg) => {
        if (arg.startsWith("--proxy-bypass-list")) {
          return "--proxy-bypass-list=<-loopback>,wss://websockets.my-domain.com";
        }
        return arg;
      });
      return launchOptions;
    });
  },
});

Conclusions

In this post, we have seen what WebSockets are and how we can use them to reduce the number of requests we make from our web clients to our APIs when we need to keep UI's updated.

As discussed, WebSockets can be great way to avoid polling APIs and reduce the hosting cost and limiting the use of a traffic quota.

In this post, we have also analyzed an architecture that leverages Event Sourcing and CQRS to send updates through WebSockets, so the web clients can get updates in real time.

Finally, we have explained how WebSockets can be implemented, including the backend with AWS API Gateway + Lambdas, and the frontend with the provided API for WebSockets.


Alberto Corrales is a Senior Technical Architect at Fenergo. Currently working on our SaaS platform he has been with us for over 4 years.