Compressing Text Tables In Hive

At Forward we have been using Hive for a while. We started out with the default table type (uncompressed text) and wanted to see if we could save some space without losing too much performance.

The wiki page HiveCompressedStorage lists the possibilities.

Basically you have 3 decisions:

TextFile or SequenceFile tables

TextFile

  • Can be compressed in place.

  • You can gzip/bzip the files before you LOAD DATA into your table (see the sketch after this list)

  • Only gzip/bzip are supported

  • Gzip is not splittable
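
For example, pre-compressing a file and loading it into a TextFile table might look something like this from the Hive CLI (the path and table name are made up for illustration; Hive/Hadoop decompress gzip transparently on read):

-- gzip the file first (here via the CLI's ! shell escape), then load it as-is
!gzip /tmp/clicks_2011-01-01.csv;
LOAD DATA LOCAL INPATH '/tmp/clicks_2011-01-01.csv.gz'
INTO TABLE clicks PARTITION (dated='2011-01-01');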

SequenceFile

  • You need to create a SequenceFile table and do a SELECT/INSERT into it (see the sketch after this list)

  • Can use any supported compression codec

  • All compression codecs are splittable. All the cool kids use LZO or Snappy

  • Did not work, at least for me (help appreciated!)
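
For reference, the SequenceFile route looks roughly like this (a sketch with a made-up schema; as noted above, I could not get it working myself):

-- hypothetical table; the real one would mirror the source table's columns
CREATE TABLE clicks_seq (keyword STRING, clicks INT)
PARTITIONED BY (dated STRING)
STORED AS SEQUENCEFILE;

SET hive.exec.compress.output=true;
SET mapred.output.compress=true;
SET io.seqfile.compression.type=BLOCK;
-- Snappy assumes the codec is installed; GzipCodec would also work here
SET mapred.output.compression.codec=org.apache.hadoop.io.compress.SnappyCodec;

INSERT OVERWRITE TABLE clicks_seq PARTITION (dated='2011-01-01')
SELECT keyword, clicks FROM clicks WHERE dated='2011-01-01';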

Which compression algorithm

  • gzip - Quite slow, good compression, not splittable, supported in TextFile tables

  • bzip - Slowest, best compression, splittable, supported in TextFile tables

  • LZO - Not in the standard distro (licensing issues), fast, splittable

  • Snappy - New from Google, not in the standard distro (but the licence is compatible), very fast (codec class names for all of these are sketched after this list)
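
For reference, these are the codec classes I believe you would plug into mapred.output.compression.codec (LZO and Snappy need the extra jars/native libraries installed first):

SET mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec;
-- or org.apache.hadoop.io.compress.BZip2Codec
-- or com.hadoop.compression.lzo.LzopCodec   (from the hadoop-lzo project)
-- or org.apache.hadoop.io.compress.SnappyCodec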

Block or Record compression (for SequenceFile tables)

The docs say:

The value for io.seqfile.compression.type determines how the compression is performed. If you set it to RECORD you will get as many output files as the number of map/reduce jobs. If you set it to BLOCK, you will get as many output files as there were input files. There is a tradeoff involved here – large number of output files => more parallel map jobs => lower compression ratio.

But I got the same number of files regardless of which I selected, and the total size suggested they were not even compressed, so I don't know what is going on.

For simplicity I chose gzipped TextFile tables because:

  • It worked (always criterion zero)

  • Most of our files were not huge anyway and the technique described below keeps some of the parallelism

  • Can be done on the table in place

  • Each partition can be compressed separately

  • The space is saved incrementally and realised immediately

  • Testing showed for our load it was not much of a performance hit

  • We are feeling more pain on space than on query performance at the moment (our hourly runs complete in ~20 mins)

require 'rubygems'
require 'date'
require 'rbhive'

# One partition per country per day
countries = %w[at au br de dk es fr in it jp mx nl no pl pt ru se uk us za]
dates = (Date.parse('2011-01-01')..Date.parse('2011-04-30'))

RBHive.connect('hiveserver') do |con|
  dates.each do |date|
    countries.each do |country|
      # Rewrite each partition onto itself; the partition columns (dated, country)
      # are left out of the select list because the PARTITION clause supplies them
      query = "insert overwrite table keywords partition (dated='#{date}', country = '#{country}')
              select account,campaign,ad_group,keyword_id,keyword,match_type,status,
              first_page_bid,quality_score,distribution,max_cpc,destination_url,ad_group_status,
              campaign_status,currency_code,impressions,clicks,ctr,cpc,
              cost,avg_position,account_id,campaign_id,adgroup_id
              from keywords where dated='#{date}' and country='#{country}'"
      begin
        # Gzip the job output and merge the resulting files where possible
        con.set('mapred.output.compression.codec','org.apache.hadoop.io.compress.GzipCodec')
        con.set('hive.exec.compress.output','true')
        con.set('mapred.output.compress','true')
        con.set('mapred.compress.map.output','true')
        con.set('hive.merge.mapredfiles','true')
        con.set('hive.merge.mapfiles','true')
        con.execute(query)
      rescue => e
        puts "#########################"
        puts e.message
        puts "#########################"
      end
    end
  end
end

This will loop through the partitions (date/country) and do an INSERT OVERWRITE from and to the same partition using our rbhive gem. This is good because Hive reads the old data via map/reduce jobs, writes the output to /tmp, deletes the old folder and then imports the new compressed version. You need to select the columns out explicitly because the target partition has two fewer fields (date and country are supplied by the PARTITION clause). As we had two levels of partitioning and lots of big files, this ran within a day on a 2TB table, saving us around 5TB (our replication factor is 3).

You could actually pull the data down, compress it and put it back on HDFS directly, as Hive does not know what data is inside the folders on HDFS, just their layout, but I thought it better to do it via Hive and let Hadoop parallelise the work. I would have carried on doing it this way, but with other tables it was too slow (too many partitions, and the Hive server is difficult to parallelise). So I stopped using rbhive, dropped down to hive -e to execute the queries, and used the lovely dynamic partitioning in later Hive versions. Notice that you can SELECT * now and Hive automatically does what it needs to insert the results into the correct partitions.

require 'rubygems'
require 'date'

# Country list left over from the per-partition version; it is not used below
# because country (and hour) are now dynamic partitions
countries = %w[at au br de dk es fr in int it jp kr mx nl no pl pt ru se uk us za]

dates = (Date.parse('2010-12-02')..Date.parse('2011-05-01'))

dates.each do |date|
  # Build one hive -e invocation per day; country and hour are filled in dynamically
  query = ""
  query += "SET hive.exec.compress.output=true;"
  query += "SET mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec;"
  query += "set mapred.job.priority=VERY_LOW;"
  query += "set hive.exec.dynamic.partition=true;"
  query += "set mapred.output.compress=true;"
  query += "set mapred.compress.map.output=true;"
  query += "set hive.merge.mapredfiles=true;"
  query += "set hive.merge.mapfiles=true;"
  query += "insert overwrite table hourly_clicks
            partition (dated='#{date}', country, hour)
            select * from hourly_clicks where dated='#{date}'"
  # Shell out to the hive CLI rather than going through the hive server
  query = "hive -e \"#{query}\""
  puts "running #{query}"
  `#{query}`
end

The key difference is partition (dated='#{date}', country, hour): we have not specified a country or hour partition, so Hive works them out automatically. This ran loads faster than looping over the partitions, letting Hive schedule lots more map/reduce jobs at once. If you set hive.exec.dynamic.partition.mode=nonstrict as well, you do not have to specify any partition information at all (I did this as a test but kept the WHERE clause; I was scared to do it all at once!)
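
For completeness, the fully dynamic version would look roughly like this (a sketch of what I trialled, not the exact statement I ran):

SET hive.exec.dynamic.partition=true;
SET hive.exec.dynamic.partition.mode=nonstrict;
SET hive.exec.compress.output=true;
SET mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec;

-- no static partition values at all; dated, country and hour all come from the data
INSERT OVERWRITE TABLE hourly_clicks PARTITION (dated, country, hour)
SELECT * FROM hourly_clicks;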

The reason I am not (very) worried about losing parallelism is that some of our partitions contained big .csv files, and the output of INSERT OVERWRITE was multiple .gz files (it looked to me like as many as there were mappers; for example, a 700MB text file became ~10 .gz files), so they will still be read in parallel by mappers just as the original CSVs were.
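
If you want to check what a partition ended up as, you can list it from the Hive CLI (the warehouse path below is an assumption; yours may differ):

-- count how many .gz files one rewritten partition became
dfs -ls /user/hive/warehouse/keywords/dated=2011-01-01/country=uk/;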

I am open to suggestions about better ways to achieve this; doing it this way does not preclude doing something better later.