1 /*
2  * Copyright 2016-2020 the original author or authors.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      https://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */

16 package org.springframework.data.repository.util;
17
18 import io.reactivex.Flowable;
19 import io.reactivex.Maybe;
20 import kotlinx.coroutines.flow.Flow;
21 import kotlinx.coroutines.reactive.ReactiveFlowKt;
22 import lombok.experimental.UtilityClass;
23 import reactor.core.publisher.Flux;
24 import reactor.core.publisher.Mono;
25 import rx.Observable;
26 import rx.Single;
27
28 import java.util.ArrayList;
29 import java.util.List;
30 import java.util.function.Function;
31
32 import javax.annotation.Nonnull;
33
34 import org.reactivestreams.Publisher;
35
36 import org.springframework.core.ReactiveAdapter;
37 import org.springframework.core.ReactiveAdapterRegistry;
38 import org.springframework.core.convert.ConversionService;
39 import org.springframework.core.convert.TypeDescriptor;
40 import org.springframework.core.convert.converter.ConditionalConverter;
41 import org.springframework.core.convert.converter.Converter;
42 import org.springframework.core.convert.converter.ConverterFactory;
43 import org.springframework.core.convert.support.ConfigurableConversionService;
44 import org.springframework.core.convert.support.GenericConversionService;
45 import org.springframework.data.repository.util.ReactiveWrappers.ReactiveLibrary;
46 import org.springframework.lang.Nullable;
47 import org.springframework.util.Assert;
48 import org.springframework.util.ClassUtils;
49
50 /**
51  * Conversion support for reactive wrapper types. This class is a reactive extension to
52  * {@link QueryExecutionConverters}.
53  * <p>
54  * This class discovers reactive wrapper availability and their conversion support based on the class path. Reactive
55  * wrapper types might be supported/on the class path but conversion may require additional dependencies.
56  *
57  * @author Mark Paluch
58  * @author Oliver Gierke
59  * @since 2.0
60  * @see ReactiveWrappers
61  * @see ReactiveAdapterRegistry
62  */

63 @UtilityClass
64 public class ReactiveWrapperConverters {
65
66     private static final List<ReactiveTypeWrapper<?>> REACTIVE_WRAPPERS = new ArrayList<>();
67     private static final GenericConversionService GENERIC_CONVERSION_SERVICE = new GenericConversionService();
68
69     static {
70
71         if (ReactiveWrappers.isAvailable(ReactiveLibrary.RXJAVA1)) {
72
73             REACTIVE_WRAPPERS.add(RxJava1SingleWrapper.INSTANCE);
74             REACTIVE_WRAPPERS.add(RxJava1ObservableWrapper.INSTANCE);
75         }
76
77         if (ReactiveWrappers.isAvailable(ReactiveLibrary.RXJAVA2)) {
78
79             REACTIVE_WRAPPERS.add(RxJava2SingleWrapper.INSTANCE);
80             REACTIVE_WRAPPERS.add(RxJava2MaybeWrapper.INSTANCE);
81             REACTIVE_WRAPPERS.add(RxJava2ObservableWrapper.INSTANCE);
82             REACTIVE_WRAPPERS.add(RxJava2FlowableWrapper.INSTANCE);
83         }
84
85         if (ReactiveWrappers.isAvailable(ReactiveLibrary.PROJECT_REACTOR)) {
86
87             REACTIVE_WRAPPERS.add(FluxWrapper.INSTANCE);
88             REACTIVE_WRAPPERS.add(MonoWrapper.INSTANCE);
89             REACTIVE_WRAPPERS.add(PublisherWrapper.INSTANCE);
90         }
91
92         registerConvertersIn(GENERIC_CONVERSION_SERVICE);
93     }
94
95     /**
96      * Registers converters for wrapper types found on the classpath.
97      *
98      * @param conversionService must not be {@literal null}.
99      */

100     private static ConversionService registerConvertersIn(ConfigurableConversionService conversionService) {
101
102         Assert.notNull(conversionService, "ConversionService must not be null!");
103
104         if (ReactiveWrappers.isAvailable(ReactiveLibrary.PROJECT_REACTOR)) {
105
106             conversionService.addConverter(PublisherToMonoConverter.INSTANCE);
107             conversionService.addConverter(PublisherToFluxConverter.INSTANCE);
108
109             if (ReactiveWrappers.isAvailable(ReactiveLibrary.KOTLIN_COROUTINES)) {
110                 conversionService.addConverter(PublisherToFlowConverter.INSTANCE);
111             }
112
113             if (RegistryHolder.REACTIVE_ADAPTER_REGISTRY != null) {
114                 conversionService.addConverterFactory(ReactiveAdapterConverterFactory.INSTANCE);
115             }
116         }
117
118         return conversionService;
119     }
120
121     /**
122      * Returns whether the given type is supported for wrapper type conversion.
123      * <p>
124      * NOTE: A reactive wrapper type might be supported in general by {@link ReactiveWrappers#supports(Class)} but not
125      * necessarily for conversion using this method.
126      * </p>
127      *
128      * @param type must not be {@literal null}.
129      * @return {@literal trueif the {@code type} is a supported reactive wrapper type.
130      */

131     public static boolean supports(Class<?> type) {
132         return RegistryHolder.REACTIVE_ADAPTER_REGISTRY != null
133                 && RegistryHolder.REACTIVE_ADAPTER_REGISTRY.getAdapter(type) != null;
134     }
135
136     /**
137      * Casts or adopts the given wrapper type to a target wrapper type.
138      *
139      * @param reactiveObject the stream, must not be {@literal null}.
140      * @param targetWrapperType must not be {@literal null}.
141      * @return
142      */

143     @Nullable
144     @SuppressWarnings("unchecked")
145     public static <T> T toWrapper(Object reactiveObject, Class<? extends T> targetWrapperType) {
146
147         Assert.notNull(reactiveObject, "Reactive source object must not be null!");
148         Assert.notNull(targetWrapperType, "Reactive target type must not be null!");
149
150         if (targetWrapperType.isAssignableFrom(reactiveObject.getClass())) {
151             return (T) reactiveObject;
152         }
153
154         return GENERIC_CONVERSION_SERVICE.convert(reactiveObject, targetWrapperType);
155     }
156
157     /**
158      * Maps elements of a reactive element stream to other elements.
159      *
160      * @param reactiveObject must not be {@literal null}.
161      * @param converter must not be {@literal null}.
162      * @return
163      */

164     @SuppressWarnings("unchecked")
165     public static <T> T map(Object reactiveObject, Function<Object, Object> converter) {
166
167         Assert.notNull(reactiveObject, "Reactive source object must not be null!");
168         Assert.notNull(converter, "Converter must not be null!");
169
170         return REACTIVE_WRAPPERS.stream()//
171                 .filter(it -> ClassUtils.isAssignable(it.getWrapperClass(), reactiveObject.getClass()))//
172                 .findFirst()//
173                 .map(it -> (T) it.map(reactiveObject, converter))//
174                 .orElseThrow(() -> new IllegalStateException(String.format("Cannot apply converter to %s", reactiveObject)));
175     }
176
177     /**
178      * Return {@literal trueif objects of {@code sourceType} can be converted to the {@code targetType}.
179      *
180      * @param sourceType must not be {@literal null}.
181      * @param targetType must not be {@literal null}.
182      * @return {@literal trueif a conversion can be performed.
183      */

184     public static boolean canConvert(Class<?> sourceType, Class<?> targetType) {
185
186         Assert.notNull(sourceType, "Source type must not be null!");
187         Assert.notNull(targetType, "Target type must not be null!");
188
189         return GENERIC_CONVERSION_SERVICE.canConvert(sourceType, targetType);
190     }
191
192     // -------------------------------------------------------------------------
193     // Wrapper descriptors
194     // -------------------------------------------------------------------------
195
196     /**
197      * Wrapper descriptor that can apply a {@link Function} to map items inside its stream.
198      *
199      * @author Mark Paluch
200      * @author Christoph Strobl
201      */

202     private interface ReactiveTypeWrapper<T> {
203
204         /**
205          * @return the wrapper class.
206          */

207         Class<? super T> getWrapperClass();
208
209         /**
210          * Apply a {@link Function} to a reactive type.
211          *
212          * @param wrapper the reactive type, must not be {@literal null}.
213          * @param function the converter, must not be {@literal null}.
214          * @return the reactive type applying conversion.
215          */

216         Object map(Object wrapper, Function<Object, Object> function);
217     }
218
219     /**
220      * Wrapper for Project Reactor's {@link Mono}.
221      */

222     private enum MonoWrapper implements ReactiveTypeWrapper<Mono<?>> {
223
224         INSTANCE;
225
226         @Override
227         public Class<? super Mono<?>> getWrapperClass() {
228             return Mono.class;
229         }
230
231         @Override
232         public Mono<?> map(Object wrapper, Function<Object, Object> function) {
233             return ((Mono<?>) wrapper).map(function::apply);
234         }
235     }
236
237     /**
238      * Wrapper for Project Reactor's {@link Flux}.
239      */

240     private enum FluxWrapper implements ReactiveTypeWrapper<Flux<?>> {
241
242         INSTANCE;
243
244         @Override
245         public Class<? super Flux<?>> getWrapperClass() {
246             return Flux.class;
247         }
248
249         public Flux<?> map(Object wrapper, Function<Object, Object> function) {
250             return ((Flux<?>) wrapper).map(function::apply);
251         }
252     }
253
254     /**
255      * Wrapper for Reactive Stream's {@link Publisher}.
256      */

257     private enum PublisherWrapper implements ReactiveTypeWrapper<Publisher<?>> {
258
259         INSTANCE;
260
261         @Override
262         public Class<? super Publisher<?>> getWrapperClass() {
263             return Publisher.class;
264         }
265
266         @Override
267         public Publisher<?> map(Object wrapper, Function<Object, Object> function) {
268
269             if (wrapper instanceof Mono) {
270                 return MonoWrapper.INSTANCE.map(wrapper, function);
271             }
272
273             if (wrapper instanceof Flux) {
274                 return FluxWrapper.INSTANCE.map(wrapper, function);
275             }
276
277             return FluxWrapper.INSTANCE.map(Flux.from((Publisher<?>) wrapper), function);
278         }
279     }
280
281     /**
282      * Wrapper for RxJava 1's {@link Single}.
283      */

284     private enum RxJava1SingleWrapper implements ReactiveTypeWrapper<Single<?>> {
285
286         INSTANCE;
287
288         @Override
289         public Class<? super Single<?>> getWrapperClass() {
290             return Single.class;
291         }
292
293         @Override
294         public Single<?> map(Object wrapper, Function<Object, Object> function) {
295             return ((Single<?>) wrapper).map(function::apply);
296         }
297     }
298
299     /**
300      * Wrapper for RxJava 1's {@link Observable}.
301      */

302     private enum RxJava1ObservableWrapper implements ReactiveTypeWrapper<Observable<?>> {
303
304         INSTANCE;
305
306         @Override
307         public Class<? super Observable<?>> getWrapperClass() {
308             return Observable.class;
309         }
310
311         @Override
312         public Observable<?> map(Object wrapper, Function<Object, Object> function) {
313             return ((Observable<?>) wrapper).map(function::apply);
314         }
315     }
316
317     /**
318      * Wrapper for RxJava 2's {@link io.reactivex.Single}.
319      */

320     private enum RxJava2SingleWrapper implements ReactiveTypeWrapper<io.reactivex.Single<?>> {
321
322         INSTANCE;
323
324         @Override
325         public Class<? super io.reactivex.Single<?>> getWrapperClass() {
326             return io.reactivex.Single.class;
327         }
328
329         @Override
330         public io.reactivex.Single<?> map(Object wrapper, Function<Object, Object> function) {
331             return ((io.reactivex.Single<?>) wrapper).map(function::apply);
332         }
333     }
334
335     /**
336      * Wrapper for RxJava 2's {@link io.reactivex.Maybe}.
337      */

338     private enum RxJava2MaybeWrapper implements ReactiveTypeWrapper<Maybe<?>> {
339
340         INSTANCE;
341
342         @Override
343         public Class<? super io.reactivex.Maybe<?>> getWrapperClass() {
344             return io.reactivex.Maybe.class;
345         }
346
347         @Override
348         public io.reactivex.Maybe<?> map(Object wrapper, Function<Object, Object> function) {
349             return ((io.reactivex.Maybe<?>) wrapper).map(function::apply);
350         }
351     }
352
353     /**
354      * Wrapper for RxJava 2's {@link io.reactivex.Observable}.
355      */

356     private enum RxJava2ObservableWrapper implements ReactiveTypeWrapper<io.reactivex.Observable<?>> {
357
358         INSTANCE;
359
360         @Override
361         public Class<? super io.reactivex.Observable<?>> getWrapperClass() {
362             return io.reactivex.Observable.class;
363         }
364
365         @Override
366         public io.reactivex.Observable<?> map(Object wrapper, Function<Object, Object> function) {
367             return ((io.reactivex.Observable<?>) wrapper).map(function::apply);
368         }
369     }
370
371     /**
372      * Wrapper for RxJava 2's {@link io.reactivex.Flowable}.
373      */

374     private enum RxJava2FlowableWrapper implements ReactiveTypeWrapper<Flowable<?>> {
375
376         INSTANCE;
377
378         @Override
379         public Class<? super Flowable<?>> getWrapperClass() {
380             return io.reactivex.Flowable.class;
381         }
382
383         @Override
384         public io.reactivex.Flowable<?> map(Object wrapper, Function<Object, Object> function) {
385             return ((io.reactivex.Flowable<?>) wrapper).map(function::apply);
386         }
387     }
388
389     // -------------------------------------------------------------------------
390     // ReactiveStreams converters
391     // -------------------------------------------------------------------------
392
393     /**
394      * A {@link Converter} to convert a {@link Publisher} to {@link Flux}.
395      *
396      * @author Mark Paluch
397      * @author 2.0
398      */

399     private enum PublisherToFluxConverter implements Converter<Publisher<?>, Flux<?>> {
400
401         INSTANCE;
402
403         @Nonnull
404         @Override
405         public Flux<?> convert(Publisher<?> source) {
406             return Flux.from(source);
407         }
408     }
409
410     /**
411      * A {@link Converter} to convert a {@link Publisher} to {@link Mono}.
412      *
413      * @author Mark Paluch
414      * @author 2.0
415      */

416     private enum PublisherToMonoConverter implements Converter<Publisher<?>, Mono<?>> {
417
418         INSTANCE;
419
420         @Nonnull
421         @Override
422         public Mono<?> convert(Publisher<?> source) {
423             return Mono.from(source);
424         }
425     }
426
427     // -------------------------------------------------------------------------
428     // Coroutine converters
429     // -------------------------------------------------------------------------
430
431     /**
432      * A {@link Converter} to convert a {@link Publisher} to {@link Flow}.
433      *
434      * @author Mark Paluch
435      * @author 2.3
436      */

437     private enum PublisherToFlowConverter implements Converter<Publisher<?>, Flow<?>> {
438
439         INSTANCE;
440
441         @Nonnull
442         @Override
443         public Flow<?> convert(Publisher<?> source) {
444             return ReactiveFlowKt.asFlow(source);
445         }
446     }
447
448     /**
449      * A {@link ConverterFactory} that adapts between reactive types using {@link ReactiveAdapterRegistry}.
450      */

451     private enum ReactiveAdapterConverterFactory implements ConverterFactory<Object, Object>, ConditionalConverter {
452
453         INSTANCE;
454
455         @Override
456         public boolean matches(TypeDescriptor sourceType, TypeDescriptor targetType) {
457             return isSupported(sourceType) || isSupported(targetType);
458         }
459
460         private boolean isSupported(TypeDescriptor typeDescriptor) {
461             return RegistryHolder.REACTIVE_ADAPTER_REGISTRY != null
462                     && RegistryHolder.REACTIVE_ADAPTER_REGISTRY.getAdapter(typeDescriptor.getType()) != null;
463         }
464
465         @Override
466         @SuppressWarnings({ "ConstantConditions""unchecked" })
467         public <T> Converter<Object, T> getConverter(Class<T> targetType) {
468             return source -> {
469
470                 Publisher<?> publisher = source instanceof Publisher ? (Publisher<?>) source
471                         : RegistryHolder.REACTIVE_ADAPTER_REGISTRY.getAdapter(Publisher.class, source).toPublisher(source);
472
473                 ReactiveAdapter adapter = RegistryHolder.REACTIVE_ADAPTER_REGISTRY.getAdapter(targetType);
474
475                 return (T) adapter.fromPublisher(publisher);
476             };
477         }
478     }
479
480     /**
481      * Holder for delayed initialization of {@link ReactiveAdapterRegistry}.
482      *
483      * @author Mark Paluch
484      * @author 2.0
485      */

486     static class RegistryHolder {
487
488         static final @Nullable ReactiveAdapterRegistry REACTIVE_ADAPTER_REGISTRY;
489
490         static {
491
492             if (ReactiveWrappers.isAvailable(ReactiveLibrary.PROJECT_REACTOR)) {
493                 REACTIVE_ADAPTER_REGISTRY = new ReactiveAdapterRegistry();
494             } else {
495                 REACTIVE_ADAPTER_REGISTRY = null;
496             }
497         }
498     }
499 }
500