2 * Copyright 2005-2011 the original author or authors.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
16 package org.wamblee.xmlrouter.impl;
18 import java.util.ArrayList;
19 import java.util.Collection;
20 import java.util.Collections;
21 import java.util.HashSet;
22 import java.util.LinkedHashMap;
23 import java.util.List;
26 import java.util.concurrent.atomic.AtomicLong;
27 import java.util.logging.Level;
28 import java.util.logging.Logger;
30 import javax.xml.transform.dom.DOMSource;
32 import org.wamblee.general.Clock;
33 import org.wamblee.xml.XMLDocument;
34 import org.wamblee.xmlrouter.common.Id;
35 import org.wamblee.xmlrouter.config.Config;
36 import org.wamblee.xmlrouter.config.DocumentType;
37 import org.wamblee.xmlrouter.config.Filter;
38 import org.wamblee.xmlrouter.config.Transformation;
39 import org.wamblee.xmlrouter.listener.EventInfo;
40 import org.wamblee.xmlrouter.listener.EventListener;
41 import org.wamblee.xmlrouter.publish.Gateway;
42 import org.wamblee.xmlrouter.subscribe.Destination;
43 import org.wamblee.xmlrouter.subscribe.DestinationRegistry;
47 public class XMLRouter implements Config, Gateway, DestinationRegistry {
49 private static final Logger LOGGER = Logger.getLogger(XMLRouter.class
// Notified about the outcome of routing (delivered / notDelivered calls).
52 private EventListener listener;
// Source of event ids; seeded from the clock's current time in the constructor
// and incremented once per published event.
54 private AtomicLong nextEventId;
// Shared counter, starting at 1, that supplies the numeric ids for document
// types, filters and destinations.
55 private AtomicLong sequenceNumbers;
// Registered document types keyed by sequence number (insertion-ordered map).
56 private Map<Long, DocumentType> documentTypes;
// Registered transformations; also queried for reachable target types and
// transformation paths during delivery.
57 private Transformations transformations;
// Registered filters keyed by sequence number (insertion-ordered map).
58 private Map<Long, Filter> filters;
// Registered destinations keyed by sequence number (insertion-ordered map).
59 private Map<Long, Destination> destinations;
/**
 * Creates a router with empty document type, transformation, filter and
 * destination registries.
 *
 * NOTE(review): the statements that store aClock and aListener in fields are
 * not visible in this excerpt. The body reads the clock field when seeding
 * nextEventId, so that field presumably has to be assigned from aClock before
 * that line -- confirm.
 *
 * @param aClock
 *            Clock; presumably the source of the clock field used for time
 *            stamps (assignment not visible here).
 * @param aListener
 *            Event listener; presumably stored in the listener field
 *            (assignment not visible here).
 */
61 public XMLRouter(Clock aClock, EventListener aListener) {
// Event ids start at the current clock time in milliseconds.
64 nextEventId = new AtomicLong(clock.currentTimeMillis());
// Ids for document types, filters and destinations start at 1.
65 sequenceNumbers = new AtomicLong(1);
// LinkedHashMap keeps the registries in registration order.
66 documentTypes = new LinkedHashMap<Long, DocumentType>();
67 transformations = new Transformations();
68 filters = new LinkedHashMap<Long, Filter>();
69 destinations = new LinkedHashMap<Long, Destination>();
73 public Id<DocumentType> addDocumentType(DocumentType aType) {
74 long seqno = sequenceNumbers.getAndIncrement();
75 documentTypes.put(seqno, aType);
76 return new Id<DocumentType>(seqno);
80 public void removeDocumentType(Id<DocumentType> aId) {
81 documentTypes.remove(aId);
85 public Collection<DocumentType> getDocumentTypes() {
86 return Collections.unmodifiableCollection(documentTypes.values());
90 public Id<Transformation> addTransformation(Transformation aTransformation) {
91 return transformations.addTransformation(aTransformation);
95 public void removeTransformation(Id<Transformation> aId) {
96 transformations.removeTransformation(aId);
100 public Collection<Transformation> getTransformations() {
101 return transformations.getTransformations();
105 public Id<Filter> addFilter(Filter aFilter) {
106 long seqno = sequenceNumbers.getAndIncrement();
107 filters.put(seqno, aFilter);
108 return new Id<Filter>(seqno);
/**
 * Presumably removes the filter registered under the given id; the method
 * body is not visible in this excerpt.
 *
 * NOTE(review): the filters map is keyed by Long, so the removal needs the
 * numeric value (aId.getId(), as unregisterDestination uses) rather than the
 * Id object itself -- verify the body.
 *
 * @param aId
 *            Id of the filter, as returned by addFilter.
 */
112 public void removeFilter(Id<Filter> aId) {
117 public Collection<Filter> getFilters() {
118 return Collections.unmodifiableCollection(filters.values());
/**
 * Publishes an event on behalf of a source.
 *
 * The event is stamped with the current clock time and the next event id, its
 * document types are determined, and an EventInfo describing it is created.
 * Delivery is then attempted once for every input type that remains after
 * filtering. destinationNotFound and listener.notDelivered are invoked at the
 * end -- presumably only when nothing was delivered; the guarding condition
 * is not visible in this excerpt.
 *
 * @param aSource
 *            Source of the event; passed into the EventInfo and used in log
 *            output.
 * @param aEvent
 *            Event document to route.
 */
122 public void publish(String aSource, DOMSource aEvent) {
124 long time = clock.currentTimeMillis();
125 Id<DOMSource> id = new Id<DOMSource>(nextEventId.getAndIncrement());
126 List<String> types = determineDocumentTypes(aEvent);
127 EventInfo info = new EventInfo(time, aSource, id, types, aEvent);
129 boolean delivered = false;
// Input types remaining after the filters were applied (the call's arguments
// continue on a line that is not visible in this excerpt).
132 List<String> filteredInputTypes = determineFilteredInputTypes(
134 if (filteredInputTypes.isEmpty()) {
// The document is only rendered to text when FINE logging is enabled.
135 if (LOGGER.isLoggable(Level.FINE)) {
136 String doc = new XMLDocument(aEvent).print(true);
// NOTE(review): the pattern below contains ''0}'' where ''{0}'' looks intended.
// If this string is used as a java.util.logging message pattern, the first
// parameter (the document text) would not be substituted -- confirm and fix.
140 "Event ''0}'' from source {1} removed because of filters.",
141 new Object[] { doc, aSource });
145 // get the reachable target types through transformations.
147 // It is possible that a given event belongs to multiple input
149 // This is however certainly not the main case.
151 for (String inputType : filteredInputTypes) {
// deliverEvent is evaluated before the || so that delivery is attempted for
// every input type, even after an earlier one was delivered.
152 boolean result = deliverEvent(info, inputType);
153 delivered = delivered || result;
157 destinationNotFound(aSource, aEvent);
158 listener.notDelivered(info);
/**
 * Attempts to deliver an event, interpreted as the given input type, to the
 * registered destinations.
 *
 * The target types reachable from the input type are offered to each
 * destination. For every target type a destination requests, the
 * transformation path from input type to target type is applied one
 * transformation at a time. The filters are consulted for each
 * transformation's target type, and a null transformation result is reported
 * through transformationReturnedNull. When the result is still allowed and
 * non-null, it is handed to the destination and the listener is told about
 * the outcome.
 *
 * @param aInfo
 *            Event info carrying the event and its source.
 * @param aInputType
 *            Document type name under which the event is being routed.
 * @return Presumably the accumulated delivered flag, i.e. whether at least one
 *         destination reported successful receipt (the return statement is not
 *         visible in this excerpt).
 */
163 private boolean deliverEvent(EventInfo aInfo, String aInputType) {
165 boolean delivered = false;
166 Set<String> possibleTargetTypes = new HashSet<String>();
167 possibleTargetTypes.addAll(transformations
168 .getPossibleTargetTypes(aInputType));
170 // ask each destination what target types, if any they want to have.
171 for (Map.Entry<Long, Destination> entry : destinations.entrySet()) {
172 long destinationId = entry.getKey();
173 Destination destination = entry.getValue();
174 Collection<String> requested = destination
175 .chooseFromTargetTypes(possibleTargetTypes);
176 if (!requested.isEmpty()) {
177 // Deliver to the destination.
178 for (String targetType : requested) {
179 TransformationPath path = transformations.getPath(
180 aInputType, targetType);
181 List<Transformation> ts = path.getTransformations();
183 boolean allowed = true;
// Start from the original event; each transformation replaces it in turn.
184 DOMSource transformed = aInfo.getEvent();
// NOTE(review): the declaration and increment of i are not visible in this
// excerpt. An empty path skips the loop, so the original event is delivered
// unchanged.
185 while (i < ts.size() && allowed && transformed != null) {
186 Transformation t = ts.get(i);
// Keep the input of this step so it can be logged if the step returns null.
187 DOMSource orig = transformed;
188 transformed = t.transform(transformed);
189 if (transformed == null) {
190 transformationReturnedNull(aInfo.getSource(),
191 aInfo.getEvent(), aInputType, t, orig);
// Filters are applied to the intermediate result using the transformation's
// target type. NOTE(review): the body of this branch is not visible here;
// presumably it clears allowed -- confirm.
194 if (!isAllowedByFilters(t.getToType(), transformed)) {
199 if (allowed && transformed != null) {
200 // all transformations done and all filters still
202 boolean result = destination.receive(transformed);
203 listener.delivered(aInfo, ts, destinationId,
204 destination.getName(), result);
// receive() has already been called above, so the || only accumulates the
// outcome and never skips a delivery.
205 delivered = delivered || result;
/**
 * Applies the filters to the candidate input types of an event and returns
 * the types collected in a new list.
 *
 * NOTE(review): the second parameter (the body uses aEvent) and the condition
 * around the add call are not visible in this excerpt; presumably a type is
 * only added when isAllowedByFilters returned true -- confirm.
 *
 * @param aTypes
 *            Candidate document type names of the event.
 * @return List of the type names that were kept, in the order of aTypes.
 */
214 private List<String> determineFilteredInputTypes(List<String> aTypes,
217 // apply filters to the input
218 List<String> filteredTypes = new ArrayList<String>();
219 for (String type : aTypes) {
220 boolean allowed = isAllowedByFilters(type, aEvent);
222 filteredTypes.add(type);
225 return filteredTypes;
/**
 * Consults the registered filters for an event of the given type.
 *
 * NOTE(review): what happens when a filter rejects the event, and the return
 * statement, are not visible in this excerpt; presumably the result is false
 * as soon as any filter's isAllowed returns false, and true otherwise --
 * confirm.
 *
 * @param aType
 *            Document type name passed to each filter.
 * @param aEvent
 *            Event document passed to each filter.
 * @return Presumably whether every filter allows the event.
 */
228 private boolean isAllowedByFilters(String aType, DOMSource aEvent) {
229 boolean allowed = true;
230 for (Filter filter : filters.values()) {
231 if (!filter.isAllowed(aType, aEvent)) {
/**
 * Collects the names of all registered document types of which the event is
 * an instance, in registration order.
 *
 * @param aEvent
 *            Event document to classify.
 * @return Presumably the collected list of type names (the return statement is
 *         not visible in this excerpt); empty when no document type matches.
 */
238 private List<String> determineDocumentTypes(DOMSource aEvent) {
239 List<String> res = new ArrayList<String>();
240 for (DocumentType type : documentTypes.values()) {
241 if (type.isInstance(aEvent)) {
242 res.add(type.getName());
/**
 * Logs a warning that consists of the given message followed by the source
 * and the printed event document.
 *
 * NOTE(review): the last argument of the log call is not visible in this
 * excerpt; presumably it is aException. The text built here is the same as
 * aMessage + ": " + eventToString(aSource, aEvent), so the helper could be
 * reused.
 *
 * @param aMessage
 *            Text placed in front of the event description.
 * @param aSource
 *            Source of the event.
 * @param aEvent
 *            Event document, printed into the log message.
 * @param aException
 *            Exception associated with the problem.
 */
248 private void logEvent(String aMessage, String aSource, DOMSource aEvent,
249 Exception aException) {
250 LOGGER.log(Level.WARNING, aMessage + ": source '" + aSource +
251 "': Event: '" + new XMLDocument(aEvent).print(true) + "'",
255 private void logEvent(String aMessage, String aSource, DOMSource aEvent) {
256 LOGGER.log(Level.WARNING,
257 aMessage + ": " + eventToString(aSource, aEvent));
260 private String eventToString(String aSource, DOMSource aEvent) {
261 return "source '" + aSource + "': Event: '" +
262 new XMLDocument(aEvent).print(true) + "'";
265 private void transformationReturnedNull(String aSource, DOMSource aEvent,
266 String aInputType, Transformation aT, DOMSource aTransformed) {
267 LOGGER.log(Level.WARNING, "Transformation returned null for event " +
268 eventToString(aSource, aEvent) + " inputType '" + aInputType +
269 "', transformation '" + aT + "' document to transform " +
270 new XMLDocument(aTransformed).print(true));
273 private void destinationNotFound(String aSource, DOMSource aEvent) {
274 LOGGER.log(Level.WARNING, "No destination found for event: " +
275 eventToString(aSource, aEvent));
/**
 * Registers a destination under the next number taken from the shared
 * sequence counter. The destination is wrapped in a RobustDestination before
 * it is stored.
 *
 * @param aDestination
 *            Destination to register; must not be null.
 * @return Presumably the id created here for the destination (the return
 *         statement is not visible in this excerpt).
 * @throws IllegalArgumentException
 *             If aDestination is null.
 */
279 public Id<Destination> registerDestination(Destination aDestination) {
280 notNull("destination", aDestination);
281 long seqno = sequenceNumbers.getAndIncrement();
282 Id<Destination> id = new Id<Destination>(seqno);
// The map key is the raw sequence number; the Id wraps the same number.
283 destinations.put(seqno, new RobustDestination(id, aDestination));
288 public void unregisterDestination(Id<Destination> aId) {
289 destinations.remove(aId.getId());
292 private void notNull(String aName, Object aValue) {
293 if (aValue == null) {
294 throw new IllegalArgumentException("Parameter '" + aName +
295 "' may not be null");