1 package cz.cuni.amis.pogamut.base.communication.connection.impl;
2
3 import java.io.Reader;
4 import java.io.Writer;
5 import java.util.logging.Level;
6
7 import cz.cuni.amis.pogamut.base.communication.connection.IWorldConnection;
8 import cz.cuni.amis.pogamut.base.communication.connection.IWorldConnectionAddress;
9 import cz.cuni.amis.pogamut.base.communication.connection.WorldReader;
10 import cz.cuni.amis.pogamut.base.communication.connection.WorldWriter;
11 import cz.cuni.amis.pogamut.base.communication.connection.exception.AlreadyConnectedException;
12 import cz.cuni.amis.pogamut.base.communication.connection.exception.ConnectionException;
13 import cz.cuni.amis.pogamut.base.component.IComponent;
14 import cz.cuni.amis.pogamut.base.component.bus.IComponentBus;
15 import cz.cuni.amis.pogamut.base.component.bus.event.IStoppedEvent;
16 import cz.cuni.amis.pogamut.base.component.bus.exception.ComponentNotRunningException;
17 import cz.cuni.amis.pogamut.base.component.bus.exception.ComponentPausedException;
18 import cz.cuni.amis.pogamut.base.component.controller.ComponentControlHelper;
19 import cz.cuni.amis.pogamut.base.component.controller.ComponentController;
20 import cz.cuni.amis.pogamut.base.component.controller.ComponentDependencies;
21 import cz.cuni.amis.pogamut.base.component.controller.IComponentControlHelper;
22 import cz.cuni.amis.pogamut.base.utils.guice.AgentScoped;
23 import cz.cuni.amis.pogamut.base.utils.logging.IAgentLogger;
24 import cz.cuni.amis.pogamut.base.utils.logging.LogCategory;
25 import cz.cuni.amis.utils.NullCheck;
26 import cz.cuni.amis.utils.StringCutter;
27 import cz.cuni.amis.utils.exception.PogamutIOException;
28 import cz.cuni.amis.utils.token.Token;
29 import cz.cuni.amis.utils.token.Tokens;
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65 @AgentScoped
66 public abstract class AbstractConnection<ADDRESS extends IWorldConnectionAddress> implements IWorldConnection<ADDRESS> {
67
68 public static final Token COMPONENT_ID = Tokens.get("Connection");
69
70 public static final String DEFAULT_LINE_END = "\r\n";
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87 private int connectionToken = 0;
88
89
90
91
92 private Object mutex = new Object();
93
94
95
96
97 protected ADDRESS address = null;
98
99
100
101
102 private ConnectionWriter writer = new ConnectionWriter(this);
103
104
105
106
107 private ConnectionReader reader = new ConnectionReader(this);
108
109
110
111
112 protected LogCategory log = null;
113
114
115
116
117 protected IComponentBus eventBus;
118
119
120
121
122 protected ComponentController<IComponent> controller;
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137 protected abstract void unsyncConnect(ADDRESS address) throws ConnectionException;
138
139
140
141
142
143
144 protected abstract void unsyncClose();
145
146
147
148
149
150
151
152
153 protected abstract Reader getConnectionReader() throws ConnectionException;
154
155
156
157
158
159
160
161
162 protected abstract Writer getConnectionWriter() throws ConnectionException;
163
164
165
166
167
168
169
170 public AbstractConnection(
171 ComponentDependencies dependencies,
172 IComponentBus bus,
173 IAgentLogger logger
174 ) {
175 this(null, dependencies, bus, logger);
176 }
177
178 public AbstractConnection(
179 ADDRESS address,
180 ComponentDependencies dependencies,
181 IComponentBus bus,
182 IAgentLogger logger
183 ) {
184 log = logger.getCategory(getComponentId().getToken());
185 NullCheck.check(this.log, "log initialization");
186 eventBus = bus;
187 NullCheck.check(this.eventBus, "eventBus");
188 controller = new ComponentController(this, control, eventBus, log, dependencies);
189 if (address != null) {
190 setAddress(address);
191 }
192 }
193
194
195
196
197
198
199
200 private IComponentControlHelper control = new ComponentControlHelper() {
201
202 @Override
203 public void stop() {
204 cleanUp();
205 }
206
207 @Override
208 public void startPaused() {
209 start();
210 }
211
212 @Override
213 public void start() {
214 synchronized(mutex) {
215 if (address == null) throw new ConnectionException("address is null, can't connect()", log, this);
216 if (log.isLoggable(Level.WARNING)) log.warning("Connecting to " + address + ".");
217 unsyncConnect(address);
218 }
219 }
220
221 @Override
222 public void kill() {
223 cleanUp();
224 }
225
226 @Override
227 public void reset() {
228 cleanUp();
229 }
230
231 private void cleanUp() {
232 synchronized(mutex) {
233 try {
234 reader.reader = null;
235 writer.writer = null;
236 unsyncClose();
237 } finally {
238 ++connectionToken;
239 }
240 }
241 }
242
243 };
244
245
246
247
248
249
250
251 @Override
252 public Token getComponentId() {
253 return COMPONENT_ID;
254 }
255
256 public LogCategory getLog() {
257 return log;
258 }
259
260 @Override
261 public void setAddress(ADDRESS address) throws ConnectionException {
262 synchronized(mutex) {
263 if (controller.isRunning()) throw new AlreadyConnectedException("Can't set address when connected.", log, this);
264 this.address = address;
265 }
266 }
267
268 @Override
269 public WorldWriter getWriter() throws ConnectionException {
270 return this.writer;
271 }
272
273 @Override
274 public WorldReader getReader() throws ConnectionException {
275 return this.reader;
276 }
277
278 @Override
279 public ADDRESS getAddress() {
280 return address;
281 }
282
283 @Override
284 public String toString() {
285 if (this != null) {
286 return this.getClass().getSimpleName() + "["+String.valueOf(address)+",connected:"+controller.isRunning()+"]";
287 } else {
288 return "AbstractConnection["+String.valueOf(address)+",connected:"+controller.isRunning()+")";
289 }
290 }
291
292 @Override
293 public void setLogMessages(boolean logMessages) {
294 this.reader.setLogMessages(logMessages);
295 this.writer.setLogMessages(logMessages);
296 }
297
298 public String getMessageEnd() {
299 return DEFAULT_LINE_END;
300 }
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319 private class ConnectionReader extends WorldReader {
320
321
322
323
324 private AbstractConnection<ADDRESS> owner = null;
325
326
327
328
329 private StringCutter line = new StringCutter(getMessageEnd());
330
331
332
333
334 private Reader reader = null;
335
336
337
338
339
340
341 private int currentConnectionToken = -1;
342
343
344
345
346 private Object logMessagesMutex = new Object();
347
348
349
350
351 private boolean logMessages = false;
352
353 public ConnectionReader(AbstractConnection<ADDRESS> owner) {
354 this.owner = owner;
355 }
356
357
358
359
360
361 public void setLogMessages(boolean state) {
362 synchronized(logMessagesMutex) {
363 if (logMessages == state) return;
364 logMessages = state;
365 if (logMessages) line.clear();
366 }
367 }
368
369 @Override
370 public void close() {
371 if (controller.isRunning()) {
372 this.owner.controller.manualStop("connection close() requested");
373 }
374 }
375
376 @Override
377 public boolean ready() throws PogamutIOException {
378 try {
379 if (!controller.isRunning()) return false;
380 Reader currentReader = this.getReader();
381 if (currentReader != null) return currentReader.ready();
382 } catch (Exception e) {
383 handleException(e);
384 }
385 return false;
386 }
387
388 @Override
389 public synchronized int read(char[] ac, int i, int j) throws ComponentNotRunningException, ComponentPausedException, PogamutIOException {
390 try {
391 if (controller.isPaused()) {
392 throw new ComponentPausedException(controller.getState().getFlag(), this);
393 }
394 if (!controller.isRunning()) {
395 throw new ComponentNotRunningException(controller.getState().getFlag(), this);
396 }
397 Reader currentReader = this.getReader();
398 if (currentReader == null) {
399 throw new PogamutIOException("inner reader of the connection is null, can't read", this);
400 }
401
402 int result = currentReader.read(ac, i, j);
403
404 synchronized(logMessagesMutex) {
405
406 if (logMessages){
407 String[] lines = line.add(new String(ac, i, result));
408 for (int index = 0; index < lines.length; ++index) {
409 if (log.isLoggable(Level.INFO)) log.info("Message read: " + lines[index]);
410 }
411 return result;
412 } else {
413 return result;
414 }
415 }
416
417 } catch (Exception e) {
418 handleException(e);
419 return 0;
420 }
421 }
422
423
424
425
426
427
428
429 private Reader getReader() throws PogamutIOException {
430 synchronized(mutex) {
431 if (currentConnectionToken != connectionToken || this.reader == null) {
432 currentConnectionToken = connectionToken;
433 line.clear();
434 this.reader = getConnectionReader();
435 }
436 return this.reader;
437 }
438 }
439
440 private void handleException(Throwable e) throws PogamutIOException {
441 if (e instanceof PogamutIOException) throw (PogamutIOException)e;
442 if (e instanceof ComponentPausedException) throw (ComponentPausedException)e;
443 if (e instanceof ComponentNotRunningException) throw (ComponentNotRunningException)e;
444 if (!controller.isRunning()) throw new ComponentNotRunningException(controller.getState().getFlag(), this);
445 throw new PogamutIOException(e, this);
446 }
447
448 public String toString() {
449 return AbstractConnection.this.getClass().getSimpleName() + "-Reader";
450 }
451
452 }
453
454
455
456
457
458
459
460
461 private class ConnectionWriter extends WorldWriter {
462
463
464
465
466 private AbstractConnection<ADDRESS> owner = null;
467
468
469
470
471 private StringCutter line = new StringCutter(getMessageEnd());
472
473
474
475
476 private Writer writer = null;
477
478
479
480
481
482
483 private int currentConnectionToken = -1;
484
485
486
487
488 private Object logMessagesMutex = new Object();
489
490
491
492
493 private boolean logMessages = false;
494
495 public ConnectionWriter(AbstractConnection<ADDRESS> owner) {
496 this.owner = owner;
497 }
498
499
500
501
502
503 public void setLogMessages(boolean state) {
504 synchronized(logMessagesMutex) {
505 if (logMessages == state) return;
506 logMessages = state;
507 if (logMessages) line.clear();
508 }
509 }
510
511 @Override
512 public void close() {
513 if (controller.isRunning()) {
514 controller.manualStop("connection close() requested");
515 }
516 }
517
518 @Override
519 public void flush() throws PogamutIOException {
520 try {
521 Writer currentWriter = getWriter();
522 if (currentWriter != null) currentWriter.flush();
523 } catch (Exception e) {
524 handleException(e);
525 }
526 }
527
528 @Override
529 public boolean ready() throws PogamutIOException {
530 try {
531 if (!controller.isRunning()) return false;
532 Writer currentWriter = this.getWriter();
533 return currentWriter != null;
534 } catch (Exception e) {
535 handleException(e);
536 return false;
537 }
538 }
539
540 @Override
541 public synchronized void write(char cbuf[], int off, int len) throws PogamutIOException, ComponentNotRunningException {
542 try {
543 if (controller.isPaused()) {
544 throw new ComponentPausedException(controller.getState().getFlag(), this);
545 }
546 if (!controller.isRunning()) {
547 throw new ComponentNotRunningException(controller.getState().getFlag(), this);
548 }
549 Writer currentWriter = this.getWriter();
550 if (currentWriter == null) {
551 throw new PogamutIOException("inner reader of the connection is null, can't read", this);
552 }
553 currentWriter.write(cbuf, off, len);
554 synchronized(logMessagesMutex) {
555
556 if (logMessages){
557 String[] lines = line.add(new String(cbuf, off, len));
558 for (int index = 0; index < lines.length; ++index) {
559 if (log.isLoggable(Level.INFO)) log.info("Message written: " + lines[index]);
560 }
561 }
562 }
563 } catch (Exception e) {
564 handleException(e);
565 }
566 }
567
568
569
570
571
572
573
574 private Writer getWriter() throws PogamutIOException {
575 synchronized(mutex) {
576 if (currentConnectionToken != connectionToken || this.writer == null) {
577 currentConnectionToken = connectionToken;
578 line.clear();
579 this.writer = getConnectionWriter();
580 }
581 return this.writer;
582 }
583 }
584
585 private void handleException(Throwable e) throws PogamutIOException {
586 if (e instanceof PogamutIOException) throw (PogamutIOException)e;
587 if (e instanceof ComponentPausedException) throw (ComponentPausedException)e;
588 if (e instanceof ComponentNotRunningException) throw (ComponentNotRunningException)e;
589 if (!controller.isRunning()) throw new ComponentNotRunningException(controller.getState().getFlag(), this);
590 throw new PogamutIOException(e, this);
591 }
592
593 public String toString() {
594 return AbstractConnection.this.getClass().getSimpleName() + "-Writer";
595 }
596
597 }
598
599 }