Fix: Send remove channel to notification thread only when necessary
[lttng-tools.git] / src / bin / lttng-sessiond / kernel-consumer.c
1 /*
2 * Copyright (C) 2012 - David Goulet <dgoulet@efficios.com>
3 *
4 * This program is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License, version 2 only, as
6 * published by the Free Software Foundation.
7 *
8 * This program is distributed in the hope that it will be useful, but WITHOUT
9 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
11 * more details.
12 *
13 * You should have received a copy of the GNU General Public License along with
14 * this program; if not, write to the Free Software Foundation, Inc., 51
15 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
16 */
17
18 #define _LGPL_SOURCE
19 #include <stdio.h>
20 #include <stdlib.h>
21 #include <sys/stat.h>
22 #include <unistd.h>
23
24 #include <common/common.h>
25 #include <common/defaults.h>
26 #include <common/compat/string.h>
27
28 #include "consumer.h"
29 #include "health-sessiond.h"
30 #include "kernel-consumer.h"
31 #include "notification-thread-commands.h"
32 #include "session.h"
33 #include "lttng-sessiond.h"
34
35 static char *create_channel_path(struct consumer_output *consumer,
36 uid_t uid, gid_t gid)
37 {
38 int ret;
39 char tmp_path[PATH_MAX];
40 char *pathname = NULL;
41
42 assert(consumer);
43
44 /* Get the right path name destination */
45 if (consumer->type == CONSUMER_DST_LOCAL) {
46 /* Set application path to the destination path */
47 ret = snprintf(tmp_path, sizeof(tmp_path), "%s%s",
48 consumer->dst.trace_path, consumer->subdir);
49 if (ret < 0) {
50 PERROR("snprintf kernel channel path");
51 goto error;
52 }
53 pathname = lttng_strndup(tmp_path, sizeof(tmp_path));
54 if (!pathname) {
55 PERROR("lttng_strndup");
56 goto error;
57 }
58
59 /* Create directory */
60 ret = run_as_mkdir_recursive(pathname, S_IRWXU | S_IRWXG, uid, gid);
61 if (ret < 0) {
62 if (errno != EEXIST) {
63 ERR("Trace directory creation error");
64 goto error;
65 }
66 }
67 DBG3("Kernel local consumer tracefile path: %s", pathname);
68 } else {
69 ret = snprintf(tmp_path, sizeof(tmp_path), "%s", consumer->subdir);
70 if (ret < 0) {
71 PERROR("snprintf kernel metadata path");
72 goto error;
73 }
74 pathname = lttng_strndup(tmp_path, sizeof(tmp_path));
75 if (!pathname) {
76 PERROR("lttng_strndup");
77 goto error;
78 }
79 DBG3("Kernel network consumer subdir path: %s", pathname);
80 }
81
82 return pathname;
83
84 error:
85 free(pathname);
86 return NULL;
87 }
88
89 /*
90 * Sending a single channel to the consumer with command ADD_CHANNEL.
91 */
92 int kernel_consumer_add_channel(struct consumer_socket *sock,
93 struct ltt_kernel_channel *channel,
94 struct ltt_kernel_session *ksession,
95 unsigned int monitor)
96 {
97 int ret;
98 char *pathname;
99 struct lttcomm_consumer_msg lkm;
100 struct consumer_output *consumer;
101 enum lttng_error_code status;
102 struct ltt_session *session;
103 struct lttng_channel_extended *channel_attr_extended;
104
105 /* Safety net */
106 assert(channel);
107 assert(ksession);
108 assert(ksession->consumer);
109
110 consumer = ksession->consumer;
111 channel_attr_extended = (struct lttng_channel_extended *)
112 channel->channel->attr.extended.ptr;
113
114 DBG("Kernel consumer adding channel %s to kernel consumer",
115 channel->channel->name);
116
117 if (monitor) {
118 pathname = create_channel_path(consumer, ksession->uid,
119 ksession->gid);
120 } else {
121 /* Empty path. */
122 pathname = strdup("");
123 }
124 if (!pathname) {
125 ret = -1;
126 goto error;
127 }
128
129 /* Prep channel message structure */
130 consumer_init_channel_comm_msg(&lkm,
131 LTTNG_CONSUMER_ADD_CHANNEL,
132 channel->fd,
133 ksession->id,
134 pathname,
135 ksession->uid,
136 ksession->gid,
137 consumer->net_seq_index,
138 channel->channel->name,
139 channel->stream_count,
140 channel->channel->attr.output,
141 CONSUMER_CHANNEL_TYPE_DATA,
142 channel->channel->attr.tracefile_size,
143 channel->channel->attr.tracefile_count,
144 monitor,
145 channel->channel->attr.live_timer_interval,
146 channel_attr_extended->monitor_timer_interval);
147
148 health_code_update();
149
150 ret = consumer_send_channel(sock, &lkm);
151 if (ret < 0) {
152 goto error;
153 }
154
155 health_code_update();
156 rcu_read_lock();
157 session = session_find_by_id(ksession->id);
158 assert(session);
159
160 status = notification_thread_command_add_channel(
161 notification_thread_handle, session->name,
162 ksession->uid, ksession->gid,
163 channel->channel->name, channel->fd,
164 LTTNG_DOMAIN_KERNEL,
165 channel->channel->attr.subbuf_size * channel->channel->attr.num_subbuf);
166 rcu_read_unlock();
167 if (status != LTTNG_OK) {
168 ret = -1;
169 goto error;
170 }
171
172 channel->published_to_notification_thread = true;
173
174 error:
175 free(pathname);
176 return ret;
177 }
178
179 /*
180 * Sending metadata to the consumer with command ADD_CHANNEL and ADD_STREAM.
181 */
182 int kernel_consumer_add_metadata(struct consumer_socket *sock,
183 struct ltt_kernel_session *session, unsigned int monitor)
184 {
185 int ret;
186 char *pathname;
187 struct lttcomm_consumer_msg lkm;
188 struct consumer_output *consumer;
189
190 /* Safety net */
191 assert(session);
192 assert(session->consumer);
193 assert(sock);
194
195 DBG("Sending metadata %d to kernel consumer", session->metadata_stream_fd);
196
197 /* Get consumer output pointer */
198 consumer = session->consumer;
199
200 if (monitor) {
201 pathname = create_channel_path(consumer, session->uid, session->gid);
202 } else {
203 /* Empty path. */
204 pathname = strdup("");
205 }
206 if (!pathname) {
207 ret = -1;
208 goto error;
209 }
210
211 /* Prep channel message structure */
212 consumer_init_channel_comm_msg(&lkm,
213 LTTNG_CONSUMER_ADD_CHANNEL,
214 session->metadata->fd,
215 session->id,
216 pathname,
217 session->uid,
218 session->gid,
219 consumer->net_seq_index,
220 DEFAULT_METADATA_NAME,
221 1,
222 DEFAULT_KERNEL_CHANNEL_OUTPUT,
223 CONSUMER_CHANNEL_TYPE_METADATA,
224 0, 0,
225 monitor, 0, 0);
226
227 health_code_update();
228
229 ret = consumer_send_channel(sock, &lkm);
230 if (ret < 0) {
231 goto error;
232 }
233
234 health_code_update();
235
236 /* Prep stream message structure */
237 consumer_init_stream_comm_msg(&lkm,
238 LTTNG_CONSUMER_ADD_STREAM,
239 session->metadata->fd,
240 session->metadata_stream_fd,
241 0); /* CPU: 0 for metadata. */
242
243 health_code_update();
244
245 /* Send stream and file descriptor */
246 ret = consumer_send_stream(sock, consumer, &lkm,
247 &session->metadata_stream_fd, 1);
248 if (ret < 0) {
249 goto error;
250 }
251
252 health_code_update();
253
254 error:
255 free(pathname);
256 return ret;
257 }
258
259 /*
260 * Sending a single stream to the consumer with command ADD_STREAM.
261 */
262 int kernel_consumer_add_stream(struct consumer_socket *sock,
263 struct ltt_kernel_channel *channel, struct ltt_kernel_stream *stream,
264 struct ltt_kernel_session *session, unsigned int monitor)
265 {
266 int ret;
267 struct lttcomm_consumer_msg lkm;
268 struct consumer_output *consumer;
269
270 assert(channel);
271 assert(stream);
272 assert(session);
273 assert(session->consumer);
274 assert(sock);
275
276 DBG("Sending stream %d of channel %s to kernel consumer",
277 stream->fd, channel->channel->name);
278
279 /* Get consumer output pointer */
280 consumer = session->consumer;
281
282 /* Prep stream consumer message */
283 consumer_init_stream_comm_msg(&lkm,
284 LTTNG_CONSUMER_ADD_STREAM,
285 channel->fd,
286 stream->fd,
287 stream->cpu);
288
289 health_code_update();
290
291 /* Send stream and file descriptor */
292 ret = consumer_send_stream(sock, consumer, &lkm, &stream->fd, 1);
293 if (ret < 0) {
294 goto error;
295 }
296
297 health_code_update();
298
299 error:
300 return ret;
301 }
302
303 /*
304 * Sending the notification that all streams were sent with STREAMS_SENT.
305 */
306 int kernel_consumer_streams_sent(struct consumer_socket *sock,
307 struct ltt_kernel_session *session, uint64_t channel_key)
308 {
309 int ret;
310 struct lttcomm_consumer_msg lkm;
311 struct consumer_output *consumer;
312
313 assert(sock);
314 assert(session);
315
316 DBG("Sending streams_sent");
317 /* Get consumer output pointer */
318 consumer = session->consumer;
319
320 /* Prep stream consumer message */
321 consumer_init_streams_sent_comm_msg(&lkm,
322 LTTNG_CONSUMER_STREAMS_SENT,
323 channel_key, consumer->net_seq_index);
324
325 health_code_update();
326
327 /* Send stream and file descriptor */
328 ret = consumer_send_msg(sock, &lkm);
329 if (ret < 0) {
330 goto error;
331 }
332
333 error:
334 return ret;
335 }
336
337 /*
338 * Send all stream fds of kernel channel to the consumer.
339 */
340 int kernel_consumer_send_channel_stream(struct consumer_socket *sock,
341 struct ltt_kernel_channel *channel, struct ltt_kernel_session *session,
342 unsigned int monitor)
343 {
344 int ret = LTTNG_OK;
345 struct ltt_kernel_stream *stream;
346
347 /* Safety net */
348 assert(channel);
349 assert(session);
350 assert(session->consumer);
351 assert(sock);
352
353 /* Bail out if consumer is disabled */
354 if (!session->consumer->enabled) {
355 ret = LTTNG_OK;
356 goto error;
357 }
358
359 DBG("Sending streams of channel %s to kernel consumer",
360 channel->channel->name);
361
362 if (!channel->sent_to_consumer) {
363 ret = kernel_consumer_add_channel(sock, channel, session, monitor);
364 if (ret < 0) {
365 goto error;
366 }
367 channel->sent_to_consumer = true;
368 }
369
370 /* Send streams */
371 cds_list_for_each_entry(stream, &channel->stream_list.head, list) {
372 if (!stream->fd || stream->sent_to_consumer) {
373 continue;
374 }
375
376 /* Add stream on the kernel consumer side. */
377 ret = kernel_consumer_add_stream(sock, channel, stream, session,
378 monitor);
379 if (ret < 0) {
380 goto error;
381 }
382 stream->sent_to_consumer = true;
383 }
384
385 error:
386 return ret;
387 }
388
389 /*
390 * Send all stream fds of the kernel session to the consumer.
391 */
392 int kernel_consumer_send_session(struct consumer_socket *sock,
393 struct ltt_kernel_session *session)
394 {
395 int ret, monitor = 0;
396 struct ltt_kernel_channel *chan;
397
398 /* Safety net */
399 assert(session);
400 assert(session->consumer);
401 assert(sock);
402
403 /* Bail out if consumer is disabled */
404 if (!session->consumer->enabled) {
405 ret = LTTNG_OK;
406 goto error;
407 }
408
409 /* Don't monitor the streams on the consumer if in flight recorder. */
410 if (session->output_traces) {
411 monitor = 1;
412 }
413
414 DBG("Sending session stream to kernel consumer");
415
416 if (session->metadata_stream_fd >= 0 && session->metadata) {
417 ret = kernel_consumer_add_metadata(sock, session, monitor);
418 if (ret < 0) {
419 goto error;
420 }
421 }
422
423 /* Send channel and streams of it */
424 cds_list_for_each_entry(chan, &session->channel_list.head, list) {
425 ret = kernel_consumer_send_channel_stream(sock, chan, session,
426 monitor);
427 if (ret < 0) {
428 goto error;
429 }
430 if (monitor) {
431 /*
432 * Inform the relay that all the streams for the
433 * channel were sent.
434 */
435 ret = kernel_consumer_streams_sent(sock, session, chan->fd);
436 if (ret < 0) {
437 goto error;
438 }
439 }
440 }
441
442 DBG("Kernel consumer FDs of metadata and channel streams sent");
443
444 session->consumer_fds_sent = 1;
445 return 0;
446
447 error:
448 return ret;
449 }
450
451 int kernel_consumer_destroy_channel(struct consumer_socket *socket,
452 struct ltt_kernel_channel *channel)
453 {
454 int ret;
455 struct lttcomm_consumer_msg msg;
456
457 assert(channel);
458 assert(socket);
459
460 DBG("Sending kernel consumer destroy channel key %d", channel->fd);
461
462 memset(&msg, 0, sizeof(msg));
463 msg.cmd_type = LTTNG_CONSUMER_DESTROY_CHANNEL;
464 msg.u.destroy_channel.key = channel->fd;
465
466 pthread_mutex_lock(socket->lock);
467 health_code_update();
468
469 ret = consumer_send_msg(socket, &msg);
470 if (ret < 0) {
471 goto error;
472 }
473
474 error:
475 health_code_update();
476 pthread_mutex_unlock(socket->lock);
477 return ret;
478 }
479
480 int kernel_consumer_destroy_metadata(struct consumer_socket *socket,
481 struct ltt_kernel_metadata *metadata)
482 {
483 int ret;
484 struct lttcomm_consumer_msg msg;
485
486 assert(metadata);
487 assert(socket);
488
489 DBG("Sending kernel consumer destroy channel key %d", metadata->fd);
490
491 memset(&msg, 0, sizeof(msg));
492 msg.cmd_type = LTTNG_CONSUMER_DESTROY_CHANNEL;
493 msg.u.destroy_channel.key = metadata->fd;
494
495 pthread_mutex_lock(socket->lock);
496 health_code_update();
497
498 ret = consumer_send_msg(socket, &msg);
499 if (ret < 0) {
500 goto error;
501 }
502
503 error:
504 health_code_update();
505 pthread_mutex_unlock(socket->lock);
506 return ret;
507 }
This page took 0.039154 seconds and 4 git commands to generate.