1 package cz.cuni.amis.pogamut.base.agent.module.comm;
2
3 import java.util.ArrayList;
4 import java.util.HashMap;
5 import java.util.HashSet;
6 import java.util.List;
7 import java.util.Map;
8 import java.util.Set;
9 import java.util.concurrent.ConcurrentLinkedQueue;
10 import java.util.concurrent.LinkedBlockingQueue;
11 import java.util.concurrent.ThreadPoolExecutor;
12 import java.util.concurrent.TimeUnit;
13 import java.util.concurrent.locks.Lock;
14 import java.util.concurrent.locks.ReadWriteLock;
15 import java.util.concurrent.locks.ReentrantReadWriteLock;
16 import java.util.logging.Level;
17 import java.util.logging.Logger;
18
19 import cz.cuni.amis.pogamut.base.agent.IAgentId;
20 import cz.cuni.amis.pogamut.base.agent.IObservingAgent;
21 import cz.cuni.amis.pogamut.base.agent.impl.AbstractAgent;
22 import cz.cuni.amis.pogamut.base.communication.translator.event.IWorldChangeEvent;
23 import cz.cuni.amis.pogamut.base.communication.worldview.IWorldView;
24 import cz.cuni.amis.pogamut.base.component.bus.exception.ComponentNotRunningException;
25 import cz.cuni.amis.pogamut.base.utils.PogamutPlatform;
26 import cz.cuni.amis.pogamut.base.utils.logging.LogCategory;
27 import cz.cuni.amis.utils.ExceptionToString;
28 import cz.cuni.amis.utils.Tuple2;
29 import cz.cuni.amis.utils.maps.HashMapSet;
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47 public class PogamutJVMComm {
48
49 protected static Object instanceMutex = new Object();
50
51 protected static PogamutJVMComm instance;
52
53 protected static ConcurrentLinkedQueue<PogamutJVMComm> comms = new ConcurrentLinkedQueue<PogamutJVMComm>();
54
55
56
57
58
59 public static PogamutJVMComm getInstance() {
60 if (instance == null) {
61 synchronized(instanceMutex) {
62 if (instance == null) {
63 instance = new PogamutJVMComm();
64 ((LogCategory)instance.getLog()).addConsoleHandler();
65 instance.getLog().setLevel(Level.INFO);
66 }
67 }
68 }
69 return instance;
70 }
71
72 public static void platformClose() {
73 while(comms.size() > 0) {
74 comms.poll().destroy();
75 }
76 }
77
78
79
80
81 public static final int ALL_CHANNELS = -1;
82
83
84
85
86 protected Map<IAgentId, Tuple2<IObservingAgent, Integer>> registeredAgents = new HashMap<IAgentId, Tuple2<IObservingAgent, Integer>>();
87
88
89
90
91 protected Set<IObservingAgent> allChannels = new HashSet<IObservingAgent>();
92
93
94
95
96 protected HashMapSet<Integer, IObservingAgent> channels = new HashMapSet<Integer, IObservingAgent>();
97
98
99
100
101 protected ReadWriteLock lock = new ReentrantReadWriteLock(true);
102
103 protected Lock readLock = lock.readLock();
104
105 protected Lock writeLock = lock.writeLock();
106
107 protected Logger log;
108
109 public PogamutJVMComm() {
110 this(null);
111 }
112
113
114
115
116 public PogamutJVMComm(Logger log) {
117 this.log = log;
118 synchronized(comms) {
119 if (this.log == null) {
120 this.log = new LogCategory("AgentJVMComm" + comms.size());
121 }
122 comms.add(this);
123 }
124 }
125
126 public Logger getLog() {
127 return log;
128 }
129
130
131
132
133
134
135
136
137
138
139 protected int incRegisteredAgent(IObservingAgent agent) {
140 synchronized(registeredAgents) {
141 Tuple2<IObservingAgent, Integer> record = registeredAgents.get(agent.getComponentId());
142 if (record == null) {
143 record = new Tuple2<IObservingAgent, Integer>(agent, 0);
144 registeredAgents.put(agent.getComponentId(), record);
145 }
146
147 if (record.getFirst() != agent) {
148 throw new RuntimeException("agent.getComponentId() clash! Under " + agent.getComponentId() + " is registered agent " + record.getFirst() + " NOT AGENT " + agent + ".");
149 }
150 record.setSecond(record.getSecond()+1);
151 return record.getSecond();
152 }
153 }
154
155
156
157
158
159 protected int decRegisteredAgent(IObservingAgent agent) {
160 synchronized(registeredAgents) {
161 Tuple2<IObservingAgent, Integer> record = registeredAgents.get(agent.getComponentId());
162 if (record == null) {
163 throw new RuntimeException("Attempt to decrease registration count for agent that is not registered, agent.getComponentId() == " + agent.getComponentId() + ".");
164 }
165 record.setSecond(record.getSecond()-1);
166 if (record.getSecond() == 0) {
167 registeredAgents.remove(agent.getComponentId());
168 }
169 return record.getSecond();
170 }
171 }
172
173
174
175
176
177
178
179
180
181
182
183
184
185 public void registerAgent(IObservingAgent agent, int channel) {
186 if (channel < 0 && channel != ALL_CHANNELS) {
187 throw new RuntimeException("channel == " + channel + " < 0, INVALID");
188 }
189 if (writeLock.tryLock()) {
190 try {
191 registerAgentUnsyncImpl(agent, channel);
192 } finally {
193 writeLock.unlock();
194 }
195 } else {
196 execute(new RegisterAgent(agent, channel), true);
197 }
198 }
199
200 protected void registerAgentSyncImpl(IObservingAgent agent, int channel) {
201 writeLock.lock();
202 try {
203 registerAgentUnsyncImpl(agent, channel);
204 } finally {
205 writeLock.unlock();
206 }
207 }
208
209 protected void registerAgentUnsyncImpl(IObservingAgent agent, int channel) {
210 if (channel == ALL_CHANNELS) {
211
212 if (allChannels.contains(agent)) {
213
214 if (log != null && log.isLoggable(Level.WARNING)) log.warning("Agent " + agent.getComponentId() + " is already registered for ALL_CHANNELS (ignoring this request).");
215 return;
216 }
217 allChannels.add(agent);
218 } else {
219 if (channels.get(channel).contains(agent)) {
220
221 if (log != null && log.isLoggable(Level.WARNING)) log.warning("Agent " + agent.getComponentId() + " is already registered for channel " + channel + " (ignoring this request).");
222 return;
223 }
224 channels.add(channel, agent);
225 }
226 int registerCount = incRegisteredAgent(agent);
227 if (log != null && log.isLoggable(Level.INFO)) log.info("Registered " + agent.getComponentId() + " for " + (channel == ALL_CHANNELS ? "ALL_CHANNELS" : "channel " + channel) + ". Agent is registered " + registerCount + "x (in total).");
228 }
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245 public boolean isAgentRegistered(IObservingAgent agent, int channel) {
246 if (channel < 0 && channel != ALL_CHANNELS) {
247 throw new RuntimeException("channel == " + channel + " < 0, INVALID");
248 }
249 readLock.lock();
250 try {
251 if (channel == ALL_CHANNELS) {
252 return allChannels.contains(agent);
253 } else {
254 return getChannel(channel).contains(agent);
255 }
256 } finally {
257 readLock.unlock();
258 }
259 }
260
261 protected Set<IObservingAgent> getChannel(int channel) {
262 if (channel == ALL_CHANNELS) {
263 return allChannels;
264 } else {
265 if (channels.containsKey(channel)) {
266 return channels.get(channel);
267 }
268 synchronized(channels) {
269 return channels.get(channel);
270 }
271 }
272 }
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287 public void unregisterAgent(IObservingAgent agent, int channel) {
288 if (channel < 0 && channel != ALL_CHANNELS) {
289 throw new RuntimeException("channel == " + channel + " < 0, INVALID");
290 }
291 if (writeLock.tryLock()) {
292 try {
293 unregisterAgentUnsyncImpl(agent, channel);
294 } finally {
295 writeLock.unlock();
296 }
297 } else {
298 execute(new UnregisterAgentFromChannel(agent, channel), false);
299 }
300 }
301
302 protected void unregisterAgentSyncImpl(IObservingAgent agent, int channel) {
303 writeLock.lock();
304 try {
305 unregisterAgentUnsyncImpl(agent, channel);
306 } finally {
307 writeLock.unlock();
308 }
309 }
310
311 protected void unregisterAgentUnsyncImpl(IObservingAgent agent, int channel) {
312 if (channel == ALL_CHANNELS) {
313 if (allChannels.remove(agent)) {
314 int registerCount = decRegisteredAgent(agent);
315 if (log != null && log.isLoggable(Level.INFO)) log.info("UNregistered " + agent.getComponentId() + " from ALL_CHANNELS. " + (registerCount == 0 ? "Agent is now fully UNREGISTERED." : "Agent remains registered for other channels (" + registerCount + "x in total)."));
316 if (registeredAgents.size() == 0) {
317 shutdown(false);
318 }
319 }
320 } else {
321 if (getChannel(channel).remove(agent)) {
322 int registerCount = decRegisteredAgent(agent);
323 if (log != null && log.isLoggable(Level.INFO)) log.info("UNregistered " + agent.getComponentId() + " from channel " + channel + ". " + (registerCount == 0 ? "Agent is now fully UNREGISTERED." : "Agent remains registered for other channels (" + registerCount + "x in total)."));
324 if (registeredAgents.size() == 0) {
325 shutdown(false);
326 }
327 }
328 }
329 }
330
331
332
333
334
335
336
337
338
339 public void unregisterAgent(IObservingAgent agent) {
340 if (writeLock.tryLock()) {
341 try {
342 unregisterAgentUnsyncImpl(agent);
343 } finally {
344 writeLock.unlock();
345 }
346 } else {
347 execute(new UnregisterAgent(agent), false);
348 }
349
350 }
351
352 protected void unregisterAgentSyncImpl(IObservingAgent agent) {
353 writeLock.lock();
354 try {
355 unregisterAgentUnsyncImpl(agent);
356 } finally {
357 writeLock.unlock();
358 }
359 }
360
361 protected void unregisterAgentUnsyncImpl(IObservingAgent agent) {
362 synchronized(channels) {
363 for (Integer channel : channels.keySet()) {
364 unregisterAgentUnsyncImpl(agent, channel);
365 }
366 }
367 unregisterAgentUnsyncImpl(agent, ALL_CHANNELS);
368 }
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385 public void send(IWorldChangeEvent event, int channel) {
386 execute(new Send(event, channel), false);
387 }
388
389
390
391
392
393
394
395
396
397 public void sendToOthers(IWorldChangeEvent event, int channel, IObservingAgent sender) {
398 execute(new SendToOthers(event, channel, sender), false);
399 }
400
401 protected void sendSyncImpl(IWorldChangeEvent event, int channel) {
402 readLock.lock();
403 try {
404 sendUnsyncImpl(event, channel);
405 } finally {
406 readLock.unlock();
407 }
408 }
409
410 protected void sendToOthersSyncImpl(IWorldChangeEvent event, int channel, IObservingAgent sender) {
411 readLock.lock();
412 try {
413 sendToOthersUnsyncImpl(event, channel, sender);
414 } finally {
415 readLock.unlock();
416 }
417 }
418
419 protected void sendUnsyncImpl(IWorldChangeEvent event, int channel) {
420 if (channel == ALL_CHANNELS) {
421 broadcastUnsyncImpl(event);
422 } else {
423 for (IObservingAgent agent : getChannel(channel)) {
424 sendToAgentUnsyncImpl(agent, event);
425 }
426 for (IObservingAgent agent : allChannels) {
427 sendToAgentUnsyncImpl(agent, event);
428 }
429 }
430 }
431
432 protected void sendToOthersUnsyncImpl(IWorldChangeEvent event, int channel, IObservingAgent sender) {
433 if (channel == ALL_CHANNELS) {
434 broadcastToOthersUnsyncImpl(event, sender);
435 } else {
436 for (IObservingAgent agent : getChannel(channel)) {
437 if (agent != sender) {
438 sendToAgentUnsyncImpl(agent, event);
439 }
440 }
441 for (IObservingAgent agent : allChannels) {
442 if (agent != sender) {
443 sendToAgentUnsyncImpl(agent, event);
444 }
445 }
446 }
447 }
448
449 protected void sendToAgentUnsyncImpl(IObservingAgent agent, IWorldChangeEvent event) {
450 try {
451 if (log != null && log.isLoggable(Level.FINE))log.fine(event + " -> " + agent.getComponentId());
452 agent.getWorldView().notify(event);
453 } catch (ComponentNotRunningException e1) {
454
455 if (log != null && log.isLoggable(Level.WARNING)) log.warning("Agent " + agent.getComponentId() + " is not running, did not receive: " + event);
456 } catch (Exception e2) {
457 if (log != null && log.isLoggable(Level.WARNING)) log.warning(ExceptionToString.process("Agent " + agent.getComponentId() + " failed to process " + event + ".", e2));
458 }
459 }
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475 public void broadcast(IWorldChangeEvent event) {
476 execute(new Broadcast(event), false);
477 }
478
479
480
481
482
483
484
485
486
487 public void broadcastToOthers(IWorldChangeEvent event, IObservingAgent sender) {
488 if (sender == null) {
489 if (log != null && log.isLoggable(Level.WARNING)) log.warning("broadcast(event, null) called, sender unspecified");
490 broadcast(event);
491 } else {
492 execute(new BroadcastToOthers(event, sender), false);
493 }
494 }
495
496 protected void broadcastSyncImpl(IWorldChangeEvent event) {
497 readLock.lock();
498 try {
499 broadcastUnsyncImpl(event);
500 } finally {
501 readLock.unlock();
502 }
503 }
504
505 protected void broadcastToOthersSyncImpl(IWorldChangeEvent event, IObservingAgent sender) {
506 readLock.lock();
507 try {
508 broadcastToOthersUnsyncImpl(event, sender);
509 } finally {
510 readLock.unlock();
511 }
512 }
513
514 protected void broadcastUnsyncImpl(IWorldChangeEvent event) {
515 List<Integer> existingChannels;
516 synchronized(channels) {
517 existingChannels = new ArrayList<Integer>(channels.keySet());
518 }
519 for (Integer channel : existingChannels) {
520 for (IObservingAgent agent : getChannel(channel)) {
521 sendToAgentUnsyncImpl(agent, event);
522 }
523 }
524 for (IObservingAgent agent : allChannels) {
525 sendToAgentUnsyncImpl(agent, event);
526 }
527 }
528
529 protected void broadcastToOthersUnsyncImpl(IWorldChangeEvent event, IObservingAgent sender) {
530 List<Integer> existingChannels;
531 synchronized(channels) {
532 existingChannels = new ArrayList<Integer>(channels.keySet());
533 }
534 for (Integer channel : existingChannels) {
535 for (IObservingAgent agent : getChannel(channel)) {
536 if (agent != sender) {
537 sendToAgentUnsyncImpl(agent, event);
538 }
539 }
540 }
541 for (IObservingAgent agent : allChannels) {
542 if (agent != sender) {
543 sendToAgentUnsyncImpl(agent, event);
544 }
545 }
546 }
547
548
549
550
551
552
553
554
555
556
557
558
559 public void destroy() {
560 try {
561 try {
562 shutdown(true);
563 } finally {
564 try {
565 synchronized(channels) {
566 channels.clear();
567 }
568 } finally {
569 try {
570 synchronized(allChannels) {
571 allChannels.clear();
572 }
573 } finally {
574 synchronized(registeredAgents) {
575 registeredAgents.clear();
576 }
577 }
578 }
579 }
580 } catch (Exception e) {
581 if (log != null && log.isLoggable(Level.WARNING)) log.warning(ExceptionToString.process("Failed to fully PogamutJVMComm.destroy().", e));
582 }
583 }
584
585
586
587
588
589 protected Object executorMutex = new Object();
590
591 protected ThreadPoolExecutor executor = null;
592
593 protected Object numberOfRegisterAgentPendingMutex = new Object();
594
595 protected int numberOfRegisterAgentPending = 0;
596
597 protected void execute(Runnable job, boolean forceStart) {
598 if (executor == null) {
599 synchronized(executorMutex) {
600 if (executor == null) {
601 if (forceStart || registeredAgents.size() > 0) {
602 if (log != null && log.isLoggable(Level.INFO)) log.info("Starting thread pool executor.");
603 executor = new ThreadPoolExecutor(1, 20, 10000, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
604 }
605 }
606 }
607 }
608 if (executor == null) return;
609 executor.execute(job);
610 }
611
612 protected void shutdown(boolean forced) {
613 if (executor != null) {
614 synchronized(executorMutex) {
615 if (executor != null) {
616 if (!forced) {
617 synchronized(numberOfRegisterAgentPendingMutex) {
618 if (numberOfRegisterAgentPending > 0) {
619 if (log != null && log.isLoggable(Level.INFO)) log.info("Won't shutdown thread pool executor, there are unprocessed agent-registration jobs in queue and shutdown is NOT forced.");
620 return;
621 }
622 }
623 }
624 if (log != null && log.isLoggable(Level.INFO)) log.info("Shutting down thread pool executor.");
625 try {
626 executor.shutdownNow();
627 } catch (Exception e) {
628 if (log != null && log.isLoggable(Level.WARNING)) log.warning(ExceptionToString.process("Error shutting down thread pool executor.", e));
629 }
630 executor = null;
631 }
632 }
633 }
634 }
635
636 protected class RegisterAgent implements Runnable {
637
638 protected IObservingAgent agent;
639 protected int channel;
640
641 public RegisterAgent(IObservingAgent agent, int channel) {
642 synchronized(numberOfRegisterAgentPendingMutex) {
643 ++numberOfRegisterAgentPending;
644 }
645 this.agent = agent;
646 this.channel = channel;
647 }
648
649 @Override
650 public void run() {
651 try {
652 registerAgentSyncImpl(agent, channel);
653 } finally {
654 synchronized(numberOfRegisterAgentPendingMutex) {
655 --numberOfRegisterAgentPending;
656 }
657 }
658 }
659
660 }
661
662 protected class UnregisterAgentFromChannel implements Runnable {
663
664 protected IObservingAgent agent;
665 protected int channel;
666
667 public UnregisterAgentFromChannel(IObservingAgent agent, int channel) {
668 this.agent = agent;
669 this.channel = channel;
670 }
671
672 @Override
673 public void run() {
674 unregisterAgentSyncImpl(agent, channel);
675 }
676
677 }
678
679 protected class UnregisterAgent implements Runnable {
680
681 protected IObservingAgent agent;
682
683 public UnregisterAgent(IObservingAgent agent) {
684 this.agent = agent;
685 }
686
687 @Override
688 public void run() {
689 unregisterAgentSyncImpl(agent);
690 }
691
692 }
693
694 protected class Send implements Runnable {
695
696 protected IWorldChangeEvent event;
697 protected int channel;
698
699 public Send(IWorldChangeEvent event, int channel) {
700 this.event = event;
701 this.channel = channel;
702 }
703
704 @Override
705 public void run() {
706 sendSyncImpl(event, channel);
707 }
708
709 }
710
711 protected class SendToOthers implements Runnable {
712
713 protected IWorldChangeEvent event;
714 protected int channel;
715 protected IObservingAgent sender;
716
717 public SendToOthers(IWorldChangeEvent event, int channel, IObservingAgent sender) {
718 this.event = event;
719 this.channel = channel;
720 this.sender = sender;
721 }
722
723 @Override
724 public void run() {
725 sendToOthersSyncImpl(event, channel, sender);
726 }
727
728 }
729
730 protected class Broadcast implements Runnable {
731
732 protected IWorldChangeEvent event;
733
734 public Broadcast(IWorldChangeEvent event) {
735 this.event = event;
736 }
737
738 @Override
739 public void run() {
740 broadcastSyncImpl(event);
741 }
742
743 }
744
745 protected class BroadcastToOthers implements Runnable {
746
747 protected IWorldChangeEvent event;
748 protected IObservingAgent sender;
749
750 public BroadcastToOthers(IWorldChangeEvent event, IObservingAgent sender) {
751 this.event = event;
752 this.sender = sender;
753 }
754
755 @Override
756 public void run() {
757 broadcastToOthersSyncImpl(event, sender);
758 }
759
760 }
761
762 }