1
16
17 package org.springframework.cloud.aws.messaging.support.converter;
18
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.UUID;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

import org.springframework.cloud.aws.messaging.core.MessageAttributeDataTypes;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.converter.MessageConversionException;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.util.Assert;
import org.springframework.util.MimeType;
import org.springframework.util.NumberUtils;
38
39
44 public class NotificationRequestConverter implements MessageConverter {
45
46 private final ObjectMapper jsonMapper = new ObjectMapper();
47
48 private final MessageConverter payloadConverter;
49
50 public NotificationRequestConverter(MessageConverter payloadConverter) {
51 this.payloadConverter = payloadConverter;
52 }
53
54 private static Map<String, Object> getMessageAttributesAsMessageHeaders(
55 JsonNode message) {
56 Map<String, Object> messageHeaders = new HashMap<>();
57 Iterator<String> fieldNames = message.fieldNames();
58 while (fieldNames.hasNext()) {
59 String attributeName = fieldNames.next();
60 String attributeValue = message.get(attributeName).get("Value").asText();
61 String attributeType = message.get(attributeName).get("Type").asText();
62 if (MessageHeaders.CONTENT_TYPE.equals(attributeName)) {
63 messageHeaders.put(MessageHeaders.CONTENT_TYPE,
64 MimeType.valueOf(attributeValue));
65 }
66 else if (MessageHeaders.ID.equals(attributeName)) {
67 messageHeaders.put(MessageHeaders.ID, UUID.fromString(attributeValue));
68 }
69 else {
70 if (MessageAttributeDataTypes.STRING.equals(attributeType)) {
71 messageHeaders.put(attributeName, attributeValue);
72 }
73 else if (attributeType.startsWith(MessageAttributeDataTypes.NUMBER)) {
74 Object numberValue = getNumberValue(attributeType, attributeValue);
75 if (numberValue != null) {
76 messageHeaders.put(attributeName, numberValue);
77 }
78 }
79 else if (MessageAttributeDataTypes.BINARY.equals(attributeName)) {
80 messageHeaders.put(attributeName,
81 ByteBuffer.wrap(attributeType.getBytes()));
82 }
83 }
84 }
85
86 return messageHeaders;
87 }
88
89 private static Object getNumberValue(String attributeType, String attributeValue) {
90 String numberType = attributeType
91 .substring(MessageAttributeDataTypes.NUMBER.length() + 1);
92 try {
93 Class<? extends Number> numberTypeClass = Class.forName(numberType)
94 .asSubclass(Number.class);
95 return NumberUtils.parseNumber(attributeValue, numberTypeClass);
96 }
97 catch (ClassNotFoundException e) {
98 throw new MessagingException(String.format(
99 "Message attribute with value '%s' and data type '%s' could not be converted "
100 + "into a Number because target class was not found.",
101 attributeValue, attributeType), e);
102 }
103 }
104
105 @Override
106 public Object fromMessage(Message<?> message, Class<?> targetClass) {
107 Assert.notNull(message, "message must not be null");
108 Assert.notNull(targetClass, "target class must not be null");
109
110 JsonNode jsonNode;
111 try {
112 jsonNode = this.jsonMapper.readTree(message.getPayload().toString());
113 }
114 catch (Exception e) {
115 throw new MessageConversionException("Could not read JSON", e);
116 }
117 if (!jsonNode.has("Type")) {
118 throw new MessageConversionException("Payload: '" + message.getPayload()
119 + "' does not contain a Type attribute", null);
120 }
121
122 if (!"Notification".equals(jsonNode.get("Type").asText())) {
123 throw new MessageConversionException(
124 "Payload: '" + message.getPayload() + "' is not a valid notification",
125 null);
126 }
127
128 if (!jsonNode.has("Message")) {
129 throw new MessageConversionException(
130 "Payload: '" + message.getPayload() + "' does not contain a message",
131 null);
132 }
133
134 String messagePayload = jsonNode.get("Message").asText();
135 GenericMessage<String> genericMessage = new GenericMessage<>(messagePayload,
136 getMessageAttributesAsMessageHeaders(jsonNode.path("MessageAttributes")));
137 return new NotificationRequest(jsonNode.path("Subject").asText(),
138 this.payloadConverter.fromMessage(genericMessage, targetClass));
139 }
140
141 @Override
142 public Message<?> toMessage(Object payload, MessageHeaders headers) {
143 throw new UnsupportedOperationException(
144 "This converter only supports reading a SNS notification and not writing them");
145 }
146
147
150 public static class NotificationRequest {
151
152 private final String subject;
153
154 private final Object message;
155
156 public NotificationRequest(String subject, Object message) {
157 this.subject = subject;
158 this.message = message;
159 }
160
161 public String getSubject() {
162 return this.subject;
163 }
164
165 public Object getMessage() {
166 return this.message;
167 }
168
169 }
170
171 }
172