
Intelligent Autoscaling of Kafka Consumers with Workload Prediction

Introduction

In a Kafka-based application, producers generate messages for specific topics and send them to the Kafka brokers. The brokers perform the required replication and distribute the messages to the consumers of the respective topics. After receiving messages from the brokers, a consumer performs some tasks and notifies the brokers that the messages have been committed (consumed). Kafka tracks the offset of the last message a producer has written to a topic and the offset of the last message committed by each consumer group (stored in ZooKeeper in older versions, and in the internal __consumer_offsets topic in recent ones). When a burst of messages arrives at the brokers and a consumer cannot process them fast enough, the messages sit in the queues longer, degrading overall application performance. To handle the dynamic nature of the message production rate, Horizontal Pod Autoscaling (HPA) of the Kafka consumers is used to scale the number of consumers so that the production and consumption rates of a topic are matched while using a reasonable number of consumer replicas (minimizing resource costs). At the same time, HPA also needs to maintain low message-processing latency, a Key Performance Indicator (KPI) of Kafka consumers. In particular, we are calibrating the number of replicas against the following trade-offs:

  • Too few consumers result in long latency
  • Too many consumers result in wasted resources
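The quantity at the heart of this trade-off is the consumer lag: the gap between the latest produced offset and the last committed offset of each partition. A minimal sketch (the function names are our own, not a Kafka API):

```python
def partition_lag(log_end_offset: int, committed_offset: int) -> int:
    """Lag for one partition: messages produced but not yet committed."""
    return max(0, log_end_offset - committed_offset)

def total_lag(end_offsets: dict, committed_offsets: dict) -> int:
    """Total lag across all partitions of a topic; this is the quantity
    an autoscaler typically watches when sizing a consumer group."""
    return sum(partition_lag(end_offsets[p], committed_offsets.get(p, 0))
               for p in end_offsets)
```

A growing total lag means the consumer group is falling behind the producers; a lag near zero with many replicas means resources are being wasted.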
This article shows that the Native Kubernetes HPA algorithms (K8sHPA mechanism), based on either resource usages or KPI, result in modest savings and much larger lags (latency). Federator.ai from ProphetStor uses Machine Learning technologies to predict and analyze the Kafka workload, and to recommend the number of consumer replicas, considering the benefit and cost with the adjustment. Federaor.ai achieves much better performance (reduced latency) with much fewer resources (reduced number of consumers in Kafka), all without changing the K8sHPA mechanism or a line of code of Kafka. This makes implementing autoscaling of Kafka consumers simple and straightforward. 

Scaling with Kubernetes Native HPA and KEDA

A Kubernetes HPA controller determines the number of pods of a deployment, a replica set, or a stateful set. The controller measures the relevant metrics to determine the number of pods required to meet the criteria defined in the HPA’s configuration, which is implemented as an API resource containing information such as the desiredMetricValue. A native HPA controller supports the following types of metrics to determine how to scale:

  • Per-pod resource metrics – resource metrics, such as CPU usage, are represented as utilization or raw values depending on the choice of desiredMetricValue. The controller fetches the metrics from the relevant pods and computes their mean as the currentMetricValue. The desired number of pods, i.e., desiredReplicas, is then computed as desiredReplicas = ceil(currentReplicas × currentMetricValue / desiredMetricValue).
  • Per-pod custom metrics – custom metrics, such as network receive bytes, are treated as raw values. The controller applies the same procedure as with per-pod resource metrics to determine the desiredReplicas.
  • Per-container resource metrics – the resource metrics, such as CPU usage, of a specific container (instead of the entire pod) across all relevant pods are used to determine the currentMetricValue.
  • External metrics – an application-specific metric, such as latency or another KPI, is treated as a raw value. Optionally, in the autoscaling/v2beta2 API version, the controller can divide the external metric by the number of pods to obtain the currentMetricValue before applying the scaling algorithm.
  • Multiple metrics – multiple metrics are supported in the autoscaling/v2beta2 API version. The controller computes the desiredReplicas for each metric as above and uses the largest value to scale.
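The per-metric algorithm used by the native controller can be sketched as follows; the 10% tolerance band mirrors the controller's default horizontal-pod-autoscaler-tolerance of 0.1:

```python
import math

def desired_replicas(current_replicas: int,
                     current_metric_value: float,
                     desired_metric_value: float,
                     tolerance: float = 0.1) -> int:
    """Native HPA scaling: desiredReplicas =
    ceil(currentReplicas * currentMetricValue / desiredMetricValue)."""
    ratio = current_metric_value / desired_metric_value
    # Within the tolerance band, the controller leaves the replica count alone.
    if abs(ratio - 1.0) <= tolerance:
        return current_replicas
    return math.ceil(current_replicas * ratio)

# e.g. 4 pods averaging 80% CPU against a 60% target scale out to ceil(4 * 80/60) = 6
```

With multiple metrics, the controller simply runs this computation per metric and takes the maximum of the results.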
Another class of autoscalers is KEDA, the Kubernetes-based Event Driven Autoscaler. KEDA utilizes Kubernetes’ HPA component to support event-driven policies. One important distinction from the native HPA is KEDA’s ability to scale down to zero replicas when there are no events. This dynamic activation/deactivation of a deployment is handled by KEDA’s Agent. The number of replicas is updated based on triggers, for example, CPU utilization exceeding 50%, or the lagOffset of a Kafka consumer group for a topic reaching 5. Many application-specific and generic resource metrics have been defined here [1].
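As an illustration of the lag-based trigger, KEDA's Kafka scaler effectively targets a fixed amount of lag per replica and can deactivate the deployment when there is no lag. A sketch of that behavior (the function is our own, not KEDA's API; the lag threshold of 5 comes from the example above):

```python
import math

def keda_style_replicas(total_lag: int,
                        lag_threshold: int = 5,
                        max_replicas: int = 40) -> int:
    # Unlike the native HPA, KEDA can deactivate the deployment entirely.
    if total_lag == 0:
        return 0
    # Otherwise aim for roughly lag_threshold messages of lag per replica.
    return min(max_replicas, math.ceil(total_lag / lag_threshold))
```

Note that this is still reactive: replicas are added only after lag has already accumulated.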
 
To compare the performance of some of the HPA schemes, we set up an environment with the following configuration:
  • Kubernetes Cluster: 1 Master, 3 Workers
  • Brokers: 3
  • ZooKeepers: 3
  • Producers: 1, Topics: 1, Partitions: 40
  • Consumer Scaling Range: 1 to 40
  • Message Production Rate: various rates from 20K msg/min ~ 120K msg/min, average 85K msg/min
We first use native HPA scaling based on the average CPU utilization of the Kafka consumer replicas. We examine applications with three different CPU utilization characteristics, denoted Consumer 1, 2, and 3, representing different consumer groups under similar producer workloads.
  • In the Consumer 1 case, we set the desiredMetricValue to 60%. The consumer pods are not CPU intensive and can handle a fixed number of messages (for example, 100 messages per second) while using a small fraction of the allocated CPU (less than 60%). Since the mean CPU utilization never reaches the threshold (desiredMetricValue of 60%) even while only a small portion of the messages is being processed, the number of pods decreases while the consumer lag increases.
  • In the Consumer 2 case, we set the desiredMetricValue to 70%. The consumer pods are CPU intensive and can handle a fixed number of messages (for example, 100 messages per second) while using a larger portion of the allocated CPU (over 70%, reaching 90% or higher). Since the mean CPU utilization is above the desiredMetricValue, the number of pods adjusts as expected according to the mean CPU utilization.
  • In the Consumer 3 case, we set the desiredMetricValue to 80%. The consumer pods are CPU intensive and can handle a fixed number of messages (for example, 100 messages per second) while using a larger portion of the allocated CPU (over 70% but below 80%). Since the mean CPU utilization never reaches the threshold (desiredMetricValue of 80%) even while only a small portion of the messages is being processed, the number of pods decreases while the consumer lag increases.
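The shrink-while-lagging behavior of Consumers 1 and 3 follows directly from the scaling formula: if per-pod CPU utilization saturates below the target no matter how large the backlog grows, every evaluation cycle shrinks the deployment. A small simulation with hypothetical numbers (utilization stuck at 40% against a 60% target):

```python
import math

def next_replicas(replicas: int, cpu_util: float, target: float) -> int:
    """One native-HPA evaluation: ceil(replicas * utilization / target)."""
    return max(1, math.ceil(replicas * cpu_util / target))

replicas = 10
history = [replicas]
for _ in range(5):
    replicas = next_replicas(replicas, 40, 60)  # CPU never reaches the target
    history.append(replicas)
print(history)  # the replica count keeps shrinking even though lag is growing
```

CPU utilization tells the controller nothing about the backlog, so it keeps scaling in while lag accumulates.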
From the above cases, we observe that different Kafka consumers exhibit different scaling results with Kubernetes native HPA. To achieve reasonable results as in the Consumer 2 case, and to avoid the poor performance observed in the Consumer 1 and 3 cases, we need to set a proper desiredMetricValue in accordance with the characteristics of the Kafka consumers. The difficulty is how to set the desiredMetricValue, in particular in a production environment, where Kafka consumers may behave differently from the testing/staging environment due to the size and types of workloads and the available resources. Finding the right desiredMetricValue for a particular Kafka consumer requires many experiments.
 
In a different experiment, we utilize the Kafka lag offset as an external metric to adjust the number of consumer replicas. In the native HPA scheme with external metrics, we used three diffe