Change trace_path to session_root_path and chunk_path
[lttng-tools.git] / src / bin / lttng-sessiond / kernel-consumer.c
1 /*
2 * Copyright (C) 2012 - David Goulet <dgoulet@efficios.com>
3 *
4 * This program is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License, version 2 only, as
6 * published by the Free Software Foundation.
7 *
8 * This program is distributed in the hope that it will be useful, but WITHOUT
9 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
11 * more details.
12 *
13 * You should have received a copy of the GNU General Public License along with
14 * this program; if not, write to the Free Software Foundation, Inc., 51
15 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
16 */
17
18 #define _LGPL_SOURCE
19 #include <stdio.h>
20 #include <stdlib.h>
21 #include <sys/stat.h>
22 #include <unistd.h>
23 #include <inttypes.h>
24
25 #include <common/common.h>
26 #include <common/defaults.h>
27 #include <common/compat/string.h>
28
29 #include "consumer.h"
30 #include "health-sessiond.h"
31 #include "kernel-consumer.h"
32 #include "notification-thread-commands.h"
33 #include "session.h"
34 #include "lttng-sessiond.h"
35
36 static char *create_channel_path(struct consumer_output *consumer,
37 uid_t uid, gid_t gid)
38 {
39 int ret;
40 char tmp_path[PATH_MAX];
41 char *pathname = NULL;
42
43 assert(consumer);
44
45 /* Get the right path name destination */
46 if (consumer->type == CONSUMER_DST_LOCAL) {
47 /* Set application path to the destination path */
48 ret = snprintf(tmp_path, sizeof(tmp_path), "%s%s%s",
49 consumer->dst.session_root_path,
50 consumer->chunk_path,
51 consumer->subdir);
52 if (ret < 0) {
53 PERROR("snprintf kernel channel path");
54 goto error;
55 }
56 pathname = lttng_strndup(tmp_path, sizeof(tmp_path));
57 if (!pathname) {
58 PERROR("lttng_strndup");
59 goto error;
60 }
61
62 /* Create directory */
63 ret = run_as_mkdir_recursive(pathname, S_IRWXU | S_IRWXG, uid, gid);
64 if (ret < 0) {
65 if (errno != EEXIST) {
66 ERR("Trace directory creation error");
67 goto error;
68 }
69 }
70 DBG3("Kernel local consumer tracefile path: %s", pathname);
71 } else {
72 ret = snprintf(tmp_path, sizeof(tmp_path), "%s", consumer->subdir);
73 if (ret < 0) {
74 PERROR("snprintf kernel metadata path");
75 goto error;
76 }
77 pathname = lttng_strndup(tmp_path, sizeof(tmp_path));
78 if (!pathname) {
79 PERROR("lttng_strndup");
80 goto error;
81 }
82 DBG3("Kernel network consumer subdir path: %s", pathname);
83 }
84
85 return pathname;
86
87 error:
88 free(pathname);
89 return NULL;
90 }
91
92 /*
93 * Sending a single channel to the consumer with command ADD_CHANNEL.
94 */
95 int kernel_consumer_add_channel(struct consumer_socket *sock,
96 struct ltt_kernel_channel *channel,
97 struct ltt_kernel_session *ksession,
98 unsigned int monitor)
99 {
100 int ret;
101 char *pathname;
102 struct lttcomm_consumer_msg lkm;
103 struct consumer_output *consumer;
104 enum lttng_error_code status;
105 struct ltt_session *session;
106 struct lttng_channel_extended *channel_attr_extended;
107
108 /* Safety net */
109 assert(channel);
110 assert(ksession);
111 assert(ksession->consumer);
112
113 consumer = ksession->consumer;
114 channel_attr_extended = (struct lttng_channel_extended *)
115 channel->channel->attr.extended.ptr;
116
117 DBG("Kernel consumer adding channel %s to kernel consumer",
118 channel->channel->name);
119
120 if (monitor) {
121 pathname = create_channel_path(consumer, ksession->uid,
122 ksession->gid);
123 } else {
124 /* Empty path. */
125 pathname = strdup("");
126 }
127 if (!pathname) {
128 ret = -1;
129 goto error;
130 }
131
132 /* Prep channel message structure */
133 consumer_init_channel_comm_msg(&lkm,
134 LTTNG_CONSUMER_ADD_CHANNEL,
135 channel->key,
136 ksession->id,
137 pathname,
138 ksession->uid,
139 ksession->gid,
140 consumer->net_seq_index,
141 channel->channel->name,
142 channel->stream_count,
143 channel->channel->attr.output,
144 CONSUMER_CHANNEL_TYPE_DATA,
145 channel->channel->attr.tracefile_size,
146 channel->channel->attr.tracefile_count,
147 monitor,
148 channel->channel->attr.live_timer_interval,
149 channel_attr_extended->monitor_timer_interval);
150
151 health_code_update();
152
153 ret = consumer_send_channel(sock, &lkm);
154 if (ret < 0) {
155 goto error;
156 }
157
158 health_code_update();
159 rcu_read_lock();
160 session = session_find_by_id(ksession->id);
161 assert(session);
162
163 status = notification_thread_command_add_channel(
164 notification_thread_handle, session->name,
165 ksession->uid, ksession->gid,
166 channel->channel->name, channel->key,
167 LTTNG_DOMAIN_KERNEL,
168 channel->channel->attr.subbuf_size * channel->channel->attr.num_subbuf);
169 rcu_read_unlock();
170 if (status != LTTNG_OK) {
171 ret = -1;
172 goto error;
173 }
174
175 channel->published_to_notification_thread = true;
176
177 error:
178 free(pathname);
179 return ret;
180 }
181
182 /*
183 * Sending metadata to the consumer with command ADD_CHANNEL and ADD_STREAM.
184 *
185 * The consumer socket lock must be held by the caller.
186 */
187 int kernel_consumer_add_metadata(struct consumer_socket *sock,
188 struct ltt_kernel_session *session, unsigned int monitor)
189 {
190 int ret;
191 char *pathname;
192 struct lttcomm_consumer_msg lkm;
193 struct consumer_output *consumer;
194
195 /* Safety net */
196 assert(session);
197 assert(session->consumer);
198 assert(sock);
199
200 DBG("Sending metadata %d to kernel consumer", session->metadata_stream_fd);
201
202 /* Get consumer output pointer */
203 consumer = session->consumer;
204
205 if (monitor) {
206 pathname = create_channel_path(consumer, session->uid, session->gid);
207 } else {
208 /* Empty path. */
209 pathname = strdup("");
210 }
211 if (!pathname) {
212 ret = -1;
213 goto error;
214 }
215
216 /* Prep channel message structure */
217 consumer_init_channel_comm_msg(&lkm,
218 LTTNG_CONSUMER_ADD_CHANNEL,
219 session->metadata->fd,
220 session->id,
221 pathname,
222 session->uid,
223 session->gid,
224 consumer->net_seq_index,
225 DEFAULT_METADATA_NAME,
226 1,
227 DEFAULT_KERNEL_CHANNEL_OUTPUT,
228 CONSUMER_CHANNEL_TYPE_METADATA,
229 0, 0,
230 monitor, 0, 0);
231
232 health_code_update();
233
234 ret = consumer_send_channel(sock, &lkm);
235 if (ret < 0) {
236 goto error;
237 }
238
239 health_code_update();
240
241 /* Prep stream message structure */
242 consumer_init_stream_comm_msg(&lkm,
243 LTTNG_CONSUMER_ADD_STREAM,
244 session->metadata->fd,
245 session->metadata_stream_fd,
246 0); /* CPU: 0 for metadata. */
247
248 health_code_update();
249
250 /* Send stream and file descriptor */
251 ret = consumer_send_stream(sock, consumer, &lkm,
252 &session->metadata_stream_fd, 1);
253 if (ret < 0) {
254 goto error;
255 }
256
257 health_code_update();
258
259 error:
260 free(pathname);
261 return ret;
262 }
263
264 /*
265 * Sending a single stream to the consumer with command ADD_STREAM.
266 */
267 int kernel_consumer_add_stream(struct consumer_socket *sock,
268 struct ltt_kernel_channel *channel, struct ltt_kernel_stream *stream,
269 struct ltt_kernel_session *session, unsigned int monitor)
270 {
271 int ret;
272 struct lttcomm_consumer_msg lkm;
273 struct consumer_output *consumer;
274
275 assert(channel);
276 assert(stream);
277 assert(session);
278 assert(session->consumer);
279 assert(sock);
280
281 DBG("Sending stream %d of channel %s to kernel consumer",
282 stream->fd, channel->channel->name);
283
284 /* Get consumer output pointer */
285 consumer = session->consumer;
286
287 /* Prep stream consumer message */
288 consumer_init_stream_comm_msg(&lkm,
289 LTTNG_CONSUMER_ADD_STREAM,
290 channel->key,
291 stream->fd,
292 stream->cpu);
293
294 health_code_update();
295
296 /* Send stream and file descriptor */
297 ret = consumer_send_stream(sock, consumer, &lkm, &stream->fd, 1);
298 if (ret < 0) {
299 goto error;
300 }
301
302 health_code_update();
303
304 error:
305 return ret;
306 }
307
308 /*
309 * Sending the notification that all streams were sent with STREAMS_SENT.
310 */
311 int kernel_consumer_streams_sent(struct consumer_socket *sock,
312 struct ltt_kernel_session *session, uint64_t channel_key)
313 {
314 int ret;
315 struct lttcomm_consumer_msg lkm;
316 struct consumer_output *consumer;
317
318 assert(sock);
319 assert(session);
320
321 DBG("Sending streams_sent");
322 /* Get consumer output pointer */
323 consumer = session->consumer;
324
325 /* Prep stream consumer message */
326 consumer_init_streams_sent_comm_msg(&lkm,
327 LTTNG_CONSUMER_STREAMS_SENT,
328 channel_key, consumer->net_seq_index);
329
330 health_code_update();
331
332 /* Send stream and file descriptor */
333 ret = consumer_send_msg(sock, &lkm);
334 if (ret < 0) {
335 goto error;
336 }
337
338 error:
339 return ret;
340 }
341
342 /*
343 * Send all stream fds of kernel channel to the consumer.
344 *
345 * The consumer socket lock must be held by the caller.
346 */
347 int kernel_consumer_send_channel_stream(struct consumer_socket *sock,
348 struct ltt_kernel_channel *channel, struct ltt_kernel_session *session,
349 unsigned int monitor)
350 {
351 int ret = LTTNG_OK;
352 struct ltt_kernel_stream *stream;
353
354 /* Safety net */
355 assert(channel);
356 assert(session);
357 assert(session->consumer);
358 assert(sock);
359
360 /* Bail out if consumer is disabled */
361 if (!session->consumer->enabled) {
362 ret = LTTNG_OK;
363 goto error;
364 }
365
366 DBG("Sending streams of channel %s to kernel consumer",
367 channel->channel->name);
368
369 if (!channel->sent_to_consumer) {
370 ret = kernel_consumer_add_channel(sock, channel, session, monitor);
371 if (ret < 0) {
372 goto error;
373 }
374 channel->sent_to_consumer = true;
375 }
376
377 /* Send streams */
378 cds_list_for_each_entry(stream, &channel->stream_list.head, list) {
379 if (!stream->fd || stream->sent_to_consumer) {
380 continue;
381 }
382
383 /* Add stream on the kernel consumer side. */
384 ret = kernel_consumer_add_stream(sock, channel, stream, session,
385 monitor);
386 if (ret < 0) {
387 goto error;
388 }
389 stream->sent_to_consumer = true;
390 }
391
392 error:
393 return ret;
394 }
395
396 /*
397 * Send all stream fds of the kernel session to the consumer.
398 *
399 * The consumer socket lock must be held by the caller.
400 */
401 int kernel_consumer_send_session(struct consumer_socket *sock,
402 struct ltt_kernel_session *session)
403 {
404 int ret, monitor = 0;
405 struct ltt_kernel_channel *chan;
406
407 /* Safety net */
408 assert(session);
409 assert(session->consumer);
410 assert(sock);
411
412 /* Bail out if consumer is disabled */
413 if (!session->consumer->enabled) {
414 ret = LTTNG_OK;
415 goto error;
416 }
417
418 /* Don't monitor the streams on the consumer if in flight recorder. */
419 if (session->output_traces) {
420 monitor = 1;
421 }
422
423 DBG("Sending session stream to kernel consumer");
424
425 if (session->metadata_stream_fd >= 0 && session->metadata) {
426 ret = kernel_consumer_add_metadata(sock, session, monitor);
427 if (ret < 0) {
428 goto error;
429 }
430 }
431
432 /* Send channel and streams of it */
433 cds_list_for_each_entry(chan, &session->channel_list.head, list) {
434 ret = kernel_consumer_send_channel_stream(sock, chan, session,
435 monitor);
436 if (ret < 0) {
437 goto error;
438 }
439 if (monitor) {
440 /*
441 * Inform the relay that all the streams for the
442 * channel were sent.
443 */
444 ret = kernel_consumer_streams_sent(sock, session, chan->key);
445 if (ret < 0) {
446 goto error;
447 }
448 }
449 }
450
451 DBG("Kernel consumer FDs of metadata and channel streams sent");
452
453 session->consumer_fds_sent = 1;
454 return 0;
455
456 error:
457 return ret;
458 }
459
460 int kernel_consumer_destroy_channel(struct consumer_socket *socket,
461 struct ltt_kernel_channel *channel)
462 {
463 int ret;
464 struct lttcomm_consumer_msg msg;
465
466 assert(channel);
467 assert(socket);
468
469 DBG("Sending kernel consumer destroy channel key %" PRIu64, channel->key);
470
471 memset(&msg, 0, sizeof(msg));
472 msg.cmd_type = LTTNG_CONSUMER_DESTROY_CHANNEL;
473 msg.u.destroy_channel.key = channel->key;
474
475 pthread_mutex_lock(socket->lock);
476 health_code_update();
477
478 ret = consumer_send_msg(socket, &msg);
479 if (ret < 0) {
480 goto error;
481 }
482
483 error:
484 health_code_update();
485 pthread_mutex_unlock(socket->lock);
486 return ret;
487 }
488
489 int kernel_consumer_destroy_metadata(struct consumer_socket *socket,
490 struct ltt_kernel_metadata *metadata)
491 {
492 int ret;
493 struct lttcomm_consumer_msg msg;
494
495 assert(metadata);
496 assert(socket);
497
498 DBG("Sending kernel consumer destroy channel key %d", metadata->fd);
499
500 memset(&msg, 0, sizeof(msg));
501 msg.cmd_type = LTTNG_CONSUMER_DESTROY_CHANNEL;
502 msg.u.destroy_channel.key = metadata->fd;
503
504 pthread_mutex_lock(socket->lock);
505 health_code_update();
506
507 ret = consumer_send_msg(socket, &msg);
508 if (ret < 0) {
509 goto error;
510 }
511
512 error:
513 health_code_update();
514 pthread_mutex_unlock(socket->lock);
515 return ret;
516 }
This page took 0.040881 seconds and 4 git commands to generate.