*
* Returns 0 on success, < 0 on error
*/
-int lttng_kconsumer_take_snapshot(struct lttng_consumer_local_data *ctx,
- struct lttng_consumer_stream *stream)
+int lttng_kconsumer_take_snapshot(struct lttng_consumer_stream *stream)
{
int ret = 0;
int infd = stream->wait_fd;
*
* Returns 0 on success, < 0 on error
*/
-int lttng_kconsumer_get_produced_snapshot(
- struct lttng_consumer_local_data *ctx,
- struct lttng_consumer_stream *stream,
+int lttng_kconsumer_get_produced_snapshot(struct lttng_consumer_stream *stream,
unsigned long *pos)
{
int ret;
int sock, struct pollfd *consumer_sockpoll)
{
ssize_t ret;
+ enum lttng_error_code ret_code = LTTNG_OK;
struct lttcomm_consumer_msg msg;
ret = lttcomm_recv_unix_sock(sock, &msg, sizeof(msg));
return ret;
}
if (msg.cmd_type == LTTNG_CONSUMER_STOP) {
+ /*
+ * Notify the session daemon that the command is completed.
+ *
+ * On transport layer error, the function call will print an error
+ * message so handling the returned code is a bit useless since we
+ * return an error code anyway.
+ */
+ (void) consumer_send_status_msg(sock, ret_code);
return -ENOENT;
}
switch (msg.cmd_type) {
case LTTNG_CONSUMER_ADD_RELAYD_SOCKET:
{
+ /* Session daemon status message are handled in the following call. */
ret = consumer_add_relayd_socket(msg.u.relayd_sock.net_index,
msg.u.relayd_sock.type, ctx, sock, consumer_sockpoll,
- &msg.u.relayd_sock.sock);
+ &msg.u.relayd_sock.sock, msg.u.relayd_sock.session_id);
goto end_nosignal;
}
case LTTNG_CONSUMER_ADD_CHANNEL:
{
struct lttng_consumer_channel *new_channel;
- DBG("consumer_add_channel %d", msg.u.channel.channel_key);
+ /* First send a status message before receiving the fds. */
+ ret = consumer_send_status_msg(sock, ret_code);
+ if (ret < 0) {
+ /* Somehow, the session daemon is not responding anymore. */
+ goto end_nosignal;
+ }
+
+ DBG("consumer_add_channel %" PRIu64, msg.u.channel.channel_key);
new_channel = consumer_allocate_channel(msg.u.channel.channel_key,
- -1, -1,
- msg.u.channel.mmap_len,
- msg.u.channel.max_sb_size,
- msg.u.channel.nb_init_streams);
+ msg.u.channel.session_id, msg.u.channel.pathname,
+ msg.u.channel.name, msg.u.channel.uid, msg.u.channel.gid,
+ msg.u.channel.relayd_id, msg.u.channel.output);
if (new_channel == NULL) {
lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR);
goto end_nosignal;
}
+ new_channel->nb_init_stream_left = msg.u.channel.nb_init_streams;
+
+ /* Translate and save channel type. */
+ switch (msg.u.channel.type) {
+ case CONSUMER_CHANNEL_TYPE_DATA:
+ case CONSUMER_CHANNEL_TYPE_METADATA:
+ new_channel->type = msg.u.channel.type;
+ break;
+ default:
+ assert(0);
+ goto end_nosignal;
+ };
+
if (ctx->on_recv_channel != NULL) {
ret = ctx->on_recv_channel(new_channel);
if (ret == 0) {
- consumer_add_channel(new_channel);
+ consumer_add_channel(new_channel, ctx);
} else if (ret < 0) {
goto end_nosignal;
}
} else {
- consumer_add_channel(new_channel);
+ consumer_add_channel(new_channel, ctx);
}
goto end_nosignal;
}
int fd, stream_pipe;
struct consumer_relayd_sock_pair *relayd = NULL;
struct lttng_consumer_stream *new_stream;
+ struct lttng_consumer_channel *channel;
int alloc_ret = 0;
+ /*
+ * Get stream's channel reference. Needed when adding the stream to the
+ * global hash table.
+ */
+ channel = consumer_find_channel(msg.u.stream.channel_key);
+ if (!channel) {
+ /*
+ * We could not find the channel. Can happen if cpu hotplug
+ * happens while tearing down.
+ */
+ ERR("Unable to find channel key %" PRIu64, msg.u.stream.channel_key);
+ ret_code = LTTNG_ERR_KERN_CHAN_NOT_FOUND;
+ }
+
+ /* First send a status message before receiving the fds. */
+ ret = consumer_send_status_msg(sock, ret_code);
+ if (ret < 0 || ret_code != LTTNG_OK) {
+ /*
+ * Somehow, the session daemon is not responding anymore or the
+ * channel was not found.
+ */
+ goto end_nosignal;
+ }
+
/* block */
if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
rcu_read_unlock();
return ret;
}
- new_stream = consumer_allocate_stream(msg.u.stream.channel_key,
- msg.u.stream.stream_key,
- fd, fd,
- msg.u.stream.state,
- msg.u.stream.mmap_len,
- msg.u.stream.output,
- msg.u.stream.path_name,
- msg.u.stream.uid,
- msg.u.stream.gid,
- msg.u.stream.net_index,
- msg.u.stream.metadata_flag,
- msg.u.stream.session_id,
- &alloc_ret);
+ /*
+ * Send status code to session daemon only if the recv works. If the
+ * above recv() failed, the session daemon is notified through the
+ * error socket and the teardown is eventually done.
+ */
+ ret = consumer_send_status_msg(sock, ret_code);
+ if (ret < 0) {
+ /* Somehow, the session daemon is not responding anymore. */
+ goto end_nosignal;
+ }
+
+ new_stream = consumer_allocate_stream(channel->key,
+ fd,
+ LTTNG_CONSUMER_ACTIVE_STREAM,
+ channel->name,
+ channel->uid,
+ channel->gid,
+ channel->relayd_id,
+ channel->session_id,
+ msg.u.stream.cpu,
+ &alloc_ret,
+ channel->type);
if (new_stream == NULL) {
switch (alloc_ret) {
case -ENOMEM:
default:
lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR);
break;
- case -ENOENT:
- /*
- * We could not find the channel. Can happen if cpu hotplug
- * happens while tearing down.
- */
- DBG3("Could not find channel");
- break;
}
goto end_nosignal;
}
+ new_stream->chan = channel;
+ new_stream->wait_fd = fd;
/*
* The buffer flush is done on the session daemon side for the kernel
new_stream->hangup_flush_done = 0;
/* The stream is not metadata. Get relayd reference if exists. */
- relayd = consumer_find_relayd(msg.u.stream.net_index);
+ relayd = consumer_find_relayd(new_stream->net_seq_idx);
if (relayd != NULL) {
/* Add stream on the relayd */
pthread_mutex_lock(&relayd->ctrl_sock_mutex);
ret = relayd_add_stream(&relayd->control_sock,
- msg.u.stream.name, msg.u.stream.path_name,
+ new_stream->name, new_stream->chan->pathname,
&new_stream->relayd_stream_id);
pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
if (ret < 0) {
consumer_del_stream(new_stream, NULL);
goto end_nosignal;
}
- } else if (msg.u.stream.net_index != -1) {
- ERR("Network sequence index %d unknown. Not adding stream.",
- msg.u.stream.net_index);
+ } else if (new_stream->net_seq_idx != (uint64_t) -1ULL) {
+ ERR("Network sequence index %" PRIu64 " unknown. Not adding stream.",
+ new_stream->net_seq_idx);
consumer_del_stream(new_stream, NULL);
goto end_nosignal;
}
}
DBG("Kernel consumer ADD_STREAM %s (fd: %d) with relayd id %" PRIu64,
- msg.u.stream.path_name, fd, new_stream->relayd_stream_id);
+ new_stream->name, fd, new_stream->relayd_stream_id);
break;
}
case LTTNG_CONSUMER_UPDATE_STREAM:
/* Get relayd reference if exists. */
relayd = consumer_find_relayd(index);
if (relayd == NULL) {
- ERR("Unable to find relayd %" PRIu64, index);
- goto end_nosignal;
+ DBG("Unable to find relayd %" PRIu64, index);
+ ret_code = LTTNG_ERR_NO_CONSUMER;
}
/*
*
* The destroy can happen either here or when a stream fd hangs up.
*/
- consumer_flag_relayd_for_destroy(relayd);
+ if (relayd) {
+ consumer_flag_relayd_for_destroy(relayd);
+ }
+
+ ret = consumer_send_status_msg(sock, ret_code);
+ if (ret < 0) {
+ /* Somehow, the session daemon is not responding anymore. */
+ goto end_nosignal;
+ }
goto end_nosignal;
}
- case LTTNG_CONSUMER_DATA_AVAILABLE:
+ case LTTNG_CONSUMER_DATA_PENDING:
{
- rcu_read_unlock();
- return -ENOSYS;
+ int32_t ret;
+ uint64_t id = msg.u.data_pending.session_id;
+
+ DBG("Kernel consumer data pending command for id %" PRIu64, id);
+
+ ret = consumer_data_pending(id);
+
+ /* Send back returned value to session daemon */
+ ret = lttcomm_send_unix_sock(sock, &ret, sizeof(ret));
+ if (ret < 0) {
+ PERROR("send data pending ret code");
+ }
+
+ /*
+ * No need to send back a status message since the data pending
+ * returned value is the response.
+ */
+ break;
}
default:
goto end_nosignal;
goto end;
}
- switch (stream->output) {
+ switch (stream->chan->output) {
case LTTNG_EVENT_SPLICE:
/*
* network streaming or the full padding (len) size when we are _not_
* streaming.
*/
- if ((ret != subbuf_size && stream->net_seq_idx != -1) ||
- (ret != len && stream->net_seq_idx == -1)) {
+ if ((ret != subbuf_size && stream->net_seq_idx != (uint64_t) -1ULL) ||
+ (ret != len && stream->net_seq_idx == (uint64_t) -1ULL)) {
/*
* Display the error but continue processing to try to release the
* subbuffer
int lttng_kconsumer_on_recv_stream(struct lttng_consumer_stream *stream)
{
int ret;
+ char full_path[PATH_MAX];
+
+ assert(stream);
+
+ ret = snprintf(full_path, sizeof(full_path), "%s/%s",
+ stream->chan->pathname, stream->name);
+ if (ret < 0) {
+ PERROR("snprintf on_recv_stream");
+ goto error;
+ }
/* Opening the tracefile in write mode */
- if (strlen(stream->path_name) > 0 && stream->net_seq_idx == -1) {
- ret = run_as_open(stream->path_name,
- O_WRONLY|O_CREAT|O_TRUNC,
- S_IRWXU|S_IRWXG|S_IRWXO,
- stream->uid, stream->gid);
+ if (stream->net_seq_idx == (uint64_t) -1ULL) {
+ ret = run_as_open(full_path, O_WRONLY | O_CREAT | O_TRUNC,
+ S_IRWXU|S_IRWXG|S_IRWXO, stream->uid, stream->gid);
if (ret < 0) {
- ERR("Opening %s", stream->path_name);
- perror("open");
+ PERROR("open kernel stream path %s", full_path);
goto error;
}
stream->out_fd = ret;
ret = kernctl_get_mmap_len(stream->wait_fd, &mmap_len);
if (ret != 0) {
errno = -ret;
- perror("kernctl_get_mmap_len");
+ PERROR("kernctl_get_mmap_len");
goto error_close_fd;
}
stream->mmap_len = (size_t) mmap_len;
- stream->mmap_base = mmap(NULL, stream->mmap_len,
- PROT_READ, MAP_PRIVATE, stream->wait_fd, 0);
+ stream->mmap_base = mmap(NULL, stream->mmap_len, PROT_READ,
+ MAP_PRIVATE, stream->wait_fd, 0);
if (stream->mmap_base == MAP_FAILED) {
- perror("Error mmaping");
+ PERROR("Error mmaping");
ret = -1;
goto error_close_fd;
}
/*
* Check if data is still being extracted from the buffers for a specific
- * stream. Consumer data lock MUST be acquired before calling this function.
+ * stream. Consumer data lock MUST be acquired before calling this function
+ * and the stream lock.
*
- * Return 0 if the traced data are still getting read else 1 meaning that the
+ * Return 1 if the traced data are still getting read else 0 meaning that the
* data is available for trace viewer reading.
*/
-int lttng_kconsumer_data_available(struct lttng_consumer_stream *stream)
+int lttng_kconsumer_data_pending(struct lttng_consumer_stream *stream)
{
int ret;
assert(stream);
- /*
- * Try to lock the stream mutex. On failure, we know that the stream is
- * being used else where hence there is data still being extracted.
- */
- ret = pthread_mutex_trylock(&stream->lock);
- if (ret == EBUSY) {
- goto data_not_available;
- }
- /* The stream is now locked so we can do our ustctl calls */
-
ret = kernctl_get_next_subbuf(stream->wait_fd);
if (ret == 0) {
/* There is still data so let's put back this subbuffer. */
ret = kernctl_put_subbuf(stream->wait_fd);
assert(ret == 0);
- pthread_mutex_unlock(&stream->lock);
- goto data_not_available;
+ ret = 1; /* Data is pending */
+ goto end;
}
- /* Data is available to be read for this stream. */
- pthread_mutex_unlock(&stream->lock);
- return 1;
+ /* Data is NOT pending and ready to be read. */
+ ret = 0;
-data_not_available:
- return 0;
+end:
+ return ret;
}