1 /**
2 * Copyright The Apache Software Foundation
3 *
4 * Licensed to the Apache Software Foundation (ASF) under one or more
5 * contributor license agreements. See the NOTICE file distributed with this
6 * work for additional information regarding copyright ownership. The ASF
7 * licenses this file to you under the Apache License, Version 2.0 (the
8 * "License"); you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 *
11 * http://www.apache.org/licenses/LICENSE-2.0
12 *
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16 * License for the specific language governing permissions and limitations
17 * under the License.
18 */
19 package org.apache.hadoop.hbase.util;
20
21 import java.nio.ByteBuffer;
22 import java.util.concurrent.locks.Lock;
23 import java.util.concurrent.locks.ReentrantLock;
24
25 import org.apache.commons.logging.Log;
26 import org.apache.commons.logging.LogFactory;
27 import org.apache.hadoop.hbase.classification.InterfaceAudience;
28 import org.apache.hadoop.util.StringUtils;
29
30 /**
31 * This class manages an array of ByteBuffers with a default size 4MB. These
32 * buffers are sequential and could be considered as a large buffer.It supports
33 * reading/writing data from this large buffer with a position and offset
34 */
35 @InterfaceAudience.Private
36 public final class ByteBufferArray {
37 static final Log LOG = LogFactory.getLog(ByteBufferArray.class);
38
39 static final int DEFAULT_BUFFER_SIZE = 4 * 1024 * 1024;
40 private ByteBuffer buffers[];
41 private Lock locks[];
42 private int bufferSize;
43 private int bufferCount;
44
45 /**
46 * We allocate a number of byte buffers as the capacity. In order not to out
47 * of the array bounds for the last byte(see {@link ByteBufferArray#multiple}),
48 * we will allocate one additional buffer with capacity 0;
49 * @param capacity total size of the byte buffer array
50 * @param directByteBuffer true if we allocate direct buffer
51 */
52 public ByteBufferArray(long capacity, boolean directByteBuffer) {
53 this.bufferSize = DEFAULT_BUFFER_SIZE;
54 if (this.bufferSize > (capacity / 16))
55 this.bufferSize = (int) roundUp(capacity / 16, 32768);
56 this.bufferCount = (int) (roundUp(capacity, bufferSize) / bufferSize);
57 LOG.info("Allocating buffers total=" + StringUtils.byteDesc(capacity)
58 + " , sizePerBuffer=" + StringUtils.byteDesc(bufferSize) + ", count="
59 + bufferCount);
60 buffers = new ByteBuffer[bufferCount + 1];
61 locks = new Lock[bufferCount + 1];
62 for (int i = 0; i <= bufferCount; i++) {
63 locks[i] = new ReentrantLock();
64 if (i < bufferCount) {
65 buffers[i] = directByteBuffer ? ByteBuffer.allocateDirect(bufferSize)
66 : ByteBuffer.allocate(bufferSize);
67 } else {
68 buffers[i] = ByteBuffer.allocate(0);
69 }
70
71 }
72 }
73
74 private long roundUp(long n, long to) {
75 return ((n + to - 1) / to) * to;
76 }
77
78 /**
79 * Transfers bytes from this buffer array into the given destination array
80 * @param start start position in the ByteBufferArray
81 * @param len The maximum number of bytes to be written to the given array
82 * @param dstArray The array into which bytes are to be written
83 * @return number of bytes read
84 */
85 public int getMultiple(long start, int len, byte[] dstArray) {
86 return getMultiple(start, len, dstArray, 0);
87 }
88
89 /**
90 * Transfers bytes from this buffer array into the given destination array
91 * @param start start offset of this buffer array
92 * @param len The maximum number of bytes to be written to the given array
93 * @param dstArray The array into which bytes are to be written
94 * @param dstOffset The offset within the given array of the first byte to be
95 * written
96 * @return number of bytes read
97 */
98 public int getMultiple(long start, int len, byte[] dstArray, int dstOffset) {
99 multiple(start, len, dstArray, dstOffset, new Visitor() {
100 public void visit(ByteBuffer bb, byte[] array, int arrayIdx, int len) {
101 bb.get(array, arrayIdx, len);
102 }
103 });
104 return len;
105 }
106
107 /**
108 * Transfers bytes from the given source array into this buffer array
109 * @param start start offset of this buffer array
110 * @param len The maximum number of bytes to be read from the given array
111 * @param srcArray The array from which bytes are to be read
112 */
113 public void putMultiple(long start, int len, byte[] srcArray) {
114 putMultiple(start, len, srcArray, 0);
115 }
116
117 /**
118 * Transfers bytes from the given source array into this buffer array
119 * @param start start offset of this buffer array
120 * @param len The maximum number of bytes to be read from the given array
121 * @param srcArray The array from which bytes are to be read
122 * @param srcOffset The offset within the given array of the first byte to be
123 * read
124 */
125 public void putMultiple(long start, int len, byte[] srcArray, int srcOffset) {
126 multiple(start, len, srcArray, srcOffset, new Visitor() {
127 public void visit(ByteBuffer bb, byte[] array, int arrayIdx, int len) {
128 bb.put(array, arrayIdx, len);
129 }
130 });
131 }
132
133 private interface Visitor {
134 /**
135 * Visit the given byte buffer, if it is a read action, we will transfer the
136 * bytes from the buffer to the destination array, else if it is a write
137 * action, we will transfer the bytes from the source array to the buffer
138 * @param bb byte buffer
139 * @param array a source or destination byte array
140 * @param arrayOffset offset of the byte array
141 * @param len read/write length
142 */
143 void visit(ByteBuffer bb, byte[] array, int arrayOffset, int len);
144 }
145
146 /**
147 * Access(read or write) this buffer array with a position and length as the
148 * given array. Here we will only lock one buffer even if it may be need visit
149 * several buffers. The consistency is guaranteed by the caller.
150 * @param start start offset of this buffer array
151 * @param len The maximum number of bytes to be accessed
152 * @param array The array from/to which bytes are to be read/written
153 * @param arrayOffset The offset within the given array of the first byte to
154 * be read or written
155 * @param visitor implement of how to visit the byte buffer
156 */
157 void multiple(long start, int len, byte[] array, int arrayOffset, Visitor visitor) {
158 assert len >= 0;
159 long end = start + len;
160 int startBuffer = (int) (start / bufferSize), startOffset = (int) (start % bufferSize);
161 int endBuffer = (int) (end / bufferSize), endOffset = (int) (end % bufferSize);
162 assert array.length >= len + arrayOffset;
163 assert startBuffer >= 0 && startBuffer < bufferCount;
164 assert endBuffer >= 0 && endBuffer < bufferCount
165 || (endBuffer == bufferCount && endOffset == 0);
166 if (startBuffer >= locks.length || startBuffer < 0) {
167 String msg = "Failed multiple, start=" + start + ",startBuffer="
168 + startBuffer + ",bufferSize=" + bufferSize;
169 LOG.error(msg);
170 throw new RuntimeException(msg);
171 }
172 int srcIndex = 0, cnt = -1;
173 for (int i = startBuffer; i <= endBuffer; ++i) {
174 Lock lock = locks[i];
175 lock.lock();
176 try {
177 ByteBuffer bb = buffers[i];
178 if (i == startBuffer) {
179 cnt = bufferSize - startOffset;
180 if (cnt > len) cnt = len;
181 bb.limit(startOffset + cnt).position(
182 startOffset );
183 } else if (i == endBuffer) {
184 cnt = endOffset;
185 bb.limit(cnt).position(0);
186 } else {
187 cnt = bufferSize ;
188 bb.limit(cnt).position(0);
189 }
190 visitor.visit(bb, array, srcIndex + arrayOffset, cnt);
191 srcIndex += cnt;
192 } finally {
193 lock.unlock();
194 }
195 }
196 assert srcIndex == len;
197 }
198 }