/*
 * Copyright 2005-2011 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16 package org.wamblee.xmlrouter.impl;
18 import java.util.ArrayList;
19 import java.util.Collection;
20 import java.util.HashSet;
21 import java.util.LinkedHashMap;
22 import java.util.List;
25 import java.util.concurrent.atomic.AtomicLong;
26 import java.util.logging.Level;
27 import java.util.logging.Logger;
29 import javax.xml.transform.dom.DOMSource;
31 import org.wamblee.general.Clock;
32 import org.wamblee.general.Pair;
33 import org.wamblee.xml.XMLDocument;
34 import org.wamblee.xmlrouter.common.Id;
35 import org.wamblee.xmlrouter.config.DocumentType;
36 import org.wamblee.xmlrouter.config.Filter;
37 import org.wamblee.xmlrouter.config.Transformation;
38 import org.wamblee.xmlrouter.listener.EventInfo;
39 import org.wamblee.xmlrouter.listener.EventListener;
40 import org.wamblee.xmlrouter.publish.Gateway;
41 import org.wamblee.xmlrouter.subscribe.Destination;
42 import org.wamblee.xmlrouter.subscribe.DestinationRegistry;
// TODO: check intermediate types during transformation based on filters.

/**
 * @author Erik Brakkee
 */
// Router class; it implements both the Gateway and the DestinationRegistry
// interfaces.
// NOTE(review): this text comes from a numbered listing. Every line starts
// with its listing number and some numbers are skipped (e.g. 55-56, 59), so
// the logger initializer below is not closed and no 'clock' field is declared
// here although 'clock' is read by the constructor and by publish().
// Restore the missing lines from the original file.
52 public class XMLRouter implements Gateway, DestinationRegistry {
// java.util.logging logger shared by all instances.
54 private static final Logger LOGGER = Logger.getLogger(XMLRouter.class
// Counter from which registerDestination() derives destination ids.
57 private AtomicLong sequenceNumbers;
// Receives the delivered(...) and notDelivered(...) notifications.
58 private EventListener listener;
// Counter from which publish() derives event ids.
60 private AtomicLong nextEventId;
// Configuration object that publish() takes its configuration snapshot from.
62 private XMLRouterConfiguration config;
// Registered destinations keyed by their id.
64 private Map<Id<Destination>, Destination> destinations;
/**
 * Constructs the router.
 *
 * The visible statements start the destination sequence numbers at 1, seed
 * the event id counter with the clock's current time in milliseconds and
 * create an empty, insertion-ordered destination map.
 *
 * NOTE(review): listing lines 69, 70 and 72 are absent here; presumably they
 * store the three arguments in the clock, config and listener fields.
 * Confirm against the original file.
 *
 * @param aClock Clock; presumably the one later read through the clock field.
 * @param aConfig Router configuration.
 * @param aListener Event listener.
 */
66 public XMLRouter(Clock aClock, XMLRouterConfiguration aConfig,
67 EventListener aListener) {
68 sequenceNumbers = new AtomicLong(1);
// Event ids start at the current time in milliseconds
// (presumably so that ids differ between restarts; TODO confirm).
71 nextEventId = new AtomicLong(clock.currentTimeMillis());
73 destinations = new LinkedHashMap<Id<Destination>, Destination>();
/**
 * Publishes an event: determines the document types of the event, reduces
 * them to the types that pass the configured filters and attempts delivery
 * for every remaining type.
 *
 * NOTE(review): several lines of this method are absent from the listing
 * (the numbers skip, e.g. 81-82, 93, 97-99, 114, 116-118). The call that
 * obtains the configuration snapshot, the head of the logging call and the
 * guard around the "not delivered" handling are therefore not visible.
 *
 * @param aSource Source of the event; stored in the event info and used in
 *                log messages.
 * @param aEvent Event document to route.
 */
77 public void publish(String aSource, DOMSource aEvent) {
// Timestamp that is stored in the EventInfo below.
78 long time = clock.currentTimeMillis();
// Configuration snapshot as a pair: the router configuration (first) and the
// transformation paths (second). The method called on config is on a
// missing line.
80 Pair<ExtendedRouterConfig, TransformationPaths> snapshotconfig = config
// The event id is the string form of an incrementing counter.
83 Id<DOMSource> id = new Id<DOMSource>(nextEventId.getAndIncrement() + "");
// Names of the configured document types the event is an instance of.
84 List<String> types = determineDocumentTypes(snapshotconfig.getFirst()
85 .documentTypeConfig().values(), aEvent);
86 EventInfo info = new EventInfo(time, aSource, id, types, aEvent);
88 boolean delivered = false;
// Document types that remain after applying the configured filters.
91 List<String> filteredInputTypes = determineFilteredInputTypes(
92 snapshotconfig.getFirst().filterConfig().values(), types,
// No type passed the filters: report the dropped event, but only when FINE
// logging is enabled because the document has to be printed for it.
94 if (filteredInputTypes.isEmpty()) {
95 if (LOGGER.isLoggable(Level.FINE)) {
96 String doc = new XMLDocument(aEvent).print(true);
// NOTE(review): the pattern below contains ''0}'' where ''{0}'' would be
// expected; as written the document text would not be substituted into the
// message. Confirm against the original file and fix if it is a typo.
100 "Event ''0}'' from source {1} removed because of filters.",
101 new Object[] { doc, aSource });
105 // get the reachable target types through transformations.
107 // It is possible that a given event belongs to multiple input
109 // This is however certainly not the main case.
// Attempt delivery once per remaining input type; delivered stays true as
// soon as one attempt reports success.
111 for (String inputType : filteredInputTypes) {
112 boolean result = deliverEvent(snapshotconfig.getFirst()
113 .filterConfig().values(), snapshotconfig.getSecond(), info,
115 delivered = delivered || result;
// Log a warning and notify the listener that the event was not delivered.
// NOTE(review): the condition guarding these two calls is on a missing line;
// presumably "if (!delivered)". Confirm.
119 destinationNotFound(aSource, aEvent);
120 listener.notDelivered(info);
/**
 * Attempts to deliver one event, treated as the given input type, to the
 * registered destinations.
 *
 * Every destination is offered the target types that are reachable from the
 * input type. For each type it requests, the transformation path from the
 * input type to that type is applied step by step and the result is handed
 * to the destination.
 *
 * NOTE(review): lines are absent from the listing (the numbers skip at 135,
 * 146, 156-157, 159-163, 166, 169 and 171-177). The declaration of the loop
 * counter i, the body of the filter check, the end of the while loop and the
 * return statement are not visible, so this block is left as found.
 *
 * @param aFilters Filters passed to isAllowedByFilters after a
 *                 transformation step.
 * @param aTransformations Source of the possible target types and of the
 *                         transformation path per target type.
 * @param aInfo Event info; supplies the event document and its source.
 * @param aInputType Document type the event is treated as.
 * @return presumably the delivered flag, i.e. whether at least one
 *         destination reported receipt; the return statement is on a
 *         missing line, confirm.
 */
125 private boolean deliverEvent(Collection<Filter> aFilters,
126 TransformationPaths aTransformations, EventInfo aInfo, String aInputType) {
128 boolean delivered = false;
// Copy of the target types that can be reached from the input type.
129 Set<String> possibleTargetTypes = new HashSet<String>();
130 possibleTargetTypes.addAll(aTransformations
131 .getPossibleTargetTypes(aInputType));
133 // ask each destination what target types, if any they want to have.
134 for (Map.Entry<Id<Destination>, Destination> entry : destinations
136 Id<Destination> destinationId = entry.getKey();
137 Destination destination = entry.getValue();
138 Collection<String> requested = destination
139 .chooseFromTargetTypes(possibleTargetTypes);
140 if (!requested.isEmpty()) {
141 // Deliver to the destination.
142 for (String targetType : requested) {
// Chain of transformations leading from the input type to the requested type.
143 TransformationPath path = aTransformations.getPath(
144 aInputType, targetType);
145 List<Transformation> ts = path.getTransformations();
147 boolean allowed = true;
// Start from the original event document.
148 DOMSource transformed = aInfo.getEvent();
// Apply the transformations in order; stop early when a step returns null or
// allowed has been cleared.
149 while (i < ts.size() && allowed && transformed != null) {
150 Transformation t = ts.get(i);
// Keep the input of this step so it can be logged if the step returns null.
151 DOMSource orig = transformed;
152 transformed = t.transform(transformed);
153 if (transformed == null) {
154 transformationReturnedNull(aInfo.getSource(),
155 aInfo.getEvent(), aInputType, t, orig);
// The filters are consulted with the target type of the transformation that
// was just applied (remaining arguments and branch body are on missing lines).
158 if (!isAllowedByFilters(aFilters, t.getToType(),
164 if (allowed && transformed != null) {
165 // all transformations done and all filters still
// Hand the transformed document to the destination and report the outcome
// to the listener.
167 boolean result = destination.receive(transformed);
168 listener.delivered(aInfo, ts, destinationId.getId(),
170 delivered = delivered || result;
179 private List<String> determineFilteredInputTypes(
180 Collection<Filter> aFilters, List<String> aTypes, DOMSource aEvent) {
182 // apply filters to the input
183 List<String> filteredTypes = new ArrayList<String>();
184 for (String type : aTypes) {
185 boolean allowed = isAllowedByFilters(aFilters, type, aEvent);
187 filteredTypes.add(type);
190 return filteredTypes;
193 private boolean isAllowedByFilters(Collection<Filter> aFilters,
194 String aType, DOMSource aEvent) {
195 boolean allowed = true;
196 for (Filter filter : aFilters) {
197 if (!filter.isAllowed(aType, aEvent)) {
204 private List<String> determineDocumentTypes(
205 Collection<DocumentType> aTypes, DOMSource aEvent) {
206 List<String> res = new ArrayList<String>();
207 for (DocumentType type : aTypes) {
208 if (type.isInstance(aEvent)) {
209 res.add(type.getName());
215 private String eventToString(String aSource, DOMSource aEvent) {
216 return "source '" + aSource + "': Event: '" +
217 new XMLDocument(aEvent).print(true) + "'";
220 private void transformationReturnedNull(String aSource, DOMSource aEvent,
221 String aInputType, Transformation aT, DOMSource aTransformed) {
222 LOGGER.log(Level.WARNING, "Transformation returned null for event " +
223 eventToString(aSource, aEvent) + " inputType '" + aInputType +
224 "', transformation '" + aT + "' document to transform " +
225 new XMLDocument(aTransformed).print(true));
228 private void destinationNotFound(String aSource, DOMSource aEvent) {
229 LOGGER.log(Level.WARNING, "No destination found for event: " +
230 eventToString(aSource, aEvent));
234 public Id<Destination> registerDestination(Destination aDestination) {
235 notNull("destination", aDestination);
236 long seqno = sequenceNumbers.getAndIncrement();
237 Id<Destination> id = new Id<Destination>(seqno + "");
238 destinations.put(id, new RobustDestination(id, aDestination));
243 public void unregisterDestination(Id<Destination> aId) {
244 destinations.remove(aId);
247 private void notNull(String aName, Object aValue) {
248 if (aValue == null) {
249 throw new IllegalArgumentException("Parameter '" + aName +
250 "' may not be null");