relay: use urcu_ref_get_unless_zero
[lttng-tools.git] / src / bin / lttng-relayd / index.c
index cb7ae3db966e34ae3205c00a4f04f15853f3e247..1b14e3e078e1d96fb0d88f93301fac8e0c3e2d13 100644 (file)
@@ -17,7 +17,6 @@
  * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
  */
 
-#define _GNU_SOURCE
 #define _LGPL_SOURCE
 #include <assert.h>
 
@@ -59,7 +58,6 @@ static struct relay_index *relay_index_create(struct relay_stream *stream,
 
        lttng_ht_node_init_u64(&index->index_n, net_seq_num);
        pthread_mutex_init(&index->lock, NULL);
-       pthread_mutex_init(&index->reflock, NULL);
        urcu_ref_init(&index->ref);
 
 end:
@@ -99,21 +97,11 @@ static struct relay_index *relay_index_add_unique(struct relay_stream *stream,
  */
 static bool relay_index_get(struct relay_index *index)
 {
-       bool has_ref = false;
-
        DBG2("index get for stream id %" PRIu64 " and seqnum %" PRIu64 " refcount %d",
                        index->stream->stream_handle, index->index_n.key,
                        (int) index->ref.refcount);
 
-       /* Confirm that the index refcount has not reached 0. */
-       pthread_mutex_lock(&index->reflock);
-       if (index->ref.refcount != 0) {
-               has_ref = true;
-               urcu_ref_get(&index->ref);
-       }
-       pthread_mutex_unlock(&index->reflock);
-
-       return has_ref;
+       return urcu_ref_get_unless_zero(&index->ref);
 }
 
 /*
@@ -144,7 +132,7 @@ struct relay_index *relay_index_get_by_id_or_create(struct relay_stream *stream,
                index = relay_index_create(stream, net_seq_num);
                if (!index) {
                        ERR("Cannot create index for stream id %" PRIu64 " and seq_num %" PRIu64,
-                               index->stream->stream_handle, net_seq_num);
+                               stream->stream_handle, net_seq_num);
                        goto end;
                }
                oldindex = relay_index_add_unique(stream, index);
@@ -163,22 +151,23 @@ struct relay_index *relay_index_get_by_id_or_create(struct relay_stream *stream,
 end:
        rcu_read_unlock();
        DBG2("Index %sfound or created in HT for stream ID %" PRIu64 " and seqnum %" PRIu64,
-                       (index == NULL) ? "NOT " : "", index->stream->stream_handle, net_seq_num);
+                       (index == NULL) ? "NOT " : "", stream->stream_handle, net_seq_num);
        return index;
 }
 
-int relay_index_set_fd(struct relay_index *index, struct stream_fd *index_fd,
+int relay_index_set_file(struct relay_index *index,
+               struct lttng_index_file *index_file,
                uint64_t data_offset)
 {
        int ret = 0;
 
        pthread_mutex_lock(&index->lock);
-       if (index->index_fd) {
+       if (index->index_file) {
                ret = -1;
                goto end;
        }
-       stream_fd_get(index_fd);
-       index->index_fd = index_fd;
+       lttng_index_file_get(index_file);
+       index->index_file = index_file;
        index->index_data.offset = data_offset;
 end:
        pthread_mutex_unlock(&index->lock);
@@ -229,9 +218,9 @@ static void index_release(struct urcu_ref *ref)
        int ret;
        struct lttng_ht_iter iter;
 
-       if (index->index_fd) {
-               stream_fd_put(index->index_fd);
-               index->index_fd = NULL;
+       if (index->index_file) {
+               lttng_index_file_put(index->index_file);
+               index->index_file = NULL;
        }
        if (index->in_hash_table) {
                /* Delete index from hash table. */
@@ -265,10 +254,8 @@ void relay_index_put(struct relay_index *index)
         * Index lock ensures that concurrent test and update of stream
         * ref is atomic.
         */
-       pthread_mutex_lock(&index->reflock);
        assert(index->ref.refcount != 0);
        urcu_ref_put(&index->ref, index_release);
-       pthread_mutex_unlock(&index->reflock);
        rcu_read_unlock();
 }
 
@@ -291,21 +278,16 @@ int relay_index_try_flush(struct relay_index *index)
                goto skip;
        }
        /* Check if we are ready to flush. */
-       if (!index->has_index_data || !index->index_fd) {
+       if (!index->has_index_data || !index->index_file) {
                goto skip;
        }
-       fd = index->index_fd->fd;
+       fd = index->index_file->fd;
        DBG2("Writing index for stream ID %" PRIu64 " and seq num %" PRIu64
                        " on fd %d", index->stream->stream_handle,
                        index->index_n.key, fd);
        flushed = true;
        index->flushed = true;
-       ret = index_write(fd, &index->index_data, sizeof(index->index_data));
-       if (ret == sizeof(index->index_data)) {
-               ret = 0;
-       } else {
-               ret = -1;
-       }
+       ret = lttng_index_file_write(index->index_file, &index->index_data);
 skip:
        pthread_mutex_unlock(&index->lock);
 
@@ -342,11 +324,11 @@ void relay_index_close_partial_fd(struct relay_stream *stream)
        rcu_read_lock();
        cds_lfht_for_each_entry(stream->indexes_ht->ht, &iter.iter,
                        index, index_n.node) {
-               if (!index->index_fd) {
+               if (!index->index_file) {
                        continue;
                }
                /*
-                * Partial index has its index_fd: we have only
+                * Partial index has its index_file: we have only
                 * received its info from the data socket.
                 * Put self-ref from index.
                 */
This page took 0.025643 seconds and 4 git commands to generate.