X-Git-Url: https://git.lttng.org/?a=blobdiff_plain;f=src%2Fbin%2Flttng-sessiond%2Fconsumer.c;h=ff5360aae755aa082ec6517cb6c4f10df99c9d9d;hb=d0b96690836f4b876096f3dc14801f8e25281a77;hp=90c700129159129876fb1d54ac19bb9f566dde3b;hpb=a6cd2b97ca69d302670109fef8340bd927270a30;p=lttng-tools.git diff --git a/src/bin/lttng-sessiond/consumer.c b/src/bin/lttng-sessiond/consumer.c index 90c700129..ff5360aae 100644 --- a/src/bin/lttng-sessiond/consumer.c +++ b/src/bin/lttng-sessiond/consumer.c @@ -60,13 +60,55 @@ int consumer_recv_status_reply(struct consumer_socket *sock) ret = 0; } else { ret = -reply.ret_code; - DBG("Consumer ret code %d", reply.ret_code); + DBG("Consumer ret code %d", ret); } end: return ret; } +/* + * Once the ASK_CHANNEL command is sent to the consumer, the channel + * information are sent back. This call receives that data and populates key + * and stream_count. + * + * On success return 0 and both key and stream_count are set. On error, a + * negative value is sent back and both parameters are untouched. + */ +int consumer_recv_status_channel(struct consumer_socket *sock, + unsigned long *key, unsigned int *stream_count) +{ + int ret; + struct lttcomm_consumer_status_channel reply; + + assert(sock); + assert(stream_count); + assert(key); + + ret = lttcomm_recv_unix_sock(sock->fd, &reply, sizeof(reply)); + if (ret <= 0) { + if (ret == 0) { + /* Orderly shutdown. Don't return 0 which means success. */ + ret = -1; + } + /* The above call will print a PERROR on error. */ + DBG("Fail to receive status reply on sock %d", sock->fd); + goto end; + } + + /* An error is possible so don't touch the key and stream_count. */ + if (reply.ret_code != LTTNG_OK) { + ret = -1; + goto end; + } + + *key = reply.key; + *stream_count = reply.stream_count; + +end: + return ret; +} + /* * Send destroy relayd command to consumer. * @@ -81,7 +123,7 @@ int consumer_send_destroy_relayd(struct consumer_socket *sock, assert(consumer); assert(sock); - DBG2("Sending destroy relayd command to consumer..."); + DBG2("Sending destroy relayd command to consumer sock %d", sock->fd); /* Bail out if consumer is disabled */ if (!consumer->enabled) { @@ -376,10 +418,12 @@ struct consumer_output *consumer_copy_output(struct consumer_output *obj) /* Putting back the HT pointer and start copying socket(s). */ output->socks = tmp_ht_ptr; + rcu_read_lock(); cds_lfht_for_each_entry(obj->socks->ht, &iter.iter, socket, node.node) { /* Create new socket object. */ copy_sock = consumer_allocate_socket(socket->fd); if (copy_sock == NULL) { + rcu_read_unlock(); goto malloc_error; } @@ -387,6 +431,7 @@ struct consumer_output *consumer_copy_output(struct consumer_output *obj) copy_sock->lock = socket->lock; consumer_add_socket(copy_sock, output); } + rcu_read_unlock(); error: return output; @@ -508,6 +553,32 @@ error: return ret; } +/* + * Consumer send communication message structure to consumer. + */ +int consumer_send_msg(struct consumer_socket *sock, + struct lttcomm_consumer_msg *msg) +{ + int ret; + + assert(msg); + assert(sock); + assert(sock->fd >= 0); + + ret = lttcomm_send_unix_sock(sock->fd, msg, + sizeof(struct lttcomm_consumer_msg)); + if (ret < 0) { + /* The above call will print a PERROR on error. */ + DBG("Error when sending consumer channel on sock %d", sock->fd); + goto error; + } + + ret = consumer_recv_status_reply(sock); + +error: + return ret; +} + /* * Consumer send channel communication message structure to consumer. */ @@ -534,30 +605,94 @@ error: return ret; } +/* + * Populate the given consumer msg structure with the ask_channel command + * information. + */ +void consumer_init_ask_channel_comm_msg(struct lttcomm_consumer_msg *msg, + uint64_t subbuf_size, + uint64_t num_subbuf, + int overwrite, + unsigned int switch_timer_interval, + unsigned int read_timer_interval, + int output, + int type, + uint64_t session_id, + const char *pathname, + const char *name, + uid_t uid, + gid_t gid, + int relayd_id, + unsigned long key, + unsigned char *uuid) +{ + assert(msg); + + /* Zeroed structure */ + memset(msg, 0, sizeof(struct lttcomm_consumer_msg)); + + msg->cmd_type = LTTNG_CONSUMER_ASK_CHANNEL_CREATION; + msg->u.ask_channel.subbuf_size = subbuf_size; + msg->u.ask_channel.num_subbuf = num_subbuf ; + msg->u.ask_channel.overwrite = overwrite; + msg->u.ask_channel.switch_timer_interval = switch_timer_interval; + msg->u.ask_channel.read_timer_interval = read_timer_interval; + msg->u.ask_channel.output = output; + msg->u.ask_channel.type = type; + msg->u.ask_channel.session_id = session_id; + msg->u.ask_channel.uid = uid; + msg->u.ask_channel.gid = gid; + msg->u.ask_channel.relayd_id = relayd_id; + msg->u.ask_channel.key = key; + + memcpy(msg->u.ask_channel.uuid, uuid, sizeof(msg->u.ask_channel.uuid)); + + strncpy(msg->u.ask_channel.pathname, pathname, + sizeof(msg->u.ask_channel.pathname)); + msg->u.ask_channel.pathname[sizeof(msg->u.ask_channel.pathname)-1] = '\0'; + + strncpy(msg->u.ask_channel.name, name, sizeof(msg->u.ask_channel.name)); + msg->u.ask_channel.name[sizeof(msg->u.ask_channel.name) - 1] = '\0'; +} + /* * Init channel communication message structure. */ void consumer_init_channel_comm_msg(struct lttcomm_consumer_msg *msg, enum lttng_consumer_command cmd, int channel_key, - uint64_t max_sb_size, - uint64_t mmap_len, + uint64_t session_id, + const char *pathname, + uid_t uid, + gid_t gid, + int relayd_id, const char *name, - unsigned int nb_init_streams) + unsigned int nb_init_streams, + enum lttng_event_output output, + int type) { assert(msg); - /* TODO: Args validation */ - /* Zeroed structure */ memset(msg, 0, sizeof(struct lttcomm_consumer_msg)); /* Send channel */ msg->cmd_type = cmd; msg->u.channel.channel_key = channel_key; - msg->u.channel.max_sb_size = max_sb_size; - msg->u.channel.mmap_len = mmap_len; + msg->u.channel.session_id = session_id; + msg->u.channel.uid = uid; + msg->u.channel.gid = gid; + msg->u.channel.relayd_id = relayd_id; msg->u.channel.nb_init_streams = nb_init_streams; + msg->u.channel.output = output; + msg->u.channel.type = type; + + strncpy(msg->u.channel.pathname, pathname, + sizeof(msg->u.channel.pathname)); + msg->u.channel.pathname[sizeof(msg->u.channel.pathname) - 1] = '\0'; + + strncpy(msg->u.channel.name, name, sizeof(msg->u.channel.name)); + msg->u.channel.name[sizeof(msg->u.channel.name) - 1] = '\0'; } /* @@ -567,39 +702,16 @@ void consumer_init_stream_comm_msg(struct lttcomm_consumer_msg *msg, enum lttng_consumer_command cmd, int channel_key, int stream_key, - uint32_t state, - enum lttng_event_output output, - uint64_t mmap_len, - uid_t uid, - gid_t gid, - int net_index, - unsigned int metadata_flag, - const char *name, - const char *pathname, - unsigned int session_id) + int cpu) { assert(msg); memset(msg, 0, sizeof(struct lttcomm_consumer_msg)); - /* TODO: Args validation */ - msg->cmd_type = cmd; msg->u.stream.channel_key = channel_key; msg->u.stream.stream_key = stream_key; - msg->u.stream.state = state; - msg->u.stream.output = output; - msg->u.stream.mmap_len = mmap_len; - msg->u.stream.uid = uid; - msg->u.stream.gid = gid; - msg->u.stream.net_index = net_index; - msg->u.stream.metadata_flag = metadata_flag; - msg->u.stream.session_id = (uint64_t) session_id; - strncpy(msg->u.stream.name, name, sizeof(msg->u.stream.name)); - msg->u.stream.name[sizeof(msg->u.stream.name) - 1] = '\0'; - strncpy(msg->u.stream.path_name, pathname, - sizeof(msg->u.stream.path_name)); - msg->u.stream.path_name[sizeof(msg->u.stream.path_name) - 1] = '\0'; + msg->u.stream.cpu = cpu; } /* @@ -614,29 +726,7 @@ int consumer_send_stream(struct consumer_socket *sock, assert(msg); assert(dst); assert(sock); - - switch (dst->type) { - case CONSUMER_DST_NET: - /* Consumer should send the stream on the network. */ - msg->u.stream.net_index = dst->net_seq_index; - break; - case CONSUMER_DST_LOCAL: - /* Add stream file name to stream path */ - strncat(msg->u.stream.path_name, "/", - sizeof(msg->u.stream.path_name) - - strlen(msg->u.stream.path_name) - 1); - strncat(msg->u.stream.path_name, msg->u.stream.name, - sizeof(msg->u.stream.path_name) - - strlen(msg->u.stream.path_name) - 1); - msg->u.stream.path_name[sizeof(msg->u.stream.path_name) - 1] = '\0'; - /* Indicate that the stream is NOT network */ - msg->u.stream.net_index = -1; - break; - default: - ERR("Consumer unknown output type (%d)", dst->type); - ret = -1; - goto error; - } + assert(fds); /* Send on socket */ ret = lttcomm_send_unix_sock(sock->fd, msg, @@ -793,6 +883,7 @@ int consumer_is_data_pending(unsigned int id, DBG3("Consumer data pending for id %u", id); /* Send command for each consumer */ + rcu_read_lock(); cds_lfht_for_each_entry(consumer->socks->ht, &iter.iter, socket, node.node) { /* Code flow error */ @@ -805,7 +896,7 @@ int consumer_is_data_pending(unsigned int id, /* The above call will print a PERROR on error. */ DBG("Error on consumer is data pending on sock %d", socket->fd); pthread_mutex_unlock(socket->lock); - goto error; + goto error_unlock; } /* @@ -822,7 +913,7 @@ int consumer_is_data_pending(unsigned int id, /* The above call will print a PERROR on error. */ DBG("Error on recv consumer is data pending on sock %d", socket->fd); pthread_mutex_unlock(socket->lock); - goto error; + goto error_unlock; } pthread_mutex_unlock(socket->lock); @@ -831,11 +922,13 @@ int consumer_is_data_pending(unsigned int id, break; } } + rcu_read_unlock(); DBG("Consumer data is %s pending for session id %u", ret_code == 1 ? "" : "NOT", id); return ret_code; -error: +error_unlock: + rcu_read_unlock(); return -1; }