1
15
16 package software.amazon.awssdk.http.apache.internal.conn;
17
18 import java.time.Duration;
19 import java.util.Map;
20 import java.util.concurrent.ConcurrentHashMap;
21 import java.util.concurrent.ExecutorService;
22 import java.util.concurrent.Executors;
23 import java.util.concurrent.TimeUnit;
24 import java.util.function.Supplier;
25 import org.apache.http.conn.HttpClientConnectionManager;
26 import org.slf4j.Logger;
27 import org.slf4j.LoggerFactory;
28 import software.amazon.awssdk.annotations.SdkInternalApi;
29 import software.amazon.awssdk.annotations.SdkTestInternalApi;
30
31
34 @SdkInternalApi
35 public final class IdleConnectionReaper {
36 private static final Logger log = LoggerFactory.getLogger(IdleConnectionReaper.class);
37
38 private static final IdleConnectionReaper INSTANCE = new IdleConnectionReaper();
39
40 private final Map<HttpClientConnectionManager, Long> connectionManagers;
41
42 private final Supplier<ExecutorService> executorServiceSupplier;
43
44 private final long sleepPeriod;
45
46 private volatile ExecutorService exec;
47
48 private volatile ReaperTask reaperTask;
49
50 private IdleConnectionReaper() {
51 this.connectionManagers = new ConcurrentHashMap<>();
52
53 this.executorServiceSupplier = () -> {
54 ExecutorService e = Executors.newSingleThreadExecutor(r -> {
55 Thread t = new Thread(r, "idle-connection-reaper");
56 t.setDaemon(true);
57 return t;
58 });
59 return e;
60 };
61
62 this.sleepPeriod = Duration.ofMinutes(1).toMillis();
63 }
64
65 @SdkTestInternalApi
66 IdleConnectionReaper(Map<HttpClientConnectionManager, Long> connectionManagers,
67 Supplier<ExecutorService> executorServiceSupplier,
68 long sleepPeriod) {
69
70 this.connectionManagers = connectionManagers;
71 this.executorServiceSupplier = executorServiceSupplier;
72 this.sleepPeriod = sleepPeriod;
73 }
74
75
83 public synchronized boolean registerConnectionManager(HttpClientConnectionManager manager, long maxIdleTime) {
84 boolean notPreviouslyRegistered = connectionManagers.put(manager, maxIdleTime) == null;
85 setupExecutorIfNecessary();
86 return notPreviouslyRegistered;
87 }
88
89
96 public synchronized boolean deregisterConnectionManager(HttpClientConnectionManager manager) {
97 boolean wasRemoved = connectionManagers.remove(manager) != null;
98 cleanupExecutorIfNecessary();
99 return wasRemoved;
100 }
101
102
105 public static IdleConnectionReaper getInstance() {
106 return INSTANCE;
107 }
108
109 private void setupExecutorIfNecessary() {
110 if (exec != null) {
111 return;
112 }
113
114 ExecutorService e = executorServiceSupplier.get();
115
116 this.reaperTask = new ReaperTask(connectionManagers, sleepPeriod);
117
118 e.execute(this.reaperTask);
119
120 exec = e;
121 }
122
123 private void cleanupExecutorIfNecessary() {
124 if (exec == null || !connectionManagers.isEmpty()) {
125 return;
126 }
127
128 reaperTask.stop();
129 reaperTask = null;
130 exec.shutdownNow();
131 exec = null;
132 }
133
134 private static final class ReaperTask implements Runnable {
135 private final Map<HttpClientConnectionManager, Long> connectionManagers;
136 private final long sleepPeriod;
137
138 private volatile boolean stopping = false;
139
140 private ReaperTask(Map<HttpClientConnectionManager, Long> connectionManagers,
141 long sleepPeriod) {
142 this.connectionManagers = connectionManagers;
143 this.sleepPeriod = sleepPeriod;
144 }
145
146 @Override
147 public void run() {
148 while (!stopping) {
149 try {
150 Thread.sleep(sleepPeriod);
151
152 for (Map.Entry<HttpClientConnectionManager, Long> entry : connectionManagers.entrySet()) {
153 try {
154 entry.getKey().closeIdleConnections(entry.getValue(), TimeUnit.MILLISECONDS);
155 } catch (Exception t) {
156 log.warn("Unable to close idle connections", t);
157 }
158 }
159 } catch (Throwable t) {
160 log.debug("Reaper thread: ", t);
161 }
162 }
163 log.debug("Shutting down reaper thread.");
164 }
165
166 private void stop() {
167 stopping = true;
168 }
169 }
170 }
171