Fix: stream intersection fails on snapshot of cleared session
[lttng-tools.git] / src / common / consumer / consumer.c
index 6505490cdc04e212c54aadd3d8eb58d569eb523e..be440e69492b020b4281f83cc94c12fe29c9c38f 100644 (file)
@@ -3425,6 +3425,9 @@ static enum open_packet_status open_packet(struct lttng_consumer_stream *stream)
        status = produced_pos_before != produced_pos_after ?
                        OPEN_PACKET_STATUS_OPENED :
                        OPEN_PACKET_STATUS_NO_SPACE;
+       if (status == OPEN_PACKET_STATUS_OPENED) {
+               stream->opened_packet_in_current_trace_chunk = true;
+       }
 end:
        return status;
 }
@@ -3565,14 +3568,12 @@ ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
                                        ", channel name = %s, session id = %" PRIu64,
                                        stream->key, stream->chan->name,
                                        stream->chan->session_id);
-                       stream->opened_packet_in_current_trace_chunk =
-                                       true;
                        break;
                case OPEN_PACKET_STATUS_NO_SPACE:
                        /*
                         * Can't open a packet as there is no space left.
                         * This means that new events were produced, resulting
-                        * in a packet being opened, which is what we want
+                        * in a packet being opened, which is what we wanted
                         * anyhow.
                         */
                        DBG("No space left to open a packet after consuming a packet: stream id = %" PRIu64
@@ -3588,8 +3589,6 @@ ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
                default:
                        abort();
                }
-
-               stream->opened_packet_in_current_trace_chunk = true;
        }
 
 sleep_stream:
@@ -4294,8 +4293,6 @@ int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel,
                                                ", channel name = %s, session id = %" PRIu64,
                                                stream->key, stream->chan->name,
                                                stream->chan->session_id);
-                               stream->opened_packet_in_current_trace_chunk =
-                                               true;
                                break;
                        case OPEN_PACKET_STATUS_NO_SPACE:
                                /*
@@ -5172,3 +5169,70 @@ int lttng_consumer_clear_channel(struct lttng_consumer_channel *channel)
 end:
        return ret;
 }
+
+enum lttcomm_return_code lttng_consumer_open_channel_packets(
+               struct lttng_consumer_channel *channel)
+{
+       struct lttng_consumer_stream *stream;
+       enum lttcomm_return_code ret = LTTCOMM_CONSUMERD_SUCCESS;
+
+       if (channel->metadata_stream) {
+               ERR("Open channel packets command attempted on a metadata channel");
+               ret = LTTCOMM_CONSUMERD_INVALID_PARAMETERS;
+               goto end;
+       }
+
+       rcu_read_lock();
+       cds_list_for_each_entry(stream, &channel->streams.head, send_node) {
+               enum open_packet_status status;
+
+               pthread_mutex_lock(&stream->lock);
+               if (cds_lfht_is_node_deleted(&stream->node.node)) {
+                       goto next;
+               }
+
+               status = open_packet(stream);
+               switch (status) {
+               case OPEN_PACKET_STATUS_OPENED:
+                       DBG("Opened a packet in \"open channel packets\" command: stream id = %" PRIu64
+                                       ", channel name = %s, session id = %" PRIu64,
+                                       stream->key, stream->chan->name,
+                                       stream->chan->session_id);
+                       stream->opened_packet_in_current_trace_chunk = true;
+                       break;
+               case OPEN_PACKET_STATUS_NO_SPACE:
+                       DBG("No space left to open a packet in \"open channel packets\" command: stream id = %" PRIu64
+                                       ", channel name = %s, session id = %" PRIu64,
+                                       stream->key, stream->chan->name,
+                                       stream->chan->session_id);
+                       break;
+               case OPEN_PACKET_STATUS_ERROR:
+                       /*
+                        * Only unexpected internal errors can lead to this
+                        * failing. Report an unknown error.
+                        */
+                       ERR("Failed to flush empty buffer in \"open channel packets\" command: stream id = %" PRIu64
+                                       ", channel id = %" PRIu64
+                                       ", channel name = %s"
+                                       ", session id = %" PRIu64,
+                                       stream->key, channel->key,
+                                       channel->name, channel->session_id);
+                       ret = LTTCOMM_CONSUMERD_UNKNOWN_ERROR;
+                       goto error_unlock;
+               default:
+                       abort();
+               }
+
+       next:
+               pthread_mutex_unlock(&stream->lock);
+       }
+
+end_rcu_unlock:
+       rcu_read_unlock();
+end:
+       return ret;
+
+error_unlock:
+       pthread_mutex_unlock(&stream->lock);
+       goto end_rcu_unlock;
+}
This page took 0.027792 seconds and 4 git commands to generate.