health_code_update();
break;
}
+ case LTTNG_CONSUMER_DISCARDED_EVENTS:
+ {
+ uint64_t ret;
+ struct lttng_ht_iter iter;
+ struct lttng_ht *ht;
+ struct lttng_consumer_stream *stream;
+ uint64_t id = msg.u.discarded_events.session_id;
+ uint64_t key = msg.u.discarded_events.channel_key;
+
+ DBG("UST consumer discarded events command for session id %"
+ PRIu64, id);
+ rcu_read_lock();
+ pthread_mutex_lock(&consumer_data.lock);
+
+ ht = consumer_data.stream_list_ht;
+
+ /*
+ * We only need a reference to the channel, but they are not
+ * directly indexed, so we just use the first matching stream
+ * to extract the information we need, we default to 0 if not
+ * found (no events are dropped if the channel is not yet in
+ * use).
+ */
+ ret = 0;
+ cds_lfht_for_each_entry_duplicate(ht->ht,
+ ht->hash_fct(&id, lttng_ht_seed),
+ ht->match_fct, &id,
+ &iter.iter, stream, node_session_id.node) {
+ if (stream->chan->key == key) {
+ ret = stream->chan->discarded_events;
+ break;
+ }
+ }
+ pthread_mutex_unlock(&consumer_data.lock);
+ rcu_read_unlock();
+
+ DBG("UST consumer discarded events command for session id %"
+ PRIu64 ", channel key %" PRIu64, id, key);
+
+ health_code_update();
+
+ /* Send back returned value to session daemon */
+ ret = lttcomm_send_unix_sock(sock, &ret, sizeof(ret));
+ if (ret < 0) {
+ PERROR("send discarded events");
+ goto error_fatal;
+ }
+
+ break;
+ }
+ case LTTNG_CONSUMER_LOST_PACKETS:
+ {
+ uint64_t ret;
+ struct lttng_ht_iter iter;
+ struct lttng_ht *ht;
+ struct lttng_consumer_stream *stream;
+ uint64_t id = msg.u.lost_packets.session_id;
+ uint64_t key = msg.u.lost_packets.channel_key;
+
+ DBG("UST consumer lost packets command for session id %"
+ PRIu64, id);
+ rcu_read_lock();
+ pthread_mutex_lock(&consumer_data.lock);
+
+ ht = consumer_data.stream_list_ht;
+
+ /*
+ * We only need a reference to the channel, but they are not
+ * directly indexed, so we just use the first matching stream
+ * to extract the information we need, we default to 0 if not
+ * found (no packets lost if the channel is not yet in use).
+ */
+ ret = 0;
+ cds_lfht_for_each_entry_duplicate(ht->ht,
+ ht->hash_fct(&id, lttng_ht_seed),
+ ht->match_fct, &id,
+ &iter.iter, stream, node_session_id.node) {
+ if (stream->chan->key == key) {
+ ret = stream->chan->lost_packets;
+ break;
+ }
+ }
+ pthread_mutex_unlock(&consumer_data.lock);
+ rcu_read_unlock();
+
+ DBG("UST consumer lost packets command for session id %"
+ PRIu64 ", channel key %" PRIu64, id, key);
+
+ health_code_update();
+
+ /* Send back returned value to session daemon */
+ ret = lttcomm_send_unix_sock(sock, &ret, sizeof(ret));
+ if (ret < 0) {
+ PERROR("send lost packets");
+ goto error_fatal;
+ }
+
+ break;
+ }
default:
break;
}
return ustctl_get_current_timestamp(stream->ustream, ts);
}
+int lttng_ustconsumer_get_sequence_number(
+ struct lttng_consumer_stream *stream, uint64_t *seq)
+{
+ assert(stream);
+ assert(stream->ustream);
+ assert(seq);
+
+ return ustctl_get_sequence_number(stream->ustream, seq);
+}
+
/*
* Called when the stream signal the consumer that it has hang up.
*/
return ret;
}
+static
+int update_stream_stats(struct lttng_consumer_stream *stream)
+{
+ int ret;
+ uint64_t seq, discarded;
+
+ ret = ustctl_get_sequence_number(stream->ustream, &seq);
+ if (ret < 0) {
+ PERROR("ustctl_get_sequence_number");
+ goto end;
+ }
+ /*
+ * Start the sequence when we extract the first packet in case we don't
+ * start at 0 (for example if a consumer is not connected to the
+ * session immediately after the beginning).
+ */
+ if (stream->last_sequence_number == -1ULL) {
+ stream->last_sequence_number = seq;
+ } else if (seq > stream->last_sequence_number) {
+ stream->chan->lost_packets += seq -
+ stream->last_sequence_number - 1;
+ } else {
+ /* seq <= last_sequence_number */
+ ERR("Sequence number inconsistent : prev = %" PRIu64
+ ", current = %" PRIu64,
+ stream->last_sequence_number, seq);
+ ret = -1;
+ goto end;
+ }
+ stream->last_sequence_number = seq;
+
+ ret = ustctl_get_events_discarded(stream->ustream, &discarded);
+ if (ret < 0) {
+ PERROR("kernctl_get_events_discarded");
+ goto end;
+ }
+ if (discarded < stream->last_discarded_events) {
+ /*
+ * Overflow has occured. We assume only one wrap-around
+ * has occured.
+ */
+ stream->chan->discarded_events +=
+ (1ULL << (CAA_BITS_PER_LONG - 1)) -
+ stream->last_discarded_events + discarded;
+ } else {
+ stream->chan->discarded_events += discarded -
+ stream->last_discarded_events;
+ }
+ stream->last_discarded_events = discarded;
+ ret = 0;
+
+end:
+ return ret;
+}
+
/*
* Read subbuffer from the given stream.
*
if (ret < 0) {
goto end;
}
+
+ /* Update the stream's sequence and discarded events count. */
+ ret = update_stream_stats(stream);
+ if (ret < 0) {
+ PERROR("kernctl_get_events_discarded");
+ goto end;
+ }
} else {
write_index = 0;
}