1 package cz.cuni.amis.pogamut.base.utils.logging;
2
3 import java.io.BufferedReader;
4 import java.io.IOException;
5 import java.io.InputStreamReader;
6 import java.io.OutputStreamWriter;
7 import java.io.PrintWriter;
8 import java.io.Writer;
9 import java.net.InetAddress;
10 import java.net.InetSocketAddress;
11 import java.net.ServerSocket;
12 import java.net.Socket;
13 import java.net.SocketTimeoutException;
14 import java.net.UnknownHostException;
15 import java.nio.channels.ServerSocketChannel;
16 import java.nio.channels.SocketChannel;
17 import java.nio.charset.Charset;
18 import java.nio.charset.CharsetDecoder;
19 import java.nio.charset.CharsetEncoder;
20 import java.util.Iterator;
21 import java.util.LinkedList;
22 import java.util.List;
23 import java.util.concurrent.ConcurrentLinkedQueue;
24 import java.util.logging.Level;
25
26 import cz.cuni.amis.pogamut.base.utils.DefaultPogamutPlatform;
27 import cz.cuni.amis.pogamut.base.utils.Pogamut;
28 import cz.cuni.amis.pogamut.base.utils.PogamutProperty;
29 import cz.cuni.amis.utils.ExceptionToString;
30 import cz.cuni.amis.utils.exception.PogamutException;
31 import cz.cuni.amis.utils.exception.PogamutIOException;
32 import cz.cuni.amis.utils.maps.LazyMap;
33 import cz.cuni.amis.utils.token.IToken;
34 import cz.cuni.amis.utils.token.Tokens;
35
36
37
38
39
40
41
42
43
44
45 public class NetworkLogManager {
46
47
48
49
50
51 private static final int MAXIMUM_LOGS_PER_AGENT = 100;
52
53
54
55
56
57
58 public static final Charset USED_CHARSET = Charset.forName("UTF-8");
59
60
61
62
63
64
65
66 public static final long NETWORK_FLUSH_PERIOD_MILLIS = 200;
67
68
69
70
71
72
73 public static final long NETWORK_LOG_MANAGER_SOCKET_TIMEOUT_MILLIS = 1000;
74
75
76
77
78
79
80
81
82
83 private static NetworkLogManager manager = null;
84
85
86
87
88 private static Object managerMutex = new Object();
89
90
91
92
93 public static NetworkLogManager getNetworkLogManager() {
94 NetworkLogManager instance = manager;
95 if (instance != null && instance.operating) {
96 return instance;
97 }
98 synchronized(managerMutex) {
99 instance = manager;
100 if (instance == null || !instance.operating) {
101 manager = instance = new NetworkLogManager();
102 }
103 return instance;
104 }
105 }
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148 private static class LogSocket {
149
150
151
152
153
154 public CharsetEncoder encoder = USED_CHARSET.newEncoder();
155
156
157
158
159 private Socket socket;
160
161
162
163
164 private boolean opened = true;
165
166
167
168
169 private PrintWriter writer;
170
171
172
173
174
175 private long lastFlush = System.currentTimeMillis();
176
177
178
179
180
181
182 public LogSocket(Socket socket) throws IOException {
183 this.socket = socket;
184 this.writer = new PrintWriter(new OutputStreamWriter(socket.getOutputStream(), encoder));
185 }
186
187
188
189
190
191
192
193
194 public boolean isOpened() {
195 return opened;
196 }
197
198
199
200
201 public void close() {
202 if (!opened) return;
203 opened = false;
204 try {
205 socket.close();
206 } catch (Exception e) {
207 }
208 encoder = null;
209 writer = null;
210 socket = null;
211 }
212
213
214
215
216
217
218
219 public void send(NetworkLogEnvelope log) throws IOException {
220 writer.println(log.getCategory());
221 writer.println(log.getLevel());
222 writer.println(String.valueOf(log.getMillis()));
223 writer.println(log.getMessage());
224 writer.println("</end>");
225 checkFlush();
226 }
227
228
229
230
231
232 public void flush() throws IOException {
233 writer.flush();
234 lastFlush = System.currentTimeMillis();
235 }
236
237
238
239
240
241 public void checkFlush() throws IOException {
242 if (System.currentTimeMillis() - lastFlush > NETWORK_FLUSH_PERIOD_MILLIS) {
243 flush();
244 }
245 }
246
247 }
248
249
250
251
252
253
254
255
256
257 private static LogCategory log = new LogCategory("NetworkLogManager");
258
259 static {
260 String level = Pogamut.getPlatform().getProperty(PogamutProperty.POGAMUT_NETWORK_LOG_MANAGER_AND_CLIENT_LEVEL.getKey());
261 if (level == null) level = "WARNING";
262 log.setLevel(Level.parse(level));
263 log.addConsoleHandler();
264 }
265
266
267
268
269 private Object serverWorkerMutex = new Object();
270
271
272
273
274 private LazyMap<IToken, ConcurrentLinkedQueue<LogSocket>> idToSocketList = new LazyMap<IToken, ConcurrentLinkedQueue<LogSocket>>() {
275
276 @Override
277 protected ConcurrentLinkedQueue<LogSocket> create(IToken key) {
278 return new ConcurrentLinkedQueue<LogSocket>();
279 }
280
281 };
282
283
284
285
286 private LazyMap<IToken, ConcurrentLinkedQueue<NetworkLogEnvelope>> idToEnvelopeList = new LazyMap<IToken, ConcurrentLinkedQueue<NetworkLogEnvelope>>() {
287
288 @Override
289 protected ConcurrentLinkedQueue<NetworkLogEnvelope> create(IToken key) {
290 return new ConcurrentLinkedQueue<NetworkLogEnvelope>();
291 }
292
293 };
294
295
296
297
298
299 private boolean operating = true;
300
301
302
303
304
305
306 private NetworkLogManager() {
307 start();
308 }
309
310
311
312
313
314
315
316
317 private void start() {
318 synchronized(serverWorkerMutex) {
319 if (log != null && log.isLoggable(Level.FINER)) log.finer("Starting...");
320 try {
321 serverWorker = new ServerWorker();
322 } catch (IOException e) {
323 throw new PogamutIOException("Could not initialize NetworkLogManager, could not open server socket.", e, log, this);
324 }
325 serverWorkerThread = new Thread(serverWorker, "NetworkLogManager-ServerSocket");
326 serverWorkerThread.start();
327
328
329
330
331
332 log.fine("Started.");
333 }
334 }
335
336
337
338
339
340
341 public boolean isRunning() {
342 return operating;
343 }
344
345
346
347
348
349
350
351
352
353
354 public void kill() {
355
356 operating = false;
357 synchronized(managerMutex) {
358
359 if (manager == this) {
360 manager = null;
361 }
362 }
363 if (log != null && log.isLoggable(Level.WARNING)) log.warning("Shutting down!");
364
365 synchronized(serverWorkerMutex) {
366
367 serverWorker.kill();
368 serverWorker = null;
369
370 while (idToSocketList.size() > 0) {
371 IToken agent;
372 synchronized(idToSocketList) {
373 if (idToSocketList.size() == 0) break;
374 agent = idToSocketList.keySet().iterator().next();
375 }
376 removeAgent(agent);
377 }
378
379 log.severe("Shutdown.");
380 }
381 }
382
383
384
385
386 public int getLoggerPort() {
387 if (!operating) return -1;
388 ServerWorker serverWorker = this.serverWorker;
389 if (serverWorker == null) return -1;
390 return serverWorker.serverSocket.getLocalPort();
391 }
392
393
394
395
396 public String getLoggerHost() {
397 if (!operating) return null;
398 if (serverWorker == null) return null;
399 try {
400 byte[] addr = InetAddress.getLocalHost().getAddress();
401 return InetAddress.getLocalHost().getHostAddress();
402 } catch (UnknownHostException e) {
403 throw new PogamutException("Could not determine host IP address.", e, this);
404 }
405
406 }
407
408
409
410
411
412 public void addAgent(IToken agent) {
413 if (!operating) {
414 return;
415 }
416 if (log != null && log.isLoggable(Level.FINE)) log.fine("Adding network logging for agent: " + agent.getToken());
417 synchronized(idToSocketList) {
418 idToSocketList.get(agent);
419 }
420 synchronized(idToEnvelopeList) {
421 idToEnvelopeList.get(agent);
422 }
423 }
424
425
426
427
428
429 public void removeAgent(IToken agent) {
430
431
432
433
434 if (log != null && log.isLoggable(Level.WARNING)) log.warning("Removing network logging for agent: " + agent);
435
436 if (log != null && log.isLoggable(Level.INFO)) log.info("Closing logging sockets for: " + agent);
437 ConcurrentLinkedQueue<LogSocket> agentSockets;
438 synchronized(idToSocketList) {
439 agentSockets = idToSocketList.get(agent);
440 idToSocketList.remove(agent);
441 }
442 for (LogSocket socket : agentSockets) {
443 synchronized (socket){
444 try {
445 socket.flush();
446 } catch (IOException e) {
447 }
448 socket.close();
449 }
450 }
451
452 log.info("Removing bruffered logs for: " + agent);
453 synchronized(idToEnvelopeList) {
454 idToEnvelopeList.remove(agent);
455 }
456
457 }
458
459
460
461
462
463
464 public void processLog(NetworkLogEnvelope record, IToken agent) {
465 if (!operating) {
466 return;
467 }
468
469 if (log != null && log.isLoggable(Level.FINEST)) log.finest("Processing log: (" + agent.getToken() + ") " + record);
470
471
472
473 ConcurrentLinkedQueue<LogSocket> agentSockets = null;
474 ConcurrentLinkedQueue<NetworkLogEnvelope> agentLogs;
475
476 synchronized(idToSocketList) {
477 if (idToSocketList.containsKey(agent)) {
478 agentSockets = idToSocketList.get(agent);
479 }
480 }
481 if (agentSockets != null) {
482 Iterator<LogSocket> iter = agentSockets.iterator();
483 while (iter.hasNext()) {
484 LogSocket logSocket = iter.next();
485
486
487
488
489
490
491
492 synchronized(logSocket) {
493 if (!logSocket.isOpened()) {
494 logSocket.close();
495 iter.remove();
496 continue;
497 }
498 try {
499 logSocket.send(record);
500 } catch (Exception e) {
501 logSocket.close();
502 iter.remove();
503 continue;
504 }
505 }
506 }
507 }
508
509 synchronized(idToEnvelopeList) {
510 agentLogs = idToEnvelopeList.get(agent);
511 }
512
513 agentLogs.add(record);
514
515 while(agentLogs.size() > MAXIMUM_LOGS_PER_AGENT) {
516 try {
517 agentLogs.remove();
518 } catch (Exception e) {
519 }
520 }
521 }
522
523
524
525
526
527 public static LogCategory getLog() {
528 return log;
529 }
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667 protected ServerWorker serverWorker = null;
668
669
670
671
672 protected Thread serverWorkerThread = null;
673
674
675
676
677
678 private class DanglingSocket {
679
680 CharsetEncoder encoder = USED_CHARSET.newEncoder();
681 CharsetDecoder decoder = USED_CHARSET.newDecoder();
682
683 public Socket socket;
684 public final long created = System.currentTimeMillis();
685 private PrintWriter writer;
686 private BufferedReader reader;
687
688 private StringBuffer agentId = new StringBuffer();
689
690 private char[] buf = new char[128];
691
692 public DanglingSocket(Socket socket) throws IOException {
693 this.socket = socket;
694 this.writer = new PrintWriter(socket.getOutputStream());
695 this.reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
696 }
697
698 public String readAgentId() throws IOException {
699 int read = reader.read(buf);
700 if (read == 0) return null;
701 if (buf[read-1] == '\n') {
702 int minus = 1;
703 if (read > 1 && buf[read-2] == '\r') {
704 minus = 2;
705 }
706 agentId.append(buf, 0, read-minus);
707 return agentId.toString();
708 } else {
709 agentId.append(buf, 0, read);
710 return null;
711 }
712
713 }
714
715 }
716
717
718
719
720
721
722
723 private class ServerWorker implements Runnable {
724
725
726
727
728 private ServerSocket serverSocket;
729
730
731
732
733 private volatile boolean shouldRun = true;
734 private volatile boolean running = false;
735 private volatile boolean exceptionExpected = false;
736 private Thread myThread;
737
738
739
740
741 private List<DanglingSocket> danglingSockets = new LinkedList<DanglingSocket>();
742
743
744
745
746
747
748 public ServerWorker() throws IOException {
749 serverSocket = new ServerSocket();
750 serverSocket.bind (new InetSocketAddress (0));
751
752 }
753
754
755
756
757
758 public void kill() {
759 if (!running) {
760 return;
761 }
762 this.shouldRun = false;
763 try {
764 Thread.sleep(100);
765 } catch (InterruptedException e) {
766 }
767 exceptionExpected = true;
768 myThread.interrupt();
769 }
770
771
772
773
774
775 @Override
776 public void run() {
777
778 myThread = Thread.currentThread();
779
780
781 running = true;
782
783
784 if (log != null && log.isLoggable(Level.INFO)) log.info("ServerWorker started.");
785
786 try {
787 while (operating && shouldRun && !myThread.isInterrupted()) {
788 try {
789 Socket socket = null;
790
791 if (danglingSockets.size() == 0) {
792
793 socket = serverSocket.accept();
794 } else {
795
796 serverSocket.setSoTimeout(100);
797 try {
798 socket = serverSocket.accept();
799 } catch (SocketTimeoutException ex) {
800 }
801 }
802
803 if (socket != null) {
804 log.fine("Accepted new connection from " + socket.getRemoteSocketAddress());
805
806 try {
807 danglingSockets.add(new DanglingSocket(socket));
808 } catch (IOException e1) {
809 try {
810 socket.close();
811 } catch (Exception e2) {
812 }
813 }
814 }
815
816 Iterator<DanglingSocket> iter = danglingSockets.iterator();
817 while (iter.hasNext()) {
818 DanglingSocket danglingSocket = iter.next();
819 String agentId = danglingSocket.readAgentId();
820 if (agentId != null) {
821
822 IToken token = Tokens.get(agentId);
823 log.fine("Connection " + danglingSocket.socket.getRemoteSocketAddress() + " sent agent id '" + token.getToken() + "'.");
824 Iterator<NetworkLogEnvelope> iter2;
825 synchronized(idToEnvelopeList) {
826 ConcurrentLinkedQueue<NetworkLogEnvelope> queue = idToEnvelopeList.get(token);
827 int size = queue.size();
828 log.finer("Sending buffered " + size + " log records to the " + danglingSocket.socket.getRemoteSocketAddress());
829 iter2 = queue.iterator();
830 }
831 LogSocket logSocket = new LogSocket(danglingSocket.socket);
832 synchronized(logSocket) {
833 synchronized(idToSocketList) {
834 idToSocketList.get(token).add(logSocket);
835 }
836
837 while(iter2.hasNext()) {
838 NetworkLogEnvelope env = iter2.next();
839
840 logSocket.send(env);
841 }
842
843 }
844 iter.remove();
845 continue;
846 }
847 if (System.currentTimeMillis() - danglingSocket.created < NETWORK_LOG_MANAGER_SOCKET_TIMEOUT_MILLIS) {
848 log.warning("Connection " + danglingSocket.socket.getRemoteSocketAddress() + " timed out. We did not receive agent id in " + NETWORK_LOG_MANAGER_SOCKET_TIMEOUT_MILLIS + " ms. Closing the socket.");
849 try {
850 danglingSocket.socket.close();
851 } catch (IOException e) {
852 }
853 iter.remove();
854 continue;
855 }
856 }
857 } catch (Exception e) {
858 if (!exceptionExpected) {
859 log.severe(ExceptionToString.process("Exception at ServerWorker.", e));
860 } else {
861 log.fine(ExceptionToString.process("Exception at ServerWorker, expected.", e));
862 break;
863 }
864 }
865 }
866 } catch (Exception e) {
867 if (!exceptionExpected) {
868 log.severe(ExceptionToString.process("Exception at ServerWorker.", e));
869 } else {
870 log.fine(ExceptionToString.process("Exception ServerWorker, expected.", e));
871 }
872 } finally {
873 running = false;
874 for (DanglingSocket socket : danglingSockets) {
875 try {
876 socket.socket.close();
877 } catch (Exception e) {
878 }
879 }
880 danglingSockets.clear();
881 try {
882 serverSocket.close();
883 } catch (IOException e) {
884 }
885 }
886
887 log.warning("ServerWorker Stopped.");
888 }
889
890 }
891
892 }