#include <stdlib.h>
#include <string.h>
#include <sys/stat.h>
+#include <inttypes.h>
#include <common/common.h>
#include <common/defaults.h>
* Send command. Fill up the header and append the data.
*/
static int send_command(struct lttcomm_sock *sock,
- enum lttcomm_sessiond_command cmd, void *data, size_t size,
+ enum lttcomm_relayd_command cmd, void *data, size_t size,
int flags)
{
int ret;
ret = sock->ops->sendmsg(sock, buf, buf_size, flags);
if (ret < 0) {
+ ret = -errno;
goto error;
}
- DBG3("Relayd sending command %d", cmd);
+ DBG3("Relayd sending command %d of size %" PRIu64, cmd, buf_size);
error:
free(buf);
{
int ret;
- DBG3("Relayd waiting for reply...");
+ DBG3("Relayd waiting for reply of size %ld", size);
ret = sock->ops->recvmsg(sock, data, size, 0);
- if (ret < 0) {
+ if (ret <= 0 || ret != size) {
+ if (ret == 0) {
+ /* Orderly shutdown. */
+ DBG("Socket %d has performed an orderly shutdown", sock->fd);
+ } else {
+ DBG("Receiving reply failed on sock %d for size %lu with ret %d",
+ sock->fd, size, ret);
+ }
+ /* Always return -1 here and the caller can use errno. */
+ ret = -1;
goto error;
}
return ret;
}
-#if 0
/*
- * Create session on the relayd.
+ * Send a RELAYD_CREATE_SESSION command to the relayd with the given socket and
+ * set session_id of the relayd if we have a successful reply from the relayd.
*
- * On error, return ret_code negative value else return 0.
+ * On success, return 0 else a negative value which is either an errno error or
+ * a lttng error code from the relayd.
*/
-int relayd_create_session(struct lttcomm_sock *sock, const char *hostname,
- const char *session_name)
+int relayd_create_session(struct lttcomm_sock *sock, uint64_t *session_id)
{
int ret;
- struct lttcomm_relayd_create_session msg;
- struct lttcomm_relayd_generic_reply reply;
+ struct lttcomm_relayd_status_session reply;
- /* Code flow error. Safety net. */
assert(sock);
- assert(hostname);
- assert(session_name);
-
- DBG("Relayd creating session for hostname %s and session name %s",
- hostname, session_name);
+ assert(session_id);
- strncpy(msg.hostname, hostname, sizeof(msg.hostname));
- strncpy(msg.session_name, session_name, sizeof(msg.session_name));
+ DBG("Relayd create session");
/* Send command */
- ret = send_command(sock, RELAYD_CREATE_SESSION, (void *) &msg,
- sizeof(msg), 0);
+ ret = send_command(sock, RELAYD_CREATE_SESSION, NULL, 0, 0);
if (ret < 0) {
goto error;
}
- /* Recevie response */
+ /* Receive response */
ret = recv_reply(sock, (void *) &reply, sizeof(reply));
if (ret < 0) {
goto error;
}
+ reply.session_id = be64toh(reply.session_id);
+ reply.ret_code = be32toh(reply.ret_code);
+
/* Return session id or negative ret code. */
- if (reply.ret_code != LTTCOMM_OK) {
- ret = -reply.ret_code;
+ if (reply.ret_code != LTTNG_OK) {
+ ret = -1;
+ ERR("Relayd create session replied error %d", reply.ret_code);
+ goto error;
} else {
- /* Success */
ret = 0;
+ *session_id = reply.session_id;
}
- DBG2("Relayd created session for %s", session_name);
+ DBG("Relayd session created with id %" PRIu64, reply.session_id);
error:
return ret;
}
-#endif
/*
* Add stream on the relayd and assign stream handle to the stream_id argument.
goto error;
}
- /* Recevie response */
+ /* Waiting for reply */
ret = recv_reply(sock, (void *) &reply, sizeof(reply));
if (ret < 0) {
goto error;
reply.ret_code = be32toh(reply.ret_code);
/* Return session id or negative ret code. */
- if (reply.ret_code != LTTCOMM_OK) {
- ret = -reply.ret_code;
- ERR("Relayd add stream replied error %d", ret);
+ if (reply.ret_code != LTTNG_OK) {
+ ret = -1;
+ ERR("Relayd add stream replied error %d", reply.ret_code);
} else {
/* Success */
ret = 0;
*stream_id = reply.handle;
}
- DBG("Relayd stream added successfully with handle %zu", reply.handle);
+ DBG("Relayd stream added successfully with handle %" PRIu64,
+ reply.handle);
error:
return ret;
uint32_t minor)
{
int ret;
- struct lttcomm_relayd_version reply;
+ struct lttcomm_relayd_version msg;
/* Code flow error. Safety net. */
assert(sock);
DBG("Relayd version check for major.minor %u.%u", major, minor);
+ /* Prepare network byte order before transmission. */
+ msg.major = htobe32(major);
+ msg.minor = htobe32(minor);
+
/* Send command */
- ret = send_command(sock, RELAYD_VERSION, NULL, 0, 0);
+ ret = send_command(sock, RELAYD_VERSION, (void *) &msg, sizeof(msg), 0);
if (ret < 0) {
goto error;
}
- /* Recevie response */
- ret = recv_reply(sock, (void *) &reply, sizeof(reply));
+ /* Receive response */
+ ret = recv_reply(sock, (void *) &msg, sizeof(msg));
if (ret < 0) {
goto error;
}
/* Set back to host bytes order */
- reply.major = be32toh(reply.major);
- reply.minor = be32toh(reply.minor);
-
- /* Validate version */
- if (reply.major <= major) {
- if (reply.minor <= minor) {
- /* Compatible */
- ret = 0;
- DBG2("Relayd version is compatible");
- goto error;
- }
- }
-
- /* Version number not compatible */
- DBG2("Relayd version is NOT compatible %u.%u > %u.%u", reply.major,
- reply.minor, major, minor);
- ret = -1;
+ msg.major = be32toh(msg.major);
+ msg.minor = be32toh(msg.minor);
-error:
- return ret;
-}
-
-#if 0
-/*
- * Start data command on the relayd.
- *
- * On success return 0 else return ret_code negative value.
- */
-int relayd_start_data(struct lttcomm_sock *sock)
-{
- int ret;
- struct lttcomm_relayd_generic_reply reply;
-
- /* Code flow error. Safety net. */
- assert(sock);
-
- DBG("Relayd start data command");
-
- /* Send command */
- ret = send_command(sock, RELAYD_START_DATA, NULL, 0, 0);
- if (ret < 0) {
+ /*
+ * Only validate the major version. If the other side is higher,
+ * communication is not possible. Only major version equal can talk to each
+ * other. If the minor version differs, the lowest version is used by both
+ * sides.
+ *
+ * For now, before 2.1.0 stable release, we don't have to check the minor
+ * because this new mechanism with the relayd will only be available with
+ * 2.1 and NOT 2.0.x.
+ */
+ if (msg.major == major) {
+ /* Compatible */
+ ret = 0;
+ DBG2("Relayd version is compatible");
goto error;
}
- /* Recevie response */
- ret = recv_reply(sock, (void *) &reply, sizeof(reply));
- if (ret < 0) {
- goto error;
- }
+ /*
+ * After 2.1.0 release, for the 2.2 release, at this point will have to
+ * check the minor version in order for the session daemon to know which
+ * structure to use to communicate with the relayd. If the relayd's minor
+ * version is higher, it will adapt to our version so we can continue to
+ * use the latest relayd communication data structure.
+ */
- /* Return session id or negative ret code. */
- if (reply.ret_code != LTTCOMM_OK) {
- ret = -reply.ret_code;
- } else {
- /* Success */
- ret = 0;
- }
+ /* Version number not compatible */
+ DBG2("Relayd version is NOT compatible. Relayd version %u != %u (us)",
+ msg.major, major);
+ ret = -1;
error:
return ret;
}
-#endif
/*
* Add stream on the relayd and assign stream handle to the stream_id argument.
/* Code flow error. Safety net. */
assert(sock);
- DBG("Relayd sending metadata of size %lu", len);
+ DBG("Relayd sending metadata of size %zu", len);
/* Send command */
ret = send_command(sock, RELAYD_SEND_METADATA, NULL, len, 0);
/*
* After that call, the metadata data MUST be sent to the relayd so the
* receive size on the other end matches the len of the metadata packet
- * header.
+ * header. This is why we don't wait for a reply here.
*/
error:
/*
* Close relayd socket with an allocated lttcomm_sock.
+ *
+ * If no socket operations are found, simply return 0 meaning that everything
+ * is fine. Without operations, the socket can not possibly be opened or used.
+ * This is possible if the socket was allocated but not created. However, the
+ * caller could simply use it to store a valid file descriptor for instance
+ * passed over a Unix socket and call this to cleanup but still without a valid
+ * ops pointer.
+ *
+ * Return the close returned value. On error, a negative value is usually
+ * returned back from close(2).
*/
int relayd_close(struct lttcomm_sock *sock)
{
+ int ret;
+
/* Code flow error. Safety net. */
assert(sock);
+ /* An invalid fd is fine, return success. */
+ if (sock->fd < 0) {
+ ret = 0;
+ goto end;
+ }
+
DBG3("Relayd closing socket %d", sock->fd);
- return sock->ops->close(sock);
+ if (sock->ops) {
+ ret = sock->ops->close(sock);
+ } else {
+ /* Default call if no specific ops found. */
+ ret = close(sock->fd);
+ if (ret < 0) {
+ PERROR("relayd_close default close");
+ }
+ }
+
+end:
+ return ret;
}
/*
assert(sock);
assert(hdr);
- DBG3("Relayd sending data header...");
+ DBG3("Relayd sending data header of size %ld", size);
/* Again, safety net */
if (size == 0) {
/* Only send data header. */
ret = sock->ops->sendmsg(sock, hdr, size, 0);
if (ret < 0) {
+ ret = -errno;
goto error;
}
/* Code flow error. Safety net. */
assert(sock);
- DBG("Relayd closing stream id %zu", stream_id);
+ DBG("Relayd closing stream id %" PRIu64, stream_id);
msg.stream_id = htobe64(stream_id);
msg.last_net_seq_num = htobe64(last_net_seq_num);
goto error;
}
- /* Recevie response */
+ /* Receive response */
ret = recv_reply(sock, (void *) &reply, sizeof(reply));
if (ret < 0) {
goto error;
reply.ret_code = be32toh(reply.ret_code);
/* Return session id or negative ret code. */
- if (reply.ret_code != LTTCOMM_OK) {
- ret = -reply.ret_code;
- ERR("Relayd close stream replied error %d", ret);
+ if (reply.ret_code != LTTNG_OK) {
+ ret = -1;
+ ERR("Relayd close stream replied error %d", reply.ret_code);
} else {
/* Success */
ret = 0;
}
- DBG("Relayd close stream id %zu successfully", stream_id);
+ DBG("Relayd close stream id %" PRIu64 " successfully", stream_id);
+
+error:
+ return ret;
+}
+
+/*
+ * Check for data availability for a given stream id.
+ *
+ * Return 0 if NOT pending, 1 if so and a negative value on error.
+ */
+int relayd_data_pending(struct lttcomm_sock *sock, uint64_t stream_id,
+ uint64_t last_net_seq_num)
+{
+ int ret;
+ struct lttcomm_relayd_data_pending msg;
+ struct lttcomm_relayd_generic_reply reply;
+
+ /* Code flow error. Safety net. */
+ assert(sock);
+
+ DBG("Relayd data pending for stream id %" PRIu64, stream_id);
+
+ msg.stream_id = htobe64(stream_id);
+ msg.last_net_seq_num = htobe64(last_net_seq_num);
+
+ /* Send command */
+ ret = send_command(sock, RELAYD_DATA_PENDING, (void *) &msg,
+ sizeof(msg), 0);
+ if (ret < 0) {
+ goto error;
+ }
+
+ /* Receive response */
+ ret = recv_reply(sock, (void *) &reply, sizeof(reply));
+ if (ret < 0) {
+ goto error;
+ }
+
+ reply.ret_code = be32toh(reply.ret_code);
+
+ /* Return session id or negative ret code. */
+ if (reply.ret_code >= LTTNG_OK) {
+ ERR("Relayd data pending replied error %d", reply.ret_code);
+ }
+
+ /* At this point, the ret code is either 1 or 0 */
+ ret = reply.ret_code;
+
+ DBG("Relayd data is %s pending for stream id %" PRIu64,
+ ret == 1 ? "" : "NOT", stream_id);
+
+error:
+ return ret;
+}
+
+/*
+ * Check on the relayd side for a quiescent state on the control socket.
+ */
+int relayd_quiescent_control(struct lttcomm_sock *sock,
+ uint64_t metadata_stream_id)
+{
+ int ret;
+ struct lttcomm_relayd_quiescent_control msg;
+ struct lttcomm_relayd_generic_reply reply;
+
+ /* Code flow error. Safety net. */
+ assert(sock);
+
+ DBG("Relayd checking quiescent control state");
+
+ msg.stream_id = htobe64(metadata_stream_id);
+
+ /* Send command */
+ ret = send_command(sock, RELAYD_QUIESCENT_CONTROL, &msg, sizeof(msg), 0);
+ if (ret < 0) {
+ goto error;
+ }
+
+ /* Receive response */
+ ret = recv_reply(sock, (void *) &reply, sizeof(reply));
+ if (ret < 0) {
+ goto error;
+ }
+
+ reply.ret_code = be32toh(reply.ret_code);
+
+ /* Return session id or negative ret code. */
+ if (reply.ret_code != LTTNG_OK) {
+ ret = -1;
+ ERR("Relayd quiescent control replied error %d", reply.ret_code);
+ goto error;
+ }
+
+ /* Control socket is quiescent */
+ return 0;
+
+error:
+ return ret;
+}
+
+/*
+ * Begin a data pending command for a specific session id.
+ */
+int relayd_begin_data_pending(struct lttcomm_sock *sock, uint64_t id)
+{
+ int ret;
+ struct lttcomm_relayd_begin_data_pending msg;
+ struct lttcomm_relayd_generic_reply reply;
+
+ /* Code flow error. Safety net. */
+ assert(sock);
+
+ DBG("Relayd begin data pending");
+
+ msg.session_id = htobe64(id);
+
+ /* Send command */
+ ret = send_command(sock, RELAYD_BEGIN_DATA_PENDING, &msg, sizeof(msg), 0);
+ if (ret < 0) {
+ goto error;
+ }
+
+ /* Receive response */
+ ret = recv_reply(sock, (void *) &reply, sizeof(reply));
+ if (ret < 0) {
+ goto error;
+ }
+
+ reply.ret_code = be32toh(reply.ret_code);
+
+ /* Return session id or negative ret code. */
+ if (reply.ret_code != LTTNG_OK) {
+ ret = -1;
+ ERR("Relayd begin data pending replied error %d", reply.ret_code);
+ goto error;
+ }
+
+ return 0;
+
+error:
+ return ret;
+}
+
+/*
+ * End a data pending command for a specific session id.
+ *
+ * Return 0 on success and set is_data_inflight to 0 if no data is being
+ * streamed or 1 if it is the case.
+ */
+int relayd_end_data_pending(struct lttcomm_sock *sock, uint64_t id,
+ unsigned int *is_data_inflight)
+{
+ int ret;
+ struct lttcomm_relayd_end_data_pending msg;
+ struct lttcomm_relayd_generic_reply reply;
+
+ /* Code flow error. Safety net. */
+ assert(sock);
+
+ DBG("Relayd end data pending");
+
+ msg.session_id = htobe64(id);
+
+ /* Send command */
+ ret = send_command(sock, RELAYD_END_DATA_PENDING, &msg, sizeof(msg), 0);
+ if (ret < 0) {
+ goto error;
+ }
+
+ /* Receive response */
+ ret = recv_reply(sock, (void *) &reply, sizeof(reply));
+ if (ret < 0) {
+ goto error;
+ }
+
+ reply.ret_code = be32toh(reply.ret_code);
+ if (reply.ret_code < 0) {
+ ret = reply.ret_code;
+ goto error;
+ }
+
+ *is_data_inflight = reply.ret_code;
+
+ DBG("Relayd end data pending is data inflight: %d", reply.ret_code);
+
+ return 0;
error:
return ret;