1
16 package org.springframework.data.repository.util;
17
18 import io.reactivex.Flowable;
19 import io.reactivex.Maybe;
20 import kotlinx.coroutines.flow.Flow;
21 import kotlinx.coroutines.reactive.ReactiveFlowKt;
22 import lombok.experimental.UtilityClass;
23 import reactor.core.publisher.Flux;
24 import reactor.core.publisher.Mono;
25 import rx.Observable;
26 import rx.Single;
27
28 import java.util.ArrayList;
29 import java.util.List;
30 import java.util.function.Function;
31
32 import javax.annotation.Nonnull;
33
34 import org.reactivestreams.Publisher;
35
36 import org.springframework.core.ReactiveAdapter;
37 import org.springframework.core.ReactiveAdapterRegistry;
38 import org.springframework.core.convert.ConversionService;
39 import org.springframework.core.convert.TypeDescriptor;
40 import org.springframework.core.convert.converter.ConditionalConverter;
41 import org.springframework.core.convert.converter.Converter;
42 import org.springframework.core.convert.converter.ConverterFactory;
43 import org.springframework.core.convert.support.ConfigurableConversionService;
44 import org.springframework.core.convert.support.GenericConversionService;
45 import org.springframework.data.repository.util.ReactiveWrappers.ReactiveLibrary;
46 import org.springframework.lang.Nullable;
47 import org.springframework.util.Assert;
48 import org.springframework.util.ClassUtils;
49
50
63 @UtilityClass
64 public class ReactiveWrapperConverters {
65
66 private static final List<ReactiveTypeWrapper<?>> REACTIVE_WRAPPERS = new ArrayList<>();
67 private static final GenericConversionService GENERIC_CONVERSION_SERVICE = new GenericConversionService();
68
69 static {
70
71 if (ReactiveWrappers.isAvailable(ReactiveLibrary.RXJAVA1)) {
72
73 REACTIVE_WRAPPERS.add(RxJava1SingleWrapper.INSTANCE);
74 REACTIVE_WRAPPERS.add(RxJava1ObservableWrapper.INSTANCE);
75 }
76
77 if (ReactiveWrappers.isAvailable(ReactiveLibrary.RXJAVA2)) {
78
79 REACTIVE_WRAPPERS.add(RxJava2SingleWrapper.INSTANCE);
80 REACTIVE_WRAPPERS.add(RxJava2MaybeWrapper.INSTANCE);
81 REACTIVE_WRAPPERS.add(RxJava2ObservableWrapper.INSTANCE);
82 REACTIVE_WRAPPERS.add(RxJava2FlowableWrapper.INSTANCE);
83 }
84
85 if (ReactiveWrappers.isAvailable(ReactiveLibrary.PROJECT_REACTOR)) {
86
87 REACTIVE_WRAPPERS.add(FluxWrapper.INSTANCE);
88 REACTIVE_WRAPPERS.add(MonoWrapper.INSTANCE);
89 REACTIVE_WRAPPERS.add(PublisherWrapper.INSTANCE);
90 }
91
92 registerConvertersIn(GENERIC_CONVERSION_SERVICE);
93 }
94
95
100 private static ConversionService registerConvertersIn(ConfigurableConversionService conversionService) {
101
102 Assert.notNull(conversionService, "ConversionService must not be null!");
103
104 if (ReactiveWrappers.isAvailable(ReactiveLibrary.PROJECT_REACTOR)) {
105
106 conversionService.addConverter(PublisherToMonoConverter.INSTANCE);
107 conversionService.addConverter(PublisherToFluxConverter.INSTANCE);
108
109 if (ReactiveWrappers.isAvailable(ReactiveLibrary.KOTLIN_COROUTINES)) {
110 conversionService.addConverter(PublisherToFlowConverter.INSTANCE);
111 }
112
113 if (RegistryHolder.REACTIVE_ADAPTER_REGISTRY != null) {
114 conversionService.addConverterFactory(ReactiveAdapterConverterFactory.INSTANCE);
115 }
116 }
117
118 return conversionService;
119 }
120
121
131 public static boolean supports(Class<?> type) {
132 return RegistryHolder.REACTIVE_ADAPTER_REGISTRY != null
133 && RegistryHolder.REACTIVE_ADAPTER_REGISTRY.getAdapter(type) != null;
134 }
135
136
143 @Nullable
144 @SuppressWarnings("unchecked")
145 public static <T> T toWrapper(Object reactiveObject, Class<? extends T> targetWrapperType) {
146
147 Assert.notNull(reactiveObject, "Reactive source object must not be null!");
148 Assert.notNull(targetWrapperType, "Reactive target type must not be null!");
149
150 if (targetWrapperType.isAssignableFrom(reactiveObject.getClass())) {
151 return (T) reactiveObject;
152 }
153
154 return GENERIC_CONVERSION_SERVICE.convert(reactiveObject, targetWrapperType);
155 }
156
157
164 @SuppressWarnings("unchecked")
165 public static <T> T map(Object reactiveObject, Function<Object, Object> converter) {
166
167 Assert.notNull(reactiveObject, "Reactive source object must not be null!");
168 Assert.notNull(converter, "Converter must not be null!");
169
170 return REACTIVE_WRAPPERS.stream()
171 .filter(it -> ClassUtils.isAssignable(it.getWrapperClass(), reactiveObject.getClass()))
172 .findFirst()
173 .map(it -> (T) it.map(reactiveObject, converter))
174 .orElseThrow(() -> new IllegalStateException(String.format("Cannot apply converter to %s", reactiveObject)));
175 }
176
177
184 public static boolean canConvert(Class<?> sourceType, Class<?> targetType) {
185
186 Assert.notNull(sourceType, "Source type must not be null!");
187 Assert.notNull(targetType, "Target type must not be null!");
188
189 return GENERIC_CONVERSION_SERVICE.canConvert(sourceType, targetType);
190 }
191
192
193
194
195
196
202 private interface ReactiveTypeWrapper<T> {
203
204
207 Class<? super T> getWrapperClass();
208
209
216 Object map(Object wrapper, Function<Object, Object> function);
217 }
218
219
222 private enum MonoWrapper implements ReactiveTypeWrapper<Mono<?>> {
223
224 INSTANCE;
225
226 @Override
227 public Class<? super Mono<?>> getWrapperClass() {
228 return Mono.class;
229 }
230
231 @Override
232 public Mono<?> map(Object wrapper, Function<Object, Object> function) {
233 return ((Mono<?>) wrapper).map(function::apply);
234 }
235 }
236
237
240 private enum FluxWrapper implements ReactiveTypeWrapper<Flux<?>> {
241
242 INSTANCE;
243
244 @Override
245 public Class<? super Flux<?>> getWrapperClass() {
246 return Flux.class;
247 }
248
249 public Flux<?> map(Object wrapper, Function<Object, Object> function) {
250 return ((Flux<?>) wrapper).map(function::apply);
251 }
252 }
253
254
257 private enum PublisherWrapper implements ReactiveTypeWrapper<Publisher<?>> {
258
259 INSTANCE;
260
261 @Override
262 public Class<? super Publisher<?>> getWrapperClass() {
263 return Publisher.class;
264 }
265
266 @Override
267 public Publisher<?> map(Object wrapper, Function<Object, Object> function) {
268
269 if (wrapper instanceof Mono) {
270 return MonoWrapper.INSTANCE.map(wrapper, function);
271 }
272
273 if (wrapper instanceof Flux) {
274 return FluxWrapper.INSTANCE.map(wrapper, function);
275 }
276
277 return FluxWrapper.INSTANCE.map(Flux.from((Publisher<?>) wrapper), function);
278 }
279 }
280
281
284 private enum RxJava1SingleWrapper implements ReactiveTypeWrapper<Single<?>> {
285
286 INSTANCE;
287
288 @Override
289 public Class<? super Single<?>> getWrapperClass() {
290 return Single.class;
291 }
292
293 @Override
294 public Single<?> map(Object wrapper, Function<Object, Object> function) {
295 return ((Single<?>) wrapper).map(function::apply);
296 }
297 }
298
299
302 private enum RxJava1ObservableWrapper implements ReactiveTypeWrapper<Observable<?>> {
303
304 INSTANCE;
305
306 @Override
307 public Class<? super Observable<?>> getWrapperClass() {
308 return Observable.class;
309 }
310
311 @Override
312 public Observable<?> map(Object wrapper, Function<Object, Object> function) {
313 return ((Observable<?>) wrapper).map(function::apply);
314 }
315 }
316
317
320 private enum RxJava2SingleWrapper implements ReactiveTypeWrapper<io.reactivex.Single<?>> {
321
322 INSTANCE;
323
324 @Override
325 public Class<? super io.reactivex.Single<?>> getWrapperClass() {
326 return io.reactivex.Single.class;
327 }
328
329 @Override
330 public io.reactivex.Single<?> map(Object wrapper, Function<Object, Object> function) {
331 return ((io.reactivex.Single<?>) wrapper).map(function::apply);
332 }
333 }
334
335
338 private enum RxJava2MaybeWrapper implements ReactiveTypeWrapper<Maybe<?>> {
339
340 INSTANCE;
341
342 @Override
343 public Class<? super io.reactivex.Maybe<?>> getWrapperClass() {
344 return io.reactivex.Maybe.class;
345 }
346
347 @Override
348 public io.reactivex.Maybe<?> map(Object wrapper, Function<Object, Object> function) {
349 return ((io.reactivex.Maybe<?>) wrapper).map(function::apply);
350 }
351 }
352
353
356 private enum RxJava2ObservableWrapper implements ReactiveTypeWrapper<io.reactivex.Observable<?>> {
357
358 INSTANCE;
359
360 @Override
361 public Class<? super io.reactivex.Observable<?>> getWrapperClass() {
362 return io.reactivex.Observable.class;
363 }
364
365 @Override
366 public io.reactivex.Observable<?> map(Object wrapper, Function<Object, Object> function) {
367 return ((io.reactivex.Observable<?>) wrapper).map(function::apply);
368 }
369 }
370
371
374 private enum RxJava2FlowableWrapper implements ReactiveTypeWrapper<Flowable<?>> {
375
376 INSTANCE;
377
378 @Override
379 public Class<? super Flowable<?>> getWrapperClass() {
380 return io.reactivex.Flowable.class;
381 }
382
383 @Override
384 public io.reactivex.Flowable<?> map(Object wrapper, Function<Object, Object> function) {
385 return ((io.reactivex.Flowable<?>) wrapper).map(function::apply);
386 }
387 }
388
389
390
391
392
393
399 private enum PublisherToFluxConverter implements Converter<Publisher<?>, Flux<?>> {
400
401 INSTANCE;
402
403 @Nonnull
404 @Override
405 public Flux<?> convert(Publisher<?> source) {
406 return Flux.from(source);
407 }
408 }
409
410
416 private enum PublisherToMonoConverter implements Converter<Publisher<?>, Mono<?>> {
417
418 INSTANCE;
419
420 @Nonnull
421 @Override
422 public Mono<?> convert(Publisher<?> source) {
423 return Mono.from(source);
424 }
425 }
426
427
428
429
430
431
437 private enum PublisherToFlowConverter implements Converter<Publisher<?>, Flow<?>> {
438
439 INSTANCE;
440
441 @Nonnull
442 @Override
443 public Flow<?> convert(Publisher<?> source) {
444 return ReactiveFlowKt.asFlow(source);
445 }
446 }
447
448
451 private enum ReactiveAdapterConverterFactory implements ConverterFactory<Object, Object>, ConditionalConverter {
452
453 INSTANCE;
454
455 @Override
456 public boolean matches(TypeDescriptor sourceType, TypeDescriptor targetType) {
457 return isSupported(sourceType) || isSupported(targetType);
458 }
459
460 private boolean isSupported(TypeDescriptor typeDescriptor) {
461 return RegistryHolder.REACTIVE_ADAPTER_REGISTRY != null
462 && RegistryHolder.REACTIVE_ADAPTER_REGISTRY.getAdapter(typeDescriptor.getType()) != null;
463 }
464
465 @Override
466 @SuppressWarnings({ "ConstantConditions", "unchecked" })
467 public <T> Converter<Object, T> getConverter(Class<T> targetType) {
468 return source -> {
469
470 Publisher<?> publisher = source instanceof Publisher ? (Publisher<?>) source
471 : RegistryHolder.REACTIVE_ADAPTER_REGISTRY.getAdapter(Publisher.class, source).toPublisher(source);
472
473 ReactiveAdapter adapter = RegistryHolder.REACTIVE_ADAPTER_REGISTRY.getAdapter(targetType);
474
475 return (T) adapter.fromPublisher(publisher);
476 };
477 }
478 }
479
480
486 static class RegistryHolder {
487
488 static final @Nullable ReactiveAdapterRegistry REACTIVE_ADAPTER_REGISTRY;
489
490 static {
491
492 if (ReactiveWrappers.isAvailable(ReactiveLibrary.PROJECT_REACTOR)) {
493 REACTIVE_ADAPTER_REGISTRY = new ReactiveAdapterRegistry();
494 } else {
495 REACTIVE_ADAPTER_REGISTRY = null;
496 }
497 }
498 }
499 }
500