+package org.wamblee.xmlrouter.impl;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import javax.xml.transform.dom.DOMSource;
+
+import org.wamblee.xml.XMLDocument;
+import org.wamblee.xmlrouter.common.Id;
+import org.wamblee.xmlrouter.config.Config;
+import org.wamblee.xmlrouter.config.DocumentType;
+import org.wamblee.xmlrouter.config.Filter;
+import org.wamblee.xmlrouter.config.Transformation;
+import org.wamblee.xmlrouter.publish.Gateway;
+import org.wamblee.xmlrouter.subscribe.Destination;
+import org.wamblee.xmlrouter.subscribe.DestinationRegistry;
+
+// TODO concurrency.
+
+public class XMLRouter implements Config, Gateway, DestinationRegistry {
+
+ private static final Logger LOGGER = Logger.getLogger(XMLRouter.class
+ .getName());
+
+ private AtomicInteger sequenceNumbers;
+ private Map<Integer, DocumentType> documentTypes;
+ private Transformations transformations;
+ private Map<Integer, Filter> filters;
+ private Map<Integer, Destination> destinations;
+
+ public XMLRouter() {
+ sequenceNumbers = new AtomicInteger(1);
+ documentTypes = new LinkedHashMap<Integer, DocumentType>();
+ transformations = new Transformations();
+ filters = new LinkedHashMap<Integer, Filter>();
+ destinations = new LinkedHashMap<Integer, Destination>();
+ }
+
+ @Override
+ public Id<DocumentType> addDocumentType(DocumentType aType) {
+ int seqno = sequenceNumbers.getAndIncrement();
+ documentTypes.put(seqno, aType);
+ return new Id<DocumentType>(seqno);
+ }
+
+ @Override
+ public void removeDocumentType(Id<DocumentType> aId) {
+ documentTypes.remove(aId);
+ }
+
+ @Override
+ public Collection<DocumentType> getDocumentTypes() {
+ return Collections.unmodifiableCollection(documentTypes.values());
+ }
+
+ @Override
+ public Id<Transformation> addTransformation(Transformation aTransformation) {
+ return transformations.addTransformation(aTransformation);
+ }
+
+ @Override
+ public void removeTransformation(Id<Transformation> aId) {
+ transformations.removeTransformation(aId);
+ }
+
+ @Override
+ public Collection<Transformation> getTransformations() {
+ return transformations.getTransformations();
+ }
+
+ @Override
+ public Id<Filter> addFilter(Filter aFilter) {
+ int seqno = sequenceNumbers.getAndIncrement();
+ filters.put(seqno, aFilter);
+ return new Id<Filter>(seqno);
+ }
+
+ @Override
+ public void removeFilter(Id<Filter> aId) {
+ filters.remove(aId);
+ }
+
+ @Override
+ public Collection<Filter> getFilters() {
+ return Collections.unmodifiableCollection(filters.values());
+ }
+
+ @Override
+ public boolean publish(String aSource, DOMSource aEvent) {
+
+ boolean delivered = false;
+ try {
+
+ List<String> filteredInputTypes = determineFilteredInputTypes(aEvent);
+ if (filteredInputTypes.isEmpty()) {
+ if (LOGGER.isLoggable(Level.FINE)) {
+ String doc = new XMLDocument(aEvent).print(true);
+ LOGGER
+ .log(
+ Level.FINE,
+ "Event ''0}'' from source {1} removed because of filters.",
+ new Object[] { doc, aSource });
+ }
+ return delivered;
+ }
+
+ // get the reachable target types through transformations.
+
+ // It is possible that a given event belongs to multiple input
+ // types.
+ // This is however certainly not the main case.
+
+ for (String inputType : filteredInputTypes) {
+ boolean result = deliverEvent(aSource, aEvent, inputType);
+ delivered = delivered || result;
+ }
+ } finally {
+ if (!delivered) {
+ destinationNotFound(aSource, aEvent);
+ }
+ return delivered;
+ }
+ }
+
+ private boolean deliverEvent(String aSource, DOMSource aEvent,
+ String aInputType) {
+
+ boolean delivered = false;
+ Set<String> possibleTargetTypes = new HashSet<String>();
+ possibleTargetTypes.addAll(transformations
+ .getPossibleTargetTypes(aInputType));
+
+ // ask each destination what target types, if any they want to have.
+ for (Destination destination : destinations.values()) {
+ Collection<String> requested = destination
+ .chooseFromTargetTypes(possibleTargetTypes);
+ if (!requested.isEmpty()) {
+ // Deliver to the destination.
+ for (String targetType : requested) {
+ TransformationPath path = transformations.getPath(
+ aInputType, targetType);
+ List<Transformation> ts = path.getTransformations();
+ int i = 0;
+ boolean allowed = true;
+ DOMSource transformed = aEvent;
+ while (i < ts.size() && allowed && transformed != null) {
+ Transformation t = ts.get(i);
+ DOMSource orig = transformed;
+ transformed = t.transform(transformed);
+ if (transformed == null) {
+ transformationReturnedNull(aSource, aEvent,
+ aInputType, t, orig);
+ }
+
+ if (!isAllowedByFilters(t.getToType(), transformed)) {
+ allowed = false;
+ }
+ i++;
+ }
+ if (allowed && transformed != null) {
+ // all transformations done and all filters still
+ // allow the event.
+ boolean result = destination.receive(transformed);
+ delivered = delivered || result;
+
+ }
+ }
+ }
+ }
+ return delivered;
+ }
+
+ private List<String> determineFilteredInputTypes(DOMSource aEvent) {
+ List<String> types = determineDocumentTypes(aEvent);
+ // apply filters to the input
+ List<String> filteredTypes = new ArrayList<String>();
+ for (String type : types) {
+ boolean allowed = isAllowedByFilters(type, aEvent);
+ if (allowed) {
+ filteredTypes.add(type);
+ }
+ }
+ return filteredTypes;
+ }
+
+ private boolean isAllowedByFilters(String aType, DOMSource aEvent) {
+ boolean allowed = true;
+ for (Filter filter : filters.values()) {
+ if (!filter.isAllowed(aType, aEvent)) {
+ allowed = false;
+ }
+ }
+ return allowed;
+ }
+
+ private List<String> determineDocumentTypes(DOMSource aEvent) {
+ List<String> res = new ArrayList<String>();
+ for (DocumentType type : documentTypes.values()) {
+ if (type.isInstance(aEvent)) {
+ res.add(type.getName());
+ }
+ }
+ return res;
+ }
+
+ private void logEvent(String aMessage, String aSource, DOMSource aEvent,
+ Exception aException) {
+ LOGGER.log(Level.WARNING, aMessage + ": source '" + aSource +
+ "': Event: '" + new XMLDocument(aEvent).print(true) + "'",
+ aException);
+ }
+
+ private void logEvent(String aMessage, String aSource, DOMSource aEvent) {
+ LOGGER.log(Level.WARNING,
+ aMessage + ": " + eventToString(aSource, aEvent));
+ }
+
+ private String eventToString(String aSource, DOMSource aEvent) {
+ return "source '" + aSource + "': Event: '" +
+ new XMLDocument(aEvent).print(true) + "'";
+ }
+
+ private void transformationReturnedNull(String aSource, DOMSource aEvent,
+ String aInputType, Transformation aT, DOMSource aTransformed) {
+ LOGGER.log(Level.WARNING, "Transformation returned null for event " +
+ eventToString(aSource, aEvent) + " inputType '" + aInputType +
+ "', transformation '" + aT + "' document to transform " +
+ new XMLDocument(aTransformed).print(true));
+ }
+
+ private void destinationNotFound(String aSource, DOMSource aEvent) {
+ LOGGER.log(Level.WARNING, "No destination found for event: " +
+ eventToString(aSource, aEvent));
+ }
+
+ @Override
+ public Id<Destination> registerDestination(Destination aDestination) {
+ notNull("destination", aDestination);
+ int seqno = sequenceNumbers.getAndIncrement();
+ Id<Destination> id = new Id<Destination>(seqno);
+ destinations.put(seqno, new RobustDestination(id, aDestination));
+ return id;
+ }
+
+ @Override
+ public void unregisterDestination(Id<Destination> aId) {
+ destinations.remove(aId.getId());
+ }
+
+ private void notNull(String aName, Object aValue) {
+ if (aValue == null) {
+ throw new IllegalArgumentException("Parameter '" + aName +
+ "' may not be null");
+ }
+ }
+}