/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapred;

import java.io.IOException;

import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.TableName;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.InvalidJobConfException;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.Progressable;

/**
 * Convert Map/Reduce output and write it to an HBase table.
 */
@InterfaceAudience.Public
public class TableOutputFormat extends FileOutputFormat<ImmutableBytesWritable, Put> {

  /** JobConf parameter that specifies the output table */
  public static final String OUTPUT_TABLE = "hbase.mapred.outputtable";

  /**
   * Writes the Reduce output (key, {@link Put}) pairs to the configured HBase table.
   */
  protected static class TableRecordWriter implements RecordWriter<ImmutableBytesWritable, Put> {
    private BufferedMutator m_mutator;
    private Connection conn;

    /**
     * Instantiate a TableRecordWriter with an existing {@link BufferedMutator} for writing.
     *
     * @deprecated Please use {@link #TableRecordWriter(JobConf)}. This version does not clean up
     *             connections and will leak connections (removed in 2.0).
     */
    @Deprecated
    public TableRecordWriter(final BufferedMutator mutator) throws IOException {
      this.m_mutator = mutator;
      this.conn = null;
    }

    /**
     * Instantiate a TableRecordWriter with a BufferedMutator for batch writing.
     */
    public TableRecordWriter(JobConf job) throws IOException {
      // Open a connection and a buffered mutator for the table named in the job configuration.
      TableName tableName = TableName.valueOf(job.get(OUTPUT_TABLE));
      try {
        this.conn = ConnectionFactory.createConnection(job);
        this.m_mutator = conn.getBufferedMutator(tableName);
      } finally {
        // If the mutator could not be created, close the connection so it does not leak.
        if (this.m_mutator == null && this.conn != null) {
          conn.close();
          conn = null;
        }
      }
    }

    public void close(Reporter reporter) throws IOException {
      try {
        if (this.m_mutator != null) {
          this.m_mutator.close();
        }
      } finally {
        if (conn != null) {
          this.conn.close();
        }
      }
    }

    public void write(ImmutableBytesWritable key, Put value) throws IOException {
      // Copy the Put before buffering it; the caller may reuse the passed-in instance.
      m_mutator.mutate(new Put(value));
    }
  }

  /**
   * Creates a new record writer.
   *
   * Be aware that the baseline javadoc gives the impression that there is a single
   * {@link RecordWriter} per job but in HBase, it is more natural if we give you a new
   * RecordWriter per call of this method. You must close the returned RecordWriter when done.
   * Failure to do so will drop writes.
   *
   * @param ignored Ignored filesystem
   * @param job Current JobConf
   * @param name Name of the output (not used by this implementation)
   * @param progress Progress reporter (not used by this implementation)
   * @return The newly created writer instance.
   * @throws IOException When creating the writer fails.
   */
  @Override
  public RecordWriter getRecordWriter(FileSystem ignored, JobConf job, String name,
      Progressable progress) throws IOException {
    // Clear write buffer on fail is true by default so no need to reset it.
    return new TableRecordWriter(job);
  }

  @Override
  public void checkOutputSpecs(FileSystem ignored, JobConf job)
      throws FileAlreadyExistsException, InvalidJobConfException, IOException {
    String tableName = job.get(OUTPUT_TABLE);
    if (tableName == null) {
      throw new IOException("Must specify table name");
    }
  }
}
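
// A minimal usage sketch: wiring TableOutputFormat into an old-API (org.apache.hadoop.mapred)
// job so that reduce output (ImmutableBytesWritable, Put) pairs are written to HBase.
// The table name "my_table", the class name TableOutputFormatUsageSketch and the reducerClass
// parameter are illustrative placeholders, not part of this class's API.
class TableOutputFormatUsageSketch {
  static JobConf configureJob(org.apache.hadoop.conf.Configuration conf,
      Class<? extends org.apache.hadoop.mapred.Reducer> reducerClass) {
    JobConf job = new JobConf(conf);
    job.setOutputFormat(TableOutputFormat.class);         // send reduce output to HBase
    job.set(TableOutputFormat.OUTPUT_TABLE, "my_table");  // target table (placeholder name)
    job.setOutputKeyClass(ImmutableBytesWritable.class);  // row key of each mutation
    job.setOutputValueClass(Put.class);                   // the mutation to apply
    job.setReducerClass(reducerClass);                    // reducer emitting (key, Put) pairs
    return job;
  }
}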