Make the consumer send an ACK after each command
[lttng-tools.git] / src / bin / lttng-sessiond / ust-consumer.c
1 /*
2 * Copyright (C) 2011 - David Goulet <david.goulet@polymtl.ca>
3 *
4 * This program is free software; you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License, version 2 only,
6 * as published by the Free Software Foundation.
7 *
8 * This program is distributed in the hope that it will be useful,
9 * but WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 * GNU General Public License for more details.
12 *
13 * You should have received a copy of the GNU General Public License along
14 * with this program; if not, write to the Free Software Foundation, Inc.,
15 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
16 */
17
18 #define _GNU_SOURCE
19 #include <errno.h>
20 #include <stdio.h>
21 #include <stdlib.h>
22 #include <string.h>
23 #include <unistd.h>
24
25 #include <common/common.h>
26 #include <common/consumer.h>
27 #include <common/defaults.h>
28
29 #include "consumer.h"
30 #include "ust-consumer.h"
31
32 /*
33 * Send a single channel to the consumer using command ADD_CHANNEL.
34 */
35 static int send_channel(struct consumer_socket *sock,
36 struct ust_app_channel *uchan)
37 {
38 int ret, fd;
39 struct lttcomm_consumer_msg msg;
40
41 /* Safety net */
42 assert(uchan);
43 assert(sock);
44
45 if (sock->fd < 0) {
46 ret = -EINVAL;
47 goto error;
48 }
49
50 DBG2("Sending channel %s to UST consumer", uchan->name);
51
52 consumer_init_channel_comm_msg(&msg,
53 LTTNG_CONSUMER_ADD_CHANNEL,
54 uchan->obj->shm_fd,
55 uchan->attr.subbuf_size,
56 uchan->obj->memory_map_size,
57 uchan->name,
58 uchan->streams.count);
59
60 ret = consumer_send_channel(sock, &msg);
61 if (ret < 0) {
62 goto error;
63 }
64
65 fd = uchan->obj->shm_fd;
66 ret = consumer_send_fds(sock, &fd, 1);
67 if (ret < 0) {
68 goto error;
69 }
70
71 error:
72 return ret;
73 }
74
75 /*
76 * Send a single stream to the consumer using ADD_STREAM command.
77 */
78 static int send_channel_stream(struct consumer_socket *sock,
79 struct ust_app_channel *uchan, struct ust_app_session *usess,
80 struct ltt_ust_stream *stream, struct consumer_output *consumer,
81 const char *pathname)
82 {
83 int ret, fds[2];
84 struct lttcomm_consumer_msg msg;
85
86 /* Safety net */
87 assert(uchan);
88 assert(usess);
89 assert(stream);
90 assert(consumer);
91 assert(sock);
92
93 DBG2("Sending stream %d of channel %s to kernel consumer",
94 stream->obj->shm_fd, uchan->name);
95
96 consumer_init_stream_comm_msg(&msg,
97 LTTNG_CONSUMER_ADD_STREAM,
98 uchan->obj->shm_fd,
99 stream->obj->shm_fd,
100 LTTNG_CONSUMER_ACTIVE_STREAM,
101 DEFAULT_UST_CHANNEL_OUTPUT,
102 stream->obj->memory_map_size,
103 usess->uid,
104 usess->gid,
105 consumer->net_seq_index,
106 0, /* Metadata flag unset */
107 stream->name,
108 pathname,
109 usess->id);
110
111 /* Send stream and file descriptor */
112 fds[0] = stream->obj->shm_fd;
113 fds[1] = stream->obj->wait_fd;
114 ret = consumer_send_stream(sock, consumer, &msg, fds, 2);
115 if (ret < 0) {
116 goto error;
117 }
118
119 error:
120 return ret;
121 }
122
123 /*
124 * Send all stream fds of UST channel to the consumer.
125 */
126 static int send_channel_streams(struct consumer_socket *sock,
127 struct ust_app_channel *uchan, struct ust_app_session *usess,
128 struct consumer_output *consumer)
129 {
130 int ret;
131 char tmp_path[PATH_MAX];
132 const char *pathname;
133 struct ltt_ust_stream *stream, *tmp;
134
135 assert(sock);
136
137 DBG("Sending streams of channel %s to UST consumer", uchan->name);
138
139 ret = send_channel(sock, uchan);
140 if (ret < 0) {
141 goto error;
142 }
143
144 /* Get the right path name destination */
145 if (consumer->type == CONSUMER_DST_LOCAL) {
146 /* Set application path to the destination path */
147 ret = snprintf(tmp_path, sizeof(tmp_path), "%s/%s/%s",
148 consumer->dst.trace_path, consumer->subdir, usess->path);
149 if (ret < 0) {
150 PERROR("snprintf stream path");
151 goto error;
152 }
153 pathname = tmp_path;
154 DBG3("UST local consumer tracefile path: %s", pathname);
155 } else {
156 ret = snprintf(tmp_path, sizeof(tmp_path), "%s/%s",
157 consumer->subdir, usess->path);
158 if (ret < 0) {
159 PERROR("snprintf stream path");
160 goto error;
161 }
162 pathname = tmp_path;
163 DBG3("UST network consumer subdir path: %s", pathname);
164 }
165
166 cds_list_for_each_entry_safe(stream, tmp, &uchan->streams.head, list) {
167 if (!stream->obj->shm_fd) {
168 continue;
169 }
170
171 ret = send_channel_stream(sock, uchan, usess, stream, consumer,
172 pathname);
173 if (ret < 0) {
174 goto error;
175 }
176 }
177
178 DBG("UST consumer channel streams sent");
179
180 return 0;
181
182 error:
183 return ret;
184 }
185
186 /*
187 * Sending metadata to the consumer with command ADD_CHANNEL and ADD_STREAM.
188 */
189 static int send_metadata(struct consumer_socket *sock,
190 struct ust_app_session *usess, struct consumer_output *consumer)
191 {
192 int ret, fd, fds[2];
193 char tmp_path[PATH_MAX];
194 const char *pathname;
195 struct lttcomm_consumer_msg msg;
196
197 /* Safety net */
198 assert(usess);
199 assert(consumer);
200 assert(sock);
201
202 if (sock->fd < 0) {
203 ERR("Consumer socket is negative (%d)", sock->fd);
204 return -EINVAL;
205 }
206
207 if (usess->metadata->obj->shm_fd == 0) {
208 ERR("Metadata obj shm_fd is 0");
209 ret = -1;
210 goto error;
211 }
212
213 DBG("UST consumer sending metadata stream fd");
214
215 consumer_init_channel_comm_msg(&msg,
216 LTTNG_CONSUMER_ADD_CHANNEL,
217 usess->metadata->obj->shm_fd,
218 usess->metadata->attr.subbuf_size,
219 usess->metadata->obj->memory_map_size,
220 "metadata",
221 1);
222
223 ret = consumer_send_channel(sock, &msg);
224 if (ret < 0) {
225 goto error;
226 }
227
228 /* Sending metadata shared memory fd */
229 fd = usess->metadata->obj->shm_fd;
230 ret = consumer_send_fds(sock, &fd, 1);
231 if (ret < 0) {
232 goto error;
233 }
234
235 /* Get correct path name destination */
236 if (consumer->type == CONSUMER_DST_LOCAL) {
237 /* Set application path to the destination path */
238 ret = snprintf(tmp_path, sizeof(tmp_path), "%s/%s/%s",
239 consumer->dst.trace_path, consumer->subdir, usess->path);
240 if (ret < 0) {
241 PERROR("snprintf stream path");
242 goto error;
243 }
244 pathname = tmp_path;
245
246 /* Create directory */
247 ret = run_as_mkdir_recursive(pathname, S_IRWXU | S_IRWXG,
248 usess->uid, usess->gid);
249 if (ret < 0) {
250 if (ret != -EEXIST) {
251 ERR("Trace directory creation error");
252 goto error;
253 }
254 }
255 } else {
256 ret = snprintf(tmp_path, sizeof(tmp_path), "%s/%s",
257 consumer->subdir, usess->path);
258 if (ret < 0) {
259 PERROR("snprintf metadata path");
260 goto error;
261 }
262 pathname = tmp_path;
263 }
264
265 consumer_init_stream_comm_msg(&msg,
266 LTTNG_CONSUMER_ADD_STREAM,
267 usess->metadata->obj->shm_fd,
268 usess->metadata->stream_obj->shm_fd,
269 LTTNG_CONSUMER_ACTIVE_STREAM,
270 DEFAULT_UST_CHANNEL_OUTPUT,
271 usess->metadata->stream_obj->memory_map_size,
272 usess->uid,
273 usess->gid,
274 consumer->net_seq_index,
275 1, /* Flag metadata set */
276 "metadata",
277 pathname,
278 usess->id);
279
280 /* Send stream and file descriptor */
281 fds[0] = usess->metadata->stream_obj->shm_fd;
282 fds[1] = usess->metadata->stream_obj->wait_fd;
283 ret = consumer_send_stream(sock, consumer, &msg, fds, 2);
284 if (ret < 0) {
285 goto error;
286 }
287
288 error:
289 return ret;
290 }
291
292 /*
293 * Send all stream fds of the UST session to the consumer.
294 */
295 int ust_consumer_send_session(struct ust_app_session *usess,
296 struct consumer_output *consumer, struct consumer_socket *sock)
297 {
298 int ret = 0;
299 struct lttng_ht_iter iter;
300 struct ust_app_channel *ua_chan;
301
302 assert(usess);
303
304 if (consumer == NULL || sock == NULL) {
305 /* There is no consumer so just ignoring the command. */
306 DBG("UST consumer does not exist. Not sending streams");
307 return 0;
308 }
309
310 DBG("Sending metadata stream fd to consumer on %d", sock->fd);
311
312 pthread_mutex_lock(sock->lock);
313
314 /* Sending metadata information to the consumer */
315 ret = send_metadata(sock, usess, consumer);
316 if (ret < 0) {
317 goto error;
318 }
319
320 /* Send each channel fd streams of session */
321 rcu_read_lock();
322 cds_lfht_for_each_entry(usess->channels->ht, &iter.iter, ua_chan,
323 node.node) {
324 /*
325 * Indicate that the channel was not created on the tracer side so skip
326 * sending unexisting streams.
327 */
328 if (ua_chan->obj == NULL) {
329 continue;
330 }
331
332 ret = send_channel_streams(sock, ua_chan, usess, consumer);
333 if (ret < 0) {
334 rcu_read_unlock();
335 goto error;
336 }
337 }
338 rcu_read_unlock();
339
340 DBG("consumer fds (metadata and channel streams) sent");
341
342 /* All good! */
343 ret = 0;
344
345 error:
346 pthread_mutex_unlock(sock->lock);
347 return ret;
348 }
This page took 0.035475 seconds and 4 git commands to generate.