Truncate exclusion names to have a terminal '\0'
[lttng-tools.git] / src / bin / lttng-relayd / live.c
index 888ec2d4f616d10f5d4720bfbea05158361d48bf..0cf2665161251a71ce1351e76c5f997598c5251d 100644 (file)
@@ -17,7 +17,6 @@
  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
  */
 
-#define _GNU_SOURCE
 #define _LGPL_SOURCE
 #include <getopt.h>
 #include <grp.h>
@@ -40,7 +39,6 @@
 #include <urcu/rculist.h>
 #include <unistd.h>
 #include <fcntl.h>
-#include <config.h>
 
 #include <lttng/lttng.h>
 #include <common/common.h>
@@ -307,8 +305,7 @@ int make_viewer_streams(struct relay_session *session,
                                continue;
                        }
                        /*
-                        * stream published is protected by the session
-                        * lock.
+                        * stream published is protected by the session lock.
                         */
                        if (!stream->published) {
                                goto next;
@@ -327,17 +324,34 @@ int make_viewer_streams(struct relay_session *session,
                                        /* Update number of created stream counter. */
                                        (*nb_created)++;
                                }
+                               /*
+                                * Ensure a self-reference is preserved even
+                                * after we have put our local reference.
+                                */
+                               viewer_stream_get(vstream);
                        } else {
                                if (!vstream->sent_flag && nb_unsent) {
                                        /* Update number of unsent stream counter. */
                                        (*nb_unsent)++;
                                }
-                               viewer_stream_put(vstream);
                        }
                        /* Update number of total stream counter. */
                        if (nb_total) {
-                               (*nb_total)++;
+                               if (stream->is_metadata) {
+                                       if (!stream->closed ||
+                                                       stream->metadata_received > vstream->metadata_sent) {
+                                               (*nb_total)++;
+                                       }
+                               } else {
+                                       if (!stream->closed ||
+                                               !(((int64_t) (stream->prev_seq - stream->last_net_seq_num)) >= 0)) {
+
+                                               (*nb_total)++;
+                                       }
+                               }
                        }
+                       /* Put local reference. */
+                       viewer_stream_put(vstream);
                next:
                        stream_put(stream);
                }
@@ -526,10 +540,7 @@ restart:
                                goto exit;
                        }
 
-                       if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
-                               ERR("socket poll error");
-                               goto error;
-                       } else if (revents & LPOLLIN) {
+                       if (revents & LPOLLIN) {
                                /*
                                 * A new connection is requested, therefore a
                                 * viewer connection is allocated in this
@@ -572,6 +583,12 @@ restart:
                                 * exchange in cds_wfcq_enqueue.
                                 */
                                futex_nto1_wake(&viewer_conn_queue.futex);
+                       } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
+                               ERR("socket poll error");
+                               goto error;
+                       } else {
+                               ERR("Unexpected poll events %u for sock %d", revents, pollfd);
+                               goto error;
                        }
                }
        }
@@ -911,7 +928,8 @@ int viewer_get_new_streams(struct relay_connection *conn)
        response.streams_count = htobe32(nb_streams);
 
        /*
-        * If the session is closed, HUP when there are no more streams.
+        * If the session is closed, HUP when there are no more streams
+        * with data.
         */
        if (closed && nb_total == 0) {
                send_streams = 0;
@@ -1100,7 +1118,7 @@ static int try_open_index(struct relay_viewer_stream *vstream,
        /*
         * First time, we open the index file and at least one index is ready.
         */
-       if (rstream->total_index_received == 0) {
+       if (rstream->index_received_seqcount == 0) {
                ret = -ENOENT;
                goto end;
        }
@@ -1142,14 +1160,14 @@ static int check_index_status(struct relay_viewer_stream *vstream,
        int ret;
 
        if (trace->session->connection_closed
-                       && rstream->total_index_received
-                               == vstream->last_sent_index) {
+                       && rstream->index_received_seqcount
+                               == vstream->index_sent_seqcount) {
                /* Last index sent and session connection is closed. */
                index->status = htobe32(LTTNG_VIEWER_INDEX_HUP);
                goto hup;
        } else if (rstream->beacon_ts_end != -1ULL &&
-                       rstream->total_index_received
-                               == vstream->last_sent_index) {
+                       rstream->index_received_seqcount
+                               == vstream->index_sent_seqcount) {
                /*
                 * We've received a synchronization beacon and the last index
                 * available has been sent, the index for now is inactive.
@@ -1163,21 +1181,24 @@ static int check_index_status(struct relay_viewer_stream *vstream,
                index->timestamp_end = htobe64(rstream->beacon_ts_end);
                index->stream_id = htobe64(rstream->ctf_stream_id);
                goto index_ready;
-       } else if (rstream->total_index_received <= vstream->last_sent_index) {
+       } else if (rstream->index_received_seqcount
+                       == vstream->index_sent_seqcount) {
                /*
-                * This actually checks the case where recv == last_sent.
-                * In this case, we have not received a beacon. Therefore, we
-                * can only ask the client to retry later.
+                * This checks whether received == sent seqcount. In
+                * this case, we have not received a beacon. Therefore,
+                * we can only ask the client to retry later.
                 */
                index->status = htobe32(LTTNG_VIEWER_INDEX_RETRY);
                goto index_ready;
-       } else if (!viewer_stream_is_tracefile_seq_readable(vstream,
-                       vstream->current_tracefile_seq)) {
+       } else if (!tracefile_array_seq_in_file(rstream->tfa,
+                       vstream->current_tracefile_id,
+                       vstream->index_sent_seqcount)) {
                /*
-                * The producer has overwritten our current file. We
-                * need to rotate.
+                * The next index we want to send cannot be read either
+                * because we need to perform a rotation, or due to
+                * the producer having overwritten its trace file.
                 */
-               DBG("Viewer stream %" PRIu64 " rotation due to overwrite",
+               DBG("Viewer stream %" PRIu64 " rotation",
                                vstream->stream->stream_handle);
                ret = viewer_stream_rotate(vstream);
                if (ret < 0) {
@@ -1187,50 +1208,34 @@ static int check_index_status(struct relay_viewer_stream *vstream,
                        index->status = htobe32(LTTNG_VIEWER_INDEX_HUP);
                        goto hup;
                }
-               assert(viewer_stream_is_tracefile_seq_readable(vstream,
-                       vstream->current_tracefile_seq));
-               /* ret == 0 means successful so we continue. */
-               ret = 0;
-       } else {
-               ssize_t read_ret;
-               char tmp[1];
-
                /*
-                * Use EOF on current index file to find out when we
-                * need to rotate.
+                * If we have been pushed due to overwrite, it
+                * necessarily means there is data that can be read in
+                * the stream. If we rotated because we reached the end
+                * of a tracefile, it means the following tracefile
+                * needs to contain at least one index, else we would
+                * have already returned LTTNG_VIEWER_INDEX_RETRY to the
+                * viewer. The updated index_sent_seqcount needs to
+                * point to a readable index entry now.
+                *
+                * In the case where we "rotate" on a single file, we
+                * can end up in a case where the requested index is
+                * still unavailable.
                 */
-               read_ret = lttng_read(vstream->index_fd->fd, tmp, 1);
-               if (read_ret == 1) {
-                       off_t seek_ret;
-
-                       /* There is still data to read. Rewind position. */
-                       seek_ret = lseek(vstream->index_fd->fd, -1, SEEK_CUR);
-                       if (seek_ret < 0) {
-                               ret = -1;
-                               goto end;
-                       }
-                       ret = 0;
-               } else if (read_ret == 0) {
-                       /* EOF. We need to rotate. */
-                       DBG("Viewer stream %" PRIu64 " rotation due to EOF",
-                                       vstream->stream->stream_handle);
-                       ret = viewer_stream_rotate(vstream);
-                       if (ret < 0) {
-                               goto end;
-                       } else if (ret == 1) {
-                               /* EOF across entire stream. */
-                               index->status = htobe32(LTTNG_VIEWER_INDEX_HUP);
-                               goto hup;
-                       }
-                       assert(viewer_stream_is_tracefile_seq_readable(vstream,
-                               vstream->current_tracefile_seq));
-                       /* ret == 0 means successful so we continue. */
-                       ret = 0;
-               } else {
-                       /* Error reading index. */
-                       ret = -1;
+               if (rstream->tracefile_count == 1 &&
+                               !tracefile_array_seq_in_file(
+                                       rstream->tfa,
+                                       vstream->current_tracefile_id,
+                                       vstream->index_sent_seqcount)) {
+                       index->status = htobe32(LTTNG_VIEWER_INDEX_RETRY);
+                       goto index_ready;
                }
+               assert(tracefile_array_seq_in_file(rstream->tfa,
+                               vstream->current_tracefile_id,
+                               vstream->index_sent_seqcount));
        }
+       /* ret == 0 means successful so we continue. */
+       ret = 0;
 end:
        return ret;
 
@@ -1273,8 +1278,10 @@ int viewer_get_next_index(struct relay_connection *conn)
 
        vstream = viewer_stream_get_by_id(be64toh(request_index.stream_id));
        if (!vstream) {
-               ret = -1;
-               goto end;
+               DBG("Client requested index of unknown stream id %" PRIu64,
+                               be64toh(request_index.stream_id));
+               viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_ERR);
+               goto send_reply;
        }
 
        /* Use back. ref. Protected by refcounts. */
@@ -1379,7 +1386,7 @@ int viewer_get_next_index(struct relay_connection *conn)
                goto send_reply;
        } else {
                viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_OK);
-               vstream->last_sent_index++;
+               vstream->index_sent_seqcount++;
        }
 
        /*
@@ -1397,7 +1404,9 @@ int viewer_get_next_index(struct relay_connection *conn)
        viewer_index.stream_id = packet_index.stream_id;
 
 send_reply:
-       pthread_mutex_unlock(&rstream->lock);
+       if (rstream) {
+               pthread_mutex_unlock(&rstream->lock);
+       }
 
        if (metadata_viewer_stream) {
                pthread_mutex_lock(&metadata_viewer_stream->stream->lock);
@@ -1422,9 +1431,11 @@ send_reply:
        }
        health_code_update();
 
-       DBG("Index %" PRIu64 " for stream %" PRIu64 " sent",
-                       vstream->last_sent_index,
-                       vstream->stream->stream_handle);
+       if (vstream) {
+               DBG("Index %" PRIu64 " for stream %" PRIu64 " sent",
+                               vstream->index_sent_seqcount,
+                               vstream->stream->stream_handle);
+       }
 end:
        if (metadata_viewer_stream) {
                viewer_stream_put(metadata_viewer_stream);
@@ -1458,8 +1469,6 @@ int viewer_get_packet(struct relay_connection *conn)
        struct lttng_viewer_get_packet get_packet_info;
        struct lttng_viewer_trace_packet reply;
        struct relay_viewer_stream *vstream = NULL;
-       struct ctf_trace *ctf_trace;
-       struct relay_viewer_stream *metadata_viewer_stream = NULL;
 
        DBG2("Relay get data packet");
 
@@ -1477,60 +1486,13 @@ int viewer_get_packet(struct relay_connection *conn)
 
        vstream = viewer_stream_get_by_id(be64toh(get_packet_info.stream_id));
        if (!vstream) {
-               goto error;
-       }
-
-       ctf_trace = vstream->stream->trace;
-
-       /* metadata_viewer_stream may be NULL. */
-       metadata_viewer_stream =
-                       ctf_trace_get_viewer_metadata_stream(ctf_trace);
-
-       if (metadata_viewer_stream) {
-               bool get_packet_err = false;
-
-               pthread_mutex_lock(&metadata_viewer_stream->stream->lock);
-               DBG("get packet metadata check: recv %" PRIu64 " sent %" PRIu64,
-                       metadata_viewer_stream->stream->metadata_received,
-                       metadata_viewer_stream->metadata_sent);
-               if (!metadata_viewer_stream->stream->metadata_received ||
-                               metadata_viewer_stream->stream->metadata_received >
-                                       metadata_viewer_stream->metadata_sent) {
-                       /*
-                        * We prevent the client from reading a data stream as
-                        * long as there is metadata left to consume. This
-                        * ensures that the client won't receive data of which
-                        * it can't make sense.
-                        */
-                       get_packet_err = true;
-               }
-               pthread_mutex_unlock(&metadata_viewer_stream->stream->lock);
-               viewer_stream_put(metadata_viewer_stream);
-               if (get_packet_err) {
-                       reply.status = htobe32(LTTNG_VIEWER_GET_PACKET_ERR);
-                       reply.flags |= LTTNG_VIEWER_FLAG_NEW_METADATA;
-                       goto send_reply_nolock;
-               }
+               DBG("Client requested packet of unknown stream id %" PRIu64,
+                               be64toh(get_packet_info.stream_id));
+               reply.status = htobe32(LTTNG_VIEWER_GET_PACKET_ERR);
+               goto send_reply_nolock;
        }
 
        pthread_mutex_lock(&vstream->stream->lock);
-       /*
-        * The vstream->stream_fd used here has been opened by
-        * get_next_index. It is opened there because this is what
-        * allows us to grab a reference to the file with stream lock
-        * held, thus protecting us against overwrite caused by
-        * tracefile rotation. Since tracefile rotation unlinks the old
-        * data file, we are ensured that we won't have our data
-        * overwritten under us.
-        */
-       ret = check_new_streams(conn);
-       if (ret < 0) {
-               goto end_free;
-       } else if (ret == 1) {
-               reply.status = htobe32(LTTNG_VIEWER_GET_PACKET_ERR);
-               reply.flags |= LTTNG_VIEWER_FLAG_NEW_STREAM;
-               goto send_reply;
-       }
 
        len = be32toh(get_packet_info.len);
        data = zmalloc(len);
@@ -1629,7 +1591,17 @@ int viewer_get_metadata(struct relay_connection *conn)
 
        vstream = viewer_stream_get_by_id(be64toh(request.stream_id));
        if (!vstream) {
-               reply.status = htobe32(LTTNG_VIEWER_NO_NEW_METADATA);
+               /*
+                * The metadata stream can be closed by a CLOSE command
+                * just before we attach. It can also be closed by
+                * per-pid tracing during tracing. Therefore, it is
+                * possible that we cannot find this viewer stream.
+                * Reply back to the client with an error if we cannot
+                * find it.
+                */
+               DBG("Client requested metadata of unknown stream id %" PRIu64,
+                               be64toh(request.stream_id));
+               reply.status = htobe32(LTTNG_VIEWER_METADATA_ERR);
                goto send_reply;
        }
        pthread_mutex_lock(&vstream->stream->lock);
@@ -1937,10 +1909,7 @@ restart:
 
                        /* Inspect the relay conn pipe for new connection. */
                        if (pollfd == live_conn_pipe[0]) {
-                               if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
-                                       ERR("Relay live pipe error");
-                                       goto error;
-                               } else if (revents & LPOLLIN) {
+                               if (revents & LPOLLIN) {
                                        struct relay_connection *conn;
 
                                        ret = lttng_read(live_conn_pipe[0],
@@ -1952,6 +1921,12 @@ restart:
                                                        LPOLLIN | LPOLLRDHUP);
                                        connection_ht_add(viewer_connections_ht, conn);
                                        DBG("Connection socket %d added to poll", conn->sock->fd);
+                               } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
+                                       ERR("Relay live pipe error");
+                                       goto error;
+                               } else {
+                                       ERR("Unexpected poll events %u for sock %d", revents, pollfd);
+                                       goto error;
                                }
                        } else {
                                /* Connection activity. */
@@ -1962,11 +1937,7 @@ restart:
                                        continue;
                                }
 
-                               if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
-                                       cleanup_connection_pollfd(&events, pollfd);
-                                       /* Put "create" ownership reference. */
-                                       connection_put(conn);
-                               } else if (revents & LPOLLIN) {
+                               if (revents & LPOLLIN) {
                                        ret = conn->sock->ops->recvmsg(conn->sock, &recv_hdr,
                                                        sizeof(recv_hdr), 0);
                                        if (ret <= 0) {
@@ -1985,6 +1956,14 @@ restart:
                                                        DBG("Viewer connection closed with %d", pollfd);
                                                }
                                        }
+                               } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
+                                       cleanup_connection_pollfd(&events, pollfd);
+                                       /* Put "create" ownership reference. */
+                                       connection_put(conn);
+                               } else {
+                                       ERR("Unexpected poll events %u for sock %d", revents, pollfd);
+                                       connection_put(conn);
+                                       goto error;
                                }
                                /* Put local "get_by_sock" reference. */
                                connection_put(conn);
This page took 0.02894 seconds and 4 git commands to generate.