]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
osd: add flush() for the chunked object.
authormyoungwon oh <omwmw@sk.com>
Tue, 20 Jun 2017 11:38:18 +0000 (20:38 +0900)
committermyoungwon oh <omwmw@sk.com>
Sun, 7 Jan 2018 13:40:58 +0000 (22:40 +0900)
If all chunks are dirty, the cheunked object will be flushed

Signed-off-by: Myoungwon Oh <omwmw@sk.com>
src/osd/PrimaryLogPG.cc
src/osd/PrimaryLogPG.h

index 0f57703eadd7ad2e52975e1b6e4ed0c51f4692dd..682ec073491cd759451efd5b25f4deed68d0453d 100644 (file)
@@ -2412,6 +2412,16 @@ PrimaryLogPG::cache_result_t PrimaryLogPG::maybe_handle_manifest_detail(
          return cache_result_t::BLOCKED_PROMOTE;
        }
       }
+
+      bool all_dirty = true;
+      for (auto& p : obc->obs.oi.manifest.chunk_map) {
+       if (p.second.flags != chunk_info_t::FLAG_DIRTY) {
+         all_dirty = false;
+       }
+      }
+      if (all_dirty) {
+       start_flush(OpRequestRef(), obc, true, NULL, boost::none);
+      }
       return cache_result_t::NOOP;
     }
   default:
@@ -2421,6 +2431,154 @@ PrimaryLogPG::cache_result_t PrimaryLogPG::maybe_handle_manifest_detail(
   return cache_result_t::NOOP;
 }
 
+struct C_ManifestFlush : public Context {
+  PrimaryLogPGRef pg;
+  hobject_t oid;
+  epoch_t last_peering_reset;
+  ceph_tid_t tid;
+  utime_t start;
+  uint64_t offset;
+  uint64_t last_offset;
+  PrimaryLogPG::FlushOpRef manifest_fop;
+  C_ManifestFlush(PrimaryLogPG *p, hobject_t o, epoch_t lpr)
+    : pg(p), oid(o), last_peering_reset(lpr),
+      tid(0), start(ceph_clock_now())
+  {}
+  void finish(int r) override {
+    if (r == -ECANCELED)
+      return;
+    pg->lock();
+    if (manifest_fop->rval < 0) {
+      pg->unlock();
+      return;
+    }
+    manifest_fop->io_results[offset] = r;
+    for (auto &p : manifest_fop->io_results) {
+      if (p.second < 0) {
+       pg->finish_manifest_flush(oid, tid, r, manifest_fop->obc, last_offset, manifest_fop);
+       manifest_fop->rval = r;
+       pg->unlock();
+       return;
+      }
+    }
+    if (manifest_fop->chunks == manifest_fop->io_results.size()) {
+      if (last_peering_reset == pg->get_last_peering_reset()) {
+       assert(manifest_fop->obc);
+       pg->finish_manifest_flush(oid, tid, r, manifest_fop->obc, last_offset, manifest_fop);
+       pg->osd->logger->tinc(l_osd_tier_flush_lat, ceph_clock_now() - start);
+      }
+    }
+    pg->unlock();
+  }
+};
+
+int PrimaryLogPG::start_manifest_flush(OpRequestRef op, ObjectContextRef obc, bool blocking,
+                                      boost::optional<std::function<void()>> &&on_flush)
+{
+  auto p = obc->obs.oi.manifest.chunk_map.begin();
+  FlushOpRef manifest_fop(std::make_shared<FlushOp>());
+  manifest_fop->op = op;
+  manifest_fop->obc = obc;
+  manifest_fop->flushed_version = obc->obs.oi.user_version;
+  manifest_fop->blocking = blocking;
+  manifest_fop->on_flush = std::move(on_flush);
+  int r = do_manifest_flush(op, obc, manifest_fop, p->first, blocking);
+  if (r < 0) {
+    return r;
+  }
+
+  flush_ops[obc->obs.oi.soid] = manifest_fop;
+  return -EINPROGRESS;
+}
+
+int PrimaryLogPG::do_manifest_flush(OpRequestRef op, ObjectContextRef obc, FlushOpRef manifest_fop,
+                                   uint64_t start_offset, bool block)
+{
+  struct object_manifest_t &manifest = obc->obs.oi.manifest;
+  hobject_t soid = obc->obs.oi.soid;
+  ceph_tid_t tid;
+  SnapContext snapc;
+  uint64_t max_copy_size = 0, last_offset = 0;
+
+  map<uint64_t, chunk_info_t>::iterator iter = manifest.chunk_map.find(start_offset); 
+  assert(iter != manifest.chunk_map.end());
+  for (;iter != manifest.chunk_map.end(); ++iter) {
+    if (iter->second.flags == chunk_info_t::FLAG_DIRTY) {
+      last_offset = iter->first;
+      max_copy_size += iter->second.length;
+    }
+    if (get_copy_chunk_size() < max_copy_size) {
+      break;
+    }
+  }
+
+  iter = manifest.chunk_map.find(start_offset);
+  for (;iter != manifest.chunk_map.end(); ++iter) {
+    if (iter->second.flags != chunk_info_t::FLAG_DIRTY) {
+      continue;
+    }
+    uint64_t tgt_length = iter->second.length;
+    uint64_t tgt_offset= iter->second.offset;
+    hobject_t tgt_soid = iter->second.oid;
+    object_locator_t oloc(tgt_soid);
+    ObjectOperation obj_op;
+    bufferlist chunk_data;
+    int r = pgbackend->objects_read_sync(
+       soid, iter->first, tgt_length, 0, &chunk_data);
+    if (r < 0) {
+      dout(0) << __func__ << " read fail " << " offset: " << tgt_offset
+             << " len: " << tgt_length << " r: " << r << dendl;
+      return r;
+    }
+    if (!chunk_data.length()) {
+      return -ENODATA;
+    }
+    tgt_length = chunk_data.length();
+    obj_op.add_data(CEPH_OSD_OP_WRITE, tgt_offset, tgt_length, chunk_data);
+
+    unsigned flags = CEPH_OSD_FLAG_IGNORE_CACHE | CEPH_OSD_FLAG_IGNORE_OVERLAY |
+                    CEPH_OSD_FLAG_RWORDERED ;
+    C_ManifestFlush *fin = new C_ManifestFlush(this, soid, get_last_peering_reset());
+    fin->offset = iter->first;
+    fin->last_offset = last_offset;
+    fin->manifest_fop = manifest_fop;
+    manifest_fop->chunks++;
+
+    unsigned n = info.pgid.hash_to_shard(osd->m_objecter_finishers);
+    tid = osd->objecter->mutate(
+      tgt_soid.oid, oloc, obj_op, snapc,
+      ceph::real_clock::from_ceph_timespec(obc->obs.oi.mtime),
+      flags, new C_OnFinisher(fin, osd->objecter_finishers[n]));
+    fin->tid = tid;
+    manifest_fop->io_tids[iter->first] = tid;
+
+    dout(20) << __func__ << " offset: " << tgt_offset << " len: " << tgt_length 
+           << " oid: " << tgt_soid.oid << " ori oid: " << soid.oid.name 
+           << " tid: " << tid << dendl;
+    if (last_offset < iter->first) {
+      break;
+    }
+  }
+
+  return 0;
+}
+
+void PrimaryLogPG::finish_manifest_flush(hobject_t oid, ceph_tid_t tid, int r, ObjectContextRef obc, 
+                                        uint64_t last_offset, FlushOpRef manifest_fop)
+{
+  dout(10) << __func__ << " " << oid << " tid " << tid
+          << " " << cpp_strerror(r) << " last_offset: " << last_offset << dendl;
+  map<uint64_t, chunk_info_t>::iterator iter = obc->obs.oi.manifest.chunk_map.find(last_offset); 
+  assert(iter != obc->obs.oi.manifest.chunk_map.end());
+  for (;iter != obc->obs.oi.manifest.chunk_map.end(); ++iter) {
+    if (iter->second.flags == chunk_info_t::FLAG_DIRTY && last_offset < iter->first) {
+      do_manifest_flush(manifest_fop->op, obc, manifest_fop, iter->first, manifest_fop->blocking);
+      return;
+    }
+  }
+  finish_flush(oid, tid, r);
+}
+
 void PrimaryLogPG::record_write_error(OpRequestRef op, const hobject_t &soid,
                                      MOSDOpReply *orig_reply, int r)
 {
@@ -8745,12 +8903,12 @@ void PrimaryLogPG::process_copy_chunk_manifest(hobject_t oid, ceph_tid_t tid, in
              sub_chunk.outdata.length(),
              sub_chunk.outdata,
              p.second->dest_obj_fadvise_flags);
-      obs.oi.manifest.chunk_map[p.second->cursor.data_offset].flags = 0; // clean
       dout(20) << __func__ << " offset: " << p.second->cursor.data_offset 
              << " length: " << sub_chunk.outdata.length() << dendl;
       sub_chunk.outdata.clear();
       write_update_size_and_usage(ctx->delta_stats, obs.oi, ctx->modified_ranges,
                                  p.second->cursor.data_offset, sub_chunk.outdata.length());
+      obs.oi.manifest.chunk_map[p.second->cursor.data_offset].flags = 0; // clean
     }
     obs.oi.clear_data_digest();
     ctx->at_version = get_next_version(); 
@@ -9359,6 +9517,15 @@ int PrimaryLogPG::start_flush(
     cancel_flush(fop, false);
   }
 
+  if (obc->obs.oi.has_manifest() && obc->obs.oi.manifest.is_chunked()) {
+    int r = start_manifest_flush(op, obc, blocking, std::move(on_flush));
+    if (r != -EINPROGRESS) {
+      if (blocking)
+       obc->stop_block();
+    }
+    return r;
+  }
+
   /**
    * In general, we need to send a delete and a copyfrom.
    * Consider snapc 10:[10, 9, 8, 4, 3, 2]:[10(10, 9), 4(4,3,2)]
@@ -9475,7 +9642,7 @@ void PrimaryLogPG::finish_flush(hobject_t oid, ceph_tid_t tid, int r)
     return;
   }
   FlushOpRef fop = p->second;
-  if (tid != fop->objecter_tid) {
+  if (tid != fop->objecter_tid && !fop->obc->obs.oi.has_manifest()) {
     dout(10) << __func__ << " tid " << tid << " != fop " << fop
             << " tid " << fop->objecter_tid << dendl;
     return;
@@ -9559,7 +9726,7 @@ int PrimaryLogPG::try_flush_mark_clean(FlushOpRef fop)
   }
 
   // successfully flushed, can we evict this object?
-  if (!fop->op && agent_state->evict_mode != TierAgentState::EVICT_MODE_IDLE &&
+  if (!obc->obs.oi.has_manifest() && !fop->op && agent_state->evict_mode != TierAgentState::EVICT_MODE_IDLE &&
       agent_maybe_evict(obc, true)) {
     osd->logger->inc(l_osd_tier_clean);
     if (fop->on_flush) {
@@ -9606,6 +9773,36 @@ int PrimaryLogPG::try_flush_mark_clean(FlushOpRef fop)
   ctx->new_obs = obc->obs;
   ctx->new_obs.oi.clear_flag(object_info_t::FLAG_DIRTY);
   --ctx->delta_stats.num_objects_dirty;
+  if (fop->obc->obs.oi.has_manifest()) {
+    assert(obc->obs.oi.manifest.is_chunked());
+    PGTransaction* t = ctx->op_t.get();
+    uint64_t chunks_size = 0;
+    for (auto &p : ctx->new_obs.oi.manifest.chunk_map) {
+      chunks_size += p.second.length;
+    }
+    if (ctx->new_obs.oi.is_omap() && pool.info.supports_omap()) {
+      t->omap_clear(oid);
+      ctx->new_obs.oi.clear_omap_digest();
+      ctx->new_obs.oi.clear_flag(object_info_t::FLAG_OMAP);
+    }
+    if (obc->obs.oi.size == chunks_size) { 
+      t->truncate(oid, 0);
+      ctx->new_obs.oi.size = 0;
+      ctx->new_obs.oi.new_object();
+      for (auto &p : ctx->new_obs.oi.manifest.chunk_map) {
+       p.second.flags = chunk_info_t::FLAG_MISSING;
+       ctx->delta_stats.num_bytes -= p.second.length;
+      }
+    } else {
+      for (auto &p : ctx->new_obs.oi.manifest.chunk_map) {
+       if (p.second.flags == chunk_info_t::FLAG_DIRTY) {
+         dout(20) << __func__ << " offset: " << p.second.offset 
+                 << " length: " << p.second.length << dendl;
+         p.second.flags = 0; // CLEAN
+       }
+      }
+    }
+  }
 
   finish_ctx(ctx.get(), pg_log_entry_t::CLEAN);
 
@@ -9640,6 +9837,12 @@ void PrimaryLogPG::cancel_flush(FlushOpRef fop, bool requeue)
     osd->objecter->op_cancel(fop->objecter_tid, -ECANCELED);
     fop->objecter_tid = 0;
   }
+  if (fop->io_tids.size()) {
+    for (auto &p : fop->io_tids) {
+      osd->objecter->op_cancel(p.second, -ECANCELED);
+      p.second = 0;
+    } 
+  }
   if (fop->blocking) {
     fop->obc->stop_block();
     kick_object_context_blocked(fop->obc);
index 36cc06fc96803464afae19f8f58497434f316f1f..838053bd2f8048b20bebb5f9520ee39f498739b5 100644 (file)
@@ -243,10 +243,14 @@ public:
     bool blocking;              ///< whether we are blocking updates
     bool removal;               ///< we are removing the backend object
     boost::optional<std::function<void()>> on_flush; ///< callback, may be null
+    // for chunked object
+    map<uint64_t, int> io_results; 
+    map<uint64_t, ceph_tid_t> io_tids; 
+    uint64_t chunks;
 
     FlushOp()
       : flushed_version(0), objecter_tid(0), rval(0),
-       blocking(false), removal(false) {}
+       blocking(false), removal(false), chunks(0) {}
     ~FlushOp() { assert(!on_flush); }
   };
   typedef ceph::shared_ptr<FlushOp> FlushOpRef;
@@ -1413,10 +1417,17 @@ protected:
   void process_copy_chunk_manifest(hobject_t oid, ceph_tid_t tid, int r, uint64_t offset);
   void finish_promote_manifest(int r, CopyResults *results, ObjectContextRef obc);
   void cancel_and_requeue_proxy_ops(hobject_t oid);
+  int do_manifest_flush(OpRequestRef op, ObjectContextRef obc, FlushOpRef manifest_fop,
+                       uint64_t start_offset, bool block);
+  int start_manifest_flush(OpRequestRef op, ObjectContextRef obc, bool blocking,
+                          boost::optional<std::function<void()>> &&on_flush);
+  void finish_manifest_flush(hobject_t oid, ceph_tid_t tid, int r, ObjectContextRef obc, 
+                            uint64_t last_offset, FlushOpRef manifest_fop);
 
   friend struct C_ProxyChunkRead;
   friend class PromoteManifestCallback;
   friend class C_CopyChunk;
+  friend struct C_ManifestFlush;
 
 public:
   PrimaryLogPG(OSDService *o, OSDMapRef curmap,