* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
-#define _GNU_SOURCE
+#define _LGPL_SOURCE
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <inttypes.h>
#include <common/common.h>
-#include <common/consumer.h>
+#include <common/consumer/consumer.h>
#include <common/defaults.h>
#include "consumer.h"
ret = run_as_mkdir_recursive(pathname, S_IRWXU | S_IRWXG,
ua_sess->euid, ua_sess->egid);
if (ret < 0) {
- if (ret != -EEXIST) {
+ if (errno != EEXIST) {
ERR("Trace directory creation error");
goto error;
}
struct ust_app_channel *ua_chan, struct consumer_output *consumer,
struct consumer_socket *socket, struct ust_registry_session *registry)
{
- int ret;
+ int ret, output;
uint32_t chan_id;
uint64_t key, chan_reg_key;
char *pathname = NULL;
struct lttcomm_consumer_msg msg;
struct ust_registry_channel *chan_reg;
+ char shm_path[PATH_MAX] = "";
+ char root_shm_path[PATH_MAX] = "";
assert(ua_sess);
assert(ua_chan);
if (ua_chan->attr.type == LTTNG_UST_CHAN_METADATA) {
chan_id = -1U;
+ /*
+ * Metadata channels shm_path (buffers) are handled within
+ * session daemon. Consumer daemon should not try to create
+ * those buffer files.
+ */
} else {
chan_reg = ust_registry_channel_find(registry, chan_reg_key);
assert(chan_reg);
chan_id = chan_reg->chan_id;
+ if (ua_sess->shm_path[0]) {
+ strncpy(shm_path, ua_sess->shm_path, sizeof(shm_path));
+ shm_path[sizeof(shm_path) - 1] = '\0';
+ strncat(shm_path, "/",
+ sizeof(shm_path) - strlen(shm_path) - 1);
+ strncat(shm_path, ua_chan->name,
+ sizeof(shm_path) - strlen(shm_path) - 1);
+ strncat(shm_path, "_",
+ sizeof(shm_path) - strlen(shm_path) - 1);
+ }
+ strncpy(root_shm_path, ua_sess->root_shm_path, sizeof(root_shm_path));
+ root_shm_path[sizeof(root_shm_path) - 1] = '\0';
+ }
+
+ switch (ua_chan->attr.output) {
+ case LTTNG_UST_MMAP:
+ default:
+ output = LTTNG_EVENT_MMAP;
+ break;
}
consumer_init_ask_channel_comm_msg(&msg,
ua_chan->attr.switch_timer_interval,
ua_chan->attr.read_timer_interval,
ua_sess->live_timer_interval,
- (int) ua_chan->attr.output,
+ output,
(int) ua_chan->attr.type,
ua_sess->tracing_id,
pathname,
ua_chan->tracefile_count,
ua_sess->id,
ua_sess->output_traces,
- ua_sess->uid);
+ ua_sess->uid,
+ root_shm_path, shm_path);
health_code_update();
}
pthread_mutex_lock(socket->lock);
-
ret = ask_channel_creation(ua_sess, ua_chan, consumer, socket, registry);
+ pthread_mutex_unlock(socket->lock);
if (ret < 0) {
goto error;
}
error:
- pthread_mutex_unlock(socket->lock);
return ret;
}
assert(ua_chan);
assert(socket);
+ memset(&msg, 0, sizeof(msg));
msg.cmd_type = LTTNG_CONSUMER_GET_CHANNEL;
msg.u.get_channel.key = ua_chan->key;
cds_list_add_tail(&stream->list, &ua_chan->streams.head);
ua_chan->streams.count++;
- DBG2("UST app stream %d received succesfully", ua_chan->streams.count);
+ DBG2("UST app stream %d received successfully", ua_chan->streams.count);
}
/* This MUST match or else we have a synchronization problem. */
assert(ua_chan);
assert(socket);
+ memset(&msg, 0, sizeof(msg));
msg.cmd_type = LTTNG_CONSUMER_DESTROY_CHANNEL;
msg.u.destroy_channel.key = ua_chan->key;
DBG2("UST consumer send stream to app %d", app->sock);
/* Relay stream to application. */
+ pthread_mutex_lock(&app->sock_lock);
ret = ustctl_send_stream_to_ust(app->sock, channel->obj, stream->obj);
+ pthread_mutex_unlock(&app->sock_lock);
if (ret < 0) {
if (ret != -EPIPE && ret != -LTTNG_UST_ERR_EXITING) {
- ERR("Error ustctl send stream %s to app pid: %d with ret %d",
- stream->name, app->pid, ret);
+ ERR("ustctl send stream handle %d to app pid: %d with ret %d",
+ stream->obj->handle, app->pid, ret);
} else {
DBG3("UST app send stream to ust failed. Application is dead.");
}
app->sock, app->pid, channel->name, channel->tracing_channel_id);
/* Send stream to application. */
+ pthread_mutex_lock(&app->sock_lock);
ret = ustctl_send_channel_to_ust(app->sock, ua_sess->handle, channel->obj);
+ pthread_mutex_unlock(&app->sock_lock);
if (ret < 0) {
if (ret != -EPIPE && ret != -LTTNG_UST_ERR_EXITING) {
ERR("Error ustctl send channel %s to app pid: %d with ret %d",
assert(socket);
rcu_read_lock();
- pthread_mutex_lock(socket->lock);
-
health_code_update();
/* Wait for a metadata request */
+ pthread_mutex_lock(socket->lock);
ret = consumer_socket_recv(socket, &request, sizeof(request));
+ pthread_mutex_unlock(socket->lock);
if (ret < 0) {
goto end;
}
DBG("PID registry not found for session id %" PRIu64,
request.session_id_per_pid);
+ memset(&msg, 0, sizeof(msg));
msg.cmd_type = LTTNG_ERR_UND;
(void) consumer_send_msg(socket, &msg);
/*
}
assert(ust_reg);
+ pthread_mutex_lock(&ust_reg->lock);
ret_push = ust_app_push_metadata(ust_reg, socket, 1);
- if (ret_push < 0) {
+ pthread_mutex_unlock(&ust_reg->lock);
+ if (ret_push == -EPIPE) {
+ DBG("Application or relay closed while pushing metadata");
+ } else if (ret_push < 0) {
ERR("Pushing metadata");
ret = -1;
goto end;
+ } else {
+ DBG("UST Consumer metadata pushed successfully");
}
- DBG("UST Consumer metadata pushed successfully");
ret = 0;
end:
- pthread_mutex_unlock(socket->lock);
rcu_read_unlock();
return ret;
}