]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/os/seastore: validate node immediately prior to adding pin
authorSamuel Just <sjust@redhat.com>
Thu, 14 Jan 2021 03:43:07 +0000 (03:43 +0000)
committerSamuel Just <sjust@redhat.com>
Mon, 1 Feb 2021 20:48:39 +0000 (12:48 -0800)
Mostly, operating on an invalid extent is harmless by construction --
it'll be caught by the read set check in submit_transaction.  However,
prior to adding an extent to the lba pin set, we need to check that it's
really still live, which we can do by checking that it and its immediate
parent are still valid.  Mutating the target extent would have
invalidated it, and replacing the target extent would necessarily have
involved a mutation to the parent, which would have invalidated it.

Signed-off-by: Samuel Just <sjust@redhat.com>
src/crimson/os/seastore/cached_extent.h
src/crimson/os/seastore/lba_manager/btree/btree_lba_manager.cc
src/crimson/os/seastore/lba_manager/btree/btree_range_pin.h
src/crimson/os/seastore/lba_manager/btree/lba_btree_node.h
src/crimson/os/seastore/lba_manager/btree/lba_btree_node_impl.cc
src/crimson/os/seastore/transaction_manager.cc
src/crimson/os/seastore/transaction_manager.h

index 79e6de7e53705785ee3e905b7ebcd6b4bdfd6b98..2cab4325e46a60c526e96195e03c846eec7f779b 100644 (file)
@@ -235,6 +235,11 @@ public:
     return state != extent_state_t::INVALID;
   }
 
+  /// Returns true if extent or prior_instance has been invalidated
+  bool has_been_invalidated() const {
+    return !is_valid() || (prior_instance && !prior_instance->is_valid());
+  }
+
   /**
    * get_dirty_from
    *
@@ -555,6 +560,7 @@ public:
   virtual paddr_t get_paddr() const = 0;
   virtual laddr_t get_laddr() const = 0;
   virtual LBAPinRef duplicate() const = 0;
+  virtual bool has_been_invalidated() const = 0;
 
   virtual ~LBAPin() {}
 };
index 4d2eb9fd3a60a099538ec77e2ac84cf6fe403c1a..0bef6381f180ca84f839ecee611cd28dcc4630ea 100644 (file)
@@ -55,6 +55,7 @@ BtreeLBAManager::get_root(Transaction &t)
       unsigned(croot->get_root().lba_depth));
     return get_lba_btree_extent(
       get_context(t),
+      croot,
       croot->get_root().lba_depth,
       croot->get_root().lba_root_addr,
       paddr_t());
@@ -567,6 +568,7 @@ BtreeLBAManager::update_internal_mapping(
        paddr);
       return get_lba_btree_extent(
        get_context(t),
+       croot,
        croot->get_root().lba_depth,
        croot->get_root().lba_root_addr,
        paddr_t()).safe_then([=, &t](LBANodeRef broot) {
index 3fa218fc8cdc7f19fa23961582ac3ea9797aa3ef..571ce2f3fe4a6038f6d9f1b297e2952ee0bfa2e5 100644 (file)
@@ -269,6 +269,10 @@ public:
   void take_pin(LBAPin &opin) final {
     pin.take_pin(static_cast<BtreeLBAPin&>(opin).pin);
   }
+
+  bool has_been_invalidated() const final {
+    return parent->has_been_invalidated();
+  }
 };
 
 }
index e442461789de73a1d91ce6f5ee0999fc065cb214..17c9e09ea27ac24d1eb495f57d810f04ced60047 100644 (file)
@@ -258,6 +258,7 @@ using get_lba_node_ertr = base_ertr;
 using get_lba_node_ret = get_lba_node_ertr::future<LBANodeRef>;
 get_lba_node_ret get_lba_btree_extent(
   op_context_t c, ///< [in] context structure
+  CachedExtentRef parent, ///< [in] paddr ref source
   depth_t depth,  ///< [in] depth of node to fetch
   paddr_t offset, ///< [in] physical addr of node
   paddr_t base    ///< [in] depending on user, block addr or record addr
index 3e8b290b1fb155e3ac45ea4eb13371291cb68401..e143ce2566beaff08956d352064f986ee20d84e8 100644 (file)
@@ -42,6 +42,7 @@ LBAInternalNode::lookup_ret LBAInternalNode::lookup(
   auto iter = lower_bound(addr);
   return get_lba_btree_extent(
     c,
+    this,
     meta.depth - 1,
     iter->get_val(),
     get_paddr()).safe_then([c, addr, depth](auto child) {
@@ -63,6 +64,7 @@ LBAInternalNode::lookup_range_ret LBAInternalNode::lookup_range(
     [this, c, &result, addr, len](const auto &val) mutable {
       return get_lba_btree_extent(
        c,
+       this,
        get_meta().depth - 1,
        val.get_val(),
        get_paddr()).safe_then(
@@ -90,6 +92,7 @@ LBAInternalNode::insert_ret LBAInternalNode::insert(
   auto insertion_pt = get_containing_child(laddr);
   return get_lba_btree_extent(
     c,
+    this,
     get_meta().depth - 1,
     insertion_pt->get_val(),
     get_paddr()).safe_then(
@@ -125,6 +128,7 @@ LBAInternalNode::mutate_mapping_ret LBAInternalNode::mutate_mapping_internal(
   }
   return get_lba_btree_extent(
     c,
+    this,
     get_meta().depth - 1,
     mutation_pt->get_val(),
     get_paddr()
@@ -180,6 +184,7 @@ LBAInternalNode::mutate_internal_address_ret LBAInternalNode::mutate_internal_ad
     auto iter = get_containing_child(laddr);
     return get_lba_btree_extent(
       c,
+      this,
       get_meta().depth - 1,
       iter->get_val(),
       get_paddr()
@@ -210,6 +215,7 @@ LBAInternalNode::find_hole_ret LBAInternalNode::find_hole(
         std::make_optional<laddr_t>(L_ADDR_NULL));
     }
     return get_lba_btree_extent(c,
+                               this,
                                get_meta().depth - 1,
                                i->get_val(),
                                get_paddr()).safe_then(
@@ -246,6 +252,7 @@ LBAInternalNode::scan_mappings_ret LBAInternalNode::scan_mappings(
     [=, &f](auto &viter) {
       return get_lba_btree_extent(
        c,
+       this,
        get_meta().depth - 1,
        viter->get_val(),
        get_paddr()).safe_then([=, &f](auto child) {
@@ -264,6 +271,7 @@ LBAInternalNode::scan_mapped_space_ret LBAInternalNode::scan_mapped_space(
     [=, &f](auto &viter) {
       return get_lba_btree_extent(
        c,
+       this,
        get_meta().depth - 1,
        viter->get_val(),
        get_paddr()).safe_then([=, &f](auto child) {
@@ -350,6 +358,7 @@ LBAInternalNode::merge_entry(
   auto donor_iter = donor_is_left ? iter - 1 : iter + 1;
   return get_lba_btree_extent(
     c,
+    this,
     get_meta().depth - 1,
     donor_iter->get_val(),
     get_paddr()
@@ -643,8 +652,9 @@ LBALeafNode::get_leaf_entries(laddr_t addr, extent_len_t len)
   return bound(addr, addr + len);
 }
 
-get_lba_node_ertr::future<LBANodeRef> get_lba_btree_extent(
+get_lba_node_ret get_lba_btree_extent(
   op_context_t c,
+  CachedExtentRef parent,
   depth_t depth,
   paddr_t offset,
   paddr_t base)
@@ -659,17 +669,23 @@ get_lba_node_ertr::future<LBANodeRef> get_lba_btree_extent(
     return c.cache.get_extent<LBAInternalNode>(
       c.trans,
       offset,
-      LBA_BLOCK_SIZE).safe_then([c](auto ret) {
+      LBA_BLOCK_SIZE).safe_then([c, parent](auto ret)
+                               -> get_lba_node_ret {
        auto meta = ret->get_meta();
        if (ret->get_size()) {
          ceph_assert(meta.begin <= ret->begin()->get_key());
          ceph_assert(meta.end > (ret->end() - 1)->get_key());
        }
+       if (parent->has_been_invalidated() || ret->has_been_invalidated()) {
+         return crimson::ct_error::eagain::make();
+       }
        if (!ret->is_pending() && !ret->pin.is_linked()) {
          ret->pin.set_range(meta);
          c.pins.add_pin(ret->pin);
        }
-       return LBANodeRef(ret.detach(), /* add_ref = */ false);
+       return get_lba_node_ret(
+         get_lba_node_ertr::ready_future_marker{},
+         LBANodeRef(ret.detach(), /* add_ref = */ false));
       });
   } else {
     logger().debug(
@@ -679,21 +695,28 @@ get_lba_node_ertr::future<LBANodeRef> get_lba_btree_extent(
     return c.cache.get_extent<LBALeafNode>(
       c.trans,
       offset,
-      LBA_BLOCK_SIZE).safe_then([offset, c](auto ret) {
+      LBA_BLOCK_SIZE).safe_then([offset, c, parent](auto ret)
+                               -> get_lba_node_ret {
        logger().debug(
-         "get_lba_btree_extent: read leaf at offset {} {}",
+         "get_lba_btree_extent: read leaf at offset {} {}, parent {}",
          offset,
-         *ret);
+         *ret,
+         *parent);
        auto meta = ret->get_meta();
        if (ret->get_size()) {
          ceph_assert(meta.begin <= ret->begin()->get_key());
          ceph_assert(meta.end > (ret->end() - 1)->get_key());
        }
+       if (parent->has_been_invalidated() || ret->has_been_invalidated()) {
+         return crimson::ct_error::eagain::make();
+       }
        if (!ret->is_pending() && !ret->pin.is_linked()) {
          ret->pin.set_range(meta);
          c.pins.add_pin(ret->pin);
        }
-       return LBANodeRef(ret.detach(), /* add_ref = */ false);
+       return get_lba_node_ret(
+         get_lba_node_ertr::ready_future_marker{},
+         LBANodeRef(ret.detach(), /* add_ref = */ false));
       });
   }
 }
index bfe5ea233005cdce3c122ffc540e8e6aec48d90c..f4cb6da58885fa2a5c9e173ff317fc1de2aea8da 100644 (file)
@@ -289,11 +289,16 @@ TransactionManager::get_extent_if_live_ret TransactionManager::get_extent_if_liv
            addr,
            laddr,
            len).safe_then(
-             [this, pin=std::move(pin)](CachedExtentRef ret) mutable {
+             [this, pin=std::move(pin)](CachedExtentRef ret) mutable
+             -> get_extent_if_live_ret {
                auto lref = ret->cast<LogicalCachedExtent>();
                if (!lref->has_pin()) {
-                 lref->set_pin(std::move(pin));
-                 lba_manager.add_pin(lref->get_pin());
+                 if (pin->has_been_invalidated() || lref->has_been_invalidated()) {
+                   return crimson::ct_error::eagain::make();
+                 } else {
+                   lref->set_pin(std::move(pin));
+                   lba_manager.add_pin(lref->get_pin());
+                 }
                }
                return get_extent_if_live_ret(
                  get_extent_if_live_ertr::ready_future_marker{},
index e47ad75b16edbaec4b8d5812b506ce179cbb0c2f..e105b87e1f68bc36d6fc71ef857bdf5bf7ba3a46 100644 (file)
@@ -113,10 +113,15 @@ public:
            t,
            pin->get_paddr(),
            pin->get_length()
-         ).safe_then([this, &pin, &ret_ref](auto ref) mutable {
+         ).safe_then([this, &pin, &ret_ref](auto ref) mutable
+                     -> read_extent_ertr::future<> {
            if (!ref->has_pin()) {
-             ref->set_pin(std::move(pin));
-             lba_manager.add_pin(ref->get_pin());
+             if (pin->has_been_invalidated() || ref->has_been_invalidated()) {
+               return crimson::ct_error::eagain::make();
+             } else {
+               ref->set_pin(std::move(pin));
+               lba_manager.add_pin(ref->get_pin());
+             }
            }
            ret_ref.push_back(std::make_pair(ref->get_laddr(), ref));
            crimson::get_logger(ceph_subsys_filestore).debug(