Abstract
The emergence of Big Data has had a profound impact on how data are analyzed. Open source distributed stream processing platforms have gained popularity for analyzing streaming Big Data because they use cluster resources to provide the low latency that streaming Big Data applications require. However, existing resource schedulers still lack the efficiency that Big Data analytical applications require. Recent works have considered streaming Big Data characteristics to improve the efficiency of scheduling in these platforms, but they have not taken into account the specific attributes of analytical applications. This study therefore presents Bframework, an efficient resource scheduling framework for streaming Big Data analysis applications running on cluster resources. Bframework proposes a query model using Directed Graphs (DGs) and introduces operator assignment and operator scheduling algorithms based on a novel partitioning algorithm. Bframework is highly adaptable to the fluctuation of streaming Big Data and to the availability of cluster resources. Experiments with benchmark and well-known real-world queries show that Bframework can reduce the latency of streaming Big Data analysis queries by up to about 65%.
Introduction
The emergence of data-intensive applications has rapidly increased the volume, variety and velocity of the generated data [14]. Efficient computing of the generated data during its lifecycle is beyond the capability of the current hardware and software technologies [11]. This represents a major challenge for many organizations and is known as the Big Data problem [13].
Streaming Big Data is related to the velocity dimension of Big Data and refers both to how fast data are generated and how fast they need to be analyzed [3]. Analysis of streaming Big Data is the last and most important stage of the streaming Big Data lifecycle in analytical applications [9]. Real-world examples of such applications include the analysis of network traffic, healthcare data, financial transactions or web clicks [11]. To develop such applications, high-level declarative query languages like SQL are usually preferred over coding them directly in algorithmic languages such as Java [12]. Therefore, most of the open-source distributed stream computing platforms (e.g., Storm,1
storm.apache.org.
https://twitter.github.io/heron/.
https://flink.apache.org.
Current open source Big Data stream computing platforms do not adequately address the problem of efficiently scheduling analytical queries. For example, the Storm platform uses a round-robin strategy that is unaware of task and resource characteristics (e.g., the task structure and the data stream traffic of a task) and of resource availability. Recent studies have proposed solutions that improve the efficiency of the platform schedulers by taking these aspects into account. However, most of them consider only one of these aspects or do not achieve satisfactory results.
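To make the limitation concrete, the following is a minimal sketch of a round-robin placement of the kind described above. It is a simplified illustration and not Storm's actual scheduler code; the function name, the task names and the slot names are hypothetical.

```python
from itertools import cycle


def round_robin_assign(tasks, slots):
    """Assign each task to the next slot in turn.

    The assignment looks only at the order of tasks and slots. It ignores
    task structure, stream traffic and the load or capacity of each slot,
    which is the limitation discussed in the text.
    """
    assignment = {}
    slot_cycle = cycle(slots)
    for task in tasks:
        assignment[task] = next(slot_cycle)
    return assignment


# Hypothetical operators of one query and two hypothetical worker slots.
tasks = ["spout-1", "filter-1", "join-1", "aggregate-1", "sink-1"]
slots = ["node-a:6700", "node-b:6700"]
plan = round_robin_assign(tasks, slots)
```

In this sketch, neighbouring operators such as `spout-1` and `filter-1` land on different nodes regardless of how much data flows between them.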
Our work is motivated by the observation that the existing resource-aware, task-aware or traffic-aware scheduling strategies are inefficient and not suitable for the analytic queries. Current resource-aware works tackle the inefficiency of schedulers to reduce the latency of queries by considering the capability of the resources, but they generally ignore the resource load. Task-aware works reduce the latency of queries by taking into account the structure of queries, but they generally ignore the query runtime resource demands. In addition, traffic-aware works take into account the fluctuation of the data stream, but they do not generally consider all streaming Big Data characteristics.
We developed a scheduler with lower tuple latency, suitable for streaming Big Data analysis, by considering the capacity and the load of the resources, the structure and the dynamic resource demand of queries, and the volume, velocity and variety of the input Big Data streams. Our approach is inspired by the idea that using the structure of a streaming Big Data analytic query to partition the query operators, and applying a distinct scheduling strategy to each partition, can reduce the average tuple latency of the query. More precisely, as the sample query in Fig. 1 shows, the weight of the outgoing edge of the operator
Key contributions of this study can be summarized as follows:
Formal definition of the Big Data stream and the Big Data stream analytic query.
Systematic determination of the initial number of threads, processes and computing nodes needed for a streaming Big Data analytic query execution.
The off-line initial assignment and scheduling of the streaming Big Data analytic query operators to the initial threads, processes and computing nodes using Bframework profiling and partitioning, operator assignment and Off-line Bscheduler algorithms.
The online re-scheduling of the streaming Big Data analytic query operators to the new computing nodes using Bframework’s Online Bscheduler.
Prototype implementation, simulation and performance evaluation of Bframework.
The remainder of this paper is organized as follows. Section 2 reviews the related works on Big Data and distributed stream processing schedulers. Section 3 presents the definitions of the Big Data stream and the analytic query scheduling model in a Big Data stream computing environment, together with the system architecture of Bframework. Section 4 focuses on the partitioning, assignment and scheduling algorithms of Bframework. Section 5 describes the experimental environment, the parameter setup and the experimental results of Bframework. Finally, conclusions and future works are given in Section 6.
This section presents two broad categories of related works: scheduling in distributed stream processing (DSP) open source platforms and the scheduling for stream Big Data queries or tasks.
DSP scheduling
In this section, Bframework as a DSP scheduler is compared with the recent state-of-the-art studies in terms of the proposed task, resource and scheduling models.
In [10], a stream processing system is designed and implemented on the Storm platform, with a traffic-aware scheduler that assigns and reassigns accepted tasks at runtime. The scheduler aims to minimize the inter-node and inter-process traffic loads and also monitors the overload of computing nodes. In comparison with Bframework, the scheduler is neither resource-aware nor topology-aware, because its task model only considers tasks as a set of given threads and models resources as a set of given processes. Furthermore, its scheduling model only takes into account the inter-node traffic optimization.
In [23], the Storm platform is extended to operate in geographically distributed and highly variable environments. The authors present a distributed, QoS-aware scheduler that operates at runtime and takes latency, resource utilization and resource availability into account as QoS attributes. Its scheduling model solves two placement problems, the assignment of task operators to virtual machines and the assignment of virtual machines to physical machines, whereas Bframework only considers physical machines. Furthermore, its scheduler can be distributed among multiple worker nodes, which the Bframework scheduler cannot. On the other hand, the authors propose no special assignment strategies, while Bframework proposes efficient strategies for thread, process and node assignment. Finally, its task model considers a task as a set of operators, while Bframework models it as a directed graph.
In [21], a predictive scheduling framework is proposed that predicts the processing time of an application based on the structure of its topology. It also presents a scheduling algorithm that assigns threads to machines based on the prediction result. In comparison with Bframework, its task model considers tasks as a set of threads and processes, and its resource model considers a set of machines but not their load or capacity. Similar to Bframework, its scheduling model treats scheduling as two mapping problems, the assignment of threads to processes and of processes to computing nodes, but it has no operator-to-thread assignment. Unlike Bframework, it only aims to minimize the inter-operator traffic load of a task.
In [25, 24], the placement problem for DSP is studied and a scheduler for Storm is presented based on Integer Linear Programming. Similar to Bframework, the authors model tasks as a graph. Their resource model considers computing nodes with respect to QoS metrics such as resource availability, but ignores the capacity of computing nodes. Their objectives model two QoS metrics, resource availability and response time, and also take the network delay into account. In comparison with Bframework, workload balancing and distribution are not considered.
In [20], the authors aim to provision the resources needed by data stream applications in the presence of workload fluctuation. They use a control-theoretic method to find an optimal stream application configuration that addresses latency constraints. In comparison with Bframework, they mainly focus on the resource provisioning problem, whereas Bframework focuses on both the operator assignment and the scheduling problems.
The Flink platform includes a scheduling framework in which process computing units can host a layer of accepted tasks, and one or multiple layers of tasks can be scheduled on a computing node. As a result, its scheduling framework is not QoS-aware. In [18], a scheduler for SPS is proposed that uses SDN controllers to place tasks on the underlying resources while taking the task QoS requirements into account. In comparison with Bframework, the resource model of s-flink is completely different from the Bframework resource model. In [19], a new distributed system for managing stream processing applications is developed to overcome the limitations of Storm. In this system, Aurora [6], a generic scheduler framework, is used for resource scheduling. In comparison with Bframework, no specific strategies are applied.
In [5], a task optimization and task scheduling approach is proposed. The scheduling algorithm takes into account the topology and traffic of tasks and resources. Its task model represents a task as a Directed Acyclic Graph (DAG), and the scheduling algorithm assigns operators to threads and threads to computing nodes. Its objective is to minimize the inter-node traffic. Its task, resource and scheduling models make assumptions close to those of Bframework, but it is not specifically designed for analytic queries and its solution has some limitations.
In summary, among the optimization strategies for improving DSP scheduling, most studies address the topology structure, the inter-node traffic or the computing node load. Our proposed scheduling algorithm, however, jointly considers all three aspects: the topology structure, the inter-node traffic and the workload of computing nodes.
Streaming Big Data scheduling
In this section, Bframework as a streaming Big Data scheduler is compared with the recent state-of-the-art studies in terms of the proposed task, resource and scheduling models.
In [11], a real-time scheduling framework is proposed that aims to achieve a low response time and high energy efficiency for the query. Similar to Bframework, it models tasks as a graph that encompasses operators and their inputs and outputs, but the variety dimension of streaming Big Data is not considered. Also similar to Bframework, it models resources by considering resource capacity, and its scheduling model aims to minimize the response time and maximize the energy efficiency of the computing nodes. However, workload balancing and distribution are not considered.
In [8], a fault tolerance framework is presented that allocates tasks to fault-tolerant computing nodes before the execution of the tasks and reassigns them to the computing node with the lower response time. In comparison with Bframework, its task model considers a task as a graph, but the 3Vs model of streaming Big Data is not considered completely. Its resource model is similar to that of Bframework and models the capacity of the resources. Its scheduling model aims to minimize the response time and maximize the system reliability, whereas workload distribution and balancing are not considered.
In [15], the authors predict the data characteristics (5Vs) of streaming Big Data and express them as CoBA values, which are used to determine the computing nodes required by the tasks. In comparison with Bframework, this work considers only the streaming Big Data and has no model for tasks and resources.
In [7], a stable scheduling approach is presented that allocates tasks to stable computing nodes before the execution of the tasks and reassigns a task to the computing node with the lower response time. Similar to Bframework, it models tasks as a graph that encompasses operators and their inputs and outputs, but the variety dimension of Big Data is not considered. Also similar to Bframework, it models resources with capacity. Its scheduling model aims to minimize the response time and maximize the stability of the computing nodes. However, workload balancing and distribution are not considered.
In [16], the data characteristics of streaming Big Data are estimated, based on the 4Vs model, as a value named Characteristics of Data (CoD). Furthermore, Self-Organizing Maps (SOM) are used to allocate cloud resources to streaming Big Data using its estimated CoD. In comparison with Bframework, this work only considers the streaming Big Data; streaming Big Data queries are not considered.
In [17], the authors derive effective optimization rules that are used to reconfigure the structure of an accepted task at runtime using real-time performance statistics. In comparison with Bframework, the result of this work can be used as an optimized input query to Bframework.
In summary, the current optimization strategies for improving the scheduling of streaming Big Data only consider some aspects of the Big Data characteristics. Our framework, in contrast, proposes scheduling algorithms that not only consider all of the mentioned aspects but are also suited to the analytical queries of streaming Big Data.
Problem statement
In the context of stream Big Data computing, a set of streaming Big Data analytic queries is accepted by a stream Big Data computing platform and one or multiple streams of Big Data flow through the accepted queries. The platform’s scheduler handles the efficient assignment and scheduling of the query operators over the underlying cluster resources. The definitions below incorporate these factors into the scheduling decisions.
Big Data stream
In Bframework, a stream is a sequence of data sets
Streaming Big Data analysis query
In Bframework, a Big Data stream analytic query can be defined as a directed graph with a set of operator instances which analyze one or multiple input Big Data streams
The operator instance
The directed edge
An example of a

In Bframework, the scheduler accepts a set of independent Big Data stream analytic queries, then places and allocates all operators of the accepted queries on the underlying cluster resources so as to achieve the lowest query latency while meeting the cluster resource capability constraints. The following model reflects these factors.
Let
The schedule matrix
In Bframework, the schedule model used by its scheduler is defined by Definition 6.
where
where
The goal of Bframework is to minimize the average tuple latency of the queries which are accepted by Bframework. To achieve this goal, the idea is to use more than one scheduling strategy for
To support the idea, the objectives of the Bframework assignment and scheduling algorithms are established based on the characteristics of the identified partitions. The objectives of the Bframework operator assignment, Off-line Bscheduler and Online Bscheduler algorithms are to minimize the inter-operator traffic loads of the first partition operators and to distribute and balance the workloads of the second partition operators. These algorithms apply their own assignment and scheduling strategies to achieve the objective of each partition.
To implement the strategies needed for each partition, the architecture of Bframework encompasses five modules and operates within an open source stream Big Data computing platform in two phases. Details of the Bframework architecture, phases and algorithms are explained in the following sections.
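The two objectives can be illustrated with a small sketch that scores a given placement. This is an assumption-laden illustration rather than the formulation used by Bframework: the metric definitions, the function names and the example data are hypothetical, with inter-node traffic taken as the total weight of edges that cross nodes and imbalance taken as the gap between the most and least loaded node.

```python
from collections import defaultdict


def inter_node_traffic(edges, placement):
    """Sum the weights of edges whose endpoints are placed on different nodes."""
    return sum(
        weight
        for (src, dst), weight in edges.items()
        if placement[src] != placement[dst]
    )


def load_imbalance(loads, placement):
    """Gap between the most and least loaded node that hosts an operator."""
    per_node = defaultdict(float)
    for operator, node in placement.items():
        per_node[node] += loads[operator]
    return max(per_node.values()) - min(per_node.values())


# Hypothetical three-operator query: edge weights model traffic, loads model work.
edges = {("a", "b"): 10.0, ("b", "c"): 2.0}
loads = {"a": 1.0, "b": 1.0, "c": 3.0}

colocated = {"a": "n1", "b": "n1", "c": "n2"}  # heavy edge a->b stays local
split = {"a": "n1", "b": "n2", "c": "n2"}      # heavy edge a->b crosses nodes

traffic_colocated = inter_node_traffic(edges, colocated)
traffic_split = inter_node_traffic(edges, split)
imbalance_colocated = load_imbalance(loads, colocated)
imbalance_split = load_imbalance(loads, split)
```

Under these assumptions, co-locating the two operators joined by the heavy edge lowers the cross-node traffic and, in this example, also gives the more balanced load.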
Bframework architecture
The architecture of Bframework includes five modules whose workflow operates in two phases. In the first phase, prior to the execution of the accepted queries, the profiler and partitioner, the operator assigner and the off-line scheduler modules operate. In the second phase, during the execution of the queries, the on-line rescheduler and the monitor modules work periodically. The overall view of the architecture and its workflow is presented in Fig. 2.
Architecture of Bframework.
Prior to the execution of each query
During the execution of each query
This section presents the details of the algorithms used in Bframework. First, it describes the query profiling and partitioning algorithm, which is used to partition the query
Query profiling and partitioning algorithm
The query profiling and partitioning algorithm profiles and partitions the operators of the query
In Eq. (4)
The algorithm operates in three main steps:
Lines 1–4: Calculation of the outBfsize metric of the source and end operators located in the first layer of
Lines 5–20: Calculation of the outBfsize value of all
Lines 21–24: Annotation of the corresponding partition of all operators in the remaining layers. The algorithm sets the partition attribute of an operator with its corresponding operator in the first layer.
In order to profile the segments of
The operator assignment algorithm aims to efficiently assign the operators of each identified partition to a set of initial thread computing units, and to assign the identified threads to a set of initial process computing units, prior to the execution of the operators. The algorithm uses three different assignment strategies to provide an efficient operator assignment for each partition.
The assignment strategy for the first partition is to minimize the inter-operator traffic load through minimizing the inter-thread and the inter-process traffic loads, because the inter-operator traffic load of the
The assignment strategy for the second partition is to distribute and balance the workload of the
Simple selectivity criteria
The assignment strategy for the third partition is simply to use one thread and one process computing unit and to assign all the
To implement the assignment strategy for the first partition, the algorithm initially assigns all operators of
To implement the assignment strategy for the second partition, the algorithm traverses the
To implement the assignment strategy for the third partition, all the operators of this partition are simply assigned to a thread (Lines 29–30) and process computing units (Lines 58–59).
The off-line Bscheduler aims to assign the identified processes to a set of available computing nodes prior to the execution of a query
For the first partition, the objective of this algorithm is to reduce the inter-node traffic load of the
For the second partition, the algorithm aims to balance the workload of
For the third partition, the scheduling strategy is to assign all
Online Bscheduler
The online Bscheduler aims to reduce the
To assign the operators of the third partition, the algorithm selects one of the two sets of strategies used for the first or the second partition. To select the appropriate set, the algorithm compares the average outBfsize of the threads in the third partition with the average outBfsize of the first partition threads and with that of the second partition threads. It then selects the strategies of the partition whose average is closest to that of the third partition (Lines 1–12).
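The selection rule can be sketched as follows. This is a minimal illustration under stated assumptions, not the Online Bscheduler itself: the distance is assumed to be the absolute difference of the averages, ties are assumed to go to the first partition, and the function name and sample values are hypothetical.

```python
from statistics import mean


def select_strategy(first_sizes, second_sizes, third_sizes):
    """Pick the strategy set whose average outBfsize is nearest to the third partition.

    Each argument is a non-empty list of per-thread outBfsize values.
    Returns "first" or "second". Ties go to "first" (an assumption).
    """
    third_avg = mean(third_sizes)
    distance_to_first = abs(third_avg - mean(first_sizes))
    distance_to_second = abs(third_avg - mean(second_sizes))
    return "first" if distance_to_first <= distance_to_second else "second"


# Hypothetical per-thread outBfsize values.
heavy_choice = select_strategy([100.0, 120.0], [10.0, 20.0], [90.0, 80.0])
light_choice = select_strategy([100.0, 120.0], [10.0, 20.0], [30.0])
```

With these values, a third partition that behaves like the traffic-heavy first partition inherits its strategies, and one that behaves like the second partition inherits those instead.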
To react to the overloaded computing nodes, two strategies are used by the algorithm. If the overloaded node hosts
To react to the
Furthermore to react to
Bframework algorithms complexity analysis
The main cost of the profiling and partitioning algorithm comes from the sorting and traversing
For Off-line Bscheduler, the main step is to assign the identified process computing units to the sorted available computing nodes. Thus, the time complexity of sorting computing nodes and the traversing processes computing units is
Evaluation
This section describes the experiments that have been performed to validate and analyze the performance of Bframework. First, the query profiling and partitioning algorithm of Bframework is validated by comparing the partitions identified for the real-world and benchmark queries prior to their execution with the partitions identified after their execution. Then, Bframework performance is assessed by analyzing the tuple latency of the benchmark and real-world queries in presence of different
Characteristics of BigBench queries [1]
The benchmark queries used in the experiments are taken from the BigBench [14] and Linear Road Benchmark [1] benchmarks. BigBench includes complex queries that cover the declarative and procedural aspects, as well as the structured, semi-structured and unstructured aspects, of Big Data analytic queries. In other words, BigBench queries include SQL-like and user-defined operators, and structured, semi-structured and unstructured data types. Table 2 describes the characteristics of the BigBench queries.
The Linear Road Benchmark encompasses 17 simple stream queries whose operators are SQL-like and whose data types are structured. It has been used as a benchmark for the evaluation of distributed stream management systems. WordCount and Yahoo page load processing are two real-world queries [19] that are commonly used as test queries in the literature.
Environment
Table 3 describes the specifications of the Storm cluster used in the experiments to run the queries. The coordinating computing nodes #1 and #2 run ZooKeeper and Nimbus, and the worker computing nodes #3–15 execute the queries. The software stack used in the experiments is Ubuntu 14.04, JDK 1.8, Apache Storm 0.9.5 [4] and MySQL 5.5.
Storm cluster specifications
All the benchmark and real-world queries are implemented using Trident API [22] of Apache Storm and Table 4 describes the parameters used in Bframework to execute the queries.
Parameters used in Bframework
The range of values for
To validate Bframework, we check the functionality of the query profiling and partitioning algorithm on the simple and complex benchmark queries and on the real-world queries. The validation compares the partitions identified prior to the execution of the queries with the partitions identified after their execution. To compare the identified partitions, the ratio between the average outBfsize of the first partition and the average outBfsize of the second partition is calculated and shown in the R column of the result tables. Furthermore, the number of partitions identified in the first phase is compared with the number identified in the second phase and is reported in the P column of the following tables.
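The R and P columns described above can be computed as in the following sketch. It assumes that the outBfsize values are already available per partition; the function name, the dictionary layout and the sample values are hypothetical and do not come from the result tables.

```python
from statistics import mean


def partition_summary(out_bf_sizes):
    """Return (R, P) for one query.

    out_bf_sizes maps a partition index (1, 2, 3) to the list of outBfsize
    values of its operators. P counts the non-empty partitions. R is the
    average outBfsize of partition 1 divided by that of partition 2, or
    None when either partition is missing. A zero average for partition 2
    is not handled in this sketch.
    """
    p = sum(1 for values in out_bf_sizes.values() if values)
    first = out_bf_sizes.get(1)
    second = out_bf_sizes.get(2)
    r = mean(first) / mean(second) if first and second else None
    return r, p


# Hypothetical values for a query with two partitions and one with a single partition.
r_two, p_two = partition_summary({1: [40.0, 60.0], 2: [5.0, 15.0]})
r_one, p_one = partition_summary({1: [40.0, 60.0]})
```

Comparing the (R, P) pair obtained before execution with the pair obtained after execution then gives the kind of check reported in the result tables.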
Result of partitioning Linear Road Benchmark queries at first phase
R: outBfsize rate between first and second partitions; P: Number of identified partitions.
Result of partitioning Linear Road Benchmark queries at second phase
R: outBfsize rate between first and second partitions; P: Number of identified partitions.
Tables 5 and 6 show the result of partitioning Linear Road Benchmark queries before and after execution of the queries respectively. To execute these experiments, the configuration parameters
Table 5 demonstrates that there is no third partition prior to the execution of the Linear Road Benchmark queries. This is because all
Result of partitioning BigBench queries at first phase
R: outBfsize rate between first and second partitions; P: Number of identified partitions.
Result of partitioning BigBench queries at second phase
R: outBfsize rate between first and second partitions; P: Number of identified partitions.
Tables 7 and 8 show the result of partitioning BigBench queries before and after execution of the queries respectively. To execute these experiments, the configuration parameters
As shown in Table 7, for queries like Q10, the algorithm identifies just one partition with rate
Real-world queries
The result of the partitioning of the WordCount and Yahoo page load processing queries at second phase is plotted in Fig. 3. The needed configuration parameters
Result of two real-world queries partitioning at second phase.
As shown in Fig. 3, the rate of the Yahoo page load processing query is higher than that of the WordCount query because the structure of the former is more complex than that of the latter. Furthermore, as there is no schema available for these queries, the algorithm has detected
In order to evaluate Bframework performance, we have conducted two test cases. First, we compare the average tuple latency achieved by the Storm default scheduler with Bframework results under the simple and complex benchmark queries. Then, the average tuple latency achieved by a state-of-the-art scheduler [18] is compared with Bframework results under multiple real-world queries. All these experiments have been performed in the presence of fixed and fluctuating
Average tuple latency of the query Q16 of Linear Road Benchmark.
Figures 4 and 5 show the average tuple latency of two sample Linear Road Benchmark queries during their 15-minute execution. Furthermore, Table 9 reports the average tuple latency reduction of all Linear Road Benchmark queries. These experiments are performed by the configuration parameter
Average tuple latency improvement under Linear Road Benchmark queries (Bframework compared with Storm default scheduler)
Q: query, Fx: fixed rate, Fl: fluctuated rate.
Average tuple latency improvement under BigBench queries (Bframework compared with Storm default scheduler)
Q: query, Fx: fixed rate, Fl: fluctuated rate.
Average tuple latency of the query Q8 of Linear Road Benchmark.
As shown in Figs 4 and 5, in presence of
Table 9 shows that the Linear Road Benchmark queries achieve a relatively low average tuple latency improvement of about 14.3%. A likely explanation is that the structure of these queries is generally linear, their number of operators is small and there is no inter-layer dependency. Thus, Bframework cannot fully apply its optimizations to these queries.
Figures 6–8 show the average tuple latency of three sample BigBench queries during their 15-minute execution. The average tuple latency reduction of all BigBench queries is presented in Table 10. These experiments are performed by the configuration parameter
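For readers who want to reproduce such a figure from raw measurements, one common way to express a latency improvement is the relative reduction of the average tuple latency with respect to the baseline scheduler. The sketch below assumes that definition, which the text does not spell out; the function name and the latency samples are hypothetical and are not measurements from the experiments.

```python
def latency_improvement(baseline_ms, candidate_ms):
    """Relative reduction (in percent) of the average tuple latency.

    baseline_ms and candidate_ms are non-empty lists of tuple latencies in
    milliseconds for the baseline scheduler and the scheduler under test.
    """
    baseline_avg = sum(baseline_ms) / len(baseline_ms)
    candidate_avg = sum(candidate_ms) / len(candidate_ms)
    return 100.0 * (baseline_avg - candidate_avg) / baseline_avg


# Hypothetical latency samples, not taken from the reported results.
improvement = latency_improvement([210.0, 190.0, 200.0], [160.0, 140.0, 150.0])
```

A positive value means the scheduler under test has the lower average latency; a negative value would indicate a regression.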
Average tuple latency of the query Q18 of BigBench.
Average tuple latency of the query Q1 of BigBench.
Average tuple latency of the query Q28 of BigBench.
As shown in Figs 6–8, in presence of
As shown in Table 10, the improvement achieved for the BigBench queries varies from 3.7% to 65.7%, which might be caused by three main reasons. First, the structure of these queries encompasses different styles (i.e., a simple linear style, more complex star and diamond patterns, or even a mix of them). For simple linear queries like Q2, most Bframework optimizations are not applicable, while they can be completely applied to complex queries like Q13. Second, the number of query operators differs significantly. For some queries like Q19, there are more than 30 operators per layer, which leads to a larger performance improvement by Bframework in the second phase. Third, the variance tuple size
In the experiment, three BigBench queries with linear, star and diamond styles and two real-world queries, WordCount and Yahoo page load processing, are accepted simultaneously. The experiment is performed by the configuration parameter
Average tuple latency improvement under real world multiple queries (Bframework compared with state-of-the-art scheduler).
As shown in Fig. 9, Bframework outperforms the state-of-the-art scheduler in tuple latency reduction by up to 31%. The improvement is related to the difference between the strategies that Bframework and the state-of-the-art scheduler use for scheduling multiple accepted queries. In other words, for two accepted queries
According to the conducted experiments and their analysis, we can conclude that Bframework is able to correctly partition streaming Big Data analytic queries based on their input Big Data streams. Experimental results show that Bframework is able to correctly partition simple and complex queries before their execution and can also be used in a real-world streaming Big Data analysis environment.
Furthermore, Bframework outperforms the Storm default scheduler in average tuple latency reduction under the simple and complex streaming Big Data analytic queries by up to 29.6% and 65.7% respectively. Experimental results have shown that Bframework is able to reduce the average tuple latency of simple queries mainly through the appropriate assignment of computing nodes. For complex queries, Bframework can also efficiently reduce the average tuple latency through the efficient scheduling and rescheduling of the needed threads, processes and computing nodes.
In addition, compared to the state-of-the-art scheduler, Bframework is able to reduce the average tuple latency of multiple accepted real-world queries by up to 31%. Bframework is able to efficiently schedule simultaneously accepted queries through the isolated assignment of the query operators to specific computing nodes.
Finally, Bframework performs best on complex streaming Big Data analytic queries with fluctuating input. Experimental results indicate that most Bframework optimization techniques are fully utilized by queries with complex structures and fluctuating input Big Data streams.
Conclusion
The demand for streaming Big Data analysis applications is increasing. As the demand grows, the efficient execution of these applications on suitable platforms becomes more important. However, current platform schedulers do not serve these applications effectively. This study presents Bframework, a framework designed to efficiently schedule the accepted streaming Big Data analytic queries on a resource cluster. The five modules of Bframework implement the proposed operator profiling and partitioning, operator assignment, Off-line Bscheduler and Online Bscheduler algorithms and operate prior to and during the execution of accepted queries. Bframework is developed based on the idea of using more than one scheduling strategy for streaming Big Data analytic queries. Bframework partitions a query based on its Big Data stream characteristics and applies a set of efficient scheduling and rescheduling strategies to each partition prior to and during the execution of the query.
Experimental results show that, compared to Storm’s default scheduling, Bframework is able to reduce the tuple latency of streaming Big Data analytic queries by up to about 65%. Furthermore, Bframework outperforms the most recent scheduling study under real-world queries in tuple latency reduction by up to 31%.
In future work, we plan to extend Bframework to public cloud resources. An efficient resource provisioning mechanism will also be considered. Moreover, experiments that execute health Big Data queries will be included.
