Cleanup: app is never used by alloc_ust_app_session()
[lttng-tools.git] / src / bin / lttng-sessiond / kernel-consumer.c
1 /*
2 * Copyright (C) 2012 - David Goulet <dgoulet@efficios.com>
3 *
4 * This program is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License, version 2 only, as
6 * published by the Free Software Foundation.
7 *
8 * This program is distributed in the hope that it will be useful, but WITHOUT
9 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
11 * more details.
12 *
13 * You should have received a copy of the GNU General Public License along with
14 * this program; if not, write to the Free Software Foundation, Inc., 51
15 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
16 */
17
18 #define _LGPL_SOURCE
19 #include <stdio.h>
20 #include <stdlib.h>
21 #include <sys/stat.h>
22 #include <unistd.h>
23 #include <inttypes.h>
24
25 #include <common/common.h>
26 #include <common/defaults.h>
27 #include <common/compat/string.h>
28
29 #include "consumer.h"
30 #include "health-sessiond.h"
31 #include "kernel-consumer.h"
32 #include "notification-thread-commands.h"
33 #include "session.h"
34 #include "lttng-sessiond.h"
35
36 static char *create_channel_path(struct consumer_output *consumer,
37 uid_t uid, gid_t gid)
38 {
39 int ret;
40 char tmp_path[PATH_MAX];
41 char *pathname = NULL;
42
43 assert(consumer);
44
45 /* Get the right path name destination */
46 if (consumer->type == CONSUMER_DST_LOCAL) {
47 /* Set application path to the destination path */
48 ret = snprintf(tmp_path, sizeof(tmp_path), "%s%s%s",
49 consumer->dst.session_root_path,
50 consumer->chunk_path,
51 consumer->subdir);
52 if (ret < 0) {
53 PERROR("snprintf kernel channel path");
54 goto error;
55 } else if (ret >= sizeof(tmp_path)) {
56 ERR("Kernel channel path exceeds the maximal allowed length of of %zu bytes (%i bytes required) with path \"%s%s%s\"",
57 sizeof(tmp_path), ret,
58 consumer->dst.session_root_path,
59 consumer->chunk_path,
60 consumer->subdir);
61 goto error;
62 }
63 pathname = lttng_strndup(tmp_path, sizeof(tmp_path));
64 if (!pathname) {
65 PERROR("lttng_strndup");
66 goto error;
67 }
68
69 /* Create directory */
70 ret = run_as_mkdir_recursive(pathname, S_IRWXU | S_IRWXG, uid, gid);
71 if (ret < 0) {
72 if (errno != EEXIST) {
73 ERR("Trace directory creation error");
74 goto error;
75 }
76 }
77 DBG3("Kernel local consumer tracefile path: %s", pathname);
78 } else {
79 ret = snprintf(tmp_path, sizeof(tmp_path), "%s%s",
80 consumer->dst.net.base_dir,
81 consumer->subdir);
82 if (ret < 0) {
83 PERROR("snprintf kernel metadata path");
84 goto error;
85 } else if (ret >= sizeof(tmp_path)) {
86 ERR("Kernel channel path exceeds the maximal allowed length of of %zu bytes (%i bytes required) with path \"%s%s\"",
87 sizeof(tmp_path), ret,
88 consumer->dst.net.base_dir,
89 consumer->subdir);
90 goto error;
91 }
92 pathname = lttng_strndup(tmp_path, sizeof(tmp_path));
93 if (!pathname) {
94 PERROR("lttng_strndup");
95 goto error;
96 }
97 DBG3("Kernel network consumer subdir path: %s", pathname);
98 }
99
100 return pathname;
101
102 error:
103 free(pathname);
104 return NULL;
105 }
106
107 /*
108 * Sending a single channel to the consumer with command ADD_CHANNEL.
109 */
110 int kernel_consumer_add_channel(struct consumer_socket *sock,
111 struct ltt_kernel_channel *channel,
112 struct ltt_kernel_session *ksession,
113 unsigned int monitor)
114 {
115 int ret;
116 char *pathname;
117 struct lttcomm_consumer_msg lkm;
118 struct consumer_output *consumer;
119 enum lttng_error_code status;
120 struct ltt_session *session;
121 struct lttng_channel_extended *channel_attr_extended;
122
123 /* Safety net */
124 assert(channel);
125 assert(ksession);
126 assert(ksession->consumer);
127
128 consumer = ksession->consumer;
129 channel_attr_extended = (struct lttng_channel_extended *)
130 channel->channel->attr.extended.ptr;
131
132 DBG("Kernel consumer adding channel %s to kernel consumer",
133 channel->channel->name);
134
135 if (monitor) {
136 pathname = create_channel_path(consumer, ksession->uid,
137 ksession->gid);
138 } else {
139 /* Empty path. */
140 pathname = strdup("");
141 }
142 if (!pathname) {
143 ret = -1;
144 goto error;
145 }
146
147 /* Prep channel message structure */
148 consumer_init_add_channel_comm_msg(&lkm,
149 channel->key,
150 ksession->id,
151 pathname,
152 ksession->uid,
153 ksession->gid,
154 consumer->net_seq_index,
155 channel->channel->name,
156 channel->stream_count,
157 channel->channel->attr.output,
158 CONSUMER_CHANNEL_TYPE_DATA,
159 channel->channel->attr.tracefile_size,
160 channel->channel->attr.tracefile_count,
161 monitor,
162 channel->channel->attr.live_timer_interval,
163 channel_attr_extended->monitor_timer_interval);
164
165 health_code_update();
166
167 ret = consumer_send_channel(sock, &lkm);
168 if (ret < 0) {
169 goto error;
170 }
171
172 health_code_update();
173 rcu_read_lock();
174 session = session_find_by_id(ksession->id);
175 assert(session);
176
177 status = notification_thread_command_add_channel(
178 notification_thread_handle, session->name,
179 ksession->uid, ksession->gid,
180 channel->channel->name, channel->key,
181 LTTNG_DOMAIN_KERNEL,
182 channel->channel->attr.subbuf_size * channel->channel->attr.num_subbuf);
183 rcu_read_unlock();
184 if (status != LTTNG_OK) {
185 ret = -1;
186 goto error;
187 }
188
189 channel->published_to_notification_thread = true;
190
191 error:
192 free(pathname);
193 return ret;
194 }
195
196 /*
197 * Sending metadata to the consumer with command ADD_CHANNEL and ADD_STREAM.
198 *
199 * The consumer socket lock must be held by the caller.
200 */
201 int kernel_consumer_add_metadata(struct consumer_socket *sock,
202 struct ltt_kernel_session *session, unsigned int monitor)
203 {
204 int ret;
205 char *pathname;
206 struct lttcomm_consumer_msg lkm;
207 struct consumer_output *consumer;
208
209 /* Safety net */
210 assert(session);
211 assert(session->consumer);
212 assert(sock);
213
214 DBG("Sending metadata %d to kernel consumer", session->metadata_stream_fd);
215
216 /* Get consumer output pointer */
217 consumer = session->consumer;
218
219 if (monitor) {
220 pathname = create_channel_path(consumer, session->uid, session->gid);
221 } else {
222 /* Empty path. */
223 pathname = strdup("");
224 }
225 if (!pathname) {
226 ret = -1;
227 goto error;
228 }
229
230 /* Prep channel message structure */
231 consumer_init_add_channel_comm_msg(&lkm,
232 session->metadata->key,
233 session->id,
234 pathname,
235 session->uid,
236 session->gid,
237 consumer->net_seq_index,
238 DEFAULT_METADATA_NAME,
239 1,
240 DEFAULT_KERNEL_CHANNEL_OUTPUT,
241 CONSUMER_CHANNEL_TYPE_METADATA,
242 0, 0,
243 monitor, 0, 0);
244
245 health_code_update();
246
247 ret = consumer_send_channel(sock, &lkm);
248 if (ret < 0) {
249 goto error;
250 }
251
252 health_code_update();
253
254 /* Prep stream message structure */
255 consumer_init_stream_comm_msg(&lkm,
256 LTTNG_CONSUMER_ADD_STREAM,
257 session->metadata->key,
258 session->metadata_stream_fd,
259 0); /* CPU: 0 for metadata. */
260
261 health_code_update();
262
263 /* Send stream and file descriptor */
264 ret = consumer_send_stream(sock, consumer, &lkm,
265 &session->metadata_stream_fd, 1);
266 if (ret < 0) {
267 goto error;
268 }
269
270 health_code_update();
271
272 error:
273 free(pathname);
274 return ret;
275 }
276
277 /*
278 * Sending a single stream to the consumer with command ADD_STREAM.
279 */
280 static
281 int kernel_consumer_add_stream(struct consumer_socket *sock,
282 struct ltt_kernel_channel *channel, struct ltt_kernel_stream *stream,
283 struct ltt_kernel_session *session, unsigned int monitor)
284 {
285 int ret;
286 struct lttcomm_consumer_msg lkm;
287 struct consumer_output *consumer;
288
289 assert(channel);
290 assert(stream);
291 assert(session);
292 assert(session->consumer);
293 assert(sock);
294
295 DBG("Sending stream %d of channel %s to kernel consumer",
296 stream->fd, channel->channel->name);
297
298 /* Get consumer output pointer */
299 consumer = session->consumer;
300
301 /* Prep stream consumer message */
302 consumer_init_stream_comm_msg(&lkm,
303 LTTNG_CONSUMER_ADD_STREAM,
304 channel->key,
305 stream->fd,
306 stream->cpu);
307
308 health_code_update();
309
310 /* Send stream and file descriptor */
311 ret = consumer_send_stream(sock, consumer, &lkm, &stream->fd, 1);
312 if (ret < 0) {
313 goto error;
314 }
315
316 health_code_update();
317
318 error:
319 return ret;
320 }
321
322 /*
323 * Sending the notification that all streams were sent with STREAMS_SENT.
324 */
325 int kernel_consumer_streams_sent(struct consumer_socket *sock,
326 struct ltt_kernel_session *session, uint64_t channel_key)
327 {
328 int ret;
329 struct lttcomm_consumer_msg lkm;
330 struct consumer_output *consumer;
331
332 assert(sock);
333 assert(session);
334
335 DBG("Sending streams_sent");
336 /* Get consumer output pointer */
337 consumer = session->consumer;
338
339 /* Prep stream consumer message */
340 consumer_init_streams_sent_comm_msg(&lkm,
341 LTTNG_CONSUMER_STREAMS_SENT,
342 channel_key, consumer->net_seq_index);
343
344 health_code_update();
345
346 /* Send stream and file descriptor */
347 ret = consumer_send_msg(sock, &lkm);
348 if (ret < 0) {
349 goto error;
350 }
351
352 error:
353 return ret;
354 }
355
356 /*
357 * Send all stream fds of kernel channel to the consumer.
358 *
359 * The consumer socket lock must be held by the caller.
360 */
361 int kernel_consumer_send_channel_streams(struct consumer_socket *sock,
362 struct ltt_kernel_channel *channel, struct ltt_kernel_session *session,
363 unsigned int monitor)
364 {
365 int ret = LTTNG_OK;
366 struct ltt_kernel_stream *stream;
367
368 /* Safety net */
369 assert(channel);
370 assert(session);
371 assert(session->consumer);
372 assert(sock);
373
374 /* Bail out if consumer is disabled */
375 if (!session->consumer->enabled) {
376 ret = LTTNG_OK;
377 goto error;
378 }
379
380 DBG("Sending streams of channel %s to kernel consumer",
381 channel->channel->name);
382
383 if (!channel->sent_to_consumer) {
384 ret = kernel_consumer_add_channel(sock, channel, session, monitor);
385 if (ret < 0) {
386 goto error;
387 }
388 channel->sent_to_consumer = true;
389 }
390
391 /* Send streams */
392 cds_list_for_each_entry(stream, &channel->stream_list.head, list) {
393 if (!stream->fd || stream->sent_to_consumer) {
394 continue;
395 }
396
397 /* Add stream on the kernel consumer side. */
398 ret = kernel_consumer_add_stream(sock, channel, stream, session,
399 monitor);
400 if (ret < 0) {
401 goto error;
402 }
403 stream->sent_to_consumer = true;
404 }
405
406 error:
407 return ret;
408 }
409
410 /*
411 * Send all stream fds of the kernel session to the consumer.
412 *
413 * The consumer socket lock must be held by the caller.
414 */
415 int kernel_consumer_send_session(struct consumer_socket *sock,
416 struct ltt_kernel_session *session)
417 {
418 int ret, monitor = 0;
419 struct ltt_kernel_channel *chan;
420
421 /* Safety net */
422 assert(session);
423 assert(session->consumer);
424 assert(sock);
425
426 /* Bail out if consumer is disabled */
427 if (!session->consumer->enabled) {
428 ret = LTTNG_OK;
429 goto error;
430 }
431
432 /* Don't monitor the streams on the consumer if in flight recorder. */
433 if (session->output_traces) {
434 monitor = 1;
435 }
436
437 DBG("Sending session stream to kernel consumer");
438
439 if (session->metadata_stream_fd >= 0 && session->metadata) {
440 ret = kernel_consumer_add_metadata(sock, session, monitor);
441 if (ret < 0) {
442 goto error;
443 }
444 }
445
446 /* Send channel and streams of it */
447 cds_list_for_each_entry(chan, &session->channel_list.head, list) {
448 ret = kernel_consumer_send_channel_streams(sock, chan, session,
449 monitor);
450 if (ret < 0) {
451 goto error;
452 }
453 if (monitor) {
454 /*
455 * Inform the relay that all the streams for the
456 * channel were sent.
457 */
458 ret = kernel_consumer_streams_sent(sock, session, chan->key);
459 if (ret < 0) {
460 goto error;
461 }
462 }
463 }
464
465 DBG("Kernel consumer FDs of metadata and channel streams sent");
466
467 session->consumer_fds_sent = 1;
468 return 0;
469
470 error:
471 return ret;
472 }
473
474 int kernel_consumer_destroy_channel(struct consumer_socket *socket,
475 struct ltt_kernel_channel *channel)
476 {
477 int ret;
478 struct lttcomm_consumer_msg msg;
479
480 assert(channel);
481 assert(socket);
482
483 DBG("Sending kernel consumer destroy channel key %" PRIu64, channel->key);
484
485 memset(&msg, 0, sizeof(msg));
486 msg.cmd_type = LTTNG_CONSUMER_DESTROY_CHANNEL;
487 msg.u.destroy_channel.key = channel->key;
488
489 pthread_mutex_lock(socket->lock);
490 health_code_update();
491
492 ret = consumer_send_msg(socket, &msg);
493 if (ret < 0) {
494 goto error;
495 }
496
497 error:
498 health_code_update();
499 pthread_mutex_unlock(socket->lock);
500 return ret;
501 }
502
503 int kernel_consumer_destroy_metadata(struct consumer_socket *socket,
504 struct ltt_kernel_metadata *metadata)
505 {
506 int ret;
507 struct lttcomm_consumer_msg msg;
508
509 assert(metadata);
510 assert(socket);
511
512 DBG("Sending kernel consumer destroy channel key %" PRIu64, metadata->key);
513
514 memset(&msg, 0, sizeof(msg));
515 msg.cmd_type = LTTNG_CONSUMER_DESTROY_CHANNEL;
516 msg.u.destroy_channel.key = metadata->key;
517
518 pthread_mutex_lock(socket->lock);
519 health_code_update();
520
521 ret = consumer_send_msg(socket, &msg);
522 if (ret < 0) {
523 goto error;
524 }
525
526 error:
527 health_code_update();
528 pthread_mutex_unlock(socket->lock);
529 return ret;
530 }
This page took 0.039292 seconds and 4 git commands to generate.