X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fbin%2Flttng-sessiond%2Fconsumer.c;h=a8c5fb871d9521ba7853ae1019e0ce9e9571c4a7;hp=6eb84d6aee403318df005dad3393c7c24f9e19b7;hb=bfc6eff048df5515de3aedb7e2eb52d41ba64d22;hpb=5e280d7780be0616d07b9523fc14ec48e2d3caad diff --git a/src/bin/lttng-sessiond/consumer.c b/src/bin/lttng-sessiond/consumer.c index 6eb84d6ae..a8c5fb871 100644 --- a/src/bin/lttng-sessiond/consumer.c +++ b/src/bin/lttng-sessiond/consumer.c @@ -15,7 +15,6 @@ * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ -#define _GNU_SOURCE #define _LGPL_SOURCE #include #include @@ -564,6 +563,8 @@ struct consumer_output *consumer_copy_output(struct consumer_output *obj) output->net_seq_index = obj->net_seq_index; memcpy(output->subdir, obj->subdir, PATH_MAX); output->snapshot = obj->snapshot; + output->relay_major_version = obj->relay_major_version; + output->relay_minor_version = obj->relay_minor_version; memcpy(&output->dst, &obj->dst, sizeof(output->dst)); ret = consumer_copy_sockets(output, obj); if (ret < 0) { @@ -714,7 +715,10 @@ int consumer_set_network_uri(struct consumer_output *obj, goto error; } - strncpy(obj->subdir, tmp_path, sizeof(obj->subdir)); + if (lttng_strncpy(obj->subdir, tmp_path, sizeof(obj->subdir))) { + ret = -LTTNG_ERR_INVALID; + goto error; + } DBG3("Consumer set network uri subdir path %s", tmp_path); } @@ -1258,7 +1262,7 @@ end: */ int consumer_push_metadata(struct consumer_socket *socket, uint64_t metadata_key, char *metadata_str, size_t len, - size_t target_offset) + size_t target_offset, uint64_t version) { int ret; struct lttcomm_consumer_msg msg; @@ -1274,6 +1278,7 @@ int consumer_push_metadata(struct consumer_socket *socket, msg.u.push_metadata.key = metadata_key; msg.u.push_metadata.target_offset = target_offset; msg.u.push_metadata.len = len; + msg.u.push_metadata.version = version; health_code_update(); ret = consumer_send_msg(socket, &msg); @@ -1353,7 +1358,7 @@ int consumer_snapshot_channel(struct consumer_socket *socket, uint64_t key, ret = run_as_mkdir_recursive(msg.u.snapshot_channel.pathname, S_IRWXU | S_IRWXG, uid, gid); if (ret < 0) { - if (ret != -EEXIST) { + if (errno != EEXIST) { ERR("Trace directory creation error"); goto error; } @@ -1370,3 +1375,117 @@ error: health_code_update(); return ret; } + +/* + * Ask the consumer the number of discarded events for a channel. + */ +int consumer_get_discarded_events(uint64_t session_id, uint64_t channel_key, + struct consumer_output *consumer, uint64_t *discarded) +{ + int ret; + struct consumer_socket *socket; + struct lttng_ht_iter iter; + struct lttcomm_consumer_msg msg; + + assert(consumer); + + DBG3("Consumer discarded events id %" PRIu64, session_id); + + memset(&msg, 0, sizeof(msg)); + msg.cmd_type = LTTNG_CONSUMER_DISCARDED_EVENTS; + msg.u.discarded_events.session_id = session_id; + msg.u.discarded_events.channel_key = channel_key; + + *discarded = 0; + + /* Send command for each consumer */ + rcu_read_lock(); + cds_lfht_for_each_entry(consumer->socks->ht, &iter.iter, socket, + node.node) { + uint64_t consumer_discarded = 0; + pthread_mutex_lock(socket->lock); + ret = consumer_socket_send(socket, &msg, sizeof(msg)); + if (ret < 0) { + pthread_mutex_unlock(socket->lock); + goto end; + } + + /* + * No need for a recv reply status because the answer to the + * command is the reply status message. + */ + ret = consumer_socket_recv(socket, &consumer_discarded, + sizeof(consumer_discarded)); + if (ret < 0) { + ERR("get discarded events"); + pthread_mutex_unlock(socket->lock); + goto end; + } + pthread_mutex_unlock(socket->lock); + *discarded += consumer_discarded; + } + ret = 0; + DBG("Consumer discarded %" PRIu64 " events in session id %" PRIu64, + *discarded, session_id); + +end: + rcu_read_unlock(); + return ret; +} + +/* + * Ask the consumer the number of lost packets for a channel. + */ +int consumer_get_lost_packets(uint64_t session_id, uint64_t channel_key, + struct consumer_output *consumer, uint64_t *lost) +{ + int ret; + struct consumer_socket *socket; + struct lttng_ht_iter iter; + struct lttcomm_consumer_msg msg; + + assert(consumer); + + DBG3("Consumer lost packets id %" PRIu64, session_id); + + memset(&msg, 0, sizeof(msg)); + msg.cmd_type = LTTNG_CONSUMER_LOST_PACKETS; + msg.u.lost_packets.session_id = session_id; + msg.u.lost_packets.channel_key = channel_key; + + *lost = 0; + + /* Send command for each consumer */ + rcu_read_lock(); + cds_lfht_for_each_entry(consumer->socks->ht, &iter.iter, socket, + node.node) { + uint64_t consumer_lost = 0; + pthread_mutex_lock(socket->lock); + ret = consumer_socket_send(socket, &msg, sizeof(msg)); + if (ret < 0) { + pthread_mutex_unlock(socket->lock); + goto end; + } + + /* + * No need for a recv reply status because the answer to the + * command is the reply status message. + */ + ret = consumer_socket_recv(socket, &consumer_lost, + sizeof(consumer_lost)); + if (ret < 0) { + ERR("get lost packets"); + pthread_mutex_unlock(socket->lock); + goto end; + } + pthread_mutex_unlock(socket->lock); + *lost += consumer_lost; + } + ret = 0; + DBG("Consumer lost %" PRIu64 " packets in session id %" PRIu64, + *lost, session_id); + +end: + rcu_read_unlock(); + return ret; +}