]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
osd: implement read balancer
authorLaura Flores <lflores@redhat.com>
Wed, 1 Feb 2023 23:50:53 +0000 (23:50 +0000)
committerLaura Flores <lflores@redhat.com>
Wed, 22 Feb 2023 16:29:38 +0000 (16:29 +0000)
This commit implements two functions:

1. calc_desired_primary_distribution
   Based on the number of pgs in a pool and the pool's
   replica count, we calculate the ideal number of primary
   pgs that should be assigned to each OSD on that pool in
   order for reads to be balanced.

2. balance_primaries
   This is the overall algorithm used to balance reads (primary
   pgs) in a pool. Based on the first function, we re-distribute
   primaries on the OSDs in a pool so each OSD has the ideal
   number of primaries. This is done without data movement.

Signed-off-by: Laura Flores <lflores@redhat.com>
src/osd/OSDMap.cc
src/osd/OSDMap.h

index c36a926245c49f0ff568c8aa229001bfd5caf847..b5e76e7d0806c716f9a9af106c202c6e9691743a 100644 (file)
@@ -4921,6 +4921,207 @@ bool OSDMap::try_pg_upmap(
   return true;
 }
 
+
+int OSDMap::balance_primaries(
+  CephContext *cct,
+  int64_t pid,
+  OSDMap::Incremental *pending_inc,
+  OSDMap& tmp_osd_map) const
+{
+  // This function only handles replicated pools.
+  const pg_pool_t* pool = get_pg_pool(pid);
+  if (! pool->is_replicated()) {
+    ldout(cct, 10) << __func__ << " skipping erasure pool "
+               << get_pool_name(pid) << dendl;
+    return -EINVAL;
+  }
+
+  // Info to be used in verify_upmap
+  int pool_size = pool->get_size();
+  int crush_rule = pool->get_crush_rule();
+
+  // Get pgs by osd (map of osd -> pgs)
+  // Get primaries by osd (map of osd -> primary)
+  map<uint64_t,set<pg_t>> pgs_by_osd;
+  map<uint64_t,set<pg_t>> prim_pgs_by_osd;
+  map<uint64_t,set<pg_t>> acting_prims_by_osd;
+  pgs_by_osd = tmp_osd_map.get_pgs_by_osd(cct, pid, &prim_pgs_by_osd, &acting_prims_by_osd);
+
+  // Transfer pgs into a map, `pgs_to_check`. This will tell us the total num_changes after all
+  //     calculations have been finalized.
+  // Transfer osds into a set, `osds_to_check`.
+  // This is to avoid poor runtime when we loop through the pgs and to set up
+  // our call to calc_desired_primary_distribution.
+  map<pg_t,bool> prim_pgs_to_check;
+  vector<uint64_t> osds_to_check;
+  for (const auto & [osd, pgs] : prim_pgs_by_osd) {
+    osds_to_check.push_back(osd);
+    for (const auto & pg : pgs) {
+      prim_pgs_to_check.insert({pg, false});
+    }
+  }
+
+  // calculate desired primary distribution for each osd
+  map<uint64_t,float> desired_prim_dist;
+  int rc = 0;
+  rc = calc_desired_primary_distribution(cct, pid, osds_to_check, desired_prim_dist);
+  if (rc < 0) {
+    ldout(cct, 10) << __func__ << " Error in calculating desired primary distribution" << dendl;
+    return -EINVAL;
+  }
+  map<uint64_t,float> prim_dist_scores;
+  float actual;
+  float desired;
+  for (auto osd : osds_to_check) {
+    actual = prim_pgs_by_osd[osd].size();
+    desired = desired_prim_dist[osd];
+    prim_dist_scores[osd] = actual - desired;
+    ldout(cct, 10) << __func__ << " desired distribution for osd." << osd << " " << desired << dendl;
+  }
+
+  // get read balance score before balancing
+  float read_balance_score_before = 0.0;
+  read_balance_info_t rb_info;
+  rc = tmp_osd_map.calc_read_balance_score(cct, pid, &rb_info);
+  if (rc >= 0) {
+    read_balance_score_before = rb_info.adjusted_score;
+  }
+  if (rb_info.err_msg.length() > 0) {
+    ldout(cct, 10) << __func__ << (rc < 0 ? " ERROR: " : " Warning: ") << rb_info.err_msg << dendl;
+    return -EINVAL;
+  }
+
+  // get ready to swap pgs
+  while (true) {
+    int curr_num_changes = 0;
+    vector<int> up_osds;
+    vector<int> acting_osds;
+    int up_primary, acting_primary;
+    for (const auto & [pg, mapped] : prim_pgs_to_check) {
+      // fill in the up, up primary, acting, and acting primary for the current PG
+      tmp_osd_map.pg_to_up_acting_osds(pg, &up_osds, &up_primary,
+         &acting_osds, &acting_primary);
+      
+      // find the OSD that would make the best swap based on its score
+      // We start by first testing the OSD that is currently primary for the PG we are checking.
+      uint64_t curr_best_osd = up_primary;
+      float prim_score = prim_dist_scores[up_primary];
+      for (auto potential_osd : up_osds) {
+       float potential_score = prim_dist_scores[potential_osd];
+       if ((prim_score > 0) && // taking 1 pg from the prim would not make its score worse
+           (potential_score < 0) && // adding 1 pg to the potential would not make its score worse
+           ((prim_score - potential_score) > 1) && // swapping a pg would not just keep the scores the same
+           (desired_prim_dist[potential_osd] > 0)) // the potential is not off limits (the primary affinity is above 0)
+       {
+         curr_best_osd = potential_osd;
+       }
+      }
+
+      // Make the swap only if:
+      //    1. The swap is legal
+      //    2. The balancer has chosen a new primary
+      auto legal_swap = crush->verify_upmap(cct,
+                                 crush_rule,
+                                 pool_size,
+                                 {(int)curr_best_osd});
+      if (legal_swap >= 0 &&
+         ((int)curr_best_osd != up_primary)) {
+       // Update prim_dist_scores
+       prim_dist_scores[curr_best_osd] += 1;
+       prim_dist_scores[up_primary] -= 1;
+
+       // Update the mappings
+       pending_inc->new_pg_upmap_primary[pg] = curr_best_osd;
+       tmp_osd_map.pg_upmap_primaries[pg] = curr_best_osd;
+       prim_pgs_to_check[pg] = true; // mark that this pg changed mappings
+
+       curr_num_changes++;
+      }
+      ldout(cct, 20) << __func__ << " curr_num_changes: " << curr_num_changes << dendl;
+    }
+    // If there are no changes after one pass through the pgs, then no further optimizations can be made.
+    if (curr_num_changes == 0) {
+      ldout(cct, 20) << __func__ << " curr_num_changes is 0; no further optimizations can be made." << dendl;
+      break;
+    }
+  }
+
+  // get read balance score after balancing
+  float read_balance_score_after = 0.0;
+  rc = tmp_osd_map.calc_read_balance_score(cct, pid, &rb_info);
+  if (rc >= 0) {
+    read_balance_score_after = rb_info.adjusted_score;
+  }
+  if (rb_info.err_msg.length() > 0) {
+    ldout(cct, 10) << __func__ << (rc < 0 ? " ERROR: " : " Warning: ") << rb_info.err_msg << dendl;
+    return -EINVAL;
+  }
+
+  // Tally total number of changes
+  int num_changes = 0;
+  if (read_balance_score_after < read_balance_score_before) {
+    for (auto [pg, mapped] : prim_pgs_to_check) {
+      if (mapped) {
+        num_changes++;
+      }
+    }
+  }
+
+  ldout(cct, 10) << __func__ << " num_changes " << num_changes << dendl;
+  return num_changes;
+}
+
+int OSDMap::calc_desired_primary_distribution(
+  CephContext *cct,
+  int64_t pid,
+  const vector<uint64_t> &osds,
+  std::map<uint64_t, float>& desired_primary_distribution) const
+{
+  // will return a perfect distribution of floats
+  // without calculating the floor of each value
+  //
+  // This function only handles replicated pools.
+  const pg_pool_t* pool = get_pg_pool(pid);
+  if (pool->is_replicated()) {
+    ldout(cct, 20) << __func__ << " calculating distribution for replicated pool "
+                   << get_pool_name(pid) << dendl;
+    uint64_t replica_count = pool->get_size();
+    
+    map<uint64_t,set<pg_t>> pgs_by_osd;
+    pgs_by_osd = get_pgs_by_osd(cct, pid);
+
+    // First calculate the distribution using primary affinity and tally up the sum
+    auto distribution_sum = 0.0;
+    for (const auto & osd : osds) {
+      float osd_primary_count = ((float)pgs_by_osd[osd].size() / (float)replica_count) * get_primary_affinityf(osd);
+      desired_primary_distribution.insert({osd, osd_primary_count});
+      distribution_sum += osd_primary_count;
+    }
+    if (distribution_sum <= 0) {
+      ldout(cct, 10) << __func__ << " Unable to calculate primary distribution, likely because primary affinity is"
+                           << " set to 0 on all OSDs." << dendl;
+      return -EINVAL;
+    }
+
+    // Then, stretch the value (necessary when primary affinity is smaller than 1)
+    float factor = (float)pool->get_pg_num() / (float)distribution_sum;
+    float distribution_sum_desired = 0.0;
+
+    ceph_assert(factor >= 1.0);
+    for (const auto & [osd, osd_primary_count] : desired_primary_distribution) {
+      desired_primary_distribution[osd] *= factor;
+      distribution_sum_desired += desired_primary_distribution[osd];
+    }
+    ceph_assert(fabs(distribution_sum_desired - pool->get_pg_num()) < 0.01);
+  } else {
+    ldout(cct, 10) << __func__ <<" skipping erasure pool "
+                   << get_pool_name(pid) << dendl;
+    return -EINVAL;
+  }
+
+  return 0;
+}
+
 int OSDMap::calc_pg_upmaps(
   CephContext *cct,
   uint32_t max_deviation,
index b0ae3bc4e60ff50781b7bfda0d82a6b193a01ad4..c578bd1cd8a3d6355a934bf55cf88ee43c3ab780 100644 (file)
@@ -1455,6 +1455,18 @@ public:
     std::vector<int> *orig,
     std::vector<int> *out);             ///< resulting alternative mapping
 
+  int balance_primaries(
+    CephContext *cct,
+    int64_t pid,
+    Incremental *pending_inc,
+    OSDMap& tmp_osd_map) const;
+
+  int calc_desired_primary_distribution(
+    CephContext *cct,
+    int64_t pid, // pool id
+    const std::vector<uint64_t> &osds,
+    std::map<uint64_t, float>& desired_primary_distribution) const; // vector of osd ids
+
   int calc_pg_upmaps(
     CephContext *cct,
     uint32_t max_deviation, ///< max deviation from target (value >= 1)
@@ -1464,8 +1476,6 @@ public:
     std::random_device::result_type *p_seed = nullptr  ///< [optional] for regression tests
     );
 
-private: // Bunch of internal functions used only by calc_pg_upmaps (result of code refactoring)
-
   std::map<uint64_t,std::set<pg_t>> get_pgs_by_osd(
     CephContext *cct,
     int64_t pid,
@@ -1473,7 +1483,8 @@ private: // Bunch of internal functions used only by calc_pg_upmaps (result of c
     std::map<uint64_t, std::set<pg_t>> *p_acting_primaries_by_osd = nullptr
   ) const; // used in calc_desired_primary_distribution()
 
-private:
+private: // Bunch of internal functions used only by calc_pg_upmaps (result of code refactoring)
+
   float get_osds_weight(
     CephContext *cct,
     const OSDMap& tmp_osd_map,