001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.snapshot;
020
021import java.io.BufferedInputStream;
022import java.io.DataInput;
023import java.io.DataOutput;
024import java.io.FileNotFoundException;
025import java.io.IOException;
026import java.io.InputStream;
027import java.util.ArrayList;
028import java.util.Collections;
029import java.util.Comparator;
030import java.util.LinkedList;
031import java.util.List;
032import java.util.concurrent.ExecutionException;
033import java.util.concurrent.ExecutorService;
034import java.util.concurrent.Executors;
035import java.util.concurrent.Future;
036import java.util.function.BiConsumer;
037
038import org.apache.hadoop.conf.Configuration;
039import org.apache.hadoop.fs.FSDataInputStream;
040import org.apache.hadoop.fs.FSDataOutputStream;
041import org.apache.hadoop.fs.FileChecksum;
042import org.apache.hadoop.fs.FileStatus;
043import org.apache.hadoop.fs.FileSystem;
044import org.apache.hadoop.fs.Path;
045import org.apache.hadoop.fs.permission.FsPermission;
046import org.apache.hadoop.hbase.HBaseConfiguration;
047import org.apache.hadoop.hbase.HConstants;
048import org.apache.hadoop.hbase.TableName;
049import org.apache.hadoop.hbase.client.RegionInfo;
050import org.apache.hadoop.hbase.io.FileLink;
051import org.apache.hadoop.hbase.io.HFileLink;
052import org.apache.hadoop.hbase.io.WALLink;
053import org.apache.hadoop.hbase.io.hadoopbackport.ThrottledInputStream;
054import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
055import org.apache.hadoop.hbase.mob.MobUtils;
056import org.apache.hadoop.hbase.util.AbstractHBaseTool;
057import org.apache.hadoop.hbase.util.FSUtils;
058import org.apache.hadoop.hbase.util.HFileArchiveUtil;
059import org.apache.hadoop.hbase.util.Pair;
060import org.apache.hadoop.io.BytesWritable;
061import org.apache.hadoop.io.IOUtils;
062import org.apache.hadoop.io.NullWritable;
063import org.apache.hadoop.io.Writable;
064import org.apache.hadoop.mapreduce.InputFormat;
065import org.apache.hadoop.mapreduce.InputSplit;
066import org.apache.hadoop.mapreduce.Job;
067import org.apache.hadoop.mapreduce.JobContext;
068import org.apache.hadoop.mapreduce.Mapper;
069import org.apache.hadoop.mapreduce.RecordReader;
070import org.apache.hadoop.mapreduce.TaskAttemptContext;
071import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
072import org.apache.hadoop.mapreduce.security.TokenCache;
073import org.apache.hadoop.util.StringUtils;
074import org.apache.hadoop.util.Tool;
075import org.apache.yetus.audience.InterfaceAudience;
076import org.slf4j.Logger;
077import org.slf4j.LoggerFactory;
078import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
079import org.apache.hbase.thirdparty.org.apache.commons.cli.Option;
080import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
081import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotFileInfo;
082import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotRegionManifest;
083
084/**
085 * Export the specified snapshot to a given FileSystem.
086 *
087 * The .snapshot/name folder is copied to the destination cluster
088 * and then all the hfiles/wals are copied using a Map-Reduce Job in the .archive/ location.
089 * When everything is done, the second cluster can restore the snapshot.
090 */
091@InterfaceAudience.Public
092public class ExportSnapshot extends AbstractHBaseTool implements Tool {
093  public static final String NAME = "exportsnapshot";
094  /** Configuration prefix for overrides for the source filesystem */
095  public static final String CONF_SOURCE_PREFIX = NAME + ".from.";
096  /** Configuration prefix for overrides for the destination filesystem */
097  public static final String CONF_DEST_PREFIX = NAME + ".to.";
098
099  private static final Logger LOG = LoggerFactory.getLogger(ExportSnapshot.class);
100
101  private static final String MR_NUM_MAPS = "mapreduce.job.maps";
102  private static final String CONF_NUM_SPLITS = "snapshot.export.format.splits";
103  private static final String CONF_SNAPSHOT_NAME = "snapshot.export.format.snapshot.name";
104  private static final String CONF_SNAPSHOT_DIR = "snapshot.export.format.snapshot.dir";
105  private static final String CONF_FILES_USER = "snapshot.export.files.attributes.user";
106  private static final String CONF_FILES_GROUP = "snapshot.export.files.attributes.group";
107  private static final String CONF_FILES_MODE = "snapshot.export.files.attributes.mode";
108  private static final String CONF_CHECKSUM_VERIFY = "snapshot.export.checksum.verify";
109  private static final String CONF_OUTPUT_ROOT = "snapshot.export.output.root";
110  private static final String CONF_INPUT_ROOT = "snapshot.export.input.root";
111  private static final String CONF_BUFFER_SIZE = "snapshot.export.buffer.size";
112  private static final String CONF_MAP_GROUP = "snapshot.export.default.map.group";
113  private static final String CONF_BANDWIDTH_MB = "snapshot.export.map.bandwidth.mb";
114  private static final String CONF_MR_JOB_NAME = "mapreduce.job.name";
115  protected static final String CONF_SKIP_TMP = "snapshot.export.skip.tmp";
116  private static final String CONF_COPY_MANIFEST_THREADS =
117      "snapshot.export.copy.references.threads";
118  private static final int DEFAULT_COPY_MANIFEST_THREADS =
119      Runtime.getRuntime().availableProcessors();
120
121  static class Testing {
122    static final String CONF_TEST_FAILURE = "test.snapshot.export.failure";
123    static final String CONF_TEST_FAILURE_COUNT = "test.snapshot.export.failure.count";
124    int failuresCountToInject = 0;
125    int injectedFailureCount = 0;
126  }
127
128  // Command line options and defaults.
129  static final class Options {
130    static final Option SNAPSHOT = new Option(null, "snapshot", true, "Snapshot to restore.");
131    static final Option TARGET_NAME = new Option(null, "target", true,
132        "Target name for the snapshot.");
133    static final Option COPY_TO = new Option(null, "copy-to", true, "Remote "
134        + "destination hdfs://");
135    static final Option COPY_FROM = new Option(null, "copy-from", true,
136        "Input folder hdfs:// (default hbase.rootdir)");
137    static final Option NO_CHECKSUM_VERIFY = new Option(null, "no-checksum-verify", false,
138        "Do not verify checksum, use name+length only.");
139    static final Option NO_TARGET_VERIFY = new Option(null, "no-target-verify", false,
140        "Do not verify the integrity of the exported snapshot.");
141    static final Option OVERWRITE = new Option(null, "overwrite", false,
142        "Rewrite the snapshot manifest if already exists.");
143    static final Option CHUSER = new Option(null, "chuser", true,
144        "Change the owner of the files to the specified one.");
145    static final Option CHGROUP = new Option(null, "chgroup", true,
146        "Change the group of the files to the specified one.");
147    static final Option CHMOD = new Option(null, "chmod", true,
148        "Change the permission of the files to the specified one.");
149    static final Option MAPPERS = new Option(null, "mappers", true,
150        "Number of mappers to use during the copy (mapreduce.job.maps).");
151    static final Option BANDWIDTH = new Option(null, "bandwidth", true,
152        "Limit bandwidth to this value in MB/second.");
153  }
154
155  // Export Map-Reduce Counters, to keep track of the progress
156  public enum Counter {
157    MISSING_FILES, FILES_COPIED, FILES_SKIPPED, COPY_FAILED,
158    BYTES_EXPECTED, BYTES_SKIPPED, BYTES_COPIED
159  }
160
161  private static class ExportMapper extends Mapper<BytesWritable, NullWritable,
162                                                   NullWritable, NullWritable> {
163    private static final Logger LOG = LoggerFactory.getLogger(ExportMapper.class);
164    final static int REPORT_SIZE = 1 * 1024 * 1024;
165    final static int BUFFER_SIZE = 64 * 1024;
166
167    private boolean verifyChecksum;
168    private String filesGroup;
169    private String filesUser;
170    private short filesMode;
171    private int bufferSize;
172
173    private FileSystem outputFs;
174    private Path outputArchive;
175    private Path outputRoot;
176
177    private FileSystem inputFs;
178    private Path inputArchive;
179    private Path inputRoot;
180
181    private static Testing testing = new Testing();
182
183    @Override
184    public void setup(Context context) throws IOException {
185      Configuration conf = context.getConfiguration();
186
187      Configuration srcConf = HBaseConfiguration.createClusterConf(conf, null, CONF_SOURCE_PREFIX);
188      Configuration destConf = HBaseConfiguration.createClusterConf(conf, null, CONF_DEST_PREFIX);
189
190      verifyChecksum = conf.getBoolean(CONF_CHECKSUM_VERIFY, true);
191
192      filesGroup = conf.get(CONF_FILES_GROUP);
193      filesUser = conf.get(CONF_FILES_USER);
194      filesMode = (short)conf.getInt(CONF_FILES_MODE, 0);
195      outputRoot = new Path(conf.get(CONF_OUTPUT_ROOT));
196      inputRoot = new Path(conf.get(CONF_INPUT_ROOT));
197
198      inputArchive = new Path(inputRoot, HConstants.HFILE_ARCHIVE_DIRECTORY);
199      outputArchive = new Path(outputRoot, HConstants.HFILE_ARCHIVE_DIRECTORY);
200
201      try {
202        srcConf.setBoolean("fs." + inputRoot.toUri().getScheme() + ".impl.disable.cache", true);
203        inputFs = FileSystem.get(inputRoot.toUri(), srcConf);
204      } catch (IOException e) {
205        throw new IOException("Could not get the input FileSystem with root=" + inputRoot, e);
206      }
207
208      try {
209        destConf.setBoolean("fs." + outputRoot.toUri().getScheme() + ".impl.disable.cache", true);
210        outputFs = FileSystem.get(outputRoot.toUri(), destConf);
211      } catch (IOException e) {
212        throw new IOException("Could not get the output FileSystem with root="+ outputRoot, e);
213      }
214
215      // Use the default block size of the outputFs if bigger
216      int defaultBlockSize = Math.max((int) outputFs.getDefaultBlockSize(outputRoot), BUFFER_SIZE);
217      bufferSize = conf.getInt(CONF_BUFFER_SIZE, defaultBlockSize);
218      LOG.info("Using bufferSize=" + StringUtils.humanReadableInt(bufferSize));
219
220      for (Counter c : Counter.values()) {
221        context.getCounter(c).increment(0);
222      }
223      if (context.getConfiguration().getBoolean(Testing.CONF_TEST_FAILURE, false)) {
224        testing.failuresCountToInject = conf.getInt(Testing.CONF_TEST_FAILURE_COUNT, 0);
225        // Get number of times we have already injected failure based on attempt number of this
226        // task.
227        testing.injectedFailureCount = context.getTaskAttemptID().getId();
228      }
229    }
230
231    @Override
232    protected void cleanup(Context context) {
233      IOUtils.closeStream(inputFs);
234      IOUtils.closeStream(outputFs);
235    }
236
237    @Override
238    public void map(BytesWritable key, NullWritable value, Context context)
239        throws InterruptedException, IOException {
240      SnapshotFileInfo inputInfo = SnapshotFileInfo.parseFrom(key.copyBytes());
241      Path outputPath = getOutputPath(inputInfo);
242
243      copyFile(context, inputInfo, outputPath);
244    }
245
246    /**
247     * Returns the location where the inputPath will be copied.
248     */
249    private Path getOutputPath(final SnapshotFileInfo inputInfo) throws IOException {
250      Path path = null;
251      switch (inputInfo.getType()) {
252        case HFILE:
253          Path inputPath = new Path(inputInfo.getHfile());
254          String family = inputPath.getParent().getName();
255          TableName table =HFileLink.getReferencedTableName(inputPath.getName());
256          String region = HFileLink.getReferencedRegionName(inputPath.getName());
257          String hfile = HFileLink.getReferencedHFileName(inputPath.getName());
258          path = new Path(FSUtils.getTableDir(new Path("./"), table),
259              new Path(region, new Path(family, hfile)));
260          break;
261        case WAL:
262          LOG.warn("snapshot does not keeps WALs: " + inputInfo);
263          break;
264        default:
265          throw new IOException("Invalid File Type: " + inputInfo.getType().toString());
266      }
267      return new Path(outputArchive, path);
268    }
269
270    /**
271     * Used by TestExportSnapshot to test for retries when failures happen.
272     * Failure is injected in {@link #copyFile(Context, SnapshotFileInfo, Path)}.
273     */
274    private void injectTestFailure(final Context context, final SnapshotFileInfo inputInfo)
275        throws IOException {
276      if (!context.getConfiguration().getBoolean(Testing.CONF_TEST_FAILURE, false)) return;
277      if (testing.injectedFailureCount >= testing.failuresCountToInject) return;
278      testing.injectedFailureCount++;
279      context.getCounter(Counter.COPY_FAILED).increment(1);
280      LOG.debug("Injecting failure. Count: " + testing.injectedFailureCount);
281      throw new IOException(String.format("TEST FAILURE (%d of max %d): Unable to copy input=%s",
282          testing.injectedFailureCount, testing.failuresCountToInject, inputInfo));
283    }
284
285    private void copyFile(final Context context, final SnapshotFileInfo inputInfo,
286        final Path outputPath) throws IOException {
287      // Get the file information
288      FileStatus inputStat = getSourceFileStatus(context, inputInfo);
289
290      // Verify if the output file exists and is the same that we want to copy
291      if (outputFs.exists(outputPath)) {
292        FileStatus outputStat = outputFs.getFileStatus(outputPath);
293        if (outputStat != null && sameFile(inputStat, outputStat)) {
294          LOG.info("Skip copy " + inputStat.getPath() + " to " + outputPath + ", same file.");
295          context.getCounter(Counter.FILES_SKIPPED).increment(1);
296          context.getCounter(Counter.BYTES_SKIPPED).increment(inputStat.getLen());
297          return;
298        }
299      }
300
301      InputStream in = openSourceFile(context, inputInfo);
302      int bandwidthMB = context.getConfiguration().getInt(CONF_BANDWIDTH_MB, 100);
303      if (Integer.MAX_VALUE != bandwidthMB) {
304        in = new ThrottledInputStream(new BufferedInputStream(in), bandwidthMB * 1024 * 1024L);
305      }
306
307      try {
308        context.getCounter(Counter.BYTES_EXPECTED).increment(inputStat.getLen());
309
310        // Ensure that the output folder is there and copy the file
311        createOutputPath(outputPath.getParent());
312        FSDataOutputStream out = outputFs.create(outputPath, true);
313        try {
314          copyData(context, inputStat.getPath(), in, outputPath, out, inputStat.getLen());
315        } finally {
316          out.close();
317        }
318
319        // Try to Preserve attributes
320        if (!preserveAttributes(outputPath, inputStat)) {
321          LOG.warn("You may have to run manually chown on: " + outputPath);
322        }
323      } finally {
324        in.close();
325        injectTestFailure(context, inputInfo);
326      }
327    }
328
329    /**
330     * Create the output folder and optionally set ownership.
331     */
332    private void createOutputPath(final Path path) throws IOException {
333      if (filesUser == null && filesGroup == null) {
334        outputFs.mkdirs(path);
335      } else {
336        Path parent = path.getParent();
337        if (!outputFs.exists(parent) && !parent.isRoot()) {
338          createOutputPath(parent);
339        }
340        outputFs.mkdirs(path);
341        if (filesUser != null || filesGroup != null) {
342          // override the owner when non-null user/group is specified
343          outputFs.setOwner(path, filesUser, filesGroup);
344        }
345        if (filesMode > 0) {
346          outputFs.setPermission(path, new FsPermission(filesMode));
347        }
348      }
349    }
350
351    /**
352     * Try to Preserve the files attribute selected by the user copying them from the source file
353     * This is only required when you are exporting as a different user than "hbase" or on a system
354     * that doesn't have the "hbase" user.
355     *
356     * This is not considered a blocking failure since the user can force a chmod with the user
357     * that knows is available on the system.
358     */
359    private boolean preserveAttributes(final Path path, final FileStatus refStat) {
360      FileStatus stat;
361      try {
362        stat = outputFs.getFileStatus(path);
363      } catch (IOException e) {
364        LOG.warn("Unable to get the status for file=" + path);
365        return false;
366      }
367
368      try {
369        if (filesMode > 0 && stat.getPermission().toShort() != filesMode) {
370          outputFs.setPermission(path, new FsPermission(filesMode));
371        } else if (refStat != null && !stat.getPermission().equals(refStat.getPermission())) {
372          outputFs.setPermission(path, refStat.getPermission());
373        }
374      } catch (IOException e) {
375        LOG.warn("Unable to set the permission for file="+ stat.getPath() +": "+ e.getMessage());
376        return false;
377      }
378
379      boolean hasRefStat = (refStat != null);
380      String user = stringIsNotEmpty(filesUser) || !hasRefStat ? filesUser : refStat.getOwner();
381      String group = stringIsNotEmpty(filesGroup) || !hasRefStat ? filesGroup : refStat.getGroup();
382      if (stringIsNotEmpty(user) || stringIsNotEmpty(group)) {
383        try {
384          if (!(user.equals(stat.getOwner()) && group.equals(stat.getGroup()))) {
385            outputFs.setOwner(path, user, group);
386          }
387        } catch (IOException e) {
388          LOG.warn("Unable to set the owner/group for file="+ stat.getPath() +": "+ e.getMessage());
389          LOG.warn("The user/group may not exist on the destination cluster: user=" +
390                   user + " group=" + group);
391          return false;
392        }
393      }
394
395      return true;
396    }
397
398    private boolean stringIsNotEmpty(final String str) {
399      return str != null && str.length() > 0;
400    }
401
402    private void copyData(final Context context,
403        final Path inputPath, final InputStream in,
404        final Path outputPath, final FSDataOutputStream out,
405        final long inputFileSize)
406        throws IOException {
407      final String statusMessage = "copied %s/" + StringUtils.humanReadableInt(inputFileSize) +
408                                   " (%.1f%%)";
409
410      try {
411        byte[] buffer = new byte[bufferSize];
412        long totalBytesWritten = 0;
413        int reportBytes = 0;
414        int bytesRead;
415
416        long stime = System.currentTimeMillis();
417        while ((bytesRead = in.read(buffer)) > 0) {
418          out.write(buffer, 0, bytesRead);
419          totalBytesWritten += bytesRead;
420          reportBytes += bytesRead;
421
422          if (reportBytes >= REPORT_SIZE) {
423            context.getCounter(Counter.BYTES_COPIED).increment(reportBytes);
424            context.setStatus(String.format(statusMessage,
425                              StringUtils.humanReadableInt(totalBytesWritten),
426                              (totalBytesWritten/(float)inputFileSize) * 100.0f) +
427                              " from " + inputPath + " to " + outputPath);
428            reportBytes = 0;
429          }
430        }
431        long etime = System.currentTimeMillis();
432
433        context.getCounter(Counter.BYTES_COPIED).increment(reportBytes);
434        context.setStatus(String.format(statusMessage,
435                          StringUtils.humanReadableInt(totalBytesWritten),
436                          (totalBytesWritten/(float)inputFileSize) * 100.0f) +
437                          " from " + inputPath + " to " + outputPath);
438
439        // Verify that the written size match
440        if (totalBytesWritten != inputFileSize) {
441          String msg = "number of bytes copied not matching copied=" + totalBytesWritten +
442                       " expected=" + inputFileSize + " for file=" + inputPath;
443          throw new IOException(msg);
444        }
445
446        LOG.info("copy completed for input=" + inputPath + " output=" + outputPath);
447        LOG.info("size=" + totalBytesWritten +
448            " (" + StringUtils.humanReadableInt(totalBytesWritten) + ")" +
449            " time=" + StringUtils.formatTimeDiff(etime, stime) +
450            String.format(" %.3fM/sec", (totalBytesWritten / ((etime - stime)/1000.0))/1048576.0));
451        context.getCounter(Counter.FILES_COPIED).increment(1);
452      } catch (IOException e) {
453        LOG.error("Error copying " + inputPath + " to " + outputPath, e);
454        context.getCounter(Counter.COPY_FAILED).increment(1);
455        throw e;
456      }
457    }
458
459    /**
460     * Try to open the "source" file.
461     * Throws an IOException if the communication with the inputFs fail or
462     * if the file is not found.
463     */
464    private FSDataInputStream openSourceFile(Context context, final SnapshotFileInfo fileInfo)
465            throws IOException {
466      try {
467        Configuration conf = context.getConfiguration();
468        FileLink link = null;
469        switch (fileInfo.getType()) {
470          case HFILE:
471            Path inputPath = new Path(fileInfo.getHfile());
472            link = getFileLink(inputPath, conf);
473            break;
474          case WAL:
475            String serverName = fileInfo.getWalServer();
476            String logName = fileInfo.getWalName();
477            link = new WALLink(inputRoot, serverName, logName);
478            break;
479          default:
480            throw new IOException("Invalid File Type: " + fileInfo.getType().toString());
481        }
482        return link.open(inputFs);
483      } catch (IOException e) {
484        context.getCounter(Counter.MISSING_FILES).increment(1);
485        LOG.error("Unable to open source file=" + fileInfo.toString(), e);
486        throw e;
487      }
488    }
489
490    private FileStatus getSourceFileStatus(Context context, final SnapshotFileInfo fileInfo)
491        throws IOException {
492      try {
493        Configuration conf = context.getConfiguration();
494        FileLink link = null;
495        switch (fileInfo.getType()) {
496          case HFILE:
497            Path inputPath = new Path(fileInfo.getHfile());
498            link = getFileLink(inputPath, conf);
499            break;
500          case WAL:
501            link = new WALLink(inputRoot, fileInfo.getWalServer(), fileInfo.getWalName());
502            break;
503          default:
504            throw new IOException("Invalid File Type: " + fileInfo.getType().toString());
505        }
506        return link.getFileStatus(inputFs);
507      } catch (FileNotFoundException e) {
508        context.getCounter(Counter.MISSING_FILES).increment(1);
509        LOG.error("Unable to get the status for source file=" + fileInfo.toString(), e);
510        throw e;
511      } catch (IOException e) {
512        LOG.error("Unable to get the status for source file=" + fileInfo.toString(), e);
513        throw e;
514      }
515    }
516
517    private FileLink getFileLink(Path path, Configuration conf) throws IOException{
518      String regionName = HFileLink.getReferencedRegionName(path.getName());
519      TableName tableName = HFileLink.getReferencedTableName(path.getName());
520      if(MobUtils.getMobRegionInfo(tableName).getEncodedName().equals(regionName)) {
521        return HFileLink.buildFromHFileLinkPattern(MobUtils.getQualifiedMobRootDir(conf),
522                HFileArchiveUtil.getArchivePath(conf), path);
523      }
524      return HFileLink.buildFromHFileLinkPattern(inputRoot, inputArchive, path);
525    }
526
527    private FileChecksum getFileChecksum(final FileSystem fs, final Path path) {
528      try {
529        return fs.getFileChecksum(path);
530      } catch (IOException e) {
531        LOG.warn("Unable to get checksum for file=" + path, e);
532        return null;
533      }
534    }
535
536    /**
537     * Check if the two files are equal by looking at the file length,
538     * and at the checksum (if user has specified the verifyChecksum flag).
539     */
540    private boolean sameFile(final FileStatus inputStat, final FileStatus outputStat) {
541      // Not matching length
542      if (inputStat.getLen() != outputStat.getLen()) return false;
543
544      // Mark files as equals, since user asked for no checksum verification
545      if (!verifyChecksum) return true;
546
547      // If checksums are not available, files are not the same.
548      FileChecksum inChecksum = getFileChecksum(inputFs, inputStat.getPath());
549      if (inChecksum == null) return false;
550
551      FileChecksum outChecksum = getFileChecksum(outputFs, outputStat.getPath());
552      if (outChecksum == null) return false;
553
554      return inChecksum.equals(outChecksum);
555    }
556  }
557
558  // ==========================================================================
559  //  Input Format
560  // ==========================================================================
561
562  /**
563   * Extract the list of files (HFiles/WALs) to copy using Map-Reduce.
564   * @return list of files referenced by the snapshot (pair of path and size)
565   */
566  private static List<Pair<SnapshotFileInfo, Long>> getSnapshotFiles(final Configuration conf,
567      final FileSystem fs, final Path snapshotDir) throws IOException {
568    SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
569
570    final List<Pair<SnapshotFileInfo, Long>> files = new ArrayList<>();
571    final TableName table = TableName.valueOf(snapshotDesc.getTable());
572
573    // Get snapshot files
574    LOG.info("Loading Snapshot '" + snapshotDesc.getName() + "' hfile list");
575    SnapshotReferenceUtil.visitReferencedFiles(conf, fs, snapshotDir, snapshotDesc,
576      new SnapshotReferenceUtil.SnapshotVisitor() {
577        @Override
578        public void storeFile(final RegionInfo regionInfo, final String family,
579            final SnapshotRegionManifest.StoreFile storeFile) throws IOException {
580          // for storeFile.hasReference() case, copied as part of the manifest
581          if (!storeFile.hasReference()) {
582            String region = regionInfo.getEncodedName();
583            String hfile = storeFile.getName();
584            Path path = HFileLink.createPath(table, region, family, hfile);
585
586            SnapshotFileInfo fileInfo = SnapshotFileInfo.newBuilder()
587              .setType(SnapshotFileInfo.Type.HFILE)
588              .setHfile(path.toString())
589              .build();
590
591            long size;
592            if (storeFile.hasFileSize()) {
593              size = storeFile.getFileSize();
594            } else {
595              size = HFileLink.buildFromHFileLinkPattern(conf, path).getFileStatus(fs).getLen();
596            }
597            files.add(new Pair<>(fileInfo, size));
598          }
599        }
600    });
601
602    return files;
603  }
604
605  /**
606   * Given a list of file paths and sizes, create around ngroups in as balanced a way as possible.
607   * The groups created will have similar amounts of bytes.
608   * <p>
609   * The algorithm used is pretty straightforward; the file list is sorted by size,
610   * and then each group fetch the bigger file available, iterating through groups
611   * alternating the direction.
612   */
613  static List<List<Pair<SnapshotFileInfo, Long>>> getBalancedSplits(
614      final List<Pair<SnapshotFileInfo, Long>> files, final int ngroups) {
615    // Sort files by size, from small to big
616    Collections.sort(files, new Comparator<Pair<SnapshotFileInfo, Long>>() {
617      public int compare(Pair<SnapshotFileInfo, Long> a, Pair<SnapshotFileInfo, Long> b) {
618        long r = a.getSecond() - b.getSecond();
619        return (r < 0) ? -1 : ((r > 0) ? 1 : 0);
620      }
621    });
622
623    // create balanced groups
624    List<List<Pair<SnapshotFileInfo, Long>>> fileGroups = new LinkedList<>();
625    long[] sizeGroups = new long[ngroups];
626    int hi = files.size() - 1;
627    int lo = 0;
628
629    List<Pair<SnapshotFileInfo, Long>> group;
630    int dir = 1;
631    int g = 0;
632
633    while (hi >= lo) {
634      if (g == fileGroups.size()) {
635        group = new LinkedList<>();
636        fileGroups.add(group);
637      } else {
638        group = fileGroups.get(g);
639      }
640
641      Pair<SnapshotFileInfo, Long> fileInfo = files.get(hi--);
642
643      // add the hi one
644      sizeGroups[g] += fileInfo.getSecond();
645      group.add(fileInfo);
646
647      // change direction when at the end or the beginning
648      g += dir;
649      if (g == ngroups) {
650        dir = -1;
651        g = ngroups - 1;
652      } else if (g < 0) {
653        dir = 1;
654        g = 0;
655      }
656    }
657
658    if (LOG.isDebugEnabled()) {
659      for (int i = 0; i < sizeGroups.length; ++i) {
660        LOG.debug("export split=" + i + " size=" + StringUtils.humanReadableInt(sizeGroups[i]));
661      }
662    }
663
664    return fileGroups;
665  }
666
667  private static class ExportSnapshotInputFormat extends InputFormat<BytesWritable, NullWritable> {
668    @Override
669    public RecordReader<BytesWritable, NullWritable> createRecordReader(InputSplit split,
670        TaskAttemptContext tac) throws IOException, InterruptedException {
671      return new ExportSnapshotRecordReader(((ExportSnapshotInputSplit)split).getSplitKeys());
672    }
673
674    @Override
675    public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
676      Configuration conf = context.getConfiguration();
677      Path snapshotDir = new Path(conf.get(CONF_SNAPSHOT_DIR));
678      FileSystem fs = FileSystem.get(snapshotDir.toUri(), conf);
679
680      List<Pair<SnapshotFileInfo, Long>> snapshotFiles = getSnapshotFiles(conf, fs, snapshotDir);
681      int mappers = conf.getInt(CONF_NUM_SPLITS, 0);
682      if (mappers == 0 && snapshotFiles.size() > 0) {
683        mappers = 1 + (snapshotFiles.size() / conf.getInt(CONF_MAP_GROUP, 10));
684        mappers = Math.min(mappers, snapshotFiles.size());
685        conf.setInt(CONF_NUM_SPLITS, mappers);
686        conf.setInt(MR_NUM_MAPS, mappers);
687      }
688
689      List<List<Pair<SnapshotFileInfo, Long>>> groups = getBalancedSplits(snapshotFiles, mappers);
690      List<InputSplit> splits = new ArrayList(groups.size());
691      for (List<Pair<SnapshotFileInfo, Long>> files: groups) {
692        splits.add(new ExportSnapshotInputSplit(files));
693      }
694      return splits;
695    }
696
697    private static class ExportSnapshotInputSplit extends InputSplit implements Writable {
698      private List<Pair<BytesWritable, Long>> files;
699      private long length;
700
701      public ExportSnapshotInputSplit() {
702        this.files = null;
703      }
704
705      public ExportSnapshotInputSplit(final List<Pair<SnapshotFileInfo, Long>> snapshotFiles) {
706        this.files = new ArrayList(snapshotFiles.size());
707        for (Pair<SnapshotFileInfo, Long> fileInfo: snapshotFiles) {
708          this.files.add(new Pair<>(
709            new BytesWritable(fileInfo.getFirst().toByteArray()), fileInfo.getSecond()));
710          this.length += fileInfo.getSecond();
711        }
712      }
713
714      private List<Pair<BytesWritable, Long>> getSplitKeys() {
715        return files;
716      }
717
718      @Override
719      public long getLength() throws IOException, InterruptedException {
720        return length;
721      }
722
723      @Override
724      public String[] getLocations() throws IOException, InterruptedException {
725        return new String[] {};
726      }
727
728      @Override
729      public void readFields(DataInput in) throws IOException {
730        int count = in.readInt();
731        files = new ArrayList<>(count);
732        length = 0;
733        for (int i = 0; i < count; ++i) {
734          BytesWritable fileInfo = new BytesWritable();
735          fileInfo.readFields(in);
736          long size = in.readLong();
737          files.add(new Pair<>(fileInfo, size));
738          length += size;
739        }
740      }
741
742      @Override
743      public void write(DataOutput out) throws IOException {
744        out.writeInt(files.size());
745        for (final Pair<BytesWritable, Long> fileInfo: files) {
746          fileInfo.getFirst().write(out);
747          out.writeLong(fileInfo.getSecond());
748        }
749      }
750    }
751
752    private static class ExportSnapshotRecordReader
753        extends RecordReader<BytesWritable, NullWritable> {
754      private final List<Pair<BytesWritable, Long>> files;
755      private long totalSize = 0;
756      private long procSize = 0;
757      private int index = -1;
758
759      ExportSnapshotRecordReader(final List<Pair<BytesWritable, Long>> files) {
760        this.files = files;
761        for (Pair<BytesWritable, Long> fileInfo: files) {
762          totalSize += fileInfo.getSecond();
763        }
764      }
765
766      @Override
767      public void close() { }
768
769      @Override
770      public BytesWritable getCurrentKey() { return files.get(index).getFirst(); }
771
772      @Override
773      public NullWritable getCurrentValue() { return NullWritable.get(); }
774
775      @Override
776      public float getProgress() { return (float)procSize / totalSize; }
777
778      @Override
779      public void initialize(InputSplit split, TaskAttemptContext tac) { }
780
781      @Override
782      public boolean nextKeyValue() {
783        if (index >= 0) {
784          procSize += files.get(index).getSecond();
785        }
786        return(++index < files.size());
787      }
788    }
789  }
790
791  // ==========================================================================
792  //  Tool
793  // ==========================================================================
794
795  /**
796   * Run Map-Reduce Job to perform the files copy.
797   */
798  private void runCopyJob(final Path inputRoot, final Path outputRoot,
799      final String snapshotName, final Path snapshotDir, final boolean verifyChecksum,
800      final String filesUser, final String filesGroup, final int filesMode,
801      final int mappers, final int bandwidthMB)
802          throws IOException, InterruptedException, ClassNotFoundException {
803    Configuration conf = getConf();
804    if (filesGroup != null) conf.set(CONF_FILES_GROUP, filesGroup);
805    if (filesUser != null) conf.set(CONF_FILES_USER, filesUser);
806    if (mappers > 0) {
807      conf.setInt(CONF_NUM_SPLITS, mappers);
808      conf.setInt(MR_NUM_MAPS, mappers);
809    }
810    conf.setInt(CONF_FILES_MODE, filesMode);
811    conf.setBoolean(CONF_CHECKSUM_VERIFY, verifyChecksum);
812    conf.set(CONF_OUTPUT_ROOT, outputRoot.toString());
813    conf.set(CONF_INPUT_ROOT, inputRoot.toString());
814    conf.setInt(CONF_BANDWIDTH_MB, bandwidthMB);
815    conf.set(CONF_SNAPSHOT_NAME, snapshotName);
816    conf.set(CONF_SNAPSHOT_DIR, snapshotDir.toString());
817
818    String jobname = conf.get(CONF_MR_JOB_NAME, "ExportSnapshot-" + snapshotName);
819    Job job = new Job(conf);
820    job.setJobName(jobname);
821    job.setJarByClass(ExportSnapshot.class);
822    TableMapReduceUtil.addDependencyJars(job);
823    job.setMapperClass(ExportMapper.class);
824    job.setInputFormatClass(ExportSnapshotInputFormat.class);
825    job.setOutputFormatClass(NullOutputFormat.class);
826    job.setMapSpeculativeExecution(false);
827    job.setNumReduceTasks(0);
828
829    // Acquire the delegation Tokens
830    Configuration srcConf = HBaseConfiguration.createClusterConf(conf, null, CONF_SOURCE_PREFIX);
831    TokenCache.obtainTokensForNamenodes(job.getCredentials(),
832      new Path[] { inputRoot }, srcConf);
833    Configuration destConf = HBaseConfiguration.createClusterConf(conf, null, CONF_DEST_PREFIX);
834    TokenCache.obtainTokensForNamenodes(job.getCredentials(),
835        new Path[] { outputRoot }, destConf);
836
837    // Run the MR Job
838    if (!job.waitForCompletion(true)) {
839      throw new ExportSnapshotException(job.getStatus().getFailureInfo());
840    }
841  }
842
843  private void verifySnapshot(final Configuration baseConf,
844      final FileSystem fs, final Path rootDir, final Path snapshotDir) throws IOException {
845    // Update the conf with the current root dir, since may be a different cluster
846    Configuration conf = new Configuration(baseConf);
847    FSUtils.setRootDir(conf, rootDir);
848    FSUtils.setFsDefault(conf, FSUtils.getRootDir(conf));
849    SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
850    SnapshotReferenceUtil.verifySnapshot(conf, fs, snapshotDir, snapshotDesc);
851  }
852
853  private void setConfigParallel(FileSystem outputFs, List<Path> traversedPath,
854      BiConsumer<FileSystem, Path> task, Configuration conf) throws IOException {
855    ExecutorService pool = Executors
856        .newFixedThreadPool(conf.getInt(CONF_COPY_MANIFEST_THREADS, DEFAULT_COPY_MANIFEST_THREADS));
857    List<Future<Void>> futures = new ArrayList<>();
858    for (Path dstPath : traversedPath) {
859      Future<Void> future = (Future<Void>) pool.submit(() -> task.accept(outputFs, dstPath));
860      futures.add(future);
861    }
862    try {
863      for (Future<Void> future : futures) {
864        future.get();
865      }
866    } catch (InterruptedException | ExecutionException e) {
867      throw new IOException(e);
868    } finally {
869      pool.shutdownNow();
870    }
871  }
872
873  private void setOwnerParallel(FileSystem outputFs, String filesUser, String filesGroup,
874      Configuration conf, List<Path> traversedPath) throws IOException {
875    setConfigParallel(outputFs, traversedPath, (fs, path) -> {
876      try {
877        fs.setOwner(path, filesUser, filesGroup);
878      } catch (IOException e) {
879        throw new RuntimeException(
880            "set owner for file " + path + " to " + filesUser + ":" + filesGroup + " failed", e);
881      }
882    }, conf);
883  }
884
885  private void setPermissionParallel(final FileSystem outputFs, final short filesMode,
886      final List<Path> traversedPath, final Configuration conf) throws IOException {
887    if (filesMode <= 0) {
888      return;
889    }
890    FsPermission perm = new FsPermission(filesMode);
891    setConfigParallel(outputFs, traversedPath, (fs, path) -> {
892      try {
893        fs.setPermission(path, perm);
894      } catch (IOException e) {
895        throw new RuntimeException(
896            "set permission for file " + path + " to " + filesMode + " failed", e);
897      }
898    }, conf);
899  }
900
901  private boolean verifyTarget = true;
902  private boolean verifyChecksum = true;
903  private String snapshotName = null;
904  private String targetName = null;
905  private boolean overwrite = false;
906  private String filesGroup = null;
907  private String filesUser = null;
908  private Path outputRoot = null;
909  private Path inputRoot = null;
910  private int bandwidthMB = Integer.MAX_VALUE;
911  private int filesMode = 0;
912  private int mappers = 0;
913
914  @Override
915  protected void processOptions(CommandLine cmd) {
916    snapshotName = cmd.getOptionValue(Options.SNAPSHOT.getLongOpt(), snapshotName);
917    targetName = cmd.getOptionValue(Options.TARGET_NAME.getLongOpt(), targetName);
918    if (cmd.hasOption(Options.COPY_TO.getLongOpt())) {
919      outputRoot = new Path(cmd.getOptionValue(Options.COPY_TO.getLongOpt()));
920    }
921    if (cmd.hasOption(Options.COPY_FROM.getLongOpt())) {
922      inputRoot = new Path(cmd.getOptionValue(Options.COPY_FROM.getLongOpt()));
923    }
924    mappers = getOptionAsInt(cmd, Options.MAPPERS.getLongOpt(), mappers);
925    filesUser = cmd.getOptionValue(Options.CHUSER.getLongOpt(), filesUser);
926    filesGroup = cmd.getOptionValue(Options.CHGROUP.getLongOpt(), filesGroup);
927    filesMode = getOptionAsInt(cmd, Options.CHMOD.getLongOpt(), filesMode);
928    bandwidthMB = getOptionAsInt(cmd, Options.BANDWIDTH.getLongOpt(), bandwidthMB);
929    overwrite = cmd.hasOption(Options.OVERWRITE.getLongOpt());
930    // And verifyChecksum and verifyTarget with values read from old args in processOldArgs(...).
931    verifyChecksum = !cmd.hasOption(Options.NO_CHECKSUM_VERIFY.getLongOpt());
932    verifyTarget = !cmd.hasOption(Options.NO_TARGET_VERIFY.getLongOpt());
933  }
934
935  /**
936   * Execute the export snapshot by copying the snapshot metadata, hfiles and wals.
937   * @return 0 on success, and != 0 upon failure.
938   */
939  @Override
940  public int doWork() throws IOException {
941    Configuration conf = getConf();
942
943    // Check user options
944    if (snapshotName == null) {
945      System.err.println("Snapshot name not provided.");
946      LOG.error("Use -h or --help for usage instructions.");
947      return 0;
948    }
949
950    if (outputRoot == null) {
951      System.err.println("Destination file-system (--" + Options.COPY_TO.getLongOpt()
952              + ") not provided.");
953      LOG.error("Use -h or --help for usage instructions.");
954      return 0;
955    }
956
957    if (targetName == null) {
958      targetName = snapshotName;
959    }
960    if (inputRoot == null) {
961      inputRoot = FSUtils.getRootDir(conf);
962    } else {
963      FSUtils.setRootDir(conf, inputRoot);
964    }
965
966    Configuration srcConf = HBaseConfiguration.createClusterConf(conf, null, CONF_SOURCE_PREFIX);
967    srcConf.setBoolean("fs." + inputRoot.toUri().getScheme() + ".impl.disable.cache", true);
968    FileSystem inputFs = FileSystem.get(inputRoot.toUri(), srcConf);
969    LOG.debug("inputFs=" + inputFs.getUri().toString() + " inputRoot=" + inputRoot);
970    Configuration destConf = HBaseConfiguration.createClusterConf(conf, null, CONF_DEST_PREFIX);
971    destConf.setBoolean("fs." + outputRoot.toUri().getScheme() + ".impl.disable.cache", true);
972    FileSystem outputFs = FileSystem.get(outputRoot.toUri(), destConf);
973    LOG.debug("outputFs=" + outputFs.getUri().toString() + " outputRoot=" + outputRoot.toString());
974
975    boolean skipTmp = conf.getBoolean(CONF_SKIP_TMP, false) ||
976        conf.get(SnapshotDescriptionUtils.SNAPSHOT_WORKING_DIR) != null;
977
978    Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, inputRoot);
979    Path snapshotTmpDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(targetName, outputRoot,
980        destConf);
981    Path outputSnapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(targetName, outputRoot);
982    Path initialOutputSnapshotDir = skipTmp ? outputSnapshotDir : snapshotTmpDir;
983
984    // Find the necessary directory which need to change owner and group
985    Path needSetOwnerDir = SnapshotDescriptionUtils.getSnapshotRootDir(outputRoot);
986    if (outputFs.exists(needSetOwnerDir)) {
987      if (skipTmp) {
988        needSetOwnerDir = outputSnapshotDir;
989      } else {
990        needSetOwnerDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(outputRoot, destConf);
991        if (outputFs.exists(needSetOwnerDir)) {
992          needSetOwnerDir = snapshotTmpDir;
993        }
994      }
995    }
996
997    // Check if the snapshot already exists
998    if (outputFs.exists(outputSnapshotDir)) {
999      if (overwrite) {
1000        if (!outputFs.delete(outputSnapshotDir, true)) {
1001          System.err.println("Unable to remove existing snapshot directory: " + outputSnapshotDir);
1002          return 1;
1003        }
1004      } else {
1005        System.err.println("The snapshot '" + targetName +
1006          "' already exists in the destination: " + outputSnapshotDir);
1007        return 1;
1008      }
1009    }
1010
1011    if (!skipTmp) {
1012      // Check if the snapshot already in-progress
1013      if (outputFs.exists(snapshotTmpDir)) {
1014        if (overwrite) {
1015          if (!outputFs.delete(snapshotTmpDir, true)) {
1016            System.err.println("Unable to remove existing snapshot tmp directory: "+snapshotTmpDir);
1017            return 1;
1018          }
1019        } else {
1020          System.err.println("A snapshot with the same name '"+ targetName +"' may be in-progress");
1021          System.err.println("Please check "+snapshotTmpDir+". If the snapshot has completed, ");
1022          System.err.println("consider removing "+snapshotTmpDir+" by using the -overwrite option");
1023          return 1;
1024        }
1025      }
1026    }
1027
1028    // Step 1 - Copy fs1:/.snapshot/<snapshot> to  fs2:/.snapshot/.tmp/<snapshot>
1029    // The snapshot references must be copied before the hfiles otherwise the cleaner
1030    // will remove them because they are unreferenced.
1031    List<Path> travesedPaths = new ArrayList<>();
1032    boolean copySucceeded = false;
1033    try {
1034      LOG.info("Copy Snapshot Manifest from " + snapshotDir + " to " + initialOutputSnapshotDir);
1035      travesedPaths =
1036          FSUtils.copyFilesParallel(inputFs, snapshotDir, outputFs, initialOutputSnapshotDir, conf,
1037              conf.getInt(CONF_COPY_MANIFEST_THREADS, DEFAULT_COPY_MANIFEST_THREADS));
1038      copySucceeded = true;
1039    } catch (IOException e) {
1040      throw new ExportSnapshotException("Failed to copy the snapshot directory: from=" +
1041        snapshotDir + " to=" + initialOutputSnapshotDir, e);
1042    } finally {
1043      if (copySucceeded) {
1044        if (filesUser != null || filesGroup != null) {
1045          LOG.warn((filesUser == null ? "" : "Change the owner of " + needSetOwnerDir + " to "
1046              + filesUser)
1047              + (filesGroup == null ? "" : ", Change the group of " + needSetOwnerDir + " to "
1048                  + filesGroup));
1049          setOwnerParallel(outputFs, filesUser, filesGroup, conf, travesedPaths);
1050        }
1051        if (filesMode > 0) {
1052          LOG.warn("Change the permission of " + needSetOwnerDir + " to " + filesMode);
1053          setPermissionParallel(outputFs, (short)filesMode, travesedPaths, conf);
1054        }
1055      }
1056    }
1057
1058    // Write a new .snapshotinfo if the target name is different from the source name
1059    if (!targetName.equals(snapshotName)) {
1060      SnapshotDescription snapshotDesc =
1061        SnapshotDescriptionUtils.readSnapshotInfo(inputFs, snapshotDir)
1062          .toBuilder()
1063          .setName(targetName)
1064          .build();
1065      SnapshotDescriptionUtils.writeSnapshotInfo(snapshotDesc, initialOutputSnapshotDir, outputFs);
1066      if (filesUser != null || filesGroup != null) {
1067        outputFs.setOwner(new Path(initialOutputSnapshotDir,
1068          SnapshotDescriptionUtils.SNAPSHOTINFO_FILE), filesUser, filesGroup);
1069      }
1070      if (filesMode > 0) {
1071        outputFs.setPermission(new Path(initialOutputSnapshotDir,
1072          SnapshotDescriptionUtils.SNAPSHOTINFO_FILE), new FsPermission((short)filesMode));
1073      }
1074    }
1075
1076    // Step 2 - Start MR Job to copy files
1077    // The snapshot references must be copied before the files otherwise the files gets removed
1078    // by the HFileArchiver, since they have no references.
1079    try {
1080      runCopyJob(inputRoot, outputRoot, snapshotName, snapshotDir, verifyChecksum,
1081                 filesUser, filesGroup, filesMode, mappers, bandwidthMB);
1082
1083      LOG.info("Finalize the Snapshot Export");
1084      if (!skipTmp) {
1085        // Step 3 - Rename fs2:/.snapshot/.tmp/<snapshot> fs2:/.snapshot/<snapshot>
1086        if (!outputFs.rename(snapshotTmpDir, outputSnapshotDir)) {
1087          throw new ExportSnapshotException("Unable to rename snapshot directory from=" +
1088            snapshotTmpDir + " to=" + outputSnapshotDir);
1089        }
1090      }
1091
1092      // Step 4 - Verify snapshot integrity
1093      if (verifyTarget) {
1094        LOG.info("Verify snapshot integrity");
1095        verifySnapshot(destConf, outputFs, outputRoot, outputSnapshotDir);
1096      }
1097
1098      LOG.info("Export Completed: " + targetName);
1099      return 0;
1100    } catch (Exception e) {
1101      LOG.error("Snapshot export failed", e);
1102      if (!skipTmp) {
1103        outputFs.delete(snapshotTmpDir, true);
1104      }
1105      outputFs.delete(outputSnapshotDir, true);
1106      return 1;
1107    } finally {
1108      IOUtils.closeStream(inputFs);
1109      IOUtils.closeStream(outputFs);
1110    }
1111  }
1112
1113  @Override
1114  protected void printUsage() {
1115    super.printUsage();
1116    System.out.println("\n"
1117        + "Examples:\n"
1118        + "  hbase snapshot export \\\n"
1119        + "    --snapshot MySnapshot --copy-to hdfs://srv2:8082/hbase \\\n"
1120        + "    --chuser MyUser --chgroup MyGroup --chmod 700 --mappers 16\n"
1121        + "\n"
1122        + "  hbase snapshot export \\\n"
1123        + "    --snapshot MySnapshot --copy-from hdfs://srv2:8082/hbase \\\n"
1124        + "    --copy-to hdfs://srv1:50070/hbase");
1125  }
1126
1127  @Override protected void addOptions() {
1128    addRequiredOption(Options.SNAPSHOT);
1129    addOption(Options.COPY_TO);
1130    addOption(Options.COPY_FROM);
1131    addOption(Options.TARGET_NAME);
1132    addOption(Options.NO_CHECKSUM_VERIFY);
1133    addOption(Options.NO_TARGET_VERIFY);
1134    addOption(Options.OVERWRITE);
1135    addOption(Options.CHUSER);
1136    addOption(Options.CHGROUP);
1137    addOption(Options.CHMOD);
1138    addOption(Options.MAPPERS);
1139    addOption(Options.BANDWIDTH);
1140  }
1141
1142  public static void main(String[] args) {
1143    new ExportSnapshot().doStaticMain(args);
1144  }
1145}