consumerd: tag metadata channel as being part of a live session
[lttng-tools.git] / src / bin / lttng-sessiond / kernel-consumer.c
1 /*
2 * Copyright (C) 2012 David Goulet <dgoulet@efficios.com>
3 *
4 * SPDX-License-Identifier: GPL-2.0-only
5 *
6 */
7
8 #define _LGPL_SOURCE
9 #include <stdio.h>
10 #include <stdlib.h>
11 #include <sys/stat.h>
12 #include <unistd.h>
13 #include <inttypes.h>
14
15 #include <common/common.h>
16 #include <common/defaults.h>
17 #include <common/compat/string.h>
18
19 #include "consumer.h"
20 #include "health-sessiond.h"
21 #include "kernel-consumer.h"
22 #include "notification-thread-commands.h"
23 #include "session.h"
24 #include "lttng-sessiond.h"
25
26 static char *create_channel_path(struct consumer_output *consumer,
27 size_t *consumer_path_offset)
28 {
29 int ret;
30 char tmp_path[PATH_MAX];
31 char *pathname = NULL;
32
33 assert(consumer);
34
35 /* Get the right path name destination */
36 if (consumer->type == CONSUMER_DST_LOCAL ||
37 (consumer->type == CONSUMER_DST_NET &&
38 consumer->relay_major_version == 2 &&
39 consumer->relay_minor_version >= 11)) {
40 pathname = strdup(consumer->domain_subdir);
41 if (!pathname) {
42 PERROR("Failed to copy domain subdirectory string %s",
43 consumer->domain_subdir);
44 goto error;
45 }
46 *consumer_path_offset = strlen(consumer->domain_subdir);
47 DBG3("Kernel local consumer trace path relative to current trace chunk: \"%s\"",
48 pathname);
49 } else {
50 /* Network output, relayd < 2.11. */
51 ret = snprintf(tmp_path, sizeof(tmp_path), "%s%s",
52 consumer->dst.net.base_dir,
53 consumer->domain_subdir);
54 if (ret < 0) {
55 PERROR("snprintf kernel metadata path");
56 goto error;
57 } else if (ret >= sizeof(tmp_path)) {
58 ERR("Kernel channel path exceeds the maximal allowed length of of %zu bytes (%i bytes required) with path \"%s%s\"",
59 sizeof(tmp_path), ret,
60 consumer->dst.net.base_dir,
61 consumer->domain_subdir);
62 goto error;
63 }
64 pathname = lttng_strndup(tmp_path, sizeof(tmp_path));
65 if (!pathname) {
66 PERROR("lttng_strndup");
67 goto error;
68 }
69 *consumer_path_offset = 0;
70 DBG3("Kernel network consumer subdir path: %s", pathname);
71 }
72
73 return pathname;
74
75 error:
76 free(pathname);
77 return NULL;
78 }
79
80 /*
81 * Sending a single channel to the consumer with command ADD_CHANNEL.
82 */
83 static
84 int kernel_consumer_add_channel(struct consumer_socket *sock,
85 struct ltt_kernel_channel *channel,
86 struct ltt_kernel_session *ksession,
87 unsigned int monitor)
88 {
89 int ret;
90 char *pathname = NULL;
91 struct lttcomm_consumer_msg lkm;
92 struct consumer_output *consumer;
93 enum lttng_error_code status;
94 struct ltt_session *session = NULL;
95 struct lttng_channel_extended *channel_attr_extended;
96 bool is_local_trace;
97 size_t consumer_path_offset = 0;
98
99 /* Safety net */
100 assert(channel);
101 assert(ksession);
102 assert(ksession->consumer);
103
104 consumer = ksession->consumer;
105 channel_attr_extended = (struct lttng_channel_extended *)
106 channel->channel->attr.extended.ptr;
107
108 DBG("Kernel consumer adding channel %s to kernel consumer",
109 channel->channel->name);
110 is_local_trace = consumer->net_seq_index == -1ULL;
111
112 pathname = create_channel_path(consumer, &consumer_path_offset);
113 if (!pathname) {
114 ret = -1;
115 goto error;
116 }
117
118 if (is_local_trace && ksession->current_trace_chunk) {
119 enum lttng_trace_chunk_status chunk_status;
120 char *pathname_index;
121
122 ret = asprintf(&pathname_index, "%s/" DEFAULT_INDEX_DIR,
123 pathname);
124 if (ret < 0) {
125 ERR("Failed to format channel index directory");
126 ret = -1;
127 goto error;
128 }
129
130 /*
131 * Create the index subdirectory which will take care
132 * of implicitly creating the channel's path.
133 */
134 chunk_status = lttng_trace_chunk_create_subdirectory(
135 ksession->current_trace_chunk, pathname_index);
136 free(pathname_index);
137 if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
138 ret = -1;
139 goto error;
140 }
141 }
142
143 /* Prep channel message structure */
144 consumer_init_add_channel_comm_msg(&lkm,
145 channel->key,
146 ksession->id,
147 &pathname[consumer_path_offset],
148 ksession->uid,
149 ksession->gid,
150 consumer->net_seq_index,
151 channel->channel->name,
152 channel->stream_count,
153 channel->channel->attr.output,
154 CONSUMER_CHANNEL_TYPE_DATA,
155 channel->channel->attr.tracefile_size,
156 channel->channel->attr.tracefile_count,
157 monitor,
158 channel->channel->attr.live_timer_interval,
159 ksession->is_live_session,
160 channel_attr_extended->monitor_timer_interval,
161 ksession->current_trace_chunk);
162
163 health_code_update();
164
165 ret = consumer_send_channel(sock, &lkm);
166 if (ret < 0) {
167 goto error;
168 }
169
170 health_code_update();
171 rcu_read_lock();
172 session = session_find_by_id(ksession->id);
173 assert(session);
174 assert(pthread_mutex_trylock(&session->lock));
175 assert(session_trylock_list());
176
177 status = notification_thread_command_add_channel(
178 notification_thread_handle, session->name,
179 ksession->uid, ksession->gid,
180 channel->channel->name, channel->key,
181 LTTNG_DOMAIN_KERNEL,
182 channel->channel->attr.subbuf_size * channel->channel->attr.num_subbuf);
183 rcu_read_unlock();
184 if (status != LTTNG_OK) {
185 ret = -1;
186 goto error;
187 }
188
189 channel->published_to_notification_thread = true;
190
191 error:
192 if (session) {
193 session_put(session);
194 }
195 free(pathname);
196 return ret;
197 }
198
199 /*
200 * Sending metadata to the consumer with command ADD_CHANNEL and ADD_STREAM.
201 *
202 * The consumer socket lock must be held by the caller.
203 */
204 int kernel_consumer_add_metadata(struct consumer_socket *sock,
205 struct ltt_kernel_session *ksession, unsigned int monitor)
206 {
207 int ret;
208 struct lttcomm_consumer_msg lkm;
209 struct consumer_output *consumer;
210
211 rcu_read_lock();
212
213 /* Safety net */
214 assert(ksession);
215 assert(ksession->consumer);
216 assert(sock);
217
218 DBG("Sending metadata %d to kernel consumer",
219 ksession->metadata_stream_fd);
220
221 /* Get consumer output pointer */
222 consumer = ksession->consumer;
223
224 /* Prep channel message structure */
225 consumer_init_add_channel_comm_msg(&lkm, ksession->metadata->key,
226 ksession->id, "", ksession->uid, ksession->gid,
227 consumer->net_seq_index, DEFAULT_METADATA_NAME, 1,
228 DEFAULT_KERNEL_CHANNEL_OUTPUT,
229 CONSUMER_CHANNEL_TYPE_METADATA, 0, 0, monitor, 0,
230 ksession->is_live_session, 0,
231 ksession->current_trace_chunk);
232
233 health_code_update();
234
235 ret = consumer_send_channel(sock, &lkm);
236 if (ret < 0) {
237 goto error;
238 }
239
240 health_code_update();
241
242 /* Prep stream message structure */
243 consumer_init_add_stream_comm_msg(&lkm,
244 ksession->metadata->key,
245 ksession->metadata_stream_fd,
246 0 /* CPU: 0 for metadata. */);
247
248 health_code_update();
249
250 /* Send stream and file descriptor */
251 ret = consumer_send_stream(sock, consumer, &lkm,
252 &ksession->metadata_stream_fd, 1);
253 if (ret < 0) {
254 goto error;
255 }
256
257 health_code_update();
258
259 error:
260 rcu_read_unlock();
261 return ret;
262 }
263
264 /*
265 * Sending a single stream to the consumer with command ADD_STREAM.
266 */
267 static
268 int kernel_consumer_add_stream(struct consumer_socket *sock,
269 struct ltt_kernel_channel *channel,
270 struct ltt_kernel_stream *stream,
271 struct ltt_kernel_session *session, unsigned int monitor)
272 {
273 int ret;
274 struct lttcomm_consumer_msg lkm;
275 struct consumer_output *consumer;
276
277 assert(channel);
278 assert(stream);
279 assert(session);
280 assert(session->consumer);
281 assert(sock);
282
283 DBG("Sending stream %d of channel %s to kernel consumer",
284 stream->fd, channel->channel->name);
285
286 /* Get consumer output pointer */
287 consumer = session->consumer;
288
289 /* Prep stream consumer message */
290 consumer_init_add_stream_comm_msg(&lkm,
291 channel->key,
292 stream->fd,
293 stream->cpu);
294
295 health_code_update();
296
297 /* Send stream and file descriptor */
298 ret = consumer_send_stream(sock, consumer, &lkm, &stream->fd, 1);
299 if (ret < 0) {
300 goto error;
301 }
302
303 health_code_update();
304
305 error:
306 return ret;
307 }
308
309 /*
310 * Sending the notification that all streams were sent with STREAMS_SENT.
311 */
312 int kernel_consumer_streams_sent(struct consumer_socket *sock,
313 struct ltt_kernel_session *session, uint64_t channel_key)
314 {
315 int ret;
316 struct lttcomm_consumer_msg lkm;
317 struct consumer_output *consumer;
318
319 assert(sock);
320 assert(session);
321
322 DBG("Sending streams_sent");
323 /* Get consumer output pointer */
324 consumer = session->consumer;
325
326 /* Prep stream consumer message */
327 consumer_init_streams_sent_comm_msg(&lkm,
328 LTTNG_CONSUMER_STREAMS_SENT,
329 channel_key, consumer->net_seq_index);
330
331 health_code_update();
332
333 /* Send stream and file descriptor */
334 ret = consumer_send_msg(sock, &lkm);
335 if (ret < 0) {
336 goto error;
337 }
338
339 error:
340 return ret;
341 }
342
343 /*
344 * Send all stream fds of kernel channel to the consumer.
345 *
346 * The consumer socket lock must be held by the caller.
347 */
348 int kernel_consumer_send_channel_streams(struct consumer_socket *sock,
349 struct ltt_kernel_channel *channel, struct ltt_kernel_session *ksession,
350 unsigned int monitor)
351 {
352 int ret = LTTNG_OK;
353 struct ltt_kernel_stream *stream;
354
355 /* Safety net */
356 assert(channel);
357 assert(ksession);
358 assert(ksession->consumer);
359 assert(sock);
360
361 rcu_read_lock();
362
363 /* Bail out if consumer is disabled */
364 if (!ksession->consumer->enabled) {
365 ret = LTTNG_OK;
366 goto error;
367 }
368
369 DBG("Sending streams of channel %s to kernel consumer",
370 channel->channel->name);
371
372 if (!channel->sent_to_consumer) {
373 ret = kernel_consumer_add_channel(sock, channel, ksession, monitor);
374 if (ret < 0) {
375 goto error;
376 }
377 channel->sent_to_consumer = true;
378 }
379
380 /* Send streams */
381 cds_list_for_each_entry(stream, &channel->stream_list.head, list) {
382 if (!stream->fd || stream->sent_to_consumer) {
383 continue;
384 }
385
386 /* Add stream on the kernel consumer side. */
387 ret = kernel_consumer_add_stream(sock, channel, stream,
388 ksession, monitor);
389 if (ret < 0) {
390 goto error;
391 }
392 stream->sent_to_consumer = true;
393 }
394
395 error:
396 rcu_read_unlock();
397 return ret;
398 }
399
400 /*
401 * Send all stream fds of the kernel session to the consumer.
402 *
403 * The consumer socket lock must be held by the caller.
404 */
405 int kernel_consumer_send_session(struct consumer_socket *sock,
406 struct ltt_kernel_session *session)
407 {
408 int ret, monitor = 0;
409 struct ltt_kernel_channel *chan;
410
411 /* Safety net */
412 assert(session);
413 assert(session->consumer);
414 assert(sock);
415
416 /* Bail out if consumer is disabled */
417 if (!session->consumer->enabled) {
418 ret = LTTNG_OK;
419 goto error;
420 }
421
422 /* Don't monitor the streams on the consumer if in flight recorder. */
423 if (session->output_traces) {
424 monitor = 1;
425 }
426
427 DBG("Sending session stream to kernel consumer");
428
429 if (session->metadata_stream_fd >= 0 && session->metadata) {
430 ret = kernel_consumer_add_metadata(sock, session, monitor);
431 if (ret < 0) {
432 goto error;
433 }
434 }
435
436 /* Send channel and streams of it */
437 cds_list_for_each_entry(chan, &session->channel_list.head, list) {
438 ret = kernel_consumer_send_channel_streams(sock, chan, session,
439 monitor);
440 if (ret < 0) {
441 goto error;
442 }
443 if (monitor) {
444 /*
445 * Inform the relay that all the streams for the
446 * channel were sent.
447 */
448 ret = kernel_consumer_streams_sent(sock, session, chan->key);
449 if (ret < 0) {
450 goto error;
451 }
452 }
453 }
454
455 DBG("Kernel consumer FDs of metadata and channel streams sent");
456
457 session->consumer_fds_sent = 1;
458 return 0;
459
460 error:
461 return ret;
462 }
463
464 int kernel_consumer_destroy_channel(struct consumer_socket *socket,
465 struct ltt_kernel_channel *channel)
466 {
467 int ret;
468 struct lttcomm_consumer_msg msg;
469
470 assert(channel);
471 assert(socket);
472
473 DBG("Sending kernel consumer destroy channel key %" PRIu64, channel->key);
474
475 memset(&msg, 0, sizeof(msg));
476 msg.cmd_type = LTTNG_CONSUMER_DESTROY_CHANNEL;
477 msg.u.destroy_channel.key = channel->key;
478
479 pthread_mutex_lock(socket->lock);
480 health_code_update();
481
482 ret = consumer_send_msg(socket, &msg);
483 if (ret < 0) {
484 goto error;
485 }
486
487 error:
488 health_code_update();
489 pthread_mutex_unlock(socket->lock);
490 return ret;
491 }
492
493 int kernel_consumer_destroy_metadata(struct consumer_socket *socket,
494 struct ltt_kernel_metadata *metadata)
495 {
496 int ret;
497 struct lttcomm_consumer_msg msg;
498
499 assert(metadata);
500 assert(socket);
501
502 DBG("Sending kernel consumer destroy channel key %" PRIu64, metadata->key);
503
504 memset(&msg, 0, sizeof(msg));
505 msg.cmd_type = LTTNG_CONSUMER_DESTROY_CHANNEL;
506 msg.u.destroy_channel.key = metadata->key;
507
508 pthread_mutex_lock(socket->lock);
509 health_code_update();
510
511 ret = consumer_send_msg(socket, &msg);
512 if (ret < 0) {
513 goto error;
514 }
515
516 error:
517 health_code_update();
518 pthread_mutex_unlock(socket->lock);
519 return ret;
520 }
This page took 0.061887 seconds and 4 git commands to generate.