X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fbin%2Flttng-sessiond%2Fconsumer.c;h=650d002cdda75ab6cbcb03b1676ed92505552818;hp=ce3e5da9adda44d3481c4bb07785a30f76ac14f5;hb=53e367f936beb2f9a1f49f6a2920c2f58bcb08d7;hpb=53efb85a242809ed5ed21e9ab40effa696ecbc6f diff --git a/src/bin/lttng-sessiond/consumer.c b/src/bin/lttng-sessiond/consumer.c index ce3e5da9a..650d002cd 100644 --- a/src/bin/lttng-sessiond/consumer.c +++ b/src/bin/lttng-sessiond/consumer.c @@ -15,7 +15,7 @@ * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ -#define _GNU_SOURCE +#define _LGPL_SOURCE #include #include #include @@ -472,6 +472,7 @@ struct consumer_output *consumer_create_output(enum consumer_dst_type type) output->enabled = 1; output->type = type; output->net_seq_index = (uint64_t) -1ULL; + urcu_ref_init(&output->ref); output->socks = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG); @@ -506,11 +507,10 @@ void consumer_destroy_output_sockets(struct consumer_output *obj) * * Should *NOT* be called with RCU read-side lock held. */ -void consumer_destroy_output(struct consumer_output *obj) +static void consumer_release_output(struct urcu_ref *ref) { - if (obj == NULL) { - return; - } + struct consumer_output *obj = + caa_container_of(ref, struct consumer_output, ref); consumer_destroy_output_sockets(obj); @@ -522,6 +522,27 @@ void consumer_destroy_output(struct consumer_output *obj) free(obj); } +/* + * Get the consumer_output object. + */ +void consumer_output_get(struct consumer_output *obj) +{ + urcu_ref_get(&obj->ref); +} + +/* + * Put the consumer_output object. + * + * Should *NOT* be called with RCU read-side lock held. + */ +void consumer_output_put(struct consumer_output *obj) +{ + if (!obj) { + return; + } + urcu_ref_put(&obj->ref, consumer_release_output); +} + /* * Copy consumer output and returned the newly allocated copy. * @@ -530,33 +551,28 @@ void consumer_destroy_output(struct consumer_output *obj) struct consumer_output *consumer_copy_output(struct consumer_output *obj) { int ret; - struct lttng_ht *tmp_ht_ptr; struct consumer_output *output; assert(obj); output = consumer_create_output(obj->type); if (output == NULL) { - goto error; + goto end; } - /* Avoid losing the HT reference after the memcpy() */ - tmp_ht_ptr = output->socks; - - memcpy(output, obj, sizeof(struct consumer_output)); - - /* Putting back the HT pointer and start copying socket(s). */ - output->socks = tmp_ht_ptr; - + output->enabled = obj->enabled; + output->net_seq_index = obj->net_seq_index; + memcpy(output->subdir, obj->subdir, PATH_MAX); + output->snapshot = obj->snapshot; + memcpy(&output->dst, &obj->dst, sizeof(output->dst)); ret = consumer_copy_sockets(output, obj); if (ret < 0) { - goto malloc_error; + goto error_put; } - -error: +end: return output; -malloc_error: - consumer_destroy_output(output); +error_put: + consumer_output_put(output); return NULL; } @@ -800,7 +816,9 @@ void consumer_init_ask_channel_comm_msg(struct lttcomm_consumer_msg *msg, uint64_t tracefile_count, uint64_t session_id_per_pid, unsigned int monitor, - uint32_t ust_app_uid) + uint32_t ust_app_uid, + const char *root_shm_path, + const char *shm_path) { assert(msg); @@ -838,6 +856,17 @@ void consumer_init_ask_channel_comm_msg(struct lttcomm_consumer_msg *msg, strncpy(msg->u.ask_channel.name, name, sizeof(msg->u.ask_channel.name)); msg->u.ask_channel.name[sizeof(msg->u.ask_channel.name) - 1] = '\0'; + + if (root_shm_path) { + strncpy(msg->u.ask_channel.root_shm_path, root_shm_path, + sizeof(msg->u.ask_channel.root_shm_path)); + msg->u.ask_channel.root_shm_path[sizeof(msg->u.ask_channel.root_shm_path) - 1] = '\0'; + } + if (shm_path) { + strncpy(msg->u.ask_channel.shm_path, shm_path, + sizeof(msg->u.ask_channel.shm_path)); + msg->u.ask_channel.shm_path[sizeof(msg->u.ask_channel.shm_path) - 1] = '\0'; + } } /* @@ -1063,11 +1092,8 @@ error: } /* - * Ask the consumer if the data is ready to read (NOT pending) for the specific - * session id. - * - * This function has a different behavior with the consumer i.e. that it waits - * for a reply from the consumer if yes or no the data is pending. + * Ask the consumer if the data is pending for the specific session id. + * Returns 1 if data is pending, 0 otherwise, or < 0 on error. */ int consumer_is_data_pending(uint64_t session_id, struct consumer_output *consumer) @@ -1157,7 +1183,8 @@ end: } /* - * Send a close metdata command to consumer using the given channel key. + * Send a close metadata command to consumer using the given channel key. + * Called with registry lock held. * * Return 0 on success else a negative value. */ @@ -1223,7 +1250,8 @@ end: } /* - * Send metadata string to consumer. Socket lock MUST be acquired. + * Send metadata string to consumer. + * RCU read-side lock must be held to guarantee existence of socket. * * Return 0 on success else a negative value. */ @@ -1238,6 +1266,8 @@ int consumer_push_metadata(struct consumer_socket *socket, DBG2("Consumer push metadata to consumer socket %d", *socket->fd_ptr); + pthread_mutex_lock(socket->lock); + memset(&msg, 0, sizeof(msg)); msg.cmd_type = LTTNG_CONSUMER_PUSH_METADATA; msg.u.push_metadata.key = metadata_key; @@ -1265,6 +1295,7 @@ int consumer_push_metadata(struct consumer_socket *socket, } end: + pthread_mutex_unlock(socket->lock); health_code_update(); return ret; } @@ -1276,7 +1307,7 @@ end: */ int consumer_snapshot_channel(struct consumer_socket *socket, uint64_t key, struct snapshot_output *output, int metadata, uid_t uid, gid_t gid, - const char *session_path, int wait, int max_stream_size) + const char *session_path, int wait, uint64_t nb_packets_per_stream) { int ret; struct lttcomm_consumer_msg msg; @@ -1290,7 +1321,7 @@ int consumer_snapshot_channel(struct consumer_socket *socket, uint64_t key, memset(&msg, 0, sizeof(msg)); msg.cmd_type = LTTNG_CONSUMER_SNAPSHOT_CHANNEL; msg.u.snapshot_channel.key = key; - msg.u.snapshot_channel.max_stream_size = max_stream_size; + msg.u.snapshot_channel.nb_packets_per_stream = nb_packets_per_stream; msg.u.snapshot_channel.metadata = metadata; if (output->consumer->type == CONSUMER_DST_NET) { @@ -1321,7 +1352,7 @@ int consumer_snapshot_channel(struct consumer_socket *socket, uint64_t key, ret = run_as_mkdir_recursive(msg.u.snapshot_channel.pathname, S_IRWXU | S_IRWXG, uid, gid); if (ret < 0) { - if (ret != -EEXIST) { + if (errno != EEXIST) { ERR("Trace directory creation error"); goto error; }