Accept uid and gid parameters in utils_mkdir()/utils_mkdir_recursive()
[lttng-tools.git] / src / bin / lttng-relayd / stream.c
CommitLineData
2a174661
DG
1/*
2 * Copyright (C) 2013 - Julien Desfossez <jdesfossez@efficios.com>
3 * David Goulet <dgoulet@efficios.com>
4 *
5 * This program is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License, version 2 only, as
7 * published by the Free Software Foundation.
8 *
9 * This program is distributed in the hope that it will be useful, but WITHOUT
10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
12 * more details.
13 *
14 * You should have received a copy of the GNU General Public License along with
15 * this program; if not, write to the Free Software Foundation, Inc., 51
16 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
17 */
18
19#define _GNU_SOURCE
6c1c0768 20#define _LGPL_SOURCE
2a174661
DG
21#include <common/common.h>
22
23#include "index.h"
24#include "stream.h"
25#include "viewer-stream.h"
26
2a174661
DG
27/*
28 * Get stream from stream id from the given hash table. Return stream if found
29 * else NULL.
30 *
31 * Need to be called with RCU read-side lock held.
32 */
33struct relay_stream *stream_find_by_id(struct lttng_ht *ht,
34 uint64_t stream_id)
35{
36 struct lttng_ht_node_u64 *node;
37 struct lttng_ht_iter iter;
38 struct relay_stream *stream = NULL;
39
40 assert(ht);
41
42 lttng_ht_lookup(ht, &stream_id, &iter);
43 node = lttng_ht_iter_get_node_u64(&iter);
44 if (node == NULL) {
45 DBG("Relay stream %" PRIu64 " not found", stream_id);
46 goto end;
47 }
48 stream = caa_container_of(node, struct relay_stream, node);
49
50end:
51 return stream;
52}
53
54/*
55 * Close a given stream. If an assosiated viewer stream exists it is updated.
56 *
57 * RCU read side lock MUST be acquired.
58 *
59 * Return 0 if close was successful or 1 if already closed.
60 */
61int stream_close(struct relay_session *session, struct relay_stream *stream)
62{
63 int delret, ret;
64 struct relay_viewer_stream *vstream;
65 struct ctf_trace *ctf_trace;
66
67 assert(stream);
68
69 pthread_mutex_lock(&stream->lock);
70
71 if (stream->terminated_flag) {
72 /* This stream is already closed. Ignore. */
73 ret = 1;
74 goto end_unlock;
75 }
76
77 DBG("Closing stream id %" PRIu64, stream->stream_handle);
78
79 if (stream->fd >= 0) {
80 delret = close(stream->fd);
81 if (delret < 0) {
82 PERROR("close stream");
83 }
84 }
85
86 if (stream->index_fd >= 0) {
87 delret = close(stream->index_fd);
88 if (delret < 0) {
89 PERROR("close stream index_fd");
90 }
91 }
92
93 vstream = viewer_stream_find_by_id(stream->stream_handle);
94 if (vstream) {
95 /*
96 * Set the last good value into the viewer stream. This is done
97 * right before the stream gets deleted from the hash table. The
98 * lookup failure on the live thread side of a stream indicates
99 * that the viewer stream index received value should be used.
100 */
101 pthread_mutex_lock(&stream->viewer_stream_rotation_lock);
102 vstream->total_index_received = stream->total_index_received;
103 vstream->tracefile_count_last = stream->tracefile_count_current;
104 vstream->close_write_flag = 1;
105 pthread_mutex_unlock(&stream->viewer_stream_rotation_lock);
106 }
107
108 /* Cleanup index of that stream. */
109 relay_index_destroy_by_stream_id(stream->stream_handle);
110
111 ctf_trace = ctf_trace_find_by_path(session->ctf_traces_ht,
112 stream->path_name);
113 assert(ctf_trace);
114 ctf_trace_put_ref(ctf_trace);
115
8bf28e67 116 stream->close_flag = 1;
2a174661
DG
117 stream->terminated_flag = 1;
118 ret = 0;
119
120end_unlock:
121 pthread_mutex_unlock(&stream->lock);
122 return ret;
123}
124
125void stream_delete(struct lttng_ht *ht, struct relay_stream *stream)
126{
127 int ret;
128 struct lttng_ht_iter iter;
129
130 assert(ht);
131 assert(stream);
132
133 iter.iter.node = &stream->node.node;
134 ret = lttng_ht_del(ht, &iter);
135 assert(!ret);
136
137 cds_list_del(&stream->trace_list);
138}
139
140void stream_destroy(struct relay_stream *stream)
141{
142 assert(stream);
5eb3e5b8
JG
143 free(stream->path_name);
144 free(stream->channel_name);
145 free(stream);
2a174661 146}
This page took 0.033618 seconds and 4 git commands to generate.