Refactoring: use an opaque lttng_tracker_id type
[lttng-tools.git] / src / bin / lttng-sessiond / kernel-consumer.c
1 /*
2 * Copyright (C) 2012 - David Goulet <dgoulet@efficios.com>
3 *
4 * This program is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License, version 2 only, as
6 * published by the Free Software Foundation.
7 *
8 * This program is distributed in the hope that it will be useful, but WITHOUT
9 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
11 * more details.
12 *
13 * You should have received a copy of the GNU General Public License along with
14 * this program; if not, write to the Free Software Foundation, Inc., 51
15 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
16 */
17
18 #define _LGPL_SOURCE
19 #include <stdio.h>
20 #include <stdlib.h>
21 #include <sys/stat.h>
22 #include <unistd.h>
23 #include <inttypes.h>
24
25 #include <common/common.h>
26 #include <common/defaults.h>
27 #include <common/compat/string.h>
28
29 #include "consumer.h"
30 #include "health-sessiond.h"
31 #include "kernel-consumer.h"
32 #include "notification-thread-commands.h"
33 #include "session.h"
34 #include "lttng-sessiond.h"
35
36 static char *create_channel_path(struct consumer_output *consumer,
37 size_t *consumer_path_offset)
38 {
39 int ret;
40 char tmp_path[PATH_MAX];
41 char *pathname = NULL;
42
43 assert(consumer);
44
45 /* Get the right path name destination */
46 if (consumer->type == CONSUMER_DST_LOCAL ||
47 (consumer->type == CONSUMER_DST_NET &&
48 consumer->relay_major_version == 2 &&
49 consumer->relay_minor_version >= 11)) {
50 pathname = strdup(consumer->domain_subdir);
51 if (!pathname) {
52 PERROR("Failed to copy domain subdirectory string %s",
53 consumer->domain_subdir);
54 goto error;
55 }
56 *consumer_path_offset = strlen(consumer->domain_subdir);
57 DBG3("Kernel local consumer trace path relative to current trace chunk: \"%s\"",
58 pathname);
59 } else {
60 /* Network output, relayd < 2.11. */
61 ret = snprintf(tmp_path, sizeof(tmp_path), "%s%s",
62 consumer->dst.net.base_dir,
63 consumer->domain_subdir);
64 if (ret < 0) {
65 PERROR("snprintf kernel metadata path");
66 goto error;
67 } else if (ret >= sizeof(tmp_path)) {
68 ERR("Kernel channel path exceeds the maximal allowed length of of %zu bytes (%i bytes required) with path \"%s%s\"",
69 sizeof(tmp_path), ret,
70 consumer->dst.net.base_dir,
71 consumer->domain_subdir);
72 goto error;
73 }
74 pathname = lttng_strndup(tmp_path, sizeof(tmp_path));
75 if (!pathname) {
76 PERROR("lttng_strndup");
77 goto error;
78 }
79 *consumer_path_offset = 0;
80 DBG3("Kernel network consumer subdir path: %s", pathname);
81 }
82
83 return pathname;
84
85 error:
86 free(pathname);
87 return NULL;
88 }
89
90 /*
91 * Sending a single channel to the consumer with command ADD_CHANNEL.
92 */
93 static
94 int kernel_consumer_add_channel(struct consumer_socket *sock,
95 struct ltt_kernel_channel *channel,
96 struct ltt_kernel_session *ksession,
97 unsigned int monitor)
98 {
99 int ret;
100 char *pathname = NULL;
101 struct lttcomm_consumer_msg lkm;
102 struct consumer_output *consumer;
103 enum lttng_error_code status;
104 struct ltt_session *session = NULL;
105 struct lttng_channel_extended *channel_attr_extended;
106 bool is_local_trace;
107 size_t consumer_path_offset = 0;
108
109 /* Safety net */
110 assert(channel);
111 assert(ksession);
112 assert(ksession->consumer);
113
114 consumer = ksession->consumer;
115 channel_attr_extended = (struct lttng_channel_extended *)
116 channel->channel->attr.extended.ptr;
117
118 DBG("Kernel consumer adding channel %s to kernel consumer",
119 channel->channel->name);
120 is_local_trace = consumer->net_seq_index == -1ULL;
121
122 pathname = create_channel_path(consumer, &consumer_path_offset);
123 if (!pathname) {
124 ret = -1;
125 goto error;
126 }
127
128 if (is_local_trace && ksession->current_trace_chunk) {
129 enum lttng_trace_chunk_status chunk_status;
130 char *pathname_index;
131
132 ret = asprintf(&pathname_index, "%s/" DEFAULT_INDEX_DIR,
133 pathname);
134 if (ret < 0) {
135 ERR("Failed to format channel index directory");
136 ret = -1;
137 goto error;
138 }
139
140 /*
141 * Create the index subdirectory which will take care
142 * of implicitly creating the channel's path.
143 */
144 chunk_status = lttng_trace_chunk_create_subdirectory(
145 ksession->current_trace_chunk, pathname_index);
146 free(pathname_index);
147 if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
148 ret = -1;
149 goto error;
150 }
151 }
152
153 /* Prep channel message structure */
154 consumer_init_add_channel_comm_msg(&lkm,
155 channel->key,
156 ksession->id,
157 &pathname[consumer_path_offset],
158 ksession->uid,
159 ksession->gid,
160 consumer->net_seq_index,
161 channel->channel->name,
162 channel->stream_count,
163 channel->channel->attr.output,
164 CONSUMER_CHANNEL_TYPE_DATA,
165 channel->channel->attr.tracefile_size,
166 channel->channel->attr.tracefile_count,
167 monitor,
168 channel->channel->attr.live_timer_interval,
169 channel_attr_extended->monitor_timer_interval,
170 ksession->current_trace_chunk);
171
172 health_code_update();
173
174 ret = consumer_send_channel(sock, &lkm);
175 if (ret < 0) {
176 goto error;
177 }
178
179 health_code_update();
180 rcu_read_lock();
181 session = session_find_by_id(ksession->id);
182 assert(session);
183 assert(pthread_mutex_trylock(&session->lock));
184 assert(session_trylock_list());
185
186 status = notification_thread_command_add_channel(
187 notification_thread_handle, session->name,
188 ksession->uid, ksession->gid,
189 channel->channel->name, channel->key,
190 LTTNG_DOMAIN_KERNEL,
191 channel->channel->attr.subbuf_size * channel->channel->attr.num_subbuf);
192 rcu_read_unlock();
193 if (status != LTTNG_OK) {
194 ret = -1;
195 goto error;
196 }
197
198 channel->published_to_notification_thread = true;
199
200 error:
201 if (session) {
202 session_put(session);
203 }
204 free(pathname);
205 return ret;
206 }
207
208 /*
209 * Sending metadata to the consumer with command ADD_CHANNEL and ADD_STREAM.
210 *
211 * The consumer socket lock must be held by the caller.
212 */
213 int kernel_consumer_add_metadata(struct consumer_socket *sock,
214 struct ltt_kernel_session *ksession, unsigned int monitor)
215 {
216 int ret;
217 struct lttcomm_consumer_msg lkm;
218 struct consumer_output *consumer;
219
220 rcu_read_lock();
221
222 /* Safety net */
223 assert(ksession);
224 assert(ksession->consumer);
225 assert(sock);
226
227 DBG("Sending metadata %d to kernel consumer",
228 ksession->metadata_stream_fd);
229
230 /* Get consumer output pointer */
231 consumer = ksession->consumer;
232
233 /* Prep channel message structure */
234 consumer_init_add_channel_comm_msg(&lkm,
235 ksession->metadata->key,
236 ksession->id,
237 "",
238 ksession->uid,
239 ksession->gid,
240 consumer->net_seq_index,
241 DEFAULT_METADATA_NAME,
242 1,
243 DEFAULT_KERNEL_CHANNEL_OUTPUT,
244 CONSUMER_CHANNEL_TYPE_METADATA,
245 0, 0,
246 monitor, 0, 0, ksession->current_trace_chunk);
247
248 health_code_update();
249
250 ret = consumer_send_channel(sock, &lkm);
251 if (ret < 0) {
252 goto error;
253 }
254
255 health_code_update();
256
257 /* Prep stream message structure */
258 consumer_init_add_stream_comm_msg(&lkm,
259 ksession->metadata->key,
260 ksession->metadata_stream_fd,
261 0 /* CPU: 0 for metadata. */);
262
263 health_code_update();
264
265 /* Send stream and file descriptor */
266 ret = consumer_send_stream(sock, consumer, &lkm,
267 &ksession->metadata_stream_fd, 1);
268 if (ret < 0) {
269 goto error;
270 }
271
272 health_code_update();
273
274 error:
275 rcu_read_unlock();
276 return ret;
277 }
278
279 /*
280 * Sending a single stream to the consumer with command ADD_STREAM.
281 */
282 static
283 int kernel_consumer_add_stream(struct consumer_socket *sock,
284 struct ltt_kernel_channel *channel,
285 struct ltt_kernel_stream *stream,
286 struct ltt_kernel_session *session, unsigned int monitor)
287 {
288 int ret;
289 struct lttcomm_consumer_msg lkm;
290 struct consumer_output *consumer;
291
292 assert(channel);
293 assert(stream);
294 assert(session);
295 assert(session->consumer);
296 assert(sock);
297
298 DBG("Sending stream %d of channel %s to kernel consumer",
299 stream->fd, channel->channel->name);
300
301 /* Get consumer output pointer */
302 consumer = session->consumer;
303
304 /* Prep stream consumer message */
305 consumer_init_add_stream_comm_msg(&lkm,
306 channel->key,
307 stream->fd,
308 stream->cpu);
309
310 health_code_update();
311
312 /* Send stream and file descriptor */
313 ret = consumer_send_stream(sock, consumer, &lkm, &stream->fd, 1);
314 if (ret < 0) {
315 goto error;
316 }
317
318 health_code_update();
319
320 error:
321 return ret;
322 }
323
324 /*
325 * Sending the notification that all streams were sent with STREAMS_SENT.
326 */
327 int kernel_consumer_streams_sent(struct consumer_socket *sock,
328 struct ltt_kernel_session *session, uint64_t channel_key)
329 {
330 int ret;
331 struct lttcomm_consumer_msg lkm;
332 struct consumer_output *consumer;
333
334 assert(sock);
335 assert(session);
336
337 DBG("Sending streams_sent");
338 /* Get consumer output pointer */
339 consumer = session->consumer;
340
341 /* Prep stream consumer message */
342 consumer_init_streams_sent_comm_msg(&lkm,
343 LTTNG_CONSUMER_STREAMS_SENT,
344 channel_key, consumer->net_seq_index);
345
346 health_code_update();
347
348 /* Send stream and file descriptor */
349 ret = consumer_send_msg(sock, &lkm);
350 if (ret < 0) {
351 goto error;
352 }
353
354 error:
355 return ret;
356 }
357
358 /*
359 * Send all stream fds of kernel channel to the consumer.
360 *
361 * The consumer socket lock must be held by the caller.
362 */
363 int kernel_consumer_send_channel_streams(struct consumer_socket *sock,
364 struct ltt_kernel_channel *channel, struct ltt_kernel_session *ksession,
365 unsigned int monitor)
366 {
367 int ret = LTTNG_OK;
368 struct ltt_kernel_stream *stream;
369
370 /* Safety net */
371 assert(channel);
372 assert(ksession);
373 assert(ksession->consumer);
374 assert(sock);
375
376 rcu_read_lock();
377
378 /* Bail out if consumer is disabled */
379 if (!ksession->consumer->enabled) {
380 ret = LTTNG_OK;
381 goto error;
382 }
383
384 DBG("Sending streams of channel %s to kernel consumer",
385 channel->channel->name);
386
387 if (!channel->sent_to_consumer) {
388 ret = kernel_consumer_add_channel(sock, channel, ksession, monitor);
389 if (ret < 0) {
390 goto error;
391 }
392 channel->sent_to_consumer = true;
393 }
394
395 /* Send streams */
396 cds_list_for_each_entry(stream, &channel->stream_list.head, list) {
397 if (!stream->fd || stream->sent_to_consumer) {
398 continue;
399 }
400
401 /* Add stream on the kernel consumer side. */
402 ret = kernel_consumer_add_stream(sock, channel, stream,
403 ksession, monitor);
404 if (ret < 0) {
405 goto error;
406 }
407 stream->sent_to_consumer = true;
408 }
409
410 error:
411 rcu_read_unlock();
412 return ret;
413 }
414
415 /*
416 * Send all stream fds of the kernel session to the consumer.
417 *
418 * The consumer socket lock must be held by the caller.
419 */
420 int kernel_consumer_send_session(struct consumer_socket *sock,
421 struct ltt_kernel_session *session)
422 {
423 int ret, monitor = 0;
424 struct ltt_kernel_channel *chan;
425
426 /* Safety net */
427 assert(session);
428 assert(session->consumer);
429 assert(sock);
430
431 /* Bail out if consumer is disabled */
432 if (!session->consumer->enabled) {
433 ret = LTTNG_OK;
434 goto error;
435 }
436
437 /* Don't monitor the streams on the consumer if in flight recorder. */
438 if (session->output_traces) {
439 monitor = 1;
440 }
441
442 DBG("Sending session stream to kernel consumer");
443
444 if (session->metadata_stream_fd >= 0 && session->metadata) {
445 ret = kernel_consumer_add_metadata(sock, session, monitor);
446 if (ret < 0) {
447 goto error;
448 }
449 }
450
451 /* Send channel and streams of it */
452 cds_list_for_each_entry(chan, &session->channel_list.head, list) {
453 ret = kernel_consumer_send_channel_streams(sock, chan, session,
454 monitor);
455 if (ret < 0) {
456 goto error;
457 }
458 if (monitor) {
459 /*
460 * Inform the relay that all the streams for the
461 * channel were sent.
462 */
463 ret = kernel_consumer_streams_sent(sock, session, chan->key);
464 if (ret < 0) {
465 goto error;
466 }
467 }
468 }
469
470 DBG("Kernel consumer FDs of metadata and channel streams sent");
471
472 session->consumer_fds_sent = 1;
473 return 0;
474
475 error:
476 return ret;
477 }
478
479 int kernel_consumer_destroy_channel(struct consumer_socket *socket,
480 struct ltt_kernel_channel *channel)
481 {
482 int ret;
483 struct lttcomm_consumer_msg msg;
484
485 assert(channel);
486 assert(socket);
487
488 DBG("Sending kernel consumer destroy channel key %" PRIu64, channel->key);
489
490 memset(&msg, 0, sizeof(msg));
491 msg.cmd_type = LTTNG_CONSUMER_DESTROY_CHANNEL;
492 msg.u.destroy_channel.key = channel->key;
493
494 pthread_mutex_lock(socket->lock);
495 health_code_update();
496
497 ret = consumer_send_msg(socket, &msg);
498 if (ret < 0) {
499 goto error;
500 }
501
502 error:
503 health_code_update();
504 pthread_mutex_unlock(socket->lock);
505 return ret;
506 }
507
508 int kernel_consumer_destroy_metadata(struct consumer_socket *socket,
509 struct ltt_kernel_metadata *metadata)
510 {
511 int ret;
512 struct lttcomm_consumer_msg msg;
513
514 assert(metadata);
515 assert(socket);
516
517 DBG("Sending kernel consumer destroy channel key %" PRIu64, metadata->key);
518
519 memset(&msg, 0, sizeof(msg));
520 msg.cmd_type = LTTNG_CONSUMER_DESTROY_CHANNEL;
521 msg.u.destroy_channel.key = metadata->key;
522
523 pthread_mutex_lock(socket->lock);
524 health_code_update();
525
526 ret = consumer_send_msg(socket, &msg);
527 if (ret < 0) {
528 goto error;
529 }
530
531 error:
532 health_code_update();
533 pthread_mutex_unlock(socket->lock);
534 return ret;
535 }
This page took 0.043768 seconds and 4 git commands to generate.