/* * Copyright 2005-2011 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.wamblee.xmlrouter.impl; import java.util.ArrayList; import java.util.Collection; import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.atomic.AtomicLong; import java.util.logging.Level; import java.util.logging.Logger; import javax.xml.transform.dom.DOMSource; import org.wamblee.general.Clock; import org.wamblee.xml.XMLDocument; import org.wamblee.xmlrouter.common.Id; import org.wamblee.xmlrouter.config.DocumentType; import org.wamblee.xmlrouter.config.Filter; import org.wamblee.xmlrouter.config.Transformation; import org.wamblee.xmlrouter.listener.EventInfo; import org.wamblee.xmlrouter.listener.EventListener; import org.wamblee.xmlrouter.publish.Gateway; import org.wamblee.xmlrouter.subscribe.Destination; import org.wamblee.xmlrouter.subscribe.DestinationRegistry; // TODO check intermediate types during transformation based on filters. /** * The XML Router. * * @author Erik Brakkee * */ public class XMLRouter implements Gateway, DestinationRegistry { private static final Logger LOGGER = Logger.getLogger(XMLRouter.class .getName()); private AtomicLong sequenceNumbers; private EventListener listener; private Clock clock; private AtomicLong nextEventId; private XMLRouterConfiguration config; private Map, Destination> destinations; public XMLRouter(Clock aClock, XMLRouterConfiguration aConfig, EventListener aListener) { sequenceNumbers = new AtomicLong(1); listener = aListener; clock = aClock; nextEventId = new AtomicLong(clock.currentTimeMillis()); config = aConfig; destinations = new LinkedHashMap, Destination>(); } @Override public void publish(String aSource, DOMSource aEvent) { config.startPublishEvent(); try { publishImpl(aSource, aEvent); } finally { config.endPublishEvent(); } } private void publishImpl(String aSource, DOMSource aEvent) { long time = clock.currentTimeMillis(); Id id = new Id(nextEventId.getAndIncrement() + ""); List types = determineDocumentTypes(aEvent); EventInfo info = new EventInfo(time, aSource, id, types, aEvent); boolean delivered = false; try { List filteredInputTypes = determineFilteredInputTypes( types, aEvent); if (filteredInputTypes.isEmpty()) { if (LOGGER.isLoggable(Level.FINE)) { String doc = new XMLDocument(aEvent).print(true); LOGGER .log( Level.FINE, "Event ''0}'' from source {1} removed because of filters.", new Object[] { doc, aSource }); } } // get the reachable target types through transformations. // It is possible that a given event belongs to multiple input // types. // This is however certainly not the main case. for (String inputType : filteredInputTypes) { boolean result = deliverEvent(info, inputType); delivered = delivered || result; } } finally { if (!delivered) { destinationNotFound(aSource, aEvent); listener.notDelivered(info); } } } private boolean deliverEvent(EventInfo aInfo, String aInputType) { boolean delivered = false; Set possibleTargetTypes = new HashSet(); possibleTargetTypes.addAll(config.getTransformations() .getPossibleTargetTypes(aInputType)); // ask each destination what target types, if any they want to have. for (Map.Entry, Destination> entry : destinations .entrySet()) { Id destinationId = entry.getKey(); Destination destination = entry.getValue(); Collection requested = destination .chooseFromTargetTypes(possibleTargetTypes); if (!requested.isEmpty()) { // Deliver to the destination. for (String targetType : requested) { TransformationPath path = config.getTransformations() .getPath(aInputType, targetType); List ts = path.getTransformations(); int i = 0; boolean allowed = true; DOMSource transformed = aInfo.getEvent(); while (i < ts.size() && allowed && transformed != null) { Transformation t = ts.get(i); DOMSource orig = transformed; transformed = t.transform(transformed); if (transformed == null) { transformationReturnedNull(aInfo.getSource(), aInfo.getEvent(), aInputType, t, orig); } if (!isAllowedByFilters(t.getToType(), transformed)) { allowed = false; } i++; } if (allowed && transformed != null) { // all transformations done and all filters still // allow the event. boolean result = destination.receive(transformed); listener.delivered(aInfo, ts, destinationId.getId(), result); delivered = delivered || result; } } } } return delivered; } private List determineFilteredInputTypes(List aTypes, DOMSource aEvent) { // apply filters to the input List filteredTypes = new ArrayList(); for (String type : aTypes) { boolean allowed = isAllowedByFilters(type, aEvent); if (allowed) { filteredTypes.add(type); } } return filteredTypes; } private boolean isAllowedByFilters(String aType, DOMSource aEvent) { boolean allowed = true; for (Filter filter : config.getRouterConfig().filterConfig().values()) { if (!filter.isAllowed(aType, aEvent)) { allowed = false; } } return allowed; } private List determineDocumentTypes(DOMSource aEvent) { List res = new ArrayList(); for (DocumentType type : config.getRouterConfig().documentTypeConfig() .values()) { if (type.isInstance(aEvent)) { res.add(type.getName()); } } return res; } private String eventToString(String aSource, DOMSource aEvent) { return "source '" + aSource + "': Event: '" + new XMLDocument(aEvent).print(true) + "'"; } private void transformationReturnedNull(String aSource, DOMSource aEvent, String aInputType, Transformation aT, DOMSource aTransformed) { LOGGER.log(Level.WARNING, "Transformation returned null for event " + eventToString(aSource, aEvent) + " inputType '" + aInputType + "', transformation '" + aT + "' document to transform " + new XMLDocument(aTransformed).print(true)); } private void destinationNotFound(String aSource, DOMSource aEvent) { LOGGER.log(Level.WARNING, "No destination found for event: " + eventToString(aSource, aEvent)); } @Override public Id registerDestination(Destination aDestination) { notNull("destination", aDestination); long seqno = sequenceNumbers.getAndIncrement(); Id id = new Id(seqno + ""); destinations.put(id, new RobustDestination(id, aDestination)); return id; } @Override public void unregisterDestination(Id aId) { destinations.remove(aId); } private void notNull(String aName, Object aValue) { if (aValue == null) { throw new IllegalArgumentException("Parameter '" + aName + "' may not be null"); } } }