X-Git-Url: http://git.lttng.org/?a=blobdiff_plain;f=src%2Fcommon%2Fust-consumer%2Fust-consumer.c;h=5a716859a386236fd480fbf1bb133b96a52012e7;hb=6d805429e9cb049eb0c9205fcf742a53e3166caf;hp=11706877a7f5b3f147034abcf106a0bb5e77c5d2;hpb=e316aad5fbbe3782872083cb68dfdd58bccea811;p=lttng-tools.git diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c index 11706877a..5a716859a 100644 --- a/src/common/ust-consumer/ust-consumer.c +++ b/src/common/ust-consumer/ust-consumer.c @@ -171,7 +171,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, case LTTNG_CONSUMER_ADD_STREAM: { struct lttng_consumer_stream *new_stream; - int fds[2]; + int fds[2], stream_pipe; size_t nb_fd = 2; struct consumer_relayd_sock_pair *relayd = NULL; int alloc_ret = 0; @@ -205,6 +205,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, msg.u.stream.gid, msg.u.stream.net_index, msg.u.stream.metadata_flag, + msg.u.stream.session_id, &alloc_ret); if (new_stream == NULL) { switch (alloc_ret) { @@ -253,32 +254,25 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, } } - /* Send stream to the metadata thread */ + /* Get the right pipe where the stream will be sent. */ if (new_stream->metadata_flag) { - do { - ret = write(ctx->consumer_metadata_pipe[1], &new_stream, - sizeof(new_stream)); - } while (ret < 0 && errno == EINTR); - if (ret < 0) { - PERROR("write metadata pipe"); - consumer_del_metadata_stream(new_stream, NULL); - goto end_nosignal; - } + stream_pipe = ctx->consumer_metadata_pipe[1]; } else { - ret = consumer_add_stream(new_stream); - if (ret) { - ERR("Consumer add stream %d failed. Continuing", - new_stream->key); - /* - * At this point, if the add_stream fails, it is not in the - * hash table thus passing the NULL value here. - */ - consumer_del_stream(new_stream, NULL); - goto end_nosignal; - } + stream_pipe = ctx->consumer_data_pipe[1]; } - DBG("UST consumer_add_stream %s (%d,%d) with relayd id %" PRIu64, + do { + ret = write(stream_pipe, &new_stream, sizeof(new_stream)); + } while (ret < 0 && errno == EINTR); + if (ret < 0) { + PERROR("Consumer write %s stream to pipe %d", + new_stream->metadata_flag ? "metadata" : "data", + stream_pipe); + consumer_del_stream(new_stream, NULL); + goto end_nosignal; + } + + DBG("UST consumer ADD_STREAM %s (%d,%d) with relayd id %" PRIu64, msg.u.stream.path_name, fds[0], fds[1], new_stream->relayd_stream_id); break; @@ -315,39 +309,27 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, { rcu_read_unlock(); return -ENOSYS; -#if 0 - if (ctx->on_update_stream != NULL) { - ret = ctx->on_update_stream(msg.u.stream.stream_key, msg.u.stream.state); - if (ret == 0) { - consumer_change_stream_state(msg.u.stream.stream_key, msg.u.stream.state); - } else if (ret < 0) { - goto end; - } - } else { - consumer_change_stream_state(msg.u.stream.stream_key, - msg.u.stream.state); + } + case LTTNG_CONSUMER_DATA_PENDING: + { + int32_t ret; + uint64_t id = msg.u.data_pending.session_id; + + DBG("UST consumer data pending command for id %" PRIu64, id); + + ret = consumer_data_pending(id); + + /* Send back returned value to session daemon */ + ret = lttcomm_send_unix_sock(sock, &ret, sizeof(ret)); + if (ret < 0) { + PERROR("send data pending ret code"); } break; -#endif } default: break; } - /* - * Wake-up the other end by writing a null byte in the pipe (non-blocking). - * Important note: Because writing into the pipe is non-blocking (and - * therefore we allow dropping wakeup data, as long as there is wakeup data - * present in the pipe buffer to wake up the other end), the other end - * should perform the following sequence for waiting: - * - * 1) empty the pipe (reads). - * 2) perform update operation. - * 3) wait on the pipe (poll). - */ - do { - ret = write(ctx->consumer_poll_pipe[1], "", 1); - } while (ret < 0 && errno == EINTR); end_nosignal: rcu_read_unlock(); @@ -528,9 +510,48 @@ int lttng_ustconsumer_on_recv_stream(struct lttng_consumer_stream *stream) stream->out_fd = ret; } + ret = lttng_ustconsumer_add_stream(stream); + if (ret) { + consumer_del_stream(stream, NULL); + ret = -1; + goto error; + } + /* we return 0 to let the library handle the FD internally */ return 0; error: return ret; } + +/* + * Check if data is still being extracted from the buffers for a specific + * stream. Consumer data lock MUST be acquired before calling this function + * and the stream lock. + * + * Return 1 if the traced data are still getting read else 0 meaning that the + * data is available for trace viewer reading. + */ +int lttng_ustconsumer_data_pending(struct lttng_consumer_stream *stream) +{ + int ret; + + assert(stream); + + DBG("UST consumer checking data pending"); + + ret = ustctl_get_next_subbuf(stream->chan->handle, stream->buf); + if (ret == 0) { + /* There is still data so let's put back this subbuffer. */ + ret = ustctl_put_subbuf(stream->chan->handle, stream->buf); + assert(ret == 0); + ret = 1; /* Data is pending */ + goto end; + } + + /* Data is NOT pending so ready to be read. */ + ret = 0; + +end: + return ret; +}