/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static org.apache.hadoop.hbase.regionserver.HStoreFile.BULKLOAD_TASK_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.BULKLOAD_TIME_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.MAJOR_COMPACTION_KEY;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.InetSocketAddress;
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.UUID;
import java.util.function.Function;
import java.util.stream.Collectors;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.io.hfile.HFileWriterImpl;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.util.BloomFilterUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.MapReduceExtendedCell;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;

/**
 * Writes HFiles. Passed Cells must arrive in order.
 * Writes current time as the sequence id for the file. Sets the major compacted
 * attribute on created {@link HFile}s. Calling write(null, null) will forcibly roll
 * all HFiles being written.
 * <p>
 * Using this class as part of a MapReduce job is best done
 * using {@link #configureIncrementalLoad(Job, TableDescriptor, RegionLocator)}.
 */
@InterfaceAudience.Public
public class HFileOutputFormat2
    extends FileOutputFormat<ImmutableBytesWritable, Cell> {
  private static final Logger LOG = LoggerFactory.getLogger(HFileOutputFormat2.class);

  static class TableInfo {
    private TableDescriptor tableDescriptor;
    private RegionLocator regionLocator;

    public TableInfo(TableDescriptor tableDescriptor, RegionLocator regionLocator) {
      this.tableDescriptor = tableDescriptor;
      this.regionLocator = regionLocator;
    }

    /**
     * The modification for the returned HTD doesn't affect the inner TD.
     * @return A clone of inner table descriptor
     * @deprecated use {@link #getTableDescriptor}
     */
    @Deprecated
    public HTableDescriptor getHTableDescriptor() {
      return new HTableDescriptor(tableDescriptor);
    }

    public TableDescriptor getTableDescriptor() {
      return tableDescriptor;
    }

    public RegionLocator getRegionLocator() {
      return regionLocator;
    }
  }

  protected static final byte[] tableSeparator = Bytes.toBytes(";");

  protected static byte[] combineTableNameSuffix(byte[] tableName, byte[] suffix) {
    return Bytes.add(tableName, tableSeparator, suffix);
  }

  // The following constants are private since these are used by
  // HFileOutputFormat2 to internally transfer data between job setup and
  // reducer run using conf.
  // These should not be changed by the client.
  static final String COMPRESSION_FAMILIES_CONF_KEY =
      "hbase.hfileoutputformat.families.compression";
  static final String BLOOM_TYPE_FAMILIES_CONF_KEY =
      "hbase.hfileoutputformat.families.bloomtype";
  static final String BLOOM_PARAM_FAMILIES_CONF_KEY =
      "hbase.hfileoutputformat.families.bloomparam";
  static final String BLOCK_SIZE_FAMILIES_CONF_KEY =
      "hbase.mapreduce.hfileoutputformat.blocksize";
  static final String DATABLOCK_ENCODING_FAMILIES_CONF_KEY =
      "hbase.mapreduce.hfileoutputformat.families.datablock.encoding";

  // These constants are public since the client can modify them when setting
  // up their conf object and thus refer to these symbols.
  // They are present for backwards compatibility reasons. Use them only to
  // override the auto-detection of datablock encoding and compression.
  public static final String DATABLOCK_ENCODING_OVERRIDE_CONF_KEY =
      "hbase.mapreduce.hfileoutputformat.datablock.encoding";
  public static final String COMPRESSION_OVERRIDE_CONF_KEY =
      "hbase.mapreduce.hfileoutputformat.compression";

  /**
   * Keep locality while generating HFiles for bulkload. See HBASE-12596
   */
  public static final String LOCALITY_SENSITIVE_CONF_KEY =
      "hbase.bulkload.locality.sensitive.enabled";
  private static final boolean DEFAULT_LOCALITY_SENSITIVE = true;
  static final String OUTPUT_TABLE_NAME_CONF_KEY =
      "hbase.mapreduce.hfileoutputformat.table.name";
  static final String MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY =
      "hbase.mapreduce.use.multi.table.hfileoutputformat";

  public static final String STORAGE_POLICY_PROPERTY = HStore.BLOCK_STORAGE_POLICY_KEY;
  public static final String STORAGE_POLICY_PROPERTY_CF_PREFIX = STORAGE_POLICY_PROPERTY + ".";

  @Override
  public RecordWriter<ImmutableBytesWritable, Cell> getRecordWriter(
      final TaskAttemptContext context) throws IOException, InterruptedException {
    return createRecordWriter(context, this.getOutputCommitter(context));
  }

  protected static byte[] getTableNameSuffixedWithFamily(byte[] tableName, byte[] family) {
    return combineTableNameSuffix(tableName, family);
  }

  static <V extends Cell> RecordWriter<ImmutableBytesWritable, V>
      createRecordWriter(final TaskAttemptContext context, final OutputCommitter committer)
          throws IOException {

    // Get the path of the temporary output file
    final Path outputDir = ((FileOutputCommitter) committer).getWorkPath();
    final Configuration conf = context.getConfiguration();
    final boolean writeMultipleTables =
        conf.getBoolean(MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, false);
    final String writeTableNames = conf.get(OUTPUT_TABLE_NAME_CONF_KEY);
    if (writeTableNames == null || writeTableNames.isEmpty()) {
      throw new IllegalArgumentException("Configuration parameter " + OUTPUT_TABLE_NAME_CONF_KEY
          + " cannot be empty");
    }
    final FileSystem fs = outputDir.getFileSystem(conf);
    // These configs. are from hbase-*.xml
    final long maxsize = conf.getLong(HConstants.HREGION_MAX_FILESIZE,
        HConstants.DEFAULT_MAX_FILE_SIZE);
    // Invented config. Add to hbase-*.xml if other than default compression.
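    // For illustration only (the key is COMPRESSION_OVERRIDE_CONF_KEY above; "snappy" is just an
    // example value): setting
    //   hbase.mapreduce.hfileoutputformat.compression=snappy
    // on the job configuration forces that codec for every family, taking precedence over both
    // this default and the per-family values read below.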
    final String defaultCompressionStr = conf.get("hfile.compression",
        Compression.Algorithm.NONE.getName());
    final Algorithm defaultCompression = HFileWriterImpl
        .compressionByName(defaultCompressionStr);
    String compressionStr = conf.get(COMPRESSION_OVERRIDE_CONF_KEY);
    final Algorithm overriddenCompression;
    if (compressionStr != null) {
      overriddenCompression = Compression.getCompressionAlgorithmByName(compressionStr);
    } else {
      overriddenCompression = null;
    }
    final boolean compactionExclude = conf.getBoolean(
        "hbase.mapreduce.hfileoutputformat.compaction.exclude", false);

    final Set<String> allTableNames = Arrays.stream(writeTableNames.split(
        Bytes.toString(tableSeparator))).collect(Collectors.toSet());

    // create a map from column family to the compression algorithm
    final Map<byte[], Algorithm> compressionMap = createFamilyCompressionMap(conf);
    final Map<byte[], BloomType> bloomTypeMap = createFamilyBloomTypeMap(conf);
    final Map<byte[], String> bloomParamMap = createFamilyBloomParamMap(conf);
    final Map<byte[], Integer> blockSizeMap = createFamilyBlockSizeMap(conf);

    String dataBlockEncodingStr = conf.get(DATABLOCK_ENCODING_OVERRIDE_CONF_KEY);
    final Map<byte[], DataBlockEncoding> datablockEncodingMap
        = createFamilyDataBlockEncodingMap(conf);
    final DataBlockEncoding overriddenEncoding;
    if (dataBlockEncodingStr != null) {
      overriddenEncoding = DataBlockEncoding.valueOf(dataBlockEncodingStr);
    } else {
      overriddenEncoding = null;
    }

    return new RecordWriter<ImmutableBytesWritable, V>() {
      // Map of families to writers and how much has been output on the writer.
      private final Map<byte[], WriterLength> writers =
          new TreeMap<>(Bytes.BYTES_COMPARATOR);
      private byte[] previousRow = HConstants.EMPTY_BYTE_ARRAY;
      private final long now = EnvironmentEdgeManager.currentTime();
      private boolean rollRequested = false;

      @Override
      public void write(ImmutableBytesWritable row, V cell)
          throws IOException {
        Cell kv = cell;
        // null input == user explicitly wants to flush
        if (row == null && kv == null) {
          rollWriters(null);
          return;
        }

        byte[] rowKey = CellUtil.cloneRow(kv);
        int length = (PrivateCellUtil.estimatedSerializedSizeOf(kv)) - Bytes.SIZEOF_INT;
        byte[] family = CellUtil.cloneFamily(kv);
        byte[] tableNameBytes = null;
        if (writeMultipleTables) {
          tableNameBytes = MultiTableHFileOutputFormat.getTableName(row.get());
          if (!allTableNames.contains(Bytes.toString(tableNameBytes))) {
            throw new IllegalArgumentException("TableName '" + Bytes.toString(tableNameBytes) +
                "' not expected");
          }
        } else {
          tableNameBytes = Bytes.toBytes(writeTableNames);
        }
        byte[] tableAndFamily = getTableNameSuffixedWithFamily(tableNameBytes, family);
        WriterLength wl = this.writers.get(tableAndFamily);

        // If this is a new column family, create its output directory and set its
        // storage policy before any writer is opened for it.
        if (wl == null) {
          Path writerPath = null;
          if (writeMultipleTables) {
            writerPath = new Path(outputDir,
                new Path(Bytes.toString(tableNameBytes), Bytes.toString(family)));
          } else {
            writerPath = new Path(outputDir, Bytes.toString(family));
          }
          fs.mkdirs(writerPath);
          configureStoragePolicy(conf, fs, tableAndFamily, writerPath);
        }

        if (wl != null && wl.written + length >= maxsize) {
          this.rollRequested = true;
        }

        // This can only happen once a row is finished though
        if (rollRequested && Bytes.compareTo(this.previousRow, rowKey) != 0) {
          rollWriters(wl);
        }

        // create a new HFile writer, if necessary
        if (wl == null || wl.writer == null) {
          if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
            HRegionLocation loc = null;

            String tableName = Bytes.toString(tableNameBytes);
            if (tableName != null) {
              try (Connection connection = ConnectionFactory.createConnection(conf);
                  RegionLocator locator =
                      connection.getRegionLocator(TableName.valueOf(tableName))) {
                loc = locator.getRegionLocation(rowKey);
              } catch (Throwable e) {
                LOG.warn("Something went wrong locating rowkey: " +
                    Bytes.toString(rowKey) + " for table: " + tableName, e);
                loc = null;
              }
            }

            if (null == loc) {
              if (LOG.isTraceEnabled()) {
                LOG.trace("Failed to get region location, so using default writer for rowkey: " +
                    Bytes.toString(rowKey));
              }
              wl = getNewWriter(tableNameBytes, family, conf, null);
            } else {
              if (LOG.isDebugEnabled()) {
                LOG.debug("First rowkey: [" + Bytes.toString(rowKey) + "]");
              }
              InetSocketAddress initialIsa =
                  new InetSocketAddress(loc.getHostname(), loc.getPort());
              if (initialIsa.isUnresolved()) {
                if (LOG.isTraceEnabled()) {
                  LOG.trace("Failed to resolve bind address: " + loc.getHostname() + ":"
                      + loc.getPort() + ", so using default writer");
                }
                wl = getNewWriter(tableNameBytes, family, conf, null);
              } else {
                if (LOG.isDebugEnabled()) {
                  LOG.debug("Using favored nodes writer: " + initialIsa.getHostString());
                }
                wl = getNewWriter(tableNameBytes, family, conf,
                    new InetSocketAddress[] { initialIsa });
              }
            }
          } else {
            wl = getNewWriter(tableNameBytes, family, conf, null);
          }
        }

        // we now have the proper HFile writer. full steam ahead
        PrivateCellUtil.updateLatestStamp(cell, this.now);
        wl.writer.append(kv);
        wl.written += length;

        // Copy the row so we know when a row transitions.
        this.previousRow = rowKey;
      }

      private void rollWriters(WriterLength writerLength) throws IOException {
        if (writerLength != null) {
          closeWriter(writerLength);
        } else {
          for (WriterLength wl : this.writers.values()) {
            closeWriter(wl);
          }
        }
        this.rollRequested = false;
      }

      private void closeWriter(WriterLength wl) throws IOException {
        if (wl.writer != null) {
          LOG.info("Writer=" + wl.writer.getPath() +
              ((wl.written == 0) ? "" : ", wrote=" + wl.written));
          close(wl.writer);
        }
        wl.writer = null;
        wl.written = 0;
      }

      /*
       * Create a new StoreFile.Writer.
       * @return A WriterLength, containing a new StoreFile.Writer.
       */
      @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "BX_UNBOXING_IMMEDIATELY_REBOXED",
          justification = "Not important")
      private WriterLength getNewWriter(byte[] tableName, byte[] family, Configuration conf,
          InetSocketAddress[] favoredNodes) throws IOException {
        byte[] tableAndFamily = getTableNameSuffixedWithFamily(tableName, family);
        Path familydir = new Path(outputDir, Bytes.toString(family));
        if (writeMultipleTables) {
          familydir = new Path(outputDir,
              new Path(Bytes.toString(tableName), Bytes.toString(family)));
        }
        WriterLength wl = new WriterLength();
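        // Resolve the per-family settings for this writer. For compression and encoding the
        // job-level override (if any) wins, then the value serialized from the table descriptor,
        // then the default; block size and bloom settings come from the serialized per-family
        // maps or fall back to defaults.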
        Algorithm compression = overriddenCompression;
        compression = compression == null ? compressionMap.get(tableAndFamily) : compression;
        compression = compression == null ? defaultCompression : compression;
        BloomType bloomType = bloomTypeMap.get(tableAndFamily);
        bloomType = bloomType == null ? BloomType.NONE : bloomType;
        String bloomParam = bloomParamMap.get(tableAndFamily);
        if (bloomType == BloomType.ROWPREFIX_FIXED_LENGTH) {
          conf.set(BloomFilterUtil.PREFIX_LENGTH_KEY, bloomParam);
        }
        Integer blockSize = blockSizeMap.get(tableAndFamily);
        blockSize = blockSize == null ? HConstants.DEFAULT_BLOCKSIZE : blockSize;
        DataBlockEncoding encoding = overriddenEncoding;
        encoding = encoding == null ? datablockEncodingMap.get(tableAndFamily) : encoding;
        encoding = encoding == null ? DataBlockEncoding.NONE : encoding;
        HFileContextBuilder contextBuilder = new HFileContextBuilder()
            .withCompression(compression)
            .withChecksumType(HStore.getChecksumType(conf))
            .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf))
            .withBlockSize(blockSize);

        if (HFile.getFormatVersion(conf) >= HFile.MIN_FORMAT_VERSION_WITH_TAGS) {
          contextBuilder.withIncludesTags(true);
        }

        contextBuilder.withDataBlockEncoding(encoding);
        HFileContext hFileContext = contextBuilder.build();
        if (null == favoredNodes) {
          wl.writer = new StoreFileWriter.Builder(conf, CacheConfig.DISABLED, fs)
              .withOutputDir(familydir).withBloomType(bloomType)
              .withComparator(CellComparator.getInstance()).withFileContext(hFileContext).build();
        } else {
          wl.writer = new StoreFileWriter.Builder(conf, CacheConfig.DISABLED, new HFileSystem(fs))
              .withOutputDir(familydir).withBloomType(bloomType)
              .withComparator(CellComparator.getInstance()).withFileContext(hFileContext)
              .withFavoredNodes(favoredNodes).build();
        }

        this.writers.put(tableAndFamily, wl);
        return wl;
      }

      private void close(final StoreFileWriter w) throws IOException {
        if (w != null) {
          w.appendFileInfo(BULKLOAD_TIME_KEY,
              Bytes.toBytes(System.currentTimeMillis()));
          w.appendFileInfo(BULKLOAD_TASK_KEY,
              Bytes.toBytes(context.getTaskAttemptID().toString()));
          w.appendFileInfo(MAJOR_COMPACTION_KEY,
              Bytes.toBytes(true));
          w.appendFileInfo(EXCLUDE_FROM_MINOR_COMPACTION_KEY,
              Bytes.toBytes(compactionExclude));
          w.appendTrackedTimestampsToMetadata();
          w.close();
        }
      }

      @Override
      public void close(TaskAttemptContext c)
          throws IOException, InterruptedException {
        for (WriterLength wl : this.writers.values()) {
          close(wl.writer);
        }
      }
    };
  }

  /**
   * Configure block storage policy for CF after the directory is created.
   */
  static void configureStoragePolicy(final Configuration conf, final FileSystem fs,
      byte[] tableAndFamily, Path cfPath) {
    if (null == conf || null == fs || null == tableAndFamily || null == cfPath) {
      return;
    }

    String policy =
        conf.get(STORAGE_POLICY_PROPERTY_CF_PREFIX + Bytes.toString(tableAndFamily),
            conf.get(STORAGE_POLICY_PROPERTY));
    FSUtils.setStoragePolicy(fs, cfPath, policy);
  }

  /*
   * Data structure to hold a Writer and amount of data written on it.
   */
  static class WriterLength {
    long written = 0;
    StoreFileWriter writer = null;
  }
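  /*
   * For reference, a single-table job writes its HFiles under
   *   <outputDir>/<family>/<generated file name>
   * while a MultiTableHFileOutputFormat job writes them under
   *   <outputDir>/<table name>/<family>/<generated file name>.
   * The path shape follows the record writer above; the file names themselves are generated by
   * StoreFileWriter.
   */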
  /**
   * Return the start keys of all of the regions in the passed tables,
   * as a list of ImmutableBytesWritable.
   */
  private static List<ImmutableBytesWritable> getRegionStartKeys(
      List<RegionLocator> regionLocators, boolean writeMultipleTables) throws IOException {

    ArrayList<ImmutableBytesWritable> ret = new ArrayList<>();
    for (RegionLocator regionLocator : regionLocators) {
      TableName tableName = regionLocator.getName();
      LOG.info("Looking up current regions for table " + tableName);
      byte[][] byteKeys = regionLocator.getStartKeys();
      for (byte[] byteKey : byteKeys) {
        byte[] fullKey = byteKey; // HFileOutputFormat2 use case
        if (writeMultipleTables) {
          // MultiTableHFileOutputFormat use case
          fullKey = combineTableNameSuffix(tableName.getName(), byteKey);
        }
        if (LOG.isDebugEnabled()) {
          LOG.debug("SplitPoint startkey for table [" + tableName + "]: ["
              + Bytes.toStringBinary(fullKey) + "]");
        }
        ret.add(new ImmutableBytesWritable(fullKey));
      }
    }
    return ret;
  }

  /**
   * Write out a {@link SequenceFile} that can be read by
   * {@link TotalOrderPartitioner} that contains the split points in startKeys.
   */
  @SuppressWarnings("deprecation")
  private static void writePartitions(Configuration conf, Path partitionsPath,
      List<ImmutableBytesWritable> startKeys, boolean writeMultipleTables) throws IOException {
    LOG.info("Writing partition information to " + partitionsPath);
    if (startKeys.isEmpty()) {
      throw new IllegalArgumentException("No regions passed");
    }

    // We're generating a list of split points, and we don't ever
    // have keys < the first region (which has an empty start key)
    // so we need to remove it. Otherwise we would end up with an
    // empty reducer with index 0.
    TreeSet<ImmutableBytesWritable> sorted = new TreeSet<>(startKeys);
    ImmutableBytesWritable first = sorted.first();
    if (writeMultipleTables) {
      first =
          new ImmutableBytesWritable(MultiTableHFileOutputFormat.getSuffix(sorted.first().get()));
    }
    if (!first.equals(HConstants.EMPTY_BYTE_ARRAY)) {
      throw new IllegalArgumentException(
          "First region of table should have empty start key. Instead has: "
              + Bytes.toStringBinary(first.get()));
    }
    sorted.remove(sorted.first());

    // Write the actual file
    FileSystem fs = partitionsPath.getFileSystem(conf);
    SequenceFile.Writer writer = SequenceFile.createWriter(
        fs, conf, partitionsPath, ImmutableBytesWritable.class,
        NullWritable.class);

    try {
      for (ImmutableBytesWritable startKey : sorted) {
        writer.append(startKey, NullWritable.get());
      }
    } finally {
      writer.close();
    }
  }
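  /*
   * A minimal driver sketch (not part of this class) showing the intended use of
   * configureIncrementalLoad below. The driver, mapper, table name and output path are
   * hypothetical; only the HFileOutputFormat2 call is this class's API.
   *
   *   Configuration conf = HBaseConfiguration.create();
   *   Job job = Job.getInstance(conf, "hfile-prepare");
   *   job.setJarByClass(MyBulkLoadDriver.class);
   *   job.setMapperClass(MyCellMapper.class); // emits ImmutableBytesWritable row keys and Puts
   *   job.setMapOutputKeyClass(ImmutableBytesWritable.class);
   *   job.setMapOutputValueClass(Put.class);
   *   try (Connection conn = ConnectionFactory.createConnection(conf);
   *       Admin admin = conn.getAdmin();
   *       RegionLocator locator = conn.getRegionLocator(TableName.valueOf("mytable"))) {
   *     HFileOutputFormat2.configureIncrementalLoad(job,
   *         admin.getDescriptor(TableName.valueOf("mytable")), locator);
   *   }
   *   FileOutputFormat.setOutputPath(job, new Path("/tmp/mytable-hfiles"));
   *   job.waitForCompletion(true);
   */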
  /**
   * Configure a MapReduce Job to perform an incremental load into the given
   * table. This
   * <ul>
   *   <li>Inspects the table to configure a total order partitioner</li>
   *   <li>Uploads the partitions file to the cluster and adds it to the DistributedCache</li>
   *   <li>Sets the number of reduce tasks to match the current number of regions</li>
   *   <li>Sets the output key/value class to match HFileOutputFormat2's requirements</li>
   *   <li>Sets the reducer up to perform the appropriate sorting (CellSortReducer,
   *       PutSortReducer or TextSortReducer, depending on the map output value class)</li>
   * </ul>
   * The user should be sure to set the map output value class to either KeyValue or Put before
   * running this function.
   */
  public static void configureIncrementalLoad(Job job, Table table, RegionLocator regionLocator)
      throws IOException {
    configureIncrementalLoad(job, table.getDescriptor(), regionLocator);
  }

  /**
   * Configure a MapReduce Job to perform an incremental load into the given
   * table. This
   * <ul>
   *   <li>Inspects the table to configure a total order partitioner</li>
   *   <li>Uploads the partitions file to the cluster and adds it to the DistributedCache</li>
   *   <li>Sets the number of reduce tasks to match the current number of regions</li>
   *   <li>Sets the output key/value class to match HFileOutputFormat2's requirements</li>
   *   <li>Sets the reducer up to perform the appropriate sorting (CellSortReducer,
   *       PutSortReducer or TextSortReducer, depending on the map output value class)</li>
   * </ul>
   * The user should be sure to set the map output value class to either KeyValue or Put before
   * running this function.
   */
  public static void configureIncrementalLoad(Job job, TableDescriptor tableDescriptor,
      RegionLocator regionLocator) throws IOException {
    ArrayList<TableInfo> singleTableInfo = new ArrayList<>();
    singleTableInfo.add(new TableInfo(tableDescriptor, regionLocator));
    configureIncrementalLoad(job, singleTableInfo, HFileOutputFormat2.class);
  }
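  /*
   * Sketch of the multi-table case (table names hypothetical): callers such as
   * MultiTableHFileOutputFormat supply one TableInfo per table to the package-private
   * overload below.
   *
   *   List<TableInfo> infos = Arrays.asList(
   *       new TableInfo(admin.getDescriptor(TableName.valueOf("t1")),
   *           conn.getRegionLocator(TableName.valueOf("t1"))),
   *       new TableInfo(admin.getDescriptor(TableName.valueOf("t2")),
   *           conn.getRegionLocator(TableName.valueOf("t2"))));
   *   configureIncrementalLoad(job, infos, MultiTableHFileOutputFormat.class);
   */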
  static void configureIncrementalLoad(Job job, List<TableInfo> multiTableInfo,
      Class<? extends OutputFormat<?, ?>> cls) throws IOException {
    Configuration conf = job.getConfiguration();
    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(MapReduceExtendedCell.class);
    job.setOutputFormatClass(cls);

    if (multiTableInfo.stream().distinct().count() != multiTableInfo.size()) {
      throw new IllegalArgumentException("Duplicate entries found in TableInfo argument");
    }
    boolean writeMultipleTables = false;
    if (MultiTableHFileOutputFormat.class.equals(cls)) {
      writeMultipleTables = true;
      conf.setBoolean(MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, true);
    }
    // Based on the configured map output class, set the correct reducer to properly
    // sort the incoming values.
    // TODO it would be nice to pick one or the other of these formats.
    if (KeyValue.class.equals(job.getMapOutputValueClass())
        || MapReduceExtendedCell.class.equals(job.getMapOutputValueClass())) {
      job.setReducerClass(CellSortReducer.class);
    } else if (Put.class.equals(job.getMapOutputValueClass())) {
      job.setReducerClass(PutSortReducer.class);
    } else if (Text.class.equals(job.getMapOutputValueClass())) {
      job.setReducerClass(TextSortReducer.class);
    } else {
      LOG.warn("Unknown map output value type: " + job.getMapOutputValueClass());
    }

    conf.setStrings("io.serializations", conf.get("io.serializations"),
        MutationSerialization.class.getName(), ResultSerialization.class.getName(),
        CellSerialization.class.getName());

    if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
      LOG.info("bulkload locality sensitive enabled");
    }

    /* Now get the region start keys for every table required */
    List<String> allTableNames = new ArrayList<>(multiTableInfo.size());
    List<RegionLocator> regionLocators = new ArrayList<>(multiTableInfo.size());
    List<TableDescriptor> tableDescriptors = new ArrayList<>(multiTableInfo.size());

    for (TableInfo tableInfo : multiTableInfo) {
      regionLocators.add(tableInfo.getRegionLocator());
      allTableNames.add(tableInfo.getRegionLocator().getName().getNameAsString());
      tableDescriptors.add(tableInfo.getTableDescriptor());
    }
    // Record table names so the record writer can create favored-nodes writers and decode
    // per-table column family attributes such as compression and block size.
    conf.set(OUTPUT_TABLE_NAME_CONF_KEY,
        StringUtils.join(allTableNames, Bytes.toString(tableSeparator)));
    List<ImmutableBytesWritable> startKeys =
        getRegionStartKeys(regionLocators, writeMultipleTables);
    // Use the tables' region boundaries as the TotalOrderPartitioner split points.
    LOG.info("Configuring " + startKeys.size() + " reduce partitions " +
        "to match current region count for all tables");
    job.setNumReduceTasks(startKeys.size());

    configurePartitioner(job, startKeys, writeMultipleTables);
    // Set compression algorithms based on column families

    conf.set(COMPRESSION_FAMILIES_CONF_KEY,
        serializeColumnFamilyAttribute(compressionDetails, tableDescriptors));
    conf.set(BLOCK_SIZE_FAMILIES_CONF_KEY,
        serializeColumnFamilyAttribute(blockSizeDetails, tableDescriptors));
    conf.set(BLOOM_TYPE_FAMILIES_CONF_KEY,
        serializeColumnFamilyAttribute(bloomTypeDetails, tableDescriptors));
    conf.set(BLOOM_PARAM_FAMILIES_CONF_KEY,
        serializeColumnFamilyAttribute(bloomParamDetails, tableDescriptors));
    conf.set(DATABLOCK_ENCODING_FAMILIES_CONF_KEY,
        serializeColumnFamilyAttribute(dataBlockEncodingDetails, tableDescriptors));

    TableMapReduceUtil.addDependencyJars(job);
    TableMapReduceUtil.initCredentials(job);
    LOG.info("Incremental output configured for tables: " + StringUtils.join(allTableNames, ","));
  }

  public static void configureIncrementalLoadMap(Job job, TableDescriptor tableDescriptor)
      throws IOException {
    Configuration conf = job.getConfiguration();

    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(MapReduceExtendedCell.class);
    job.setOutputFormatClass(HFileOutputFormat2.class);

    ArrayList<TableDescriptor> singleTableDescriptor = new ArrayList<>(1);
    singleTableDescriptor.add(tableDescriptor);

    conf.set(OUTPUT_TABLE_NAME_CONF_KEY, tableDescriptor.getTableName().getNameAsString());
    // Set compression algorithms based on column families
    conf.set(COMPRESSION_FAMILIES_CONF_KEY,
        serializeColumnFamilyAttribute(compressionDetails, singleTableDescriptor));
    conf.set(BLOCK_SIZE_FAMILIES_CONF_KEY,
        serializeColumnFamilyAttribute(blockSizeDetails, singleTableDescriptor));
    conf.set(BLOOM_TYPE_FAMILIES_CONF_KEY,
        serializeColumnFamilyAttribute(bloomTypeDetails, singleTableDescriptor));
    conf.set(BLOOM_PARAM_FAMILIES_CONF_KEY,
        serializeColumnFamilyAttribute(bloomParamDetails, singleTableDescriptor));
    conf.set(DATABLOCK_ENCODING_FAMILIES_CONF_KEY,
        serializeColumnFamilyAttribute(dataBlockEncodingDetails, singleTableDescriptor));

    TableMapReduceUtil.addDependencyJars(job);
    TableMapReduceUtil.initCredentials(job);
    LOG.info("Incremental table " + tableDescriptor.getTableName() + " output configured.");
  }

  /**
   * Runs inside the task to deserialize column family to compression algorithm
   * map from the configuration.
   *
   * @param conf to read the serialized values from
   * @return a map from column family to the configured compression algorithm
   */
  @VisibleForTesting
  static Map<byte[], Algorithm> createFamilyCompressionMap(Configuration conf) {
    Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
        COMPRESSION_FAMILIES_CONF_KEY);
    Map<byte[], Algorithm> compressionMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
      Algorithm algorithm = HFileWriterImpl.compressionByName(e.getValue());
      compressionMap.put(e.getKey(), algorithm);
    }
    return compressionMap;
  }

  /**
   * Runs inside the task to deserialize column family to bloom filter type
   * map from the configuration.
   *
   * @param conf to read the serialized values from
   * @return a map from column family to the configured bloom filter type
   */
  @VisibleForTesting
  static Map<byte[], BloomType> createFamilyBloomTypeMap(Configuration conf) {
    Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
        BLOOM_TYPE_FAMILIES_CONF_KEY);
    Map<byte[], BloomType> bloomTypeMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
      BloomType bloomType = BloomType.valueOf(e.getValue());
      bloomTypeMap.put(e.getKey(), bloomType);
    }
    return bloomTypeMap;
  }

  /**
   * Runs inside the task to deserialize column family to bloom filter param
   * map from the configuration.
   *
   * @param conf to read the serialized values from
   * @return a map from column family to the configured bloom filter param
   */
  @VisibleForTesting
  static Map<byte[], String> createFamilyBloomParamMap(Configuration conf) {
    return createFamilyConfValueMap(conf, BLOOM_PARAM_FAMILIES_CONF_KEY);
  }

  /**
   * Runs inside the task to deserialize column family to block size
   * map from the configuration.
   *
   * @param conf to read the serialized values from
   * @return a map from column family to the configured block size
   */
  @VisibleForTesting
  static Map<byte[], Integer> createFamilyBlockSizeMap(Configuration conf) {
    Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
        BLOCK_SIZE_FAMILIES_CONF_KEY);
    Map<byte[], Integer> blockSizeMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
      Integer blockSize = Integer.parseInt(e.getValue());
      blockSizeMap.put(e.getKey(), blockSize);
    }
    return blockSizeMap;
  }

  /**
   * Runs inside the task to deserialize column family to data block encoding
   * type map from the configuration.
   *
   * @param conf to read the serialized values from
   * @return a map from column family to HFileDataBlockEncoder for the
   *         configured data block type for the family
   */
  @VisibleForTesting
  static Map<byte[], DataBlockEncoding> createFamilyDataBlockEncodingMap(
      Configuration conf) {
    Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
        DATABLOCK_ENCODING_FAMILIES_CONF_KEY);
    Map<byte[], DataBlockEncoding> encoderMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
      encoderMap.put(e.getKey(), DataBlockEncoding.valueOf(e.getValue()));
    }
    return encoderMap;
  }

  /**
   * Run inside the task to deserialize column family to given conf value map.
   *
   * @param conf to read the serialized values from
   * @param confName conf key to read from the configuration
   * @return a map of column family to the given configuration value
   */
  private static Map<byte[], String> createFamilyConfValueMap(
      Configuration conf, String confName) {
    Map<byte[], String> confValMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    String confVal = conf.get(confName, "");
    for (String familyConf : confVal.split("&")) {
      String[] familySplit = familyConf.split("=");
      if (familySplit.length != 2) {
        continue;
      }
      try {
        confValMap.put(Bytes.toBytes(URLDecoder.decode(familySplit[0], "UTF-8")),
            URLDecoder.decode(familySplit[1], "UTF-8"));
      } catch (UnsupportedEncodingException e) {
        // will not happen with UTF-8 encoding
        throw new AssertionError(e);
      }
    }
    return confValMap;
  }

  /**
   * Configure <code>job</code> with a TotalOrderPartitioner, partitioning against
   * <code>splitPoints</code>. Cleans up the partitions file after the job exits.
   */
  static void configurePartitioner(Job job, List<ImmutableBytesWritable> splitPoints,
      boolean writeMultipleTables) throws IOException {
    Configuration conf = job.getConfiguration();
    // create the partitions file
    FileSystem fs = FileSystem.get(conf);
    String hbaseTmpFsDir =
        conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY,
            HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY);
    Path partitionsPath = new Path(hbaseTmpFsDir, "partitions_" + UUID.randomUUID());
    fs.makeQualified(partitionsPath);
    writePartitions(conf, partitionsPath, splitPoints, writeMultipleTables);
    fs.deleteOnExit(partitionsPath);

    // configure job to use it
    job.setPartitionerClass(TotalOrderPartitioner.class);
    TotalOrderPartitioner.setPartitionFile(conf, partitionsPath);
  }

  @edu.umd.cs.findbugs.annotations.SuppressWarnings(
      value = "RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE")
  @VisibleForTesting
  static String serializeColumnFamilyAttribute(Function<ColumnFamilyDescriptor, String> fn,
      List<TableDescriptor> allTables) throws UnsupportedEncodingException {
    StringBuilder attributeValue = new StringBuilder();
    int i = 0;
    for (TableDescriptor tableDescriptor : allTables) {
      if (tableDescriptor == null) {
        // could happen with mock table instance
        // CODEREVIEW: Can I set an empty string in conf if mock table instance?
        return "";
      }
      for (ColumnFamilyDescriptor familyDescriptor : tableDescriptor.getColumnFamilies()) {
        if (i++ > 0) {
          attributeValue.append('&');
        }
        attributeValue.append(URLEncoder.encode(
            Bytes.toString(combineTableNameSuffix(tableDescriptor.getTableName().getName(),
                familyDescriptor.getName())),
            "UTF-8"));
        attributeValue.append('=');
        attributeValue.append(URLEncoder.encode(fn.apply(familyDescriptor), "UTF-8"));
      }
    }
    return attributeValue.toString();
  }
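  /*
   * For illustration (hypothetical table "t1" with families "cf1" and "cf2"), the compression
   * attribute serialized above and parsed back by createFamilyConfValueMap looks like
   *   t1%3Bcf1=gz&t1%3Bcf2=none
   * i.e. URL-encoded "<table>;<family>" keys joined to their values with '=' and separated
   * by '&'.
   */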
  /**
   * Serialize column family to compression algorithm map to configuration.
   * Invoked while configuring the MR job for incremental load.
   */
  @VisibleForTesting
  static Function<ColumnFamilyDescriptor, String> compressionDetails = familyDescriptor ->
      familyDescriptor.getCompressionType().getName();

  /**
   * Serialize column family to block size map to configuration. Invoked while
   * configuring the MR job for incremental load.
   */
  @VisibleForTesting
  static Function<ColumnFamilyDescriptor, String> blockSizeDetails = familyDescriptor ->
      String.valueOf(familyDescriptor.getBlocksize());

  /**
   * Serialize column family to bloom type map to configuration. Invoked while
   * configuring the MR job for incremental load.
   */
  @VisibleForTesting
  static Function<ColumnFamilyDescriptor, String> bloomTypeDetails = familyDescriptor -> {
    String bloomType = familyDescriptor.getBloomFilterType().toString();
    if (bloomType == null) {
      bloomType = ColumnFamilyDescriptorBuilder.DEFAULT_BLOOMFILTER.name();
    }
    return bloomType;
  };

  /**
   * Serialize column family to bloom param map to configuration. Invoked while
   * configuring the MR job for incremental load.
   */
  @VisibleForTesting
  static Function<ColumnFamilyDescriptor, String> bloomParamDetails = familyDescriptor -> {
    BloomType bloomType = familyDescriptor.getBloomFilterType();
    String bloomParam = "";
    if (bloomType == BloomType.ROWPREFIX_FIXED_LENGTH) {
      bloomParam = familyDescriptor.getConfigurationValue(BloomFilterUtil.PREFIX_LENGTH_KEY);
    }
    return bloomParam;
  };

  /**
   * Serialize column family to data block encoding map to configuration.
   * Invoked while configuring the MR job for incremental load.
   */
  @VisibleForTesting
  static Function<ColumnFamilyDescriptor, String> dataBlockEncodingDetails = familyDescriptor -> {
    DataBlockEncoding encoding = familyDescriptor.getDataBlockEncoding();
    if (encoding == null) {
      encoding = DataBlockEncoding.NONE;
    }
    return encoding.toString();
  };

}