X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Fconsumer%2Fconsumer.c;fp=src%2Fcommon%2Fconsumer%2Fconsumer.c;h=be440e69492b020b4281f83cc94c12fe29c9c38f;hp=6505490cdc04e212c54aadd3d8eb58d569eb523e;hb=04ed9e10dfa0b3c88d4a7abe9fa59b8e03b7e49a;hpb=ad8bec244fdbb0e7705fd1865ae71f36f06d2b94 diff --git a/src/common/consumer/consumer.c b/src/common/consumer/consumer.c index 6505490cd..be440e694 100644 --- a/src/common/consumer/consumer.c +++ b/src/common/consumer/consumer.c @@ -3425,6 +3425,9 @@ static enum open_packet_status open_packet(struct lttng_consumer_stream *stream) status = produced_pos_before != produced_pos_after ? OPEN_PACKET_STATUS_OPENED : OPEN_PACKET_STATUS_NO_SPACE; + if (status == OPEN_PACKET_STATUS_OPENED) { + stream->opened_packet_in_current_trace_chunk = true; + } end: return status; } @@ -3565,14 +3568,12 @@ ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream, ", channel name = %s, session id = %" PRIu64, stream->key, stream->chan->name, stream->chan->session_id); - stream->opened_packet_in_current_trace_chunk = - true; break; case OPEN_PACKET_STATUS_NO_SPACE: /* * Can't open a packet as there is no space left. * This means that new events were produced, resulting - * in a packet being opened, which is what we want + * in a packet being opened, which is what we wanted * anyhow. */ DBG("No space left to open a packet after consuming a packet: stream id = %" PRIu64 @@ -3588,8 +3589,6 @@ ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream, default: abort(); } - - stream->opened_packet_in_current_trace_chunk = true; } sleep_stream: @@ -4294,8 +4293,6 @@ int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel, ", channel name = %s, session id = %" PRIu64, stream->key, stream->chan->name, stream->chan->session_id); - stream->opened_packet_in_current_trace_chunk = - true; break; case OPEN_PACKET_STATUS_NO_SPACE: /* @@ -5172,3 +5169,70 @@ int lttng_consumer_clear_channel(struct lttng_consumer_channel *channel) end: return ret; } + +enum lttcomm_return_code lttng_consumer_open_channel_packets( + struct lttng_consumer_channel *channel) +{ + struct lttng_consumer_stream *stream; + enum lttcomm_return_code ret = LTTCOMM_CONSUMERD_SUCCESS; + + if (channel->metadata_stream) { + ERR("Open channel packets command attempted on a metadata channel"); + ret = LTTCOMM_CONSUMERD_INVALID_PARAMETERS; + goto end; + } + + rcu_read_lock(); + cds_list_for_each_entry(stream, &channel->streams.head, send_node) { + enum open_packet_status status; + + pthread_mutex_lock(&stream->lock); + if (cds_lfht_is_node_deleted(&stream->node.node)) { + goto next; + } + + status = open_packet(stream); + switch (status) { + case OPEN_PACKET_STATUS_OPENED: + DBG("Opened a packet in \"open channel packets\" command: stream id = %" PRIu64 + ", channel name = %s, session id = %" PRIu64, + stream->key, stream->chan->name, + stream->chan->session_id); + stream->opened_packet_in_current_trace_chunk = true; + break; + case OPEN_PACKET_STATUS_NO_SPACE: + DBG("No space left to open a packet in \"open channel packets\" command: stream id = %" PRIu64 + ", channel name = %s, session id = %" PRIu64, + stream->key, stream->chan->name, + stream->chan->session_id); + break; + case OPEN_PACKET_STATUS_ERROR: + /* + * Only unexpected internal errors can lead to this + * failing. Report an unknown error. + */ + ERR("Failed to flush empty buffer in \"open channel packets\" command: stream id = %" PRIu64 + ", channel id = %" PRIu64 + ", channel name = %s" + ", session id = %" PRIu64, + stream->key, channel->key, + channel->name, channel->session_id); + ret = LTTCOMM_CONSUMERD_UNKNOWN_ERROR; + goto error_unlock; + default: + abort(); + } + + next: + pthread_mutex_unlock(&stream->lock); + } + +end_rcu_unlock: + rcu_read_unlock(); +end: + return ret; + +error_unlock: + pthread_mutex_unlock(&stream->lock); + goto end_rcu_unlock; +}