initial versions.
[xmlrouter] / impl / src / main / java / org / wamblee / xmlrouter / impl / XMLRouter.java
diff --git a/impl/src/main/java/org/wamblee/xmlrouter/impl/XMLRouter.java b/impl/src/main/java/org/wamblee/xmlrouter/impl/XMLRouter.java
new file mode 100644 (file)
index 0000000..85a51df
--- /dev/null
@@ -0,0 +1,265 @@
+package org.wamblee.xmlrouter.impl;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import javax.xml.transform.dom.DOMSource;
+
+import org.wamblee.xml.XMLDocument;
+import org.wamblee.xmlrouter.common.Id;
+import org.wamblee.xmlrouter.config.Config;
+import org.wamblee.xmlrouter.config.DocumentType;
+import org.wamblee.xmlrouter.config.Filter;
+import org.wamblee.xmlrouter.config.Transformation;
+import org.wamblee.xmlrouter.publish.Gateway;
+import org.wamblee.xmlrouter.subscribe.Destination;
+import org.wamblee.xmlrouter.subscribe.DestinationRegistry;
+
+// TODO concurrency.
+
+public class XMLRouter implements Config, Gateway, DestinationRegistry {
+
+    private static final Logger LOGGER = Logger.getLogger(XMLRouter.class
+        .getName());
+
+    private AtomicInteger sequenceNumbers;
+    private Map<Integer, DocumentType> documentTypes;
+    private Transformations transformations;
+    private Map<Integer, Filter> filters;
+    private Map<Integer, Destination> destinations;
+
+    public XMLRouter() {
+        sequenceNumbers = new AtomicInteger(1);
+        documentTypes = new LinkedHashMap<Integer, DocumentType>();
+        transformations = new Transformations();
+        filters = new LinkedHashMap<Integer, Filter>();
+        destinations = new LinkedHashMap<Integer, Destination>();
+    }
+
+    @Override
+    public Id<DocumentType> addDocumentType(DocumentType aType) {
+        int seqno = sequenceNumbers.getAndIncrement();
+        documentTypes.put(seqno, aType);
+        return new Id<DocumentType>(seqno);
+    }
+
+    @Override
+    public void removeDocumentType(Id<DocumentType> aId) {
+        documentTypes.remove(aId);
+    }
+
+    @Override
+    public Collection<DocumentType> getDocumentTypes() {
+        return Collections.unmodifiableCollection(documentTypes.values());
+    }
+
+    @Override
+    public Id<Transformation> addTransformation(Transformation aTransformation) {
+        return transformations.addTransformation(aTransformation);
+    }
+
+    @Override
+    public void removeTransformation(Id<Transformation> aId) {
+        transformations.removeTransformation(aId);
+    }
+
+    @Override
+    public Collection<Transformation> getTransformations() {
+        return transformations.getTransformations();
+    }
+
+    @Override
+    public Id<Filter> addFilter(Filter aFilter) {
+        int seqno = sequenceNumbers.getAndIncrement();
+        filters.put(seqno, aFilter);
+        return new Id<Filter>(seqno);
+    }
+
+    @Override
+    public void removeFilter(Id<Filter> aId) {
+        filters.remove(aId);
+    }
+
+    @Override
+    public Collection<Filter> getFilters() {
+        return Collections.unmodifiableCollection(filters.values());
+    }
+
+    @Override
+    public boolean publish(String aSource, DOMSource aEvent) {
+
+        boolean delivered = false;
+        try {
+
+            List<String> filteredInputTypes = determineFilteredInputTypes(aEvent);
+            if (filteredInputTypes.isEmpty()) {
+                if (LOGGER.isLoggable(Level.FINE)) {
+                    String doc = new XMLDocument(aEvent).print(true);
+                    LOGGER
+                        .log(
+                            Level.FINE,
+                            "Event ''0}'' from source {1} removed because of filters.",
+                            new Object[] { doc, aSource });
+                }
+                return delivered;
+            }
+
+            // get the reachable target types through transformations.
+
+            // It is possible that a given event belongs to multiple input
+            // types.
+            // This is however certainly not the main case.
+
+            for (String inputType : filteredInputTypes) {
+                boolean result = deliverEvent(aSource, aEvent, inputType);
+                delivered = delivered || result;
+            }
+        } finally {
+            if (!delivered) {
+                destinationNotFound(aSource, aEvent);
+            }
+            return delivered;
+        }
+    }
+
+    private boolean deliverEvent(String aSource, DOMSource aEvent,
+        String aInputType) {
+
+        boolean delivered = false;
+        Set<String> possibleTargetTypes = new HashSet<String>();
+        possibleTargetTypes.addAll(transformations
+            .getPossibleTargetTypes(aInputType));
+
+        // ask each destination what target types, if any they want to have.
+        for (Destination destination : destinations.values()) {
+            Collection<String> requested = destination
+                .chooseFromTargetTypes(possibleTargetTypes);
+            if (!requested.isEmpty()) {
+                // Deliver to the destination.
+                for (String targetType : requested) {
+                    TransformationPath path = transformations.getPath(
+                        aInputType, targetType);
+                    List<Transformation> ts = path.getTransformations();
+                    int i = 0;
+                    boolean allowed = true;
+                    DOMSource transformed = aEvent;
+                    while (i < ts.size() && allowed && transformed != null) {
+                        Transformation t = ts.get(i);
+                        DOMSource orig = transformed;
+                        transformed = t.transform(transformed);
+                        if (transformed == null) {
+                            transformationReturnedNull(aSource, aEvent,
+                                aInputType, t, orig);
+                        }
+
+                        if (!isAllowedByFilters(t.getToType(), transformed)) {
+                            allowed = false;
+                        }
+                        i++;
+                    }
+                    if (allowed && transformed != null) {
+                        // all transformations done and all filters still
+                        // allow the event.
+                        boolean result = destination.receive(transformed);
+                        delivered = delivered || result;
+
+                    }
+                }
+            }
+        }
+        return delivered;
+    }
+
+    private List<String> determineFilteredInputTypes(DOMSource aEvent) {
+        List<String> types = determineDocumentTypes(aEvent);
+        // apply filters to the input
+        List<String> filteredTypes = new ArrayList<String>();
+        for (String type : types) {
+            boolean allowed = isAllowedByFilters(type, aEvent);
+            if (allowed) {
+                filteredTypes.add(type);
+            }
+        }
+        return filteredTypes;
+    }
+
+    private boolean isAllowedByFilters(String aType, DOMSource aEvent) {
+        boolean allowed = true;
+        for (Filter filter : filters.values()) {
+            if (!filter.isAllowed(aType, aEvent)) {
+                allowed = false;
+            }
+        }
+        return allowed;
+    }
+
+    private List<String> determineDocumentTypes(DOMSource aEvent) {
+        List<String> res = new ArrayList<String>();
+        for (DocumentType type : documentTypes.values()) {
+            if (type.isInstance(aEvent)) {
+                res.add(type.getName());
+            }
+        }
+        return res;
+    }
+
+    private void logEvent(String aMessage, String aSource, DOMSource aEvent,
+        Exception aException) {
+        LOGGER.log(Level.WARNING, aMessage + ": source '" + aSource +
+            "': Event: '" + new XMLDocument(aEvent).print(true) + "'",
+            aException);
+    }
+
+    private void logEvent(String aMessage, String aSource, DOMSource aEvent) {
+        LOGGER.log(Level.WARNING,
+            aMessage + ": " + eventToString(aSource, aEvent));
+    }
+
+    private String eventToString(String aSource, DOMSource aEvent) {
+        return "source '" + aSource + "': Event: '" +
+            new XMLDocument(aEvent).print(true) + "'";
+    }
+
+    private void transformationReturnedNull(String aSource, DOMSource aEvent,
+        String aInputType, Transformation aT, DOMSource aTransformed) {
+        LOGGER.log(Level.WARNING, "Transformation returned null for event " +
+            eventToString(aSource, aEvent) + " inputType '" + aInputType +
+            "', transformation '" + aT + "' document to transform " +
+            new XMLDocument(aTransformed).print(true));
+    }
+
+    private void destinationNotFound(String aSource, DOMSource aEvent) {
+        LOGGER.log(Level.WARNING, "No destination found for event: " +
+            eventToString(aSource, aEvent));
+    }
+
+    @Override
+    public Id<Destination> registerDestination(Destination aDestination) {
+        notNull("destination", aDestination);
+        int seqno = sequenceNumbers.getAndIncrement();
+        Id<Destination> id = new Id<Destination>(seqno);
+        destinations.put(seqno, new RobustDestination(id, aDestination));
+        return id;
+    }
+
+    @Override
+    public void unregisterDestination(Id<Destination> aId) {
+        destinations.remove(aId.getId());
+    }
+
+    private void notNull(String aName, Object aValue) {
+        if (aValue == null) {
+            throw new IllegalArgumentException("Parameter '" + aName +
+                "' may not be null");
+        }
+    }
+}