1 /*
2 * Copyright The Apache Software Foundation
3 *
4 * Licensed to the Apache Software Foundation (ASF) under one
5 * or more contributor license agreements. See the NOTICE file
6 * distributed with this work for additional information
7 * regarding copyright ownership. The ASF licenses this file
8 * to you under the Apache License, Version 2.0 (the
9 * "License"); you may not use this file except in compliance
10 * with the License. You may obtain a copy of the License at
11 *
12 * http://www.apache.org/licenses/LICENSE-2.0
13 *
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 */
20 package org.apache.hadoop.hbase.util;
21
22
23 import com.google.common.base.Supplier;
24 import com.google.common.collect.Multiset;
25
26 import java.util.Comparator;
27 import java.util.ConcurrentModificationException;
28 import java.util.Set;
29 import java.util.concurrent.ConcurrentHashMap;
30 import java.util.concurrent.ConcurrentMap;
31 import java.util.concurrent.ConcurrentSkipListSet;
32
33 import org.apache.hadoop.hbase.classification.InterfaceAudience;
34 import org.apache.hadoop.hbase.classification.InterfaceStability;
35
36 /**
37 * A simple concurrent map of sets. This is similar in concept to
38 * {@link Multiset}, with the following exceptions:
39 * <ul>
40 * <li>The set is thread-safe and concurrent: no external locking or
41 * synchronization is required. This is important for the use case where
42 * this class is used to index cached blocks by filename for their
43 * efficient eviction from cache when the file is closed or compacted.</li>
44 * <li>The expectation is that all entries may only be removed for a key
45 * once no more additions of values are being made under that key.</li>
46 * </ul>
47 * @param <K> Key type
48 * @param <V> Value type
49 */
50 @InterfaceAudience.Private
51 @InterfaceStability.Evolving
52 public class ConcurrentIndex<K, V> {
53
54 /** Container for the sets, indexed by key */
55 private final ConcurrentMap<K, Set<V>> container;
56
57 /**
58 * A factory that constructs new instances of the sets if no set is
59 * associated with a given key.
60 */
61 private final Supplier<Set<V>> valueSetFactory;
62
63 /**
64 * Creates an instance with a specified factory object for sets to be
65 * associated with a given key.
66 * @param valueSetFactory The factory instance
67 */
68 public ConcurrentIndex(Supplier<Set<V>> valueSetFactory) {
69 this.valueSetFactory = valueSetFactory;
70 this.container = new ConcurrentHashMap<K, Set<V>>();
71 }
72
73 /**
74 * Creates an instance using the DefaultValueSetFactory for sets,
75 * which in turn creates instances of {@link ConcurrentSkipListSet}
76 * @param valueComparator A {@link Comparator} for value types
77 */
78 public ConcurrentIndex(Comparator<V> valueComparator) {
79 this(new DefaultValueSetFactory<V>(valueComparator));
80 }
81
82 /**
83 * Associate a new unique value with a specified key. Under the covers, the
84 * method employs optimistic concurrency: if no set is associated with a
85 * given key, we create a new set; if another thread comes in, creates,
86 * and associates a set with the same key in the mean-time, we simply add
87 * the value to the already created set.
88 * @param key The key
89 * @param value An additional unique value we want to associate with a key
90 */
91 public void put(K key, V value) {
92 Set<V> set = container.get(key);
93 if (set != null) {
94 set.add(value);
95 } else {
96 set = valueSetFactory.get();
97 set.add(value);
98 Set<V> existing = container.putIfAbsent(key, set);
99 if (existing != null) {
100 // If a set is already associated with a key, that means another
101 // writer has already come in and created the set for the given key.
102 // Pursuant to an optimistic concurrency policy, in this case we will
103 // simply add the value to the existing set associated with the key.
104 existing.add(value);
105 }
106 }
107 }
108
109 /**
110 * Get all values associated with a specified key or null if no values are
111 * associated. <b>Note:</b> if the caller wishes to add or removes values
112 * to under the specified as they're iterating through the returned value,
113 * they should make a defensive copy; otherwise, a
114 * {@link ConcurrentModificationException} may be thrown.
115 * @param key The key
116 * @return All values associated with the specified key or null if no values
117 * are associated with the key.
118 */
119 public Set<V> values(K key) {
120 return container.get(key);
121 }
122
123 /**
124 * Removes the association between a specified key and value. If as a
125 * result of removing a value a set becomes empty, we remove the given
126 * set from the mapping as well.
127 * @param key The specified key
128 * @param value The value to disassociate with the key
129 */
130 public boolean remove(K key, V value) {
131 Set<V> set = container.get(key);
132 boolean success = false;
133 if (set != null) {
134 success = set.remove(value);
135 if (set.isEmpty()) {
136 container.remove(key);
137 }
138 }
139 return success;
140 }
141
142 /**
143 * Default factory class for the sets associated with given keys. Creates
144 * a {@link ConcurrentSkipListSet} using the comparator passed into the
145 * constructor.
146 * @see ConcurrentSkipListSet
147 * @see Supplier
148 * @param <V> The value type. Should match value type of the
149 * ConcurrentIndex instances of this object are passed to.
150 */
151 private static class DefaultValueSetFactory<V> implements Supplier<Set<V>> {
152 private final Comparator<V> comparator;
153
154 /**
155 * Creates an instance that passes a specified comparator to the
156 * {@link ConcurrentSkipListSet}
157 * @param comparator The specified comparator
158 */
159 public DefaultValueSetFactory(Comparator<V> comparator) {
160 this.comparator = comparator;
161 }
162
163 /**
164 * Creates a new {@link ConcurrentSkipListSet} instance using the
165 * comparator specified when the class instance was constructed.
166 * @return The instantiated {@link ConcurrentSkipListSet} object
167 */
168 @Override
169 public Set<V> get() {
170 return new ConcurrentSkipListSet<V>(comparator);
171 }
172 }
173 }