Fix: remove unused session id map
[lttng-tools.git] / src / common / consumer.c
index 16521a8b863788fd957b756d7c4882f6b3c5abbd..27dfe32b8592487e6c5e881604a650085240f536 100644 (file)
@@ -63,21 +63,6 @@ volatile int consumer_quit;
 static struct lttng_ht *metadata_ht;
 static struct lttng_ht *data_ht;
 
-/*
- * This hash table contains the mapping between the session id of the sessiond
- * and the relayd session id. Element of the ht are indexed by sessiond session
- * id.
- *
- * Node can be added when a relayd communication is opened in the sessiond
- * thread.
- *
- * Note that a session id of the session daemon is unique to a tracing session
- * and not to a domain session. However, a domain session has one consumer
- * which forces the 1-1 mapping between a consumer and a domain session (ex:
- * UST). This means that we can't have duplicate in this ht.
- */
-static struct lttng_ht *relayd_session_id_ht;
-
 /*
  * Notify a thread pipe to poll back again. This usually means that some global
  * state has changed so we just send back the thread in a poll wait call.
@@ -231,7 +216,6 @@ static void destroy_relayd(struct consumer_relayd_sock_pair *relayd)
 {
        int ret;
        struct lttng_ht_iter iter;
-       struct lttng_ht_node_ulong *node;
 
        if (relayd == NULL) {
                return;
@@ -239,22 +223,6 @@ static void destroy_relayd(struct consumer_relayd_sock_pair *relayd)
 
        DBG("Consumer destroy and close relayd socket pair");
 
-       /* Loockup for a relayd node in the session id map hash table. */
-       lttng_ht_lookup(relayd_session_id_ht,
-                       (void *)((unsigned long) relayd->sessiond_session_id), &iter);
-       node = lttng_ht_iter_get_node_ulong(&iter);
-       if (node == NULL) {
-               /* We assume the relayd is being or is destroyed */
-               return;
-       }
-
-       /*
-        * Try to delete it from the relayd session id ht. The return value is of
-        * no importance since either way we are going to try to delete the relayd
-        * from the global relayd_ht.
-        */
-       lttng_ht_del(relayd_session_id_ht, &iter);
-
        iter.iter.node = &relayd->node.node;
        ret = lttng_ht_del(consumer_data.relayd_ht, &iter);
        if (ret != 0) {
@@ -283,8 +251,6 @@ static void cleanup_relayd_ht(void)
        }
 
        lttng_ht_destroy(consumer_data.relayd_ht);
-       /* The destroy_relayd call makes sure that this ht is empty here. */
-       lttng_ht_destroy(relayd_session_id_ht);
 
        rcu_read_unlock();
 }
@@ -782,6 +748,13 @@ static int write_relayd_stream_header(struct lttng_consumer_stream *stream,
                data_hdr.stream_id = htobe64(stream->relayd_stream_id);
                data_hdr.data_size = htobe32(data_size);
                data_hdr.padding_size = htobe32(padding);
+               /*
+                * Note that net_seq_num below is assigned with the *current* value of
+                * next_net_seq_num and only after that the next_net_seq_num will be
+                * increment. This is why when issuing a command on the relayd using
+                * this next value, 1 should always be substracted in order to compare
+                * the last seen sequence number on the relayd side to the last sent.
+                */
                data_hdr.net_seq_num = htobe64(stream->next_net_seq_num++);
                /* Other fields are zeroed previously */
 
@@ -1099,7 +1072,7 @@ void lttng_consumer_should_exit(struct lttng_consumer_local_data *ctx)
        do {
                ret = write(ctx->consumer_should_quit[1], "4", 1);
        } while (ret < 0 && errno == EINTR);
-       if (ret < 0) {
+       if (ret < 0 || ret != 1) {
                PERROR("write consumer quit");
        }
 
@@ -1317,8 +1290,22 @@ static int write_relayd_metadata_id(int fd,
        do {
                ret = write(fd, (void *) &hdr, sizeof(hdr));
        } while (ret < 0 && errno == EINTR);
-       if (ret < 0) {
-               PERROR("write metadata stream id");
+       if (ret < 0 || ret != sizeof(hdr)) {
+               /*
+                * This error means that the fd's end is closed so ignore the perror
+                * not to clubber the error output since this can happen in a normal
+                * code path.
+                */
+               if (errno != EPIPE) {
+                       PERROR("write metadata stream id");
+               }
+               DBG3("Consumer failed to write relayd metadata id (errno: %d)", errno);
+               /*
+                * Set ret to a negative value because if ret != sizeof(hdr), we don't
+                * handle writting the missing part so report that as an error and
+                * don't lie to the caller.
+                */
+               ret = -1;
                goto end;
        }
        DBG("Metadata stream id %" PRIu64 " with padding %lu written before data",
@@ -1435,7 +1422,13 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(
                } while (ret < 0 && errno == EINTR);
                DBG("Consumer mmap write() ret %zd (len %lu)", ret, len);
                if (ret < 0) {
-                       PERROR("Error in file write");
+                       /*
+                        * This is possible if the fd is closed on the other side (outfd)
+                        * or any write problem. It can be verbose a bit for a normal
+                        * execution if for instance the relayd is stopped abruptly. This
+                        * can happen so set this to a DBG statement.
+                        */
+                       DBG("Error in file write mmap");
                        if (written == 0) {
                                written = ret;
                        }
@@ -2101,17 +2094,13 @@ void *consumer_thread_metadata_poll(void *data)
        DBG("Metadata main loop started");
 
        while (1) {
-               lttng_poll_reset(&events);
-
-               nb_fd = LTTNG_POLL_GETNB(&events);
-
                /* Only the metadata pipe is set */
-               if (nb_fd == 0 && consumer_quit == 1) {
+               if (LTTNG_POLL_GETNB(&events) == 0 && consumer_quit == 1) {
                        goto end;
                }
 
 restart:
-               DBG("Metadata poll wait with %d fd(s)", nb_fd);
+               DBG("Metadata poll wait with %d fd(s)", LTTNG_POLL_GETNB(&events));
                ret = lttng_poll_wait(&events, -1);
                DBG("Metadata event catched in thread");
                if (ret < 0) {
@@ -2122,6 +2111,8 @@ restart:
                        goto error;
                }
 
+               nb_fd = ret;
+
                /* From here, the event is a metadata wait fd */
                for (i = 0; i < nb_fd; i++) {
                        revents = LTTNG_POLL_GETEV(&events, i);
@@ -2366,6 +2357,11 @@ void *consumer_thread_data_poll(void *data)
                                pipe_readlen = read(ctx->consumer_data_pipe[0], &new_stream,
                                                sizeof(new_stream));
                        } while (pipe_readlen == -1 && errno == EINTR);
+                       if (pipe_readlen < 0) {
+                               PERROR("read consumer data pipe");
+                               /* Continue so we can at least handle the current stream(s). */
+                               continue;
+                       }
 
                        /*
                         * If the stream is NULL, just ignore it. It's also possible that
@@ -2573,7 +2569,7 @@ void *consumer_thread_sessiond_poll(void *data)
 
        /* Blocking call, waiting for transmission */
        sock = lttcomm_accept_unix_sock(client_socket);
-       if (sock <= 0) {
+       if (sock < 0) {
                WARN("On accept");
                goto end;
        }
@@ -2700,7 +2696,6 @@ void lttng_consumer_init(void)
        consumer_data.channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
        consumer_data.relayd_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
        consumer_data.stream_list_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
-       relayd_session_id_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
 }
 
 /*
@@ -2717,7 +2712,6 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type,
        int fd = -1, ret = -1, relayd_created = 0;
        enum lttng_error_code ret_code = LTTNG_OK;
        struct consumer_relayd_sock_pair *relayd;
-       struct consumer_relayd_session_id *relayd_id_node;
 
        DBG("Consumer adding relayd socket (idx: %d)", net_seq_idx);
 
@@ -2735,6 +2729,7 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type,
                relayd = consumer_allocate_relayd_sock_pair(net_seq_idx);
                if (relayd == NULL) {
                        lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR);
+                       ret = -1;
                        goto error;
                }
                relayd->sessiond_session_id = (uint64_t) sessiond_id;
@@ -2784,32 +2779,21 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type,
                relayd->control_sock.fd = fd;
 
                /*
-                * Create a session on the relayd and store the returned id. No need to
-                * grab the socket lock since the relayd object is not yet visible.
+                * Create a session on the relayd and store the returned id. Lock the
+                * control socket mutex if the relayd was NOT created before.
                 */
+               if (!relayd_created) {
+                       pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+               }
                ret = relayd_create_session(&relayd->control_sock,
                                &relayd->relayd_session_id);
-               if (ret < 0) {
-                       goto error;
+               if (!relayd_created) {
+                       pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
                }
-
-               /* Set up a relayd session id node. */
-               relayd_id_node = zmalloc(sizeof(struct consumer_relayd_session_id));
-               if (!relayd_id_node) {
-                       PERROR("zmalloc relayd id node");
+               if (ret < 0) {
                        goto error;
                }
 
-               relayd_id_node->relayd_id = relayd->relayd_session_id;
-               relayd_id_node->sessiond_id = (uint64_t) sessiond_id;
-
-               /* Indexed by session id of the sessiond. */
-               lttng_ht_node_init_ulong(&relayd_id_node->node,
-                               relayd_id_node->sessiond_id);
-               rcu_read_lock();
-               lttng_ht_add_unique_ulong(relayd_session_id_ht, &relayd_id_node->node);
-               rcu_read_unlock();
-
                break;
        case LTTNG_STREAM_DATA:
                /* Copy received lttcomm socket */
@@ -2831,6 +2815,7 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type,
                break;
        default:
                ERR("Unknown relayd socket type (%d)", sock_type);
+               ret = -1;
                goto error;
        }
 
@@ -2902,30 +2887,24 @@ end:
 static struct consumer_relayd_sock_pair *find_relayd_by_session_id(uint64_t id)
 {
        struct lttng_ht_iter iter;
-       struct lttng_ht_node_ulong *node;
        struct consumer_relayd_sock_pair *relayd = NULL;
-       struct consumer_relayd_session_id *session_id_map;
-
-       /* Get the session id map. */
-       lttng_ht_lookup(relayd_session_id_ht, (void *)((unsigned long) id), &iter);
-       node = lttng_ht_iter_get_node_ulong(&iter);
-       if (node == NULL) {
-               goto end;
-       }
-
-       session_id_map = caa_container_of(node, struct consumer_relayd_session_id,
-                       node);
 
        /* Iterate over all relayd since they are indexed by net_seq_idx. */
        cds_lfht_for_each_entry(consumer_data.relayd_ht->ht, &iter.iter, relayd,
                        node.node) {
-               if (relayd->relayd_session_id == session_id_map->relayd_id) {
+               /*
+                * Check by sessiond id which is unique here where the relayd session
+                * id might not be when having multiple relayd.
+                */
+               if (relayd->sessiond_session_id == id) {
                        /* Found the relayd. There can be only one per id. */
-                       break;
+                       goto found;
                }
        }
 
-end:
+       return NULL;
+
+found:
        return relayd;
 }
 
@@ -3008,10 +2987,12 @@ int consumer_data_pending(uint64_t id)
                if (relayd) {
                        pthread_mutex_lock(&relayd->ctrl_sock_mutex);
                        if (stream->metadata_flag) {
-                               ret = relayd_quiescent_control(&relayd->control_sock);
+                               ret = relayd_quiescent_control(&relayd->control_sock,
+                                               stream->relayd_stream_id);
                        } else {
                                ret = relayd_data_pending(&relayd->control_sock,
-                                               stream->relayd_stream_id, stream->next_net_seq_num);
+                                               stream->relayd_stream_id,
+                                               stream->next_net_seq_num - 1);
                        }
                        pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
                        if (ret == 1) {
@@ -3030,10 +3011,12 @@ int consumer_data_pending(uint64_t id)
                ret = relayd_end_data_pending(&relayd->control_sock,
                                relayd->relayd_session_id, &is_data_inflight);
                pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
-               if (ret < 0 || !is_data_inflight) {
-                       /* On error or if NO data inflight, no data is pending. */
+               if (ret < 0) {
                        goto data_not_pending;
                }
+               if (is_data_inflight) {
+                       goto data_pending;
+               }
        }
 
        /*
This page took 0.026594 seconds and 4 git commands to generate.