X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fbin%2Flttng-sessiond%2Fust-consumer.c;h=ffa6429375b3d7c60d67f6545ce5c4a8e7d8d4e5;hp=b7e0d359261dbdec4a686a893dbb3a0416ea04ac;hb=e7fe706f887aa4d753b102a610f802f7dd816655;hpb=173af62f4804133d4a7f45e34b6f72126f3eca5f diff --git a/src/bin/lttng-sessiond/ust-consumer.c b/src/bin/lttng-sessiond/ust-consumer.c index b7e0d3592..ffa642937 100644 --- a/src/bin/lttng-sessiond/ust-consumer.c +++ b/src/bin/lttng-sessiond/ust-consumer.c @@ -32,15 +32,17 @@ /* * Send a single channel to the consumer using command ADD_CHANNEL. */ -static int send_channel(int sock, struct ust_app_channel *uchan) +static int send_channel(struct consumer_socket *sock, + struct ust_app_channel *uchan) { int ret, fd; struct lttcomm_consumer_msg msg; /* Safety net */ assert(uchan); + assert(sock); - if (sock < 0) { + if (sock->fd < 0) { ret = -EINVAL; goto error; } @@ -52,19 +54,26 @@ static int send_channel(int sock, struct ust_app_channel *uchan) uchan->obj->shm_fd, uchan->attr.subbuf_size, uchan->obj->memory_map_size, - uchan->name); + uchan->name, + uchan->streams.count); + + health_code_update(&health_thread_cmd); ret = consumer_send_channel(sock, &msg); if (ret < 0) { goto error; } + health_code_update(&health_thread_cmd); + fd = uchan->obj->shm_fd; ret = consumer_send_fds(sock, &fd, 1); if (ret < 0) { goto error; } + health_code_update(&health_thread_cmd); + error: return ret; } @@ -72,9 +81,10 @@ error: /* * Send a single stream to the consumer using ADD_STREAM command. */ -static int send_channel_stream(int sock, struct ust_app_channel *uchan, - struct ust_app_session *usess, struct ltt_ust_stream *stream, - struct consumer_output *consumer, const char *pathname) +static int send_channel_stream(struct consumer_socket *sock, + struct ust_app_channel *uchan, struct ust_app_session *usess, + struct ust_app_stream *stream, struct consumer_output *consumer, + const char *pathname) { int ret, fds[2]; struct lttcomm_consumer_msg msg; @@ -84,6 +94,7 @@ static int send_channel_stream(int sock, struct ust_app_channel *uchan, assert(usess); assert(stream); assert(consumer); + assert(sock); DBG2("Sending stream %d of channel %s to kernel consumer", stream->obj->shm_fd, uchan->name); @@ -100,7 +111,10 @@ static int send_channel_stream(int sock, struct ust_app_channel *uchan, consumer->net_seq_index, 0, /* Metadata flag unset */ stream->name, - pathname); + pathname, + usess->id); + + health_code_update(&health_thread_cmd); /* Send stream and file descriptor */ fds[0] = stream->obj->shm_fd; @@ -110,6 +124,8 @@ static int send_channel_stream(int sock, struct ust_app_channel *uchan, goto error; } + health_code_update(&health_thread_cmd); + error: return ret; } @@ -117,14 +133,16 @@ error: /* * Send all stream fds of UST channel to the consumer. */ -static int send_channel_streams(int sock, +static int send_channel_streams(struct consumer_socket *sock, struct ust_app_channel *uchan, struct ust_app_session *usess, struct consumer_output *consumer) { int ret; char tmp_path[PATH_MAX]; const char *pathname; - struct ltt_ust_stream *stream, *tmp; + struct ust_app_stream *stream, *tmp; + + assert(sock); DBG("Sending streams of channel %s to UST consumer", uchan->name); @@ -136,8 +154,8 @@ static int send_channel_streams(int sock, /* Get the right path name destination */ if (consumer->type == CONSUMER_DST_LOCAL) { /* Set application path to the destination path */ - ret = snprintf(tmp_path, sizeof(tmp_path), "%s/%s", - consumer->dst.trace_path, usess->path); + ret = snprintf(tmp_path, sizeof(tmp_path), "%s/%s/%s", + consumer->dst.trace_path, consumer->subdir, usess->path); if (ret < 0) { PERROR("snprintf stream path"); goto error; @@ -178,8 +196,8 @@ error: /* * Sending metadata to the consumer with command ADD_CHANNEL and ADD_STREAM. */ -static int send_metadata(int sock, struct ust_app_session *usess, - struct consumer_output *consumer) +static int send_metadata(struct consumer_socket *sock, + struct ust_app_session *usess, struct consumer_output *consumer) { int ret, fd, fds[2]; char tmp_path[PATH_MAX]; @@ -189,9 +207,10 @@ static int send_metadata(int sock, struct ust_app_session *usess, /* Safety net */ assert(usess); assert(consumer); + assert(sock); - if (sock < 0) { - ERR("Consumer socket is negative (%d)", sock); + if (sock->fd < 0) { + ERR("Consumer socket is negative (%d)", sock->fd); return -EINVAL; } @@ -208,13 +227,18 @@ static int send_metadata(int sock, struct ust_app_session *usess, usess->metadata->obj->shm_fd, usess->metadata->attr.subbuf_size, usess->metadata->obj->memory_map_size, - "metadata"); + "metadata", + 1); + + health_code_update(&health_thread_cmd); ret = consumer_send_channel(sock, &msg); if (ret < 0) { goto error; } + health_code_update(&health_thread_cmd); + /* Sending metadata shared memory fd */ fd = usess->metadata->obj->shm_fd; ret = consumer_send_fds(sock, &fd, 1); @@ -222,11 +246,13 @@ static int send_metadata(int sock, struct ust_app_session *usess, goto error; } + health_code_update(&health_thread_cmd); + /* Get correct path name destination */ if (consumer->type == CONSUMER_DST_LOCAL) { /* Set application path to the destination path */ - ret = snprintf(tmp_path, sizeof(tmp_path), "%s/%s", - consumer->dst.trace_path, usess->path); + ret = snprintf(tmp_path, sizeof(tmp_path), "%s/%s/%s", + consumer->dst.trace_path, consumer->subdir, usess->path); if (ret < 0) { PERROR("snprintf stream path"); goto error; @@ -264,7 +290,10 @@ static int send_metadata(int sock, struct ust_app_session *usess, consumer->net_seq_index, 1, /* Flag metadata set */ "metadata", - pathname); + pathname, + usess->id); + + health_code_update(&health_thread_cmd); /* Send stream and file descriptor */ fds[0] = usess->metadata->stream_obj->shm_fd; @@ -274,6 +303,8 @@ static int send_metadata(int sock, struct ust_app_session *usess, goto error; } + health_code_update(&health_thread_cmd); + error: return ret; } @@ -289,15 +320,19 @@ int ust_consumer_send_session(struct ust_app_session *usess, struct ust_app_channel *ua_chan; assert(usess); - assert(consumer); - assert(sock); + + if (consumer == NULL || sock == NULL) { + /* There is no consumer so just ignoring the command. */ + DBG("UST consumer does not exist. Not sending streams"); + return 0; + } DBG("Sending metadata stream fd to consumer on %d", sock->fd); pthread_mutex_lock(sock->lock); /* Sending metadata information to the consumer */ - ret = send_metadata(sock->fd, usess, consumer); + ret = send_metadata(sock, usess, consumer); if (ret < 0) { goto error; } @@ -314,7 +349,7 @@ int ust_consumer_send_session(struct ust_app_session *usess, continue; } - ret = send_channel_streams(sock->fd, ua_chan, usess, consumer); + ret = send_channel_streams(sock, ua_chan, usess, consumer); if (ret < 0) { rcu_read_unlock(); goto error;