001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mapreduce;
019
020import static org.apache.hadoop.hbase.regionserver.HStoreFile.BULKLOAD_TASK_KEY;
021import static org.apache.hadoop.hbase.regionserver.HStoreFile.BULKLOAD_TIME_KEY;
022import static org.apache.hadoop.hbase.regionserver.HStoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY;
023import static org.apache.hadoop.hbase.regionserver.HStoreFile.MAJOR_COMPACTION_KEY;
024
025import java.io.IOException;
026import java.io.UnsupportedEncodingException;
027import java.net.InetSocketAddress;
028import java.net.URLDecoder;
029import java.net.URLEncoder;
030import java.util.ArrayList;
031import java.util.Arrays;
032import java.util.List;
033import java.util.Map;
034import java.util.Set;
035import java.util.TreeMap;
036import java.util.TreeSet;
037import java.util.UUID;
038import java.util.function.Function;
039import java.util.stream.Collectors;
040
041import org.apache.commons.lang3.StringUtils;
042import org.apache.hadoop.conf.Configuration;
043import org.apache.hadoop.fs.FileSystem;
044import org.apache.hadoop.fs.Path;
045import org.apache.hadoop.hbase.Cell;
046import org.apache.hadoop.hbase.CellComparator;
047import org.apache.hadoop.hbase.CellUtil;
048import org.apache.hadoop.hbase.HConstants;
049import org.apache.hadoop.hbase.HRegionLocation;
050import org.apache.hadoop.hbase.HTableDescriptor;
051import org.apache.hadoop.hbase.KeyValue;
052import org.apache.hadoop.hbase.PrivateCellUtil;
053import org.apache.hadoop.hbase.TableName;
054import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
055import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
056import org.apache.hadoop.hbase.client.Connection;
057import org.apache.hadoop.hbase.client.ConnectionFactory;
058import org.apache.hadoop.hbase.client.Put;
059import org.apache.hadoop.hbase.client.RegionLocator;
060import org.apache.hadoop.hbase.client.Table;
061import org.apache.hadoop.hbase.client.TableDescriptor;
062import org.apache.hadoop.hbase.fs.HFileSystem;
063import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
064import org.apache.hadoop.hbase.io.compress.Compression;
065import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
066import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
067import org.apache.hadoop.hbase.io.hfile.CacheConfig;
068import org.apache.hadoop.hbase.io.hfile.HFile;
069import org.apache.hadoop.hbase.io.hfile.HFileContext;
070import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
071import org.apache.hadoop.hbase.io.hfile.HFileWriterImpl;
072import org.apache.hadoop.hbase.regionserver.BloomType;
073import org.apache.hadoop.hbase.regionserver.HStore;
074import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
075import org.apache.hadoop.hbase.util.BloomFilterUtil;
076import org.apache.hadoop.hbase.util.Bytes;
077import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
078import org.apache.hadoop.hbase.util.FSUtils;
079import org.apache.hadoop.hbase.util.MapReduceExtendedCell;
080import org.apache.hadoop.io.NullWritable;
081import org.apache.hadoop.io.SequenceFile;
082import org.apache.hadoop.io.Text;
083import org.apache.hadoop.mapreduce.Job;
084import org.apache.hadoop.mapreduce.OutputCommitter;
085import org.apache.hadoop.mapreduce.OutputFormat;
086import org.apache.hadoop.mapreduce.RecordWriter;
087import org.apache.hadoop.mapreduce.TaskAttemptContext;
088import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
089import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
090import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
091import org.apache.yetus.audience.InterfaceAudience;
092import org.slf4j.Logger;
093import org.slf4j.LoggerFactory;
094
095import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
096
097/**
098 * Writes HFiles. Passed Cells must arrive in order.
099 * Writes current time as the sequence id for the file. Sets the major compacted
 * attribute on created {@link HFile}s. Calling write(null,null) will forcibly roll
101 * all HFiles being written.
102 * <p>
103 * Using this class as part of a MapReduce job is best done
104 * using {@link #configureIncrementalLoad(Job, TableDescriptor, RegionLocator)}.
105 */
106@InterfaceAudience.Public
107public class HFileOutputFormat2
108    extends FileOutputFormat<ImmutableBytesWritable, Cell> {
109  private static final Logger LOG = LoggerFactory.getLogger(HFileOutputFormat2.class);
110  static class TableInfo {
111    private TableDescriptor tableDesctiptor;
112    private RegionLocator regionLocator;
113
114    public TableInfo(TableDescriptor tableDesctiptor, RegionLocator regionLocator) {
115      this.tableDesctiptor = tableDesctiptor;
116      this.regionLocator = regionLocator;
117    }
118
119    /**
120     * The modification for the returned HTD doesn't affect the inner TD.
121     * @return A clone of inner table descriptor
122     * @deprecated use {@link #getTableDescriptor}
123     */
124    @Deprecated
125    public HTableDescriptor getHTableDescriptor() {
126      return new HTableDescriptor(tableDesctiptor);
127    }
128
129    public TableDescriptor getTableDescriptor() {
130      return tableDesctiptor;
131    }
132
133    public RegionLocator getRegionLocator() {
134      return regionLocator;
135    }
136  }
137
138  protected static final byte[] tableSeparator = Bytes.toBytes(";");
139
140  protected static byte[] combineTableNameSuffix(byte[] tableName, byte[] suffix) {
141    return Bytes.add(tableName, tableSeparator, suffix);
142  }
143
  // The following constants are package-private since these are used by
  // HFileOutputFormat2 to internally transfer data between job setup and
  // reducer run using conf.
  // These should not be changed by the client.
  // Each *_FAMILIES_CONF_KEY value is written by configureIncrementalLoad /
  // configureIncrementalLoadMap via serializeColumnFamilyAttribute and read back
  // in createRecordWriter through the matching createFamily*Map helper.

  // Serialized per-family compression algorithm.
  static final String COMPRESSION_FAMILIES_CONF_KEY =
      "hbase.hfileoutputformat.families.compression";
  // Serialized per-family bloom filter type.
  static final String BLOOM_TYPE_FAMILIES_CONF_KEY =
      "hbase.hfileoutputformat.families.bloomtype";
  // Serialized per-family bloom filter parameter.
  static final String BLOOM_PARAM_FAMILIES_CONF_KEY =
      "hbase.hfileoutputformat.families.bloomparam";
  // Serialized per-family block size.
  static final String BLOCK_SIZE_FAMILIES_CONF_KEY =
      "hbase.mapreduce.hfileoutputformat.blocksize";
  // Serialized per-family data block encoding.
  static final String DATABLOCK_ENCODING_FAMILIES_CONF_KEY =
      "hbase.mapreduce.hfileoutputformat.families.datablock.encoding";

  // This constant is public since the client can modify this when setting
  // up their conf object and thus refer to this symbol.
  // It is present for backwards compatibility reasons. Use it only to
  // override the auto-detection of datablock encoding and compression.
  // When set, createRecordWriter applies the value to every family in
  // preference to the per-family maps.
  public static final String DATABLOCK_ENCODING_OVERRIDE_CONF_KEY =
      "hbase.mapreduce.hfileoutputformat.datablock.encoding";
  public static final String COMPRESSION_OVERRIDE_CONF_KEY =
      "hbase.mapreduce.hfileoutputformat.compression";

  /**
   * Keep locality while generating HFiles for bulkload. See HBASE-12596
   */
  public static final String LOCALITY_SENSITIVE_CONF_KEY =
      "hbase.bulkload.locality.sensitive.enabled";
  // Locality-sensitive writing is on unless the key above is explicitly set to false.
  private static final boolean DEFAULT_LOCALITY_SENSITIVE = true;
  // Names of all output tables, joined with tableSeparator by configureIncrementalLoad.
  static final String OUTPUT_TABLE_NAME_CONF_KEY =
      "hbase.mapreduce.hfileoutputformat.table.name";
  // Set to true by configureIncrementalLoad when the output format class is
  // MultiTableHFileOutputFormat; read (default false) in createRecordWriter.
  static final String MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY =
          "hbase.mapreduce.use.multi.table.hfileoutputformat";

  // Key of the default block storage policy; configureStoragePolicy uses it as the
  // fallback when no per-family key is present.
  public static final String STORAGE_POLICY_PROPERTY = HStore.BLOCK_STORAGE_POLICY_KEY;
  // Per-family storage policy keys are this prefix followed by the string form of
  // the table-and-family bytes (see configureStoragePolicy).
  public static final String STORAGE_POLICY_PROPERTY_CF_PREFIX = STORAGE_POLICY_PROPERTY + ".";
181
182  @Override
183  public RecordWriter<ImmutableBytesWritable, Cell> getRecordWriter(
184      final TaskAttemptContext context) throws IOException, InterruptedException {
185    return createRecordWriter(context, this.getOutputCommitter(context));
186  }
187
188  protected static byte[] getTableNameSuffixedWithFamily(byte[] tableName, byte[] family) {
189    return combineTableNameSuffix(tableName, family);
190  }
191
  /**
   * Builds the {@link RecordWriter} that writes incoming cells into HFiles below the work path
   * of the given committer, keeping one writer per table-and-family key.
   * <p>
   * Per-family compression, bloom type, bloom parameter, block size and data block encoding are
   * read from the job configuration; {@link #COMPRESSION_OVERRIDE_CONF_KEY} and
   * {@link #DATABLOCK_ENCODING_OVERRIDE_CONF_KEY}, when set, take precedence for every family.
   * A writer is rolled once the bytes written to it reach the configured maximum region file
   * size, but only when the row key changes. Calling {@code write(null, null)} rolls all
   * writers.
   *
   * @param context task attempt whose configuration and attempt id are used
   * @param committer output committer; it is cast to {@link FileOutputCommitter} to obtain the
   *          work path
   * @return the record writer
   * @throws IOException if the output file system cannot be obtained
   * @throws IllegalArgumentException if {@link #OUTPUT_TABLE_NAME_CONF_KEY} is unset or empty
   */
  static <V extends Cell> RecordWriter<ImmutableBytesWritable, V>
      createRecordWriter(final TaskAttemptContext context, final OutputCommitter committer)
          throws IOException {

    // Get the path of the temporary output file
    final Path outputDir = ((FileOutputCommitter)committer).getWorkPath();
    final Configuration conf = context.getConfiguration();
    final boolean writeMultipleTables = conf.getBoolean(MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, false) ;
    final String writeTableNames = conf.get(OUTPUT_TABLE_NAME_CONF_KEY);
    if (writeTableNames==null || writeTableNames.isEmpty()) {
      throw new IllegalArgumentException("Configuration parameter " + OUTPUT_TABLE_NAME_CONF_KEY
              + " cannot be empty");
    }
    final FileSystem fs = outputDir.getFileSystem(conf);
    // These configs. are from hbase-*.xml
    final long maxsize = conf.getLong(HConstants.HREGION_MAX_FILESIZE,
        HConstants.DEFAULT_MAX_FILE_SIZE);
    // Invented config.  Add to hbase-*.xml if other than default compression.
    final String defaultCompressionStr = conf.get("hfile.compression",
        Compression.Algorithm.NONE.getName());
    final Algorithm defaultCompression = HFileWriterImpl
        .compressionByName(defaultCompressionStr);
    // Optional job-wide compression override; null means "use the per-family map".
    String compressionStr = conf.get(COMPRESSION_OVERRIDE_CONF_KEY);
    final Algorithm overriddenCompression;
    if (compressionStr != null) {
      overriddenCompression = Compression.getCompressionAlgorithmByName(compressionStr);
    } else {
      overriddenCompression = null;
    }
    final boolean compactionExclude = conf.getBoolean(
        "hbase.mapreduce.hfileoutputformat.compaction.exclude", false);

    // The configured table names are joined with tableSeparator; split them back into a set.
    final Set<String> allTableNames = Arrays.stream(writeTableNames.split(
            Bytes.toString(tableSeparator))).collect(Collectors.toSet());

    // create maps from the table-and-family key to the per-family attributes
    final Map<byte[], Algorithm> compressionMap = createFamilyCompressionMap(conf);
    final Map<byte[], BloomType> bloomTypeMap = createFamilyBloomTypeMap(conf);
    final Map<byte[], String> bloomParamMap = createFamilyBloomParamMap(conf);
    final Map<byte[], Integer> blockSizeMap = createFamilyBlockSizeMap(conf);

    // Optional job-wide encoding override; null means "use the per-family map".
    String dataBlockEncodingStr = conf.get(DATABLOCK_ENCODING_OVERRIDE_CONF_KEY);
    final Map<byte[], DataBlockEncoding> datablockEncodingMap
        = createFamilyDataBlockEncodingMap(conf);
    final DataBlockEncoding overriddenEncoding;
    if (dataBlockEncodingStr != null) {
      overriddenEncoding = DataBlockEncoding.valueOf(dataBlockEncodingStr);
    } else {
      overriddenEncoding = null;
    }

    return new RecordWriter<ImmutableBytesWritable, V>() {
      // Map of table-and-family keys to writers and how much has been output on the writer.
      private final Map<byte[], WriterLength> writers =
              new TreeMap<>(Bytes.BYTES_COMPARATOR);
      // Row key of the most recently written cell, across all families.
      private byte[] previousRow = HConstants.EMPTY_BYTE_ARRAY;
      // Timestamp captured once at writer creation and passed to updateLatestStamp for each cell.
      private final long now = EnvironmentEdgeManager.currentTime();
      // Set when a writer reached maxsize; the roll itself waits for the next row change.
      private boolean rollRequested = false;

      @Override
      public void write(ImmutableBytesWritable row, V cell)
          throws IOException {
        Cell kv = cell;
        // null input == user explicitly wants to flush
        if (row == null && kv == null) {
          rollWriters(null);
          return;
        }

        byte[] rowKey = CellUtil.cloneRow(kv);
        // NOTE(review): presumably subtracts the int length prefix from the estimate -- confirm
        int length = (PrivateCellUtil.estimatedSerializedSizeOf(kv)) - Bytes.SIZEOF_INT;
        byte[] family = CellUtil.cloneFamily(kv);
        byte[] tableNameBytes = null;
        if (writeMultipleTables) {
          // In multi-table mode the table name is extracted from the output key.
          tableNameBytes = MultiTableHFileOutputFormat.getTableName(row.get());
          if (!allTableNames.contains(Bytes.toString(tableNameBytes))) {
            throw new IllegalArgumentException("TableName '" + Bytes.toString(tableNameBytes) +
                    "' not" + " expected");
          }
        } else {
          // Single-table mode: the configured value is the one table name.
          tableNameBytes = Bytes.toBytes(writeTableNames);
        }
        byte[] tableAndFamily = getTableNameSuffixedWithFamily(tableNameBytes, family);
        WriterLength wl = this.writers.get(tableAndFamily);

        // If this is a new column family, verify that the directory exists
        if (wl == null) {
          Path writerPath = null;
          if (writeMultipleTables) {
            writerPath = new Path(outputDir, new Path(Bytes.toString(tableNameBytes), Bytes
                    .toString(family)));
          }
          else {
            writerPath = new Path(outputDir, Bytes.toString(family));
          }
          fs.mkdirs(writerPath);
          configureStoragePolicy(conf, fs, tableAndFamily, writerPath);
        }

        // Request a roll once this cell would take the current writer to maxsize or beyond.
        if (wl != null && wl.written + length >= maxsize) {
          this.rollRequested = true;
        }

        // This can only happen once a row is finished though
        // (a null wl here makes rollWriters close every open writer)
        if (rollRequested && Bytes.compareTo(this.previousRow, rowKey) != 0) {
          rollWriters(wl);
        }

        // create a new HFile writer, if necessary
        if (wl == null || wl.writer == null) {
          if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
            HRegionLocation loc = null;

            String tableName = Bytes.toString(tableNameBytes);
            if (tableName != null) {
              // A connection is opened and closed for each new writer to find the region
              // hosting this row. Any failure is logged and treated as "location unknown".
              try (Connection connection = ConnectionFactory.createConnection(conf);
                     RegionLocator locator =
                       connection.getRegionLocator(TableName.valueOf(tableName))) {
                loc = locator.getRegionLocation(rowKey);
              } catch (Throwable e) {
                LOG.warn("There's something wrong when locating rowkey: " +
                  Bytes.toString(rowKey) + " for tablename: " + tableName, e);
                loc = null;
              } }

            if (null == loc) {
              if (LOG.isTraceEnabled()) {
                LOG.trace("failed to get region location, so use default writer for rowkey: " +
                  Bytes.toString(rowKey));
              }
              wl = getNewWriter(tableNameBytes, family, conf, null);
            } else {
              if (LOG.isDebugEnabled()) {
                LOG.debug("first rowkey: [" + Bytes.toString(rowKey) + "]");
              }
              InetSocketAddress initialIsa =
                  new InetSocketAddress(loc.getHostname(), loc.getPort());
              if (initialIsa.isUnresolved()) {
                if (LOG.isTraceEnabled()) {
                  LOG.trace("failed to resolve bind address: " + loc.getHostname() + ":"
                      + loc.getPort() + ", so use default writer");
                }
                wl = getNewWriter(tableNameBytes, family, conf, null);
              } else {
                if (LOG.isDebugEnabled()) {
                  LOG.debug("use favored nodes writer: " + initialIsa.getHostString());
                }
                // Pass the region's host as the single favored node for the new file.
                wl = getNewWriter(tableNameBytes, family, conf, new InetSocketAddress[] { initialIsa
                });
              }
            }
          } else {
            wl = getNewWriter(tableNameBytes, family, conf, null);
          }
        }

        // we now have the proper HFile writer. full steam ahead
        PrivateCellUtil.updateLatestStamp(cell, this.now);
        wl.writer.append(kv);
        wl.written += length;

        // Copy the row so we know when a row transition.
        this.previousRow = rowKey;
      }

      /*
       * Closes the given writer, or every tracked writer when the argument is null, and
       * clears the pending roll request.
       */
      private void rollWriters(WriterLength writerLength) throws IOException {
        if (writerLength != null) {
          closeWriter(writerLength);
        } else {
          for (WriterLength wl : this.writers.values()) {
            closeWriter(wl);
          }
        }
        this.rollRequested = false;
      }

      /*
       * Finalizes and closes the underlying file writer (if any) and resets the holder so
       * the next write for this key creates a fresh file.
       */
      private void closeWriter(WriterLength wl) throws IOException {
        if (wl.writer != null) {
          LOG.info(
              "Writer=" + wl.writer.getPath() + ((wl.written == 0)? "": ", wrote=" + wl.written));
          close(wl.writer);
        }
        wl.writer = null;
        wl.written = 0;
      }

      /*
       * Create a new StoreFile.Writer and register it in the writers map under the
       * table-and-family key, replacing any previous entry for that key.
       * @param tableName table the file belongs to
       * @param family column family the file belongs to
       * @param conf configuration handed to the writer builder
       * @param favoredNodes favored nodes for the new file, or null for none
       * @return A WriterLength, containing a new StoreFile.Writer.
       * @throws IOException
       */
      @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="BX_UNBOXING_IMMEDIATELY_REBOXED",
          justification="Not important")
      private WriterLength getNewWriter(byte[] tableName, byte[] family, Configuration
              conf, InetSocketAddress[] favoredNodes) throws IOException {
        byte[] tableAndFamily = getTableNameSuffixedWithFamily(tableName, family);
        Path familydir = new Path(outputDir, Bytes.toString(family));
        if (writeMultipleTables) {
          familydir = new Path(outputDir,
                  new Path(Bytes.toString(tableName), Bytes.toString(family)));
        }
        WriterLength wl = new WriterLength();
        // Resolution order: job-wide override, then per-family setting, then default.
        Algorithm compression = overriddenCompression;
        compression = compression == null ? compressionMap.get(tableAndFamily) : compression;
        compression = compression == null ? defaultCompression : compression;
        BloomType bloomType = bloomTypeMap.get(tableAndFamily);
        bloomType = bloomType == null ? BloomType.NONE : bloomType;
        String bloomParam = bloomParamMap.get(tableAndFamily);
        if (bloomType == BloomType.ROWPREFIX_FIXED_LENGTH) {
          // This writes into the shared task configuration. bloomParam is null when
          // bloomParamMap has no entry for this key.
          // NOTE(review): confirm conf.set accepts a null value here.
          conf.set(BloomFilterUtil.PREFIX_LENGTH_KEY, bloomParam);
        }
        Integer blockSize = blockSizeMap.get(tableAndFamily);
        blockSize = blockSize == null ? HConstants.DEFAULT_BLOCKSIZE : blockSize;
        DataBlockEncoding encoding = overriddenEncoding;
        encoding = encoding == null ? datablockEncodingMap.get(tableAndFamily) : encoding;
        encoding = encoding == null ? DataBlockEncoding.NONE : encoding;
        HFileContextBuilder contextBuilder = new HFileContextBuilder()
                                    .withCompression(compression)
                                    .withChecksumType(HStore.getChecksumType(conf))
                                    .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf))
                                    .withBlockSize(blockSize);

        if (HFile.getFormatVersion(conf) >= HFile.MIN_FORMAT_VERSION_WITH_TAGS) {
          contextBuilder.withIncludesTags(true);
        }

        contextBuilder.withDataBlockEncoding(encoding);
        HFileContext hFileContext = contextBuilder.build();
        if (null == favoredNodes) {
          wl.writer =
              new StoreFileWriter.Builder(conf, CacheConfig.DISABLED, fs)
                  .withOutputDir(familydir).withBloomType(bloomType)
                  .withComparator(CellComparator.getInstance()).withFileContext(hFileContext).build();
        } else {
          // With favored nodes the file system is wrapped in an HFileSystem.
          wl.writer =
              new StoreFileWriter.Builder(conf, CacheConfig.DISABLED, new HFileSystem(fs))
                  .withOutputDir(familydir).withBloomType(bloomType)
                  .withComparator(CellComparator.getInstance()).withFileContext(hFileContext)
                  .withFavoredNodes(favoredNodes).build();
        }

        this.writers.put(tableAndFamily, wl);
        return wl;
      }

      /*
       * Appends the bulk load metadata (load time, task attempt id, major-compaction flag,
       * exclude-from-minor-compaction flag, tracked timestamps) and closes the file writer.
       * A null writer is ignored.
       */
      private void close(final StoreFileWriter w) throws IOException {
        if (w != null) {
          // Uses the wall clock at close time, not the "now" field captured at creation.
          w.appendFileInfo(BULKLOAD_TIME_KEY,
              Bytes.toBytes(System.currentTimeMillis()));
          w.appendFileInfo(BULKLOAD_TASK_KEY,
              Bytes.toBytes(context.getTaskAttemptID().toString()));
          w.appendFileInfo(MAJOR_COMPACTION_KEY,
              Bytes.toBytes(true));
          w.appendFileInfo(EXCLUDE_FROM_MINOR_COMPACTION_KEY,
              Bytes.toBytes(compactionExclude));
          w.appendTrackedTimestampsToMetadata();
          w.close();
        }
      }

      // Finalizes every writer that is still open when the task finishes.
      @Override
      public void close(TaskAttemptContext c)
      throws IOException, InterruptedException {
        for (WriterLength wl: this.writers.values()) {
          close(wl.writer);
        }
      }
    };
  }
462
463  /**
464   * Configure block storage policy for CF after the directory is created.
465   */
466  static void configureStoragePolicy(final Configuration conf, final FileSystem fs,
467      byte[] tableAndFamily, Path cfPath) {
468    if (null == conf || null == fs || null == tableAndFamily || null == cfPath) {
469      return;
470    }
471
472    String policy =
473        conf.get(STORAGE_POLICY_PROPERTY_CF_PREFIX + Bytes.toString(tableAndFamily),
474          conf.get(STORAGE_POLICY_PROPERTY));
475    FSUtils.setStoragePolicy(fs, cfPath, policy);
476  }
477
478  /*
479   * Data structure to hold a Writer and amount of data written on it.
480   */
481  static class WriterLength {
482    long written = 0;
483    StoreFileWriter writer = null;
484  }
485
486  /**
487   * Return the start keys of all of the regions in this table,
488   * as a list of ImmutableBytesWritable.
489   */
490  private static List<ImmutableBytesWritable> getRegionStartKeys(List<RegionLocator> regionLocators,
491                                                                 boolean writeMultipleTables)
492          throws IOException {
493
494    ArrayList<ImmutableBytesWritable> ret = new ArrayList<>();
495    for(RegionLocator regionLocator : regionLocators)
496    {
497      TableName tableName = regionLocator.getName();
498      LOG.info("Looking up current regions for table " + tableName);
499      byte[][] byteKeys = regionLocator.getStartKeys();
500      for (byte[] byteKey : byteKeys) {
501        byte[] fullKey = byteKey; //HFileOutputFormat2 use case
502        if (writeMultipleTables)
503        {
504          //MultiTableHFileOutputFormat use case
505          fullKey = combineTableNameSuffix(tableName.getName(), byteKey);
506        }
507        if (LOG.isDebugEnabled()) {
508          LOG.debug("SplitPoint startkey for table [" + tableName + "]: [" + Bytes.toStringBinary
509                  (fullKey) + "]");
510        }
511        ret.add(new ImmutableBytesWritable(fullKey));
512      }
513    }
514    return ret;
515  }
516
517  /**
518   * Write out a {@link SequenceFile} that can be read by
519   * {@link TotalOrderPartitioner} that contains the split points in startKeys.
520   */
521  @SuppressWarnings("deprecation")
522  private static void writePartitions(Configuration conf, Path partitionsPath,
523      List<ImmutableBytesWritable> startKeys, boolean writeMultipleTables) throws IOException {
524    LOG.info("Writing partition information to " + partitionsPath);
525    if (startKeys.isEmpty()) {
526      throw new IllegalArgumentException("No regions passed");
527    }
528
529    // We're generating a list of split points, and we don't ever
530    // have keys < the first region (which has an empty start key)
531    // so we need to remove it. Otherwise we would end up with an
532    // empty reducer with index 0
533    TreeSet<ImmutableBytesWritable> sorted = new TreeSet<>(startKeys);
534    ImmutableBytesWritable first = sorted.first();
535    if (writeMultipleTables) {
536      first = new ImmutableBytesWritable(MultiTableHFileOutputFormat.getSuffix(sorted.first
537              ().get()));
538    }
539    if (!first.equals(HConstants.EMPTY_BYTE_ARRAY)) {
540      throw new IllegalArgumentException(
541          "First region of table should have empty start key. Instead has: "
542          + Bytes.toStringBinary(first.get()));
543    }
544    sorted.remove(sorted.first());
545
546    // Write the actual file
547    FileSystem fs = partitionsPath.getFileSystem(conf);
548    SequenceFile.Writer writer = SequenceFile.createWriter(
549      fs, conf, partitionsPath, ImmutableBytesWritable.class,
550      NullWritable.class);
551
552    try {
553      for (ImmutableBytesWritable startKey : sorted) {
554        writer.append(startKey, NullWritable.get());
555      }
556    } finally {
557      writer.close();
558    }
559  }
560
561  /**
562   * Configure a MapReduce Job to perform an incremental load into the given
563   * table. This
564   * <ul>
565   *   <li>Inspects the table to configure a total order partitioner</li>
566   *   <li>Uploads the partitions file to the cluster and adds it to the DistributedCache</li>
567   *   <li>Sets the number of reduce tasks to match the current number of regions</li>
568   *   <li>Sets the output key/value class to match HFileOutputFormat2's requirements</li>
569   *   <li>Sets the reducer up to perform the appropriate sorting (either KeyValueSortReducer or
570   *     PutSortReducer)</li>
571   * </ul>
572   * The user should be sure to set the map output value class to either KeyValue or Put before
573   * running this function.
574   */
575  public static void configureIncrementalLoad(Job job, Table table, RegionLocator regionLocator)
576      throws IOException {
577    configureIncrementalLoad(job, table.getDescriptor(), regionLocator);
578  }
579
580  /**
581   * Configure a MapReduce Job to perform an incremental load into the given
582   * table. This
583   * <ul>
584   *   <li>Inspects the table to configure a total order partitioner</li>
585   *   <li>Uploads the partitions file to the cluster and adds it to the DistributedCache</li>
586   *   <li>Sets the number of reduce tasks to match the current number of regions</li>
587   *   <li>Sets the output key/value class to match HFileOutputFormat2's requirements</li>
588   *   <li>Sets the reducer up to perform the appropriate sorting (either KeyValueSortReducer or
589   *     PutSortReducer)</li>
590   * </ul>
591   * The user should be sure to set the map output value class to either KeyValue or Put before
592   * running this function.
593   */
594  public static void configureIncrementalLoad(Job job, TableDescriptor tableDescriptor,
595      RegionLocator regionLocator) throws IOException {
596    ArrayList<TableInfo> singleTableInfo = new ArrayList<>();
597    singleTableInfo.add(new TableInfo(tableDescriptor, regionLocator));
598    configureIncrementalLoad(job, singleTableInfo, HFileOutputFormat2.class);
599  }
600
601  static void configureIncrementalLoad(Job job, List<TableInfo> multiTableInfo,
602      Class<? extends OutputFormat<?, ?>> cls) throws IOException {
603    Configuration conf = job.getConfiguration();
604    job.setOutputKeyClass(ImmutableBytesWritable.class);
605    job.setOutputValueClass(MapReduceExtendedCell.class);
606    job.setOutputFormatClass(cls);
607
608    if (multiTableInfo.stream().distinct().count() != multiTableInfo.size()) {
609      throw new IllegalArgumentException("Duplicate entries found in TableInfo argument");
610    }
611    boolean writeMultipleTables = false;
612    if (MultiTableHFileOutputFormat.class.equals(cls)) {
613      writeMultipleTables = true;
614      conf.setBoolean(MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, true);
615    }
616    // Based on the configured map output class, set the correct reducer to properly
617    // sort the incoming values.
618    // TODO it would be nice to pick one or the other of these formats.
619    if (KeyValue.class.equals(job.getMapOutputValueClass())
620        || MapReduceExtendedCell.class.equals(job.getMapOutputValueClass())) {
621      job.setReducerClass(CellSortReducer.class);
622    } else if (Put.class.equals(job.getMapOutputValueClass())) {
623      job.setReducerClass(PutSortReducer.class);
624    } else if (Text.class.equals(job.getMapOutputValueClass())) {
625      job.setReducerClass(TextSortReducer.class);
626    } else {
627      LOG.warn("Unknown map output value type:" + job.getMapOutputValueClass());
628    }
629
630    conf.setStrings("io.serializations", conf.get("io.serializations"),
631        MutationSerialization.class.getName(), ResultSerialization.class.getName(),
632        CellSerialization.class.getName());
633
634    if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
635      LOG.info("bulkload locality sensitive enabled");
636    }
637
638    /* Now get the region start keys for every table required */
639    List<String> allTableNames = new ArrayList<>(multiTableInfo.size());
640    List<RegionLocator> regionLocators = new ArrayList<>( multiTableInfo.size());
641    List<TableDescriptor> tableDescriptors = new ArrayList<>( multiTableInfo.size());
642
643    for( TableInfo tableInfo : multiTableInfo )
644    {
645      regionLocators.add(tableInfo.getRegionLocator());
646      allTableNames.add(tableInfo.getRegionLocator().getName().getNameAsString());
647      tableDescriptors.add(tableInfo.getTableDescriptor());
648    }
649    // Record tablenames for creating writer by favored nodes, and decoding compression, block size and other attributes of columnfamily per table
650    conf.set(OUTPUT_TABLE_NAME_CONF_KEY, StringUtils.join(allTableNames, Bytes
651            .toString(tableSeparator)));
652    List<ImmutableBytesWritable> startKeys = getRegionStartKeys(regionLocators, writeMultipleTables);
653    // Use table's region boundaries for TOP split points.
654    LOG.info("Configuring " + startKeys.size() + " reduce partitions " +
655        "to match current region count for all tables");
656    job.setNumReduceTasks(startKeys.size());
657
658    configurePartitioner(job, startKeys, writeMultipleTables);
659    // Set compression algorithms based on column families
660
661    conf.set(COMPRESSION_FAMILIES_CONF_KEY, serializeColumnFamilyAttribute(compressionDetails,
662            tableDescriptors));
663    conf.set(BLOCK_SIZE_FAMILIES_CONF_KEY, serializeColumnFamilyAttribute(blockSizeDetails,
664            tableDescriptors));
665    conf.set(BLOOM_TYPE_FAMILIES_CONF_KEY, serializeColumnFamilyAttribute(bloomTypeDetails,
666            tableDescriptors));
667    conf.set(BLOOM_PARAM_FAMILIES_CONF_KEY, serializeColumnFamilyAttribute(bloomParamDetails,
668        tableDescriptors));
669    conf.set(DATABLOCK_ENCODING_FAMILIES_CONF_KEY,
670            serializeColumnFamilyAttribute(dataBlockEncodingDetails, tableDescriptors));
671
672    TableMapReduceUtil.addDependencyJars(job);
673    TableMapReduceUtil.initCredentials(job);
674    LOG.info("Incremental output configured for tables: " + StringUtils.join(allTableNames, ","));
675  }
676
677  public static void configureIncrementalLoadMap(Job job, TableDescriptor tableDescriptor) throws
678      IOException {
679    Configuration conf = job.getConfiguration();
680
681    job.setOutputKeyClass(ImmutableBytesWritable.class);
682    job.setOutputValueClass(MapReduceExtendedCell.class);
683    job.setOutputFormatClass(HFileOutputFormat2.class);
684
685    ArrayList<TableDescriptor> singleTableDescriptor = new ArrayList<>(1);
686    singleTableDescriptor.add(tableDescriptor);
687
688    conf.set(OUTPUT_TABLE_NAME_CONF_KEY, tableDescriptor.getTableName().getNameAsString());
689    // Set compression algorithms based on column families
690    conf.set(COMPRESSION_FAMILIES_CONF_KEY,
691        serializeColumnFamilyAttribute(compressionDetails, singleTableDescriptor));
692    conf.set(BLOCK_SIZE_FAMILIES_CONF_KEY,
693        serializeColumnFamilyAttribute(blockSizeDetails, singleTableDescriptor));
694    conf.set(BLOOM_TYPE_FAMILIES_CONF_KEY,
695        serializeColumnFamilyAttribute(bloomTypeDetails, singleTableDescriptor));
696    conf.set(BLOOM_PARAM_FAMILIES_CONF_KEY,
697        serializeColumnFamilyAttribute(bloomParamDetails, singleTableDescriptor));
698    conf.set(DATABLOCK_ENCODING_FAMILIES_CONF_KEY,
699        serializeColumnFamilyAttribute(dataBlockEncodingDetails, singleTableDescriptor));
700
701    TableMapReduceUtil.addDependencyJars(job);
702    TableMapReduceUtil.initCredentials(job);
703    LOG.info("Incremental table " + tableDescriptor.getTableName() + " output configured.");
704  }
705
706  /**
707   * Runs inside the task to deserialize column family to compression algorithm
708   * map from the configuration.
709   *
710   * @param conf to read the serialized values from
711   * @return a map from column family to the configured compression algorithm
712   */
713  @VisibleForTesting
714  static Map<byte[], Algorithm> createFamilyCompressionMap(Configuration
715      conf) {
716    Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
717        COMPRESSION_FAMILIES_CONF_KEY);
718    Map<byte[], Algorithm> compressionMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
719    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
720      Algorithm algorithm = HFileWriterImpl.compressionByName(e.getValue());
721      compressionMap.put(e.getKey(), algorithm);
722    }
723    return compressionMap;
724  }
725
726  /**
727   * Runs inside the task to deserialize column family to bloom filter type
728   * map from the configuration.
729   *
730   * @param conf to read the serialized values from
731   * @return a map from column family to the the configured bloom filter type
732   */
733  @VisibleForTesting
734  static Map<byte[], BloomType> createFamilyBloomTypeMap(Configuration conf) {
735    Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
736        BLOOM_TYPE_FAMILIES_CONF_KEY);
737    Map<byte[], BloomType> bloomTypeMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
738    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
739      BloomType bloomType = BloomType.valueOf(e.getValue());
740      bloomTypeMap.put(e.getKey(), bloomType);
741    }
742    return bloomTypeMap;
743  }
744
745  /**
746   * Runs inside the task to deserialize column family to bloom filter param
747   * map from the configuration.
748   *
749   * @param conf to read the serialized values from
750   * @return a map from column family to the the configured bloom filter param
751   */
752  @VisibleForTesting
753  static Map<byte[], String> createFamilyBloomParamMap(Configuration conf) {
754    return createFamilyConfValueMap(conf, BLOOM_PARAM_FAMILIES_CONF_KEY);
755  }
756
757
758  /**
759   * Runs inside the task to deserialize column family to block size
760   * map from the configuration.
761   *
762   * @param conf to read the serialized values from
763   * @return a map from column family to the configured block size
764   */
765  @VisibleForTesting
766  static Map<byte[], Integer> createFamilyBlockSizeMap(Configuration conf) {
767    Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
768        BLOCK_SIZE_FAMILIES_CONF_KEY);
769    Map<byte[], Integer> blockSizeMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
770    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
771      Integer blockSize = Integer.parseInt(e.getValue());
772      blockSizeMap.put(e.getKey(), blockSize);
773    }
774    return blockSizeMap;
775  }
776
777  /**
778   * Runs inside the task to deserialize column family to data block encoding
779   * type map from the configuration.
780   *
781   * @param conf to read the serialized values from
782   * @return a map from column family to HFileDataBlockEncoder for the
783   *         configured data block type for the family
784   */
785  @VisibleForTesting
786  static Map<byte[], DataBlockEncoding> createFamilyDataBlockEncodingMap(
787      Configuration conf) {
788    Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
789        DATABLOCK_ENCODING_FAMILIES_CONF_KEY);
790    Map<byte[], DataBlockEncoding> encoderMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
791    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
792      encoderMap.put(e.getKey(), DataBlockEncoding.valueOf((e.getValue())));
793    }
794    return encoderMap;
795  }
796
797
798  /**
799   * Run inside the task to deserialize column family to given conf value map.
800   *
801   * @param conf to read the serialized values from
802   * @param confName conf key to read from the configuration
803   * @return a map of column family to the given configuration value
804   */
805  private static Map<byte[], String> createFamilyConfValueMap(
806      Configuration conf, String confName) {
807    Map<byte[], String> confValMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
808    String confVal = conf.get(confName, "");
809    for (String familyConf : confVal.split("&")) {
810      String[] familySplit = familyConf.split("=");
811      if (familySplit.length != 2) {
812        continue;
813      }
814      try {
815        confValMap.put(Bytes.toBytes(URLDecoder.decode(familySplit[0], "UTF-8")),
816            URLDecoder.decode(familySplit[1], "UTF-8"));
817      } catch (UnsupportedEncodingException e) {
818        // will not happen with UTF-8 encoding
819        throw new AssertionError(e);
820      }
821    }
822    return confValMap;
823  }
824
825  /**
826   * Configure <code>job</code> with a TotalOrderPartitioner, partitioning against
827   * <code>splitPoints</code>. Cleans up the partitions file after job exists.
828   */
829  static void configurePartitioner(Job job, List<ImmutableBytesWritable> splitPoints, boolean
830          writeMultipleTables)
831      throws IOException {
832    Configuration conf = job.getConfiguration();
833    // create the partitions file
834    FileSystem fs = FileSystem.get(conf);
835    String hbaseTmpFsDir =
836        conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY,
837          HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY);
838    Path partitionsPath = new Path(hbaseTmpFsDir, "partitions_" + UUID.randomUUID());
839    fs.makeQualified(partitionsPath);
840    writePartitions(conf, partitionsPath, splitPoints, writeMultipleTables);
841    fs.deleteOnExit(partitionsPath);
842
843    // configure job to use it
844    job.setPartitionerClass(TotalOrderPartitioner.class);
845    TotalOrderPartitioner.setPartitionFile(conf, partitionsPath);
846  }
847
848  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE")
849  @VisibleForTesting
850  static String serializeColumnFamilyAttribute(Function<ColumnFamilyDescriptor, String> fn, List<TableDescriptor> allTables)
851      throws UnsupportedEncodingException {
852    StringBuilder attributeValue = new StringBuilder();
853    int i = 0;
854    for (TableDescriptor tableDescriptor : allTables) {
855      if (tableDescriptor == null) {
856        // could happen with mock table instance
857        // CODEREVIEW: Can I set an empty string in conf if mock table instance?
858        return "";
859      }
860      for (ColumnFamilyDescriptor familyDescriptor : tableDescriptor.getColumnFamilies()) {
861        if (i++ > 0) {
862          attributeValue.append('&');
863        }
864        attributeValue.append(URLEncoder.encode(
865            Bytes.toString(combineTableNameSuffix(tableDescriptor.getTableName().getName(), familyDescriptor.getName())),
866            "UTF-8"));
867        attributeValue.append('=');
868        attributeValue.append(URLEncoder.encode(fn.apply(familyDescriptor), "UTF-8"));
869      }
870    }
871    // Get rid of the last ampersand
872    return attributeValue.toString();
873  }
874
875  /**
876   * Serialize column family to compression algorithm map to configuration.
877   * Invoked while configuring the MR job for incremental load.
878   *
879   * @param tableDescriptor to read the properties from
880   * @param conf to persist serialized values into
881   * @throws IOException
882   *           on failure to read column family descriptors
883   */
884  @VisibleForTesting
885  static Function<ColumnFamilyDescriptor, String> compressionDetails = familyDescriptor ->
886          familyDescriptor.getCompressionType().getName();
887
888  /**
889   * Serialize column family to block size map to configuration. Invoked while
890   * configuring the MR job for incremental load.
891   *
892   * @param tableDescriptor
893   *          to read the properties from
894   * @param conf
895   *          to persist serialized values into
896   *
897   * @throws IOException
898   *           on failure to read column family descriptors
899   */
900  @VisibleForTesting
901  static Function<ColumnFamilyDescriptor, String> blockSizeDetails = familyDescriptor -> String
902          .valueOf(familyDescriptor.getBlocksize());
903
904  /**
905   * Serialize column family to bloom type map to configuration. Invoked while
906   * configuring the MR job for incremental load.
907   *
908   * @param tableDescriptor
909   *          to read the properties from
910   * @param conf
911   *          to persist serialized values into
912   *
913   * @throws IOException
914   *           on failure to read column family descriptors
915   */
916  @VisibleForTesting
917  static Function<ColumnFamilyDescriptor, String> bloomTypeDetails = familyDescriptor -> {
918    String bloomType = familyDescriptor.getBloomFilterType().toString();
919    if (bloomType == null) {
920      bloomType = ColumnFamilyDescriptorBuilder.DEFAULT_BLOOMFILTER.name();
921    }
922    return bloomType;
923  };
924
925  /**
926   * Serialize column family to bloom param map to configuration. Invoked while
927   * configuring the MR job for incremental load.
928   *
929   * @param tableDescriptor
930   *          to read the properties from
931   * @param conf
932   *          to persist serialized values into
933   *
934   * @throws IOException
935   *           on failure to read column family descriptors
936   */
937  @VisibleForTesting
938  static Function<ColumnFamilyDescriptor, String> bloomParamDetails = familyDescriptor -> {
939    BloomType bloomType = familyDescriptor.getBloomFilterType();
940    String bloomParam = "";
941    if (bloomType == BloomType.ROWPREFIX_FIXED_LENGTH) {
942      bloomParam = familyDescriptor.getConfigurationValue(BloomFilterUtil.PREFIX_LENGTH_KEY);
943    }
944    return bloomParam;
945  };
946
947  /**
948   * Serialize column family to data block encoding map to configuration.
949   * Invoked while configuring the MR job for incremental load.
950   *
951   * @param tableDescriptor
952   *          to read the properties from
953   * @param conf
954   *          to persist serialized values into
955   * @throws IOException
956   *           on failure to read column family descriptors
957   */
958  @VisibleForTesting
959  static Function<ColumnFamilyDescriptor, String> dataBlockEncodingDetails = familyDescriptor -> {
960    DataBlockEncoding encoding = familyDescriptor.getDataBlockEncoding();
961    if (encoding == null) {
962      encoding = DataBlockEncoding.NONE;
963    }
964    return encoding.toString();
965  };
966
967}