Abstract
Flink is regarded as a promising distributed data processing engine for unifying bounded and unbounded data. Unbalanced workloads across multiple workers (task managers or servers) in Flink cause congestion, which degrades the quality of service (QoS), whereas a balanced load distribution can effectively improve QoS. Moreover, existing works lag behind the current Flink version. To distribute workloads evenly across workers, a resource-oriented load balancing task scheduling (RoLBTS) mechanism for Flink is proposed. The capacities of CPU, memory, and bandwidth are taken into consideration. Based on the barrel principle, memory and bandwidth are selected to model the resource occupancy ratio of a physical node and that of a physical link, respectively. On the basis of the modeled resource occupancy ratios, load-balanced resource usage for data processing in Flink is formulated as a quadratic programming problem. Based on self-recursive calling, a RoLBTS algorithm for scheduling task-needed resources is presented. Through numerical simulation, the advantages of our work are demonstrated in terms of resource score, the number of possible scheduling solutions, and resource usage ratio.
Introduction
Our world is flooded by data. Huge amounts of data are generated continuously by smartphones, websites, sensors, and other sources, driven by the development of information and communications technologies (ICT) such as cloud computing, big data, the Internet of Things (IoT), and artificial intelligence (AI).
This trend will become even more pronounced over time. According to an IDC report, the data generated globally will reach 163 ZB in 2025, a tenfold increase over the amount of data created in 2016 [1]. Increasingly, data will need to be instantly available whenever and wherever anyone needs it. Because data is the raw material of AI model training, timely data processing yields trained models that keep up with changes. More than 25% of the data created in the world will be real-time in nature by 2025.
Data from the core, the edge, and endpoints can be classified into bounded data and unbounded data. Bounded data has a defined start and a defined end, like the relatively stable data in databases. Unbounded data has a start but no clear end, such as server signaling, real-time logs, and other continuously changing or growing data. Bounded data can be ingested completely before any computation and processed without requiring a particular order, i.e., batch processing. Unbounded data must be processed continuously in a specific order, i.e., stream processing, because it never terminates and is ingested as it is generated. Consequently, computing frameworks for bounded data and for unbounded data are fundamentally different.
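A minimal Python illustration of the two processing styles (a teaching sketch, not Flink code): the batch function needs the whole bounded dataset before it produces a single result, while the streaming function consumes records one at a time in arrival order and emits an updated result after each record, so it also works on a source that never ends.

```python
from itertools import count, islice
from typing import Iterable, Iterator


def batch_average(records: list[float]) -> float:
    """Bounded data: all records are ingested first, then one result is computed."""
    return sum(records) / len(records)


def streaming_average(records: Iterable[float]) -> Iterator[float]:
    """Unbounded data: records are consumed in order and a running result
    is emitted after every record, without waiting for an end."""
    n = 0
    total = 0.0
    for value in records:
        n += 1
        total += value
        yield total / n


if __name__ == "__main__":
    print(batch_average([3.0, 5.0, 7.0]))
    # count(1) is an endless source; islice takes only the first three outputs.
    print(list(islice(streaming_average(count(1)), 3)))
```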
To avoid duplicated construction and conflicting computing modes for unbounded and bounded data processing, integrating both into one computing framework is considered a feasible approach. Spark and Flink are two outstanding frameworks of this kind. Spark treats unbounded data as a special case of bounded data: a stream is a series of ordered bounded batches separated by a sufficiently small interval [2]. Flink treats bounded data as a special case of unbounded data: bounded data is unbounded data that has an end [3, 4]. However small, an interval is still an interval, so Flink is closer to the native, interval-free nature of unbounded data. This gives Flink lower latency than Spark, and hence an advantage in stream processing.
Flink is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams [5]. As shown in Fig. 1, Flink has a distributed master-worker architecture with at least one master (job manager) and one or more workers (task managers). The job manager is responsible for task scheduling, resource management, message coordination, and so on. A task manager executes specific tasks of a stream processing job, each in the form of a Java thread. An actor system provides the underlying communication among different machines. Unlike earlier frameworks that read and write intermediate data from the hard disk, Flink operates on data in memory rather than on the hard disk [6].

Fig. 1. Flink architecture.
Resources in a Flink cluster are limited. Load balancing in a distributed system can increase flexibility, reduce the chance that a given machine shuts down, and allow as many tasks as possible to be handled. How to schedule data processing tasks evenly over these limited resources is therefore a research hotspot. Many works on resource management in Flink have been done, as discussed in Section 2, but most of them are out of date because Flink is a rapidly evolving platform. For example, before version 1.14.0, bounded and unbounded data were integrated through two different application programming interfaces (APIs), i.e., the DataStream API for unbounded data and the DataSet API for bounded data; in version 1.14.0, the integration is realized through the DataStream API alone. Moreover, existing works usually adopt version 1.6.0, which is no longer supported by the Apache Flink community.
To close the gap between existing models and the latest Flink versions, and to distribute workloads evenly across workers, a resource-oriented load balancing task scheduling (RoLBTS) mechanism for Flink is proposed. Our contributions are as follows. To achieve global load fairness, load-balanced resource usage for data processing in Flink is formulated as a quadratic programming problem that considers multiple resources of both nodes and links, i.e., CPU, memory, and bandwidth, rather than a single kind of resource. A RoLBTS algorithm for scheduling task-needed resources is presented, in which the calculated possible scheduling solutions are filtered in turn by the resource constraints of the nodes and of the corresponding links. The performance of our work is analyzed and compared with existing work in terms of resource score, the number of possible scheduling solutions, and resource usage ratio, and the results show the advantages of our approach.
The rest of this paper is organized as follows. Section 2 analyzes related work on load balancing and Flink. Section 3 formulates resource-oriented task scheduling for Flink and poses the problem of evenly distributing tasks (or loads) over nodes/servers. Section 4 presents a RoLBTS algorithm for solving this problem. Section 5 presents the characteristics of our work through numerical simulation. Finally, Section 6 concludes the paper and discusses future research.
Several works have addressed load balancing and Flink.
Load balancing can offer fast, stable, and predictable performance. An algebraic evaluation of load-balancing performance was carried out for a distributed storage system in which each object was stored at d different nodes and each node stored the same number of objects. Load balancing performance improved as the level of added storage redundancy d increased. Implementing storage redundancy with XORs of r objects, rather than with object replicas, achieved the same improvement in load balancing performance while also reducing the storage overhead [7]. To provide high quality of service (QoS), a video load balancing-based solution (ViLBaS) for multi-hop wireless networks was put forward. By monitoring performance at the node level and off-loading traffic from heavily loaded nodes to less loaded neighboring nodes, ViLBaS effectively improved QoS and quality of experience (QoE) [8]. The performance of a multi-criteria load balancing scheme among gateways in fog-assisted IoT was investigated; based on a queueing model of the IoT system, a multi-criteria decision-making (MCDM) load balancing policy among multiple gateways was proposed [9]. To optimize network performance, a dedicated lowest path buffer (LPB) load balancing algorithm tailored to OPSquare, an optical switch-based data center network, was described. Under different controller update periods, traffic destination distributions, and network sizes, the LPB algorithm achieved lower packet loss than round-robin and the other compared schemes [10].
An open-source benchmark and a full analysis of two popular stream processing frameworks, Flink and Spark, were presented, in which four kinds of workloads (latency, sustainable throughput, single burst, and periodic burst) were investigated in terms of complexity and other performance aspects [11]. A flow-network-based auto-rescaling strategy for Flink was presented: a flow network with calculated edge capacities was built by a self-learning algorithm, and a maximum-flow algorithm located the flow bottleneck of a stateful data stream. Using the concepts of bulk and bucket, resources were rescaled and a data migration algorithm migrated the data accordingly [12, 13]. To cope with drastic workload fluctuations in big data stream computing, a load prediction-based elastic resource scheduling strategy for Flink was proposed [14]. The workload was predicted by an autoregressive integrated moving average (ARIMA) model. When the allocated resources did not suit the dynamic workload, whether insufficient or redundant, an online load migration algorithm re-scheduled resources and effectively migrated processing load among nodes.
Stream processing systems usually partition their workloads statically, which is one cause of imbalanced load distribution. A system named Marabunta was built to handle skewed data distributions [15]. By introducing vertical and horizontal elasticity for stateful operations, it increased parallelism to accelerate execution; the system was an improvement of Flink v1.6.0. Flink has also been used to execute a dynamic pyramid map tile generation algorithm for digital map visualization [16]. Combined with Redis, an in-memory database, a Flink-based architecture was proposed to handle nearest neighbor queries in two-dimensional space [17]. In this work, the resource occupancy ratio of the node and that of the link are selected as the screening indicators for candidate scheduling strategies, and the latest Flink version is considered.
In summary, the comparison of recent works and ours is shown in Table 1.
Table 1. The comparison of recent works and ours
Preliminaries
In Flink, a stream processing job goes through four phases, from the initial Java code to executable service threads. In sequence, these phases produce the stream graph, the job graph, the execution graph, and the physical graph, as shown in Fig. 2. The stream graph is the first graph, generated from the Java code written with the Flink DataStream API. The Java code is often encapsulated as a … The job graph is an optimized version of the stream graph in which operators connected one-to-one are merged into operator chains. This chaining reduces the coordination messages exchanged among different threads. The job graph is submitted to the job manager as a linked graph data structure; it is essentially a directed acyclic graph (DAG) in which each node (operator) has its own parallelism parameter. The execution graph is generated by the job manager from the job graph. It is the parallel version of the job graph, with explicit connections between the parallel instances of operators, and it is the core data structure of the job manager. The physical graph is the distribution of the job over the workers (physical servers) after the job manager schedules it based on the execution graph; it is a mapping between task slots and parallel instances.
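The first two transformations can be sketched in Python under simplifying assumptions: a linear pipeline, and a hypothetical chaining rule that merges neighboring operators only when the edge between them is one-to-one and their parallelism matches (Flink's real chaining logic checks further conditions). The names `Operator`, `chain_operators`, and `expand` are illustrative only and are not Flink APIs.

```python
from dataclasses import dataclass


@dataclass
class Operator:
    name: str
    parallelism: int


def chain_operators(pipeline: list[Operator], forward: list[bool]) -> list[Operator]:
    """Stream graph -> job graph (simplified).

    forward[i] is True when the edge pipeline[i] -> pipeline[i + 1] is one-to-one.
    Such neighbors with equal parallelism are merged into one chained vertex.
    """
    # Copy the first operator so the caller's objects are never mutated.
    chained = [Operator(pipeline[0].name, pipeline[0].parallelism)]
    for i, op in enumerate(pipeline[1:]):
        last = chained[-1]
        if forward[i] and op.parallelism == last.parallelism:
            last.name = f"{last.name}->{op.name}"
        else:
            chained.append(Operator(op.name, op.parallelism))
    return chained


def expand(job_graph: list[Operator]) -> list[str]:
    """Job graph -> execution graph: one parallel instance per unit of parallelism."""
    return [f"{op.name}[{k}]" for op in job_graph for k in range(op.parallelism)]


if __name__ == "__main__":
    ops = [Operator("A", 1), Operator("B", 1), Operator("C", 2), Operator("D", 2)]
    job_graph = chain_operators(ops, [True, False, True])
    print(expand(job_graph))
```

Keeping chaining and expansion as separate steps mirrors the order of the graphs described above: parallel instances are created only after chaining has reduced the number of vertices.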

Fig. 2. Program execution.
In Flink, one task slot can host only one parallel branch of the physical graph. One task slot is an independent thread of data processing, and one branch is one of the parallel instances of the execution graph. Under this model, unbalanced task scheduling leads to congestion, which in turn degrades QoS. For example, the job scheduling in Fig. 2 is unbalanced: all three task slots in Task Manager 1 are occupied, while only one task slot in Task Manager 2 is busy. When a new stream processing task with parallelism 2 arrives, scheduling it onto Task Manager 1 will result in congestion.
To achieve global fairness in load distribution and guarantee QoS, the parallel branches of a physical graph should be scheduled onto physical servers as evenly as possible.
We make the following assumptions. All the modules of Flink, including job managers and task managers, are deployed in the same cluster. Once a computing service is deployed, it runs indefinitely. The data rate of a computing service is constant, because a publish-subscribe system such as Kafka or Redis is usually placed in front of Flink to shave the peaks and fill the valleys of the data stream. Coordinating messages between two task slots on the same server consume no extra bandwidth, because communication among the threads of one physical server does not involve any physical link; in Fig. 2, the left three links between operator D and operator E belong to this case. Coordinating messages between task slots on two different physical servers consume physical link bandwidth, because transmitting them between the servers requires bandwidth; in Fig. 2, the last link between operator D and operator E belongs to this case.
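The bandwidth assumption can be expressed as a small accounting function. This is a sketch only: the subtask and server names below are hypothetical, and we assume a physical link is identified by its unordered pair of servers, so traffic in either direction is charged to the same link.

```python
from collections import defaultdict


def link_load(assignment: dict[str, str],
              edges: dict[tuple[str, str], float]) -> dict[tuple[str, str], float]:
    """Sum coordination traffic per physical link.

    assignment maps each parallel subtask to the server hosting it.
    edges maps (upstream, downstream) subtask pairs to their data rate.
    """
    load: dict[tuple[str, str], float] = defaultdict(float)
    for (src, dst), rate in edges.items():
        s, d = assignment[src], assignment[dst]
        if s == d:
            continue  # same server: thread-to-thread, no physical link used
        load[tuple(sorted((s, d)))] += rate  # cross-server: charge the link
    return dict(load)


if __name__ == "__main__":
    # Hypothetical placement loosely modeled on the D -> E edges of Fig. 2:
    # three pairs share a server, the last pair crosses servers.
    placement = {"D0": "tm1", "D1": "tm1", "D2": "tm1", "D3": "tm1",
                 "E0": "tm1", "E1": "tm1", "E2": "tm1", "E3": "tm2"}
    flows = {(f"D{k}", f"E{k}"): 2.0 for k in range(4)}
    print(link_load(placement, flows))
```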
The physical network of workers is denoted G_S = (N_S, E_S), where N_S is the set of physical servers and E_S is the set of physical links. The physical server with index i is denoted n_i, n_i ∈ N_S. The physical link connecting two physical nodes n_i and n_j is denoted e_ij, e_ij ∈ E_S. The available resources of the physical network G_S are denoted G_a. The execution graph required by a stream processing service is denoted G_r, and the maximum parallelism of G_r is denoted M. Thus, the M data stream branches of G_r should be distributed over G_S as evenly as possible.
The resource occupancy ratio of physical node n_i is denoted p_i.
Fig. 3 shows the resource interaction model of a stream processing service. Streams between different physical servers consume physical link bandwidth, while streams within the same physical server consume none. Operator computation is executed by the physical server's CPU, accompanied by reading and writing temporary data from and into the server's memory.

Fig. 3. Resource interaction model.
In practice, a physical server's CPU processes data much faster than data can be read from or written to its memory. Following the barrel principle, this is why the physical server's memory, rather than its CPU, is used in Equation 1 and Equation 3.
The number of slots of each task manager, i.e., taskmanager.numberOfTaskSlots, is configured in the file flink-conf.yaml. The recommended value of taskmanager.numberOfTaskSlots is the number of CPU cores. Integer multiples of the number of CPU cores are also acceptable because of CPU overclocking technology.
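As a sketch of the slot rule, assuming the overclocking multiplier n_ocl used later in the simulation is the integer factor applied to the core count, the slot count and the matching flink-conf.yaml line can be computed as follows. `task_slots` and `flink_conf_line` are hypothetical helpers, not part of Flink.

```python
import os


def task_slots(cpu_cores: int, n_ocl: int = 1) -> int:
    """Slots per task manager: core count times an integer multiplier n_ocl.

    n_ocl = 1 is the recommended setting; larger values assume overclocking.
    """
    if cpu_cores < 1 or n_ocl < 1:
        raise ValueError("cpu_cores and n_ocl must be positive integers")
    return cpu_cores * n_ocl


def flink_conf_line(cpu_cores: int, n_ocl: int = 1) -> str:
    """Render the corresponding flink-conf.yaml entry as plain text."""
    return f"taskmanager.numberOfTaskSlots: {task_slots(cpu_cores, n_ocl)}"


if __name__ == "__main__":
    # os.cpu_count() may return None, so fall back to a single core.
    print(flink_conf_line(os.cpu_count() or 1))
```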
A stream placed across different physical servers takes up part of the link bandwidth, whereas a stream placed on a single server does not.
For a task slot of a physical server, the stream data processing rate is denoted v_slot.
The global load distribution in a Flink cluster can be characterized by the resource occupancy ratios of physical servers and links. For global fairness in load distribution, the variance of the resource occupancy ratios of the physical servers and links is chosen as the objective. Minimizing this variance while guaranteeing QoS is formulated in Equation 4, subject to two constraints, Equation 5 and Equation 6.
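A minimal sketch of the objective, assuming the server and link occupancy ratios are pooled into one set and their population variance is used (the exact form of Equation 4 is not reproduced here); `load_imbalance` is a hypothetical helper.

```python
from statistics import pvariance


def load_imbalance(node_ratios: list[float], link_ratios: list[float]) -> float:
    """Population variance of all server and link occupancy ratios together.

    A smaller value means a more even global load distribution.
    """
    return pvariance(node_ratios + link_ratios)


if __name__ == "__main__":
    even = load_imbalance([0.3, 0.3], [0.3])
    skewed = load_imbalance([0.6, 0.0], [0.3])
    print(even, skewed)
```

Comparing an even and a skewed placement with the same total load shows why variance works as a fairness indicator: the mean is unchanged, only the spread differs.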
In Equation 5, a_i^k, defined in Equation 7, describes how the data stream branches are scheduled onto physical servers: a_i^k = 1 if the kth data stream branch is scheduled on physical server n_i, and a_i^k = 0 otherwise. G_r(k, k) is the data stream processing rate of the kth data stream branch of G_r, and G_a(i, i) is the available memory of physical server n_i.
In Equation 6, a_j^l, defined in Equation 8, describes how the branches communicating over a physical link are scheduled: a_j^l = 1 if the lth data stream branch is scheduled on physical server n_j, and a_j^l = 0 otherwise. G_r(k, l) is the data stream processing rate of the coordination connection between the kth branch and the lth branch, and G_a(i, j) is the available bandwidth of the physical link connecting physical servers n_i and n_j.
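Written out from the definitions above, the model can plausibly be expressed as follows. This is our reconstruction rather than the original Equations 4 to 8; in particular, the single-placement condition that each branch goes to exactly one server, and the use of the pooled variance of node and link ratios as the objective, are assumptions. The product a_i^k a_j^l is what makes the bandwidth constraint quadratic in the binary placement variables.

```latex
\begin{aligned}
\min_{\{a_i^k\}} \quad & \operatorname{Var}\left(\{p_i : n_i \in N_S\} \cup \{p_{ij} : e_{ij} \in E_S\}\right) \\
\text{s.t.} \quad & \sum_{k=1}^{M} a_i^k \, G_r(k,k) \le G_a(i,i), && \forall\, n_i \in N_S, \\
& \sum_{k=1}^{M} \sum_{l \ne k} a_i^k \, a_j^l \, G_r(k,l) \le G_a(i,j), && \forall\, e_{ij} \in E_S, \\
& a_i^k \in \{0, 1\}, \quad \sum_{n_i \in N_S} a_i^k = 1, && \forall\, k.
\end{aligned}
```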
To solve the formulated problem, a RoLBTS algorithm is proposed. Its pseudo-code is shown in Algorithm 1.
Given the available graph G_a, the execution graph G_r, and the data stream rate as inputs, all possible scheduling solutions are first calculated (Line 1) by a procedure that calls itself recursively. The solutions are all placements of the M parallelisms (or branches) onto the N physical servers, with repetition allowed, so there are N^M of them and the calculation has complexity O(N^M).
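The self-recursive enumeration can be sketched as follows: each call fixes the placement of one more branch, so the result contains every one of the N^M assignments. Representing a solution as a tuple whose kth element is the index of the server hosting branch k is our assumption, and `enumerate_solutions` is a hypothetical name.

```python
def enumerate_solutions(n_servers: int, m_branches: int) -> list[tuple[int, ...]]:
    """Return every placement of M branches onto N servers (N**M tuples).

    Element k of a tuple is the index of the server that hosts branch k.
    """
    if m_branches == 0:
        return [()]  # one empty placement: nothing left to assign
    shorter = enumerate_solutions(n_servers, m_branches - 1)
    # Extend each shorter placement with every possible server for the next branch.
    return [prefix + (server,) for prefix in shorter for server in range(n_servers)]


if __name__ == "__main__":
    print(enumerate_solutions(2, 2))
    print(len(enumerate_solutions(3, 4)))
```

Materializing the full list makes the N^M cost explicit; a generator would save memory but not time, since every candidate is still screened afterwards.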
For each possible solution, the data processing rate required by the service (v_r) is compared with the available memory of the physical servers used in that solution, and any solution that cannot provide sufficient memory is deleted (Lines 3-7). In a possible solution, each physical node n_i has its own p_i, obtained according to Equation 1; if, for any node in the solution, the demand reflected by p_i exceeds the available memory G_a(i, i) (the element in the ith row and ith column of G_a), the solution is deleted. Similarly, any solution that cannot provide sufficient bandwidth is deleted (Lines 8-12). In a possible solution, each physical link has its own p_ij, obtained according to Equation 2; if, for any link in the solution, the demand reflected by p_ij exceeds the available bandwidth G_a(i, j) (the element in the ith row and jth column of G_a), the solution is also deleted.
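The two screening passes can be sketched as below, assuming G_r and G_a are dense matrices indexed as in Equations 5 and 6 (diagonals hold branch rates and server memory, off-diagonals hold edge rates and link bandwidth) and that traffic in both directions between two servers shares one link. These direct feasibility checks stand in for the occupancy-ratio comparisons of Equations 1 and 2, whose exact form is not reproduced here; all function names are hypothetical.

```python
from itertools import combinations

Matrix = list[list[float]]
Solution = tuple[int, ...]


def memory_ok(sol: Solution, g_r: Matrix, g_a: Matrix) -> bool:
    """Every server can absorb the processing rate of the branches placed on it."""
    demand = [0.0] * len(g_a)
    for k, server in enumerate(sol):
        demand[server] += g_r[k][k]
    return all(demand[i] <= g_a[i][i] for i in range(len(g_a)))


def bandwidth_ok(sol: Solution, g_r: Matrix, g_a: Matrix) -> bool:
    """Every link can carry the traffic between branches on its two end servers."""
    for i, j in combinations(range(len(g_a)), 2):
        traffic = sum(
            g_r[k][l]
            for k in range(len(sol))
            for l in range(len(sol))
            # One branch on each end of link (i, j), in either direction.
            if k != l and {sol[k], sol[l]} == {i, j}
        )
        if traffic > g_a[i][j]:
            return False
    return True


def screen(solutions: list[Solution], g_r: Matrix, g_a: Matrix) -> list[Solution]:
    """Delete memory-infeasible solutions first, then bandwidth-infeasible ones."""
    survivors = [s for s in solutions if memory_ok(s, g_r, g_a)]
    return [s for s in survivors if bandwidth_ok(s, g_r, g_a)]


if __name__ == "__main__":
    g_a = [[10.0, 5.0], [5.0, 10.0]]  # memory 10 per server, link bandwidth 5
    g_r = [[6.0, 4.0], [0.0, 6.0]]    # two branches at rate 6, edge 0 -> 1 at rate 4
    print(screen([(0, 0), (0, 1), (1, 0), (1, 1)], g_r, g_a))
```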
Among the remaining solutions, the one with the minimum variance of the occupancy ratios of physical servers and links is the final solution. If several solutions share the same minimum variance, the first one is selected (Line 13).
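The final selection can be sketched in a few lines. The `ratios` mapping from each surviving solution to its node and link occupancy ratios is assumed to be precomputed, and population variance is assumed. Python's built-in `min` returns the first of several equal minima, which matches the first-solution tie-break.

```python
from statistics import pvariance
from typing import Optional

Solution = tuple[int, ...]


def select_solution(candidates: list[Solution],
                    ratios: dict[Solution, list[float]]) -> Optional[Solution]:
    """Pick the candidate whose occupancy ratios have the smallest variance.

    Ties are resolved in favor of the earliest candidate in the list.
    """
    if not candidates:
        return None  # every solution was screened out
    return min(candidates, key=lambda s: pvariance(ratios[s]))


if __name__ == "__main__":
    cands = [(0, 1), (1, 0), (0, 0)]
    occ = {(0, 1): [0.5, 0.5, 0.2], (1, 0): [0.5, 0.5, 0.2], (0, 0): [1.0, 0.0, 0.0]}
    print(select_solution(cands, occ))
```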
Finally, G_r and G_a are updated, the stack is emptied, and G_a and S_res are returned.
Flink can be deployed locally as a standalone cluster, but in a production environment it usually runs distributed over clusters of many machines. In a cluster, the physical node chosen as the master is also responsible for collecting information from all other physical nodes, including the resource occupancy ratios used in this work. Achieving an even workload distribution therefore incurs some overhead from the information exchanged between the master and the workers.
Numerical simulation
To evaluate our problem formulation and algorithm, a numerical simulation is conducted in this section. The common parameters of the simulation are listed in Table 2, and the parameters specific to each scenario are described in the corresponding discussion.
Table 2. Common parameters
There are 4 physical servers in this simulation: one serves as the master (job manager) and the other three as workers (task managers). The CPU bus speed and the memory read/write rate are taken from the instruction manual of the DELL R730 server; GT/s stands for gigatransfers per second. The number of cores of each physical server is obtained by executing the Linux command
The Flink Schedule Algorithm for Container Environment (FSACE) in [19] is selected as the baseline. The FSACE algorithm combines the CPU and memory of a physical server to evaluate that server's capability, denoted P, and neglects the physical link bandwidth.
The scores of the RoLBTS and FSACE algorithms are shown in Fig. 4. Because overclocking shortens CPU lifetime, n_ocl should not be set too high; the values of n_ocl used for the RoLBTS algorithm are 1, 2, and 4. The values of ρ used for the FSACE algorithm are 0, 0.5, and 1. The record length is the size of each record in the data stream. The recommended record length is 1kMbit [20], but record lengths in real industrial environments are usually greater than this recommended value. The values of record length are

Fig. 4. Score vs. record length.
As the record length increases, the scores of both the RoLBTS and FSACE algorithms grow, each at a fixed rate. Because the score does not account for the limits of the physical servers and links, it depends only on the input data rate: the greater the record length, the larger the score. Comparing the score of the RoLBTS algorithm with that of the FSACE algorithm is thus simply a comparison of the two indicators.
For a fixed record length, the scores are ordered as follows: FSACE (ρ = 0) < RoLBTS (n_ocl = 4) < RoLBTS (n_ocl = 2) < RoLBTS (n_ocl = 1) < FSACE (ρ = 0.5) < FSACE (ρ = 1). The score of the RoLBTS algorithm decreases as n_ocl increases, whereas the score of the FSACE algorithm increases with ρ. These trends follow from the definitions of the scores, i.e., Equation 3 and Equation 9.
Compared with those of the FSACE algorithm, the scores of the RoLBTS algorithm are more concentrated, reflecting the smaller workload variance of the RoLBTS algorithm. With the proposed RoLBTS algorithm, the resource occupancy ratios of all physical nodes and links in the cluster fluctuate less than with the FSACE algorithm.
The number of calculated possible scheduling solutions is analyzed in Figs. 5 and 6. Figure 5 is a mesh surface plot of the number of possible solutions, and Figure 6 is its contour projection. Both the number of physical servers and the maximum parallelism range from 0 to 10. Because the possible scheduling solutions are calculated by self-recursive calling, their number depends on the number of physical servers and on the maximum parallelism.

Fig. 5. 3D diagram.

Fig. 6. Contour map.
Figure 5 shows the 3D diagram of the number of possible scheduling solutions. This number grows dramatically as the number of physical servers or the maximum parallelism increases, because the self-recursive calling enumerates N^M solutions. With 10 physical servers and a maximum parallelism of 10, the number of possible scheduling solutions reaches 10 billion. Both the number of physical servers in a cluster and the maximum parallelism therefore need to be chosen carefully.
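The counts behind Figs. 5 and 6 are plain exponentiation, so they are easy to reproduce; `solution_count` is a hypothetical helper that evaluates N^M for a few grid points.

```python
def solution_count(n_servers: int, max_parallelism: int) -> int:
    """Number of candidate placements enumerated by the self-recursive search."""
    return n_servers ** max_parallelism


if __name__ == "__main__":
    for n, m in [(3, 3), (3, 10), (10, 3), (10, 10)]:
        print(f"N={n:2d}, M={m:2d}: {solution_count(n, m):,}")
```

For N = M = 10 this gives 10,000,000,000 candidates, the 10 billion mentioned above. Comparing (N, M) = (3, 10), which gives 59,049, with (10, 3), which gives 1,000, illustrates how much faster the count grows along the parallelism axis in that region of the grid.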
When the number of physical servers is less than 7 or the maximum parallelism is less than 8, the number of possible scheduling solutions is too small to be distinguished in Fig. 5. The contour map derived from Fig. 5 is shown in Fig. 6. Compared with the number of physical servers, the maximum parallelism causes a sharper increase in the number of possible scheduling solutions.
The usage ratios of physical servers and of physical links are shown in Figs. 7 and 8, respectively. To highlight the differences between the RoLBTS and FSACE algorithms, the workload must be neither too large nor too small. If the workload is too large, all the resources of the physical servers and links are occupied; if it is too small, only a few of them are occupied. Neither situation reveals the differences between the two algorithms.

Fig. 7. Physical server.

Fig. 8. Physical link.
The three physical servers are denoted physical server 1, physical server 2, and physical server 3, and the three physical links connecting them to the job manager are denoted physical link 1, physical link 2, and physical link 3, respectively. The parameter n_ocl in the RoLBTS algorithm is set to 4, and ρ in the FSACE algorithm is set to 0.5. The sequence of required data stream processing services is set as
In Fig. 7, the usage ratios of the three physical servers are 16.07%, 16.07%, and 17.86% for the RoLBTS algorithm and 30.36%, 7.14%, and 12.50% for the FSACE algorithm. Both algorithms have the same average physical server usage ratio of 16.67%. However, the sample standard deviation of the RoLBTS algorithm's physical server usage ratios (1.03) is much smaller than that of the FSACE algorithm (12.16). The RoLBTS algorithm therefore distributes the workload over physical servers more evenly than the FSACE algorithm. This is a result of solution screening, which preserves solutions with low resource occupancy ratios.
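The summary statistics quoted for Fig. 7 can be recomputed from the listed percentages with Python's standard `statistics` module. Doing so suggests that the reported dispersion values (1.03 and 12.16) correspond to the sample standard deviation of the usage ratios, in percentage points, rather than to their variance, which as a sample variance would be about 1.07 and 147.81.

```python
from statistics import pvariance, stdev, variance

# Physical server usage ratios (%) for servers 1-3, as listed for Fig. 7.
rolbts = [16.07, 16.07, 17.86]
fsace = [30.36, 7.14, 12.50]

for name, ratios in [("RoLBTS", rolbts), ("FSACE", fsace)]:
    mean = sum(ratios) / len(ratios)
    print(f"{name}: mean={mean:.2f} sample_std={stdev(ratios):.2f} "
          f"sample_var={variance(ratios):.2f} pop_var={pvariance(ratios):.2f}")
```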
In Fig. 8, the usage ratios of the three physical links are 30.36%, 26.79%, and 23.21% for the RoLBTS algorithm and all 0 for the FSACE algorithm. The average physical link usage ratio is 26.79% for the RoLBTS algorithm and 0 for the FSACE algorithm. The sample standard deviation of the RoLBTS algorithm's physical link usage ratios (3.57) is also greater than that of the FSACE algorithm (0), because the FSACE algorithm does not model coordinating messages on the physical links. For the FSACE algorithm, the first scheduling solution with the minimum variance is selected, and when the workload is not large enough, all the task slots are easily placed on the same physical server, i.e., physical server 1. As a result, the FSACE solutions are all mapped onto physical server 1, no physical link carries any workload or interaction information, and the usage ratio, average, and dispersion of the FSACE algorithm's link usage are all 0.
The numerical simulation shows that, compared with the FSACE algorithm, the RoLBTS algorithm achieves a more even global load distribution at the cost of an acceptable physical link usage ratio.
A self-recursive calling-based RoLBTS algorithm has been presented to solve the load imbalance problem in Flink applications. On the basis of the resource occupancy ratios of physical nodes and physical links, the load distribution in Flink was formulated as a quadratic programming model. Numerical simulation validated the superiority of our work.
In future research, we will focus on the dynamic scalability of Flink, container-based industrial Flink applications, and related topics.
Acknowledgment
This work was supported by the Science and Technology General Projects of Beijing Education Commission (Research on Optical and Wireless Converged Access Network Networking Technology in Smart Traffic, No. KM202111417010) and the China Computer Federation (CCF) Opening Project of Information System (Research on Massive Event Flow oriented Stream Computing Framework, No. CCFIS2019-01-01). We also thank Ms. Li.
