1 package cz.cuni.amis.pogamut.base.utils.logging;
2
3 import java.io.BufferedReader;
4 import java.io.IOException;
5 import java.io.InputStreamReader;
6 import java.io.OutputStreamWriter;
7 import java.io.PrintWriter;
8 import java.io.Writer;
9 import java.net.InetAddress;
10 import java.net.InetSocketAddress;
11 import java.net.ServerSocket;
12 import java.net.Socket;
13 import java.net.SocketTimeoutException;
14 import java.net.UnknownHostException;
15 import java.nio.channels.ServerSocketChannel;
16 import java.nio.channels.SocketChannel;
17 import java.nio.charset.Charset;
18 import java.nio.charset.CharsetDecoder;
19 import java.nio.charset.CharsetEncoder;
20 import java.util.Iterator;
21 import java.util.LinkedList;
22 import java.util.List;
23 import java.util.concurrent.ConcurrentLinkedQueue;
24 import java.util.logging.Level;
25
26 import cz.cuni.amis.pogamut.base.utils.DefaultPogamutPlatform;
27 import cz.cuni.amis.pogamut.base.utils.Pogamut;
28 import cz.cuni.amis.pogamut.base.utils.PogamutProperty;
29 import cz.cuni.amis.utils.ExceptionToString;
30 import cz.cuni.amis.utils.exception.PogamutException;
31 import cz.cuni.amis.utils.exception.PogamutIOException;
32 import cz.cuni.amis.utils.maps.LazyMap;
33 import cz.cuni.amis.utils.token.IToken;
34 import cz.cuni.amis.utils.token.Tokens;
35
36
37 /**
38 * A class used for network logging of agents.
39 * <p><p>
40 * It is a singleton which accepts logs from agents of one JVM and publishes them on certain port, where clients may accept them.
41 *
42 * @author Pyroh
43 * @author Jimmy
44 */
45 public class NetworkLogManager {
46
47 /**
48 * Number of logs we are buffering for every agent, these logs are sent to the client whenever it
49 * opens a socket that wants to listen for a particular agent.
50 */
51 private static final int MAXIMUM_LOGS_PER_AGENT = 100;
52
53 /**
54 * {@link Charset} that is used by the {@link NetworkLogManager} to send logs over the socket.
55 * <p><p>
56 * Initialized to "UTF-8" as default.
57 */
58 public static final Charset USED_CHARSET = Charset.forName("UTF-8");
59
60 /**
61 * How often the logs are flushed to the socket (value < 100
62 * results in unacceptable throughput).
63 * <p><p>
64 * Initialized to 200 millis;
65 */
66 public static final long NETWORK_FLUSH_PERIOD_MILLIS = 200;
67
68 /**
69 * How long do we wait for the agent-id before we drop the connection.
70 * <p><p>
71 * Initialized to 1000 millis.
72 */
73 public static final long NETWORK_LOG_MANAGER_SOCKET_TIMEOUT_MILLIS = 1000;
74
75 // /**
76 // * How many logs may be buffered before the network logging starts to lag.
77 // */
78 // public static final int LOG_BUFFER_LENGTH = 2000;
79
80 /**
81 * The reference to singleton of {@link NetworkLogManager} (this class).
82 */
83 private static NetworkLogManager manager = null;
84
85 /**
86 * Mutex that synchronizes construction and destruction of the {@link NetworkLogManager#manager}.
87 */
88 private static Object managerMutex = new Object();
89
90 /**
91 * @return instance of {@link NetworkLogManager}, if called for the first time, creates an instance, before returning it.
92 */
93 public static NetworkLogManager getNetworkLogManager() {
94 NetworkLogManager instance = manager;
95 if (instance != null && instance.operating) {
96 return instance;
97 }
98 synchronized(managerMutex) {
99 instance = manager;
100 if (instance == null || !instance.operating) {
101 manager = instance = new NetworkLogManager();
102 }
103 return instance;
104 }
105 }
106
107 // /**
108 // * Log record that contains its destination ({@link LogSocket} where we should send the log).
109 // *
110 // * @author Jimmy
111 // */
112 // private static class LogSocketEntry {
113 //
114 // private LogSocket logSocket;
115 //
116 // private IToken agentId;
117 //
118 // private NetworkLogEnvelope logRecord;
119 //
120 // public LogSocketEntry(LogSocket logSocket, IToken agentId, NetworkLogEnvelope logRecord) {
121 // this.logSocket = logSocket;
122 // this.agentId = agentId;
123 // this.logRecord = logRecord;
124 // }
125 //
126 // public LogSocket getSocket() {
127 // return logSocket;
128 // }
129 //
130 // public IToken getAgentId() {
131 // return agentId;
132 // }
133 //
134 // public NetworkLogEnvelope getRecord() {
135 // return logRecord;
136 // }
137 //
138 // }
139
140 /**
141 * Used for sending {@link NetworkLogEnvelope} down the {@link LogSocket#socket} via {@link LogSocket#writer} using
142 * serialization.
143 * <p><p>
144 * THREAD-UNSAFE! ACCESS TO ALL METHODS MUST BE SYNCHRONIZED!
145 *
146 * @author Jimmy
147 */
148 private static class LogSocket {
149
150 /**
151 * {@link CharsetEncoder} that is used to serialize {@link String}s into the {@link LogSocket#socket}, used by
152 * {@link LogSocket#writer}.
153 */
154 public CharsetEncoder encoder = USED_CHARSET.newEncoder();
155
156 /**
157 * Socket that is used for sending logs down the hole.
158 */
159 private Socket socket;
160
161 /**
162 * Whether the socked should be opened, i.e., it was not closed via {@link LogSocket#close()}.
163 */
164 private boolean opened = true;
165
166 /**
167 * {@link Writer} that is being used for sending the logs down through the {@link LogSocket#socket}.
168 */
169 private PrintWriter writer;
170
171 /**
172 * What is the last time we have performed {@link LogSocket#flush()} operation. Frequency of {@link LogSocket#flush()} operations
173 * are influencing a throughput.
174 */
175 private long lastFlush = System.currentTimeMillis();
176
177 /**
178 * Wraps 'socket' with a service layer providing a means for sending {@link NetworkLogEnvelope}s through it.
179 * @param socket
180 * @throws IOException
181 */
182 public LogSocket(Socket socket) throws IOException {
183 this.socket = socket;
184 this.writer = new PrintWriter(new OutputStreamWriter(socket.getOutputStream(), encoder));
185 }
186
187 /**
188 * Whether the socket is opened.
189 * <p><p>
190 * If the socket is not opened, you CAN'T CALL {@link LogSocket#send(NetworkLogEnvelope)} or {@link LogSocket#flush()} as they would
191 * result in {@link NullPointerException}.
192 * @return
193 */
194 public boolean isOpened() {
195 return opened;
196 }
197
198 /**
199 * Closes the {@link LogSocket#socket} and frees internal data structures.
200 */
201 public void close() {
202 if (!opened) return;
203 opened = false;
204 try {
205 socket.close();
206 } catch (Exception e) {
207 }
208 encoder = null;
209 writer = null;
210 socket = null;
211 }
212
213 /**
214 * Serialize the {@link NetworkLogEnvelope} into the {@link LogSocket#socket} via {@link LogSocket#writer}.
215 *
216 * @param log
217 * @throws IOException
218 */
219 public void send(NetworkLogEnvelope log) throws IOException {
220 writer.println(log.getCategory());
221 writer.println(log.getLevel());
222 writer.println(String.valueOf(log.getMillis()));
223 writer.println(log.getMessage());
224 writer.println("</end>");
225 checkFlush();
226 }
227
228 /**
229 * Unconditionally performing {@link Writer#flush()} operation to send the byte buffer through the {@link LogSocket#socket}.
230 * @throws IOException
231 */
232 public void flush() throws IOException {
233 writer.flush();
234 lastFlush = System.currentTimeMillis();
235 }
236
237 /**
238 * Checks whether the time has come to perform {@link LogSocket#socket} based on the {@link LogSocket#lastFlush} and {@link NetworkLogManager#NETWORK_FLUSH_PERIOD_MILLIS}.
239 * @throws IOException
240 */
241 public void checkFlush() throws IOException {
242 if (System.currentTimeMillis() - lastFlush > NETWORK_FLUSH_PERIOD_MILLIS) {
243 flush();
244 }
245 }
246
247 }
248
249 // /**
250 // * Logs we need to publish.
251 // */
252 // private ArrayBlockingQueue<LogSocketEntry> logsToSend = new ArrayBlockingQueue<LogSocketEntry>(LOG_BUFFER_LENGTH);
253
254 /**
255 * Used for logging stuff of this class.
256 */
257 private static LogCategory log = new LogCategory("NetworkLogManager");
258
259 static {
260 String level = Pogamut.getPlatform().getProperty(PogamutProperty.POGAMUT_NETWORK_LOG_MANAGER_AND_CLIENT_LEVEL.getKey());
261 if (level == null) level = "WARNING";
262 log.setLevel(Level.parse(level));
263 log.addConsoleHandler();
264 }
265
266 /**
267 * Mutex synchronizing construction and destruction of the {@link NetworkLogManager#serverWorker}.
268 */
269 private Object serverWorkerMutex = new Object();
270
271 /**
272 * This maps agents to various sockets which accept the agent's logs.
273 */
274 private LazyMap<IToken, ConcurrentLinkedQueue<LogSocket>> idToSocketList = new LazyMap<IToken, ConcurrentLinkedQueue<LogSocket>>() {
275
276 @Override
277 protected ConcurrentLinkedQueue<LogSocket> create(IToken key) {
278 return new ConcurrentLinkedQueue<LogSocket>();
279 }
280
281 };
282
283 /**
284 * This maps agents to their logs in memory. It does not keep all of the agent's logs; these which are sent are deleted from the map.
285 */
286 private LazyMap<IToken, ConcurrentLinkedQueue<NetworkLogEnvelope>> idToEnvelopeList = new LazyMap<IToken, ConcurrentLinkedQueue<NetworkLogEnvelope>>() {
287
288 @Override
289 protected ConcurrentLinkedQueue<NetworkLogEnvelope> create(IToken key) {
290 return new ConcurrentLinkedQueue<NetworkLogEnvelope>();
291 }
292
293 };
294
295 /**
296 * Flag that tells whether the {@link NetworkLogManager} is operating. Once {@link NetworkLogManager#shutdown()} this
297 * flag is set to false and no new threads are created / no logs are published, etc.
298 */
299 private boolean operating = true;
300
301 /**
302 * A constructor, initializes {@link NetworkLogManager#serverSocket} and the manager's logger.
303 * <p><p>
304 * Called from {@link NetworkLogManager#getNetworkLogManager()} (lazy-singleton initialization).
305 */
306 private NetworkLogManager() {
307 start();
308 }
309
310 /**
311 * Called from the constructor to startup this instance of {@link NetworkLogManager}.
312 * <p><p>
313 * Initialize {@link NetworkLogManager#serverWorker}.
314 * <p><p>
315 * MUST NOT BE CALLED AGAIN! (Constructor-call / one-time only).
316 */
317 private void start() {
318 synchronized(serverWorkerMutex) {
319 if (log != null && log.isLoggable(Level.FINER)) log.finer("Starting...");
320 try {
321 serverWorker = new ServerWorker();
322 } catch (IOException e) {
323 throw new PogamutIOException("Could not initialize NetworkLogManager, could not open server socket.", e, log, this);
324 }
325 serverWorkerThread = new Thread(serverWorker, "NetworkLogManager-ServerSocket");
326 serverWorkerThread.start();
327
328 // logSendingWorker = new LogSendingWorker();
329 // workerThread = new Thread(logSendingWorker, "NetworkLogManager-LogSender");
330 // workerThread.start();
331
332 log.fine("Started.");
333 }
334 }
335
336 /**
337 * Whether this instance is active, i.e., it was not {@link NetworkLogManager#shutdown()}.
338 *
339 * @return whether the manager is running, i.e., accepting new connections and publishing logs
340 */
341 public boolean isRunning() {
342 return operating;
343 }
344
345 /**
346 * Method called from {@link DefaultPogamutPlatform#close()} to shutdown the network
347 * logging, terminating its thread to let JVM die gracefully.
348 * <p><p>
349 * YOU DO NOT PROBABLY WANT TO CALL THIS METHOD MANUALLY!!!
350 * <p><p>
351 * But if you do, it will shutdown all sockets and the worker, but it will start it up again if there is any agent will wish
352 * to log something again. Therefore it is a good idea to call the shutdown after all Pogamut's agents are stopped/killed (i.e., dead).
353 */
354 public void kill() {
355 // we're shutting down == we're not operating any more!
356 operating = false;
357 synchronized(managerMutex) {
358 // nullify the static manager var, but only iff it is still us!
359 if (manager == this) {
360 manager = null;
361 }
362 }
363 if (log != null && log.isLoggable(Level.WARNING)) log.warning("Shutting down!");
364 // shutdown the logging
365 synchronized(serverWorkerMutex) {
366
367 serverWorker.kill();
368 serverWorker = null;
369
370 while (idToSocketList.size() > 0) {
371 IToken agent;
372 synchronized(idToSocketList) {
373 if (idToSocketList.size() == 0) break;
374 agent = idToSocketList.keySet().iterator().next();
375 }
376 removeAgent(agent);
377 }
378
379 log.severe("Shutdown.");
380 }
381 }
382
383 /**
384 * @return port on which the serverSocket is listening or -1 if the socket is not initialized (i.e., there is no logging agent registered inside the manager).
385 */
386 public int getLoggerPort() {
387 if (!operating) return -1;
388 ServerWorker serverWorker = this.serverWorker;
389 if (serverWorker == null) return -1;
390 return serverWorker.serverSocket.getLocalPort();
391 }
392
393 /**
394 * @return host where we're listening for logging client or null if the socket is not initialized (i.e., there is no logging agent regist9ered inside the manager).
395 */
396 public String getLoggerHost() {
397 if (!operating) return null;
398 if (serverWorker == null) return null;
399 try {
400 byte[] addr = InetAddress.getLocalHost().getAddress();
401 return InetAddress.getLocalHost().getHostAddress();
402 } catch (UnknownHostException e) {
403 throw new PogamutException("Could not determine host IP address.", e, this);
404 }
405 //return serverWorker.serverSocket.getInetAddress().getHostAddress();
406 }
407
408 /**
409 * Initializes logging for the 'agent'.
410 * @param agent
411 */
412 public void addAgent(IToken agent) {
413 if (!operating) {
414 return;
415 }
416 if (log != null && log.isLoggable(Level.FINE)) log.fine("Adding network logging for agent: " + agent.getToken());
417 synchronized(idToSocketList) {
418 idToSocketList.get(agent);
419 }
420 synchronized(idToEnvelopeList) {
421 idToEnvelopeList.get(agent);
422 }
423 }
424
425 /**
426 * Removes an agent from the manager - stops accepting its logs, closes sockets where its logs are being sent etc...
427 * @param agent The agent whose logging is to be stopped.
428 */
429 public void removeAgent(IToken agent) {
430
431 // NOTE: possible memory leaks when removeAgent() is called together with processLog() for a same agent at the same time
432 // which is totally weird (should not happen!)
433
434 if (log != null && log.isLoggable(Level.WARNING)) log.warning("Removing network logging for agent: " + agent);
435
436 if (log != null && log.isLoggable(Level.INFO)) log.info("Closing logging sockets for: " + agent);
437 ConcurrentLinkedQueue<LogSocket> agentSockets;
438 synchronized(idToSocketList) {
439 agentSockets = idToSocketList.get(agent);
440 idToSocketList.remove(agent);
441 }
442 for (LogSocket socket : agentSockets) {
443 synchronized (socket){
444 try {
445 socket.flush();
446 } catch (IOException e) {
447 }
448 socket.close();
449 }
450 }
451
452 log.info("Removing bruffered logs for: " + agent);
453 synchronized(idToEnvelopeList) {
454 idToEnvelopeList.remove(agent);
455 }
456
457 }
458
459 /**
460 * A method called from NetworkLogPublisher when a log is to be published.
461 * @param record The envelope containing informations to be published
462 * @param id Who has sent the log.
463 */
464 public void processLog(NetworkLogEnvelope record, IToken agent) {
465 if (!operating) {
466 return;
467 }
468
469 if (log != null && log.isLoggable(Level.FINEST)) log.finest("Processing log: (" + agent.getToken() + ") " + record);
470
471 // NOTE: possible memory leaks when processLog() is called together with removeAgent() for a same agent at the same time
472
473 ConcurrentLinkedQueue<LogSocket> agentSockets = null;
474 ConcurrentLinkedQueue<NetworkLogEnvelope> agentLogs;
475
476 synchronized(idToSocketList) {
477 if (idToSocketList.containsKey(agent)) {
478 agentSockets = idToSocketList.get(agent);
479 }
480 }
481 if (agentSockets != null) {
482 Iterator<LogSocket> iter = agentSockets.iterator();
483 while (iter.hasNext()) {
484 LogSocket logSocket = iter.next();
485 // if (logSocket.isOpened()) {
486 // try {
487 // logsToSend.put(new LogSocketEntry(logSocket, agent, record));
488 // } catch (InterruptedException e) {
489 // throw new PogamutInterruptedException(e, this);
490 // }
491 // }
492 synchronized(logSocket) {
493 if (!logSocket.isOpened()) {
494 logSocket.close();
495 iter.remove();
496 continue;
497 }
498 try {
499 logSocket.send(record);
500 } catch (Exception e) {
501 logSocket.close();
502 iter.remove();
503 continue;
504 }
505 }
506 }
507 }
508
509 synchronized(idToEnvelopeList) {
510 agentLogs = idToEnvelopeList.get(agent);
511 }
512
513 agentLogs.add(record);
514
515 while(agentLogs.size() > MAXIMUM_LOGS_PER_AGENT) {
516 try {
517 agentLogs.remove();
518 } catch (Exception e) {
519 }
520 }
521 }
522
523 /**
524 * Returns a logger that the {@link NetworkLogManager} is using.
525 * @return
526 */
527 public static LogCategory getLog() {
528 return log;
529 }
530
531 // // ----
532 // // --------
533 // // ==================
534 // // LOG SENDING WORKER
535 // // ==================
536 // // --------
537 // // ----
538 //
539 // /**
540 // * Worker instance - it implements Runnable interface and is continuously
541 // * reading messages from the connection object and passing them to the
542 // * receiver.
543 // */
544 // protected LogSendingWorker logSendingWorker = null;
545 //
546 // /**
547 // * Thread of the worker.
548 // */
549 // protected Thread workerThread = null;
550 //
551 // private class LogSendingWorker implements Runnable {
552 //
553 // /**
554 // * Simple flag that is telling us whether the Worker should run.
555 // */
556 // private volatile boolean shouldRun = true;
557 // private volatile boolean running = false;
558 // private volatile boolean exceptionExpected = false;
559 // private Thread myThread;
560 //
561 // /**
562 // * Drops the shouldRun flag, waits for 200ms and then interrupts the
563 // * thread in hope it helps.
564 // */
565 // public void kill() {
566 // if (!running) {
567 // return;
568 // }
569 // this.shouldRun = false;
570 // try {
571 // Thread.sleep(200);
572 // } catch (InterruptedException e) {
573 // }
574 // exceptionExpected = true;
575 // myThread.interrupt();
576 // }
577 //
578 // private void send(LogSocketEntry entry) {
579 // synchronized(entry.logSocket) {
580 // try {
581 // if (entry.getSocket().isOpened()) {
582 // log.finest("Sending log: " + entry.getAgentId().getToken() + " " + entry.getRecord() + " -> " + entry.getSocket().socket.socket().getRemoteSocketAddress());
583 // entry.getSocket().send(entry.getRecord());
584 // } else {
585 // // could not send data through the socket...
586 // log.info("Logging socket closed from the remote side for the agent: " + entry.getAgentId() + ", removing it from the socket list.");
587 // entry.getSocket().close();
588 // idToSocketList.get(entry.getAgentId()).remove(entry.getSocket());
589 // }
590 // } catch (IOException e) {
591 // // could not send data through the socket...
592 // log.info("Logging socket closed from the remote side for the agent: " + entry.getAgentId() + ", removing it from the socket list.");
593 // entry.getSocket().close();
594 // idToSocketList.get(entry.getAgentId()).remove(entry.getSocket());
595 // }
596 // }
597 // }
598 //
599 // /**
600 // * Contains main while cycle that is continuously reading messages from
601 // * the connection (using parser), notifying listeners and then passing
602 // * them to the message receiver.
603 // */
604 // @Override
605 // public void run() {
606 //
607 // myThread = Thread.currentThread();
608 //
609 // // set the running flag, we've been started
610 // running = true;
611 //
612 // // notify that gateway started
613 // log.info("LogSendingWorker started.");
614 //
615 // List<LogSocketEntry> buffer = new ArrayList<LogSocketEntry>(LOG_BUFFER_LENGTH);
616 //
617 // int performanceLimit = (int) (LOG_BUFFER_LENGTH * 0.9);
618 //
619 // try {
620 // while (operating && shouldRun && !myThread.isInterrupted()) {
621 // try {
622 // LogSocketEntry entry = logsToSend.take();
623 // if (!operating || !shouldRun || myThread.isInterrupted()) break;
624 // send(entry);
625 // if (logsToSend.size() > performanceLimit) {
626 // log.warning("Network logging reached its performance limit! Lower the levels of logging! logsToSend.size() == " + logsToSend.size());
627 // }
628 // logsToSend.drainTo(buffer);
629 // if (!operating || !shouldRun || myThread.isInterrupted()) break;
630 // for (LogSocketEntry logSocketEntry : buffer) {
631 // send(logSocketEntry);
632 // if (!operating || !shouldRun || myThread.isInterrupted()) break;
633 // }
634 // buffer.clear();
635 // } catch (Exception e) {
636 // if (!exceptionExpected) {
637 // log.severe(ExceptionToString.process("Exception at LogSendingWorker.", e));
638 // } else {
639 // log.fine(ExceptionToString.process("Exception at LogSendingWorker, expected.", e));
640 // }
641 // break;
642 // }
643 // }
644 // } catch (Exception e) {
645 // if (!exceptionExpected) {
646 // log.severe(ExceptionToString.process("Exception at LogSendingWorker.", e));
647 // } else {
648 // log.fine(ExceptionToString.process("Exception at LogSendingWorker, expected.", e));
649 // }
650 // } finally {
651 // running = false;
652 // }
653 //
654 // log.warning("LogSendingWorker Stopped.");
655 // }
656 // }
657
658 // ----
659 // --------
660 // =============
661 // SERVER WORKER
662 // =============
663 // --------
664 // ----
665
666
667 protected ServerWorker serverWorker = null;
668
669 /**
670 * Thread of the server worker.
671 */
672 protected Thread serverWorkerThread = null;
673
674 /**
675 * Used to store socket where we awaits the 'agent id' that should we be logging.
676 * @author Jimmy
677 */
678 private class DanglingSocket {
679
680 CharsetEncoder encoder = USED_CHARSET.newEncoder();
681 CharsetDecoder decoder = USED_CHARSET.newDecoder();
682
683 public Socket socket;
684 public final long created = System.currentTimeMillis();
685 private PrintWriter writer;
686 private BufferedReader reader;
687
688 private StringBuffer agentId = new StringBuffer();
689
690 private char[] buf = new char[128];
691
692 public DanglingSocket(Socket socket) throws IOException {
693 this.socket = socket;
694 this.writer = new PrintWriter(socket.getOutputStream());
695 this.reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
696 }
697
698 public String readAgentId() throws IOException {
699 int read = reader.read(buf);
700 if (read == 0) return null;
701 if (buf[read-1] == '\n') {
702 int minus = 1;
703 if (read > 1 && buf[read-2] == '\r') {
704 minus = 2;
705 }
706 agentId.append(buf, 0, read-minus);
707 return agentId.toString();
708 } else {
709 agentId.append(buf, 0, read);
710 return null;
711 }
712
713 }
714
715 }
716
717 /**
718 * Accepts new connections via {@link ServerWorker#serverSocket}, reads agent id that we should be logging on new sockets,
719 * populates {@link NetworkLogManager#idToSocketList}.
720 *
721 * @author Jimmy
722 */
723 private class ServerWorker implements Runnable {
724
725 /**
726 * The only instance of {@link ServerSocket} which accepts new clients.
727 */
728 private ServerSocket serverSocket;
729
730 /**
731 * Simple flag that is telling us whether the Worker should run.
732 */
733 private volatile boolean shouldRun = true;
734 private volatile boolean running = false;
735 private volatile boolean exceptionExpected = false;
736 private Thread myThread;
737
738 /**
739 * List of sockets where we did not received AgentID yet.
740 */
741 private List<DanglingSocket> danglingSockets = new LinkedList<DanglingSocket>();
742
743 /**
744 * Opens {@link ServerWorker#serverSocket}.
745 *
746 * @throws IOException
747 */
748 public ServerWorker() throws IOException {
749 serverSocket = new ServerSocket();
750 serverSocket.bind (new InetSocketAddress (0));
751 //serverSocket.configureBlocking(true);
752 }
753
754 /**
755 * Drops the {@link ServerWorker#shouldRun} flag, waits for 100ms and then interrupts the
756 * thread in hope it helps.
757 */
758 public void kill() {
759 if (!running) {
760 return;
761 }
762 this.shouldRun = false;
763 try {
764 Thread.sleep(100);
765 } catch (InterruptedException e) {
766 }
767 exceptionExpected = true;
768 myThread.interrupt();
769 }
770
771 /**
772 * Contains main while cycle that is awaiting new connections at {@link ServerWorker#serverSocket}
773 * and populating {@link NetworkLogManager#idToSocketList} with them.
774 */
775 @Override
776 public void run() {
777
778 myThread = Thread.currentThread();
779
780 // set the running flag, we've been started
781 running = true;
782
783 // notify that gateway started
784 if (log != null && log.isLoggable(Level.INFO)) log.info("ServerWorker started.");
785
786 try {
787 while (operating && shouldRun && !myThread.isInterrupted()) {
788 try {
789 Socket socket = null;
790
791 if (danglingSockets.size() == 0) {
792 //serverSocket.configureBlocking(false);
793 socket = serverSocket.accept();
794 } else {
795 //serverSocket.configureBlocking(true);
796 serverSocket.setSoTimeout(100);
797 try {
798 socket = serverSocket.accept();
799 } catch (SocketTimeoutException ex) {
800 }
801 }
802
803 if (socket != null) {
804 log.fine("Accepted new connection from " + socket.getRemoteSocketAddress());
805 //socket.configureBlocking(false);
806 try {
807 danglingSockets.add(new DanglingSocket(socket));
808 } catch (IOException e1) {
809 try {
810 socket.close();
811 } catch (Exception e2) {
812 }
813 }
814 }
815
816 Iterator<DanglingSocket> iter = danglingSockets.iterator();
817 while (iter.hasNext()) {
818 DanglingSocket danglingSocket = iter.next();
819 String agentId = danglingSocket.readAgentId();
820 if (agentId != null) {
821 // agentId has been received!
822 IToken token = Tokens.get(agentId);
823 log.fine("Connection " + danglingSocket.socket.getRemoteSocketAddress() + " sent agent id '" + token.getToken() + "'.");
824 Iterator<NetworkLogEnvelope> iter2;
825 synchronized(idToEnvelopeList) {
826 ConcurrentLinkedQueue<NetworkLogEnvelope> queue = idToEnvelopeList.get(token);
827 int size = queue.size();
828 log.finer("Sending buffered " + size + " log records to the " + danglingSocket.socket.getRemoteSocketAddress());
829 iter2 = queue.iterator();
830 }
831 LogSocket logSocket = new LogSocket(danglingSocket.socket);
832 synchronized(logSocket) {
833 synchronized(idToSocketList) {
834 idToSocketList.get(token).add(logSocket);
835 }
836 //System.out.println("SENDING FIRST LOGS!");
837 while(iter2.hasNext()) {
838 NetworkLogEnvelope env = iter2.next();
839 //System.out.println(env);
840 logSocket.send(env);
841 }
842 //System.out.println("FINISHED");
843 }
844 iter.remove();
845 continue;
846 }
847 if (System.currentTimeMillis() - danglingSocket.created < NETWORK_LOG_MANAGER_SOCKET_TIMEOUT_MILLIS) {
848 log.warning("Connection " + danglingSocket.socket.getRemoteSocketAddress() + " timed out. We did not receive agent id in " + NETWORK_LOG_MANAGER_SOCKET_TIMEOUT_MILLIS + " ms. Closing the socket.");
849 try {
850 danglingSocket.socket.close();
851 } catch (IOException e) {
852 }
853 iter.remove();
854 continue;
855 }
856 }
857 } catch (Exception e) {
858 if (!exceptionExpected) {
859 log.severe(ExceptionToString.process("Exception at ServerWorker.", e));
860 } else {
861 log.fine(ExceptionToString.process("Exception at ServerWorker, expected.", e));
862 break;
863 }
864 }
865 }
866 } catch (Exception e) {
867 if (!exceptionExpected) {
868 log.severe(ExceptionToString.process("Exception at ServerWorker.", e));
869 } else {
870 log.fine(ExceptionToString.process("Exception ServerWorker, expected.", e));
871 }
872 } finally {
873 running = false;
874 for (DanglingSocket socket : danglingSockets) {
875 try {
876 socket.socket.close();
877 } catch (Exception e) {
878 }
879 }
880 danglingSockets.clear();
881 try {
882 serverSocket.close();
883 } catch (IOException e) {
884 }
885 }
886
887 log.warning("ServerWorker Stopped.");
888 }
889
890 }
891
892 }