Abstract
To improve the efficiency of batch query processing on MapReduce, a multiple query optimization approach based on the Hive+ framework is proposed. It reduces the number of MapReduce tasks generated for a batch of queries, and thereby decreases MapReduce task start-up time and fault tolerance overhead and improves query efficiency. The TPC-H benchmark is used as the test set, with experiments run on Hive 0.12. The experiments show that the efficiency of batch query processing is effectively improved.
Introduction
Terabytes or petabytes of data now pour into computer networks, the World Wide Web, and data storage devices every day from society, business, science and engineering, and almost every other aspect of daily life. This explosive growth in available data volume results from the computerization of society and the rapid development of information technology. Mining valuable information from such large amounts of raw data has attracted great attention from both industry and researchers. Big data query technology supports aggregation and summarization at different granularities, according to need, in order to support multi-dimensional analysis. Widely used big data query systems include Hive, Impala, Shark and Pig [1–4], of which Hive has the largest user base.
Hive works as follows. SQL statements received from the CLI or web UI are first optimized by the compiler; MapReduce tasks and physical execution plans, represented as directed acyclic graphs, are then generated automatically; finally, the tasks are executed and the results are returned to the end user [5, 6]. Hive provides several fault tolerance mechanisms for handling exceptions, which increase the size of the data it can process and improve system stability. However, the start-up time and fault tolerance overhead of Hive query execution are unsatisfactory compared with similar distributed processing frameworks such as Spark [7]. Furthermore, as the amount of queried data grows, read-write overhead and network transmission overhead have become two important factors affecting Hive query performance.
Much of the research on Hive performance optimization [8–13] has focused on query data storage formats, query plan rewriting, task decomposition strategies and physical execution algorithms. Building on this previous work, this paper aims to reduce the number of MapReduce tasks generated from SQL statements, and thereby to reduce I/O overhead and network transmission overhead and improve query performance.
Query patterns are often repeated in batch queries, and the size of query results shows a downward trend as the input data gradually increase. To reduce the cost of batch queries and improve query efficiency, this paper explores data sharing and task sharing between the queries in a batch in order to find common subqueries. In this way, the number of generated MapReduce tasks, the read-write overhead and the network transmission overhead can be effectively reduced.
Multiple query optimization
Data warehouse query system: Hive
Hive is a data warehouse infrastructure built on top of Hadoop that provides data summarization, query, and analysis of large datasets. It has been considered the de facto standard for interactive SQL queries over petabytes of data in Hadoop since its incubation in 2008. Hadoop was built to organize and store massive amounts of data of all shapes and formats. Because of Hadoop’s “schema on read” architecture, a Hadoop cluster is well suited as a reservoir of heterogeneous data, structured and unstructured, from a multitude of sources. Data analysts use Hive to query, summarize, explore and analyze data, and then turn it into actionable business insight [14–16].
The execution flow of a single query in Hive is shown in Fig. 1.

Execution flow chart of a single query in Hive.
Hive consists mainly of a user interface, driver, metadata storage, compiler, optimization module and execution module. Commands from the user interface are received by the CliDriver module and passed to the compiler, which performs lexical analysis and converts the SQL statements into an abstract syntax tree. The query block and physical query plan are then generated and optimized on the basis of syntactic analysis. Finally, the query plan is carried out by the execution module.
To support multiple query analysis and optimization, this paper extends this execution process. The proposed framework is called Hive+.
The receiving, parsing and execution mechanisms are modified in Hive+ so that it can handle multiple queries at the same time. The execution semantics of the multiple queries must match the semantics of executing the same queries one at a time, in order; consequently, the data flow dependencies between queries must be analyzed and then preserved in the subsequent optimization and execution phases. Reducing the number of MapReduce tasks generated from the query statements is an effective way to improve batch query efficiency, so common subquery identification and optimization must be addressed in the design of the multiple query framework.
Building the proposed Hive+ framework requires the following work:
1. Adding a module to the receiving module CliDriver to control the number of query statements read.
2. Modifying the driver module to pass the additional information between modules and to call the extended compiler and execution module.
3. Modifying the compiler to convert multiple query statements into AST (Abstract Syntax Tree), QB (Query Block) and logic OPTree (Operation Tree) representations in turn.
4. Adding a multiple query optimization module, built on the single query optimization module, that analyzes the data flow dependencies between queries, finds the common active data, identifies and merges common subqueries, and divides the result into recognizable query tasks.
5. Modifying the execution module to support batch queries.
The Hive+ framework is shown in Fig. 2. Compared with Hive, it revises the query receiver module and adds modules for inter-query flow analysis and for common subquery identification and optimization. In a batch query, the query statements are first received and saved under the control module; they then go through lexical analysis, AST conversion and parsing one by one and are finally converted into logic operation trees. Inter-query flow analysis, query similarity analysis and multiple query optimization are all performed on the operation trees.

Execution flow chart of multiple query in Hive+.
The execution flow of a multiple query in Hive+ is as follows.
1. Query reception. Multiple HQL statements are received from the user interface at the same time, according to the control module setting, and the received queries are numbered consecutively. The multiple query driver module is responsible for passing information to the compiler.
2. Parsing and dependency analysis. Each query goes through lexical analysis, AST conversion and parsing in turn; the logic operation trees are then generated as a forest, and the dependencies between them are analyzed.
3. Common subquery identification and optimization. Common subtrees between queries are identified according to the dependencies obtained in step 2, and logic operation trees that share a common subtree are merged.
4. Multiple query optimization and execution. According to Hive's optimization strategies and task division logic, the logic trees obtained in step 3 are optimized and converted into a number of MapReduce tasks, which are finally sent to the extended execution module.
Module division
Following the principle of high cohesion and low coupling, Hive+ is divided into a multiple query driver module, a flow analysis module, a common subquery identification module and a common subquery optimization module. The data flow between modules is shown in Fig. 3, where the lightly marked parts are the original functions of Hive.

Data Flow between modules in Hive+.
The multiple query driver module receives the multiple query statements and calls the driver, metadata storage, compiler, optimization and other modules. The logic query plan is optimized, restructured and executed, and the results are returned.
The flow analysis module analyzes the logic operation trees of the multiple query, determines the dependencies between the queries, and passes the analysis results to the common subquery identification module, which finds the common subtrees. The common subquery optimization module then eliminates the duplicated representations, reducing the number of MapReduce tasks.
A query plan is generated from these optimization results according to Hive's original optimization strategy and task division logic, and the plan is then further optimized and executed. The multiple query driver module returns the results to the user at the UI according to the mapping established when the queries were received.
HQL statements received from the UI, separated by semicolons, are read in a loop so that query statements are received in batches. The received statements are then processed in sequence, as described in Algorithm 1.
Algorithm 1: Multiple query reception
Input: multiple HQL statements received from UI
Output: Multiple OPTrees generated from HQL
for each oneCmd in line.split(";")
    // preprocess oneCmd to obtain command
    if validMultiQuery is true                          // multiple query switch is turned on
        if command is DDL Query                         // common query: starts with select, from or insert
            if multiQuerycounter >= multiQueryNum
                ret = processMultiCmds();
                clear multiCmds value;
                set multiQuerycounter = 0;
            endif
            multiCmds.add(new Pair(command, (HiveConf) conf));
            multiQuerycounter++;
        else if command is AggrDML
            ret = processCmd(command);
        else if multiQuerycounter > 0
            ret = processMultiCmds();
            clear multiCmds value;
            set multiQuerycounter = 0;
            ret = processCmd(command);
        endif
    else                                                // multiple query switch is turned off
        ret = processCmd(command);
        clear current CLI status;
        exit;
    endif
endfor
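The buffering behaviour of Algorithm 1 can be illustrated with a minimal Python sketch. It is not the Hive+ implementation: the names `receive_batch`, `run_batch`, `run_single` and `BATCHABLE_PREFIXES` are hypothetical, the naive split on ";" ignores semicolons inside string literals, and the final flush at the end of the input is an assumption that Algorithm 1 does not state.

```python
from typing import Callable, List

# Hypothetical classification: Algorithm 1 mentions statements that
# start with select, from or insert as candidates for batching.
BATCHABLE_PREFIXES = ("select", "from", "insert")


def receive_batch(line: str, batch_size: int,
                  run_batch: Callable[[List[str]], None],
                  run_single: Callable[[str], None]) -> None:
    """Split `line` on ';', buffer batchable queries, and flush them in groups."""
    pending: List[str] = []
    for raw in line.split(";"):
        command = raw.strip()
        if not command:
            continue
        if command.lower().startswith(BATCHABLE_PREFIXES):
            if len(pending) >= batch_size:
                run_batch(pending)      # buffer is full: hand the group over
                pending = []
            pending.append(command)
        else:
            # Any other statement forces a flush first, so that the
            # original statement order is preserved.
            if pending:
                run_batch(pending)
                pending = []
            run_single(command)
    if pending:                         # assumption: flush the remainder at end of input
        run_batch(pending)
```

Flushing before every non-batchable statement keeps the batch semantics equal to those of the ordered single-query sequence, which is the requirement stated for Hive+.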
Inter-query flow analysis
Inter-query flow analysis works in the compilation phase: it analyzes the table scan and insertion operations of each query and then derives the dependencies between queries, which include flow dependencies, query similarity and shared data. The procedure is given in Algorithm 2. Its results are used not only for common subquery identification and optimization but also to find hot data.
Algorithm 2: Inter-query flow analysis
Input: logic operation trees
Output: the dependencies between queries, recorded per table
assert(flowCtx has been initialized);
for i = 0 to multiQueryNum-1
    currPctx = multiPctx[i];
    SrcOpList = top operators of currPctx;
    for each op in SrcOpList                            // start scanning tables
        assert(op is an instance of TS);
        get table_name, partition_names and column_names which are scanned by op;
        if flowCtx.TabExist(table_name)
            currGid = flowCtx.GetGID(TableID);
        else
            currGid = flowCtx.newTabVer(GID, TableID);
        endif
        currVer = flowCtx.getVerID(table_name);
        flowCtx.addOptoTabRd(<i,op>, table_name, currVer);
        flowCtx.TabVerDescRd(table_name, op, currGid, partition_names, column_names);
    endfor                                              // end scanning
    SinkOpList = leaf operators of currPctx;
    for each op in SinkOpList                           // start insertion
        assert(op is an instance of FS);
        get table_name, partition_names and column_names which are updated by op;
        currGid = flowCtx.newGID(table_name);
        flowCtx.newTabVer(table_name, op);
        flowCtx.addOptoTabWr(<i,op>, table_name, currVer);
        flowCtx.TabVerDescWr(table_name, op, currGid, partition_names, column_names);
    endfor                                              // end insertion
endfor
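The idea behind Algorithm 2 can be sketched in Python under simplifying assumptions: each query is reduced to the set of tables it scans and the set of tables it writes, partitions and columns are ignored, and a write is modelled as creating a new version of the table. The function `analyze_flow` and its data layout are hypothetical and do not reproduce the `flowCtx` interface.

```python
from collections import defaultdict
from typing import Dict, List, Set, Tuple


def analyze_flow(queries: List[Tuple[Set[str], Set[str]]]):
    """queries[i] = (tables read, tables written) for query i, in submission order.

    Returns (flow_deps, shared_reads):
      flow_deps    - set of (writer, reader) query-index pairs
      shared_reads - {(table, version): [indices of queries scanning it]}
    """
    version: Dict[str, int] = defaultdict(int)      # current version of each table
    last_writer: Dict[str, int] = {}                # query that produced the current version
    readers: Dict[Tuple[str, int], List[int]] = defaultdict(list)
    flow_deps: Set[Tuple[int, int]] = set()

    for i, (reads, writes) in enumerate(queries):
        for table in sorted(reads):                 # table scan operators
            readers[(table, version[table])].append(i)
            if table in last_writer:                # reads data produced by an earlier query
                flow_deps.add((last_writer[table], i))
        for table in sorted(writes):                # file sink operators
            version[table] += 1                     # a write creates a new table version
            last_writer[table] = i

    shared_reads = {key: qs for key, qs in readers.items() if len(qs) > 1}
    return flow_deps, shared_reads
```

Queries that appear together under the same (table, version) key scan identical data and are therefore candidates for common subquery identification, whereas a pair in `flow_deps` must keep its original execution order.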
Common subquery identification and optimization
The definition of common subquery
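Assume that there are two operation trees $\mathit{OpTree}_1$ and $\mathit{OpTree}_2$ with operators $Op_i \in \mathit{OpTree}_1(op)$ and $Op_j \in \mathit{OpTree}_2(op)$, where $0 < i \le \mathit{OpTree}_1(op).\mathit{size}$, $0 < j \le \mathit{OpTree}_2(op).\mathit{size}$, and OpTree denotes a logic operation tree.
If $\mathit{SubOpTree}(op) = \mathit{CSQ}(Op_i, Op_j)$ and $\mathit{SubOpTree}(op) \neq \emptyset$, then every operator $Op_z$ of $\mathit{SubOpTree}(op)$, with $0 < z \le \mathit{SubOpTree}(op).\mathit{size}$, satisfies $Op_z \in \mathit{OpTree}_1(op)$ and $Op_z \in \mathit{OpTree}_2(op)$. Here SubOpTree denotes a subtree and $\mathit{CSQ}(Op_i, Op_j)$ denotes the tree formed by the common subquery.
If $Op_i \in \mathit{leafNode}$ with $i = 1$, then there exists $Op_j \in \mathit{SubOpTree}(op)$ with $j = 1$, where leafNode denotes the set of leaf nodes.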
Common subquery identification
This paper uses an improved LCS (longest common substring) algorithm: each logic operation tree, together with its paths, is represented linearly, so that all common substrings can be identified quickly and the common subqueries found.
Algorithm 3: Common subquery identification
Input: two linear operation trees opTree1, opTree2
Output: the identification result, stored in a two-dimensional array
opTree1.add(genLinearOptree(rootOperator1));
opTree2.add(genLinearOptree(rootOperator2));
int rowLen = opTree1.size;
int colLen = opTree2.size;
opCmpResTab = new int[rowLen][colLen];                  // all entries start at 0
for i = 0 to rowLen-1
    for j = 0 to colLen-1
        op1 = opTree1.get(i);
        op2 = opTree2.get(j);
        if op1 and op2 are both instances of Operator
            matched = op1.Name equals op2.Name and opCmp(op1, op2);
        else if op1 and op2 are both instances of String
            matched = op1.toString equals op2.toString;
        else
            matched = false;
        endif
        if matched                                      // extend the common run on the diagonal
            if i-1 >= 0 and j-1 >= 0
                opCmpResTab[i][j] = opCmpResTab[i-1][j-1] + 1;
            else
                opCmpResTab[i][j] = 1;
            endif
        else
            opCmpResTab[i][j] = 0;
        endif
    endfor
endfor
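The table filled in by Algorithm 3 is the standard longest common substring table. A minimal Python sketch follows, assuming that each linearized operation tree is simply a list of operator labels compared by equality. The label format and the helper names `common_run_table` and `longest_common_run` are hypothetical; the paper's `opCmp` comparison of operator contents is replaced here by plain string equality.

```python
from typing import List, Sequence


def common_run_table(a: Sequence[str], b: Sequence[str]) -> List[List[int]]:
    """table[i][j] = length of the common run ending at a[i] and b[j] (0 if they differ)."""
    table = [[0] * len(b) for _ in a]
    for i, x in enumerate(a):
        for j, y in enumerate(b):
            if x == y:
                # Extend the run on the diagonal, or start a new run at a border.
                table[i][j] = table[i - 1][j - 1] + 1 if i > 0 and j > 0 else 1
    return table


def longest_common_run(a: Sequence[str], b: Sequence[str]) -> List[str]:
    """Return the longest contiguous operator sequence shared by a and b."""
    table = common_run_table(a, b)
    best_len, best_end = 0, -1
    for i, row in enumerate(table):
        for length in row:
            if length > best_len:
                best_len, best_end = length, i
    return list(a[best_end - best_len + 1: best_end + 1]) if best_len else []
```

For two plans that share their scan, filter and projection but then diverge, `longest_common_run` returns that shared prefix, which is the candidate common subquery.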
Common subquery optimization
A heuristic algorithm then uses the flow dependency analysis results to decide which queries can be combined: it checks the corresponding sets of query identifiers and whether the number of join operations equals the number of group-by operations. All queries that can be combined are identified and merged in order to reduce the number of MapReduce tasks and improve query performance.
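The effect of merging can be illustrated with a small Python sketch. It does not implement the heuristic above; it only shows, under the assumption that each query plan is a linear list of operator labels starting at the table scan, how plans with a common prefix collapse into one shared tree. The names `merge_plans` and `count_operators` and the operator labels are hypothetical.

```python
from typing import Dict, List


def merge_plans(plans: List[List[str]]) -> Dict:
    """Merge linear operator sequences into a prefix tree so that shared prefixes appear once."""
    root: Dict = {}
    for plan in plans:
        node = root
        for op in plan:
            node = node.setdefault(op, {})  # reuse an existing operator node if present
    return root


def count_operators(tree: Dict) -> int:
    """Number of operator nodes in the merged tree."""
    return sum(1 + count_operators(child) for child in tree.values())


plans = [
    ["TS(lineitem)", "FIL(l_quantity>10)", "GBY(l_orderkey)", "FS"],
    ["TS(lineitem)", "FIL(l_quantity>10)", "SEL(l_extendedprice)", "FS"],
]
merged = merge_plans(plans)
separate_total = sum(len(p) for p in plans)  # operators when the plans run separately
merged_total = count_operators(merged)       # operators after sharing the common prefix
```

In this example the scan and the filter are executed once instead of twice, which is the kind of saving that reduces the number of generated MapReduce tasks.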
Experiment
Experimental environment
Software environment
Platform software
Hive: Hive 0.12.0
Hadoop: hadoop-2.0.0-cdh4.3.0
Cluster environment
Linux OS: CentOS release 6.7
Size of data set: 10 GB
Number of computers in cluster: four
Configuration of a single node: six-core AMD Opteron 8425 CPU, 8 GB RAM
Experimental data
The experiment follows TPC-H, the database benchmark published by the TPC (Transaction Processing Performance Council). Its eight tables, which simulate a business environment, are defined and 10 GB of data are generated. The data set is shown in Table 1.
Data set of multiple query optimization
Two types of queries based on the TPC-H query set are used to simulate actual applications. Table 2 shows the test cases, where Test_similar is divided into two sets of queries, Test_1 and Test_2. The parameters in the select clause and the limit parameter are modified in Test_1, and the group by and count functions are modified in Test_2, so as to obtain the largest possible common subtree between the generated logic operation trees.
The test cases of multiple query optimization
The experimental results of the proposed multiple query optimization are shown in Table 3. The experiment compares the execution time and the number of generated MapReduce tasks before and after optimization, where "before optimization" and "after optimization" mean that the multiple query optimization switch is turned off or on, respectively.
Experimental results of multiple query optimization
The experimental results show that the proposed Hive+ framework effectively reduces both the execution time and the number of MapReduce tasks for multiple queries, and significantly improves the efficiency of multiple query processing. In this paper, the optimizations are applied at the logic query plan level. Further optimizations could be applied during physical query plan generation, such as MapReduce task generation, task scheduling and dynamic loading. These will be studied in future work.
