637440fe4dc0b6e0f1ae4d87fda3676817978302
[xmlrouter] / impl / src / main / java / org / wamblee / xmlrouter / impl / XMLRouter.java
1 /*
2  * Copyright 2005-2011 the original author or authors.
3  * 
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  * 
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  * 
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package org.wamblee.xmlrouter.impl;
17
18 import java.util.ArrayList;
19 import java.util.Collection;
20 import java.util.HashSet;
21 import java.util.LinkedHashMap;
22 import java.util.List;
23 import java.util.Map;
24 import java.util.Set;
25 import java.util.concurrent.atomic.AtomicLong;
26 import java.util.logging.Level;
27 import java.util.logging.Logger;
28
29 import javax.xml.transform.dom.DOMSource;
30
31 import org.wamblee.general.Clock;
32 import org.wamblee.xml.XMLDocument;
33 import org.wamblee.xmlrouter.common.Id;
34 import org.wamblee.xmlrouter.config.DocumentType;
35 import org.wamblee.xmlrouter.config.Filter;
36 import org.wamblee.xmlrouter.config.Transformation;
37 import org.wamblee.xmlrouter.listener.EventInfo;
38 import org.wamblee.xmlrouter.listener.EventListener;
39 import org.wamblee.xmlrouter.publish.Gateway;
40 import org.wamblee.xmlrouter.subscribe.Destination;
41 import org.wamblee.xmlrouter.subscribe.DestinationRegistry;
42
43 /**
44  * The XML Router.
45  * 
46  * @author Erik Brakkee
47  * 
48  */
49 public class XMLRouter implements Gateway, DestinationRegistry {
50
51     private static final Logger LOGGER = Logger.getLogger(XMLRouter.class
52         .getName());
53
54     private AtomicLong sequenceNumbers;
55     private EventListener listener;
56     private Clock clock;
57     private AtomicLong nextEventId;
58
59     private XMLRouterConfiguration config;
60
61     private Map<Id<Destination>, Destination> destinations;
62
63     public XMLRouter(Clock aClock, XMLRouterConfiguration aConfig,
64         EventListener aListener) {
65         sequenceNumbers = new AtomicLong(1);
66         listener = aListener;
67         clock = aClock;
68         nextEventId = new AtomicLong(clock.currentTimeMillis());
69         config = aConfig;
70         destinations = new LinkedHashMap<Id<Destination>, Destination>();
71     }
72
73     @Override
74     public void publish(String aSource, DOMSource aEvent) {
75         config.startPublishEvent();
76         try {
77             publishImpl(aSource, aEvent);
78         } finally {
79             config.endPublishEvent();
80         }
81     }
82
83     private void publishImpl(String aSource, DOMSource aEvent) {
84         long time = clock.currentTimeMillis();
85
86         // TODO dirty flag will become unnecessary in the future.
87         if (config.routerConfig().isDirty()) {
88             config.transformations().replaceTransformations(
89                 config.routerConfig().transformationConfig().map());
90             config.routerConfig().resetDirty();
91         }
92
93         Id<DOMSource> id = new Id<DOMSource>(nextEventId.getAndIncrement());
94         List<String> types = determineDocumentTypes(aEvent);
95         EventInfo info = new EventInfo(time, aSource, id, types, aEvent);
96
97         boolean delivered = false;
98         try {
99
100             List<String> filteredInputTypes = determineFilteredInputTypes(
101                 types, aEvent);
102             if (filteredInputTypes.isEmpty()) {
103                 if (LOGGER.isLoggable(Level.FINE)) {
104                     String doc = new XMLDocument(aEvent).print(true);
105                     LOGGER
106                         .log(
107                             Level.FINE,
108                             "Event ''0}'' from source {1} removed because of filters.",
109                             new Object[] { doc, aSource });
110                 }
111             }
112
113             // get the reachable target types through transformations.
114
115             // It is possible that a given event belongs to multiple input
116             // types.
117             // This is however certainly not the main case.
118
119             for (String inputType : filteredInputTypes) {
120                 boolean result = deliverEvent(info, inputType);
121                 delivered = delivered || result;
122             }
123         } finally {
124             if (!delivered) {
125                 destinationNotFound(aSource, aEvent);
126                 listener.notDelivered(info);
127             }
128         }
129     }
130
131     private boolean deliverEvent(EventInfo aInfo, String aInputType) {
132
133         boolean delivered = false;
134         Set<String> possibleTargetTypes = new HashSet<String>();
135         possibleTargetTypes.addAll(config.transformations()
136             .getPossibleTargetTypes(aInputType));
137
138         // ask each destination what target types, if any they want to have.
139         for (Map.Entry<Id<Destination>, Destination> entry : destinations
140             .entrySet()) {
141             Id<Destination> destinationId = entry.getKey();
142             Destination destination = entry.getValue();
143             Collection<String> requested = destination
144                 .chooseFromTargetTypes(possibleTargetTypes);
145             if (!requested.isEmpty()) {
146                 // Deliver to the destination.
147                 for (String targetType : requested) {
148                     TransformationPath path = config.transformations().getPath(
149                         aInputType, targetType);
150                     List<Transformation> ts = path.getTransformations();
151                     int i = 0;
152                     boolean allowed = true;
153                     DOMSource transformed = aInfo.getEvent();
154                     while (i < ts.size() && allowed && transformed != null) {
155                         Transformation t = ts.get(i);
156                         DOMSource orig = transformed;
157                         transformed = t.transform(transformed);
158                         if (transformed == null) {
159                             transformationReturnedNull(aInfo.getSource(),
160                                 aInfo.getEvent(), aInputType, t, orig);
161                         }
162
163                         if (!isAllowedByFilters(t.getToType(), transformed)) {
164                             allowed = false;
165                         }
166                         i++;
167                     }
168                     if (allowed && transformed != null) {
169                         // all transformations done and all filters still
170                         // allow the event.
171                         boolean result = destination.receive(transformed);
172                         listener.delivered(aInfo, ts, destinationId.getId(),
173                             destination.getName(), result);
174                         delivered = delivered || result;
175
176                     }
177                 }
178             }
179         }
180         return delivered;
181     }
182
183     private List<String> determineFilteredInputTypes(List<String> aTypes,
184         DOMSource aEvent) {
185
186         // apply filters to the input
187         List<String> filteredTypes = new ArrayList<String>();
188         for (String type : aTypes) {
189             boolean allowed = isAllowedByFilters(type, aEvent);
190             if (allowed) {
191                 filteredTypes.add(type);
192             }
193         }
194         return filteredTypes;
195     }
196
197     private boolean isAllowedByFilters(String aType, DOMSource aEvent) {
198         boolean allowed = true;
199         for (Filter filter : config.routerConfig().filterConfig().map()
200             .values()) {
201             if (!filter.isAllowed(aType, aEvent)) {
202                 allowed = false;
203             }
204         }
205         return allowed;
206     }
207
208     private List<String> determineDocumentTypes(DOMSource aEvent) {
209         List<String> res = new ArrayList<String>();
210         for (DocumentType type : config.routerConfig().documentTypeConfig()
211             .map().values()) {
212             if (type.isInstance(aEvent)) {
213                 res.add(type.getName());
214             }
215         }
216         return res;
217     }
218
219     private void logEvent(String aMessage, String aSource, DOMSource aEvent) {
220         LOGGER.log(Level.WARNING,
221             aMessage + ": " + eventToString(aSource, aEvent));
222     }
223
224     private String eventToString(String aSource, DOMSource aEvent) {
225         return "source '" + aSource + "': Event: '" +
226             new XMLDocument(aEvent).print(true) + "'";
227     }
228
229     private void transformationReturnedNull(String aSource, DOMSource aEvent,
230         String aInputType, Transformation aT, DOMSource aTransformed) {
231         LOGGER.log(Level.WARNING, "Transformation returned null for event " +
232             eventToString(aSource, aEvent) + " inputType '" + aInputType +
233             "', transformation '" + aT + "' document to transform " +
234             new XMLDocument(aTransformed).print(true));
235     }
236
237     private void destinationNotFound(String aSource, DOMSource aEvent) {
238         LOGGER.log(Level.WARNING, "No destination found for event: " +
239             eventToString(aSource, aEvent));
240     }
241
242     @Override
243     public Id<Destination> registerDestination(Destination aDestination) {
244         notNull("destination", aDestination);
245         long seqno = sequenceNumbers.getAndIncrement();
246         Id<Destination> id = new Id<Destination>(seqno);
247         destinations.put(id, new RobustDestination(id, aDestination));
248         return id;
249     }
250
251     @Override
252     public void unregisterDestination(Id<Destination> aId) {
253         destinations.remove(aId);
254     }
255
256     private void notNull(String aName, Object aValue) {
257         if (aValue == null) {
258             throw new IllegalArgumentException("Parameter '" + aName +
259                 "' may not be null");
260         }
261     }
262 }