Highly available Celery with RabbitMQ and Kubernetes

Kubernetes, RabbitMQ and Celery provides a very natural way to create a reliable python worker cluster. This post is based on my experience running Celery in production at Gorgias over the past 3 years. The scope of this post is mostly dev-ops setup and a few small gotchas that could prove useful for people trying to accomplish the same type of deployment. At the end we’ll try to shut down a machine to see if our cluster is indeed reliable as I claim.

Before diving too deep I recommend refreshing your knowledge on the tools we are going to use. In particular I expect:

  • You have some familiarity with Kubernetes and in particular: pods, services, deployments, stateful sets and persistent volumes.
  • You used or know a bit about RabbitMQ. Some basics about clustering would be nice, but not required. You can read their docs here: https://www.rabbitmq.com/clustering.html
  • You used or know about Celery: how it schedules it’s tasks and executes them.

Pieces falling into place: Why Kubernetes, RabbitMQ and Celery?

  • Kubernetes is a very reliable container orchestration system (runs your docker images). It is used by a vast number of companies in production environments. It’s proven tech and provides a good base for making failure tollerant and scalable applications such as an async worker cluster which is what we’re trying to do.
  • RabbitMQ is a popular open source broker that has a history of being resilient to failure, can be configured to be highly-available and can protect your environment from data-loss in case of a hardware failure.
  • Celery is probably the most popular python async worker at this moment. It’s feature rich, stable and actively maintained. Celery (or any other worker) by it’s nature is distributed and relies on the message broker (RabbitMQ in our case) for state synchronisation. It’s also what we use at Gorgias to run asynchronous tasks.

Given their properties I hope that when we put all of the above components together you will have a robust worker cluster that is easy to scale and will be hard to brake.

Kubernetes (k8s) and Helm setup

First we’ll need to run k8s either via minikube on your local machine or using some k8s provider such as Google Kubernetes Engine (GKE). For this tutorial I’m going to use GKE, but feel free to use any kubernetes environment.

Assuming you have your gcloud setup let’s create a new 3 node Kubernetes cluster. It will take a few minutes so feel free to grab some refreshment:

gcloud container clusters create ha-celery --num-nodes=3 -z us-east1-c

Make sure you delete your cluster (so you don’t get charged) after you’re done like so:

gcloud container clusters delete ha-celery -z us-east1-c

Test that you have access to it:

kubectl cluster-info

Great. Now let’s install Helm. Helm is a package manager that will allow us to create a template for our RabbitMQ and Celery deployment. We can use it to run a development, staging or production environment without having to maintain separate configs for each one. It’s also easier to use than running kubectl commands when dealing with multiple k8s primitives at a time.

To install helm in your kubernetes cluster (make sure the Helm CLI is installed on your machine first):

helm init

At this point you should be all set with Kubernetes and Helm. We are now ready to deploy our RabbitMQ cluster which Celery will use later on.

RabbitMQ (RMQ) docker image

In order to run our RabbitMQ (RMQ) cluster on k8s first we’ll have to build the Docker images for it. Here’s a sample Dockerfile:

FROM rabbitmq:3.6.12

RUN rabbitmq-plugins enable --offline rabbitmq_management

ENV RABBITMQ_ERLANG_COOKIE changeThis

# Add files.
COPY ./cmd.sh /
COPY ./rabbitmq.config /etc/rabbitmq/rabbitmq.config

# Define default command.
CMD ["/cmd.sh"]

# default ports + management plugin ports
EXPOSE 4369 5671 5672 25672 15671 15672

Above we have a custom RabbitMQ Dockerfile that inherits the official RabbitMQ image and adds a few extra features. It installs the rabbitmq_management plugin which I highly recommend if you want to understand what’s going on in production. I copies our rabbitmq.config and cmd.sh which will see later. And finally exposes the default RMQ ports.

Let’s have a look at rabbitmq.config:

[
    { rabbit, [
            { loopback_users, [ ] },
            { tcp_listeners, [ 5672 ] },
            { ssl_listeners, [ ] },
            { hipe_compile, false },
            { cluster_partition_handling, pause_minority}
    ] },
    { rabbitmq_management, [ { listener, [
            { port, 15672 },
            { ssl, false }
    ] } ] }
].

The first part about loopback_users and listeners is pretty straight forward. hipe_compile setting is false, but can be true if you use the high-perf Erlang.

Now what I believe to be an extremely important setting is cluster_partition_handling which has the default value set to ignore. It’s extremely important for it to be pause_minority instead in order to avoid split-world and data loss situations. Here’s why:

Imagine you have a RMQ cluster of 3 nodes setup: rmq0, rmq1 and rmq2. All is running smoothly and then suddenly you have a network partition that separates rmq0 from the other 2 nodes. Note that all the nodes are still running, it’s just that they can’t communicate between themselves. Now if you have cluster_partition_handling set to the default ignore all the clients connected to rmq0 will still be able to read from it and most importantly write to it! Here’s a more concrete example:

Suppose we have 2 Celery workers (w0 and w1) that get tasks from the celery queue (the default one for celery). w0 is connected to rmq0 and consumes the celery queue and w1 that is connected to rmq1 and does the same.

In the case of a network partition with the RMQ defaults w0 will consume celery as if nothing happened, the problem is that w1 also does the same on rmq1. In this situation the same task can be consumed 2 times! When the network partition is fixed which of the rmq nodes in your cluster holds the truth? This is called a split-world or split-brain situation and it literally can cause your brain to split trying to untangle the mess. This happens more often than you might think even on very reliable hardware and network. In this situation you’re better off re-creating the queue from scratch with some nasty data loss in the process.

Basically what cluster_partition_handling set to ignore is saying: In the case of a network partition the RMQ nodes will just ignore this failure and continue running as if nothing happened accepting regular consumer operations. Setting it to pause_minority will cause the nodes in the minority (in our case rmq-0) to pause - becoming read-only basically. Once the cluster is back it should sync with the other nodes and get back on track. This IMHO is what the default should be because it avoids the split-world situations which is what I believe most really want.

Ok and finally the cmd.sh:

#!/usr/bin/env bash

ulimit -n 65536

chown -R rabbitmq:rabbitmq /var/lib/rabbitmq

# This is needed for statefulset service name resolution - needed for short name resolution
if [ -n "$RABBITMQ_SERVICE_DOMAIN" ]; then
    echo "search $RABBITMQ_SERVICE_DOMAIN" >> /etc/resolv.conf
fi

is_clustered="/var/lib/rabbitmq/is_clustered"

host=`hostname`

join_cluster () {
    if [ -e $is_clustered ]; then
        echo "Already clustered with $CLUSTER_WITH"
    else
        # Don't cluster with self or if already clustered
        if ! [[ $CLUSTER_WITH =~ $host ]]; then
            rabbitmq-server -detached
            rabbitmqctl stop_app
            rabbitmqctl join_cluster rabbit@$CLUSTER_WITH
            rabbitmqctl start_app

            # mark that this node is clustered
            mkdir $is_clustered
            # stopping because it we started it later in attached mode
            rabbitmqctl stop
            sleep 5
        fi
    fi
}

create_vhost() {
    rabbitmq-server -detached
    until rabbitmqctl node_health_check; do echo "Waiting to start..." && sleep 1; done;

    USER_EXISTS=`rabbitmqctl list_users | { grep $RABBITMQ_USERNAME || true; }`

    # create user only if it doesn't exist
    if [ -z "$USER_EXISTS" ]; then
        rabbitmqctl add_user $RABBITMQ_USERNAME $RABBITMQ_PASSWORD
        rabbitmqctl add_vhost $RABBITMQ_VHOST
        rabbitmqctl set_permissions -p $RABBITMQ_VHOST $RABBITMQ_USERNAME ".*" ".*" ".*"
        rabbitmqctl set_policy -p $RABBITMQ_VHOST ha-all "" '{"ha-mode":"all","ha-sync-mode":"automatic"}'
    fi
    # stopping because it we started it later in attached mode
    rabbitmqctl stop
    sleep 5
}

if [ -n "$CLUSTERED" ] && [ -n "$CLUSTER_WITH" ]; then
    join_cluster
fi

if [ -n "$RABBITMQ_USERNAME" -a -n "$RABBITMQ_PASSWORD" -a -n "$RABBITMQ_VHOST" ]; then
    create_vhost
fi

rabbitmq-server $RABBITMQ_SERVER_PARAMS

The above script will attempt to join a cluster if the $CLUSTERED and $CLUSTER_WITH vars are set and then attempt to create a RMQ virtual host and a user. This part below sets the default HA policy for the vhost which in our case here is all meaning that all queues will be replicated across all nodes:

rabbitmqctl set_policy -p $RABBITMQ_VHOST ha-all "" '{"ha-mode":"all","ha-sync-mode":"automatic"}'

Also note the $RABBITMQ_SERVICE_DOMAIN variable. That one is needed in order to have our nodes to connect to each other. It’s value is the service domain of the k8s service we’re going to use.

Now that we understand our docker image better we can build it and upload it to our gcr.io docker registry.

git clone git@github.com:xarg/rabbitmq-statefulset.git
cd rabbitmq-statefulset
docker build -t gcr.io/your-project/rabbitmq .
gcloud docker -- push gcr.io/your-project/rabbitmq

We have our image uploaded to the container registry and now we’re ready to deploy it!

RabbitMQ Helm chart

To deploy our RabbitMQ cluster we’re going to use the kubernetes StatefulSet to deploy our RMQ cluster. The reason for this choice is that StatefulSets is in it’s description:

A StatefulSet [..] Manages the deployment and scaling of a set of Pods, and provides guarantees about the ordering and uniqueness of these Pods.

This is important when doing upgrades of the cluster for example, but also because the hostnames of the rabbitmq nodes need to be kept the same between restarts. See here: https://www.rabbitmq.com/clustering.html#issues-hostname

Let’s have a look at our helm chart files:

.
└── rabbitmq
    ├── Chart.yaml -- metadata
    ├── templates
    │   ├── _helpers.tpl -- helper functions
    │   ├── service.yaml -- template for the k8s service
    │   ├── ssd.yaml -- ssd storage (if IO is important)
    │   ├── standard.yaml -- standard storage (default).
    │   └── statefulset.yaml -- template for the statefulset
    └── values.yaml -- default values for our templates

A helm chart is basically just a template that uses the values from the values.yaml to render the template. We’ll have a look at 2 files

service.yaml:

apiVersion: v1
kind: Service
metadata:
  name: rmq
spec:
  ports:
    ...
  clusterIP: None
  selector:
    app: rmq

This service defines a way for our application and the cluster to connect to their nodes. Note that the clusterIP: None - that’s because we don’t want k8s to load-balance our RMQ cluster, we’ll do that on the Celery application level.

statefulset.yaml:

apiVersion: apps/v1beta1
kind: StatefulSet
metadata:
  name: rmq
spec:
  serviceName: rmq
  replicas: {{ .Values.replicaCount }}

  volumeClaimTemplates:
  - metadata:
      name: rmq
      annotations:
        volume.beta.kubernetes.io/storage-class: "standard"
    spec:
      accessModes: ["ReadWriteOnce"]
      resources:
        requests:
          storage: 10Gi

  template:
    metadata:
      labels:
        app: rmq
    spec:
      # prevents rmq pods running on the same nodes                             
      affinity:                                                                 
        podAntiAffinity:                                                        
          preferredDuringSchedulingIgnoredDuringExecution:                      
          - weight: 100                                                         
            podAffinityTerm:                                                    
              labelSelector:                                                    
                matchExpressions:                                               
                - key: app                                                      
                  operator: In                                                  
                  values:                                                       
                  - rmq                                                         
              topologyKey: kubernetes.io/hostname   
      containers:
      - name: rmq
        image: "{{ .Values.image.repository }}:{{ .Values.image.tag }}"
        volumeMounts:
          - mountPath: /var/lib/rabbitmq
            name: rmq
        env:
          - name: CLUSTERED
            value: "true"
          - name: CLUSTER_WITH
            value: "rmq-0"
          - name: RABBITMQ_SERVICE_DOMAIN
            value: "rmq.default.svc.cluster.local"
{{ if eq .Values.environment "production" }}
        resources:
{{ toYaml .Values.resources | indent 10 }}
{{ end }}
{{ if .Values.nodeSelector }}
      nodeSelector:
{{ toYaml .Values.nodeSelector | indent 8 }}
{{ end }}

The StatefulSet will first claim a 10Gi persistent volume with the standard storage type for each of our nodes defined by replicaCount. Then it will start each rabbitmq pod and check if it’s alive and running.

Deploy the RabbitMQ cluster in Kubernetes

While still in the rabbitmq-statefulset directory we can deploy our chart to k8s using the helm command:

helm upgrade -i rmq --set environment=production --set image.repository=gcr.io/your-project/rabbitmq charts/rabbitmq/

If everything went well you should eventually see:

kubectl get pod
NAME      READY     STATUS    RESTARTS   AGE
rmq-0     1/1       Running   0          4m
rmq-1     1/1       Running   0          3m
rmq-2     1/1       Running   0          2m

You now have a working RMQ cluster with 3 running nodes. I recommend poking around a bit. Check their logs with kubectl logs rmq-0. Check the created service: kubectl get svc, etc..

Now that our cluster is created and running we can finally start working on the workers.

Celery application

Let’s create a simple celery application with workers that just wait 10 seconds. We’ll also create a script that schedules a lot of celery tasks.

tasks.py:

import time
from celery import Celery

BROKER_URL = [
    'amqp://my_user:my_pass@rmq-0.rmq.default.svc.cluster.local:5672/my_vhost',
    'amqp://my_user:my_pass@rmq-1.rmq.default.svc.cluster.local:5672/my_vhost',
    'amqp://my_user:my_pass@rmq-2.rmq.default.svc.cluster.local:5672/my_vhost',
]
app = Celery('tasks', broker=BROKER_URL)


@app.task
def count():
    for i in range(10):
        print(i)
        time.sleep(1)

As you can see all this task does is waits 10s and prints it’s counts.

scheduler.py:

from tasks import count

# just schedules 10k count tasks
for i in range(10000):
    count.delay()

The scheduler will be used to schedule lots of count tasks so we can observe their execution while killing workers, rabbitmq nodes, etc..

Now that we have our handler and scheduler we can put then in a simple Docker image so we can run them in k8s:

FROM python:3

RUN pip install celery 

COPY src/* /

CMD ["/usr/local/bin/celery", "-A", "tasks", "worker", "--loglevel=info"]

You can find the source for the handler, scheduler, Dockerfile and the Helm chart here: https://github.com/xarg/celery-counter

Now let’s build the image and push to the registry just like we did with the RMQ image:

git clone git@github.com:xarg/celery-counter.git
cd celery-counter
docker build -t gcr.io/your-project/celery-counter .                            
gcloud docker -- push gcr.io/your-project/celery-counter                        

Great, we’re getting closer to running our counter worker tasks on k8s.

Celery Helm chart

Now that we have our image uploaded we can deploy it once again using the celery-counter helm chart. While still in the celery-counter directory:

helm upgrade -i counter --set image.repository=gcr.io/your-project/celery-counter charts/counter

I not going to dig into this helm chart: It’s a simple k8s Deployment with 3 replicas that runs the celery worker command. Make sure that we workers are running using: kubectl get pod and then look at their logs using kubectl logs <name of the pod>

Schedule our tasks

Hopefully you have your workers running by now, but they are not doing anything yet. We need to use our scheduler to make then execute the count tasks:

kubectl create -f scheduler.yaml

Once the scheduler is started you should start seeing some activity in your worker logs:

[INFO/ForkPoolWorker-1] Task tasks.count[cf168fdb-c199-481b-81cc-7ac30c93786e] succeeded in 10.015281924999726s: None
[WARNING/ForkPoolWorker-1] 0
[WARNING/ForkPoolWorker-1] 1
[WARNING/ForkPoolWorker-1] 2
[WARNING/ForkPoolWorker-1] 3
[WARNING/ForkPoolWorker-1] 4
[WARNING/ForkPoolWorker-1] 5
[WARNING/ForkPoolWorker-1] 6
[WARNING/ForkPoolWorker-1] 7
[WARNING/ForkPoolWorker-1] 8
[WARNING/ForkPoolWorker-1] 9

Nice work! You have your worker cluster running and executing tasks!

Redundancy

Now that we have our Celery counter application running we can go ahead and remove a k8s node to see what happens.

First let’s see on which nodes our pods are running:

kubectl get pods -o wide
NAME                               READY     STATUS     NODE
counter-counter-3473528255-24z8m   1/1       Running    gke-ha-celery-default-pool-5939d2ec-khq0
counter-counter-3473528255-6n3q8   1/1       Running    gke-ha-celery-default-pool-5939d2ec-dc85
counter-counter-3473528255-g823p   1/1       Running    gke-ha-celery-default-pool-5939d2ec-dc85
rmq-0                              1/1       Running    gke-ha-celery-default-pool-5939d2ec-dc85
rmq-1                              1/1       Running    gke-ha-celery-default-pool-5939d2ec-khq0
rmq-2                              1/1       Running    gke-ha-celery-default-pool-5939d2ec-43nq

Observe on which node rmq-0 is running. In the above example it’s gke-ha-celery-default-pool-5939d2ec-dc85 - this is the node we’re going to remove from the pool to cause a little havoc.

By default celery connects to the first RMQ node in the list (see BROKER_URL in the tasks.py) and then if the connection fails to the first broker it goes to the next one, etc..

Before killing a k8s node to see what happens let’s observe our cluster using these commands (run each one in it’s own terminal):

# same as above - but choose a pod that is NOT running on a node that you plan to kill
# the reason for this is to observe how the works fail over to a different RMQ node.
kubectl logs -f counter-counter-3473528255-24z8m

# to see how rmq-0 dies
kubectl logs -f rmq-0

# to see how rmq-1 takes over the traffic from the workers
kubectl logs -f rmq-1

# to see how pods stop and then start again
watch "kubectl get pods -o wide"

# to see how the node gets unscheduled
watch "kubectl get nodes"

Next we’ll mark the k8s node that runs rmq-0 as unschedulable and then we’ll drain (kill) all pods on it, you should choose the the node that runs the rmq-0, you can see it running this command kubectl get pods -o wide:

# marks the node unschedulable (the one that runs rmq-0)
kubectl cordon gke-ha-celery-default-pool-5939d2ec-dc85

# kills all pods that run on it
kubectl drain --force --ignore-daemonsets gke-ha-celery-default-pool-5939d2ec-dc85

Now keep your eyes on your monitoring commands. You should see how rmq-0 is dying and 1 or 2 celery counters since they might be running on the same node as rmq-0. If everything went according to plan you should see a log in your worker like:

consumer: Connection to broker lost. Trying to re-establish the connection...   
...
Cannot connect to amqp://my_user:**@rmq-0.rmq.default.svc.cluster.local:5672/my_vhost: [Errno -2] Name or service not known.
...
Connected to amqp://my_user:**@rmq-1.rmq.default.svc.cluster.local:5672/my_vhost

And then just as before continue to execute the tasks after the period of failure.

I also recommend looking at the rmq-1 logs to see how the clients start connecting to it and then how it accepts again the rmq-0 into the cluster once it gets up again.

Conclusion

What we did so far:

  • Create a new k8s cluster
  • Build and pushed the RabbitMQ and Celery images to the Google Container Registry
  • Deployed the helm charts for RabbitMQ and celery cluster.
  • Removed a k8s node and observed the behavior of our workers and RMQ nodes.

Of course this is just one way you cluster can fail. There are many other things that can happen. I recommend looking at: https://github.com/bloomberg/powerfulseal or https://github.com/asobti/kube-monkey to get an idea about other possible types of failure.

If I had to give one piece of advice based on this article:

Expect your workers to die at any moment and always code with that in mind.

IMHO this is the hardest part of all.

At Gorgias we’re sending tons of emails/chats and facebook messages and also making HTTP requests to user defined HTTP endpoints before sending the aforementioned messages which can fail with a timeout or an error. These are just a few questions that arise:

  • Should the emails be sent if other parts of the process have passed or not? If not how should we notify the customer?
  • What happens if the HTTP service we’re trying to reach times out? How many times should we retry before giving up? What happens then?
  • What if the worker is killed in the middle of the transaction with the mail server? Was the email sent or not? Should we retry? Should we notify the customer?
  • If the mail server is down do we have a retry mechanism and when should the retry switch to a different server?

The above are but a few of the questions that come to mind when thinking of our application, in reality there are many more and the answer is not always simple. The code required for failure handling makes the application much more verbose, harder to debug and maintain. We’re try having less and simpler features precisely because some many things can go wrong.

Even though the application level failure handling is very hard it’s still a million times better when you know that you can rely on Kubernetes and RabbitMQ to stay up and running your application code even if your VMs or physical machines go down. It’s a lot easier to build resilient and scalable applications that it was before kubernetes in my option and I hope that this post illustrates just that.

Here are the repos that have been used in this post:

Get updates on my new posts:
or get RSS feed.