Abstract
In an Apache Spark cloud computing environment, the computing capability of each node varies. Together with differences in data size and uncertainties in application execution, this leads to differences in the task execution times of a job. To improve the accuracy of workload execution time prediction and to guide users in requesting Spark cluster resources, this paper studies the execution flow of a Spark job, collects the workload's time consumption indices, and puts forward a scheme for fusing these time indices. It then uses a multiple linear regression model and a support vector machine model to explore the relationship between workload execution time and performance indicators such as the number of CPU cores, input data volume and memory size. Based on these two models, this paper proposes a Standard Regression Coefficient-based Weighted Support Vector Regression time prediction model (SRC-WSVR). Finally, a comparison with a conventional regression prediction model and the standard support vector machine model shows that SRC-WSVR achieves higher prediction accuracy and can provide a valid data reference for predicting Spark resource consumption.
Introduction
Apache Spark [26,31] is an open-source cluster computing system. It is a fast, in-memory data processing engine with development APIs that allow data workers to run streaming, machine learning or SQL workloads. Spark introduces the concept of the Resilient Distributed Dataset (RDD), which is particularly suitable for iterative applications such as iterative machine learning and graph algorithms. At the same time, this type of cluster computing system with large CPU resources is also suitable for non-iterative applications. Running Spark in the cloud has become a flexible choice for some enterprises and scientific research organisations. Users have to request and allocate an appropriate amount of cluster resources to fulfil the application requirements; prediction models [2,27] that help users estimate the appropriate amount of cluster resources are therefore needed to reduce resource rental costs.
The features of Big Data are as follows: Volume (amount of data), Velocity (speed of data in and out), Variety (range of data types and sources) and Value (usefulness of data). The rise of Big Data has spawned data science, which can be understood as predictive analysis and data mining. At the same time, subjective data and erroneous analysis can easily lead to a “data dictatorship”, which should be avoided.
This work builds a model from the historical information of Spark data analysis jobs to predict the execution time required by the non-iterative application WordCount and the iterative application PageRank. The execution time of a specific operation on Apache Spark is affected by various factors [1,17], such as the size and type of the input data, the design and implementation of the algorithm [4], and the computing power (e.g. number of nodes, number of executing CPU cores, available memory size). The model therefore uses the Standard Regression Coefficient [13,15] to compare the weight of each impact factor, and then obtains the prediction result with a Weighted Support Vector Regression [7,8] machine model. The model uses the input data size and the number of CPU cores allocated to predict the execution time of Spark processes.
Related works
Performance prediction for Spark is the basis for optimising the allocation of cluster resources [12,18,30]. Several works have proposed performance prediction models for currently popular cloud computing platforms. Kavulya et al. [10] used behaviour analysis of Hadoop users and the K-Means algorithm to measure the similarity of jobs and then predict the execution time of Hadoop processes. However, this approach requires a large amount of historical data and its accuracy is unstable. Ganapathi et al. [6,20] used machine learning algorithms to analyse the historical operation information of workloads and predict system performance. They extracted the historical operation information of jobs and proposed a resource scheduling model to ensure that jobs finish within a specific time. Article [23] worked on execution time prediction for network-intensive iterative algorithms on MapReduce. However, it mainly focussed on iterative algorithms, which require representative tuning data in order to achieve high prediction accuracy. Article [32] considered distributed computing jobs [24] on heterogeneous machines and predicted job completion time using boundary-based performance modelling; its aim is to evaluate the upper and lower limits of task completion time in order to predict job performance. Articles [3,19,21] proposed a self-adjusting framework for predicting a job's performance under different settings. Despite its satisfactory prediction accuracy, its differences from the Spark platform make it unsuitable for predicting Spark performance. The findings of these works may not apply directly to Spark, because Spark has a different implementation mechanism from Hadoop, but their results are valuable references for Spark performance prediction.
Wang et al. [28] proposed a Spark performance evaluation scheme based on theoretical analysis. It sampled the operational results of small data sets and predicted the performance on big data sets by theoretical derivation, which gave satisfactory accuracy. Articles [5,14] evaluated the performance of a cloud workstation from the perspective of a mathematical analysis model, using impact benchmarks such as CPU, memory and network bandwidth to conduct an integrated evaluation of the system impact caused by different parameters in an application, with the use of a Comprehensive Fuzzy Evaluation Model. The rigorous mathematical theory in this work is a strong reference, but the work lacks a description of the relationship between the parameters and system performance.
In conclusion, this work builds on the results of previous works. It integrates time consumption benchmarks [11,22] by tracing the task execution flow in Spark, introduces calculation methods for the job execution time boundaries, and puts forward a Standard Regression Coefficient-based Weighted Support Vector Regression (SRC-WSVR) time prediction model. The model first performs screening calculations of the standard regression coefficient for the related benchmarks; these coefficients are then used as impact weights and added to the parameters of the training set data. A non-iterative application, WordCount, and an iterative application, PageRank, are used for job execution time prediction, to verify whether the results lie within the boundaries set.
Calculate metrics
Execution flow
On the Spark platform, every application is executed serially, stage by stage. Input data sets are divided into partitions and scattered across nodes to fulfil the parallel processing model [16,29]. The diagram in Fig. 1 illustrates the serial process execution.

Serial Process Execution Diagram.
In the diagram, each box represents a stage, and the execution of the job is divided into 3 stages. The first stage reads the input file with “textFile” from the HDFS [25], then triggers the “flatMap” operator to extract each word from each row, and then triggers the “Map” operator to form key-value pairs of the form (word, 1). The second stage uses the “reduceByKey” operator to sum the values of each word, then triggers the “sortBy” operator to sort the statistical results of WordCount. The last stage mainly saves the result with the “saveAsTextFile” operator.
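The three stages can be mimicked without a cluster. The following is a minimal plain-Python sketch, not Spark code: each step only stands in for the operator named in the comment, the function name `word_count` is hypothetical, and the descending sort order is an assumption, since the text does not state the sort key.

```python
from collections import defaultdict


def word_count(lines):
    """Plain-Python stand-in for the three WordCount stages (no Spark needed)."""
    # Stage 1: "textFile" -> "flatMap" (split rows into words) -> "Map" to (word, 1)
    pairs = [(word, 1) for line in lines for word in line.split()]

    # Stage 2: "reduceByKey" sums the values per word, then "sortBy" orders the result.
    totals = defaultdict(int)
    for word, one in pairs:
        totals[word] += one
    # Assumed ordering: highest count first, ties broken alphabetically.
    ranked = sorted(totals.items(), key=lambda kv: (-kv[1], kv[0]))

    # Stage 3: "saveAsTextFile" is replaced here by returning formatted lines.
    return [f"{word}\t{count}" for word, count in ranked]


result = word_count(["to be or not to be", "to see"])
```

In a real Spark job the boundary between the first and second stage arises because “reduceByKey” has to regroup the pairs by key across nodes, which is why the stages run one after the other.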
Hence, from the analysis of the Spark operation process, it is found that the sub-stages are executed one after the other, like a pipeline. Detailed time consumption information for each sub-stage can be obtained from historical execution information, as illustrated in Table 1.
Consuming Time Information
Based on the time benchmark information above, this work proposes integrating the time benchmark information of each sub-stage, forming a boundary calculation method for job execution time from the perspective of the job execution process.
Execution time
A Spark job execution is divided into multiple stages, with each stage containing multiple tasks; this can be represented by formulae (1) and (2):
Since the different stages of a job are executed in sequence, the job's execution time can be represented as the sum of the execution times of the stages, the start time and the clean time, as in formula (3):
Considering that jobs are processed in parallel across nodes, within each stage every CPU core executes one task at a time. Hence, in a Spark cluster with H nodes, the number of tasks P that can be processed in parallel can be calculated by formula (4):
From formula (3), each stage has its own task time. This time can be divided into the maximum execution time over all serially executed tasks, the startup time and the cleanup time, as represented by formula (5):
DeserialisationTime is the time required to deserialise the input data, SerialisationTime is the time required to serialise the results, and RunTime is the actual time used for the data operation. This timing information can be obtained through the Spark UI job dashboard.
In short, the Spark job execution time evaluation formula can be represented by formula (7):
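The numbered formulae are not reproduced in this text, so the following Python sketch only follows the verbal description above. It assumes that a task's time is the sum of DeserialisationTime, RunTime and SerialisationTime, that a stage takes its startup time plus the longest serial chain of tasks on any core plus its cleanup time, and that a job takes its startup time plus the sum of its stage times plus its cleanup time. All class, function and field names, and all numbers, are hypothetical.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class Task:
    deserialisation_time: float  # time to deserialise the input data
    run_time: float              # time spent on the actual data operation
    serialisation_time: float    # time to serialise the result

    def task_time(self) -> float:
        return self.deserialisation_time + self.run_time + self.serialisation_time


def parallel_slots(cores_per_node: List[int]) -> int:
    """Assumed reading of formula (4): one task per core, summed over the H nodes."""
    return sum(cores_per_node)


def stage_time(tasks_by_core: List[List[Task]], startup: float, cleanup: float) -> float:
    """Stage startup + longest serial chain of tasks on any core + stage cleanup."""
    longest_chain = max(
        (sum(t.task_time() for t in chain) for chain in tasks_by_core), default=0.0
    )
    return startup + longest_chain + cleanup


def job_time(stage_times: List[float], startup: float, cleanup: float) -> float:
    """Job startup + sum of the sequential stage times + job cleanup."""
    return startup + sum(stage_times) + cleanup


# Made-up numbers (seconds), for illustration only.
core0 = [Task(0.1, 2.0, 0.1), Task(0.1, 1.0, 0.1)]   # 2.2 s + 1.2 s run back to back
core1 = [Task(0.2, 3.0, 0.3)]                        # a single 3.5 s task
stage1 = stage_time([core0, core1], startup=0.5, cleanup=0.2)
total = job_time([stage1, 2.0], startup=1.0, cleanup=0.5)
slots = parallel_slots([4, 4, 4])
```

With these numbers the second core carries the longest chain (3.5 s), so the first stage takes about 4.2 s and the job about 7.7 s.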
Evaluation of execution time boundaries
The dynamics of resource virtualisation inevitably have a certain impact on the prediction result. Therefore, this paper proposes an evaluation of execution time boundaries, which derives effective upper and lower bounds of the prediction time and uses them to verify the prediction results of the SRC-WSVR model.
Based on the job execution time evaluation formula, it is easily seen that the execution time of the tasks in each stage influences the job execution time the most. Let a stage
In an ideal situation, the n tasks in a stage are allocated evenly onto the K cores, with each task executing for the same amount of time. The execution time of such a stage can be represented by formula (8):
However, in actual job execution, the nodes use different types of CPU, internal memory and hard drives, whose performance may differ, so the execution time of the tasks distributed to each CPU core may differ as well. The cluster scheduler prioritises assigning tasks to idle CPU cores, and in the worst case the task with the longest execution time is the last one to be executed. By the greedy method, the time elapsed before executing the last task is
The upper bound and lower bound are to estimate the longest and shortest execution time of a job in a stage, respectively. Hence, the lower bound of the Spark job total execution time is:
In summary, from the time consumption information produced at task runtime and listed in Table 1, the execution time of a Spark job and the upper and lower bounds of the job's total execution time can be calculated using formulae (7), (11) and (12), respectively. From formula (4), we can observe that the execution time of a job is affected by several factors, such as the number of CPU cores used in the execution, the input data size and the allocated internal memory. Therefore, in part 4, a regression model is used to compute a Standard Regression Coefficient for each index related to job execution time, which serves as its impact weight; the weighted samples are then passed to the Weighted Support Vector Regression Machine prediction model to predict the operation time of big data cluster jobs. In part 5, the accuracy of the predicted results is compared with the Multiple Linear Regression (MLR) and Support Vector Regression machine (SVR) models.
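The bound formulae themselves are not reproduced in this text. As a rough illustration for a single stage, the Python sketch below assumes the classical bounds for greedy assignment of n tasks to K cores: the lower bound is the total task time divided by K, and the upper bound is the total time of all tasks except the longest, divided by K, plus the longest task time. This form is an assumption and may differ in detail from the paper's formulae; the function names and task durations are hypothetical.

```python
import heapq


def stage_bounds(task_times, cores):
    """Assumed lower/upper bounds on the task execution time of one stage."""
    total = sum(task_times)
    longest = max(task_times)
    lower = total / cores                       # perfectly even distribution
    upper = (total - longest) / cores + longest  # longest task is started last
    return lower, upper


def greedy_makespan(task_times, cores):
    """Simulate a scheduler that hands each task to the earliest idle core."""
    free_at = [0.0] * cores          # min-heap of times at which each core is idle
    heapq.heapify(free_at)
    finish = 0.0
    for duration in task_times:
        start = heapq.heappop(free_at)
        end = start + duration
        heapq.heappush(free_at, end)
        finish = max(finish, end)
    return finish


# Made-up task durations (seconds); the longest task happens to arrive last.
tasks = [2.0, 3.0, 1.0, 4.0, 2.0, 6.0]
lower, upper = stage_bounds(tasks, cores=3)
actual = greedy_makespan(tasks, cores=3)
```

For these durations the simulated stage finishes after 9 s, between the lower bound of 6 s and the upper bound of 10 s.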
Weighted support vector regression machine predictive model based on standard regression coefficient
In Standard Support Vector Regression Machine, given a training sample
On the other hand, as stated in formula (15), the Standard Support Vector Regression Machine uses the same C and ε values for all samples, meaning that the penalty applied for deviations is the same for every sample. In actual applications, however, the dependent and independent variables of the forecasting function have a weighted relationship, a different error requirement parameter
Calculation of the weight of sample parameters
The linear relationship sample constructed from dependent variable Y and independent variables
Let
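As a small illustration of a standard (standardised) regression coefficient, the Python sketch below handles the case of two predictors. It relies on the textbook identity that the standardised coefficient equals the raw coefficient multiplied by the ratio of the predictor's standard deviation to that of the response, which for two predictors can be written purely in terms of pairwise correlations. The predictor names (`cores`, `data_gb`) and all sample values are made up and are not the paper's measurements.

```python
from math import sqrt


def pearson(x, y):
    """Pearson correlation coefficient of two equally long sequences."""
    n = len(x)
    mx, my = sum(x) / n, sum(y) / n
    sxy = sum((a - mx) * (b - my) for a, b in zip(x, y))
    sxx = sum((a - mx) ** 2 for a in x)
    syy = sum((b - my) ** 2 for b in y)
    return sxy / sqrt(sxx * syy)


def standardised_coefficients(x1, x2, y):
    """Standardised coefficients of y regressed on x1 and x2 (two-predictor case)."""
    r1, r2, r12 = pearson(x1, y), pearson(x2, y), pearson(x1, x2)
    denom = 1.0 - r12 ** 2
    beta1 = (r1 - r2 * r12) / denom
    beta2 = (r2 - r1 * r12) / denom
    return beta1, beta2


# Hypothetical history: time = 10 + 5 * data_gb - cores (seconds).
cores = [2, 4, 8, 2, 4, 8]
data_gb = [1, 1, 1, 4, 4, 4]
time_s = [10 + 5 * d - c for c, d in zip(cores, data_gb)]

b_cores, b_data = standardised_coefficients(cores, data_gb, time_s)
```

Because the coefficients are unit-free, their magnitudes can be compared directly: in this made-up sample the data size has the larger absolute coefficient and the number of cores a negative one, since more cores shorten the run.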
Weighted support vector regression machine based on standard regression coefficient
Putting formula (22) into formula (15) and adding weights according to the importance weight of the sample parameters, the optimisation problem becomes:
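The exact weighted formulation is not recoverable from this text. For orientation only, the Python sketch below evaluates one common form of a weighted support vector regression objective for a linear model, in which each sample's ε-insensitive slack is multiplied by its own weight before being scaled by C. Whether the paper applies its weights in exactly this way is an assumption, and the function name, parameters and numbers are hypothetical.

```python
def weighted_svr_objective(w, b, X, y, sample_weights, C=1.0, epsilon=0.1):
    """Primal objective 0.5*||w||^2 + C * sum_i s_i * max(0, |y_i - f(x_i)| - epsilon)."""
    regulariser = 0.5 * sum(wj * wj for wj in w)
    penalty = 0.0
    for xi, yi, si in zip(X, y, sample_weights):
        prediction = sum(wj * xj for wj, xj in zip(w, xi)) + b
        # Deviations inside the epsilon tube cost nothing.
        slack = max(0.0, abs(yi - prediction) - epsilon)
        penalty += si * slack        # per-sample weight scales the penalty
    return regulariser + C * penalty


# Made-up one-feature example with two samples.
X = [[1.0], [2.0]]
y = [1.05, 3.0]
uniform = weighted_svr_objective([1.0], 0.0, X, y, sample_weights=[1.0, 1.0])
weighted = weighted_svr_objective([1.0], 0.0, X, y, sample_weights=[1.0, 2.0])
```

With uniform weights this reduces to the standard objective; doubling the weight of the second sample doubles the cost of its deviation, so a solver minimising the objective would fit that sample more closely.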
In summary, this calculation process enables the prediction of job execution time by the Standard Regression Coefficient-based Weighted Support Vector Regression Machine. The use of estimated Mean Absolute Error (MAE), estimated Mean Square Error (MSE) and goodness of fit determination coefficient
Evaluation benchmark and corresponding formula
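For reference, the three evaluation benchmarks can be computed as in the Python sketch below. It uses the standard textbook definitions of MAE, MSE and the determination coefficient, which are assumed to match the table; the function names and sample values are hypothetical.

```python
def mean_absolute_error(actual, predicted):
    return sum(abs(a - p) for a, p in zip(actual, predicted)) / len(actual)


def mean_square_error(actual, predicted):
    return sum((a - p) ** 2 for a, p in zip(actual, predicted)) / len(actual)


def r_squared(actual, predicted):
    """Determination coefficient: 1 - residual sum of squares / total sum of squares."""
    mean = sum(actual) / len(actual)
    ss_res = sum((a - p) ** 2 for a, p in zip(actual, predicted))
    ss_tot = sum((a - mean) ** 2 for a in actual)
    return 1.0 - ss_res / ss_tot


# Made-up execution times (seconds), for illustration only.
actual = [10.0, 20.0, 30.0]
predicted = [12.0, 18.0, 33.0]

mae = mean_absolute_error(actual, predicted)
mse = mean_square_error(actual, predicted)
r2 = r_squared(actual, predicted)
```

Lower MAE and MSE and a determination coefficient closer to 1 indicate a better fit.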
Physical configurations table for nodes
Descriptions
To evaluate the prediction accuracy of the model proposed in this work, experiments were run on a cluster in which one node was the main node and the other two were task nodes. The physical configuration of each node is given in Table 3.
In the evaluation process, this work used the Hadoop Distributed File System (HDFS), with the default allocation size of 64 MB, as the input source and output destination of data. Apache Spark ran in Standalone mode. Performance data were collected from the UI dashboard of the Apache Spark platform and from the performance benchmark acquisition port provided by Spark Metrics; the results were then exported and stored on a local hard drive in CSV format. By analysing the job performance monitoring information, StageTime could be calculated using formula (3) and TaskTime using formula (5). Values such as the startup time, cleanup time, stage initialisation time, stage cleanup time and task serialisation time could be obtained from the monitoring documents and used to calculate the actual JobTime using formula (7).
One of the applications used in this work for verifying the prediction model is WordCount, a non-iterative text processing tool [9] used for word frequency analysis. This type of workload identifies the keywords of web pages for easy categorisation and processing. The other application is PageRank, an iterative graph algorithm which assigns a score to every web page and iteratively calculates the importance of each page from the relationships between their links (URLs).
The detailed experiment procedures are:
1. Choice of data type and amount of data: As tuning data, the input to WordCount comes from Wikipedia entries and the output is stored to HDFS. PageRank uses the Google web graph; this data set has a maximum of 875713 nodes and 5105039 edges, forming an unstructured graph data set.
2. Calculation of the sample parameter weights (standard weight coefficients): As the size of the data set gradually grows, jobs are run on multiple CPU cores with different memory sizes. The standard weight coefficients describing the effect of the number of cores and the data size on the application's execution time are calculated using the regression algorithm.
3. Prediction by the Weighted Support Vector Regression Machine: The weights obtained in step 2 are added to the sample-related parameters to predict the job time and to check whether it lies within the job operation time range. The accuracy of the prediction result is compared with that of traditional prediction models to verify the model's accuracy.
Configurations of the data used in the experiments are listed in Table 4.
Experiment Data Information
By performing regression analysis on the performance data using a stepwise approach, the summary table and the regression variance analysis table can be obtained.
Regression Variance Analysis Table
From Table 5, it is observed that the response level of the linear equation to actual data (adjusted
From Table 6, it is observed that for a non-iterative application like WordCount, the benchmark “cores” carries the heaviest weight, meaning that the more CPU cores are used, the shorter the job execution time and the higher the efficiency.
Related benchmark weight for WordCount
Related benchmark weight for PageRank

WordCount prediction result.

PageRank prediction result.
From Table 7, it is observed that for an iterative application like PageRank, the benchmark “memory” carries the heaviest weight, meaning that the more internal memory is allocated, the shorter the job execution time and the higher the efficiency.
The prediction samples, with their parameters weighted as described, were then put into the model. WordCount used prediction samples with sizes of 2 GB, 6 GB and 10 GB; PageRank used prediction samples with sizes of 3 GB, 8 GB and 12 GB. The number of cores used was 12 and the internal memory was 30 GB. Results are shown in Figs 2 and 3.
Related benchmark weight for WordCount
The experiment showed that the predicted execution time for both types of job is within the prediction range. Results from repeated experiments showed that the predicted job execution time has an error within 10% when compared with the mean value.
Comparing the evaluation benchmarks of the model proposed in this work with those of the conventional regression prediction model and the standard support vector regression machine model, the R² values listed in Table 8 show that the model proposed in this work has a higher degree of fit.
To use rental services efficiently in the cloud environment, users need to apply for an appropriate amount of cluster resources to reduce rental costs. The Standard Regression Coefficient-Weighted Support Vector Regression Machine prediction model proposed in this work enables the prediction of the execution time of a specific type of job on large data sets by analysing tuning data sets. Compared with traditional prediction models, it also has higher accuracy, and can therefore provide users in a production environment with an estimate of the resources required.
Acknowledgements
The subject is sponsored by the National Natural Science Foundation of P.R. China (No. 61572260, No. 61572261, No. 61672296, No. 61602261), the Natural Science Foundation of Jiangsu Province (No. BK20140886, No. BK20140888, No. BK20160089), Scientific & Technological Support Project of Jiangsu Province (No. BE2015702, No. BE2016777, BE2016185), China Postdoctoral Science Foundation (No. 2014M551636, No. 2014M561696), Jiangsu Planned Projects for Postdoctoral Research Funds (No. 1302090B, No. 1401005B), Jiangsu High Technology Research Key Laboratory for Wireless Sensor Networks Foundation (No. WSNLBZY201508), Research Innovation Program for College Graduates of Jiangsu Province (SJZZ16-0148), Postgraduate Research & Practice Innovation Program of Jiangsu Province (SJZZ16-0148).
