/*
 * Copyright (C) 2011 The Guava Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
 * in compliance with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */

package com.google.common.util.concurrent;

import static com.google.common.collect.Lists.newArrayList;

import com.google.common.annotations.Beta;
import com.google.common.annotations.GwtIncompatible;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.MapMaker;
import com.google.common.math.IntMath;
import com.google.common.primitives.Ints;
import java.lang.ref.Reference;
import java.lang.ref.ReferenceQueue;
import java.lang.ref.WeakReference;
import java.math.RoundingMode;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.checkerframework.checker.nullness.qual.Nullable;

/**
 * A striped {@code Lock/Semaphore/ReadWriteLock}. This offers the underlying lock striping similar
 * to that of {@code ConcurrentHashMap} in a reusable form, and extends it for semaphores and
 * read-write locks. Conceptually, lock striping is the technique of dividing a lock into many
 * <i>stripes</i>, increasing the granularity of a single lock and allowing independent operations
 * to lock different stripes and proceed concurrently, instead of creating contention for a single
 * lock.
 *
 * <p>The guarantee provided by this class is that equal keys lead to the same lock (or semaphore),
 * i.e. {@code if (key1.equals(key2))} then {@code striped.get(key1) == striped.get(key2)} (assuming
 * {@link Object#hashCode()} is correctly implemented for the keys). Note that if {@code key1} is
 * <strong>not</strong> equal to {@code key2}, it is <strong>not</strong> guaranteed that {@code
 * striped.get(key1) != striped.get(key2)}; the elements might nevertheless be mapped to the same
 * lock. The lower the number of stripes, the higher the probability of this happening.
 *
 * <p>There are three flavors of this class: {@code Striped<Lock>}, {@code Striped<Semaphore>}, and
 * {@code Striped<ReadWriteLock>}. For each type, two implementations are offered: {@linkplain
 * #lock(int) strong} and {@linkplain #lazyWeakLock(int) weak} {@code Striped<Lock>}, {@linkplain
 * #semaphore(int, int) strong} and {@linkplain #lazyWeakSemaphore(int, int) weak} {@code
 * Striped<Semaphore>}, and {@linkplain #readWriteLock(int) strong} and {@linkplain
 * #lazyWeakReadWriteLock(int) weak} {@code Striped<ReadWriteLock>}. <i>Strong</i> means that all
 * stripes (locks/semaphores) are initialized eagerly, and are not reclaimed unless {@code Striped}
 * itself is reclaimable. <i>Weak</i> means that locks/semaphores are created lazily, and they are
 * allowed to be reclaimed if nobody is holding on to them. This is useful, for example, if one
 * wants to create a {@code Striped<Lock>} of many locks, but worries that in most cases only a
 * small portion of these would be in use.
 *
 * <p>Prior to this class, one might be tempted to use {@code Map<K, Lock>}, where {@code K}
 * represents the task. This maximizes concurrency by having each unique key mapped to a unique
 * lock, but also maximizes memory footprint. On the other extreme, one could use a single lock for
 * all tasks, which minimizes memory footprint but also minimizes concurrency. Instead of choosing
 * either of these extremes, {@code Striped} allows the user to trade between required concurrency
 * and memory footprint. For example, if a set of tasks are CPU-bound, one could easily create a
 * very compact {@code Striped<Lock>} of {@code availableProcessors() * 4} stripes, instead of
 * possibly thousands of locks which could be created in a {@code Map<K, Lock>} structure.
 *
 * @author Dimitris Andreou
 * @since 13.0
 */
@Beta
@GwtIncompatible
@ElementTypesAreNonnullByDefault
public abstract class Striped<L> {
  /**
   * If there are at least this many stripes, we assume the memory usage of a ConcurrentMap will be
   * smaller than a large array. (This assumes that in the lazy case, most stripes are unused. As
   * always, if many stripes are in use, a non-lazy striped makes more sense.)
   */
  private static final int LARGE_LAZY_CUTOFF = 1024;

  /** Private so that only the implementations nested in this file can extend the class. */
  private Striped() {}

  /**
   * Returns the stripe that corresponds to the passed key. It is always guaranteed that if {@code
   * key1.equals(key2)}, then {@code get(key1) == get(key2)}.
   *
   * @param key an arbitrary, non-null key
   * @return the stripe that the passed key corresponds to
   */
  public abstract L get(Object key);

  /**
   * Returns the stripe at the specified index. Valid indexes are 0, inclusive, to {@code size()},
   * exclusive.
   *
   * @param index the index of the stripe to return; must be in {@code [0...size())}
   * @return the stripe at the specified index
   */
  public abstract L getAt(int index);

  /**
   * Returns the index to which the given key is mapped, so that getAt(indexFor(key)) == get(key).
   */
  abstract int indexFor(Object key);

  /** Returns the total number of stripes in this instance. */
  public abstract int size();

  /**
   * Returns the stripes that correspond to the passed objects, in ascending (as per {@link
   * #getAt(int)}) order. Thus, threads that use the stripes in the order returned by this method
   * are guaranteed to not deadlock each other.
   *
   * <p>It should be noted that using a {@code Striped<L>} with relatively few stripes, and {@code
   * bulkGet(keys)} with a relatively large number of keys can cause an excessive number of shared
   * stripes (much like the birthday paradox, where much fewer than anticipated birthdays are needed
   * for a pair of them to match). Please consider carefully the implications of the number of
   * stripes, the intended concurrency level, and the typical number of keys used in a {@code
   * bulkGet(keys)} operation. See <a href="http://www.mathpages.com/home/kmath199.htm">Balls in
   * Bins model</a> for mathematical formulas that can be used to estimate the probability of
   * collisions.
   *
   * @param keys arbitrary non-null keys
   * @return the stripes corresponding to the objects (one per each object, derived by delegating to
   *     {@link #get(Object)}; may contain duplicates), in an increasing index order.
   */
  public Iterable<L> bulkGet(Iterable<? extends Object> keys) {
    // Initially using the list to store the keys, then reusing it to store the respective L's
    List<Object> result = newArrayList(keys);
    if (result.isEmpty()) {
      return ImmutableList.of();
    }
    int[] stripes = new int[result.size()];
    for (int i = 0; i < result.size(); i++) {
      stripes[i] = indexFor(result.get(i));
    }
    Arrays.sort(stripes);
    // optimize for runs of identical stripes
    int previousStripe = stripes[0];
    result.set(0, getAt(previousStripe));
    for (int i = 1; i < result.size(); i++) {
      int currentStripe = stripes[i];
      if (currentStripe == previousStripe) {
        result.set(i, result.get(i - 1));
      } else {
        result.set(i, getAt(currentStripe));
        previousStripe = currentStripe;
      }
    }
    /*
     * Note that the returned Iterable holds references to the returned stripes, to avoid
     * error-prone code like:
     *
     * Striped<Lock> stripedLock = Striped.lazyWeakXXX(...);
     * Iterable<Lock> locks = stripedLock.bulkGet(keys);
     * for (Lock lock : locks) {
     *   lock.lock();
     * }
     * operation();
     * for (Lock lock : locks) {
     *   lock.unlock();
     * }
     *
     * If we only held the int[] stripes, translating it on the fly to L's, the original locks might
     * be garbage collected after locking them, ending up in a huge mess.
     */
    @SuppressWarnings("unchecked") // we carefully replaced all keys with their respective L's
    List<L> asStripes = (List<L>) result;
    return Collections.unmodifiableList(asStripes);
  }

  // Static factories

  /**
   * Creates a {@code Striped<L>} with eagerly initialized, strongly referenced locks. Every lock is
   * obtained from the passed supplier.
   *
   * @param stripes the minimum number of stripes (locks) required
   * @param supplier a {@code Supplier<L>} object to obtain locks from
   * @return a new {@code Striped<L>}
   */
  static <L> Striped<L> custom(int stripes, Supplier<L> supplier) {
    return new CompactStriped<>(stripes, supplier);
  }

  /**
   * Creates a {@code Striped<Lock>} with eagerly initialized, strongly referenced locks. Every lock
   * is reentrant.
   *
   * @param stripes the minimum number of stripes (locks) required
   * @return a new {@code Striped<Lock>}
   */
  public static Striped<Lock> lock(int stripes) {
    return custom(stripes, PaddedLock::new);
  }

  /**
   * Creates a {@code Striped<Lock>} with lazily initialized, weakly referenced locks. Every lock is
   * reentrant.
   *
   * @param stripes the minimum number of stripes (locks) required
   * @return a new {@code Striped<Lock>}
   */
  public static Striped<Lock> lazyWeakLock(int stripes) {
    return lazy(stripes, () -> new ReentrantLock(false));
  }

  /**
   * Creates a lazily initialized {@code Striped<L>}, choosing the array-backed {@link
   * SmallLazyStriped} for fewer than {@link #LARGE_LAZY_CUTOFF} stripes and the map-backed {@link
   * LargeLazyStriped} otherwise.
   *
   * @param stripes the minimum number of stripes required
   * @param supplier a {@code Supplier<L>} object to obtain stripes from
   * @return a new {@code Striped<L>}
   */
  private static <L> Striped<L> lazy(int stripes, Supplier<L> supplier) {
    return stripes < LARGE_LAZY_CUTOFF
        ? new SmallLazyStriped<L>(stripes, supplier)
        : new LargeLazyStriped<L>(stripes, supplier);
  }

  /**
   * Creates a {@code Striped<Semaphore>} with eagerly initialized, strongly referenced semaphores,
   * with the specified number of permits.
   *
   * @param stripes the minimum number of stripes (semaphores) required
   * @param permits the number of permits in each semaphore
   * @return a new {@code Striped<Semaphore>}
   */
  public static Striped<Semaphore> semaphore(int stripes, final int permits) {
    return custom(stripes, () -> new PaddedSemaphore(permits));
  }

  /**
   * Creates a {@code Striped<Semaphore>} with lazily initialized, weakly referenced semaphores,
   * with the specified number of permits.
   *
   * @param stripes the minimum number of stripes (semaphores) required
   * @param permits the number of permits in each semaphore
   * @return a new {@code Striped<Semaphore>}
   */
  public static Striped<Semaphore> lazyWeakSemaphore(int stripes, final int permits) {
    return lazy(stripes, () -> new Semaphore(permits, false));
  }

  /**
   * Creates a {@code Striped<ReadWriteLock>} with eagerly initialized, strongly referenced
   * read-write locks. Every lock is reentrant.
   *
   * @param stripes the minimum number of stripes (locks) required
   * @return a new {@code Striped<ReadWriteLock>}
   */
  public static Striped<ReadWriteLock> readWriteLock(int stripes) {
    return custom(stripes, READ_WRITE_LOCK_SUPPLIER);
  }

  /**
   * Creates a {@code Striped<ReadWriteLock>} with lazily initialized, weakly referenced read-write
   * locks. Every lock is reentrant.
   *
   * @param stripes the minimum number of stripes (locks) required
   * @return a new {@code Striped<ReadWriteLock>}
   */
  public static Striped<ReadWriteLock> lazyWeakReadWriteLock(int stripes) {
    return lazy(stripes, WEAK_SAFE_READ_WRITE_LOCK_SUPPLIER);
  }

  /** Supplies plain {@link ReentrantReadWriteLock}s for the eagerly initialized flavor. */
  private static final Supplier<ReadWriteLock> READ_WRITE_LOCK_SUPPLIER =
      ReentrantReadWriteLock::new;

  /** Supplies {@link WeakSafeReadWriteLock}s for the lazily initialized, weak flavor. */
  private static final Supplier<ReadWriteLock> WEAK_SAFE_READ_WRITE_LOCK_SUPPLIER =
      WeakSafeReadWriteLock::new;

  /**
   * ReadWriteLock implementation whose read and write locks retain a reference back to this lock.
   * Otherwise, a reference to just the read lock or just the write lock would not suffice to ensure
   * the {@code ReadWriteLock} is retained.
   */
  private static final class WeakSafeReadWriteLock implements ReadWriteLock {
    private final ReadWriteLock delegate;

    WeakSafeReadWriteLock() {
      this.delegate = new ReentrantReadWriteLock();
    }

    @Override
    public Lock readLock() {
      return new WeakSafeLock(delegate.readLock(), this);
    }

    @Override
    public Lock writeLock() {
      return new WeakSafeLock(delegate.writeLock(), this);
    }
  }

  /** Lock object that ensures a strong reference is retained to a specified object. */
  private static final class WeakSafeLock extends ForwardingLock {
    private final Lock delegate;

    // Besides keeping the owning lock reachable, this is handed on to every Condition we create.
    private final WeakSafeReadWriteLock strongReference;

    WeakSafeLock(Lock delegate, WeakSafeReadWriteLock strongReference) {
      this.delegate = delegate;
      this.strongReference = strongReference;
    }

    @Override
    Lock delegate() {
      return delegate;
    }

    @Override
    public Condition newCondition() {
      return new WeakSafeCondition(delegate.newCondition(), strongReference);
    }
  }

  /** Condition object that ensures a strong reference is retained to a specified object. */
  private static final class WeakSafeCondition extends ForwardingCondition {
    private final Condition delegate;

    // Never read: it exists only so that this condition keeps the owning lock reachable.
    @SuppressWarnings("unused")
    private final WeakSafeReadWriteLock strongReference;

    WeakSafeCondition(Condition delegate, WeakSafeReadWriteLock strongReference) {
      this.delegate = delegate;
      this.strongReference = strongReference;
    }

    @Override
    Condition delegate() {
      return delegate;
    }
  }

  /**
   * Base class for implementations that select a stripe by masking the key's smeared hash code
   * with {@link #mask}.
   */
  private abstract static class PowerOfTwoStriped<L> extends Striped<L> {
    /** Capacity (power of two) minus one, for fast mod evaluation */
    final int mask;

    /**
     * @param stripes the minimum number of stripes; rounded up to a power of two
     * @throws IllegalArgumentException if {@code stripes} is not positive
     */
    PowerOfTwoStriped(int stripes) {
      Preconditions.checkArgument(stripes > 0, "Stripes must be positive");
      // Above the largest int power of two there is nothing to round up to, so use every hash bit.
      this.mask = stripes > Ints.MAX_POWER_OF_TWO ? ALL_SET : ceilToPowerOfTwo(stripes) - 1;
    }

    @Override
    final int indexFor(Object key) {
      int hash = smear(key.hashCode());
      return hash & mask;
    }

    @Override
    public final L get(Object key) {
      return getAt(indexFor(key));
    }
  }

  /**
   * Implementation of Striped where 2^k stripes are represented as an array of the same length,
   * eagerly initialized.
   */
  private static class CompactStriped<L> extends PowerOfTwoStriped<L> {
    /** Size is a power of two. */
    private final Object[] array;

    /**
     * @param stripes the minimum number of stripes; rounded up to a power of two
     * @param supplier called once per array slot to create the stripe stored there
     * @throws IllegalArgumentException if {@code stripes} is not positive or exceeds {@code
     *     Ints.MAX_POWER_OF_TWO}
     */
    private CompactStriped(int stripes, Supplier<L> supplier) {
      super(stripes);
      Preconditions.checkArgument(stripes <= Ints.MAX_POWER_OF_TWO, "Stripes must be <= 2^30");

      this.array = new Object[mask + 1];
      for (int i = 0; i < array.length; i++) {
        array[i] = supplier.get();
      }
    }

    @SuppressWarnings("unchecked") // we only put L's in the array
    @Override
    public L getAt(int index) {
      return (L) array[index];
    }

    @Override
    public int size() {
      return array.length;
    }
  }

  /**
   * Implementation of Striped where up to 2^k stripes can be represented, using an
   * AtomicReferenceArray of size 2^k. To map a user key into a stripe, we take a k-bit slice of the
   * user key's (smeared) hashCode(). The stripes are lazily initialized and are weakly referenced.
   */
  @VisibleForTesting
  static class SmallLazyStriped<L> extends PowerOfTwoStriped<L> {
    /** One slot per stripe; a slot is null until its stripe is first requested. */
    final AtomicReferenceArray<@Nullable ArrayReference<? extends L>> locks;

    /** Creates a stripe the first time its slot is requested, or again after it was collected. */
    final Supplier<L> supplier;

    /** Number of stripes: {@code mask + 1}, or {@code Integer.MAX_VALUE} if every bit is set. */
    final int size;

    /** Receives the references whose stripes were collected, so their slots can be cleared. */
    final ReferenceQueue<L> queue = new ReferenceQueue<L>();

    SmallLazyStriped(int stripes, Supplier<L> supplier) {
      super(stripes);
      this.size = (mask == ALL_SET) ? Integer.MAX_VALUE : mask + 1;
      this.locks = new AtomicReferenceArray<>(size);
      this.supplier = supplier;
    }

    @Override
    public L getAt(int index) {
      if (size != Integer.MAX_VALUE) {
        Preconditions.checkElementIndex(index, size());
      } // else no check necessary, all index values are valid
      ArrayReference<? extends L> existingRef = locks.get(index);
      L existing = existingRef == null ? null : existingRef.get();
      if (existing != null) {
        return existing;
      }
      L created = supplier.get();
      ArrayReference<L> newRef = new ArrayReference<L>(created, index, queue);
      while (!locks.compareAndSet(index, existingRef, newRef)) {
        // we raced, we need to re-read and try again
        existingRef = locks.get(index);
        existing = existingRef == null ? null : existingRef.get();
        if (existing != null) {
          return existing;
        }
      }
      drainQueue();
      return created;
    }

    // N.B. Draining the queue is only necessary to ensure that we don't accumulate empty references
    // in the array. We could skip this if we decide we don't care about holding on to Reference
    // objects indefinitely.
    private void drainQueue() {
      Reference<? extends L> ref;
      while ((ref = queue.poll()) != null) {
        // We only ever register ArrayReferences with the queue so this is always safe.
        ArrayReference<? extends L> arrayRef = (ArrayReference<? extends L>) ref;
        // Try to clear out the array slot, n.b. if we fail that is fine, in either case the
        // arrayRef will be out of the array after this step.
        locks.compareAndSet(arrayRef.index, arrayRef, null);
      }
    }

    @Override
    public int size() {
      return size;
    }

    /** Weak reference to a stripe that remembers which array slot it was stored in. */
    private static final class ArrayReference<L> extends WeakReference<L> {
      final int index;

      ArrayReference(L referent, int index, ReferenceQueue<L> queue) {
        super(referent, queue);
        this.index = index;
      }
    }
  }

  /**
   * Implementation of Striped where up to 2^k stripes can be represented, using a ConcurrentMap
   * where the key domain is [0..2^k). To map a user key into a stripe, we take a k-bit slice of the
   * user key's (smeared) hashCode(). The stripes are lazily initialized and are weakly referenced.
   */
  @VisibleForTesting
  static class LargeLazyStriped<L> extends PowerOfTwoStriped<L> {
    /** Stripe index to stripe; built with weak values. */
    final ConcurrentMap<Integer, L> locks;

    /** Creates a stripe when the map has no entry for the requested index. */
    final Supplier<L> supplier;

    /** Number of stripes: {@code mask + 1}, or {@code Integer.MAX_VALUE} if every bit is set. */
    final int size;

    LargeLazyStriped(int stripes, Supplier<L> supplier) {
      super(stripes);
      this.size = (mask == ALL_SET) ? Integer.MAX_VALUE : mask + 1;
      this.supplier = supplier;
      this.locks = new MapMaker().weakValues().makeMap();
    }

    @Override
    public L getAt(int index) {
      if (size != Integer.MAX_VALUE) {
        Preconditions.checkElementIndex(index, size());
      } // else no check necessary, all index values are valid
      L existing = locks.get(index);
      if (existing != null) {
        return existing;
      }
      L created = supplier.get();
      // If another thread installed a stripe first, use theirs and drop ours.
      existing = locks.putIfAbsent(index, created);
      return MoreObjects.firstNonNull(existing, created);
    }

    @Override
    public int size() {
      return size;
    }
  }

  /** A bit mask where all bits are set. */
  private static final int ALL_SET = ~0;

  /** Returns the smallest power of two that is greater than or equal to {@code x}. */
  private static int ceilToPowerOfTwo(int x) {
    return 1 << IntMath.log2(x, RoundingMode.CEILING);
  }

  /*
   * This method was written by Doug Lea with assistance from members of JCP JSR-166 Expert Group
   * and released to the public domain, as explained at
   * http://creativecommons.org/licenses/publicdomain
   *
   * As of 2010/06/11, this method is identical to the (package private) hash method in OpenJDK 7's
   * java.util.HashMap class.
   */
  // Copied from java/com/google/common/collect/Hashing.java
  private static int smear(int hashCode) {
    hashCode ^= (hashCode >>> 20) ^ (hashCode >>> 12);
    return hashCode ^ (hashCode >>> 7) ^ (hashCode >>> 4);
  }

  private static class PaddedLock extends ReentrantLock {
    /*
     * Padding from 40 into 64 bytes, same size as cache line. Might be beneficial to add a fourth
     * long here, to minimize chance of interference between consecutive locks, but I couldn't
     * observe any benefit from that.
     */
    long unused1;
    long unused2;
    long unused3;

    PaddedLock() {
      super(false);
    }
  }

  private static class PaddedSemaphore extends Semaphore {
    // See PaddedLock comment
    long unused1;
    long unused2;
    long unused3;

    PaddedSemaphore(int permits) {
      super(permits, false);
    }
  }
}