size_t len,
bufferlist *bl,
int flags) {
- Objecter::OSDRead *rd = new Objecter::OSDRead(bl, flags);
+ Objecter::OSDRead *rd = objecter->prepare_read(bl, flags);
file_to_extents(inode, offset, len, rd->extents);
return rd;
}
Context *onack,
Context *oncommit,
objectrev_t rev=0) {
- Objecter::OSDWrite *wr = new Objecter::OSDWrite(bl, flags);
+ Objecter::OSDWrite *wr = objecter->prepare_write(bl, flags);
file_to_extents(inode, offset, len, wr->extents, rev);
return objecter->modifyx(wr, onack, oncommit) > 0 ? 0:-1;
}
int flags,
Context *onack,
Context *oncommit) {
- Objecter::OSDModify *z = new Objecter::OSDModify(CEPH_OSD_OP_ZERO, flags);
+ Objecter::OSDModify *z = objecter->prepare_modify(CEPH_OSD_OP_ZERO, flags);
file_to_extents(inode, offset, len, z->extents);
return objecter->modifyx(z, onack, oncommit) > 0 ? 0:-1;
}
int flags,
Context *onack,
Context *oncommit) {
- Objecter::OSDModify *z = new Objecter::OSDModify(CEPH_OSD_OP_DELETE, flags);
+ Objecter::OSDModify *z = objecter->prepare_modify(CEPH_OSD_OP_DELETE, flags);
file_to_extents(inode, offset, len, z->extents);
return objecter->modifyx(z, onack, oncommit) > 0 ? 0:-1;
}
bufferlist *bl,
int flags,
Context *onfinish) {
- Objecter::OSDRead *rd = new Objecter::OSDRead(bl, flags);
+ Objecter::OSDRead *rd = objecter->prepare_read(bl, flags);
filer.file_to_extents(inode, offset, len, rd->extents);
return readx(rd, inode.ino, onfinish);
}
off_t offset, size_t len,
bufferlist& bl, int flags,
objectrev_t rev=0) {
- Objecter::OSDWrite *wr = new Objecter::OSDWrite(bl, flags);
+ Objecter::OSDWrite *wr = objecter->prepare_write(bl, flags);
filer.file_to_extents(inode, offset, len, wr->extents);
return writex(wr, inode.ino);
}
off_t offset, size_t len,
bufferlist *bl, int flags,
Mutex &lock) {
- Objecter::OSDRead *rd = new Objecter::OSDRead(bl, flags);
+ Objecter::OSDRead *rd = objecter->prepare_read(bl, flags);
filer.file_to_extents(inode, offset, len, rd->extents);
return atomic_sync_readx(rd, inode.ino, lock);
}
bufferlist& bl, int flags,
Mutex &lock,
objectrev_t rev=0) {
- Objecter::OSDWrite *wr = new Objecter::OSDWrite(bl, flags);
+ Objecter::OSDWrite *wr = objecter->prepare_write(bl, flags);
filer.file_to_extents(inode, offset, len, wr->extents);
return atomic_sync_writex(wr, inode.ino, lock);
}
tid_t Objecter::stat(object_t oid, off_t *size, ceph_object_layout ol, int flags, Context *onfinish)
{
- OSDStat *st = new OSDStat(size, flags);
+ OSDStat *st = prepare_stat(size, flags);
st->extents.push_back(ObjectExtent(oid, 0, 0));
st->extents.front().layout = ol;
st->onfinish = onfinish;
MOSDOp *m = new MOSDOp(messenger->get_myinst(), client_inc, last_tid,
ex.oid, ex.layout, osdmap->get_epoch(),
CEPH_OSD_OP_STAT, flags);
+ if (inc_lock >= 0) {
+ st->inc_lock = inc_lock;
+ m->set_inc_lock(inc_lock);
+ }
messenger->send_message(m, osdmap->get_inst(pg.acker()));
}
tid_t Objecter::read(object_t oid, off_t off, size_t len, ceph_object_layout ol, bufferlist *bl, int flags,
Context *onfinish)
{
- OSDRead *rd = new OSDRead(bl, flags);
+ OSDRead *rd = prepare_read(bl, flags);
rd->extents.push_back(ObjectExtent(oid, off, len));
rd->extents.front().layout = ol;
readx(rd, onfinish);
MOSDOp *m = new MOSDOp(messenger->get_myinst(), client_inc, last_tid,
ex.oid, ex.layout, osdmap->get_epoch(),
CEPH_OSD_OP_READ, flags);
+ if (inc_lock >= 0) {
+ rd->inc_lock = inc_lock;
+ m->set_inc_lock(inc_lock);
+ }
m->set_length(ex.length);
m->set_offset(ex.start);
m->set_retry_attempt(retry);
tid_t Objecter::write(object_t oid, off_t off, size_t len, ceph_object_layout ol, bufferlist &bl, int flags,
Context *onack, Context *oncommit)
{
- OSDWrite *wr = new OSDWrite(bl, flags);
+ OSDWrite *wr = prepare_write(bl, flags);
wr->extents.push_back(ObjectExtent(oid, off, len));
wr->extents.front().layout = ol;
wr->extents.front().buffer_extents[0] = len;
tid_t Objecter::zero(object_t oid, off_t off, size_t len, ceph_object_layout ol, int flags,
Context *onack, Context *oncommit)
{
- OSDModify *z = new OSDModify(CEPH_OSD_OP_ZERO, flags);
+ OSDModify *z = prepare_modify(CEPH_OSD_OP_ZERO, flags);
z->extents.push_back(ObjectExtent(oid, off, len));
z->extents.front().layout = ol;
modifyx(z, onack, oncommit);
tid_t Objecter::lock(int op, object_t oid, int flags, ceph_object_layout ol,
Context *onack, Context *oncommit)
{
- OSDModify *l = new OSDModify(op, flags);
+ OSDModify *l = prepare_modify(op, flags);
l->extents.push_back(ObjectExtent(oid, 0, 0));
l->extents.front().layout = ol;
modifyx(l, onack, oncommit);
MOSDOp *m = new MOSDOp(messenger->get_myinst(), client_inc, tid,
ex.oid, ex.layout, osdmap->get_epoch(),
wr->op, flags);
+ if (inc_lock >= 0) {
+ wr->inc_lock = inc_lock;
+ m->set_inc_lock(inc_lock);
+ }
m->set_length(ex.length);
m->set_offset(ex.start);
if (usetid > 0)
delete m;
return;
}
+ if (m->get_result() == -EAGAIN) {
+ dout(7) << " got -EAGAIN, resubmitting" << dendl;
+ if (wr->onack) num_unacked--;
+ if (wr->oncommit) num_uncommitted--;
+ if (wr->waitfor_ack.count(tid))
+ modifyx_submit(wr, wr->waitfor_ack[tid]);
+ else if (wr->waitfor_commit.count(tid))
+ modifyx_submit(wr, wr->waitfor_commit[tid]);
+ else assert(0);
+ delete m;
+ return;
+ }
assert(m->get_result() >= 0);
private:
tid_t last_tid;
int client_inc;
+ int inc_lock; // optional
int num_unacked;
int num_uncommitted;
class OSDOp {
public:
list<ObjectExtent> extents;
+ int inc_lock;
+ OSDOp() : inc_lock(-1) {}
virtual ~OSDOp() {}
};
}
};
+ OSDRead *prepare_read(bufferlist *b, int f) {
+ return new OSDRead(b, f);
+ }
+
class OSDStat : public OSDOp {
public:
tid_t tid;
OSDStat(off_t *s, int f) : tid(0), size(s), flags(f), onfinish(0) { }
};
+ OSDStat *prepare_stat(off_t *s, int f) {
+ return new OSDStat(s, f);
+ }
+
// generic modify
class OSDModify : public OSDOp {
public:
OSDModify(int o, int f) : op(o), flags(f), onack(0), oncommit(0) {}
};
+
+ OSDModify *prepare_modify(int o, int f) {
+ return new OSDModify(o, f);
+ }
// write (includes the bufferlist)
class OSDWrite : public OSDModify {
OSDWrite(bufferlist &b, int f) : OSDModify(CEPH_OSD_OP_WRITE, f), bl(b) {}
};
+ OSDWrite *prepare_write(bufferlist &b, int f) {
+ return new OSDWrite(b, f);
+ }
+
private:
public:
Objecter(Messenger *m, MonMap *mm, OSDMap *om, Mutex& l) :
messenger(m), monmap(mm), osdmap(om),
- last_tid(0), client_inc(-1),
+ last_tid(0), client_inc(-1), inc_lock(-1),
num_unacked(0), num_uncommitted(0),
last_epoch_requested(0),
client_lock(l), timer(l)
}
void dump_active();
- int get_client_incarnation() { return client_inc; }
- void set_client_incarnation(int inc) {
- client_inc = inc;
- }
+ int get_client_incarnation() const { return client_inc; }
+ void set_client_incarnation(int inc) { client_inc = inc; }
+
+ //int get_inc_lock() const { return inc_lock; }
+ void set_inc_lock(int l) { inc_lock = l; }
+
// med level
tid_t readx(OSDRead *read, Context *onfinish);