1 package cz.cuni.amis.pogamut.base.communication.connection.impl;
2
3 import java.io.Reader;
4 import java.io.Writer;
5 import java.util.logging.Level;
6
7 import cz.cuni.amis.pogamut.base.communication.connection.IWorldConnection;
8 import cz.cuni.amis.pogamut.base.communication.connection.IWorldConnectionAddress;
9 import cz.cuni.amis.pogamut.base.communication.connection.WorldReader;
10 import cz.cuni.amis.pogamut.base.communication.connection.WorldWriter;
11 import cz.cuni.amis.pogamut.base.communication.connection.exception.AlreadyConnectedException;
12 import cz.cuni.amis.pogamut.base.communication.connection.exception.ConnectionException;
13 import cz.cuni.amis.pogamut.base.component.IComponent;
14 import cz.cuni.amis.pogamut.base.component.bus.IComponentBus;
15 import cz.cuni.amis.pogamut.base.component.bus.event.IStoppedEvent;
16 import cz.cuni.amis.pogamut.base.component.bus.exception.ComponentNotRunningException;
17 import cz.cuni.amis.pogamut.base.component.bus.exception.ComponentPausedException;
18 import cz.cuni.amis.pogamut.base.component.controller.ComponentControlHelper;
19 import cz.cuni.amis.pogamut.base.component.controller.ComponentController;
20 import cz.cuni.amis.pogamut.base.component.controller.ComponentDependencies;
21 import cz.cuni.amis.pogamut.base.component.controller.IComponentControlHelper;
22 import cz.cuni.amis.pogamut.base.utils.guice.AgentScoped;
23 import cz.cuni.amis.pogamut.base.utils.logging.IAgentLogger;
24 import cz.cuni.amis.pogamut.base.utils.logging.LogCategory;
25 import cz.cuni.amis.utils.NullCheck;
26 import cz.cuni.amis.utils.StringCutter;
27 import cz.cuni.amis.utils.exception.PogamutIOException;
28 import cz.cuni.amis.utils.token.Token;
29 import cz.cuni.amis.utils.token.Tokens;
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65 @AgentScoped
66 public abstract class AbstractConnection<ADDRESS extends IWorldConnectionAddress> implements IWorldConnection<ADDRESS> {
67
68 public static final Token COMPONENT_ID = Tokens.get("Connection");
69
70 public static final String DEFAULT_LINE_END = "\r\n";
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87 private int connectionToken = 0;
88
89
90
91
92 private Object mutex = new Object();
93
94
95
96
97 protected ADDRESS address = null;
98
99
100
101
102 private ConnectionWriter writer = new ConnectionWriter(this);
103
104
105
106
107 private ConnectionReader reader = new ConnectionReader(this);
108
109
110
111
112 protected LogCategory log = null;
113
114
115
116
117 protected IComponentBus eventBus;
118
119
120
121
122 protected ComponentController<IComponent> controller;
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137 protected abstract void unsyncConnect(ADDRESS address) throws ConnectionException;
138
139
140
141
142
143
144 protected abstract void unsyncClose();
145
146
147
148
149
150
151
152
153 protected abstract Reader getConnectionReader() throws ConnectionException;
154
155
156
157
158
159
160
161
162 protected abstract Writer getConnectionWriter() throws ConnectionException;
163
164
165
166
167
168
169
170 public AbstractConnection(
171 ComponentDependencies dependencies,
172 IComponentBus bus,
173 IAgentLogger logger
174 ) {
175 this(null, dependencies, bus, logger);
176 }
177
178 public AbstractConnection(
179 ADDRESS address,
180 ComponentDependencies dependencies,
181 IComponentBus bus,
182 IAgentLogger logger
183 ) {
184 log = logger.getCategory(getComponentId().getToken());
185 NullCheck.check(this.log, "log initialization");
186 eventBus = bus;
187 NullCheck.check(this.eventBus, "eventBus");
188 controller = new ComponentController(this, control, eventBus, log, dependencies);
189 if (address != null) {
190 setAddress(address);
191 }
192 }
193
194
195
196
197
198
199
200 private IComponentControlHelper control = new ComponentControlHelper() {
201
202 @Override
203 public void stop() {
204 cleanUp();
205 }
206
207 @Override
208 public void startPaused() {
209 start();
210 }
211
212 @Override
213 public void start() {
214 synchronized(mutex) {
215 if (address == null) throw new ConnectionException("address is null, can't connect()", log, this);
216 if (log.isLoggable(Level.WARNING)) log.warning("Connecting to " + address + ".");
217 unsyncConnect(address);
218 }
219 }
220
221 @Override
222 public void kill() {
223 cleanUp();
224 }
225
226 @Override
227 public void reset() {
228 cleanUp();
229 }
230
231 private void cleanUp() {
232 synchronized(mutex) {
233 try {
234 reader.reader = null;
235 writer.writer = null;
236 unsyncClose();
237 } finally {
238 ++connectionToken;
239 }
240 }
241 }
242
243 };
244
245
246
247
248
249
250
251 @Override
252 public Token getComponentId() {
253 return COMPONENT_ID;
254 }
255
256 public LogCategory getLog() {
257 return log;
258 }
259
260 @Override
261 public void setAddress(ADDRESS address) throws ConnectionException {
262 synchronized(mutex) {
263 if (controller.isRunning()) throw new AlreadyConnectedException("Can't set address when connected.", log, this);
264 this.address = address;
265 }
266 }
267
268 @Override
269 public WorldWriter getWriter() throws ConnectionException {
270 return this.writer;
271 }
272
273 @Override
274 public WorldReader getReader() throws ConnectionException {
275 return this.reader;
276 }
277
278 @Override
279 public ADDRESS getAddress() {
280 return address;
281 }
282
283 @Override
284 public String toString() {
285 if (this != null) {
286 return this.getClass().getSimpleName() + "["+String.valueOf(address)+",connected:"+controller.isRunning()+"]";
287 } else {
288 return "AbstractConnection["+String.valueOf(address)+",connected:"+controller.isRunning()+")";
289 }
290 }
291
292 @Override
293 public void setLogMessages(boolean logMessages) {
294 this.reader.setLogMessages(logMessages);
295 this.writer.setLogMessages(logMessages);
296 }
297
298 public String getMessageEnd() {
299 return DEFAULT_LINE_END;
300 }
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319 private class ConnectionReader extends WorldReader {
320
321
322
323
324 private AbstractConnection<ADDRESS> owner = null;
325
326
327
328
329 private StringCutter line = new StringCutter(getMessageEnd());
330
331
332
333
334 private Reader reader = null;
335
336
337
338
339
340
341 private int currentConnectionToken = -1;
342
343
344
345
346 private Object logMessagesMutex = new Object();
347
348
349
350
351 private boolean logMessages = false;
352
353 public ConnectionReader(AbstractConnection<ADDRESS> owner) {
354 this.owner = owner;
355 }
356
357
358
359
360
361 public void setLogMessages(boolean state) {
362 synchronized(logMessagesMutex) {
363 if (logMessages == state) return;
364 logMessages = state;
365 if (logMessages) line.clear();
366 }
367 }
368
369 @Override
370 public void close() {
371 if (controller.isRunning()) {
372 this.owner.controller.manualStop("connection close() requested");
373 }
374 }
375
376 @Override
377 public boolean ready() throws PogamutIOException {
378 try {
379 if (!controller.isRunning()) return false;
380 Reader currentReader = this.getReader();
381 if (currentReader != null) return currentReader.ready();
382 } catch (Exception e) {
383 handleException(e);
384 }
385 return false;
386 }
387
388 @Override
389 public synchronized int read(char[] ac, int i, int j) throws ComponentNotRunningException, ComponentPausedException, PogamutIOException {
390 try {
391 if (controller.isPaused()) {
392 throw new ComponentPausedException(controller.getState().getFlag(), this);
393 }
394 if (!controller.isRunning()) {
395 throw new ComponentNotRunningException(controller.getState().getFlag(), this);
396 }
397 Reader currentReader = this.getReader();
398 if (currentReader == null) {
399 throw new PogamutIOException("inner reader of the connection is null, can't read", this);
400 }
401
402 int result = currentReader.read(ac, i, j);
403
404
405 if (logMessages){
406 synchronized(logMessagesMutex) {
407 if (logMessages){
408 String[] lines = line.add(new String(ac, i, result));
409 for (int index = 0; index < lines.length; ++index) {
410 if (log.isLoggable(Level.INFO)) log.info("Message read: " + lines[index]);
411 }
412 return result;
413 }
414 }
415 }
416
417 return result;
418
419 } catch (Exception e) {
420 handleException(e);
421 return 0;
422 }
423 }
424
425
426
427
428
429
430
431 private Reader getReader() throws PogamutIOException {
432 synchronized(mutex) {
433 if (currentConnectionToken != connectionToken || this.reader == null) {
434 currentConnectionToken = connectionToken;
435 line.clear();
436 this.reader = getConnectionReader();
437 }
438 return this.reader;
439 }
440 }
441
442 private void handleException(Throwable e) throws PogamutIOException {
443 if (e instanceof PogamutIOException) throw (PogamutIOException)e;
444 if (e instanceof ComponentPausedException) throw (ComponentPausedException)e;
445 if (e instanceof ComponentNotRunningException) throw (ComponentNotRunningException)e;
446 if (!controller.isRunning()) throw new ComponentNotRunningException(controller.getState().getFlag(), this);
447 throw new PogamutIOException(e, this);
448 }
449
450 public String toString() {
451 return AbstractConnection.this.getClass().getSimpleName() + "-Reader";
452 }
453
454 }
455
456
457
458
459
460
461
462
463 private class ConnectionWriter extends WorldWriter {
464
465
466
467
468 private AbstractConnection<ADDRESS> owner = null;
469
470
471
472
473 private StringCutter line = new StringCutter(getMessageEnd());
474
475
476
477
478 private Writer writer = null;
479
480
481
482
483
484
485 private int currentConnectionToken = -1;
486
487
488
489
490 private Object logMessagesMutex = new Object();
491
492
493
494
495 private boolean logMessages = false;
496
497 public ConnectionWriter(AbstractConnection<ADDRESS> owner) {
498 this.owner = owner;
499 }
500
501
502
503
504
505 public void setLogMessages(boolean state) {
506 synchronized(logMessagesMutex) {
507 if (logMessages == state) return;
508 logMessages = state;
509 if (logMessages) line.clear();
510 }
511 }
512
513 @Override
514 public void close() {
515 if (controller.isRunning()) {
516 controller.manualStop("connection close() requested");
517 }
518 }
519
520 @Override
521 public void flush() throws PogamutIOException {
522 try {
523 Writer currentWriter = getWriter();
524 if (currentWriter != null) currentWriter.flush();
525 } catch (Exception e) {
526 handleException(e);
527 }
528 }
529
530 @Override
531 public boolean ready() throws PogamutIOException {
532 try {
533 if (!controller.isRunning()) return false;
534 Writer currentWriter = this.getWriter();
535 return currentWriter != null;
536 } catch (Exception e) {
537 handleException(e);
538 return false;
539 }
540 }
541
542 @Override
543 public synchronized void write(char cbuf[], int off, int len) throws PogamutIOException, ComponentNotRunningException {
544 try {
545 if (controller.isPaused()) {
546 throw new ComponentPausedException(controller.getState().getFlag(), this);
547 }
548 if (!controller.isRunning()) {
549 throw new ComponentNotRunningException(controller.getState().getFlag(), this);
550 }
551 Writer currentWriter = this.getWriter();
552 if (currentWriter == null) {
553 throw new PogamutIOException("inner reader of the connection is null, can't read", this);
554 }
555 currentWriter.write(cbuf, off, len);
556 if (logMessages) {
557 synchronized(logMessagesMutex) {
558
559 if (logMessages){
560 String[] lines = line.add(new String(cbuf, off, len));
561 for (int index = 0; index < lines.length; ++index) {
562 if (log.isLoggable(Level.INFO)) log.info("Message written: " + lines[index]);
563 }
564 }
565 }
566 }
567 } catch (Exception e) {
568 handleException(e);
569 }
570 }
571
572
573
574
575
576
577
578 private Writer getWriter() throws PogamutIOException {
579 synchronized(mutex) {
580 if (currentConnectionToken != connectionToken || this.writer == null) {
581 currentConnectionToken = connectionToken;
582 line.clear();
583 this.writer = getConnectionWriter();
584 }
585 return this.writer;
586 }
587 }
588
589 private void handleException(Throwable e) throws PogamutIOException {
590 if (e instanceof PogamutIOException) throw (PogamutIOException)e;
591 if (e instanceof ComponentPausedException) throw (ComponentPausedException)e;
592 if (e instanceof ComponentNotRunningException) throw (ComponentNotRunningException)e;
593 if (!controller.isRunning()) throw new ComponentNotRunningException(controller.getState().getFlag(), this);
594 throw new PogamutIOException(e, this);
595 }
596
597 public String toString() {
598 return AbstractConnection.this.getClass().getSimpleName() + "-Writer";
599 }
600
601 }
602
603 }