aabe49403fbc1215d423986cb7dfa71301da2a47
[lttng-tools.git] / src / bin / lttng-sessiond / ust-consumer.c
1 /*
2 * Copyright (C) 2011 - David Goulet <david.goulet@polymtl.ca>
3 *
4 * This program is free software; you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License, version 2 only,
6 * as published by the Free Software Foundation.
7 *
8 * This program is distributed in the hope that it will be useful,
9 * but WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 * GNU General Public License for more details.
12 *
13 * You should have received a copy of the GNU General Public License along
14 * with this program; if not, write to the Free Software Foundation, Inc.,
15 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
16 */
17
18 #define _GNU_SOURCE
19 #include <errno.h>
20 #include <stdio.h>
21 #include <stdlib.h>
22 #include <string.h>
23 #include <unistd.h>
24
25 #include <common/common.h>
26 #include <common/consumer.h>
27 #include <common/defaults.h>
28
29 #include "consumer.h"
30 #include "ust-consumer.h"
31
32 /*
33 * Send a single channel to the consumer using command ADD_CHANNEL.
34 */
35 static int send_channel(int sock, struct ust_app_channel *uchan)
36 {
37 int ret, fd;
38 struct lttcomm_consumer_msg msg;
39
40 /* Safety net */
41 assert(uchan);
42
43 if (sock < 0) {
44 ret = -EINVAL;
45 goto error;
46 }
47
48 DBG2("Sending channel %s to UST consumer", uchan->name);
49
50 consumer_init_channel_comm_msg(&msg,
51 LTTNG_CONSUMER_ADD_CHANNEL,
52 uchan->obj->shm_fd,
53 uchan->attr.subbuf_size,
54 uchan->obj->memory_map_size,
55 uchan->name);
56
57 ret = consumer_send_channel(sock, &msg);
58 if (ret < 0) {
59 goto error;
60 }
61
62 fd = uchan->obj->shm_fd;
63 ret = consumer_send_fds(sock, &fd, 1);
64 if (ret < 0) {
65 goto error;
66 }
67
68 error:
69 return ret;
70 }
71
72 /*
73 * Send a single stream to the consumer using ADD_STREAM command.
74 */
75 static int send_channel_stream(int sock, struct ust_app_channel *uchan,
76 struct ust_app_session *usess, struct ltt_ust_stream *stream,
77 struct consumer_output *consumer, const char *pathname)
78 {
79 int ret, fds[2];
80 struct lttcomm_consumer_msg msg;
81
82 /* Safety net */
83 assert(uchan);
84 assert(usess);
85 assert(stream);
86 assert(consumer);
87
88 DBG2("Sending stream %d of channel %s to kernel consumer",
89 stream->obj->shm_fd, uchan->name);
90
91 consumer_init_stream_comm_msg(&msg,
92 LTTNG_CONSUMER_ADD_STREAM,
93 uchan->obj->shm_fd,
94 stream->obj->shm_fd,
95 LTTNG_CONSUMER_ACTIVE_STREAM,
96 DEFAULT_UST_CHANNEL_OUTPUT,
97 stream->obj->memory_map_size,
98 usess->uid,
99 usess->gid,
100 consumer->net_seq_index,
101 0, /* Metadata flag unset */
102 stream->name,
103 pathname);
104
105 /* Send stream and file descriptor */
106 fds[0] = stream->obj->shm_fd;
107 fds[1] = stream->obj->wait_fd;
108 ret = consumer_send_stream(sock, consumer, &msg, fds, 2);
109 if (ret < 0) {
110 goto error;
111 }
112
113 error:
114 return ret;
115 }
116
117 /*
118 * Send all stream fds of UST channel to the consumer.
119 */
120 static int send_channel_streams(int sock,
121 struct ust_app_channel *uchan, struct ust_app_session *usess,
122 struct consumer_output *consumer)
123 {
124 int ret;
125 char tmp_path[PATH_MAX];
126 const char *pathname;
127 struct ltt_ust_stream *stream, *tmp;
128
129 DBG("Sending streams of channel %s to UST consumer", uchan->name);
130
131 ret = send_channel(sock, uchan);
132 if (ret < 0) {
133 goto error;
134 }
135
136 /* Get the right path name destination */
137 if (consumer->type == CONSUMER_DST_LOCAL) {
138 /* Set application path to the destination path */
139 ret = snprintf(tmp_path, sizeof(tmp_path), "%s/%s/%s",
140 consumer->dst.trace_path, consumer->subdir, usess->path);
141 if (ret < 0) {
142 PERROR("snprintf stream path");
143 goto error;
144 }
145 pathname = tmp_path;
146 DBG3("UST local consumer tracefile path: %s", pathname);
147 } else {
148 ret = snprintf(tmp_path, sizeof(tmp_path), "%s/%s",
149 consumer->subdir, usess->path);
150 if (ret < 0) {
151 PERROR("snprintf stream path");
152 goto error;
153 }
154 pathname = tmp_path;
155 DBG3("UST network consumer subdir path: %s", pathname);
156 }
157
158 cds_list_for_each_entry_safe(stream, tmp, &uchan->streams.head, list) {
159 if (!stream->obj->shm_fd) {
160 continue;
161 }
162
163 ret = send_channel_stream(sock, uchan, usess, stream, consumer,
164 pathname);
165 if (ret < 0) {
166 goto error;
167 }
168 }
169
170 DBG("UST consumer channel streams sent");
171
172 return 0;
173
174 error:
175 return ret;
176 }
177
178 /*
179 * Sending metadata to the consumer with command ADD_CHANNEL and ADD_STREAM.
180 */
181 static int send_metadata(int sock, struct ust_app_session *usess,
182 struct consumer_output *consumer)
183 {
184 int ret, fd, fds[2];
185 char tmp_path[PATH_MAX];
186 const char *pathname;
187 struct lttcomm_consumer_msg msg;
188
189 /* Safety net */
190 assert(usess);
191 assert(consumer);
192
193 if (sock < 0) {
194 ERR("Consumer socket is negative (%d)", sock);
195 return -EINVAL;
196 }
197
198 if (usess->metadata->obj->shm_fd == 0) {
199 ERR("Metadata obj shm_fd is 0");
200 ret = -1;
201 goto error;
202 }
203
204 DBG("UST consumer sending metadata stream fd");
205
206 consumer_init_channel_comm_msg(&msg,
207 LTTNG_CONSUMER_ADD_CHANNEL,
208 usess->metadata->obj->shm_fd,
209 usess->metadata->attr.subbuf_size,
210 usess->metadata->obj->memory_map_size,
211 "metadata");
212
213 ret = consumer_send_channel(sock, &msg);
214 if (ret < 0) {
215 goto error;
216 }
217
218 /* Sending metadata shared memory fd */
219 fd = usess->metadata->obj->shm_fd;
220 ret = consumer_send_fds(sock, &fd, 1);
221 if (ret < 0) {
222 goto error;
223 }
224
225 /* Get correct path name destination */
226 if (consumer->type == CONSUMER_DST_LOCAL) {
227 /* Set application path to the destination path */
228 ret = snprintf(tmp_path, sizeof(tmp_path), "%s/%s/%s",
229 consumer->dst.trace_path, consumer->subdir, usess->path);
230 if (ret < 0) {
231 PERROR("snprintf stream path");
232 goto error;
233 }
234 pathname = tmp_path;
235
236 /* Create directory */
237 ret = run_as_mkdir_recursive(pathname, S_IRWXU | S_IRWXG,
238 usess->uid, usess->gid);
239 if (ret < 0) {
240 if (ret != -EEXIST) {
241 ERR("Trace directory creation error");
242 goto error;
243 }
244 }
245 } else {
246 ret = snprintf(tmp_path, sizeof(tmp_path), "%s/%s",
247 consumer->subdir, usess->path);
248 if (ret < 0) {
249 PERROR("snprintf metadata path");
250 goto error;
251 }
252 pathname = tmp_path;
253 }
254
255 consumer_init_stream_comm_msg(&msg,
256 LTTNG_CONSUMER_ADD_STREAM,
257 usess->metadata->obj->shm_fd,
258 usess->metadata->stream_obj->shm_fd,
259 LTTNG_CONSUMER_ACTIVE_STREAM,
260 DEFAULT_UST_CHANNEL_OUTPUT,
261 usess->metadata->stream_obj->memory_map_size,
262 usess->uid,
263 usess->gid,
264 consumer->net_seq_index,
265 1, /* Flag metadata set */
266 "metadata",
267 pathname);
268
269 /* Send stream and file descriptor */
270 fds[0] = usess->metadata->stream_obj->shm_fd;
271 fds[1] = usess->metadata->stream_obj->wait_fd;
272 ret = consumer_send_stream(sock, consumer, &msg, fds, 2);
273 if (ret < 0) {
274 goto error;
275 }
276
277 error:
278 return ret;
279 }
280
281 /*
282 * Send all stream fds of the UST session to the consumer.
283 */
284 int ust_consumer_send_session(struct ust_app_session *usess,
285 struct consumer_output *consumer, struct consumer_socket *sock)
286 {
287 int ret = 0;
288 struct lttng_ht_iter iter;
289 struct ust_app_channel *ua_chan;
290
291 assert(usess);
292
293 if (consumer == NULL || sock == NULL) {
294 /* There is no consumer so just ignoring the command. */
295 DBG("UST consumer does not exist. Not sending streams");
296 return 0;
297 }
298
299 DBG("Sending metadata stream fd to consumer on %d", sock->fd);
300
301 pthread_mutex_lock(sock->lock);
302
303 /* Sending metadata information to the consumer */
304 ret = send_metadata(sock->fd, usess, consumer);
305 if (ret < 0) {
306 goto error;
307 }
308
309 /* Send each channel fd streams of session */
310 rcu_read_lock();
311 cds_lfht_for_each_entry(usess->channels->ht, &iter.iter, ua_chan,
312 node.node) {
313 /*
314 * Indicate that the channel was not created on the tracer side so skip
315 * sending unexisting streams.
316 */
317 if (ua_chan->obj == NULL) {
318 continue;
319 }
320
321 ret = send_channel_streams(sock->fd, ua_chan, usess, consumer);
322 if (ret < 0) {
323 rcu_read_unlock();
324 goto error;
325 }
326 }
327 rcu_read_unlock();
328
329 DBG("consumer fds (metadata and channel streams) sent");
330
331 /* All good! */
332 ret = 0;
333
334 error:
335 pthread_mutex_unlock(sock->lock);
336 return ret;
337 }
This page took 0.054122 seconds and 4 git commands to generate.