09bbdc1dc50cc78c89e1900b879360c1d32e1dab
[xmlrouter] / impl / src / main / java / org / wamblee / xmlrouter / impl / XMLRouter.java
1 /*
2  * Copyright 2005-2011 the original author or authors.
3  * 
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  * 
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  * 
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package org.wamblee.xmlrouter.impl;
17
18 import java.util.ArrayList;
19 import java.util.Collection;
20 import java.util.HashSet;
21 import java.util.LinkedHashMap;
22 import java.util.List;
23 import java.util.Map;
24 import java.util.Set;
25 import java.util.concurrent.atomic.AtomicLong;
26 import java.util.logging.Level;
27 import java.util.logging.Logger;
28
29 import javax.xml.transform.dom.DOMSource;
30
31 import org.wamblee.general.Clock;
32 import org.wamblee.general.Pair;
33 import org.wamblee.xml.XMLDocument;
34 import org.wamblee.xmlrouter.common.Id;
35 import org.wamblee.xmlrouter.config.DocumentType;
36 import org.wamblee.xmlrouter.config.Filter;
37 import org.wamblee.xmlrouter.config.Transformation;
38 import org.wamblee.xmlrouter.listener.EventInfo;
39 import org.wamblee.xmlrouter.listener.EventListener;
40 import org.wamblee.xmlrouter.publish.Gateway;
41 import org.wamblee.xmlrouter.subscribe.Destination;
42 import org.wamblee.xmlrouter.subscribe.DestinationRegistry;
43
44 // TODO check intermediate types during transformation based on filters. 
45
46 /**
47  * The XML Router.
48  * 
49  * @author Erik Brakkee
50  * 
51  */
52 public class XMLRouter implements Gateway, DestinationRegistry {
53
54     private static final Logger LOGGER = Logger.getLogger(XMLRouter.class
55         .getName());
56
57     private AtomicLong sequenceNumbers;
58     private EventListener listener;
59     private Clock clock;
60     private AtomicLong nextEventId;
61
62     private XMLRouterConfiguration config;
63
64     private Map<Id<Destination>, Destination> destinations;
65
66     public XMLRouter(Clock aClock, XMLRouterConfiguration aConfig,
67         EventListener aListener) {
68         sequenceNumbers = new AtomicLong(1);
69         listener = aListener;
70         clock = aClock;
71         nextEventId = new AtomicLong(clock.currentTimeMillis());
72         config = aConfig;
73         destinations = new LinkedHashMap<Id<Destination>, Destination>();
74     }
75
76     @Override
77     public void publish(String aSource, DOMSource aEvent) {
78         long time = clock.currentTimeMillis();
79
80         Pair<ExtendedRouterConfig, TransformationPaths> snapshotconfig = config
81             .getConfig();
82
83         Id<DOMSource> id = new Id<DOMSource>(nextEventId.getAndIncrement() + "");
84         List<String> types = determineDocumentTypes(snapshotconfig.getFirst()
85             .documentTypeConfig().values(), aEvent);
86         EventInfo info = new EventInfo(time, aSource, id, types, aEvent);
87
88         boolean delivered = false;
89         try {
90
91             List<String> filteredInputTypes = determineFilteredInputTypes(
92                 snapshotconfig.getFirst().filterConfig().values(), types,
93                 aEvent);
94             if (filteredInputTypes.isEmpty()) {
95                 if (LOGGER.isLoggable(Level.FINE)) {
96                     String doc = new XMLDocument(aEvent).print(true);
97                     LOGGER
98                         .log(
99                             Level.FINE,
100                             "Event ''0}'' from source {1} removed because of filters.",
101                             new Object[] { doc, aSource });
102                 }
103             }
104
105             // get the reachable target types through transformations.
106
107             // It is possible that a given event belongs to multiple input
108             // types.
109             // This is however certainly not the main case.
110
111             for (String inputType : filteredInputTypes) {
112                 boolean result = deliverEvent(snapshotconfig.getFirst()
113                     .filterConfig().values(), snapshotconfig.getSecond(), info,
114                     inputType);
115                 delivered = delivered || result;
116             }
117         } finally {
118             if (!delivered) {
119                 destinationNotFound(aSource, aEvent);
120                 listener.notDelivered(info);
121             }
122         }
123     }
124
125     private boolean deliverEvent(Collection<Filter> aFilters,
126         TransformationPaths aTransformations, EventInfo aInfo, String aInputType) {
127
128         boolean delivered = false;
129         Set<String> possibleTargetTypes = new HashSet<String>();
130         possibleTargetTypes.addAll(aTransformations
131             .getPossibleTargetTypes(aInputType));
132
133         // ask each destination what target types, if any they want to have.
134         for (Map.Entry<Id<Destination>, Destination> entry : destinations
135             .entrySet()) {
136             Id<Destination> destinationId = entry.getKey();
137             Destination destination = entry.getValue();
138             Collection<String> requested = destination
139                 .chooseFromTargetTypes(possibleTargetTypes);
140             if (!requested.isEmpty()) {
141                 // Deliver to the destination.
142                 for (String targetType : requested) {
143                     TransformationPath path = aTransformations.getPath(
144                         aInputType, targetType);
145                     List<Transformation> ts = path.getTransformations();
146                     int i = 0;
147                     boolean allowed = true;
148                     DOMSource transformed = aInfo.getEvent();
149                     while (i < ts.size() && allowed && transformed != null) {
150                         Transformation t = ts.get(i);
151                         DOMSource orig = transformed;
152                         transformed = t.transform(transformed);
153                         if (transformed == null) {
154                             transformationReturnedNull(aInfo.getSource(),
155                                 aInfo.getEvent(), aInputType, t, orig);
156                         }
157
158                         if (!isAllowedByFilters(aFilters, t.getToType(),
159                             transformed)) {
160                             allowed = false;
161                         }
162                         i++;
163                     }
164                     if (allowed && transformed != null) {
165                         // all transformations done and all filters still
166                         // allow the event.
167                         boolean result = destination.receive(transformed);
168                         listener.delivered(aInfo, ts, destinationId.getId(),
169                             result);
170                         delivered = delivered || result;
171
172                     }
173                 }
174             }
175         }
176         return delivered;
177     }
178
179     private List<String> determineFilteredInputTypes(
180         Collection<Filter> aFilters, List<String> aTypes, DOMSource aEvent) {
181
182         // apply filters to the input
183         List<String> filteredTypes = new ArrayList<String>();
184         for (String type : aTypes) {
185             boolean allowed = isAllowedByFilters(aFilters, type, aEvent);
186             if (allowed) {
187                 filteredTypes.add(type);
188             }
189         }
190         return filteredTypes;
191     }
192
193     private boolean isAllowedByFilters(Collection<Filter> aFilters,
194         String aType, DOMSource aEvent) {
195         boolean allowed = true;
196         for (Filter filter : aFilters) {
197             if (!filter.isAllowed(aType, aEvent)) {
198                 allowed = false;
199             }
200         }
201         return allowed;
202     }
203
204     private List<String> determineDocumentTypes(
205         Collection<DocumentType> aTypes, DOMSource aEvent) {
206         List<String> res = new ArrayList<String>();
207         for (DocumentType type : aTypes) {
208             if (type.isInstance(aEvent)) {
209                 res.add(type.getName());
210             }
211         }
212         return res;
213     }
214
215     private String eventToString(String aSource, DOMSource aEvent) {
216         return "source '" + aSource + "': Event: '" +
217             new XMLDocument(aEvent).print(true) + "'";
218     }
219
220     private void transformationReturnedNull(String aSource, DOMSource aEvent,
221         String aInputType, Transformation aT, DOMSource aTransformed) {
222         LOGGER.log(Level.WARNING, "Transformation returned null for event " +
223             eventToString(aSource, aEvent) + " inputType '" + aInputType +
224             "', transformation '" + aT + "' document to transform " +
225             new XMLDocument(aTransformed).print(true));
226     }
227
228     private void destinationNotFound(String aSource, DOMSource aEvent) {
229         LOGGER.log(Level.WARNING, "No destination found for event: " +
230             eventToString(aSource, aEvent));
231     }
232
233     @Override
234     public Id<Destination> registerDestination(Destination aDestination) {
235         notNull("destination", aDestination);
236         long seqno = sequenceNumbers.getAndIncrement();
237         Id<Destination> id = new Id<Destination>(seqno + "");
238         destinations.put(id, new RobustDestination(id, aDestination));
239         return id;
240     }
241
242     @Override
243     public void unregisterDestination(Id<Destination> aId) {
244         destinations.remove(aId);
245     }
246
247     private void notNull(String aName, Object aValue) {
248         if (aValue == null) {
249             throw new IllegalArgumentException("Parameter '" + aName +
250                 "' may not be null");
251         }
252     }
253 }