X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Fconsumer.c;h=4b657f33249f7a4f777b7b3ff784b379b15f5878;hp=bcede1ef5d8f3857515baeaf709ad9b78294a684;hb=6a00837f8cb0431a2ad90974d67fae138ea97dd5;hpb=30319bcbeabf573068172289850aea0bcfe23abc diff --git a/src/common/consumer.c b/src/common/consumer.c index bcede1ef5..4b657f332 100644 --- a/src/common/consumer.c +++ b/src/common/consumer.c @@ -291,6 +291,7 @@ void consumer_del_channel(struct lttng_consumer_channel *channel) DBG("Consumer delete channel key %" PRIu64, channel->key); pthread_mutex_lock(&consumer_data.lock); + pthread_mutex_lock(&channel->lock); /* Delete streams that might have been left in the stream list. */ cds_list_for_each_entry_safe(stream, stmp, &channel->streams.head, @@ -324,6 +325,7 @@ void consumer_del_channel(struct lttng_consumer_channel *channel) call_rcu(&channel->node.head, free_channel_rcu); end: + pthread_mutex_unlock(&channel->lock); pthread_mutex_unlock(&consumer_data.lock); } @@ -455,6 +457,19 @@ void consumer_del_stream(struct lttng_consumer_stream *stream, consumer_stream_destroy(stream, ht); } +/* + * XXX naming of del vs destroy is all mixed up. + */ +void consumer_del_stream_for_data(struct lttng_consumer_stream *stream) +{ + consumer_stream_destroy(stream, data_ht); +} + +void consumer_del_stream_for_metadata(struct lttng_consumer_stream *stream) +{ + consumer_stream_destroy(stream, metadata_ht); +} + struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key, uint64_t stream_key, enum lttng_consumer_stream_state state, @@ -489,6 +504,7 @@ struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key, stream->net_seq_idx = relayd_id; stream->session_id = session_id; stream->monitor = monitor; + stream->endpoint_status = CONSUMER_ENDPOINT_ACTIVE; pthread_mutex_init(&stream->lock, NULL); /* If channel is the metadata, flag this stream as metadata. */ @@ -536,9 +552,9 @@ end: /* * Add a stream to the global list protected by a mutex. */ -static int add_stream(struct lttng_consumer_stream *stream, - struct lttng_ht *ht) +int consumer_add_data_stream(struct lttng_consumer_stream *stream) { + struct lttng_ht *ht = data_ht; int ret = 0; assert(stream); @@ -547,6 +563,8 @@ static int add_stream(struct lttng_consumer_stream *stream, DBG3("Adding consumer stream %" PRIu64, stream->key); pthread_mutex_lock(&consumer_data.lock); + pthread_mutex_lock(&stream->chan->lock); + pthread_mutex_lock(&stream->chan->timer_lock); pthread_mutex_lock(&stream->lock); rcu_read_lock(); @@ -584,11 +602,18 @@ static int add_stream(struct lttng_consumer_stream *stream, rcu_read_unlock(); pthread_mutex_unlock(&stream->lock); + pthread_mutex_unlock(&stream->chan->timer_lock); + pthread_mutex_unlock(&stream->chan->lock); pthread_mutex_unlock(&consumer_data.lock); return ret; } +void consumer_del_data_stream(struct lttng_consumer_stream *stream) +{ + consumer_del_stream(stream, data_ht); +} + /* * Add relayd socket to global consumer data hashtable. RCU read side lock MUST * be acquired before calling this. @@ -832,6 +857,8 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key, channel->tracefile_size = tracefile_size; channel->tracefile_count = tracefile_count; channel->monitor = monitor; + pthread_mutex_init(&channel->lock, NULL); + pthread_mutex_init(&channel->timer_lock, NULL); /* * In monitor mode, the streams associated with the channel will be put in @@ -877,6 +904,8 @@ int consumer_add_channel(struct lttng_consumer_channel *channel, struct lttng_ht_iter iter; pthread_mutex_lock(&consumer_data.lock); + pthread_mutex_lock(&channel->lock); + pthread_mutex_lock(&channel->timer_lock); rcu_read_lock(); lttng_ht_lookup(consumer_data.channel_ht, &channel->key, &iter); @@ -893,6 +922,8 @@ int consumer_add_channel(struct lttng_consumer_channel *channel, end: rcu_read_unlock(); + pthread_mutex_unlock(&channel->timer_lock); + pthread_mutex_unlock(&channel->lock); pthread_mutex_unlock(&consumer_data.lock); if (!ret && channel->wait_fd != -1 && @@ -1145,6 +1176,7 @@ struct lttng_consumer_local_data *lttng_consumer_create( ctx->consumer_error_socket = -1; ctx->consumer_metadata_socket = -1; + pthread_mutex_init(&ctx->metadata_socket_lock, NULL); /* assign the callbacks */ ctx->on_buffer_ready = buffer_ready; ctx->on_recv_channel = recv_channel; @@ -1852,6 +1884,7 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream, } pthread_mutex_lock(&consumer_data.lock); + pthread_mutex_lock(&stream->chan->lock); pthread_mutex_lock(&stream->lock); switch (consumer_data.type) { @@ -1871,6 +1904,13 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream, break; case LTTNG_CONSUMER32_UST: case LTTNG_CONSUMER64_UST: + if (stream->monitor) { + /* close the write-side in close_metadata */ + ret = close(stream->ust_metadata_poll_pipe[0]); + if (ret < 0) { + PERROR("Close UST metadata read-side poll pipe"); + } + } lttng_ustconsumer_del_stream(stream); break; default: @@ -1939,12 +1979,13 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream, end: /* * Nullify the stream reference so it is not used after deletion. The - * consumer data lock MUST be acquired before being able to check for a - * NULL pointer value. + * channel lock MUST be acquired before being able to check for + * a NULL pointer value. */ stream->chan->metadata_stream = NULL; pthread_mutex_unlock(&stream->lock); + pthread_mutex_unlock(&stream->chan->lock); pthread_mutex_unlock(&consumer_data.lock); if (free_chan) { @@ -1959,9 +2000,9 @@ free_stream_rcu: * Action done with the metadata stream when adding it to the consumer internal * data structures to handle it. */ -static int add_metadata_stream(struct lttng_consumer_stream *stream, - struct lttng_ht *ht) +int consumer_add_metadata_stream(struct lttng_consumer_stream *stream) { + struct lttng_ht *ht = metadata_ht; int ret = 0; struct lttng_ht_iter iter; struct lttng_ht_node_u64 *node; @@ -1972,6 +2013,8 @@ static int add_metadata_stream(struct lttng_consumer_stream *stream, DBG3("Adding metadata stream %" PRIu64 " to hash table", stream->key); pthread_mutex_lock(&consumer_data.lock); + pthread_mutex_lock(&stream->chan->lock); + pthread_mutex_lock(&stream->chan->timer_lock); pthread_mutex_lock(&stream->lock); /* @@ -2017,6 +2060,8 @@ static int add_metadata_stream(struct lttng_consumer_stream *stream, rcu_read_unlock(); pthread_mutex_unlock(&stream->lock); + pthread_mutex_unlock(&stream->chan->lock); + pthread_mutex_unlock(&stream->chan->timer_lock); pthread_mutex_unlock(&consumer_data.lock); return ret; } @@ -2162,7 +2207,7 @@ restart: pipe_len = lttng_pipe_read(ctx->consumer_metadata_pipe, &stream, sizeof(stream)); if (pipe_len < 0) { - ERR("read metadata stream, ret: %ld", pipe_len); + ERR("read metadata stream, ret: %zd", pipe_len); /* * Continue here to handle the rest of the streams. */ @@ -2179,14 +2224,6 @@ restart: DBG("Adding metadata stream %d to poll set", stream->wait_fd); - ret = add_metadata_stream(stream, metadata_ht); - if (ret) { - ERR("Unable to add metadata stream"); - /* Stream was not setup properly. Continuing. */ - consumer_del_metadata_stream(stream, NULL); - continue; - } - /* Add metadata stream to the global poll events list */ lttng_poll_add(&events, stream->wait_fd, LPOLLIN | LPOLLPRI); @@ -2240,14 +2277,21 @@ restart: DBG("Metadata available on fd %d", pollfd); assert(stream->wait_fd == pollfd); - len = ctx->on_buffer_ready(stream, ctx); + do { + len = ctx->on_buffer_ready(stream, ctx); + /* + * We don't check the return value here since if we get + * a negative len, it means an error occured thus we + * simply remove it from the poll set and free the + * stream. + */ + } while (len > 0); + /* It's ok to have an unavailable sub-buffer */ if (len < 0 && len != -EAGAIN && len != -ENODATA) { /* Clean up stream from consumer and free it. */ lttng_poll_del(&events, stream->wait_fd); consumer_del_metadata_stream(stream, metadata_ht); - } else if (len > 0) { - stream->data_read = 1; } } @@ -2378,7 +2422,7 @@ void *consumer_thread_data_poll(void *data) pipe_readlen = lttng_pipe_read(ctx->consumer_data_pipe, &new_stream, sizeof(new_stream)); if (pipe_readlen < 0) { - ERR("Consumer data pipe ret %ld", pipe_readlen); + ERR("Consumer data pipe ret %zd", pipe_readlen); /* Continue so we can at least handle the current stream(s). */ continue; } @@ -2393,17 +2437,6 @@ void *consumer_thread_data_poll(void *data) continue; } - ret = add_stream(new_stream, data_ht); - if (ret) { - ERR("Consumer add stream %" PRIu64 " failed. Continuing", - new_stream->key); - /* - * At this point, if the add_stream fails, it is not in the - * hash table thus passing the NULL value here. - */ - consumer_del_stream(new_stream, NULL); - } - /* Continue to update the local streams and handle prio ones */ continue; } @@ -3457,3 +3490,23 @@ int consumer_send_status_channel(int sock, return lttcomm_send_unix_sock(sock, &msg, sizeof(msg)); } + +/* + * Using a maximum stream size with the produced and consumed position of a + * stream, computes the new consumed position to be as close as possible to the + * maximum possible stream size. + * + * If maximum stream size is lower than the possible buffer size (produced - + * consumed), the consumed_pos given is returned untouched else the new value + * is returned. + */ +unsigned long consumer_get_consumed_maxsize(unsigned long consumed_pos, + unsigned long produced_pos, uint64_t max_stream_size) +{ + if (max_stream_size && max_stream_size < (produced_pos - consumed_pos)) { + /* Offset from the produced position to get the latest buffers. */ + return produced_pos - max_stream_size; + } + + return consumed_pos; +}