2 * Copyright 2005-2011 the original author or authors.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
16 package org.wamblee.xmlrouter.impl;
18 import java.util.ArrayList;
19 import java.util.Collection;
20 import java.util.HashSet;
21 import java.util.LinkedHashMap;
22 import java.util.List;
25 import java.util.concurrent.atomic.AtomicLong;
26 import java.util.logging.Level;
27 import java.util.logging.Logger;
29 import javax.xml.transform.dom.DOMSource;
31 import org.wamblee.general.Clock;
32 import org.wamblee.xml.XMLDocument;
33 import org.wamblee.xmlrouter.common.Id;
34 import org.wamblee.xmlrouter.config.Config;
35 import org.wamblee.xmlrouter.config.DocumentType;
36 import org.wamblee.xmlrouter.config.Filter;
37 import org.wamblee.xmlrouter.config.RouterConfig;
38 import org.wamblee.xmlrouter.config.Transformation;
39 import org.wamblee.xmlrouter.listener.EventInfo;
40 import org.wamblee.xmlrouter.listener.EventListener;
41 import org.wamblee.xmlrouter.publish.Gateway;
42 import org.wamblee.xmlrouter.subscribe.Destination;
43 import org.wamblee.xmlrouter.subscribe.DestinationRegistry;
48 * @author Erik Brakkee
51 public class XMLRouter implements RouterConfig, Gateway, DestinationRegistry {
53 private static final Logger LOGGER = Logger.getLogger(XMLRouter.class
56 private AtomicLong sequenceNumbers;
57 private EventListener listener;
59 private AtomicLong nextEventId;
61 private Config<DocumentType> documentTypes;
62 private Transformations transformations;
63 private Config<Filter> filters;
64 private Map<Id<Destination>, Destination> destinations;
66 public XMLRouter(Clock aClock, EventListener aListener) {
67 sequenceNumbers = new AtomicLong(1);
70 nextEventId = new AtomicLong(clock.currentTimeMillis());
71 documentTypes = new ConfigImpl<DocumentType>() {
73 public DocumentType wrap(Id<DocumentType> aId, DocumentType aType) {
74 return new RobustDocumentType(aId, aType);
77 transformations = new Transformations();
78 filters = new ConfigImpl<Filter>() {
80 public Filter wrap(Id<Filter> aId, Filter aFilter) {
81 return new RobustFilter(aId, aFilter);
84 destinations = new LinkedHashMap<Id<Destination>, Destination>();
88 public Config<DocumentType> getDocumentTypeConfig() {
93 public Config<Transformation> getTransformationConfig() {
94 return transformations.getTransformationConfig();
98 public Config<Filter> getFilterConfig() {
103 public void publish(String aSource, DOMSource aEvent) {
105 long time = clock.currentTimeMillis();
106 Id<DOMSource> id = new Id<DOMSource>(nextEventId.getAndIncrement());
107 List<String> types = determineDocumentTypes(aEvent);
108 EventInfo info = new EventInfo(time, aSource, id, types, aEvent);
110 boolean delivered = false;
113 List<String> filteredInputTypes = determineFilteredInputTypes(
115 if (filteredInputTypes.isEmpty()) {
116 if (LOGGER.isLoggable(Level.FINE)) {
117 String doc = new XMLDocument(aEvent).print(true);
121 "Event ''0}'' from source {1} removed because of filters.",
122 new Object[] { doc, aSource });
126 // get the reachable target types through transformations.
128 // It is possible that a given event belongs to multiple input
130 // This is however certainly not the main case.
132 for (String inputType : filteredInputTypes) {
133 boolean result = deliverEvent(info, inputType);
134 delivered = delivered || result;
138 destinationNotFound(aSource, aEvent);
139 listener.notDelivered(info);
144 private boolean deliverEvent(EventInfo aInfo, String aInputType) {
146 boolean delivered = false;
147 Set<String> possibleTargetTypes = new HashSet<String>();
148 possibleTargetTypes.addAll(transformations
149 .getPossibleTargetTypes(aInputType));
151 // ask each destination what target types, if any they want to have.
152 for (Map.Entry<Id<Destination>, Destination> entry : destinations
154 Id<Destination> destinationId = entry.getKey();
155 Destination destination = entry.getValue();
156 Collection<String> requested = destination
157 .chooseFromTargetTypes(possibleTargetTypes);
158 if (!requested.isEmpty()) {
159 // Deliver to the destination.
160 for (String targetType : requested) {
161 TransformationPath path = transformations.getPath(
162 aInputType, targetType);
163 List<Transformation> ts = path.getTransformations();
165 boolean allowed = true;
166 DOMSource transformed = aInfo.getEvent();
167 while (i < ts.size() && allowed && transformed != null) {
168 Transformation t = ts.get(i);
169 DOMSource orig = transformed;
170 transformed = t.transform(transformed);
171 if (transformed == null) {
172 transformationReturnedNull(aInfo.getSource(),
173 aInfo.getEvent(), aInputType, t, orig);
176 if (!isAllowedByFilters(t.getToType(), transformed)) {
181 if (allowed && transformed != null) {
182 // all transformations done and all filters still
184 boolean result = destination.receive(transformed);
185 listener.delivered(aInfo, ts, destinationId.getId(),
186 destination.getName(), result);
187 delivered = delivered || result;
196 private List<String> determineFilteredInputTypes(List<String> aTypes,
199 // apply filters to the input
200 List<String> filteredTypes = new ArrayList<String>();
201 for (String type : aTypes) {
202 boolean allowed = isAllowedByFilters(type, aEvent);
204 filteredTypes.add(type);
207 return filteredTypes;
210 private boolean isAllowedByFilters(String aType, DOMSource aEvent) {
211 boolean allowed = true;
212 for (Id<Filter> id : filters.ids()) {
213 Filter filter = filters.get(id);
214 if (!filter.isAllowed(aType, aEvent)) {
221 private List<String> determineDocumentTypes(DOMSource aEvent) {
222 List<String> res = new ArrayList<String>();
223 for (Id<DocumentType> id : documentTypes.ids()) {
224 DocumentType type = documentTypes.get(id);
225 if (type.isInstance(aEvent)) {
226 res.add(type.getName());
232 private void logEvent(String aMessage, String aSource, DOMSource aEvent,
233 Exception aException) {
234 LOGGER.log(Level.WARNING, aMessage + ": source '" + aSource +
235 "': Event: '" + new XMLDocument(aEvent).print(true) + "'",
239 private void logEvent(String aMessage, String aSource, DOMSource aEvent) {
240 LOGGER.log(Level.WARNING,
241 aMessage + ": " + eventToString(aSource, aEvent));
244 private String eventToString(String aSource, DOMSource aEvent) {
245 return "source '" + aSource + "': Event: '" +
246 new XMLDocument(aEvent).print(true) + "'";
249 private void transformationReturnedNull(String aSource, DOMSource aEvent,
250 String aInputType, Transformation aT, DOMSource aTransformed) {
251 LOGGER.log(Level.WARNING, "Transformation returned null for event " +
252 eventToString(aSource, aEvent) + " inputType '" + aInputType +
253 "', transformation '" + aT + "' document to transform " +
254 new XMLDocument(aTransformed).print(true));
257 private void destinationNotFound(String aSource, DOMSource aEvent) {
258 LOGGER.log(Level.WARNING, "No destination found for event: " +
259 eventToString(aSource, aEvent));
263 public Id<Destination> registerDestination(Destination aDestination) {
264 notNull("destination", aDestination);
265 long seqno = sequenceNumbers.getAndIncrement();
266 Id<Destination> id = new Id<Destination>(seqno);
267 destinations.put(id, new RobustDestination(id, aDestination));
272 public void unregisterDestination(Id<Destination> aId) {
273 destinations.remove(aId);
276 private void notNull(String aName, Object aValue) {
277 if (aValue == null) {
278 throw new IllegalArgumentException("Parameter '" + aName +
279 "' may not be null");