2 * Copyright (C) 2011 - David Goulet <david.goulet@polymtl.ca>
4 * This program is free software; you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License, version 2 only,
6 * as published by the Free Software Foundation.
8 * This program is distributed in the hope that it will be useful,
9 * but WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 * GNU General Public License for more details.
13 * You should have received a copy of the GNU General Public License along
14 * with this program; if not, write to the Free Software Foundation, Inc.,
15 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
25 #include <common/common.h>
26 #include <common/consumer.h>
27 #include <common/defaults.h>
30 #include "ust-consumer.h"
33 * Send a single channel to the consumer using command ADD_CHANNEL.
/*
 * send_channel: announce one UST application channel to the consumer.
 *
 * Steps visible in this listing:
 *   - build an LTTNG_CONSUMER_ADD_CHANNEL message; the visible arguments are
 *     the channel attribute subbuf_size, the object memory_map_size and the
 *     stream count;
 *   - send the message with consumer_send_channel();
 *   - pass the channel object shm_fd over the socket with
 *     consumer_send_fds(), as a single descriptor;
 *   - call health_code_update() before the first send and after each send.
 *
 * NOTE(review): the embedded line numbers skip (35, 36, 39, 50, 52, 53, 55,
 * ...), so braces, local declarations, some message arguments and the
 * handling of ret are not visible here. Confirm the error/return contract
 * against the complete file.
 */
35 static int send_channel(struct consumer_socket
*sock
,
36 struct ust_app_channel
*uchan
)
39 struct lttcomm_consumer_msg msg
;
50 DBG2("Sending channel %s to UST consumer", uchan
->name
);
52 consumer_init_channel_comm_msg(&msg
,
53 LTTNG_CONSUMER_ADD_CHANNEL
,
55 uchan
->attr
.subbuf_size
,
56 uchan
->obj
->memory_map_size
,
58 uchan
->streams
.count
);
60 health_code_update(&health_thread_cmd
);
62 ret
= consumer_send_channel(sock
, &msg
);
67 health_code_update(&health_thread_cmd
);
/* The descriptor sent for the channel is the shm_fd of the channel object. */
69 fd
= uchan
->obj
->shm_fd
;
70 ret
= consumer_send_fds(sock
, &fd
, 1);
75 health_code_update(&health_thread_cmd
);
82 * Send a single stream to the consumer using ADD_STREAM command.
/*
 * send_channel_stream: send one stream of a UST channel to the consumer.
 *
 * Steps visible in this listing:
 *   - build an LTTNG_CONSUMER_ADD_STREAM message; the visible arguments are
 *     LTTNG_CONSUMER_ACTIVE_STREAM, DEFAULT_UST_CHANNEL_OUTPUT, the stream
 *     object memory_map_size, the consumer net_seq_index and a metadata
 *     flag of 0;
 *   - send the message together with two descriptors through
 *     consumer_send_stream();
 *   - call health_code_update() before and after that send.
 *
 * NOTE(review): the parameter list and several message arguments are cut
 * off in this listing (embedded numbering jumps 86 -> 90, 103 -> 106 and
 * 108 -> 111), so the full signature and the return contract cannot be
 * confirmed from here.
 */
84 static int send_channel_stream(struct consumer_socket
*sock
,
85 struct ust_app_channel
*uchan
, struct ust_app_session
*usess
,
86 struct ust_app_stream
*stream
, struct consumer_output
*consumer
,
90 struct lttcomm_consumer_msg msg
;
/*
 * NOTE(review): the log text below mentions a kernel consumer although the
 * sibling debug messages in this file all say UST consumer. This looks like
 * a copy/paste slip; confirm before changing the string.
 */
99 DBG2("Sending stream %d of channel %s to kernel consumer",
100 stream
->obj
->shm_fd
, uchan
->name
);
102 consumer_init_stream_comm_msg(&msg
,
103 LTTNG_CONSUMER_ADD_STREAM
,
106 LTTNG_CONSUMER_ACTIVE_STREAM
,
107 DEFAULT_UST_CHANNEL_OUTPUT
,
108 stream
->obj
->memory_map_size
,
111 consumer
->net_seq_index
,
112 0, /* Metadata flag unset */
117 health_code_update(&health_thread_cmd
);
119 /* Send stream and file descriptor */
/*
 * Descriptor order is shm_fd first, wait_fd second (presumably the order the
 * consumer expects; verify against the receiving side).
 */
120 fds
[0] = stream
->obj
->shm_fd
;
121 fds
[1] = stream
->obj
->wait_fd
;
122 ret
= consumer_send_stream(sock
, consumer
, &msg
, fds
, 2);
127 health_code_update(&health_thread_cmd
);
134 * Send all stream fds of UST channel to the consumer.
/*
 * send_channel_streams: send a UST channel and then each of its streams.
 *
 * Steps visible in this listing:
 *   - send the channel itself with send_channel();
 *   - format a path into tmp_path: trace_path/subdir/session-path when the
 *     consumer type is CONSUMER_DST_LOCAL, and subdir/session-path in the
 *     second snprintf (presumably the network branch, judging by the debug
 *     text; the else keyword itself is not visible);
 *   - walk the list at uchan->streams.head and call send_channel_stream()
 *     for each entry.
 *
 * NOTE(review): the assignment of pathname, the braces and all handling of
 * ret are missing from this listing (embedded numbering skips), so the
 * return contract cannot be confirmed from here.
 */
136 static int send_channel_streams(struct consumer_socket
*sock
,
137 struct ust_app_channel
*uchan
, struct ust_app_session
*usess
,
138 struct consumer_output
*consumer
)
141 char tmp_path
[PATH_MAX
];
142 const char *pathname
;
143 struct ust_app_stream
*stream
, *tmp
;
147 DBG("Sending streams of channel %s to UST consumer", uchan
->name
);
149 ret
= send_channel(sock
, uchan
);
154 /* Get the right path name destination */
155 if (consumer
->type
== CONSUMER_DST_LOCAL
) {
156 /* Set application path to the destination path */
157 ret
= snprintf(tmp_path
, sizeof(tmp_path
), "%s/%s/%s",
158 consumer
->dst
.trace_path
, consumer
->subdir
, usess
->path
);
/*
 * NOTE(review): the condition guarding this PERROR is not visible. Check
 * that truncation (return value >= sizeof(tmp_path)) is handled as well as
 * a negative return.
 */
160 PERROR("snprintf stream path");
164 DBG3("UST local consumer tracefile path: %s", pathname
);
166 ret
= snprintf(tmp_path
, sizeof(tmp_path
), "%s/%s",
167 consumer
->subdir
, usess
->path
);
169 PERROR("snprintf stream path");
173 DBG3("UST network consumer subdir path: %s", pathname
);
176 cds_list_for_each_entry_safe(stream
, tmp
, &uchan
->streams
.head
, list
) {
/*
 * A zero shm_fd is tested here; what the branch does is not visible in this
 * listing. NOTE(review): 0 is a valid descriptor number, so confirm how an
 * unset shm_fd is initialised.
 */
177 if (!stream
->obj
->shm_fd
) {
181 ret
= send_channel_stream(sock
, uchan
, usess
, stream
, consumer
,
188 DBG("UST consumer channel streams sent");
197 * Sending metadata to the consumer with command ADD_CHANNEL and ADD_STREAM.
/*
 * send_metadata: send the metadata channel and metadata stream of a UST
 * session to the consumer.
 *
 * Steps visible in this listing:
 *   - log an error about a negative socket fd (the check itself is not
 *     visible) and log an error when the metadata object shm_fd is 0;
 *   - build an LTTNG_CONSUMER_ADD_CHANNEL message from the metadata object
 *     shm_fd, the attribute subbuf_size and the object memory_map_size,
 *     send it with consumer_send_channel(), then pass the shm_fd with
 *     consumer_send_fds();
 *   - for a CONSUMER_DST_LOCAL consumer, format
 *     trace_path/subdir/session-path and create it with
 *     run_as_mkdir_recursive() using mode S_IRWXU | S_IRWXG and the session
 *     uid and gid; the second snprintf formats subdir/session-path only;
 *   - build an LTTNG_CONSUMER_ADD_STREAM message with the metadata flag set
 *     to 1 and send it with the stream_obj shm_fd and wait_fd through
 *     consumer_send_stream().
 *
 * NOTE(review): braces, the declarations of ret/fd/fds, the assignment of
 * pathname and all error jumps are missing from this listing, so the
 * return contract cannot be confirmed from here.
 */
199 static int send_metadata(struct consumer_socket
*sock
,
200 struct ust_app_session
*usess
, struct consumer_output
*consumer
)
203 char tmp_path
[PATH_MAX
];
204 const char *pathname
;
205 struct lttcomm_consumer_msg msg
;
213 ERR("Consumer socket is negative (%d)", sock
->fd
);
/*
 * NOTE(review): 0 is a valid descriptor number; confirm that an unset
 * shm_fd is really initialised to 0 rather than -1.
 */
217 if (usess
->metadata
->obj
->shm_fd
== 0) {
218 ERR("Metadata obj shm_fd is 0");
223 DBG("UST consumer sending metadata stream fd");
225 consumer_init_channel_comm_msg(&msg
,
226 LTTNG_CONSUMER_ADD_CHANNEL
,
227 usess
->metadata
->obj
->shm_fd
,
228 usess
->metadata
->attr
.subbuf_size
,
229 usess
->metadata
->obj
->memory_map_size
,
233 health_code_update(&health_thread_cmd
);
235 ret
= consumer_send_channel(sock
, &msg
);
240 health_code_update(&health_thread_cmd
);
242 /* Sending metadata shared memory fd */
243 fd
= usess
->metadata
->obj
->shm_fd
;
244 ret
= consumer_send_fds(sock
, &fd
, 1);
249 health_code_update(&health_thread_cmd
);
251 /* Get correct path name destination */
252 if (consumer
->type
== CONSUMER_DST_LOCAL
) {
253 /* Set application path to the destination path */
254 ret
= snprintf(tmp_path
, sizeof(tmp_path
), "%s/%s/%s",
255 consumer
->dst
.trace_path
, consumer
->subdir
, usess
->path
);
/*
 * NOTE(review): this message says stream path while the second snprintf in
 * this function reports metadata path; the guard condition is not visible.
 */
257 PERROR("snprintf stream path");
262 /* Create directory */
263 ret
= run_as_mkdir_recursive(pathname
, S_IRWXU
| S_IRWXG
,
264 usess
->uid
, usess
->gid
);
/*
 * A result of -EEXIST is not reported: the error is logged only for other
 * values. The enclosing check on ret is not visible in this listing.
 */
266 if (ret
!= -EEXIST
) {
267 ERR("Trace directory creation error");
272 ret
= snprintf(tmp_path
, sizeof(tmp_path
), "%s/%s",
273 consumer
->subdir
, usess
->path
);
275 PERROR("snprintf metadata path");
281 consumer_init_stream_comm_msg(&msg
,
282 LTTNG_CONSUMER_ADD_STREAM
,
283 usess
->metadata
->obj
->shm_fd
,
284 usess
->metadata
->stream_obj
->shm_fd
,
285 LTTNG_CONSUMER_ACTIVE_STREAM
,
286 DEFAULT_UST_CHANNEL_OUTPUT
,
287 usess
->metadata
->stream_obj
->memory_map_size
,
290 consumer
->net_seq_index
,
291 1, /* Flag metadata set */
296 health_code_update(&health_thread_cmd
);
298 /* Send stream and file descriptor */
/* Descriptor order is shm_fd first, wait_fd second, both from stream_obj. */
299 fds
[0] = usess
->metadata
->stream_obj
->shm_fd
;
300 fds
[1] = usess
->metadata
->stream_obj
->wait_fd
;
301 ret
= consumer_send_stream(sock
, consumer
, &msg
, fds
, 2);
306 health_code_update(&health_thread_cmd
);
313 * Send all stream fds of the UST session to the consumer.
/*
 * ust_consumer_send_session: send the metadata and every channel of a UST
 * session to the consumer.
 *
 * Steps visible in this listing:
 *   - when consumer or sock is NULL, emit a debug message saying that no
 *     streams are sent (what is returned in that case is not visible);
 *   - lock sock->lock with pthread_mutex_lock();
 *   - send the metadata with send_metadata();
 *   - iterate over usess->channels->ht; channels whose obj is NULL are
 *     tested for (per the in-code remark they were not created on the
 *     tracer side and are skipped), the others go to send_channel_streams();
 *   - unlock sock->lock with pthread_mutex_unlock().
 *
 * NOTE(review): error jumps and labels are missing from this listing, so it
 * cannot be confirmed from here that every failure path reaches the unlock.
 */
315 int ust_consumer_send_session(struct ust_app_session
*usess
,
316 struct consumer_output
*consumer
, struct consumer_socket
*sock
)
319 struct lttng_ht_iter iter
;
320 struct ust_app_channel
*ua_chan
;
324 if (consumer
== NULL
|| sock
== NULL
) {
325 /* There is no consumer so just ignoring the command. */
326 DBG("UST consumer does not exist. Not sending streams");
330 DBG("Sending metadata stream fd to consumer on %d", sock
->fd
);
/* The socket lock is held across the metadata send and the channel loop. */
332 pthread_mutex_lock(sock
->lock
);
334 /* Sending metadata information to the consumer */
335 ret
= send_metadata(sock
, usess
, consumer
);
340 /* Send each channel fd streams of session */
342 cds_lfht_for_each_entry(usess
->channels
->ht
, &iter
.iter
, ua_chan
,
345 * Indicate that the channel was not created on the tracer side so skip
346 * sending unexisting streams.
348 if (ua_chan
->obj
== NULL
) {
352 ret
= send_channel_streams(sock
, ua_chan
, usess
, consumer
);
360 DBG("consumer fds (metadata and channel streams) sent");
366 pthread_mutex_unlock(sock
->lock
);