ffa6429375b3d7c60d67f6545ce5c4a8e7d8d4e5
[lttng-tools.git] / src / bin / lttng-sessiond / ust-consumer.c
1 /*
2 * Copyright (C) 2011 - David Goulet <david.goulet@polymtl.ca>
3 *
4 * This program is free software; you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License, version 2 only,
6 * as published by the Free Software Foundation.
7 *
8 * This program is distributed in the hope that it will be useful,
9 * but WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 * GNU General Public License for more details.
12 *
13 * You should have received a copy of the GNU General Public License along
14 * with this program; if not, write to the Free Software Foundation, Inc.,
15 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
16 */
17
18 #define _GNU_SOURCE
19 #include <errno.h>
20 #include <stdio.h>
21 #include <stdlib.h>
22 #include <string.h>
23 #include <unistd.h>
24
25 #include <common/common.h>
26 #include <common/consumer.h>
27 #include <common/defaults.h>
28
29 #include "consumer.h"
30 #include "ust-consumer.h"
31
32 /*
33 * Send a single channel to the consumer using command ADD_CHANNEL.
34 */
35 static int send_channel(struct consumer_socket *sock,
36 struct ust_app_channel *uchan)
37 {
38 int ret, fd;
39 struct lttcomm_consumer_msg msg;
40
41 /* Safety net */
42 assert(uchan);
43 assert(sock);
44
45 if (sock->fd < 0) {
46 ret = -EINVAL;
47 goto error;
48 }
49
50 DBG2("Sending channel %s to UST consumer", uchan->name);
51
52 consumer_init_channel_comm_msg(&msg,
53 LTTNG_CONSUMER_ADD_CHANNEL,
54 uchan->obj->shm_fd,
55 uchan->attr.subbuf_size,
56 uchan->obj->memory_map_size,
57 uchan->name,
58 uchan->streams.count);
59
60 health_code_update(&health_thread_cmd);
61
62 ret = consumer_send_channel(sock, &msg);
63 if (ret < 0) {
64 goto error;
65 }
66
67 health_code_update(&health_thread_cmd);
68
69 fd = uchan->obj->shm_fd;
70 ret = consumer_send_fds(sock, &fd, 1);
71 if (ret < 0) {
72 goto error;
73 }
74
75 health_code_update(&health_thread_cmd);
76
77 error:
78 return ret;
79 }
80
81 /*
82 * Send a single stream to the consumer using ADD_STREAM command.
83 */
84 static int send_channel_stream(struct consumer_socket *sock,
85 struct ust_app_channel *uchan, struct ust_app_session *usess,
86 struct ust_app_stream *stream, struct consumer_output *consumer,
87 const char *pathname)
88 {
89 int ret, fds[2];
90 struct lttcomm_consumer_msg msg;
91
92 /* Safety net */
93 assert(uchan);
94 assert(usess);
95 assert(stream);
96 assert(consumer);
97 assert(sock);
98
99 DBG2("Sending stream %d of channel %s to kernel consumer",
100 stream->obj->shm_fd, uchan->name);
101
102 consumer_init_stream_comm_msg(&msg,
103 LTTNG_CONSUMER_ADD_STREAM,
104 uchan->obj->shm_fd,
105 stream->obj->shm_fd,
106 LTTNG_CONSUMER_ACTIVE_STREAM,
107 DEFAULT_UST_CHANNEL_OUTPUT,
108 stream->obj->memory_map_size,
109 usess->uid,
110 usess->gid,
111 consumer->net_seq_index,
112 0, /* Metadata flag unset */
113 stream->name,
114 pathname,
115 usess->id);
116
117 health_code_update(&health_thread_cmd);
118
119 /* Send stream and file descriptor */
120 fds[0] = stream->obj->shm_fd;
121 fds[1] = stream->obj->wait_fd;
122 ret = consumer_send_stream(sock, consumer, &msg, fds, 2);
123 if (ret < 0) {
124 goto error;
125 }
126
127 health_code_update(&health_thread_cmd);
128
129 error:
130 return ret;
131 }
132
133 /*
134 * Send all stream fds of UST channel to the consumer.
135 */
136 static int send_channel_streams(struct consumer_socket *sock,
137 struct ust_app_channel *uchan, struct ust_app_session *usess,
138 struct consumer_output *consumer)
139 {
140 int ret;
141 char tmp_path[PATH_MAX];
142 const char *pathname;
143 struct ust_app_stream *stream, *tmp;
144
145 assert(sock);
146
147 DBG("Sending streams of channel %s to UST consumer", uchan->name);
148
149 ret = send_channel(sock, uchan);
150 if (ret < 0) {
151 goto error;
152 }
153
154 /* Get the right path name destination */
155 if (consumer->type == CONSUMER_DST_LOCAL) {
156 /* Set application path to the destination path */
157 ret = snprintf(tmp_path, sizeof(tmp_path), "%s/%s/%s",
158 consumer->dst.trace_path, consumer->subdir, usess->path);
159 if (ret < 0) {
160 PERROR("snprintf stream path");
161 goto error;
162 }
163 pathname = tmp_path;
164 DBG3("UST local consumer tracefile path: %s", pathname);
165 } else {
166 ret = snprintf(tmp_path, sizeof(tmp_path), "%s/%s",
167 consumer->subdir, usess->path);
168 if (ret < 0) {
169 PERROR("snprintf stream path");
170 goto error;
171 }
172 pathname = tmp_path;
173 DBG3("UST network consumer subdir path: %s", pathname);
174 }
175
176 cds_list_for_each_entry_safe(stream, tmp, &uchan->streams.head, list) {
177 if (!stream->obj->shm_fd) {
178 continue;
179 }
180
181 ret = send_channel_stream(sock, uchan, usess, stream, consumer,
182 pathname);
183 if (ret < 0) {
184 goto error;
185 }
186 }
187
188 DBG("UST consumer channel streams sent");
189
190 return 0;
191
192 error:
193 return ret;
194 }
195
196 /*
197 * Sending metadata to the consumer with command ADD_CHANNEL and ADD_STREAM.
198 */
199 static int send_metadata(struct consumer_socket *sock,
200 struct ust_app_session *usess, struct consumer_output *consumer)
201 {
202 int ret, fd, fds[2];
203 char tmp_path[PATH_MAX];
204 const char *pathname;
205 struct lttcomm_consumer_msg msg;
206
207 /* Safety net */
208 assert(usess);
209 assert(consumer);
210 assert(sock);
211
212 if (sock->fd < 0) {
213 ERR("Consumer socket is negative (%d)", sock->fd);
214 return -EINVAL;
215 }
216
217 if (usess->metadata->obj->shm_fd == 0) {
218 ERR("Metadata obj shm_fd is 0");
219 ret = -1;
220 goto error;
221 }
222
223 DBG("UST consumer sending metadata stream fd");
224
225 consumer_init_channel_comm_msg(&msg,
226 LTTNG_CONSUMER_ADD_CHANNEL,
227 usess->metadata->obj->shm_fd,
228 usess->metadata->attr.subbuf_size,
229 usess->metadata->obj->memory_map_size,
230 "metadata",
231 1);
232
233 health_code_update(&health_thread_cmd);
234
235 ret = consumer_send_channel(sock, &msg);
236 if (ret < 0) {
237 goto error;
238 }
239
240 health_code_update(&health_thread_cmd);
241
242 /* Sending metadata shared memory fd */
243 fd = usess->metadata->obj->shm_fd;
244 ret = consumer_send_fds(sock, &fd, 1);
245 if (ret < 0) {
246 goto error;
247 }
248
249 health_code_update(&health_thread_cmd);
250
251 /* Get correct path name destination */
252 if (consumer->type == CONSUMER_DST_LOCAL) {
253 /* Set application path to the destination path */
254 ret = snprintf(tmp_path, sizeof(tmp_path), "%s/%s/%s",
255 consumer->dst.trace_path, consumer->subdir, usess->path);
256 if (ret < 0) {
257 PERROR("snprintf stream path");
258 goto error;
259 }
260 pathname = tmp_path;
261
262 /* Create directory */
263 ret = run_as_mkdir_recursive(pathname, S_IRWXU | S_IRWXG,
264 usess->uid, usess->gid);
265 if (ret < 0) {
266 if (ret != -EEXIST) {
267 ERR("Trace directory creation error");
268 goto error;
269 }
270 }
271 } else {
272 ret = snprintf(tmp_path, sizeof(tmp_path), "%s/%s",
273 consumer->subdir, usess->path);
274 if (ret < 0) {
275 PERROR("snprintf metadata path");
276 goto error;
277 }
278 pathname = tmp_path;
279 }
280
281 consumer_init_stream_comm_msg(&msg,
282 LTTNG_CONSUMER_ADD_STREAM,
283 usess->metadata->obj->shm_fd,
284 usess->metadata->stream_obj->shm_fd,
285 LTTNG_CONSUMER_ACTIVE_STREAM,
286 DEFAULT_UST_CHANNEL_OUTPUT,
287 usess->metadata->stream_obj->memory_map_size,
288 usess->uid,
289 usess->gid,
290 consumer->net_seq_index,
291 1, /* Flag metadata set */
292 "metadata",
293 pathname,
294 usess->id);
295
296 health_code_update(&health_thread_cmd);
297
298 /* Send stream and file descriptor */
299 fds[0] = usess->metadata->stream_obj->shm_fd;
300 fds[1] = usess->metadata->stream_obj->wait_fd;
301 ret = consumer_send_stream(sock, consumer, &msg, fds, 2);
302 if (ret < 0) {
303 goto error;
304 }
305
306 health_code_update(&health_thread_cmd);
307
308 error:
309 return ret;
310 }
311
312 /*
313 * Send all stream fds of the UST session to the consumer.
314 */
315 int ust_consumer_send_session(struct ust_app_session *usess,
316 struct consumer_output *consumer, struct consumer_socket *sock)
317 {
318 int ret = 0;
319 struct lttng_ht_iter iter;
320 struct ust_app_channel *ua_chan;
321
322 assert(usess);
323
324 if (consumer == NULL || sock == NULL) {
325 /* There is no consumer so just ignoring the command. */
326 DBG("UST consumer does not exist. Not sending streams");
327 return 0;
328 }
329
330 DBG("Sending metadata stream fd to consumer on %d", sock->fd);
331
332 pthread_mutex_lock(sock->lock);
333
334 /* Sending metadata information to the consumer */
335 ret = send_metadata(sock, usess, consumer);
336 if (ret < 0) {
337 goto error;
338 }
339
340 /* Send each channel fd streams of session */
341 rcu_read_lock();
342 cds_lfht_for_each_entry(usess->channels->ht, &iter.iter, ua_chan,
343 node.node) {
344 /*
345 * Indicate that the channel was not created on the tracer side so skip
346 * sending unexisting streams.
347 */
348 if (ua_chan->obj == NULL) {
349 continue;
350 }
351
352 ret = send_channel_streams(sock, ua_chan, usess, consumer);
353 if (ret < 0) {
354 rcu_read_unlock();
355 goto error;
356 }
357 }
358 rcu_read_unlock();
359
360 DBG("consumer fds (metadata and channel streams) sent");
361
362 /* All good! */
363 ret = 0;
364
365 error:
366 pthread_mutex_unlock(sock->lock);
367 return ret;
368 }
This page took 0.037366 seconds and 3 git commands to generate.