X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Fconsumer.c;h=16521a8b863788fd957b756d7c4882f6b3c5abbd;hp=16b9eb64f88ae006a742ade5d3d05b162d5bb0ef;hb=8fa0b55b4931b9b5294120b20259be73b0b87ad5;hpb=cb365c03b0b4efc0d3f0875be586a8f4f270aaf6 diff --git a/src/common/consumer.c b/src/common/consumer.c index 16b9eb64f..16521a8b8 100644 --- a/src/common/consumer.c +++ b/src/common/consumer.c @@ -56,15 +56,27 @@ struct lttng_consumer_global_data consumer_data = { volatile int consumer_quit; /* - * The following two hash tables are visible by all threads which are separated - * in different source files. - * * Global hash table containing respectively metadata and data streams. The * stream element in this ht should only be updated by the metadata poll thread * for the metadata and the data poll thread for the data. */ -struct lttng_ht *metadata_ht; -struct lttng_ht *data_ht; +static struct lttng_ht *metadata_ht; +static struct lttng_ht *data_ht; + +/* + * This hash table contains the mapping between the session id of the sessiond + * and the relayd session id. Element of the ht are indexed by sessiond session + * id. + * + * Node can be added when a relayd communication is opened in the sessiond + * thread. + * + * Note that a session id of the session daemon is unique to a tracing session + * and not to a domain session. However, a domain session has one consumer + * which forces the 1-1 mapping between a consumer and a domain session (ex: + * UST). This means that we can't have duplicate in this ht. + */ +static struct lttng_ht *relayd_session_id_ht; /* * Notify a thread pipe to poll back again. This usually means that some global @@ -130,6 +142,12 @@ void consumer_steal_stream_key(int key, struct lttng_ht *ht) rcu_read_unlock(); } +/* + * Return a channel object for the given key. + * + * RCU read side lock MUST be acquired before calling this function and + * protects the channel ptr. + */ static struct lttng_consumer_channel *consumer_find_channel(int key) { struct lttng_ht_iter iter; @@ -141,8 +159,6 @@ static struct lttng_consumer_channel *consumer_find_channel(int key) return NULL; } - rcu_read_lock(); - lttng_ht_lookup(consumer_data.channel_ht, (void *)((unsigned long) key), &iter); node = lttng_ht_iter_get_node_ulong(&iter); @@ -150,8 +166,6 @@ static struct lttng_consumer_channel *consumer_find_channel(int key) channel = caa_container_of(node, struct lttng_consumer_channel, node); } - rcu_read_unlock(); - return channel; } @@ -217,6 +231,7 @@ static void destroy_relayd(struct consumer_relayd_sock_pair *relayd) { int ret; struct lttng_ht_iter iter; + struct lttng_ht_node_ulong *node; if (relayd == NULL) { return; @@ -224,6 +239,22 @@ static void destroy_relayd(struct consumer_relayd_sock_pair *relayd) DBG("Consumer destroy and close relayd socket pair"); + /* Loockup for a relayd node in the session id map hash table. */ + lttng_ht_lookup(relayd_session_id_ht, + (void *)((unsigned long) relayd->sessiond_session_id), &iter); + node = lttng_ht_iter_get_node_ulong(&iter); + if (node == NULL) { + /* We assume the relayd is being or is destroyed */ + return; + } + + /* + * Try to delete it from the relayd session id ht. The return value is of + * no importance since either way we are going to try to delete the relayd + * from the global relayd_ht. + */ + lttng_ht_del(relayd_session_id_ht, &iter); + iter.iter.node = &relayd->node.node; ret = lttng_ht_del(consumer_data.relayd_ht, &iter); if (ret != 0) { @@ -235,6 +266,29 @@ static void destroy_relayd(struct consumer_relayd_sock_pair *relayd) call_rcu(&relayd->node.head, consumer_rcu_free_relayd); } +/* + * Iterate over the relayd hash table and destroy each element. Finally, + * destroy the whole hash table. + */ +static void cleanup_relayd_ht(void) +{ + struct lttng_ht_iter iter; + struct consumer_relayd_sock_pair *relayd; + + rcu_read_lock(); + + cds_lfht_for_each_entry(consumer_data.relayd_ht->ht, &iter.iter, relayd, + node.node) { + destroy_relayd(relayd); + } + + lttng_ht_destroy(consumer_data.relayd_ht); + /* The destroy_relayd call makes sure that this ht is empty here. */ + lttng_ht_destroy(relayd_session_id_ht); + + rcu_read_unlock(); +} + /* * Update the end point status of all streams having the given network sequence * index (relayd index). @@ -351,8 +405,8 @@ void consumer_del_stream(struct lttng_consumer_stream *stream, goto free_stream; } - pthread_mutex_lock(&stream->lock); pthread_mutex_lock(&consumer_data.lock); + pthread_mutex_lock(&stream->lock); switch (consumer_data.type) { case LTTNG_CONSUMER_KERNEL: @@ -444,8 +498,8 @@ void consumer_del_stream(struct lttng_consumer_stream *stream, end: consumer_data.need_update = 1; - pthread_mutex_unlock(&consumer_data.lock); pthread_mutex_unlock(&stream->lock); + pthread_mutex_unlock(&consumer_data.lock); if (free_chan) { consumer_del_channel(free_chan); @@ -478,6 +532,8 @@ struct lttng_consumer_stream *consumer_allocate_stream( goto end; } + rcu_read_lock(); + /* * Get stream's channel reference. Needed when adding the stream to the * global hash table. @@ -534,9 +590,12 @@ struct lttng_consumer_stream *consumer_allocate_stream( stream->path_name, stream->key, stream->shm_fd, stream->wait_fd, (unsigned long long) stream->mmap_len, stream->out_fd, stream->net_seq_idx, stream->session_id); + + rcu_read_unlock(); return stream; error: + rcu_read_unlock(); free(stream); end: return NULL; @@ -557,6 +616,7 @@ static int consumer_add_stream(struct lttng_consumer_stream *stream, DBG3("Adding consumer stream %d", stream->key); pthread_mutex_lock(&consumer_data.lock); + pthread_mutex_lock(&stream->lock); rcu_read_lock(); /* Steal stream identifier to avoid having streams with the same key */ @@ -596,6 +656,7 @@ static int consumer_add_stream(struct lttng_consumer_stream *stream, consumer_data.need_update = 1; rcu_read_unlock(); + pthread_mutex_unlock(&stream->lock); pthread_mutex_unlock(&consumer_data.lock); return ret; @@ -758,6 +819,8 @@ void consumer_del_channel(struct lttng_consumer_channel *channel) int ret; struct lttng_ht_iter iter; + DBG("Consumer delete channel key %d", channel->key); + pthread_mutex_lock(&consumer_data.lock); switch (consumer_data.type) { @@ -911,7 +974,7 @@ static int consumer_update_poll_array( * changed where this function will be called back again. */ if (stream->state != LTTNG_CONSUMER_ACTIVE_STREAM || - stream->endpoint_status) { + stream->endpoint_status == CONSUMER_ENDPOINT_INACTIVE) { continue; } DBG("Active FD %d", stream->wait_fd); @@ -995,8 +1058,8 @@ int lttng_consumer_send_error( } /* - * Close all the tracefiles and stream fds, should be called when all instances - * are destroyed. + * Close all the tracefiles and stream fds and MUST be called when all + * instances are destroyed i.e. when all threads were joined and are ended. */ void lttng_consumer_cleanup(void) { @@ -1015,6 +1078,15 @@ void lttng_consumer_cleanup(void) rcu_read_unlock(); lttng_ht_destroy(consumer_data.channel_ht); + + cleanup_relayd_ht(); + + /* + * This HT contains streams that are freed by either the metadata thread or + * the data thread so we do *nothing* on the hash table and simply destroy + * it. + */ + lttng_ht_destroy(consumer_data.stream_list_ht); } /* @@ -1261,6 +1333,8 @@ end: * core function for writing trace buffers to either the local filesystem or * the network. * + * It must be called with the stream lock held. + * * Careful review MUST be put if any changes occur! * * Returns the number of bytes written @@ -1281,8 +1355,6 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap( /* RCU lock for the relayd pointer */ rcu_read_lock(); - pthread_mutex_lock(&stream->lock); - /* Flag that the current stream if set for network streaming. */ if (stream->net_seq_idx != -1) { relayd = consumer_find_relayd(stream->net_seq_idx); @@ -1407,7 +1479,6 @@ end: if (relayd && stream->metadata_flag) { pthread_mutex_unlock(&relayd->ctrl_sock_mutex); } - pthread_mutex_unlock(&stream->lock); rcu_read_unlock(); return written; @@ -1416,6 +1487,8 @@ end: /* * Splice the data from the ring buffer to the tracefile. * + * It must be called with the stream lock held. + * * Returns the number of bytes spliced. */ ssize_t lttng_consumer_on_read_subbuffer_splice( @@ -1448,8 +1521,6 @@ ssize_t lttng_consumer_on_read_subbuffer_splice( /* RCU lock for the relayd pointer */ rcu_read_lock(); - pthread_mutex_lock(&stream->lock); - /* Flag that the current stream if set for network streaming. */ if (stream->net_seq_idx != -1) { relayd = consumer_find_relayd(stream->net_seq_idx); @@ -1617,7 +1688,6 @@ end: if (relayd && stream->metadata_flag) { pthread_mutex_unlock(&relayd->ctrl_sock_mutex); } - pthread_mutex_unlock(&stream->lock); rcu_read_unlock(); return written; @@ -1691,7 +1761,6 @@ int lttng_consumer_recv_cmd(struct lttng_consumer_local_data *ctx, */ static void destroy_data_stream_ht(struct lttng_ht *ht) { - int ret; struct lttng_ht_iter iter; struct lttng_consumer_stream *stream; @@ -1701,10 +1770,11 @@ static void destroy_data_stream_ht(struct lttng_ht *ht) rcu_read_lock(); cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) { - ret = lttng_ht_del(ht, &iter); - assert(!ret); - - call_rcu(&stream->node.head, consumer_free_stream); + /* + * Ignore return value since we are currently cleaning up so any error + * can't be handled. + */ + (void) consumer_del_stream(stream, ht); } rcu_read_unlock(); @@ -1718,7 +1788,6 @@ static void destroy_data_stream_ht(struct lttng_ht *ht) */ static void destroy_stream_ht(struct lttng_ht *ht) { - int ret; struct lttng_ht_iter iter; struct lttng_consumer_stream *stream; @@ -1728,10 +1797,11 @@ static void destroy_stream_ht(struct lttng_ht *ht) rcu_read_lock(); cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) { - ret = lttng_ht_del(ht, &iter); - assert(!ret); - - call_rcu(&stream->node.head, consumer_free_stream); + /* + * Ignore return value since we are currently cleaning up so any error + * can't be handled. + */ + (void) consumer_del_metadata_stream(stream, ht); } rcu_read_unlock(); @@ -1763,9 +1833,9 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream, goto free_stream; } + pthread_mutex_lock(&consumer_data.lock); pthread_mutex_lock(&stream->lock); - pthread_mutex_lock(&consumer_data.lock); switch (consumer_data.type) { case LTTNG_CONSUMER_KERNEL: if (stream->mmap_base != NULL) { @@ -1855,8 +1925,8 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream, } end: - pthread_mutex_unlock(&consumer_data.lock); pthread_mutex_unlock(&stream->lock); + pthread_mutex_unlock(&consumer_data.lock); if (free_chan) { consumer_del_channel(free_chan); @@ -1875,6 +1945,8 @@ static int consumer_add_metadata_stream(struct lttng_consumer_stream *stream, { int ret = 0; struct consumer_relayd_sock_pair *relayd; + struct lttng_ht_iter iter; + struct lttng_ht_node_ulong *node; assert(stream); assert(ht); @@ -1882,6 +1954,7 @@ static int consumer_add_metadata_stream(struct lttng_consumer_stream *stream, DBG3("Adding metadata stream %d to hash table", stream->wait_fd); pthread_mutex_lock(&consumer_data.lock); + pthread_mutex_lock(&stream->lock); /* * From here, refcounts are updated so be _careful_ when returning an error @@ -1889,6 +1962,15 @@ static int consumer_add_metadata_stream(struct lttng_consumer_stream *stream, */ rcu_read_lock(); + + /* + * Lookup the stream just to make sure it does not exist in our internal + * state. This should NEVER happen. + */ + lttng_ht_lookup(ht, (void *)((unsigned long) stream->wait_fd), &iter); + node = lttng_ht_iter_get_node_ulong(&iter); + assert(!node); + /* Find relayd and, if one is found, increment refcount. */ relayd = consumer_find_relayd(stream->net_seq_idx); if (relayd != NULL) { @@ -1909,9 +1991,6 @@ static int consumer_add_metadata_stream(struct lttng_consumer_stream *stream, uatomic_dec(&stream->chan->nb_init_streams); } - /* Steal stream identifier to avoid having streams with the same key */ - consumer_steal_stream_key(stream->key, ht); - lttng_ht_add_unique_ulong(ht, &stream->node); /* @@ -1923,6 +2002,7 @@ static int consumer_add_metadata_stream(struct lttng_consumer_stream *stream, rcu_read_unlock(); + pthread_mutex_unlock(&stream->lock); pthread_mutex_unlock(&consumer_data.lock); return ret; } @@ -1940,7 +2020,7 @@ static void validate_endpoint_status_data_stream(void) rcu_read_lock(); cds_lfht_for_each_entry(data_ht->ht, &iter.iter, stream, node.node) { /* Validate delete flag of the stream */ - if (stream->endpoint_status != CONSUMER_ENDPOINT_INACTIVE) { + if (stream->endpoint_status == CONSUMER_ENDPOINT_ACTIVE) { continue; } /* Delete it right now */ @@ -1965,7 +2045,7 @@ static void validate_endpoint_status_metadata_stream( rcu_read_lock(); cds_lfht_for_each_entry(metadata_ht->ht, &iter.iter, stream, node.node) { /* Validate delete flag of the stream */ - if (!stream->endpoint_status) { + if (stream->endpoint_status == CONSUMER_ENDPOINT_ACTIVE) { continue; } /* @@ -1997,6 +2077,12 @@ void *consumer_thread_metadata_poll(void *data) rcu_register_thread(); + metadata_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG); + if (!metadata_ht) { + /* ENOMEM at this point. Better to bail out. */ + goto error; + } + DBG("Thread metadata poll started"); /* Size is set to 1 for the consumer_metadata pipe */ @@ -2054,7 +2140,10 @@ restart: * since their might be data to consume. */ lttng_poll_del(&events, ctx->consumer_metadata_pipe[0]); - close(ctx->consumer_metadata_pipe[0]); + ret = close(ctx->consumer_metadata_pipe[0]); + if (ret < 0) { + PERROR("close metadata pipe"); + } continue; } else if (revents & LPOLLIN) { do { @@ -2160,9 +2249,7 @@ end: DBG("Metadata poll thread exiting"); lttng_poll_clean(&events); - if (metadata_ht) { - destroy_stream_ht(metadata_ht); - } + destroy_stream_ht(metadata_ht); rcu_unregister_thread(); return NULL; @@ -2187,6 +2274,7 @@ void *consumer_thread_data_poll(void *data) data_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG); if (data_ht == NULL) { + /* ENOMEM at this point. Better to bail out. */ goto end; } @@ -2418,12 +2506,13 @@ end: * only tracked fd in the poll set. The thread will take care of closing * the read side. */ - close(ctx->consumer_metadata_pipe[1]); - - if (data_ht) { - destroy_data_stream_ht(data_ht); + ret = close(ctx->consumer_metadata_pipe[1]); + if (ret < 0) { + PERROR("close data pipe"); } + destroy_data_stream_ht(data_ht); + rcu_unregister_thread(); return NULL; } @@ -2434,7 +2523,7 @@ end: */ void *consumer_thread_sessiond_poll(void *data) { - int sock, client_socket, ret; + int sock = -1, client_socket, ret; /* * structure to poll for incoming data on communication socket avoids * making blocking sockets. @@ -2494,6 +2583,13 @@ void *consumer_thread_sessiond_poll(void *data) goto end; } + /* This socket is not useful anymore. */ + ret = close(client_socket); + if (ret < 0) { + PERROR("close client_socket"); + } + client_socket = -1; + /* update the polling structure to poll on the established socket */ consumer_sockpoll[1].fd = sock; consumer_sockpoll[1].events = POLLIN | POLLPRI; @@ -2537,6 +2633,20 @@ end: */ notify_thread_pipe(ctx->consumer_data_pipe[1]); + /* Cleaning up possibly open sockets. */ + if (sock >= 0) { + ret = close(sock); + if (ret < 0) { + PERROR("close sock sessiond poll"); + } + } + if (client_socket >= 0) { + ret = close(sock); + if (ret < 0) { + PERROR("close client_socket sessiond poll"); + } + } + rcu_unregister_thread(); return NULL; } @@ -2544,17 +2654,27 @@ end: ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream, struct lttng_consumer_local_data *ctx) { + ssize_t ret; + + pthread_mutex_lock(&stream->lock); + switch (consumer_data.type) { case LTTNG_CONSUMER_KERNEL: - return lttng_kconsumer_read_subbuffer(stream, ctx); + ret = lttng_kconsumer_read_subbuffer(stream, ctx); + break; case LTTNG_CONSUMER32_UST: case LTTNG_CONSUMER64_UST: - return lttng_ustconsumer_read_subbuffer(stream, ctx); + ret = lttng_ustconsumer_read_subbuffer(stream, ctx); + break; default: ERR("Unknown consumer_data type"); assert(0); - return -ENOSYS; + ret = -ENOSYS; + break; } + + pthread_mutex_unlock(&stream->lock); + return ret; } int lttng_consumer_on_recv_stream(struct lttng_consumer_stream *stream) @@ -2580,11 +2700,7 @@ void lttng_consumer_init(void) consumer_data.channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG); consumer_data.relayd_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG); consumer_data.stream_list_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG); - - metadata_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG); - assert(metadata_ht); - data_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG); - assert(data_ht); + relayd_session_id_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG); } /* @@ -2595,13 +2711,23 @@ void lttng_consumer_init(void) */ int consumer_add_relayd_socket(int net_seq_idx, int sock_type, struct lttng_consumer_local_data *ctx, int sock, - struct pollfd *consumer_sockpoll, struct lttcomm_sock *relayd_sock) + struct pollfd *consumer_sockpoll, struct lttcomm_sock *relayd_sock, + unsigned int sessiond_id) { - int fd, ret = -1; + int fd = -1, ret = -1, relayd_created = 0; + enum lttng_error_code ret_code = LTTNG_OK; struct consumer_relayd_sock_pair *relayd; + struct consumer_relayd_session_id *relayd_id_node; DBG("Consumer adding relayd socket (idx: %d)", net_seq_idx); + /* First send a status message before receiving the fds. */ + ret = consumer_send_status_msg(sock, ret_code); + if (ret < 0) { + /* Somehow, the session daemon is not responding anymore. */ + goto error; + } + /* Get relayd reference if exists. */ relayd = consumer_find_relayd(net_seq_idx); if (relayd == NULL) { @@ -2611,6 +2737,8 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type, lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR); goto error; } + relayd->sessiond_session_id = (uint64_t) sessiond_id; + relayd_created = 1; } /* Poll on consumer socket. */ @@ -2624,6 +2752,14 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type, if (ret != sizeof(fd)) { lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_FD); ret = -1; + fd = -1; /* Just in case it gets set with an invalid value. */ + goto error; + } + + /* We have the fds without error. Send status back. */ + ret = consumer_send_status_msg(sock, ret_code); + if (ret < 0) { + /* Somehow, the session daemon is not responding anymore. */ goto error; } @@ -2633,27 +2769,63 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type, /* Copy received lttcomm socket */ lttcomm_copy_sock(&relayd->control_sock, relayd_sock); ret = lttcomm_create_sock(&relayd->control_sock); + /* Immediately try to close the created socket if valid. */ + if (relayd->control_sock.fd >= 0) { + if (close(relayd->control_sock.fd)) { + PERROR("close relayd control socket"); + } + } + /* Handle create_sock error. */ if (ret < 0) { goto error; } - /* Close the created socket fd which is useless */ - close(relayd->control_sock.fd); - /* Assign new file descriptor */ relayd->control_sock.fd = fd; + + /* + * Create a session on the relayd and store the returned id. No need to + * grab the socket lock since the relayd object is not yet visible. + */ + ret = relayd_create_session(&relayd->control_sock, + &relayd->relayd_session_id); + if (ret < 0) { + goto error; + } + + /* Set up a relayd session id node. */ + relayd_id_node = zmalloc(sizeof(struct consumer_relayd_session_id)); + if (!relayd_id_node) { + PERROR("zmalloc relayd id node"); + goto error; + } + + relayd_id_node->relayd_id = relayd->relayd_session_id; + relayd_id_node->sessiond_id = (uint64_t) sessiond_id; + + /* Indexed by session id of the sessiond. */ + lttng_ht_node_init_ulong(&relayd_id_node->node, + relayd_id_node->sessiond_id); + rcu_read_lock(); + lttng_ht_add_unique_ulong(relayd_session_id_ht, &relayd_id_node->node); + rcu_read_unlock(); + break; case LTTNG_STREAM_DATA: /* Copy received lttcomm socket */ lttcomm_copy_sock(&relayd->data_sock, relayd_sock); ret = lttcomm_create_sock(&relayd->data_sock); + /* Immediately try to close the created socket if valid. */ + if (relayd->data_sock.fd >= 0) { + if (close(relayd->data_sock.fd)) { + PERROR("close relayd data socket"); + } + } + /* Handle create_sock error. */ if (ret < 0) { goto error; } - /* Close the created socket fd which is useless */ - close(relayd->data_sock.fd); - /* Assign new file descriptor */ relayd->data_sock.fd = fd; break; @@ -2673,9 +2845,23 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type, add_relayd(relayd); /* All good! */ - ret = 0; + return 0; error: + /* Close received socket if valid. */ + if (fd >= 0) { + if (close(fd)) { + PERROR("close received socket"); + } + } + + if (relayd_created) { + /* We just want to cleanup. Ignore ret value. */ + (void) relayd_close(&relayd->control_sock); + (void) relayd_close(&relayd->data_sock); + free(relayd); + } + return ret; } @@ -2707,6 +2893,42 @@ end: return ret; } +/* + * Search for a relayd associated to the session id and return the reference. + * + * A rcu read side lock MUST be acquire before calling this function and locked + * until the relayd object is no longer necessary. + */ +static struct consumer_relayd_sock_pair *find_relayd_by_session_id(uint64_t id) +{ + struct lttng_ht_iter iter; + struct lttng_ht_node_ulong *node; + struct consumer_relayd_sock_pair *relayd = NULL; + struct consumer_relayd_session_id *session_id_map; + + /* Get the session id map. */ + lttng_ht_lookup(relayd_session_id_ht, (void *)((unsigned long) id), &iter); + node = lttng_ht_iter_get_node_ulong(&iter); + if (node == NULL) { + goto end; + } + + session_id_map = caa_container_of(node, struct consumer_relayd_session_id, + node); + + /* Iterate over all relayd since they are indexed by net_seq_idx. */ + cds_lfht_for_each_entry(consumer_data.relayd_ht->ht, &iter.iter, relayd, + node.node) { + if (relayd->relayd_session_id == session_id_map->relayd_id) { + /* Found the relayd. There can be only one per id. */ + break; + } + } + +end: + return relayd; +} + /* * Check if for a given session id there is still data needed to be extract * from the buffers. @@ -2719,7 +2941,7 @@ int consumer_data_pending(uint64_t id) struct lttng_ht_iter iter; struct lttng_ht *ht; struct lttng_consumer_stream *stream; - struct consumer_relayd_sock_pair *relayd; + struct consumer_relayd_sock_pair *relayd = NULL; int (*data_pending)(struct lttng_consumer_stream *); DBG("Consumer data pending command on session id %" PRIu64, id); @@ -2743,6 +2965,19 @@ int consumer_data_pending(uint64_t id) /* Ease our life a bit */ ht = consumer_data.stream_list_ht; + relayd = find_relayd_by_session_id(id); + if (relayd) { + /* Send init command for data pending. */ + pthread_mutex_lock(&relayd->ctrl_sock_mutex); + ret = relayd_begin_data_pending(&relayd->control_sock, + relayd->relayd_session_id); + pthread_mutex_unlock(&relayd->ctrl_sock_mutex); + if (ret < 0) { + /* Communication error thus the relayd so no data pending. */ + goto data_not_pending; + } + } + cds_lfht_for_each_entry_duplicate(ht->ht, ht->hash_fct((void *)((unsigned long) id), lttng_ht_seed), ht->match_fct, (void *)((unsigned long) id), @@ -2750,7 +2985,7 @@ int consumer_data_pending(uint64_t id) /* If this call fails, the stream is being used hence data pending. */ ret = stream_try_lock(stream); if (!ret) { - goto data_not_pending; + goto data_pending; } /* @@ -2765,24 +3000,12 @@ int consumer_data_pending(uint64_t id) ret = data_pending(stream); if (ret == 1) { pthread_mutex_unlock(&stream->lock); - goto data_not_pending; + goto data_pending; } } /* Relayd check */ - if (stream->net_seq_idx != -1) { - relayd = consumer_find_relayd(stream->net_seq_idx); - if (!relayd) { - /* - * At this point, if the relayd object is not available for the - * given stream, it is because the relayd is being cleaned up - * so every stream associated with it (for a session id value) - * are or will be marked for deletion hence no data pending. - */ - pthread_mutex_unlock(&stream->lock); - goto data_not_pending; - } - + if (relayd) { pthread_mutex_lock(&relayd->ctrl_sock_mutex); if (stream->metadata_flag) { ret = relayd_quiescent_control(&relayd->control_sock); @@ -2793,27 +3016,55 @@ int consumer_data_pending(uint64_t id) pthread_mutex_unlock(&relayd->ctrl_sock_mutex); if (ret == 1) { pthread_mutex_unlock(&stream->lock); - goto data_not_pending; + goto data_pending; } } pthread_mutex_unlock(&stream->lock); } + if (relayd) { + unsigned int is_data_inflight = 0; + + /* Send init command for data pending. */ + pthread_mutex_lock(&relayd->ctrl_sock_mutex); + ret = relayd_end_data_pending(&relayd->control_sock, + relayd->relayd_session_id, &is_data_inflight); + pthread_mutex_unlock(&relayd->ctrl_sock_mutex); + if (ret < 0 || !is_data_inflight) { + /* On error or if NO data inflight, no data is pending. */ + goto data_not_pending; + } + } + /* - * Finding _no_ node in the hash table means that the stream(s) have been - * removed thus data is guaranteed to be available for analysis from the - * trace files. This is *only* true for local consumer and not network - * streaming. + * Finding _no_ node in the hash table and no inflight data means that the + * stream(s) have been removed thus data is guaranteed to be available for + * analysis from the trace files. */ +data_not_pending: /* Data is available to be read by a viewer. */ pthread_mutex_unlock(&consumer_data.lock); rcu_read_unlock(); return 0; -data_not_pending: +data_pending: /* Data is still being extracted from buffers. */ pthread_mutex_unlock(&consumer_data.lock); rcu_read_unlock(); return 1; } + +/* + * Send a ret code status message to the sessiond daemon. + * + * Return the sendmsg() return value. + */ +int consumer_send_status_msg(int sock, int ret_code) +{ + struct lttcomm_consumer_status_msg msg; + + msg.ret_code = ret_code; + + return lttcomm_send_unix_sock(sock, &msg, sizeof(msg)); +}