Fix: holding the stream lock does not equate to having data pending
[lttng-tools.git] / src / common / consumer / consumer.c
index 63a2ab288c865e338582485199a7a268064bb4af..1d6f8b4cc4ce7cba975c85eb6a2c01fcb6e19e5b 100644 (file)
@@ -3673,34 +3673,6 @@ error_nosignal:
        }
 }
 
-/*
- * Try to lock the stream mutex.
- *
- * On success, 1 is returned else 0 indicating that the mutex is NOT lock.
- */
-static int stream_try_lock(struct lttng_consumer_stream *stream)
-{
-       int ret;
-
-       assert(stream);
-
-       /*
-        * Try to lock the stream mutex. On failure, we know that the stream is
-        * being used else where hence there is data still being extracted.
-        */
-       ret = pthread_mutex_trylock(&stream->lock);
-       if (ret) {
-               /* For both EBUSY and EINVAL error, the mutex is NOT locked. */
-               ret = 0;
-               goto end;
-       }
-
-       ret = 1;
-
-end:
-       return ret;
-}
-
 /*
  * Search for a relayd associated to the session id and return the reference.
  *
@@ -3786,11 +3758,7 @@ int consumer_data_pending(uint64_t id)
                        ht->hash_fct(&id, lttng_ht_seed),
                        ht->match_fct, &id,
                        &iter.iter, stream, node_session_id.node) {
-               /* If this call fails, the stream is being used hence data pending. */
-               ret = stream_try_lock(stream);
-               if (!ret) {
-                       goto data_pending;
-               }
+               pthread_mutex_lock(&stream->lock);
 
                /*
                 * A removed node from the hash table indicates that the stream has
@@ -4216,6 +4184,10 @@ int rotate_relay_stream(struct lttng_consumer_local_data *ctx,
                        stream->chan->current_chunk_id,
                        stream->last_sequence_number);
        pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+       if (ret < 0) {
+               ERR("Relayd rotate stream failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx);
+               lttng_consumer_cleanup_relayd(relayd);
+       }
        if (ret) {
                ERR("Rotate relay stream");
        }
@@ -4389,6 +4361,10 @@ int rotate_rename_relay(const char *old_path, const char *new_path,
 
        pthread_mutex_lock(&relayd->ctrl_sock_mutex);
        ret = relayd_rotate_rename(&relayd->control_sock, old_path, new_path);
+       if (ret < 0) {
+               ERR("Relayd rotate rename failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx);
+               lttng_consumer_cleanup_relayd(relayd);
+       }
        pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
 end:
        return ret;
@@ -4419,6 +4395,10 @@ int lttng_consumer_rotate_pending_relay(uint64_t session_id,
 
        pthread_mutex_lock(&relayd->ctrl_sock_mutex);
        ret = relayd_rotate_pending(&relayd->control_sock, chunk_id);
+       if (ret < 0) {
+               ERR("Relayd rotate pending failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx);
+               lttng_consumer_cleanup_relayd(relayd);
+       }
        pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
 
 end:
@@ -4456,6 +4436,10 @@ int mkdir_relay(const char *path, uint64_t relayd_id)
 
        pthread_mutex_lock(&relayd->ctrl_sock_mutex);
        ret = relayd_mkdir(&relayd->control_sock, path);
+       if (ret < 0) {
+               ERR("Relayd mkdir failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx);
+               lttng_consumer_cleanup_relayd(relayd);
+       }
        pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
 
 end:
This page took 0.026832 seconds and 4 git commands to generate.