*
*/
+#define EINCLOCKED 100
+
class MOSDOp : public Message {
public:
static const char* get_opname(int op) {
off_t get_length() const { return le64_to_cpu(head.length); }
off_t get_offset() const { return le64_to_cpu(head.offset); }
+ unsigned get_inc_lock() const { return le32_to_cpu(head.inc_lock); }
+
void set_peer_stat(const osd_peer_stat_t& stat) { head.peer_stat = stat; }
const ceph_osd_peer_stat& get_peer_stat() { return head.peer_stat; }
if (oid.rev && !pick_object_rev(oid)) {
// we have no revision for this request.
r = -EEXIST;
- } else {
- switch (op->get_op()) {
- case CEPH_OSD_OP_READ:
- {
- // read into a buffer
- bufferlist bl;
- r = osd->store->read(poid,
- op->get_offset(), op->get_length(),
- bl);
- reply->set_data(bl);
- if (r >= 0)
- reply->set_length(r);
- else
- reply->set_length(0);
- dout(10) << " read got " << r << " / " << op->get_length() << " bytes from obj " << oid << dendl;
- }
- osd->logger->inc("c_rd");
- osd->logger->inc("c_rdb", op->get_length());
- break;
-
- case CEPH_OSD_OP_STAT:
- {
- struct stat st;
- memset(&st, sizeof(st), 0);
- r = osd->store->stat(poid, &st);
- if (r >= 0)
- reply->set_length(st.st_size);
- }
- break;
-
- default:
- assert(0);
+ goto done;
+ }
+
+ // check inc_lock?
+ if (op->get_inc_lock() > 0) {
+ __u32 cur = 0;
+ osd->store->getattr(poid, "inc_lock", &cur, sizeof(cur));
+ if (cur > op->get_inc_lock()) {
+ dout(10) << " inc_lock " << cur << " > " << op->get_inc_lock()
+ << " on " << poid << dendl;
+ r = -EINCLOCKED;
+ goto done;
}
}
+ switch (op->get_op()) {
+ case CEPH_OSD_OP_READ:
+ {
+ // read into a buffer
+ bufferlist bl;
+ r = osd->store->read(poid,
+ op->get_offset(), op->get_length(),
+ bl);
+ reply->set_data(bl);
+ if (r >= 0)
+ reply->set_length(r);
+ else
+ reply->set_length(0);
+ dout(10) << " read got " << r << " / " << op->get_length() << " bytes from obj " << oid << dendl;
+ }
+ osd->logger->inc("c_rd");
+ osd->logger->inc("c_rdb", op->get_length());
+ break;
+
+ case CEPH_OSD_OP_STAT:
+ {
+ struct stat st;
+ memset(&st, sizeof(st), 0);
+ r = osd->store->stat(poid, &st);
+ if (r >= 0)
+ reply->set_length(st.st_size);
+ }
+ break;
+
+ default:
+ assert(0);
+ }
+
+
+ done:
if (r >= 0) {
reply->set_result(0);
block_if_wrlocked(op))
return; // op will be handled later, after the object unlocks
+ // check inc_lock?
+ if (op->get_inc_lock() > 0) {
+ __u32 cur = 0;
+ osd->store->getattr(poid, "inc_lock", &cur, sizeof(cur));
+ if (cur > op->get_inc_lock()) {
+ dout(10) << " inc_lock " << cur << " > " << op->get_inc_lock()
+ << " on " << poid << dendl;
+ MOSDOpReply *reply = new MOSDOpReply(op, -EINCLOCKED, osd->osdmap->get_epoch(), true);
+ osd->messenger->send_message(reply, op->get_client_inst());
+ delete op;
+ return;
+ }
+ }
+
// balance-reads set?
char v;
if ((op->get_op() != CEPH_OSD_OP_BALANCEREADS && op->get_op() != CEPH_OSD_OP_UNBALANCEREADS) &&
MOSDOp *m = new MOSDOp(messenger->get_myinst(), client_inc, last_tid,
ex.oid, ex.layout, osdmap->get_epoch(),
CEPH_OSD_OP_STAT, flags);
- if (inc_lock >= 0) {
+ if (inc_lock > 0) {
st->inc_lock = inc_lock;
m->set_inc_lock(inc_lock);
}
if (pg.active_tids.empty()) close_pg( m->get_pg() );
// success?
+ if (m->get_result() == -EINCLOCKED) {
+ dout(7) << " got -EINCLOCKED, resubmitting" << dendl;
+ stat_submit(st);
+ delete m;
+ return;
+ }
if (m->get_result() == -EAGAIN) {
dout(7) << " got -EAGAIN, resubmitting" << dendl;
stat_submit(st);
MOSDOp *m = new MOSDOp(messenger->get_myinst(), client_inc, last_tid,
ex.oid, ex.layout, osdmap->get_epoch(),
CEPH_OSD_OP_READ, flags);
- if (inc_lock >= 0) {
+ if (inc_lock > 0) {
rd->inc_lock = inc_lock;
m->set_inc_lock(inc_lock);
}
rd->ops.erase(tid);
// success?
- if (m->get_result() == -EAGAIN) {
- dout(7) << " got -EAGAIN, resubmitting" << dendl;
+ if (m->get_result() == -EAGAIN ||
+ m->get_result() == -EINCLOCKED) {
+ dout(7) << " got -EAGAIN or -EINCLOCKED, resubmitting" << dendl;
readx_submit(rd, rd->ops[tid], true);
delete m;
return;
MOSDOp *m = new MOSDOp(messenger->get_myinst(), client_inc, tid,
ex.oid, ex.layout, osdmap->get_epoch(),
wr->op, flags);
- if (inc_lock >= 0) {
+ if (inc_lock > 0) {
wr->inc_lock = inc_lock;
m->set_inc_lock(inc_lock);
}
delete m;
return;
}
- if (m->get_result() == -EAGAIN) {
- dout(7) << " got -EAGAIN, resubmitting" << dendl;
+ if (m->get_result() == -EAGAIN ||
+ m->get_result() == -EINCLOCKED) {
+ dout(7) << " got -EAGAIN or -EINCLOCKED, resubmitting" << dendl;
if (wr->onack) num_unacked--;
if (wr->oncommit) num_uncommitted--;
if (wr->waitfor_ack.count(tid))
public:
list<ObjectExtent> extents;
int inc_lock;
- OSDOp() : inc_lock(-1) {}
+ OSDOp() : inc_lock(0) {}
virtual ~OSDOp() {}
};
public:
Objecter(Messenger *m, MonMap *mm, OSDMap *om, Mutex& l) :
messenger(m), monmap(mm), osdmap(om),
- last_tid(0), client_inc(-1), inc_lock(-1),
+ last_tid(0), client_inc(-1), inc_lock(0),
num_unacked(0), num_uncommitted(0),
last_epoch_requested(0),
client_lock(l), timer(l)