1 package cz.cuni.amis.pogamut.base.utils.logging;
2
3 import java.io.BufferedReader;
4 import java.io.IOException;
5 import java.io.PrintWriter;
6 import java.net.InetSocketAddress;
7 import java.nio.channels.Channels;
8 import java.nio.channels.SocketChannel;
9 import java.nio.charset.CharsetDecoder;
10 import java.nio.charset.CharsetEncoder;
11 import java.util.logging.Level;
12
13 import cz.cuni.amis.pogamut.base.utils.Pogamut;
14 import cz.cuni.amis.pogamut.base.utils.PogamutProperty;
15 import cz.cuni.amis.utils.ExceptionToString;
16 import cz.cuni.amis.utils.exception.PogamutException;
17 import cz.cuni.amis.utils.exception.PogamutIOException;
18 import cz.cuni.amis.utils.flag.Flag;
19 import cz.cuni.amis.utils.listener.IListener;
20 import cz.cuni.amis.utils.listener.Listeners;
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49 public class NetworkLogClient {
50
51
52
53
54
55
56
/**
 * Event passed to {@link ILoggingStartedListener}s. It carries no data; the worker raises it
 * after the agent id has been sent to the server and before it starts reading log records.
 */
public static class LoggingStarted {
}
59
60
61
62
63
64
65 public static class LogRead {
66
67
68
69
70 private NetworkLogEnvelope record;
71
72 public LogRead(NetworkLogEnvelope record) {
73 this.record = record;
74 }
75
76
77
78
79 public NetworkLogEnvelope getRecord() {
80 return record;
81 }
82
83 }
84
85
86
87
88
89
90 public static class LoggingStopped {
91
92 private boolean expected;
93
94 private Throwable exception;
95
96 public LoggingStopped() {
97 this.expected = true;
98 }
99
100 public LoggingStopped(Throwable e) {
101 this.expected = false;
102 this.exception = e;
103 }
104
105 public Throwable getException() {
106 return exception;
107 }
108
109 public boolean isExpected() {
110 return expected;
111 }
112
113 public boolean isFailure() {
114 return !expected;
115 }
116
117 }
118
119 public static interface ILoggingStartedListener extends IListener<LoggingStarted> {
120 }
121
122 public static interface ILogReadListener extends IListener<LogRead> {
123 }
124
125 public static interface ILoggingStoppedListener extends IListener<LoggingStopped> {
126 }
127
// Listeners registered for LoggingStarted events.
private Listeners<ILoggingStartedListener> loggingStartedCallback = new Listeners<ILoggingStartedListener>();

// Reusable notifier: the worker stores the LoggingStarted event in it and hands it to loggingStartedCallback.
private Listeners.AdaptableListenerNotifier<ILoggingStartedListener> loggingStartedNotifier = new Listeners.AdaptableListenerNotifier<ILoggingStartedListener>();

// Listeners registered for LogRead events (one event per record read).
private Listeners<ILogReadListener> logReadCallback = new Listeners<ILogReadListener>();

// Reusable notifier used together with logReadCallback.
private Listeners.AdaptableListenerNotifier<ILogReadListener> logReadNotifier = new Listeners.AdaptableListenerNotifier<ILogReadListener>();

// Listeners registered for LoggingStopped events.
private Listeners<ILoggingStoppedListener> loggingStoppedCallback = new Listeners<ILoggingStoppedListener>();

// Reusable notifier used together with loggingStoppedCallback.
private Listeners.AdaptableListenerNotifier<ILoggingStoppedListener> loggingStoppedNotifier = new Listeners.AdaptableListenerNotifier<ILoggingStoppedListener>();

// Host and port the worker connects to.
private String address;
private int port;
// Identifier sent to the server as the first line after the connection is opened.
private String agentId;

// Logger of this client; created and configured in the constructor.
protected LogCategory log;

// When true, the worker clears all three listener collections once its reading loop has ended
// (the early "could not connect / could not send agent id" exits do not clear them).
// NOTE(review): read by the worker thread without synchronization or volatile.
private boolean implicitRemoveListeners = true;
147
148 public NetworkLogClient(String address, int port, String agentId) {
149 log = new LogCategory("NetworkLogClient");
150 log.addConsoleHandler();
151 String logLevel = Pogamut.getPlatform().getProperty(PogamutProperty.POGAMUT_NETWORK_LOG_MANAGER_AND_CLIENT_LEVEL.getKey());
152 if (logLevel != null) {
153 log.setLevel(Level.parse(logLevel));
154 } else {
155 log.setLevel(Level.WARNING);
156 }
157 this.address = address;
158 this.port = port;
159 this.agentId = agentId;
160 this.logReadingWorker = new LogReadingWorker();
161 loggingStartedCallback.setLog(log, "LoggingStarted");
162 logReadCallback.setLog(log, "LogRead");
163 loggingStoppedCallback.setLog(log, "LoggingStopped");
164 }
165
166
167
168
169 public LogCategory getLogger() {
170 return log;
171 }
172
173
174
175
176
177
178
179
180
181
182
183
184
185 public void setImplicitRemoveListeners(boolean state) {
186 this.implicitRemoveListeners = state;
187 }
188
189 public boolean isImplicitRemoveListeners() {
190 return implicitRemoveListeners;
191 }
192
193
194
195
196
197 public boolean isException() {
198 LogReadingWorker worker = logReadingWorker;
199 return worker != null && worker.exception != null;
200 }
201
202
203
204
205 public Throwable getException() {
206 LogReadingWorker worker = logReadingWorker;
207 return worker == null ? null : worker.exception;
208 }
209
210 public String getAddress() {
211 return address;
212 }
213
214 public int getPort() {
215 return port;
216 }
217
218 public String getAgentId() {
219 return agentId;
220 }
221
222 public synchronized void start() {
223 log.warning("Starting for " + agentId + " @ " + address + ":" + port);
224 if (logReadingWorker.running.getFlag()) {
225 log.warning("Old connection is still up... closing.");
226 stop();
227 log.warning("Resuming start for " + agentId + " @ " + address + ":" + port);
228 }
229 workerThread = new Thread(logReadingWorker, "NetworkLogClient-" + address + ":" + port);
230 workerThread.start();
231
232 logReadingWorker.connected.waitFor(true, false);
233
234 if (logReadingWorker.connected.getFlag() == false) {
235 throw new PogamutException("Could not start reading logs from the network.", logReadingWorker.exception, this);
236 }
237 }
238
239 public synchronized void stop() {
240 log.warning("Stopping for " + agentId + " @ " + address + ":" + port);
241 if (logReadingWorker != null) {
242 if (logReadingWorker.running.getFlag()) {
243 logReadingWorker.kill();
244 logReadingWorker.running.waitFor(false);
245 }
246 }
247 }
248
249 public Flag<Boolean> getRunning() {
250 return logReadingWorker.running;
251 }
252
253 public Flag<Boolean> getConnected() {
254 return logReadingWorker.connected;
255 }
256
257 public void addListener(ILoggingStartedListener listener) {
258 loggingStartedCallback.addStrongListener(listener);
259 }
260
261 public void removeListener(ILoggingStartedListener listener) {
262 loggingStartedCallback.removeListener(listener);
263 }
264
265 public void addListener(ILoggingStoppedListener listener) {
266 loggingStoppedCallback.addStrongListener(listener);
267 }
268
269 public void removeListener(ILoggingStoppedListener listener) {
270 loggingStoppedCallback.removeListener(listener);
271 }
272
273 public void addListener(ILogReadListener listener) {
274 logReadCallback.addStrongListener(listener);
275 }
276
277 public void removeListener(ILogReadListener listener) {
278 logReadCallback.removeListener(listener);
279 }
280
281
282
283
284
285
286
287
288
289
290
291
292
293
// Worker that connects to the server and reads the logs. It is instantiated once in the
// constructor and the same instance is run by every thread that start() creates.
protected LogReadingWorker logReadingWorker = null;

// Thread created by the most recent call to start(); null until start() is called.
protected Thread workerThread = null;

// Platform line separator; used to join the lines of a multi-line log message and consulted
// by checkLineEnd(String).
private static final String NEW_LINE = System.getProperty("line.separator");
302
303
304
305
306
307
308 private String checkLineEnd(String msg) {
309 if (msg == null) return null;
310 if (NEW_LINE.length() == 2) {
311
312 return msg;
313 } else {
314
315 if (msg.charAt(msg.length()-1) == '\r') {
316
317 return msg.substring(0, msg.length()-1);
318 }
319
320 return msg;
321 }
322 }
323
324 private class LogReadingWorker implements Runnable {
325
326
327
328
329 private volatile boolean shouldRun = true;
330 public volatile Flag<Boolean> running = new Flag<Boolean>(false);
331 public volatile Flag<Boolean> connected = new Flag<Boolean>(null);
332 private volatile boolean exceptionExpected = false;
333 public volatile Throwable exception = null;
334 private Thread myThread = null;
335
336
337
338
339
340 public void kill() {
341 if (!running.getFlag()) {
342 return;
343 }
344 this.shouldRun = false;
345 try {
346 Thread.sleep(200);
347 } catch (InterruptedException e) {
348 }
349 exceptionExpected = true;
350 myThread.interrupt();
351 }
352
353
354
355
356
357
358 @Override
359 public void run() {
360
361
362 myThread = Thread.currentThread();
363 exception = null;
364 shouldRun = true;
365 exceptionExpected = false;
366
367 running.setFlag(true);
368 connected.setFlag(null);
369
370
371 log.info("LogReadingWorker: Started.");
372
373 SocketChannel socket = null;
374
375 try {
376 socket = SocketChannel.open(new InetSocketAddress(address, port));
377 } catch (IOException e1) {
378 try {
379 socket.close();
380 } catch (Exception e2) {
381 }
382 log.severe("LogReadingWorker: Could not open socket for " + address + ":" + port + ".");
383 socket = null;
384 exception = e1;
385 loggingStoppedNotifier.setEvent(new LoggingStopped(new PogamutIOException("Could not open " + address + ":" + port + ".", e1)));
386 loggingStoppedCallback.notifySafe(loggingStoppedNotifier, log);
387 connected.setFlag(false);
388 running.setFlag(false);
389 log.warning("LogReadingWorker stopped.");
390 return;
391 }
392
393 log.fine("LogReadingWorker: Socket opened for " + address + ":" + port + ".");
394
395 CharsetEncoder encoder = NetworkLogManager.USED_CHARSET.newEncoder();
396 CharsetDecoder decoder = NetworkLogManager.USED_CHARSET.newDecoder();
397
398 PrintWriter writer = new PrintWriter(Channels.newWriter(socket, encoder, -1));
399 BufferedReader reader = new BufferedReader(Channels.newReader(socket, decoder, -1));
400
401 try {
402 writer.println(agentId);
403 writer.flush();
404 } catch (Exception e1) {
405 try {
406 socket.close();
407 } catch (Exception e2) {
408 }
409 socket = null;
410 exception = e1;
411 loggingStoppedNotifier.setEvent(new LoggingStopped(new PogamutIOException("Could not send 'agent id' to " + address + ":" + port + ".", e1)));
412 loggingStoppedCallback.notifySafe(loggingStoppedNotifier, log);
413 connected.setFlag(false);
414 running.setFlag(false);
415 log.warning("LogReadingWorker: stopped.");
416 return;
417 }
418
419 log.fine("LogReadingWorker: Agent id sent.");
420
421 loggingStartedNotifier.setEvent(new LoggingStarted());
422 loggingStartedCallback.notify(loggingStartedNotifier);
423
424 log.fine("LogReadingWorker: Starting reading logs.");
425
426 try {
427
428 StringBuffer message = new StringBuffer();
429
430 connected.setFlag(true);
431
432 String category = null;
433 String level = null;
434 String time = null;
435
436 while (shouldRun && !myThread.isInterrupted()) {
437 try {
438 try {
439 category = checkLineEnd(reader.readLine());
440 level = checkLineEnd(reader.readLine());
441 time = checkLineEnd(reader.readLine());
442
443 if (message.length() > 0) {
444 message.delete(0, message.length());
445 }
446
447 boolean first = true;
448
449 while (true) {
450 String msg = checkLineEnd(reader.readLine());
451 if (msg.equals("</end>")) break;
452 if (first) first = false;
453 else message.append(NEW_LINE);
454 message.append(msg);
455 }
456 } catch (Exception e) {
457 log.severe(ExceptionToString.process("LogReadingWorker: Exception while reading data from the socket, connection closed from the remote side? Shutting down...", e));
458 break;
459 }
460
461 NetworkLogEnvelope logEnvelope = null;
462 try {
463 logEnvelope = new NetworkLogEnvelope(category, level, time, message.toString());
464 } catch (Exception e) {
465 log.warning(ExceptionToString.process("LogReadingWorker: MALFORMED log record, category='" + category + "', level='" + level + "', time='" + time + "', message='" + message + "'", e));
466 continue;
467 }
468 logReadCallback.notify(logReadNotifier.setEvent(new LogRead(logEnvelope)));
469 } catch (Exception e) {
470 if (!exceptionExpected) {
471 log.severe(ExceptionToString.process("LogReadingWorker: Exception at LogSendingWorker.", e));
472 exception = e;
473 } else {
474 log.fine(ExceptionToString.process("LogReadingWorker: Exception at LogSendingWorker, expected.", e));
475 }
476 break;
477 }
478 }
479 } catch (Exception e) {
480 if (!exceptionExpected) {
481 log.severe(ExceptionToString.process("LogReadingWorker: Exception at LogSendingWorker.", e));
482 exception = e;
483 } else {
484 log.fine(ExceptionToString.process("LogReadingWorker: Exception at LogSendingWorker, expected.", e));
485 }
486 } finally {
487 try {
488 socket.close();
489 } catch (Exception e) {
490 }
491 connected.setFlag(false);
492 running.setFlag(false);
493 if (exception != null) {
494 loggingStoppedCallback.notifySafe(loggingStoppedNotifier.setEvent(new LoggingStopped(exception)), log);
495 } else {
496 loggingStoppedCallback.notifySafe(loggingStoppedNotifier.setEvent(new LoggingStopped()), log);
497 }
498 if (implicitRemoveListeners) {
499 log.warning("LogReadingWorker: Removing all listeners...");
500 loggingStartedCallback.clearListeners();
501 logReadCallback.clearListeners();
502 loggingStoppedCallback.clearListeners();
503 }
504
505 log.warning("LogSendingWorker: Stopped.");
506 }
507 }
508
509 }
510
511 }