1 /*
2 * Copyright (C) 2013, 2014 Brett Wooldridge
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 package com.zaxxer.hikari.util;
17
18 import com.zaxxer.hikari.util.ConcurrentBag.IConcurrentBagEntry;
19 import org.slf4j.Logger;
20 import org.slf4j.LoggerFactory;
21
22 import java.lang.ref.WeakReference;
23 import java.util.ArrayList;
24 import java.util.Collections;
25 import java.util.List;
26 import java.util.concurrent.CopyOnWriteArrayList;
27 import java.util.concurrent.SynchronousQueue;
28 import java.util.concurrent.TimeUnit;
29 import java.util.concurrent.atomic.AtomicInteger;
30 import java.util.stream.Collectors;
31
32 import static com.zaxxer.hikari.util.ClockSource.currentTime;
33 import static com.zaxxer.hikari.util.ClockSource.elapsedNanos;
34 import static com.zaxxer.hikari.util.ConcurrentBag.IConcurrentBagEntry.*;
35 import static java.util.concurrent.TimeUnit.MICROSECONDS;
36 import static java.util.concurrent.TimeUnit.NANOSECONDS;
37 import static java.util.concurrent.locks.LockSupport.parkNanos;
38
39 /**
40 * This is a specialized concurrent bag that achieves superior performance
41 * to LinkedBlockingQueue and LinkedTransferQueue for the purposes of a
42 * connection pool. It uses ThreadLocal storage when possible to avoid
43 * locks, but resorts to scanning a common collection if there are no
44 * available items in the ThreadLocal list. Not-in-use items in the
45 * ThreadLocal lists can be "stolen" when the borrowing thread has none
46 * of its own. It is a "lock-less" implementation using a specialized
47 * AbstractQueuedLongSynchronizer to manage cross-thread signaling.
48 *
49 * Note that items that are "borrowed" from the bag are not actually
50 * removed from any collection, so garbage collection will not occur
51 * even if the reference is abandoned. Thus care must be taken to
52 * "requite" borrowed objects otherwise a memory leak will result. Only
53 * the "remove" method can completely remove an object from the bag.
54 *
55 * @author Brett Wooldridge
56 *
57 * @param <T> the templated type to store in the bag
58 */
59 public class ConcurrentBag<T extends IConcurrentBagEntry> implements AutoCloseable
60 {
61 private static final Logger LOGGER = LoggerFactory.getLogger(ConcurrentBag.class);
62
63 private final CopyOnWriteArrayList<T> sharedList;
64 private final boolean weakThreadLocals;
65
66 private final ThreadLocal<List<Object>> threadList;
67 private final IBagStateListener listener;
68 private final AtomicInteger waiters;
69 private volatile boolean closed;
70
71 private final SynchronousQueue<T> handoffQueue;
72
73 public interface IConcurrentBagEntry
74 {
75 int STATE_NOT_IN_USE = 0;
76 int STATE_IN_USE = 1;
77 int STATE_REMOVED = -1;
78 int STATE_RESERVED = -2;
79
80 boolean compareAndSet(int expectState, int newState);
81 void setState(int newState);
82 int getState();
83 }
84
85 public interface IBagStateListener
86 {
87 void addBagItem(int waiting);
88 }
89
90 /**
91 * Construct a ConcurrentBag with the specified listener.
92 *
93 * @param listener the IBagStateListener to attach to this bag
94 */
95 public ConcurrentBag(final IBagStateListener listener)
96 {
97 this.listener = listener;
98 this.weakThreadLocals = useWeakThreadLocals();
99
100 this.handoffQueue = new SynchronousQueue<>(true);
101 this.waiters = new AtomicInteger();
102 this.sharedList = new CopyOnWriteArrayList<>();
103 if (weakThreadLocals) {
104 this.threadList = ThreadLocal.withInitial(() -> new ArrayList<>(16));
105 }
106 else {
107 this.threadList = ThreadLocal.withInitial(() -> new FastList<>(IConcurrentBagEntry.class, 16));
108 }
109 }
110
111 /**
112 * The method will borrow a BagEntry from the bag, blocking for the
113 * specified timeout if none are available.
114 *
115 * @param timeout how long to wait before giving up, in units of unit
116 * @param timeUnit a <code>TimeUnit</code> determining how to interpret the timeout parameter
117 * @return a borrowed instance from the bag or null if a timeout occurs
118 * @throws InterruptedException if interrupted while waiting
119 */
120 public T borrow(long timeout, final TimeUnit timeUnit) throws InterruptedException
121 {
122 // Try the thread-local list first
123 final List<Object> list = threadList.get();
124 for (int i = list.size() - 1; i >= 0; i--) {
125 final Object entry = list.remove(i);
126 @SuppressWarnings("unchecked")
127 final T bagEntry = weakThreadLocals ? ((WeakReference<T>) entry).get() : (T) entry;
128 if (bagEntry != null && bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
129 return bagEntry;
130 }
131 }
132
133 // Otherwise, scan the shared list ... then poll the handoff queue
134 final int waiting = waiters.incrementAndGet();
135 try {
136 for (T bagEntry : sharedList) {
137 if (bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
138 // If we may have stolen another waiter's connection, request another bag add.
139 if (waiting > 1) {
140 listener.addBagItem(waiting - 1);
141 }
142 return bagEntry;
143 }
144 }
145
146 listener.addBagItem(waiting);
147
148 timeout = timeUnit.toNanos(timeout);
149 do {
150 final long start = currentTime();
151 final T bagEntry = handoffQueue.poll(timeout, NANOSECONDS);
152 if (bagEntry == null || bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
153 return bagEntry;
154 }
155
156 timeout -= elapsedNanos(start);
157 } while (timeout > 10_000);
158
159 return null;
160 }
161 finally {
162 waiters.decrementAndGet();
163 }
164 }
165
166 /**
167 * This method will return a borrowed object to the bag. Objects
168 * that are borrowed from the bag but never "requited" will result
169 * in a memory leak.
170 *
171 * @param bagEntry the value to return to the bag
172 * @throws NullPointerException if value is null
173 * @throws IllegalStateException if the bagEntry was not borrowed from the bag
174 */
175 public void requite(final T bagEntry)
176 {
177 bagEntry.setState(STATE_NOT_IN_USE);
178
179 for (int i = 0; waiters.get() > 0; i++) {
180 if (bagEntry.getState() != STATE_NOT_IN_USE || handoffQueue.offer(bagEntry)) {
181 return;
182 }
183 else if ((i & 0xff) == 0xff) {
184 parkNanos(MICROSECONDS.toNanos(10));
185 }
186 else {
187 Thread.yield();
188 }
189 }
190
191 final List<Object> threadLocalList = threadList.get();
192 if (threadLocalList.size() < 50) {
193 threadLocalList.add(weakThreadLocals ? new WeakReference<>(bagEntry) : bagEntry);
194 }
195 }
196
197 /**
198 * Add a new object to the bag for others to borrow.
199 *
200 * @param bagEntry an object to add to the bag
201 */
202 public void add(final T bagEntry)
203 {
204 if (closed) {
205 LOGGER.info("ConcurrentBag has been closed, ignoring add()");
206 throw new IllegalStateException("ConcurrentBag has been closed, ignoring add()");
207 }
208
209 sharedList.add(bagEntry);
210
211 // spin until a thread takes it or none are waiting
212 while (waiters.get() > 0 && bagEntry.getState() == STATE_NOT_IN_USE && !handoffQueue.offer(bagEntry)) {
213 Thread.yield();
214 }
215 }
216
217 /**
218 * Remove a value from the bag. This method should only be called
219 * with objects obtained by <code>borrow(long, TimeUnit)</code> or <code>reserve(T)</code>
220 *
221 * @param bagEntry the value to remove
222 * @return true if the entry was removed, false otherwise
223 * @throws IllegalStateException if an attempt is made to remove an object
224 * from the bag that was not borrowed or reserved first
225 */
226 public boolean remove(final T bagEntry)
227 {
228 if (!bagEntry.compareAndSet(STATE_IN_USE, STATE_REMOVED) && !bagEntry.compareAndSet(STATE_RESERVED, STATE_REMOVED) && !closed) {
229 LOGGER.warn("Attempt to remove an object from the bag that was not borrowed or reserved: {}", bagEntry);
230 return false;
231 }
232
233 final boolean removed = sharedList.remove(bagEntry);
234 if (!removed && !closed) {
235 LOGGER.warn("Attempt to remove an object from the bag that does not exist: {}", bagEntry);
236 }
237
238 threadList.get().remove(bagEntry);
239
240 return removed;
241 }
242
243 /**
244 * Close the bag to further adds.
245 */
246 @Override
247 public void close()
248 {
249 closed = true;
250 }
251
252 /**
253 * This method provides a "snapshot" in time of the BagEntry
254 * items in the bag in the specified state. It does not "lock"
255 * or reserve items in any way. Call <code>reserve(T)</code>
256 * on items in list before performing any action on them.
257 *
258 * @param state one of the {@link IConcurrentBagEntry} states
259 * @return a possibly empty list of objects having the state specified
260 */
261 public List<T> values(final int state)
262 {
263 final List<T> list = sharedList.stream().filter(e -> e.getState() == state).collect(Collectors.toList());
264 Collections.reverse(list);
265 return list;
266 }
267
268 /**
269 * This method provides a "snapshot" in time of the bag items. It
270 * does not "lock" or reserve items in any way. Call <code>reserve(T)</code>
271 * on items in the list, or understand the concurrency implications of
272 * modifying items, before performing any action on them.
273 *
274 * @return a possibly empty list of (all) bag items
275 */
276 @SuppressWarnings("unchecked")
277 public List<T> values()
278 {
279 return (List<T>) sharedList.clone();
280 }
281
282 /**
283 * The method is used to make an item in the bag "unavailable" for
284 * borrowing. It is primarily used when wanting to operate on items
285 * returned by the <code>values(int)</code> method. Items that are
286 * reserved can be removed from the bag via <code>remove(T)</code>
287 * without the need to unreserve them. Items that are not removed
288 * from the bag can be make available for borrowing again by calling
289 * the <code>unreserve(T)</code> method.
290 *
291 * @param bagEntry the item to reserve
292 * @return true if the item was able to be reserved, false otherwise
293 */
294 public boolean reserve(final T bagEntry)
295 {
296 return bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_RESERVED);
297 }
298
299 /**
300 * This method is used to make an item reserved via <code>reserve(T)</code>
301 * available again for borrowing.
302 *
303 * @param bagEntry the item to unreserve
304 */
305 @SuppressWarnings("SpellCheckingInspection")
306 public void unreserve(final T bagEntry)
307 {
308 if (bagEntry.compareAndSet(STATE_RESERVED, STATE_NOT_IN_USE)) {
309 // spin until a thread takes it or none are waiting
310 while (waiters.get() > 0 && !handoffQueue.offer(bagEntry)) {
311 Thread.yield();
312 }
313 }
314 else {
315 LOGGER.warn("Attempt to relinquish an object to the bag that was not reserved: {}", bagEntry);
316 }
317 }
318
319 /**
320 * Get the number of threads pending (waiting) for an item from the
321 * bag to become available.
322 *
323 * @return the number of threads waiting for items from the bag
324 */
325 public int getWaitingThreadCount()
326 {
327 return waiters.get();
328 }
329
330 /**
331 * Get a count of the number of items in the specified state at the time of this call.
332 *
333 * @param state the state of the items to count
334 * @return a count of how many items in the bag are in the specified state
335 */
336 public int getCount(final int state)
337 {
338 int count = 0;
339 for (IConcurrentBagEntry e : sharedList) {
340 if (e.getState() == state) {
341 count++;
342 }
343 }
344 return count;
345 }
346
347 public int[] getStateCounts()
348 {
349 final int[] states = new int[6];
350 for (IConcurrentBagEntry e : sharedList) {
351 ++states[e.getState()];
352 }
353 states[4] = sharedList.size();
354 states[5] = waiters.get();
355
356 return states;
357 }
358
359 /**
360 * Get the total number of items in the bag.
361 *
362 * @return the number of items in the bag
363 */
364 public int size()
365 {
366 return sharedList.size();
367 }
368
369 public void dumpState()
370 {
371 sharedList.forEach(entry -> LOGGER.info(entry.toString()));
372 }
373
374 /**
375 * Determine whether to use WeakReferences based on whether there is a
376 * custom ClassLoader implementation sitting between this class and the
377 * System ClassLoader.
378 *
379 * @return true if we should use WeakReferences in our ThreadLocals, false otherwise
380 */
381 private boolean useWeakThreadLocals()
382 {
383 try {
384 if (System.getProperty("com.zaxxer.hikari.useWeakReferences") != null) { // undocumented manual override of WeakReference behavior
385 return Boolean.getBoolean("com.zaxxer.hikari.useWeakReferences");
386 }
387
388 return getClass().getClassLoader() != ClassLoader.getSystemClassLoader();
389 }
390 catch (SecurityException se) {
391 return true;
392 }
393 }
394 }
395