Turned the API packages into bundles.
[xmlrouter] / impl / src / main / java / org / wamblee / xmlrouter / impl / XMLRouter.java
1 /*
2  * Copyright 2005-2011 the original author or authors.
3  * 
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  * 
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  * 
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package org.wamblee.xmlrouter.impl;
17
18 import java.util.ArrayList;
19 import java.util.Collection;
20 import java.util.HashSet;
21 import java.util.LinkedHashMap;
22 import java.util.List;
23 import java.util.Map;
24 import java.util.Set;
25 import java.util.concurrent.atomic.AtomicLong;
26 import java.util.logging.Level;
27 import java.util.logging.Logger;
28
29 import javax.xml.transform.dom.DOMSource;
30
31 import org.wamblee.general.Clock;
32 import org.wamblee.general.Pair;
33 import org.wamblee.xml.XMLDocument;
34 import org.wamblee.xmlrouter.common.Id;
35 import org.wamblee.xmlrouter.config.DocumentType;
36 import org.wamblee.xmlrouter.config.Filter;
37 import org.wamblee.xmlrouter.config.Transformation;
38 import org.wamblee.xmlrouter.listener.EventInfo;
39 import org.wamblee.xmlrouter.listener.EventListener;
40 import org.wamblee.xmlrouter.publish.Gateway;
41 import org.wamblee.xmlrouter.subscribe.Destination;
42 import org.wamblee.xmlrouter.subscribe.DestinationRegistry;
43
44 /**
45  * The XML Router.
46  * 
47  * @author Erik Brakkee
48  * 
49  */
50 public class XMLRouter implements Gateway, DestinationRegistry {
51
52     private static final Logger LOGGER = Logger.getLogger(XMLRouter.class
53         .getName());
54
55     private AtomicLong sequenceNumbers;
56     private EventListener listener;
57     private Clock clock;
58     private AtomicLong nextEventId;
59
60     private XMLRouterConfiguration config;
61
62     private Map<Id<Destination>, Destination> destinations;
63
64     public XMLRouter(Clock aClock, XMLRouterConfiguration aConfig,
65         EventListener aListener) {
66         sequenceNumbers = new AtomicLong(1);
67         listener = aListener;
68         clock = aClock;
69         nextEventId = new AtomicLong(clock.currentTimeMillis());
70         config = aConfig;
71         destinations = new LinkedHashMap<Id<Destination>, Destination>();
72     }
73
74     @Override
75     public void publish(String aSource, DOMSource aEvent) {
76         long time = clock.currentTimeMillis();
77
78         Pair<ExtendedRouterConfig, TransformationPaths> snapshotconfig = config
79             .getConfig();
80
81         Id<DOMSource> id = new Id<DOMSource>(nextEventId.getAndIncrement() + "");
82         List<String> types = determineDocumentTypes(snapshotconfig.getFirst()
83             .documentTypeConfig().values(), aEvent);
84         EventInfo info = new EventInfo(time, aSource, id, types, aEvent);
85
86         boolean delivered = false;
87         try {
88
89             List<String> filteredInputTypes = determineFilteredInputTypes(
90                 snapshotconfig.getFirst().filterConfig().values(), types,
91                 aEvent);
92             if (filteredInputTypes.isEmpty()) {
93                 if (LOGGER.isLoggable(Level.FINE)) {
94                     String doc = new XMLDocument(aEvent).print(true);
95                     LOGGER
96                         .log(
97                             Level.FINE,
98                             "Event ''{0}'' from source ''{1}'' removed because of filters.",
99                             new Object[] { doc, aSource });
100                 }
101             }
102
103             // get the reachable target types through transformations.
104
105             // It is possible that a given event belongs to multiple input
106             // types.
107             // This is however certainly not the main case.
108
109             for (String inputType : filteredInputTypes) {
110                 boolean result = deliverEvent(snapshotconfig.getFirst()
111                     .filterConfig().values(), snapshotconfig.getSecond(), info,
112                     inputType);
113                 delivered = delivered || result;
114             }
115         } finally {
116             if (!delivered) {
117                 destinationNotFound(aSource, aEvent);
118                 listener.notDelivered(info);
119             }
120         }
121     }
122
123     private boolean deliverEvent(Collection<Filter> aFilters,
124         TransformationPaths aTransformations, EventInfo aInfo, String aInputType) {
125
126         boolean delivered = false;
127         Set<String> possibleTargetTypes = new HashSet<String>();
128         possibleTargetTypes.addAll(aTransformations
129             .getPossibleTargetTypes(aInputType));
130
131         // ask each destination what target types, if any they want to have.
132         for (Map.Entry<Id<Destination>, Destination> entry : destinations
133             .entrySet()) {
134             Id<Destination> destinationId = entry.getKey();
135             Destination destination = entry.getValue();
136             Collection<String> requested = destination
137                 .chooseFromTargetTypes(possibleTargetTypes);
138             if (!requested.isEmpty()) {
139                 // Deliver to the destination.
140                 for (String targetType : requested) {
141                     TransformationPath path = aTransformations.getPath(
142                         aInputType, targetType);
143                     List<Transformation> ts = path.getTransformations();
144                     int i = 0;
145                     boolean allowed = true;
146                     DOMSource transformed = aInfo.getEvent();
147                     while (i < ts.size() && allowed && transformed != null) {
148                         Transformation t = ts.get(i);
149                         DOMSource orig = transformed;
150                         transformed = t.transform(transformed);
151                         if (transformed == null) {
152                             transformationReturnedNull(aInfo.getSource(),
153                                 aInfo.getEvent(), aInputType, t, orig);
154                         }
155
156                         if (!isAllowedByFilters(aFilters, t.getToType(),
157                             transformed)) {
158                             allowed = false;
159                         }
160                         i++;
161                     }
162                     if (allowed && transformed != null) {
163                         // all transformations done and all filters still
164                         // allow the event.
165                         boolean result = destination.receive(transformed);
166                         listener.delivered(aInfo, ts, destinationId.getId(),
167                             result);
168                         delivered = delivered || result;
169
170                     }
171                 }
172             }
173         }
174         return delivered;
175     }
176
177     private List<String> determineFilteredInputTypes(
178         Collection<Filter> aFilters, List<String> aTypes, DOMSource aEvent) {
179
180         // apply filters to the input
181         List<String> filteredTypes = new ArrayList<String>();
182         for (String type : aTypes) {
183             boolean allowed = isAllowedByFilters(aFilters, type, aEvent);
184             if (allowed) {
185                 filteredTypes.add(type);
186             }
187         }
188         return filteredTypes;
189     }
190
191     private boolean isAllowedByFilters(Collection<Filter> aFilters,
192         String aType, DOMSource aEvent) {
193         boolean allowed = true;
194         for (Filter filter : aFilters) {
195             if (!filter.isAllowed(aType, aEvent)) {
196                 allowed = false;
197             }
198         }
199         return allowed;
200     }
201
202     private List<String> determineDocumentTypes(
203         Collection<DocumentType> aTypes, DOMSource aEvent) {
204         List<String> res = new ArrayList<String>();
205         for (DocumentType type : aTypes) {
206             if (type.isInstance(aEvent)) {
207                 res.add(type.getName());
208             }
209         }
210         return res;
211     }
212
213     private String eventToString(String aSource, DOMSource aEvent) {
214         return "source '" + aSource + "': Event: '" +
215             new XMLDocument(aEvent).print(true) + "'";
216     }
217
218     private void transformationReturnedNull(String aSource, DOMSource aEvent,
219         String aInputType, Transformation aT, DOMSource aTransformed) {
220         LOGGER.log(Level.WARNING, "Transformation returned null for event " +
221             eventToString(aSource, aEvent) + " inputType '" + aInputType +
222             "', transformation '" + aT + "' document to transform " +
223             new XMLDocument(aTransformed).print(true));
224     }
225
226     private void destinationNotFound(String aSource, DOMSource aEvent) {
227         LOGGER.log(Level.WARNING, "No destination found for event: " +
228             eventToString(aSource, aEvent));
229     }
230
231     @Override
232     public Id<Destination> registerDestination(Destination aDestination) {
233         notNull("destination", aDestination);
234         long seqno = sequenceNumbers.getAndIncrement();
235         Id<Destination> id = new Id<Destination>(seqno + "");
236         destinations.put(id, new RobustDestination(id, aDestination));
237         return id;
238     }
239
240     @Override
241     public void unregisterDestination(Id<Destination> aId) {
242         destinations.remove(aId);
243     }
244
245     private void notNull(String aName, Object aValue) {
246         if (aValue == null) {
247             throw new IllegalArgumentException("Parameter '" + aName +
248                 "' may not be null");
249         }
250     }
251 }