X-Git-Url: https://git.lttng.org/?a=blobdiff_plain;f=src%2Fbin%2Flttng-sessiond%2Fconsumer.c;h=bd9048bf517d54b9ccb8148310d82b5ee21bc9b1;hb=211b734b85bc6f587dd768bd1528d07a15030ec2;hp=fcda59363dfb23019aa793dd244c400c044296de;hpb=fb83fe64f250bec7416f18891a8264450c61ead3;p=lttng-tools.git diff --git a/src/bin/lttng-sessiond/consumer.c b/src/bin/lttng-sessiond/consumer.c index fcda59363..bd9048bf5 100644 --- a/src/bin/lttng-sessiond/consumer.c +++ b/src/bin/lttng-sessiond/consumer.c @@ -563,6 +563,8 @@ struct consumer_output *consumer_copy_output(struct consumer_output *obj) output->net_seq_index = obj->net_seq_index; memcpy(output->subdir, obj->subdir, PATH_MAX); output->snapshot = obj->snapshot; + output->relay_major_version = obj->relay_major_version; + output->relay_minor_version = obj->relay_minor_version; memcpy(&output->dst, &obj->dst, sizeof(output->dst)); ret = consumer_copy_sockets(output, obj); if (ret < 0) { @@ -713,7 +715,10 @@ int consumer_set_network_uri(struct consumer_output *obj, goto error; } - strncpy(obj->subdir, tmp_path, sizeof(obj->subdir)); + if (lttng_strncpy(obj->subdir, tmp_path, sizeof(obj->subdir))) { + ret = -LTTNG_ERR_INVALID; + goto error; + } DBG3("Consumer set network uri subdir path %s", tmp_path); } @@ -743,7 +748,6 @@ int consumer_send_fds(struct consumer_socket *sock, int *fds, size_t nb_fd) } ret = consumer_recv_status_reply(sock); - error: return ret; } @@ -758,6 +762,7 @@ int consumer_send_msg(struct consumer_socket *sock, assert(msg); assert(sock); + assert(pthread_mutex_trylock(sock->lock) == EBUSY); ret = consumer_socket_send(sock, msg, sizeof(struct lttcomm_consumer_msg)); if (ret < 0) { @@ -801,6 +806,7 @@ void consumer_init_ask_channel_comm_msg(struct lttcomm_consumer_msg *msg, unsigned int switch_timer_interval, unsigned int read_timer_interval, unsigned int live_timer_interval, + unsigned int monitor_timer_interval, int output, int type, uint64_t session_id, @@ -817,6 +823,7 @@ void consumer_init_ask_channel_comm_msg(struct lttcomm_consumer_msg *msg, uint64_t session_id_per_pid, unsigned int monitor, uint32_t ust_app_uid, + int64_t blocking_timeout, const char *root_shm_path, const char *shm_path) { @@ -832,6 +839,7 @@ void consumer_init_ask_channel_comm_msg(struct lttcomm_consumer_msg *msg, msg->u.ask_channel.switch_timer_interval = switch_timer_interval; msg->u.ask_channel.read_timer_interval = read_timer_interval; msg->u.ask_channel.live_timer_interval = live_timer_interval; + msg->u.ask_channel.monitor_timer_interval = monitor_timer_interval; msg->u.ask_channel.output = output; msg->u.ask_channel.type = type; msg->u.ask_channel.session_id = session_id; @@ -845,6 +853,7 @@ void consumer_init_ask_channel_comm_msg(struct lttcomm_consumer_msg *msg, msg->u.ask_channel.tracefile_count = tracefile_count; msg->u.ask_channel.monitor = monitor; msg->u.ask_channel.ust_app_uid = ust_app_uid; + msg->u.ask_channel.blocking_timeout = blocking_timeout; memcpy(msg->u.ask_channel.uuid, uuid, sizeof(msg->u.ask_channel.uuid)); @@ -887,7 +896,8 @@ void consumer_init_channel_comm_msg(struct lttcomm_consumer_msg *msg, uint64_t tracefile_size, uint64_t tracefile_count, unsigned int monitor, - unsigned int live_timer_interval) + unsigned int live_timer_interval, + unsigned int monitor_timer_interval) { assert(msg); @@ -908,6 +918,7 @@ void consumer_init_channel_comm_msg(struct lttcomm_consumer_msg *msg, msg->u.channel.tracefile_count = tracefile_count; msg->u.channel.monitor = monitor; msg->u.channel.live_timer_interval = live_timer_interval; + msg->u.channel.monitor_timer_interval = monitor_timer_interval; strncpy(msg->u.channel.pathname, pathname, sizeof(msg->u.channel.pathname)); @@ -1043,6 +1054,35 @@ error: return ret; } +int consumer_send_channel_monitor_pipe(struct consumer_socket *consumer_sock, + int pipe) +{ + int ret; + struct lttcomm_consumer_msg msg; + + /* Code flow error. Safety net. */ + + memset(&msg, 0, sizeof(msg)); + msg.cmd_type = LTTNG_CONSUMER_SET_CHANNEL_MONITOR_PIPE; + + DBG3("Sending set_channel_monitor_pipe command to consumer"); + ret = consumer_send_msg(consumer_sock, &msg); + if (ret < 0) { + goto error; + } + + DBG3("Sending channel monitoring pipe %d to consumer on socket %d", + pipe, *consumer_sock->fd_ptr); + ret = consumer_send_fds(consumer_sock, &pipe, 1); + if (ret < 0) { + goto error; + } + + DBG2("Channel monitoring pipe successfully sent"); +error: + return ret; +} + /* * Set consumer subdirectory using the session name and a generated datetime if * needed. This is appended to the current subdirectory. @@ -1084,7 +1124,11 @@ int consumer_set_subdir(struct consumer_output *consumer, goto error; } - strncpy(consumer->subdir, tmp_path, sizeof(consumer->subdir)); + if (lttng_strncpy(consumer->subdir, tmp_path, + sizeof(consumer->subdir))) { + ret = -EINVAL; + goto error; + } DBG2("Consumer subdir set to %s", consumer->subdir); error: @@ -1182,6 +1226,38 @@ end: return ret; } +/* + * Send a clear quiescent command to consumer using the given channel key. + * + * Return 0 on success else a negative value. + */ +int consumer_clear_quiescent_channel(struct consumer_socket *socket, uint64_t key) +{ + int ret; + struct lttcomm_consumer_msg msg; + + assert(socket); + + DBG2("Consumer clear quiescent channel key %" PRIu64, key); + + memset(&msg, 0, sizeof(msg)); + msg.cmd_type = LTTNG_CONSUMER_CLEAR_QUIESCENT_CHANNEL; + msg.u.clear_quiescent_channel.key = key; + + pthread_mutex_lock(socket->lock); + health_code_update(); + + ret = consumer_send_msg(socket, &msg); + if (ret < 0) { + goto end; + } + +end: + health_code_update(); + pthread_mutex_unlock(socket->lock); + return ret; +} + /* * Send a close metadata command to consumer using the given channel key. * Called with registry lock held. @@ -1257,7 +1333,7 @@ end: */ int consumer_push_metadata(struct consumer_socket *socket, uint64_t metadata_key, char *metadata_str, size_t len, - size_t target_offset) + size_t target_offset, uint64_t version) { int ret; struct lttcomm_consumer_msg msg; @@ -1273,6 +1349,7 @@ int consumer_push_metadata(struct consumer_socket *socket, msg.u.push_metadata.key = metadata_key; msg.u.push_metadata.target_offset = target_offset; msg.u.push_metadata.len = len; + msg.u.push_metadata.version = version; health_code_update(); ret = consumer_send_msg(socket, &msg);