Consolidating long-running, lightweight tasks for improved resource utilization

By: Yingbo Wang, Kevin Yang

Airflow is a platform to programmatically author, schedule, and monitor data pipelines. A typical Airflow cluster supports thousands of workflows, called DAGs (directed acyclic graphs), and there could be tens of thousands of concurrently running tasks at peak hours. Back in 2018, Airbnb’s Airflow cluster had several thousand DAGs and more than 30 thousand tasks running at the same time. This amount of workload would often result in Airflow’s database being overloaded. It also made the cluster quite expensive since it required a lot of resources to support those concurrent tasks.

In order to make the system more stable, and to reduce the cost of the cluster, we looked to optimize the Airflow system. We soon found that the long-running lightweight (LRLW) tasks waste a lot of resources, so we proposed a Smart Sensor to consolidate them and address the waste.

When we investigated the Airflow performance issues, we found that a few kinds of tasks shared the same LRLW patterns. They are the sensor tasks, the subDAGs, and the SparkSubmitOperator.

Sensors, or sensor tasks, are a special kind of operator that keeps running until a certain criterion is met. The criterion can be a file landing in HDFS or S3, a partition appearing in Hive, the success of another external task, or even a specific time of day.

Figure 1. The lifespan of a sensor task

When a sensor task is running, it calls its “poke” function periodically, usually every 3 minutes, to check the criterion. The sensor is marked ‘success’ if its “poke” function returns true, or ‘fail’ if the sensor times out. The execution of a “poke” is very fast, mostly less than 100ms, so most of the time sensors are idle, waiting for the next “poke” time to come. The lifespan of a sensor task runs from the time checking starts to the time the condition is met, which can range from a few minutes to several days.
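The poke loop described above can be sketched in plain Python. This is a minimal illustration of the pattern, not the real Airflow sensor code; the function and parameter names are hypothetical, and the clock is injectable only so the loop is easy to test:

```python
import time

def run_sensor(poke, poke_interval=180, timeout=7 * 24 * 3600,
               sleep=time.sleep, clock=time.monotonic):
    """Simplified sketch of a sensor's execute loop (not the real Airflow code)."""
    started = clock()
    while clock() - started < timeout:
        if poke():              # fast check, typically under 100 ms
            return "success"
        sleep(poke_interval)    # the worker process sits idle between pokes
    return "fail"               # criterion never met: sensor timed out
```

The key observation is that nearly all wall-clock time is spent in `sleep`, which is why a dedicated worker process per sensor is so wasteful.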

SubDAGs are another example of long-running lightweight tasks. They are used to encapsulate a set of tasks in a DAG and make a complicated DAG’s structure cleaner and more readable. A DAG run is created for the subDAG in the pre_execute function, and the subDAG task then pokes the DAG run’s status in the execute function.

The SparkSubmitOperator is also an example of a long-running lightweight task. The Spark client in Airflow submits the job and polls until completion. All these tasks, after some initialization work, fall into a lightweight and, at times, a long-running status.

From the previous examples, we can see that these tasks fall into the same “long-running, lightweight” pattern, characterized by the following:

  • The resource utilization is very low. Worker processes for these tasks remain idle 99% of the time.
  • These tasks often account for a very large portion of the concurrently running tasks in a large-scale cluster. At Airbnb, more than 70% of running tasks are sensors. At peak hours, they took up more than 20K worker slots.
  • There are a lot of duplicate sensor tasks. More than 40% of sensor jobs are duplicates because many downstream DAGs usually wait for the same partitions from just a few important upstream DAGs.

We proposed the Smart Sensor to consolidate these LRLW tasks. Though originally created to consolidate long-running sensor tasks, it was later expanded to consolidate all LRLW tasks. We kept the name Smart Sensor for this service.

The main idea of the Smart Sensor service is to use centralized processes to execute long-running tasks in batches, instead of using one process for each task.

Figure 2. Sensors before and after enabling smart sensor

With the Smart Sensor service, a sensor task is executed in two steps:

  1. First, each task parses the DAG, gets the task object, runs the pre_execute function, and then registers itself to the Smart Sensor service. In the registration, it persists information required to poll external resources to the Airflow metaDB. After registration succeeds, the task exits and frees up the worker slots.
  2. Then, a few centralized processes (the Smart Sensor tasks from a built-in DAG) keep checking the database for the latest records of all registered tasks and execute the “poke” function for these tasks in batches. Normally, one Smart Sensor task is able to handle several hundred sensor tasks easily. The Smart Sensor can also combine duplicate sensor tasks into a single instance to save even more resources.
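The two steps above can be sketched as follows. This is a simplified illustration under stated assumptions, not the actual Smart Sensor implementation: a Python list stands in for the sensor records table in the Airflow metaDB, and the `register` and `smart_sensor_loop` names are hypothetical. Note how duplicate registrations share a single poke per loop:

```python
import json

registry = []  # stands in for the sensor records table in the Airflow metaDB

def register(operator_class, poke_context):
    """Step 1: persist what is needed to poke externally, then the worker exits."""
    registry.append({
        "op": operator_class,
        "poke_context": json.dumps(poke_context, sort_keys=True),
        "state": "sensing",
    })

def smart_sensor_loop(poke_fns):
    """Step 2: one centralized Smart Sensor task pokes all registered sensors in a batch."""
    results = {}  # duplicate sensors share one poke result per loop
    for rec in registry:
        if rec["state"] != "sensing":
            continue
        key = (rec["op"], rec["poke_context"])
        if key not in results:
            results[key] = poke_fns[rec["op"]](**json.loads(rec["poke_context"]))
        if results[key]:
            rec["state"] = "success"
```

In this sketch, two sensors registered with the same operator class and the same `poke_context` trigger only one call to the external system per loop, which is the source of the resource savings described above.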

The Smart Sensor deduplicates tasks and balances workloads by defining sensor task shards. The number of concurrently running sensors can be large, so multiple Smart Sensor tasks are needed to execute all of these jobs in a short period. How to assign sensor tasks to Smart Sensors was one of our key challenges when designing this system. We sought to balance the workload of all Smart Sensor tasks. At the same time, duplicate sensor tasks have to be assigned to the same Smart Sensor so that we can avoid multiple pokes for the same target.

Figure 3. Deduplicating tasks by shardcode

In the Smart Sensor service, the `poke_context` is the signature of a sensor job. It is a dictionary of arguments needed to execute the sensor’s poke function. Two sensors with the same operator class and the same `poke_context` run the same `poke` function and are considered duplicate tasks. By hashing the `poke_context` and making each Smart Sensor task handle the tasks whose hashcode falls in a specific range, we can assign duplicate sensors to the same Smart Sensor. Since hashcodes are long integers, we optimized by using the modulo of the hashcode, which can be indexed efficiently in the database. We refer to this key as the `shardcode`.

Figure 3 shows how the sharding works in the Smart Sensor service. Sensor1 and Sensor2 have the same `poke_context`, so they have the same `hashcode` and `shardcode`. At runtime, they will be picked up by the same Smart Sensor — e.g., `SmartSensor1`. All duplicate sensors are poked only once per poking loop.
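A shardcode computation along these lines could look like the sketch below. The hash function, modulus, and function name are illustrative assumptions, not the exact ones Airflow uses; serializing the `poke_context` with sorted keys makes the signature independent of argument order, so duplicates always collide:

```python
import hashlib
import json

SHARD_CODE_UPPER_LIMIT = 10000  # assumed modulus; in Airflow this is configurable

def shardcode(operator_class, poke_context):
    """Duplicate sensors (same class + same poke arguments) map to the same shard."""
    signature = operator_class + json.dumps(poke_context, sort_keys=True)
    hashcode = int(hashlib.md5(signature.encode()).hexdigest(), 16)
    return hashcode % SHARD_CODE_UPPER_LIMIT  # small integer, cheap to index in the DB
```

Each Smart Sensor task then claims a contiguous range of shardcodes, which both balances the load and keeps duplicates together.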

Smart Sensor is a general service for all sensor classes. The centralized Smart Sensor task is a general framework designed to support various classes: as long as a class has a poke function and the arguments for that poke function can be serialized, the Smart Sensor tasks can support it.
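Those two requirements can be expressed as a simple compatibility check. The helper below is a hypothetical sketch for illustration, not an Airflow API; it treats "serializable" as "survives a JSON round trip through the metaDB":

```python
import json

def is_smart_sensor_compatible(sensor_cls, poke_context):
    """A sensor qualifies if it exposes poke() and its poke arguments serialize cleanly."""
    if not callable(getattr(sensor_cls, "poke", None)):
        return False
    try:
        json.dumps(poke_context)  # must survive a round trip through the metaDB
        return True
    except TypeError:
        return False
```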

Logs are handled similarly to unconsolidated processes. Although task execution is consolidated into fewer processes, the Smart Sensor service supports the same ability to read or download logs from the Airflow UI. Users can read logs from the original sensor task’s URL.

Smart Sensor can be easily applied to an Airflow cluster. Enabling and disabling the Smart Sensor service is simple: we only need a system-level configuration change in the `smart_sensor` section of airflow.cfg. The change is transparent to individual users and there is no need to change existing DAGs. Also, rotating centralized Smart Sensor tasks will not cause any user’s sensor task to fail.
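For reference, the configuration change is a short block like the one below. The keys follow the `[smart_sensor]` section documented for Airflow 2.x; exact option names and defaults may vary by version, so the version’s own documentation should be consulted:

```ini
[smart_sensor]
use_smart_sensor = true
# modulus applied to the poke_context hashcode to produce the shardcode
shard_code_upper_limit = 10000
# number of centralized Smart Sensor tasks in the built-in DAG
shards = 5
# sensor classes consolidated by the service
sensors_enabled = NamedHivePartitionSensor
```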

Upon deploying the first version of Smart Sensor, Airbnb was able to reduce the number of peak-hour, concurrently running tasks by more than 60%. We also reduced the number of running sensor tasks by 80%. The process slots needed for sensors dropped from 20,000 to 80. The database load was also greatly reduced because there were far fewer running tasks.

Figure 4. Number of running tasks after Smart Sensor deployed

In Smart Sensor, the deduplication mechanism cut requests to the Hive metastore by about 40%, reducing both the absolute sensor traffic and the load on the underlying data warehouse.

Smart Sensor is a service that consolidates small, lightweight task traffic into bigger centralized tasks. It can reduce Airflow’s infrastructure cost and improve cluster stability. This is especially true for large clusters with a considerable number of sensor tasks. For Airbnb’s gigantic Airflow clusters, Smart Sensor cut costs significantly and greatly improved overall cluster stability.

The Smart Sensor service was released as one of the major new features in Apache Airflow 2.0, and it has since been used to improve resource utilization for more Airflow users. Because the Smart Sensor service introduced the idea of splitting a task’s lifespan into multiple processes and unlocked `async`-mode task execution, the open source community has started to invest in more generic `async` solutions, among which the deferrable (“Async”) operator aims to extend the async mode to more tasks.
