View Javadoc

1   /*
2   Copyright (c) 2006, University of Tromsø
3   All rights reserved.
4   
5   Redistribution and use in source and binary forms, with or without 
6   modification, are permitted provided that the following conditions are met:
7   
8    * Redistributions of source code must retain the above copyright notice, this list 
9      of conditions and the following disclaimer.
10  
11   * Redistributions in binary form must reproduce the above copyright notice, this 
12     list of conditions and the following disclaimer in the documentation and/or other 
13     materials provided with the distribution.
14  
15   * Neither the name of the University of Tromsø nor the names of its contributors may 
16     be used to endorse or promote products derived from this software without specific 
17     prior written permission.
18  
19  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY 
20  EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES 
21  OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT 
22  SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 
23  INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED 
24  TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR 
25  BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 
26  CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 
27  ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH 
28  DAMAGE.
29  */
30  
31  package argos.deploy;
32  
33  import java.io.IOException;
34  import java.lang.management.ManagementFactory;
35  import java.lang.reflect.Field;
36  import java.lang.reflect.InvocationTargetException;
37  import java.lang.reflect.Method;
38  import java.net.MalformedURLException;
39  import java.util.*;
40  import java.util.logging.Level;
41  import java.util.logging.Logger;
42  
43  import javax.management.*;
44  import javax.management.remote.JMXConnector;
45  import javax.management.remote.JMXConnectorFactory;
46  import javax.management.remote.JMXServiceURL;
47  import javax.management.timer.Timer;
48  
49  import argos.annotation.*;
50  import argos.config.Config;
51  import argos.metadata.AttributeValue;
52  import argos.metadata.ComponentMetaInfo;
53  import argos.naming.NamingService;
54  import argos.proxy.DynamicProxyUtil;
55  import argos.proxy.NotificationProxy;
56  
57  /**
58   * Created on 07.jul.2006
59   * 
60   * @author Dan Peder Eriksen
61   */
62  public class ComponentRunner implements NotificationFilter, NotificationListener {
63  	public static final long serialVersionUID = 2807813948753461L;
64  	
65  	private static final Logger logger = Logger.getLogger(ComponentRunner.class.getName());
66  	private static final String NOTIFICATION_TYPE = "RunExecute";
67  	private static final String COMPONENT_EVENT = "COMPONENT_EVENT";
68  	private static final String EXECUTE_HANDBACK = "EXECUTE_HANDBACK";
69  	
70  	
71  	private ComponentMetaInfo meta;
72  	private Object component;
73  	private ObjectName objectName;
74  	private transient Method unload;
75  	private transient Method execute;
76  	private transient Method notificationHandler;
77  	private transient Timer timer;
78  	private List<String> listenToRemote;
79  	
80  	public ComponentRunner(ComponentMetaInfo meta) {
81  		super();
82  		this.meta = meta;
83  		listenToRemote = new ArrayList<String>();
84  	}
85  	
86  	public Object start() {
87  		Method init = null;
88  		String domain = null;
89  		try {
90  			// Create instance
91  			MBeanServer mbeanServer = ManagementFactory.getPlatformMBeanServer();
92  			ObjectName classLoaderName = new ObjectName("Classloaders:name=" + meta.getService().getName());
93  			if(!mbeanServer.isRegistered(classLoaderName)) {
94  				mbeanServer.registerMBean(meta.getService().getClassLoader(), classLoaderName);
95  			}
96  			
97  			Class<?> compClass = Class.forName(meta.getClassName(), true, meta.getService().getClassLoader());
98  			component = compClass.newInstance();
99  			
100 			// Create MBean
101 			domain = Config.get(Config.DEFAULT_COMPONENT_DOMAIN);
102 			if(meta.getService().getName().startsWith(Config.BANG_BANG)) {
103 				domain = Config.get(Config.DEFAULT_BANGBANG_DOMAIN);
104 			}
105 			objectName = DynamicProxyUtil.instrument(domain, meta.getName(), component, classLoaderName);
106 			
107 			// Find marked methods
108 			for(Method method : compClass.getMethods()) {
109 				if(method.getParameterTypes().length == 0) {
110 					if(method.isAnnotationPresent(Init.class)) {
111 						if(init != null) {
112 							logger.warning("Found more than one method in component " + 
113 									meta.getName() + " marked with Init annotation.");
114 						}
115 						init = method;
116 					}
117 					else if(method.isAnnotationPresent(Unload.class)) {
118 						if(unload != null) {
119 							logger.warning("Found more than one method in component " + 
120 									meta.getName() + " marked with Init annotation.");
121 						}
122 						unload = method;
123 					}
124 					else if(method.isAnnotationPresent(Execute.class)) {
125 						execute = method;
126 						timer = new Timer();
127 						timer.addNotificationListener(this, this, EXECUTE_HANDBACK);
128 						Execute ex = execute.getAnnotation(Execute.class);
129 						timer.addNotification(NOTIFICATION_TYPE, "execute", null,
130 								Calendar.getInstance().getTime(), (int) (ex.value() * 1000));
131 					}
132 				}
133 				else {
134 					if(method.isAnnotationPresent(Init.class)) {
135 						logger.severe("Method marked with @Init can not take arguments");
136 					}
137 					else if(method.isAnnotationPresent(Unload.class)) {
138 						logger.severe("Method marked with @Unload can not take arguments");
139 					}
140 					else if(method.isAnnotationPresent(Execute.class)) {
141 						logger.severe("Method marked with @Exceute can not take arguments");
142 					}
143 					
144 					if(method.getParameterTypes().length == 1 && 
145 							Notification.class.isAssignableFrom(method.getParameterTypes()[0]) &&
146 							method.isAnnotationPresent(NotificationHandler.class)) {
147 						notificationHandler = method;
148 					}
149 					// Config
150 					else if(method.getParameterTypes().length == 1 && method.getName().startsWith("set")) {
151 						for(AttributeValue att : meta.getAttributeValues()) {
152 							if(method.getName().substring(3).equalsIgnoreCase(att.getName())) {
153 								try {
154 									Object[] types = method.getParameterTypes();
155 									Object value = null;
156 									// Transform to the right type
157 									if(types[0].toString().equals("int")) {
158 										value = Integer.parseInt(att.getValue());
159 									}
160 									else if(types[0].toString().equals("short")) {
161 										value = Short.parseShort(att.getValue());
162 									}
163 									else if(types[0].toString().equals("long")) {
164 										value = Long.parseLong(att.getValue());
165 									}
166 									else if(types[0].toString().equals("double")) {
167 										value = Double.parseDouble(att.getValue());
168 									}
169 									else if(types[0].toString().equals("byte")) {
170 										value = Byte.parseByte(att.getValue());
171 									}
172 									else if(types[0].toString().equals("character")) {
173 										value = att.getValue().charAt(0);
174 									}
175 									else if(types[0].equals(String.class)) {
176 										value = att.getValue();
177 									}
178 									else if(types[0].equals(Integer.class)) {
179 										value = Integer.valueOf(Integer.parseInt(att
180 												.getValue()));
181 									}
182 									else if(types[0].equals(Short.class)) {
183 										value = Short.valueOf(Short
184 												.parseShort(att.getValue()));
185 									}
186 									else if(types[0].equals(Long.class)) {
187 										value = Long.valueOf(Long.parseLong(att.getValue()));
188 									}
189 									else if(types[0].equals(Double.class)) {
190 										value = Double.valueOf(Double.parseDouble(att
191 												.getValue()));
192 									}
193 									else if(types[0].equals(Byte.class)) {
194 										value = Byte.valueOf(Byte.parseByte(att.getValue()));
195 									}
196 									else if(types[0].equals(Character.class)) {
197 										value = Character.valueOf(att.getValue().charAt(0));
198 									}
199 									else {
200 										logger.warning("Unknown type: " + types[0].toString());
201 									}
202 									method.invoke(component, value);
203 								}
204 								catch(InvocationTargetException e) {
205 									logger.log(Level.SEVERE, "Unable to set attribute "
206 													+ att.getName()
207 													+ ", check log for exception ", e);
208 								}
209 							}
210 						}
211 					}
212 				}
213 				
214 			}
215 			
216 			//Set class loader in component
217 			for(Field field : component.getClass().getFields()) {
218 				if(field.isAnnotationPresent(argos.annotation.ServiceClassLoader.class)) {
219 					try {
220 						field.set(component, meta.getService().getClassLoader());
221 					}
222 					catch(Exception e) {
223 						logger.log(Level.SEVERE, "Unable to set classloader for class "
224 								+ component.getClass().getName(), e);
225 					}
226 				}
227 				else if(field.isAnnotationPresent(ServiceMeta.class)) {
228 					try {
229 						field.set(component, meta.getService());
230 					}
231 					catch(Exception e) {
232 						logger.log(Level.SEVERE, "Unable to set service meta for class "
233 								+ component.getClass().getName(), e);
234 					}
235 				}
236 				else if(field.isAnnotationPresent(ComponentMeta.class)) {
237 					try {
238 						field.set(component, meta);
239 					}
240 					catch(Exception e) {
241 						logger.log(Level.SEVERE, "Unable to set component meta for class "
242 								+ component.getClass().getName(), e);
243 					}
244 				}
245 				else if(field.isAnnotationPresent(Component.class)) {
246 					try {
247 						Component annontation = field.getAnnotation(Component.class);
248 						field.set(component, 
249 								NamingService.getInstance().getComponentByName(annontation.value()));
250 					}
251 					catch(Exception e) {
252 						logger.log(Level.SEVERE, "Unable to set component in class "
253 								+ component.getClass().getName(), e);
254 					}
255 				}
256 			}
257 			
258 			meta.setListenerProxy(this);
259 			meta.setMbeanName(domain + ":name=" + meta.getName());
260 			
261 			// Init
262 			if(init != null) {
263 				try {
264 				    ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
265 				    Thread.currentThread().setContextClassLoader(meta.getService().getClassLoader());
266 					init.invoke(component);
267 					Thread.currentThread().setContextClassLoader(classLoader);
268 				}
269 				catch(Exception e) {
270 					logger.log(Level.SEVERE, "Unable to run init method on component " + 
271 							meta.getName() + ": " + e.getMessage(), e);
272 					removeMbeanIgnore();
273 					return null;
274 				}
275 			}
276 			
277 			// Add local listeners
278 			for(String comp : meta.getListenTo()) {
279 				Object o;
280 				if(comp.startsWith(Config.BANG_BANG)) {
281 					o = NamingService.getInstance().getBangBang(comp);
282 				}
283 				else {
284 					o = NamingService.getInstance().getComponentByName(comp);
285 				}
286 				
287 				if(o == null) {
288 					logger.severe(meta.getName() + " can not listen to " + comp
289 							+ " as it doesnt exists.");
290 				}
291 				else {
292 					try {
293 						NotificationProxy proxy = NamingService.getInstance().getNotificationProxy(o);
294 						if(proxy == null) {
295 							logger.severe("Unable to find NotificationProxy for " + comp);
296 						}
297 						else {
298 							proxy.addNotificationListener(this, this, COMPONENT_EVENT);
299 						}
300 						
301 					}
302 					catch(ClassCastException e) {
303 						logger.severe(meta.getName() + " can not listen to " + comp
304 								+ " as it doesnt implement NotificationEmitter.");
305 						stop();
306 					}
307 				}
308 			}
309 			
310 			//Add remote listeners
311 			Map<String, JMXConnector> map = new HashMap<String, JMXConnector>();
312 			List<String> list = meta.getListenToRemote();
313 			for(int i = 0; i < list.size(); i +=2) {
314 				String url = list.get(i);
315 				String comp = list.get(i + 1);
316 				JMXConnector connector = map.get(url);
317 				try {
318 					MBeanServerConnection server;
319 					if(connector == null) {
320 						connector = JMXConnectorFactory.connect(new JMXServiceURL(url)); 
321 						map.put(url, connector);
322 						server = connector.getMBeanServerConnection();
323 					}
324 					else {
325 						server = connector.getMBeanServerConnection();
326 					}
327 					server.addNotificationListener(new ObjectName(comp), this, null, COMPONENT_EVENT);
328 				}
329 				catch(InstanceNotFoundException e) {
330 					logger.severe("Unable to remote listen to " + comp + " on url " + url);
331 				}
332 				catch(MalformedURLException e) {
333 					logger.severe("Unable to remote listen to " + comp + " url is malformed " + url + ": " + e.getMessage());
334 				}
335 				catch(IOException e) {
336 					logger.warning("Unable to connect to " + url + ", trying later.");
337 					listenToRemote.add(url);
338 					listenToRemote.add(comp);
339 				}
340 				catch(MalformedObjectNameException e) {
341 					logger.severe("Malformed object name: " + comp + ": " + e.getMessage());
342 				}
343 			}
344 			if(!listenToRemote.isEmpty()) {
345 				Worker worker = new Worker(this);
346 				worker.start();
347 				logger.info("Started thread for adding remote listeners when they becomme availible.");
348 			}
349 			
350 			// Start Timer
351 			if(timer != null) {
352 				timer.start();
353 			}
354 			
355 			// Done
356 			logger.info("Component " + meta.getName() + " has been started.");
357 		}
358 		catch(ClassNotFoundException e) {
359 			logger.severe("Unable to start component " + meta.getName()
360 					+ " due to ClassNotFoundException");
361 			removeMbeanIgnore();
362 			return null;
363 		}
364 		catch(IllegalAccessException e2) {
365 			logger.severe("Unable to start component " + meta.getName()
366 					+ " due to IllegalAccessException: " + e2.getMessage());
367 			removeMbeanIgnore();
368 			return null;
369 		}
370 		catch(Exception e3) {
371 			logger.log(Level.SEVERE, "Unable to start component " + meta.getName()
372 					+ " due to Exception: " + e3.getMessage(), e3);
373 			removeMbeanIgnore();
374 			return null;
375 		}
376 		
377 		return component;
378 	}
379 	
380 	private void removeMbeanIgnore() {
381 		try {
382 			ManagementFactory.getPlatformMBeanServer().unregisterMBean(objectName);
383 		}
384 		catch(Exception e) {
385 			logger.log(Level.FINE, "Ignoring Exception", e);
386 		}
387 	}
388 	
389 	public void stop() {
390 		if(timer != null) {
391 			timer.stop();
392 		}
393 		if(unload != null) {
394 			try {
395 				unload.invoke(component);
396 			}
397 			catch(Exception e) {
398 				logger.log(Level.SEVERE, "Unable to run unload method check log.", e);
399 			}
400 		}
401 		try {
402 			ManagementFactory.getPlatformMBeanServer().unregisterMBean(objectName);
403 		}
404 		catch(Exception e) {
405 			logger.log(Level.SEVERE, "Unable to unregister mbean " + objectName, e);
406 		}
407 		logger.info("Stopped component " + meta.getName());
408 	}
409 	
410 	public boolean isNotificationEnabled(Notification not) {
411 		return true;
412 	}
413 	
414 	public void handleNotification(Notification not, Object handback) {
415 		if(EXECUTE_HANDBACK.equals(handback)){
416 			try {
417 				execute.invoke(component);
418 			}
419 			catch(Exception e) {
420 				logger.log(Level.SEVERE, "Component " + meta.getName() + ": Unable to call method marked with "
421 						+ "annotation execute: " + e.getMessage(), e);
422 			}
423 		}
424 		else {
425 			try {
426 				Class<?> to = Class.forName(notificationHandler.getParameterTypes()[0].getName(), true, meta.getService().getClassLoader());
427 				Object o = to.cast(not);
428 				notificationHandler.invoke(component, o);
429 			}
430 			catch(Exception e) {
431 				logger.log(Level.SEVERE, "Component " + meta.getName() + ": Unable to call method marked with "
432 						+ "annotation NotificationHandler: " + e.getMessage(), e);
433 			}
434 		}
435 	}
436 	
437 	public ComponentMetaInfo getMeta() {
438 		return meta;
439 	}
440 	
441 	class Worker extends Thread {
442 		private ComponentRunner runner;
443 		public Worker(ComponentRunner runner) {
444 			super("Worker");
445 			this.runner = runner;
446 		}
447 		
448 		@Override
449 		public void run() {
450 			while(!listenToRemote.isEmpty()) {
451 				Map<String, JMXConnector> map = new HashMap<String, JMXConnector>();
452 				List<String> list = meta.getListenToRemote();
453 				for(int i = 0; i < list.size(); i +=2) {
454 					String url = list.get(i);
455 					String comp = list.get(i + 1);
456 					JMXConnector connector = map.get(url);
457 					try {
458 						MBeanServerConnection server;
459 						if(connector == null) {
460 							connector = JMXConnectorFactory.connect(new JMXServiceURL(url)); 
461 							map.put(url, connector);
462 							server = connector.getMBeanServerConnection();
463 						}
464 						else {
465 							server = connector.getMBeanServerConnection();
466 						}
467 						server.addNotificationListener(new ObjectName(comp), runner, runner, COMPONENT_EVENT);
468 						listenToRemote.remove(i);
469 						listenToRemote.remove(i + 1);
470 						i -= 2;
471 					}
472 					catch(MalformedURLException e) {
473 						logger.severe("Unable to remote listen to " + comp + " on url " + url);
474 					}
475 					catch(MalformedObjectNameException e) {
476 						logger.severe("Unable to remote listen to " + comp + " on url " + url);
477 					}
478 					catch(IOException e) {
479 						logger.severe("Unable to remote listen to " + comp + " on url " + url);
480 					}
481 					catch(InstanceNotFoundException e) {
482 						logger.severe("Unable to remote listen to " + comp + " on url " + url);
483 					}
484 				}
485 				for(JMXConnector connector : map.values()) {
486 					try {
487 						connector.close();
488 					}
489 					catch(IOException e) {
490 						logger.log(Level.FINE, "Error while disconnecting from jmx connector", e);
491 					}
492 				}
493 			}
494 			try {
495 				sleep(60 * 1000);
496 			}
497 			catch(InterruptedException ignore) {}
498 		}
499 	}
500 }