To coordinate the refinement and estimation sessions in Unrefined I wanted to have a way to give users instant feedback on some events like people estimating or moving to the next ticket. At the same time I wanted to keep the frontend code as simple as possible i.e. I tried to avoid writing a SPA as much as possible so I settled to the following setup:
This setup worked pretty well, it is easy to maintain and requires not much JavaScript code given the small size of this webapp.
In this post I'll describe the basic building blocks to replicate such setup using Aleph which provides a nice implementation of SSE (and Websockets, but not covered here) leveraging Manifold; the same result can be achieved using other web servers and core.async but I haven't tried it yet.
Before jumping into the implementation it is better to spend few words on the SSE spec. If you are already familiar with it you can jump to the next section, or you can stay here and help me to spot and fix possible mistakes ;) .
SSE provides a simple (but effective) way to stream events from a server to subscribed clients. Clients create a persistent connection to a server and wait for events being sent to them.
The client connects to an endpoint using the EventSource interface and the server is expected to reply with `text/event-stream` MIME type, creating effectively a persistent connection through which the server can send new events. See here for more details about the event format. As a quick introduction, events can be just data:
data: event payload
data: event payload with
data: multiple lines
or event types can be included in the event itself:
event: your-event-type
deta: your event payload
To recap:
text/event-stream
MIME typeFor more detailed information please refer to these links:
There is a ton of material out there about SSE, these links are just quick references.
Time to get some code! Lets start with a simple base setup for the server side.
We are going to create a web app with a single endpoint that will stream events back to a client to get things started, later we will add a router to handle two endpoints, one to send data and another to receive events and a tiny JS implementation of the EventSource interface to render events to a page.
We can start by putting down some deps, I am using tool.deps
but it should be easy
to port them to lain or boot if needed:
{:deps {org.clojure/clojure {:mvn/version "1.11.1"}
ring/ring-core {:mvn/version "1.10.0"}
aleph/aleph {:mvn/version "0.6.3"}}
Now we can start with an HTTP handler, not the fanciest code you'll ever see in your life but good enough to get started
(ns codes.fpsd.sse
(:require [aleph.http :as http]))
(defn handler [_req]
{:status 200
:headers {"content-type" "text/event-stream"}
:body "data: one event, k thanks bye!\n"})
(comment
(http/start-server handler {:port 8080
:join? false})
,)
After eveluating this code and the inner comment section we will have a running
web server that replies to every route with a text/event-source
body and will
close the connection afterwards; not too different compared to a "normal" HTTP
endpoint but we will get there. Here is the output of curl session:
$ curl localhost:8080 -vv
# Trying 127.0.0.1:8080…
# Connected to localhost (127.0.0.1) port 8080 (#0)
> GET / HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.81.0
> Accept: **/**
>
# Mark bundle as not supporting multiuse
< HTTP/1.1 200 OK
< Content-Type: text/event-stream
< Server: Aleph/0.6.3
< Date: Sun, 30 Jul 2023 19:13:32 GMT
< Connection: Keep-Alive
< content-length: 31
<
data: one event, k thanks bye!
# Connection #0 to host localhost left intact
Aleph builds its functionalities on top of Manifold which provides asynchronous
data structures like manifold.deferred/deferred
or manifold.stream/stream
(and more), these data structure can be used as the body of a response and
Aleph will return their contents as soon as they are available and will close
the connection as soon as these sources will be closed. Manifold has been released
almost at the same time of core.async
(docs) and can be also used to work with async channels, which
can be handy if we have some part of our system that relies on them. Citing its
own Readme:
Manifold provides basic building blocks for asynchronous programming, and can be used as a translation layer between libraries which use similar, but incompatible, abstractions.
Manifold provides two core abstractions: deferreds, which represent a single asynchronous value, and streams, which represent an ordered sequence of asynchronous values.
Manifold API has a lot to offer and I encourage everyone to get to its docs to get a better understanding of this library, to approach it quickly it could be enough to see how it is used in Aleph's examples here. Other, more up to date examples are available here.
For the purpose of this exercise we can think of Manifold streams as channels to which we can put data and from which Aleph read data to send back events to an EventSource client.
Lets re-write the handler in order to produce periodic events that later will be consumed by a client
(ns codes.fpsd.sse
(:require [aleph.http :as http]
[manifold.stream :as s]))
(defn format-event [body]
(str "data: " body "\n\n"))
(defn handler [_req]
{:status 200
:headers {"content-type" "text/event-stream"}
:body (let [counter (atom 0)]
(s/periodically
1000
#(format-event (str "Sending event #" (swap! counter inc)))))})
This new endpoint will send an event every second, until the client closes the
connection; (manifold.stream/periodically delay-ms fn)
returns a stream to
which a new value is sent by calling fn
after delay-ms
milliseconds.
The format-event
fn is a helper that will return a SSE event, with only the
data
part, given any text renderable body
; it could be improved to properly
handle data bodies which include new lines generating multiple data:
section,
but I guess you can do that easily if you want to ;) .
To recap the new endpoint will send back an event with a progressive number, starting from 1, to every client that connects to the web server.
This what we get if we try the endpoint with curl:
$ curl localhost:8080 -vv
# Trying 127.0.0.1:8080…
# Connected to localhost (127.0.0.1) port 8080 (#0)
> GET / HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.81.0
> Accept: **/**
>
# Mark bundle as not supporting multiuse
< HTTP/1.1 200 OK
< Content-Type: text/event-stream
< Server: Aleph/0.6.3
< Date: Mon, 31 Jul 2023 17:37:15 GMT
< Connection: Keep-Alive
< transfer-encoding: chunked
<
data: Sending event #1
data: Sending event #2
data: Sending event #3
data: Sending event #4
^C
Oooook, finally something interesting is happening! A client connects to
an endpoint that is streaming back events at regular intervals; if we stop
and think about it we have laid out the foundation to stream events to a
client, by leveraging manifold.stream/stream
abstraction and Aleph's
ability to use it to send back responses to the caller.
I think that implementing a browser client would be even more exciting, so lets do that! Actually before moving to a browser client I'd like to improve to server a bit by:
There are many routers available today, lately I am enjoying using reitit which I find very intuitive and comes with a comprehensive documentation; other good alternatives can be compojure, bidi (and many more) or even manual routing of requests, anyway as you can imagine we will setup out routes with reitit so we can start by adding it to our deps:
metosin/reitit {:mvn/version "0.7.0-alpha5"}
(yes it is an alpha version, I am confident that a stable 0.7.x will be out soon)
and setup the router to serve the static page holding the JS code and one endpoint to handle the SSE events:
(ns codes.fpsd.sse
(:require [aleph.http :as http]
[manifold.stream :as s]
[reitit.ring :as ring]))
(defn format-event
"Return a properly formatted event payload"
[body]
(str "data: " body "\n\n"))
(defn sse-events [_req]
{:status 200
:headers {"content-type" "text/event-stream"}
:body (let [counter (atom 0)]
(s/periodically
1000
#(format-event (str "Sending event #" (swap! counter inc)))))})
(defn create-app
"Return a ring handler that will route /events to the SSE handler
and that will servr static content form project's resource/public directory"
[]
(ring/ring-handler
(ring/router
[["/events" {:get {:handler sse-events
:name ::events}}]]
)
(ring/routes
(ring/create-resource-handler {:path "/"})
(ring/create-default-handler))))
;; Web server maangement code to make it easy to start and stop a server
;; after changesto router or handlers
(def server_ (atom nil))
(defn start-server! []
(reset! server_ (http/start-server (create-app)
{:port 8080
:join? false})))
(defn stop-server! []
(swap! server_ (fn [s]
(when s
(.close s)))))
(comment
(start-server!)
(stop-server!)
,)
(Please change the namespace to reflect your project setup)
To recap:
handler
to sse-events
but it is the same as before/events
and the static contentIf we evaluate the buffer and start the server we can verify that the /events
endpoint
is working as before:
❯ curl localhost:8080/events -vv
# Trying 127.0.0.1:8080…
# Connected to localhost (127.0.0.1) port 8080 (#0)
> GET /events HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.81.0
> Accept: **/**
>
# Mark bundle as not supporting multiuse
< HTTP/1.1 200 OK
< Content-Type: text/event-stream
< Server: Aleph/0.6.3
< Date: Wed, 02 Aug 2023 06:29:16 GMT
< Connection: Keep-Alive
< transfer-encoding: chunked
<
data: Sending event #1
data: Sending event #2
^C
To recap:
/events
endpointI have decided to avoid some details and provide a pre-cooked solution because reitit's docs are quite comprehensive, and to focus on SSE.
In the previous step we have added a way to serve static files, now we can add a
basic web page where to render events as we get them. First of all lets see if we
can correctly serve a simple HTML file; paste the following content in resources/public/index.html
<html>
<body>
Your events here!
<div id="events"></div>
</body>
</html>
Now start the server and point your browser to http://localhost:8080, you should something like this:
Quite boring still…we can try subscribing to the /events
endpoint and render
the upcoming events! By coincidence we already have a div
with the id events
that we can use for this purpose ;)
What we need to do is:
EventSource
object that connects to the events
endpointevents
divFor simplicity we will embedd the JS code inside the index.html
page:
<html>
<body>
Your events here!
<div id="events"></div>
</body>
<script type="text/javascript">
let source = new EventSource('/events')
source.onmessage = (e) => {
let container = document.getElementById('events')
container.prepend(`Received event with data: ${e.data}`)
}
</script>
</html>
If we load http:localhost:8080 again we should see something like the following screenshot, again nothing super exciting but I hope you can feel the potential of this approach!
Somehow I suspect that this is not enough yet, maybe we can spice it up with a super simplified chat room?
What do you need to do to implement the simplest chat room ever? Our MVP should at least provide:
Following the previous bullet points I'll start with the event bus.
Manifold provides a convenient implementation of an even bus
available at manifold.bus/event-bus
. This provides a way to subscribe
a consumer to a topic (or SSE endpoint) and send messages to a topic
which will be sent to all connected consumers. For simplicity
we will have one single topic the-chat-room
but it would easy to
extend this example to support multiple rooms.
Second thing to do is to add an endpoint that will get the chat message and will push it to the event bus, lets see some code:
(ns codes.fpsd.sse
(:require [aleph.http :as http]
[manifold.bus :as b]
[reitit.ring :as ring]
[ring.middleware.params :as params]))
;; the one and only event bus needed for this app
;; using defonce to be able to possibly evaluate the
;; full buffer without breaking existing connections
(defonce event-bus (b/event-bus))
(defn format-event
"Return a properly formatted event payload"
[body]
(str "data: " body "\n\n"))
(defn sse-events [_req]
{:status 200
:headers {"content-type" "text/event-stream"}
:body (b/subscribe event-bus "the-chat-room")})
(defn send-message! [request]
(let [message (format-event (-> request :params (get "message")))]
(b/publish! event-bus "the-chat-room" message)
{:status 204
:headers {:content-type "text/plain"}
:body ""}))
(defn create-app
"Return a ring handler that will route /events to the SSE handler
and that will servr static content form project's resource/public directory"
[]
(ring/ring-handler
(ring/router
[["/events" {:get {:handler sse-events
:name ::events}}]
["/send-message" {:post {:name ::send-message
:handler send-message!}}]]
{:data {:middleware [params/wrap-params]}})
(ring/routes
(ring/create-resource-handler {:path "/"})
(ring/create-default-handler))
))
Here is a breakdown of the changes:
manifold.bus
to have access to b/event-bus
sse-events
will subscribe users to the chat topic and will send back eventssend-message
will take the message
from the request body and send it to the event bussend-message
:params
of the request
mapQuite straight forward, isn't it?
Few pointers if you want to dig deeper in the APIs show here:
Now we can implement the frontend:
<html>
<body>
<p>
<form onsubmit="event.preventDefault(); return sendMessage()">
<input type="text" name="message" id="message" placeholder="Your message here"/>
<input type="submit" value="Send"/>
</form>
</p>
<p>Past messages</p>
<div id="events">first load</div>
</body>
<script type="text/javascript">
let sendMessage = () => {
let input = document.getElementById('message')
fetch('/send-message',
{method: 'POST',
headers: {"Content-Type": 'application/x-www-form-urlencoded'},
body: `message=${input.value}`}).then(() => {input.value = ''})
return false
}
let appendMessage = (text) => {
let p = document.createElement('p')
p.appendChild(document.createTextNode(text))
let container = document.getElementById('events')
container.prepend(p)
}
let source = new EventSource('/events')
source.onmessage = (e) => {
appendMessage(e.data)
}
</script>
</html>
Yeah the code looks early 2000 era of HTML+JS (if we exclude the use of fetch
),
but I think it shows the core concepts nicely:
events
div as soon as we will receive themfetch
API to POST messages to the send-message
endpoint (wow a SPA without React!)Again sorry for the JS code quality but it is not my bread and butter and this is the best I can do.
After restarting/re-evaluating our backend and loading the page we will be able to chat in realtime, no too bad for less than 100 lines of code!
I hope you had fun following this simple recipe on how to add a bit of real
time feel to a basic web app! SSE, while simple from the outside, can be handy
in a lot of situations and adding it to your web application can be quite easy.
I am sure that the same can be achieved with other web servers and
core.async
, if you'll try that approach please write about your experience
and share it with us!
The full source code is available in my playground here.