1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31 package argos.deploy;
32
33 import java.io.IOException;
34 import java.lang.management.ManagementFactory;
35 import java.lang.reflect.Field;
36 import java.lang.reflect.InvocationTargetException;
37 import java.lang.reflect.Method;
38 import java.net.MalformedURLException;
39 import java.util.*;
40 import java.util.logging.Level;
41 import java.util.logging.Logger;
42
43 import javax.management.*;
44 import javax.management.remote.JMXConnector;
45 import javax.management.remote.JMXConnectorFactory;
46 import javax.management.remote.JMXServiceURL;
47 import javax.management.timer.Timer;
48
49 import argos.annotation.*;
50 import argos.config.Config;
51 import argos.metadata.AttributeValue;
52 import argos.metadata.ComponentMetaInfo;
53 import argos.naming.NamingService;
54 import argos.proxy.DynamicProxyUtil;
55 import argos.proxy.NotificationProxy;
56
57
58
59
60
61
62 public class ComponentRunner implements NotificationFilter, NotificationListener {
63 public static final long serialVersionUID = 2807813948753461L;
64
65 private static final Logger logger = Logger.getLogger(ComponentRunner.class.getName());
66 private static final String NOTIFICATION_TYPE = "RunExecute";
67 private static final String COMPONENT_EVENT = "COMPONENT_EVENT";
68 private static final String EXECUTE_HANDBACK = "EXECUTE_HANDBACK";
69
70
71 private ComponentMetaInfo meta;
72 private Object component;
73 private ObjectName objectName;
74 private transient Method unload;
75 private transient Method execute;
76 private transient Method notificationHandler;
77 private transient Timer timer;
78 private List<String> listenToRemote;
79
80 public ComponentRunner(ComponentMetaInfo meta) {
81 super();
82 this.meta = meta;
83 listenToRemote = new ArrayList<String>();
84 }
85
86 public Object start() {
87 Method init = null;
88 String domain = null;
89 try {
90
91 MBeanServer mbeanServer = ManagementFactory.getPlatformMBeanServer();
92 ObjectName classLoaderName = new ObjectName("Classloaders:name=" + meta.getService().getName());
93 if(!mbeanServer.isRegistered(classLoaderName)) {
94 mbeanServer.registerMBean(meta.getService().getClassLoader(), classLoaderName);
95 }
96
97 Class<?> compClass = Class.forName(meta.getClassName(), true, meta.getService().getClassLoader());
98 component = compClass.newInstance();
99
100
101 domain = Config.get(Config.DEFAULT_COMPONENT_DOMAIN);
102 if(meta.getService().getName().startsWith(Config.BANG_BANG)) {
103 domain = Config.get(Config.DEFAULT_BANGBANG_DOMAIN);
104 }
105 objectName = DynamicProxyUtil.instrument(domain, meta.getName(), component, classLoaderName);
106
107
108 for(Method method : compClass.getMethods()) {
109 if(method.getParameterTypes().length == 0) {
110 if(method.isAnnotationPresent(Init.class)) {
111 if(init != null) {
112 logger.warning("Found more than one method in component " +
113 meta.getName() + " marked with Init annotation.");
114 }
115 init = method;
116 }
117 else if(method.isAnnotationPresent(Unload.class)) {
118 if(unload != null) {
119 logger.warning("Found more than one method in component " +
120 meta.getName() + " marked with Init annotation.");
121 }
122 unload = method;
123 }
124 else if(method.isAnnotationPresent(Execute.class)) {
125 execute = method;
126 timer = new Timer();
127 timer.addNotificationListener(this, this, EXECUTE_HANDBACK);
128 Execute ex = execute.getAnnotation(Execute.class);
129 timer.addNotification(NOTIFICATION_TYPE, "execute", null,
130 Calendar.getInstance().getTime(), (int) (ex.value() * 1000));
131 }
132 }
133 else {
134 if(method.isAnnotationPresent(Init.class)) {
135 logger.severe("Method marked with @Init can not take arguments");
136 }
137 else if(method.isAnnotationPresent(Unload.class)) {
138 logger.severe("Method marked with @Unload can not take arguments");
139 }
140 else if(method.isAnnotationPresent(Execute.class)) {
141 logger.severe("Method marked with @Exceute can not take arguments");
142 }
143
144 if(method.getParameterTypes().length == 1 &&
145 Notification.class.isAssignableFrom(method.getParameterTypes()[0]) &&
146 method.isAnnotationPresent(NotificationHandler.class)) {
147 notificationHandler = method;
148 }
149
150 else if(method.getParameterTypes().length == 1 && method.getName().startsWith("set")) {
151 for(AttributeValue att : meta.getAttributeValues()) {
152 if(method.getName().substring(3).equalsIgnoreCase(att.getName())) {
153 try {
154 Object[] types = method.getParameterTypes();
155 Object value = null;
156
157 if(types[0].toString().equals("int")) {
158 value = Integer.parseInt(att.getValue());
159 }
160 else if(types[0].toString().equals("short")) {
161 value = Short.parseShort(att.getValue());
162 }
163 else if(types[0].toString().equals("long")) {
164 value = Long.parseLong(att.getValue());
165 }
166 else if(types[0].toString().equals("double")) {
167 value = Double.parseDouble(att.getValue());
168 }
169 else if(types[0].toString().equals("byte")) {
170 value = Byte.parseByte(att.getValue());
171 }
172 else if(types[0].toString().equals("character")) {
173 value = att.getValue().charAt(0);
174 }
175 else if(types[0].equals(String.class)) {
176 value = att.getValue();
177 }
178 else if(types[0].equals(Integer.class)) {
179 value = Integer.valueOf(Integer.parseInt(att
180 .getValue()));
181 }
182 else if(types[0].equals(Short.class)) {
183 value = Short.valueOf(Short
184 .parseShort(att.getValue()));
185 }
186 else if(types[0].equals(Long.class)) {
187 value = Long.valueOf(Long.parseLong(att.getValue()));
188 }
189 else if(types[0].equals(Double.class)) {
190 value = Double.valueOf(Double.parseDouble(att
191 .getValue()));
192 }
193 else if(types[0].equals(Byte.class)) {
194 value = Byte.valueOf(Byte.parseByte(att.getValue()));
195 }
196 else if(types[0].equals(Character.class)) {
197 value = Character.valueOf(att.getValue().charAt(0));
198 }
199 else {
200 logger.warning("Unknown type: " + types[0].toString());
201 }
202 method.invoke(component, value);
203 }
204 catch(InvocationTargetException e) {
205 logger.log(Level.SEVERE, "Unable to set attribute "
206 + att.getName()
207 + ", check log for exception ", e);
208 }
209 }
210 }
211 }
212 }
213
214 }
215
216
217 for(Field field : component.getClass().getFields()) {
218 if(field.isAnnotationPresent(argos.annotation.ServiceClassLoader.class)) {
219 try {
220 field.set(component, meta.getService().getClassLoader());
221 }
222 catch(Exception e) {
223 logger.log(Level.SEVERE, "Unable to set classloader for class "
224 + component.getClass().getName(), e);
225 }
226 }
227 else if(field.isAnnotationPresent(ServiceMeta.class)) {
228 try {
229 field.set(component, meta.getService());
230 }
231 catch(Exception e) {
232 logger.log(Level.SEVERE, "Unable to set service meta for class "
233 + component.getClass().getName(), e);
234 }
235 }
236 else if(field.isAnnotationPresent(ComponentMeta.class)) {
237 try {
238 field.set(component, meta);
239 }
240 catch(Exception e) {
241 logger.log(Level.SEVERE, "Unable to set component meta for class "
242 + component.getClass().getName(), e);
243 }
244 }
245 else if(field.isAnnotationPresent(Component.class)) {
246 try {
247 Component annontation = field.getAnnotation(Component.class);
248 field.set(component,
249 NamingService.getInstance().getComponentByName(annontation.value()));
250 }
251 catch(Exception e) {
252 logger.log(Level.SEVERE, "Unable to set component in class "
253 + component.getClass().getName(), e);
254 }
255 }
256 }
257
258 meta.setListenerProxy(this);
259 meta.setMbeanName(domain + ":name=" + meta.getName());
260
261
262 if(init != null) {
263 try {
264 ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
265 Thread.currentThread().setContextClassLoader(meta.getService().getClassLoader());
266 init.invoke(component);
267 Thread.currentThread().setContextClassLoader(classLoader);
268 }
269 catch(Exception e) {
270 logger.log(Level.SEVERE, "Unable to run init method on component " +
271 meta.getName() + ": " + e.getMessage(), e);
272 removeMbeanIgnore();
273 return null;
274 }
275 }
276
277
278 for(String comp : meta.getListenTo()) {
279 Object o;
280 if(comp.startsWith(Config.BANG_BANG)) {
281 o = NamingService.getInstance().getBangBang(comp);
282 }
283 else {
284 o = NamingService.getInstance().getComponentByName(comp);
285 }
286
287 if(o == null) {
288 logger.severe(meta.getName() + " can not listen to " + comp
289 + " as it doesnt exists.");
290 }
291 else {
292 try {
293 NotificationProxy proxy = NamingService.getInstance().getNotificationProxy(o);
294 if(proxy == null) {
295 logger.severe("Unable to find NotificationProxy for " + comp);
296 }
297 else {
298 proxy.addNotificationListener(this, this, COMPONENT_EVENT);
299 }
300
301 }
302 catch(ClassCastException e) {
303 logger.severe(meta.getName() + " can not listen to " + comp
304 + " as it doesnt implement NotificationEmitter.");
305 stop();
306 }
307 }
308 }
309
310
311 Map<String, JMXConnector> map = new HashMap<String, JMXConnector>();
312 List<String> list = meta.getListenToRemote();
313 for(int i = 0; i < list.size(); i +=2) {
314 String url = list.get(i);
315 String comp = list.get(i + 1);
316 JMXConnector connector = map.get(url);
317 try {
318 MBeanServerConnection server;
319 if(connector == null) {
320 connector = JMXConnectorFactory.connect(new JMXServiceURL(url));
321 map.put(url, connector);
322 server = connector.getMBeanServerConnection();
323 }
324 else {
325 server = connector.getMBeanServerConnection();
326 }
327 server.addNotificationListener(new ObjectName(comp), this, null, COMPONENT_EVENT);
328 }
329 catch(InstanceNotFoundException e) {
330 logger.severe("Unable to remote listen to " + comp + " on url " + url);
331 }
332 catch(MalformedURLException e) {
333 logger.severe("Unable to remote listen to " + comp + " url is malformed " + url + ": " + e.getMessage());
334 }
335 catch(IOException e) {
336 logger.warning("Unable to connect to " + url + ", trying later.");
337 listenToRemote.add(url);
338 listenToRemote.add(comp);
339 }
340 catch(MalformedObjectNameException e) {
341 logger.severe("Malformed object name: " + comp + ": " + e.getMessage());
342 }
343 }
344 if(!listenToRemote.isEmpty()) {
345 Worker worker = new Worker(this);
346 worker.start();
347 logger.info("Started thread for adding remote listeners when they becomme availible.");
348 }
349
350
351 if(timer != null) {
352 timer.start();
353 }
354
355
356 logger.info("Component " + meta.getName() + " has been started.");
357 }
358 catch(ClassNotFoundException e) {
359 logger.severe("Unable to start component " + meta.getName()
360 + " due to ClassNotFoundException");
361 removeMbeanIgnore();
362 return null;
363 }
364 catch(IllegalAccessException e2) {
365 logger.severe("Unable to start component " + meta.getName()
366 + " due to IllegalAccessException: " + e2.getMessage());
367 removeMbeanIgnore();
368 return null;
369 }
370 catch(Exception e3) {
371 logger.log(Level.SEVERE, "Unable to start component " + meta.getName()
372 + " due to Exception: " + e3.getMessage(), e3);
373 removeMbeanIgnore();
374 return null;
375 }
376
377 return component;
378 }
379
380 private void removeMbeanIgnore() {
381 try {
382 ManagementFactory.getPlatformMBeanServer().unregisterMBean(objectName);
383 }
384 catch(Exception e) {
385 logger.log(Level.FINE, "Ignoring Exception", e);
386 }
387 }
388
389 public void stop() {
390 if(timer != null) {
391 timer.stop();
392 }
393 if(unload != null) {
394 try {
395 unload.invoke(component);
396 }
397 catch(Exception e) {
398 logger.log(Level.SEVERE, "Unable to run unload method check log.", e);
399 }
400 }
401 try {
402 ManagementFactory.getPlatformMBeanServer().unregisterMBean(objectName);
403 }
404 catch(Exception e) {
405 logger.log(Level.SEVERE, "Unable to unregister mbean " + objectName, e);
406 }
407 logger.info("Stopped component " + meta.getName());
408 }
409
410 public boolean isNotificationEnabled(Notification not) {
411 return true;
412 }
413
414 public void handleNotification(Notification not, Object handback) {
415 if(EXECUTE_HANDBACK.equals(handback)){
416 try {
417 execute.invoke(component);
418 }
419 catch(Exception e) {
420 logger.log(Level.SEVERE, "Component " + meta.getName() + ": Unable to call method marked with "
421 + "annotation execute: " + e.getMessage(), e);
422 }
423 }
424 else {
425 try {
426 Class<?> to = Class.forName(notificationHandler.getParameterTypes()[0].getName(), true, meta.getService().getClassLoader());
427 Object o = to.cast(not);
428 notificationHandler.invoke(component, o);
429 }
430 catch(Exception e) {
431 logger.log(Level.SEVERE, "Component " + meta.getName() + ": Unable to call method marked with "
432 + "annotation NotificationHandler: " + e.getMessage(), e);
433 }
434 }
435 }
436
437 public ComponentMetaInfo getMeta() {
438 return meta;
439 }
440
441 class Worker extends Thread {
442 private ComponentRunner runner;
443 public Worker(ComponentRunner runner) {
444 super("Worker");
445 this.runner = runner;
446 }
447
448 @Override
449 public void run() {
450 while(!listenToRemote.isEmpty()) {
451 Map<String, JMXConnector> map = new HashMap<String, JMXConnector>();
452 List<String> list = meta.getListenToRemote();
453 for(int i = 0; i < list.size(); i +=2) {
454 String url = list.get(i);
455 String comp = list.get(i + 1);
456 JMXConnector connector = map.get(url);
457 try {
458 MBeanServerConnection server;
459 if(connector == null) {
460 connector = JMXConnectorFactory.connect(new JMXServiceURL(url));
461 map.put(url, connector);
462 server = connector.getMBeanServerConnection();
463 }
464 else {
465 server = connector.getMBeanServerConnection();
466 }
467 server.addNotificationListener(new ObjectName(comp), runner, runner, COMPONENT_EVENT);
468 listenToRemote.remove(i);
469 listenToRemote.remove(i + 1);
470 i -= 2;
471 }
472 catch(MalformedURLException e) {
473 logger.severe("Unable to remote listen to " + comp + " on url " + url);
474 }
475 catch(MalformedObjectNameException e) {
476 logger.severe("Unable to remote listen to " + comp + " on url " + url);
477 }
478 catch(IOException e) {
479 logger.severe("Unable to remote listen to " + comp + " on url " + url);
480 }
481 catch(InstanceNotFoundException e) {
482 logger.severe("Unable to remote listen to " + comp + " on url " + url);
483 }
484 }
485 for(JMXConnector connector : map.values()) {
486 try {
487 connector.close();
488 }
489 catch(IOException e) {
490 logger.log(Level.FINE, "Error while disconnecting from jmx connector", e);
491 }
492 }
493 }
494 try {
495 sleep(60 * 1000);
496 }
497 catch(InterruptedException ignore) {}
498 }
499 }
500 }