event listener is now used by the xml router and the publish method of the gateway is
[xmlrouter] / impl / src / main / java / org / wamblee / xmlrouter / impl / XMLRouter.java
index e166d75d74880926b86459ab60971862dec0d0e8..6b460d1dea203495d7ef258afec33c75e86c71ab 100644 (file)
@@ -23,18 +23,20 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import javax.xml.transform.dom.DOMSource;
 
+import org.wamblee.general.Clock;
 import org.wamblee.xml.XMLDocument;
 import org.wamblee.xmlrouter.common.Id;
 import org.wamblee.xmlrouter.config.Config;
 import org.wamblee.xmlrouter.config.DocumentType;
 import org.wamblee.xmlrouter.config.Filter;
 import org.wamblee.xmlrouter.config.Transformation;
+import org.wamblee.xmlrouter.listener.EventInfo;
 import org.wamblee.xmlrouter.listener.EventListener;
 import org.wamblee.xmlrouter.publish.Gateway;
 import org.wamblee.xmlrouter.subscribe.Destination;
@@ -48,24 +50,28 @@ public class XMLRouter implements Config, Gateway, DestinationRegistry {
         .getName());
 
     private EventListener listener;
-    private AtomicInteger sequenceNumbers;
-    private Map<Integer, DocumentType> documentTypes;
+    private Clock clock;
+    private AtomicLong nextEventId;
+    private AtomicLong sequenceNumbers;
+    private Map<Long, DocumentType> documentTypes;
     private Transformations transformations;
-    private Map<Integer, Filter> filters;
-    private Map<Integer, Destination> destinations;
+    private Map<Long, Filter> filters;
+    private Map<Long, Destination> destinations;
 
-    public XMLRouter(EventListener aListener) {
+    public XMLRouter(Clock aClock, EventListener aListener) {
         listener = aListener;
-        sequenceNumbers = new AtomicInteger(1);
-        documentTypes = new LinkedHashMap<Integer, DocumentType>();
+        clock = aClock;
+        nextEventId = new AtomicLong(clock.currentTimeMillis());
+        sequenceNumbers = new AtomicLong(1);
+        documentTypes = new LinkedHashMap<Long, DocumentType>();
         transformations = new Transformations();
-        filters = new LinkedHashMap<Integer, Filter>();
-        destinations = new LinkedHashMap<Integer, Destination>();
+        filters = new LinkedHashMap<Long, Filter>();
+        destinations = new LinkedHashMap<Long, Destination>();
     }
 
     @Override
     public Id<DocumentType> addDocumentType(DocumentType aType) {
-        int seqno = sequenceNumbers.getAndIncrement();
+        long seqno = sequenceNumbers.getAndIncrement();
         documentTypes.put(seqno, aType);
         return new Id<DocumentType>(seqno);
     }
@@ -97,7 +103,7 @@ public class XMLRouter implements Config, Gateway, DestinationRegistry {
 
     @Override
     public Id<Filter> addFilter(Filter aFilter) {
-        int seqno = sequenceNumbers.getAndIncrement();
+        long seqno = sequenceNumbers.getAndIncrement();
         filters.put(seqno, aFilter);
         return new Id<Filter>(seqno);
     }
@@ -113,12 +119,18 @@ public class XMLRouter implements Config, Gateway, DestinationRegistry {
     }
 
     @Override
-    public boolean publish(String aSource, DOMSource aEvent) {
+    public void publish(String aSource, DOMSource aEvent) {
+
+        long time = clock.currentTimeMillis();
+        Id<DOMSource> id = new Id<DOMSource>(nextEventId.getAndIncrement());
+        List<String> types = determineDocumentTypes(aEvent);
+        EventInfo info = new EventInfo(time, aSource, id, types, aEvent);
 
         boolean delivered = false;
         try {
 
-            List<String> filteredInputTypes = determineFilteredInputTypes(aEvent);
+            List<String> filteredInputTypes = determineFilteredInputTypes(
+                types, aEvent);
             if (filteredInputTypes.isEmpty()) {
                 if (LOGGER.isLoggable(Level.FINE)) {
                     String doc = new XMLDocument(aEvent).print(true);
@@ -128,7 +140,6 @@ public class XMLRouter implements Config, Gateway, DestinationRegistry {
                             "Event ''0}'' from source {1} removed because of filters.",
                             new Object[] { doc, aSource });
                 }
-                return delivered;
             }
 
             // get the reachable target types through transformations.
@@ -138,19 +149,18 @@ public class XMLRouter implements Config, Gateway, DestinationRegistry {
             // This is however certainly not the main case.
 
             for (String inputType : filteredInputTypes) {
-                boolean result = deliverEvent(aSource, aEvent, inputType);
+                boolean result = deliverEvent(info, inputType);
                 delivered = delivered || result;
             }
         } finally {
             if (!delivered) {
                 destinationNotFound(aSource, aEvent);
+                listener.notDelivered(info);
             }
-            return delivered;
         }
     }
 
-    private boolean deliverEvent(String aSource, DOMSource aEvent,
-        String aInputType) {
+    private boolean deliverEvent(EventInfo aInfo, String aInputType) {
 
         boolean delivered = false;
         Set<String> possibleTargetTypes = new HashSet<String>();
@@ -158,7 +168,9 @@ public class XMLRouter implements Config, Gateway, DestinationRegistry {
             .getPossibleTargetTypes(aInputType));
 
         // ask each destination what target types, if any they want to have.
-        for (Destination destination : destinations.values()) {
+        for (Map.Entry<Long, Destination> entry : destinations.entrySet()) {
+            long destinationId = entry.getKey();
+            Destination destination = entry.getValue();
             Collection<String> requested = destination
                 .chooseFromTargetTypes(possibleTargetTypes);
             if (!requested.isEmpty()) {
@@ -169,14 +181,14 @@ public class XMLRouter implements Config, Gateway, DestinationRegistry {
                     List<Transformation> ts = path.getTransformations();
                     int i = 0;
                     boolean allowed = true;
-                    DOMSource transformed = aEvent;
+                    DOMSource transformed = aInfo.getEvent();
                     while (i < ts.size() && allowed && transformed != null) {
                         Transformation t = ts.get(i);
                         DOMSource orig = transformed;
                         transformed = t.transform(transformed);
                         if (transformed == null) {
-                            transformationReturnedNull(aSource, aEvent,
-                                aInputType, t, orig);
+                            transformationReturnedNull(aInfo.getSource(),
+                                aInfo.getEvent(), aInputType, t, orig);
                         }
 
                         if (!isAllowedByFilters(t.getToType(), transformed)) {
@@ -188,6 +200,8 @@ public class XMLRouter implements Config, Gateway, DestinationRegistry {
                         // all transformations done and all filters still
                         // allow the event.
                         boolean result = destination.receive(transformed);
+                        listener.delivered(aInfo, ts, destinationId,
+                            destination.getName(), result);
                         delivered = delivered || result;
 
                     }
@@ -197,11 +211,12 @@ public class XMLRouter implements Config, Gateway, DestinationRegistry {
         return delivered;
     }
 
-    private List<String> determineFilteredInputTypes(DOMSource aEvent) {
-        List<String> types = determineDocumentTypes(aEvent);
+    private List<String> determineFilteredInputTypes(List<String> aTypes,
+        DOMSource aEvent) {
+
         // apply filters to the input
         List<String> filteredTypes = new ArrayList<String>();
-        for (String type : types) {
+        for (String type : aTypes) {
             boolean allowed = isAllowedByFilters(type, aEvent);
             if (allowed) {
                 filteredTypes.add(type);
@@ -263,7 +278,7 @@ public class XMLRouter implements Config, Gateway, DestinationRegistry {
     @Override
     public Id<Destination> registerDestination(Destination aDestination) {
         notNull("destination", aDestination);
-        int seqno = sequenceNumbers.getAndIncrement();
+        long seqno = sequenceNumbers.getAndIncrement();
         Id<Destination> id = new Id<Destination>(seqno);
         destinations.put(seqno, new RobustDestination(id, aDestination));
         return id;