From: Sage Weil Date: Mon, 24 Mar 2008 18:27:24 +0000 (-0700) Subject: osd: verify inc_lock X-Git-Tag: v0.2~229^2^2~2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=f74506d4421ee4ebca253a7db5ee3e9df095c20a;p=ceph.git osd: verify inc_lock --- diff --git a/src/messages/MOSDOp.h b/src/messages/MOSDOp.h index 79a92d00138..6eb97fce841 100644 --- a/src/messages/MOSDOp.h +++ b/src/messages/MOSDOp.h @@ -27,6 +27,8 @@ * */ +#define EINCLOCKED 100 + class MOSDOp : public Message { public: static const char* get_opname(int op) { @@ -91,6 +93,8 @@ public: off_t get_length() const { return le64_to_cpu(head.length); } off_t get_offset() const { return le64_to_cpu(head.offset); } + unsigned get_inc_lock() const { return le32_to_cpu(head.inc_lock); } + void set_peer_stat(const osd_peer_stat_t& stat) { head.peer_stat = stat; } const ceph_osd_peer_stat& get_peer_stat() { return head.peer_stat; } diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index 5603e8054fc..2d18171d952 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -512,41 +512,56 @@ void ReplicatedPG::op_read(MOSDOp *op) if (oid.rev && !pick_object_rev(oid)) { // we have no revision for this request. r = -EEXIST; - } else { - switch (op->get_op()) { - case CEPH_OSD_OP_READ: - { - // read into a buffer - bufferlist bl; - r = osd->store->read(poid, - op->get_offset(), op->get_length(), - bl); - reply->set_data(bl); - if (r >= 0) - reply->set_length(r); - else - reply->set_length(0); - dout(10) << " read got " << r << " / " << op->get_length() << " bytes from obj " << oid << dendl; - } - osd->logger->inc("c_rd"); - osd->logger->inc("c_rdb", op->get_length()); - break; - - case CEPH_OSD_OP_STAT: - { - struct stat st; - memset(&st, sizeof(st), 0); - r = osd->store->stat(poid, &st); - if (r >= 0) - reply->set_length(st.st_size); - } - break; - - default: - assert(0); + goto done; + } + + // check inc_lock? + if (op->get_inc_lock() > 0) { + __u32 cur = 0; + osd->store->getattr(poid, "inc_lock", &cur, sizeof(cur)); + if (cur > op->get_inc_lock()) { + dout(10) << " inc_lock " << cur << " > " << op->get_inc_lock() + << " on " << poid << dendl; + r = -EINCLOCKED; + goto done; } } + switch (op->get_op()) { + case CEPH_OSD_OP_READ: + { + // read into a buffer + bufferlist bl; + r = osd->store->read(poid, + op->get_offset(), op->get_length(), + bl); + reply->set_data(bl); + if (r >= 0) + reply->set_length(r); + else + reply->set_length(0); + dout(10) << " read got " << r << " / " << op->get_length() << " bytes from obj " << oid << dendl; + } + osd->logger->inc("c_rd"); + osd->logger->inc("c_rdb", op->get_length()); + break; + + case CEPH_OSD_OP_STAT: + { + struct stat st; + memset(&st, sizeof(st), 0); + r = osd->store->stat(poid, &st); + if (r >= 0) + reply->set_length(st.st_size); + } + break; + + default: + assert(0); + } + + + done: if (r >= 0) { reply->set_result(0); @@ -1141,6 +1156,20 @@ void ReplicatedPG::op_modify(MOSDOp *op) block_if_wrlocked(op)) return; // op will be handled later, after the object unlocks + // check inc_lock? + if (op->get_inc_lock() > 0) { + __u32 cur = 0; + osd->store->getattr(poid, "inc_lock", &cur, sizeof(cur)); + if (cur > op->get_inc_lock()) { + dout(10) << " inc_lock " << cur << " > " << op->get_inc_lock() + << " on " << poid << dendl; + MOSDOpReply *reply = new MOSDOpReply(op, -EINCLOCKED, osd->osdmap->get_epoch(), true); + osd->messenger->send_message(reply, op->get_client_inst()); + delete op; + return; + } + } + // balance-reads set? char v; if ((op->get_op() != CEPH_OSD_OP_BALANCEREADS && op->get_op() != CEPH_OSD_OP_UNBALANCEREADS) && diff --git a/src/osdc/Objecter.cc b/src/osdc/Objecter.cc index 1acf5eb41b9..c72377df365 100644 --- a/src/osdc/Objecter.cc +++ b/src/osdc/Objecter.cc @@ -373,7 +373,7 @@ tid_t Objecter::stat_submit(OSDStat *st) MOSDOp *m = new MOSDOp(messenger->get_myinst(), client_inc, last_tid, ex.oid, ex.layout, osdmap->get_epoch(), CEPH_OSD_OP_STAT, flags); - if (inc_lock >= 0) { + if (inc_lock > 0) { st->inc_lock = inc_lock; m->set_inc_lock(inc_lock); } @@ -409,6 +409,12 @@ void Objecter::handle_osd_stat_reply(MOSDOpReply *m) if (pg.active_tids.empty()) close_pg( m->get_pg() ); // success? + if (m->get_result() == -EINCLOCKED) { + dout(7) << " got -EINCLOCKED, resubmitting" << dendl; + stat_submit(st); + delete m; + return; + } if (m->get_result() == -EAGAIN) { dout(7) << " got -EAGAIN, resubmitting" << dendl; stat_submit(st); @@ -495,7 +501,7 @@ tid_t Objecter::readx_submit(OSDRead *rd, ObjectExtent &ex, bool retry) MOSDOp *m = new MOSDOp(messenger->get_myinst(), client_inc, last_tid, ex.oid, ex.layout, osdmap->get_epoch(), CEPH_OSD_OP_READ, flags); - if (inc_lock >= 0) { + if (inc_lock > 0) { rd->inc_lock = inc_lock; m->set_inc_lock(inc_lock); } @@ -543,8 +549,9 @@ void Objecter::handle_osd_read_reply(MOSDOpReply *m) rd->ops.erase(tid); // success? - if (m->get_result() == -EAGAIN) { - dout(7) << " got -EAGAIN, resubmitting" << dendl; + if (m->get_result() == -EAGAIN || + m->get_result() == -EINCLOCKED) { + dout(7) << " got -EAGAIN or -EINCLOCKED, resubmitting" << dendl; readx_submit(rd, rd->ops[tid], true); delete m; return; @@ -785,7 +792,7 @@ tid_t Objecter::modifyx_submit(OSDModify *wr, ObjectExtent &ex, tid_t usetid) MOSDOp *m = new MOSDOp(messenger->get_myinst(), client_inc, tid, ex.oid, ex.layout, osdmap->get_epoch(), wr->op, flags); - if (inc_lock >= 0) { + if (inc_lock > 0) { wr->inc_lock = inc_lock; m->set_inc_lock(inc_lock); } @@ -855,8 +862,9 @@ void Objecter::handle_osd_modify_reply(MOSDOpReply *m) delete m; return; } - if (m->get_result() == -EAGAIN) { - dout(7) << " got -EAGAIN, resubmitting" << dendl; + if (m->get_result() == -EAGAIN || + m->get_result() == -EINCLOCKED) { + dout(7) << " got -EAGAIN or -EINCLOCKED, resubmitting" << dendl; if (wr->onack) num_unacked--; if (wr->oncommit) num_uncommitted--; if (wr->waitfor_ack.count(tid)) diff --git a/src/osdc/Objecter.h b/src/osdc/Objecter.h index b1cb68f9140..de50740a349 100644 --- a/src/osdc/Objecter.h +++ b/src/osdc/Objecter.h @@ -72,7 +72,7 @@ class Objecter { public: list extents; int inc_lock; - OSDOp() : inc_lock(-1) {} + OSDOp() : inc_lock(0) {} virtual ~OSDOp() {} }; @@ -191,7 +191,7 @@ class Objecter { public: Objecter(Messenger *m, MonMap *mm, OSDMap *om, Mutex& l) : messenger(m), monmap(mm), osdmap(om), - last_tid(0), client_inc(-1), inc_lock(-1), + last_tid(0), client_inc(-1), inc_lock(0), num_unacked(0), num_uncommitted(0), last_epoch_requested(0), client_lock(l), timer(l)