Clojure bites - SSE with Aleph and Reitit

by FPSD — 2023-08-03


Overview

To coordinate the refinement and estimation sessions in Unrefined I wanted to have a way to give users instant feedback on some events like people estimating or moving to the next ticket. At the same time I wanted to keep the frontend code as simple as possible i.e. I tried to avoid writing a SPA as much as possible so I settled to the following setup:

This setup worked pretty well, it is easy to maintain and requires not much JavaScript code given the small size of this webapp.

In this post I'll describe the basic building blocks to replicate such setup using Aleph which provides a nice implementation of SSE (and Websockets, but not covered here) leveraging Manifold; the same result can be achieved using other web servers and core.async but I haven't tried it yet.

SSE overview

Before jumping into the implementation it is better to spend few words on the SSE spec. If you are already familiar with it you can jump to the next section, or you can stay here and help me to spot and fix possible mistakes ;) .

SSE provides a simple (but effective) way to stream events from a server to subscribed clients. Clients create a persistent connection to a server and wait for events being sent to them.

The client connects to an endpoint using the EventSource interface and the server is expected to reply with `text/event-stream` MIME type, creating effectively a persistent connection through which the server can send new events. See here for more details about the event format. As a quick introduction, events can be just data:

data: event payload

data: event payload with
data: multiple lines

or event types can be included in the event itself:

event: your-event-type
deta: your event payload

To recap:

For more detailed information please refer to these links:

There is a ton of material out there about SSE, these links are just quick references.

Project setup

Time to get some code! Lets start with a simple base setup for the server side.

We are going to create a web app with a single endpoint that will stream events back to a client to get things started, later we will add a router to handle two endpoints, one to send data and another to receive events and a tiny JS implementation of the EventSource interface to render events to a page.

We can start by putting down some deps, I am using tool.deps but it should be easy to port them to lain or boot if needed:

    {:deps {org.clojure/clojure {:mvn/version "1.11.1"}
            ring/ring-core {:mvn/version "1.10.0"}
            aleph/aleph {:mvn/version "0.6.3"}}

Now we can start with an HTTP handler, not the fanciest code you'll ever see in your life but good enough to get started

    (ns codes.fpsd.sse
      (:require [aleph.http :as http]))
    
    (defn handler [_req]
      {:status 200
       :headers {"content-type" "text/event-stream"}
       :body "data: one event, k thanks bye!\n"})
    
    (comment
      (http/start-server handler {:port 8080
                                  :join? false})
      ,)

After eveluating this code and the inner comment section we will have a running web server that replies to every route with a text/event-source body and will close the connection afterwards; not too different compared to a "normal" HTTP endpoint but we will get there. Here is the output of curl session:

$ curl localhost:8080 -vv


# Trying 127.0.0.1:8080…


# Connected to localhost (127.0.0.1) port 8080 (#0)

> GET / HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.81.0
> Accept: **/**
> 


# Mark bundle as not supporting multiuse

< HTTP/1.1 200 OK
< Content-Type: text/event-stream
< Server: Aleph/0.6.3
< Date: Sun, 30 Jul 2023 19:13:32 GMT
< Connection: Keep-Alive
< content-length: 31
< 
data: one event, k thanks bye!


# Connection #0 to host localhost left intact

Streaming data

Aleph builds its functionalities on top of Manifold which provides asynchronous data structures like manifold.deferred/deferred or manifold.stream/stream (and more), these data structure can be used as the body of a response and Aleph will return their contents as soon as they are available and will close the connection as soon as these sources will be closed. Manifold has been released almost at the same time of core.async (docs) and can be also used to work with async channels, which can be handy if we have some part of our system that relies on them. Citing its own Readme:

Manifold provides basic building blocks for asynchronous programming, and can be used as a translation layer between libraries which use similar, but incompatible, abstractions.

Manifold provides two core abstractions: deferreds, which represent a single asynchronous value, and streams, which represent an ordered sequence of asynchronous values.

Manifold API has a lot to offer and I encourage everyone to get to its docs to get a better understanding of this library, to approach it quickly it could be enough to see how it is used in Aleph's examples here. Other, more up to date examples are available here.

For the purpose of this exercise we can think of Manifold streams as channels to which we can put data and from which Aleph read data to send back events to an EventSource client.

Lets re-write the handler in order to produce periodic events that later will be consumed by a client

    (ns codes.fpsd.sse
      (:require [aleph.http :as http]
                [manifold.stream :as s]))
    
    (defn format-event [body]
      (str "data: " body "\n\n"))
    
    (defn handler [_req]
      {:status 200
       :headers {"content-type" "text/event-stream"}
       :body (let [counter (atom 0)]
               (s/periodically
                1000
                #(format-event (str "Sending event #" (swap! counter inc)))))})

This new endpoint will send an event every second, until the client closes the connection; (manifold.stream/periodically delay-ms fn) returns a stream to which a new value is sent by calling fn after delay-ms milliseconds. The format-event fn is a helper that will return a SSE event, with only the data part, given any text renderable body; it could be improved to properly handle data bodies which include new lines generating multiple data: section, but I guess you can do that easily if you want to ;) .

To recap the new endpoint will send back an event with a progressive number, starting from 1, to every client that connects to the web server.

This what we get if we try the endpoint with curl:

$ curl localhost:8080 -vv


# Trying 127.0.0.1:8080&#x2026;


# Connected to localhost (127.0.0.1) port 8080 (#0)

> GET / HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.81.0
> Accept: **/**
> 


# Mark bundle as not supporting multiuse

< HTTP/1.1 200 OK
< Content-Type: text/event-stream
< Server: Aleph/0.6.3
< Date: Mon, 31 Jul 2023 17:37:15 GMT
< Connection: Keep-Alive
< transfer-encoding: chunked
< 
data: Sending event #1

data: Sending event #2

data: Sending event #3

data: Sending event #4

^C

Oooook, finally something interesting is happening! A client connects to an endpoint that is streaming back events at regular intervals; if we stop and think about it we have laid out the foundation to stream events to a client, by leveraging manifold.stream/stream abstraction and Aleph's ability to use it to send back responses to the caller.

I think that implementing a browser client would be even more exciting, so lets do that! Actually before moving to a browser client I'd like to improve to server a bit by:

Adding the router

There are many routers available today, lately I am enjoying using reitit which I find very intuitive and comes with a comprehensive documentation; other good alternatives can be compojure, bidi (and many more) or even manual routing of requests, anyway as you can imagine we will setup out routes with reitit so we can start by adding it to our deps:

    metosin/reitit {:mvn/version "0.7.0-alpha5"}

(yes it is an alpha version, I am confident that a stable 0.7.x will be out soon)

and setup the router to serve the static page holding the JS code and one endpoint to handle the SSE events:

    (ns codes.fpsd.sse
      (:require [aleph.http :as http]
                [manifold.stream :as s]
                [reitit.ring :as ring]))
    
    (defn format-event
      "Return a properly formatted event payload"
      [body]
      (str "data: " body "\n\n"))
    
    (defn sse-events [_req]
      {:status 200
       :headers {"content-type" "text/event-stream"}
       :body (let [counter (atom 0)]
               (s/periodically
                1000
                #(format-event (str "Sending event #" (swap! counter inc)))))})
    
    (defn create-app
      "Return a ring handler that will route /events to the SSE handler
       and that will servr  static content form project's resource/public directory"
      []
      (ring/ring-handler
       (ring/router
        [["/events" {:get {:handler sse-events
                           :name ::events}}]]
    
        )
       (ring/routes
        (ring/create-resource-handler {:path "/"})
        (ring/create-default-handler))))
    
    
    ;; Web server maangement code to make it easy to start and stop a server
    ;; after changesto router or handlers
    (def server_ (atom nil))
    
    (defn start-server! []
      (reset! server_ (http/start-server (create-app)
                                         {:port 8080
                                          :join? false})))
    
    (defn stop-server! []
      (swap! server_ (fn [s]
                       (when s
                         (.close s)))))
    (comment
    
      (start-server!)
    
      (stop-server!)
      ,)

(Please change the namespace to reflect your project setup)

To recap:

If we evaluate the buffer and start the server we can verify that the /events endpoint is working as before:

 curl localhost:8080/events -vv


# Trying 127.0.0.1:8080&#x2026;


# Connected to localhost (127.0.0.1) port 8080 (#0)

> GET /events HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.81.0
> Accept: **/**
> 


# Mark bundle as not supporting multiuse

< HTTP/1.1 200 OK
< Content-Type: text/event-stream
< Server: Aleph/0.6.3
< Date: Wed, 02 Aug 2023 06:29:16 GMT
< Connection: Keep-Alive
< transfer-encoding: chunked
< 
data: Sending event #1

data: Sending event #2

^C

To recap:

I have decided to avoid some details and provide a pre-cooked solution because reitit's docs are quite comprehensive, and to focus on SSE.

Frontend

In the previous step we have added a way to serve static files, now we can add a basic web page where to render events as we get them. First of all lets see if we can correctly serve a simple HTML file; paste the following content in resources/public/index.html

    <html>
      <body>
        Your events here!
        <div id="events"></div>
      </body>
    </html>

Now start the server and point your browser to http://localhost:8080, you should something like this:

initial-page

Quite boring still…we can try subscribing to the /events endpoint and render the upcoming events! By coincidence we already have a div with the id events that we can use for this purpose ;)

What we need to do is:

For simplicity we will embedd the JS code inside the index.html page:

    <html>
      <body>
        Your events here!
        <div id="events"></div>
      </body>
      <script type="text/javascript">
        let source = new EventSource('/events')
    
        source.onmessage = (e) => {
          let container = document.getElementById('events')
          container.prepend(`Received event with data: ${e.data}`)
        }
      </script>
    </html>

If we load http:localhost:8080 again we should see something like the following screenshot, again nothing super exciting but I hope you can feel the potential of this approach!

getting-events

Somehow I suspect that this is not enough yet, maybe we can spice it up with a super simplified chat room?

Generating and consuming events

What do you need to do to implement the simplest chat room ever? Our MVP should at least provide:

Following the previous bullet points I'll start with the event bus. Manifold provides a convenient implementation of an even bus available at manifold.bus/event-bus. This provides a way to subscribe a consumer to a topic (or SSE endpoint) and send messages to a topic which will be sent to all connected consumers. For simplicity we will have one single topic the-chat-room but it would easy to extend this example to support multiple rooms.

Second thing to do is to add an endpoint that will get the chat message and will push it to the event bus, lets see some code:

    (ns codes.fpsd.sse
      (:require [aleph.http :as http]
                [manifold.bus :as b]
                [reitit.ring :as ring]
                [ring.middleware.params :as params]))
    
    ;; the one and only event bus needed for this app
    ;; using defonce to be able to possibly evaluate the
    ;; full buffer without breaking existing connections
    (defonce event-bus (b/event-bus))
    
    (defn format-event
      "Return a properly formatted event payload"
      [body]
      (str "data: " body "\n\n"))
    
    (defn sse-events [_req]
      {:status 200
       :headers {"content-type" "text/event-stream"}
       :body (b/subscribe event-bus "the-chat-room")})
    
    (defn send-message! [request]
      (let [message (format-event (-> request :params (get "message")))]
        (b/publish! event-bus "the-chat-room" message)
        {:status 204
         :headers {:content-type "text/plain"}
         :body ""}))
    
    (defn create-app
      "Return a ring handler that will route /events to the SSE handler
       and that will servr  static content form project's resource/public directory"
      []
      (ring/ring-handler
       (ring/router
        [["/events" {:get {:handler sse-events
                           :name ::events}}]
         ["/send-message" {:post {:name ::send-message
                                  :handler send-message!}}]]
    
        {:data {:middleware [params/wrap-params]}})
    
       (ring/routes
        (ring/create-resource-handler {:path "/"})
        (ring/create-default-handler))
       ))

Here is a breakdown of the changes:

Quite straight forward, isn't it?

Few pointers if you want to dig deeper in the APIs show here:

Now we can implement the frontend:

    <html>
      <body>
        <p>
          <form onsubmit="event.preventDefault(); return sendMessage()">
            <input type="text" name="message" id="message" placeholder="Your message here"/>
            <input type="submit" value="Send"/>
          </form>
        </p>
        <p>Past messages</p>
        <div id="events">first load</div>
      </body>
      <script type="text/javascript">
        let sendMessage = () => {
          let input = document.getElementById('message')
          fetch('/send-message',
            {method: 'POST',
             headers: {"Content-Type": 'application/x-www-form-urlencoded'},
             body: `message=${input.value}`}).then(() => {input.value = ''})
          return false
        }
    
        let appendMessage = (text) => {
          let p = document.createElement('p')
          p.appendChild(document.createTextNode(text))
    
          let container = document.getElementById('events')
          container.prepend(p)
        }
    
        let source = new EventSource('/events')
    
        source.onmessage = (e) => {
          appendMessage(e.data)
        }
      </script>
    </html>

Yeah the code looks early 2000 era of HTML+JS (if we exclude the use of fetch), but I think it shows the core concepts nicely:

Again sorry for the JS code quality but it is not my bread and butter and this is the best I can do.

After restarting/re-evaluating our backend and loading the page we will be able to chat in realtime, no too bad for less than 100 lines of code!

realtime-chat

Conclusions

I hope you had fun following this simple recipe on how to add a bit of real time feel to a basic web app! SSE, while simple from the outside, can be handy in a lot of situations and adding it to your web application can be quite easy. I am sure that the same can be achieved with other web servers and core.async, if you'll try that approach please write about your experience and share it with us!

The full source code is available in my playground here.

Discuss