]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
RGW: fix issue in observability over the persistent topics queue
authorAli Masarwa <ali.saed.masarwa@gmail.com>
Thu, 13 Jul 2023 11:17:48 +0000 (14:17 +0300)
committerAli Masarwa <ali.saed.masarwa@gmail.com>
Mon, 17 Jul 2023 15:13:50 +0000 (18:13 +0300)
when releasing entires, we don't decrease them in the urgent data

Signed-off-by: Ali Masarwa <ali.saed.masarwa@gmail.com>
src/cls/2pc_queue/cls_2pc_queue.cc
src/cls/2pc_queue/cls_2pc_queue_client.cc
src/cls/2pc_queue/cls_2pc_queue_client.h
src/cls/2pc_queue/cls_2pc_queue_ops.h
src/rgw/driver/rados/rgw_notify.cc
src/test/cls_2pc_queue/test_cls_2pc_queue.cc
src/test/cls_queue/test_cls_queue.cc
src/test/rgw/bucket_notification/test_bn.py

index 45710f9abe3d9005ecf8c2732c227c0fbbfe9fe8..019f2c96deafe71df902060452a41f5bb6d23dd3 100644 (file)
@@ -75,7 +75,7 @@ static int cls_2pc_queue_get_topic_stats(cls_method_context_t hctx, bufferlist *
     auto in_iter = head.bl_urgent_data.cbegin();
     decode(urgent_data, in_iter);
   } catch (ceph::buffer::error& err) {
-    CLS_LOG(1, "ERROR: cls_2pc_queue_get_committed_entries: failed to decode header of queue: %s", err.what());
+    CLS_LOG(1, "ERROR: cls_2pc_queue_get_topic_stats: failed to decode header of queue: %s", err.what());
     return -EINVAL;
   }
   op_ret.queue_entries = urgent_data.committed_entries;
@@ -581,9 +581,9 @@ static int cls_2pc_queue_list_entries(cls_method_context_t hctx, bufferlist *in,
 static int cls_2pc_queue_remove_entries(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
 {
   auto in_iter = in->cbegin();
-  cls_queue_remove_op op;
+  cls_2pc_queue_remove_op rem_2pc_op;
   try {
-    decode(op, in_iter);
+    decode(rem_2pc_op, in_iter);
   } catch (ceph::buffer::error& err) {
     CLS_LOG(1, "ERROR: cls_2pc_queue_remove_entries: failed to decode entry: %s", err.what());
     return -EINVAL;
@@ -594,10 +594,26 @@ static int cls_2pc_queue_remove_entries(cls_method_context_t hctx, bufferlist *i
   if (ret < 0) {
     return ret;
   }
-  ret = queue_remove_entries(hctx, op, head);
+  cls_queue_remove_op rem_op;
+  rem_op.end_marker = std::move(rem_2pc_op.end_marker);
+  ret = queue_remove_entries(hctx, rem_op, head);
   if (ret < 0) {
     return ret;
   }
+
+  cls_2pc_urgent_data urgent_data;
+  try {
+    auto in_iter = head.bl_urgent_data.cbegin();
+    decode(urgent_data, in_iter);
+  } catch (ceph::buffer::error& err) {
+    CLS_LOG(1, "ERROR: cls_2pc_queue_remove_entries: failed to decode header of queue: %s", err.what());
+    return -EINVAL;
+  }
+  urgent_data.committed_entries -= rem_2pc_op.entries_to_remove;
+  // write back head
+  head.bl_urgent_data.clear();
+  encode(urgent_data, head.bl_urgent_data);
+
   return queue_write_head(hctx, head);
 }
 
index 42632cba61aa5c58eef84178edb3eaea6638f5c0..5b457b291ca9b0353197b5ac0402d9d040c4263d 100644 (file)
@@ -226,10 +226,11 @@ void cls_2pc_queue_list_reservations(ObjectReadOperation& op, bufferlist* obl, i
   op.exec(TPC_QUEUE_CLASS, TPC_QUEUE_LIST_RESERVATIONS, in, obl, prval);
 }
 
-void cls_2pc_queue_remove_entries(ObjectWriteOperation& op, const std::string& end_marker) {
+void cls_2pc_queue_remove_entries(ObjectWriteOperation& op, const std::string& end_marker, uint64_t entries_to_remove) {
   bufferlist in;
-  cls_queue_remove_op rem_op;
+  cls_2pc_queue_remove_op rem_op;
   rem_op.end_marker = end_marker;
+  rem_op.entries_to_remove = entries_to_remove;
   encode(rem_op, in);
   op.exec(TPC_QUEUE_CLASS, TPC_QUEUE_REMOVE_ENTRIES, in);
 }
index 20043edd2009924966005b5c4de51f5b1f5b3571..c806d30f59e0fd404ff94097b6af192df02ddf7b 100644 (file)
@@ -87,5 +87,5 @@ void cls_2pc_queue_expire_reservations(librados::ObjectWriteOperation& op,
         ceph::coarse_real_time stale_time);
 
 // remove all entries up to the given marker
-void cls_2pc_queue_remove_entries(librados::ObjectWriteOperation& op, const std::string& end_marker);
+void cls_2pc_queue_remove_entries(librados::ObjectWriteOperation& op, const std::string& end_marker, uint64_t entries_to_remove);
 
index d0b84193d5c01e6ad796ba842face895a3e5624f..bb61ef341ac1e8523569beb3a815b46a12a4c999 100644 (file)
@@ -115,3 +115,25 @@ struct cls_2pc_queue_reservations_ret {
   }
 };
 WRITE_CLASS_ENCODER(cls_2pc_queue_reservations_ret)
+
+struct cls_2pc_queue_remove_op {
+  std::string end_marker;
+  uint32_t entries_to_remove;
+
+  cls_2pc_queue_remove_op() {}
+
+  void encode(ceph::buffer::list& bl) const {
+    ENCODE_START(1, 1, bl);
+    encode(end_marker, bl);
+    encode(entries_to_remove, bl);
+    ENCODE_FINISH(bl);
+  }
+
+  void decode(ceph::buffer::list::const_iterator& bl) {
+    DECODE_START(1, bl);
+    decode(end_marker, bl);
+    decode(entries_to_remove, bl);
+    DECODE_FINISH(bl);
+  }
+};
+WRITE_CLASS_ENCODER(cls_2pc_queue_remove_op)
\ No newline at end of file
index 324fb4460bb396f14c1eefd2609d11950f5d561b..184d9b38f6d99691c2beb0cab864dbd5b37c003c 100644 (file)
@@ -305,6 +305,7 @@ private:
       is_idle = false;
       auto has_error = false;
       auto remove_entries = false;
+      uint64_t entries_to_remove = 0;
       auto entry_idx = 1U;
       tokens_waiter waiter(io_context);
       for (auto& entry : entries) {
@@ -313,12 +314,13 @@ private:
           break;
         }
         // TODO pass entry pointer instead of by-value
-        spawn::spawn(yield, [this, &queue_name, entry_idx, total_entries, &end_marker, &remove_entries, &has_error, &waiter, entry](yield_context yield) {
+        spawn::spawn(yield, [this, &queue_name, entry_idx, total_entries, &end_marker, &remove_entries, &entries_to_remove, &has_error, &waiter, entry](yield_context yield) {
             const auto token = waiter.make_token();
             if (process_entry(entry, yield)) {
               ldpp_dout(this, 20) << "INFO: processing of entry: " << 
                 entry.marker << " (" << entry_idx << "/" << total_entries << ") from: " << queue_name << " ok" << dendl;
               remove_entries = true;
+              ++entries_to_remove;
             }  else {
               if (set_min_marker(end_marker, entry.marker) < 0) {
                 ldpp_dout(this, 1) << "ERROR: cannot determin minimum between malformed markers: " << end_marker << ", " << entry.marker << dendl;
@@ -344,7 +346,7 @@ private:
           ClsLockType::EXCLUSIVE,
           lock_cookie, 
           "" /*no tag*/);
-        cls_2pc_queue_remove_entries(op, end_marker); 
+        cls_2pc_queue_remove_entries(op, end_marker, entries_to_remove);
         // check ownership and deleted entries in one batch
         const auto ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, optional_yield(io_context, yield)); 
         if (ret == -ENOENT) {
index a0e83aacb63001ad6da7d916e0a2f00628fac2f2..d3570bf9c557e523431ed8bd07d53430c3c78faf 100644 (file)
@@ -716,7 +716,7 @@ TEST_F(TestCls2PCQueue, MultiProducer)
             const auto ret = cls_2pc_queue_list_entries(ioctx, queue_name, marker, max_elements, entries, &truncated, end_marker);
             ASSERT_EQ(0, ret);
             consume_count += entries.size();
-            cls_2pc_queue_remove_entries(op, end_marker); 
+            cls_2pc_queue_remove_entries(op, end_marker, max_elements);
             ASSERT_EQ(0, ioctx.operate(queue_name, &op));
           }
        });
@@ -771,7 +771,7 @@ TEST_F(TestCls2PCQueue, AsyncConsumer)
     ASSERT_EQ(rc, 0);
     ASSERT_EQ(cls_2pc_queue_list_entries_result(bl, entries, &truncated, end_marker), 0);
     consume_count += entries.size();
-    cls_2pc_queue_remove_entries(wop, end_marker); 
+    cls_2pc_queue_remove_entries(wop, end_marker, max_elements);
                marker = end_marker;
   }
 
@@ -849,7 +849,7 @@ TEST_F(TestCls2PCQueue, MultiProducerConsumer)
               // queue is empty, let it fill
               std::this_thread::sleep_for(std::chrono::milliseconds(100));
             } else {
-              cls_2pc_queue_remove_entries(op, end_marker); 
+              cls_2pc_queue_remove_entries(op, end_marker, max_elements);
               ASSERT_EQ(0, ioctx.operate(queue_name, &op));
             }
           }
index 5dbbccb82fd142b43b71f365e5f911c49a2df5cd..cca615afb0a58e957a079a38f4de26f9e182a840 100644 (file)
@@ -191,7 +191,7 @@ TEST_F(TestClsQueue, DequeueMarker)
     ASSERT_EQ(marker.from_str(entry.marker.c_str()), 0);
     if (marker.offset > 0 && marker.offset % 2 == 0) {
       after_deleted_marker = marker;
-      cls_queue_remove_entries(op, marker.to_str()); 
+      cls_queue_remove_entries(op, marker.to_str());
     }
   }
   ASSERT_EQ(0, ioctx.operate(queue_name, &op));
@@ -243,7 +243,7 @@ TEST_F(TestClsQueue, DequeueEmpty)
   std::vector<cls_queue_entry> entries;
   const auto ret = cls_queue_list_entries(ioctx, queue_name, marker, max_elements, entries, &truncated, end_marker);
   ASSERT_EQ(0, ret);
-  cls_queue_remove_entries(op, end_marker); 
+  cls_queue_remove_entries(op, end_marker);
   ASSERT_EQ(0, ioctx.operate(queue_name, &op));
 }
 
@@ -289,7 +289,7 @@ TEST_F(TestClsQueue, DeleteAll)
   std::vector<cls_queue_entry> entries;
   auto ret = cls_queue_list_entries(ioctx, queue_name, marker, total_elements, entries, &truncated, end_marker);
   ASSERT_EQ(0, ret);
-  cls_queue_remove_entries(op, end_marker); 
+  cls_queue_remove_entries(op, end_marker);
   ASSERT_EQ(0, ioctx.operate(queue_name, &op));
   // list again to make sure that queue is empty
   ret = cls_queue_list_entries(ioctx, queue_name, marker, 10, entries, &truncated, end_marker);
@@ -328,7 +328,7 @@ TEST_F(TestClsQueue, EnqueueDequeue)
             const auto ret = cls_queue_list_entries(ioctx, queue_name, marker, max_elements, entries, &truncated, end_marker);
             ASSERT_EQ(0, ret);
             consume_count += entries.size();
-            cls_queue_remove_entries(op, end_marker); 
+            cls_queue_remove_entries(op, end_marker);
             ASSERT_EQ(0, ioctx.operate(queue_name, &op));
           }
        });
@@ -387,7 +387,7 @@ TEST_F(TestClsQueue, QueueFullDequeue)
             auto ret = cls_queue_list_entries(ioctx, queue_name, marker, max_elements, entries, &truncated, end_marker);
             ASSERT_EQ(0, ret);
             consume_count += entries.size();
-            cls_queue_remove_entries(op, end_marker); 
+            cls_queue_remove_entries(op, end_marker);
             ASSERT_EQ(0, ioctx.operate(queue_name, &op));
           }
        });
@@ -431,7 +431,7 @@ TEST_F(TestClsQueue, MultiProducer)
             const auto ret = cls_queue_list_entries(ioctx, queue_name, marker, max_elements, entries, &truncated, end_marker);
             ASSERT_EQ(0, ret);
             consume_count += entries.size();
-            cls_queue_remove_entries(op, end_marker); 
+            cls_queue_remove_entries(op, end_marker);
             ASSERT_EQ(0, ioctx.operate(queue_name, &op));
           }
        });
@@ -478,7 +478,7 @@ TEST_F(TestClsQueue, MultiConsumer)
             const auto ret = cls_queue_list_entries(ioctx, queue_name, marker, max_elements, entries, &truncated, end_marker);
             ASSERT_EQ(0, ret);
             consume_count += entries.size();
-            cls_queue_remove_entries(op, end_marker); 
+            cls_queue_remove_entries(op, end_marker);
             ASSERT_EQ(0, ioctx.operate(queue_name, &op));
           }
     });
@@ -521,7 +521,7 @@ TEST_F(TestClsQueue, NoLockMultiConsumer)
           while (!done || truncated) {
             const auto ret = cls_queue_list_entries(ioctx, queue_name, marker, max_elements, entries, &truncated, end_marker);
             ASSERT_EQ(0, ret);
-            cls_queue_remove_entries(op, end_marker); 
+            cls_queue_remove_entries(op, end_marker);
             ASSERT_EQ(0, ioctx.operate(queue_name, &op));
           }
     });
@@ -587,7 +587,7 @@ TEST_F(TestClsQueue, WrapAround)
       total_bl.pop_front();
     }
     marker = end_marker;
-    cls_queue_remove_entries(op, end_marker); 
+    cls_queue_remove_entries(op, end_marker);
     ASSERT_EQ(0, ioctx.operate(queue_name, &op));
    
     // fill half+1 of the queue
index 3fd249f61f086e6a794dcba94d9fc64c58a03b43..a51064dbb52a8da2684f832d149388d8f4ccc35b 100644 (file)
@@ -3031,7 +3031,10 @@ def test_ps_s3_persistent_topic_stats():
 
     # create random port for the http server
     host = get_ip()
-    port = random.randint(10000, 20000)
+    http_port = random.randint(10000, 20000)
+
+    # start an http server in a separate thread
+    http_server = StreamingHTTPServer(host, http_port, num_workers=10, delay=0.5)
 
     # create bucket
     bucket_name = gen_bucket_name()
@@ -3039,7 +3042,7 @@ def test_ps_s3_persistent_topic_stats():
     topic_name = bucket_name + TOPIC_SUFFIX
 
     # create s3 topic
-    endpoint_address = 'http://'+host+':'+str(port)
+    endpoint_address = 'http://'+host+':'+str(http_port)
     endpoint_args = 'push-endpoint='+endpoint_address+'&persistent=true'
     topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
     topic_arn = topic_conf.set_config()
@@ -3053,6 +3056,10 @@ def test_ps_s3_persistent_topic_stats():
     response, status = s3_notification_conf.set_config()
     assert_equal(status/100, 2)
 
+    delay = 20
+    time.sleep(delay)
+    http_server.close()
+
     # topic stats
     result = admin(['topic', 'stats', '--topic', topic_name])
     parsed_result = json.loads(result[0])
@@ -3101,11 +3108,25 @@ def test_ps_s3_persistent_topic_stats():
     assert_equal(parsed_result['Topic Stats']['Entries'], 2*number_of_objects)
     assert_equal(result[1], 0)
 
+    # start an http server in a separate thread
+    http_server = StreamingHTTPServer(host, http_port, num_workers=10, delay=0.5)
+
+    print('wait for '+str(delay)+'sec for the messages...')
+    time.sleep(delay)
+
+    # topic stats
+    result = admin(['topic', 'stats', '--topic', topic_name])
+    parsed_result = json.loads(result[0])
+    assert_equal(parsed_result['Topic Stats']['Entries'], 0)
+    assert_equal(result[1], 0)
+
     # cleanup
     s3_notification_conf.del_config()
     topic_conf.del_config()
     # delete the bucket
     conn.delete_bucket(bucket_name)
+    time.sleep(delay)
+    http_server.close()
 
 @attr('manual_test')
 def test_ps_s3_persistent_notification_pushback():