+++ /dev/null
-/*
- * Copyright 2005-2011 the original author or authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.wamblee.xmlrouter.impl;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import javax.xml.transform.dom.DOMSource;
-
-import org.wamblee.general.Clock;
-import org.wamblee.general.Pair;
-import org.wamblee.xml.XMLDocument;
-import org.wamblee.xmlrouter.common.Id;
-import org.wamblee.xmlrouter.config.DocumentType;
-import org.wamblee.xmlrouter.config.Filter;
-import org.wamblee.xmlrouter.config.Transformation;
-import org.wamblee.xmlrouter.listener.EventInfo;
-import org.wamblee.xmlrouter.listener.EventListener;
-import org.wamblee.xmlrouter.publish.Gateway;
-import org.wamblee.xmlrouter.subscribe.Destination;
-import org.wamblee.xmlrouter.subscribe.DestinationRegistry;
-
-/**
- * The XML Router.
- *
- * @author Erik Brakkee
- *
- */
-public class XMLRouter implements Gateway, DestinationRegistry {
-
- private static final Logger LOGGER = Logger.getLogger(XMLRouter.class
- .getName());
-
- private AtomicLong sequenceNumbers;
- private EventListener listener;
- private Clock clock;
- private AtomicLong nextEventId;
-
- private XMLRouterConfiguration config;
-
- private Map<Id<Destination>, Destination> destinations;
-
- public XMLRouter(Clock aClock, XMLRouterConfiguration aConfig,
- EventListener aListener) {
- sequenceNumbers = new AtomicLong(1);
- listener = aListener;
- clock = aClock;
- nextEventId = new AtomicLong(clock.currentTimeMillis());
- config = aConfig;
- destinations = new LinkedHashMap<Id<Destination>, Destination>();
- }
-
- @Override
- public void publish(String aSource, DOMSource aEvent) {
- long time = clock.currentTimeMillis();
-
- Pair<ExtendedRouterConfig, TransformationPaths> snapshotconfig = config
- .getConfig();
-
- Id<DOMSource> id = new Id<DOMSource>(nextEventId.getAndIncrement() + "");
- List<String> types = determineDocumentTypes(snapshotconfig.getFirst()
- .documentTypeConfig().values(), aEvent);
- EventInfo info = new EventInfo(time, aSource, id, types, aEvent);
-
- boolean delivered = false;
- try {
-
- List<String> filteredInputTypes = determineFilteredInputTypes(
- snapshotconfig.getFirst().filterConfig().values(), types,
- aEvent);
- if (filteredInputTypes.isEmpty()) {
- if (LOGGER.isLoggable(Level.FINE)) {
- String doc = new XMLDocument(aEvent).print(true);
- LOGGER
- .log(
- Level.FINE,
- "Event ''{0}'' from source ''{1}'' removed because of filters.",
- new Object[] { doc, aSource });
- }
- }
-
- // get the reachable target types through transformations.
-
- // It is possible that a given event belongs to multiple input
- // types.
- // This is however certainly not the main case.
-
- for (String inputType : filteredInputTypes) {
- boolean result = deliverEvent(snapshotconfig.getFirst()
- .filterConfig().values(), snapshotconfig.getSecond(), info,
- inputType);
- delivered = delivered || result;
- }
- } finally {
- if (!delivered) {
- destinationNotFound(aSource, aEvent);
- listener.notDelivered(info);
- }
- }
- }
-
- private boolean deliverEvent(Collection<Filter> aFilters,
- TransformationPaths aTransformations, EventInfo aInfo, String aInputType) {
-
- boolean delivered = false;
- Set<String> possibleTargetTypes = new HashSet<String>();
- possibleTargetTypes.addAll(aTransformations
- .getPossibleTargetTypes(aInputType));
-
- // ask each destination what target types, if any they want to have.
- for (Map.Entry<Id<Destination>, Destination> entry : destinations
- .entrySet()) {
- Id<Destination> destinationId = entry.getKey();
- Destination destination = entry.getValue();
- Collection<String> requested = destination
- .chooseFromTargetTypes(possibleTargetTypes);
- if (!requested.isEmpty()) {
- // Deliver to the destination.
- for (String targetType : requested) {
- TransformationPath path = aTransformations.getPath(
- aInputType, targetType);
- List<Transformation> ts = path.getTransformations();
- int i = 0;
- boolean allowed = true;
- DOMSource transformed = aInfo.getEvent();
- while (i < ts.size() && allowed && transformed != null) {
- Transformation t = ts.get(i);
- DOMSource orig = transformed;
- transformed = t.transform(transformed);
- if (transformed == null) {
- transformationReturnedNull(aInfo.getSource(),
- aInfo.getEvent(), aInputType, t, orig);
- }
-
- if (!isAllowedByFilters(aFilters, t.getToType(),
- transformed)) {
- allowed = false;
- }
- i++;
- }
- if (allowed && transformed != null) {
- // all transformations done and all filters still
- // allow the event.
- boolean result = destination.receive(transformed);
- listener.delivered(aInfo, ts, destinationId.getId(),
- result);
- delivered = delivered || result;
-
- }
- }
- }
- }
- return delivered;
- }
-
- private List<String> determineFilteredInputTypes(
- Collection<Filter> aFilters, List<String> aTypes, DOMSource aEvent) {
-
- // apply filters to the input
- List<String> filteredTypes = new ArrayList<String>();
- for (String type : aTypes) {
- boolean allowed = isAllowedByFilters(aFilters, type, aEvent);
- if (allowed) {
- filteredTypes.add(type);
- }
- }
- return filteredTypes;
- }
-
- private boolean isAllowedByFilters(Collection<Filter> aFilters,
- String aType, DOMSource aEvent) {
- boolean allowed = true;
- for (Filter filter : aFilters) {
- if (!filter.isAllowed(aType, aEvent)) {
- allowed = false;
- }
- }
- return allowed;
- }
-
- private List<String> determineDocumentTypes(
- Collection<DocumentType> aTypes, DOMSource aEvent) {
- List<String> res = new ArrayList<String>();
- for (DocumentType type : aTypes) {
- if (type.isInstance(aEvent)) {
- res.add(type.getName());
- }
- }
- return res;
- }
-
- private String eventToString(String aSource, DOMSource aEvent) {
- return "source '" + aSource + "': Event: '" +
- new XMLDocument(aEvent).print(true) + "'";
- }
-
- private void transformationReturnedNull(String aSource, DOMSource aEvent,
- String aInputType, Transformation aT, DOMSource aTransformed) {
- LOGGER.log(Level.WARNING, "Transformation returned null for event " +
- eventToString(aSource, aEvent) + " inputType '" + aInputType +
- "', transformation '" + aT + "' document to transform " +
- new XMLDocument(aTransformed).print(true));
- }
-
- private void destinationNotFound(String aSource, DOMSource aEvent) {
- LOGGER.log(Level.WARNING, "No destination found for event: " +
- eventToString(aSource, aEvent));
- }
-
- @Override
- public Id<Destination> registerDestination(Destination aDestination) {
- notNull("destination", aDestination);
- long seqno = sequenceNumbers.getAndIncrement();
- Id<Destination> id = new Id<Destination>(seqno + "");
- destinations.put(id, new RobustDestination(id, aDestination));
- return id;
- }
-
- @Override
- public void unregisterDestination(Id<Destination> aId) {
- destinations.remove(aId);
- }
-
- private void notNull(String aName, Object aValue) {
- if (aValue == null) {
- throw new IllegalArgumentException("Parameter '" + aName +
- "' may not be null");
- }
- }
-}