Command metadata regenerate
[lttng-tools.git] / src / bin / lttng-relayd / main.c
index 2ce9bf048bdd17e0c0402a7ef9ac467683885392..cc5009940e13cf05b49d5122879bfc24a110a838 100644 (file)
@@ -18,7 +18,6 @@
  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
  */
 
-#define _GNU_SOURCE
 #define _LGPL_SOURCE
 #include <getopt.h>
 #include <grp.h>
@@ -40,7 +39,6 @@
 #include <urcu/uatomic.h>
 #include <unistd.h>
 #include <fcntl.h>
-#include <config.h>
 
 #include <lttng/lttng.h>
 #include <common/common.h>
@@ -56,7 +54,7 @@
 #include <common/sessiond-comm/relayd.h>
 #include <common/uri.h>
 #include <common/utils.h>
-#include <common/config/config.h>
+#include <common/config/session-config.h>
 #include <urcu/rculist.h>
 
 #include "cmd.h"
@@ -307,7 +305,7 @@ end:
 
 /*
  * config_entry_handler_cb used to handle options read from a config file.
- * See config_entry_handler_cb comment in common/config/config.h for the
+ * See config_entry_handler_cb comment in common/config/session-config.h for the
  * return value conventions.
  */
 static int config_entry_handler(const struct config_entry *entry, void *unused)
@@ -864,10 +862,7 @@ restart:
                                goto exit;
                        }
 
-                       if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
-                               ERR("socket poll error");
-                               goto error;
-                       } else if (revents & LPOLLIN) {
+                       if (revents & LPOLLIN) {
                                /*
                                 * A new connection is requested, therefore a
                                 * sessiond/consumerd connection is allocated in
@@ -919,6 +914,12 @@ restart:
                                 * exchange in cds_wfcq_enqueue.
                                 */
                                futex_nto1_wake(&relay_conn_queue.futex);
+                       } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
+                               ERR("socket poll error");
+                               goto error;
+                       } else {
+                               ERR("Unexpected poll events %u for sock %d", revents, pollfd);
+                               goto error;
                        }
                }
        }
@@ -1035,7 +1036,8 @@ error_testpoint:
  * Set index data from the control port to a given index object.
  */
 static int set_index_control_data(struct relay_index *index,
-               struct lttcomm_relayd_index *data)
+               struct lttcomm_relayd_index *data,
+               struct relay_connection *conn)
 {
        struct ctf_packet_index index_data;
 
@@ -1051,6 +1053,12 @@ static int set_index_control_data(struct relay_index *index,
        index_data.timestamp_end = data->timestamp_end;
        index_data.events_discarded = data->events_discarded;
        index_data.stream_id = data->stream_id;
+
+       if (conn->minor >= 8) {
+               index->index_data.stream_instance_id = data->stream_instance_id;
+               index->index_data.packet_seq_num = data->packet_seq_num;
+       }
+
        return relay_index_set_data(index, &index_data);
 }
 
@@ -1065,13 +1073,13 @@ static int relay_create_session(struct lttcomm_relayd_hdr *recv_hdr,
        int ret = 0, send_ret;
        struct relay_session *session;
        struct lttcomm_relayd_status_session reply;
-       char session_name[NAME_MAX];
-       char hostname[HOST_NAME_MAX];
+       char session_name[LTTNG_NAME_MAX];
+       char hostname[LTTNG_HOST_NAME_MAX];
        uint32_t live_timer = 0;
        bool snapshot = false;
 
-       memset(session_name, 0, NAME_MAX);
-       memset(hostname, 0, HOST_NAME_MAX);
+       memset(session_name, 0, LTTNG_NAME_MAX);
+       memset(hostname, 0, LTTNG_HOST_NAME_MAX);
 
        memset(&reply, 0, sizeof(reply));
 
@@ -1322,6 +1330,90 @@ end_no_session:
        return ret;
 }
 
+/*
+ * relay_reset_metadata: reset a metadata stream
+ */
+static
+int relay_reset_metadata(struct lttcomm_relayd_hdr *recv_hdr,
+               struct relay_connection *conn)
+{
+       int ret, send_ret;
+       struct relay_session *session = conn->session;
+       struct lttcomm_relayd_reset_metadata stream_info;
+       struct lttcomm_relayd_generic_reply reply;
+       struct relay_stream *stream;
+
+       DBG("Reset metadata received");
+
+       if (!session || conn->version_check_done == 0) {
+               ERR("Trying to reset a metadata stream before version check");
+               ret = -1;
+               goto end_no_session;
+       }
+
+       ret = conn->sock->ops->recvmsg(conn->sock, &stream_info,
+                       sizeof(struct lttcomm_relayd_reset_metadata), 0);
+       if (ret < sizeof(struct lttcomm_relayd_reset_metadata)) {
+               if (ret == 0) {
+                       /* Orderly shutdown. Not necessary to print an error. */
+                       DBG("Socket %d did an orderly shutdown", conn->sock->fd);
+               } else {
+                       ERR("Relay didn't receive valid reset_metadata struct "
+                                       "size : %d", ret);
+               }
+               ret = -1;
+               goto end_no_session;
+       }
+       DBG("Update metadata to version %" PRIu64, be64toh(stream_info.version));
+
+       /* Unsupported for live sessions for now. */
+       if (session->live_timer != 0) {
+               ret = -1;
+               goto end;
+       }
+
+       stream = stream_get_by_id(be64toh(stream_info.stream_id));
+       if (!stream) {
+               ret = -1;
+               goto end;
+       }
+       pthread_mutex_lock(&stream->lock);
+       if (!stream->is_metadata) {
+               ret = -1;
+               goto end_unlock;
+       }
+
+       ret = utils_rotate_stream_file(stream->path_name, stream->channel_name,
+                       0, 0, -1, -1, stream->stream_fd->fd, NULL,
+                       &stream->stream_fd->fd);
+       if (ret < 0) {
+               ERR("Failed to rotate metadata file %s of channel %s",
+                               stream->path_name, stream->channel_name);
+               goto end_unlock;
+       }
+
+end_unlock:
+       pthread_mutex_unlock(&stream->lock);
+       stream_put(stream);
+
+end:
+       memset(&reply, 0, sizeof(reply));
+       if (ret < 0) {
+               reply.ret_code = htobe32(LTTNG_ERR_UNK);
+       } else {
+               reply.ret_code = htobe32(LTTNG_OK);
+       }
+       send_ret = conn->sock->ops->sendmsg(conn->sock, &reply,
+                       sizeof(struct lttcomm_relayd_generic_reply), 0);
+       if (send_ret < 0) {
+               ERR("Relay sending reset metadata reply");
+               ret = send_ret;
+       }
+
+end_no_session:
+       return ret;
+}
+
 /*
  * relay_unknown_command: send -1 if received unknown command
  */
@@ -1924,7 +2016,7 @@ static int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr,
                ERR("relay_index_get_by_id_or_create index NULL");
                goto end_stream_put;
        }
-       if (set_index_control_data(index, &index_info)) {
+       if (set_index_control_data(index, &index_info, conn)) {
                ERR("set_index_control_data error");
                relay_index_put(index);
                ret = -1;
@@ -2052,6 +2144,9 @@ static int relay_process_control(struct lttcomm_relayd_hdr *recv_hdr,
        case RELAYD_STREAMS_SENT:
                ret = relay_streams_sent(recv_hdr, conn);
                break;
+       case RELAYD_RESET_METADATA:
+               ret = relay_reset_metadata(recv_hdr, conn);
+               break;
        case RELAYD_UPDATE_SYNC_INFO:
        default:
                ERR("Received unknown command (%u)", be32toh(recv_hdr->cmd));
@@ -2081,8 +2176,8 @@ static int handle_index_data(struct relay_stream *stream, uint64_t net_seq_num,
        /* Get data offset because we are about to update the index. */
        data_offset = htobe64(stream->tracefile_size_current);
 
-       DBG("handle_index_data: stream %" PRIu64 " data offset %" PRIu64,
-               stream->stream_handle, stream->tracefile_size_current);
+       DBG("handle_index_data: stream %" PRIu64 " net_seq_num %" PRIu64 " data offset %" PRIu64,
+                       stream->stream_handle, net_seq_num, stream->tracefile_size_current);
 
        /*
         * Lookup for an existing index for that stream id/sequence
@@ -2180,6 +2275,7 @@ static int relay_process_data(struct relay_connection *conn)
        stream_id = be64toh(data_hdr.stream_id);
        stream = stream_get_by_id(stream_id);
        if (!stream) {
+               ERR("relay_process_data: Cannot find stream %" PRIu64, stream_id);
                ret = -1;
                goto end;
        }
@@ -2209,6 +2305,8 @@ static int relay_process_data(struct relay_connection *conn)
                if (ret == 0) {
                        /* Orderly shutdown. Not necessary to print an error. */
                        DBG("Socket %d did an orderly shutdown", conn->sock->fd);
+               } else {
+                       ERR("Socket %d error %d", conn->sock->fd, ret);
                }
                ret = -1;
                goto end_stream_put;
@@ -2252,6 +2350,8 @@ static int relay_process_data(struct relay_connection *conn)
        if (session->minor >= 4 && !session->snapshot) {
                ret = handle_index_data(stream, net_seq_num, rotate_index);
                if (ret < 0) {
+                       ERR("handle_index_data: fail stream %" PRIu64 " net_seq_num %" PRIu64 " ret %d",
+                                       stream->stream_handle, net_seq_num, ret);
                        goto end_stream_unlock;
                }
        }
@@ -2270,6 +2370,8 @@ static int relay_process_data(struct relay_connection *conn)
        ret = write_padding_to_file(stream->stream_fd->fd,
                        be32toh(data_hdr.padding_size));
        if (ret < 0) {
+               ERR("write_padding_to_file: fail stream %" PRIu64 " net_seq_num %" PRIu64 " ret %d",
+                               stream->stream_handle, net_seq_num, ret);
                goto end_stream_unlock;
        }
        stream->tracefile_size_current +=
@@ -2429,10 +2531,7 @@ restart:
 
                        /* Inspect the relay conn pipe for new connection */
                        if (pollfd == relay_conn_pipe[0]) {
-                               if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
-                                       ERR("Relay connection pipe error");
-                                       goto error;
-                               } else if (revents & LPOLLIN) {
+                               if (revents & LPOLLIN) {
                                        struct relay_connection *conn;
 
                                        ret = lttng_read(relay_conn_pipe[0], &conn, sizeof(conn));
@@ -2443,6 +2542,12 @@ restart:
                                                        LPOLLIN | LPOLLRDHUP);
                                        connection_ht_add(relay_connections_ht, conn);
                                        DBG("Connection socket %d added", conn->sock->fd);
+                               } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
+                                       ERR("Relay connection pipe error");
+                                       goto error;
+                               } else {
+                                       ERR("Unexpected poll events %u for sock %d", revents, pollfd);
+                                       goto error;
                                }
                        } else {
                                struct relay_connection *ctrl_conn;
@@ -2451,29 +2556,8 @@ restart:
                                /* If not found, there is a synchronization issue. */
                                assert(ctrl_conn);
 
-                               if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
-                                       relay_thread_close_connection(&events, pollfd, ctrl_conn);
-                                       if (last_seen_data_fd == pollfd) {
-                                               last_seen_data_fd = last_notdel_data_fd;
-                                       }
-                               } else if (revents & LPOLLIN) {
-                                       if (ctrl_conn->type == RELAY_CONTROL) {
-                                               ret = ctrl_conn->sock->ops->recvmsg(ctrl_conn->sock, &recv_hdr,
-                                                               sizeof(recv_hdr), 0);
-                                               if (ret <= 0) {
-                                                       /* Connection closed */
-                                                       relay_thread_close_connection(&events, pollfd,
-                                                               ctrl_conn);
-                                               } else {
-                                                       ret = relay_process_control(&recv_hdr, ctrl_conn);
-                                                       if (ret < 0) {
-                                                               /* Clear the session on error. */
-                                                               relay_thread_close_connection(&events, pollfd,
-                                                                       ctrl_conn);
-                                                       }
-                                                       seen_control = 1;
-                                               }
-                                       } else {
+                               if (ctrl_conn->type == RELAY_DATA) {
+                                       if (revents & LPOLLIN) {
                                                /*
                                                 * Flag the last seen data fd not deleted. It will be
                                                 * used as the last seen fd if any fd gets deleted in
@@ -2481,9 +2565,39 @@ restart:
                                                 */
                                                last_notdel_data_fd = pollfd;
                                        }
+                                       goto put_ctrl_connection;
+                               }
+                               assert(ctrl_conn->type == RELAY_CONTROL);
+
+                               if (revents & LPOLLIN) {
+                                       ret = ctrl_conn->sock->ops->recvmsg(ctrl_conn->sock,
+                                                       &recv_hdr, sizeof(recv_hdr), 0);
+                                       if (ret <= 0) {
+                                               /* Connection closed */
+                                               relay_thread_close_connection(&events, pollfd,
+                                                               ctrl_conn);
+                                       } else {
+                                               ret = relay_process_control(&recv_hdr, ctrl_conn);
+                                               if (ret < 0) {
+                                                       /* Clear the session on error. */
+                                                       relay_thread_close_connection(&events,
+                                                                       pollfd, ctrl_conn);
+                                               }
+                                               seen_control = 1;
+                                       }
+                               } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
+                                       relay_thread_close_connection(&events,
+                                                       pollfd, ctrl_conn);
+                                       if (last_seen_data_fd == pollfd) {
+                                               last_seen_data_fd = last_notdel_data_fd;
+                                       }
                                } else {
-                                       ERR("Unknown poll events %u for sock %d", revents, pollfd);
+                                       ERR("Unexpected poll events %u for control sock %d",
+                                                       revents, pollfd);
+                                       connection_put(ctrl_conn);
+                                       goto error;
                                }
+                       put_ctrl_connection:
                                connection_put(ctrl_conn);
                        }
                }
@@ -2533,17 +2647,17 @@ restart:
                                /* Skip it. Might be removed before. */
                                continue;
                        }
+                       if (data_conn->type == RELAY_CONTROL) {
+                               goto put_data_connection;
+                       }
+                       assert(data_conn->type == RELAY_DATA);
 
                        if (revents & LPOLLIN) {
-                               if (data_conn->type != RELAY_DATA) {
-                                       goto put_connection;
-                               }
-
                                ret = relay_process_data(data_conn);
                                /* Connection closed */
                                if (ret < 0) {
                                        relay_thread_close_connection(&events, pollfd,
-                                               data_conn);
+                                                       data_conn);
                                        /*
                                         * Every goto restart call sets the last seen fd where
                                         * here we don't really care since we gracefully
@@ -2555,8 +2669,14 @@ restart:
                                        connection_put(data_conn);
                                        goto restart;
                                }
+                       } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
+                               relay_thread_close_connection(&events, pollfd,
+                                               data_conn);
+                       } else {
+                               ERR("Unknown poll events %u for data sock %d",
+                                               revents, pollfd);
                        }
-               put_connection:
+               put_data_connection:
                        connection_put(data_conn);
                }
                last_seen_data_fd = -1;
This page took 0.028962 seconds and 4 git commands to generate.