]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson: add support for basic write path, part 1.
authorRadoslaw Zarzynski <rzarzyns@redhat.com>
Fri, 26 Apr 2019 16:43:05 +0000 (18:43 +0200)
committerKefu Chai <kchai@redhat.com>
Tue, 7 May 2019 09:49:01 +0000 (17:49 +0800)
Signed-off-by: Radoslaw Zarzynski <rzarzyns@redhat.com>
src/crimson/osd/exceptions.h
src/crimson/osd/pg.cc
src/crimson/osd/pg.h
src/crimson/osd/pg_backend.cc
src/crimson/osd/pg_backend.h

index 8462d73188dd819efb0260952050bf8de99e5b72..7437ae4b88705f0267d97fcb2e97285bb5880872 100644 (file)
@@ -11,3 +11,5 @@ class object_not_found : public std::exception {
 class object_corrupted : public std::exception {
 };
 
+class invalid_argument : public std::exception {
+};
index 32f8cac525117c1b8fcd1f489f5f909494fbacfa..b977857ccd1ec3cee342ac0df4e50d9f5f47ea0d 100644 (file)
@@ -28,6 +28,7 @@
 #include "crimson/net/Messenger.h"
 #include "crimson/os/cyan_collection.h"
 #include "crimson/os/cyan_store.h"
+#include "crimson/os/Transaction.h"
 #include "crimson/osd/exceptions.h"
 #include "crimson/osd/pg_meta.h"
 
@@ -969,9 +970,12 @@ seastar::future<> PG::wait_for_active()
 }
 
 seastar::future<>
-PG::do_osd_op(const ObjectState& os, OSDOp& osd_op)
+PG::do_osd_op(const ObjectState& os, OSDOp& osd_op, ceph::os::Transaction& txn)
 {
-  switch (const auto& op = osd_op.op; op.op) {
+  // TODO: dispatch via call table?
+  // TODO: we might want to find a way to unify both input and output
+  // of each op.
+  switch (const ceph_osd_op& op = osd_op.op; op.op) {
   case CEPH_OSD_OP_SYNC_READ:
     [[fallthrough]];
   case CEPH_OSD_OP_READ:
@@ -985,6 +989,14 @@ PG::do_osd_op(const ObjectState& os, OSDOp& osd_op)
       osd_op.outdata = std::move(bl);
       return seastar::now();
     });
+  case CEPH_OSD_OP_WRITE:
+    // TODO: handle write separately. For `rados bench write` the fall-
+    // through path somehow works but this is really nasty.
+    [[fallthrough]];
+  case CEPH_OSD_OP_WRITEFULL:
+    return backend->writefull(os, osd_op, txn);
+  case CEPH_OSD_OP_SETALLOCHINT:
+    return seastar::now();
   default:
     throw std::runtime_error(
       fmt::format("op '{}' not supported", ceph_osd_op_name(op.op)));
@@ -995,18 +1007,22 @@ seastar::future<Ref<MOSDOpReply>> PG::do_osd_ops(Ref<MOSDOp> m)
 {
   // todo: issue requests in parallel if they don't write,
   // with writes being basically a synchronization barrier
-  return seastar::do_with(std::move(m), [this](auto& m) {
+  return seastar::do_with(std::move(m), ceph::os::Transaction{},
+                          [this](auto& m, auto& txn) {
     return seastar::do_for_each(begin(m->ops), end(m->ops),
-                                [m,this](OSDOp& osd_op) {
+                                [m,&txn,this](OSDOp& osd_op) {
       const auto oid = (m->get_snapid() == CEPH_SNAPDIR ?
                         m->get_hobj().get_head() :
                         m->get_hobj());
-      return backend->get_object_state(oid).then([&osd_op,this](auto os) {
-        return do_osd_op(*os, osd_op);
+      return backend->get_object_state(oid).then([&osd_op,&txn,this](auto os) {
+        return do_osd_op(*os, osd_op, txn);
       }).handle_exception_type([&osd_op](const object_not_found&) {
         osd_op.rval = -ENOENT;
         throw;
       });
+    }).then([&] {
+      return txn.empty() ? seastar::now()
+                         : backend->submit_transaction(std::move(txn));
     }).then([=] {
       auto reply = make_message<MOSDOpReply>(m.get(), 0, get_osdmap_epoch(),
                                              0, false);
index 2e2571e646aeab4b33f372ae7af14e9d892a3022..bcf6eea4d4eb511231f3e6e48cc0a1d1173cd497 100644 (file)
@@ -12,6 +12,7 @@
 #include <seastar/core/shared_future.hh>
 
 #include "crimson/net/Fwd.h"
+#include "crimson/os/Transaction.h"
 #include "osd/osd_types.h"
 #include "osd/osd_internal_types.h"
 #include "recovery_state.h"
@@ -129,7 +130,10 @@ private:
                            const std::vector<int>& new_acting,
                            int new_acting_primary);
   seastar::future<Ref<MOSDOpReply>> do_osd_ops(Ref<MOSDOp> m);
-  seastar::future<> do_osd_op(const ObjectState& os, OSDOp& op);
+  seastar::future<> do_osd_op(
+    const ObjectState& os,
+    OSDOp& op,
+    ceph::os::Transaction& txn);
 
 private:
   const spg_t pgid;
index 696fbd8cfc65fa205d91a608123fb2a67f592132..5cbddb374c545b03fc6ee1df492b3731f85cf434 100644 (file)
@@ -99,7 +99,6 @@ PGBackend::_load_os(const hobject_t& oid)
   return store->get_attr(coll,
                          ghobject_t{oid, ghobject_t::NO_GEN, shard},
                          OI_ATTR).then_wrapped([oid, this](auto fut) {
-    auto oi = std::make_unique<object_info_t>();
     if (fut.failed()) {
       auto ep = std::move(fut).get_exception();
       if (!ceph::os::CyanStore::EnoentException::is_class_of(ep)) {
@@ -188,3 +187,29 @@ seastar::future<bufferlist> PGBackend::read(const object_info_t& oi,
       return seastar::make_ready_future<bufferlist>(std::move(bl));
     });
 }
+
+seastar::future<> PGBackend::writefull(
+  const ObjectState& os,
+  const OSDOp& osd_op,
+  ceph::os::Transaction& txn)
+{
+  const ceph_osd_op& op = osd_op.op;
+  if (op.extent.length != osd_op.indata.length()) {
+    throw ::invalid_argument();
+  }
+
+  if (os.exists && op.extent.length < os.oi.size) {
+    txn.truncate(coll->cid, ghobject_t{os.oi.soid}, op.extent.length);
+  }
+  if (op.extent.length) {
+    txn.write(coll->cid, ghobject_t{os.oi.soid}, 0, op.extent.length,
+              osd_op.indata, op.flags);
+  }
+  return seastar::now();
+}
+
+seastar::future<> PGBackend::submit_transaction(ceph::os::Transaction&& txn)
+{
+  logger().trace("submit_transaction: num_ops={}", txn.get_num_ops());
+  return store->do_transaction(coll, std::move(txn));
+}
index 6d92100fe90cb50cb3ceb88fcce760fbcd0e6d6f..15f43b95866e3b3d62015ab344740bb83207aeca 100644 (file)
@@ -9,6 +9,7 @@
 #include <boost/smart_ptr/local_shared_ptr.hpp>
 
 #include "crimson/common/shared_lru.h"
+#include "crimson/os/Transaction.h"
 #include "osd/osd_types.h"
 #include "osd/osd_internal_types.h"
 
@@ -39,6 +40,12 @@ public:
                                   size_t truncate_size,
                                   uint32_t truncate_seq,
                                   uint32_t flags);
+  seastar::future<> write(
+    const ObjectState& os,
+    const OSDOp& osd_op,
+    ceph::os::Transaction& trans);
+  seastar::future<> submit_transaction(ceph::os::Transaction&& txn);
+
 protected:
   const shard_id_t shard;
   CollectionRef coll;