/* read */
CEPH_OSD_OP_READ = 1,
CEPH_OSD_OP_STAT = 2,
+ CEPH_OSD_OP_GETXATTR = 3,
+ CEPH_OSD_OP_GETXATTRS = 4,
/* modify */
CEPH_OSD_OP_WRNOOP = 10, /* write no-op (i.e. sync) */
CEPH_OSD_OP_TRUNCATE = 13,
CEPH_OSD_OP_ZERO = 14, /* zero extent */
CEPH_OSD_OP_WRITEFULL = 15, /* write complete object */
+ CEPH_OSD_OP_SETXATTR = 16,
+ CEPH_OSD_OP_SETXATTRS = 17,
+ CEPH_OSD_OP_RMXATTR = 18,
/* lock */
CEPH_OSD_OP_WRLOCK = 20,
CEPH_OSD_OP_BALANCE_READS = 16,
CEPH_OSD_OP_ACKNVRAM = 32, /* ACK when stable in NVRAM, not RAM */
CEPH_OSD_OP_ORDERSNAP = 64, /* EOLDSNAP if snapc is out of order */
+ CEPH_OSD_OP_PEERSTAT = 128, /* msg includes osd_peer_stat */
};
#define EOLDSNAPC 44 /* ORDERSNAP flag set and writer has old snap context */
-struct ceph_osd_peer_stat {
- struct ceph_timespec stamp;
- float oprate;
- float qlen;
- float recent_qlen;
- float read_latency;
- float read_latency_mine;
- float frac_rd_ops_shed_in;
- float frac_rd_ops_shed_out;
+struct ceph_osd_op {
+ __le16 op;
+ union {
+ struct {
+ __le64 offset, length;
+ };
+ struct {
+ __le32 name_len;
+ __le32 value_len;
+ };
+ };
} __attribute__ ((packed));
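+/*
+ * Illustrative sketch (not part of this patch): for a struct ceph_osd_op rd,
+ * an extent op fills the first union member:
+ *	rd.op = cpu_to_le16(CEPH_OSD_OP_READ);
+ *	rd.offset = cpu_to_le64(off);
+ *	rd.length = cpu_to_le64(len);
+ * while the xattr ops use name_len/value_len, with the name and value
+ * presumably carried in the message data payload.
+ */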
struct ceph_osd_request_head {
ceph_tid_t tid;
__le32 client_inc;
- __le32 op;
- __le64 offset, length;
struct ceph_object oid;
struct ceph_object_layout layout;
ceph_epoch_t osdmap_epoch;
struct ceph_eversion reassert_version;
- /* semi-hack, fix me */
- __le32 shed_count;
- struct ceph_osd_peer_stat peer_stat;
-
/* writer's snap context */
__le64 snap_seq;
__le32 num_snaps;
- __le64 snaps[];
+
+ /* read or mutation */
+ __le16 num_ops;
+ __u8 is_modify;
+ __u8 object_type;
+ struct ceph_osd_op ops[]; /* followed by snaps */
} __attribute__ ((packed));
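+/*
+ * The request front is thus laid out as: the head above, then num_ops
+ * ceph_osd_op entries, then num_snaps __le64 snap ids ("followed by
+ * snaps"); bulk payloads such as write data ride in the message data
+ * section.
+ */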
struct ceph_osd_reply_head {
ceph_tid_t tid;
- __le32 op;
__le32 flags;
struct ceph_object oid;
struct ceph_object_layout layout;
ceph_epoch_t osdmap_epoch;
- __le32 result;
- __le64 offset, length;
struct ceph_eversion reassert_version;
+
+ __le32 result;
+
+ __le16 num_ops;
+ __le16 is_modify;
+ struct ceph_osd_op ops[0];
} __attribute__ ((packed));
#endif
WRITE_RAW_ENCODER(ceph_inopath_item)
WRITE_RAW_ENCODER(ceph_osd_request_head)
WRITE_RAW_ENCODER(ceph_osd_reply_head)
+WRITE_RAW_ENCODER(ceph_osd_op)
WRITE_RAW_ENCODER(ceph_mon_statfs)
WRITE_RAW_ENCODER(ceph_mon_statfs_reply)
return out << hex << f.major << '.' << f.minor << dec;
}
-
-
+inline ostream& operator<<(ostream& out, const ceph_osd_op& op) {
+ out << ceph_osd_op_name(op.op);
+ out << " " << op.offset << "~" << op.length;
+ return out;
+}
#endif
{
struct inode *inode = req->r_inode;
struct ceph_osd_reply_head *replyhead;
+ struct ceph_osd_op *op;
struct ceph_inode_info *ci = ceph_inode(inode);
unsigned wrote;
loff_t offset = (loff_t)req->r_pages[0]->index << PAGE_CACHE_SHIFT;
/* parse reply */
if (req->r_reply) {
replyhead = req->r_reply->front.iov_base;
+ op = (void *)(replyhead + 1);
rc = le32_to_cpu(replyhead->result);
- bytes = le64_to_cpu(replyhead->length);
+ bytes = le64_to_cpu(op->length);
}
if (rc >= 0) {
struct ceph_osd_request *req)
{
struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base;
+ struct ceph_osd_op *op = (void *)(reqhead + 1);
u64 orig_len = *plen;
u64 objoff, objlen; /* extent in object */
if (*plen < orig_len)
dout(10, " skipping last %llu, final file extent %llu~%llu\n",
orig_len - *plen, off, *plen);
- reqhead->offset = cpu_to_le64(objoff);
- reqhead->length = cpu_to_le64(objlen);
+ op->offset = cpu_to_le64(objoff);
+ op->length = cpu_to_le64(objlen);
req->r_num_pages = calc_pages_for(off, *plen);
/* pgid? */
/*
* build osd request message only.
*/
-static struct ceph_msg *new_request_msg(struct ceph_osd_client *osdc, int op,
+static struct ceph_msg *new_request_msg(struct ceph_osd_client *osdc, int opc,
struct ceph_snap_context *snapc)
{
struct ceph_msg *req;
struct ceph_osd_request_head *head;
- size_t size = sizeof(*head);
+ struct ceph_osd_op *op;
+ __le64 *snaps;
+ size_t size = sizeof(*head) + sizeof(*op);
+ int i;
if (snapc)
size += sizeof(u64) * snapc->num_snaps;
return req;
memset(req->front.iov_base, 0, req->front.iov_len);
head = req->front.iov_base;
+ op = (void *)(head + 1);
+ snaps = (void *)(op + 1);
/* encode head */
- head->op = cpu_to_le32(op);
head->client_inc = cpu_to_le32(1); /* always, for now. */
head->flags = 0;
+ head->num_ops = cpu_to_le16(1);
+ op->op = cpu_to_le16(opc);
if (snapc) {
head->snap_seq = cpu_to_le64(snapc->seq);
head->num_snaps = cpu_to_le32(snapc->num_snaps);
- memcpy(req->front.iov_base + sizeof(*head), snapc->snaps,
- snapc->num_snaps*sizeof(u64));
+ for (i = 0; i < snapc->num_snaps; i++)
+ snaps[i] = cpu_to_le64(snapc->snaps[i]);
}
return req;
}
{
struct ceph_osd_request *req;
struct ceph_osd_request_head *reqhead;
+ struct ceph_osd_op *op;
struct page *page;
pgoff_t next_index;
int contig_pages;
len);
req->r_num_pages = contig_pages;
reqhead = req->r_request->front.iov_base;
- reqhead->length = cpu_to_le64(len);
+ op = (void *)(reqhead + 1);
+ op->length = cpu_to_le64(len);
dout(10, "readpages final extent is %llu~%llu -> %d pages\n",
off, len, req->r_num_pages);
rc = do_sync_request(osdc, req);
{
struct ceph_msg *reqm;
struct ceph_osd_request_head *reqhead;
+ struct ceph_osd_op *op;
struct ceph_osd_request *req;
int rc = 0;
reqhead->flags = cpu_to_le32(CEPH_OSD_OP_ACK);
else
reqhead->flags = cpu_to_le32(CEPH_OSD_OP_SAFE);
+ op = (void *)(reqhead + 1);
- len = le64_to_cpu(reqhead->length);
+ len = le64_to_cpu(op->length);
dout(10, "writepages %llu~%llu -> %d pages\n", off, len,
req->r_num_pages);
{
struct ceph_msg *reqm = req->r_request;
struct ceph_osd_request_head *reqhead = reqm->front.iov_base;
- u64 off = le64_to_cpu(reqhead->offset);
+ struct ceph_osd_op *op = (void *)(reqhead + 1);
+ u64 off = le64_to_cpu(op->offset);
int rc;
dout(10, "writepages_start %llu~%llu, %d pages\n", off, len, num_pages);
reqhead->flags = cpu_to_le32(CEPH_OSD_OP_ACK);
else
reqhead->flags = cpu_to_le32(CEPH_OSD_OP_SAFE);
- reqhead->length = cpu_to_le64(len);
+ op->length = cpu_to_le64(len);
/* reference pages in message */
reqm->pages = req->r_pages;
class MOSDOp : public Message {
private:
ceph_osd_request_head head;
+public:
+ vector<ceph_osd_op> ops;
vector<snapid_t> snaps;
+ osd_peer_stat_t peer_stat;
friend class MOSDOpReply;
-public:
snapid_t get_snap_seq() { return snapid_t(head.snap_seq); }
vector<snapid_t> &get_snaps() { return snaps; }
void set_snap_seq(snapid_t s) { head.snap_seq = s; }
eversion_t get_version() { return head.reassert_version; }
- const int get_op() { return head.op; }
- void set_op(int o) { head.op = o; }
- bool is_read() { return ceph_osd_op_is_read(get_op()); }
-
- loff_t get_length() const { return head.length; }
- loff_t get_offset() const { return head.offset; }
+ bool is_modify() { return head.is_modify; }
unsigned get_inc_lock() const { return head.inc_lock; }
- void set_peer_stat(const osd_peer_stat_t& stat) { head.peer_stat = stat; }
- const ceph_osd_peer_stat& get_peer_stat() { return head.peer_stat; }
+ void set_peer_stat(const osd_peer_stat_t& stat) {
+ peer_stat = stat;
+ head.flags = (head.flags | CEPH_OSD_OP_PEERSTAT);
+ }
+ const osd_peer_stat_t& get_peer_stat() {
+ assert(head.flags & CEPH_OSD_OP_PEERSTAT);
+ return peer_stat;
+ }
- void inc_shed_count() { head.shed_count = get_shed_count() + 1; }
- int get_shed_count() { return head.shed_count; }
-
+ //void inc_shed_count() { head.shed_count = get_shed_count() + 1; }
+ //int get_shed_count() { return head.shed_count; }
+
- MOSDOp(int inc, long tid,
- object_t oid, ceph_object_layout ol, epoch_t mapepoch, int op,
+ MOSDOp(int inc, long tid, bool modify,
+ object_t oid, ceph_object_layout ol, epoch_t mapepoch,
int flags) :
Message(CEPH_MSG_OSD_OP) {
memset(&head, 0, sizeof(head));
head.tid = tid;
head.client_inc = inc;
+ head.is_modify = modify;
head.oid = oid;
head.layout = ol;
head.osdmap_epoch = mapepoch;
- head.op = op;
head.flags = flags;
}
MOSDOp() {}
- void set_inc_lock(__u32 l) {
- head.inc_lock = l;
+ void set_inc_lock(__u32 l) { head.inc_lock = l; }
+ void set_layout(const ceph_object_layout& l) { head.layout = l; }
+ void set_version(eversion_t v) { head.reassert_version = v; }
+
+ // ops
+ void add_simple_op(int o, __u64 off, __u64 len) {
+ ceph_osd_op op;
+ op.op = o;
+ op.offset = off;
+ op.length = len;
+ ops.push_back(op);
+ }
+ void write(__u64 off, __u64 len, bufferlist& bl) {
+ add_simple_op(CEPH_OSD_OP_WRITE, off, len);
+ data.claim(bl);
+ header.data_off = off;
+ }
+ void writefull(bufferlist& bl) {
+ add_simple_op(CEPH_OSD_OP_WRITEFULL, 0, bl.length());
+ data.claim(bl);
+ header.data_off = 0;
+ }
+ void zero(__u64 off, __u64 len) {
+ add_simple_op(CEPH_OSD_OP_ZERO, off, len);
+ }
+ void truncate(__u64 off) {
+ add_simple_op(CEPH_OSD_OP_TRUNCATE, off, 0);
+ }
+ void remove() {
+ add_simple_op(CEPH_OSD_OP_DELETE, 0, 0);
}
- void set_layout(const ceph_object_layout& l) { head.layout = l; }
+ void read(__u64 off, __u64 len) {
+ add_simple_op(CEPH_OSD_OP_READ, off, len);
+ }
+ void stat() {
+ add_simple_op(CEPH_OSD_OP_STAT, 0, 0);
+ }
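+
+ // Illustrative usage (not from this patch): a single-extent write request
+ //   MOSDOp *m = new MOSDOp(client_inc, tid, true, oid, layout, epoch, flags);
+ //   m->write(off, bl.length(), bl);
+ // Several helpers may be combined to build a compound (multi-op) request.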
- void set_length(loff_t l) { head.length = l; }
- void set_offset(loff_t o) { head.offset = o; }
- void set_version(eversion_t v) { head.reassert_version = v; }
-
+ // flags
int get_flags() const { return head.flags; }
bool wants_ack() const { return get_flags() & CEPH_OSD_OP_ACK; }
bool wants_commit() const { return get_flags() & CEPH_OSD_OP_SAFE; }
// marshalling
virtual void encode_payload() {
head.num_snaps = snaps.size();
+ head.num_ops = ops.size();
::encode(head, payload);
- for (unsigned i=0; i<snaps.size(); i++)
- ::encode(snaps[i], payload);
- header.data_off = get_offset();
+ ::encode_nohead(ops, payload);
+ ::encode_nohead(snaps, payload);
+ if (head.flags & CEPH_OSD_OP_PEERSTAT)
+ ::encode(peer_stat, payload);
}
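+ // Payload layout: head, then head.num_ops raw ceph_osd_op structs, then
+ // head.num_snaps snapids, then (only if CEPH_OSD_OP_PEERSTAT is set in
+ // head.flags) the peer_stat; decode_payload below mirrors this order.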
virtual void decode_payload() {
bufferlist::iterator p = payload.begin();
::decode(head, p);
- snaps.resize(head.num_snaps);
- for (unsigned i=0; i<snaps.size(); i++)
- ::decode(snaps[i], p);
+ decode_nohead(head.num_ops, ops, p);
+ decode_nohead(head.num_snaps, snaps, p);
+ if (head.flags & CEPH_OSD_OP_PEERSTAT)
+ ::decode(peer_stat, p);
}
const char *get_type_name() { return "osd_op"; }
void print(ostream& out) {
- out << "osd_op(" << get_reqid()
- << " " << ceph_osd_op_name(get_op())
- << " " << head.oid;
- if (get_length()) out << " " << get_offset() << "~" << get_length();
+ out << "osd_op(" << get_reqid();
+ out << " " << head.oid << " " << ops;
out << " " << pg_t(head.layout.ol_pgid);
if (is_retry_attempt()) out << " RETRY";
if (!snaps.empty())
class MOSDOpReply : public Message {
ceph_osd_reply_head head;
-
public:
+ vector<ceph_osd_op> ops;
+
long get_tid() { return head.tid; }
object_t get_oid() { return head.oid; }
pg_t get_pg() { return pg_t(head.layout.ol_pgid); }
- int get_op() { return head.op; }
int get_flags() { return head.flags; }
bool is_safe() { return get_flags() & CEPH_OSD_OP_SAFE; }
__s32 get_result() { return head.result; }
- __u64 get_length() { return head.length; }
- __u64 get_offset() { return head.offset; }
eversion_t get_version() { return head.reassert_version; }
+ bool is_modify() { return head.is_modify; }
+
void set_result(int r) { head.result = r; }
- void set_length(loff_t s) { head.length = s; }
- void set_offset(loff_t o) { head.offset = o; }
void set_version(eversion_t v) { head.reassert_version = v; }
- void set_op(int op) { head.op = op; }
-
// osdmap
epoch_t get_map_epoch() { return head.osdmap_epoch; }
Message(CEPH_MSG_OSD_OPREPLY) {
memset(&head, 0, sizeof(head));
head.tid = req->head.tid;
- head.op = req->head.op;
+ head.is_modify = req->is_modify();
+ ops = req->ops;
+ head.result = result;
head.flags = commit ? CEPH_OSD_OP_SAFE:0;
head.oid = req->head.oid;
head.layout = req->head.layout;
head.osdmap_epoch = e;
- head.result = result;
- head.offset = req->head.offset;
- head.length = req->head.length; // speculative... OSD should ensure these are correct
head.reassert_version = req->head.reassert_version;
}
MOSDOpReply() {}
virtual void decode_payload() {
bufferlist::iterator p = payload.begin();
::decode(head, p);
+ ::decode_nohead(head.num_ops, ops, p);
}
virtual void encode_payload() {
+ head.num_ops = ops.size();
::encode(head, payload);
- header.data_off = get_offset();
+ ::encode_nohead(ops, payload);
}
const char *get_type_name() { return "osd_op_reply"; }
void print(ostream& out) {
out << "osd_op_reply(" << get_tid()
- << " " << ceph_osd_op_name(get_op())
- << " " << head.oid;
- if (get_length()) out << " " << get_offset() << "~" << get_length();
- if (get_op() >= 10) {
+ << " " << head.oid << " " << ops;
+ if (is_modify()) {
if (is_safe())
out << " commit";
else
// subop
pg_t pgid;
pobject_t poid;
- int32_t op;
- loff_t offset, length;
+ bool is_modify, wants_reply;
+ vector<ceph_osd_op> ops;
+
// subop metadata
tid_t rep_tid;
eversion_t version, old_version;
::decode(reqid, p);
::decode(pgid, p);
::decode(poid, p);
- ::decode(op, p);
- ::decode(offset, p);
- ::decode(length, p);
+ ::decode(ops, p);
::decode(rep_tid, p);
::decode(version, p);
::decode(old_version, p);
::encode(reqid, payload);
::encode(pgid, payload);
::encode(poid, payload);
- ::encode(op, payload);
- ::encode(offset, payload);
- ::encode(length, payload);
+ ::encode(ops, payload);
::encode(rep_tid, payload);
::encode(version, payload);
::encode(old_version, payload);
::encode(attrset, payload);
::encode(data_subset, payload);
::encode(clone_subsets, payload);
- header.data_off = offset;
+ if (ops.size())
+ header.data_off = ops[0].offset;
+ else
+ header.data_off = 0;
}
- bool wants_reply() {
- if (op < 100) return true;
- return false; // no reply needed for primary-lock, -unlock.
- }
- bool is_read() { return op < 10; }
-
- MOSDSubOp(osd_reqid_t r, pg_t p, pobject_t po, int o, loff_t of, loff_t le,
+ MOSDSubOp(osd_reqid_t r, pg_t p, pobject_t po, vector<ceph_osd_op>& o, bool wr,
epoch_t mape, tid_t rtid, unsigned il, eversion_t v) :
Message(MSG_OSD_SUBOP),
map_epoch(mape),
reqid(r),
pgid(p),
poid(po),
- op(o),
- offset(of),
- length(le),
+ wants_reply(wr),
+ ops(o),
rep_tid(rtid),
version(v),
inc_lock(il)
const char *get_type_name() { return "osd_sub_op"; }
void print(ostream& out) {
out << "osd_sub_op(" << reqid
- << " " << ceph_osd_op_name(op)
<< " " << poid
+ << " " << ops
<< " v " << version
<< " snapset=" << snapset << " snapc=" << snapc;
- if (length) out << " " << offset << "~" << length;
if (!data_subset.empty()) out << " subset " << data_subset;
out << ")";
}
osd_reqid_t reqid;
pg_t pgid;
tid_t rep_tid;
- int32_t op;
pobject_t poid;
- off_t length, offset;
-
+
+public:
+ vector<ceph_osd_op> ops;
+
// result
bool commit;
int32_t result;
map<string,bufferptr> attrset;
- public:
virtual void decode_payload() {
bufferlist::iterator p = payload.begin();
::decode(map_epoch, p);
::decode(reqid, p);
::decode(pgid, p);
::decode(rep_tid, p);
- ::decode(op, p);
::decode(poid, p);
- ::decode(length, p);
- ::decode(offset, p);
+ ::decode(ops, p);
::decode(commit, p);
::decode(result, p);
::decode(pg_complete_thru, p);
::encode(reqid, payload);
::encode(pgid, payload);
::encode(rep_tid, payload);
- ::encode(op, payload);
::encode(poid, payload);
- ::encode(length, payload);
- ::encode(offset, payload);
+ ::encode(ops, payload);
::encode(commit, payload);
::encode(result, payload);
::encode(pg_complete_thru, payload);
::encode(peer_stat, payload);
::encode(attrset, payload);
- header.data_off = offset;
}
epoch_t get_map_epoch() { return map_epoch; }
pg_t get_pg() { return pgid; }
tid_t get_rep_tid() { return rep_tid; }
- int get_op() { return op; }
pobject_t get_poid() { return poid; }
- const off_t get_length() { return length; }
- const off_t get_offset() { return offset; }
bool get_commit() { return commit; }
int get_result() { return result; }
reqid(req->reqid),
pgid(req->pgid),
rep_tid(req->rep_tid),
- op(req->op),
poid(req->poid),
- length(req->length),
- offset(req->offset),
+ ops(req->ops),
commit(commit_),
result(result_) {
memset(&peer_stat, 0, sizeof(peer_stat));
void print(ostream& out) {
out << "osd_sub_op_reply(" << reqid
- << " " << ceph_osd_op_name(op)
- << " " << poid;
- if (length) out << " " << offset << "~" << length;
- if (op >= 10) {
- if (commit)
- out << " commit";
- else
- out << " ack";
- }
+ << " " << poid << " " << ops;
+ if (commit)
+ out << " commit";
+ else
+ out << " ack";
out << " = " << result;
out << ")";
}
stat_oprate.hit(now);
stat_ops++;
stat_qlen += pending_ops;
- if (op->get_op() == CEPH_OSD_OP_READ) {
+ if (!op->is_modify()) {
stat_rd_ops++;
if (op->get_source().is_osd()) {
//derr(-10) << "shed in " << stat_rd_ops_shed_in << " / " << stat_rd_ops << dendl;
}
// pg must be same-ish...
- if (op->is_read()) {
+ if (!op->is_modify()) {
// read
if (!pg->same_for_read_since(op->get_map_epoch())) {
dout(7) << "handle_rep_op pg changed " << pg->info.history
return;
}
- if (op->get_op() == CEPH_OSD_OP_READ) {
+ if (!op->is_modify()) {
Mutex::Locker lock(peer_stat_lock);
stat_rd_ops_in_queue++;
}
}
-void PG::append_log(ObjectStore::Transaction &t, const PG::Log::Entry &logentry,
- eversion_t trim_to)
+void PG::append_log(ObjectStore::Transaction &t, bufferlist& bl,
+ eversion_t logversion, eversion_t trim_to)
{
- dout(10) << "append_log " << ondisklog.top << " " << logentry << dendl;
+ dout(10) << "append_log " << ondisklog.top << dendl;
- // write entry on disk
- bufferlist bl;
- ::encode(logentry, bl);
- /*
if (g_conf.osd_pad_pg_log) { // pad to 4k, until i fix ebofs reallocation crap. FIXME.
- bufferptr bp(4096 - sizeof(logentry));
+ bufferptr bp(4096 - bl.length());
bl.push_back(bp);
}
- */
t.write(0, info.pgid.to_pobject(), ondisklog.top, bl.length(), bl );
// update block map?
if (ondisklog.top % 4096 == 0)
- ondisklog.block_map[ondisklog.top] = logentry.version;
+ ondisklog.block_map[ondisklog.top] = logversion;
ondisklog.top += bl.length();
- t.collection_setattr(info.pgid.to_coll(), "ondisklog_top", &ondisklog.top, sizeof(ondisklog.top));
+ t.collection_setattr(info.pgid.to_coll(), "ondisklog_top",
+ &ondisklog.top, sizeof(ondisklog.top));
// trim?
if (trim_to > log.bottom &&
// pg on-disk state
void write_info(ObjectStore::Transaction& t);
void write_log(ObjectStore::Transaction& t);
- void append_log(ObjectStore::Transaction &t,
- const PG::Log::Entry &logentry,
- eversion_t trim_to);
+ void append_log(ObjectStore::Transaction &t, bufferlist& bl,
+ eversion_t log_version, eversion_t trim_to);
void read_log(ObjectStore *store);
void trim_ondisklog_to(ObjectStore::Transaction& t, eversion_t v);
out << "pg[" << pg.info
<< " r=" << pg.get_role();
- if (pg.log.bottom != pg.info.log_bottom)
+ if (pg.log.bottom != pg.info.log_bottom ||
+ pg.log.top != pg.info.last_update)
out << " (info mismatch, " << pg.log << ")";
if (pg.log.log.empty()) {
void RAID4PG::do_op(MOSDOp *op)
{
+ /*
// a write will do something like
object_t oid = op->get_oid(); // logical object
if (rank == n) rank = 0;
}
-
+ */
}
bool ReplicatedPG::preprocess_op(MOSDOp *op, utime_t now)
{
// we only care about reads here on out..
- if (!op->is_read())
+ if (op->is_modify() ||
+ op->ops.size() < 1 ||
+ op->ops[0].op != CEPH_OSD_OP_READ)
return false;
+ ceph_osd_op& readop = op->ops[0];
object_t oid = op->get_oid();
pobject_t poid(info.pgid.pool(), 0, oid);
ceph_object_layout layout;
layout.ol_pgid = info.pgid.u.pg64;
layout.ol_stripe_unit = 0;
- MOSDOp *pop = new MOSDOp(0, osd->get_tid(),
+ MOSDOp *pop = new MOSDOp(0, osd->get_tid(), true,
oid,
layout,
osd->osdmap->get_epoch(),
- CEPH_OSD_OP_BALANCEREADS, 0);
+ 0);
+ pop->add_simple_op(CEPH_OSD_OP_BALANCEREADS, 0, 0);
do_op(pop);
}
if (is_balanced && !should_balance &&
ceph_object_layout layout;
layout.ol_pgid = info.pgid.u.pg64;
layout.ol_stripe_unit = 0;
- MOSDOp *pop = new MOSDOp(0, osd->get_tid(),
+ MOSDOp *pop = new MOSDOp(0, osd->get_tid(), true,
oid,
layout,
osd->osdmap->get_epoch(),
- CEPH_OSD_OP_UNBALANCEREADS, 0);
+ 0);
+ pop->add_simple_op(CEPH_OSD_OP_UNBALANCEREADS, 0, 0);
do_op(pop);
}
}
// if this is a read and the data is in the cache, do an immediate read..
if ( g_conf.osd_immediate_read_from_cache ) {
if (osd->store->is_cached(info.pgid.to_coll(), poid,
- op->get_offset(),
- op->get_length()) == 0) {
+ readop.offset,
+ readop.length) == 0) {
if (!is_primary() && !op->get_source().is_osd()) {
// am i allowed?
bool v;
osd->logger->inc("op");
- if (ceph_osd_op_is_read(op->get_op()))
- op_read(op);
- else
+ if (op->is_modify())
op_modify(op);
+ else
+ op_read(op);
}
osd->logger->inc("subop");
- switch (op->op) {
+ assert(op->ops.size() >= 1);
+ ceph_osd_op& first = op->ops[0];
+
+ switch (first.op) {
// rep stuff
case CEPH_OSD_OP_PULL:
sub_op_pull(op);
break;
default:
- if (ceph_osd_op_is_modify(op->op) ||
- ceph_osd_op_is_lock(op->op))
- sub_op_modify(op);
- else
- assert(0);
+ sub_op_modify(op);
}
-
}
void ReplicatedPG::do_sub_op_reply(MOSDSubOpReply *r)
{
- if (r->get_op() == CEPH_OSD_OP_PUSH) {
+ assert(r->ops.size() >= 1);
+ ceph_osd_op& first = r->ops[0];
+ if (first.op == CEPH_OSD_OP_PUSH) {
// continue peer recovery
sub_op_push_reply(r);
} else {
object_t oid = op->get_oid();
pobject_t poid(info.pgid.pool(), 0, oid);
- dout(10) << "op_read " << ceph_osd_op_name(op->get_op())
+ ceph_osd_op& readop = op->ops[0];
+
+ dout(10) << "op_read " << ceph_osd_op_name(readop.op)
<< " " << oid
- << " " << op->get_offset() << "~" << op->get_length()
+ << " " << readop.offset << "~" << readop.length
<< dendl;
// wrlocked?
if (block_if_wrlocked(op))
return;
+ // taking read shedding out for now, because i don't want to embed
+ // the osd_peer_stat in MOSDOp
+
// !primary and unbalanced?
// (ignore ops forwarded from the primary)
if (!is_primary()) {
op->get_source().num() == get_primary()) {
// read was shed to me by the primary
int from = op->get_source().num();
+ assert(op->get_flags() & CEPH_OSD_OP_PEERSTAT);
osd->take_peer_stat(from, op->get_peer_stat());
dout(10) << "read shed IN from " << op->get_source()
<< " " << op->get_reqid()
}
}
}
-
// set up reply
MOSDOpReply *reply = new MOSDOpReply(op, 0, osd->osdmap->get_epoch(), true);
}
}
- switch (op->get_op()) {
+ switch (readop.op) {
case CEPH_OSD_OP_READ:
{
// read into a buffer
bufferlist bl;
r = osd->store->read(info.pgid.to_coll(), poid,
- op->get_offset(), op->get_length(),
+ readop.offset, readop.length,
bl);
reply->set_data(bl);
+ reply->get_header().data_off = readop.offset;
if (r >= 0)
- reply->set_length(r);
+ reply->ops[0].length = r;
else
- reply->set_length(0);
- dout(10) << " read got " << r << " / " << op->get_length() << " bytes from obj " << oid << dendl;
+ reply->ops[0].length = 0;
+ dout(10) << " read got " << r << " / " << readop.length << " bytes from obj " << oid << dendl;
}
osd->logger->inc("c_rd");
- osd->logger->inc("c_rdb", op->get_length());
+ osd->logger->inc("c_rdb", reply->ops[0].length);
break;
case CEPH_OSD_OP_STAT:
memset(&st, 0, sizeof(st));
r = osd->store->stat(info.pgid.to_coll(), poid, &st);
if (r >= 0)
- reply->set_length(st.st_size);
+ reply->ops[0].length = st.st_size;
}
break;
t.setattr(info.pgid.to_coll(), coid, "from_version", &ov, sizeof(v));
}
-void ReplicatedPG::prepare_transaction(ObjectStore::Transaction& t, osd_reqid_t reqid,
- pobject_t poid, int op,
- eversion_t old_version, eversion_t at_version,
- off_t offset, off_t length, bufferlist& bl,
- SnapSet& snapset, SnapContext& snapc,
- __u32 inc_lock, eversion_t trim_to)
+void ReplicatedPG::prepare_clone(ObjectStore::Transaction& t, bufferlist& logbl, osd_reqid_t reqid,
+ pobject_t poid, loff_t old_size,
+ eversion_t old_version, eversion_t& at_version,
+ SnapSet& snapset, SnapContext& snapc)
{
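+ // Clone the existing head to <oid, snapc.seq> before it is overwritten:
+ // log a CLONE entry into logbl, link the clone into the per-snap
+ // collections, record its size/overlap in snapset, and advance at_version
+ // (passed by reference) so the caller's mutation gets the next version.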
- // WRNOOP does nothing.
- if (op == CEPH_OSD_OP_WRNOOP)
- return;
-
- // munge zero into remove?
- if (op == CEPH_OSD_OP_ZERO) {
- struct stat st;
- int r = osd->store->stat(info.pgid.to_coll(), poid, &st);
- if (r >= 0) {
- if (offset == 0 && offset + length >= (loff_t)st.st_size) {
- dout(10) << " munging ZERO " << offset << "~" << length
- << " -> DELETE (size is " << st.st_size << ")" << dendl;
- op = CEPH_OSD_OP_DELETE;
- }
- }
- }
-
// clone?
- if (poid.oid.snap) {
- assert(poid.oid.snap == CEPH_NOSNAP);
- dout(20) << "snapset=" << snapset << " snapc=" << snapc << dendl;;
-
- // use newer snapc?
- if (snapset.seq > snapc.seq) {
- snapc.seq = snapset.seq;
- snapc.snaps = snapset.snaps;
- dout(10) << " using newer snapc " << snapc << dendl;
+ assert(poid.oid.snap == CEPH_NOSNAP);
+ dout(20) << "snapset=" << snapset << " snapc=" << snapc << dendl;;
+
+ // use newer snapc?
+ if (snapset.seq > snapc.seq) {
+ snapc.seq = snapset.seq;
+ snapc.snaps = snapset.snaps;
+ dout(10) << " using newer snapc " << snapc << dendl;
+ }
+
+ if (snapset.head_exists && // head exists
+ snapc.snaps.size() && // there are snaps
+ snapc.snaps[0] > snapset.seq) { // existing object is old
+ // clone
+ pobject_t coid = poid;
+ coid.oid.snap = snapc.seq;
+
+ unsigned l;
+ for (l=1; l<snapc.snaps.size() && snapc.snaps[l] > snapset.seq; l++) ;
+
+ vector<snapid_t> snaps(l);
+ for (unsigned i=0; i<l; i++)
+ snaps[i] = snapc.snaps[i];
+
+ bufferlist snapsbl;
+ ::encode(snaps, snapsbl);
+
+ // log clone
+ dout(10) << "cloning to " << coid << " v " << at_version << " snaps=" << snaps << dendl;
+ Log::Entry cloneentry(PG::Log::Entry::CLONE, coid.oid, at_version, old_version, reqid);
+ cloneentry.snaps = snapsbl;
+ dout(10) << "prepare_transaction " << cloneentry << dendl;
+ log.add(cloneentry);
+ ::encode(cloneentry, logbl);
+ assert(log.top == at_version);
+
+ // prepare clone
+ _make_clone(t, poid, coid, old_version, at_version, snapsbl);
+
+ // add to snap bound collections
+ coll_t fc = make_snap_collection(t, snaps[0]);
+ t.collection_add(fc, info.pgid.to_coll(), coid);
+ if (snaps.size() > 1) {
+ coll_t lc = make_snap_collection(t, snaps[snaps.size()-1]);
+ t.collection_add(lc, info.pgid.to_coll(), coid);
}
+
+ snapset.clones.push_back(coid.oid.snap);
+ snapset.clone_size[coid.oid.snap] = old_size;
+ snapset.clone_overlap[coid.oid.snap].insert(0, old_size);
+
+ at_version.version++;
+ }
+
+ // update snapset with latest snap context
+ snapset.seq = snapc.seq;
+ snapset.snaps = snapc.snaps;
+}
- if (snapset.head_exists && // head exists
- snapc.snaps.size() && // there are snaps
- snapc.snaps[0] > snapset.seq && // existing object is old
- ceph_osd_op_is_modify(op)) { // is a (non-lock) modification
- // clone
- pobject_t coid = poid;
- coid.oid.snap = snapc.seq;
-
- struct stat st;
- osd->store->stat(info.pgid.to_coll(), poid, &st);
-
- unsigned l;
- for (l=1; l<snapc.snaps.size() && snapc.snaps[l] > snapset.seq; l++) ;
-
- vector<snapid_t> snaps(l);
- for (unsigned i=0; i<l; i++)
- snaps[i] = snapc.snaps[i];
-
- bufferlist snapsbl;
- ::encode(snaps, snapsbl);
-
- // log clone
- dout(10) << "cloning to " << coid << " v " << at_version << " snaps=" << snaps << dendl;
- Log::Entry cloneentry(PG::Log::Entry::CLONE, coid.oid, at_version, old_version, reqid);
- cloneentry.snaps = snapsbl;
- dout(10) << "prepare_transaction " << cloneentry << dendl;
- log.add(cloneentry);
- assert(log.top == at_version);
-
- // prepare clone
- _make_clone(t, poid, coid, old_version, at_version, snapsbl);
-
- // add to snap bound collections
- coll_t fc = make_snap_collection(t, snaps[0]);
- t.collection_add(fc, info.pgid.to_coll(), coid);
- if (snaps.size() > 1) {
- coll_t lc = make_snap_collection(t, snaps[snaps.size()-1]);
- t.collection_add(lc, info.pgid.to_coll(), coid);
- }
-
- snapset.clones.push_back(coid.oid.snap);
- snapset.clone_size[coid.oid.snap] = st.st_size;
- snapset.clone_overlap[coid.oid.snap].insert(0, st.st_size);
-
- at_version.version++;
- }
- // update snapset with latest snap context
- snapset.seq = snapc.seq;
- snapset.snaps = snapc.snaps;
-
- // munge delete into a truncate?
- if (op == CEPH_OSD_OP_DELETE &&
- snapset.clones.size()) {
- dout(10) << " munging DELETE -> TRUNCATE(0) bc of clones " << snapset.clones << dendl;
- op = CEPH_OSD_OP_TRUNCATE;
- length = 0;
- snapset.head_exists = false;
+// low level object operations
+int ReplicatedPG::prepare_simple_op(ObjectStore::Transaction& t, osd_reqid_t reqid,
+ pobject_t poid, __u64& old_size,
+ ceph_osd_op& op, bufferlist::iterator& bp,
+ SnapSet& snapset, SnapContext& snapc)
+{
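+ // Apply a single ceph_osd_op to the transaction.  old_size is passed by
+ // reference and kept current so later ops in the same request (and the
+ // clone bookkeeping) see the effect of earlier ones; write payloads are
+ // consumed from bp in op order.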
+ int eop = op.op;
+
+ // munge ZERO -> DELETE or TRUNCATE?
+ if (eop == CEPH_OSD_OP_ZERO &&
+ snapset.head_exists &&
+ op.offset + op.length >= old_size) {
+ if (op.offset == 0) {
+ dout(10) << " munging ZERO " << op.offset << "~" << op.length
+ << " -> DELETE (size is " << old_size << ")" << dendl;
+ eop = CEPH_OSD_OP_DELETE;
+ } else {
+ dout(10) << " munging ZERO " << op.offset << "~" << op.length
+ << " -> TRUNCATE (size is " << old_size << ")" << dendl;
+ eop = CEPH_OSD_OP_TRUNCATE;
}
}
+ // munge DELETE -> TRUNCATE?
+ if (eop == CEPH_OSD_OP_DELETE &&
+ snapset.clones.size()) {
+ dout(10) << " munging DELETE -> TRUNCATE(0) bc of clones " << snapset.clones << dendl;
+ eop = CEPH_OSD_OP_TRUNCATE;
+ op.length = 0;
+ snapset.head_exists = false;
+ }
-
- // log op
- int opcode = Log::Entry::MODIFY;
- if (op == CEPH_OSD_OP_DELETE) opcode = Log::Entry::DELETE;
- Log::Entry logentry(opcode, poid.oid, at_version, old_version, reqid);
- dout(10) << "prepare_transaction " << logentry << dendl;
-
- assert(at_version > log.top);
- log.add(logentry);
- assert(log.top == at_version);
-
- // prepare op
- switch (op) {
+ switch (eop) {
// -- locking --
-
case CEPH_OSD_OP_WRLOCK:
{ // lock object
t.setattr(info.pgid.to_coll(), poid, "wrlock", &reqid.name, sizeof(entity_name_t));
case CEPH_OSD_OP_WRITE:
{ // write
- assert(bl.length() == length);
bufferlist nbl;
- nbl.claim(bl); // give buffers to store; we keep *op in memory for a long time!
- t.write(info.pgid.to_coll(), poid, offset, length, nbl);
+ bp.copy(op.length, nbl);
+ t.write(info.pgid.to_coll(), poid, op.offset, op.length, nbl);
snapset.head_exists = true;
if (snapset.clones.size()) {
snapid_t newest = *snapset.clones.rbegin();
interval_set<__u64> ch;
- ch.insert(offset, length);
+ ch.insert(op.offset, op.length);
ch.intersection_of(snapset.clone_overlap[newest]);
snapset.clone_overlap[newest].subtract(ch);
}
+ old_size = MAX(old_size, op.offset + op.length);
}
break;
case CEPH_OSD_OP_WRITEFULL:
{ // write full object
- assert(bl.length() == length);
bufferlist nbl;
- nbl.claim(bl); // give buffers to store; we keep *op in memory for a long time!
+ bp.copy(op.length, nbl);
t.truncate(info.pgid.to_coll(), poid, 0);
- t.write(info.pgid.to_coll(), poid, offset, length, nbl);
+ t.write(info.pgid.to_coll(), poid, op.offset, op.length, nbl);
snapset.head_exists = true;
if (snapset.clones.size()) {
snapid_t newest = *snapset.clones.rbegin();
snapset.clone_overlap.erase(newest);
}
+ old_size = op.length;
}
break;
case CEPH_OSD_OP_ZERO:
{ // zero
- t.zero(info.pgid.to_coll(), poid, offset, length);
+ t.zero(info.pgid.to_coll(), poid, op.offset, op.length);
if (snapset.clones.size()) {
snapid_t newest = *snapset.clones.rbegin();
interval_set<__u64> ch;
- ch.insert(offset, length);
+ ch.insert(op.offset, op.length);
ch.intersection_of(snapset.clone_overlap[newest]);
snapset.clone_overlap[newest].subtract(ch);
}
case CEPH_OSD_OP_TRUNCATE:
{ // truncate
- t.truncate(info.pgid.to_coll(), poid, length);
+ t.truncate(info.pgid.to_coll(), poid, op.length);
if (snapset.clones.size()) {
snapid_t newest = *snapset.clones.rbegin();
interval_set<__u64> keep;
- if (length)
- keep.insert(0, length);
+ if (op.length)
+ keep.insert(0, op.length);
snapset.clone_overlap[newest].intersection_of(keep);
}
+ old_size = op.length;
}
break;
snapid_t newest = *snapset.clones.rbegin();
snapset.clone_overlap.erase(newest); // ok, redundant.
}
+ old_size = 0;
}
break;
default:
- assert(0);
+ return -EINVAL;
}
+
+ return 0;
+}
+
+void ReplicatedPG::prepare_transaction(ObjectStore::Transaction& t, osd_reqid_t reqid,
+ pobject_t poid,
+ vector<ceph_osd_op>& ops, bufferlist& bl,
+ eversion_t old_version, eversion_t at_version,
+ SnapSet& snapset, SnapContext& snapc,
+ __u32 inc_lock, eversion_t trim_to)
+{
+ bufferlist log_bl;
+ eversion_t log_version = at_version;
+ assert(!ops.empty());
- // object collection, version
- if (op != CEPH_OSD_OP_DELETE) {
- if (inc_lock && ceph_osd_op_is_modify(op))
+ // stat existing
+ struct stat st;
+ int r = osd->store->stat(info.pgid.to_coll(), poid, &st);
+ __u64 old_size = 0;
+ if (r == 0)
+ old_size = st.st_size;
+
+ // apply ops
+ bool did_snap = false;
+ bufferlist::iterator bp = bl.begin();
+ for (unsigned i=0; i<ops.size(); i++) {
+ // clone?
+ if (!did_snap && poid.oid.snap &&
+ ceph_osd_op_is_modify(ops[i].op)) { // is a (non-lock) modification
+ prepare_clone(t, log_bl, reqid, poid, old_size, old_version, at_version, snapset, snapc);
+ did_snap = true;
+ }
+ prepare_simple_op(t, reqid, poid, old_size, ops[i], bp,
+ snapset, snapc);
+ }
+
+ // finish.
+ if (snapset.head_exists) {    // object still exists; update its attrs
+ if (inc_lock)
t.setattr(info.pgid.to_coll(), poid, "inc_lock", &inc_lock, sizeof(inc_lock));
t.setattr(info.pgid.to_coll(), poid, "version", &at_version, sizeof(at_version));
// raise last_update.
assert(at_version > info.last_update);
info.last_update = at_version;
-
- write_info(t);
- // prepare log append
- append_log(t, logentry, trim_to);
+ // log mutation
+ int logopcode = Log::Entry::MODIFY;
+ if (!snapset.head_exists)
+ logopcode = Log::Entry::DELETE;
+ Log::Entry logentry(logopcode, poid.oid, at_version, old_version, reqid);
+ dout(10) << "prepare_transaction " << logentry << dendl;
+ log.add(logentry);
+ ::encode(logentry, log_bl);
+
+ // write pg info, log to disk
+ write_info(t);
+ append_log(t, log_bl, log_version, trim_to);
}
// any completion stuff to do here?
object_t oid = repop->op->get_oid();
+ ceph_osd_op first;
+ memset(&first, 0, sizeof(first));
+ if (!repop->op->ops.empty())
+   first = repop->op->ops[0];   // ops may be empty for a no-op flush
- switch (repop->op->get_op()) {
+ switch (first.op) {
case CEPH_OSD_OP_UNBALANCEREADS:
dout(-10) << "apply_repop completed unbalance-reads on " << oid << dendl;
unbalancing_reads.erase(oid);
// forward the write/update/whatever
MOSDSubOp *wr = new MOSDSubOp(repop->op->get_reqid(), info.pgid, poid,
- repop->op->get_op(),
- repop->op->get_offset(), repop->op->get_length(),
+ repop->op->ops, true,
osd->osdmap->get_epoch(),
repop->rep_tid, repop->op->get_inc_lock(), repop->at_version);
wr->old_version = repop->old_version;
int whoami = osd->get_nodeid();
pobject_t poid(info.pgid.pool(), 0, op->get_oid());
- const char *opname = ceph_osd_op_name(op->get_op());
-
- // make sure it looks ok
- if ((op->get_op() == CEPH_OSD_OP_WRITE ||
- op->get_op() == CEPH_OSD_OP_WRITEFULL) &&
- op->get_length() != op->get_data().length()) {
- dout(0) << "op_modify got bad write, claimed length " << op->get_length()
- << " != payload length " << op->get_data().length()
- << dendl;
- delete op;
- return;
- }
+ const char *opname = "no-op";
+ if (op->ops.size())
+ opname = ceph_osd_op_name(op->ops[0].op);
// --- locking ---
// wrlock?
- if (op->get_op() != CEPH_OSD_OP_WRNOOP && // except WRNOOP; we just want to flush
+ if (!op->ops.empty() && // except empty (no-op) requests; we just want to flush
block_if_wrlocked(op))
return; // op will be handled later, after the object unlocks
}
// balance-reads set?
+#if 0
char v;
if ((op->get_op() != CEPH_OSD_OP_BALANCEREADS && op->get_op() != CEPH_OSD_OP_UNBALANCEREADS) &&
(osd->store->getattr(info.pgid.to_coll(), poid, "balance-reads", &v, 1) >= 0 ||
waiting_for_unbalanced_reads[poid.oid].push_back(op);
return;
}
+#endif
// dup op?
if (is_dup(op->get_reqid())) {
dout(3) << "op_modify " << opname << " dup op " << op->get_reqid()
<< ", doing WRNOOP" << dendl;
- op->set_op(CEPH_OSD_OP_WRNOOP);
- opname = ceph_osd_op_name(op->get_op());
+ op->ops.clear();
+ opname = "no-op";
}
// version
eversion_t av = log.top;
- if (op->get_op() != CEPH_OSD_OP_WRNOOP) {
+ if (op->ops.size()) {
av.epoch = osd->osdmap->get_epoch();
av.version++;
assert(av > info.last_update);
dout(10) << "op_modify " << opname
<< " " << poid.oid
- << " " << op->get_offset() << "~" << op->get_length()
<< " ov " << old_version << " av " << av
<< " snapc " << snapc
<< " snapset " << snapset
}
}
- if (op->get_op() == CEPH_OSD_OP_WRITE ||
- op->get_op() == CEPH_OSD_OP_WRITEFULL) {
+ if (op->get_data().length()) {
osd->logger->inc("c_wr");
- osd->logger->inc("c_wrb", op->get_length());
+ osd->logger->inc("c_wrb", op->get_data().length());
}
// note my stats
utime_t now = g_clock.now();
+ // permute operation?
+ // ...
+
// issue replica writes
tid_t rep_tid = osd->get_tid();
RepGather *repop = new_rep_gather(op, rep_tid, av, snapset, snapc);
issue_repop(repop, acting[i], now);
// we are acker.
- if (op->get_op() != CEPH_OSD_OP_WRNOOP) {
+ if (op->ops.size()) {
// log and update later.
- prepare_transaction(repop->t, op->get_reqid(), poid, op->get_op(),
+ prepare_transaction(repop->t, op->get_reqid(), poid, op->ops, op->get_data(),
old_version, av,
- op->get_offset(), op->get_length(), op->get_data(),
snapset, snapc,
op->get_inc_lock(), peers_complete_thru);
}
void ReplicatedPG::sub_op_modify(MOSDSubOp *op)
{
pobject_t poid = op->poid;
- const char *opname = ceph_osd_op_name(op->op);
+ const char *opname = "no-op";
+ if (op->ops.size())
+ opname = ceph_osd_op_name(op->ops[0].op);
dout(10) << "sub_op_modify " << opname
<< " " << poid
<< " v " << op->version
- << " " << op->offset << "~" << op->length
<< dendl;
// sanity checks
// do op
int ackerosd = acting[0];
osd->logger->inc("r_wr");
- osd->logger->inc("r_wrb", op->length);
+ osd->logger->inc("r_wrb", op->get_data().length());
- if (op->op != CEPH_OSD_OP_WRNOOP) {
+ if (op->ops.size()) {
prepare_transaction(t, op->reqid,
- op->poid, op->op, op->old_version, op->version,
- op->offset, op->length, op->get_data(),
+ op->poid, op->ops, op->get_data(),
+ op->old_version, op->version,
op->snapset, op->snapc,
op->inc_lock, op->pg_trim_to);
}
// send op
osd_reqid_t rid;
tid_t tid = osd->get_tid();
-
- MOSDSubOp *subop = new MOSDSubOp(rid, info.pgid, poid, CEPH_OSD_OP_PULL,
- 0, 0,
+ vector<ceph_osd_op> pull(1);
+ pull[0].op = CEPH_OSD_OP_PULL;
+ MOSDSubOp *subop = new MOSDSubOp(rid, info.pgid, poid, pull, true,
osd->osdmap->get_epoch(), tid, 0, v);
subop->data_subset.swap(data_subset);
// do not include clone_subsets in pull request; we will recalculate this
osd->store->getattrs(info.pgid.to_coll(), poid, attrset);
osd_reqid_t rid; // useless?
- MOSDSubOp *subop = new MOSDSubOp(rid, info.pgid, poid, CEPH_OSD_OP_PUSH, 0, st.st_size,
+ vector<ceph_osd_op> push(1);
+ push[0].op = CEPH_OSD_OP_PUSH;
+ push[0].offset = 0;
+ push[0].length = st.st_size;
+ MOSDSubOp *subop = new MOSDSubOp(rid, info.pgid, poid, push, false,
osd->osdmap->get_epoch(), osd->get_tid(), 0, version);
subop->data_subset.insert(0, st.st_size);
subop->attrset.swap(attrset);
// send
osd_reqid_t rid; // useless?
- MOSDSubOp *subop = new MOSDSubOp(rid, info.pgid, poid, CEPH_OSD_OP_PUSH, 0, size,
+ vector<ceph_osd_op> push(1);
+ push[0].op = CEPH_OSD_OP_PUSH;
+ push[0].offset = 0;
+ push[0].length = size;
+ MOSDSubOp *subop = new MOSDSubOp(rid, info.pgid, poid, push, false,
osd->osdmap->get_epoch(), osd->get_tid(), 0, v);
subop->data_subset.swap(data_subset);
subop->clone_subsets.swap(clone_subsets);
{
pobject_t poid = op->poid;
eversion_t v = op->version;
+ ceph_osd_op& push = op->ops[0];
dout(7) << "op_push "
<< poid
<< " v " << v
- << " len " << op->length
+ << " len " << push.length
<< " subset " << op->data_subset
<< " data " << op->get_data().length()
<< dendl;
// determine data/clone subsets
data_subset = op->data_subset;
- if (data_subset.empty() && op->length && op->length == op->get_data().length())
- data_subset.insert(0, op->length);
+ if (data_subset.empty() && push.length && push.length == op->get_data().length())
+ data_subset.insert(0, push.length);
clone_subsets = op->clone_subsets;
if (is_primary()) {
void _make_clone(ObjectStore::Transaction& t,
pobject_t head, pobject_t coid,
eversion_t ov, eversion_t v, bufferlist& snaps);
+ void prepare_clone(ObjectStore::Transaction& t, bufferlist& logbl, osd_reqid_t reqid,
+ pobject_t poid, loff_t old_size,
+ eversion_t old_version, eversion_t& at_version,
+ SnapSet& snapset, SnapContext& snapc);
+ int prepare_simple_op(ObjectStore::Transaction& t, osd_reqid_t reqid,
+ pobject_t poid, __u64& old_size,
+ ceph_osd_op& op, bufferlist::iterator& bp,
+ SnapSet& snapset, SnapContext& snapc);
void prepare_transaction(ObjectStore::Transaction& t, osd_reqid_t reqid,
- pobject_t poid, int op,
+ pobject_t poid,
+ vector<ceph_osd_op>& ops, bufferlist& bl,
eversion_t old_version, eversion_t at_version,
- off_t offset, off_t length, bufferlist& bl,
SnapSet& snapset, SnapContext& snapc,
__u32 inc_lock, eversion_t trim_to);
};
WRITE_CLASS_ENCODER(pg_stat_t)
-typedef struct ceph_osd_peer_stat osd_peer_stat_t;
+struct osd_peer_stat_t {
+ struct ceph_timespec stamp;
+ float oprate;
+ float qlen;
+ float recent_qlen;
+ float read_latency;
+ float read_latency_mine;
+ float frac_rd_ops_shed_in;
+ float frac_rd_ops_shed_out;
+} __attribute__ ((packed));
+
WRITE_RAW_ENCODER(osd_peer_stat_t)
inline ostream& operator<<(ostream& out, const osd_peer_stat_t &stat) {
WRITE_CLASS_ENCODER(interval_set<__u64>)
+
+
+
/*
* attached to object head. describes most recent snap context, and
* set of existing clones.
void Objecter::handle_osd_op_reply(MOSDOpReply *m)
{
+ assert(m->ops.size() >= 1);
+ int op = m->ops[0].op;
+
// read or modify?
- switch (m->get_op()) {
+ switch (op) {
case CEPH_OSD_OP_READ:
handle_osd_read_reply(m);
break;
handle_osd_stat_reply(m);
break;
- case CEPH_OSD_OP_WRNOOP:
- case CEPH_OSD_OP_WRITE:
- case CEPH_OSD_OP_WRITEFULL:
- case CEPH_OSD_OP_ZERO:
- case CEPH_OSD_OP_DELETE:
- case CEPH_OSD_OP_WRUNLOCK:
- case CEPH_OSD_OP_WRLOCK:
- case CEPH_OSD_OP_RDLOCK:
- case CEPH_OSD_OP_RDUNLOCK:
- case CEPH_OSD_OP_UPLOCK:
- case CEPH_OSD_OP_DNLOCK:
+ default:
+ assert(m->is_modify());
handle_osd_modify_reply(m);
break;
-
- default:
- assert(0);
}
}
int flags = st->flags;
if (st->onfinish) flags |= CEPH_OSD_OP_ACK;
- MOSDOp *m = new MOSDOp(client_inc, last_tid,
+ MOSDOp *m = new MOSDOp(client_inc, last_tid, false,
ex.oid, ex.layout, osdmap->get_epoch(),
- CEPH_OSD_OP_STAT, flags);
+ flags);
+ m->stat();
if (inc_lock > 0) {
st->inc_lock = inc_lock;
m->set_inc_lock(inc_lock);
return;
}
+ ceph_osd_op& op = m->ops[0];
+
dout(7) << "handle_osd_stat_reply " << tid
- << " r=" << m->get_result()
- << " size=" << m->get_length()
- << dendl;
+ << " r=" << m->get_result()
+ << " size=" << op.length
+ << dendl;
OSDStat *st = op_stat[ tid ];
op_stat.erase( tid );
if (m->get_result() < 0) {
*st->size = 0;
} else {
- *st->size = m->get_length();
+ *st->size = op.length;
}
// finish, clean up
if (pg.acker() >= 0) {
int flags = rd->flags;
if (rd->onfinish) flags |= CEPH_OSD_OP_ACK;
- MOSDOp *m = new MOSDOp(client_inc, last_tid,
+ MOSDOp *m = new MOSDOp(client_inc, last_tid, false,
ex.oid, ex.layout, osdmap->get_epoch(),
- CEPH_OSD_OP_READ, flags);
+ flags);
+ m->read(ex.start, ex.length);
if (inc_lock > 0) {
rd->inc_lock = inc_lock;
m->set_inc_lock(inc_lock);
}
- m->set_length(ex.length);
- m->set_offset(ex.start);
m->set_retry_attempt(retry);
int who = pg.acker();
dout(7) << "handle_osd_read_reply " << tid << dendl;
OSDRead *rd = op_read[ tid ];
op_read.erase( tid );
+
+ ceph_osd_op& op = m->ops[0];
// remove from osd/tid maps
PG& pg = get_pg( m->get_pg() );
// what buffer offset are we?
dout(7) << " got frag from " << m->get_oid() << " "
- << m->get_offset() << "~" << m->get_length()
+ << op.offset << "~" << op.length
<< ", still have " << rd->ops.size() << " more ops" << dendl;
if (rd->ops.empty()) {
<< " osd" << pg.primary()
<< dendl;
if (pg.primary() >= 0) {
- MOSDOp *m = new MOSDOp(client_inc, tid,
+ MOSDOp *m = new MOSDOp(client_inc, tid, true,
ex.oid, ex.layout, osdmap->get_epoch(),
- wr->op, flags);
+ flags);
+ m->add_simple_op(wr->op, ex.start, ex.length);
m->set_snap_seq(wr->snapc.seq);
m->get_snaps() = wr->snapc.snaps;
if (inc_lock > 0) {
wr->inc_lock = inc_lock;
m->set_inc_lock(inc_lock);
}
- m->set_length(ex.length);
- m->set_offset(ex.start);
if (usetid > 0)
m->set_retry_attempt(true);
#!/bin/bash
[ "$CEPH_NUM_MON" == "" ] && CEPH_NUM_MON=3
-[ "$CEPH_NUM_OSD" == "" ] && CEPH_NUM_OSD=4
+[ "$CEPH_NUM_OSD" == "" ] && CEPH_NUM_OSD=1
[ "$CEPH_NUM_MDS" == "" ] && CEPH_NUM_MDS=1
let new=0