X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Fconsumer.c;h=94a0cc3efb1d557da732ed9a201848b0ee4dc3e4;hp=0565f54dfad4da97503a6eb6575e4c51110edc32;hb=774d490c42598710944f54a53d81d1cab2a1dfaa;hpb=07b86b528dc279d59cdf16e6cb946c144fe773f2 diff --git a/src/common/consumer.c b/src/common/consumer.c index 0565f54df..94a0cc3ef 100644 --- a/src/common/consumer.c +++ b/src/common/consumer.c @@ -165,20 +165,20 @@ static struct lttng_consumer_stream *find_stream(uint64_t key, return stream; } -static void steal_stream_key(int key, struct lttng_ht *ht) +static void steal_stream_key(uint64_t key, struct lttng_ht *ht) { struct lttng_consumer_stream *stream; rcu_read_lock(); stream = find_stream(key, ht); if (stream) { - stream->key = -1ULL; + stream->key = (uint64_t) -1ULL; /* * We don't want the lookup to match, but we still need * to iterate on this stream when iterating over the hash table. Just * change the node key. */ - stream->node.key = -1ULL; + stream->node.key = (uint64_t) -1ULL; } rcu_read_unlock(); } @@ -291,19 +291,24 @@ void consumer_del_channel(struct lttng_consumer_channel *channel) DBG("Consumer delete channel key %" PRIu64, channel->key); pthread_mutex_lock(&consumer_data.lock); + pthread_mutex_lock(&channel->lock); + + /* Delete streams that might have been left in the stream list. */ + cds_list_for_each_entry_safe(stream, stmp, &channel->streams.head, + send_node) { + cds_list_del(&stream->send_node); + /* + * Once a stream is added to this list, the buffers were created so + * we have a guarantee that this call will succeed. + */ + consumer_stream_destroy(stream, NULL); + } switch (consumer_data.type) { case LTTNG_CONSUMER_KERNEL: break; case LTTNG_CONSUMER32_UST: case LTTNG_CONSUMER64_UST: - /* Delete streams that might have been left in the stream list. */ - cds_list_for_each_entry_safe(stream, stmp, &channel->streams.head, - send_node) { - cds_list_del(&stream->send_node); - lttng_ustconsumer_del_stream(stream); - free(stream); - } lttng_ustconsumer_del_channel(channel); break; default: @@ -312,25 +317,6 @@ void consumer_del_channel(struct lttng_consumer_channel *channel) goto end; } - /* Empty no monitor streams list. */ - if (!channel->monitor) { - struct lttng_consumer_stream *stream, *stmp; - - /* - * So, these streams are not visible to any data thread. This is why we - * close and free them because they were never added to any data - * structure apart from this one. - */ - cds_list_for_each_entry_safe(stream, stmp, - &channel->stream_no_monitor_list.head, no_monitor_node) { - cds_list_del(&stream->no_monitor_node); - /* Close everything in that stream. */ - consumer_stream_close(stream); - /* Free the ressource. */ - consumer_stream_free(stream); - } - } - rcu_read_lock(); iter.iter.node = &channel->node.node; ret = lttng_ht_del(consumer_data.channel_ht, &iter); @@ -339,6 +325,7 @@ void consumer_del_channel(struct lttng_consumer_channel *channel) call_rcu(&channel->node.head, free_channel_rcu); end: + pthread_mutex_unlock(&channel->lock); pthread_mutex_unlock(&consumer_data.lock); } @@ -370,13 +357,13 @@ static void cleanup_relayd_ht(void) * It's atomically set without having the stream mutex locked which is fine * because we handle the write/read race with a pipe wakeup for each thread. */ -static void update_endpoint_status_by_netidx(int net_seq_idx, +static void update_endpoint_status_by_netidx(uint64_t net_seq_idx, enum consumer_endpoint_status status) { struct lttng_ht_iter iter; struct lttng_consumer_stream *stream; - DBG("Consumer set delete flag on stream by idx %d", net_seq_idx); + DBG("Consumer set delete flag on stream by idx %" PRIu64, net_seq_idx); rcu_read_lock(); @@ -409,7 +396,7 @@ static void update_endpoint_status_by_netidx(int net_seq_idx, static void cleanup_relayd(struct consumer_relayd_sock_pair *relayd, struct lttng_consumer_local_data *ctx) { - int netidx; + uint64_t netidx; assert(relayd); @@ -480,7 +467,8 @@ struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key, uint64_t session_id, int cpu, int *alloc_ret, - enum consumer_channel_type type) + enum consumer_channel_type type, + unsigned int monitor) { int ret; struct lttng_consumer_stream *stream; @@ -502,6 +490,8 @@ struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key, stream->gid = gid; stream->net_seq_idx = relayd_id; stream->session_id = session_id; + stream->monitor = monitor; + stream->endpoint_status = CONSUMER_ENDPOINT_ACTIVE; pthread_mutex_init(&stream->lock, NULL); /* If channel is the metadata, flag this stream as metadata. */ @@ -553,7 +543,6 @@ static int add_stream(struct lttng_consumer_stream *stream, struct lttng_ht *ht) { int ret = 0; - struct consumer_relayd_sock_pair *relayd; assert(stream); assert(ht); @@ -561,6 +550,7 @@ static int add_stream(struct lttng_consumer_stream *stream, DBG3("Adding consumer stream %" PRIu64, stream->key); pthread_mutex_lock(&consumer_data.lock); + pthread_mutex_lock(&stream->chan->lock); pthread_mutex_lock(&stream->lock); rcu_read_lock(); @@ -579,12 +569,6 @@ static int add_stream(struct lttng_consumer_stream *stream, */ lttng_ht_add_u64(consumer_data.stream_list_ht, &stream->node_session_id); - /* Check and cleanup relayd */ - relayd = consumer_find_relayd(stream->net_seq_idx); - if (relayd != NULL) { - uatomic_inc(&relayd->refcount); - } - /* * When nb_init_stream_left reaches 0, we don't need to trigger any action * in terms of destroying the associated channel, because the action that @@ -604,6 +588,7 @@ static int add_stream(struct lttng_consumer_stream *stream, rcu_read_unlock(); pthread_mutex_unlock(&stream->lock); + pthread_mutex_unlock(&stream->chan->lock); pthread_mutex_unlock(&consumer_data.lock); return ret; @@ -637,12 +622,12 @@ end: * Allocate and return a consumer relayd socket. */ struct consumer_relayd_sock_pair *consumer_allocate_relayd_sock_pair( - int net_seq_idx) + uint64_t net_seq_idx) { struct consumer_relayd_sock_pair *obj = NULL; - /* Negative net sequence index is a failure */ - if (net_seq_idx < 0) { + /* net sequence index of -1 is a failure */ + if (net_seq_idx == (uint64_t) -1ULL) { goto error; } @@ -693,6 +678,67 @@ error: return relayd; } +/* + * Find a relayd and send the stream + * + * Returns 0 on success, < 0 on error + */ +int consumer_send_relayd_stream(struct lttng_consumer_stream *stream, + char *path) +{ + int ret = 0; + struct consumer_relayd_sock_pair *relayd; + + assert(stream); + assert(stream->net_seq_idx != -1ULL); + assert(path); + + /* The stream is not metadata. Get relayd reference if exists. */ + rcu_read_lock(); + relayd = consumer_find_relayd(stream->net_seq_idx); + if (relayd != NULL) { + /* Add stream on the relayd */ + pthread_mutex_lock(&relayd->ctrl_sock_mutex); + ret = relayd_add_stream(&relayd->control_sock, stream->name, + path, &stream->relayd_stream_id, + stream->chan->tracefile_size, stream->chan->tracefile_count); + pthread_mutex_unlock(&relayd->ctrl_sock_mutex); + if (ret < 0) { + goto end; + } + uatomic_inc(&relayd->refcount); + stream->sent_to_relayd = 1; + } else { + ERR("Stream %" PRIu64 " relayd ID %" PRIu64 " unknown. Can't send it.", + stream->key, stream->net_seq_idx); + ret = -1; + goto end; + } + + DBG("Stream %s with key %" PRIu64 " sent to relayd id %" PRIu64, + stream->name, stream->key, stream->net_seq_idx); + +end: + rcu_read_unlock(); + return ret; +} + +/* + * Find a relayd and close the stream + */ +void close_relayd_stream(struct lttng_consumer_stream *stream) +{ + struct consumer_relayd_sock_pair *relayd; + + /* The stream is not metadata. Get relayd reference if exists. */ + rcu_read_lock(); + relayd = consumer_find_relayd(stream->net_seq_idx); + if (relayd) { + consumer_stream_relayd_close(stream, relayd); + } + rcu_read_unlock(); +} + /* * Handle stream for relayd transmission if the stream applies for network * streaming where the net sequence index is set. @@ -769,6 +815,7 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key, enum lttng_event_output output, uint64_t tracefile_size, uint64_t tracefile_count, + uint64_t session_id_per_pid, unsigned int monitor) { struct lttng_consumer_channel *channel; @@ -782,6 +829,7 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key, channel->key = key; channel->refcount = 0; channel->session_id = session_id; + channel->session_id_per_pid = session_id_per_pid; channel->uid = uid; channel->gid = gid; channel->relayd_id = relayd_id; @@ -789,6 +837,7 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key, channel->tracefile_size = tracefile_size; channel->tracefile_count = tracefile_count; channel->monitor = monitor; + pthread_mutex_init(&channel->lock, NULL); /* * In monitor mode, the streams associated with the channel will be put in @@ -803,20 +852,6 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key, channel->refcount = 1; } - switch (output) { - case LTTNG_EVENT_SPLICE: - channel->output = CONSUMER_CHANNEL_SPLICE; - break; - case LTTNG_EVENT_MMAP: - channel->output = CONSUMER_CHANNEL_MMAP; - break; - default: - ERR("Allocate channel output unknown %d", output); - free(channel); - channel = NULL; - goto end; - } - strncpy(channel->pathname, pathname, sizeof(channel->pathname)); channel->pathname[sizeof(channel->pathname) - 1] = '\0'; @@ -828,7 +863,6 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key, channel->wait_fd = -1; CDS_INIT_LIST_HEAD(&channel->streams.head); - CDS_INIT_LIST_HEAD(&channel->stream_no_monitor_list.head); DBG("Allocated channel (key %" PRIu64 ")", channel->key) @@ -849,6 +883,7 @@ int consumer_add_channel(struct lttng_consumer_channel *channel, struct lttng_ht_iter iter; pthread_mutex_lock(&consumer_data.lock); + pthread_mutex_lock(&channel->lock); rcu_read_lock(); lttng_ht_lookup(consumer_data.channel_ht, &channel->key, &iter); @@ -865,10 +900,11 @@ int consumer_add_channel(struct lttng_consumer_channel *channel, end: rcu_read_unlock(); + pthread_mutex_unlock(&channel->lock); pthread_mutex_unlock(&consumer_data.lock); if (!ret && channel->wait_fd != -1 && - channel->metadata_stream == NULL) { + channel->type == CONSUMER_CHANNEL_TYPE_DATA) { notify_channel_pipe(ctx, channel, -1, CONSUMER_CHANNEL_ADD); } return ret; @@ -1100,7 +1136,7 @@ struct lttng_consumer_local_data *lttng_consumer_create( struct lttng_consumer_local_data *ctx), int (*recv_channel)(struct lttng_consumer_channel *channel), int (*recv_stream)(struct lttng_consumer_stream *stream), - int (*update_stream)(int stream_key, uint32_t state)) + int (*update_stream)(uint64_t stream_key, uint32_t state)) { int ret; struct lttng_consumer_local_data *ctx; @@ -1271,7 +1307,7 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap( rcu_read_lock(); /* Flag that the current stream if set for network streaming. */ - if (stream->net_seq_idx != -1) { + if (stream->net_seq_idx != (uint64_t) -1ULL) { relayd = consumer_find_relayd(stream->net_seq_idx); if (relayd == NULL) { goto end; @@ -1470,7 +1506,7 @@ ssize_t lttng_consumer_on_read_subbuffer_splice( rcu_read_lock(); /* Flag that the current stream if set for network streaming. */ - if (stream->net_seq_idx != -1) { + if (stream->net_seq_idx != (uint64_t) -1ULL) { relayd = consumer_find_relayd(stream->net_seq_idx); if (relayd == NULL) { goto end; @@ -1824,6 +1860,7 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream, } pthread_mutex_lock(&consumer_data.lock); + pthread_mutex_lock(&stream->chan->lock); pthread_mutex_lock(&stream->lock); switch (consumer_data.type) { @@ -1917,6 +1954,7 @@ end: stream->chan->metadata_stream = NULL; pthread_mutex_unlock(&stream->lock); + pthread_mutex_unlock(&stream->chan->lock); pthread_mutex_unlock(&consumer_data.lock); if (free_chan) { @@ -1935,7 +1973,6 @@ static int add_metadata_stream(struct lttng_consumer_stream *stream, struct lttng_ht *ht) { int ret = 0; - struct consumer_relayd_sock_pair *relayd; struct lttng_ht_iter iter; struct lttng_ht_node_u64 *node; @@ -1945,6 +1982,7 @@ static int add_metadata_stream(struct lttng_consumer_stream *stream, DBG3("Adding metadata stream %" PRIu64 " to hash table", stream->key); pthread_mutex_lock(&consumer_data.lock); + pthread_mutex_lock(&stream->chan->lock); pthread_mutex_lock(&stream->lock); /* @@ -1962,12 +2000,6 @@ static int add_metadata_stream(struct lttng_consumer_stream *stream, node = lttng_ht_iter_get_node_u64(&iter); assert(!node); - /* Find relayd and, if one is found, increment refcount. */ - relayd = consumer_find_relayd(stream->net_seq_idx); - if (relayd != NULL) { - uatomic_inc(&relayd->refcount); - } - /* * When nb_init_stream_left reaches 0, we don't need to trigger any action * in terms of destroying the associated channel, because the action that @@ -1996,6 +2028,7 @@ static int add_metadata_stream(struct lttng_consumer_stream *stream, rcu_read_unlock(); pthread_mutex_unlock(&stream->lock); + pthread_mutex_unlock(&stream->chan->lock); pthread_mutex_unlock(&consumer_data.lock); return ret; } @@ -2270,7 +2303,11 @@ void *consumer_thread_data_poll(void *data) goto end; } - local_stream = zmalloc(sizeof(struct lttng_consumer_stream)); + local_stream = zmalloc(sizeof(struct lttng_consumer_stream *)); + if (local_stream == NULL) { + PERROR("local_stream malloc"); + goto end; + } while (1) { high_prio = 0; @@ -2756,7 +2793,6 @@ restart: lttng_poll_del(&events, chan->wait_fd); ret = lttng_ht_del(channel_ht, &iter); assert(ret == 0); - assert(cds_list_empty(&chan->streams.head)); consumer_close_channel_streams(chan); /* Release our own refcount */ @@ -3008,10 +3044,10 @@ void lttng_consumer_init(void) * This will create a relayd socket pair and add it to the relayd hash table. * The caller MUST acquire a RCU read side lock before calling it. */ -int consumer_add_relayd_socket(int net_seq_idx, int sock_type, +int consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type, struct lttng_consumer_local_data *ctx, int sock, struct pollfd *consumer_sockpoll, - struct lttcomm_relayd_sock *relayd_sock, unsigned int sessiond_id) + struct lttcomm_relayd_sock *relayd_sock, uint64_t sessiond_id) { int fd = -1, ret = -1, relayd_created = 0; enum lttng_error_code ret_code = LTTNG_OK; @@ -3020,18 +3056,20 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type, assert(ctx); assert(relayd_sock); - DBG("Consumer adding relayd socket (idx: %d)", net_seq_idx); + DBG("Consumer adding relayd socket (idx: %" PRIu64 ")", net_seq_idx); /* Get relayd reference if exists. */ relayd = consumer_find_relayd(net_seq_idx); if (relayd == NULL) { + assert(sock_type == LTTNG_STREAM_CONTROL); /* Not found. Allocate one. */ relayd = consumer_allocate_relayd_sock_pair(net_seq_idx); if (relayd == NULL) { - ret_code = LTTCOMM_CONSUMERD_ENOMEM; ret = -ENOMEM; + ret_code = LTTCOMM_CONSUMERD_ENOMEM; + goto error; } else { - relayd->sessiond_session_id = (uint64_t) sessiond_id; + relayd->sessiond_session_id = sessiond_id; relayd_created = 1; } @@ -3040,26 +3078,31 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type, * we can notify the session daemon and continue our work without * killing everything. */ + } else { + /* + * relayd key should never be found for control socket. + */ + assert(sock_type != LTTNG_STREAM_CONTROL); } /* First send a status message before receiving the fds. */ - ret = consumer_send_status_msg(sock, ret_code); - if (ret < 0 || ret_code != LTTNG_OK) { + ret = consumer_send_status_msg(sock, LTTNG_OK); + if (ret < 0) { /* Somehow, the session daemon is not responding anymore. */ - goto error; + lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_FATAL); + goto error_nosignal; } /* Poll on consumer socket. */ if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) { lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_POLL_ERROR); ret = -EINTR; - goto error; + goto error_nosignal; } /* Get relayd socket from session daemon */ ret = lttcomm_recv_fds_unix_sock(sock, &fd, 1); if (ret != sizeof(fd)) { - ret_code = LTTCOMM_CONSUMERD_ERROR_RECV_FD; ret = -1; fd = -1; /* Just in case it gets set with an invalid value. */ @@ -3073,18 +3116,7 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type, * issue when reaching the fd limit. */ lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_FD); - - /* - * This code path MUST continue to the consumer send status message so - * we can send the error to the thread expecting a reply. The above - * call will make everything stop. - */ - } - - /* We have the fds without error. Send status back. */ - ret = consumer_send_status_msg(sock, ret_code); - if (ret < 0 || ret_code != LTTNG_OK) { - /* Somehow, the session daemon is not responding anymore. */ + ret_code = LTTCOMM_CONSUMERD_ERROR_RECV_FD; goto error; } @@ -3094,19 +3126,23 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type, /* Copy received lttcomm socket */ lttcomm_copy_sock(&relayd->control_sock.sock, &relayd_sock->sock); ret = lttcomm_create_sock(&relayd->control_sock.sock); - /* Immediately try to close the created socket if valid. */ - if (relayd->control_sock.sock.fd >= 0) { - if (close(relayd->control_sock.sock.fd)) { - PERROR("close relayd control socket"); - } - } /* Handle create_sock error. */ if (ret < 0) { + ret_code = LTTCOMM_CONSUMERD_ENOMEM; goto error; } + /* + * Close the socket created internally by + * lttcomm_create_sock, so we can replace it by the one + * received from sessiond. + */ + if (close(relayd->control_sock.sock.fd)) { + PERROR("close"); + } /* Assign new file descriptor */ relayd->control_sock.sock.fd = fd; + fd = -1; /* For error path */ /* Assign version values. */ relayd->control_sock.major = relayd_sock->major; relayd->control_sock.minor = relayd_sock->minor; @@ -3131,6 +3167,7 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type, */ (void) relayd_close(&relayd->control_sock); (void) relayd_close(&relayd->data_sock); + ret_code = LTTCOMM_CONSUMERD_RELAYD_FAIL; goto error; } @@ -3139,19 +3176,23 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type, /* Copy received lttcomm socket */ lttcomm_copy_sock(&relayd->data_sock.sock, &relayd_sock->sock); ret = lttcomm_create_sock(&relayd->data_sock.sock); - /* Immediately try to close the created socket if valid. */ - if (relayd->data_sock.sock.fd >= 0) { - if (close(relayd->data_sock.sock.fd)) { - PERROR("close relayd data socket"); - } - } /* Handle create_sock error. */ if (ret < 0) { + ret_code = LTTCOMM_CONSUMERD_ENOMEM; goto error; } + /* + * Close the socket created internally by + * lttcomm_create_sock, so we can replace it by the one + * received from sessiond. + */ + if (close(relayd->data_sock.sock.fd)) { + PERROR("close"); + } /* Assign new file descriptor */ relayd->data_sock.sock.fd = fd; + fd = -1; /* for eventual error paths */ /* Assign version values. */ relayd->data_sock.major = relayd_sock->major; relayd->data_sock.minor = relayd_sock->minor; @@ -3159,6 +3200,7 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type, default: ERR("Unknown relayd socket type (%d)", sock_type); ret = -1; + ret_code = LTTCOMM_CONSUMERD_FATAL; goto error; } @@ -3166,6 +3208,14 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type, sock_type == LTTNG_STREAM_CONTROL ? "control" : "data", relayd->net_seq_idx, fd); + /* We successfully added the socket. Send status back. */ + ret = consumer_send_status_msg(sock, ret_code); + if (ret < 0) { + /* Somehow, the session daemon is not responding anymore. */ + lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_FATAL); + goto error_nosignal; + } + /* * Add relayd socket pair to consumer data hashtable. If object already * exists or on error, the function gracefully returns. @@ -3176,6 +3226,11 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type, return 0; error: + if (consumer_send_status_msg(sock, ret_code) < 0) { + lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_FATAL); + } + +error_nosignal: /* Close received socket if valid. */ if (fd >= 0) { if (close(fd)) { @@ -3414,3 +3469,23 @@ int consumer_send_status_channel(int sock, return lttcomm_send_unix_sock(sock, &msg, sizeof(msg)); } + +/* + * Using a maximum stream size with the produced and consumed position of a + * stream, computes the new consumed position to be as close as possible to the + * maximum possible stream size. + * + * If maximum stream size is lower than the possible buffer size (produced - + * consumed), the consumed_pos given is returned untouched else the new value + * is returned. + */ +unsigned long consumer_get_consumed_maxsize(unsigned long consumed_pos, + unsigned long produced_pos, uint64_t max_stream_size) +{ + if (max_stream_size && max_stream_size < (produced_pos - consumed_pos)) { + /* Offset from the produced position to get the latest buffers. */ + return produced_pos - max_stream_size; + } + + return consumed_pos; +}