1
16 package org.springframework.data.repository.core.support;
17
18 import kotlin.coroutines.Continuation;
19 import kotlinx.coroutines.reactive.AwaitKt;
20
21 import java.lang.reflect.Method;
22 import java.util.Collections;
23 import java.util.List;
24 import java.util.Map;
25 import java.util.Optional;
26
27 import org.aopalliance.intercept.MethodInterceptor;
28 import org.aopalliance.intercept.MethodInvocation;
29 import org.reactivestreams.Publisher;
30 import org.springframework.core.KotlinDetector;
31 import org.springframework.core.ResolvableType;
32 import org.springframework.data.projection.ProjectionFactory;
33 import org.springframework.data.repository.core.NamedQueries;
34 import org.springframework.data.repository.core.RepositoryInformation;
35 import org.springframework.data.repository.query.QueryLookupStrategy;
36 import org.springframework.data.repository.query.QueryMethod;
37 import org.springframework.data.repository.query.RepositoryQuery;
38 import org.springframework.data.repository.util.QueryExecutionConverters;
39 import org.springframework.data.repository.util.ReactiveWrapperConverters;
40 import org.springframework.data.repository.util.ReactiveWrappers;
41 import org.springframework.data.util.KotlinReflectionUtils;
42 import org.springframework.data.util.Pair;
43 import org.springframework.lang.Nullable;
44 import org.springframework.util.ConcurrentReferenceHashMap;
45
46
55 class QueryExecutorMethodInterceptor implements MethodInterceptor {
56
57 private final Map<Method, RepositoryQuery> queries;
58 private final Map<Method, QueryMethodInvoker> invocationMetadataCache = new ConcurrentReferenceHashMap<>();
59 private final QueryExecutionResultHandler resultHandler;
60 private final NamedQueries namedQueries;
61 private final List<QueryCreationListener<?>> queryPostProcessors;
62
63
67 public QueryExecutorMethodInterceptor(RepositoryInformation repositoryInformation,
68 ProjectionFactory projectionFactory, Optional<QueryLookupStrategy> queryLookupStrategy, NamedQueries namedQueries,
69 List<QueryCreationListener<?>> queryPostProcessors) {
70
71 this.namedQueries = namedQueries;
72 this.queryPostProcessors = queryPostProcessors;
73
74 this.resultHandler = new QueryExecutionResultHandler(RepositoryFactorySupport.CONVERSION_SERVICE);
75
76 if (!queryLookupStrategy.isPresent() && repositoryInformation.hasQueryMethods()) {
77
78 throw new IllegalStateException("You have defined query methods in the repository"
79 + " but do not have any query lookup strategy defined."
80 + " The infrastructure apparently does not support query methods!");
81 }
82
83 this.queries = queryLookupStrategy
84 .map(it -> mapMethodsToQuery(repositoryInformation, it, projectionFactory))
85 .orElse(Collections.emptyMap());
86 }
87
88 private Map<Method, RepositoryQuery> mapMethodsToQuery(RepositoryInformation repositoryInformation,
89 QueryLookupStrategy lookupStrategy, ProjectionFactory projectionFactory) {
90
91 return repositoryInformation.getQueryMethods().stream()
92 .map(method -> lookupQuery(method, repositoryInformation, lookupStrategy, projectionFactory))
93 .peek(pair -> invokeListeners(pair.getSecond()))
94 .collect(Pair.toMap());
95 }
96
97 private Pair<Method, RepositoryQuery> lookupQuery(Method method, RepositoryInformation information,
98 QueryLookupStrategy strategy, ProjectionFactory projectionFactory) {
99 return Pair.of(method, strategy.resolveQuery(method, information, projectionFactory, namedQueries));
100 }
101
102 @SuppressWarnings({ "rawtypes", "unchecked" })
103 private void invokeListeners(RepositoryQuery query) {
104
105 for (QueryCreationListener listener : queryPostProcessors) {
106
107 ResolvableType typeArgument = ResolvableType.forClass(QueryCreationListener.class, listener.getClass())
108 .getGeneric(0);
109
110 if (typeArgument != null && typeArgument.isAssignableFrom(ResolvableType.forClass(query.getClass()))) {
111 listener.onCreation(query);
112 }
113 }
114 }
115
116
120 @Override
121 @Nullable
122 public Object invoke(@SuppressWarnings("null") MethodInvocation invocation) throws Throwable {
123
124 Method method = invocation.getMethod();
125
126 QueryExecutionConverters.ExecutionAdapter executionAdapter = QueryExecutionConverters
127 .getExecutionAdapter(method.getReturnType());
128
129 if (executionAdapter == null) {
130 return resultHandler.postProcessInvocationResult(doInvoke(invocation), method);
131 }
132
133 return executionAdapter
134 .apply(() -> resultHandler.postProcessInvocationResult(doInvoke(invocation), method));
135 }
136
137 @Nullable
138 private Object doInvoke(MethodInvocation invocation) throws Throwable {
139
140 Method method = invocation.getMethod();
141
142 if (hasQueryFor(method)) {
143
144 QueryMethodInvoker invocationMetadata = invocationMetadataCache.get(method);
145
146 if (invocationMetadata == null) {
147 invocationMetadata = new QueryMethodInvoker(method);
148 invocationMetadataCache.put(method, invocationMetadata);
149 }
150
151 RepositoryQuery repositoryQuery = queries.get(method);
152 return invocationMetadata.invoke(repositoryQuery, invocation.getArguments());
153 }
154
155 return invocation.proceed();
156 }
157
158
164 private boolean hasQueryFor(Method method) {
165 return queries.containsKey(method);
166 }
167
168
171 static class QueryMethodInvoker {
172
173 private final boolean suspendedDeclaredMethod;
174 private final Class<?> returnedType;
175 private final boolean returnsReactiveType;
176
177 QueryMethodInvoker(Method invokedMethod) {
178
179 if (KotlinDetector.isKotlinReflectPresent()) {
180
181 this.suspendedDeclaredMethod = KotlinReflectionUtils.isSuspend(invokedMethod);
182 this.returnedType = this.suspendedDeclaredMethod ? KotlinReflectionUtils.getReturnType(invokedMethod)
183 : invokedMethod.getReturnType();
184 } else {
185
186 this.suspendedDeclaredMethod = false;
187 this.returnedType = invokedMethod.getReturnType();
188 }
189
190 this.returnsReactiveType = ReactiveWrappers.supports(returnedType);
191 }
192
193 @Nullable
194 public Object invoke(RepositoryQuery query, Object[] args) {
195 return suspendedDeclaredMethod ? invokeReactiveToSuspend(query, args) : query.execute(args);
196 }
197
198 @Nullable
199 @SuppressWarnings({ "unchecked", "ConstantConditions" })
200 private Object invokeReactiveToSuspend(RepositoryQuery query, Object[] args) {
201
202
207 Continuation<Object> continuation = (Continuation) args[args.length - 1];
208 args[args.length - 1] = null;
209 Object result = query.execute(args);
210
211 if (returnsReactiveType) {
212 return ReactiveWrapperConverters.toWrapper(result, returnedType);
213 }
214
215 Publisher<?> publisher = result instanceof Publisher ? (Publisher<?>) result
216 : ReactiveWrapperConverters.toWrapper(result, Publisher.class);
217
218 return AwaitKt.awaitFirstOrNull(publisher, continuation);
219 }
220 }
221 }
222