Monitoring ActionCable

A few weeks ago I was woken up by the following message: "Hey, devices are randomly disconnecting from WebSockets, can you come online and help?"

Usually, when a widespread problem like this occurs out of the blue, my first goal is to get the app mostly operational again and my second goal is to fix the cause of the problem. To do that, I go over each component that could be causing it, check whether it's functioning correctly, and then drill down on any components that aren't.

Step one, identify which components could be at fault.

We use Rails' ActionCable for WebSockets. I've done a deep dive into ActionCable in a previous article if you'd like to read more about it.

Besides the Ruby process itself, ActionCable depends on Redis (in my case) and on the database(s) that the app uses.

Step two, prioritize.

Since I knew that the devices were randomly disconnecting I assumed that either app containers were restarting for whatever reason, or that something was causing ActionCable's worker pool to lag which then caused missed heartbeats.

This means that the Ruby process is the most likely culprit, then the database, and finally Redis.

Step three, check each component.

I checked Grafana to see what was going on with the app containers and saw that CPU was elevated but not exceeding 80%, memory didn't go above 70%, and the auto-scaler was adding more containers as needed. Everything was in order given that a reconnect storm was going on.

Then I checked Elastic APM to see what was going on with /cable. But the metrics were phenomenal - sub-millisecond response times and 100% successful responses. Fantastic numbers for a reconnect storm.

Next I checked the database. CPU and memory were at average levels, there were no slow queries. Everything was fine.

Lastly, I checked Redis. To my surprise, here too, everything was fine. The CPU never exceeded 10% and memory was at 5%.

What was going on?

Step four, stabilize the app.

Since Redis and the database were fine, in a Hail Mary attempt I scaled out the number of app containers to stop the disconnects. Surprisingly, this worked... This pointed a spotlight at the application code and ActionCable. One of them had to be the faulty component since the scale-out fixed the issue - but how?

Step five, find the root cause.

At that point I realized that I had no clue what was going on with our WebSockets after they were established. Elastic APM only captures metrics about the HTTP part of the WebSocket connection; once the connection was established I had absolutely no insight into what was going on - how many clients there were, what the latency was like, how many messages were being sent...

The best way to figure that out was to introduce some kind of monitoring. But what should I monitor and how should I surface those metrics?

I'm not one to reinvent the wheel, so I checked whether a tool for monitoring ActionCable already existed. I found that Sentry's and AppSignal's APM tools monitor ActionCable's action execution times.

Action Execution Time

Action execution time measures how long it takes for a Channel to process a received message. This metric is important because it can show if you have a slow action that ties up the worker pool for too long, which lowers throughput and increases latency. It's a nice way to quickly see if everything is alright with ActionCable.

I quickly exposed that metric to Elastic APM using the following code.
# config/initializers/elastic_apm.rb

ActiveSupport::Notifications.subscribe "perform_action.action_cable" do |event|
  transaction = ElasticAPM.start_transaction "#{event.payload[:channel_class]}##{event.payload[:action]}"
  transaction.start(event.time * 1000.0)
  
  ElasticAPM.end_transaction(event.payload[:exception].blank?)
end
That worked!

PubSub latency

I soon noticed that some extra metrics, like the PubSub latency, would be helpful. Knowing the PubSub latency would help me identify problems with the PubSub component of ActionCable. If the latency - the time between a message being broadcast from the app and it being received by the ActionCable server - goes up, then the PubSub service, in my case Redis, is probably the cause of the problem.
Diagram that shows what PubSub latency measures
To measure that latency I have to know how much time passed between a message being broadcast and it being received by the ActionCable server. The easiest way to do that is to broadcast a message containing the current time, receive it in some channel, and calculate the difference between the time I received the message and the timestamp inside it.

To send a message with the current timestamp I can do the following and run it from a cron job every minute
measurement_payload = {
  sent_at: Time.now.to_f
}

ActionCable.server.broadcast("metrics", measurement_payload)
To receive the message and calculate the latency I can do this
stream_from("metrics", proc do |json|
  payload = ActiveSupport::JSON.decode(json)
  sent_at = payload[:sent_at]
  latency = Time.now.to_f - sent_at
  
  Rails.logger.debug "PubSub Latency: #{latency}"
end)
But where can I put this code and how do I expose the latency?

After some experimentation I decided to create a stream from every open channel in ActionCable. This sounds pretty bad, and I'd like to find a better way to do it, but it has some upsides and the downsides can be minimized. The upside is that no measurements are performed if there are no clients. The downside is that each client gets its own metrics subscription even though only one subscription per server is needed.

There are many ways to limit the subscription to one per server and I opted for the simplest one - a mutex. A mutex, a.k.a. a mutually exclusive lock, lets me ensure that only one thread runs a piece of code at a time. In Ruby, you'd usually use Mutex#synchronize like so:
# Create the mutex and a counter
mutex = Mutex.new
count = 0

# Defines a proc that, ten times, prints the value of the counter
# and increments it by 1
work = proc do
  10.times do
    mutex.synchronize do
      sleep 0.1
      puts "count: #{count}"
      count += 1
    end
  end
end

# Spawns 50 threads that all call the proc we just defined
# and then waits for all the threads to finish
50.times.map do
  Thread.new { work.call }
end.each(&:join)

# Prints the final count after all the threads are done
puts "Final count: #{count}"
If you now run this code from IRB and look at the output you'd see that the count always goes up by one and that the final count is 500
...
count: 487
count: 488
count: 489
count: 490
count: 491
count: 492
count: 493
count: 494
count: 495
count: 496
count: 497
count: 498
count: 499
Final count: 500
But if you remove the mutex, the first thing you'd notice is that the code runs much faster; the second is that the output is completely random, sometimes repeats values, and changes each time you run it
...
count: 487
count: 488
count: 489
count: 490
count: 491
count: 492
count: 493
count: 493
count: 495
count: 496
count: 495
count: 496
count: 494
Final count: 497
This is because, without the mutex, all the threads try to read and update the counter at the same time. Sometimes two threads read the value at the same time - like 493 in the output above - and a value gets skipped because both read 493 and updated it to 494.

With the mutex, one thread locks the mutex using the synchronize method, updates the counter ten times, and unlocks the mutex; then another thread locks it, and so on.

That's why the code takes longer to complete with the mutex: each thread runs after the other, so the total time is 50 threads × 10 iterations × 100 ms, which is 50 seconds. Without the mutex all the threads run at the same time, so the total time is 10 × 100 ms, or about 1 second.
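If you want to double-check those numbers, you can wrap the thread-spawning part of the snippet above in Benchmark.realtime from Ruby's standard library (this reuses the work proc defined earlier; exact timings will vary slightly):
require "benchmark"

# Measure how long it takes for all 50 threads to finish
elapsed = Benchmark.realtime do
  50.times.map { Thread.new { work.call } }.each(&:join)
end

puts "Elapsed: #{elapsed.round(1)}s" # ~50 s with the mutex, ~1 s without it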

To guarantee that just one subscription will measure the latency I can use Mutex#try_lock and Mutex#unlock instead of Mutex#synchronize. Unlike synchronize, which blocks and waits until the mutex is unlocked, try_lock doesn't block - it returns true if it acquired the lock and false if it didn't. With try_lock I can tell all threads to try to measure the latency; the thread that locks the mutex performs the measurement and all the others return.
MUTEX = Mutex.new

def run_unless_already_running(&block)
  lock_acquired = MUTEX.try_lock
  return unless lock_acquired

  block.call
ensure
  MUTEX.unlock if lock_acquired
end

def collect(payload)
  run_unless_already_running do
    sent_at = payload["sent_at"]
    latency = Time.now.to_f - sent_at

    Rails.logger.debug "PubSub Latency: #{latency}"
  end
end

stream_from("metrics", proc do |json|
  payload = ActiveSupport::JSON.decode(json)
  collect(payload)
end)
To make things prettier I put all code related to metrics collection into a module
module ActionCable
  module MetricsCollector
    MUTEX = Mutex.new
    STREAM_NAME = "internal.action_cable.metrics"
    COLLECTION_THRESHOLD = 60.seconds

    cattr_accessor :last_collected_at

    class << self
      def measure
        ActionCable.server.broadcast(stream_name, measurement_payload)
      end

      def stream_name = STREAM_NAME

      def measurement_payload
        {
          sent_at: Time.now.to_f
        }
      end

      def collect(payload)
        run_unless_already_running do
          next if measurement_already_collected?

          self.last_collected_at = Time.now
          collect_measurement(payload)
        end
      end

      def measurement_already_collected? = last_collected_at&.after?(collection_threshold.ago)

      def collection_threshold = COLLECTION_THRESHOLD

      private

      def run_unless_already_running(&block)
        lock_acquired = MUTEX.try_lock
        return unless lock_acquired

        block.call
      ensure
        MUTEX.unlock if lock_acquired
      end

      def collect_measurement(payload)
        pub_sub_latency = [0, Time.now.to_f - payload.fetch("sent_at", -Float::INFINITY)].max
        Rails.logger.debug "PubSub latency: #{pub_sub_latency}"
      end
    end
  end
end
Then I changed my cron job to run
bin/rails runner "ActionCable::MetricsCollector.measure"
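If you manage cron entries with the whenever gem, the schedule entry might look something like this (just a sketch, assuming whenever is already set up for the app):
# config/schedule.rb
every 1.minute do
  runner "ActionCable::MetricsCollector.measure"
end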
And added the following to app/channels/application_cable/channel.rb
module ApplicationCable
  class Channel < ActionCable::Channel::Base
    after_subscribe do
      stream_from(
        ::ActionCable::MetricsCollector.stream_name,
        proc do |json|
          payload = ActiveSupport::JSON.decode(json)
          ::ActionCable::MetricsCollector.collect(payload)
        rescue => e
          Rails.logger.error("Failed to collect metrics: #{e}")
        end
      )
    end
  end
end
How do I expose this metric to Elastic APM?

After some thought I decided I wouldn't. I could hack something together, but I'd be shoehorning this metric in, and the later metrics I flat out don't know how to expose in Elastic APM. So I decided not to bother with it and figure it out later.

Number of connections

Since I'm running an IoT platform I have a somewhat constant number of clients connected at all times. This means that by knowing the number of clients at any given moment I can easily see if there is a problem or if I need to scale out.

Luckily, with the module that I've added to measure the PubSub latency it's easy to expose the number of clients. All I have to do is change the collect_measurement method like so
def collect_measurement(payload)
  pub_sub_latency = [0, Time.now.to_f - payload.fetch("sent_at", -Float::INFINITY)].max
  Rails.logger.debug "PubSub latency: #{pub_sub_latency}"

  connection_count = ActionCable.server.connections.length
  Rails.logger.debug "Connection count: #{connection_count}"
end
Without that module I'd probably set up a periodic task like so
module ApplicationCable
  class Channel < ActionCable::Channel::Base
    periodically every: 1.minute do
      connection_count = ActionCable.server.connections.length
      Rails.logger.debug "Connection count: #{connection_count}"
    end
  end
end
The trick to getting the count is to call ActionCable.server.connections.length from within the server process. If you call it from the Rails console or from a rails runner script it will always return zero, as the connections only exist in the server process's memory.
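To illustrate (a hypothetical session, but this is what you'd see in practice):
# In the console or `bin/rails runner` this runs in a separate Ruby process
# from the WebSocket server, so there are no connections to count
ActionCable.server.connections.length # => 0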

Client-Server Latency

Since I'm running my own patch of ActionCable that adds PONG messages in addition to the standard heartbeat PING message, I can easily expose the time it takes for a message to travel from the server over the Internet to a client.
Diagram that shows what Client-Server latency measures
This metric isn't useful for regular web apps as it's not actionable - you can't fix your clients' Internet. But it's useful for apps, like mine, where you have control over the clients and want them connected 24/7.
# Stock Rails 8 doesn't have this notification
ActiveSupport::Notifications.subscribe "client_latency.action_cable" do |_name, _start, _finish, _id, payload|
  Rails.logger.debug("Client latency: #{payload[:value]}")
end
Clients will start disconnecting if this latency exceeds six seconds. If the median latency exceeds a few seconds, that indicates a widespread Internet issue (which is more common in the US than I thought it would be).

Now that I have all these metrics I have to expose them somehow.

I decided to talk to a coworker from the DevOps team about how they collect and expose metrics from our machines and he told me that they use Prometheus and that he could configure it to collect metrics from ActionCable.

But what is Prometheus?

Prometheus is an open-source monitoring and alerting system - it's a service that collects metrics, allows you to query them, and to configure alerts when they exceed a threshold. It's a bit different from the SaaS monitoring solutions because it scrapes metrics from your application instead of having your application push metrics to it.
Diagram that explains the difference between Prometheus and other APM services
Usually, you'd expose a /metrics endpoint that renders all the metrics the application has collected in a format that Prometheus can parse.

I wasn't sure how to do that at first, but I remembered reading an article on Basecamp's dev blog that mentioned a gem called Yabeda.

Yabeda is a framework for monitoring Ruby applications. Using it, you can report measurements to various monitoring services like Datadog, NewRelic, Honeybadger, AWS CloudWatch, Prometheus, and many others.

In a nutshell, I can report my measurements to Yabeda and then it handles exposing those metrics to various different monitoring services - like Prometheus - for me.

According to the documentation, before I can use Yabeda I have to configure it, which means defining all the metrics I'm going to expose. Yabeda supports a handful of metric types - counters, gauges, and histograms - so let me explain those first.

A counter is a number that can only ever go up by some amount. E.g. the number of ActiveJob jobs processed is a counter because the metric is a number that can only ever increase. But the number of connections to ActionCable isn't a counter since the number goes up when more clients connect and goes down when clients disconnect.

To represent numbers that can go both up and down as a metric we need a gauge. Gauges represent a metric that can be any number at any time.

Finally, a histogram represents a distribution of values. It tells you how many measurements fall into each of a set of ranges called buckets. Histograms are usually represented by a set of counters and gauges - one counter per bucket, one counter for the total number of measurements, and one gauge for the sum of all measurements. This sounds a bit complicated, but it lets you calculate the average, median, and other distribution metrics quickly and without having to store every single measurement somewhere. Usually you'd use a histogram to represent time measurements like response times.
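For example, a histogram whose count is 2,000 measurements and whose sum is 100 seconds tells you the average is 100 / 2000 = 50 ms, even though none of the individual measurements were kept.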

Defining these metrics is fairly straightforward
# config/initializers/yabeda.rb

Yabeda.configure do
  group :action_cable do
    histogram :client_server_latency do
      comment "The time it takes for a message to travel via the Internet to a client"
      unit :seconds
      buckets [0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10]
    end

    histogram :pub_sub_latency do
      comment "The time it takes for a message to travel through the PubSub service"
      unit :seconds
      buckets [0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10]
    end

    histogram :action_execution_duration do
      comment "The time it takes to perform an invoked action"
      unit :seconds
      buckets [0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10]
    end

    gauge :connection_count, comment: "Number of open WebSocket connections"
  end
end
To report a metric I have to build a chain of method calls consisting of the group name of the metric I want to report, then the metric's name, and finally call measure on that. The measure method accepts either one argument and a block, or two arguments. The first argument is a Hash of tags to add to the measurement (I'll explain those later). The second is the measurement itself. Without the second argument, Yabeda will execute the block, measure its execution time, and report that.
Yabeda.action_cable.client_server_latency.measure({}, 0.123)
# or
Yabeda.action_cable.client_server_latency.measure({}) do
  sleep 0.123 # do some work
end
I can now update all ActiveSupport Notification subscriptions to report to Yabeda, and I can move them all to Yabeda's initializer.
# config/initializers/yabeda.rb

Yabeda.configure do
  group :action_cable do
    histogram :client_server_latency do
      comment "The time it takes for a message to travel via the Internet to a client"
      unit :seconds
      buckets [0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10]
    end

    histogram :pub_sub_latency do
      comment "The time it takes for a message to travel through the PubSub service"
      unit :seconds
      buckets [0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10]
    end

    histogram :action_execution_duration do
      comment "The time it takes to perform an invoked action"
      unit :seconds
      buckets [0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10]
    end

    gauge :connection_count, comment: "Number of open WebSocket connections"
  end
end

ActiveSupport::Notifications.subscribe "perform_action.action_cable" do |event|
  name = "#{event.payload[:channel_class]}##{event.payload[:action]}"
  duration = event.duration / 1000.0 # convert ms to seconds
  
  Yabeda
    .action_cable
    .action_execution_duration
    .measure({ action: name }, duration) # I've used a tag here
end

ActiveSupport::Notifications.subscribe "client_latency.action_cable" do |_name, _start, _finish, _id, payload|
  client_server_latency = payload[:value]
  Yabeda
    .action_cable
    .client_server_latency
    .measure({}, client_server_latency)
end
You might have noticed that I used a tag when reporting the action execution time. You can think of tags as groups of measurements. Instead of collecting the execution time of all actions lumped together, with tags I collect the execution time of each individual action. That way I can easily pinpoint which action has the highest duration, while I can still get the overall average by averaging across all the tagged measurements.

Without tags I'd have to define dozens of histograms - one for each action. But with tags I can define one histogram and let Yabeda group my measurements for me.
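For example (the action names here are just illustrative, and I'm assuming the histogram has an action tag declared), both of these measurements land in the same histogram, just under different tag values:
Yabeda.action_cable.action_execution_duration.measure({ action: "ChatChannel#post_message" }, 0.012)
Yabeda.action_cable.action_execution_duration.measure({ action: "ChatChannel#typing" }, 0.002)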

I have to update the metrics collector too
module ActionCable
  module MetricsCollector
    MUTEX = Mutex.new
    STREAM_NAME = "internal.action_cable.metrics"
    COLLECTION_THRESHOLD = 60.seconds

    cattr_accessor :last_collected_at

    class << self
      def measure
        ActionCable.server.broadcast(stream_name, measurement_payload)
      end

      def stream_name = STREAM_NAME

      def measurement_payload
        {
          sent_at: Time.now.to_f
        }
      end

      def collect(payload)
        run_unless_already_running do
          next if measurement_already_collected?

          self.last_collected_at = Time.now
          collect_measurement(payload)
        end
      end

      def measurement_already_collected? = last_collected_at&.after?(collection_threshold.ago)

      def collection_threshold = COLLECTION_THRESHOLD

      private

      def run_unless_already_running(&block)
        lock_acquired = MUTEX.try_lock
        return unless lock_acquired

        block.call
      ensure
        MUTEX.unlock if lock_acquired
      end

      def collect_measurement(payload)
        pub_sub_latency = [0, Time.now.to_f - payload.fetch("sent_at", -Float::INFINITY)].max
        Yabeda.action_cable.pub_sub_latency.measure({}, pub_sub_latency)

        connection_count = ActionCable.server.connections.length
        Yabeda.action_cable.connection_count.set({}, connection_count)
      end
    end
  end
end
Now that my metrics are in Yabeda, I have to somehow expose them to Prometheus. For that I've added the yabeda-prometheus-mmap gem and mounted its route
# config/routes.rb

Rails.application.routes.draw do
  mount Yabeda::Prometheus::Exporter, at: "/metrics"
  # ...
end
There is also yabeda-prometheus; if you are running a single-worker server the two are effectively the same. But if you are running Puma with multiple workers, or SolidQueue with multiple workers, then the mmap version of the gem will be much faster.

Additionally, when running multiple workers, you probably also want to add "aggregation: :sum" to all gauges. Without that option, if a worker fails and gets restarted, you'll get very wonky metrics because the gauges get counted twice, with the old worker's gauge stuck at whatever value it had before the worker crashed.
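For example, the connection count gauge from the configuration above would then be declared something like this:
gauge :connection_count,
  comment: "Number of open WebSocket connections",
  aggregation: :sum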

Now, with the route mounted I can connect to my ActionCable server and then open /metrics in a separate browser window to see the output
...
# HELP action_cable_action_execution_duration_seconds Multiprocess metric
# TYPE action_cable_action_execution_duration_seconds histogram
action_cable_action_execution_duration_seconds_bucket{environment="development",action="ChatChannel#post_message",le="+Inf"} 2189
action_cable_action_execution_duration_seconds_bucket{environment="development",action="ChatChannel#post_message",le="0.005"} 101
action_cable_action_execution_duration_seconds_bucket{environment="development",action="ChatChannel#post_message",le="0.01"} 559
action_cable_action_execution_duration_seconds_bucket{environment="development",action="ChatChannel#post_message",le="0.025"} 2044
action_cable_action_execution_duration_seconds_bucket{environment="development",action="ChatChannel#post_message",le="0.05"} 2174
action_cable_action_execution_duration_seconds_bucket{environment="development",action="ChatChannel#post_message",le="0.1"} 2181
action_cable_action_execution_duration_seconds_bucket{environment="development",action="ChatChannel#post_message",le="0.25"} 2189
action_cable_action_execution_duration_seconds_bucket{environment="development",action="ChatChannel#post_message",le="0.5"} 2189
action_cable_action_execution_duration_seconds_bucket{environment="development",action="ChatChannel#post_message",le="1"} 2189
action_cable_action_execution_duration_seconds_bucket{environment="development",action="ChatChannel#post_message",le="10"} 2189
action_cable_action_execution_duration_seconds_bucket{environment="development",action="ChatChannel#post_message",le="2.5"} 2189
action_cable_action_execution_duration_seconds_bucket{environment="development",action="ChatChannel#post_message",le="5"} 2189
action_cable_action_execution_duration_seconds_count{environment="development",action="ChatChannel#post_message"} 2189
action_cable_action_execution_duration_seconds_sum{environment="development",action="ChatChannel#post_message"} 34.04095268249512
...
That's my action execution duration. It works!

My coworker had already configured Prometheus to scrape that endpoint every minute or so and the metrics appeared in Prometheus within a few minutes.

Analyzing the metrics

Now that I had all the metrics somewhere I could query them, I had to make them easy to analyze. I decided to go with Grafana as we already had an instance set up to monitor our infrastructure.

Grafana allowed me to take this set of metrics and turn them into graphs that are easy to parse. First, I plotted the number of connections. The graph clearly shows how many clients were connected at each minute.
The number of connections to ActionCable plotted over time
Next, I plotted the action execution latency. This is a bit more involved just because I wanted to plot the average, median, 95th percentile and 99th percentile.

The average is important for scaling as it's used in Little's law to determine the number of workers necessary to handle the load. The median is a better representation of what most clients experience - it's the latency that half of all clients stay under. The 95th and 99th percentiles represent the worst cases - the latency that 95% and 99% of all clients stay under.
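For example, if clients trigger, say, 100 actions per second and the average execution time is 50 ms, Little's law (L = λ × W) says roughly 100 × 0.05 = 5 workers are busy at any given moment, so the worker pool needs to be comfortably larger than that.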
The action execution latency of all actions plotted over time
In addition to plotting the overall latency across all actions, I also plotted the latency per action as a heat-map. Here each rectangle represents one minute; red rectangles represent high latency and blue ones low latency. If I see a bright red action here I know it's causing a problem.
A heatmap that breaks down the action execution latency by action over time
I've plotted the client-server latency and PubSub latency the same way as the overall action execution duration.
Client-Server latency plotted over time
PubSub latency plotted over time
With all these metrics in Grafana I could now focus on finding and fixing the root cause. But the incident had passed and I didn't have any metrics from it, so I'd have to wait for another one.

Step six, fix the root cause.

The app was now stable so I tried a few things that I guessed could be the cause while I waited to see if this would occur again.

Then, after two weeks, the incident happened again!

I quickly opened Grafana and saw that the PubSub latency was extremely high.
PubSub latency during the incident
(the 95th and 99th percentile are capped at 10s because that's the largest bucket I've configured)

This meant that Redis was the culprit. For some reason it was taking ages to deliver messages. As a quick fix, I opened the AWS console and scaled up our Redis instance. Within a few minutes the incident cleared up.

I spent the next week investigating ElastiCache Redis and confirmed that it was the root cause of both incidents. I'll go into more detail on this in a subsequent post, but the gist of it is that AWS is a piece of crap.

Looking back

I would never have found and fixed this without the monitoring that I set up.
In the following month I exposed more metrics, like the broadcast and transmission latency, and the number of confirmed and rejected subscriptions.
ActiveSupport::Notifications.subscribe "broadcast.action_cable" do |_name, start, finish, _id, _payload|
  Yabeda.action_cable.broadcast_duration.measure({}, finish.to_f - start.to_f)
end

ActiveSupport::Notifications.subscribe "transmit.action_cable" do |_name, start, finish, _id, _payload|
  Yabeda.action_cable.transmission_duration.measure({}, finish.to_f - start.to_f)
end

ActiveSupport::Notifications.subscribe "transmit_subscription_confirmation.action_cable" do |_name, start, finish, _id, _payload|
  Yabeda.action_cable.transmission_duration.measure({}, finish.to_f - start.to_f)
  Yabeda.action_cable.confirmed_subscription_count.increment({}, by: 1)
end

ActiveSupport::Notifications.subscribe "transmit_subscription_rejection.action_cable" do |_name, start, finish, _id, _payload|
  Yabeda.action_cable.transmission_duration.measure({}, finish.to_f - start.to_f)
  Yabeda.action_cable.rejected_subscription_count.increment({}, by: 1)
end
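These subscriptions assume matching metric definitions in the Yabeda initializer, roughly along these lines (the comments and bucket boundaries are my own guesses):
# config/initializers/yabeda.rb

Yabeda.configure do
  group :action_cable do
    histogram :broadcast_duration do
      comment "The time it takes to broadcast a message to the PubSub service"
      unit :seconds
      buckets [0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10]
    end

    histogram :transmission_duration do
      comment "The time it takes to transmit a message to a client"
      unit :seconds
      buckets [0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10]
    end

    counter :confirmed_subscription_count, comment: "Number of confirmed subscriptions"
    counter :rejected_subscription_count, comment: "Number of rejected subscriptions"
  end
end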
I don't really need those metrics right now, but through this incident I learned that it's better to have a metric before you need it, so I exposed everything I could with ActiveSupport Notifications.

Additionally, I've added a few more gems - like yabeda-puma-plugin, yabeda-rails and yabeda-activejob - that expose various metrics to help me pinpoint problems in the future.