1 /*
2  * Copyright (C) 2013, 2014 Brett Wooldridge
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */

16 package com.zaxxer.hikari.util;
17
18 import com.zaxxer.hikari.util.ConcurrentBag.IConcurrentBagEntry;
19 import org.slf4j.Logger;
20 import org.slf4j.LoggerFactory;
21
22 import java.lang.ref.WeakReference;
23 import java.util.ArrayList;
24 import java.util.Collections;
25 import java.util.List;
26 import java.util.concurrent.CopyOnWriteArrayList;
27 import java.util.concurrent.SynchronousQueue;
28 import java.util.concurrent.TimeUnit;
29 import java.util.concurrent.atomic.AtomicInteger;
30 import java.util.stream.Collectors;
31
32 import static com.zaxxer.hikari.util.ClockSource.currentTime;
33 import static com.zaxxer.hikari.util.ClockSource.elapsedNanos;
34 import static com.zaxxer.hikari.util.ConcurrentBag.IConcurrentBagEntry.*;
35 import static java.util.concurrent.TimeUnit.MICROSECONDS;
36 import static java.util.concurrent.TimeUnit.NANOSECONDS;
37 import static java.util.concurrent.locks.LockSupport.parkNanos;
38
39 /**
40  * This is a specialized concurrent bag that achieves superior performance
41  * to LinkedBlockingQueue and LinkedTransferQueue for the purposes of a
42  * connection pool.  It uses ThreadLocal storage when possible to avoid
43  * locks, but resorts to scanning a common collection if there are no
44  * available items in the ThreadLocal list.  Not-in-use items in the
45  * ThreadLocal lists can be "stolen" when the borrowing thread has none
46  * of its own.  It is a "lock-less" implementation using a specialized
47  * AbstractQueuedLongSynchronizer to manage cross-thread signaling.
48  *
49  * Note that items that are "borrowed" from the bag are not actually
50  * removed from any collection, so garbage collection will not occur
51  * even if the reference is abandoned.  Thus care must be taken to
52  * "requite" borrowed objects otherwise a memory leak will result.  Only
53  * the "remove" method can completely remove an object from the bag.
54  *
55  * @author Brett Wooldridge
56  *
57  * @param <T> the templated type to store in the bag
58  */

59 public class ConcurrentBag<T extends IConcurrentBagEntry> implements AutoCloseable
60 {
61    private static final Logger LOGGER = LoggerFactory.getLogger(ConcurrentBag.class);
62
63    private final CopyOnWriteArrayList<T> sharedList;
64    private final boolean weakThreadLocals;
65
66    private final ThreadLocal<List<Object>> threadList;
67    private final IBagStateListener listener;
68    private final AtomicInteger waiters;
69    private volatile boolean closed;
70
71    private final SynchronousQueue<T> handoffQueue;
72
73    public interface IConcurrentBagEntry
74    {
75       int STATE_NOT_IN_USE = 0;
76       int STATE_IN_USE = 1;
77       int STATE_REMOVED = -1;
78       int STATE_RESERVED = -2;
79
80       boolean compareAndSet(int expectState, int newState);
81       void setState(int newState);
82       int getState();
83    }
84
85    public interface IBagStateListener
86    {
87       void addBagItem(int waiting);
88    }
89
90    /**
91     * Construct a ConcurrentBag with the specified listener.
92     *
93     * @param listener the IBagStateListener to attach to this bag
94     */

95    public ConcurrentBag(final IBagStateListener listener)
96    {
97       this.listener = listener;
98       this.weakThreadLocals = useWeakThreadLocals();
99
100       this.handoffQueue = new SynchronousQueue<>(true);
101       this.waiters = new AtomicInteger();
102       this.sharedList = new CopyOnWriteArrayList<>();
103       if (weakThreadLocals) {
104          this.threadList = ThreadLocal.withInitial(() -> new ArrayList<>(16));
105       }
106       else {
107          this.threadList = ThreadLocal.withInitial(() -> new FastList<>(IConcurrentBagEntry.class, 16));
108       }
109    }
110
111    /**
112     * The method will borrow a BagEntry from the bag, blocking for the
113     * specified timeout if none are available.
114     *
115     * @param timeout how long to wait before giving up, in units of unit
116     * @param timeUnit a <code>TimeUnit</code> determining how to interpret the timeout parameter
117     * @return a borrowed instance from the bag or null if a timeout occurs
118     * @throws InterruptedException if interrupted while waiting
119     */

120    public T borrow(long timeout, final TimeUnit timeUnit) throws InterruptedException
121    {
122       // Try the thread-local list first
123       final List<Object> list = threadList.get();
124       for (int i = list.size() - 1; i >= 0; i--) {
125          final Object entry = list.remove(i);
126          @SuppressWarnings("unchecked")
127          final T bagEntry = weakThreadLocals ? ((WeakReference<T>) entry).get() : (T) entry;
128          if (bagEntry != null && bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
129             return bagEntry;
130          }
131       }
132
133       // Otherwise, scan the shared list ... then poll the handoff queue
134       final int waiting = waiters.incrementAndGet();
135       try {
136          for (T bagEntry : sharedList) {
137             if (bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
138                // If we may have stolen another waiter's connection, request another bag add.
139                if (waiting > 1) {
140                   listener.addBagItem(waiting - 1);
141                }
142                return bagEntry;
143             }
144          }
145
146          listener.addBagItem(waiting);
147
148          timeout = timeUnit.toNanos(timeout);
149          do {
150             final long start = currentTime();
151             final T bagEntry = handoffQueue.poll(timeout, NANOSECONDS);
152             if (bagEntry == null || bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
153                return bagEntry;
154             }
155
156             timeout -= elapsedNanos(start);
157          } while (timeout > 10_000);
158
159          return null;
160       }
161       finally {
162          waiters.decrementAndGet();
163       }
164    }
165
166    /**
167     * This method will return a borrowed object to the bag.  Objects
168     * that are borrowed from the bag but never "requited" will result
169     * in a memory leak.
170     *
171     * @param bagEntry the value to return to the bag
172     * @throws NullPointerException if value is null
173     * @throws IllegalStateException if the bagEntry was not borrowed from the bag
174     */

175    public void requite(final T bagEntry)
176    {
177       bagEntry.setState(STATE_NOT_IN_USE);
178
179       for (int i = 0; waiters.get() > 0; i++) {
180          if (bagEntry.getState() != STATE_NOT_IN_USE || handoffQueue.offer(bagEntry)) {
181             return;
182          }
183          else if ((i & 0xff) == 0xff) {
184             parkNanos(MICROSECONDS.toNanos(10));
185          }
186          else {
187             Thread.yield();
188          }
189       }
190
191       final List<Object> threadLocalList = threadList.get();
192       if (threadLocalList.size() < 50) {
193          threadLocalList.add(weakThreadLocals ? new WeakReference<>(bagEntry) : bagEntry);
194       }
195    }
196
197    /**
198     * Add a new object to the bag for others to borrow.
199     *
200     * @param bagEntry an object to add to the bag
201     */

202    public void add(final T bagEntry)
203    {
204       if (closed) {
205          LOGGER.info("ConcurrentBag has been closed, ignoring add()");
206          throw new IllegalStateException("ConcurrentBag has been closed, ignoring add()");
207       }
208
209       sharedList.add(bagEntry);
210
211       // spin until a thread takes it or none are waiting
212       while (waiters.get() > 0 && bagEntry.getState() == STATE_NOT_IN_USE && !handoffQueue.offer(bagEntry)) {
213          Thread.yield();
214       }
215    }
216
217    /**
218     * Remove a value from the bag.  This method should only be called
219     * with objects obtained by <code>borrow(long, TimeUnit)</code> or <code>reserve(T)</code>
220     *
221     * @param bagEntry the value to remove
222     * @return true if the entry was removed, false otherwise
223     * @throws IllegalStateException if an attempt is made to remove an object
224     *         from the bag that was not borrowed or reserved first
225     */

226    public boolean remove(final T bagEntry)
227    {
228       if (!bagEntry.compareAndSet(STATE_IN_USE, STATE_REMOVED) && !bagEntry.compareAndSet(STATE_RESERVED, STATE_REMOVED) && !closed) {
229          LOGGER.warn("Attempt to remove an object from the bag that was not borrowed or reserved: {}", bagEntry);
230          return false;
231       }
232
233       final boolean removed = sharedList.remove(bagEntry);
234       if (!removed && !closed) {
235          LOGGER.warn("Attempt to remove an object from the bag that does not exist: {}", bagEntry);
236       }
237
238       threadList.get().remove(bagEntry);
239
240       return removed;
241    }
242
243    /**
244     * Close the bag to further adds.
245     */

246    @Override
247    public void close()
248    {
249       closed = true;
250    }
251
252    /**
253     * This method provides a "snapshot" in time of the BagEntry
254     * items in the bag in the specified state.  It does not "lock"
255     * or reserve items in any way.  Call <code>reserve(T)</code>
256     * on items in list before performing any action on them.
257     *
258     * @param state one of the {@link IConcurrentBagEntry} states
259     * @return a possibly empty list of objects having the state specified
260     */

261    public List<T> values(final int state)
262    {
263       final List<T> list = sharedList.stream().filter(e -> e.getState() == state).collect(Collectors.toList());
264       Collections.reverse(list);
265       return list;
266    }
267
268    /**
269     * This method provides a "snapshot" in time of the bag items.  It
270     * does not "lock" or reserve items in any way.  Call <code>reserve(T)</code>
271     * on items in the list, or understand the concurrency implications of
272     * modifying items, before performing any action on them.
273     *
274     * @return a possibly empty list of (all) bag items
275     */

276    @SuppressWarnings("unchecked")
277    public List<T> values()
278    {
279       return (List<T>) sharedList.clone();
280    }
281
282    /**
283     * The method is used to make an item in the bag "unavailable" for
284     * borrowing.  It is primarily used when wanting to operate on items
285     * returned by the <code>values(int)</code> method.  Items that are
286     * reserved can be removed from the bag via <code>remove(T)</code>
287     * without the need to unreserve them.  Items that are not removed
288     * from the bag can be make available for borrowing again by calling
289     * the <code>unreserve(T)</code> method.
290     *
291     * @param bagEntry the item to reserve
292     * @return true if the item was able to be reserved, false otherwise
293     */

294    public boolean reserve(final T bagEntry)
295    {
296       return bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_RESERVED);
297    }
298
299    /**
300     * This method is used to make an item reserved via <code>reserve(T)</code>
301     * available again for borrowing.
302     *
303     * @param bagEntry the item to unreserve
304     */

305    @SuppressWarnings("SpellCheckingInspection")
306    public void unreserve(final T bagEntry)
307    {
308       if (bagEntry.compareAndSet(STATE_RESERVED, STATE_NOT_IN_USE)) {
309          // spin until a thread takes it or none are waiting
310          while (waiters.get() > 0 && !handoffQueue.offer(bagEntry)) {
311             Thread.yield();
312          }
313       }
314       else {
315          LOGGER.warn("Attempt to relinquish an object to the bag that was not reserved: {}", bagEntry);
316       }
317    }
318
319    /**
320     * Get the number of threads pending (waiting) for an item from the
321     * bag to become available.
322     *
323     * @return the number of threads waiting for items from the bag
324     */

325    public int getWaitingThreadCount()
326    {
327       return waiters.get();
328    }
329
330    /**
331     * Get a count of the number of items in the specified state at the time of this call.
332     *
333     * @param state the state of the items to count
334     * @return a count of how many items in the bag are in the specified state
335     */

336    public int getCount(final int state)
337    {
338       int count = 0;
339       for (IConcurrentBagEntry e : sharedList) {
340          if (e.getState() == state) {
341             count++;
342          }
343       }
344       return count;
345    }
346
347    public int[] getStateCounts()
348    {
349       final int[] states = new int[6];
350       for (IConcurrentBagEntry e : sharedList) {
351          ++states[e.getState()];
352       }
353       states[4] = sharedList.size();
354       states[5] = waiters.get();
355
356       return states;
357    }
358
359    /**
360     * Get the total number of items in the bag.
361     *
362     * @return the number of items in the bag
363     */

364    public int size()
365    {
366       return sharedList.size();
367    }
368
369    public void dumpState()
370    {
371       sharedList.forEach(entry -> LOGGER.info(entry.toString()));
372    }
373
374    /**
375     * Determine whether to use WeakReferences based on whether there is a
376     * custom ClassLoader implementation sitting between this class and the
377     * System ClassLoader.
378     *
379     * @return true if we should use WeakReferences in our ThreadLocals, false otherwise
380     */

381    private boolean useWeakThreadLocals()
382    {
383       try {
384          if (System.getProperty("com.zaxxer.hikari.useWeakReferences") != null) {   // undocumented manual override of WeakReference behavior
385             return Boolean.getBoolean("com.zaxxer.hikari.useWeakReferences");
386          }
387
388          return getClass().getClassLoader() != ClassLoader.getSystemClassLoader();
389       }
390       catch (SecurityException se) {
391          return true;
392       }
393    }
394 }
395