/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapred;

import java.io.IOException;

import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.InvalidJobConfException;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.Progressable;
import org.apache.yetus.audience.InterfaceAudience;

/**
 * Convert Map/Reduce output and write it to an HBase table.
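 *
 * <p>A minimal driver sketch, offered only as an illustration; the driver class and
 * table name below are assumptions, not part of this file.</p>
 * <pre>
 *   JobConf job = new JobConf(HBaseConfiguration.create(), MyDriver.class);
 *   job.setOutputFormat(TableOutputFormat.class);
 *   job.set(TableOutputFormat.OUTPUT_TABLE, "mytable");
 *   job.setOutputKeyClass(ImmutableBytesWritable.class);
 *   job.setOutputValueClass(Put.class);
 * </pre>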
 */
@InterfaceAudience.Public
public class TableOutputFormat extends FileOutputFormat<ImmutableBytesWritable, Put> {

  /** JobConf parameter that specifies the output table. */
  public static final String OUTPUT_TABLE = "hbase.mapred.outputtable";

  /**
   * Convert Reduce output (key, value) pairs to {@link Put}s and write them to an HBase table
   * through a {@link BufferedMutator}.
   */
  protected static class TableRecordWriter implements RecordWriter<ImmutableBytesWritable, Put> {
    private BufferedMutator m_mutator;
    private Connection conn;

    /**
     * Instantiate a TableRecordWriter with an existing BufferedMutator for writing.
     *
     * @deprecated Please use {@code #TableRecordWriter(JobConf)} instead. This version does not
     * clean up connections and will leak them (removed in 2.0).
     */
    @Deprecated
    public TableRecordWriter(final BufferedMutator mutator) throws IOException {
      this.m_mutator = mutator;
      this.conn = null;
    }

    /**
     * Instantiate a TableRecordWriter with a BufferedMutator for batch writing.
     */
    public TableRecordWriter(JobConf job) throws IOException {
      TableName tableName = TableName.valueOf(job.get(OUTPUT_TABLE));
      try {
        this.conn = ConnectionFactory.createConnection(job);
        this.m_mutator = conn.getBufferedMutator(tableName);
      } finally {
        // If the mutator could not be created, close the connection so it does not leak.
        if (this.m_mutator == null && this.conn != null) {
          conn.close();
          conn = null;
        }
      }
    }

    @Override
    public void close(Reporter reporter) throws IOException {
      try {
        if (this.m_mutator != null) {
          this.m_mutator.close();
        }
      } finally {
        if (conn != null) {
          this.conn.close();
        }
      }
    }

    @Override
    public void write(ImmutableBytesWritable key, Put value) throws IOException {
      // Copy the Put before handing it to the mutator, which may buffer it beyond this call.
      m_mutator.mutate(new Put(value));
    }
  }
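
  // A hedged reducer sketch (old "mapred" API): the reducer class, input types, and the
  // FAMILY/QUALIFIER constants below are hypothetical, shown only to illustrate the
  // (ImmutableBytesWritable, Put) pairs this format consumes.
  //
  //   public static class ExampleTableReducer extends MapReduceBase
  //       implements Reducer<Text, LongWritable, ImmutableBytesWritable, Put> {
  //     @Override
  //     public void reduce(Text key, Iterator<LongWritable> values,
  //         OutputCollector<ImmutableBytesWritable, Put> output, Reporter reporter)
  //         throws IOException {
  //       long sum = 0;
  //       while (values.hasNext()) {
  //         sum += values.next().get();
  //       }
  //       Put put = new Put(Bytes.toBytes(key.toString()));
  //       put.addColumn(FAMILY, QUALIFIER, Bytes.toBytes(sum));
  //       output.collect(new ImmutableBytesWritable(put.getRow()), put);
  //     }
  //   }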

  /**
   * Creates a new record writer.
   *
   * Be aware that the baseline javadoc gives the impression that there is a single
   * {@link RecordWriter} per job but in HBase, it is more natural if we give you a new
   * RecordWriter per call of this method. You must close the returned RecordWriter when done.
   * Failure to do so will drop writes.
   *
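   * <p>A hedged sketch of that contract; the {@code outputFormat}, {@code rowKey} and
   * {@code put} values are illustrative, not part of this API:</p>
   * <pre>
   *   RecordWriter&lt;ImmutableBytesWritable, Put&gt; writer =
   *       outputFormat.getRecordWriter(null, job, "attempt_0", Reporter.NULL);
   *   try {
   *     writer.write(new ImmutableBytesWritable(rowKey), put);
   *   } finally {
   *     writer.close(Reporter.NULL); // flushes the underlying BufferedMutator
   *   }
   * </pre>
   *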
   * @param ignored Ignored filesystem
   * @param job Current JobConf
   * @param name Name for this output (not used by this format)
   * @param progress Progressable used to report progress (not used by this format)
   * @return The newly created writer instance.
   * @throws IOException When creating the writer fails.
   */
  @Override
  public RecordWriter<ImmutableBytesWritable, Put> getRecordWriter(FileSystem ignored, JobConf job,
      String name, Progressable progress) throws IOException {
    // Clear write buffer on fail is true by default so no need to reset it.
    return new TableRecordWriter(job);
  }

  @Override
  public void checkOutputSpecs(FileSystem ignored, JobConf job)
      throws FileAlreadyExistsException, InvalidJobConfException, IOException {
    String tableName = job.get(OUTPUT_TABLE);
    if (tableName == null) {
      throw new IOException("Must specify table name");
    }
  }
}