X-Git-Url: http://wamblee.org/gitweb/?a=blobdiff_plain;f=impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fwamblee%2Fxmlrouter%2Fimpl%2FXMLRouter.java;fp=impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fwamblee%2Fxmlrouter%2Fimpl%2FXMLRouter.java;h=85a51df010f8180452b03aabc799ffd89375029d;hb=f7f3bbbc63a9e177f56064d821dc5f502dee378e;hp=0000000000000000000000000000000000000000;hpb=9dbda773fb0f33b7022a044f0e4cbc0e64f1929e;p=xmlrouter diff --git a/impl/src/main/java/org/wamblee/xmlrouter/impl/XMLRouter.java b/impl/src/main/java/org/wamblee/xmlrouter/impl/XMLRouter.java new file mode 100644 index 0000000..85a51df --- /dev/null +++ b/impl/src/main/java/org/wamblee/xmlrouter/impl/XMLRouter.java @@ -0,0 +1,265 @@ +package org.wamblee.xmlrouter.impl; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.logging.Level; +import java.util.logging.Logger; + +import javax.xml.transform.dom.DOMSource; + +import org.wamblee.xml.XMLDocument; +import org.wamblee.xmlrouter.common.Id; +import org.wamblee.xmlrouter.config.Config; +import org.wamblee.xmlrouter.config.DocumentType; +import org.wamblee.xmlrouter.config.Filter; +import org.wamblee.xmlrouter.config.Transformation; +import org.wamblee.xmlrouter.publish.Gateway; +import org.wamblee.xmlrouter.subscribe.Destination; +import org.wamblee.xmlrouter.subscribe.DestinationRegistry; + +// TODO concurrency. + +public class XMLRouter implements Config, Gateway, DestinationRegistry { + + private static final Logger LOGGER = Logger.getLogger(XMLRouter.class + .getName()); + + private AtomicInteger sequenceNumbers; + private Map documentTypes; + private Transformations transformations; + private Map filters; + private Map destinations; + + public XMLRouter() { + sequenceNumbers = new AtomicInteger(1); + documentTypes = new LinkedHashMap(); + transformations = new Transformations(); + filters = new LinkedHashMap(); + destinations = new LinkedHashMap(); + } + + @Override + public Id addDocumentType(DocumentType aType) { + int seqno = sequenceNumbers.getAndIncrement(); + documentTypes.put(seqno, aType); + return new Id(seqno); + } + + @Override + public void removeDocumentType(Id aId) { + documentTypes.remove(aId); + } + + @Override + public Collection getDocumentTypes() { + return Collections.unmodifiableCollection(documentTypes.values()); + } + + @Override + public Id addTransformation(Transformation aTransformation) { + return transformations.addTransformation(aTransformation); + } + + @Override + public void removeTransformation(Id aId) { + transformations.removeTransformation(aId); + } + + @Override + public Collection getTransformations() { + return transformations.getTransformations(); + } + + @Override + public Id addFilter(Filter aFilter) { + int seqno = sequenceNumbers.getAndIncrement(); + filters.put(seqno, aFilter); + return new Id(seqno); + } + + @Override + public void removeFilter(Id aId) { + filters.remove(aId); + } + + @Override + public Collection getFilters() { + return Collections.unmodifiableCollection(filters.values()); + } + + @Override + public boolean publish(String aSource, DOMSource aEvent) { + + boolean delivered = false; + try { + + List filteredInputTypes = determineFilteredInputTypes(aEvent); + if (filteredInputTypes.isEmpty()) { + if (LOGGER.isLoggable(Level.FINE)) { + String doc = new XMLDocument(aEvent).print(true); + LOGGER + .log( + Level.FINE, + "Event ''0}'' from source {1} removed because of filters.", + new Object[] { doc, aSource }); + } + return delivered; + } + + // get the reachable target types through transformations. + + // It is possible that a given event belongs to multiple input + // types. + // This is however certainly not the main case. + + for (String inputType : filteredInputTypes) { + boolean result = deliverEvent(aSource, aEvent, inputType); + delivered = delivered || result; + } + } finally { + if (!delivered) { + destinationNotFound(aSource, aEvent); + } + return delivered; + } + } + + private boolean deliverEvent(String aSource, DOMSource aEvent, + String aInputType) { + + boolean delivered = false; + Set possibleTargetTypes = new HashSet(); + possibleTargetTypes.addAll(transformations + .getPossibleTargetTypes(aInputType)); + + // ask each destination what target types, if any they want to have. + for (Destination destination : destinations.values()) { + Collection requested = destination + .chooseFromTargetTypes(possibleTargetTypes); + if (!requested.isEmpty()) { + // Deliver to the destination. + for (String targetType : requested) { + TransformationPath path = transformations.getPath( + aInputType, targetType); + List ts = path.getTransformations(); + int i = 0; + boolean allowed = true; + DOMSource transformed = aEvent; + while (i < ts.size() && allowed && transformed != null) { + Transformation t = ts.get(i); + DOMSource orig = transformed; + transformed = t.transform(transformed); + if (transformed == null) { + transformationReturnedNull(aSource, aEvent, + aInputType, t, orig); + } + + if (!isAllowedByFilters(t.getToType(), transformed)) { + allowed = false; + } + i++; + } + if (allowed && transformed != null) { + // all transformations done and all filters still + // allow the event. + boolean result = destination.receive(transformed); + delivered = delivered || result; + + } + } + } + } + return delivered; + } + + private List determineFilteredInputTypes(DOMSource aEvent) { + List types = determineDocumentTypes(aEvent); + // apply filters to the input + List filteredTypes = new ArrayList(); + for (String type : types) { + boolean allowed = isAllowedByFilters(type, aEvent); + if (allowed) { + filteredTypes.add(type); + } + } + return filteredTypes; + } + + private boolean isAllowedByFilters(String aType, DOMSource aEvent) { + boolean allowed = true; + for (Filter filter : filters.values()) { + if (!filter.isAllowed(aType, aEvent)) { + allowed = false; + } + } + return allowed; + } + + private List determineDocumentTypes(DOMSource aEvent) { + List res = new ArrayList(); + for (DocumentType type : documentTypes.values()) { + if (type.isInstance(aEvent)) { + res.add(type.getName()); + } + } + return res; + } + + private void logEvent(String aMessage, String aSource, DOMSource aEvent, + Exception aException) { + LOGGER.log(Level.WARNING, aMessage + ": source '" + aSource + + "': Event: '" + new XMLDocument(aEvent).print(true) + "'", + aException); + } + + private void logEvent(String aMessage, String aSource, DOMSource aEvent) { + LOGGER.log(Level.WARNING, + aMessage + ": " + eventToString(aSource, aEvent)); + } + + private String eventToString(String aSource, DOMSource aEvent) { + return "source '" + aSource + "': Event: '" + + new XMLDocument(aEvent).print(true) + "'"; + } + + private void transformationReturnedNull(String aSource, DOMSource aEvent, + String aInputType, Transformation aT, DOMSource aTransformed) { + LOGGER.log(Level.WARNING, "Transformation returned null for event " + + eventToString(aSource, aEvent) + " inputType '" + aInputType + + "', transformation '" + aT + "' document to transform " + + new XMLDocument(aTransformed).print(true)); + } + + private void destinationNotFound(String aSource, DOMSource aEvent) { + LOGGER.log(Level.WARNING, "No destination found for event: " + + eventToString(aSource, aEvent)); + } + + @Override + public Id registerDestination(Destination aDestination) { + notNull("destination", aDestination); + int seqno = sequenceNumbers.getAndIncrement(); + Id id = new Id(seqno); + destinations.put(seqno, new RobustDestination(id, aDestination)); + return id; + } + + @Override + public void unregisterDestination(Id aId) { + destinations.remove(aId.getId()); + } + + private void notNull(String aName, Object aValue) { + if (aValue == null) { + throw new IllegalArgumentException("Parameter '" + aName + + "' may not be null"); + } + } +}