]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
osd: add promote_object() for chunked objects.
author myoungwon oh <omwmw@sk.com>
Tue, 10 Oct 2017 11:50:37 +0000 (20:50 +0900)
committer myoungwon oh <omwmw@sk.com>
Mon, 6 Nov 2017 04:25:28 +0000 (13:25 +0900)
Signed-off-by: Myoungwon Oh <omwmw@sk.com>
src/osd/PrimaryLogPG.cc
src/osd/PrimaryLogPG.h

index 273acf6df22a6e9b495abba24c375c433c1c6dec..ca6ae5915b66395c6d73f30d733b2148f6f17180 100644 (file)
@@ -2329,7 +2329,8 @@ PrimaryLogPG::cache_result_t PrimaryLogPG::maybe_handle_manifest_detail(
   for (vector<OSDOp>::iterator p = ops.begin(); p != ops.end(); ++p) {
     OSDOp& osd_op = *p;
     ceph_osd_op& op = osd_op.op;
-    if (op.op == CEPH_OSD_OP_SET_REDIRECT) {
+    if (op.op == CEPH_OSD_OP_SET_REDIRECT ||
+       op.op == CEPH_OSD_OP_SET_CHUNK) {
       return cache_result_t::NOOP;
     }
   }
@@ -2347,11 +2348,14 @@ PrimaryLogPG::cache_result_t PrimaryLogPG::maybe_handle_manifest_detail(
       do_proxy_chunked_op(op, obc->obs.oi.soid, obc, write_ordered);
       return cache_result_t::HANDLED_PROXY;
     }
-    if (obc->obs.oi.size == 0) {
-      const MOSDOp *m = static_cast<const MOSDOp*>(op->get_req());
-      const object_locator_t oloc = m->get_object_locator();
-      promote_object(obc, obc->obs.oi.soid, oloc, op, NULL);
-      return cache_result_t::BLOCKED_PROMOTE;
+    
+    for (auto& p : obc->obs.oi.manifest.chunk_map) {
+      if (p.second.flags == chunk_info_t::FLAG_MISSING) {
+       const MOSDOp *m = static_cast<const MOSDOp*>(op->get_req());
+       const object_locator_t oloc = m->get_object_locator();
+       promote_object(obc, obc->obs.oi.soid, oloc, op, NULL);
+       return cache_result_t::BLOCKED_PROMOTE;
+      }
     }
     return cache_result_t::NOOP;
   default:
@@ -3316,6 +3320,24 @@ public:
   }
 };
 
+/**
+ * Copy completion used when promoting a chunked (manifest) object.
+ *
+ * On completion the result code and copy results are handed to
+ * PrimaryLogPG::finish_promote_manifest(); afterwards the time elapsed since
+ * this callback was constructed is recorded under l_osd_tier_promote_lat.
+ */
+class PromoteManifestCallback: public PrimaryLogPG::CopyCallback {
+  ObjectContextRef obc;
+  PrimaryLogPG *pg;
+  utime_t start;   // taken at construction; basis of the latency sample
+public:
+  PromoteManifestCallback(ObjectContextRef obc_, PrimaryLogPG *pg_)
+    : obc(obc_), pg(pg_), start(ceph_clock_now())
+  {}
+
+  void finish(PrimaryLogPG::CopyCallbackResults results) override {
+    // element 0 is the result code, element 1 the CopyResults pointer
+    int rc = results.get<0>();
+    PrimaryLogPG::CopyResults *copy_results = results.get<1>();
+
+    pg->finish_promote_manifest(rc, copy_results, obc);
+
+    // the latency sample is taken after the PG has handled the result
+    pg->osd->logger->tinc(l_osd_tier_promote_lat, ceph_clock_now() - start);
+  }
+};
+
 void PrimaryLogPG::promote_object(ObjectContextRef obc,
                                  const hobject_t& missing_oid,
                                  const object_locator_t& oloc,
@@ -3355,13 +3377,16 @@ void PrimaryLogPG::promote_object(ObjectContextRef obc,
     src_fadvise_flags |= LIBRADOS_OP_FLAG_FADVISE_DONTNEED;
   }
 
-  PromoteCallback *cb = new PromoteCallback(obc, this);
+  CopyCallback *cb;
   object_locator_t my_oloc = oloc;
   my_oloc.pool = pool.info.tier_of;
-  if (obc->obs.oi.has_manifest()) {
+  if (!obc->obs.oi.has_manifest()) {
+    cb = new PromoteCallback(obc, this);
+  } else {
     if (obc->obs.oi.manifest.is_chunked()) {
-      object_locator_t chunk_oloc(obc->obs.oi.manifest.chunk_map[0].oid);
-      my_oloc = chunk_oloc;
+      cb = new PromoteManifestCallback(obc, this);
+    } else {
+      assert(0);
     }
   }
 
@@ -6411,10 +6436,10 @@ int PrimaryLogPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
 
        pg_t raw_pg;
        chunk_info_t chunk_info;
+       get_osdmap()->object_locator_to_pg(tgt_name, tgt_oloc, raw_pg);
        hobject_t target(tgt_name, tgt_oloc.key, snapid_t(),
                         raw_pg.ps(), raw_pg.pool(),
                         tgt_oloc.nspace);
-       get_osdmap()->object_locator_to_pg(tgt_name, tgt_oloc, raw_pg);
        chunk_info.flags = chunk_info_t::FLAG_MISSING;
        chunk_info.oid = target;
        chunk_info.offset = tgt_offset;
@@ -7969,6 +7994,29 @@ struct C_CopyFrom_AsyncReadCb : public Context {
   }
 };
 
+/**
+ * Completion context for a single manifest chunk read.
+ *
+ * When the read finishes (and was not cancelled) the PG is locked and, if no
+ * peering reset happened since the read was issued, the result is forwarded
+ * to PrimaryLogPG::process_copy_chunk_manifest() together with the tid and
+ * the chunk offset stored here.
+ *
+ * `tid` and `offset` are filled in by the issuer after construction; both
+ * start at 0 so they never hold an indeterminate value.
+ */
+struct C_CopyChunk : public Context {
+  PrimaryLogPGRef pg;
+  hobject_t oid;
+  epoch_t last_peering_reset;
+  ceph_tid_t tid;
+  PrimaryLogPG::CopyOpRef cop;
+  uint64_t offset;
+  C_CopyChunk(PrimaryLogPG *p, hobject_t o, epoch_t lpr,
+            const PrimaryLogPG::CopyOpRef& c)
+    : pg(p), oid(o), last_peering_reset(lpr),
+      tid(0), cop(c), offset(0)
+  {}
+  void finish(int r) override {
+    // a cancelled read carries nothing to process
+    if (r == -ECANCELED)
+      return;
+    pg->lock();
+    // drop results that belong to an earlier peering interval
+    if (last_peering_reset == pg->get_last_peering_reset()) {
+      pg->process_copy_chunk_manifest(oid, tid, r, offset);
+    }
+    pg->unlock();
+  }
+};
+
 int PrimaryLogPG::do_copy_get(OpContext *ctx, bufferlist::iterator& bp,
                              OSDOp& osd_op, ObjectContextRef &obc)
 {
@@ -8193,7 +8241,12 @@ void PrimaryLogPG::start_copy(CopyCallback *cb, ObjectContextRef obc,
   copy_ops[dest] = cop;
   obc->start_block();
 
-  _copy_some(obc, cop);
+  if (!obc->obs.oi.has_manifest()) {
+    _copy_some(obc, cop);
+  } else {
+    auto p = obc->obs.oi.manifest.chunk_map.begin();
+    _copy_some_manifest(obc, cop, p->first);
+  }
 }
 
 void PrimaryLogPG::_copy_some(ObjectContextRef obc, CopyOpRef cop)
@@ -8264,6 +8317,95 @@ void PrimaryLogPG::_copy_some(ObjectContextRef obc, CopyOpRef cop)
   gather.activate();
 }
 
+/**
+ * Issue reads for the next batch of chunks of a chunked (manifest) object.
+ *
+ * Beginning with the chunk keyed by start_offset, chunks are counted until
+ * their combined length exceeds get_copy_chunk_size() or the chunk map ends.
+ * For every counted chunk a sub CopyOp is created, stored in
+ * cop->chunk_cops under the chunk's offset, and one objecter read is sent;
+ * each completion is routed to process_copy_chunk_manifest() via C_CopyChunk.
+ *
+ * @param obc          context of the object whose manifest is being read
+ * @param cop          parent copy op; its num_chunk, start_offset and
+ *                     last_offset are overwritten to describe this batch
+ * @param start_offset chunk_map key of the first chunk of the batch
+ */
+void PrimaryLogPG::_copy_some_manifest(ObjectContextRef obc, CopyOpRef cop, uint64_t start_offset)
+{
+  dout(10) << __func__ << " " << obc << " " << cop << dendl;
+
+  // map the copy-from flags onto the objecter flags used for the reads
+  unsigned flags = 0;
+  if (cop->flags & CEPH_OSD_COPY_FROM_FLAG_FLUSH)
+    flags |= CEPH_OSD_FLAG_FLUSH;
+  if (cop->flags & CEPH_OSD_COPY_FROM_FLAG_IGNORE_CACHE)
+    flags |= CEPH_OSD_FLAG_IGNORE_CACHE;
+  if (cop->flags & CEPH_OSD_COPY_FROM_FLAG_IGNORE_OVERLAY)
+    flags |= CEPH_OSD_FLAG_IGNORE_OVERLAY;
+  if (cop->flags & CEPH_OSD_COPY_FROM_FLAG_MAP_SNAP_CLONE)
+    flags |= CEPH_OSD_FLAG_MAP_SNAP_CLONE;
+  if (cop->flags & CEPH_OSD_COPY_FROM_FLAG_RWORDERED)
+    flags |= CEPH_OSD_FLAG_RWORDERED;
+
+  // pass 1: decide how many chunks belong to this batch
+  int num_chunks = 0;
+  uint64_t last_offset = 0, chunks_size = 0;
+  object_manifest_t *manifest = &obc->obs.oi.manifest;
+  const map<uint64_t, chunk_info_t>::iterator first = manifest->chunk_map.find(start_offset);
+  for (auto iter = first; iter != manifest->chunk_map.end(); ++iter) {
+    num_chunks++;
+    chunks_size += iter->second.length;
+    last_offset = iter->first;
+    if (get_copy_chunk_size() < chunks_size) {
+      break;
+    }
+  }
+
+  cop->num_chunk = num_chunks;
+  cop->start_offset = start_offset;
+  cop->last_offset = last_offset;
+  dout(20) << __func__ << " oid " << obc->obs.oi.soid << " num_chunks: " << num_chunks
+         << " start_offset: " << start_offset << " chunks_size: " << chunks_size
+         << " last_offset: " << last_offset << dendl;
+
+  // pass 2: send one read per counted chunk
+  for (auto iter = first; iter != manifest->chunk_map.end(); ++iter) {
+    const uint64_t obj_offset = iter->first;
+    const chunk_info_t& chunk = iter->second;
+    uint64_t length = chunk.length;
+    hobject_t soid = chunk.oid;
+    object_locator_t oloc(soid);
+    CopyOpRef sub_cop(std::make_shared<CopyOp>(cop->cb, cop->obc, cop->src, oloc,
+                      cop->results.user_version, cop->flags, cop->mirror_snapset,
+                      cop->src_obj_fadvise_flags, cop->dest_obj_fadvise_flags));
+    // data_offset remembers where in this object the chunk data belongs
+    sub_cop->cursor.data_offset = obj_offset;
+    cop->chunk_cops[obj_offset] = sub_cop;
+
+    int s = sub_cop->chunk_ops.size();
+    sub_cop->chunk_ops.resize(s+1);
+    sub_cop->chunk_ops[s].op.op =  CEPH_OSD_OP_READ;
+    sub_cop->chunk_ops[s].op.extent.offset = chunk.offset;
+    sub_cop->chunk_ops[s].op.extent.length = length;
+
+    ObjectOperation op;
+    op.dup(sub_cop->chunk_ops);
+
+    dout(20) << __func__ << " tgt_oid: " << soid.oid << " tgt_offset: "
+           << chunk.offset
+           << " length: " << length << " pool id: " << oloc.pool << dendl;
+
+    if (cop->results.user_version) {
+      op.assert_version(cop->results.user_version);
+    } else {
+      // we should learn the version after the first chunk, if we didn't know
+      // it already!
+      assert(cop->cursor.is_initial());
+    }
+    op.set_last_op_flags(cop->src_obj_fadvise_flags);
+
+    C_CopyChunk *fin = new C_CopyChunk(this, obc->obs.oi.soid,
+                                    get_last_peering_reset(), cop);
+    fin->offset = obj_offset;
+    unsigned n = info.pgid.hash_to_shard(osd->m_objecter_finishers);
+
+    ceph_tid_t tid = osd->objecter->read(soid.oid, oloc, op,
+                                   sub_cop->src.snap, NULL,
+                                   flags,
+                                   new C_OnFinisher(fin, osd->objecter_finishers[n]),
+                                   // discover the object version if we don't know it yet
+                                   sub_cop->results.user_version ? NULL : &sub_cop->results.user_version);
+    fin->tid = tid;
+    sub_cop->objecter_tid = tid;
+    // Stop once the last counted chunk has been dispatched.  The test must be
+    // inclusive: it runs after the read was sent, so a strict '<' would send
+    // one read more than cop->num_chunk accounts for.
+    if (last_offset <= obj_offset) {
+      break;
+    }
+  }
+}
+
 void PrimaryLogPG::process_copy_chunk(hobject_t oid, ceph_tid_t tid, int r)
 {
   dout(10) << __func__ << " " << oid << " tid " << tid
@@ -8449,28 +8591,133 @@ void PrimaryLogPG::process_copy_chunk(hobject_t oid, ceph_tid_t tid, int r)
 
   // cancel and requeue proxy ops on this object
   if (!r) {
-    for (map<ceph_tid_t, ProxyReadOpRef>::iterator it = proxyread_ops.begin();
-       it != proxyread_ops.end();) {
-      if (it->second->soid == cobc->obs.oi.soid) {
-       cancel_proxy_read((it++)->second);
-      } else {
-       ++it;
-      }
+    cancel_and_requeue_proxy_ops(cobc->obs.oi.soid);
+  }
+
+  kick_object_context_blocked(cobc);
+}
+
+/**
+ * Handle completion of one chunk read issued by _copy_some_manifest().
+ *
+ * Stale or unknown completions are ignored.  A failed or empty read marks the
+ * parent copy op as failed and completes it with the error.  Otherwise the
+ * outstanding-chunk counter is decremented; once it reaches zero all chunk
+ * data held in the sub copy ops is written into the object in a single
+ * PROMOTE transaction, and either the next batch is started or the parent
+ * copy op is completed.
+ *
+ * @param oid    object being promoted (key into copy_ops)
+ * @param tid    objecter tid of the finished read
+ * @param r      result of the read
+ * @param offset key of the finished chunk in the parent's chunk_cops
+ */
+void PrimaryLogPG::process_copy_chunk_manifest(hobject_t oid, ceph_tid_t tid, int r, uint64_t offset)
+{
+  dout(10) << __func__ << " " << oid << " tid " << tid
+          << " " << cpp_strerror(r) << dendl;
+  map<hobject_t,CopyOpRef>::iterator p = copy_ops.find(oid);
+  if (p == copy_ops.end()) {
+    dout(10) << __func__ << " no copy_op found" << dendl;
+    return;
+  }
+  CopyOpRef obj_cop = p->second;
+  // look the sub op up with find(): operator[] would insert a null ref for an
+  // unknown offset, which would then be dereferenced below
+  map<uint64_t, CopyOpRef>::iterator sub = obj_cop->chunk_cops.find(offset);
+  if (sub == obj_cop->chunk_cops.end() || !sub->second) {
+    dout(10) << __func__ << " no chunk copy_op found at offset " << offset << dendl;
+    return;
+  }
+  CopyOpRef chunk_cop = sub->second;
+
+  if (tid != chunk_cop->objecter_tid) {
+    dout(10) << __func__ << " tid " << tid << " != cop " << chunk_cop
+            << " tid " << chunk_cop->objecter_tid << dendl;
+    return;
+  }
+
+  if (chunk_cop->omap_data.length() || chunk_cop->omap_header.length()) {
+    r = -EOPNOTSUPP;
+  }
+
+  chunk_cop->objecter_tid = 0;
+  chunk_cop->objecter_tid2 = 0;  // assume this ordered before us (if it happened)
+  ObjectContextRef& cobc = obj_cop->obc;
+  OSDOp &chunk_data = chunk_cop->chunk_ops[0];
+
+  if (r < 0) {
+    obj_cop->failed = true;
+    goto out;
+  }
+
+  // a sibling chunk already failed; nothing more to do for this one
+  if (obj_cop->failed) {
+    return;
+  }
+
+  if (!chunk_data.outdata.length()) {
+    r = -EIO;
+    obj_cop->failed = true;
+    goto out;
+  }
+
+  obj_cop->num_chunk--;
+
+  /* check all of the copyop are completed */
+  if (obj_cop->num_chunk) {
+    dout(20) << __func__ << " num_chunk: " << obj_cop->num_chunk << dendl;
+    return;
+  }
+
+  {
+    OpContextUPtr ctx = simple_opc_create(obj_cop->obc);
+    PGTransaction *t = ctx->op_t.get();
+    ObjectState& obs = ctx->new_obs;
+    for (auto& cc : obj_cop->chunk_cops) {
+      OSDOp &sub_chunk = cc.second->chunk_ops[0];
+      const uint64_t data_offset = cc.second->cursor.data_offset;
+      const uint64_t data_length = sub_chunk.outdata.length();
+      t->write(cobc->obs.oi.soid,
+             data_offset,
+             data_length,
+             sub_chunk.outdata,
+             cc.second->dest_obj_fadvise_flags);
+      obs.oi.manifest.chunk_map[data_offset].flags = 0; // clean
+      dout(20) << __func__ << " offset: " << data_offset
+             << " length: " << data_length << dendl;
+      // account for the written bytes with the length captured before the
+      // buffer is released; reading it after clear() would always yield 0
+      write_update_size_and_usage(ctx->delta_stats, obs.oi, ctx->modified_ranges,
+                                 data_offset, data_length);
+      sub_chunk.outdata.clear();
     }
-    for (map<ceph_tid_t, ProxyWriteOpRef>::iterator it = proxywrite_ops.begin();
-        it != proxywrite_ops.end();) {
-      if (it->second->soid == cobc->obs.oi.soid) {
-       cancel_proxy_write((it++)->second);
-      } else {
-       ++it;
+    obs.oi.clear_data_digest();
+    ctx->at_version = get_next_version();
+    finish_ctx(ctx.get(), pg_log_entry_t::PROMOTE);
+    simple_opc_submit(std::move(ctx));
+
+    /* check remaining work */
+    // inspect the last chunk via rbegin(); end() must never be dereferenced
+    auto last = cobc->obs.oi.manifest.chunk_map.rbegin();
+    if (last != cobc->obs.oi.manifest.chunk_map.rend() &&
+       obj_cop->last_offset >= last->first + last->second.length) {
+      for (auto &en : cobc->obs.oi.manifest.chunk_map) {
+       if (obj_cop->last_offset < en.first) {
+         _copy_some_manifest(cobc, obj_cop, en.first);
+         return;
+       }
       }
     }
-    kick_proxy_ops_blocked(cobc->obs.oi.soid);
+  }
+
+ out:
+  dout(20) << __func__ << " complete r = " << cpp_strerror(r) << dendl;
+  CopyCallbackResults results(r, &obj_cop->results);
+  obj_cop->cb->complete(results);
+
+  copy_ops.erase(cobc->obs.oi.soid);
+  cobc->stop_block();
+
+  // cancel and requeue proxy ops on this object
+  if (!r) {
+    cancel_and_requeue_proxy_ops(cobc->obs.oi.soid);
   }
 
   kick_object_context_blocked(cobc);
 }
 
+/**
+ * Cancel every in-flight proxy read and proxy write whose target is oid,
+ * then kick the proxy ops blocked on that object.
+ */
+void PrimaryLogPG::cancel_and_requeue_proxy_ops(hobject_t oid) {
+  // The iterator is always advanced before the current entry is cancelled,
+  // so it never refers to the entry handed to cancel_proxy_read/_write.
+  auto rd = proxyread_ops.begin();
+  while (rd != proxyread_ops.end()) {
+    auto cur = rd++;
+    if (cur->second->soid == oid) {
+      cancel_proxy_read(cur->second);
+    }
+  }
+
+  auto wr = proxywrite_ops.begin();
+  while (wr != proxywrite_ops.end()) {
+    auto cur = wr++;
+    if (cur->second->soid == oid) {
+      cancel_proxy_write(cur->second);
+    }
+  }
+
+  kick_proxy_ops_blocked(oid);
+}
+
 void PrimaryLogPG::_write_copy_chunk(CopyOpRef cop, PGTransaction *t)
 {
   dout(20) << __func__ << " " << cop
@@ -8814,6 +9061,42 @@ void PrimaryLogPG::finish_promote(int r, CopyResults *results,
     agent_choose_mode();
 }
 
+/**
+ * Finish the promotion of a chunked (manifest) object.
+ *
+ * A cancelled promote is ignored.  On error every op waiting on the blocked
+ * object is answered with that error.  On success the promote is accounted
+ * for and, if the tiering agent is idle, its mode is re-evaluated.
+ *
+ * @param r        result of the copy
+ * @param results  copy results (user_version is logged, object_size is
+ *                 reported on success)
+ * @param obc      context of the promoted object
+ */
+void PrimaryLogPG::finish_promote_manifest(int r, CopyResults *results,
+                                           ObjectContextRef obc)
+{
+  const hobject_t& soid = obc->obs.oi.soid;
+  dout(10) << __func__ << " " << soid << " r=" << r
+          << " uv" << results->user_version << dendl;
+
+  if (r == -ECANCELED)
+    return;
+
+  if (r >= 0) {
+    // success: account for the promote and let an idle agent re-plan
+    osd->promote_finish(results->object_size);
+    osd->logger->inc(l_osd_tier_promote);
+
+    if (agent_state && agent_state->is_idle())
+      agent_choose_mode();
+    return;
+  }
+
+  derr << __func__ << " unexpected promote error " << cpp_strerror(r) << dendl;
+  // Pass the error to everyone blocked on this object.
+  // FIXME: this is pretty sloppy, but at this point we got something
+  // unexpected and don't have many other options.
+  auto waiters = waiting_for_blocked_object.find(soid);
+  if (waiters == waiting_for_blocked_object.end())
+    return;
+
+  list<OpRequestRef>& blocked_ops = waiters->second;
+  for (; !blocked_ops.empty(); blocked_ops.pop_front()) {
+    osd->reply_op_error(blocked_ops.front(), r);
+  }
+  waiting_for_blocked_object.erase(waiters);
+}
+
 void PrimaryLogPG::cancel_copy(CopyOpRef cop, bool requeue)
 {
   dout(10) << __func__ << " " << cop->obc->obs.oi.soid
index e5badb887a6a4b790099ee978b3dca320c95240e..bda615b2de5e35ac0aa22532d86ce4f14e28285a 100644 (file)
@@ -117,6 +117,9 @@ public:
     {}
   };
 
+  struct CopyOp;
+  typedef ceph::shared_ptr<CopyOp> CopyOpRef;
+
   struct CopyOp {
     CopyCallback *cb;
     ObjectContextRef obc;
@@ -149,6 +152,13 @@ public:
     unsigned src_obj_fadvise_flags;
     unsigned dest_obj_fadvise_flags;
 
+    map<uint64_t, CopyOpRef> chunk_cops;
+    int num_chunk;
+    bool failed;
+    uint64_t start_offset;
+    uint64_t last_offset;
+    vector<OSDOp> chunk_ops;
+  
     CopyOp(CopyCallback *cb_, ObjectContextRef _obc, hobject_t s,
           object_locator_t l,
            version_t v,
@@ -162,13 +172,14 @@ public:
        objecter_tid2(0),
        rval(-1),
        src_obj_fadvise_flags(src_obj_fadvise_flags),
-       dest_obj_fadvise_flags(dest_obj_fadvise_flags)
+       dest_obj_fadvise_flags(dest_obj_fadvise_flags),
+       num_chunk(0),
+       failed(false)
     {
       results.user_version = v;
       results.mirror_snapset = mirror_snapset;
     }
   };
-  typedef ceph::shared_ptr<CopyOp> CopyOpRef;
 
   /**
    * The CopyCallback class defines an interface for completions to the
@@ -1385,8 +1396,14 @@ protected:
                             uint64_t chunk_index, uint64_t req_offset, uint64_t req_length,
                             uint64_t req_total_len);
   bool can_proxy_chunked_read(OpRequestRef op, ObjectContextRef obc);
-  
+  void _copy_some_manifest(ObjectContextRef obc, CopyOpRef cop, uint64_t start_offset);
+  void process_copy_chunk_manifest(hobject_t oid, ceph_tid_t tid, int r, uint64_t offset);
+  void finish_promote_manifest(int r, CopyResults *results, ObjectContextRef obc);
+  void cancel_and_requeue_proxy_ops(hobject_t oid);
+
   friend struct C_ProxyChunkRead;
+  friend class PromoteManifestCallback;
+  friend class C_CopyChunk;
 
 public:
   PrimaryLogPG(OSDService *o, OSDMapRef curmap,