Fix: sessiond: size-based rotation threshold exceeded in per-pid tracing (1/2)
[lttng-tools.git] / src / bin / lttng-sessiond / kernel-consumer.cpp
1 /*
2 * Copyright (C) 2012 David Goulet <dgoulet@efficios.com>
3 *
4 * SPDX-License-Identifier: GPL-2.0-only
5 *
6 */
7
8 #define _LGPL_SOURCE
9 #include <stdio.h>
10 #include <stdlib.h>
11 #include <sys/stat.h>
12 #include <unistd.h>
13 #include <inttypes.h>
14
15 #include <common/common.hpp>
16 #include <common/defaults.hpp>
17 #include <common/compat/string.hpp>
18
19 #include "consumer.hpp"
20 #include "health-sessiond.hpp"
21 #include "kernel-consumer.hpp"
22 #include "notification-thread-commands.hpp"
23 #include "session.hpp"
24 #include "lttng-sessiond.hpp"
25
26 static char *create_channel_path(struct consumer_output *consumer,
27 size_t *consumer_path_offset)
28 {
29 int ret;
30 char tmp_path[PATH_MAX];
31 char *pathname = NULL;
32
33 LTTNG_ASSERT(consumer);
34
35 /* Get the right path name destination */
36 if (consumer->type == CONSUMER_DST_LOCAL ||
37 (consumer->type == CONSUMER_DST_NET &&
38 consumer->relay_major_version == 2 &&
39 consumer->relay_minor_version >= 11)) {
40 pathname = strdup(consumer->domain_subdir);
41 if (!pathname) {
42 PERROR("Failed to copy domain subdirectory string %s",
43 consumer->domain_subdir);
44 goto error;
45 }
46 *consumer_path_offset = strlen(consumer->domain_subdir);
47 DBG3("Kernel local consumer trace path relative to current trace chunk: \"%s\"",
48 pathname);
49 } else {
50 /* Network output, relayd < 2.11. */
51 ret = snprintf(tmp_path, sizeof(tmp_path), "%s%s",
52 consumer->dst.net.base_dir,
53 consumer->domain_subdir);
54 if (ret < 0) {
55 PERROR("snprintf kernel metadata path");
56 goto error;
57 } else if (ret >= sizeof(tmp_path)) {
58 ERR("Kernel channel path exceeds the maximal allowed length of of %zu bytes (%i bytes required) with path \"%s%s\"",
59 sizeof(tmp_path), ret,
60 consumer->dst.net.base_dir,
61 consumer->domain_subdir);
62 goto error;
63 }
64 pathname = lttng_strndup(tmp_path, sizeof(tmp_path));
65 if (!pathname) {
66 PERROR("lttng_strndup");
67 goto error;
68 }
69 *consumer_path_offset = 0;
70 DBG3("Kernel network consumer subdir path: %s", pathname);
71 }
72
73 return pathname;
74
75 error:
76 free(pathname);
77 return NULL;
78 }
79
80 /*
81 * Sending a single channel to the consumer with command ADD_CHANNEL.
82 */
83 static
84 int kernel_consumer_add_channel(struct consumer_socket *sock,
85 struct ltt_kernel_channel *channel,
86 struct ltt_kernel_session *ksession,
87 unsigned int monitor)
88 {
89 int ret;
90 char *pathname = NULL;
91 struct lttcomm_consumer_msg lkm;
92 struct consumer_output *consumer;
93 enum lttng_error_code status;
94 struct ltt_session *session = NULL;
95 struct lttng_channel_extended *channel_attr_extended;
96 bool is_local_trace;
97 size_t consumer_path_offset = 0;
98
99 /* Safety net */
100 LTTNG_ASSERT(channel);
101 LTTNG_ASSERT(ksession);
102 LTTNG_ASSERT(ksession->consumer);
103
104 consumer = ksession->consumer;
105 channel_attr_extended = (struct lttng_channel_extended *)
106 channel->channel->attr.extended.ptr;
107
108 DBG("Kernel consumer adding channel %s to kernel consumer",
109 channel->channel->name);
110 is_local_trace = consumer->net_seq_index == -1ULL;
111
112 pathname = create_channel_path(consumer, &consumer_path_offset);
113 if (!pathname) {
114 ret = -1;
115 goto error;
116 }
117
118 if (is_local_trace && ksession->current_trace_chunk) {
119 enum lttng_trace_chunk_status chunk_status;
120 char *pathname_index;
121
122 ret = asprintf(&pathname_index, "%s/" DEFAULT_INDEX_DIR,
123 pathname);
124 if (ret < 0) {
125 ERR("Failed to format channel index directory");
126 ret = -1;
127 goto error;
128 }
129
130 /*
131 * Create the index subdirectory which will take care
132 * of implicitly creating the channel's path.
133 */
134 chunk_status = lttng_trace_chunk_create_subdirectory(
135 ksession->current_trace_chunk, pathname_index);
136 free(pathname_index);
137 if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
138 ret = -1;
139 goto error;
140 }
141 }
142
143 /* Prep channel message structure */
144 consumer_init_add_channel_comm_msg(&lkm,
145 channel->key,
146 ksession->id,
147 &pathname[consumer_path_offset],
148 consumer->net_seq_index,
149 channel->channel->name,
150 channel->stream_count,
151 channel->channel->attr.output,
152 CONSUMER_CHANNEL_TYPE_DATA,
153 channel->channel->attr.tracefile_size,
154 channel->channel->attr.tracefile_count,
155 monitor,
156 channel->channel->attr.live_timer_interval,
157 ksession->is_live_session,
158 channel_attr_extended->monitor_timer_interval,
159 ksession->current_trace_chunk);
160
161 health_code_update();
162
163 ret = consumer_send_channel(sock, &lkm);
164 if (ret < 0) {
165 goto error;
166 }
167
168 health_code_update();
169 rcu_read_lock();
170 session = session_find_by_id(ksession->id);
171 LTTNG_ASSERT(session);
172 ASSERT_LOCKED(session->lock);
173 ASSERT_SESSION_LIST_LOCKED();
174
175 status = notification_thread_command_add_channel(the_notification_thread_handle,
176 session->id, channel->channel->name, channel->key, LTTNG_DOMAIN_KERNEL,
177 channel->channel->attr.subbuf_size * channel->channel->attr.num_subbuf);
178 rcu_read_unlock();
179 if (status != LTTNG_OK) {
180 ret = -1;
181 goto error;
182 }
183
184 channel->published_to_notification_thread = true;
185
186 error:
187 if (session) {
188 session_put(session);
189 }
190 free(pathname);
191 return ret;
192 }
193
194 /*
195 * Sending metadata to the consumer with command ADD_CHANNEL and ADD_STREAM.
196 *
197 * The consumer socket lock must be held by the caller.
198 */
199 int kernel_consumer_add_metadata(struct consumer_socket *sock,
200 struct ltt_kernel_session *ksession, unsigned int monitor)
201 {
202 int ret;
203 struct lttcomm_consumer_msg lkm;
204 struct consumer_output *consumer;
205
206 rcu_read_lock();
207
208 /* Safety net */
209 LTTNG_ASSERT(ksession);
210 LTTNG_ASSERT(ksession->consumer);
211 LTTNG_ASSERT(sock);
212
213 DBG("Sending metadata %d to kernel consumer",
214 ksession->metadata_stream_fd);
215
216 /* Get consumer output pointer */
217 consumer = ksession->consumer;
218
219 /* Prep channel message structure */
220 consumer_init_add_channel_comm_msg(&lkm,
221 ksession->metadata->key,
222 ksession->id,
223 "",
224 consumer->net_seq_index,
225 ksession->metadata->conf->name,
226 1,
227 ksession->metadata->conf->attr.output,
228 CONSUMER_CHANNEL_TYPE_METADATA,
229 ksession->metadata->conf->attr.tracefile_size,
230 ksession->metadata->conf->attr.tracefile_count,
231 monitor,
232 ksession->metadata->conf->attr.live_timer_interval,
233 ksession->is_live_session,
234 0,
235 ksession->current_trace_chunk);
236
237 health_code_update();
238
239 ret = consumer_send_channel(sock, &lkm);
240 if (ret < 0) {
241 goto error;
242 }
243
244 health_code_update();
245
246 /* Prep stream message structure */
247 consumer_init_add_stream_comm_msg(&lkm,
248 ksession->metadata->key,
249 ksession->metadata_stream_fd,
250 0 /* CPU: 0 for metadata. */);
251
252 health_code_update();
253
254 /* Send stream and file descriptor */
255 ret = consumer_send_stream(sock, consumer, &lkm,
256 &ksession->metadata_stream_fd, 1);
257 if (ret < 0) {
258 goto error;
259 }
260
261 health_code_update();
262
263 error:
264 rcu_read_unlock();
265 return ret;
266 }
267
268 /*
269 * Sending a single stream to the consumer with command ADD_STREAM.
270 */
271 static
272 int kernel_consumer_add_stream(struct consumer_socket *sock,
273 struct ltt_kernel_channel *channel,
274 struct ltt_kernel_stream *stream,
275 struct ltt_kernel_session *session)
276 {
277 int ret;
278 struct lttcomm_consumer_msg lkm;
279 struct consumer_output *consumer;
280
281 LTTNG_ASSERT(channel);
282 LTTNG_ASSERT(stream);
283 LTTNG_ASSERT(session);
284 LTTNG_ASSERT(session->consumer);
285 LTTNG_ASSERT(sock);
286
287 DBG("Sending stream %d of channel %s to kernel consumer",
288 stream->fd, channel->channel->name);
289
290 /* Get consumer output pointer */
291 consumer = session->consumer;
292
293 /* Prep stream consumer message */
294 consumer_init_add_stream_comm_msg(&lkm,
295 channel->key,
296 stream->fd,
297 stream->cpu);
298
299 health_code_update();
300
301 /* Send stream and file descriptor */
302 ret = consumer_send_stream(sock, consumer, &lkm, &stream->fd, 1);
303 if (ret < 0) {
304 goto error;
305 }
306
307 health_code_update();
308
309 error:
310 return ret;
311 }
312
313 /*
314 * Sending the notification that all streams were sent with STREAMS_SENT.
315 */
316 int kernel_consumer_streams_sent(struct consumer_socket *sock,
317 struct ltt_kernel_session *session, uint64_t channel_key)
318 {
319 int ret;
320 struct lttcomm_consumer_msg lkm;
321 struct consumer_output *consumer;
322
323 LTTNG_ASSERT(sock);
324 LTTNG_ASSERT(session);
325
326 DBG("Sending streams_sent");
327 /* Get consumer output pointer */
328 consumer = session->consumer;
329
330 /* Prep stream consumer message */
331 consumer_init_streams_sent_comm_msg(&lkm,
332 LTTNG_CONSUMER_STREAMS_SENT,
333 channel_key, consumer->net_seq_index);
334
335 health_code_update();
336
337 /* Send stream and file descriptor */
338 ret = consumer_send_msg(sock, &lkm);
339 if (ret < 0) {
340 goto error;
341 }
342
343 error:
344 return ret;
345 }
346
347 /*
348 * Send all stream fds of kernel channel to the consumer.
349 *
350 * The consumer socket lock must be held by the caller.
351 */
352 int kernel_consumer_send_channel_streams(struct consumer_socket *sock,
353 struct ltt_kernel_channel *channel, struct ltt_kernel_session *ksession,
354 unsigned int monitor)
355 {
356 int ret = LTTNG_OK;
357 struct ltt_kernel_stream *stream;
358
359 /* Safety net */
360 LTTNG_ASSERT(channel);
361 LTTNG_ASSERT(ksession);
362 LTTNG_ASSERT(ksession->consumer);
363 LTTNG_ASSERT(sock);
364
365 rcu_read_lock();
366
367 /* Bail out if consumer is disabled */
368 if (!ksession->consumer->enabled) {
369 ret = LTTNG_OK;
370 goto error;
371 }
372
373 DBG("Sending streams of channel %s to kernel consumer",
374 channel->channel->name);
375
376 if (!channel->sent_to_consumer) {
377 ret = kernel_consumer_add_channel(sock, channel, ksession, monitor);
378 if (ret < 0) {
379 goto error;
380 }
381 channel->sent_to_consumer = true;
382 }
383
384 /* Send streams */
385 cds_list_for_each_entry(stream, &channel->stream_list.head, list) {
386 if (!stream->fd || stream->sent_to_consumer) {
387 continue;
388 }
389
390 /* Add stream on the kernel consumer side. */
391 ret = kernel_consumer_add_stream(sock, channel, stream,
392 ksession);
393 if (ret < 0) {
394 goto error;
395 }
396 stream->sent_to_consumer = true;
397 }
398
399 error:
400 rcu_read_unlock();
401 return ret;
402 }
403
404 /*
405 * Send all stream fds of the kernel session to the consumer.
406 *
407 * The consumer socket lock must be held by the caller.
408 */
409 int kernel_consumer_send_session(struct consumer_socket *sock,
410 struct ltt_kernel_session *session)
411 {
412 int ret, monitor = 0;
413 struct ltt_kernel_channel *chan;
414
415 /* Safety net */
416 LTTNG_ASSERT(session);
417 LTTNG_ASSERT(session->consumer);
418 LTTNG_ASSERT(sock);
419
420 /* Bail out if consumer is disabled */
421 if (!session->consumer->enabled) {
422 ret = LTTNG_OK;
423 goto error;
424 }
425
426 /* Don't monitor the streams on the consumer if in flight recorder. */
427 if (session->output_traces) {
428 monitor = 1;
429 }
430
431 DBG("Sending session stream to kernel consumer");
432
433 if (session->metadata_stream_fd >= 0 && session->metadata) {
434 ret = kernel_consumer_add_metadata(sock, session, monitor);
435 if (ret < 0) {
436 goto error;
437 }
438 }
439
440 /* Send channel and streams of it */
441 cds_list_for_each_entry(chan, &session->channel_list.head, list) {
442 ret = kernel_consumer_send_channel_streams(sock, chan, session,
443 monitor);
444 if (ret < 0) {
445 goto error;
446 }
447 if (monitor) {
448 /*
449 * Inform the relay that all the streams for the
450 * channel were sent.
451 */
452 ret = kernel_consumer_streams_sent(sock, session, chan->key);
453 if (ret < 0) {
454 goto error;
455 }
456 }
457 }
458
459 DBG("Kernel consumer FDs of metadata and channel streams sent");
460
461 session->consumer_fds_sent = 1;
462 return 0;
463
464 error:
465 return ret;
466 }
467
468 int kernel_consumer_destroy_channel(struct consumer_socket *socket,
469 struct ltt_kernel_channel *channel)
470 {
471 int ret;
472 struct lttcomm_consumer_msg msg;
473
474 LTTNG_ASSERT(channel);
475 LTTNG_ASSERT(socket);
476
477 DBG("Sending kernel consumer destroy channel key %" PRIu64, channel->key);
478
479 memset(&msg, 0, sizeof(msg));
480 msg.cmd_type = LTTNG_CONSUMER_DESTROY_CHANNEL;
481 msg.u.destroy_channel.key = channel->key;
482
483 pthread_mutex_lock(socket->lock);
484 health_code_update();
485
486 ret = consumer_send_msg(socket, &msg);
487 if (ret < 0) {
488 goto error;
489 }
490
491 error:
492 health_code_update();
493 pthread_mutex_unlock(socket->lock);
494 return ret;
495 }
496
497 int kernel_consumer_destroy_metadata(struct consumer_socket *socket,
498 struct ltt_kernel_metadata *metadata)
499 {
500 int ret;
501 struct lttcomm_consumer_msg msg;
502
503 LTTNG_ASSERT(metadata);
504 LTTNG_ASSERT(socket);
505
506 DBG("Sending kernel consumer destroy channel key %" PRIu64, metadata->key);
507
508 memset(&msg, 0, sizeof(msg));
509 msg.cmd_type = LTTNG_CONSUMER_DESTROY_CHANNEL;
510 msg.u.destroy_channel.key = metadata->key;
511
512 pthread_mutex_lock(socket->lock);
513 health_code_update();
514
515 ret = consumer_send_msg(socket, &msg);
516 if (ret < 0) {
517 goto error;
518 }
519
520 error:
521 health_code_update();
522 pthread_mutex_unlock(socket->lock);
523 return ret;
524 }
This page took 0.039581 seconds and 4 git commands to generate.