PyCharm Tip: Show only tables

If you work with a database in PyCharm or DataGrip and want to get to the table data, it can be pretty annoying: you have to click through Connection -> Database -> Schema -> Tables - quite a few clicks.

Here’s how you can reduce the number of clicks:

  • Click the cog (settings icon) in the top-right corner of the Database window in PyCharm
  • Uncheck: Group Data sources, Group Schema and Show Intermediary Nodes
  • Click on the Filter button inside the Database window and uncheck all the Objects you don’t want to see such as Role
    or Access Method.

You can also watch the video below on how to do it.

Good luck!

Introducing kuku: kubernetes template tool

At Gorgias we’re using k8s on GKE to run all our production services. We run our REST API apps, RabbitMQ, Celery background workers, PostgreSQL and other smaller services on k8s. We also have staging & development k8s clusters where we experiment with different infrastructure setups and deployments.

If you have multiple k8s clusters to manage, chances are you also need a templating tool to customize your k8s manifests. By far the most popular one is helm these days. There is also ksonnet and more recently pulumi. All of these tools are powerful and solve real problems, but they are not quite right for us.

I can’t speak much about ksonnet and pulumi because I only briefly looked at their APIs and how-to guides, so take this part with a grain of salt. However, as a user, I can speak about helm, which is what we’ve been using at Gorgias for all our services.

Why not helm?

Well, there are a few things I find problematic with helm:

  • Poor templating language: it requires constant trips to the docs, has whitespace issues, and getting the YAML formatting right is hard.
  • Server-side dependency: if you upgrade the server, every user needs to update their client - a waste of valuable time.
  • Lack of local validation: helm lint does not actually ensure the validity of the manifest (e.g. required keys for a k8s object).
  • Chart names, releases, and other helm specific features do not fit with our current workflow.

At Gorgias, we’ve been using Helm to manage all our k8s resources, and it was great until we had to deal with more complex charts with lots of control flow and ranges. If you’ve ever dealt with ranges and template in Helm, you know that juggling the different contexts is not easy - template "name" . vs template "name" $ comes to mind.

So why not Ksonnet then?

Ksonnet improves the situation a bit with a stricter approach using the jsonnet language. When I say strict, I mean it doesn’t blindly render a text file into YAML as helm does, but uses a real programming language to produce the YAML in the end.

My main issue with it is the language: jsonnet. Mostly because it is yet another templating language that I have to learn, along with its gotchas. A separate issue is that it introduces a whole set of new concepts such as Part, Prototype, Parameter, etc. I found that a bit too much when all I want is to render a bunch of YAML files with some variables.

Pulumi?

Pulumi comes closest to what I would consider the ideal tool for us. It uses a programmatic approach: it connects directly to your cluster and creates the resources declared in code (TypeScript, Python, etc.). You write TypeScript code, and you provision your infra, progress bar included. There is a lot to like about this approach. There are, however, a few things I don’t like about Pulumi either: the primary language seems to be TypeScript at the moment, which I don’t want to use when it comes to infrastructure code. Python templates were in development when I wrote this post, but I didn’t try them.

Pulumi also does infrastructure provisioning (multi-provider), a la Terraform. I think this is overkill for what we need at Gorgias. We don’t have to use those features of course, but it seems like it tries to solve 2 different and complex problems at the same time. To put it plainly: it’s too much of a Swiss Army knife for us.

kuku: a simple templating tool

Finally, after searching for the right tool, I decided that I would write my own. kuku is very similar to Helm, but uses python files as templates instead of YAML. It also doesn’t have a server-side component.

Here are some of its goals:

Write python code to generate k8s manifests.

Python is a popular language with a vast ecosystem of dev-ops packages. Most importantly it’s easier to debug than some templating languages used today to generate k8s manifests.
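
For instance, because a template is plain Python, you can debug it like any other script. A tiny sketch (assuming you run kuku from a terminal so pdb can take over):

from kubernetes import client

def template(context):
    breakpoint()  # drops into pdb during rendering so you can inspect `context`
    return client.V1ConfigMap(
        api_version="v1",
        kind="ConfigMap",
        metadata=client.V1ObjectMeta(name=context["name"]),
        data={"rendered-by": "kuku"},
    )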

No k8s server-side dependencies (i.e. tiller).

k8s already has a database for its current state (using etcd). We can connect directly to it (if needed) from the client to do our operations instead of relying on an extra server-side dependency.

Local validation of manifests.

Where possible do the validation locally using the official k8s python client.

Use standard tools.

Where possible use kubectl to apply changes to the k8s cluster instead of implementing a specific protocol. Again, this allows for easier maintenance and debugging for the end user.

More on Helm comparison

Compared to Helm there is no concept of charts, releases or dependencies. I found that we rarely used any of those concepts; they just added extra complexity to our charts without much benefit.

Instead there are just 2 concepts that are similar to helm: values and templates.

Values come from the CLI or value files (same as Helm). Templates are just python files that have a template function.

Using kuku

Suppose you want to create a k8s service using a template where you define the service name, internalPort and externalPort.

To install: pip3 install kuku

Given the following service.py template:

from kubernetes import client

def template(context):
    return client.V1Service(
        api_version="v1",
        kind="Service",
        metadata=client.V1ObjectMeta(name=context["name"]),
        spec=client.V1ServiceSpec(
            type="NodePort",
            ports=[
                {"port": context["externalPort"], "targetPort": context["internalPort"]}
            ],
            selector={"app": context["name"]},
        ),
    )

You can now generate a yaml output from the above template using kuku by running:

$ ls .
service.py 
$ kuku render -s name=kuku-web,internalPort=80,externalPort=80 .

The above produces:

# Source: service.py
apiVersion: v1
kind: Service
metadata:
  name: kuku-web
spec:
  ports:
  - port: 80
    targetPort: 80
  selector:
    app: kuku-web
  type: NodePort

You can also combine the above with kubectl apply -f - to create your service on k8s:

kuku render -s name=kuku-web,internalPort=80,externalPort=80 . | kubectl apply -f -

Same as above, but let’s make it shorter:

kuku apply -s name=kuku-web,internalPort=80,externalPort=80 .

Finally to delete it:

kuku delete -s name=kuku-web,internalPort=80,externalPort=80 .
# same as above
kuku render -s name=kuku-web,internalPort=80,externalPort=80 . | kubectl delete -f -

kuku templates

Let’s return to templates for a bit, because a few things are happening there. Templates are python files that define a function called template, which accepts a dict argument context and returns a k8s object or a list of k8s objects. Simplest example:

from kubernetes import client

def template(context):
    return client.V1Namespace(metadata=client.V1ObjectMeta(name=context['namespace']))  # example k8s object
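
And a template that returns a list of objects could look like this (a sketch, with made-up names):

from kubernetes import client

def template(context):
    metadata = client.V1ObjectMeta(name=context["name"], labels={"app": context["name"]})
    return [
        client.V1ServiceAccount(api_version="v1", kind="ServiceAccount", metadata=metadata),
        client.V1ConfigMap(api_version="v1", kind="ConfigMap", metadata=metadata, data={"foo": "bar"}),
    ]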

You can create multiple template files, each defining its own template function. kuku uses the k8s objects (aka models) from the official kubernetes python client package. You can find them all here.

When writing kuku templates I highly recommend using an editor that is aware of the k8s python package above so you get nice auto-completion of properties - it makes life so much easier.

kuku command line interface

Similar to Helm, kuku lets you define its context variables from the CLI:

kuku render -s namespace=kuku .

-s namespace=kuku will be passed to the context argument in your template function. Run kuku -h to find out more.
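
Concretely, the -s pairs end up as plain dict entries in that context, so for the earlier service example the template function receives roughly this (illustrative):

# kuku render -s name=kuku-web,internalPort=80,externalPort=80 .
context = {"name": "kuku-web", "internalPort": 80, "externalPort": 80}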

A more realistic example

Defining services and a namespace is nice, but let’s see how it behaves with a more complex Postgres StatefulSet.
Consider the following directory:

.
├── templates
│   ├── configmap.py
│   ├── service.py
│   └── statefulset.py
├── values-production.yaml
└── values-staging.yaml

We have some value files and templates for a configmap, a service (like before) and a statefulset. The postgres statefulset template is similar to what we currently run in production at Gorgias.

Let’s have a look at values-production.yaml:

name: pg # global name of our statefulset/service/configmap/etc..

image: postgres:latest

# optional
nodeSelector:
  cloud.google.com/gke-nodepool: pg

replicas: 1

resources:
  requests:
    memory: 10Gi
  limits:
    memory: 12Gi

pvc:
- name: data
  class: ssd
  size: 500Gi
  mountPath: /var/lib/postgresql/data/

configmap:
- name: postgresql.conf
  value: |
    max_connections = 500

Above we’re declaring that we want to run one instance of the postgres:latest docker image on a specific k8s node pool, while requesting some memory and a persistent volume. We’re also using a config map to define our postgresql.conf so it’s easier to keep track of changes to it.

Keep the above values in mind and let’s now have a look at our statefulset.py template:

from kubernetes import client


def template(context):
    # volumes attached to our pod
    pod_spec_volumes = []

    # where those volumes are mounted in our container
    pod_spec_volume_mounts = []

    # persistent volume claims templates
    stateful_set_spec_volume_claim_templates = []

    # only create the claims if PVC values are provided
    for pvc in context.get("pvc", []):
        stateful_set_spec_volume_claim_templates.append(
            client.V1PersistentVolumeClaim(
                metadata=client.V1ObjectMeta(
                    name=pvc["name"],
                    annotations={
                        "volume.beta.kubernetes.io/storage-class": pvc["class"]
                    },
                ),
                spec=client.V1PersistentVolumeClaimSpec(
                    access_modes=["ReadWriteOnce"],
                    resources=client.V1ResourceRequirements(
                        requests={"storage": pvc["size"]}
                    ),
                ),
            )
        )
        pod_spec_volume_mounts.append(
            client.V1VolumeMount(name=pvc["name"], mount_path=pvc["mountPath"])
        )

    # same for configmap
    if "configmap" in context:
        volume_name = "{}-config".format(context["name"])
        pod_spec_volumes.append(
            client.V1Volume(
                name=volume_name,
                config_map=client.V1ConfigMapVolumeSource(name=context["name"]),
            )
        )
        pod_spec_volume_mounts.append(
            client.V1VolumeMount(name=volume_name, mount_path="/etc/postgresql/")
        )

    # command to check if postgres is live (used for probes below)
    pg_isready_exec = client.V1ExecAction(command=["gosu", "postgres", "pg_isready"])

    return client.V1StatefulSet(
        api_version="apps/v1beta1",
        kind="StatefulSet",
        metadata=client.V1ObjectMeta(name=context["name"]),
        spec=client.V1StatefulSetSpec(
            service_name=context["name"],
            replicas=context["replicas"],
            selector={"app": context["name"]},
            template=client.V1PodTemplateSpec(
                metadata=client.V1ObjectMeta(labels={"name": context["name"]}),
                spec=client.V1PodSpec(
                    containers=[
                        client.V1Container(
                            name="postgres",
                            image=context["image"],
                            lifecycle=client.V1Lifecycle(
                                pre_stop=client.V1Handler(
                                    _exec=client.V1ExecAction(
                                        command=[
                                            "sh",
                                            "-c",
                                            'gosu postgres pg_ctl -D "$PGDATA" -m fast -w stop',
                                        ]
                                    )
                                )
                            ),
                            liveness_probe=client.V1Probe(
                                _exec=pg_isready_exec,
                                initial_delay_seconds=120,
                                timeout_seconds=5,
                                failure_threshold=6,
                            ),
                            readiness_probe=client.V1Probe(
                                _exec=pg_isready_exec,
                                initial_delay_seconds=10,
                                timeout_seconds=5,
                                period_seconds=30,
                                failure_threshold=999,
                            ),
                            ports=[client.V1ContainerPort(container_port=5432)],
                            volume_mounts=pod_spec_volume_mounts,
                            resources=client.V1ResourceRequirements(
                                **context["resources"]
                            )
                            if "resources" in context
                            else None,
                        )
                    ],
                    volumes=pod_spec_volumes,
                    node_selector=context.get("nodeSelector"),
                ),
            ),
            volume_claim_templates=stateful_set_spec_volume_claim_templates,
        ),
    )

If you squint a bit you might see that the final return is similar to a yaml file, but it uses python objects instead, with all of Python’s ifs, for-loops, standard library, etc.

What I find better than a regular helm YAML template is that you can validate some of the input arguments of those python objects (e.g. client.V1Container) even before the template is sent to your k8s server - not to mention autocomplete.
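
For example, the client models complain about missing required fields at construction time - a small sketch (the exact error message depends on the client version):

from kubernetes import client

try:
    client.V1Container(image="postgres:latest")  # forgot the required `name`
except ValueError as error:
    print(error)  # e.g. "Invalid value for `name`, must not be `None`"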

Finally, this is how it all comes together:

kuku render -f values-production.yaml templates/ | kubectl apply -f -

The above renders all your templates and generates the yaml manifests that are then applied using kubectl apply.

You can find the source here: https://github.com/xarg/kuku/
And a couple of examples: https://github.com/xarg/kuku/tree/master/examples

Conclusion

We started using kuku at Gorgias in Sept. 2018, and we’ve since migrated all our Helm charts to kuku templates. It allows us to customize our k8s deployment code to our needs much more easily than before, with minimal deployment surprises.

Hope you find kuku as useful as we did. Happy hacking!

Managing DNS zones with GCP deployment manager

I looked for an example of how to set up a managed DNS zone on GCP (using their deployment manager) with a global IP address, to no avail.
After some inspiration from this issue: https://github.com/GoogleCloudPlatform/deploymentmanager-samples/issues/62
I managed to create the following config:

config.yaml

imports:
- path: templates/ip.py
- path: templates/dns.py

resources:
- name: global-ip
  type: templates/ip.py
  properties:
    name: global-ip 
    description: Global IP address used below in the DNS 

- name: example-com
  type: templates/dns.py
  properties:
    description: Example domain
    dnsName: example.com.
    resourceRecordSets:
    - name: "*.gorgias.io."
      type: TXT
      ttl: 3600
      rrdatas:
      - '"v=spf1 include:spf.gorgias.io -all"'
    - name: "*.gorgias.io."
      type: MX
      ttl: 3600
      rrdatas:
      - "10 mx1.gorgias.io."
      - "10 mx2.gorgias.io."
    - name: "*.gorgias.io."
      type: A
      ttl: 3600
      rrdatas:
      - "$(ref.global-ip.address)"

templates/ip.py

def GenerateConfig(context):
    return {'resources': [{
        'type': 'compute.v1.globalAddress',
        'name': context.env['name'],
        'properties': {
            'description': context.properties['description'],
        }
    }]}

templates/dns.py

def GenerateConfig(context):
    resources = [{
        'type': 'dns.v1.managedZone',
        'name': context.env['name'],
        'properties': {
            'description': context.properties['description'],
            'dnsName': context.properties['dnsName'],
        }
    }]

    for i, record in enumerate(context.properties['resourceRecordSets']):
        # Used to create the records
        resources.append({
            'name': 'dns-{}-create'.format(i),
            'action': 'gcp-types/dns-v1:dns.changes.create',
            'metadata': {
                'runtimePolicy': [
                    'CREATE',
                ],
            },
            'properties': {
                'managedZone': '$(ref.{}.name)'.format(context.env['name']),
                'additions': [
                    record,
                ],
            },
        })
        # Used on deployment teardown to delete the records 
        resources.append({
            'name': 'dns-{}-delete'.format(i),
            'action': 'gcp-types/dns-v1:dns.changes.create',
            'metadata': {
                'runtimePolicy': [
                    'DELETE',
                ],
            },
            'properties': {
                'managedZone': '$(ref.{}.name)'.format(context.env['name']),
                'deletions': [
                    record,
                ],
            },
        })

    return {'resources': resources}

Pod sandbox changed, it will be killed and re-created.

It all started at 6PM on a Friday night when we were in the middle of our new office warming party - as any admin can tell you, that’s the best time to have a major outage. Our postgres main was down and all the alarms went crazy. We run everything on k8s, so we started looking at the kubectl get events output and found this cryptic event: Pod sandbox changed, it will be killed and re-created.
Googling it suggested that the docker daemon inside the node went down and restarted, and because the “sandbox changed” all the pods inside it were restarted too, including our beloved pg main. We confirmed that docker crashed by looking at journalctl inside that k8s node (btw, we run on GKE using Container-Optimized OS).

Why did docker crash? Well, after some more grepping we found this nice little script: health-monitor.sh (it has been changed recently). What it does is kill docker after waiting 60s for it to respond. Its echo "Docker daemon failed!" is what we saw in the journalctl logs, and that’s how we knew it was this script that killed docker.

Why did docker not respond? As it turns out, our postgres was doing its daily base backup at 6PM, and because the backup itself was not throttled it consumed lots of CPU and I/O - presumably making docker unresponsive.

To fix it we throttled our backup script to use less I/O, which seems to have solved the problem. Here’s how we did it: https://github.com/wal-e/wal-e#controlling-the-i-o-of-a-base-backup

Hope it helps others out there. Happy hacking!

Highly available Celery with RabbitMQ and Kubernetes

Kubernetes, RabbitMQ and Celery provide a very natural way to create a reliable python worker cluster. This post is based on my experience running Celery in production at Gorgias over the past 3 years. The scope of this post is mostly the dev-ops setup and a few small gotchas that could prove useful for people trying to accomplish the same type of deployment. At the end we’ll shut down a machine to see if our cluster is indeed as reliable as I claim.

Before diving too deep I recommend refreshing your knowledge on the tools we are going to use. In particular I expect:

  • You have some familiarity with Kubernetes and in particular: pods, services, deployments, stateful sets and persistent volumes.
  • You used or know a bit about RabbitMQ. Some basics about clustering would be nice, but not required. You can read their docs here: https://www.rabbitmq.com/clustering.html
  • You used or know about Celery: how it schedules its tasks and executes them.

Pieces falling into place: Why Kubernetes, RabbitMQ and Celery?

  • Kubernetes is a very reliable container orchestration system (it runs your docker images). It is used by a vast number of companies in production environments. It’s proven tech and provides a good base for making failure-tolerant and scalable applications such as an async worker cluster, which is what we’re trying to do.
  • RabbitMQ is a popular open source broker that has a history of being resilient to failure, can be configured to be highly-available and can protect your environment from data-loss in case of a hardware failure.
  • Celery is probably the most popular python async worker at this moment. It’s feature rich, stable and actively maintained. Celery (or any other worker) is by its nature distributed and relies on the message broker (RabbitMQ in our case) for state synchronisation. It’s also what we use at Gorgias to run asynchronous tasks.

Given their properties, I hope that when we put all of the above components together you will have a robust worker cluster that is easy to scale and hard to break.

Kubernetes (k8s) and Helm setup

First we’ll need to run k8s either via minikube on your local machine or using some k8s provider such as Google Kubernetes Engine (GKE). For this tutorial I’m going to use GKE, but feel free to use any kubernetes environment.

Assuming you have gcloud set up, let’s create a new 3-node Kubernetes cluster. It will take a few minutes, so feel free to grab some refreshment:

gcloud container clusters create ha-celery --num-nodes=3 --zone us-east1-c

Make sure you delete your cluster (so you don’t get charged) after you’re done like so:

gcloud container clusters delete ha-celery --zone us-east1-c

Test that you have access to it:

kubectl cluster-info

Great. Now let’s install Helm. Helm is a package manager that will allow us to
create a template for our RabbitMQ and Celery deployment. We can use it to run a development, staging or production environment without having to maintain separate configs for each one. It’s also easier to use than running kubectl commands when dealing with multiple k8s primitives at a time.

To install helm in your kubernetes cluster (make sure the Helm CLI is installed on your machine first):

helm init

At this point you should be all set with Kubernetes and Helm. We are now ready to deploy our RabbitMQ cluster which Celery will use later on.

RabbitMQ (RMQ) docker image

In order to run our RabbitMQ (RMQ) cluster on k8s, first we’ll have to build a Docker image for it. Here’s a sample Dockerfile:

FROM rabbitmq:3.6.12

RUN rabbitmq-plugins enable --offline rabbitmq_management

ENV RABBITMQ_ERLANG_COOKIE changeThis

# Add files.
COPY ./cmd.sh /
COPY ./rabbitmq.config /etc/rabbitmq/rabbitmq.config

# Define default command.
CMD ["/cmd.sh"]

# default ports + management plugin ports
EXPOSE 4369 5671 5672 25672 15671 15672

Above we have a custom RabbitMQ Dockerfile that inherits the official RabbitMQ image and adds a few extra features.
It enables the rabbitmq_management plugin, which I highly recommend if you want to understand what’s going on in production. It copies our rabbitmq.config and cmd.sh, which we’ll see later. And finally it exposes the default RMQ ports.

Let’s have a look at rabbitmq.config:

[
    { rabbit, [
            { loopback_users, [ ] },
            { tcp_listeners, [ 5672 ] },
            { ssl_listeners, [ ] },
            { hipe_compile, false },
            { cluster_partition_handling, pause_minority}
    ] },
    { rabbitmq_management, [ { listener, [
            { port, 15672 },
            { ssl, false }
    ] } ] }
].

The first part about loopback_users and listeners is pretty straightforward. The hipe_compile setting is false here, but can be set to true if you use HiPE (the high-performance Erlang compiler).

Now, what I believe to be an extremely important setting is cluster_partition_handling, whose default value is ignore. It’s extremely important to set it to pause_minority instead in order to avoid split-world and data-loss situations. Here’s why:

Imagine you have a 3-node RMQ cluster: rmq0, rmq1 and rmq2.
All is running smoothly and then suddenly a network partition separates rmq0 from the other 2 nodes. Note that all the nodes are still running; they just can’t communicate with each other.
Now, if you have cluster_partition_handling set to the default ignore, all the clients connected to rmq0 will still be able to read from it and, most importantly, write to it! Here’s a more concrete example:

Suppose we have 2 Celery workers (w0 and w1) that get tasks from the celery queue (the default one for celery).
w0 is connected to rmq0 and consumes the celery queue, and w1 is connected to rmq1 and does the same.

In the case of a network partition with the RMQ defaults, w0 will keep consuming celery as if nothing happened; the problem is that w1 does the same on rmq1. In this situation the same task can be consumed twice! And when the network partition is fixed, which of the rmq nodes in your cluster holds the truth?
This is called a split-world or split-brain situation, and it can literally cause your brain to split trying to untangle the mess. It happens more often than you might think, even on very reliable hardware and networks. In this situation you’re better off re-creating the queue from scratch, with some nasty data loss in the process.

Basically, cluster_partition_handling set to ignore says: in the case of a network partition, the RMQ nodes will just ignore the failure and continue running as if nothing happened, accepting regular consumer operations. Setting it to pause_minority instead causes the nodes in the minority (in our case rmq0) to pause and stop serving clients until the partition heals. Once the cluster is back, the paused node syncs with the other nodes and gets back on track. IMHO this is what the default should be, because it avoids the split-world situation, which is what I believe most people really want.

Ok and finally the cmd.sh:

#!/usr/bin/env bash

ulimit -n 65536

chown -R rabbitmq:rabbitmq /var/lib/rabbitmq

# This is needed for statefulset service name resolution - needed for short name resolution
if [ -n "$RABBITMQ_SERVICE_DOMAIN" ]; then
    echo "search $RABBITMQ_SERVICE_DOMAIN" >> /etc/resolv.conf
fi

is_clustered="/var/lib/rabbitmq/is_clustered"

host=`hostname`

join_cluster () {
    if [ -e $is_clustered ]; then
        echo "Already clustered with $CLUSTER_WITH"
    else
        # Don't cluster with self or if already clustered
        if ! [[ $CLUSTER_WITH =~ $host ]]; then
            rabbitmq-server -detached
            rabbitmqctl stop_app
            rabbitmqctl join_cluster rabbit@$CLUSTER_WITH
            rabbitmqctl start_app

            # mark that this node is clustered
            mkdir $is_clustered
            # stop it here because we start it again later in attached mode
            rabbitmqctl stop
            sleep 5
        fi
    fi
}

create_vhost() {
    rabbitmq-server -detached
    until rabbitmqctl node_health_check; do echo "Waiting to start..." && sleep 1; done;

    USER_EXISTS=`rabbitmqctl list_users | { grep $RABBITMQ_USERNAME || true; }`

    # create user only if it doesn't exist
    if [ -z "$USER_EXISTS" ]; then
        rabbitmqctl add_user $RABBITMQ_USERNAME $RABBITMQ_PASSWORD
        rabbitmqctl add_vhost $RABBITMQ_VHOST
        rabbitmqctl set_permissions -p $RABBITMQ_VHOST $RABBITMQ_USERNAME ".*" ".*" ".*"
        rabbitmqctl set_policy -p $RABBITMQ_VHOST ha-all "" '{"ha-mode":"all","ha-sync-mode":"automatic"}'
    fi
    # stop it here because we start it again later in attached mode
    rabbitmqctl stop
    sleep 5
}

if [ -n "$CLUSTERED" ] && [ -n "$CLUSTER_WITH" ]; then
    join_cluster
fi

if [ -n "$RABBITMQ_USERNAME" -a -n "$RABBITMQ_PASSWORD" -a -n "$RABBITMQ_VHOST" ]; then
    create_vhost
fi

rabbitmq-server $RABBITMQ_SERVER_PARAMS

The above script will attempt to join a cluster if the $CLUSTERED and $CLUSTER_WITH vars are set, and then attempt to create a RMQ virtual host and a user. The line below sets the HA policy for the vhost, which in our case is ha-mode: all, meaning that all queues will be replicated across all nodes:

rabbitmqctl set_policy -p $RABBITMQ_VHOST ha-all "" '{"ha-mode":"all","ha-sync-mode":"automatic"}'

Also note the $RABBITMQ_SERVICE_DOMAIN variable. That one is needed for our nodes to connect to each other. Its value is the service domain of the k8s service we’re going to use.

Now that we understand our docker image better we can build it and upload it to our gcr.io docker registry.

git clone git@github.com:xarg/rabbitmq-statefulset.git
cd rabbitmq-statefulset
docker build -t gcr.io/your-project/rabbitmq .
gcloud docker -- push gcr.io/your-project/rabbitmq

We have our image uploaded to the container registry and now we’re ready to deploy it!

RabbitMQ Helm chart

To deploy our RabbitMQ cluster we’re going to use a kubernetes StatefulSet. The reason for this choice is right there in the StatefulSet description:

A StatefulSet [..] Manages the deployment and scaling of a set of Pods, and provides guarantees about the ordering and uniqueness of these Pods.

This is important when doing upgrades of the cluster for example, but also because the hostnames of the rabbitmq nodes need to be kept the same between restarts. See here: https://www.rabbitmq.com/clustering.html#issues-hostname

Let’s have a look at our helm chart files:

.
└── rabbitmq
    ├── Chart.yaml -- metadata
    ├── templates
    │   ├── _helpers.tpl -- helper functions
    │   ├── service.yaml -- template for the k8s service
    │   ├── ssd.yaml -- ssd storage (if IO is important)
    │   ├── standard.yaml -- standard storage (default).
    │   └── statefulset.yaml -- template for the statefulset
    └── values.yaml -- default values for our templates

A helm chart is basically just a set of templates rendered with the values from values.yaml.
We’ll have a look at two of the files.

service.yaml:

apiVersion: v1
kind: Service
metadata:
  name: rmq
spec:
  ports:
    ...
  clusterIP: None
  selector:
    app: rmq

This service gives our application and the RMQ cluster a way to connect to the individual nodes. Note the clusterIP: None - that’s because we don’t want k8s to load-balance our RMQ cluster; we’ll do that at the Celery application level.

statefulset.yaml:

apiVersion: apps/v1beta1
kind: StatefulSet
metadata:
  name: rmq
spec:
  serviceName: rmq
  replicas: {{ .Values.replicaCount }}

  volumeClaimTemplates:
  - metadata:
      name: rmq
      annotations:
        volume.beta.kubernetes.io/storage-class: "standard"
    spec:
      accessModes: ["ReadWriteOnce"]
      resources:
        requests:
          storage: 10Gi

  template:
    metadata:
      labels:
        app: rmq
    spec:
      # prevents rmq pods running on the same nodes                             
      affinity:                                                                 
        podAntiAffinity:                                                        
          preferredDuringSchedulingIgnoredDuringExecution:                      
          - weight: 100                                                         
            podAffinityTerm:                                                    
              labelSelector:                                                    
                matchExpressions:                                               
                - key: app                                                      
                  operator: In                                                  
                  values:                                                       
                  - rmq                                                         
              topologyKey: kubernetes.io/hostname   
      containers:
      - name: rmq
        image: "{{ .Values.image.repository }}:{{ .Values.image.tag }}"
        volumeMounts:
          - mountPath: /var/lib/rabbitmq
            name: rmq
        env:
          - name: CLUSTERED
            value: "true"
          - name: CLUSTER_WITH
            value: "rmq-0"
          - name: RABBITMQ_SERVICE_DOMAIN
            value: "rmq.default.svc.cluster.local"
{{ if eq .Values.environment "production" }}
        resources:
{{ toYaml .Values.resources | indent 10 }}
{{ end }}
{{ if .Values.nodeSelector }}
      nodeSelector:
{{ toYaml .Values.nodeSelector | indent 8 }}
{{ end }}

The StatefulSet will first claim a 10Gi persistent volume with the standard storage class for each of the nodes defined by replicaCount. Then it will start each rabbitmq pod and check that it’s alive and running.

Deploy the RabbitMQ cluster in Kubernetes

While still in the rabbitmq-statefulset directory we can deploy our chart to k8s using the helm command:

helm upgrade -i rmq --set environment=production --set image.repository=gcr.io/your-project/rabbitmq charts/rabbitmq/

If everything went well you should eventually see:

kubectl get pod
NAME      READY     STATUS    RESTARTS   AGE
rmq-0     1/1       Running   0          4m
rmq-1     1/1       Running   0          3m
rmq-2     1/1       Running   0          2m

You now have a working RMQ cluster with 3 running nodes. I recommend poking around a bit. Check their logs with kubectl logs rmq-0. Check the created service: kubectl get svc, etc..

Now that our cluster is created and running we can finally start working on the workers.

Celery application

Let’s create a simple celery application with workers that just wait 10 seconds. We’ll also create a script that schedules a lot of celery tasks.

tasks.py:

import time
from celery import Celery

BROKER_URL = [
    'amqp://my_user:my_pass@rmq-0.rmq.default.svc.cluster.local:5672/my_vhost',
    'amqp://my_user:my_pass@rmq-1.rmq.default.svc.cluster.local:5672/my_vhost',
    'amqp://my_user:my_pass@rmq-2.rmq.default.svc.cluster.local:5672/my_vhost',
]
app = Celery('tasks', broker=BROKER_URL)


@app.task
def count():
    for i in range(10):
        print(i)
        time.sleep(1)

As you can see, all this task does is count for 10 seconds and print each number.

scheduler.py:

from tasks import count

# just schedules 10k count tasks
for i in range(10000):
    count.delay()

The scheduler will be used to schedule lots of count tasks so we can observe their execution while killing workers, rabbitmq nodes, etc..

Now that we have our handler and scheduler we can put them in a simple Docker image so we can run them in k8s:

FROM python:3

RUN pip install celery 

COPY src/* /

CMD ["/usr/local/bin/celery", "-A", "tasks", "worker", "--loglevel=info"]

You can find the source for the handler, scheduler, Dockerfile and the Helm chart here: https://github.com/xarg/celery-counter

Now let’s build the image and push to the registry just like we did with the RMQ image:

git clone git@github.com:xarg/celery-counter.git
cd celery-counter
docker build -t gcr.io/your-project/celery-counter .                            
gcloud docker -- push gcr.io/your-project/celery-counter                        

Great, we’re getting closer to running our counter worker tasks on k8s.

Celery Helm chart

Now that we have our image uploaded we can deploy it once again using the celery-counter helm chart. While still in the celery-counter directory:

helm upgrade -i counter --set image.repository=gcr.io/your-project/celery-counter charts/counter

I’m not going to dig into this helm chart: it’s a simple k8s Deployment with 3 replicas that runs the celery worker command.
Make sure the workers are running using kubectl get pod, then look at their logs using kubectl logs <name of the pod>.

Schedule our tasks

Hopefully you have your workers running by now, but they are not doing anything yet. We need to use our scheduler to make them execute the count tasks:

kubectl create -f scheduler.yaml

Once the scheduler is started you should start seeing some activity in your worker logs:

[INFO/ForkPoolWorker-1] Task tasks.count[cf168fdb-c199-481b-81cc-7ac30c93786e] succeeded in 10.015281924999726s: None
[WARNING/ForkPoolWorker-1] 0
[WARNING/ForkPoolWorker-1] 1
[WARNING/ForkPoolWorker-1] 2
[WARNING/ForkPoolWorker-1] 3
[WARNING/ForkPoolWorker-1] 4
[WARNING/ForkPoolWorker-1] 5
[WARNING/ForkPoolWorker-1] 6
[WARNING/ForkPoolWorker-1] 7
[WARNING/ForkPoolWorker-1] 8
[WARNING/ForkPoolWorker-1] 9

Nice work! You have your worker cluster running and executing tasks!

Redundancy

Now that we have our Celery counter application running we can go ahead and remove a k8s node to see what happens.

First let’s see on which nodes our pods are running:

kubectl get pods -o wide
NAME                               READY     STATUS     NODE
counter-counter-3473528255-24z8m   1/1       Running    gke-ha-celery-default-pool-5939d2ec-khq0
counter-counter-3473528255-6n3q8   1/1       Running    gke-ha-celery-default-pool-5939d2ec-dc85
counter-counter-3473528255-g823p   1/1       Running    gke-ha-celery-default-pool-5939d2ec-dc85
rmq-0                              1/1       Running    gke-ha-celery-default-pool-5939d2ec-dc85
rmq-1                              1/1       Running    gke-ha-celery-default-pool-5939d2ec-khq0
rmq-2                              1/1       Running    gke-ha-celery-default-pool-5939d2ec-43nq

Observe on which node rmq-0 is running. In the above example it’s gke-ha-celery-default-pool-5939d2ec-dc85 - this is the node we’re going to remove from the pool to cause a little havoc.

By default celery connects to the first RMQ node in the list (see BROKER_URL in tasks.py); if the connection to that broker fails, it moves on to the next one, and so on.
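
If you want to make that failover behaviour explicit, Celery exposes a setting for it - a small sketch (assuming Celery 4.x; round-robin is the behaviour described above):

from tasks import app

# cycle through the brokers in BROKER_URL in order when the connection drops;
# "shuffle" picks the next broker at random instead
app.conf.broker_failover_strategy = "round-robin"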

Before killing a k8s node to see what happens, let’s observe our cluster using these commands (run each one in its own terminal):

# same as above - but choose a pod that is NOT running on a node that you plan to kill
# the reason for this is to observe how the workers fail over to a different RMQ node.
kubectl logs -f counter-counter-3473528255-24z8m

# to see how rmq-0 dies
kubectl logs -f rmq-0

# to see how rmq-1 takes over the traffic from the workers
kubectl logs -f rmq-1

# to see how pods stop and then start again
watch "kubectl get pods -o wide"

# to see how the node gets unscheduled
watch "kubectl get nodes"

Next we’ll mark the k8s node that runs rmq-0 as unschedulable and then drain (kill) all the pods on it. Choose the node that runs rmq-0 - you can find it with kubectl get pods -o wide:

# marks the node unschedulable (the one that runs rmq-0)
kubectl cordon gke-ha-celery-default-pool-5939d2ec-dc85

# kills all pods that run on it
kubectl drain --force --ignore-daemonsets gke-ha-celery-default-pool-5939d2ec-dc85

Now keep your eyes on your monitoring commands. You should see rmq-0 dying, and possibly 1 or 2 celery counters as well, since they might be running on the same node as rmq-0.
If everything went according to plan you should see a log in your worker like:

consumer: Connection to broker lost. Trying to re-establish the connection...   
...
Cannot connect to amqp://my_user:**@rmq-0.rmq.default.svc.cluster.local:5672/my_vhost: [Errno -2] Name or service not known.
...
Connected to amqp://my_user:**@rmq-1.rmq.default.svc.cluster.local:5672/my_vhost

And then, just as before, the worker continues to execute tasks after the period of failure.

I also recommend looking at the rmq-1 logs to see how the clients start connecting to it, and then how it accepts rmq-0 back into the cluster once it comes up again.

Conclusion

What we did so far:

  • Created a new k8s cluster
  • Built and pushed the RabbitMQ and Celery images to the Google Container Registry
  • Deployed the Helm charts for the RabbitMQ and Celery clusters
  • Removed a k8s node and observed the behavior of our workers and RMQ nodes

Of course this is just one way your cluster can fail. There are many other things that can happen. I recommend looking at https://github.com/bloomberg/powerfulseal or https://github.com/asobti/kube-monkey to get an idea of other possible types of failure.

If I had to give one piece of advice based on this article:

Expect your workers to die at any moment and always code with that in mind.

IMHO this is the hardest part of all.
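
To make that a bit more concrete, here is a sketch (not part of the counter example above, assuming Celery 4.x) of the kind of task options that help when a worker disappears mid-task; the send_email task is hypothetical:

from celery import Celery

app = Celery("tasks", broker="amqp://my_user:my_pass@rmq-0.rmq.default.svc.cluster.local:5672/my_vhost")

@app.task(
    acks_late=True,              # ack only after the task finishes, so a killed
                                 # worker leaves the message to be redelivered
    autoretry_for=(Exception,),  # retry on any error...
    retry_backoff=True,          # ...with exponential backoff
    max_retries=3,
)
def send_email(message_id):
    # must be idempotent: after a crash the message may be redelivered and run again
    ...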

At Gorgias we send tons of emails, chats and Facebook messages, and we also make HTTP requests to user-defined endpoints before sending those messages - requests which can fail with a timeout or an error. Here are just a few of the questions that arise:

  • Should the emails be sent if other parts of the process have passed or not? If not how should we notify the customer?
  • What happens if the HTTP service we’re trying to reach times out? How many times should we retry before giving up? What happens then?
  • What if the worker is killed in the middle of the transaction with the mail server? Was the email sent or not? Should we retry? Should we notify the customer?
  • If the mail server is down do we have a retry mechanism and when should the retry switch to a different server?

The above are but a few of the questions that come to mind when thinking about our application; in reality there are many more, and the answers are not always simple. The code required for failure handling makes the application much more verbose and harder to debug and maintain. We try to have fewer and simpler features precisely because so many things can go wrong.

Even though application-level failure handling is very hard, it’s still a million times better when you know you can rely on Kubernetes and RabbitMQ to stay up and keep running your application code even if your VMs or physical machines go down. It’s a lot easier to build resilient and scalable applications than it was before Kubernetes, in my opinion, and I hope this post illustrates just that.

Here are the repos that have been used in this post: