Designing for Performance Using Hadoop Hive

Version 4

    There are a number of techniques available for improving the performance of visualizations and dashboards built from data stored in your Hadoop cluster. While Hadoop is a batch-oriented system, the suggestions below can reduce latency through workload tuning, Hive optimization hints, and extracts into the Tableau Data Engine.

     

    Improve Performance When Connecting

    Custom SQL for Limiting the Data Set Size

    Custom SQL allows complex SQL expressions as the basis for a connection in Tableau. By using a LIMIT clause in the Custom SQL, you can reduce the data set size to speed up the task of exploring a new data set and building a view. Later, this LIMIT clause can be removed to support live queries on the entire data set.

     

    It is easy to get started with Custom SQL to limit the data set size. If your connection is a single-table or multiple-table connection, you can switch it to a Custom SQL connection and have the connection dialog automatically populate the Custom SQL expression. On the last line of the Custom SQL, add LIMIT 10000 to work with only the first 10,000 records.
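
    For example, a Custom SQL expression along the following lines keeps exploration fast while you design the view; the table and field names here are hypothetical. Remove the LIMIT clause when you are ready to run live queries against the full data set.

    SELECT weblog.visit_date,
        weblog.referrer,
        weblog.page_views
    FROM weblog
    LIMIT 10000;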

     

    Extracts

    The Tableau Data Engine is a powerful accelerator for working with large amounts of data, and supports ad hoc analysis with low latency. While it is not built for the same scale that Hadoop is, the Tableau Data Engine can handle wide data sets with many fields and hundreds of millions of rows.

    Creating an extract in Tableau provides opportunities to accelerate analysis of your data by condensing massive data to a much smaller data set, which can contain the most relevant characteristics. When creating an extract, take advantage of the following options located in the Extract Data dialog box:

     

    • Hide Unused Fields: Ignore fields that have been hidden in the Tableau Data window so that the extract is compact and concise.

    • Aggregate Visible Dimensions: Create the extract having pre-aggregated the data to a coarse-grained view. While Hadoop is great for storing each fine-grained data point of interest, a broader view of the data can yield much of the same insight with far less computational cost.

    • Roll Up Dates: Hadoop date/time data is a specific example of fine-grained data that may be better served if rolled up to coarser-grained timelines. For example, tracking events per hour instead of per millisecond.

    • Define Filters: Create a filter to keep only the data of interest; for example, keep only recent records when you are working with archival data.

     

    Advanced Performance Techniques

    Below are some performance techniques that require a deeper understanding of Hive. Refer to the Improve Performance as an Administrator section of this article for details an administrator should consider when creating tables in Hive.

     

    While Hive has some degree of support for traditional database technologies such as a query optimizer and indexes, Hive also offers some unique approaches to improving query performance.

     

    Partitioned fields as filters

    A table in Hive can define a partitioning field, which will separate the records on disk into partitions based on the value of the field. When a query contains a filter on that partition, Hive quickly isolates the subset of data blocks required to satisfy the query. Creating filters in Tableau on one or more partitioning fields can greatly reduce the query execution time.

     

    One known limitation of this Hive query optimization is that the partition filter must exactly match the data in the partition field. For example, if a string field contains dates, you cannot filter on YEAR([date_field])=2011. Instead, consider expressing the filter in terms of raw data values, e.g., [date_field] >= '2011-01-01'. More broadly, you cannot leverage partition filtering based on calculations or expressions derived from the field in question; you must filter the field directly with literal values.
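
    As a sketch, assuming a hypothetical table weblog partitioned on a string field named date_field, the first query below allows Hive to prune partitions, while the second forces a scan of every partition:

    -- Partition pruning: the partitioning field is compared directly to literal values.
    SELECT COUNT(*) FROM weblog
    WHERE date_field >= '2011-01-01' AND date_field < '2012-01-01';

    -- No partition pruning: the filter is an expression computed from the partitioning field.
    SELECT COUNT(*) FROM weblog
    WHERE YEAR(date_field) = 2011;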

     

    Clustered fields as grouping fields

    Fields that are clustered (sometimes referred to as bucketed) dictate how the data in the table is separated on disk. One or more fields are defined as the clustering fields for a table, and their combined fingerprint ensures that all rows with the same content for the clustered fields are kept in close proximity within the data blocks.

     

    This fingerprint is known as a hash, and it improves query performance in two ways. First, computing aggregates across the clustered fields can take advantage of an early stage in the Map/Reduce pipeline known as the Combiner, which reduces network traffic by sending less data to each Reducer. This is most effective when the clustered fields appear in your GROUP BY clause, which in Tableau corresponds to using them as discrete (blue) fields, typically dimensions.

     

    The other way that the hashing of clustered fields can improve performance is when working with joins. The join optimization known as a hash join allows Hadoop to quickly fuse together two data sets based on the precomputed hash value, provided that the join keys are also the clustered fields. Because the clustered fields ensure that the data blocks are organized based on the hash values, a hash join becomes far more efficient for disk I/O and network bandwidth because it can operate on large, co-located blocks of data.
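
    As an illustration, assume two hypothetical tables, orders and customers, that are both bucketed on customer_id. The query shapes that benefit are an aggregate grouped by the clustered field (the Combiner case) and a join on that same field (the hash join case):

    -- Aggregation grouped by the clustered field can use the Combiner.
    SELECT customer_id, SUM(order_total) AS total_spend
    FROM orders
    GROUP BY customer_id;

    -- A join on the clustered field can use the bucketed hash join optimization.
    SELECT c.region, SUM(o.order_total) AS total_spend
    FROM orders o JOIN customers c ON (o.customer_id = c.customer_id)
    GROUP BY c.region;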

     

    Initial SQL

    Initial SQL provides open-ended possibilities for setting configuration parameters and performing work immediately upon establishing a connection. This section will discuss in more detail how Initial SQL can be used for advanced performance tuning. It is by no means comprehensive; there are numerous performance tuning options that may vary in utility by the size of your cluster and the type and size of your data.

     

    • Increase parallelism. This first tuning example uses Initial SQL to force more parallelism for the jobs generated for Tableau analysis. By default, the parallelism is dictated by the size of the data set and the default block size of 64 MB. A data set with only 128 MB of data will engage only two map tasks at the start of any query over that data. For data sets that require computationally intensive analysis tasks, you can force a higher degree of parallelism by lowering the threshold of data set size required for a single unit of work. The following setting uses a split size of approximately 1 MB, which could potentially increase the parallel throughput by 64 times:

     

    set mapred.max.split.size=1000000;

     

    • Optimize join performance. The next example extends the above discussion of using clustered fields (bucketed fields) to improve join performance. The optimization is turned off by default for many versions of Hive. Enable the optimization with the following settings.

     

    set hive.optimize.bucketmapjoin=true;

    set hive.optimize.bucketmapjoin.sortedmerge=true;

     

    Note: The second setting takes advantage of clustered fields that are also sorted.

     

    • Adjust configuration for uneven distribution. This last example shows how configuration settings are sometimes sensitive to the shape of your data. When working with data that has a highly uneven distribution, such as web traffic by referrer, the nature of Map/Reduce can lead to tremendous data skew in which a small number of compute nodes must handle the bulk of the computation. The following setting informs Hive that the data may be skewed and that Hive should take a different approach to formulating Map/Reduce jobs. This setting may reduce performance for data that is not heavily skewed.

     

    set hive.groupby.skewindata=true;

     

    Improve Performance as an Administrator

    Hive does not change the fundamental nature of Hadoop as a batch processing system. However, there are several techniques for improving the efficiency of your cluster through Hive.

     

    Storage File Format

    Hive offers the flexibility of working with data files as-is. An external table in Hive may reference distributed file system data located outside of the Hive environment. The external table may indicate how the data is compressed and how it should be parsed. Each Hive query then results in on-the-fly decompression and parsing of the data, which is a trade-off between flexibility and efficiency.
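
    A minimal sketch of an external table, assuming hypothetical tab-delimited log files already stored in HDFS; Hive parses (and, if configured, decompresses) the files each time a query runs:

    CREATE EXTERNAL TABLE weblog_raw (
        visit_date STRING,
        referrer   STRING,
        page_views INT
    )
    ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
    STORED AS TEXTFILE
    LOCATION '/data/weblog/raw';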

     

    Hive tables can also capture data sets in a storage format that is more efficient for frequent or complex analysis tasks. The default storage format is a plain text file, but many others exist, such as the Sequence File format. For use with analytical tools such as Tableau, you may prefer the Record Columnar File format (RCFile), which is a hybrid row-columnar format that supports efficient analysis when only a subset of the columns is needed.
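
    For example, the hypothetical raw table above could be materialized into an RCFile-backed table for analysis:

    CREATE TABLE weblog_rc (
        visit_date STRING,
        referrer   STRING,
        page_views INT
    )
    STORED AS RCFILE;

    INSERT OVERWRITE TABLE weblog_rc
    SELECT visit_date, referrer, page_views FROM weblog_raw;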

     

    Several data formats natively support compression, and others are compatible with certain compression codecs such as BZip2. You may also configure Hive to compress the intermediate data that moves from one compute node to the next over the network. Each type of compression is a trade-off between CPU time and disk or network I/O. There are many guidelines but no definitive rules for which compression scheme is appropriate, because clusters, data, and analytical use cases vary widely.
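
    The exact parameters depend on your Hive and Hadoop versions, but a typical sketch for enabling compression of intermediate and final output looks like the following; the GzipCodec choice is only an example, so substitute whatever codec your cluster supports:

    set hive.exec.compress.intermediate=true;
    set hive.exec.compress.output=true;
    set mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec;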

     

    Partitioning

    You can organize a Hive table into separate directories in the distributed file system, each containing many data blocks, in order to control data proximity for efficient access. This is done by defining one or more fields as the partitioning fields; each unique combination of values in those fields then results in a separate directory of files in HDFS. Often, a date field is used for partitioning to ensure that records with the same date are kept together. When a Hive query uses a WHERE clause to filter on a partitioning field, the filter effectively describes which directories are relevant. By loading only those files from the file system, the query can execute substantially faster than a query that filters on a non-partitioning field.
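
    As a sketch, the hypothetical weblog table could be partitioned by date; a query that filters on the partitioning field then reads only the matching directories:

    CREATE TABLE weblog_part (
        referrer   STRING,
        page_views INT
    )
    PARTITIONED BY (visit_date STRING);

    -- Only the files for the June 2011 partitions are read.
    SELECT referrer, SUM(page_views) AS views
    FROM weblog_part
    WHERE visit_date >= '2011-06-01' AND visit_date < '2011-07-01'
    GROUP BY referrer;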

     

    Bucketing

    Similar to partitioning, a bucketing field (i.e., clustering field) defines how the table data is organized into separate files in the distributed file system. The data values in one or more bucketing fields are used to compute a hash, and each row is then written to one of a fixed number of bucket files, defined for the Hive table, according to its hash value.
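
    A minimal sketch of a bucketed table, assuming the hypothetical orders table from earlier and an arbitrary choice of 32 buckets; the hive.enforce.bucketing setting applies when loading data and may vary by Hive version:

    CREATE TABLE orders_bucketed (
        customer_id BIGINT,
        order_total DOUBLE
    )
    CLUSTERED BY (customer_id) SORTED BY (customer_id) INTO 32 BUCKETS;

    -- When loading data with INSERT, this setting ensures the rows are actually
    -- distributed into the 32 bucket files.
    set hive.enforce.bucketing=true;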

     

    In the Map/Reduce operation, the Map stage organizes data based on hashed values. Because the hashes are pre-computed and the data is organized by hash values, bucketing makes the Map stage substantially faster and reduces the scattering of data between Map/Reduce stages, which taxes network I/O. In terms of SQL, the end result is that GROUP BY and join operations are accelerated.