e86593d274642c3a18500a899e4a9d4b78255103
[xmlrouter] / impl / src / main / java / org / wamblee / xmlrouter / impl / XMLRouter.java
1 /*
2  * Copyright 2005-2011 the original author or authors.
3  * 
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  * 
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  * 
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package org.wamblee.xmlrouter.impl;
17
18 import java.util.ArrayList;
19 import java.util.Collection;
20 import java.util.Collections;
21 import java.util.HashSet;
22 import java.util.LinkedHashMap;
23 import java.util.List;
24 import java.util.Map;
25 import java.util.Set;
26 import java.util.concurrent.atomic.AtomicLong;
27 import java.util.logging.Level;
28 import java.util.logging.Logger;
29
30 import javax.xml.transform.dom.DOMSource;
31
32 import org.wamblee.general.Clock;
33 import org.wamblee.xml.XMLDocument;
34 import org.wamblee.xmlrouter.common.Id;
35 import org.wamblee.xmlrouter.config.DocumentType;
36 import org.wamblee.xmlrouter.config.Filter;
37 import org.wamblee.xmlrouter.config.RouterConfig;
38 import org.wamblee.xmlrouter.config.Transformation;
39 import org.wamblee.xmlrouter.listener.EventInfo;
40 import org.wamblee.xmlrouter.listener.EventListener;
41 import org.wamblee.xmlrouter.publish.Gateway;
42 import org.wamblee.xmlrouter.subscribe.Destination;
43 import org.wamblee.xmlrouter.subscribe.DestinationRegistry;
44
45 // TODO concurrency.
46
47 public class XMLRouter implements RouterConfig, Gateway, DestinationRegistry {
48
49     private static final Logger LOGGER = Logger.getLogger(XMLRouter.class
50         .getName());
51
52     private EventListener listener;
53     private Clock clock;
54     private AtomicLong nextEventId;
55     private AtomicLong sequenceNumbers;
56     private Map<Id<DocumentType>, DocumentType> documentTypes;
57     private Transformations transformations;
58     private Map<Id<Filter>, Filter> filters;
59     private Map<Id<Destination>, Destination> destinations;
60
61     public XMLRouter(Clock aClock, EventListener aListener) {
62         listener = aListener;
63         clock = aClock;
64         nextEventId = new AtomicLong(clock.currentTimeMillis());
65         sequenceNumbers = new AtomicLong(1);
66         documentTypes = new LinkedHashMap<Id<DocumentType>, DocumentType>();
67         transformations = new Transformations();
68         filters = new LinkedHashMap<Id<Filter>, Filter>();
69         destinations = new LinkedHashMap<Id<Destination>, Destination>();
70     }
71
72     @Override
73     public Id<DocumentType> addDocumentType(DocumentType aType) {
74         long seqno = sequenceNumbers.getAndIncrement();
75         documentTypes.put(new Id<DocumentType>(seqno), aType);
76         return new Id<DocumentType>(seqno);
77     }
78
79     @Override
80     public void removeDocumentType(Id<DocumentType> aId) {
81         documentTypes.remove(aId);
82     }
83
84     @Override
85     public Collection<Id<DocumentType>> getDocumentTypes() {
86         return Collections.unmodifiableCollection(documentTypes.keySet());
87     }
88
89     @Override
90     public DocumentType getDocumentType(Id<DocumentType> aId) {
91         return documentTypes.get(aId);
92     }
93
94     @Override
95     public Id<Transformation> addTransformation(Transformation aTransformation) {
96         return transformations.addTransformation(aTransformation);
97     }
98
99     @Override
100     public void removeTransformation(Id<Transformation> aId) {
101         transformations.removeTransformation(aId);
102     }
103
104     @Override
105     public Collection<Id<Transformation>> getTransformations() {
106         return transformations.getTransformations();
107     }
108
109     @Override
110     public Transformation getTransformation(Id<Transformation> aId) {
111         return transformations.getTransformation(aId);
112     }
113
114     @Override
115     public Id<Filter> addFilter(Filter aFilter) {
116         long seqno = sequenceNumbers.getAndIncrement();
117         filters.put(new Id<Filter>(seqno), aFilter);
118         return new Id<Filter>(seqno);
119     }
120
121     @Override
122     public void removeFilter(Id<Filter> aId) {
123         filters.remove(aId);
124     }
125
126     @Override
127     public Collection<Id<Filter>> getFilters() {
128         return Collections.unmodifiableCollection(filters.keySet());
129     }
130
131     @Override
132     public Filter getFilter(Id<Filter> aId) {
133         return filters.get(aId);
134     }
135
136     @Override
137     public void publish(String aSource, DOMSource aEvent) {
138
139         long time = clock.currentTimeMillis();
140         Id<DOMSource> id = new Id<DOMSource>(nextEventId.getAndIncrement());
141         List<String> types = determineDocumentTypes(aEvent);
142         EventInfo info = new EventInfo(time, aSource, id, types, aEvent);
143
144         boolean delivered = false;
145         try {
146
147             List<String> filteredInputTypes = determineFilteredInputTypes(
148                 types, aEvent);
149             if (filteredInputTypes.isEmpty()) {
150                 if (LOGGER.isLoggable(Level.FINE)) {
151                     String doc = new XMLDocument(aEvent).print(true);
152                     LOGGER
153                         .log(
154                             Level.FINE,
155                             "Event ''0}'' from source {1} removed because of filters.",
156                             new Object[] { doc, aSource });
157                 }
158             }
159
160             // get the reachable target types through transformations.
161
162             // It is possible that a given event belongs to multiple input
163             // types.
164             // This is however certainly not the main case.
165
166             for (String inputType : filteredInputTypes) {
167                 boolean result = deliverEvent(info, inputType);
168                 delivered = delivered || result;
169             }
170         } finally {
171             if (!delivered) {
172                 destinationNotFound(aSource, aEvent);
173                 listener.notDelivered(info);
174             }
175         }
176     }
177
178     private boolean deliverEvent(EventInfo aInfo, String aInputType) {
179
180         boolean delivered = false;
181         Set<String> possibleTargetTypes = new HashSet<String>();
182         possibleTargetTypes.addAll(transformations
183             .getPossibleTargetTypes(aInputType));
184
185         // ask each destination what target types, if any they want to have.
186         for (Map.Entry<Id<Destination>, Destination> entry : destinations
187             .entrySet()) {
188             Id<Destination> destinationId = entry.getKey();
189             Destination destination = entry.getValue();
190             Collection<String> requested = destination
191                 .chooseFromTargetTypes(possibleTargetTypes);
192             if (!requested.isEmpty()) {
193                 // Deliver to the destination.
194                 for (String targetType : requested) {
195                     TransformationPath path = transformations.getPath(
196                         aInputType, targetType);
197                     List<Transformation> ts = path.getTransformations();
198                     int i = 0;
199                     boolean allowed = true;
200                     DOMSource transformed = aInfo.getEvent();
201                     while (i < ts.size() && allowed && transformed != null) {
202                         Transformation t = ts.get(i);
203                         DOMSource orig = transformed;
204                         transformed = t.transform(transformed);
205                         if (transformed == null) {
206                             transformationReturnedNull(aInfo.getSource(),
207                                 aInfo.getEvent(), aInputType, t, orig);
208                         }
209
210                         if (!isAllowedByFilters(t.getToType(), transformed)) {
211                             allowed = false;
212                         }
213                         i++;
214                     }
215                     if (allowed && transformed != null) {
216                         // all transformations done and all filters still
217                         // allow the event.
218                         boolean result = destination.receive(transformed);
219                         listener.delivered(aInfo, ts, destinationId.getId(),
220                             destination.getName(), result);
221                         delivered = delivered || result;
222
223                     }
224                 }
225             }
226         }
227         return delivered;
228     }
229
230     private List<String> determineFilteredInputTypes(List<String> aTypes,
231         DOMSource aEvent) {
232
233         // apply filters to the input
234         List<String> filteredTypes = new ArrayList<String>();
235         for (String type : aTypes) {
236             boolean allowed = isAllowedByFilters(type, aEvent);
237             if (allowed) {
238                 filteredTypes.add(type);
239             }
240         }
241         return filteredTypes;
242     }
243
244     private boolean isAllowedByFilters(String aType, DOMSource aEvent) {
245         boolean allowed = true;
246         for (Filter filter : filters.values()) {
247             if (!filter.isAllowed(aType, aEvent)) {
248                 allowed = false;
249             }
250         }
251         return allowed;
252     }
253
254     private List<String> determineDocumentTypes(DOMSource aEvent) {
255         List<String> res = new ArrayList<String>();
256         for (DocumentType type : documentTypes.values()) {
257             if (type.isInstance(aEvent)) {
258                 res.add(type.getName());
259             }
260         }
261         return res;
262     }
263
264     private void logEvent(String aMessage, String aSource, DOMSource aEvent,
265         Exception aException) {
266         LOGGER.log(Level.WARNING, aMessage + ": source '" + aSource +
267             "': Event: '" + new XMLDocument(aEvent).print(true) + "'",
268             aException);
269     }
270
271     private void logEvent(String aMessage, String aSource, DOMSource aEvent) {
272         LOGGER.log(Level.WARNING,
273             aMessage + ": " + eventToString(aSource, aEvent));
274     }
275
276     private String eventToString(String aSource, DOMSource aEvent) {
277         return "source '" + aSource + "': Event: '" +
278             new XMLDocument(aEvent).print(true) + "'";
279     }
280
281     private void transformationReturnedNull(String aSource, DOMSource aEvent,
282         String aInputType, Transformation aT, DOMSource aTransformed) {
283         LOGGER.log(Level.WARNING, "Transformation returned null for event " +
284             eventToString(aSource, aEvent) + " inputType '" + aInputType +
285             "', transformation '" + aT + "' document to transform " +
286             new XMLDocument(aTransformed).print(true));
287     }
288
289     private void destinationNotFound(String aSource, DOMSource aEvent) {
290         LOGGER.log(Level.WARNING, "No destination found for event: " +
291             eventToString(aSource, aEvent));
292     }
293
294     @Override
295     public Id<Destination> registerDestination(Destination aDestination) {
296         notNull("destination", aDestination);
297         long seqno = sequenceNumbers.getAndIncrement();
298         Id<Destination> id = new Id<Destination>(seqno);
299         destinations.put(id, new RobustDestination(id, aDestination));
300         return id;
301     }
302
303     @Override
304     public void unregisterDestination(Id<Destination> aId) {
305         destinations.remove(aId);
306     }
307
308     private void notNull(String aName, Object aValue) {
309         if (aValue == null) {
310             throw new IllegalArgumentException("Parameter '" + aName +
311                 "' may not be null");
312         }
313     }
314 }