Server-Sent Events using a Kubernetes sidecar to enable HTTP/2

Mohammed Hewedy
6 min read · Jul 29, 2020


Photo by Henry Be on Unsplash

I wanted to test running Server-Sent Events behind an HTTP/2 server exposed to the browser.

I wrote a simple HTTP service in Golang using HTTP/1.1, then played with a k3s Kubernetes cluster to deploy the service along with Nginx as a reverse proxy in front of it.

Nginx will expose HTTP/2 and forward requests to the Golang service over HTTP/1.1.

We will explore the Golang service first, then the Kubernetes YAML objects that enable us to achieve our goal. In the end, we will discuss the statefulness of SSE and how to achieve the desired statelessness.

The Golang Service

First, the service does a very simple task: it accepts input on the /submit endpoint and then broadcasts it to all subscribed clients.

The Golang service consists of two files, main.go and index.html.

Here's the broker object. It keeps track of all the connected clients and broadcasts to them whenever an event gets published.

type broker struct {
    clients  map[*http.Request]http.ResponseWriter
    dataChan chan string
    mux      sync.Mutex
}

func newBroker() *broker {
    return &broker{
        clients:  make(map[*http.Request]http.ResponseWriter),
        dataChan: make(chan string, 100),
    }
}

func (b *broker) publish(data string) {
    b.dataChan <- data
}

func (b *broker) broadcast() {
    data := <-b.dataChan
    b.mux.Lock()
    defer b.mux.Unlock()
    fmt.Println("broadcasting to", len(b.clients), "clients connected")
    for _, w := range b.clients {
        fmt.Fprintf(w, "data: %s\n\n", data)
        // not every ResponseWriter implements http.Flusher, so check first
        if f, ok := w.(http.Flusher); ok {
            f.Flush()
        }
    }
}

func (b *broker) addClient(w http.ResponseWriter, r *http.Request) {
    b.mux.Lock()
    defer b.mux.Unlock()
    b.clients[r] = w
    fmt.Printf("client %p connected\n", r)
}

func (b *broker) removeClient(w http.ResponseWriter, r *http.Request) {
    b.mux.Lock()
    defer b.mux.Unlock()
    delete(b.clients, r)
    fmt.Printf("client %p disconnected\n", r)
}

Here are the HTTP handlers:

b := newBroker()

// a single goroutine drains the channel and fans events
// out to all connected clients
go func() {
    for {
        b.broadcast()
    }
}()

http.HandleFunc("/submit", func(w http.ResponseWriter, r *http.Request) {
    data, _ := ioutil.ReadAll(r.Body)
    b.publish(string(data))
})

http.HandleFunc("/status", func(w http.ResponseWriter, r *http.Request) {
    w.Header().Set("Content-Type", "text/event-stream")

    b.addClient(w, r)
    defer b.removeClient(w, r)

    // block until the client disconnects; the deferred
    // removeClient then unregisters it
    <-r.Context().Done()
})

Then the full main.go:

package main

import (
    "fmt"
    "io/ioutil"
    "log"
    "net/http"
    "sync"
)

type broker struct {
    clients  map[*http.Request]http.ResponseWriter
    dataChan chan string
    mux      sync.Mutex
}

func newBroker() *broker {
    return &broker{
        clients:  make(map[*http.Request]http.ResponseWriter),
        dataChan: make(chan string, 100),
    }
}

func (b *broker) publish(data string) {
    b.dataChan <- data
}

func (b *broker) broadcast() {
    data := <-b.dataChan
    b.mux.Lock()
    defer b.mux.Unlock()
    fmt.Println("broadcasting to", len(b.clients), "clients connected")
    for _, w := range b.clients {
        fmt.Fprintf(w, "data: %s\n\n", data)
        // not every ResponseWriter implements http.Flusher, so check first
        if f, ok := w.(http.Flusher); ok {
            f.Flush()
        }
    }
}

func (b *broker) addClient(w http.ResponseWriter, r *http.Request) {
    b.mux.Lock()
    defer b.mux.Unlock()
    b.clients[r] = w
    fmt.Printf("client %p connected\n", r)
}

func (b *broker) removeClient(w http.ResponseWriter, r *http.Request) {
    b.mux.Lock()
    defer b.mux.Unlock()
    delete(b.clients, r)
    fmt.Printf("client %p disconnected\n", r)
}

func main() {

    b := newBroker()

    // a single goroutine drains the channel and fans events
    // out to all connected clients
    go func() {
        for {
            b.broadcast()
        }
    }()

    http.HandleFunc("/submit", func(w http.ResponseWriter, r *http.Request) {
        data, _ := ioutil.ReadAll(r.Body)
        b.publish(string(data))
    })

    http.HandleFunc("/status", func(w http.ResponseWriter, r *http.Request) {
        w.Header().Set("Content-Type", "text/event-stream")

        b.addClient(w, r)
        defer b.removeClient(w, r)

        // block until the client disconnects; the deferred
        // removeClient then unregisters it
        <-r.Context().Done()
    })

    http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
        http.ServeFile(w, r, "index.html")
    })

    log.Fatal(http.ListenAndServe(":8080", nil))
}

(inspired by https://gist.github.com/ismasan/3fb75381cd2deb6bfa9c)

And here’s the index.html:

It shows a button; when clicked, an event is published to the backend, and upon publish the broker object broadcasts the event to all connected clients.

<script src="https://code.jquery.com/jquery-3.5.0.js"></script>
<form>
    <div>
        <input type="submit" value="Submit a New Job" style="width: 200px; height: 40px;">
    </div>
</form>
<div id="result">
</div>
<script>
    (function () {
        var input = 0;
        $("form").submit(function (event) {
            event.preventDefault();
            input++;
            $.post("/submit", input + "");
            show("Process <b>" + input + "</b> <span style='color: red;'>Under progress</span>");
        });

        var eventSource = new EventSource("/status");
        show("Connection established");

        eventSource.onmessage = function (evt) {
            console.log('onmessage: ', evt);
            show("Process <b>" + evt.data + "</b> <span style='color: green;'>Completed</span>");
        };

        eventSource.onerror = function (err) {
            show("EventSource failed: " + err);
        };

        eventSource.onopen = function (e) {
            show("Connection opened");
        };

        window.onbeforeunload = function () {
            eventSource.close();
        };

        function show(text) {
            $("#result").html($("#result").html() + "<br />" + text);
        }
    }());
</script>

A very simple and straightforward implementation. And here's the Dockerfile:

FROM alpine:latest
WORKDIR /root/
COPY index.html .
COPY gosse .
CMD ["./gosse"]

Note that I am copying both files into the Docker image.

Then I build the binary and publish the Docker image to Docker Hub (CGO_ENABLED=0 produces a statically linked binary that runs on Alpine):

CGO_ENABLED=0 GOOS=linux go build -o gosse
docker build -t gosse:latest .
docker tag gosse:latest mhewedy/gosse:latest
docker push mhewedy/gosse:latest

Kubernetes objects definitions

Once we have a Docker image, we can use it in Kubernetes to create a Deployment with 2 pods, each consisting of our service container alongside an Nginx reverse proxy as a sidecar container.

We use Nginx here to provide HTTP/2 support.

First, we need to generate a self-signed TLS key pair:

openssl req -x509 -nodes -days 365 -newkey rsa:2048 -keyout nginx-selfsigned.key -out nginx-selfsigned.crt

Now, we will create a ConfigMap and a Secret to hold the Nginx configuration.

We will also have the Deployment and a NodePort Service so I can access it from the browser on my Mac:

# Kubernetes ConfigMap, Secret, Deployment and Service for the golang sse service
# that is proxied by nginx to provide http2 support.
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: nginxconf
data:
  nginx-selfsigned.crt: |
    .... public key here ....
  default.conf: |
    server {
        listen 443 ssl http2;
        ssl_certificate /etc/nginx/conf.d/nginx-selfsigned.crt;
        ssl_certificate_key /etc/nginx/conf.d/private/nginx-selfsigned.key;

        server_name localhost;

        location / {
            proxy_pass http://127.0.0.1:8080;
            proxy_set_header Connection '';
            proxy_http_version 1.1;
            chunked_transfer_encoding off;
            proxy_buffering off;
            proxy_cache off;
        }
    }
---
apiVersion: v1
kind: Secret
metadata:
  name: nginxkey
data:
  nginx-selfsigned.key: ..... private key here .....
---
apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    app: gosse
  name: gosse
spec:
  replicas: 2
  selector:
    matchLabels:
      app: gosse
  template:
    metadata:
      labels:
        app: gosse
    spec:
      containers:
        - image: mhewedy/gosse
          name: gosse
          ports:
            - containerPort: 8080
        - image: nginx
          name: nginx
          ports:
            - containerPort: 443
          volumeMounts:
            - name: nginxconf-vol
              mountPath: /etc/nginx/conf.d/
            - name: nginxkey-vol
              mountPath: /etc/nginx/conf.d/private/
      volumes:
        - name: nginxconf-vol
          configMap:
            name: nginxconf
        - name: nginxkey-vol
          secret:
            secretName: nginxkey
---
apiVersion: v1
kind: Service
metadata:
  labels:
    app: gosse
  name: gosse
spec:
  selector:
    app: gosse
  type: NodePort
  ports:
    - name: 443-port
      port: 1443
      protocol: TCP
      targetPort: 443
      nodePort: 30007
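Note that values under a Secret's data: field must be base64-encoded; kubectl does not encode them for you when you write the YAML by hand. For illustration, here's one way to produce the value, encoding a stand-in file (the real nginx-selfsigned.key from the openssl step would be used instead):

```shell
# stand-in for the real private key file
printf 'dummy-key-material' > nginx-selfsigned.key

# base64-encode on a single line, suitable for pasting into the Secret
base64 < nginx-selfsigned.key | tr -d '\n'
# → ZHVtbXkta2V5LW1hdGVyaWFs
```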

Then I apply the whole file at once:

kubectl apply -f k8s.yaml

Now, I can open many tabs (more than 6) to the Golang service. This is where HTTP/2 pays off: over HTTP/1.1, browsers cap concurrent connections per host at around six, so each long-lived SSE stream consumes one of them, while HTTP/2 multiplexes all tabs over a single connection.

Here the browser's developer tools show the HTTP protocol in use:

Stateful Protocol

It is important to note that SSE is a stateful protocol, in the sense that each backend server holds open connections with the set of clients it pushes data to.

The current Golang implementation doesn't let the nodes share client information with one another, so an event published to one pod only reaches the clients connected to that pod.

To fix this, we already have, in the Golang service implementation, a node-specific broker object that broadcasts to all clients subscribed on that node; we only need to trigger this broker on all nodes whenever there is new state. For example, each node might run a Kafka consumer that subscribes to a particular topic and triggers the broker, or we might rely on Redis pub/sub to achieve the same.
