Fix: use a free running channel key between sessiond and kernel consumer
[lttng-tools.git] / src / bin / lttng-sessiond / kernel-consumer.c
1 /*
2 * Copyright (C) 2012 - David Goulet <dgoulet@efficios.com>
3 *
4 * This program is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License, version 2 only, as
6 * published by the Free Software Foundation.
7 *
8 * This program is distributed in the hope that it will be useful, but WITHOUT
9 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
11 * more details.
12 *
13 * You should have received a copy of the GNU General Public License along with
14 * this program; if not, write to the Free Software Foundation, Inc., 51
15 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
16 */
17
18 #define _LGPL_SOURCE
19 #include <stdio.h>
20 #include <stdlib.h>
21 #include <sys/stat.h>
22 #include <unistd.h>
23 #include <inttypes.h>
24
25 #include <common/common.h>
26 #include <common/defaults.h>
27 #include <common/compat/string.h>
28
29 #include "consumer.h"
30 #include "health-sessiond.h"
31 #include "kernel-consumer.h"
32 #include "notification-thread-commands.h"
33 #include "session.h"
34 #include "lttng-sessiond.h"
35
36 static char *create_channel_path(struct consumer_output *consumer,
37 uid_t uid, gid_t gid)
38 {
39 int ret;
40 char tmp_path[PATH_MAX];
41 char *pathname = NULL;
42
43 assert(consumer);
44
45 /* Get the right path name destination */
46 if (consumer->type == CONSUMER_DST_LOCAL) {
47 /* Set application path to the destination path */
48 ret = snprintf(tmp_path, sizeof(tmp_path), "%s%s",
49 consumer->dst.trace_path, consumer->subdir);
50 if (ret < 0) {
51 PERROR("snprintf kernel channel path");
52 goto error;
53 }
54 pathname = lttng_strndup(tmp_path, sizeof(tmp_path));
55 if (!pathname) {
56 PERROR("lttng_strndup");
57 goto error;
58 }
59
60 /* Create directory */
61 ret = run_as_mkdir_recursive(pathname, S_IRWXU | S_IRWXG, uid, gid);
62 if (ret < 0) {
63 if (errno != EEXIST) {
64 ERR("Trace directory creation error");
65 goto error;
66 }
67 }
68 DBG3("Kernel local consumer tracefile path: %s", pathname);
69 } else {
70 ret = snprintf(tmp_path, sizeof(tmp_path), "%s", consumer->subdir);
71 if (ret < 0) {
72 PERROR("snprintf kernel metadata path");
73 goto error;
74 }
75 pathname = lttng_strndup(tmp_path, sizeof(tmp_path));
76 if (!pathname) {
77 PERROR("lttng_strndup");
78 goto error;
79 }
80 DBG3("Kernel network consumer subdir path: %s", pathname);
81 }
82
83 return pathname;
84
85 error:
86 free(pathname);
87 return NULL;
88 }
89
90 /*
91 * Sending a single channel to the consumer with command ADD_CHANNEL.
92 */
93 int kernel_consumer_add_channel(struct consumer_socket *sock,
94 struct ltt_kernel_channel *channel,
95 struct ltt_kernel_session *ksession,
96 unsigned int monitor)
97 {
98 int ret;
99 char *pathname;
100 struct lttcomm_consumer_msg lkm;
101 struct consumer_output *consumer;
102 enum lttng_error_code status;
103 struct ltt_session *session;
104 struct lttng_channel_extended *channel_attr_extended;
105
106 /* Safety net */
107 assert(channel);
108 assert(ksession);
109 assert(ksession->consumer);
110
111 consumer = ksession->consumer;
112 channel_attr_extended = (struct lttng_channel_extended *)
113 channel->channel->attr.extended.ptr;
114
115 DBG("Kernel consumer adding channel %s to kernel consumer",
116 channel->channel->name);
117
118 if (monitor) {
119 pathname = create_channel_path(consumer, ksession->uid,
120 ksession->gid);
121 } else {
122 /* Empty path. */
123 pathname = strdup("");
124 }
125 if (!pathname) {
126 ret = -1;
127 goto error;
128 }
129
130 /* Prep channel message structure */
131 consumer_init_channel_comm_msg(&lkm,
132 LTTNG_CONSUMER_ADD_CHANNEL,
133 channel->key,
134 ksession->id,
135 pathname,
136 ksession->uid,
137 ksession->gid,
138 consumer->net_seq_index,
139 channel->channel->name,
140 channel->stream_count,
141 channel->channel->attr.output,
142 CONSUMER_CHANNEL_TYPE_DATA,
143 channel->channel->attr.tracefile_size,
144 channel->channel->attr.tracefile_count,
145 monitor,
146 channel->channel->attr.live_timer_interval,
147 channel_attr_extended->monitor_timer_interval);
148
149 health_code_update();
150
151 ret = consumer_send_channel(sock, &lkm);
152 if (ret < 0) {
153 goto error;
154 }
155
156 health_code_update();
157 rcu_read_lock();
158 session = session_find_by_id(ksession->id);
159 assert(session);
160
161 status = notification_thread_command_add_channel(
162 notification_thread_handle, session->name,
163 ksession->uid, ksession->gid,
164 channel->channel->name, channel->key,
165 LTTNG_DOMAIN_KERNEL,
166 channel->channel->attr.subbuf_size * channel->channel->attr.num_subbuf);
167 rcu_read_unlock();
168 if (status != LTTNG_OK) {
169 ret = -1;
170 goto error;
171 }
172
173 channel->published_to_notification_thread = true;
174
175 error:
176 free(pathname);
177 return ret;
178 }
179
180 /*
181 * Sending metadata to the consumer with command ADD_CHANNEL and ADD_STREAM.
182 *
183 * The consumer socket lock must be held by the caller.
184 */
185 int kernel_consumer_add_metadata(struct consumer_socket *sock,
186 struct ltt_kernel_session *session, unsigned int monitor)
187 {
188 int ret;
189 char *pathname;
190 struct lttcomm_consumer_msg lkm;
191 struct consumer_output *consumer;
192
193 /* Safety net */
194 assert(session);
195 assert(session->consumer);
196 assert(sock);
197
198 DBG("Sending metadata %d to kernel consumer", session->metadata_stream_fd);
199
200 /* Get consumer output pointer */
201 consumer = session->consumer;
202
203 if (monitor) {
204 pathname = create_channel_path(consumer, session->uid, session->gid);
205 } else {
206 /* Empty path. */
207 pathname = strdup("");
208 }
209 if (!pathname) {
210 ret = -1;
211 goto error;
212 }
213
214 /* Prep channel message structure */
215 consumer_init_channel_comm_msg(&lkm,
216 LTTNG_CONSUMER_ADD_CHANNEL,
217 session->metadata->fd,
218 session->id,
219 pathname,
220 session->uid,
221 session->gid,
222 consumer->net_seq_index,
223 DEFAULT_METADATA_NAME,
224 1,
225 DEFAULT_KERNEL_CHANNEL_OUTPUT,
226 CONSUMER_CHANNEL_TYPE_METADATA,
227 0, 0,
228 monitor, 0, 0);
229
230 health_code_update();
231
232 ret = consumer_send_channel(sock, &lkm);
233 if (ret < 0) {
234 goto error;
235 }
236
237 health_code_update();
238
239 /* Prep stream message structure */
240 consumer_init_stream_comm_msg(&lkm,
241 LTTNG_CONSUMER_ADD_STREAM,
242 session->metadata->fd,
243 session->metadata_stream_fd,
244 0); /* CPU: 0 for metadata. */
245
246 health_code_update();
247
248 /* Send stream and file descriptor */
249 ret = consumer_send_stream(sock, consumer, &lkm,
250 &session->metadata_stream_fd, 1);
251 if (ret < 0) {
252 goto error;
253 }
254
255 health_code_update();
256
257 error:
258 free(pathname);
259 return ret;
260 }
261
262 /*
263 * Sending a single stream to the consumer with command ADD_STREAM.
264 */
265 int kernel_consumer_add_stream(struct consumer_socket *sock,
266 struct ltt_kernel_channel *channel, struct ltt_kernel_stream *stream,
267 struct ltt_kernel_session *session, unsigned int monitor)
268 {
269 int ret;
270 struct lttcomm_consumer_msg lkm;
271 struct consumer_output *consumer;
272
273 assert(channel);
274 assert(stream);
275 assert(session);
276 assert(session->consumer);
277 assert(sock);
278
279 DBG("Sending stream %d of channel %s to kernel consumer",
280 stream->fd, channel->channel->name);
281
282 /* Get consumer output pointer */
283 consumer = session->consumer;
284
285 /* Prep stream consumer message */
286 consumer_init_stream_comm_msg(&lkm,
287 LTTNG_CONSUMER_ADD_STREAM,
288 channel->key,
289 stream->fd,
290 stream->cpu);
291
292 health_code_update();
293
294 /* Send stream and file descriptor */
295 ret = consumer_send_stream(sock, consumer, &lkm, &stream->fd, 1);
296 if (ret < 0) {
297 goto error;
298 }
299
300 health_code_update();
301
302 error:
303 return ret;
304 }
305
306 /*
307 * Sending the notification that all streams were sent with STREAMS_SENT.
308 */
309 int kernel_consumer_streams_sent(struct consumer_socket *sock,
310 struct ltt_kernel_session *session, uint64_t channel_key)
311 {
312 int ret;
313 struct lttcomm_consumer_msg lkm;
314 struct consumer_output *consumer;
315
316 assert(sock);
317 assert(session);
318
319 DBG("Sending streams_sent");
320 /* Get consumer output pointer */
321 consumer = session->consumer;
322
323 /* Prep stream consumer message */
324 consumer_init_streams_sent_comm_msg(&lkm,
325 LTTNG_CONSUMER_STREAMS_SENT,
326 channel_key, consumer->net_seq_index);
327
328 health_code_update();
329
330 /* Send stream and file descriptor */
331 ret = consumer_send_msg(sock, &lkm);
332 if (ret < 0) {
333 goto error;
334 }
335
336 error:
337 return ret;
338 }
339
340 /*
341 * Send all stream fds of kernel channel to the consumer.
342 *
343 * The consumer socket lock must be held by the caller.
344 */
345 int kernel_consumer_send_channel_stream(struct consumer_socket *sock,
346 struct ltt_kernel_channel *channel, struct ltt_kernel_session *session,
347 unsigned int monitor)
348 {
349 int ret = LTTNG_OK;
350 struct ltt_kernel_stream *stream;
351
352 /* Safety net */
353 assert(channel);
354 assert(session);
355 assert(session->consumer);
356 assert(sock);
357
358 /* Bail out if consumer is disabled */
359 if (!session->consumer->enabled) {
360 ret = LTTNG_OK;
361 goto error;
362 }
363
364 DBG("Sending streams of channel %s to kernel consumer",
365 channel->channel->name);
366
367 if (!channel->sent_to_consumer) {
368 ret = kernel_consumer_add_channel(sock, channel, session, monitor);
369 if (ret < 0) {
370 goto error;
371 }
372 channel->sent_to_consumer = true;
373 }
374
375 /* Send streams */
376 cds_list_for_each_entry(stream, &channel->stream_list.head, list) {
377 if (!stream->fd || stream->sent_to_consumer) {
378 continue;
379 }
380
381 /* Add stream on the kernel consumer side. */
382 ret = kernel_consumer_add_stream(sock, channel, stream, session,
383 monitor);
384 if (ret < 0) {
385 goto error;
386 }
387 stream->sent_to_consumer = true;
388 }
389
390 error:
391 return ret;
392 }
393
394 /*
395 * Send all stream fds of the kernel session to the consumer.
396 *
397 * The consumer socket lock must be held by the caller.
398 */
399 int kernel_consumer_send_session(struct consumer_socket *sock,
400 struct ltt_kernel_session *session)
401 {
402 int ret, monitor = 0;
403 struct ltt_kernel_channel *chan;
404
405 /* Safety net */
406 assert(session);
407 assert(session->consumer);
408 assert(sock);
409
410 /* Bail out if consumer is disabled */
411 if (!session->consumer->enabled) {
412 ret = LTTNG_OK;
413 goto error;
414 }
415
416 /* Don't monitor the streams on the consumer if in flight recorder. */
417 if (session->output_traces) {
418 monitor = 1;
419 }
420
421 DBG("Sending session stream to kernel consumer");
422
423 if (session->metadata_stream_fd >= 0 && session->metadata) {
424 ret = kernel_consumer_add_metadata(sock, session, monitor);
425 if (ret < 0) {
426 goto error;
427 }
428 }
429
430 /* Send channel and streams of it */
431 cds_list_for_each_entry(chan, &session->channel_list.head, list) {
432 ret = kernel_consumer_send_channel_stream(sock, chan, session,
433 monitor);
434 if (ret < 0) {
435 goto error;
436 }
437 if (monitor) {
438 /*
439 * Inform the relay that all the streams for the
440 * channel were sent.
441 */
442 ret = kernel_consumer_streams_sent(sock, session, chan->key);
443 if (ret < 0) {
444 goto error;
445 }
446 }
447 }
448
449 DBG("Kernel consumer FDs of metadata and channel streams sent");
450
451 session->consumer_fds_sent = 1;
452 return 0;
453
454 error:
455 return ret;
456 }
457
458 int kernel_consumer_destroy_channel(struct consumer_socket *socket,
459 struct ltt_kernel_channel *channel)
460 {
461 int ret;
462 struct lttcomm_consumer_msg msg;
463
464 assert(channel);
465 assert(socket);
466
467 DBG("Sending kernel consumer destroy channel key %" PRIu64, channel->key);
468
469 memset(&msg, 0, sizeof(msg));
470 msg.cmd_type = LTTNG_CONSUMER_DESTROY_CHANNEL;
471 msg.u.destroy_channel.key = channel->key;
472
473 pthread_mutex_lock(socket->lock);
474 health_code_update();
475
476 ret = consumer_send_msg(socket, &msg);
477 if (ret < 0) {
478 goto error;
479 }
480
481 error:
482 health_code_update();
483 pthread_mutex_unlock(socket->lock);
484 return ret;
485 }
486
487 int kernel_consumer_destroy_metadata(struct consumer_socket *socket,
488 struct ltt_kernel_metadata *metadata)
489 {
490 int ret;
491 struct lttcomm_consumer_msg msg;
492
493 assert(metadata);
494 assert(socket);
495
496 DBG("Sending kernel consumer destroy channel key %d", metadata->fd);
497
498 memset(&msg, 0, sizeof(msg));
499 msg.cmd_type = LTTNG_CONSUMER_DESTROY_CHANNEL;
500 msg.u.destroy_channel.key = metadata->fd;
501
502 pthread_mutex_lock(socket->lock);
503 health_code_update();
504
505 ret = consumer_send_msg(socket, &msg);
506 if (ret < 0) {
507 goto error;
508 }
509
510 error:
511 health_code_update();
512 pthread_mutex_unlock(socket->lock);
513 return ret;
514 }
This page took 0.0399659999999999 seconds and 5 git commands to generate.