Rename consumer socket fd to fd_ptr
[lttng-tools.git] / src / bin / lttng-sessiond / kernel-consumer.c
1 /*
2 * Copyright (C) 2012 - David Goulet <dgoulet@efficios.com>
3 *
4 * This program is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License, version 2 only, as
6 * published by the Free Software Foundation.
7 *
8 * This program is distributed in the hope that it will be useful, but WITHOUT
9 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
11 * more details.
12 *
13 * You should have received a copy of the GNU General Public License along with
14 * this program; if not, write to the Free Software Foundation, Inc., 51
15 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
16 */
17
18 #define _GNU_SOURCE
19 #include <stdio.h>
20 #include <stdlib.h>
21 #include <string.h>
22 #include <sys/stat.h>
23 #include <unistd.h>
24
25 #include <common/common.h>
26 #include <common/defaults.h>
27
28 #include "consumer.h"
29 #include "health.h"
30 #include "kernel-consumer.h"
31
32 static char *create_channel_path(struct consumer_output *consumer,
33 uid_t uid, gid_t gid)
34 {
35 int ret;
36 char tmp_path[PATH_MAX];
37 char *pathname = NULL;
38
39 assert(consumer);
40
41 /* Get the right path name destination */
42 if (consumer->type == CONSUMER_DST_LOCAL) {
43 /* Set application path to the destination path */
44 ret = snprintf(tmp_path, sizeof(tmp_path), "%s%s",
45 consumer->dst.trace_path, consumer->subdir);
46 if (ret < 0) {
47 PERROR("snprintf kernel channel path");
48 goto error;
49 }
50 pathname = strndup(tmp_path, sizeof(tmp_path));
51
52 /* Create directory */
53 ret = run_as_mkdir_recursive(pathname, S_IRWXU | S_IRWXG, uid, gid);
54 if (ret < 0) {
55 if (ret != -EEXIST) {
56 ERR("Trace directory creation error");
57 goto error;
58 }
59 }
60 DBG3("Kernel local consumer tracefile path: %s", pathname);
61 } else {
62 ret = snprintf(tmp_path, sizeof(tmp_path), "%s", consumer->subdir);
63 if (ret < 0) {
64 PERROR("snprintf kernel metadata path");
65 goto error;
66 }
67 pathname = strndup(tmp_path, sizeof(tmp_path));
68 DBG3("Kernel network consumer subdir path: %s", pathname);
69 }
70
71 return pathname;
72
73 error:
74 free(pathname);
75 return NULL;
76 }
77
78 /*
79 * Sending a single channel to the consumer with command ADD_CHANNEL.
80 */
81 int kernel_consumer_add_channel(struct consumer_socket *sock,
82 struct ltt_kernel_channel *channel, struct ltt_kernel_session *session,
83 unsigned int monitor)
84 {
85 int ret;
86 char *pathname;
87 struct lttcomm_consumer_msg lkm;
88 struct consumer_output *consumer;
89
90 /* Safety net */
91 assert(channel);
92 assert(session);
93 assert(session->consumer);
94
95 consumer = session->consumer;
96
97 DBG("Kernel consumer adding channel %s to kernel consumer",
98 channel->channel->name);
99
100 if (monitor) {
101 pathname = create_channel_path(consumer, session->uid, session->gid);
102 if (!pathname) {
103 ret = -1;
104 goto error;
105 }
106 } else {
107 /* Empty path. */
108 pathname = "";
109 }
110
111 /* Prep channel message structure */
112 consumer_init_channel_comm_msg(&lkm,
113 LTTNG_CONSUMER_ADD_CHANNEL,
114 channel->fd,
115 session->id,
116 pathname,
117 session->uid,
118 session->gid,
119 consumer->net_seq_index,
120 channel->channel->name,
121 channel->stream_count,
122 channel->channel->attr.output,
123 CONSUMER_CHANNEL_TYPE_DATA,
124 channel->channel->attr.tracefile_size,
125 channel->channel->attr.tracefile_count,
126 monitor);
127
128 health_code_update();
129
130 ret = consumer_send_channel(sock, &lkm);
131 if (ret < 0) {
132 goto error;
133 }
134
135 health_code_update();
136
137 error:
138 return ret;
139 }
140
141 /*
142 * Sending metadata to the consumer with command ADD_CHANNEL and ADD_STREAM.
143 */
144 int kernel_consumer_add_metadata(struct consumer_socket *sock,
145 struct ltt_kernel_session *session, unsigned int monitor)
146 {
147 int ret;
148 char *pathname;
149 struct lttcomm_consumer_msg lkm;
150 struct consumer_output *consumer;
151
152 /* Safety net */
153 assert(session);
154 assert(session->consumer);
155 assert(sock);
156
157 DBG("Sending metadata %d to kernel consumer", session->metadata_stream_fd);
158
159 /* Get consumer output pointer */
160 consumer = session->consumer;
161
162 if (monitor) {
163 pathname = create_channel_path(consumer, session->uid, session->gid);
164 if (!pathname) {
165 ret = -1;
166 goto error;
167 }
168 } else {
169 /* Empty path. */
170 pathname = "";
171 }
172
173 /* Prep channel message structure */
174 consumer_init_channel_comm_msg(&lkm,
175 LTTNG_CONSUMER_ADD_CHANNEL,
176 session->metadata->fd,
177 session->id,
178 pathname,
179 session->uid,
180 session->gid,
181 consumer->net_seq_index,
182 DEFAULT_METADATA_NAME,
183 1,
184 DEFAULT_KERNEL_CHANNEL_OUTPUT,
185 CONSUMER_CHANNEL_TYPE_METADATA,
186 0, 0,
187 monitor);
188
189 health_code_update();
190
191 ret = consumer_send_channel(sock, &lkm);
192 if (ret < 0) {
193 goto error;
194 }
195
196 health_code_update();
197
198 /* Prep stream message structure */
199 consumer_init_stream_comm_msg(&lkm,
200 LTTNG_CONSUMER_ADD_STREAM,
201 session->metadata->fd,
202 session->metadata_stream_fd,
203 0); /* CPU: 0 for metadata. */
204
205 health_code_update();
206
207 /* Send stream and file descriptor */
208 ret = consumer_send_stream(sock, consumer, &lkm,
209 &session->metadata_stream_fd, 1);
210 if (ret < 0) {
211 goto error;
212 }
213
214 health_code_update();
215
216 error:
217 return ret;
218 }
219
220 /*
221 * Sending a single stream to the consumer with command ADD_STREAM.
222 */
223 int kernel_consumer_add_stream(struct consumer_socket *sock,
224 struct ltt_kernel_channel *channel, struct ltt_kernel_stream *stream,
225 struct ltt_kernel_session *session, unsigned int monitor)
226 {
227 int ret;
228 struct lttcomm_consumer_msg lkm;
229 struct consumer_output *consumer;
230
231 assert(channel);
232 assert(stream);
233 assert(session);
234 assert(session->consumer);
235 assert(sock);
236
237 DBG("Sending stream %d of channel %s to kernel consumer",
238 stream->fd, channel->channel->name);
239
240 /* Get consumer output pointer */
241 consumer = session->consumer;
242
243 /* Prep stream consumer message */
244 consumer_init_stream_comm_msg(&lkm,
245 LTTNG_CONSUMER_ADD_STREAM,
246 channel->fd,
247 stream->fd,
248 stream->cpu);
249
250 health_code_update();
251
252 /* Send stream and file descriptor */
253 ret = consumer_send_stream(sock, consumer, &lkm, &stream->fd, 1);
254 if (ret < 0) {
255 goto error;
256 }
257
258 health_code_update();
259
260 error:
261 return ret;
262 }
263
264 /*
265 * Send all stream fds of kernel channel to the consumer.
266 */
267 int kernel_consumer_send_channel_stream(struct consumer_socket *sock,
268 struct ltt_kernel_channel *channel, struct ltt_kernel_session *session,
269 unsigned int monitor)
270 {
271 int ret;
272 struct ltt_kernel_stream *stream;
273
274 /* Safety net */
275 assert(channel);
276 assert(session);
277 assert(session->consumer);
278 assert(sock);
279
280 /* Bail out if consumer is disabled */
281 if (!session->consumer->enabled) {
282 ret = LTTNG_OK;
283 goto error;
284 }
285
286 DBG("Sending streams of channel %s to kernel consumer",
287 channel->channel->name);
288
289 ret = kernel_consumer_add_channel(sock, channel, session, monitor);
290 if (ret < 0) {
291 goto error;
292 }
293
294 /* Send streams */
295 cds_list_for_each_entry(stream, &channel->stream_list.head, list) {
296 if (!stream->fd) {
297 continue;
298 }
299
300 /* Add stream on the kernel consumer side. */
301 ret = kernel_consumer_add_stream(sock, channel, stream, session,
302 monitor);
303 if (ret < 0) {
304 goto error;
305 }
306 }
307
308 error:
309 return ret;
310 }
311
312 /*
313 * Send all stream fds of the kernel session to the consumer.
314 */
315 int kernel_consumer_send_session(struct consumer_socket *sock,
316 struct ltt_kernel_session *session)
317 {
318 int ret, monitor = 0;
319 struct ltt_kernel_channel *chan;
320
321 /* Safety net */
322 assert(session);
323 assert(session->consumer);
324 assert(sock);
325
326 /* Bail out if consumer is disabled */
327 if (!session->consumer->enabled) {
328 ret = LTTNG_OK;
329 goto error;
330 }
331
332 /* Don't monitor the streams on the consumer if in flight recorder. */
333 if (session->output_traces) {
334 monitor = 1;
335 }
336
337 DBG("Sending session stream to kernel consumer");
338
339 if (session->metadata_stream_fd >= 0) {
340 ret = kernel_consumer_add_metadata(sock, session, monitor);
341 if (ret < 0) {
342 goto error;
343 }
344
345 /* Flag that at least the metadata has been sent to the consumer. */
346 session->consumer_fds_sent = 1;
347 }
348
349 /* Send channel and streams of it */
350 cds_list_for_each_entry(chan, &session->channel_list.head, list) {
351 ret = kernel_consumer_send_channel_stream(sock, chan, session,
352 monitor);
353 if (ret < 0) {
354 goto error;
355 }
356 }
357
358 DBG("Kernel consumer FDs of metadata and channel streams sent");
359
360 return 0;
361
362 error:
363 return ret;
364 }
365
366 int kernel_consumer_destroy_channel(struct consumer_socket *socket,
367 struct ltt_kernel_channel *channel)
368 {
369 int ret;
370 struct lttcomm_consumer_msg msg;
371
372 assert(channel);
373 assert(socket);
374
375 DBG("Sending kernel consumer destroy channel key %d", channel->fd);
376
377 msg.cmd_type = LTTNG_CONSUMER_DESTROY_CHANNEL;
378 msg.u.destroy_channel.key = channel->fd;
379
380 pthread_mutex_lock(socket->lock);
381 health_code_update();
382
383 ret = consumer_send_msg(socket, &msg);
384 if (ret < 0) {
385 goto error;
386 }
387
388 error:
389 health_code_update();
390 pthread_mutex_unlock(socket->lock);
391 return ret;
392 }
393
394 int kernel_consumer_destroy_metadata(struct consumer_socket *socket,
395 struct ltt_kernel_metadata *metadata)
396 {
397 int ret;
398 struct lttcomm_consumer_msg msg;
399
400 assert(metadata);
401 assert(socket);
402
403 DBG("Sending kernel consumer destroy channel key %d", metadata->fd);
404
405 msg.cmd_type = LTTNG_CONSUMER_DESTROY_CHANNEL;
406 msg.u.destroy_channel.key = metadata->fd;
407
408 pthread_mutex_lock(socket->lock);
409 health_code_update();
410
411 ret = consumer_send_msg(socket, &msg);
412 if (ret < 0) {
413 goto error;
414 }
415
416 error:
417 health_code_update();
418 pthread_mutex_unlock(socket->lock);
419 return ret;
420 }
This page took 0.037556 seconds and 5 git commands to generate.