/*
 * Copyright (C) 2011 The Guava Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
 * in compliance with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */

package com.google.common.util.concurrent;

import static com.google.common.collect.Lists.newArrayList;

import com.google.common.annotations.Beta;
import com.google.common.annotations.GwtIncompatible;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.MapMaker;
import com.google.common.math.IntMath;
import com.google.common.primitives.Ints;
import java.lang.ref.Reference;
import java.lang.ref.ReferenceQueue;
import java.lang.ref.WeakReference;
import java.math.RoundingMode;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.checkerframework.checker.nullness.qual.Nullable;

/**
 * A striped {@code Lock/Semaphore/ReadWriteLock}. This offers the underlying lock striping similar
 * to that of {@code ConcurrentHashMap} in a reusable form, and extends it for semaphores and
 * read-write locks. Conceptually, lock striping is the technique of dividing a lock into many
 * <i>stripes</i>, increasing the granularity of a single lock and allowing independent operations
 * to lock different stripes and proceed concurrently, instead of creating contention for a single
 * lock.
 *
 * <p>The guarantee provided by this class is that equal keys lead to the same lock (or semaphore),
 * i.e. {@code if (key1.equals(key2))} then {@code striped.get(key1) == striped.get(key2)} (assuming
 * {@link Object#hashCode()} is correctly implemented for the keys). Note that if {@code key1} is
 * <strong>not</strong> equal to {@code key2}, it is <strong>not</strong> guaranteed that {@code
 * striped.get(key1) != striped.get(key2)}; the elements might nevertheless be mapped to the same
 * lock. The lower the number of stripes, the higher the probability of this happening.
 *
 * <p>There are three flavors of this class: {@code Striped<Lock>}, {@code Striped<Semaphore>}, and
 * {@code Striped<ReadWriteLock>}. For each type, two implementations are offered: {@linkplain
 * #lock(int) strong} and {@linkplain #lazyWeakLock(int) weak} {@code Striped<Lock>}, {@linkplain
 * #semaphore(int, int) strong} and {@linkplain #lazyWeakSemaphore(int, int) weak} {@code
 * Striped<Semaphore>}, and {@linkplain #readWriteLock(int) strong} and {@linkplain
 * #lazyWeakReadWriteLock(int) weak} {@code Striped<ReadWriteLock>}. <i>Strong</i> means that all
 * stripes (locks/semaphores) are initialized eagerly, and are not reclaimed unless {@code Striped}
 * itself is reclaimable. <i>Weak</i> means that locks/semaphores are created lazily, and they are
 * allowed to be reclaimed if nobody is holding on to them. This is useful, for example, if one
 * wants to create a {@code Striped<Lock>} of many locks, but worries that in most cases only a
 * small portion of these would be in use.
 *
 * <p>Prior to this class, one might be tempted to use {@code Map<K, Lock>}, where {@code K}
 * represents the task. This maximizes concurrency by having each unique key mapped to a unique
 * lock, but also maximizes memory footprint. On the other extreme, one could use a single lock for
 * all tasks, which minimizes memory footprint but also minimizes concurrency. Instead of choosing
 * either of these extremes, {@code Striped} allows the user to trade between required concurrency
 * and memory footprint. For example, if a set of tasks is CPU-bound, one could easily create a
 * very compact {@code Striped<Lock>} of {@code availableProcessors() * 4} stripes, instead of the
 * possibly thousands of locks that could be created in a {@code Map<K, Lock>} structure.
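 *
 * <p>A typical use looks like the following sketch (the key and the guarded work are
 * illustrative):
 *
 * <pre>{@code
 * Striped<Lock> stripedLock = Striped.lock(64);
 * Lock lock = stripedLock.get(accountId); // equal keys always yield the same lock
 * lock.lock();
 * try {
 *   // access the state guarded by this stripe
 * } finally {
 *   lock.unlock();
 * }
 * }</pre>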
 *
 * @author Dimitris Andreou
 * @since 13.0
 */
@Beta
@GwtIncompatible
@ElementTypesAreNonnullByDefault
public abstract class Striped<L> {
  /**
   * If there are at least this many stripes, we assume the memory usage of a ConcurrentMap will be
   * smaller than a large array. (This assumes that in the lazy case, most stripes are unused. As
   * always, if many stripes are in use, a non-lazy striped makes more sense.)
   */
  private static final int LARGE_LAZY_CUTOFF = 1024;

  private Striped() {}

  /**
   * Returns the stripe that corresponds to the passed key. It is always guaranteed that if {@code
   * key1.equals(key2)}, then {@code get(key1) == get(key2)}.
   *
   * @param key an arbitrary, non-null key
   * @return the stripe that the passed key corresponds to
   */
  public abstract L get(Object key);

  /**
   * Returns the stripe at the specified index. Valid indexes are 0, inclusive, to {@code size()},
   * exclusive.
   *
   * @param index the index of the stripe to return; must be in {@code [0...size())}
   * @return the stripe at the specified index
   */
  public abstract L getAt(int index);

  /**
   * Returns the index to which the given key is mapped, so that {@code getAt(indexFor(key)) ==
   * get(key)}.
   */
  abstract int indexFor(Object key);

  /** Returns the total number of stripes in this instance. */
  public abstract int size();

  /**
   * Returns the stripes that correspond to the passed objects, in ascending (as per {@link
   * #getAt(int)}) order. Thus, threads that use the stripes in the order returned by this method
   * are guaranteed to not deadlock each other.
   *
   * <p>It should be noted that using a {@code Striped<L>} with relatively few stripes, and {@code
   * bulkGet(keys)} with a relatively large number of keys can cause an excessive number of shared
   * stripes (much like the birthday paradox, where far fewer birthdays than anticipated are needed
   * for a pair of them to match). Please consider carefully the implications of the number of
   * stripes, the intended concurrency level, and the typical number of keys used in a {@code
   * bulkGet(keys)} operation. See <a href="http://www.mathpages.com/home/kmath199.htm">Balls in
   * Bins model</a> for mathematical formulas that can be used to estimate the probability of
   * collisions.
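   *
   * <p>For example, to lock the stripes for several keys at once without risking deadlock (a
   * sketch; {@code stripedLock}, the keys, and {@code operation()} are illustrative):
   *
   * <pre>{@code
   * Iterable<Lock> locks = stripedLock.bulkGet(ImmutableList.of(key1, key2));
   * for (Lock lock : locks) {
   *   lock.lock();
   * }
   * try {
   *   operation();
   * } finally {
   *   for (Lock lock : locks) {
   *     lock.unlock();
   *   }
   * }
   * }</pre>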
   *
   * @param keys arbitrary non-null keys
   * @return the stripes corresponding to the objects (one per object, derived by delegating to
   *     {@link #get(Object)}; may contain duplicates), in increasing index order
   */
  public Iterable<L> bulkGet(Iterable<? extends Object> keys) {
    // Initially using the list to store the keys, then reusing it to store the respective L's
    List<Object> result = newArrayList(keys);
    if (result.isEmpty()) {
      return ImmutableList.of();
    }
    int[] stripes = new int[result.size()];
    for (int i = 0; i < result.size(); i++) {
      stripes[i] = indexFor(result.get(i));
    }
    Arrays.sort(stripes);
    // optimize for runs of identical stripes
    int previousStripe = stripes[0];
    result.set(0, getAt(previousStripe));
    for (int i = 1; i < result.size(); i++) {
      int currentStripe = stripes[i];
      if (currentStripe == previousStripe) {
        result.set(i, result.get(i - 1));
      } else {
        result.set(i, getAt(currentStripe));
        previousStripe = currentStripe;
      }
    }
    /*
     * Note that the returned Iterable holds references to the returned stripes, to avoid
     * error-prone code like:
     *
     * Striped<Lock> stripedLock = Striped.lazyWeakXXX(...);
     * Iterable<Lock> locks = stripedLock.bulkGet(keys);
     * for (Lock lock : locks) {
     *   lock.lock();
     * }
     * operation();
     * for (Lock lock : locks) {
     *   lock.unlock();
     * }
     *
     * If we only held the int[] stripes, translating it on the fly to L's, the original locks might
     * be garbage collected after locking them, ending up in a huge mess.
     */
    @SuppressWarnings("unchecked") // we carefully replaced all keys with their respective L's
    List<L> asStripes = (List<L>) result;
    return Collections.unmodifiableList(asStripes);
  }

  // Static factories

  /**
   * Creates a {@code Striped<L>} with eagerly initialized, strongly referenced locks. Every lock is
   * obtained from the passed supplier.
   *
   * @param stripes the minimum number of stripes (locks) required
   * @param supplier a {@code Supplier<L>} object to obtain locks from
   * @return a new {@code Striped<L>}
   */
  static <L> Striped<L> custom(int stripes, Supplier<L> supplier) {
    return new CompactStriped<>(stripes, supplier);
  }

  /**
   * Creates a {@code Striped<Lock>} with eagerly initialized, strongly referenced locks. Every lock
   * is reentrant.
   *
   * @param stripes the minimum number of stripes (locks) required
   * @return a new {@code Striped<Lock>}
   */
  public static Striped<Lock> lock(int stripes) {
    return custom(
        stripes,
        new Supplier<Lock>() {
          @Override
          public Lock get() {
            return new PaddedLock();
          }
        });
  }

  /**
   * Creates a {@code Striped<Lock>} with lazily initialized, weakly referenced locks. Every lock is
   * reentrant.
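   *
   * <p>A sketch of careful use (the key is illustrative): keep a strong reference to the returned
   * lock until it is unlocked, since an otherwise-unreferenced weak stripe may be reclaimed and
   * replaced by a fresh lock:
   *
   * <pre>{@code
   * Striped<Lock> stripedLock = Striped.lazyWeakLock(1024);
   * Lock lock = stripedLock.get(key); // hold this reference until unlock()
   * lock.lock();
   * try {
   *   // guarded work
   * } finally {
   *   lock.unlock();
   * }
   * }</pre>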
   *
   * @param stripes the minimum number of stripes (locks) required
   * @return a new {@code Striped<Lock>}
   */
  public static Striped<Lock> lazyWeakLock(int stripes) {
    return lazy(
        stripes,
        new Supplier<Lock>() {
          @Override
          public Lock get() {
            return new ReentrantLock(false);
          }
        });
  }

  private static <L> Striped<L> lazy(int stripes, Supplier<L> supplier) {
    return stripes < LARGE_LAZY_CUTOFF
        ? new SmallLazyStriped<L>(stripes, supplier)
        : new LargeLazyStriped<L>(stripes, supplier);
  }

  /**
   * Creates a {@code Striped<Semaphore>} with eagerly initialized, strongly referenced semaphores,
   * with the specified number of permits.
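   *
   * <p>For example, to bound concurrent access per key (a sketch; the key and the guarded work are
   * illustrative):
   *
   * <pre>{@code
   * Striped<Semaphore> stripedSemaphore = Striped.semaphore(64, 3);
   * Semaphore semaphore = stripedSemaphore.get(key);
   * semaphore.acquireUninterruptibly();
   * try {
   *   // at most 3 threads per stripe run here concurrently
   * } finally {
   *   semaphore.release();
   * }
   * }</pre>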
   *
   * @param stripes the minimum number of stripes (semaphores) required
   * @param permits the number of permits in each semaphore
   * @return a new {@code Striped<Semaphore>}
   */
  public static Striped<Semaphore> semaphore(int stripes, final int permits) {
    return custom(
        stripes,
        new Supplier<Semaphore>() {
          @Override
          public Semaphore get() {
            return new PaddedSemaphore(permits);
          }
        });
  }

  /**
   * Creates a {@code Striped<Semaphore>} with lazily initialized, weakly referenced semaphores,
   * with the specified number of permits.
   *
   * @param stripes the minimum number of stripes (semaphores) required
   * @param permits the number of permits in each semaphore
   * @return a new {@code Striped<Semaphore>}
   */
  public static Striped<Semaphore> lazyWeakSemaphore(int stripes, final int permits) {
    return lazy(
        stripes,
        new Supplier<Semaphore>() {
          @Override
          public Semaphore get() {
            return new Semaphore(permits, false);
          }
        });
  }

  /**
   * Creates a {@code Striped<ReadWriteLock>} with eagerly initialized, strongly referenced
   * read-write locks. Every lock is reentrant.
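   *
   * <p>For example, readers and writers of per-key state might synchronize as follows (a sketch;
   * the key and the state are illustrative):
   *
   * <pre>{@code
   * Striped<ReadWriteLock> stripedRwLock = Striped.readWriteLock(32);
   * Lock readLock = stripedRwLock.get(key).readLock();
   * readLock.lock();
   * try {
   *   // read the state associated with key
   * } finally {
   *   readLock.unlock();
   * }
   * }</pre>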
   *
   * @param stripes the minimum number of stripes (locks) required
   * @return a new {@code Striped<ReadWriteLock>}
   */
  public static Striped<ReadWriteLock> readWriteLock(int stripes) {
    return custom(stripes, READ_WRITE_LOCK_SUPPLIER);
  }

  /**
   * Creates a {@code Striped<ReadWriteLock>} with lazily initialized, weakly referenced read-write
   * locks. Every lock is reentrant.
   *
   * @param stripes the minimum number of stripes (locks) required
   * @return a new {@code Striped<ReadWriteLock>}
   */
  public static Striped<ReadWriteLock> lazyWeakReadWriteLock(int stripes) {
    return lazy(stripes, WEAK_SAFE_READ_WRITE_LOCK_SUPPLIER);
  }

  private static final Supplier<ReadWriteLock> READ_WRITE_LOCK_SUPPLIER =
      new Supplier<ReadWriteLock>() {
        @Override
        public ReadWriteLock get() {
          return new ReentrantReadWriteLock();
        }
      };

  private static final Supplier<ReadWriteLock> WEAK_SAFE_READ_WRITE_LOCK_SUPPLIER =
      new Supplier<ReadWriteLock>() {
        @Override
        public ReadWriteLock get() {
          return new WeakSafeReadWriteLock();
        }
      };

  /**
   * ReadWriteLock implementation whose read and write locks retain a reference back to this lock.
   * Otherwise, a reference to just the read lock or just the write lock would not suffice to ensure
   * the {@code ReadWriteLock} is retained.
   */
  private static final class WeakSafeReadWriteLock implements ReadWriteLock {
    private final ReadWriteLock delegate;

    WeakSafeReadWriteLock() {
      this.delegate = new ReentrantReadWriteLock();
    }

    @Override
    public Lock readLock() {
      return new WeakSafeLock(delegate.readLock(), this);
    }

    @Override
    public Lock writeLock() {
      return new WeakSafeLock(delegate.writeLock(), this);
    }
  }

  /** Lock object that ensures a strong reference is retained to a specified object. */
  private static final class WeakSafeLock extends ForwardingLock {
    private final Lock delegate;

    @SuppressWarnings("unused")
    private final WeakSafeReadWriteLock strongReference;

    WeakSafeLock(Lock delegate, WeakSafeReadWriteLock strongReference) {
      this.delegate = delegate;
      this.strongReference = strongReference;
    }

    @Override
    Lock delegate() {
      return delegate;
    }

    @Override
    public Condition newCondition() {
      return new WeakSafeCondition(delegate.newCondition(), strongReference);
    }
  }

  /** Condition object that ensures a strong reference is retained to a specified object. */
  private static final class WeakSafeCondition extends ForwardingCondition {
    private final Condition delegate;

    @SuppressWarnings("unused")
    private final WeakSafeReadWriteLock strongReference;

    WeakSafeCondition(Condition delegate, WeakSafeReadWriteLock strongReference) {
      this.delegate = delegate;
      this.strongReference = strongReference;
    }

    @Override
    Condition delegate() {
      return delegate;
    }
  }

  private abstract static class PowerOfTwoStriped<L> extends Striped<L> {
    /** Capacity (power of two) minus one, for fast mod evaluation. */
    final int mask;

    PowerOfTwoStriped(int stripes) {
      Preconditions.checkArgument(stripes > 0, "Stripes must be positive");
      this.mask = stripes > Ints.MAX_POWER_OF_TWO ? ALL_SET : ceilToPowerOfTwo(stripes) - 1;
    }

    @Override
    final int indexFor(Object key) {
      int hash = smear(key.hashCode());
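      // The capacity is a power of two, so masking with (capacity - 1) keeps the low-order bits of
      // the hash: the same result as a nonnegative hash % capacity, but without a division.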
      return hash & mask;
    }

    @Override
    public final L get(Object key) {
      return getAt(indexFor(key));
    }
  }

  /**
   * Implementation of Striped where 2^k stripes are represented as an array of the same length,
   * eagerly initialized.
   */
  private static class CompactStriped<L> extends PowerOfTwoStriped<L> {
    /** Size is a power of two. */
    private final Object[] array;

    private CompactStriped(int stripes, Supplier<L> supplier) {
      super(stripes);
      Preconditions.checkArgument(stripes <= Ints.MAX_POWER_OF_TWO, "Stripes must be <= 2^30");

      this.array = new Object[mask + 1];
      for (int i = 0; i < array.length; i++) {
        array[i] = supplier.get();
      }
    }

    @SuppressWarnings("unchecked") // we only put L's in the array
    @Override
    public L getAt(int index) {
      return (L) array[index];
    }

    @Override
    public int size() {
      return array.length;
    }
  }

  /**
   * Implementation of Striped where up to 2^k stripes can be represented, using an
   * AtomicReferenceArray of size 2^k. To map a user key into a stripe, we take a k-bit slice of the
   * user key's (smeared) hashCode(). The stripes are lazily initialized and are weakly referenced.
   */
  @VisibleForTesting
  static class SmallLazyStriped<L> extends PowerOfTwoStriped<L> {
    final AtomicReferenceArray<@Nullable ArrayReference<? extends L>> locks;
    final Supplier<L> supplier;
    final int size;
    final ReferenceQueue<L> queue = new ReferenceQueue<L>();

    SmallLazyStriped(int stripes, Supplier<L> supplier) {
      super(stripes);
      this.size = (mask == ALL_SET) ? Integer.MAX_VALUE : mask + 1;
      this.locks = new AtomicReferenceArray<>(size);
      this.supplier = supplier;
    }

    @Override
    public L getAt(int index) {
      if (size != Integer.MAX_VALUE) {
        Preconditions.checkElementIndex(index, size());
      } // else no check necessary, all index values are valid
      ArrayReference<? extends L> existingRef = locks.get(index);
      L existing = existingRef == null ? null : existingRef.get();
      if (existing != null) {
        return existing;
      }
      L created = supplier.get();
      ArrayReference<L> newRef = new ArrayReference<L>(created, index, queue);
      while (!locks.compareAndSet(index, existingRef, newRef)) {
        // we raced, we need to re-read and try again
        existingRef = locks.get(index);
        existing = existingRef == null ? null : existingRef.get();
        if (existing != null) {
          return existing;
        }
      }
      drainQueue();
      return created;
    }

    // N.B. Draining the queue is only necessary to ensure that we don't accumulate empty references
    // in the array. We could skip this if we decide we don't care about holding on to Reference
    // objects indefinitely.
    private void drainQueue() {
      Reference<? extends L> ref;
      while ((ref = queue.poll()) != null) {
        // We only ever register ArrayReferences with the queue so this is always safe.
        ArrayReference<? extends L> arrayRef = (ArrayReference<? extends L>) ref;
        // Try to clear out the array slot, n.b. if we fail that is fine, in either case the
        // arrayRef will be out of the array after this step.
        locks.compareAndSet(arrayRef.index, arrayRef, null);
      }
    }

    @Override
    public int size() {
      return size;
    }

    private static final class ArrayReference<L> extends WeakReference<L> {
      final int index;

      ArrayReference(L referent, int index, ReferenceQueue<L> queue) {
        super(referent, queue);
        this.index = index;
      }
    }
  }

  /**
   * Implementation of Striped where up to 2^k stripes can be represented, using a ConcurrentMap
   * where the key domain is [0..2^k). To map a user key into a stripe, we take a k-bit slice of the
   * user key's (smeared) hashCode(). The stripes are lazily initialized and are weakly referenced.
   */
  @VisibleForTesting
  static class LargeLazyStriped<L> extends PowerOfTwoStriped<L> {
    final ConcurrentMap<Integer, L> locks;
    final Supplier<L> supplier;
    final int size;

    LargeLazyStriped(int stripes, Supplier<L> supplier) {
      super(stripes);
      this.size = (mask == ALL_SET) ? Integer.MAX_VALUE : mask + 1;
      this.supplier = supplier;
      this.locks = new MapMaker().weakValues().makeMap();
    }

    @Override
    public L getAt(int index) {
      if (size != Integer.MAX_VALUE) {
        Preconditions.checkElementIndex(index, size());
      } // else no check necessary, all index values are valid
      L existing = locks.get(index);
      if (existing != null) {
        return existing;
      }
      L created = supplier.get();
      existing = locks.putIfAbsent(index, created);
      return MoreObjects.firstNonNull(existing, created);
    }

    @Override
    public int size() {
      return size;
    }
  }

  /** A bit mask where all bits are set. */
  private static final int ALL_SET = ~0;

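  // Rounds x up to the nearest power of two, e.g. ceilToPowerOfTwo(10) == 16. Callers guarantee
  // 0 < x <= Ints.MAX_POWER_OF_TWO, so the shift below cannot overflow.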
  private static int ceilToPowerOfTwo(int x) {
    return 1 << IntMath.log2(x, RoundingMode.CEILING);
  }

  /*
   * This method was written by Doug Lea with assistance from members of JCP JSR-166 Expert Group
   * and released to the public domain, as explained at
   * http://creativecommons.org/licenses/publicdomain
   *
   * As of 2010/06/11, this method is identical to the (package private) hash method in OpenJDK 7's
   * java.util.HashMap class.
   */
  // Copied from java/com/google/common/collect/Hashing.java
  private static int smear(int hashCode) {
    hashCode ^= (hashCode >>> 20) ^ (hashCode >>> 12);
    return hashCode ^ (hashCode >>> 7) ^ (hashCode >>> 4);
  }

  private static class PaddedLock extends ReentrantLock {
    /*
     * Padding from 40 into 64 bytes, same size as cache line. Might be beneficial to add a fourth
     * long here, to minimize chance of interference between consecutive locks, but I couldn't
     * observe any benefit from that.
     */
    long unused1;
    long unused2;
    long unused3;

    PaddedLock() {
      super(false);
    }
  }

  private static class PaddedSemaphore extends Semaphore {
    // See the PaddedLock comment above.
    long unused1;
    long unused2;
    long unused3;

    PaddedSemaphore(int permits) {
      super(permits, false);
    }
  }
}