Tests fix: Missing waitpid in fork test
[lttng-tools.git] / src / bin / lttng-sessiond / kernel-consumer.c
1 /*
2 * Copyright (C) 2012 - David Goulet <dgoulet@efficios.com>
3 *
4 * This program is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License, version 2 only, as
6 * published by the Free Software Foundation.
7 *
8 * This program is distributed in the hope that it will be useful, but WITHOUT
9 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
11 * more details.
12 *
13 * You should have received a copy of the GNU General Public License along with
14 * this program; if not, write to the Free Software Foundation, Inc., 51
15 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
16 */
17
18 #define _GNU_SOURCE
19 #include <stdio.h>
20 #include <stdlib.h>
21 #include <string.h>
22 #include <sys/stat.h>
23 #include <unistd.h>
24
25 #include <common/common.h>
26 #include <common/defaults.h>
27
28 #include "consumer.h"
29 #include "health-sessiond.h"
30 #include "kernel-consumer.h"
31
32 static char *create_channel_path(struct consumer_output *consumer,
33 uid_t uid, gid_t gid)
34 {
35 int ret;
36 char tmp_path[PATH_MAX];
37 char *pathname = NULL;
38
39 assert(consumer);
40
41 /* Get the right path name destination */
42 if (consumer->type == CONSUMER_DST_LOCAL) {
43 /* Set application path to the destination path */
44 ret = snprintf(tmp_path, sizeof(tmp_path), "%s%s",
45 consumer->dst.trace_path, consumer->subdir);
46 if (ret < 0) {
47 PERROR("snprintf kernel channel path");
48 goto error;
49 }
50 pathname = strndup(tmp_path, sizeof(tmp_path));
51
52 /* Create directory */
53 ret = run_as_mkdir_recursive(pathname, S_IRWXU | S_IRWXG, uid, gid);
54 if (ret < 0) {
55 if (ret != -EEXIST) {
56 ERR("Trace directory creation error");
57 goto error;
58 }
59 }
60 DBG3("Kernel local consumer tracefile path: %s", pathname);
61 } else {
62 ret = snprintf(tmp_path, sizeof(tmp_path), "%s", consumer->subdir);
63 if (ret < 0) {
64 PERROR("snprintf kernel metadata path");
65 goto error;
66 }
67 pathname = strndup(tmp_path, sizeof(tmp_path));
68 DBG3("Kernel network consumer subdir path: %s", pathname);
69 }
70
71 return pathname;
72
73 error:
74 free(pathname);
75 return NULL;
76 }
77
78 /*
79 * Sending a single channel to the consumer with command ADD_CHANNEL.
80 */
81 int kernel_consumer_add_channel(struct consumer_socket *sock,
82 struct ltt_kernel_channel *channel, struct ltt_kernel_session *session,
83 unsigned int monitor)
84 {
85 int ret;
86 char *pathname;
87 struct lttcomm_consumer_msg lkm;
88 struct consumer_output *consumer;
89
90 /* Safety net */
91 assert(channel);
92 assert(session);
93 assert(session->consumer);
94
95 consumer = session->consumer;
96
97 DBG("Kernel consumer adding channel %s to kernel consumer",
98 channel->channel->name);
99
100 if (monitor) {
101 pathname = create_channel_path(consumer, session->uid, session->gid);
102 if (!pathname) {
103 ret = -1;
104 goto error;
105 }
106 } else {
107 /* Empty path. */
108 pathname = strdup("");
109 }
110
111 /* Prep channel message structure */
112 consumer_init_channel_comm_msg(&lkm,
113 LTTNG_CONSUMER_ADD_CHANNEL,
114 channel->fd,
115 session->id,
116 pathname,
117 session->uid,
118 session->gid,
119 consumer->net_seq_index,
120 channel->channel->name,
121 channel->stream_count,
122 channel->channel->attr.output,
123 CONSUMER_CHANNEL_TYPE_DATA,
124 channel->channel->attr.tracefile_size,
125 channel->channel->attr.tracefile_count,
126 monitor,
127 channel->channel->attr.live_timer_interval);
128
129 health_code_update();
130
131 ret = consumer_send_channel(sock, &lkm);
132 if (ret < 0) {
133 goto error;
134 }
135
136 health_code_update();
137
138 error:
139 free(pathname);
140 return ret;
141 }
142
143 /*
144 * Sending metadata to the consumer with command ADD_CHANNEL and ADD_STREAM.
145 */
146 int kernel_consumer_add_metadata(struct consumer_socket *sock,
147 struct ltt_kernel_session *session, unsigned int monitor)
148 {
149 int ret;
150 char *pathname;
151 struct lttcomm_consumer_msg lkm;
152 struct consumer_output *consumer;
153
154 /* Safety net */
155 assert(session);
156 assert(session->consumer);
157 assert(sock);
158
159 DBG("Sending metadata %d to kernel consumer", session->metadata_stream_fd);
160
161 /* Get consumer output pointer */
162 consumer = session->consumer;
163
164 if (monitor) {
165 pathname = create_channel_path(consumer, session->uid, session->gid);
166 if (!pathname) {
167 ret = -1;
168 goto error;
169 }
170 } else {
171 /* Empty path. */
172 pathname = strdup("");
173 }
174
175 /* Prep channel message structure */
176 consumer_init_channel_comm_msg(&lkm,
177 LTTNG_CONSUMER_ADD_CHANNEL,
178 session->metadata->fd,
179 session->id,
180 pathname,
181 session->uid,
182 session->gid,
183 consumer->net_seq_index,
184 DEFAULT_METADATA_NAME,
185 1,
186 DEFAULT_KERNEL_CHANNEL_OUTPUT,
187 CONSUMER_CHANNEL_TYPE_METADATA,
188 0, 0,
189 monitor, 0);
190
191 health_code_update();
192
193 ret = consumer_send_channel(sock, &lkm);
194 if (ret < 0) {
195 goto error;
196 }
197
198 health_code_update();
199
200 /* Prep stream message structure */
201 consumer_init_stream_comm_msg(&lkm,
202 LTTNG_CONSUMER_ADD_STREAM,
203 session->metadata->fd,
204 session->metadata_stream_fd,
205 0); /* CPU: 0 for metadata. */
206
207 health_code_update();
208
209 /* Send stream and file descriptor */
210 ret = consumer_send_stream(sock, consumer, &lkm,
211 &session->metadata_stream_fd, 1);
212 if (ret < 0) {
213 goto error;
214 }
215
216 health_code_update();
217
218 error:
219 free(pathname);
220 return ret;
221 }
222
223 /*
224 * Sending a single stream to the consumer with command ADD_STREAM.
225 */
226 int kernel_consumer_add_stream(struct consumer_socket *sock,
227 struct ltt_kernel_channel *channel, struct ltt_kernel_stream *stream,
228 struct ltt_kernel_session *session, unsigned int monitor)
229 {
230 int ret;
231 struct lttcomm_consumer_msg lkm;
232 struct consumer_output *consumer;
233
234 assert(channel);
235 assert(stream);
236 assert(session);
237 assert(session->consumer);
238 assert(sock);
239
240 DBG("Sending stream %d of channel %s to kernel consumer",
241 stream->fd, channel->channel->name);
242
243 /* Get consumer output pointer */
244 consumer = session->consumer;
245
246 /* Prep stream consumer message */
247 consumer_init_stream_comm_msg(&lkm,
248 LTTNG_CONSUMER_ADD_STREAM,
249 channel->fd,
250 stream->fd,
251 stream->cpu);
252
253 health_code_update();
254
255 /* Send stream and file descriptor */
256 ret = consumer_send_stream(sock, consumer, &lkm, &stream->fd, 1);
257 if (ret < 0) {
258 goto error;
259 }
260
261 health_code_update();
262
263 error:
264 return ret;
265 }
266
267 /*
268 * Sending the notification that all streams were sent with STREAMS_SENT.
269 */
270 int kernel_consumer_streams_sent(struct consumer_socket *sock,
271 struct ltt_kernel_session *session, uint64_t channel_key)
272 {
273 int ret;
274 struct lttcomm_consumer_msg lkm;
275 struct consumer_output *consumer;
276
277 assert(sock);
278 assert(session);
279
280 DBG("Sending streams_sent");
281 /* Get consumer output pointer */
282 consumer = session->consumer;
283
284 /* Prep stream consumer message */
285 consumer_init_streams_sent_comm_msg(&lkm,
286 LTTNG_CONSUMER_STREAMS_SENT,
287 channel_key, consumer->net_seq_index);
288
289 health_code_update();
290
291 /* Send stream and file descriptor */
292 ret = consumer_send_msg(sock, &lkm);
293 if (ret < 0) {
294 goto error;
295 }
296
297 error:
298 return ret;
299 }
300
301 /*
302 * Send all stream fds of kernel channel to the consumer.
303 */
304 int kernel_consumer_send_channel_stream(struct consumer_socket *sock,
305 struct ltt_kernel_channel *channel, struct ltt_kernel_session *session,
306 unsigned int monitor)
307 {
308 int ret;
309 struct ltt_kernel_stream *stream;
310
311 /* Safety net */
312 assert(channel);
313 assert(session);
314 assert(session->consumer);
315 assert(sock);
316
317 /* Bail out if consumer is disabled */
318 if (!session->consumer->enabled) {
319 ret = LTTNG_OK;
320 goto error;
321 }
322
323 DBG("Sending streams of channel %s to kernel consumer",
324 channel->channel->name);
325
326 ret = kernel_consumer_add_channel(sock, channel, session, monitor);
327 if (ret < 0) {
328 goto error;
329 }
330
331 /* Send streams */
332 cds_list_for_each_entry(stream, &channel->stream_list.head, list) {
333 if (!stream->fd) {
334 continue;
335 }
336
337 /* Add stream on the kernel consumer side. */
338 ret = kernel_consumer_add_stream(sock, channel, stream, session,
339 monitor);
340 if (ret < 0) {
341 goto error;
342 }
343 }
344
345 error:
346 return ret;
347 }
348
349 /*
350 * Send all stream fds of the kernel session to the consumer.
351 */
352 int kernel_consumer_send_session(struct consumer_socket *sock,
353 struct ltt_kernel_session *session)
354 {
355 int ret, monitor = 0;
356 struct ltt_kernel_channel *chan;
357
358 /* Safety net */
359 assert(session);
360 assert(session->consumer);
361 assert(sock);
362
363 /* Bail out if consumer is disabled */
364 if (!session->consumer->enabled) {
365 ret = LTTNG_OK;
366 goto error;
367 }
368
369 /* Don't monitor the streams on the consumer if in flight recorder. */
370 if (session->output_traces) {
371 monitor = 1;
372 }
373
374 DBG("Sending session stream to kernel consumer");
375
376 if (session->metadata_stream_fd >= 0) {
377 ret = kernel_consumer_add_metadata(sock, session, monitor);
378 if (ret < 0) {
379 goto error;
380 }
381 }
382
383 /* Send channel and streams of it */
384 cds_list_for_each_entry(chan, &session->channel_list.head, list) {
385 ret = kernel_consumer_send_channel_stream(sock, chan, session,
386 monitor);
387 if (ret < 0) {
388 goto error;
389 }
390 if (monitor) {
391 /*
392 * Inform the relay that all the streams for the
393 * channel were sent.
394 */
395 ret = kernel_consumer_streams_sent(sock, session, chan->fd);
396 if (ret < 0) {
397 goto error;
398 }
399 }
400 }
401
402 DBG("Kernel consumer FDs of metadata and channel streams sent");
403
404 session->consumer_fds_sent = 1;
405 return 0;
406
407 error:
408 return ret;
409 }
410
411 int kernel_consumer_destroy_channel(struct consumer_socket *socket,
412 struct ltt_kernel_channel *channel)
413 {
414 int ret;
415 struct lttcomm_consumer_msg msg;
416
417 assert(channel);
418 assert(socket);
419
420 DBG("Sending kernel consumer destroy channel key %d", channel->fd);
421
422 memset(&msg, 0, sizeof(msg));
423 msg.cmd_type = LTTNG_CONSUMER_DESTROY_CHANNEL;
424 msg.u.destroy_channel.key = channel->fd;
425
426 pthread_mutex_lock(socket->lock);
427 health_code_update();
428
429 ret = consumer_send_msg(socket, &msg);
430 if (ret < 0) {
431 goto error;
432 }
433
434 error:
435 health_code_update();
436 pthread_mutex_unlock(socket->lock);
437 return ret;
438 }
439
440 int kernel_consumer_destroy_metadata(struct consumer_socket *socket,
441 struct ltt_kernel_metadata *metadata)
442 {
443 int ret;
444 struct lttcomm_consumer_msg msg;
445
446 assert(metadata);
447 assert(socket);
448
449 DBG("Sending kernel consumer destroy channel key %d", metadata->fd);
450
451 memset(&msg, 0, sizeof(msg));
452 msg.cmd_type = LTTNG_CONSUMER_DESTROY_CHANNEL;
453 msg.u.destroy_channel.key = metadata->fd;
454
455 pthread_mutex_lock(socket->lock);
456 health_code_update();
457
458 ret = consumer_send_msg(socket, &msg);
459 if (ret < 0) {
460 goto error;
461 }
462
463 error:
464 health_code_update();
465 pthread_mutex_unlock(socket->lock);
466 return ret;
467 }
This page took 0.039668 seconds and 4 git commands to generate.