* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
- */
+ */
package org.wamblee.xmlrouter.impl;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.xml.transform.dom.DOMSource;
+import org.wamblee.general.Clock;
import org.wamblee.xml.XMLDocument;
import org.wamblee.xmlrouter.common.Id;
-import org.wamblee.xmlrouter.config.Config;
import org.wamblee.xmlrouter.config.DocumentType;
import org.wamblee.xmlrouter.config.Filter;
+import org.wamblee.xmlrouter.config.RouterConfig;
import org.wamblee.xmlrouter.config.Transformation;
+import org.wamblee.xmlrouter.listener.EventInfo;
+import org.wamblee.xmlrouter.listener.EventListener;
import org.wamblee.xmlrouter.publish.Gateway;
import org.wamblee.xmlrouter.subscribe.Destination;
import org.wamblee.xmlrouter.subscribe.DestinationRegistry;
// TODO concurrency.
-public class XMLRouter implements Config, Gateway, DestinationRegistry {
+public class XMLRouter implements RouterConfig, Gateway, DestinationRegistry {
private static final Logger LOGGER = Logger.getLogger(XMLRouter.class
.getName());
- private AtomicInteger sequenceNumbers;
- private Map<Integer, DocumentType> documentTypes;
+ private EventListener listener;
+ private Clock clock;
+ private AtomicLong nextEventId;
+ private AtomicLong sequenceNumbers;
+ private Map<Id<DocumentType>, DocumentType> documentTypes;
private Transformations transformations;
- private Map<Integer, Filter> filters;
- private Map<Integer, Destination> destinations;
-
- public XMLRouter() {
- sequenceNumbers = new AtomicInteger(1);
- documentTypes = new LinkedHashMap<Integer, DocumentType>();
+ private Map<Id<Filter>, Filter> filters;
+ private Map<Id<Destination>, Destination> destinations;
+
+ public XMLRouter(Clock aClock, EventListener aListener) {
+ listener = aListener;
+ clock = aClock;
+ nextEventId = new AtomicLong(clock.currentTimeMillis());
+ sequenceNumbers = new AtomicLong(1);
+ documentTypes = new LinkedHashMap<Id<DocumentType>, DocumentType>();
transformations = new Transformations();
- filters = new LinkedHashMap<Integer, Filter>();
- destinations = new LinkedHashMap<Integer, Destination>();
+ filters = new LinkedHashMap<Id<Filter>, Filter>();
+ destinations = new LinkedHashMap<Id<Destination>, Destination>();
}
@Override
public Id<DocumentType> addDocumentType(DocumentType aType) {
- int seqno = sequenceNumbers.getAndIncrement();
- documentTypes.put(seqno, aType);
+ long seqno = sequenceNumbers.getAndIncrement();
+ documentTypes.put(new Id<DocumentType>(seqno), aType);
return new Id<DocumentType>(seqno);
}
}
@Override
- public Collection<DocumentType> getDocumentTypes() {
- return Collections.unmodifiableCollection(documentTypes.values());
+ public Collection<Id<DocumentType>> getDocumentTypes() {
+ return Collections.unmodifiableCollection(documentTypes.keySet());
+ }
+
+ @Override
+ public DocumentType getDocumentType(Id<DocumentType> aId) {
+ return documentTypes.get(aId);
}
@Override
}
@Override
- public Collection<Transformation> getTransformations() {
+ public Collection<Id<Transformation>> getTransformations() {
return transformations.getTransformations();
}
+ @Override
+ public Transformation getTransformation(Id<Transformation> aId) {
+ return transformations.getTransformation(aId);
+ }
+
@Override
public Id<Filter> addFilter(Filter aFilter) {
- int seqno = sequenceNumbers.getAndIncrement();
- filters.put(seqno, aFilter);
+ long seqno = sequenceNumbers.getAndIncrement();
+ filters.put(new Id<Filter>(seqno), aFilter);
return new Id<Filter>(seqno);
}
}
@Override
- public Collection<Filter> getFilters() {
- return Collections.unmodifiableCollection(filters.values());
+ public Collection<Id<Filter>> getFilters() {
+ return Collections.unmodifiableCollection(filters.keySet());
}
@Override
- public boolean publish(String aSource, DOMSource aEvent) {
+ public Filter getFilter(Id<Filter> aId) {
+ return filters.get(aId);
+ }
+
+ @Override
+ public void publish(String aSource, DOMSource aEvent) {
+
+ long time = clock.currentTimeMillis();
+ Id<DOMSource> id = new Id<DOMSource>(nextEventId.getAndIncrement());
+ List<String> types = determineDocumentTypes(aEvent);
+ EventInfo info = new EventInfo(time, aSource, id, types, aEvent);
boolean delivered = false;
try {
- List<String> filteredInputTypes = determineFilteredInputTypes(aEvent);
+ List<String> filteredInputTypes = determineFilteredInputTypes(
+ types, aEvent);
if (filteredInputTypes.isEmpty()) {
if (LOGGER.isLoggable(Level.FINE)) {
String doc = new XMLDocument(aEvent).print(true);
"Event ''0}'' from source {1} removed because of filters.",
new Object[] { doc, aSource });
}
- return delivered;
}
// get the reachable target types through transformations.
// This is however certainly not the main case.
for (String inputType : filteredInputTypes) {
- boolean result = deliverEvent(aSource, aEvent, inputType);
+ boolean result = deliverEvent(info, inputType);
delivered = delivered || result;
}
} finally {
if (!delivered) {
destinationNotFound(aSource, aEvent);
+ listener.notDelivered(info);
}
- return delivered;
}
}
- private boolean deliverEvent(String aSource, DOMSource aEvent,
- String aInputType) {
+ private boolean deliverEvent(EventInfo aInfo, String aInputType) {
boolean delivered = false;
Set<String> possibleTargetTypes = new HashSet<String>();
.getPossibleTargetTypes(aInputType));
// ask each destination what target types, if any they want to have.
- for (Destination destination : destinations.values()) {
+ for (Map.Entry<Id<Destination>, Destination> entry : destinations
+ .entrySet()) {
+ Id<Destination> destinationId = entry.getKey();
+ Destination destination = entry.getValue();
Collection<String> requested = destination
.chooseFromTargetTypes(possibleTargetTypes);
if (!requested.isEmpty()) {
List<Transformation> ts = path.getTransformations();
int i = 0;
boolean allowed = true;
- DOMSource transformed = aEvent;
+ DOMSource transformed = aInfo.getEvent();
while (i < ts.size() && allowed && transformed != null) {
Transformation t = ts.get(i);
DOMSource orig = transformed;
transformed = t.transform(transformed);
if (transformed == null) {
- transformationReturnedNull(aSource, aEvent,
- aInputType, t, orig);
+ transformationReturnedNull(aInfo.getSource(),
+ aInfo.getEvent(), aInputType, t, orig);
}
if (!isAllowedByFilters(t.getToType(), transformed)) {
// all transformations done and all filters still
// allow the event.
boolean result = destination.receive(transformed);
+ listener.delivered(aInfo, ts, destinationId.getId(),
+ destination.getName(), result);
delivered = delivered || result;
}
return delivered;
}
- private List<String> determineFilteredInputTypes(DOMSource aEvent) {
- List<String> types = determineDocumentTypes(aEvent);
+ private List<String> determineFilteredInputTypes(List<String> aTypes,
+ DOMSource aEvent) {
+
// apply filters to the input
List<String> filteredTypes = new ArrayList<String>();
- for (String type : types) {
+ for (String type : aTypes) {
boolean allowed = isAllowedByFilters(type, aEvent);
if (allowed) {
filteredTypes.add(type);
@Override
public Id<Destination> registerDestination(Destination aDestination) {
notNull("destination", aDestination);
- int seqno = sequenceNumbers.getAndIncrement();
+ long seqno = sequenceNumbers.getAndIncrement();
Id<Destination> id = new Id<Destination>(seqno);
- destinations.put(seqno, new RobustDestination(id, aDestination));
+ destinations.put(id, new RobustDestination(id, aDestination));
return id;
}
@Override
public void unregisterDestination(Id<Destination> aId) {
- destinations.remove(aId.getId());
+ destinations.remove(aId);
}
private void notNull(String aName, Object aValue) {