Truncate exclusion names to have a terminal '\0'
[lttng-tools.git] / src / bin / lttng-relayd / live.c
index 6f69b1058a958720bb1e86b4868fcdb259bea417..0cf2665161251a71ce1351e76c5f997598c5251d 100644 (file)
@@ -17,7 +17,6 @@
  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
  */
 
-#define _GNU_SOURCE
 #define _LGPL_SOURCE
 #include <getopt.h>
 #include <grp.h>
@@ -40,7 +39,6 @@
 #include <urcu/rculist.h>
 #include <unistd.h>
 #include <fcntl.h>
-#include <config.h>
 
 #include <lttng/lttng.h>
 #include <common/common.h>
@@ -307,24 +305,11 @@ int make_viewer_streams(struct relay_session *session,
                                continue;
                        }
                        /*
-                        * stream published is protected by the session
-                        * lock.
+                        * stream published is protected by the session lock.
                         */
                        if (!stream->published) {
                                goto next;
                        }
-                       /*
-                        * Stream has no data, don't consider it yet.
-                        */
-                       if (stream->is_metadata) {
-                               if (!stream->metadata_received) {
-                                       goto next;
-                               }
-                       } else {
-                               if (stream->prev_seq == -1ULL) {
-                                       goto next;
-                               }
-                       }
                        vstream = viewer_stream_get_by_id(stream->stream_handle);
                        if (!vstream) {
                                vstream = viewer_stream_create(stream, seek_t);
@@ -555,10 +540,7 @@ restart:
                                goto exit;
                        }
 
-                       if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
-                               ERR("socket poll error");
-                               goto error;
-                       } else if (revents & LPOLLIN) {
+                       if (revents & LPOLLIN) {
                                /*
                                 * A new connection is requested, therefore a
                                 * viewer connection is allocated in this
@@ -601,6 +583,12 @@ restart:
                                 * exchange in cds_wfcq_enqueue.
                                 */
                                futex_nto1_wake(&viewer_conn_queue.futex);
+                       } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
+                               ERR("socket poll error");
+                               goto error;
+                       } else {
+                               ERR("Unexpected poll events %u for sock %d", revents, pollfd);
+                               goto error;
                        }
                }
        }
@@ -1130,7 +1118,7 @@ static int try_open_index(struct relay_viewer_stream *vstream,
        /*
         * First time, we open the index file and at least one index is ready.
         */
-       if (rstream->total_index_received == 0) {
+       if (rstream->index_received_seqcount == 0) {
                ret = -ENOENT;
                goto end;
        }
@@ -1172,14 +1160,14 @@ static int check_index_status(struct relay_viewer_stream *vstream,
        int ret;
 
        if (trace->session->connection_closed
-                       && rstream->total_index_received
-                               == vstream->last_sent_index) {
+                       && rstream->index_received_seqcount
+                               == vstream->index_sent_seqcount) {
                /* Last index sent and session connection is closed. */
                index->status = htobe32(LTTNG_VIEWER_INDEX_HUP);
                goto hup;
        } else if (rstream->beacon_ts_end != -1ULL &&
-                       rstream->total_index_received
-                               == vstream->last_sent_index) {
+                       rstream->index_received_seqcount
+                               == vstream->index_sent_seqcount) {
                /*
                 * We've received a synchronization beacon and the last index
                 * available has been sent, the index for now is inactive.
@@ -1193,21 +1181,24 @@ static int check_index_status(struct relay_viewer_stream *vstream,
                index->timestamp_end = htobe64(rstream->beacon_ts_end);
                index->stream_id = htobe64(rstream->ctf_stream_id);
                goto index_ready;
-       } else if (rstream->total_index_received <= vstream->last_sent_index) {
+       } else if (rstream->index_received_seqcount
+                       == vstream->index_sent_seqcount) {
                /*
-                * This actually checks the case where recv == last_sent.
-                * In this case, we have not received a beacon. Therefore, we
-                * can only ask the client to retry later.
+                * This checks whether received == sent seqcount. In
+                * this case, we have not received a beacon. Therefore,
+                * we can only ask the client to retry later.
                 */
                index->status = htobe32(LTTNG_VIEWER_INDEX_RETRY);
                goto index_ready;
-       } else if (!viewer_stream_is_tracefile_seq_readable(vstream,
-                       vstream->current_tracefile_seq)) {
+       } else if (!tracefile_array_seq_in_file(rstream->tfa,
+                       vstream->current_tracefile_id,
+                       vstream->index_sent_seqcount)) {
                /*
-                * The producer has overwritten our current file. We
-                * need to rotate.
+                * The next index we want to send cannot be read either
+                * because we need to perform a rotation, or due to
+                * the producer having overwritten its trace file.
                 */
-               DBG("Viewer stream %" PRIu64 " rotation due to overwrite",
+               DBG("Viewer stream %" PRIu64 " rotation",
                                vstream->stream->stream_handle);
                ret = viewer_stream_rotate(vstream);
                if (ret < 0) {
@@ -1217,50 +1208,34 @@ static int check_index_status(struct relay_viewer_stream *vstream,
                        index->status = htobe32(LTTNG_VIEWER_INDEX_HUP);
                        goto hup;
                }
-               assert(viewer_stream_is_tracefile_seq_readable(vstream,
-                       vstream->current_tracefile_seq));
-               /* ret == 0 means successful so we continue. */
-               ret = 0;
-       } else {
-               ssize_t read_ret;
-               char tmp[1];
-
                /*
-                * Use EOF on current index file to find out when we
-                * need to rotate.
+                * If we have been pushed due to overwrite, it
+                * necessarily means there is data that can be read in
+                * the stream. If we rotated because we reached the end
+                * of a tracefile, it means the following tracefile
+                * needs to contain at least one index, else we would
+                * have already returned LTTNG_VIEWER_INDEX_RETRY to the
+                * viewer. The updated index_sent_seqcount needs to
+                * point to a readable index entry now.
+                *
+                * In the case where we "rotate" on a single file, we
+                * can end up in a case where the requested index is
+                * still unavailable.
                 */
-               read_ret = lttng_read(vstream->index_fd->fd, tmp, 1);
-               if (read_ret == 1) {
-                       off_t seek_ret;
-
-                       /* There is still data to read. Rewind position. */
-                       seek_ret = lseek(vstream->index_fd->fd, -1, SEEK_CUR);
-                       if (seek_ret < 0) {
-                               ret = -1;
-                               goto end;
-                       }
-                       ret = 0;
-               } else if (read_ret == 0) {
-                       /* EOF. We need to rotate. */
-                       DBG("Viewer stream %" PRIu64 " rotation due to EOF",
-                                       vstream->stream->stream_handle);
-                       ret = viewer_stream_rotate(vstream);
-                       if (ret < 0) {
-                               goto end;
-                       } else if (ret == 1) {
-                               /* EOF across entire stream. */
-                               index->status = htobe32(LTTNG_VIEWER_INDEX_HUP);
-                               goto hup;
-                       }
-                       assert(viewer_stream_is_tracefile_seq_readable(vstream,
-                               vstream->current_tracefile_seq));
-                       /* ret == 0 means successful so we continue. */
-                       ret = 0;
-               } else {
-                       /* Error reading index. */
-                       ret = -1;
+               if (rstream->tracefile_count == 1 &&
+                               !tracefile_array_seq_in_file(
+                                       rstream->tfa,
+                                       vstream->current_tracefile_id,
+                                       vstream->index_sent_seqcount)) {
+                       index->status = htobe32(LTTNG_VIEWER_INDEX_RETRY);
+                       goto index_ready;
                }
+               assert(tracefile_array_seq_in_file(rstream->tfa,
+                               vstream->current_tracefile_id,
+                               vstream->index_sent_seqcount));
        }
+       /* ret == 0 means successful so we continue. */
+       ret = 0;
 end:
        return ret;
 
@@ -1303,6 +1278,8 @@ int viewer_get_next_index(struct relay_connection *conn)
 
        vstream = viewer_stream_get_by_id(be64toh(request_index.stream_id));
        if (!vstream) {
+               DBG("Client requested index of unknown stream id %" PRIu64,
+                               be64toh(request_index.stream_id));
                viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_ERR);
                goto send_reply;
        }
@@ -1409,7 +1386,7 @@ int viewer_get_next_index(struct relay_connection *conn)
                goto send_reply;
        } else {
                viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_OK);
-               vstream->last_sent_index++;
+               vstream->index_sent_seqcount++;
        }
 
        /*
@@ -1454,9 +1431,11 @@ send_reply:
        }
        health_code_update();
 
-       DBG("Index %" PRIu64 " for stream %" PRIu64 " sent",
-                       vstream->last_sent_index,
-                       vstream->stream->stream_handle);
+       if (vstream) {
+               DBG("Index %" PRIu64 " for stream %" PRIu64 " sent",
+                               vstream->index_sent_seqcount,
+                               vstream->stream->stream_handle);
+       }
 end:
        if (metadata_viewer_stream) {
                viewer_stream_put(metadata_viewer_stream);
@@ -1507,6 +1486,8 @@ int viewer_get_packet(struct relay_connection *conn)
 
        vstream = viewer_stream_get_by_id(be64toh(get_packet_info.stream_id));
        if (!vstream) {
+               DBG("Client requested packet of unknown stream id %" PRIu64,
+                               be64toh(get_packet_info.stream_id));
                reply.status = htobe32(LTTNG_VIEWER_GET_PACKET_ERR);
                goto send_reply_nolock;
        }
@@ -1618,6 +1599,8 @@ int viewer_get_metadata(struct relay_connection *conn)
                 * Reply back to the client with an error if we cannot
                 * find it.
                 */
+               DBG("Client requested metadata of unknown stream id %" PRIu64,
+                               be64toh(request.stream_id));
                reply.status = htobe32(LTTNG_VIEWER_METADATA_ERR);
                goto send_reply;
        }
@@ -1926,10 +1909,7 @@ restart:
 
                        /* Inspect the relay conn pipe for new connection. */
                        if (pollfd == live_conn_pipe[0]) {
-                               if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
-                                       ERR("Relay live pipe error");
-                                       goto error;
-                               } else if (revents & LPOLLIN) {
+                               if (revents & LPOLLIN) {
                                        struct relay_connection *conn;
 
                                        ret = lttng_read(live_conn_pipe[0],
@@ -1941,6 +1921,12 @@ restart:
                                                        LPOLLIN | LPOLLRDHUP);
                                        connection_ht_add(viewer_connections_ht, conn);
                                        DBG("Connection socket %d added to poll", conn->sock->fd);
+                               } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
+                                       ERR("Relay live pipe error");
+                                       goto error;
+                               } else {
+                                       ERR("Unexpected poll events %u for sock %d", revents, pollfd);
+                                       goto error;
                                }
                        } else {
                                /* Connection activity. */
@@ -1951,11 +1937,7 @@ restart:
                                        continue;
                                }
 
-                               if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
-                                       cleanup_connection_pollfd(&events, pollfd);
-                                       /* Put "create" ownership reference. */
-                                       connection_put(conn);
-                               } else if (revents & LPOLLIN) {
+                               if (revents & LPOLLIN) {
                                        ret = conn->sock->ops->recvmsg(conn->sock, &recv_hdr,
                                                        sizeof(recv_hdr), 0);
                                        if (ret <= 0) {
@@ -1974,6 +1956,14 @@ restart:
                                                        DBG("Viewer connection closed with %d", pollfd);
                                                }
                                        }
+                               } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
+                                       cleanup_connection_pollfd(&events, pollfd);
+                                       /* Put "create" ownership reference. */
+                                       connection_put(conn);
+                               } else {
+                                       ERR("Unexpected poll events %u for sock %d", revents, pollfd);
+                                       connection_put(conn);
+                                       goto error;
                                }
                                /* Put local "get_by_sock" reference. */
                                connection_put(conn);
This page took 0.027464 seconds and 4 git commands to generate.