View Javadoc

1   package cz.cuni.amis.pogamut.base.communication.connection.impl;
2   
3   import java.io.Reader;
4   import java.io.Writer;
5   import java.util.logging.Level;
6   
7   import cz.cuni.amis.pogamut.base.communication.connection.IWorldConnection;
8   import cz.cuni.amis.pogamut.base.communication.connection.IWorldConnectionAddress;
9   import cz.cuni.amis.pogamut.base.communication.connection.WorldReader;
10  import cz.cuni.amis.pogamut.base.communication.connection.WorldWriter;
11  import cz.cuni.amis.pogamut.base.communication.connection.exception.AlreadyConnectedException;
12  import cz.cuni.amis.pogamut.base.communication.connection.exception.ConnectionException;
13  import cz.cuni.amis.pogamut.base.component.IComponent;
14  import cz.cuni.amis.pogamut.base.component.bus.IComponentBus;
15  import cz.cuni.amis.pogamut.base.component.bus.event.IStoppedEvent;
16  import cz.cuni.amis.pogamut.base.component.bus.exception.ComponentNotRunningException;
17  import cz.cuni.amis.pogamut.base.component.bus.exception.ComponentPausedException;
18  import cz.cuni.amis.pogamut.base.component.controller.ComponentControlHelper;
19  import cz.cuni.amis.pogamut.base.component.controller.ComponentController;
20  import cz.cuni.amis.pogamut.base.component.controller.ComponentDependencies;
21  import cz.cuni.amis.pogamut.base.component.controller.IComponentControlHelper;
22  import cz.cuni.amis.pogamut.base.utils.guice.AgentScoped;
23  import cz.cuni.amis.pogamut.base.utils.logging.IAgentLogger;
24  import cz.cuni.amis.pogamut.base.utils.logging.LogCategory;
25  import cz.cuni.amis.utils.NullCheck;
26  import cz.cuni.amis.utils.StringCutter;
27  import cz.cuni.amis.utils.exception.PogamutIOException;
28  import cz.cuni.amis.utils.token.Token;
29  import cz.cuni.amis.utils.token.Tokens;
30  
31  /**
32   * Implementation of the basic connection to the world server. Note that it has some nice features :-)
33   * <p><p>
34   * This implementation is THREAD-SAFE! It is {@link IComponent}!
35   * <p><p>
36   * Calling getReader().read(), getWriter().write() is synchronized as well!
37   * <p><p>
38   * Reader and writer can be got in advance (no need to start() the connection before getReader() or getWriter() is invoked).
39   * <p><p>
40   * Calling reader.close() or writer.close() will stop() the connection as well ... you don't have to have
41   * this instance referenced directly.
42   * <p><p>
43   * Whenever an exception is thrown during read/write operation, the connection is immediately stop()ed,
44   * not kill()ed and {@link IStoppedEvent} is broadcasted. The flag will be changed correctly as well.
45   * <p><p>
46   * ... if you're waiting on the reader.read() and the socket is closed meanwhile, be ready
47   * to catch the SocketException ...
48   * <p><p>
49   * The instance of the class can be reused (e.g. you may start(), stop() it repeatedly).
50   * <p><p>
51   * All you have to implement:
52   * <ol>
53   * <li>unsyncConnect(IConnectionAddress address) - no need to care of anything else then connection to 'address' + provide correct behavior for getConnectionReader(), getConnectionWriter()</li>
54   * <li>unsyncClose() - just close the connection, no need to care of anything else</li>
55   * <li>Reader getConnectionReader() - if connection is up, return raw reader for your connection (no extra assumptions), it will be wrapped with ConnectionReader ... if conection is down, return null</li>
56   * <li>Writer getConnectionWriter() - if connection is down, return raw writer for your connection (no extra assumptions), it will be wrapped with ConnectionWriter ... if conection is down, return null</li>
57   * </ol>
58   * You might want to override method getMessageEnd() to return correct "message-end" string, so the messages are correctly split
59   * by the readers/writers.
60   * <p><p>
61   * Ignores {@link IComponentControlHelper#startPaused()}, performs {@link IComponentControlHelper#start()} in both start cases.
62   * 
63   * @author Jimmy
64   */
65  @AgentScoped
66  public abstract class AbstractConnection<ADDRESS extends IWorldConnectionAddress> implements IWorldConnection<ADDRESS> {
67  	
68  	public static final Token COMPONENT_ID = Tokens.get("Connection");
69  
70  	public static final String DEFAULT_LINE_END = "\r\n";
71  	
72  	/**
73  	 * Now this is complicated ... what's this? :-)
74  	 * <BR><BR>
75  	 * We've got this problem: <BR>
76  	 * 1) we want this class to be thread-safe <BR>
77  	 * BUT <BR>
78  	 * 2) we're giving access to writer/reader to any thread ...
79  	 * <BR><BR>
80  	 * What should happen if read/write fails? <BR>
81  	 * 1) if it is the same connection -> close it <BR>
82  	 * 2) if it is different connection -> just raise the exception
83  	 * <BR><BR>
84  	 * Therefore we need some kind of marker in which connection we are. Study the code if you want to learn more.<BR>
85  	 * Note that reader/writer provided by this object may be used many times across different connections.
86  	 */
87  	private int connectionToken = 0;
88  
89  	/**
90  	 * Used to synchronize the behavior of the object.
91  	 */
92  	private Object mutex = new Object();
93  				
94  	/**
95  	 * Current remote side address of the connection.
96  	 */
97  	protected ADDRESS address = null;
98  	
99  	/**
100 	 * Writer of the connection. Serves for sending messages through the connection.
101 	 */
102 	private ConnectionWriter writer = new ConnectionWriter(this);
103 	
104 	/**
105 	 * Reader of the connection. Serves for reading messages from the connection.
106 	 */
107 	private ConnectionReader reader = new ConnectionReader(this);
108 	
109 	/**
110 	 * Special category for the connection.
111 	 */
112 	protected LogCategory log = null;
113 	
114 	/**
115 	 * Event bus of the agent.
116 	 */
117 	protected IComponentBus eventBus;
118 	
119 	/**
120 	 * Control helper starting/stopping the component.
121 	 */
122 	protected ComponentController<IComponent> controller;
123 	
124 	//
125 	//
126 	// ABSTRACT METHODS
127 	//
128 	//
129 	
130 	/**
131 	 * Inner implementation of connect, unsynchronized, this is called from
132 	 * connect(IConnectionDescriptor). This is called only iff the connection is down
133 	 * and the address is a new address.
134 	 * 
135 	 * @throws ConnectionException
136 	 */
137 	protected abstract void unsyncConnect(ADDRESS address) throws ConnectionException;
138 	
139 	/**
140 	 * Inner unsynchronized implementation of the close(), should close the connection
141 	 * to the remote side without throwing any exception. You may be sure that the connection
142 	 * is up (according to the flag) when this method is called.
143 	 */
144 	protected abstract void unsyncClose();
145 	
146 	/**
147 	 * This should return plain reader for the current connection. If connection is down,
148 	 * this should throw WorldConnectionException. We will wrap this reader with our own
149 	 * implementation that is capable of sniffing messages as they come (if required).
150 	 * 
151 	 * @return
152 	 */
153 	protected abstract Reader getConnectionReader() throws ConnectionException;
154 	
155 	/**
156 	 * This should return plain writer for the current connection. If connection is down,
157 	 * this should throw WorldConnectionException. We will wrap this writer with our own
158 	 * implementation that is capable of sniffing messages as they go (if required).
159 	 * 
160 	 * @return
161 	 */
162 	protected abstract Writer getConnectionWriter() throws ConnectionException;
163 	
164 	//
165 	//
166 	// CONSTURCTOR
167 	//
168 	//
169 	
170 	public AbstractConnection(
171 			ComponentDependencies dependencies, 
172 			IComponentBus bus, 
173 			IAgentLogger logger
174 	) {
175 		this(null, dependencies, bus, logger);
176 	}
177 	
178 	public AbstractConnection(
179 			ADDRESS address,
180 			ComponentDependencies dependencies, 
181 			IComponentBus bus, 
182 			IAgentLogger logger
183 	) {
184 		log = logger.getCategory(getComponentId().getToken());
185 		NullCheck.check(this.log, "log initialization");
186 		eventBus = bus;
187 		NullCheck.check(this.eventBus, "eventBus");
188 		controller = new ComponentController(this, control, eventBus, log, dependencies);
189 		if (address != null) {
190 			setAddress(address);
191 		}
192 	}
193 	
194 	//
195 	//
196 	// COMPONENT CONTROL
197 	//
198 	//
199 	
200 	private IComponentControlHelper control = new ComponentControlHelper() {
201 		
202 		@Override
203 		public void stop() {
204 			cleanUp();
205 		}
206 		
207 		@Override
208 		public void startPaused() {
209 			start();
210 		}
211 		
212 		@Override
213 		public void start() {
214 			synchronized(mutex) {
215 				if (address == null) throw new ConnectionException("address is null, can't connect()", log, this);
216 				if (log.isLoggable(Level.WARNING)) log.warning("Connecting to " + address + ".");
217 				unsyncConnect(address);	
218 			}
219 		}
220 		
221 		@Override
222 		public void kill() {
223 			cleanUp();
224 		}
225 		
226 		@Override
227 		public void reset() {
228 			cleanUp();
229 		}
230 		
231 		private void cleanUp() {
232 			synchronized(mutex) {
233 				try {
234 					reader.reader = null;
235 					writer.writer = null;
236 					unsyncClose();
237 				} finally {
238 					++connectionToken;
239 				}
240 			}
241 		}
242 		
243 	};	
244 
245 	//
246 	//
247 	// PUBLIC INTERFACE
248 	//
249 	//
250 	
251 	@Override
252 	public Token getComponentId() {
253 		return COMPONENT_ID;
254 	}
255 
256 	public LogCategory getLog() {
257 		return log;
258 	}
259 	
260 	@Override
261 	public void setAddress(ADDRESS address) throws ConnectionException {
262 		synchronized(mutex) {
263 			if (controller.isRunning()) throw new AlreadyConnectedException("Can't set address when connected.", log, this);
264 			this.address = address;
265 		}
266 	}	
267 	
268 	@Override
269 	public WorldWriter getWriter() throws ConnectionException {
270 		return this.writer;
271 	}
272 
273 	@Override
274 	public WorldReader getReader() throws ConnectionException {
275 		return this.reader;
276 	}
277 
278 	@Override
279 	public ADDRESS getAddress() {
280 		return address;
281 	}
282 
283 	@Override
284 	public String toString() {
285 		if (this != null) {
286 			return this.getClass().getSimpleName() + "["+String.valueOf(address)+",connected:"+controller.isRunning()+"]";
287 		} else {
288 			return "AbstractConnection["+String.valueOf(address)+",connected:"+controller.isRunning()+")";
289 		}
290 	}
291 			
292 	@Override
293 	public void setLogMessages(boolean logMessages) {
294 		this.reader.setLogMessages(logMessages);
295 		this.writer.setLogMessages(logMessages);
296 	}
297 	
298 	public String getMessageEnd() {
299 		return DEFAULT_LINE_END;
300 	}
301 	
302 	//
303 	//       ................
304 	//    -----------------------
305 	// ==============================
306 	// INNER CLASS DEFINITION FOLLOWS
307 	// ==============================
308 	//    -----------------------
309 	//       ................
310 	//
311 		
312 	/**
313 	 * Reader for the connection (wrapper for the getConnectionReader()),
314 	 * that takes care of sniffing messages (if required) + makes reader persistent
315 	 * over the connect() calls of the connection.  
316 	 * 
317 	 * @author Jimmy
318 	 */
319 	private class ConnectionReader extends WorldReader {
320 		
321 		/**
322 		 * Owner of the ConnectionReader (because of close() method).
323 		 */
324 		private AbstractConnection<ADDRESS> owner = null;
325 		
326 		/**
327 		 * Used when the observer is hooked to the connection.
328 		 */
329 		private StringCutter line = new StringCutter(getMessageEnd());
330 
331 		/**
332 		 * Cached reader of the connection.
333 		 */
334 		private Reader reader = null;
335 		
336 		/**
337 		 * Connection token - we use it to distinguish between connect() calls to be able
338 		 * to correctly close the connection if the read fails (we have to close the
339 		 * connection iff it is the same of the cached reader)
340 		 */
341 		private int currentConnectionToken = -1;
342 		
343 		/**
344 		 * Mutex that handles access to logMessages field.
345 		 */
346 		private Object logMessagesMutex = new Object();
347 		
348 		/**
349 		 * Whether we have to sniff (log) messages from the reader.
350 		 */
351 		private boolean logMessages = false;
352 
353 		public ConnectionReader(AbstractConnection<ADDRESS> owner) {
354 			this.owner = owner;
355 		}
356 
357 		/**
358 		 * Sets whether we have to log messages from reader.
359 		 * @param state
360 		 */
361 		public void setLogMessages(boolean state) {
362 			synchronized(logMessagesMutex) {
363 				if (logMessages == state) return;
364 				logMessages = state;
365 				if (logMessages) line.clear();
366 			}
367 		}
368 		
369 		@Override
370 		public void close() {
371 			if (controller.isRunning()) {
372 				this.owner.controller.manualStop("connection close() requested");
373 			}
374 		}
375 		
376 		@Override
377 		public boolean ready() throws PogamutIOException {
378 			try {
379 				if (!controller.isRunning()) return false;
380 				Reader currentReader = this.getReader();
381 				if (currentReader != null) return currentReader.ready();
382 			} catch (Exception e) {
383 				handleException(e);
384 			}			
385 			return false;
386 		}
387 		
388 		@Override
389 		public synchronized int read(char[] ac, int i, int j) throws ComponentNotRunningException, ComponentPausedException, PogamutIOException {
390 			try {
391 				if (controller.isPaused()) {
392 					throw new ComponentPausedException(controller.getState().getFlag(), this);
393 				}
394 				if (!controller.isRunning()) {
395 					throw new ComponentNotRunningException(controller.getState().getFlag(), this);
396 				}
397 				Reader currentReader  = this.getReader();
398 				if (currentReader == null) {
399 					throw new PogamutIOException("inner reader of the connection is null, can't read", this);
400 				}			
401 			
402 				int result = currentReader.read(ac, i, j);
403 				
404 				synchronized(logMessagesMutex) {
405 					// should we log the messages?
406 					if (logMessages){
407 						String[] lines = line.add(new String(ac, i, result));
408 						for (int index = 0; index < lines.length; ++index) {
409 							if (log.isLoggable(Level.INFO)) log.info("Message read: " + lines[index]);
410 						}
411 						return result;
412 					} else {
413 						return result;
414 					}
415 				}
416 							
417 			} catch (Exception e) {
418 				handleException(e);
419 				return 0;
420 			}
421 		}
422 		
423 		/**
424 		 * Inner method to get the reader of current connection. It always check whether the
425 		 * reader hasn't been changed - so it always returns a current one.
426 		 * @return
427 		 * @throws PogamutIOException
428 		 */
429 		private Reader getReader() throws PogamutIOException {
430 			synchronized(mutex) {
431 				if (currentConnectionToken != connectionToken || this.reader == null) {
432 					currentConnectionToken = connectionToken;
433 					line.clear();
434 					this.reader = getConnectionReader();
435 				}
436 				return this.reader;
437 			}
438 		}
439 		
440 		private void handleException(Throwable e) throws PogamutIOException {
441 			if (e instanceof PogamutIOException) throw (PogamutIOException)e;
442 			if (e instanceof ComponentPausedException) throw (ComponentPausedException)e;
443 			if (e instanceof ComponentNotRunningException) throw (ComponentNotRunningException)e;
444 			if (!controller.isRunning()) throw new ComponentNotRunningException(controller.getState().getFlag(), this);
445 			throw new PogamutIOException(e, this);
446 		}
447 		
448 		public String toString() {
449 			return AbstractConnection.this.getClass().getSimpleName() + "-Reader";
450 		}
451 
452 	}
453 	
454 	/**
455 	 * Writer for the connection (wrapper for the getConnectionWriter()),
456 	 * that takes care of sniffing messages (if required) + makes writer persistent
457 	 * over the connect() calls of the connection.  
458 	 * 
459 	 * @author Jimmy
460 	 */
461 	private class ConnectionWriter extends WorldWriter {
462 		
463 		/**
464 		 * Owner of the ConnectionWriter (because of close() method).
465 		 */
466 		private AbstractConnection<ADDRESS> owner = null;
467 		
468 		/**
469 		 * Used when the observer is hooked to the connection.
470 		 */
471 		private StringCutter line = new StringCutter(getMessageEnd());
472 
473 		/**
474 		 * Cached writer of the connection.
475 		 */
476 		private Writer writer = null;
477 		
478 		/**
479 		 * Connection token - we use it to distinguish between connect() calls to be able
480 		 * to correctly close the connection if the write fails (we have to close the
481 		 * connection iff it is the same of the cached writer)
482 		 */
483 		private int currentConnectionToken = -1;
484 		
485 		/**
486 		 * Mutex that handles access to logMessages field.
487 		 */
488 		private Object logMessagesMutex = new Object();
489 		
490 		/**
491 		 * Whether we have to sniff (log) messages from the writer.
492 		 */
493 		private boolean logMessages = false;
494 
495 		public ConnectionWriter(AbstractConnection<ADDRESS> owner) {
496 			this.owner = owner;
497 		}
498 
499 		/**
500 		 * Sets whether we have to log messages from writer.
501 		 * @param state
502 		 */
503 		public void setLogMessages(boolean state) {
504 			synchronized(logMessagesMutex) {
505 				if (logMessages == state) return;
506 				logMessages = state;
507 				if (logMessages) line.clear();
508 			}			
509 		}
510 				
511 		@Override
512 		public void close() {
513 			if (controller.isRunning()) {
514 				controller.manualStop("connection close() requested");
515 			}
516 		}
517 		
518 		@Override
519 		public void flush() throws PogamutIOException {
520 			try {
521 				Writer currentWriter = getWriter();
522 				if (currentWriter != null) currentWriter.flush();
523 			} catch (Exception e) {
524 				handleException(e);
525 			}
526 		}
527 		
528 		@Override
529 		public boolean ready() throws PogamutIOException {
530 			try {
531 				if (!controller.isRunning()) return false;			
532 				Writer currentWriter = this.getWriter();
533 				return currentWriter != null;
534 			} catch (Exception e) {
535 				handleException(e);
536 				return false;
537 			}
538 		}
539 				
540 		@Override
541 		public synchronized void write(char cbuf[], int off, int len) throws PogamutIOException, ComponentNotRunningException {
542 			try {
543 				if (controller.isPaused()) {
544 					throw new ComponentPausedException(controller.getState().getFlag(), this);
545 				}
546 				if (!controller.isRunning()) {					
547 					throw new ComponentNotRunningException(controller.getState().getFlag(), this);
548 				}
549 				Writer currentWriter = this.getWriter();
550 				if (currentWriter == null) {
551 					throw new PogamutIOException("inner reader of the connection is null, can't read", this);
552 				}
553 				currentWriter.write(cbuf, off, len);
554 				synchronized(logMessagesMutex) {
555 					// should we log the messages?
556 					if (logMessages){
557 						String[] lines = line.add(new String(cbuf, off, len));
558 						for (int index = 0; index < lines.length; ++index) {
559 							if (log.isLoggable(Level.INFO)) log.info("Message written: " + lines[index]);
560 						}
561 					}
562 				}
563 			} catch (Exception e) {
564 				handleException(e);
565 			}
566 		}
567 		
568 		/**
569 		 * Inner method to get the writer of current connection. It always check whether the
570 		 * writer hasn't been changed - so it always returns a current one.
571 		 * @return
572 		 * @throws PogamutIOException
573 		 */
574 		private Writer getWriter() throws PogamutIOException {
575 			synchronized(mutex) {
576 				if (currentConnectionToken != connectionToken || this.writer == null) {
577 					currentConnectionToken = connectionToken;
578 					line.clear();
579 					this.writer = getConnectionWriter();
580 				}
581 				return this.writer;
582 			}
583 		}
584 		
585 		private void handleException(Throwable e) throws PogamutIOException {
586 			if (e instanceof PogamutIOException) throw (PogamutIOException)e;
587 			if (e instanceof ComponentPausedException) throw (ComponentPausedException)e;
588 			if (e instanceof ComponentNotRunningException) throw (ComponentNotRunningException)e;
589 			if (!controller.isRunning()) throw new ComponentNotRunningException(controller.getState().getFlag(), this);
590 			throw new PogamutIOException(e, this);
591 		}
592 		
593 		public String toString() {
594 			return AbstractConnection.this.getClass().getSimpleName() + "-Writer";
595 		}
596 		
597 	}	
598 
599 }