Fix: Java agent: handle partial payload read
[lttng-ust.git] / liblttng-ust-java-agent / java / lttng-ust-agent-common / org / lttng / ust / agent / client / LttngTcpSessiondClient.java
index d7ed6da4af582f29c4d63ac3669c25b2cfa903b7..0f979a4f88da7d7fa97da19c6786f0dcc9fe1e90 100644 (file)
@@ -1,4 +1,5 @@
 /*
+ * Copyright (C) 2015-2016 EfficiOS Inc., Alexandre Montplaisir <alexmonthy@efficios.com>
  * Copyright (C) 2013 - David Goulet <dgoulet@efficios.com>
  *
  * This library is free software; you can redistribute it and/or modify it
@@ -20,18 +21,20 @@ package org.lttng.ust.agent.client;
 import java.io.BufferedReader;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
+import java.io.FileInputStream;
 import java.io.FileNotFoundException;
-import java.io.FileReader;
 import java.io.IOException;
+import java.io.InputStreamReader;
 import java.lang.management.ManagementFactory;
 import java.net.Socket;
 import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
+import java.nio.charset.Charset;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
-import org.lttng.ust.agent.AbstractLttngAgent;
+import org.lttng.ust.agent.utils.LttngUstAgentLogger;
 
 /**
  * Client for agents to connect to a local session daemon, using a TCP socket.
@@ -43,12 +46,12 @@ public class LttngTcpSessiondClient implements Runnable {
        private static final String SESSION_HOST = "127.0.0.1";
        private static final String ROOT_PORT_FILE = "/var/run/lttng/agent.port";
        private static final String USER_PORT_FILE = "/.lttng/agent.port";
+       private static final Charset PORT_FILE_ENCODING = Charset.forName("UTF-8");
 
-       private static int protocolMajorVersion = 1;
-       private static int protocolMinorVersion = 0;
+       private static final int PROTOCOL_MAJOR_VERSION = 2;
+       private static final int PROTOCOL_MINOR_VERSION = 0;
 
        /** Command header from the session deamon. */
-       private final SessiondHeaderCommand headerCmd = new SessiondHeaderCommand();
        private final CountDownLatch registrationLatch = new CountDownLatch(1);
 
        private Socket sessiondSock;
@@ -57,21 +60,26 @@ public class LttngTcpSessiondClient implements Runnable {
        private DataInputStream inFromSessiond;
        private DataOutputStream outToSessiond;
 
-       private final AbstractLttngAgent<?> logAgent;
+       private final ILttngTcpClientListener logAgent;
+       private final int domainValue;
        private final boolean isRoot;
 
-
        /**
         * Constructor
         *
         * @param logAgent
-        *            The logging agent this client will operate on.
+        *            The listener this client will operate on, typically an LTTng
+        *            agent.
+        * @param domainValue
+        *            The integer to send to the session daemon representing the
+        *            tracing domain to handle.
         * @param isRoot
         *            True if this client should connect to the root session daemon,
         *            false if it should connect to the user one.
         */
-       public LttngTcpSessiondClient(AbstractLttngAgent<?> logAgent, boolean isRoot) {
+       public LttngTcpSessiondClient(ILttngTcpClientListener logAgent, int domainValue, boolean isRoot) {
                this.logAgent = logAgent;
+               this.domainValue = domainValue;
                this.isRoot = isRoot;
        }
 
@@ -106,12 +114,14 @@ public class LttngTcpSessiondClient implements Runnable {
                                /*
                                 * Connect to the session daemon before anything else.
                                 */
+                               log("Connecting to sessiond");
                                connectToSessiond();
 
                                /*
                                 * Register to the session daemon as the Java component of the
                                 * UST application.
                                 */
+                               log("Registering to sessiond");
                                registerToSessiond();
 
                                /*
@@ -119,6 +129,7 @@ public class LttngTcpSessiondClient implements Runnable {
                                 * session daemon. This will return if and only if there is a
                                 * fatal error or the socket closes.
                                 */
+                               log("Waiting on sessiond commands...");
                                handleSessiondCmd();
                        } catch (UnknownHostException uhe) {
                                uhe.printStackTrace();
@@ -136,6 +147,7 @@ public class LttngTcpSessiondClient implements Runnable {
         * Dispose this client and close any socket connection it may hold.
         */
        public void close() {
+               log("Closing client");
                this.quit = true;
 
                try {
@@ -147,54 +159,112 @@ public class LttngTcpSessiondClient implements Runnable {
                }
        }
 
-       /**
-        * Receive header data from the session daemon using the LTTng command
-        * static buffer of the right size.
-        */
-       private void recvHeader() throws IOException {
-               byte data[] = new byte[SessiondHeaderCommand.HEADER_SIZE];
+       private void connectToSessiond() throws IOException {
+               int rootPort = getPortFromFile(ROOT_PORT_FILE);
+               int userPort = getPortFromFile(getHomePath() + USER_PORT_FILE);
+
+               /*
+                * Check for the edge case of both files existing but pointing to the
+                * same port. In this case, let the root client handle it.
+                */
+               if ((rootPort != 0) && (rootPort == userPort) && (!isRoot)) {
+                       log("User and root config files both point to port " + rootPort +
+                                       ". Letting the root client handle it.");
+                       throw new IOException();
+               }
 
-               int readLen = this.inFromSessiond.read(data, 0, data.length);
-               if (readLen != data.length) {
+               int portToUse = (isRoot ? rootPort : userPort);
+
+               if (portToUse == 0) {
+                       /* No session daemon available. Stop and retry later. */
                        throw new IOException();
                }
-               this.headerCmd.populate(data);
+
+               this.sessiondSock = new Socket(SESSION_HOST, portToUse);
+               this.inFromSessiond = new DataInputStream(sessiondSock.getInputStream());
+               this.outToSessiond = new DataOutputStream(sessiondSock.getOutputStream());
+       }
+
+       private static String getHomePath() {
+               /*
+                * The environment variable LTTNG_HOME overrides HOME if
+                * defined.
+                */
+               String homePath = System.getenv("LTTNG_HOME");
+
+               if (homePath == null) {
+                       homePath = System.getProperty("user.home");
+               }
+               return homePath;
        }
 
        /**
-        * Receive payload from the session daemon. This MUST be done after a
-        * recvHeader() so the header value of a command are known.
+        * Read port number from file created by the session daemon.
         *
-        * The caller SHOULD use isPayload() before which returns true if a payload
-        * is expected after the header.
+        * @return port value if found else 0.
         */
-       private byte[] recvPayload() throws IOException {
-               byte payload[] = new byte[(int) this.headerCmd.getDataSize()];
+       private static int getPortFromFile(String path) throws IOException {
+               BufferedReader br = null;
 
-               /* Failsafe check so we don't waste our time reading 0 bytes. */
-               if (payload.length == 0) {
-                       return null;
+               try {
+                       br = new BufferedReader(new InputStreamReader(new FileInputStream(path), PORT_FILE_ENCODING));
+                       String line = br.readLine();
+                       if (line == null) {
+                               /* File exists but is empty. */
+                               return 0;
+                       }
+
+                       int port = Integer.parseInt(line, 10);
+                       if (port < 0 || port > 65535) {
+                               /* Invalid value. Ignore. */
+                               port = 0;
+                       }
+                       return port;
+
+               } catch (NumberFormatException e) {
+                       /* File contained something that was not a number. */
+                       return 0;
+               } catch (FileNotFoundException e) {
+                       /* No port available. */
+                       return 0;
+               } finally {
+                       if (br != null) {
+                               br.close();
+                       }
                }
+       }
 
-               this.inFromSessiond.read(payload, 0, payload.length);
-               return payload;
+       private void registerToSessiond() throws IOException {
+               byte data[] = new byte[16];
+               ByteBuffer buf = ByteBuffer.wrap(data);
+               String pid = ManagementFactory.getRuntimeMXBean().getName().split("@")[0];
+
+               buf.putInt(domainValue);
+               buf.putInt(Integer.parseInt(pid));
+               buf.putInt(PROTOCOL_MAJOR_VERSION);
+               buf.putInt(PROTOCOL_MINOR_VERSION);
+               this.outToSessiond.write(data, 0, data.length);
+               this.outToSessiond.flush();
        }
 
        /**
         * Handle session command from the session daemon.
         */
        private void handleSessiondCmd() throws IOException {
-               byte data[] = null;
+               /* Data read from the socket */
+               byte inputData[] = null;
+               /* Reply data written to the socket, sent to the sessiond */
+               LttngAgentResponse response;
 
                while (true) {
                        /* Get header from session daemon. */
-                       recvHeader();
+                       SessiondCommandHeader cmdHeader = recvHeader();
 
-                       if (headerCmd.getDataSize() > 0) {
-                               data = recvPayload();
+                       if (cmdHeader.getDataSize() > 0) {
+                               inputData = recvPayload(cmdHeader);
                        }
 
-                       switch (headerCmd.getCommandType()) {
+                       switch (cmdHeader.getCommandType()) {
                        case CMD_REG_DONE:
                        {
                                /*
@@ -206,129 +276,144 @@ public class LttngTcpSessiondClient implements Runnable {
                                 * We don't send any reply to the registration done command.
                                 * This just marks the end of the initial session setup.
                                 */
+                               log("Registration done");
                                continue;
                        }
                        case CMD_LIST:
                        {
-                               SessiondListLoggersResponse listLoggerCmd = new SessiondListLoggersResponse();
-                               listLoggerCmd.execute(logAgent);
-                               data = listLoggerCmd.getBytes();
+                               SessiondCommand listLoggerCmd = new SessiondListLoggersCommand();
+                               response = listLoggerCmd.execute(logAgent);
+                               log("Received list loggers command");
+                               break;
+                       }
+                       case CMD_EVENT_ENABLE:
+                       {
+                               if (inputData == null) {
+                                       /* Invalid command */
+                                       response = LttngAgentResponse.FAILURE_RESPONSE;
+                                       break;
+                               }
+                               SessiondCommand enableEventCmd = new SessiondEnableEventCommand(inputData);
+                               response = enableEventCmd.execute(logAgent);
+                               log("Received enable event command: " + enableEventCmd.toString());
+                               break;
+                       }
+                       case CMD_EVENT_DISABLE:
+                       {
+                               if (inputData == null) {
+                                       /* Invalid command */
+                                       response = LttngAgentResponse.FAILURE_RESPONSE;
+                                       break;
+                               }
+                               SessiondCommand disableEventCmd = new SessiondDisableEventCommand(inputData);
+                               response = disableEventCmd.execute(logAgent);
+                               log("Received disable event command: " + disableEventCmd.toString());
                                break;
                        }
-                       case CMD_ENABLE:
+                       case CMD_APP_CTX_ENABLE:
                        {
-                               SessiondEnableHandler enableCmd = new SessiondEnableHandler();
-                               if (data == null) {
-                                       enableCmd.code = ISessiondResponse.LttngAgentRetCode.CODE_INVALID_CMD;
+                               if (inputData == null) {
+                                       /* This commands expects a payload, invalid command */
+                                       response = LttngAgentResponse.FAILURE_RESPONSE;
                                        break;
                                }
-                               enableCmd.populate(data);
-                               enableCmd.execute(logAgent);
-                               data = enableCmd.getBytes();
+                               SessiondCommand enableAppCtxCmd = new SessiondEnableAppContextCommand(inputData);
+                               response = enableAppCtxCmd.execute(logAgent);
+                               log("Received enable app-context command");
                                break;
                        }
-                       case CMD_DISABLE:
+                       case CMD_APP_CTX_DISABLE:
                        {
-                               SessiondDisableHandler disableCmd = new SessiondDisableHandler();
-                               if (data == null) {
-                                       disableCmd.setRetCode(ISessiondResponse.LttngAgentRetCode.CODE_INVALID_CMD);
+                               if (inputData == null) {
+                                       /* This commands expects a payload, invalid command */
+                                       response = LttngAgentResponse.FAILURE_RESPONSE;
                                        break;
                                }
-                               disableCmd.populate(data);
-                               disableCmd.execute(logAgent);
-                               data = disableCmd.getBytes();
+                               SessiondCommand disableAppCtxCmd = new SessiondDisableAppContextCommand(inputData);
+                               response = disableAppCtxCmd.execute(logAgent);
+                               log("Received disable app-context command");
                                break;
                        }
                        default:
                        {
-                               data = new byte[4];
-                               ByteBuffer buf = ByteBuffer.wrap(data);
-                               buf.order(ByteOrder.BIG_ENDIAN);
+                               /* Unknown command, send empty reply */
+                               response = null;
+                               log("Received unknown command, ignoring");
                                break;
                        }
                        }
 
-                       if (data == null) {
-                               /*
-                                * Simply used to silence a potential null access warning below.
-                                *
-                                * The flow analysis gets confused here and thinks "data" may be
-                                * null at this point. It should not happen according to program
-                                * logic, if it does we've done something wrong.
-                                */
-                               throw new IllegalStateException();
+                       /* Send response to the session daemon. */
+                       byte[] responseData;
+                       if (response == null) {
+                               responseData = new byte[4];
+                               ByteBuffer buf = ByteBuffer.wrap(responseData);
+                               buf.order(ByteOrder.BIG_ENDIAN);
+                       } else {
+                               log("Sending response: " + response.toString());
+                               responseData = response.getBytes();
                        }
-                       /* Send payload to session daemon. */
-                       this.outToSessiond.write(data, 0, data.length);
+                       this.outToSessiond.write(responseData, 0, responseData.length);
                        this.outToSessiond.flush();
                }
        }
 
-       private static String getHomePath() {
-               return System.getProperty("user.home");
+       /**
+        * Receive header data from the session daemon using the LTTng command
+        * static buffer of the right size.
+        */
+       private SessiondCommandHeader recvHeader() throws IOException {
+               byte data[] = new byte[SessiondCommandHeader.HEADER_SIZE];
+               int bytesLeft = data.length;
+               int bytesOffset = 0;
+
+               while (bytesLeft != 0) {
+                       int bytesRead = this.inFromSessiond.read(data, bytesOffset, bytesLeft);
+
+                       if (bytesRead < 0) {
+                               throw new IOException();
+                       }
+                       bytesLeft -= bytesRead;
+                       bytesOffset += bytesRead;
+               }
+               return new SessiondCommandHeader(data);
        }
 
        /**
-        * Read port number from file created by the session daemon.
+        * Receive payload from the session daemon. This MUST be done after a
+        * recvHeader() so the header value of a command are known.
         *
-        * @return port value if found else 0.
+        * The caller SHOULD use isPayload() before which returns true if a payload
+        * is expected after the header.
         */
-       private static int getPortFromFile(String path) throws IOException {
-               int port;
-               BufferedReader br = null;
+       private byte[] recvPayload(SessiondCommandHeader headerCmd) throws IOException {
+               byte payload[] = new byte[(int) headerCmd.getDataSize()];
+               int bytesLeft = payload.length;
+               int bytesOffset = 0;
 
-               try {
-                       br = new BufferedReader(new FileReader(path));
-                       String line = br.readLine();
-                       port = Integer.parseInt(line, 10);
-                       if (port < 0 || port > 65535) {
-                               /* Invalid value. Ignore. */
-                               port = 0;
-                       }
-               } catch (FileNotFoundException e) {
-                       /* No port available. */
-                       port = 0;
-               } finally {
-                       if (br != null) {
-                               br.close();
-                       }
+               /* Failsafe check so we don't waste our time reading 0 bytes. */
+               if (bytesLeft == 0) {
+                       return null;
                }
 
-               return port;
-       }
+               while (bytesLeft != 0) {
+                       int bytesRead = inFromSessiond.read(payload, bytesOffset, bytesLeft);
 
-       private void connectToSessiond() throws IOException {
-               int port;
-
-               if (this.isRoot) {
-                       port = getPortFromFile(ROOT_PORT_FILE);
-                       if (port == 0) {
-                               /* No session daemon available. Stop and retry later. */
-                               throw new IOException();
-                       }
-               } else {
-                       port = getPortFromFile(getHomePath() + USER_PORT_FILE);
-                       if (port == 0) {
-                               /* No session daemon available. Stop and retry later. */
+                       if (bytesRead < 0) {
                                throw new IOException();
                        }
+                       bytesLeft -= bytesRead;
+                       bytesOffset += bytesRead;
                }
-
-               this.sessiondSock = new Socket(SESSION_HOST, port);
-               this.inFromSessiond = new DataInputStream(sessiondSock.getInputStream());
-               this.outToSessiond = new DataOutputStream(sessiondSock.getOutputStream());
+               return payload;
        }
 
-       private void registerToSessiond() throws IOException {
-               byte data[] = new byte[16];
-               ByteBuffer buf = ByteBuffer.wrap(data);
-               String pid = ManagementFactory.getRuntimeMXBean().getName().split("@")[0];
-
-               buf.putInt(logAgent.getDomain().value());
-               buf.putInt(Integer.parseInt(pid));
-               buf.putInt(protocolMajorVersion);
-               buf.putInt(protocolMinorVersion);
-               this.outToSessiond.write(data, 0, data.length);
-               this.outToSessiond.flush();
+       /**
+        * Wrapper for this class's logging, adds the connection's characteristics
+        * to help differentiate between multiple TCP clients.
+        */
+       private void log(String message) {
+               LttngUstAgentLogger.log(getClass(),
+                               "(root=" + isRoot + ", domain=" + domainValue + ") " + message);
        }
 }
This page took 0.029465 seconds and 4 git commands to generate.