Fix: race with the viewer and readiness of streams
[lttng-tools.git] / src / common / consumer.c
index 187c0c30cdf003741f053417e1c9209c64b80713..f47d8de1b2669665a4a138a8b0fcbcdb65dc3b3d 100644 (file)
@@ -96,12 +96,10 @@ static void notify_thread_lttng_pipe(struct lttng_pipe *pipe)
 
 static void notify_health_quit_pipe(int *pipe)
 {
-       int ret;
+       ssize_t ret;
 
-       do {
-               ret = write(pipe[1], "4", 1);
-       } while (ret < 0 && errno == EINTR);
-       if (ret < 0 || ret != 1) {
+       ret = lttng_write(pipe[1], "4", 1);
+       if (ret < 1) {
                PERROR("write consumer health quit");
        }
 }
@@ -112,16 +110,17 @@ static void notify_channel_pipe(struct lttng_consumer_local_data *ctx,
                enum consumer_channel_action action)
 {
        struct consumer_channel_msg msg;
-       int ret;
+       ssize_t ret;
 
        memset(&msg, 0, sizeof(msg));
 
        msg.action = action;
        msg.chan = chan;
        msg.key = key;
-       do {
-               ret = write(ctx->consumer_channel_pipe[1], &msg, sizeof(msg));
-       } while (ret < 0 && errno == EINTR);
+       ret = lttng_write(ctx->consumer_channel_pipe[1], &msg, sizeof(msg));
+       if (ret < sizeof(msg)) {
+               PERROR("notify_channel_pipe write error");
+       }
 }
 
 void notify_thread_del_channel(struct lttng_consumer_local_data *ctx,
@@ -136,17 +135,18 @@ static int read_channel_pipe(struct lttng_consumer_local_data *ctx,
                enum consumer_channel_action *action)
 {
        struct consumer_channel_msg msg;
-       int ret;
+       ssize_t ret;
 
-       do {
-               ret = read(ctx->consumer_channel_pipe[0], &msg, sizeof(msg));
-       } while (ret < 0 && errno == EINTR);
-       if (ret > 0) {
-               *action = msg.action;
-               *chan = msg.chan;
-               *key = msg.key;
+       ret = lttng_read(ctx->consumer_channel_pipe[0], &msg, sizeof(msg));
+       if (ret < sizeof(msg)) {
+               ret = -1;
+               goto error;
        }
-       return ret;
+       *action = msg.action;
+       *chan = msg.chan;
+       *key = msg.key;
+error:
+       return (int) ret;
 }
 
 /*
@@ -768,6 +768,44 @@ end:
        return ret;
 }
 
+/*
+ * Find a relayd and send the streams sent message
+ *
+ * Returns 0 on success, < 0 on error
+ */
+int consumer_send_relayd_streams_sent(uint64_t net_seq_idx)
+{
+       int ret = 0;
+       struct consumer_relayd_sock_pair *relayd;
+
+       assert(net_seq_idx != -1ULL);
+
+       /* The stream is not metadata. Get relayd reference if exists. */
+       rcu_read_lock();
+       relayd = consumer_find_relayd(net_seq_idx);
+       if (relayd != NULL) {
+               /* Add stream on the relayd */
+               pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+               ret = relayd_streams_sent(&relayd->control_sock);
+               pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+               if (ret < 0) {
+                       goto end;
+               }
+       } else {
+               ERR("Relayd ID %" PRIu64 " unknown. Can't send streams_sent.",
+                               net_seq_idx);
+               ret = -1;
+               goto end;
+       }
+
+       ret = 0;
+       DBG("All streams sent relayd id %" PRIu64, net_seq_idx);
+
+end:
+       rcu_read_unlock();
+       return ret;
+}
+
 /*
  * Find a relayd and close the stream
  */
@@ -879,7 +917,6 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key,
        channel->uid = uid;
        channel->gid = gid;
        channel->relayd_id = relayd_id;
-       channel->output = output;
        channel->tracefile_size = tracefile_size;
        channel->tracefile_count = tracefile_count;
        channel->monitor = monitor;
@@ -887,6 +924,20 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key,
        pthread_mutex_init(&channel->lock, NULL);
        pthread_mutex_init(&channel->timer_lock, NULL);
 
+       switch (output) {
+       case LTTNG_EVENT_SPLICE:
+               channel->output = CONSUMER_CHANNEL_SPLICE;
+               break;
+       case LTTNG_EVENT_MMAP:
+               channel->output = CONSUMER_CHANNEL_MMAP;
+               break;
+       default:
+               assert(0);
+               free(channel);
+               channel = NULL;
+               goto end;
+       }
+
        /*
         * In monitor mode, the streams associated with the channel will be put in
         * a special list ONLY owned by this channel. So, the refcount is set to 1
@@ -1117,12 +1168,11 @@ void lttng_consumer_cleanup(void)
  */
 void lttng_consumer_should_exit(struct lttng_consumer_local_data *ctx)
 {
-       int ret;
+       ssize_t ret;
+
        consumer_quit = 1;
-       do {
-               ret = write(ctx->consumer_should_quit[1], "4", 1);
-       } while (ret < 0 && errno == EINTR);
-       if (ret < 0 || ret != 1) {
+       ret = lttng_write(ctx->consumer_should_quit[1], "4", 1);
+       if (ret < 1) {
                PERROR("write consumer quit");
        }
 
@@ -1296,15 +1346,13 @@ static int write_relayd_metadata_id(int fd,
                struct lttng_consumer_stream *stream,
                struct consumer_relayd_sock_pair *relayd, unsigned long padding)
 {
-       int ret;
+       ssize_t ret;
        struct lttcomm_relayd_metadata_payload hdr;
 
        hdr.stream_id = htobe64(stream->relayd_stream_id);
        hdr.padding_size = htobe32(padding);
-       do {
-               ret = write(fd, (void *) &hdr, sizeof(hdr));
-       } while (ret < 0 && errno == EINTR);
-       if (ret < 0 || ret != sizeof(hdr)) {
+       ret = lttng_write(fd, (void *) &hdr, sizeof(hdr));
+       if (ret < sizeof(hdr)) {
                /*
                 * This error means that the fd's end is closed so ignore the perror
                 * not to clubber the error output since this can happen in a normal
@@ -1326,7 +1374,7 @@ static int write_relayd_metadata_id(int fd,
                        stream->relayd_stream_id, padding);
 
 end:
-       return ret;
+       return (int) ret;
 }
 
 /*
@@ -1344,7 +1392,7 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(
                struct lttng_consumer_local_data *ctx,
                struct lttng_consumer_stream *stream, unsigned long len,
                unsigned long padding,
-               struct lttng_packet_index *index)
+               struct ctf_packet_index *index)
 {
        unsigned long mmap_offset;
        void *mmap_base;
@@ -1482,11 +1530,9 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(
        }
 
        while (len > 0) {
-               do {
-                       ret = write(outfd, mmap_base + mmap_offset, len);
-               } while (ret < 0 && errno == EINTR);
+               ret = lttng_write(outfd, mmap_base + mmap_offset, len);
                DBG("Consumer mmap write() ret %zd (len %lu)", ret, len);
-               if (ret < 0) {
+               if (ret < len) {
                        /*
                         * This is possible if the fd is closed on the other side (outfd)
                         * or any write problem. It can be verbose a bit for a normal
@@ -1554,7 +1600,7 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(
                struct lttng_consumer_local_data *ctx,
                struct lttng_consumer_stream *stream, unsigned long len,
                unsigned long padding,
-               struct lttng_packet_index *index)
+               struct ctf_packet_index *index)
 {
        ssize_t ret = 0, written = 0, ret_splice = 0;
        loff_t offset = 0;
@@ -2282,8 +2328,8 @@ restart:
 
                                        pipe_len = lttng_pipe_read(ctx->consumer_metadata_pipe,
                                                        &stream, sizeof(stream));
-                                       if (pipe_len < 0) {
-                                               ERR("read metadata stream, ret: %zd", pipe_len);
+                                       if (pipe_len < sizeof(stream)) {
+                                               PERROR("read metadata stream");
                                                /*
                                                 * Continue here to handle the rest of the streams.
                                                 */
@@ -2517,8 +2563,8 @@ void *consumer_thread_data_poll(void *data)
                        DBG("consumer_data_pipe wake up");
                        pipe_readlen = lttng_pipe_read(ctx->consumer_data_pipe,
                                        &new_stream, sizeof(new_stream));
-                       if (pipe_readlen < 0) {
-                               ERR("Consumer data pipe ret %zd", pipe_readlen);
+                       if (pipe_readlen < sizeof(new_stream)) {
+                               PERROR("Consumer data pipe");
                                /* Continue so we can at least handle the current stream(s). */
                                continue;
                        }
@@ -3231,7 +3277,7 @@ int consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type,
                uint64_t relayd_session_id)
 {
        int fd = -1, ret = -1, relayd_created = 0;
-       enum lttng_error_code ret_code = LTTNG_OK;
+       enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS;
        struct consumer_relayd_sock_pair *relayd = NULL;
 
        assert(ctx);
@@ -3267,7 +3313,7 @@ int consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type,
        }
 
        /* First send a status message before receiving the fds. */
-       ret = consumer_send_status_msg(sock, LTTNG_OK);
+       ret = consumer_send_status_msg(sock, LTTCOMM_CONSUMERD_SUCCESS);
        if (ret < 0) {
                /* Somehow, the session daemon is not responding anymore. */
                lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_FATAL);
@@ -3628,9 +3674,9 @@ int consumer_send_status_channel(int sock,
        assert(sock >= 0);
 
        if (!channel) {
-               msg.ret_code = -LTTNG_ERR_UST_CHAN_FAIL;
+               msg.ret_code = LTTCOMM_CONSUMERD_CHANNEL_FAIL;
        } else {
-               msg.ret_code = LTTNG_OK;
+               msg.ret_code = LTTCOMM_CONSUMERD_SUCCESS;
                msg.key = channel->key;
                msg.stream_count = channel->streams.count;
        }
This page took 0.028508 seconds and 4 git commands to generate.