View Javadoc

1   package cz.cuni.amis.pogamut.base.utils.logging;
2   
3   import java.io.BufferedReader;
4   import java.io.IOException;
5   import java.io.InputStreamReader;
6   import java.io.OutputStreamWriter;
7   import java.io.PrintWriter;
8   import java.io.Writer;
9   import java.net.InetAddress;
10  import java.net.InetSocketAddress;
11  import java.net.ServerSocket;
12  import java.net.Socket;
13  import java.net.SocketTimeoutException;
14  import java.net.UnknownHostException;
15  import java.nio.channels.ServerSocketChannel;
16  import java.nio.channels.SocketChannel;
17  import java.nio.charset.Charset;
18  import java.nio.charset.CharsetDecoder;
19  import java.nio.charset.CharsetEncoder;
20  import java.util.Iterator;
21  import java.util.LinkedList;
22  import java.util.List;
23  import java.util.concurrent.ConcurrentLinkedQueue;
24  import java.util.logging.Level;
25  
26  import cz.cuni.amis.pogamut.base.utils.DefaultPogamutPlatform;
27  import cz.cuni.amis.pogamut.base.utils.Pogamut;
28  import cz.cuni.amis.pogamut.base.utils.PogamutProperty;
29  import cz.cuni.amis.utils.ExceptionToString;
30  import cz.cuni.amis.utils.exception.PogamutException;
31  import cz.cuni.amis.utils.exception.PogamutIOException;
32  import cz.cuni.amis.utils.maps.LazyMap;
33  import cz.cuni.amis.utils.token.IToken;
34  import cz.cuni.amis.utils.token.Tokens;
35  
36  
37  /**
38   * A class used for network logging of agents.
39   * <p><p>
40   * It is a singleton which accepts logs from agents of one JVM and publishes them on certain port, where clients may accept them.
41   * 
42   * @author Pyroh
43   * @author Jimmy
44   */
45  public class NetworkLogManager {
46  	
47  	/**
48  	 * Number of logs we are buffering for every agent, these logs are sent to the client whenever it
49  	 * opens a socket that wants to listen for a particular agent.
50  	 */
51      private static final int MAXIMUM_LOGS_PER_AGENT = 100;
52  	
53  	/**
54  	 * {@link Charset} that is used by the {@link NetworkLogManager} to send logs over the socket.
55  	 * <p><p>
56  	 * Initialized to "UTF-8" as default.
57  	 */
58  	public static final Charset USED_CHARSET = Charset.forName("UTF-8");	
59  	
60  	/**
61  	 * How often the logs are flushed to the socket (value < 100
62  	 * results in unacceptable throughput). 
63  	 * <p><p>
64  	 * Initialized to 200 millis;
65  	 */
66  	public static final long NETWORK_FLUSH_PERIOD_MILLIS = 200;
67  
68  	/**
69  	 * How long do we wait for the agent-id before we drop the connection.
70  	 * <p><p>
71  	 * Initialized to 1000 millis. 
72  	 */
73  	public static final long NETWORK_LOG_MANAGER_SOCKET_TIMEOUT_MILLIS = 1000;
74  	
75  //	  /**
76  //     * How many logs may be buffered before the network logging starts to lag.
77  //     */
78  //    public static final int LOG_BUFFER_LENGTH = 2000;
79  	
80  	/**
81       * The reference to singleton of {@link NetworkLogManager} (this class).
82       */
83      private static NetworkLogManager manager = null;
84      
85      /**
86       * Mutex that synchronizes construction and destruction of the {@link NetworkLogManager#manager}.
87       */
88      private static Object managerMutex = new Object();
89      
90      /**     
91       * @return instance of {@link NetworkLogManager}, if called for the first time, creates an instance, before returning it.
92       */
93      public static NetworkLogManager getNetworkLogManager() {
94      	NetworkLogManager instance = manager;
95      	if (instance != null && instance.operating) {
96      		return instance;
97      	}
98      	synchronized(managerMutex) {
99      		instance = manager;    		
100     		if (instance == null || !instance.operating) {
101     			manager = instance = new NetworkLogManager();
102     		}
103     		return instance;
104     	}        
105     }
106 	
107 //	/**
108 //	 * Log record that contains its destination ({@link LogSocket} where we should send the log).
109 //	 * 
110 //	 * @author Jimmy
111 //	 */
112 //	private static class LogSocketEntry {
113 //		
114 //		private LogSocket logSocket;
115 //		
116 //		private IToken agentId;
117 //		
118 //		private NetworkLogEnvelope logRecord;		
119 //		
120 //		public LogSocketEntry(LogSocket logSocket, IToken agentId, NetworkLogEnvelope logRecord) {
121 //			this.logSocket = logSocket;
122 //			this.agentId = agentId;
123 //			this.logRecord = logRecord;
124 //		}
125 //
126 //		public LogSocket getSocket() {
127 //			return logSocket;
128 //		}
129 //		
130 //		public IToken getAgentId() {
131 //			return agentId;
132 //		}
133 //
134 //		public NetworkLogEnvelope getRecord() {
135 //			return logRecord;
136 //		}
137 //				
138 //	}
139 		
140 	/**
141 	 * Used for sending {@link NetworkLogEnvelope} down the {@link LogSocket#socket} via {@link LogSocket#writer} using
142 	 * serialization.
143 	 * <p><p>
144 	 * THREAD-UNSAFE! ACCESS TO ALL METHODS MUST BE SYNCHRONIZED!
145 	 * 
146 	 * @author Jimmy
147 	 */
148 	private static class LogSocket {
149 				
150 		/**
151 		 * {@link CharsetEncoder} that is used to serialize {@link String}s into the {@link LogSocket#socket}, used by
152 		 * {@link LogSocket#writer}.
153 		 */
154 		public CharsetEncoder encoder = USED_CHARSET.newEncoder();
155 		
156 		/**
157 		 * Socket that is used for sending logs down the hole.
158 		 */
159 		private Socket socket;
160 		
161 		/**
162 		 * Whether the socked should be opened, i.e., it was not closed via {@link LogSocket#close()}.
163 		 */
164 		private boolean opened = true;
165 			
166 		/**
167 		 * {@link Writer} that is being used for sending the logs down through the {@link LogSocket#socket}.
168 		 */
169 		private PrintWriter writer;
170 		
171 		/**
172 		 * What is the last time we have performed {@link LogSocket#flush()} operation. Frequency of {@link LogSocket#flush()} operations
173 		 * are influencing a throughput.
174 		 */
175 		private long lastFlush = System.currentTimeMillis();
176 		
177 		/**
178 		 * Wraps 'socket' with a service layer providing a means for sending {@link NetworkLogEnvelope}s through it.
179 		 * @param socket
180 		 * @throws IOException
181 		 */
182 		public LogSocket(Socket socket) throws IOException {
183 			this.socket = socket;
184 			this.writer = new PrintWriter(new OutputStreamWriter(socket.getOutputStream(), encoder));
185 		}
186 		
187 		/**
188 		 * Whether the socket is opened.
189 		 * <p><p>
190 		 * If the socket is not opened, you CAN'T CALL {@link LogSocket#send(NetworkLogEnvelope)} or {@link LogSocket#flush()} as they would
191 		 * result in {@link NullPointerException}.
192 		 * @return
193 		 */
194 		public boolean isOpened() {
195 			return opened;
196 		}
197 		
198 		/**
199 		 * Closes the {@link LogSocket#socket} and frees internal data structures.
200 		 */
201 		public void close() {
202 			if (!opened) return;
203 			opened = false;
204 			try {
205 				socket.close();
206 			} catch (Exception e) {					
207 			}
208 			encoder = null;
209 			writer = null;
210 			socket = null;
211 		}
212 		
213 		/**
214 		 * Serialize the {@link NetworkLogEnvelope} into the {@link LogSocket#socket} via {@link LogSocket#writer}.
215 		 * 
216 		 * @param log
217 		 * @throws IOException
218 		 */
219 		public void send(NetworkLogEnvelope log) throws IOException {
220 			writer.println(log.getCategory());
221 			writer.println(log.getLevel());			
222 			writer.println(String.valueOf(log.getMillis()));
223 			writer.println(log.getMessage());
224 			writer.println("</end>");
225 			checkFlush();
226 		}
227 		
228 		/**
229 		 * Unconditionally performing {@link Writer#flush()} operation to send the byte buffer through the {@link LogSocket#socket}.
230 		 * @throws IOException
231 		 */
232 		public void flush() throws IOException {
233 			writer.flush();
234 			lastFlush = System.currentTimeMillis();
235 		}
236 		
237 		/**
238 		 * Checks whether the time has come to perform {@link LogSocket#socket} based on the {@link LogSocket#lastFlush} and {@link NetworkLogManager#NETWORK_FLUSH_PERIOD_MILLIS}.
239 		 * @throws IOException
240 		 */
241 		public void checkFlush() throws IOException {
242 			if (System.currentTimeMillis() - lastFlush > NETWORK_FLUSH_PERIOD_MILLIS) {
243 				flush();
244 			}	
245 		}
246 		
247 	}
248     
249 //	  /**
250 //     * Logs we need to publish.
251 //     */
252 //    private ArrayBlockingQueue<LogSocketEntry> logsToSend = new ArrayBlockingQueue<LogSocketEntry>(LOG_BUFFER_LENGTH);
253     
254     /**
255      * Used for logging stuff of this class.
256      */
257     private static LogCategory log = new LogCategory("NetworkLogManager");
258     
259     static {
260     	String level = Pogamut.getPlatform().getProperty(PogamutProperty.POGAMUT_NETWORK_LOG_MANAGER_AND_CLIENT_LEVEL.getKey());
261     	if (level == null) level = "WARNING";
262     	log.setLevel(Level.parse(level));
263     	log.addConsoleHandler();    	
264     }
265     
266     /**
267      * Mutex synchronizing construction and destruction of the {@link NetworkLogManager#serverWorker}.
268      */
269     private Object serverWorkerMutex = new Object();
270     
271     /**
272      * This maps agents to various sockets which accept the agent's logs.
273      */
274     private LazyMap<IToken, ConcurrentLinkedQueue<LogSocket>> idToSocketList = new LazyMap<IToken, ConcurrentLinkedQueue<LogSocket>>() {
275 
276 		@Override
277 		protected ConcurrentLinkedQueue<LogSocket> create(IToken key) {
278 			return new ConcurrentLinkedQueue<LogSocket>();
279 		}
280     	
281     };
282     
283     /**
284      * This maps agents to their logs in memory. It does not keep all of the agent's logs; these which are sent are deleted from the map.
285      */
286     private LazyMap<IToken, ConcurrentLinkedQueue<NetworkLogEnvelope>> idToEnvelopeList = new LazyMap<IToken, ConcurrentLinkedQueue<NetworkLogEnvelope>>() {
287 
288 		@Override
289 		protected ConcurrentLinkedQueue<NetworkLogEnvelope> create(IToken key) {
290 			return new ConcurrentLinkedQueue<NetworkLogEnvelope>();
291 		}
292     	
293     };
294     
295     /**
296      * Flag that tells whether the {@link NetworkLogManager} is operating. Once {@link NetworkLogManager#shutdown()} this
297      * flag is set to false and no new threads are created / no logs are published, etc. 
298      */
299 	private boolean operating = true;    
300 
301     /**
302      * A constructor, initializes {@link NetworkLogManager#serverSocket} and the manager's logger.
303      * <p><p>
304      * Called from {@link NetworkLogManager#getNetworkLogManager()} (lazy-singleton initialization).
305      */
306     private NetworkLogManager() {
307     	start();
308     }
309     
310     /**
311      * Called from the constructor to startup this instance of {@link NetworkLogManager}.
312      * <p><p>
313      * Initialize {@link NetworkLogManager#serverWorker}.
314      * <p><p>
315      * MUST NOT BE CALLED AGAIN! (Constructor-call / one-time only). 
316      */
317 	private void start() {
318     	synchronized(serverWorkerMutex) {
319     		if (log != null && log.isLoggable(Level.FINER)) log.finer("Starting...");
320      		try {
321 				serverWorker = new ServerWorker();
322 			} catch (IOException e) {
323 				throw new PogamutIOException("Could not initialize NetworkLogManager, could not open server socket.", e, log, this);
324 			}
325     		serverWorkerThread = new Thread(serverWorker, "NetworkLogManager-ServerSocket");
326     		serverWorkerThread.start();
327     		
328 //    		logSendingWorker = new LogSendingWorker();
329 //    		workerThread = new Thread(logSendingWorker, "NetworkLogManager-LogSender");
330 //    		workerThread.start();
331     		    		
332     		log.fine("Started.");
333     	}
334     }    
335     
336     /**
337      * Whether this instance is active, i.e., it was not {@link NetworkLogManager#shutdown()}.
338      * 
339      * @return whether the manager is running, i.e., accepting new connections and publishing logs
340      */
341     public boolean isRunning() {
342 		return operating;
343 	}
344 
345     /**
346      * Method called from {@link DefaultPogamutPlatform#close()} to shutdown the network
347      * logging, terminating its thread to let JVM die gracefully.
348      * <p><p>
349      * YOU DO NOT PROBABLY WANT TO CALL THIS METHOD MANUALLY!!!
350      * <p><p> 
351      * But if you do, it will shutdown all sockets and the worker, but it will start it up again if there is any agent will wish 
352      * to log something again. Therefore it is a good idea to call the shutdown after all Pogamut's agents are stopped/killed (i.e., dead). 
353      */
354     public void kill()  {
355     	// we're shutting down == we're not operating any more!
356     	operating = false; 
357     	synchronized(managerMutex) {
358     		// nullify the static manager var, but only iff it is still us!
359 			if (manager == this) {
360 				manager = null;
361 			}
362 		}
363     	if (log != null && log.isLoggable(Level.WARNING)) log.warning("Shutting down!");
364     	// shutdown the logging
365     	synchronized(serverWorkerMutex) {    		    		
366 
367     		serverWorker.kill();
368     		serverWorker = null;
369     		
370     		while (idToSocketList.size() > 0) {
371     			IToken agent;
372     			synchronized(idToSocketList) {
373     				if (idToSocketList.size() == 0) break;
374     				agent = idToSocketList.keySet().iterator().next();
375     			}
376     			removeAgent(agent);
377     		}
378     		
379             log.severe("Shutdown.");        
380     	}
381     }
382 
383     /**
384      * @return port on which the serverSocket is listening or -1 if the socket is not initialized (i.e., there is no logging agent registered inside the manager).
385      */
386     public int getLoggerPort() {
387     	if (!operating) return -1;
388     	ServerWorker serverWorker = this.serverWorker;
389     	if (serverWorker == null) return -1;
390         return serverWorker.serverSocket.getLocalPort();
391     }
392     
393     /**
394      * @return host where we're listening for logging client or null if the socket is not initialized (i.e., there is no logging agent regist9ered inside the manager).
395      */
396     public String getLoggerHost() {
397 		if (!operating) return null;
398 		if (serverWorker == null) return null;		
399 		try {
400 			byte[] addr = InetAddress.getLocalHost().getAddress();
401 			return InetAddress.getLocalHost().getHostAddress();
402 		} catch (UnknownHostException e) {
403 			throw new PogamutException("Could not determine host IP address.", e, this);
404 		}
405         //return serverWorker.serverSocket.getInetAddress().getHostAddress();
406 	}
407     
408     /**
409      * Initializes logging for the 'agent'.
410      * @param agent
411      */
412     public void addAgent(IToken agent) {
413     	if (!operating) {
414     		return;
415     	}
416     	if (log != null && log.isLoggable(Level.FINE)) log.fine("Adding network logging for agent: " + agent.getToken());
417     	synchronized(idToSocketList) {
418     		idToSocketList.get(agent);
419     	}
420     	synchronized(idToEnvelopeList) {
421     		idToEnvelopeList.get(agent);
422     	}
423     }
424 
425     /**
426      * Removes an agent from the manager - stops accepting its logs, closes sockets where its logs are being sent etc...
427      * @param agent The agent whose logging is to be stopped.
428      */
429     public void removeAgent(IToken agent) {
430     	
431     	// NOTE: possible memory leaks when removeAgent() is called together with processLog() for a same agent at the same time
432     	//       which is totally weird (should not happen!)
433     	
434     	if (log != null && log.isLoggable(Level.WARNING)) log.warning("Removing network logging for agent: " + agent);		
435 		
436     	if (log != null && log.isLoggable(Level.INFO)) log.info("Closing logging sockets for: " + agent);
437 		ConcurrentLinkedQueue<LogSocket> agentSockets;
438     	synchronized(idToSocketList) {
439     		agentSockets = idToSocketList.get(agent);
440     		idToSocketList.remove(agent);
441     	}
442     	for (LogSocket socket : agentSockets) {
443     		synchronized (socket){
444     			try {
445 					socket.flush();
446 				} catch (IOException e) {
447 				}
448     			socket.close();
449     		}
450 		}
451     	
452     	log.info("Removing bruffered logs for: " + agent);
453    		synchronized(idToEnvelopeList) {
454    			idToEnvelopeList.remove(agent);
455    		}
456    		
457     }
458     
459     /**
460      * A method called from NetworkLogPublisher when a log is to be published.
461      * @param record The envelope containing informations to be published
462      * @param id Who has sent the log.
463      */
464     public void processLog(NetworkLogEnvelope record, IToken agent) {
465     	if (!operating) {
466     		return;
467     	}
468     	
469     	if (log != null && log.isLoggable(Level.FINEST)) log.finest("Processing log: (" + agent.getToken() + ") " + record);
470     	
471     	// NOTE: possible memory leaks when processLog() is called together with removeAgent() for a same agent at the same time
472     	
473     	ConcurrentLinkedQueue<LogSocket> agentSockets = null;
474     	ConcurrentLinkedQueue<NetworkLogEnvelope> agentLogs;
475     	
476     	synchronized(idToSocketList) {
477     		if (idToSocketList.containsKey(agent)) {
478     			agentSockets = idToSocketList.get(agent);
479     		}
480     	}    	
481     	if (agentSockets != null) {
482     		Iterator<LogSocket> iter = agentSockets.iterator();
483     		while (iter.hasNext()) {
484     			LogSocket logSocket = iter.next();
485 //    			if (logSocket.isOpened()) {
486 //    				try {
487 //						logsToSend.put(new LogSocketEntry(logSocket, agent, record));
488 //					} catch (InterruptedException e) {
489 //						throw new PogamutInterruptedException(e, this);
490 //					}
491 //    			}
492     			synchronized(logSocket) {
493     				if (!logSocket.isOpened()) {
494     					logSocket.close();
495     					iter.remove();
496     					continue;
497     				}
498     				try {
499 						logSocket.send(record);								
500 					} catch (Exception e) {
501 						logSocket.close();
502     					iter.remove();
503     					continue;
504 					}
505     			}
506 	    	}
507     	}
508     	
509     	synchronized(idToEnvelopeList) {
510     		agentLogs = idToEnvelopeList.get(agent);
511     	}
512     	
513     	agentLogs.add(record);
514     	
515     	while(agentLogs.size() > MAXIMUM_LOGS_PER_AGENT) {
516     		try {
517     			agentLogs.remove();
518     		} catch (Exception e) {    			
519     		}
520     	}
521     }
522     
523     /**
524      * Returns a logger that the {@link NetworkLogManager} is using.
525      * @return
526      */
527     public static LogCategory getLog() {
528     	return log;
529     }
530     
531 //    // ----
532 //    // --------
533 //    // ==================
534 //    // LOG SENDING WORKER
535 //    // ==================
536 //    // --------
537 //    // ----
538 //
539 //    /**
540 //     * Worker instance - it implements Runnable interface and is continuously
541 //     * reading messages from the connection object and passing them to the
542 //     * receiver.
543 //     */
544 //    protected LogSendingWorker logSendingWorker = null;
545 //    
546 //    /**
547 //     * Thread of the worker.
548 //     */
549 //    protected Thread workerThread = null;
550 //    
551 //    private class LogSendingWorker implements Runnable {
552 //
553 //        /**
554 //         * Simple flag that is telling us whether the Worker should run.
555 //         */
556 //        private volatile boolean shouldRun = true;
557 //        private volatile boolean running = false;
558 //        private volatile boolean exceptionExpected = false;
559 //        private Thread myThread;
560 //
561 //        /**
562 //         * Drops the shouldRun flag, waits for 200ms and then interrupts the
563 //         * thread in hope it helps.
564 //         */
565 //        public void kill() {
566 //            if (!running) {
567 //                return;
568 //            }
569 //            this.shouldRun = false;
570 //            try {
571 //                Thread.sleep(200);
572 //            } catch (InterruptedException e) {
573 //            }
574 //            exceptionExpected = true;
575 //            myThread.interrupt();
576 //        }
577 //
578 //        private void send(LogSocketEntry entry) {
579 //        	synchronized(entry.logSocket) {
580 //	        	try {
581 //	    			if (entry.getSocket().isOpened()) {
582 //	    				log.finest("Sending log: " + entry.getAgentId().getToken() + " " + entry.getRecord() + " -> " + entry.getSocket().socket.socket().getRemoteSocketAddress());
583 //	    				entry.getSocket().send(entry.getRecord());
584 //	    			} else {
585 //	    				// could not send data through the socket...
586 //	        			log.info("Logging socket closed from the remote side for the agent: " + entry.getAgentId() + ", removing it from the socket list.");
587 //	        			entry.getSocket().close();
588 //	        			idToSocketList.get(entry.getAgentId()).remove(entry.getSocket());
589 //	    			}
590 //	    		} catch (IOException e) {
591 //	    			// could not send data through the socket...
592 //	    			log.info("Logging socket closed from the remote side for the agent: " + entry.getAgentId() + ", removing it from the socket list.");
593 //	    			entry.getSocket().close();
594 //	    			idToSocketList.get(entry.getAgentId()).remove(entry.getSocket());
595 //	    		}
596 //        	}
597 //        }
598 //        
599 //        /**
600 //         * Contains main while cycle that is continuously reading messages from
601 //         * the connection (using parser), notifying listeners and then passing
602 //         * them to the message receiver.
603 //         */
604 //        @Override
605 //        public void run() {
606 //            
607 //            myThread = Thread.currentThread();
608 //
609 //            // set the running flag, we've been started
610 //            running = true;
611 //
612 //            // notify that gateway started
613 //            log.info("LogSendingWorker started.");
614 //
615 //            List<LogSocketEntry> buffer = new ArrayList<LogSocketEntry>(LOG_BUFFER_LENGTH);
616 //            
617 //            int performanceLimit = (int) (LOG_BUFFER_LENGTH * 0.9);
618 //            
619 //            try {
620 //                while (operating && shouldRun && !myThread.isInterrupted()) {
621 //                	try {
622 //                		LogSocketEntry entry = logsToSend.take();
623 //                		if (!operating || !shouldRun || myThread.isInterrupted()) break;
624 //                		send(entry);
625 //                		if (logsToSend.size() > performanceLimit) {
626 //                			log.warning("Network logging reached its performance limit! Lower the levels of logging! logsToSend.size() == " + logsToSend.size());
627 //                		}
628 //                		logsToSend.drainTo(buffer);
629 //                		if (!operating || !shouldRun || myThread.isInterrupted()) break;
630 //                		for (LogSocketEntry logSocketEntry : buffer) {
631 //                			send(logSocketEntry);
632 //                    		if (!operating || !shouldRun || myThread.isInterrupted()) break;
633 //                		}
634 //                		buffer.clear();
635 //                	} catch (Exception e) { 
636 //                		if (!exceptionExpected) {
637 //                            log.severe(ExceptionToString.process("Exception at LogSendingWorker.", e));
638 //                        } else {
639 //                            log.fine(ExceptionToString.process("Exception at LogSendingWorker, expected.", e));
640 //                        }	
641 //                		break;
642 //                	}
643 //                }
644 //            } catch (Exception e) {
645 //            	if (!exceptionExpected) {
646 //                    log.severe(ExceptionToString.process("Exception at LogSendingWorker.", e));
647 //                } else {
648 //                    log.fine(ExceptionToString.process("Exception at LogSendingWorker, expected.", e));
649 //                }		                
650 //            } finally {
651 //            	running = false;
652 //            }
653 //
654 //            log.warning("LogSendingWorker Stopped.");
655 //        }
656 //    }
657     
658     // ----
659     // --------
660     // =============
661     // SERVER WORKER
662     // =============
663     // --------
664     // ----
665     
666     
667     protected ServerWorker serverWorker  = null;
668     
669     /**
670      * Thread of the server worker.
671      */
672     protected Thread serverWorkerThread = null;
673 
674     /**
675      * Used to store socket where we awaits the 'agent id' that should we be logging.
676      * @author Jimmy
677      */
678     private class DanglingSocket {
679     	
680     	CharsetEncoder encoder = USED_CHARSET.newEncoder();
681         CharsetDecoder decoder = USED_CHARSET.newDecoder();
682     	
683     	public Socket socket;
684     	public final long created = System.currentTimeMillis();
685     	private PrintWriter writer;
686     	private BufferedReader reader;
687     	
688     	private StringBuffer agentId = new StringBuffer();
689     	
690     	private char[] buf = new char[128];
691     	
692     	public DanglingSocket(Socket socket) throws IOException {
693     		this.socket = socket;
694     		this.writer = new PrintWriter(socket.getOutputStream());
695     		this.reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
696     	}
697     	
698     	public String readAgentId() throws IOException {    		
699 			int read = reader.read(buf);
700 			if (read == 0) return null;
701 			if (buf[read-1] == '\n') {
702 				int minus = 1;
703 				if (read > 1 && buf[read-2] == '\r') {
704 					minus = 2;
705 				}
706 				agentId.append(buf, 0, read-minus);
707 				return agentId.toString();
708 			} else {
709 				agentId.append(buf, 0, read);
710 				return null;
711 			}    			
712     		
713     	}
714     	
715     }
716     
717     /**
718      * Accepts new connections via {@link ServerWorker#serverSocket}, reads agent id that we should be logging on new sockets,
719      * populates {@link NetworkLogManager#idToSocketList}.
720      * 
721      * @author Jimmy
722      */
723     private class ServerWorker implements Runnable {
724     	
725         /**
726          * The only instance of {@link ServerSocket} which accepts new clients.
727          */
728         private ServerSocket serverSocket;
729 
730         /**
731          * Simple flag that is telling us whether the Worker should run.
732          */
733         private volatile boolean shouldRun = true;
734         private volatile boolean running = false;
735         private volatile boolean exceptionExpected = false;
736         private Thread myThread;
737         
738         /**
739          * List of sockets where we did not received AgentID yet.
740          */
741         private List<DanglingSocket> danglingSockets = new LinkedList<DanglingSocket>();
742         
743         /**
744          * Opens {@link ServerWorker#serverSocket}.
745          * 
746          * @throws IOException
747          */
748         public ServerWorker() throws IOException {
749         	serverSocket = new ServerSocket();
750         	serverSocket.bind (new InetSocketAddress (0));
751             //serverSocket.configureBlocking(true);
752         }
753 
754         /**
755          * Drops the {@link ServerWorker#shouldRun} flag, waits for 100ms and then interrupts the
756          * thread in hope it helps.
757          */
758         public void kill() {
759             if (!running) {
760                 return;
761             }
762             this.shouldRun = false;
763             try {
764                 Thread.sleep(100);
765             } catch (InterruptedException e) {
766             }
767             exceptionExpected = true;
768             myThread.interrupt();
769         }
770 
771         /**
772          * Contains main while cycle that is awaiting new connections at {@link ServerWorker#serverSocket}
773          * and populating {@link NetworkLogManager#idToSocketList} with them.
774          */
775         @Override
776         public void run() {
777             
778             myThread = Thread.currentThread();
779 
780             // set the running flag, we've been started
781             running = true;
782 
783             // notify that gateway started
784             if (log != null && log.isLoggable(Level.INFO)) log.info("ServerWorker started.");
785 
786             try {
787                 while (operating && shouldRun && !myThread.isInterrupted()) {
788                 	try {
789                 		Socket socket = null;
790                 		
791                 		if (danglingSockets.size() == 0) {
792                 			//serverSocket.configureBlocking(false);
793                 			socket = serverSocket.accept();
794                 		} else {
795                 			//serverSocket.configureBlocking(true);
796                 			serverSocket.setSoTimeout(100);
797                 			try {
798                 				socket = serverSocket.accept();                				
799                 			} catch (SocketTimeoutException ex) {
800                 			}
801                 		}
802                 		
803                 		if (socket != null) {
804                 			log.fine("Accepted new connection from " + socket.getRemoteSocketAddress());
805                 			//socket.configureBlocking(false);
806 	            			try {
807 	            				danglingSockets.add(new DanglingSocket(socket));
808 	            			} catch (IOException e1) {
809 	            				try {
810 	            					socket.close();
811 	            				} catch (Exception e2) {                					
812 	            				}
813 	            			}
814                 		}
815                 		
816                 		Iterator<DanglingSocket> iter = danglingSockets.iterator();
817                 		while (iter.hasNext()) {
818                 			DanglingSocket danglingSocket = iter.next();
819                 			String agentId = danglingSocket.readAgentId();
820                 			if (agentId != null) {
821                 				// agentId has been received!
822                 				IToken token = Tokens.get(agentId);
823                 				log.fine("Connection " + danglingSocket.socket.getRemoteSocketAddress() + " sent agent id '" + token.getToken() + "'.");
824                 				Iterator<NetworkLogEnvelope> iter2;
825             					synchronized(idToEnvelopeList) {
826             						ConcurrentLinkedQueue<NetworkLogEnvelope> queue = idToEnvelopeList.get(token);
827             						int size = queue.size();
828             						log.finer("Sending buffered " + size + " log records to the " + danglingSocket.socket.getRemoteSocketAddress());
829             						iter2 = queue.iterator();
830             					}            					
831                 				LogSocket logSocket = new LogSocket(danglingSocket.socket);
832                 				synchronized(logSocket) {
833                 					synchronized(idToSocketList) { 
834                 						idToSocketList.get(token).add(logSocket);
835                 					}   
836                 					//System.out.println("SENDING FIRST LOGS!");
837                 					while(iter2.hasNext()) {
838                 						NetworkLogEnvelope env = iter2.next();
839                 						//System.out.println(env);
840                 						logSocket.send(env);
841                 					}
842                 					//System.out.println("FINISHED");
843                 				}                				
844                 				iter.remove();
845                 				continue;
846                 			}
847                 			if (System.currentTimeMillis() - danglingSocket.created < NETWORK_LOG_MANAGER_SOCKET_TIMEOUT_MILLIS) {
848                 				log.warning("Connection " + danglingSocket.socket.getRemoteSocketAddress() + " timed out. We did not receive agent id in " + NETWORK_LOG_MANAGER_SOCKET_TIMEOUT_MILLIS + " ms. Closing the socket.");
849                 				try {
850                 					danglingSocket.socket.close();
851                 				} catch (IOException e) {                					
852                 				}
853                 				iter.remove();
854                 				continue;
855                 			}
856                 		}                		
857                 	} catch (Exception e) { 
858                 		if (!exceptionExpected) {
859                             log.severe(ExceptionToString.process("Exception at ServerWorker.", e));
860                         } else {
861                             log.fine(ExceptionToString.process("Exception at ServerWorker, expected.", e));
862                             break;
863                         }	
864                 	}
865                 }
866             } catch (Exception e) {
867             	if (!exceptionExpected) {
868                     log.severe(ExceptionToString.process("Exception at ServerWorker.", e));
869                 } else {
870                     log.fine(ExceptionToString.process("Exception ServerWorker, expected.", e));
871                 }		                
872             } finally {
873             	running = false;            	
874             	for (DanglingSocket socket : danglingSockets) {
875             		try {
876 						socket.socket.close();
877 					} catch (Exception e) {
878 					}
879             	}
880             	danglingSockets.clear();
881             	try {
882 					serverSocket.close();
883 				} catch (IOException e) {
884 				}
885             }
886 
887             log.warning("ServerWorker Stopped.");
888         }
889 
890     }
891 
892 }