1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19 package org.apache.hadoop.metrics2.util;
20
21 import java.io.IOException;
22 import java.util.Arrays;
23 import java.util.HashMap;
24 import java.util.LinkedList;
25 import java.util.ListIterator;
26 import java.util.Map;
27
28 import org.apache.hadoop.hbase.classification.InterfaceAudience;
29
30 /**
31 * Implementation of the Cormode, Korn, Muthukrishnan, and Srivastava algorithm
32 * for streaming calculation of targeted high-percentile epsilon-approximate
33 * quantiles.
34 *
35 * This is a generalization of the earlier work by Greenwald and Khanna (GK),
36 * which essentially allows different error bounds on the targeted quantiles,
37 * which allows for far more efficient calculation of high-percentiles.
38 *
39 * See: Cormode, Korn, Muthukrishnan, and Srivastava
40 * "Effective Computation of Biased Quantiles over Data Streams" in ICDE 2005
41 *
42 * Greenwald and Khanna,
43 * "Space-efficient online computation of quantile summaries" in SIGMOD 2001
44 *
45 */
46 @InterfaceAudience.Private
47 public class MetricSampleQuantiles {
48
49 /**
50 * Total number of items in stream
51 */
52 private long count = 0;
53
54 /**
55 * Current list of sampled items, maintained in sorted order with error bounds
56 */
57 private LinkedList<SampleItem> samples;
58
59 /**
60 * Buffers incoming items to be inserted in batch. Items are inserted into
61 * the buffer linearly. When the buffer fills, it is flushed into the samples
62 * array in its entirety.
63 */
64 private long[] buffer = new long[500];
65 private int bufferCount = 0;
66
67 /**
68 * Array of Quantiles that we care about, along with desired error.
69 */
70 private final MetricQuantile quantiles[];
71
72 public MetricSampleQuantiles(MetricQuantile[] quantiles) {
73 this.quantiles = Arrays.copyOf(quantiles, quantiles.length);
74 this.samples = new LinkedList<SampleItem>();
75 }
76
77 /**
78 * Specifies the allowable error for this rank, depending on which quantiles
79 * are being targeted.
80 *
81 * This is the f(r_i, n) function from the CKMS paper. It's basically how wide
82 * the range of this rank can be.
83 *
84 * @param rank
85 * the index in the list of samples
86 */
87 private double allowableError(int rank) {
88 int size = samples.size();
89 double minError = size + 1;
90 for (MetricQuantile q : quantiles) {
91 double error;
92 if (rank <= q.quantile * size) {
93 error = (2.0 * q.error * (size - rank)) / (1.0 - q.quantile);
94 } else {
95 error = (2.0 * q.error * rank) / q.quantile;
96 }
97 if (error < minError) {
98 minError = error;
99 }
100 }
101
102 return minError;
103 }
104
105 /**
106 * Add a new value from the stream.
107 *
108 * @param v
109 */
110 synchronized public void insert(long v) {
111 buffer[bufferCount] = v;
112 bufferCount++;
113
114 count++;
115
116 if (bufferCount == buffer.length) {
117 insertBatch();
118 compress();
119 }
120 }
121
122 /**
123 * Merges items from buffer into the samples array in one pass.
124 * This is more efficient than doing an insert on every item.
125 */
126 private void insertBatch() {
127 if (bufferCount == 0) {
128 return;
129 }
130
131 Arrays.sort(buffer, 0, bufferCount);
132
133 // Base case: no samples
134 int start = 0;
135 if (samples.size() == 0) {
136 SampleItem newItem = new SampleItem(buffer[0], 1, 0);
137 samples.add(newItem);
138 start++;
139 }
140
141 ListIterator<SampleItem> it = samples.listIterator();
142 SampleItem item = it.next();
143 for (int i = start; i < bufferCount; i++) {
144 long v = buffer[i];
145 while (it.nextIndex() < samples.size() && item.value < v) {
146 item = it.next();
147 }
148 // If we found that bigger item, back up so we insert ourselves before it
149 if (item.value > v) {
150 it.previous();
151 }
152 // We use different indexes for the edge comparisons, because of the above
153 // if statement that adjusts the iterator
154 int delta;
155 if (it.previousIndex() == 0 || it.nextIndex() == samples.size()) {
156 delta = 0;
157 } else {
158 delta = ((int) Math.floor(allowableError(it.nextIndex()))) - 1;
159 }
160 SampleItem newItem = new SampleItem(v, 1, delta);
161 it.add(newItem);
162 item = newItem;
163 }
164
165 bufferCount = 0;
166 }
167
168 /**
169 * Try to remove extraneous items from the set of sampled items. This checks
170 * if an item is unnecessary based on the desired error bounds, and merges it
171 * with the adjacent item if it is.
172 */
173 private void compress() {
174 if (samples.size() < 2) {
175 return;
176 }
177
178 ListIterator<SampleItem> it = samples.listIterator();
179 SampleItem prev = null;
180 SampleItem next = it.next();
181
182 while (it.hasNext()) {
183 prev = next;
184 next = it.next();
185 if (prev.g + next.g + next.delta <= allowableError(it.previousIndex())) {
186 next.g += prev.g;
187 // Remove prev. it.remove() kills the last thing returned.
188 it.previous();
189 it.previous();
190 it.remove();
191 // it.next() is now equal to next, skip it back forward again
192 it.next();
193 }
194 }
195 }
196
197 /**
198 * Get the estimated value at the specified quantile.
199 *
200 * @param quantile Queried quantile, e.g. 0.50 or 0.99.
201 * @return Estimated value at that quantile.
202 */
203 private long query(double quantile) throws IOException {
204 if (samples.size() == 0) {
205 throw new IOException("No samples present");
206 }
207
208 int rankMin = 0;
209 int desired = (int) (quantile * count);
210
211 for (int i = 1; i < samples.size(); i++) {
212 SampleItem prev = samples.get(i - 1);
213 SampleItem cur = samples.get(i);
214
215 rankMin += prev.g;
216
217 if (rankMin + cur.g + cur.delta > desired + (allowableError(i) / 2)) {
218 return prev.value;
219 }
220 }
221
222 // edge case of wanting max value
223 return samples.get(samples.size() - 1).value;
224 }
225
226 /**
227 * Get a snapshot of the current values of all the tracked quantiles.
228 *
229 * @return snapshot of the tracked quantiles
230 * @throws IOException
231 * if no items have been added to the estimator
232 */
233 synchronized public Map<MetricQuantile, Long> snapshot() throws IOException {
234 // flush the buffer first for best results
235 insertBatch();
236 Map<MetricQuantile, Long> values = new HashMap<MetricQuantile, Long>(quantiles.length);
237 for (int i = 0; i < quantiles.length; i++) {
238 values.put(quantiles[i], query(quantiles[i].quantile));
239 }
240
241 return values;
242 }
243
244 /**
245 * Returns the number of items that the estimator has processed
246 *
247 * @return count total number of items processed
248 */
249 synchronized public long getCount() {
250 return count;
251 }
252
253 /**
254 * Returns the number of samples kept by the estimator
255 *
256 * @return count current number of samples
257 */
258 synchronized public int getSampleCount() {
259 return samples.size();
260 }
261
262 /**
263 * Resets the estimator, clearing out all previously inserted items
264 */
265 synchronized public void clear() {
266 count = 0;
267 bufferCount = 0;
268 samples.clear();
269 }
270
271 /**
272 * Describes a measured value passed to the estimator, tracking additional
273 * metadata required by the CKMS algorithm.
274 */
275 private static class SampleItem {
276
277 /**
278 * Value of the sampled item (e.g. a measured latency value)
279 */
280 public final long value;
281
282 /**
283 * Difference between the lowest possible rank of the previous item, and
284 * the lowest possible rank of this item.
285 *
286 * The sum of the g of all previous items yields this item's lower bound.
287 */
288 public int g;
289
290 /**
291 * Difference between the item's greatest possible rank and lowest possible
292 * rank.
293 */
294 public final int delta;
295
296 public SampleItem(long value, int lowerDelta, int delta) {
297 this.value = value;
298 this.g = lowerDelta;
299 this.delta = delta;
300 }
301
302 @Override
303 public String toString() {
304 return String.format("%d, %d, %d", value, g, delta);
305 }
306 }
307 }