now a void method (fire and forget).
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
- */
+ */
package org.wamblee.xmlrouter.common;
public class Id<T> {
- private int id;
+ private long id;
- public Id(int aId) {
+ public Id(long aId) {
id = aId;
}
- public int getId() {
+ public long getId() {
return id;
}
@Override
public int hashCode() {
- return id;
+ return ((Long) id).hashCode();
}
@Override
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
- */
+ */
package org.wamblee.xmlrouter.impl;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import org.wamblee.xmlrouter.common.Id;
import org.wamblee.xmlrouter.config.Transformation;
public class Transformations {
- private AtomicInteger sequenceNumber;
- private Map<Integer, Transformation> transformations;
+ private AtomicLong sequenceNumber;
+ private Map<Long, Transformation> transformations;
private List<String> vertices;
private TransformationPath[][] matrix;
private Map<String, List<TransformationPath>> sequences;
public Transformations() {
- sequenceNumber = new AtomicInteger(1);
- transformations = new LinkedHashMap<Integer, Transformation>();
+ sequenceNumber = new AtomicLong(1);
+ transformations = new LinkedHashMap<Long, Transformation>();
vertices = new ArrayList<String>();
matrix = new TransformationPath[0][0];
}
public Id<Transformation> addTransformation(Transformation aTransformation) {
- int seqno = sequenceNumber.getAndIncrement();
+ long seqno = sequenceNumber.getAndIncrement();
Id<Transformation> id = new Id<Transformation>(seqno);
transformations.put(seqno,
new RobustTransformation(id, aTransformation));
*/
public TransformationPath getPath(String aFrom, String aTo) {
int i = vertices.indexOf(aFrom);
- if (i == -1) {
+ if (i < 0) {
if (aFrom.equals(aTo)) {
return new TransformationPath();
}
}
int j = vertices.indexOf(aTo);
+ if (j < 0) {
+ return null;
+ }
return matrix[i][j];
}
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.xml.transform.dom.DOMSource;
+import org.wamblee.general.Clock;
import org.wamblee.xml.XMLDocument;
import org.wamblee.xmlrouter.common.Id;
import org.wamblee.xmlrouter.config.Config;
import org.wamblee.xmlrouter.config.DocumentType;
import org.wamblee.xmlrouter.config.Filter;
import org.wamblee.xmlrouter.config.Transformation;
+import org.wamblee.xmlrouter.listener.EventInfo;
import org.wamblee.xmlrouter.listener.EventListener;
import org.wamblee.xmlrouter.publish.Gateway;
import org.wamblee.xmlrouter.subscribe.Destination;
.getName());
private EventListener listener;
- private AtomicInteger sequenceNumbers;
- private Map<Integer, DocumentType> documentTypes;
+ private Clock clock;
+ private AtomicLong nextEventId;
+ private AtomicLong sequenceNumbers;
+ private Map<Long, DocumentType> documentTypes;
private Transformations transformations;
- private Map<Integer, Filter> filters;
- private Map<Integer, Destination> destinations;
+ private Map<Long, Filter> filters;
+ private Map<Long, Destination> destinations;
- public XMLRouter(EventListener aListener) {
+ public XMLRouter(Clock aClock, EventListener aListener) {
listener = aListener;
- sequenceNumbers = new AtomicInteger(1);
- documentTypes = new LinkedHashMap<Integer, DocumentType>();
+ clock = aClock;
+ nextEventId = new AtomicLong(clock.currentTimeMillis());
+ sequenceNumbers = new AtomicLong(1);
+ documentTypes = new LinkedHashMap<Long, DocumentType>();
transformations = new Transformations();
- filters = new LinkedHashMap<Integer, Filter>();
- destinations = new LinkedHashMap<Integer, Destination>();
+ filters = new LinkedHashMap<Long, Filter>();
+ destinations = new LinkedHashMap<Long, Destination>();
}
@Override
public Id<DocumentType> addDocumentType(DocumentType aType) {
- int seqno = sequenceNumbers.getAndIncrement();
+ long seqno = sequenceNumbers.getAndIncrement();
documentTypes.put(seqno, aType);
return new Id<DocumentType>(seqno);
}
@Override
public Id<Filter> addFilter(Filter aFilter) {
- int seqno = sequenceNumbers.getAndIncrement();
+ long seqno = sequenceNumbers.getAndIncrement();
filters.put(seqno, aFilter);
return new Id<Filter>(seqno);
}
}
@Override
- public boolean publish(String aSource, DOMSource aEvent) {
+ public void publish(String aSource, DOMSource aEvent) {
+
+ long time = clock.currentTimeMillis();
+ Id<DOMSource> id = new Id<DOMSource>(nextEventId.getAndIncrement());
+ List<String> types = determineDocumentTypes(aEvent);
+ EventInfo info = new EventInfo(time, aSource, id, types, aEvent);
boolean delivered = false;
try {
- List<String> filteredInputTypes = determineFilteredInputTypes(aEvent);
+ List<String> filteredInputTypes = determineFilteredInputTypes(
+ types, aEvent);
if (filteredInputTypes.isEmpty()) {
if (LOGGER.isLoggable(Level.FINE)) {
String doc = new XMLDocument(aEvent).print(true);
"Event ''0}'' from source {1} removed because of filters.",
new Object[] { doc, aSource });
}
- return delivered;
}
// get the reachable target types through transformations.
// This is however certainly not the main case.
for (String inputType : filteredInputTypes) {
- boolean result = deliverEvent(aSource, aEvent, inputType);
+ boolean result = deliverEvent(info, inputType);
delivered = delivered || result;
}
} finally {
if (!delivered) {
destinationNotFound(aSource, aEvent);
+ listener.notDelivered(info);
}
- return delivered;
}
}
- private boolean deliverEvent(String aSource, DOMSource aEvent,
- String aInputType) {
+ private boolean deliverEvent(EventInfo aInfo, String aInputType) {
boolean delivered = false;
Set<String> possibleTargetTypes = new HashSet<String>();
.getPossibleTargetTypes(aInputType));
// ask each destination what target types, if any they want to have.
- for (Destination destination : destinations.values()) {
+ for (Map.Entry<Long, Destination> entry : destinations.entrySet()) {
+ long destinationId = entry.getKey();
+ Destination destination = entry.getValue();
Collection<String> requested = destination
.chooseFromTargetTypes(possibleTargetTypes);
if (!requested.isEmpty()) {
List<Transformation> ts = path.getTransformations();
int i = 0;
boolean allowed = true;
- DOMSource transformed = aEvent;
+ DOMSource transformed = aInfo.getEvent();
while (i < ts.size() && allowed && transformed != null) {
Transformation t = ts.get(i);
DOMSource orig = transformed;
transformed = t.transform(transformed);
if (transformed == null) {
- transformationReturnedNull(aSource, aEvent,
- aInputType, t, orig);
+ transformationReturnedNull(aInfo.getSource(),
+ aInfo.getEvent(), aInputType, t, orig);
}
if (!isAllowedByFilters(t.getToType(), transformed)) {
// all transformations done and all filters still
// allow the event.
boolean result = destination.receive(transformed);
+ listener.delivered(aInfo, ts, destinationId,
+ destination.getName(), result);
delivered = delivered || result;
}
return delivered;
}
- private List<String> determineFilteredInputTypes(DOMSource aEvent) {
- List<String> types = determineDocumentTypes(aEvent);
+ private List<String> determineFilteredInputTypes(List<String> aTypes,
+ DOMSource aEvent) {
+
// apply filters to the input
List<String> filteredTypes = new ArrayList<String>();
- for (String type : types) {
+ for (String type : aTypes) {
boolean allowed = isAllowedByFilters(type, aEvent);
if (allowed) {
filteredTypes.add(type);
@Override
public Id<Destination> registerDestination(Destination aDestination) {
notNull("destination", aDestination);
- int seqno = sequenceNumbers.getAndIncrement();
+ long seqno = sequenceNumbers.getAndIncrement();
Id<Destination> id = new Id<Destination>(seqno);
destinations.put(seqno, new RobustDestination(id, aDestination));
return id;
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
- */
+ */
package org.wamblee.xmlrouter.impl;
import static junit.framework.Assert.*;
from = aFrom;
to = aTo;
}
-
+
@Override
public String getName() {
return "myname";
assertNull(transformations.getPath("C", "B"));
}
+ @Test
+ public void testUnknownDestination() {
+ transformations.addTransformation(new MyTransformation("A", "B"));
+ assertNull(transformations.getPath("A", "D"));
+ }
+
@Test
public void testWithoutTransformations() {
Collection<String> res = transformations.getPossibleTargetTypes("a");
*/
package org.wamblee.xmlrouter.impl;
-import static junit.framework.Assert.*;
import static org.mockito.Matchers.*;
import static org.mockito.Mockito.*;
import org.junit.Before;
import org.junit.Test;
+import org.wamblee.general.SystemClock;
import org.wamblee.xmlrouter.common.Id;
import org.wamblee.xmlrouter.config.DocumentType;
import org.wamblee.xmlrouter.config.Transformation;
+import org.wamblee.xmlrouter.listener.EventInfo;
import org.wamblee.xmlrouter.listener.EventListener;
import org.wamblee.xmlrouter.listener.LoggingEventListener;
import org.wamblee.xmlrouter.subscribe.Destination;
public void setUp() {
EventListener logListener = new LoggingEventListener(Level.INFO);
listener = spy(logListener);
- router = new XMLRouter(listener);
+ router = new XMLRouter(new SystemClock(), listener);
source1 = mock(DOMSource.class);
source2 = mock(DOMSource.class);
source3 = mock(DOMSource.class);
destinationSpy = spy(destination);
destinationId = router.registerDestination(destinationSpy);
- assertFalse(router.publish("any", source1));
+ router.publish("any", source1);
+ verify(listener).notDelivered(any(EventInfo.class));
}
@Test
destinationSpy = registerDestination(true, "any");
registerDocumentType("any");
- assertTrue(router.publish("any", source1));
+ router.publish("any", source1);
+ verify(listener).delivered(any(EventInfo.class),
+ anyListOf(Transformation.class), anyLong(), anyString(), eq(true));
verify(destinationSpy).receive(same(source1));
// Unregister the destination.
router.unregisterDestination(destinationId);
- reset(destinationSpy);
- assertFalse(router.publish("any", source2));
+ resetMocks();
+ router.publish("any", source2);
+ verify(listener).notDelivered(any(EventInfo.class));
verifyNoMoreInteractions(destinationSpy);
}
+ private void resetMocks() {
+ reset(destinationSpy);
+ reset(listener);
+ }
+
private void registerDocumentType(String aType) {
DocumentType type = mock(DocumentType.class);
when(type.isInstance(any(DOMSource.class))).thenReturn(true);
destinationSpy = registerDestination(true);
registerDocumentType("any");
- assertFalse(router.publish("any", source1));
+ router.publish("any", source1);
+ verify(listener).notDelivered(any(EventInfo.class));
verify(destinationSpy, never()).receive(any(DOMSource.class));
}
doThrow(new RuntimeException()).when(destinationSpy).receive(
any(DOMSource.class));
- assertFalse(router.publish("any", source1));
+ router.publish("any", source1);
+ verify(listener).notDelivered(any(EventInfo.class));
verify(destinationSpy).receive(same(source1));
}
Id<Destination> destinationId2 = router
.registerDestination(destinationSpy2);
- assertTrue(router.publish("any", source1));
+ router.publish("any", source1);
+ verify(listener).delivered(any(EventInfo.class),
+ anyListOf(Transformation.class), anyLong(), anyString(), eq(true));
+
verify(destinationSpy2).receive(same(source1));
+
}
@Test
doThrow(new RuntimeException()).when(destinationSpy)
.chooseFromTargetTypes((Collection<String>) anyObject());
- assertFalse(router.publish("any", source1));
+ router.publish("any", source1);
+ verify(listener).notDelivered(any(EventInfo.class));
verify(destinationSpy, never()).receive(same(source1));
}
Destination destinationSpy2 = spy(destination2);
Id<Destination> destinationId2 = router
.registerDestination(destinationSpy2);
- assertTrue(router.publish("any", source1));
+ router.publish("any", source1);
+ verify(listener).delivered(any(EventInfo.class),
+ anyListOf(Transformation.class), anyLong(), anyString(), eq(true));
+
verify(destinationSpy, never()).receive(same(source1));
verify(destinationSpy2).receive(same(source1));
}
.chooseFromTargetTypes((Collection<String>) anyObject()))
.thenReturn(null);
- assertFalse(router.publish("any", source1));
+ router.publish("any", source1);
+ verify(listener).notDelivered(any(EventInfo.class));
verify(destinationSpy, never()).receive(same(source1));
}
Destination destinationSpy2 = spy(destination2);
Id<Destination> destinationId2 = router
.registerDestination(destinationSpy2);
- assertTrue(router.publish("any", source1));
+ router.publish("any", source1);
+ verify(listener).delivered(any(EventInfo.class),
+ anyListOf(Transformation.class), anyLong(), anyString(), eq(true));
+
verify(destinationSpy, never()).receive(same(source1));
verify(destinationSpy2).receive(same(source1));
}
public void testOneTransformationOneDestination() {
registerDocumentType("any");
Transformation transformation = mock(Transformation.class);
+ when(transformation.getName()).thenReturn("trans");
when(transformation.getFromType()).thenReturn("any");
when(transformation.getToType()).thenReturn("bla");
when(transformation.transform(same(source1))).thenReturn(source2);
router.registerDestination(destination);
when(destination.receive(any(DOMSource.class))).thenReturn(true);
- assertTrue(router.publish("bla", source1));
+ router.publish("bla", source1);
+ verify(listener).delivered(any(EventInfo.class),
+ anyListOf(Transformation.class), anyLong(), anyString(), eq(true));
verify(transformation).transform(source1);
verify(destination).receive(same(source2));
// now the same when the destination rejects the event.
when(destination.receive(any(DOMSource.class))).thenReturn(false);
- assertFalse(router.publish("bla", source1));
+ router.publish("bla", source1);
+ verify(listener).notDelivered(any(EventInfo.class));
}
private Transformation createTransformation(String aFrom, String aTo,
DOMSource aSource, DOMSource aTarget) {
Transformation transformation = mock(Transformation.class);
+ when(transformation.getName()).thenReturn("trans");
when(transformation.getFromType()).thenReturn(aFrom);
when(transformation.getToType()).thenReturn(aTo);
when(transformation.transform(same(aSource))).thenReturn(aTarget);
.thenReturn(Arrays.asList("bla"));
router.registerDestination(destination);
- assertFalse(router.publish("bla", source1));
+ router.publish("bla", source1);
+ verify(listener).notDelivered(any(EventInfo.class));
verify(transformation).transform(source1);
verify(destination, never()).receive(any(DOMSource.class));
.thenReturn(Arrays.asList("bla", "bla2"));
reset(transformation);
+ when(transformation.getName()).thenReturn("trans");
when(transformation.getFromType()).thenReturn("any");
when(transformation.getToType()).thenReturn("bla");
when(transformation.transform(same(source1))).thenReturn(null);
when(destination.receive(any(DOMSource.class))).thenReturn(true);
- assertTrue(router.publish("bla", source1));
+ router.publish("bla", source1);
+ verify(listener).delivered(any(EventInfo.class),
+ anyListOf(Transformation.class), anyLong(), anyString(), eq(true));
verify(transformation).transform(source1);
verify(transformation2).transform(source1);
Destination dest2 = registerDestination(true, "any");
registerDocumentType("any");
- assertTrue(router.publish("source", source1));
+ router.publish("source", source1);
+ verify(listener, times(2)).delivered(any(EventInfo.class),
+ anyListOf(Transformation.class), anyLong(), anyString(), eq(true));
verify(dest1).receive(same(source1));
verify(dest2).receive(same(source1));
source1, source2);
router.addTransformation(transformation);
- assertTrue(router.publish("source", source1));
+ router.publish("source", source1);
+ verify(listener, times(2)).delivered(any(EventInfo.class),
+ anyListOf(Transformation.class), anyLong(), anyString(), eq(true));
verify(dest).receive(same(source1));
verify(dest).receive(same(source2));
@Test
public void testMultipleTransformations() {
- Destination dest = registerDestination(true, "any", "other");
+ Destination dest = registerDestination(true, "other");
registerDocumentType("any", source1);
registerDocumentType("other", source3);
source2, source3);
router.addTransformation(t2);
- assertTrue(router.publish("source", source1));
+ router.publish("source", source1);
+ verify(listener).delivered(any(EventInfo.class),
+ anyListOf(Transformation.class), anyLong(), anyString(), eq(true));
verify(dest).receive(same(source3));
}
+
+ @Test
+ public void testDestinationGivesError() {
+ Destination destination = mock(Destination.class);
+ when(destination.getName()).thenReturn("name");
+ when(destination.chooseFromTargetTypes(anyCollectionOf(String.class)))
+ .thenReturn(Arrays.asList("any"));
+ doThrow(new RuntimeException("x")).when(destination).receive(
+ any(DOMSource.class));
+ router.registerDestination(destination);
+
+ registerDocumentType("any");
+
+ router.publish("source", source1);
+
+ verify(listener).delivered(any(EventInfo.class),
+ anyListOf(Transformation.class), anyLong(), anyString(), eq(false));
+
+ }
}
import java.util.logging.Level;
import java.util.logging.Logger;
-import javax.print.attribute.standard.Destination;
-
import org.wamblee.concurrency.ReadLock;
import org.wamblee.concurrency.WriteLock;
-import org.wamblee.xmlrouter.common.Id;
import org.wamblee.xmlrouter.config.Transformation;
public class CompositeEventListener implements EventListener {
@Override
@ReadLock
public void delivered(EventInfo aInfo, List<Transformation> aSequence,
- Id<Destination> aDestination, String aDestinationName,
- boolean aSuccessFlag) {
+ long aDestinationId, String aDestinationName, boolean aSuccessFlag) {
for (EventListener logger : loggers) {
try {
- logger.delivered(aInfo, aSequence, aDestination,
+ logger.delivered(aInfo, aSequence, aDestinationId,
aDestinationName, aSuccessFlag);
} catch (Exception e) {
LOGGER.log(Level.WARNING, "Logger threw exception", e);
*/
package org.wamblee.xmlrouter.listener;
+import java.util.List;
+
import javax.xml.transform.dom.DOMSource;
import org.wamblee.xml.XMLDocument;
private long time;
private String source;
private Id<DOMSource> id;
- private String type;
+ private List<String> types;
private DOMSource event;
public EventInfo(long aTime, String aSource, Id<DOMSource> aId,
- String aType, DOMSource aEvent) {
+ List<String> aTypes, DOMSource aEvent) {
time = aTime;
source = aSource;
id = aId;
- type = aType;
+ types = aTypes;
event = aEvent;
}
return id;
}
- public String getType() {
- return type;
- }
-
- public void setType(String aType) {
- type = aType;
+ public List<String> getTypes() {
+ return types;
}
public DOMSource getEvent() {
buf.append("time " + time);
buf.append(", source " + source);
buf.append(", id " + id);
- buf.append(", type " + type);
+ buf.append(", types " + types);
buf.append(", event " + new XMLDocument(event).print(true));
buf.append(")");
return buf.toString();
import java.util.List;
-import javax.print.attribute.standard.Destination;
-
-import org.wamblee.xmlrouter.common.Id;
import org.wamblee.xmlrouter.config.Transformation;
/**
* Event information.
* @param aSequence
* Sequence of transformations performed.
- * @param aDestination
+ * @param aDestinationId
* Id of the destination the event was delivered to.
* @param aDestinationName
* Destination name.
* Whether or not event delivery succeeded.
*/
void delivered(EventInfo aInfo, List<Transformation> aSequence,
- Id<Destination> aDestination, String aDestinationName,
- boolean aSuccessFlag);
+ long aDestinationId, String aDestinationName, boolean aSuccessFlag);
/**
* Called when an event could not be delivered to any destination.
import java.util.logging.Level;
import java.util.logging.Logger;
-import javax.print.attribute.standard.Destination;
-
-import org.wamblee.xmlrouter.common.Id;
import org.wamblee.xmlrouter.config.Transformation;
/**
@Override
public void delivered(EventInfo aEvent, List<Transformation> aSequence,
- Id<Destination> aDestination, String aDestinationName,
- boolean aSuccessFlag) {
+ long aDestinationId, String aDestinationName, boolean aSuccessFlag) {
if (LOGGER.isLoggable(level)) {
- LOGGER.log(level, "event delivered: " + aEvent + ", sequence '" +
- getSequenceString(aSequence) + "', destionationId " +
- aDestination + ", destinationName '" + aDestinationName + "'");
+ LOGGER
+ .log(level, "event delivered: " + aEvent + ", sequence '" +
+ getSequenceString(aSequence) + "', destinationId " +
+ aDestinationId + ", destinationName '" + aDestinationName +
+ "'");
}
}
private String getSequenceString(List<Transformation> aSequence) {
StringBuffer buf = new StringBuffer();
- for (Transformation transformation : aSequence) {
+ for (int i = 0; i < aSequence.size(); i++) {
+ if (i > 0) {
+ buf.append(", ");
+ }
+ Transformation transformation = aSequence.get(i);
buf.append(transformation.getName());
buf.append("(");
buf.append(transformation.getFromType());
import java.util.ArrayList;
-import javax.print.attribute.standard.Destination;
-
import org.junit.Before;
import org.junit.Test;
-import org.wamblee.xmlrouter.common.Id;
import org.wamblee.xmlrouter.config.Transformation;
public class CompositeEventListenerTest {
private static final String DESTINATION_NAME = "dest";
- private static final Id<Destination> DESTINATION_ID = new Id<Destination>(
- 12);
+ private static final long DESTINATION_ID = 12L;
private CompositeEventListener composite;
private EventInfo source;
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
- */
+ */
package org.wamblee.xmlrouter.publish;
import javax.xml.transform.dom.DOMSource;
* Source.
* @param aEvent
* Event.
- * @return True iff th event was delivered to at least one destination.
*/
- boolean publish(String aSource, DOMSource aEvent);
+ void publish(String aSource, DOMSource aEvent);
}