+/*
+ * Copyright 2005-2011 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.wamblee.xmlrouter.impl;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.xml.transform.dom.DOMSource;
+import org.wamblee.general.Clock;
import org.wamblee.xml.XMLDocument;
import org.wamblee.xmlrouter.common.Id;
-import org.wamblee.xmlrouter.config.Config;
import org.wamblee.xmlrouter.config.DocumentType;
import org.wamblee.xmlrouter.config.Filter;
import org.wamblee.xmlrouter.config.Transformation;
+import org.wamblee.xmlrouter.listener.EventInfo;
+import org.wamblee.xmlrouter.listener.EventListener;
import org.wamblee.xmlrouter.publish.Gateway;
import org.wamblee.xmlrouter.subscribe.Destination;
import org.wamblee.xmlrouter.subscribe.DestinationRegistry;
-// TODO concurrency.
+// TODO check intermediate types during transformation based on filters.
-public class XMLRouter implements Config, Gateway, DestinationRegistry {
+/**
+ * The XML Router.
+ *
+ * @author Erik Brakkee
+ *
+ */
+public class XMLRouter implements Gateway, DestinationRegistry {
private static final Logger LOGGER = Logger.getLogger(XMLRouter.class
.getName());
- private AtomicInteger sequenceNumbers;
- private Map<Integer, DocumentType> documentTypes;
- private Transformations transformations;
- private Map<Integer, Filter> filters;
- private Map<Integer, Destination> destinations;
-
- public XMLRouter() {
- sequenceNumbers = new AtomicInteger(1);
- documentTypes = new LinkedHashMap<Integer, DocumentType>();
- transformations = new Transformations();
- filters = new LinkedHashMap<Integer, Filter>();
- destinations = new LinkedHashMap<Integer, Destination>();
- }
-
- @Override
- public Id<DocumentType> addDocumentType(DocumentType aType) {
- int seqno = sequenceNumbers.getAndIncrement();
- documentTypes.put(seqno, aType);
- return new Id<DocumentType>(seqno);
- }
-
- @Override
- public void removeDocumentType(Id<DocumentType> aId) {
- documentTypes.remove(aId);
- }
-
- @Override
- public Collection<DocumentType> getDocumentTypes() {
- return Collections.unmodifiableCollection(documentTypes.values());
- }
+ private AtomicLong sequenceNumbers;
+ private EventListener listener;
+ private Clock clock;
+ private AtomicLong nextEventId;
- @Override
- public Id<Transformation> addTransformation(Transformation aTransformation) {
- return transformations.addTransformation(aTransformation);
- }
+ private XMLRouterConfiguration config;
- @Override
- public void removeTransformation(Id<Transformation> aId) {
- transformations.removeTransformation(aId);
- }
+ private Map<Id<Destination>, Destination> destinations;
- @Override
- public Collection<Transformation> getTransformations() {
- return transformations.getTransformations();
+ public XMLRouter(Clock aClock, XMLRouterConfiguration aConfig,
+ EventListener aListener) {
+ sequenceNumbers = new AtomicLong(1);
+ listener = aListener;
+ clock = aClock;
+ nextEventId = new AtomicLong(clock.currentTimeMillis());
+ config = aConfig;
+ destinations = new LinkedHashMap<Id<Destination>, Destination>();
}
@Override
- public Id<Filter> addFilter(Filter aFilter) {
- int seqno = sequenceNumbers.getAndIncrement();
- filters.put(seqno, aFilter);
- return new Id<Filter>(seqno);
- }
-
- @Override
- public void removeFilter(Id<Filter> aId) {
- filters.remove(aId);
+ public void publish(String aSource, DOMSource aEvent) {
+ config.startPublishEvent();
+ try {
+ publishImpl(aSource, aEvent);
+ } finally {
+ config.endPublishEvent();
+ }
}
- @Override
- public Collection<Filter> getFilters() {
- return Collections.unmodifiableCollection(filters.values());
- }
+ private void publishImpl(String aSource, DOMSource aEvent) {
+ long time = clock.currentTimeMillis();
- @Override
- public boolean publish(String aSource, DOMSource aEvent) {
+ Id<DOMSource> id = new Id<DOMSource>(nextEventId.getAndIncrement() + "");
+ List<String> types = determineDocumentTypes(aEvent);
+ EventInfo info = new EventInfo(time, aSource, id, types, aEvent);
boolean delivered = false;
try {
- List<String> filteredInputTypes = determineFilteredInputTypes(aEvent);
+ List<String> filteredInputTypes = determineFilteredInputTypes(
+ types, aEvent);
if (filteredInputTypes.isEmpty()) {
if (LOGGER.isLoggable(Level.FINE)) {
String doc = new XMLDocument(aEvent).print(true);
"Event ''0}'' from source {1} removed because of filters.",
new Object[] { doc, aSource });
}
- return delivered;
}
// get the reachable target types through transformations.
// This is however certainly not the main case.
for (String inputType : filteredInputTypes) {
- boolean result = deliverEvent(aSource, aEvent, inputType);
+ boolean result = deliverEvent(info, inputType);
delivered = delivered || result;
}
} finally {
if (!delivered) {
destinationNotFound(aSource, aEvent);
+ listener.notDelivered(info);
}
- return delivered;
}
}
- private boolean deliverEvent(String aSource, DOMSource aEvent,
- String aInputType) {
+ private boolean deliverEvent(EventInfo aInfo, String aInputType) {
boolean delivered = false;
Set<String> possibleTargetTypes = new HashSet<String>();
- possibleTargetTypes.addAll(transformations
+ possibleTargetTypes.addAll(config.getTransformations()
.getPossibleTargetTypes(aInputType));
// ask each destination what target types, if any they want to have.
- for (Destination destination : destinations.values()) {
+ for (Map.Entry<Id<Destination>, Destination> entry : destinations
+ .entrySet()) {
+ Id<Destination> destinationId = entry.getKey();
+ Destination destination = entry.getValue();
Collection<String> requested = destination
.chooseFromTargetTypes(possibleTargetTypes);
if (!requested.isEmpty()) {
// Deliver to the destination.
for (String targetType : requested) {
- TransformationPath path = transformations.getPath(
- aInputType, targetType);
+ TransformationPath path = config.getTransformations()
+ .getPath(aInputType, targetType);
List<Transformation> ts = path.getTransformations();
int i = 0;
boolean allowed = true;
- DOMSource transformed = aEvent;
+ DOMSource transformed = aInfo.getEvent();
while (i < ts.size() && allowed && transformed != null) {
Transformation t = ts.get(i);
DOMSource orig = transformed;
transformed = t.transform(transformed);
if (transformed == null) {
- transformationReturnedNull(aSource, aEvent,
- aInputType, t, orig);
+ transformationReturnedNull(aInfo.getSource(),
+ aInfo.getEvent(), aInputType, t, orig);
}
if (!isAllowedByFilters(t.getToType(), transformed)) {
// all transformations done and all filters still
// allow the event.
boolean result = destination.receive(transformed);
+ listener.delivered(aInfo, ts, destinationId.getId(),
+ result);
delivered = delivered || result;
}
return delivered;
}
- private List<String> determineFilteredInputTypes(DOMSource aEvent) {
- List<String> types = determineDocumentTypes(aEvent);
+ private List<String> determineFilteredInputTypes(List<String> aTypes,
+ DOMSource aEvent) {
+
// apply filters to the input
List<String> filteredTypes = new ArrayList<String>();
- for (String type : types) {
+ for (String type : aTypes) {
boolean allowed = isAllowedByFilters(type, aEvent);
if (allowed) {
filteredTypes.add(type);
private boolean isAllowedByFilters(String aType, DOMSource aEvent) {
boolean allowed = true;
- for (Filter filter : filters.values()) {
+ for (Filter filter : config.getRouterConfig().filterConfig().values()) {
if (!filter.isAllowed(aType, aEvent)) {
allowed = false;
}
private List<String> determineDocumentTypes(DOMSource aEvent) {
List<String> res = new ArrayList<String>();
- for (DocumentType type : documentTypes.values()) {
+ for (DocumentType type : config.getRouterConfig().documentTypeConfig()
+ .values()) {
if (type.isInstance(aEvent)) {
res.add(type.getName());
}
return res;
}
- private void logEvent(String aMessage, String aSource, DOMSource aEvent,
- Exception aException) {
- LOGGER.log(Level.WARNING, aMessage + ": source '" + aSource +
- "': Event: '" + new XMLDocument(aEvent).print(true) + "'",
- aException);
- }
-
- private void logEvent(String aMessage, String aSource, DOMSource aEvent) {
- LOGGER.log(Level.WARNING,
- aMessage + ": " + eventToString(aSource, aEvent));
- }
-
private String eventToString(String aSource, DOMSource aEvent) {
return "source '" + aSource + "': Event: '" +
new XMLDocument(aEvent).print(true) + "'";
@Override
public Id<Destination> registerDestination(Destination aDestination) {
notNull("destination", aDestination);
- int seqno = sequenceNumbers.getAndIncrement();
- Id<Destination> id = new Id<Destination>(seqno);
- destinations.put(seqno, new RobustDestination(id, aDestination));
+ long seqno = sequenceNumbers.getAndIncrement();
+ Id<Destination> id = new Id<Destination>(seqno + "");
+ destinations.put(id, new RobustDestination(id, aDestination));
return id;
}
@Override
public void unregisterDestination(Id<Destination> aId) {
- destinations.remove(aId.getId());
+ destinations.remove(aId);
}
private void notNull(String aName, Object aValue) {