package org.wamblee.xmlrouter.impl;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;

import javax.xml.transform.dom.DOMSource;

import org.wamblee.xml.XMLDocument;
import org.wamblee.xmlrouter.common.Id;
import org.wamblee.xmlrouter.config.Config;
import org.wamblee.xmlrouter.config.DocumentType;
import org.wamblee.xmlrouter.config.Filter;
import org.wamblee.xmlrouter.config.Transformation;
import org.wamblee.xmlrouter.publish.Gateway;
import org.wamblee.xmlrouter.subscribe.Destination;
import org.wamblee.xmlrouter.subscribe.DestinationRegistry;

/**
 * Routes published XML events to registered destinations.
 *
 * <p>
 * An event is first matched against the registered document types, then
 * checked against all registered filters. For every input type that passes,
 * each destination is asked which of the reachable target types it wants; the
 * event is transformed along the corresponding transformation path (filters
 * are re-applied after every step) and handed to the destination.
 * </p>
 *
 * <p>
 * Document types, filters and destinations share one sequence of integer ids
 * and are kept in insertion order.
 * </p>
 */
// TODO concurrency.
public class XMLRouter implements Config, Gateway, DestinationRegistry {

    private static final Logger LOGGER = Logger.getLogger(XMLRouter.class
        .getName());

    private final AtomicInteger sequenceNumbers;
    private final Map<Integer, DocumentType> documentTypes;
    private final Transformations transformations;
    private final Map<Integer, Filter> filters;
    private final Map<Integer, Destination> destinations;

    /**
     * Creates a router without any document types, transformations, filters
     * or destinations. Sequence numbers start at 1.
     */
    public XMLRouter() {
        sequenceNumbers = new AtomicInteger(1);
        documentTypes = new LinkedHashMap<Integer, DocumentType>();
        transformations = new Transformations();
        filters = new LinkedHashMap<Integer, Filter>();
        destinations = new LinkedHashMap<Integer, Destination>();
    }

    /**
     * Registers a document type under a freshly allocated id.
     *
     * @param aType
     *            Document type to add.
     * @return Id under which the type was stored.
     */
    @Override
    public Id addDocumentType(DocumentType aType) {
        int seqno = sequenceNumbers.getAndIncrement();
        documentTypes.put(seqno, aType);
        return new Id(seqno);
    }

    /**
     * Removes a previously added document type.
     *
     * @param aId
     *            Id returned by {@link #addDocumentType(DocumentType)}.
     */
    @Override
    public void removeDocumentType(Id aId) {
        // The map is keyed by the numeric sequence number, not by the Id
        // object itself, so the key must be unwrapped before removal.
        documentTypes.remove(aId.getId());
    }

    /**
     * @return Read-only view of the registered document types in insertion
     *         order.
     */
    @Override
    public Collection<DocumentType> getDocumentTypes() {
        return Collections.unmodifiableCollection(documentTypes.values());
    }

    /**
     * Registers a transformation; delegates to the transformations
     * administration.
     *
     * @param aTransformation
     *            Transformation to add.
     * @return Id of the transformation.
     */
    @Override
    public Id addTransformation(Transformation aTransformation) {
        return transformations.addTransformation(aTransformation);
    }

    /**
     * Removes a transformation; delegates to the transformations
     * administration.
     *
     * @param aId
     *            Id of the transformation to remove.
     */
    @Override
    public void removeTransformation(Id aId) {
        transformations.removeTransformation(aId);
    }

    /**
     * @return The transformations as reported by the transformations
     *         administration.
     */
    @Override
    public Collection<Transformation> getTransformations() {
        return transformations.getTransformations();
    }

    /**
     * Registers a filter under a freshly allocated id.
     *
     * @param aFilter
     *            Filter to add.
     * @return Id under which the filter was stored.
     */
    @Override
    public Id addFilter(Filter aFilter) {
        int seqno = sequenceNumbers.getAndIncrement();
        filters.put(seqno, aFilter);
        return new Id(seqno);
    }

    /**
     * Removes a previously added filter.
     *
     * @param aId
     *            Id returned by {@link #addFilter(Filter)}.
     */
    @Override
    public void removeFilter(Id aId) {
        // Keyed by sequence number, see removeDocumentType.
        filters.remove(aId.getId());
    }

    /**
     * @return Read-only view of the registered filters in insertion order.
     */
    @Override
    public Collection<Filter> getFilters() {
        return Collections.unmodifiableCollection(filters.values());
    }

    /**
     * Publishes an event to all interested destinations.
     *
     * <p>
     * Runtime exceptions raised while routing are logged at WARNING level and
     * do not propagate; in that case the result reflects whatever was
     * delivered before the failure. Whenever nothing was delivered a warning
     * is logged.
     * </p>
     *
     * @param aSource
     *            Name of the source of the event (used for logging only).
     * @param aEvent
     *            Event to publish.
     * @return True iff at least one destination accepted the event.
     */
    @Override
    public boolean publish(String aSource, DOMSource aEvent) {
        boolean delivered = false;
        try {
            List<String> filteredInputTypes = determineFilteredInputTypes(aEvent);
            if (filteredInputTypes.isEmpty() && LOGGER.isLoggable(Level.FINE)) {
                String doc = new XMLDocument(aEvent).print(true);
                LOGGER.log(Level.FINE,
                    "Event ''{0}'' from source {1} removed because of filters.",
                    new Object[] { doc, aSource });
            }

            // get the reachable target types through transformations.
            // It is possible that a given event belongs to multiple input
            // types.
            // This is however certainly not the main case.
            for (String inputType : filteredInputTypes) {
                // Always attempt delivery for every input type, even when an
                // earlier one was already delivered.
                boolean result = deliverEvent(aSource, aEvent, inputType);
                delivered = delivered || result;
            }
        } catch (RuntimeException e) {
            // Previously failures were discarded silently by a return inside
            // a finally block; keep the non-throwing contract but make the
            // failure visible.
            logEvent("Unexpected error while routing event", aSource, aEvent,
                e);
        }
        if (!delivered) {
            destinationNotFound(aSource, aEvent);
        }
        return delivered;
    }

    /**
     * Delivers an event of a given input type to every destination that
     * requests one of the target types reachable from that input type.
     *
     * @param aSource
     *            Source name, for logging.
     * @param aEvent
     *            Original event.
     * @param aInputType
     *            Input document type of the event.
     * @return True iff at least one destination reported successful receipt.
     */
    private boolean deliverEvent(String aSource, DOMSource aEvent,
        String aInputType) {
        boolean delivered = false;
        Set<String> possibleTargetTypes = new HashSet<String>();
        possibleTargetTypes.addAll(transformations
            .getPossibleTargetTypes(aInputType));

        // ask each destination what target types, if any they want to have.
        for (Destination destination : destinations.values()) {
            Collection<String> requested = destination
                .chooseFromTargetTypes(possibleTargetTypes);
            for (String targetType : requested) {
                TransformationPath path = transformations.getPath(aInputType,
                    targetType);
                DOMSource transformed = applyTransformations(aSource, aEvent,
                    aInputType, path.getTransformations());
                if (transformed != null) {
                    // all transformations done and all filters still
                    // allow the event.
                    boolean result = destination.receive(transformed);
                    delivered = delivered || result;
                }
            }
        }
        return delivered;
    }

    /**
     * Applies a chain of transformations to an event, checking the filters
     * after every step.
     *
     * @param aSource
     *            Source name, for logging.
     * @param aEvent
     *            Original event.
     * @param aInputType
     *            Input document type, for logging.
     * @param aTransformations
     *            Transformations to apply, in order.
     * @return The fully transformed event, or null when the event is null, a
     *         transformation returned null, or a filter rejected an
     *         intermediate result.
     */
    private DOMSource applyTransformations(String aSource, DOMSource aEvent,
        String aInputType, List<Transformation> aTransformations) {
        if (aEvent == null) {
            return null;
        }
        DOMSource current = aEvent;
        for (Transformation transformation : aTransformations) {
            DOMSource transformed = transformation.transform(current);
            if (transformed == null) {
                transformationReturnedNull(aSource, aEvent, aInputType,
                    transformation, current);
                // Stop here: there is no document left to hand to the
                // filters or to subsequent transformations.
                return null;
            }
            if (!isAllowedByFilters(transformation.getToType(), transformed)) {
                return null;
            }
            current = transformed;
        }
        return current;
    }

    /**
     * Determines the document types of an event that pass all filters.
     *
     * @param aEvent
     *            Event to examine.
     * @return Names of matching document types that the filters allow.
     */
    private List<String> determineFilteredInputTypes(DOMSource aEvent) {
        List<String> filteredTypes = new ArrayList<String>();
        // apply filters to the input
        for (String type : determineDocumentTypes(aEvent)) {
            if (isAllowedByFilters(type, aEvent)) {
                filteredTypes.add(type);
            }
        }
        return filteredTypes;
    }

    /**
     * Checks an event of a given type against all filters. Every filter is
     * consulted, also after one of them has rejected the event.
     *
     * @param aType
     *            Document type of the event.
     * @param aEvent
     *            Event to check.
     * @return True iff no filter rejects the event.
     */
    private boolean isAllowedByFilters(String aType, DOMSource aEvent) {
        boolean allowed = true;
        for (Filter filter : filters.values()) {
            if (!filter.isAllowed(aType, aEvent)) {
                allowed = false;
            }
        }
        return allowed;
    }

    /**
     * Determines which registered document types an event is an instance of.
     *
     * @param aEvent
     *            Event to examine.
     * @return Names of the matching document types in registration order.
     */
    private List<String> determineDocumentTypes(DOMSource aEvent) {
        List<String> res = new ArrayList<String>();
        for (DocumentType type : documentTypes.values()) {
            if (type.isInstance(aEvent)) {
                res.add(type.getName());
            }
        }
        return res;
    }

    /**
     * Logs a warning about an event together with the exception that caused
     * it.
     */
    private void logEvent(String aMessage, String aSource, DOMSource aEvent,
        Exception aException) {
        LOGGER.log(Level.WARNING,
            aMessage + ": " + eventToString(aSource, aEvent), aException);
    }

    /**
     * Logs a warning about an event.
     */
    private void logEvent(String aMessage, String aSource, DOMSource aEvent) {
        LOGGER.log(Level.WARNING,
            aMessage + ": " + eventToString(aSource, aEvent));
    }

    /**
     * Formats the source name and the printed event for use in log messages.
     */
    private String eventToString(String aSource, DOMSource aEvent) {
        return "source '" + aSource + "': Event: '" +
            new XMLDocument(aEvent).print(true) + "'";
    }

    /**
     * Logs a warning that a transformation produced no output.
     *
     * @param aTransformed
     *            Document that was passed into the failing transformation.
     */
    private void transformationReturnedNull(String aSource, DOMSource aEvent,
        String aInputType, Transformation aT, DOMSource aTransformed) {
        LOGGER.log(Level.WARNING, "Transformation returned null for event " +
            eventToString(aSource, aEvent) + " inputType '" + aInputType +
            "', transformation '" + aT + "' document to transform " +
            new XMLDocument(aTransformed).print(true));
    }

    /**
     * Logs a warning that an event was not delivered to any destination.
     */
    private void destinationNotFound(String aSource, DOMSource aEvent) {
        logEvent("No destination found for event", aSource, aEvent);
    }

    /**
     * Registers a destination. The destination is wrapped in a
     * {@link RobustDestination} before it is stored.
     *
     * @param aDestination
     *            Destination to register; may not be null.
     * @return Id under which the destination was registered.
     * @throws IllegalArgumentException
     *             if the destination is null.
     */
    @Override
    public Id registerDestination(Destination aDestination) {
        notNull("destination", aDestination);
        int seqno = sequenceNumbers.getAndIncrement();
        Id id = new Id(seqno);
        destinations.put(seqno, new RobustDestination(id, aDestination));
        return id;
    }

    /**
     * Unregisters a destination.
     *
     * @param aId
     *            Id returned by {@link #registerDestination(Destination)}.
     */
    @Override
    public void unregisterDestination(Id aId) {
        destinations.remove(aId.getId());
    }

    /**
     * Verifies that a parameter value is not null.
     *
     * @throws IllegalArgumentException
     *             if the value is null.
     */
    private void notNull(String aName, Object aValue) {
        if (aValue == null) {
            throw new IllegalArgumentException("Parameter '" + aName +
                "' may not be null");
        }
    }
}