View Javadoc

1   package cz.cuni.amis.pogamut.base.communication.connection.impl;
2   
3   import java.io.Reader;
4   import java.io.Writer;
5   import java.util.logging.Level;
6   
7   import cz.cuni.amis.pogamut.base.communication.connection.IWorldConnection;
8   import cz.cuni.amis.pogamut.base.communication.connection.IWorldConnectionAddress;
9   import cz.cuni.amis.pogamut.base.communication.connection.WorldReader;
10  import cz.cuni.amis.pogamut.base.communication.connection.WorldWriter;
11  import cz.cuni.amis.pogamut.base.communication.connection.exception.AlreadyConnectedException;
12  import cz.cuni.amis.pogamut.base.communication.connection.exception.ConnectionException;
13  import cz.cuni.amis.pogamut.base.component.IComponent;
14  import cz.cuni.amis.pogamut.base.component.bus.IComponentBus;
15  import cz.cuni.amis.pogamut.base.component.bus.event.IStoppedEvent;
16  import cz.cuni.amis.pogamut.base.component.bus.exception.ComponentNotRunningException;
17  import cz.cuni.amis.pogamut.base.component.bus.exception.ComponentPausedException;
18  import cz.cuni.amis.pogamut.base.component.controller.ComponentControlHelper;
19  import cz.cuni.amis.pogamut.base.component.controller.ComponentController;
20  import cz.cuni.amis.pogamut.base.component.controller.ComponentDependencies;
21  import cz.cuni.amis.pogamut.base.component.controller.IComponentControlHelper;
22  import cz.cuni.amis.pogamut.base.utils.guice.AgentScoped;
23  import cz.cuni.amis.pogamut.base.utils.logging.IAgentLogger;
24  import cz.cuni.amis.pogamut.base.utils.logging.LogCategory;
25  import cz.cuni.amis.utils.NullCheck;
26  import cz.cuni.amis.utils.StringCutter;
27  import cz.cuni.amis.utils.exception.PogamutIOException;
28  import cz.cuni.amis.utils.token.Token;
29  import cz.cuni.amis.utils.token.Tokens;
30  
31  /**
32   * Implementation of the basic connection to the world server. Note that it has some nice features :-)
33   * <p><p>
34   * This implementation is THREAD-SAFE! It is {@link IComponent}!
35   * <p><p>
36   * Calling getReader().read(), getWriter().write() is synchronized as well!
37   * <p><p>
38   * Reader and writer can be got in advance (no need to start() the connection before getReader() or getWriter() is invoked).
39   * <p><p>
40   * Calling reader.close() or writer.close() will stop() the connection as well ... you don't have to have
41   * this instance referenced directly.
42   * <p><p>
43   * Whenever an exception is thrown during read/write operation, the connection is immediately stop()ed,
44   * not kill()ed and {@link IStoppedEvent} is broadcasted. The flag will be changed correctly as well.
45   * <p><p>
46   * ... if you're waiting on the reader.read() and the socket is closed meanwhile, be ready
47   * to catch the SocketException ...
48   * <p><p>
49   * The instance of the class can be reused (e.g. you may start(), stop() it repeatedly).
50   * <p><p>
51   * All you have to implement:
52   * <ol>
53   * <li>unsyncConnect(IConnectionAddress address) - no need to care about anything other than connecting to 'address' + provide correct behavior for getConnectionReader(), getConnectionWriter()</li>
54   * <li>unsyncClose() - just close the connection, no need to care about anything else</li>
55   * <li>Reader getConnectionReader() - if connection is up, return raw reader for your connection (no extra assumptions), it will be wrapped with ConnectionReader ... if connection is down, return null</li>
56   * <li>Writer getConnectionWriter() - if connection is up, return raw writer for your connection (no extra assumptions), it will be wrapped with ConnectionWriter ... if connection is down, return null</li>
57   * </ol>
58   * You might want to override method getMessageEnd() to return correct "message-end" string, so the messages are correctly split
59   * by the readers/writers.
60   * <p><p>
61   * Ignores {@link IComponentControlHelper#startPaused()}, performs {@link IComponentControlHelper#start()} in both start cases.
62   * 
63   * @author Jimmy
64   */
65  @AgentScoped
66  public abstract class AbstractConnection<ADDRESS extends IWorldConnectionAddress> implements IWorldConnection<ADDRESS> {
67  	
68  	public static final Token COMPONENT_ID = Tokens.get("Connection");
69  
70  	public static final String DEFAULT_LINE_END = "\r\n";
71  	
72  	/**
73  	 * Now this is complicated ... what's this? :-)
74  	 * <BR><BR>
75  	 * We've got this problem: <BR>
76  	 * 1) we want this class to be thread-safe <BR>
77  	 * BUT <BR>
78  	 * 2) we're giving access to writer/reader to any thread ...
79  	 * <BR><BR>
80  	 * What should happen if read/write fails? <BR>
81  	 * 1) if it is the same connection -> close it <BR>
82  	 * 2) if it is different connection -> just raise the exception
83  	 * <BR><BR>
84  	 * Therefore we need some kind of marker in which connection we are. Study the code if you want to learn more.<BR>
85  	 * Note that reader/writer provided by this object may be used many times across different connections.
86  	 */
87  	private int connectionToken = 0;
88  
89  	/**
90  	 * Used to synchronize the behavior of the object.
91  	 */
92  	private Object mutex = new Object();
93  				
94  	/**
95  	 * Current remote side address of the connection.
96  	 */
97  	protected ADDRESS address = null;
98  	
99  	/**
100 	 * Writer of the connection. Serves for sending messages through the connection.
101 	 */
102 	private ConnectionWriter writer = new ConnectionWriter(this);
103 	
104 	/**
105 	 * Reader of the connection. Serves for reading messages from the connection.
106 	 */
107 	private ConnectionReader reader = new ConnectionReader(this);
108 	
109 	/**
110 	 * Special category for the connection.
111 	 */
112 	protected LogCategory log = null;
113 	
114 	/**
115 	 * Event bus of the agent.
116 	 */
117 	protected IComponentBus eventBus;
118 	
119 	/**
120 	 * Control helper starting/stopping the component.
121 	 */
122 	protected ComponentController<IComponent> controller;
123 	
124 	//
125 	//
126 	// ABSTRACT METHODS
127 	//
128 	//
129 	
	/**
	 * Inner, unsynchronized implementation of connecting to the remote side.
	 * <p><p>
	 * In this class it is invoked only from the component control helper's start(), while the
	 * connection mutex is held and after the address has been checked to be non-null, so
	 * implementations need not do their own locking against this class.
	 * <p><p>
	 * After a successful return, {@link #getConnectionReader()} and {@link #getConnectionWriter()}
	 * are expected to provide the raw streams of the new connection.
	 * 
	 * @param address address to connect to, never null when called from this class
	 * @throws ConnectionException presumably when the connection can't be established (the exact
	 *         contract is up to the implementation)
	 */
	protected abstract void unsyncConnect(ADDRESS address) throws ConnectionException;
138 	
	/**
	 * Inner, unsynchronized implementation of closing the connection; should close the connection
	 * to the remote side without throwing any exception.
	 * <p><p>
	 * In this class it is invoked from the control helper's stop(), kill() and reset() while the
	 * connection mutex is held, after the cached raw reader/writer of the wrappers were dropped.
	 * <p><p>
	 * NOTE(review): the original contract states the connection is up (according to the flag) when
	 * this is called; that depends on when the controller invokes kill()/reset() - confirm before
	 * relying on it, a defensive implementation should tolerate an already closed connection.
	 */
	protected abstract void unsyncClose();
145 	
	/**
	 * Provides the plain reader of the current connection. The reader is wrapped by the connection's
	 * own reader, which is capable of sniffing (logging) messages as they come.
	 * <p><p>
	 * It is called with the connection mutex held, whenever the wrapper has no cached raw reader
	 * or the connection has been cleaned up since the raw reader was cached.
	 * <p><p>
	 * If the connection is down, this should throw a ConnectionException. Note that the class-level
	 * documentation mentions returning null instead; the wrapper copes with null as well - its
	 * read() then fails with PogamutIOException and its ready() reports false.
	 * 
	 * @return raw reader of the current connection
	 * @throws ConnectionException if the reader can't be provided
	 */
	protected abstract Reader getConnectionReader() throws ConnectionException;
154 	
	/**
	 * Provides the plain writer of the current connection. The writer is wrapped by the connection's
	 * own writer, which is capable of sniffing (logging) messages as they go.
	 * <p><p>
	 * It is called with the connection mutex held, whenever the wrapper has no cached raw writer
	 * or the connection has been cleaned up since the raw writer was cached.
	 * <p><p>
	 * If the connection is down, this should throw a ConnectionException. Note that the class-level
	 * documentation mentions returning null instead; the wrapper copes with null as well - its
	 * write() then fails with PogamutIOException, its flush() does nothing and its ready() reports false.
	 * 
	 * @return raw writer of the current connection
	 * @throws ConnectionException if the writer can't be provided
	 */
	protected abstract Writer getConnectionWriter() throws ConnectionException;
163 	
164 	//
165 	//
166 	// CONSTRUCTOR
167 	//
168 	//
169 	
170 	public AbstractConnection(
171 			ComponentDependencies dependencies, 
172 			IComponentBus bus, 
173 			IAgentLogger logger
174 	) {
175 		this(null, dependencies, bus, logger);
176 	}
177 	
178 	public AbstractConnection(
179 			ADDRESS address,
180 			ComponentDependencies dependencies, 
181 			IComponentBus bus, 
182 			IAgentLogger logger
183 	) {
184 		log = logger.getCategory(getComponentId().getToken());
185 		NullCheck.check(this.log, "log initialization");
186 		eventBus = bus;
187 		NullCheck.check(this.eventBus, "eventBus");
188 		controller = new ComponentController(this, control, eventBus, log, dependencies);
189 		if (address != null) {
190 			setAddress(address);
191 		}
192 	}
193 	
194 	//
195 	//
196 	// COMPONENT CONTROL
197 	//
198 	//
199 	
200 	private IComponentControlHelper control = new ComponentControlHelper() {
201 		
202 		@Override
203 		public void stop() {
204 			cleanUp();
205 		}
206 		
207 		@Override
208 		public void startPaused() {
209 			start();
210 		}
211 		
212 		@Override
213 		public void start() {
214 			synchronized(mutex) {
215 				if (address == null) throw new ConnectionException("address is null, can't connect()", log, this);
216 				if (log.isLoggable(Level.WARNING)) log.warning("Connecting to " + address + ".");
217 				unsyncConnect(address);	
218 			}
219 		}
220 		
221 		@Override
222 		public void kill() {
223 			cleanUp();
224 		}
225 		
226 		@Override
227 		public void reset() {
228 			cleanUp();
229 		}
230 		
231 		private void cleanUp() {
232 			synchronized(mutex) {
233 				try {
234 					reader.reader = null;
235 					writer.writer = null;
236 					unsyncClose();
237 				} finally {
238 					++connectionToken;
239 				}
240 			}
241 		}
242 		
243 	};	
244 
245 	//
246 	//
247 	// PUBLIC INTERFACE
248 	//
249 	//
250 	
251 	@Override
252 	public Token getComponentId() {
253 		return COMPONENT_ID;
254 	}
255 
256 	public LogCategory getLog() {
257 		return log;
258 	}
259 	
260 	@Override
261 	public void setAddress(ADDRESS address) throws ConnectionException {
262 		synchronized(mutex) {
263 			if (controller.isRunning()) throw new AlreadyConnectedException("Can't set address when connected.", log, this);
264 			this.address = address;
265 		}
266 	}	
267 	
268 	@Override
269 	public WorldWriter getWriter() throws ConnectionException {
270 		return this.writer;
271 	}
272 
273 	@Override
274 	public WorldReader getReader() throws ConnectionException {
275 		return this.reader;
276 	}
277 
278 	@Override
279 	public ADDRESS getAddress() {
280 		return address;
281 	}
282 
283 	@Override
284 	public String toString() {
285 		if (this != null) {
286 			return this.getClass().getSimpleName() + "["+String.valueOf(address)+",connected:"+controller.isRunning()+"]";
287 		} else {
288 			return "AbstractConnection["+String.valueOf(address)+",connected:"+controller.isRunning()+")";
289 		}
290 	}
291 			
292 	@Override
293 	public void setLogMessages(boolean logMessages) {
294 		this.reader.setLogMessages(logMessages);
295 		this.writer.setLogMessages(logMessages);
296 	}
297 	
298 	public String getMessageEnd() {
299 		return DEFAULT_LINE_END;
300 	}
301 	
302 	//
303 	//       ................
304 	//    -----------------------
305 	// ==============================
306 	// INNER CLASS DEFINITION FOLLOWS
307 	// ==============================
308 	//    -----------------------
309 	//       ................
310 	//
311 		
312 	/**
313 	 * Reader for the connection (wrapper for the getConnectionReader()),
314 	 * that takes care of sniffing messages (if required) + makes reader persistent
315 	 * over the connect() calls of the connection.  
316 	 * 
317 	 * @author Jimmy
318 	 */
319 	private class ConnectionReader extends WorldReader {
320 		
321 		/**
322 		 * Owner of the ConnectionReader (because of close() method).
323 		 */
324 		private AbstractConnection<ADDRESS> owner = null;
325 		
326 		/**
327 		 * Used when the observer is hooked to the connection.
328 		 */
329 		private StringCutter line = new StringCutter(getMessageEnd());
330 
331 		/**
332 		 * Cached reader of the connection.
333 		 */
334 		private Reader reader = null;
335 		
336 		/**
337 		 * Connection token - we use it to distinguish between connect() calls to be able
338 		 * to correctly close the connection if the read fails (we have to close the
339 		 * connection iff it is the same of the cached reader)
340 		 */
341 		private int currentConnectionToken = -1;
342 		
343 		/**
344 		 * Mutex that handles access to logMessages field.
345 		 */
346 		private Object logMessagesMutex = new Object();
347 		
348 		/**
349 		 * Whether we have to sniff (log) messages from the reader.
350 		 */
351 		private boolean logMessages = false;
352 
353 		public ConnectionReader(AbstractConnection<ADDRESS> owner) {
354 			this.owner = owner;
355 		}
356 
357 		/**
358 		 * Sets whether we have to log messages from reader.
359 		 * @param state
360 		 */
361 		public void setLogMessages(boolean state) {
362 			synchronized(logMessagesMutex) {
363 				if (logMessages == state) return;
364 				logMessages = state;
365 				if (logMessages) line.clear();
366 			}
367 		}
368 		
369 		@Override
370 		public void close() {
371 			if (controller.isRunning()) {
372 				this.owner.controller.manualStop("connection close() requested");
373 			}
374 		}
375 		
376 		@Override
377 		public boolean ready() throws PogamutIOException {
378 			try {
379 				if (!controller.isRunning()) return false;
380 				Reader currentReader = this.getReader();
381 				if (currentReader != null) return currentReader.ready();
382 			} catch (Exception e) {
383 				handleException(e);
384 			}			
385 			return false;
386 		}
387 		
388 		@Override
389 		public synchronized int read(char[] ac, int i, int j) throws ComponentNotRunningException, ComponentPausedException, PogamutIOException {
390 			try {
391 				if (controller.isPaused()) {
392 					throw new ComponentPausedException(controller.getState().getFlag(), this);
393 				}
394 				if (!controller.isRunning()) {
395 					throw new ComponentNotRunningException(controller.getState().getFlag(), this);
396 				}
397 				Reader currentReader  = this.getReader();
398 				if (currentReader == null) {
399 					throw new PogamutIOException("inner reader of the connection is null, can't read", this);
400 				}			
401 			
402 				int result = currentReader.read(ac, i, j);
403 				
404 				// should we log the messages?
405 				if (logMessages){
406 					synchronized(logMessagesMutex) {
407 						if (logMessages){
408 							String[] lines = line.add(new String(ac, i, result));
409 							for (int index = 0; index < lines.length; ++index) {
410 								if (log.isLoggable(Level.INFO)) log.info("Message read: " + lines[index]);
411 							}
412 							return result;
413 						}
414 					}
415 				}
416 					
417 				return result;
418 							
419 			} catch (Exception e) {
420 				handleException(e);
421 				return 0;
422 			}
423 		}
424 		
425 		/**
426 		 * Inner method to get the reader of current connection. It always check whether the
427 		 * reader hasn't been changed - so it always returns a current one.
428 		 * @return
429 		 * @throws PogamutIOException
430 		 */
431 		private Reader getReader() throws PogamutIOException {
432 			synchronized(mutex) {
433 				if (currentConnectionToken != connectionToken || this.reader == null) {
434 					currentConnectionToken = connectionToken;
435 					line.clear();
436 					this.reader = getConnectionReader();
437 				}
438 				return this.reader;
439 			}
440 		}
441 		
442 		private void handleException(Throwable e) throws PogamutIOException {
443 			if (e instanceof PogamutIOException) throw (PogamutIOException)e;
444 			if (e instanceof ComponentPausedException) throw (ComponentPausedException)e;
445 			if (e instanceof ComponentNotRunningException) throw (ComponentNotRunningException)e;
446 			if (!controller.isRunning()) throw new ComponentNotRunningException(controller.getState().getFlag(), this);
447 			throw new PogamutIOException(e, this);
448 		}
449 		
450 		public String toString() {
451 			return AbstractConnection.this.getClass().getSimpleName() + "-Reader";
452 		}
453 
454 	}
455 	
456 	/**
457 	 * Writer for the connection (wrapper for the getConnectionWriter()),
458 	 * that takes care of sniffing messages (if required) + makes writer persistent
459 	 * over the connect() calls of the connection.  
460 	 * 
461 	 * @author Jimmy
462 	 */
463 	private class ConnectionWriter extends WorldWriter {
464 		
465 		/**
466 		 * Owner of the ConnectionWriter (because of close() method).
467 		 */
468 		private AbstractConnection<ADDRESS> owner = null;
469 		
470 		/**
471 		 * Used when the observer is hooked to the connection.
472 		 */
473 		private StringCutter line = new StringCutter(getMessageEnd());
474 
475 		/**
476 		 * Cached writer of the connection.
477 		 */
478 		private Writer writer = null;
479 		
480 		/**
481 		 * Connection token - we use it to distinguish between connect() calls to be able
482 		 * to correctly close the connection if the write fails (we have to close the
483 		 * connection iff it is the same of the cached writer)
484 		 */
485 		private int currentConnectionToken = -1;
486 		
487 		/**
488 		 * Mutex that handles access to logMessages field.
489 		 */
490 		private Object logMessagesMutex = new Object();
491 		
492 		/**
493 		 * Whether we have to sniff (log) messages from the writer.
494 		 */
495 		private boolean logMessages = false;
496 
497 		public ConnectionWriter(AbstractConnection<ADDRESS> owner) {
498 			this.owner = owner;
499 		}
500 
501 		/**
502 		 * Sets whether we have to log messages from writer.
503 		 * @param state
504 		 */
505 		public void setLogMessages(boolean state) {
506 			synchronized(logMessagesMutex) {
507 				if (logMessages == state) return;
508 				logMessages = state;
509 				if (logMessages) line.clear();
510 			}			
511 		}
512 				
513 		@Override
514 		public void close() {
515 			if (controller.isRunning()) {
516 				controller.manualStop("connection close() requested");
517 			}
518 		}
519 		
520 		@Override
521 		public void flush() throws PogamutIOException {
522 			try {
523 				Writer currentWriter = getWriter();
524 				if (currentWriter != null) currentWriter.flush();
525 			} catch (Exception e) {
526 				handleException(e);
527 			}
528 		}
529 		
530 		@Override
531 		public boolean ready() throws PogamutIOException {
532 			try {
533 				if (!controller.isRunning()) return false;			
534 				Writer currentWriter = this.getWriter();
535 				return currentWriter != null;
536 			} catch (Exception e) {
537 				handleException(e);
538 				return false;
539 			}
540 		}
541 				
542 		@Override
543 		public synchronized void write(char cbuf[], int off, int len) throws PogamutIOException, ComponentNotRunningException {
544 			try {
545 				if (controller.isPaused()) {
546 					throw new ComponentPausedException(controller.getState().getFlag(), this);
547 				}
548 				if (!controller.isRunning()) {					
549 					throw new ComponentNotRunningException(controller.getState().getFlag(), this);
550 				}
551 				Writer currentWriter = this.getWriter();
552 				if (currentWriter == null) {
553 					throw new PogamutIOException("inner reader of the connection is null, can't read", this);
554 				}
555 				currentWriter.write(cbuf, off, len);
556 				if (logMessages) {
557 					synchronized(logMessagesMutex) {
558 						// should we log the messages?
559 						if (logMessages){
560 							String[] lines = line.add(new String(cbuf, off, len));
561 							for (int index = 0; index < lines.length; ++index) {
562 								if (log.isLoggable(Level.INFO)) log.info("Message written: " + lines[index]);
563 							}
564 						}
565 					}
566 				}
567 			} catch (Exception e) {
568 				handleException(e);
569 			}
570 		}
571 		
572 		/**
573 		 * Inner method to get the writer of current connection. It always check whether the
574 		 * writer hasn't been changed - so it always returns a current one.
575 		 * @return
576 		 * @throws PogamutIOException
577 		 */
578 		private Writer getWriter() throws PogamutIOException {
579 			synchronized(mutex) {
580 				if (currentConnectionToken != connectionToken || this.writer == null) {
581 					currentConnectionToken = connectionToken;
582 					line.clear();
583 					this.writer = getConnectionWriter();
584 				}
585 				return this.writer;
586 			}
587 		}
588 		
589 		private void handleException(Throwable e) throws PogamutIOException {
590 			if (e instanceof PogamutIOException) throw (PogamutIOException)e;
591 			if (e instanceof ComponentPausedException) throw (ComponentPausedException)e;
592 			if (e instanceof ComponentNotRunningException) throw (ComponentNotRunningException)e;
593 			if (!controller.isRunning()) throw new ComponentNotRunningException(controller.getState().getFlag(), this);
594 			throw new PogamutIOException(e, this);
595 		}
596 		
597 		public String toString() {
598 			return AbstractConnection.this.getClass().getSimpleName() + "-Writer";
599 		}
600 		
601 	}	
602 
603 }