1 /*
2 *
3 * Licensed to the Apache Software Foundation (ASF) under one
4 * or more contributor license agreements. See the NOTICE file
5 * distributed with this work for additional information
6 * regarding copyright ownership. The ASF licenses this file
7 * to you under the Apache License, Version 2.0 (the
8 * "License"); you may not use this file except in compliance
9 * with the License. You may obtain a copy of the License at
10 *
11 * http://www.apache.org/licenses/LICENSE-2.0
12 *
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 */
19 package org.apache.hadoop.hbase.regionserver;
20
21 import java.io.IOException;
22 import java.util.NavigableSet;
23
24 import org.apache.hadoop.hbase.classification.InterfaceAudience;
25 import org.apache.hadoop.hbase.HConstants;
26 import org.apache.hadoop.hbase.KeyValue;
27 import org.apache.hadoop.hbase.regionserver.ScanQueryMatcher.MatchCode;
28 import org.apache.hadoop.hbase.util.Bytes;
29
30 /**
31 * This class is used for the tracking and enforcement of columns and numbers
32 * of versions during the course of a Get or Scan operation, when explicit
33 * column qualifiers have been asked for in the query.
34 *
35 * With a little magic (see {@link ScanQueryMatcher}), we can use this matcher
36 * for both scans and gets. The main difference is 'next' and 'done' collapse
37 * for the scan case (since we see all columns in order), and we only reset
38 * between rows.
39 *
40 * <p>
 * This class is used by {@link ScanQueryMatcher} mainly through two methods:
 * <ul>
 * <li>{@link #checkColumn} is called when a Put satisfies all other
 * conditions of the query.</li>
 * <li>{@link #getNextRowOrNextColumn} is called whenever ScanQueryMatcher
 * believes that the current column should be skipped (by timestamp, filter, etc.).</li>
 * </ul>
 * <p>
 * Both methods return a
 * {@link org.apache.hadoop.hbase.regionserver.ScanQueryMatcher.MatchCode}
 * that tells the caller what action to take.
50 * <p>
51 * This class is NOT thread-safe as queries are never multi-threaded
52 */
53 @InterfaceAudience.Private
54 public class ExplicitColumnTracker implements ColumnTracker {
55
56 private final int maxVersions;
57 private final int minVersions;
58
59 // hint for the tracker about how many KVs we will attempt to search via next()
60 // before we schedule a (re)seek operation
61 private final int lookAhead;
62
63 /**
64 * Contains the list of columns that the ExplicitColumnTracker is tracking.
65 * Each ColumnCount instance also tracks how many versions of the requested
66 * column have been returned.
67 */
68 private final ColumnCount[] columns;
69 private int index;
70 private ColumnCount column;
71 /** Keeps track of the latest timestamp included for current column.
72 * Used to eliminate duplicates. */
73 private long latestTSOfCurrentColumn;
74 private long oldestStamp;
75 private int skipCount;
76
77 /**
78 * Default constructor.
79 * @param columns columns specified user in query
80 * @param minVersions minimum number of versions to keep
81 * @param maxVersions maximum versions to return per column
82 * @param oldestUnexpiredTS the oldest timestamp we are interested in,
83 * based on TTL
84 * @param lookAhead number of KeyValues to look ahead via next before
85 * (re)seeking
86 */
87 public ExplicitColumnTracker(NavigableSet<byte[]> columns, int minVersions,
88 int maxVersions, long oldestUnexpiredTS, int lookAhead) {
89 this.maxVersions = maxVersions;
90 this.minVersions = minVersions;
91 this.lookAhead = lookAhead;
92 this.oldestStamp = oldestUnexpiredTS;
93 this.columns = new ColumnCount[columns.size()];
94 int i=0;
95 for(byte [] column : columns) {
96 this.columns[i++] = new ColumnCount(column);
97 }
98 reset();
99 }
100
101 /**
102 * Done when there are no more columns to match against.
103 */
104 public boolean done() {
105 return this.index >= columns.length;
106 }
107
108 public ColumnCount getColumnHint() {
109 return this.column;
110 }
111
112 /**
113 * {@inheritDoc}
114 */
115 @Override
116 public ScanQueryMatcher.MatchCode checkColumn(byte [] bytes, int offset,
117 int length, byte type) {
118 // delete markers should never be passed to an
119 // *Explicit*ColumnTracker
120 assert !KeyValue.isDelete(type);
121 do {
122 // No more columns left, we are done with this query
123 if(done()) {
124 return ScanQueryMatcher.MatchCode.SEEK_NEXT_ROW; // done_row
125 }
126
127 // No more columns to match against, done with storefile
128 if(this.column == null) {
129 return ScanQueryMatcher.MatchCode.SEEK_NEXT_ROW; // done_row
130 }
131
132 // Compare specific column to current column
133 int ret = Bytes.compareTo(column.getBuffer(), column.getOffset(),
134 column.getLength(), bytes, offset, length);
135
136 // Column Matches. Return include code. The caller would call checkVersions
137 // to limit the number of versions.
138 if(ret == 0) {
139 return ScanQueryMatcher.MatchCode.INCLUDE;
140 }
141
142 resetTS();
143
144 if (ret > 0) {
145 // The current KV is smaller than the column the ExplicitColumnTracker
146 // is interested in, so seek to that column of interest.
147 return this.skipCount++ < this.lookAhead ? ScanQueryMatcher.MatchCode.SKIP
148 : ScanQueryMatcher.MatchCode.SEEK_NEXT_COL;
149 }
150
151 // The current KV is bigger than the column the ExplicitColumnTracker
152 // is interested in. That means there is no more data for the column
153 // of interest. Advance the ExplicitColumnTracker state to next
154 // column of interest, and check again.
155 if (ret <= -1) {
156 ++this.index;
157 this.skipCount = 0;
158 if (done()) {
159 // No more to match, do not include, done with this row.
160 return ScanQueryMatcher.MatchCode.SEEK_NEXT_ROW; // done_row
161 }
162 // This is the recursive case.
163 this.column = this.columns[this.index];
164 }
165 } while(true);
166 }
167
168 @Override
169 public ScanQueryMatcher.MatchCode checkVersions(byte[] bytes, int offset, int length,
170 long timestamp, byte type, boolean ignoreCount) throws IOException {
171 assert !KeyValue.isDelete(type);
172 if (ignoreCount) return ScanQueryMatcher.MatchCode.INCLUDE;
173 // Check if it is a duplicate timestamp
174 if (sameAsPreviousTS(timestamp)) {
175 // If duplicate, skip this Key
176 return ScanQueryMatcher.MatchCode.SKIP;
177 }
178 int count = this.column.increment();
179 if (count >= maxVersions || (count >= minVersions && isExpired(timestamp))) {
180 // Done with versions for this column
181 ++this.index;
182 this.skipCount = 0;
183 resetTS();
184 if (done()) {
185 // We have served all the requested columns.
186 this.column = null;
187 return ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW;
188 }
189 // We are done with current column; advance to next column
190 // of interest.
191 this.column = this.columns[this.index];
192 return ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL;
193 }
194 setTS(timestamp);
195 return ScanQueryMatcher.MatchCode.INCLUDE;
196 }
197
198 // Called between every row.
199 public void reset() {
200 this.index = 0;
201 this.skipCount = 0;
202 this.column = this.columns[this.index];
203 for(ColumnCount col : this.columns) {
204 col.setCount(0);
205 }
206 resetTS();
207 }
208
209 private void resetTS() {
210 latestTSOfCurrentColumn = HConstants.LATEST_TIMESTAMP;
211 }
212
213 private void setTS(long timestamp) {
214 latestTSOfCurrentColumn = timestamp;
215 }
216
217 private boolean sameAsPreviousTS(long timestamp) {
218 return timestamp == latestTSOfCurrentColumn;
219 }
220
221 private boolean isExpired(long timestamp) {
222 return timestamp < oldestStamp;
223 }
224
225 /**
226 * This method is used to inform the column tracker that we are done with
227 * this column. We may get this information from external filters or
228 * timestamp range and we then need to indicate this information to
229 * tracker. It is required only in case of ExplicitColumnTracker.
230 * @param bytes
231 * @param offset
232 * @param length
233 */
234 public void doneWithColumn(byte [] bytes, int offset, int length) {
235 while (this.column != null) {
236 int compare = Bytes.compareTo(column.getBuffer(), column.getOffset(),
237 column.getLength(), bytes, offset, length);
238 resetTS();
239 if (compare <= 0) {
240 ++this.index;
241 this.skipCount = 0;
242 if (done()) {
243 // Will not hit any more columns in this storefile
244 this.column = null;
245 } else {
246 this.column = this.columns[this.index];
247 }
248 if (compare <= -1)
249 continue;
250 }
251 return;
252 }
253 }
254
255 public MatchCode getNextRowOrNextColumn(byte[] bytes, int offset,
256 int qualLength) {
257 doneWithColumn(bytes, offset,qualLength);
258
259 if (getColumnHint() == null) {
260 return MatchCode.SEEK_NEXT_ROW;
261 } else {
262 return MatchCode.SEEK_NEXT_COL;
263 }
264 }
265
266 public boolean isDone(long timestamp) {
267 return minVersions <= 0 && isExpired(timestamp);
268 }
269 }