View Javadoc

1   package cz.cuni.amis.pogamut.base.utils.logging;
2   
3   import java.io.BufferedReader;
4   import java.io.IOException;
5   import java.io.PrintWriter;
6   import java.net.InetSocketAddress;
7   import java.nio.channels.Channels;
8   import java.nio.channels.SocketChannel;
9   import java.nio.charset.CharsetDecoder;
10  import java.nio.charset.CharsetEncoder;
11  import java.util.logging.Level;
12  
13  import cz.cuni.amis.pogamut.base.utils.Pogamut;
14  import cz.cuni.amis.pogamut.base.utils.PogamutProperty;
15  import cz.cuni.amis.utils.ExceptionToString;
16  import cz.cuni.amis.utils.exception.PogamutException;
17  import cz.cuni.amis.utils.exception.PogamutIOException;
18  import cz.cuni.amis.utils.flag.Flag;
19  import cz.cuni.amis.utils.listener.IListener;
20  import cz.cuni.amis.utils.listener.Listeners;
21  
/**
 * Client that may be used to obtain logs from an arbitrary number of agents that are publishing their logs via {@link NetworkLogManager}.
 * <p>
 * It needs to be initialized with address:port and an "agent id", which specify where to connect and which agent's logs you want to receive.
 * <p>
 * The class defines 3 callbacks (aka listeners) / events that you may attach to it:
 * <ol>
 * <li>{@link LoggingStarted} event and {@link ILoggingStartedListener} - called whenever a successful connection to address:port is made and the agent id is sent</li>
 * <li>{@link LogRead} event and {@link ILogReadListener} - called whenever a new log entry has been read, providing a {@link NetworkLogEnvelope} with the log details</li>
 * <li>{@link LoggingStopped} event and {@link ILoggingStoppedListener} - called whenever the logging is terminated</li>
 * </ol>
 * <p>
 * Note that there is a problem with garbage collection whenever anonymous listeners are used (for instance for log collecting). If you start
 * a client, receive logs via a listener and then DROP THE REFERENCE to the client, the client won't be stopped. It will be stopped only when the
 * remote socket is closed, resulting in an {@link IOException} inside the client's thread. Additionally, if you do not hook an {@link ILoggingStoppedListener}
 * that detaches your log-reading-callback listener, the client instance will not be reclaimed by the GC.
 * <p>
 * Therefore, as an implicit behavior, whenever the logging is stopped, all listeners are removed (which is the way you will
 * probably need to use the class). If you do not want this behavior, just call {@link NetworkLogClient#setImplicitRemoveListeners(boolean)} with the parameter
 * 'false'.
 * <p>
 * NOTE: the object must be started manually via {@link NetworkLogClient#start()} after the instantiation.
 * <p>
 * If you want to stop the client prematurely, call {@link NetworkLogClient#stop()}.
 * 
 * @author Jimmy
 */
49  public class NetworkLogClient {
50  	
51  	/**
52  	 * Event that marks that the client has successfully connected to the remote side
53  	 * and is ready to receive logs.
54  	 * 
55  	 * @author Jimmy
56  	 */
57  	public static class LoggingStarted {		
58  	}
59  	
60  	/**
61  	 * Event/message containing another log-record produced by the remote agent.
62  	 * 
63  	 * @author Jimmy
64  	 */
65  	public static class LogRead {
66  		
67  		/**
68  		 * Contains log-record details.
69  		 */
70  		private NetworkLogEnvelope record;
71  		
72  		public LogRead(NetworkLogEnvelope record) {
73  			this.record = record;
74  		}
75  
76  		/**
77  		 * @return log-record details
78  		 */
79  		public NetworkLogEnvelope getRecord() {
80  			return record;
81  		}
82  		
83  	}
84  	
85  	/**
86  	 * Event that marks that client has been disconnected (or stopped).
87  	 * 
88  	 * @author Jimmy
89  	 */
90  	public static class LoggingStopped {
91  		
92  		private boolean expected;
93  		
94  		private Throwable exception;
95  
96  		public LoggingStopped() {
97  			this.expected = true;
98  		}
99  		
100 		public LoggingStopped(Throwable e) {
101 			this.expected = false;
102 			this.exception = e;
103 		}
104 		
105 		public Throwable getException() {
106 			return exception;
107 		}
108 
109 		public boolean isExpected() {
110 			return expected;
111 		}
112 		
113 		public boolean isFailure() {
114 			return !expected;
115 		}
116 		
117 	}
118 		
119 	public static interface ILoggingStartedListener extends IListener<LoggingStarted> {		
120 	}
121 	
122 	public static interface ILogReadListener extends IListener<LogRead> {		
123 	}
124 	
125 	public static interface ILoggingStoppedListener extends IListener<LoggingStopped> {		
126 	}	
127 	
128 	private Listeners<ILoggingStartedListener> loggingStartedCallback = new Listeners<ILoggingStartedListener>();
129 	
130 	private Listeners.AdaptableListenerNotifier<ILoggingStartedListener> loggingStartedNotifier = new Listeners.AdaptableListenerNotifier<ILoggingStartedListener>();
131 	
132 	private Listeners<ILogReadListener> logReadCallback = new Listeners<ILogReadListener>();
133 	
134 	private Listeners.AdaptableListenerNotifier<ILogReadListener> logReadNotifier = new Listeners.AdaptableListenerNotifier<ILogReadListener>();
135 	
136 	private Listeners<ILoggingStoppedListener> loggingStoppedCallback = new Listeners<ILoggingStoppedListener>();
137 	
138 	private Listeners.AdaptableListenerNotifier<ILoggingStoppedListener> loggingStoppedNotifier = new Listeners.AdaptableListenerNotifier<ILoggingStoppedListener>();
139 	
140 	private String address;
141 	private int port;
142 	private String agentId;
143 	
144 	protected LogCategory log;
145 	
146 	private boolean implicitRemoveListeners = true;
147 
148 	public NetworkLogClient(String address, int port, String agentId) {
149 		log = new LogCategory("NetworkLogClient");
150 		log.addConsoleHandler();
151 		String logLevel = Pogamut.getPlatform().getProperty(PogamutProperty.POGAMUT_NETWORK_LOG_MANAGER_AND_CLIENT_LEVEL.getKey());
152 		if (logLevel != null) {
153 			log.setLevel(Level.parse(logLevel));
154 		} else {
155 			log.setLevel(Level.WARNING);
156 		}
157 		this.address = address;
158 		this.port = port;
159 		this.agentId = agentId;
160 		this.logReadingWorker = new LogReadingWorker();
161 		loggingStartedCallback.setLog(log, "LoggingStarted");
162 		logReadCallback.setLog(log, "LogRead");
163 		loggingStoppedCallback.setLog(log, "LoggingStopped");
164 	}
165 	
166 	/**
167 	 * @return logger used by this instance
168 	 */
169 	public LogCategory getLogger() {
170 		return log;
171 	}
172 	
173 	/**
174 	 * Note that there is a problem with object GC() whenever an anonymous listeners are used (for instance for log collecting). Whenever you start
175 	 * a client, receive logs via listener, DROP THE REFERENCE to the client, the client won't be stopped. It will be stopped whenever the
176 	 * remote socket is closed resulting in {@link IOException} inside the client's thread. Additionally, if you do not hook a {@link ILoggingStoppedListener}
177 	 * detaching your log-reading-callback listener, the client instance will be ignored by the GC().
178 	 * <p><p>
179 	 * Therefore, as an implicit behavior, whenever the logging is stopped, all listeners are actually deleted (which is the way you will 
180 	 * probably need to use the class). If you do not want this behavior, just call this method with 'false' as a parameter.
181 	 * 'false'.
182 	 * 
183 	 * @param state
184 	 */
185 	public void setImplicitRemoveListeners(boolean state) {
186 		this.implicitRemoveListeners = state;
187 	}
188 	
189 	public boolean isImplicitRemoveListeners() {
190 		return implicitRemoveListeners;
191 	}
192 	
193 	/**
194 	 * Does exception occured inside the thread that was/is reading logs?
195 	 * @return
196 	 */
197 	public boolean isException() {
198 		LogReadingWorker worker = logReadingWorker;
199 		return worker != null && worker.exception != null;
200 	}
201 	
202 	/**
203 	 * @return exception that has happened inside the thread that was/is reading logs
204 	 */
205 	public Throwable getException() {
206 		LogReadingWorker worker = logReadingWorker;
207 		return worker == null ? null : worker.exception;
208 	}
209 	
210 	public String getAddress() {
211 		return address;
212 	}
213 
214 	public int getPort() {
215 		return port;
216 	}
217 
218 	public String getAgentId() {
219 		return agentId;
220 	}
221 
222 	public synchronized void start() {
223 		log.warning("Starting for " + agentId + " @ " + address + ":" + port);
224 		if (logReadingWorker.running.getFlag()) {
225 			log.warning("Old connection is still up... closing.");
226 			stop();
227 			log.warning("Resuming start for " + agentId + " @ " + address + ":" + port);
228 		}		
229 		workerThread = new Thread(logReadingWorker, "NetworkLogClient-" + address + ":" + port);		
230 		workerThread.start();
231 		
232 		logReadingWorker.connected.waitFor(true, false);
233 		
234 		if (logReadingWorker.connected.getFlag() == false) {
235 			throw new PogamutException("Could not start reading logs from the network.", logReadingWorker.exception, this);
236 		}
237 	}
238 	
239 	public synchronized void stop() {
240 		log.warning("Stopping for " + agentId + " @ " + address + ":" + port);
241 		if (logReadingWorker != null) {
242 			if (logReadingWorker.running.getFlag()) {
243 				logReadingWorker.kill();
244 				logReadingWorker.running.waitFor(false);
245 			}
246 		}
247 	}
248 	
249 	public Flag<Boolean> getRunning() {
250 		return logReadingWorker.running;
251 	}
252 	
253 	public Flag<Boolean> getConnected() {
254 		return logReadingWorker.connected;
255 	}
256 	
257 	public void addListener(ILoggingStartedListener listener) {
258 		loggingStartedCallback.addStrongListener(listener);
259 	}
260 	
261 	public void removeListener(ILoggingStartedListener listener) {
262 		loggingStartedCallback.removeListener(listener);
263 	}
264 	
265 	public void addListener(ILoggingStoppedListener listener) {
266 		loggingStoppedCallback.addStrongListener(listener);	
267 	}
268 	
269 	public void removeListener(ILoggingStoppedListener listener) {
270 		loggingStoppedCallback.removeListener(listener);
271 	}
272 	
273 	public void addListener(ILogReadListener listener) {
274 		logReadCallback.addStrongListener(listener);
275 	}
276 	
277 	public void removeListener(ILogReadListener listener) {
278 		logReadCallback.removeListener(listener);
279 	}
280 	
281 	// ----
282     // --------
283     // ==================
284     // LOG READING WORKER
285     // ==================
286     // --------
287     // ----
288 
289     /**
290      * Worker instance - it implements Runnable interface and is continuously
291      * reading messages from the connection object and passing them to the
292      * callbacks.
293      */
294     protected LogReadingWorker logReadingWorker = null;
295     
296     /**
297      * Thread of the worker.
298      */
299     protected Thread workerThread = null;
300     
301     private static final String NEW_LINE = System.getProperty("line.separator");
302     
303     /**
304      * Handles trailing '\r' in the 'msg'.
305      * @param msg
306      * @return
307      */
308     private String checkLineEnd(String msg) {
309     	if (msg == null) return null;
310     	if (NEW_LINE.length() == 2) {
311     		// WINDOWS-STYLE-LINE-ENDS
312     		return msg;
313     	} else {
314     		// CHECK-FOR-TRAILING \r
315     		if (msg.charAt(msg.length()-1) == '\r') {
316     			// TRAILING \r, CUT IT OF
317     			return msg.substring(0, msg.length()-1);
318     		}
319     		// ALL-OK
320     		return msg;
321     	}
322     }
323     
324     private class LogReadingWorker implements Runnable {
325 
326         /**
327          * Simple flag that is telling us whether the Worker should run.
328          */
329         private volatile boolean shouldRun = true;
330         public volatile Flag<Boolean> running = new Flag<Boolean>(false);
331         public volatile Flag<Boolean> connected = new Flag<Boolean>(null);
332         private volatile boolean exceptionExpected = false;
333         public volatile Throwable exception = null;
334         private Thread myThread = null;
335         
336         /**
337          * Drops the shouldRun flag, waits for 200ms and then interrupts the
338          * thread in hope it helps.
339          */
340         public void kill() {
341             if (!running.getFlag()) {
342                 return;
343             }
344             this.shouldRun = false;
345             try {
346                 Thread.sleep(200);
347             } catch (InterruptedException e) {
348             }
349             exceptionExpected = true;
350             myThread.interrupt();
351         }
352 
353         /**
354          * Contains main while cycle that is continuously reading messages from
355          * the connection (using parser), notifying listeners and then passing
356          * them to the message receiver.
357          */
358         @Override
359         public void run() {
360             
361         	// initialize
362             myThread = Thread.currentThread();
363             exception = null;
364             shouldRun = true;
365             exceptionExpected = false;
366             // set the running flag, we've been started
367             running.setFlag(true);
368             connected.setFlag(null);
369 
370             // notify that logger started
371             log.info("LogReadingWorker: Started.");
372             
373             SocketChannel socket = null;
374             
375             try {
376 				socket = SocketChannel.open(new InetSocketAddress(address, port));
377 			} catch (IOException e1) {
378 				try {
379 					socket.close();
380 				} catch (Exception e2) {					
381 				}
382 				log.severe("LogReadingWorker: Could not open socket for " + address + ":" + port + ".");
383 				socket = null;
384 				exception = e1;
385 				loggingStoppedNotifier.setEvent(new LoggingStopped(new PogamutIOException("Could not open " + address + ":" + port + ".", e1)));
386 				loggingStoppedCallback.notifySafe(loggingStoppedNotifier, log);
387 				connected.setFlag(false);
388 				running.setFlag(false);
389 				log.warning("LogReadingWorker stopped.");
390 				return;
391 			}
392 			
393 			log.fine("LogReadingWorker: Socket opened for " + address + ":" + port + ".");
394 
395 			CharsetEncoder encoder = NetworkLogManager.USED_CHARSET.newEncoder();
396 	        CharsetDecoder decoder = NetworkLogManager.USED_CHARSET.newDecoder();
397 	        
398 	        PrintWriter writer = new PrintWriter(Channels.newWriter(socket, encoder, -1));
399 			BufferedReader reader = new BufferedReader(Channels.newReader(socket, decoder, -1));
400 			
401 			try {
402 				writer.println(agentId);
403 				writer.flush();
404 			} catch (Exception e1) {
405 				try {
406 					socket.close();
407 				} catch (Exception e2) {					
408 				}
409 				socket = null;
410 				exception = e1;
411 				loggingStoppedNotifier.setEvent(new LoggingStopped(new PogamutIOException("Could not send 'agent id' to " + address + ":" + port + ".", e1)));
412 				loggingStoppedCallback.notifySafe(loggingStoppedNotifier, log);
413 				connected.setFlag(false);
414 				running.setFlag(false);				
415 				log.warning("LogReadingWorker: stopped.");
416 				return;
417 			}
418 			
419 			log.fine("LogReadingWorker: Agent id sent.");
420 			
421 			loggingStartedNotifier.setEvent(new LoggingStarted());
422 			loggingStartedCallback.notify(loggingStartedNotifier);
423 			
424 			log.fine("LogReadingWorker: Starting reading logs.");
425 			
426             try {
427             	
428             	StringBuffer message = new StringBuffer();
429             	
430             	connected.setFlag(true);
431             	
432             	String category = null;
433         		String level    = null;
434         		String time     = null;
435             	
436                 while (shouldRun && !myThread.isInterrupted()) {
437                 	try {
438                 		try {
439 	                		category = checkLineEnd(reader.readLine());
440 	                		level    = checkLineEnd(reader.readLine());
441 	                		time     = checkLineEnd(reader.readLine());
442 	                		
443 	                		if (message.length() > 0) {
444 	                			message.delete(0, message.length());
445 	                		}
446 	                		
447 	                		boolean first = true;
448 	                		
449 	                		while (true) {
450 	                			String msg = checkLineEnd(reader.readLine());
451 	                			if (msg.equals("</end>")) break;
452 	                			if (first) first = false;
453 	                			else message.append(NEW_LINE);
454 	                			message.append(msg);
455 	                		}
456                 		} catch (Exception e) {
457                 			log.severe(ExceptionToString.process("LogReadingWorker: Exception while reading data from the socket, connection closed from the remote side? Shutting down...", e));
458                 			break;
459                 		}
460                 		
461                 		NetworkLogEnvelope logEnvelope = null;
462                 		try {
463                 			logEnvelope = new NetworkLogEnvelope(category, level, time, message.toString());
464                 		} catch (Exception e) {
465                 			log.warning(ExceptionToString.process("LogReadingWorker: MALFORMED log record, category='" + category + "', level='" + level + "', time='" + time + "', message='" + message + "'", e));
466                 			continue;
467                 		}
468                 		logReadCallback.notify(logReadNotifier.setEvent(new LogRead(logEnvelope)));                		
469                 	} catch (Exception e) { 
470                 		if (!exceptionExpected) {
471                             log.severe(ExceptionToString.process("LogReadingWorker: Exception at LogSendingWorker.", e));
472                             exception = e;
473                         } else {
474                             log.fine(ExceptionToString.process("LogReadingWorker: Exception at LogSendingWorker, expected.", e));
475                         }	
476                 		break;
477                 	}
478                 }
479             } catch (Exception e) {
480             	if (!exceptionExpected) {
481                     log.severe(ExceptionToString.process("LogReadingWorker: Exception at LogSendingWorker.", e));
482                     exception = e;
483                 } else {
484                     log.fine(ExceptionToString.process("LogReadingWorker: Exception at LogSendingWorker, expected.", e));
485                 }		                
486             } finally {
487             	try {
488             		socket.close();
489             	} catch (Exception e) {            		
490             	}
491             	connected.setFlag(false);
492             	running.setFlag(false);
493             	if (exception != null) {
494             		loggingStoppedCallback.notifySafe(loggingStoppedNotifier.setEvent(new LoggingStopped(exception)), log);
495             	} else {
496             		loggingStoppedCallback.notifySafe(loggingStoppedNotifier.setEvent(new LoggingStopped()), log);
497             	}
498             	if (implicitRemoveListeners) {
499                 	log.warning("LogReadingWorker: Removing all listeners...");
500                 	loggingStartedCallback.clearListeners();
501                 	logReadCallback.clearListeners();
502                 	loggingStoppedCallback.clearListeners();
503                 }
504 
505                 log.warning("LogSendingWorker: Stopped.");
506             }
507         }
508 
509     }
510 
511 }