X-Git-Url: http://wamblee.org/gitweb/?a=blobdiff_plain;f=impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fwamblee%2Fxmlrouter%2Fimpl%2FXMLRouter.java;h=6b460d1dea203495d7ef258afec33c75e86c71ab;hb=20807d81708bd33b3b5a4616fadcf3ae91bf9508;hp=85a51df010f8180452b03aabc799ffd89375029d;hpb=f7f3bbbc63a9e177f56064d821dc5f502dee378e;p=xmlrouter diff --git a/impl/src/main/java/org/wamblee/xmlrouter/impl/XMLRouter.java b/impl/src/main/java/org/wamblee/xmlrouter/impl/XMLRouter.java index 85a51df..6b460d1 100644 --- a/impl/src/main/java/org/wamblee/xmlrouter/impl/XMLRouter.java +++ b/impl/src/main/java/org/wamblee/xmlrouter/impl/XMLRouter.java @@ -1,3 +1,18 @@ +/* + * Copyright 2005-2011 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.wamblee.xmlrouter.impl; import java.util.ArrayList; @@ -8,18 +23,21 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.logging.Level; import java.util.logging.Logger; import javax.xml.transform.dom.DOMSource; +import org.wamblee.general.Clock; import org.wamblee.xml.XMLDocument; import org.wamblee.xmlrouter.common.Id; import org.wamblee.xmlrouter.config.Config; import org.wamblee.xmlrouter.config.DocumentType; import org.wamblee.xmlrouter.config.Filter; import org.wamblee.xmlrouter.config.Transformation; +import org.wamblee.xmlrouter.listener.EventInfo; +import org.wamblee.xmlrouter.listener.EventListener; import org.wamblee.xmlrouter.publish.Gateway; import org.wamblee.xmlrouter.subscribe.Destination; import org.wamblee.xmlrouter.subscribe.DestinationRegistry; @@ -31,23 +49,29 @@ public class XMLRouter implements Config, Gateway, DestinationRegistry { private static final Logger LOGGER = Logger.getLogger(XMLRouter.class .getName()); - private AtomicInteger sequenceNumbers; - private Map documentTypes; + private EventListener listener; + private Clock clock; + private AtomicLong nextEventId; + private AtomicLong sequenceNumbers; + private Map documentTypes; private Transformations transformations; - private Map filters; - private Map destinations; - - public XMLRouter() { - sequenceNumbers = new AtomicInteger(1); - documentTypes = new LinkedHashMap(); + private Map filters; + private Map destinations; + + public XMLRouter(Clock aClock, EventListener aListener) { + listener = aListener; + clock = aClock; + nextEventId = new AtomicLong(clock.currentTimeMillis()); + sequenceNumbers = new AtomicLong(1); + documentTypes = new LinkedHashMap(); transformations = new Transformations(); - filters = new LinkedHashMap(); - destinations = new LinkedHashMap(); + filters = new LinkedHashMap(); + destinations = new LinkedHashMap(); } @Override public Id addDocumentType(DocumentType aType) { - int seqno = sequenceNumbers.getAndIncrement(); + long seqno = sequenceNumbers.getAndIncrement(); documentTypes.put(seqno, aType); return new Id(seqno); } @@ -79,7 +103,7 @@ public class XMLRouter implements Config, Gateway, DestinationRegistry { @Override public Id addFilter(Filter aFilter) { - int seqno = sequenceNumbers.getAndIncrement(); + long seqno = sequenceNumbers.getAndIncrement(); filters.put(seqno, aFilter); return new Id(seqno); } @@ -95,12 +119,18 @@ public class XMLRouter implements Config, Gateway, DestinationRegistry { } @Override - public boolean publish(String aSource, DOMSource aEvent) { + public void publish(String aSource, DOMSource aEvent) { + + long time = clock.currentTimeMillis(); + Id id = new Id(nextEventId.getAndIncrement()); + List types = determineDocumentTypes(aEvent); + EventInfo info = new EventInfo(time, aSource, id, types, aEvent); boolean delivered = false; try { - List filteredInputTypes = determineFilteredInputTypes(aEvent); + List filteredInputTypes = determineFilteredInputTypes( + types, aEvent); if (filteredInputTypes.isEmpty()) { if (LOGGER.isLoggable(Level.FINE)) { String doc = new XMLDocument(aEvent).print(true); @@ -110,7 +140,6 @@ public class XMLRouter implements Config, Gateway, DestinationRegistry { "Event ''0}'' from source {1} removed because of filters.", new Object[] { doc, aSource }); } - return delivered; } // get the reachable target types through transformations. @@ -120,19 +149,18 @@ public class XMLRouter implements Config, Gateway, DestinationRegistry { // This is however certainly not the main case. for (String inputType : filteredInputTypes) { - boolean result = deliverEvent(aSource, aEvent, inputType); + boolean result = deliverEvent(info, inputType); delivered = delivered || result; } } finally { if (!delivered) { destinationNotFound(aSource, aEvent); + listener.notDelivered(info); } - return delivered; } } - private boolean deliverEvent(String aSource, DOMSource aEvent, - String aInputType) { + private boolean deliverEvent(EventInfo aInfo, String aInputType) { boolean delivered = false; Set possibleTargetTypes = new HashSet(); @@ -140,7 +168,9 @@ public class XMLRouter implements Config, Gateway, DestinationRegistry { .getPossibleTargetTypes(aInputType)); // ask each destination what target types, if any they want to have. - for (Destination destination : destinations.values()) { + for (Map.Entry entry : destinations.entrySet()) { + long destinationId = entry.getKey(); + Destination destination = entry.getValue(); Collection requested = destination .chooseFromTargetTypes(possibleTargetTypes); if (!requested.isEmpty()) { @@ -151,14 +181,14 @@ public class XMLRouter implements Config, Gateway, DestinationRegistry { List ts = path.getTransformations(); int i = 0; boolean allowed = true; - DOMSource transformed = aEvent; + DOMSource transformed = aInfo.getEvent(); while (i < ts.size() && allowed && transformed != null) { Transformation t = ts.get(i); DOMSource orig = transformed; transformed = t.transform(transformed); if (transformed == null) { - transformationReturnedNull(aSource, aEvent, - aInputType, t, orig); + transformationReturnedNull(aInfo.getSource(), + aInfo.getEvent(), aInputType, t, orig); } if (!isAllowedByFilters(t.getToType(), transformed)) { @@ -170,6 +200,8 @@ public class XMLRouter implements Config, Gateway, DestinationRegistry { // all transformations done and all filters still // allow the event. boolean result = destination.receive(transformed); + listener.delivered(aInfo, ts, destinationId, + destination.getName(), result); delivered = delivered || result; } @@ -179,11 +211,12 @@ public class XMLRouter implements Config, Gateway, DestinationRegistry { return delivered; } - private List determineFilteredInputTypes(DOMSource aEvent) { - List types = determineDocumentTypes(aEvent); + private List determineFilteredInputTypes(List aTypes, + DOMSource aEvent) { + // apply filters to the input List filteredTypes = new ArrayList(); - for (String type : types) { + for (String type : aTypes) { boolean allowed = isAllowedByFilters(type, aEvent); if (allowed) { filteredTypes.add(type); @@ -245,7 +278,7 @@ public class XMLRouter implements Config, Gateway, DestinationRegistry { @Override public Id registerDestination(Destination aDestination) { notNull("destination", aDestination); - int seqno = sequenceNumbers.getAndIncrement(); + long seqno = sequenceNumbers.getAndIncrement(); Id id = new Id(seqno); destinations.put(seqno, new RobustDestination(id, aDestination)); return id;