1 package cz.cuni.amis.pogamut.base.communication.mediator.impl;
2
3 import java.util.concurrent.CountDownLatch;
4 import java.util.logging.Level;
5 import java.util.logging.Logger;
6
7 import com.google.inject.Inject;
8
9 import cz.cuni.amis.pogamut.base.agent.IAgentId;
10 import cz.cuni.amis.pogamut.base.communication.exception.CommunicationException;
11 import cz.cuni.amis.pogamut.base.communication.mediator.IMediator;
12 import cz.cuni.amis.pogamut.base.communication.translator.event.IWorldChangeEvent;
13 import cz.cuni.amis.pogamut.base.communication.translator.event.IWorldChangeEventOutput;
14 import cz.cuni.amis.pogamut.base.communication.worldview.IWorldChangeEventInput;
15 import cz.cuni.amis.pogamut.base.component.bus.IComponentBus;
16 import cz.cuni.amis.pogamut.base.component.bus.exception.ComponentNotRunningException;
17 import cz.cuni.amis.pogamut.base.component.bus.exception.ComponentPausedException;
18 import cz.cuni.amis.pogamut.base.component.controller.ComponentControlHelper;
19 import cz.cuni.amis.pogamut.base.component.controller.ComponentController;
20 import cz.cuni.amis.pogamut.base.component.controller.ComponentDependencyType;
21 import cz.cuni.amis.pogamut.base.component.controller.IComponentControlHelper;
22 import cz.cuni.amis.pogamut.base.utils.guice.AgentScoped;
23 import cz.cuni.amis.pogamut.base.utils.logging.IAgentLogger;
24 import cz.cuni.amis.pogamut.base.utils.logging.LogCategory;
25 import cz.cuni.amis.utils.exception.PogamutException;
26 import cz.cuni.amis.utils.flag.Flag;
27 import cz.cuni.amis.utils.token.Token;
28 import cz.cuni.amis.utils.token.Tokens;
29
30
31
32
33
34
35
36
37
38
39
40 @AgentScoped
41 public class Mediator implements IMediator {
42
43 public static final Token COMPONENT_ID = Tokens.get("Mediator");
44
45
46
47
48 public static final String WORKER_THREAD_NAME_PREFIX = "MediatorWorker";
49
50
51
52
53
54
55 protected Worker worker = null;
56
57
58
59
60 protected Thread workerThread = null;
61
62
63
64
65 protected Object threadMutex = new Object();
66
67
68
69
70 private LogCategory log = null;
71
72
73
74
75 private IWorldChangeEventOutput producer;
76
77
78
79
80 private IWorldChangeEventInput consumer;
81
82 private ComponentController controller;
83
84 private IComponentBus eventBus;
85
86 private IAgentId agentId;
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103 @Inject
104 public Mediator(IWorldChangeEventOutput producer, IComponentBus bus, IAgentLogger logger) {
105 this.agentId = logger.getAgentId();
106 this.log = logger.getCategory(getComponentId().getToken());
107 this.producer = producer;
108 this.eventBus = bus;
109 }
110
111 private IComponentControlHelper control = new ComponentControlHelper() {
112
113 @Override
114 public void stop() throws PogamutException {
115 Worker w = worker;
116 if (w != null) {
117 w.stop();
118 }
119 }
120
121 @Override
122 public void startPaused() {
123 start();
124 }
125
126 @Override
127 public void start() throws PogamutException {
128 if (workerThread != null) {
129 if (log.isLoggable(Level.WARNING)) log.warning("Mediator worker thread already exists, leaking resources?");
130 }
131 synchronized (threadMutex) {
132 if (log.isLoggable(Level.FINER)) log.finer("Starting mediator thread " + WORKER_THREAD_NAME_PREFIX + ".");
133 worker = new Worker();
134 workerThread = new Thread(worker, agentId.getName().getFlag() + " mediator");
135 workerThread.start();
136 }
137 }
138
139 @Override
140 public void kill() {
141 Worker w = worker;
142 if (w != null) w.kill();
143 Thread thread = workerThread;
144 if (thread != null) thread.interrupt();
145 worker = null;
146 workerThread = null;
147 }
148
149 public void pause() {
150 Worker w = worker;
151 if (w != null) {
152 w.pause();
153 }
154 }
155
156 public void resume() {
157 Worker w = worker;
158 if (w != null) {
159 w.resume();
160 }
161 }
162
163 @Override
164 public void reset() {
165 worker = null;
166 workerThread = null;
167 }
168
169 };
170
171 @Override
172 public Token getComponentId() {
173 return COMPONENT_ID;
174 }
175
176 public LogCategory getLog() {
177 return log;
178 }
179
180 @Override
181 public void setConsumer(IWorldChangeEventInput consumer) {
182 this.consumer = consumer;
183 this.controller = new ComponentController(this, control, eventBus, log, ComponentDependencyType.STARTS_AFTER, producer, consumer);
184 }
185
186 @Override
187 public String toString() {
188 if (this == null) return "Mediator";
189 return getClass().getSimpleName() + "[producer=" + producer + ", consumer=" + consumer +"]";
190 }
191
192
193 private class Worker implements Runnable {
194
195 private volatile CountDownLatch stopLatch = new CountDownLatch(1);
196
197
198
199
200 private volatile boolean shouldRun = true;
201
202 private volatile Flag<Boolean> shouldPause = new Flag<Boolean>(false);
203
204 private volatile boolean running = false;
205
206 private volatile boolean exceptionExpected = false;
207
208 private Thread myThread;
209
210 public void pause() {
211 shouldPause.setFlag(true);
212 }
213
214 public void resume() {
215 shouldPause.setFlag(false);
216 }
217
218
219
220
221 public void stop() {
222 this.shouldRun = false;
223 this.shouldPause.setFlag(false);
224 exceptionExpected = true;
225 myThread.interrupt();
226 }
227
228
229
230
231
232 public void kill() {
233 if (!running) return;
234 this.shouldRun = false;
235 this.shouldPause.setFlag(false);
236 try {
237 Thread.sleep(200);
238 } catch (InterruptedException e) {
239 }
240 exceptionExpected = true;
241 myThread.interrupt();
242 }
243
244
245
246
247
248
249 @Override
250 public void run() {
251 myThread = Thread.currentThread();
252
253
254 running = true;
255
256
257 logWorker(Level.INFO, "Started.");
258
259 try {
260 IWorldChangeEventInput currentConsumer;
261 IWorldChangeEventOutput currentProducer;
262 IWorldChangeEvent worldEvent = null;
263
264 while (shouldRun && !myThread.isInterrupted()) {
265
266 if (shouldPause.getFlag()) {
267 logWorker(Level.INFO, "Paused.");
268 shouldPause.waitFor(false);
269 logWorker(Level.INFO, "Resumed.");
270 }
271
272
273 if (!shouldRun || myThread.isInterrupted()) {
274 break;
275 }
276
277 currentConsumer = consumer;
278 if (currentConsumer == null) {
279 running = false;
280 if (!exceptionExpected) {
281 controller.fatalError("Event consumer lost (is null).");
282 }
283 break;
284 }
285
286 currentProducer = producer;
287 if (currentProducer == null) {
288 running = false;
289 if (!exceptionExpected) {
290 controller.fatalError("Event producer lost (is null).");
291 }
292 break;
293 }
294
295
296 if (worldEvent == null) {
297 try {
298
299 worldEvent = producer.getEvent();
300 logWorker(Level.FINEST, "received - " + String.valueOf(worldEvent), worldEvent);
301 } catch (ComponentPausedException cpe) {
302 logWorker(Level.INFO, "Producer is paused, pausing mediator.");
303 shouldPause.setFlag(true);
304 continue;
305 } catch (ComponentNotRunningException nre) {
306 logWorker(Level.WARNING, "Producer is not running, stopping the mediator worker.");
307 running = false;
308 break;
309 } catch (Exception ce) {
310 running = false;
311 if (!exceptionExpected) {
312 controller.fatalError(WORKER_THREAD_NAME_PREFIX + ": Producer exception.", ce);
313 } else {
314 logWorker(Level.FINE, "Producer exception expected, caught: " + ce);
315 }
316 break;
317 }
318 }
319
320
321 if (!shouldRun || myThread.isInterrupted()) {
322 break;
323 }
324
325
326 try {
327 currentConsumer.notify(worldEvent);
328 } catch (ComponentPausedException e) {
329 logWorker(Level.INFO, "Consumer is paused, pausing mediator.");
330 shouldPause.setFlag(true);
331 continue;
332 } catch (ComponentNotRunningException e) {
333 logWorker(Level.WARNING, "Consumer is not running, stopping mediator worker.");
334 running = false;
335 break;
336 } catch (Exception e) {
337 running = false;
338 if (!exceptionExpected) {
339 controller.fatalError(WORKER_THREAD_NAME_PREFIX + ": Consumer exception.", e);
340 } else {
341 logWorker(Level.FINE, "Consumer exception expected, caught: " + e);
342 }
343 break;
344 }
345
346 worldEvent = null;
347 }
348 } catch (Exception e) {
349 running = false;
350 if (!exceptionExpected) {
351 controller.fatalError(WORKER_THREAD_NAME_PREFIX + ": Exception.", e);
352 } else {
353 logWorker(Level.FINE, "Exception expected, caught: " + e);
354 }
355 }
356
357 try {
358 stopLatch.countDown();
359 } finally {
360
361 shouldRun = false;
362 running = false;
363
364 synchronized(threadMutex) {
365 if (workerThread == myThread) {
366 worker = null;
367 workerThread = null;
368 }
369 }
370
371 logWorker(Level.WARNING, "Stopped.");
372 }
373 }
374
375 private void logWorker(Level level, String message) {
376 log.log(level, WORKER_THREAD_NAME_PREFIX + ": " + message);
377 }
378
379 private void logWorker(Level level, String message, Object obj) {
380 if (obj == null)
381 log.log(level, WORKER_THREAD_NAME_PREFIX + ": " + message);
382 else
383 log.log(level, WORKER_THREAD_NAME_PREFIX + ": " + message, obj);
384 }
385
386 }
387
388 }