case LTTNG_CONSUMER_ADD_STREAM:
{
struct lttng_consumer_stream *new_stream;
- int fds[2];
+ int fds[2], stream_pipe;
size_t nb_fd = 2;
struct consumer_relayd_sock_pair *relayd = NULL;
int alloc_ret = 0;
msg.u.stream.gid,
msg.u.stream.net_index,
msg.u.stream.metadata_flag,
+ msg.u.stream.session_id,
&alloc_ret);
if (new_stream == NULL) {
switch (alloc_ret) {
}
}
- /* Send stream to the metadata thread */
+ /* Get the right pipe where the stream will be sent. */
if (new_stream->metadata_flag) {
- do {
- ret = write(ctx->consumer_metadata_pipe[1], &new_stream,
- sizeof(new_stream));
- } while (ret < 0 && errno == EINTR);
- if (ret < 0) {
- PERROR("write metadata pipe");
- consumer_del_metadata_stream(new_stream, NULL);
- goto end_nosignal;
- }
+ stream_pipe = ctx->consumer_metadata_pipe[1];
} else {
- ret = consumer_add_stream(new_stream);
- if (ret) {
- ERR("Consumer add stream %d failed. Continuing",
- new_stream->key);
- /*
- * At this point, if the add_stream fails, it is not in the
- * hash table thus passing the NULL value here.
- */
- consumer_del_stream(new_stream, NULL);
- goto end_nosignal;
- }
+ stream_pipe = ctx->consumer_data_pipe[1];
}
- DBG("UST consumer_add_stream %s (%d,%d) with relayd id %" PRIu64,
+ do {
+ ret = write(stream_pipe, &new_stream, sizeof(new_stream));
+ } while (ret < 0 && errno == EINTR);
+ if (ret < 0) {
+ PERROR("Consumer write %s stream to pipe %d",
+ new_stream->metadata_flag ? "metadata" : "data",
+ stream_pipe);
+ consumer_del_stream(new_stream, NULL);
+ goto end_nosignal;
+ }
+
+ DBG("UST consumer ADD_STREAM %s (%d,%d) with relayd id %" PRIu64,
msg.u.stream.path_name, fds[0], fds[1],
new_stream->relayd_stream_id);
break;
{
rcu_read_unlock();
return -ENOSYS;
-#if 0
- if (ctx->on_update_stream != NULL) {
- ret = ctx->on_update_stream(msg.u.stream.stream_key, msg.u.stream.state);
- if (ret == 0) {
- consumer_change_stream_state(msg.u.stream.stream_key, msg.u.stream.state);
- } else if (ret < 0) {
- goto end;
- }
- } else {
- consumer_change_stream_state(msg.u.stream.stream_key,
- msg.u.stream.state);
+ }
+ case LTTNG_CONSUMER_DATA_PENDING:
+ {
+ int32_t ret;
+ uint64_t id = msg.u.data_pending.session_id;
+
+ DBG("UST consumer data pending command for id %" PRIu64, id);
+
+ ret = consumer_data_pending(id);
+
+ /* Send back returned value to session daemon */
+ ret = lttcomm_send_unix_sock(sock, &ret, sizeof(ret));
+ if (ret < 0) {
+ PERROR("send data pending ret code");
}
break;
-#endif
}
default:
break;
}
- /*
- * Wake-up the other end by writing a null byte in the pipe (non-blocking).
- * Important note: Because writing into the pipe is non-blocking (and
- * therefore we allow dropping wakeup data, as long as there is wakeup data
- * present in the pipe buffer to wake up the other end), the other end
- * should perform the following sequence for waiting:
- *
- * 1) empty the pipe (reads).
- * 2) perform update operation.
- * 3) wait on the pipe (poll).
- */
- do {
- ret = write(ctx->consumer_poll_pipe[1], "", 1);
- } while (ret < 0 && errno == EINTR);
end_nosignal:
rcu_read_unlock();
stream->out_fd = ret;
}
+ ret = lttng_ustconsumer_add_stream(stream);
+ if (ret) {
+ consumer_del_stream(stream, NULL);
+ ret = -1;
+ goto error;
+ }
+
/* we return 0 to let the library handle the FD internally */
return 0;
error:
return ret;
}
+
+/*
+ * Check if data is still being extracted from the buffers for a specific
+ * stream. Consumer data lock MUST be acquired before calling this function
+ * and the stream lock.
+ *
+ * Return 1 if the traced data are still getting read else 0 meaning that the
+ * data is available for trace viewer reading.
+ */
+int lttng_ustconsumer_data_pending(struct lttng_consumer_stream *stream)
+{
+ int ret;
+
+ assert(stream);
+
+ DBG("UST consumer checking data pending");
+
+ ret = ustctl_get_next_subbuf(stream->chan->handle, stream->buf);
+ if (ret == 0) {
+ /* There is still data so let's put back this subbuffer. */
+ ret = ustctl_put_subbuf(stream->chan->handle, stream->buf);
+ assert(ret == 0);
+ ret = 1; /* Data is pending */
+ goto end;
+ }
+
+ /* Data is NOT pending so ready to be read. */
+ ret = 0;
+
+end:
+ return ret;
+}