include/nstring.h\
include/object.h\
include/page.h\
- include/pobject.h\
include/rangeset.h\
include/rados.h\
include/statlite.h\
- make mds exhert memory pressure on client caps, leases
- optionally separate osd interfaces (ips) for clients and osds (replication, peering, etc.)
+rados
+- move snap out of object_t
+- create/destroy pg_pools
+- autosize pg_pools?
+- security
+- c library glue to c3
+- pyexec?
+-
+
+
later
- client reconnect after long eviction; and slow delayed reconnect
- ENOSPC
m->get_size() < in->inode.size) {
// map range to objects
vector<ObjectExtent> ls;
- filer->file_to_extents(in->inode.ino, &in->inode.layout, CEPH_NOSNAP,
+ filer->file_to_extents(in->inode.ino, &in->inode.layout,
m->get_size(), in->inode.size - m->get_size(),
ls);
objectcacher->truncate_set(in->inode.ino, ls);
Inode *in = f->inode;
// map to a list of extents
- filer->file_to_extents(in->inode.ino, &in->inode.layout, CEPH_NOSNAP, offset, length, result);
+ filer->file_to_extents(in->inode.ino, &in->inode.layout, offset, length, result);
dout(3) << "enumerate_layout(" << fd << ", " << length << ", " << offset << ") = 0" << dendl;
return 0;
lock.Lock();
ceph_object_layout layout = client->osdmap->make_object_layout(oid, CEPH_CASDATA_RULE);
__u64 size;
- client->objecter->stat(oid, layout, &size, 0, new C_SafeCond(&lock, &cond, &ack));
+ client->objecter->stat(oid, layout, CEPH_NOSNAP, &size, 0, new C_SafeCond(&lock, &cond, &ack));
while (!ack) cond.Wait(lock);
lock.Unlock();
}
lock.Lock();
ceph_object_layout layout = client->osdmap->make_object_layout(oid, CEPH_CASDATA_RULE);
bufferlist bl;
- client->objecter->read(oid, layout, off, len, &bl, 0, new C_SafeCond(&lock, &cond, &ack));
+ client->objecter->read(oid, layout, off, len, CEPH_NOSNAP, &bl, 0, new C_SafeCond(&lock, &cond, &ack));
while (!ack) cond.Wait(lock);
lock.Unlock();
}
} else {
dout(10) << "read from " << oid << dendl;
bufferlist inbl;
- client->objecter->read(oid, layout, 0, osize, &inbl, 0,
+ client->objecter->read(oid, layout, 0, osize, CEPH_NOSNAP, &inbl, 0,
new C_Ref(lock, cond, &unack));
}
client->client_lock.Unlock();
} __attribute__ ((packed));
/* followed by my snap list, then prior parent snap list */
-
#endif
struct {
uint64_t ino; // "file" identifier
uint32_t bno; // "block" in that "file"
- uint64_t snap; // snap revision.
+ uint64_t pad;
} __attribute__ ((packed));
} __attribute__ ((packed));
- object_t() : ino(0), bno(0), snap(0) {}
- object_t(uint64_t i, uint32_t b) : ino(i), bno(b), snap(0) {}
- object_t(uint64_t i, uint32_t b, uint64_t r) : ino(i), bno(b), snap(r) {}
+ object_t() : ino(0), bno(0), pad(0) {}
+ object_t(uint64_t i, uint32_t b) : ino(i), bno(b), pad(0) {}
// IMPORTANT: make this match struct ceph_object ****
object_t(const ceph_object& co) {
ino = co.ino;
bno = co.bno;
- snap = co.snap;
+ pad = co.pad;
}
operator ceph_object() {
ceph_object oid;
oid.ino = ino;
oid.bno = bno;
- oid.snap = snap;
+ oid.pad = pad;
return oid;
}
void encode(bufferlist &bl) const {
::encode(ino, bl);
::encode(bno, bl);
- ::encode(snap, bl);
+ ::encode(pad, bl);
}
void decode(bufferlist::iterator &bl) {
__u64 i, r;
::decode(r, bl);
ino = i;
bno = b;
- snap = r;
+ pad = r;
}
} __attribute__ ((packed));
WRITE_CLASS_ENCODER(object_t)
out.fill('0');
out << setw(8) << o.bno;
out.unsetf(ios::right);
- if (o.snap) {
- if (o.snap == CEPH_NOSNAP)
- out << ".head";
- else
- out << '.' << o.snap;
- }
out << dec;
return out;
}
size_t operator()(const object_t &r) const {
static rjhash<uint64_t> H;
static rjhash<uint32_t> I;
- return H(r.ino) ^ I(r.bno) ^ H(r.snap);
+ return H(r.ino) ^ I(r.bno) ^ H(r.pad);
}
};
}
+// ---------------------------
+// snaps
+
+struct snapid_t {
+ __u64 val;
+ snapid_t(__u64 v=0) : val(v) {}
+ snapid_t operator+=(snapid_t o) { val += o.val; return *this; }
+ snapid_t operator++() { ++val; return *this; }
+ operator __u64() const { return val; }
+};
+
+inline void encode(snapid_t i, bufferlist &bl) { encode(i.val, bl); }
+inline void decode(snapid_t &i, bufferlist::iterator &p) { decode(i.val, p); }
+
+inline ostream& operator<<(ostream& out, snapid_t s) {
+ if (s == CEPH_NOSNAP)
+ return out << "head";
+ else if (s == CEPH_SNAPDIR)
+ return out << "snapdir";
+ else
+ return out << hex << s.val << dec;
+}
+
+
+struct sobject_t {
+ object_t oid;
+ snapid_t snap;
+
+ sobject_t() : snap(0) {}
+ sobject_t(object_t o, snapid_t s) : oid(o), snap(s) {}
+
+ void encode(bufferlist& bl) const {
+ ::encode(oid, bl);
+ ::encode(snap, bl);
+ }
+ void decode(bufferlist::iterator& bl) {
+ ::decode(oid, bl);
+ ::decode(snap, bl);
+ }
+};
+WRITE_CLASS_ENCODER(sobject_t)
+
+inline bool operator==(const sobject_t l, const sobject_t r) {
+ return l.oid == r.oid && l.snap == r.snap;
+}
+inline bool operator!=(const sobject_t l, const sobject_t r) {
+ return l.oid != r.oid || l.snap != r.snap;
+}
+inline bool operator>(const sobject_t l, const sobject_t r) {
+ return l.oid > r.oid || (l.oid == r.oid && l.snap > r.snap);
+}
+inline bool operator<(const sobject_t l, const sobject_t r) {
+ return l.oid < r.oid || (l.oid == r.oid && l.snap < r.snap);
+}
+inline bool operator>=(const sobject_t l, const sobject_t r) {
+ return l.oid > r.oid || (l.oid == r.oid && l.snap >= r.snap);
+}
+inline bool operator<=(const sobject_t l, const sobject_t r) {
+ return l.oid < r.oid || (l.oid == r.oid && l.snap <= r.snap);
+}
+inline ostream& operator<<(ostream& out, const sobject_t o) {
+ return out << o.oid << "." << o.snap;
+}
+
+namespace __gnu_cxx {
+ template<> struct hash<sobject_t> {
+ size_t operator()(const sobject_t &r) const {
+ static hash<object_t> H;
+ static rjhash<uint64_t> I;
+ return H(r.oid) ^ I(r.snap);
+ }
+ };
+}
+
+// ---------------------------
+
+typedef sobject_t pobject_t;
+
struct coll_t {
- __u64 high;
- __u64 low;
+ __u64 pgid;
+ snapid_t snap;
- coll_t(__u64 h=0, __u64 l=0) : high(h), low(l) {}
+ coll_t(__u64 p=0, snapid_t s=0) : pgid(p), snap(s) {}
void encode(bufferlist& bl) const {
- ::encode(high, bl);
- ::encode(low, bl);
+ ::encode(pgid, bl);
+ ::encode(snap, bl);
}
void decode(bufferlist::iterator& bl) {
- __u64 h, l;
- ::decode(h, bl);
- ::decode(l, bl);
- high = h;
- low = l;
+ ::decode(pgid, bl);
+ ::decode(snap, bl);
}
-} __attribute__ ((packed));
+};
WRITE_CLASS_ENCODER(coll_t)
inline ostream& operator<<(ostream& out, const coll_t& c) {
- return out << hex << c.high << '.' << c.low << dec;
+ return out << hex << c.pgid << '.' << c.snap << dec;
}
inline bool operator<(const coll_t& l, const coll_t& r) {
- return l.high < r.high || (l.high == r.high && l.low < r.low);
+ return l.pgid < r.pgid || (l.pgid == r.pgid && l.snap < r.snap);
}
inline bool operator<=(const coll_t& l, const coll_t& r) {
- return l.high < r.high || (l.high == r.high && l.low <= r.low);
+ return l.pgid < r.pgid || (l.pgid == r.pgid && l.snap <= r.snap);
}
inline bool operator==(const coll_t& l, const coll_t& r) {
- return l.high == r.high && l.low == r.low;
+ return l.pgid == r.pgid && l.snap == r.snap;
}
inline bool operator!=(const coll_t& l, const coll_t& r) {
- return l.high != r.high || l.low != r.low;
+ return l.pgid != r.pgid || l.snap != r.snap;
}
inline bool operator>(const coll_t& l, const coll_t& r) {
- return l.high > r.high || (l.high == r.high && l.low > r.low);
+ return l.pgid > r.pgid || (l.pgid == r.pgid && l.snap > r.snap);
}
inline bool operator>=(const coll_t& l, const coll_t& r) {
- return l.high > r.high || (l.high == r.high && l.low >= r.low);
+ return l.pgid > r.pgid || (l.pgid == r.pgid && l.snap >= r.snap);
}
-
namespace __gnu_cxx {
template<> struct hash<coll_t> {
size_t operator()(const coll_t &c) const {
- static rjhash<uint64_t> H;
- return H(c.high) ^ H(c.low);
+ static rjhash<uint32_t> H;
+ static rjhash<uint64_t> I;
+ return H(c.pgid) ^ I(c.snap);
}
};
}
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation. See file COPYING.
- *
- */
-
-#ifndef __CEPH_POBJECT_H
-#define __CEPH_POBJECT_H
-
-#include "object.h"
-
-/*
- * "physical" object stored in an individual OSD's object store.
- * includes fields to describe which volume the logical object_t
- * belongs to, and/or a specific part of the object (if striped
- * or encoded for redundancy, etc.).
- */
-struct pobject_t {
- uint32_t volume; // "volume"
- uint32_t rank; // rank/stripe id (e.g. for parity encoding)
- object_t oid; // logical object
- pobject_t() : volume(0), rank(0) {}
- pobject_t(uint16_t v, uint16_t r, object_t o) : volume(v), rank(r), oid(o) {}
- void encode(bufferlist &bl) const {
- ::encode(volume, bl);
- ::encode(rank, bl);
- ::encode(oid, bl);
- }
- void decode(bufferlist::iterator &bl) {
- __u32 v, r;
- ::decode(v, bl);
- ::decode(r, bl);
- volume = v;
- rank = r;
- oid.decode(bl);
- }
-} __attribute__ ((packed));
-WRITE_CLASS_ENCODER(pobject_t)
-
-inline ostream& operator<<(ostream& out, const pobject_t o) {
- return out << o.volume << '/' << o.rank << '/' << o.oid;
-}
-
-inline bool operator==(const pobject_t l, const pobject_t r) {
- return memcmp(&l, &r, sizeof(l)) == 0;
-}
-inline bool operator!=(const pobject_t l, const pobject_t r) {
- return memcmp(&l, &r, sizeof(l)) != 0;
-}
-inline bool operator>(const pobject_t l, const pobject_t r) {
- return memcmp(&l, &r, sizeof(l)) > 0;
-}
-inline bool operator>=(const pobject_t l, const pobject_t r) {
- return memcmp(&l, &r, sizeof(l)) >= 0;
-}
-inline bool operator<(const pobject_t l, const pobject_t r) {
- return memcmp(&l, &r, sizeof(l)) < 0;
-}
-inline bool operator<=(const pobject_t l, const pobject_t r) {
- return memcmp(&l, &r, sizeof(l)) <= 0;
-}
-
-namespace __gnu_cxx {
- template<> struct hash<pobject_t> {
- size_t operator()(const pobject_t &r) const {
- static rjhash<uint64_t> H;
- static rjhash<uint32_t> I;
- return I(r.volume) ^ I(r.rank) ^ H(r.oid.ino) ^ I(r.oid.bno) ^ H(r.oid.snap);
- }
- };
-}
-
-#endif
struct {
__le64 ino; /* inode "file" identifier */
__le32 bno; /* "block" (object) in that "file" */
- __le64 snap; /* snapshot id. usually NOSNAP. */
+ __le64 pad;
} __attribute__ ((packed));
};
} __attribute__ ((packed));
struct ceph_eversion reassert_version;
/* writer's snap context */
- __le64 snap_seq;
+ union {
+ __le64 snap_seq;
+ __le64 snapid;
+ };
__le32 num_snaps;
/* read or mutation */
-// snaps
-struct snapid_t {
- __u64 val;
- snapid_t(__u64 v=0) : val(v) {}
- snapid_t operator+=(snapid_t o) { val += o.val; return *this; }
- snapid_t operator++() { ++val; return *this; }
- operator __u64() const { return val; }
-};
-
-inline void encode(snapid_t i, bufferlist &bl) { encode(i.val, bl); }
-inline void decode(snapid_t &i, bufferlist::iterator &p) { decode(i.val, p); }
-
-inline ostream& operator<<(ostream& out, snapid_t s) {
- if (s == CEPH_NOSNAP)
- return out << "head";
- else if (s == CEPH_SNAPDIR)
- return out << "snapdir";
- else
- return out << hex << s.val << dec;
-}
-
-
-
struct SnapRealmInfo {
mutable ceph_mds_snap_realm h;
vector<snapid_t> my_snaps;
head = req->r_request->front.iov_base;
op = (void *)(head + 1);
- seq_printf(s, "%llx.%08x\t%lld\t",
- le64_to_cpu(head->oid.ino),
- le32_to_cpu(head->oid.bno),
- le64_to_cpu(head->oid.snap));
+ seq_printf(s, "%llx.%08x.%llx\t",
+ le64_to_cpu(head->oid.ino),
+ le32_to_cpu(head->oid.bno),
+ le64_to_cpu(head->oid.pad));
if (req->r_reassert_version.epoch)
seq_printf(s, "%u'%llu\t",
/* object extent? */
reqhead->oid.ino = cpu_to_le64(vino.ino);
- reqhead->oid.snap = cpu_to_le64(vino.snap);
+ reqhead->snapid = cpu_to_le64(vino.snap);
ceph_calc_file_object_mapping(layout, off, plen, &reqhead->oid,
&objoff, &objlen);
OSDMap *osdmap = cache->mds->objecter->osdmap;
ceph_object_layout ol = osdmap->make_object_layout(oid,
cache->mds->mdsmap->get_metadata_pg_pool());
- cache->mds->objecter->read_full(oid, ol, &fin->bl, 0, fin);
+ cache->mds->objecter->read_full(oid, ol, CEPH_NOSNAP, &fin->bl, 0, fin);
}
void CDir::_fetched(bufferlist &bl)
mdcache->mds->mdsmap->get_metadata_pg_pool());
mdcache->mds->objecter->read(oid, ol,
- rd, &c->bl, 0, c );
+ rd, CEPH_NOSNAP, &c->bl, 0, c );
}
void CInode::_fetched(bufferlist& bl, Context *fin)
OSDMap *osdmap = mds->objecter->osdmap;
ceph_object_layout ol = osdmap->make_object_layout(oid,
mds->mdsmap->get_metadata_pg_pool());
- mds->objecter->read_full(oid, ol, &c->bl, 0, c);
+ mds->objecter->read_full(oid, ol, CEPH_NOSNAP, &c->bl, 0, c);
}
void MDSTable::load_2(int r, bufferlist& bl, Context *onfinish)
OSDMap *osdmap = mds->objecter->osdmap;
ceph_object_layout ol = osdmap->make_object_layout(oid,
mds->mdsmap->get_metadata_pg_pool());
- mds->objecter->read_full(oid, ol, &c->bl, 0, c);
+ mds->objecter->read_full(oid, ol, CEPH_NOSNAP, &c->bl, 0, c);
}
void SessionMap::_load_finish(bufferlist &bl)
friend class MOSDOpReply;
+ // read
+ snapid_t get_snapid() { return snapid_t(head.snapid); }
+ // writ
snapid_t get_snap_seq() { return snapid_t(head.snap_seq); }
vector<snapid_t> &get_snaps() { return snaps; }
void set_snap_seq(snapid_t s) { head.snap_seq = s; }
/*
* sorry, these are sentitive to the pobject_t and coll_t typing.
*/
+
+ // 11111111112222222222333333333344444444445555555555
+ // 012345678901234567890123456789012345678901234567890123456789
+ // yyyyyyyyyyyyyyyy.zzzzzzzz.a_s
+
void FileStore::append_oname(const pobject_t &oid, char *s)
{
//assert(sizeof(oid) == 28);
char *t = s + strlen(s);
#ifdef __LP64__
- sprintf(t, "/%04x.%04x.%016lx.%08x.%lx",
- oid.volume, oid.rank, oid.oid.ino, oid.oid.bno, oid.oid.snap);
+ sprintf(t, "/%016lx.%08x.%lx_%lx",
+ oid.oid.ino, oid.oid.bno, oid.oid.pad, (__u64)oid.snap);
#else
- sprintf(t, "/%04x.%04x.%016llx.%08x.%llx",
- oid.volume, oid.rank, oid.oid.ino, oid.oid.bno, oid.oid.snap);
+ sprintf(t, "/%08x_%016llx.%08x.%llx_%llx",
+ oid.oid.ino, oid.oid.bno, oid.oid.pad, (__u64)oid.oid.snap);
#endif
//parse_object(t+1);
}
bool FileStore::parse_object(char *s, pobject_t& o)
{
//assert(sizeof(o) == 28);
- if (strlen(s) < 36 ||
- s[4] != '.' ||
- s[9] != '.' ||
- s[26] != '.' ||
- s[35] != '.')
+ if (strlen(s) < 29 ||
+ s[16] != '.' ||
+ s[25] != '.')
return false;
- o.volume = strtoull(s, 0, 16);
- assert(s[4] == '.');
- o.rank = strtoull(s+5, 0, 16);
- assert(s[9] == '.');
- o.oid.ino = strtoull(s+10, 0, 16);
- assert(s[26] == '.');
- o.oid.bno = strtoull(s+27, 0, 16);
- assert(s[35] == '.');
- o.oid.snap = strtoull(s+36, 0, 16);
+ o.oid.ino = strtoull(s, &s, 16);
+ o.oid.bno = strtoull(s+1, &s, 16);
+ o.oid.pad = strtoull(s+1, &s, 16);
+ o.snap = strtoull(s, &s, 16);
return true;
}
+ // 11111111112222222222333
+ // 012345678901234567890123456789012
+ // pppppppppppppppp.ssssssssssssssss
+
bool FileStore::parse_coll(char *s, coll_t& c)
{
if (strlen(s) == 33 && s[16] == '.') {
s[16] = 0;
- c.high = strtoull(s, 0, 16);
- c.low = strtoull(s+17, 0, 16);
+ c.pgid = strtoull(s, 0, 16);
+ c.snap = strtoull(s+17, 0, 16);
return true;
} else
return false;
{
assert(sizeof(cid) == 16);
#ifdef __LP64__
- sprintf(s, "%s/%016lx.%016lx", basedir.c_str(), cid.high, cid.low);
+ sprintf(s, "%s/%016lx.%016lx", basedir.c_str(), cid.pgid, (__u64)cid.snap);
#else
- sprintf(s, "%s/%016llx.%016llx", basedir.c_str(), cid.high, cid.low);
+ sprintf(s, "%s/%016llx.%016llx", basedir.c_str(), cid.pgid, (__u64)cid.snap);
#endif
}
#include "include/types.h"
#include "include/Context.h"
#include "include/buffer.h"
-#include "include/pobject.h"
#include "include/nstring.h"
#include "include/Distribution.h"
// init size distn (once)
if (!did_distn) {
did_distn = true;
- age_cur_oid = pobject_t(0, 0, object_t(0,1));
+ age_cur_oid = pobject_t(object_t(0,1), 0);
file_size_distn.add(1, 19.0758125+0.65434375);
file_size_distn.add(512, 35.6566);
file_size_distn.add(1024, 27.7271875);
utime_t start = g_clock.now();
for (int i=0; i<1000; i++) {
ObjectStore::Transaction t;
- t.write(0, pobject_t(0, 0, object_t(999,i)), 0, bl.length(), bl);
+ t.write(0, pobject_t(object_t(999,i), 0), 0, bl.length(), bl);
store->apply_transaction(t);
}
store->sync();
cout << "measured " << (1000.0 / (double)end) << " mb/sec" << std::endl;
ObjectStore::Transaction tr;
for (int i=0; i<1000; i++)
- tr.remove(0, pobject_t(0, 0, object_t(999,i)));
+ tr.remove(0, pobject_t(object_t(999,i), 0));
store->apply_transaction(tr);
// set osd weight
it++) {
if (*it == 0)
continue;
- if (it->low != 0)
+ if (it->snap != 0)
continue;
- pg_t pgid = it->high;
+ pg_t pgid = it->pgid;
PG *pg = _open_lock_pg(pgid);
// read pg state, log
return;
}
- if (op->get_oid().snap > 0) {
+ if (op->get_snapid() > 0) {
// snap read. hrm.
// are we missing a revision that we might need?
// let's get them all.
for (unsigned i=0; i<op->get_snaps().size(); i++) {
- object_t oid = op->get_oid();
- oid.snap = op->get_snaps()[i];
- if (pg->is_missing_object(oid)) {
- dout(10) << "handle_op _may_ need missing rev " << oid << ", pulling" << dendl;
- pg->wait_for_missing_object(op->get_oid(), op);
+ sobject_t soid(op->get_oid(), op->get_snaps()[i]);
+ if (pg->is_missing_object(soid)) {
+ dout(10) << "handle_op _may_ need missing rev " << soid << ", pulling" << dendl;
+ pg->wait_for_missing_object(soid, op);
pg->unlock();
return;
}
}
// missing object?
- if (pg->is_missing_object(op->get_oid())) {
- pg->wait_for_missing_object(op->get_oid(), op);
+ sobject_t head(op->get_oid(), CEPH_NOSNAP);
+ if (pg->is_missing_object(head)) {
+ pg->wait_for_missing_object(head, op);
pg->unlock();
return;
}
int get_nodeid() { return whoami; }
static pobject_t get_osdmap_pobject_name(epoch_t epoch) {
- return pobject_t(CEPH_OSDMETADATA_NS, 0, object_t(0, epoch << 1));
+ return pobject_t(object_t(0, epoch << 1), 0);
}
static pobject_t get_inc_osdmap_pobject_name(epoch_t epoch) {
- return pobject_t(CEPH_OSDMETADATA_NS, 0, object_t(0, (epoch << 1) + 1));
+ return pobject_t(object_t(0, (epoch << 1) + 1), 0);
}
eversion_t lu = oinfo.last_update;
while (pp != olog.log.rend()) {
Log::Entry& oe = *pp;
- if (!log.objects.count(oe.oid)) {
+ if (!log.objects.count(oe.soid)) {
dout(10) << " had " << oe << " new dne : divergent, ignoring" << dendl;
++pp;
continue;
}
- Log::Entry& ne = *log.objects[oe.oid];
+ Log::Entry& ne = *log.objects[oe.soid];
if (ne.version == oe.version) {
dout(10) << " had " << oe << " new " << ne << " : match, stopping" << dendl;
break;
} else {
// old delete, new update.
dout(20) << " had " << oe << " new " << ne << " : missing" << dendl;
- omissing.add(ne.oid, ne.version, eversion_t());
+ omissing.add(ne.soid, ne.version, eversion_t());
}
} else {
if (ne.is_delete()) {
// old update, new delete
dout(10) << " had " << oe << " new " << ne << " : new will supercede" << dendl;
- omissing.rm(oe.oid, oe.version);
+ omissing.rm(oe.soid, oe.version);
} else {
// old update, new update
dout(10) << " had " << oe << " new " << ne << " : new will supercede" << dendl;
- omissing.revise_need(ne.oid, ne.version);
+ omissing.revise_need(ne.soid, ne.version);
}
}
}
*/
bool PG::merge_old_entry(ObjectStore::Transaction& t, Log::Entry& oe)
{
- if (log.objects.count(oe.oid)) {
- Log::Entry &ne = *log.objects[oe.oid]; // new(er?) entry
+ if (log.objects.count(oe.soid)) {
+ Log::Entry &ne = *log.objects[oe.soid]; // new(er?) entry
if (ne.version > oe.version) {
dout(20) << "merge_old_entry had " << oe << " new " << ne << " : older, missing" << dendl;
- assert(ne.is_delete() || missing.is_missing(ne.oid));
+ assert(ne.is_delete() || missing.is_missing(ne.soid));
return false;
}
if (ne.version == oe.version) {
} else {
// old delete, new update.
dout(20) << "merge_old_entry had " << oe << " new " << ne << " : missing" << dendl;
- assert(missing.is_missing(oe.oid));
+ assert(missing.is_missing(oe.soid));
}
} else {
if (ne.is_delete()) {
// old update, new delete
dout(20) << "merge_old_entry had " << oe << " new " << ne << " : new delete supercedes" << dendl;
- missing.rm(oe.oid, oe.version);
+ missing.rm(oe.soid, oe.version);
} else {
// old update, new update
dout(20) << "merge_old_entry had " << oe << " new " << ne << " : new item supercedes" << dendl;
- missing.revise_need(ne.oid, ne.version);
+ missing.revise_need(ne.soid, ne.version);
}
}
} else {
dout(20) << "merge_old_entry had " << oe << " new dne : ok" << dendl;
} else {
dout(20) << "merge_old_entry had " << oe << " new dne : deleting" << dendl;
- t.remove(info.pgid.to_coll(), pobject_t(info.pgid.pool(), 0, oe.oid));
- missing.rm(oe.oid, oe.version);
+ t.remove(info.pgid.to_coll(), oe.soid);
+ missing.rm(oe.soid, oe.version);
}
}
return false;
}
assert(log.backlog || log.top == eversion_t());
- hash_map<object_t,Log::Entry*> old_objects;
+ hash_map<sobject_t, Log::Entry*> old_objects;
old_objects.swap(log.objects);
// swap in other log and index
dout(10) << "merge_log merging " << ne << dendl;
missing.add_next_event(ne);
if (ne.is_delete())
- t.remove(info.pgid.to_coll(), pobject_t(info.pgid.pool(), 0, ne.oid));
+ t.remove(info.pgid.to_coll(), ne.soid);
}
// find any divergent or removed items in old log.
p != olog.log.end();
p++) {
Log::Entry &oe = *p; // old entry
- if (old_objects.count(oe.oid) &&
- old_objects[oe.oid] == &oe)
+ if (old_objects.count(oe.soid) &&
+ old_objects[oe.soid] == &oe)
merge_old_entry(t, oe);
}
log.index(ne);
missing.add_next_event(ne);
if (ne.is_delete())
- t.remove(info.pgid.to_coll(), pobject_t(info.pgid.pool(), 0, ne.oid));
+ t.remove(info.pgid.to_coll(), ne.soid);
}
// move aside divergent items
void PG::search_for_missing(Log &olog, Missing &omissing, int fromosd)
{
// found items?
- for (map<object_t,Missing::item>::iterator p = missing.missing.begin();
+ for (map<sobject_t,Missing::item>::iterator p = missing.missing.begin();
p != missing.missing.end();
p++) {
eversion_t need = p->second.need;
for (vector<pobject_t>::iterator it = olist.begin();
it != olist.end();
it++) {
- pobject_t poid = pobject_t(info.pgid.pool(), 0, it->oid);
+ pobject_t poid = *it;
Log::Entry e;
- e.oid = it->oid;
+ e.soid = poid;
bufferlist bv;
osd->store->getattr(info.pgid.to_coll(), poid, OI_ATTR, bv);
object_info_t oi(bv);
e.prior_version = oi.prior_version;
e.reqid = oi.last_reqid;
e.mtime = oi.mtime;
- if (poid.oid.snap && poid.oid.snap < CEPH_NOSNAP) {
+ if (e.soid.snap && e.soid.snap < CEPH_NOSNAP) {
e.op = Log::Entry::CLONE;
::encode(oi.snaps, e.snaps);
} else {
* - the prior_version is also already in the log
* otherwise, we need to include it.
*/
- if (log.objects.count(be.oid)) {
- Log::Entry *le = log.objects[be.oid];
+ if (log.objects.count(be.soid)) {
+ Log::Entry *le = log.objects[be.soid];
assert(!le->is_delete()); // if it's a deletion, we are corrupt..
for (list<Entry>::const_iterator p = log.begin();
p != log.end();
p++) {
- out << *p << " " << (logged_object(p->oid) ? "indexed":"NOT INDEXED") << std::endl;
- assert(logged_object(p->oid));
+ out << *p << " " << (logged_object(p->soid) ? "indexed":"NOT INDEXED") << std::endl;
+ assert(logged_object(p->soid));
assert(logged_req(p->reqid));
}
return out;
while (log.complete_to->version < info.last_complete)
log.complete_to++;
assert(log.complete_to != log.log.end());
- log.last_requested = object_t();
+ log.last_requested = sobject_t();
dout(10) << "activate - complete_to = " << log.complete_to->version << dendl;
if (is_primary()) {
dout(10) << "activate - starting recovery" << dendl;
dout(10) << "read_log checking for missing items over interval (" << info.last_complete
<< "," << info.last_update << "]" << dendl;
- set<object_t> did;
+ set<sobject_t> did;
for (list<Log::Entry>::reverse_iterator i = log.log.rbegin();
i != log.log.rend();
i++) {
if (i->version <= info.last_complete) break;
- if (did.count(i->oid)) continue;
- did.insert(i->oid);
+ if (did.count(i->soid)) continue;
+ did.insert(i->soid);
if (i->is_delete()) continue;
- pobject_t poid(info.pgid.pool(), 0, i->oid);
bufferlist bv;
- int r = osd->store->getattr(info.pgid.to_coll(), poid, OI_ATTR, bv);
+ int r = osd->store->getattr(info.pgid.to_coll(), i->soid, OI_ATTR, bv);
if (r >= 0) {
object_info_t oi(bv);
if (oi.version < i->version) {
dout(15) << "read_log missing " << *i << " (have " << oi.version << ")" << dendl;
- missing.add(i->oid, i->version, oi.version);
+ missing.add(i->soid, i->version, oi.version);
}
} else {
dout(15) << "read_log missing " << *i << dendl;
- missing.add(i->oid, i->version, eversion_t());
+ missing.add(i->soid, i->version, eversion_t());
}
}
}
//
bool PG::block_if_wrlocked(MOSDOp* op, object_info_t& oi)
{
- pobject_t poid(info.pgid.pool(), 0, op->get_oid());
+ sobject_t soid(op->get_oid(), CEPH_NOSNAP);
if (oi.wrlock_by.tid &&
oi.wrlock_by.name != op->get_orig_source()) {
//the object is locked for writing by someone else -- add the op to the waiting queue
dout(10) << "blocked on wrlock on " << oi << dendl;
- waiting_for_wr_unlock[poid.oid].push_back(op);
+ waiting_for_wr_unlock[soid].push_back(op);
return true;
}
bv.push_back(po->attrs["oi"]);
object_info_t oi(bv);
if (bad_peer != acting[0]) {
- peer_missing[bad_peer].add(po->poid.oid, oi.version, eversion_t());
+ peer_missing[bad_peer].add(po->poid, oi.version, eversion_t());
} else {
- missing.add(po->poid.oid, oi.version, eversion_t());
- missing_loc[po->poid.oid].insert(ok_peer);
- log.last_requested = object_t();
+ missing.add(po->poid, oi.version, eversion_t());
+ missing_loc[po->poid].insert(ok_peer);
+ log.last_requested = sobject_t();
}
uptodate_set.erase(bad_peer);
osd->queue_for_recovery(this);
const static int BACKLOG = 4; // event invented by generate_backlog
__s32 op; // write, zero, trunc, remove
- object_t oid;
+ sobject_t soid;
+ snapid_t snap;
eversion_t version, prior_version;
osd_reqid_t reqid; // caller+tid to uniquely identify request
utime_t mtime; // this is the _user_ mtime, mind you
bufferlist snaps; // only for clone entries
Entry() : op(0) {}
- Entry(int _op, object_t _oid,
+ Entry(int _op, sobject_t _soid,
const eversion_t& v, const eversion_t& pv,
const osd_reqid_t& rid, const utime_t& mt) :
- op(_op), oid(_oid), version(v),
+ op(_op), soid(_soid), version(v),
prior_version(pv),
reqid(rid), mtime(mt) {}
void encode(bufferlist &bl) const {
::encode(op, bl);
- ::encode(oid, bl);
+ ::encode(soid, bl);
::encode(version, bl);
::encode(prior_version, bl);
::encode(reqid, bl);
}
void decode(bufferlist::iterator &bl) {
::decode(op, bl);
- ::decode(oid, bl);
+ ::decode(soid, bl);
::decode(version, bl);
::decode(prior_version, bl);
::decode(reqid, bl);
* plus some methods to manipulate it all.
*/
struct IndexedLog : public Log {
- hash_map<object_t,Entry*> objects; // ptrs into log. be careful!
- hash_set<osd_reqid_t> caller_ops;
+ hash_map<sobject_t,Entry*> objects; // ptrs into log. be careful!
+ hash_set<osd_reqid_t> caller_ops;
// recovery pointers
list<Entry>::iterator complete_to; // not inclusive of referenced item
- object_t last_requested; // last object requested by primary
+ sobject_t last_requested; // last object requested by primary
/****/
IndexedLog() {}
}
void reset_recovery_pointers() {
complete_to = log.end();
- last_requested = object_t();
+ last_requested = sobject_t();
}
- bool logged_object(object_t oid) const {
+ bool logged_object(sobject_t oid) const {
return objects.count(oid);
}
bool logged_req(const osd_reqid_t &r) const {
for (list<Entry>::iterator i = log.begin();
i != log.end();
i++) {
- objects[i->oid] = &(*i);
+ objects[i->soid] = &(*i);
caller_ops.insert(i->reqid);
}
}
void index(Entry& e) {
- if (objects.count(e.oid) == 0 ||
- objects[e.oid]->version < e.version)
- objects[e.oid] = &e;
+ if (objects.count(e.soid) == 0 ||
+ objects[e.soid]->version < e.version)
+ objects[e.soid] = &e;
caller_ops.insert(e.reqid);
}
void unindex() {
}
void unindex(Entry& e) {
// NOTE: this only works if we remove from the _bottom_ of the log!
- assert(objects.count(e.oid));
- if (objects[e.oid]->version == e.version)
- objects.erase(e.oid);
+ assert(objects.count(e.soid));
+ if (objects[e.soid]->version == e.version)
+ objects.erase(e.soid);
caller_ops.erase(e.reqid);
}
// accessors
- Entry *is_updated(object_t oid) {
+ Entry *is_updated(sobject_t oid) {
if (objects.count(oid) && objects[oid]->is_update()) return objects[oid];
return 0;
}
- Entry *is_deleted(object_t oid) {
+ Entry *is_deleted(sobject_t oid) {
if (objects.count(oid) && objects[oid]->is_delete()) return objects[oid];
return 0;
}
top = e.version;
// to our index
- objects[e.oid] = &(log.back());
+ objects[e.soid] = &(log.back());
caller_ops.insert(e.reqid);
}
};
WRITE_CLASS_ENCODER(item)
- map<object_t, item> missing; // oid -> (need v, have v)
- map<eversion_t, object_t> rmissing; // v -> oid
+ map<sobject_t, item> missing; // oid -> (need v, have v)
+ map<eversion_t, sobject_t> rmissing; // v -> oid
unsigned num_missing() const { return missing.size(); }
rmissing.swap(o.rmissing);
}
- bool is_missing(object_t oid) {
+ bool is_missing(sobject_t oid) {
return missing.count(oid);
}
- bool is_missing(object_t oid, eversion_t v) {
+ bool is_missing(sobject_t oid, eversion_t v) {
return missing.count(oid) && missing[oid].need <= v;
}
- eversion_t have_old(object_t oid) {
+ eversion_t have_old(sobject_t oid) {
return missing.count(oid) ? missing[oid].have : eversion_t();
}
if (e.is_update()) {
if (e.prior_version == eversion_t()) {
// new object.
- //assert(missing.count(e.oid) == 0); // might already be missing divergent item.
- if (missing.count(e.oid)) // already missing divergent item
- rmissing.erase(missing[e.oid].need);
- missing[e.oid] = item(e.version, eversion_t()); // .have = nil
- } else if (missing.count(e.oid)) {
+ //assert(missing.count(e.soid) == 0); // might already be missing divergent item.
+ if (missing.count(e.soid)) // already missing divergent item
+ rmissing.erase(missing[e.soid].need);
+ missing[e.soid] = item(e.version, eversion_t()); // .have = nil
+ } else if (missing.count(e.soid)) {
// already missing (prior).
- assert(missing[e.oid].need == e.prior_version);
+ assert(missing[e.soid].need == e.prior_version);
rmissing.erase(e.prior_version);
- missing[e.oid].need = e.version; // .have unchanged.
+ missing[e.soid].need = e.version; // .have unchanged.
} else {
// not missing, we must have prior_version (if any)
- missing[e.oid] = item(e.version, e.prior_version);
+ missing[e.soid] = item(e.version, e.prior_version);
}
- rmissing[e.version] = e.oid;
+ rmissing[e.version] = e.soid;
} else
- rm(e.oid, e.version);
+ rm(e.soid, e.version);
}
void add_event(Log::Entry& e) {
if (e.is_update()) {
- if (missing.count(e.oid)) {
- if (missing[e.oid].need >= e.version)
+ if (missing.count(e.soid)) {
+ if (missing[e.soid].need >= e.version)
return; // already missing same or newer.
// missing older, revise need
- rmissing.erase(missing[e.oid].need);
- missing[e.oid].need = e.version;
+ rmissing.erase(missing[e.soid].need);
+ missing[e.soid].need = e.version;
} else
// not missing => have prior_version (if any)
- missing[e.oid] = item(e.version, e.prior_version);
- rmissing[e.version] = e.oid;
+ missing[e.soid] = item(e.version, e.prior_version);
+ rmissing[e.version] = e.soid;
} else
- rm(e.oid, e.version);
+ rm(e.soid, e.version);
}
- void revise_need(object_t oid, eversion_t need) {
+ void revise_need(sobject_t oid, eversion_t need) {
if (missing.count(oid)) {
rmissing.erase(missing[oid].need);
missing[oid].need = need; // no not adjust .have
rmissing[need] = oid;
}
- void add(object_t oid, eversion_t need, eversion_t have) {
+ void add(sobject_t oid, eversion_t need, eversion_t have) {
missing[oid] = item(need, have);
rmissing[need] = oid;
}
- void rm(object_t oid, eversion_t when) {
+ void rm(sobject_t oid, eversion_t when) {
if (missing.count(oid) && missing[oid].need < when) {
rmissing.erase(missing[oid].need);
missing.erase(oid);
}
}
- void got(object_t oid, eversion_t v) {
+ void got(sobject_t oid, eversion_t v) {
assert(missing.count(oid));
assert(missing[oid].need <= v);
rmissing.erase(missing[oid].need);
void decode(bufferlist::iterator &bl) {
::decode(missing, bl);
- for (map<object_t,item>::iterator it = missing.begin();
+ for (map<sobject_t,item>::iterator it = missing.begin();
it != missing.end();
it++)
rmissing[it->second.need] = it->first;
IndexedLog log;
OndiskLog ondisklog;
Missing missing;
- map<object_t, set<int> > missing_loc;
+ map<sobject_t, set<int> > missing_loc;
set<snapid_t> snap_collections;
map<epoch_t,Interval> past_intervals;
// pg waiters
list<class Message*> waiting_for_active;
- hash_map<object_t,
+ hash_map<sobject_t,
list<class Message*> > waiting_for_missing_object;
map<eversion_t,class MOSDOp*> replay_queue;
- hash_map<object_t, list<Message*> > waiting_for_wr_unlock;
+ hash_map<sobject_t, list<Message*> > waiting_for_wr_unlock;
bool block_if_wrlocked(MOSDOp* op, object_info_t& oi);
// stats
- hash_map<object_t, DecayCounter> stat_object_temp_rd;
+ hash_map<sobject_t, DecayCounter> stat_object_temp_rd;
Mutex pg_stats_lock;
bool pg_stats_valid;
virtual bool same_for_rep_modify_since(epoch_t e) = 0;
virtual bool is_write_in_progress() = 0;
- virtual bool is_missing_object(object_t oid) = 0;
- virtual void wait_for_missing_object(object_t oid, Message *op) = 0;
+ virtual bool is_missing_object(sobject_t oid) = 0;
+ virtual void wait_for_missing_object(sobject_t oid, Message *op) = 0;
virtual void on_osd_failure(int osd) = 0;
virtual void on_role_change() = 0;
(e.is_modify() ? " m ":
(e.is_backlog() ? " b ":
" ? "))))
- << e.oid << " by " << e.reqid << " " << e.mtime;
+ << e.soid << " by " << e.reqid << " " << e.mtime;
}
inline ostream& operator<<(ostream& out, const PG::Log& log)
// ====================
// missing objects
-bool ReplicatedPG::is_missing_object(object_t oid)
+bool ReplicatedPG::is_missing_object(sobject_t soid)
{
- return missing.missing.count(oid);
+ return missing.missing.count(soid);
}
-void ReplicatedPG::wait_for_missing_object(object_t oid, Message *m)
+void ReplicatedPG::wait_for_missing_object(sobject_t soid, Message *m)
{
- assert(is_missing_object(oid));
-
- pobject_t poid(info.pgid.pool(), 0, oid);
+ assert(is_missing_object(soid));
// we don't have it (yet).
- eversion_t v = missing.missing[oid].need;
- if (pulling.count(oid)) {
+ eversion_t v = missing.missing[soid].need;
+ if (pulling.count(soid)) {
dout(7) << "missing "
- << poid
+ << soid
<< " v " << v
<< ", already pulling"
<< dendl;
} else {
dout(7) << "missing "
- << poid
+ << soid
<< " v " << v
<< ", pulling"
<< dendl;
- pull(poid);
+ pull(soid);
osd->start_recovery_op(this, 1);
}
- waiting_for_missing_object[oid].push_back(m);
+ waiting_for_missing_object[soid].push_back(m);
}
ceph_osd_op& readop = op->ops[0];
object_t oid = op->get_oid();
- pobject_t poid(info.pgid.pool(), 0, oid);
+ sobject_t soid(oid, op->get_snapid());
// -- load balance reads --
if (is_primary()) {
}
}
+#if 0
// -- balance reads?
if (g_conf.osd_balance_reads &&
!op->get_source().is_osd()) {
// hot?
double temp = 0;
- if (stat_object_temp_rd.count(oid))
- temp = stat_object_temp_rd[oid].get(op->get_recv_stamp());
+ if (stat_object_temp_rd.count(soid))
+ temp = stat_object_temp_rd[soid].get(op->get_recv_stamp());
bool is_hotly_read = temp > g_conf.osd_balance_reads_temp;
dout(20) << "balance_reads oid " << oid << " temp " << temp
bool b;
// *** FIXME *** this may block, and we're in the fast path! ***
if (g_conf.osd_balance_reads &&
- osd->store->getattr(info.pgid.to_coll(), poid, "balance-reads", &b, 1) >= 0)
+ osd->store->getattr(info.pgid.to_coll(), soid, "balance-reads", &b, 1) >= 0)
is_balanced = true;
if (!is_balanced && should_balance &&
- balancing_reads.count(oid) == 0) {
+ balancing_reads.count(soid) == 0) {
dout(-10) << "preprocess_op balance-reads on " << oid << dendl;
- balancing_reads.insert(oid);
+ balancing_reads.insert(soid);
ceph_object_layout layout;
layout.ol_pgid = info.pgid.u.pg64;
layout.ol_stripe_unit = 0;
do_op(pop);
}
if (is_balanced && !should_balance &&
- !unbalancing_reads.count(oid) == 0) {
+ !unbalancing_reads.count(soid) == 0) {
dout(-10) << "preprocess_op unbalance-reads on " << oid << dendl;
- unbalancing_reads.insert(oid);
+ unbalancing_reads.insert(soid);
ceph_object_layout layout;
layout.ol_pgid = info.pgid.u.pg64;
layout.ol_stripe_unit = 0;
do_op(pop);
}
}
+#endif
// -- read shedding
if (g_conf.osd_shed_reads &&
// -- fastpath read?
// if this is a read and the data is in the cache, do an immediate read..
if ( g_conf.osd_immediate_read_from_cache ) {
- if (osd->store->is_cached(info.pgid.to_coll(), poid,
+ if (osd->store->is_cached(info.pgid.to_coll(), soid,
readop.offset,
readop.length) == 0) {
if (!is_primary() && !op->get_source().is_osd()) {
// am i allowed?
bool v;
- if (osd->store->getattr(info.pgid.to_coll(), poid, "balance-reads", &v, 1) < 0) {
+ if (osd->store->getattr(info.pgid.to_coll(), soid, "balance-reads", &v, 1) < 0) {
dout(-10) << "preprocess_op in-cache but no balance-reads on " << oid
<< ", fwd to primary" << dendl;
osd->messenger->forward_message(op, osd->osdmap->get_inst(get_primary()));
dout(10) << "snap_trimmer collection " << c << " has " << ls.size() << " items" << dendl;
for (vector<pobject_t>::iterator p = ls.begin(); p != ls.end(); p++) {
- pobject_t coid = *p;
+ sobject_t coid = *p;
ObjectStore::Transaction t;
object_info_t coi(bl);
// load head info
- pobject_t head = coid;
- head.oid.snap = CEPH_NOSNAP;
+ sobject_t head = coid;
+ head.snap = CEPH_NOSNAP;
bl.clear();
osd->store->getattr(info.pgid.to_coll(), head, OI_ATTR, bl);
object_info_t hoi(bl);
t.collection_remove(info.pgid.to_snap_coll(snaps[snaps.size()-1]), coid);
// ...from snapset
- snapid_t last = coid.oid.snap;
+ snapid_t last = coid.snap;
vector<snapid_t>::iterator p;
for (p = snapset.clones.begin(); p != snapset.clones.end(); p++)
if (*p == last)
/*
* return false if object doesn't (logically) exist
*/
-int ReplicatedPG::pick_read_snap(pobject_t& poid, object_info_t& coi)
+int ReplicatedPG::pick_read_snap(sobject_t& soid, object_info_t& coi)
{
- pobject_t head = poid;
- head.oid.snap = CEPH_NOSNAP;
+ sobject_t head = soid;
+ head.snap = CEPH_NOSNAP;
bufferlist bl;
int r = osd->store->getattr(info.pgid.to_coll(), head, OI_ATTR, bl);
return -ENOENT; // if head doesn't exist, no snapped version will either.
object_info_t hoi(bl);
- dout(10) << "pick_read_snap " << poid << " snapset " << hoi.snapset << dendl;
- snapid_t want = poid.oid.snap;
+ dout(10) << "pick_read_snap " << soid << " snapset " << hoi.snapset << dendl;
+
+ snapid_t want = soid.snap;
// head?
- if (want > hoi.snapset.seq) {
+ if (soid.snap > hoi.snapset.seq) {
if (hoi.snapset.head_exists) {
dout(10) << "pick_read_snap " << head
<< " want " << want << " > snapset seq " << hoi.snapset.seq
<< " -- HIT" << dendl;
- poid = head;
+ soid = head;
+ coi = hoi;
return 0;
} else {
dout(10) << "pick_read_snap " << head
}
// check clone
- poid.oid.snap = hoi.snapset.clones[k];
-
- if (missing.is_missing(poid.oid)) {
- dout(20) << "pick_read_snap " << poid << " missing, try again later" << dendl;
+ soid.snap = hoi.snapset.clones[k];
+ if (missing.is_missing(soid)) {
+ dout(20) << "pick_read_snap " << soid << " missing, try again later" << dendl;
return -EAGAIN;
}
+ // load clone info
+ bl.clear();
+ osd->store->getattr(info.pgid.to_coll(), soid, OI_ATTR, bl);
+ coi.decode(bl);
+
// clone
- dout(20) << "pick_read_snap " << poid << " snaps " << coi.snaps << dendl;
+ dout(20) << "pick_read_snap " << soid << " snaps " << coi.snaps << dendl;
snapid_t first = coi.snaps[coi.snaps.size()-1];
snapid_t last = coi.snaps[0];
if (first <= want) {
- dout(20) << "pick_read_snap " << poid << " [" << first << "," << last << "] contains " << want << " -- HIT" << dendl;
+ dout(20) << "pick_read_snap " << soid << " [" << first << "," << last << "] contains " << want << " -- HIT" << dendl;
return 0;
} else {
- dout(20) << "pick_read_snap " << poid << " [" << first << "," << last << "] does not contain " << want << " -- DNE" << dendl;
+ dout(20) << "pick_read_snap " << soid << " [" << first << "," << last << "] does not contain " << want << " -- DNE" << dendl;
return -ENOENT;
}
}
void ReplicatedPG::op_read(MOSDOp *op)
{
object_t oid = op->get_oid();
- pobject_t poid(info.pgid.pool(), 0, oid);
+ sobject_t soid(oid, op->get_snapid());
- dout(10) << "op_read " << oid << " " << op->ops << dendl;
-
- object_info_t oi(poid);
+ dout(10) << "op_read " << soid << " " << op->ops << dendl;
- bufferlist bv;
- osd->store->getattr(info.pgid.to_coll(), poid, OI_ATTR, bv);
- if (bv.length())
- oi.decode(bv);
+ bufferlist::iterator bp = op->get_data().begin();
+ bufferlist data;
+ int data_off = 0;
+ int result = 0;
+
+ // pick revision
+ object_info_t oi(soid);
+ if (soid.snap) {
+ result = pick_read_snap(soid, oi);
+ if (result == -EAGAIN) {
+ wait_for_missing_object(soid, op);
+ return;
+ }
+ if (result != 0)
+ goto done; // we have no revision for this request.
+ }
// wrlocked?
- if (poid.oid.snap == CEPH_NOSNAP &&
+ if (op->get_snapid() == CEPH_NOSNAP &&
block_if_wrlocked(op, oi))
return;
- bufferlist::iterator bp = op->get_data().begin();
- bufferlist data;
- int data_off = 0;
- int result = 0;
-
// !primary and unbalanced?
// (ignore ops forwarded from the primary)
if (!is_primary()) {
} else {
// make sure i exist and am balanced, otherwise fw back to acker.
bool b;
- if (!osd->store->exists(info.pgid.to_coll(), poid) ||
- osd->store->getattr(info.pgid.to_coll(), poid, "balance-reads", &b, 1) < 0) {
- dout(-10) << "read on replica, object " << poid
+ if (!osd->store->exists(info.pgid.to_coll(), soid) ||
+ osd->store->getattr(info.pgid.to_coll(), soid, "balance-reads", &b, 1) < 0) {
+ dout(-10) << "read on replica, object " << soid
<< " dne or no balance-reads, fw back to primary" << dendl;
osd->messenger->forward_message(op, osd->osdmap->get_inst(get_primary()));
return;
}
// do it.
- if (poid.oid.snap) {
- result = pick_read_snap(poid, oi);
- if (result == -EAGAIN) {
- wait_for_missing_object(poid.oid, op);
- return;
- }
- if (result != 0)
- goto done; // we have no revision for this request.
- }
-
for (vector<ceph_osd_op>::iterator p = op->ops.begin(); p != op->ops.end(); p++) {
switch (p->op) {
case CEPH_OSD_OP_READ:
{
// read into a buffer
bufferlist bl;
- int r = osd->store->read(info.pgid.to_coll(), poid, p->offset, p->length, bl);
+ int r = osd->store->read(info.pgid.to_coll(), soid, p->offset, p->length, bl);
if (data.length() == 0)
data_off = p->offset;
data.claim(bl);
{
struct stat st;
memset(&st, sizeof(st), 0);
- int r = osd->store->stat(info.pgid.to_coll(), poid, &st);
+ int r = osd->store->stat(info.pgid.to_coll(), soid, &st);
if (r >= 0)
p->length = st.st_size;
else
nstring name(p->name_len + 1);
name[0] = '_';
bp.copy(p->name_len, name.data()+1);
- int r = osd->store->getattr(info.pgid.to_coll(), poid, name.c_str(), data);
+ int r = osd->store->getattr(info.pgid.to_coll(), soid, name.c_str(), data);
if (r >= 0) {
p->value_len = r;
result = 0;
if (is_primary() &&
g_conf.osd_balance_reads)
- stat_object_temp_rd[oid].hit(now); // hit temp.
+ stat_object_temp_rd[soid].hit(now); // hit temp.
}
osd->messenger->send_message(reply, op->get_orig_source_inst());
void ReplicatedPG::_make_clone(ObjectStore::Transaction& t,
- pobject_t head, pobject_t coid,
+ sobject_t head, pobject_t coid,
eversion_t ov, eversion_t v, osd_reqid_t& reqid, utime_t mtime, vector<snapid_t>& snaps)
{
object_info_t pi(coid);
}
void ReplicatedPG::prepare_clone(ObjectStore::Transaction& t, bufferlist& logbl, osd_reqid_t reqid, pg_stat_t& stats,
- pobject_t poid, loff_t old_size, object_info_t& oi,
+ sobject_t soid, loff_t old_size, object_info_t& oi,
eversion_t& at_version, SnapContext& snapc)
{
// clone?
- assert(poid.oid.snap == CEPH_NOSNAP);
+ assert(soid.snap == CEPH_NOSNAP);
dout(20) << "snapset=" << oi.snapset << " snapc=" << snapc << dendl;;
// use newer snapc?
snapc.snaps.size() && // there are snaps
snapc.snaps[0] > oi.snapset.seq) { // existing object is old
// clone
- pobject_t coid = poid;
- coid.oid.snap = snapc.seq;
+ sobject_t coid = soid;
+ coid.snap = snapc.seq;
unsigned l;
for (l=1; l<snapc.snaps.size() && snapc.snaps[l] > oi.snapset.seq; l++) ;
snaps[i] = snapc.snaps[i];
// prepare clone
- _make_clone(t, poid, coid, oi.version, at_version, reqid, oi.mtime, snaps);
+ _make_clone(t, soid, coid, oi.version, at_version, reqid, oi.mtime, snaps);
// add to snap bound collections
coll_t fc = make_snap_collection(t, snaps[0]);
stats.num_objects++;
stats.num_object_clones++;
- oi.snapset.clones.push_back(coid.oid.snap);
- oi.snapset.clone_size[coid.oid.snap] = old_size;
- oi.snapset.clone_overlap[coid.oid.snap].insert(0, old_size);
+ oi.snapset.clones.push_back(coid.snap);
+ oi.snapset.clone_size[coid.snap] = old_size;
+ oi.snapset.clone_overlap[coid.snap].insert(0, old_size);
// log clone
dout(10) << "cloning v " << oi.version
<< " to " << coid << " v " << at_version
<< " snaps=" << snaps << dendl;
- Log::Entry cloneentry(PG::Log::Entry::CLONE, coid.oid, at_version, oi.version, reqid, oi.mtime);
+ Log::Entry cloneentry(PG::Log::Entry::CLONE, coid, at_version, oi.version, reqid, oi.mtime);
::encode(snaps, cloneentry.snaps);
add_log_entry(cloneentry, logbl);
// low level object operations
int ReplicatedPG::prepare_simple_op(ObjectStore::Transaction& t, osd_reqid_t reqid, pg_stat_t& st,
- pobject_t poid, __u64& old_size, bool& exists, object_info_t& oi,
+ sobject_t soid, __u64& old_size, bool& exists, object_info_t& oi,
vector<ceph_osd_op>& ops, int opn, bufferlist::iterator& bp,
SnapContext& snapc)
{
case CEPH_OSD_OP_BALANCEREADS:
{
bool bal = true;
- t.setattr(info.pgid.to_coll(), poid, "balance-reads", &bal, sizeof(bal));
+ t.setattr(info.pgid.to_coll(), soid, "balance-reads", &bal, sizeof(bal));
}
break;
case CEPH_OSD_OP_UNBALANCEREADS:
{
- t.rmattr(info.pgid.to_coll(), poid, "balance-reads");
+ t.rmattr(info.pgid.to_coll(), soid, "balance-reads");
}
break;
assert(op.length);
bufferlist nbl;
bp.copy(op.length, nbl);
- t.write(info.pgid.to_coll(), poid, op.offset, op.length, nbl);
+ t.write(info.pgid.to_coll(), soid, op.offset, op.length, nbl);
if (oi.snapset.clones.size()) {
snapid_t newest = *oi.snapset.clones.rbegin();
interval_set<__u64> ch;
{ // write full object
bufferlist nbl;
bp.copy(op.length, nbl);
- t.truncate(info.pgid.to_coll(), poid, 0);
- t.write(info.pgid.to_coll(), poid, op.offset, op.length, nbl);
+ t.truncate(info.pgid.to_coll(), soid, 0);
+ t.write(info.pgid.to_coll(), soid, op.offset, op.length, nbl);
if (oi.snapset.clones.size()) {
snapid_t newest = *oi.snapset.clones.rbegin();
oi.snapset.clone_overlap.erase(newest);
{ // zero
assert(op.length);
if (!exists)
- t.touch(info.pgid.to_coll(), poid);
- t.zero(info.pgid.to_coll(), poid, op.offset, op.length);
+ t.touch(info.pgid.to_coll(), soid);
+ t.zero(info.pgid.to_coll(), soid, op.offset, op.length);
if (oi.snapset.clones.size()) {
snapid_t newest = *oi.snapset.clones.rbegin();
interval_set<__u64> ch;
case CEPH_OSD_OP_TRUNCATE:
{ // truncate
if (!exists)
- t.touch(info.pgid.to_coll(), poid);
- t.truncate(info.pgid.to_coll(), poid, op.offset);
+ t.touch(info.pgid.to_coll(), soid);
+ t.truncate(info.pgid.to_coll(), soid, op.offset);
if (oi.snapset.clones.size()) {
snapid_t newest = *oi.snapset.clones.rbegin();
interval_set<__u64> trim;
case CEPH_OSD_OP_DELETE:
{ // delete
- t.remove(info.pgid.to_coll(), poid);
+ t.remove(info.pgid.to_coll(), soid);
if (oi.snapset.clones.size()) {
snapid_t newest = *oi.snapset.clones.rbegin();
add_interval_usage(oi.snapset.clone_overlap[newest], st);
case CEPH_OSD_OP_SETXATTR:
{
if (!exists)
- t.touch(info.pgid.to_coll(), poid);
+ t.touch(info.pgid.to_coll(), soid);
nstring name(op.name_len + 1);
name[0] = '_';
bp.copy(op.name_len, name.data()+1);
bufferlist bl;
bp.copy(op.value_len, bl);
if (!oi.snapset.head_exists) // create object if it doesn't yet exist.
- t.touch(info.pgid.to_coll(), poid);
- t.setattr(info.pgid.to_coll(), poid, name, bl);
+ t.touch(info.pgid.to_coll(), soid);
+ t.setattr(info.pgid.to_coll(), soid, name, bl);
oi.snapset.head_exists = true;
}
break;
nstring name(op.name_len + 1);
name[0] = '_';
bp.copy(op.name_len, name.data()+1);
- t.rmattr(info.pgid.to_coll(), poid, name);
+ t.rmattr(info.pgid.to_coll(), soid, name);
}
break;
newop.op = CEPH_OSD_OP_WRITE;
newop.offset = old_size;
newop.length = op.length;
- prepare_simple_op(t, reqid, st, poid, old_size, exists, oi, nops, 0, bp, snapc);
+ prepare_simple_op(t, reqid, st, soid, old_size, exists, oi, nops, 0, bp, snapc);
}
break;
newop.offset = op.truncate_size;
dout(10) << " seq " << op.truncate_seq << " > old_seq " << old_seq
<< ", truncating with " << newop << dendl;
- prepare_simple_op(t, reqid, st, poid, old_size, exists, oi, nops, 0, bp, snapc);
+ prepare_simple_op(t, reqid, st, soid, old_size, exists, oi, nops, 0, bp, snapc);
} else {
// do smart truncate
interval_set<__u64> tm;
newop.op = CEPH_OSD_OP_ZERO;
newop.offset = p->first;
newop.length = p->second;
- prepare_simple_op(t, reqid, st, poid, old_size, exists, oi, nops, 0, bp, snapc);
+ prepare_simple_op(t, reqid, st, soid, old_size, exists, oi, nops, 0, bp, snapc);
}
oi.truncate_info.clear();
}
void ReplicatedPG::prepare_transaction(ObjectStore::Transaction& t, osd_reqid_t reqid,
- pobject_t poid,
+ sobject_t soid,
vector<ceph_osd_op>& ops, bufferlist& bl, utime_t mtime,
bool& exists, __u64& size, object_info_t& oi,
eversion_t at_version, SnapContext& snapc,
bufferlist::iterator bp = bl.begin();
for (unsigned i=0; i<ops.size(); i++) {
// clone?
- if (!did_snap && poid.oid.snap &&
+ if (!did_snap && soid.snap &&
!ceph_osd_op_type_lock(ops[i].op)) { // is a (non-lock) modification
- prepare_clone(t, log_bl, reqid, info.stats, poid, size, oi,
+ prepare_clone(t, log_bl, reqid, info.stats, soid, size, oi,
at_version, snapc);
did_snap = true;
}
- prepare_simple_op(t, reqid, info.stats, poid, size, exists, oi,
+ prepare_simple_op(t, reqid, info.stats, soid, size, exists, oi,
ops, i, bp, snapc);
}
bufferlist bv(sizeof(oi));
::encode(oi, bv);
- t.setattr(info.pgid.to_coll(), poid, OI_ATTR, bv);
+ t.setattr(info.pgid.to_coll(), soid, OI_ATTR, bv);
}
// append to log
int logopcode = Log::Entry::MODIFY;
if (!exists)
logopcode = Log::Entry::DELETE;
- Log::Entry logentry(logopcode, poid.oid, at_version, old_version, reqid, mtime);
+ Log::Entry logentry(logopcode, soid, at_version, old_version, reqid, mtime);
add_log_entry(logentry, log_bl);
// write pg info, log to disk
update_stats();
// any completion stuff to do here?
- object_t oid = repop->op->get_oid();
+ sobject_t soid(repop->op->get_oid(), CEPH_NOSNAP);
ceph_osd_op& first = repop->op->ops[0];
switch (first.op) {
+#if 0
case CEPH_OSD_OP_UNBALANCEREADS:
dout(-10) << "apply_repop completed unbalance-reads on " << oid << dendl;
unbalancing_reads.erase(oid);
}
*/
break;
-
+#endif
+
case CEPH_OSD_OP_WRUNLOCK:
- dout(-10) << "apply_repop completed wrunlock on " << oid << dendl;
- if (waiting_for_wr_unlock.count(oid)) {
- osd->take_waiters(waiting_for_wr_unlock[oid]);
- waiting_for_wr_unlock.erase(oid);
+ dout(-10) << "apply_repop completed wrunlock on " << soid << dendl;
+ if (waiting_for_wr_unlock.count(soid)) {
+ osd->take_waiters(waiting_for_wr_unlock[soid]);
+ waiting_for_wr_unlock.erase(soid);
}
break;
}
void ReplicatedPG::issue_repop(RepGather *repop, int dest, utime_t now)
{
- pobject_t poid(info.pgid.pool(), 0, repop->op->get_oid());
+ sobject_t soid(repop->op->get_oid(), CEPH_NOSNAP);
dout(7) << " issue_repop rep_tid " << repop->rep_tid
- << " o " << poid
+ << " o " << soid
<< " to osd" << dest
<< dendl;
// forward the write/update/whatever
int acks_wanted = CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK;
- MOSDSubOp *wr = new MOSDSubOp(repop->op->get_reqid(), info.pgid, poid,
+ MOSDSubOp *wr = new MOSDSubOp(repop->op->get_reqid(), info.pgid, soid,
repop->op->ops, repop->noop, acks_wanted,
osd->osdmap->get_epoch(),
repop->rep_tid, repop->at_version);
void ReplicatedPG::op_modify(MOSDOp *op)
{
int whoami = osd->get_nodeid();
- pobject_t poid(info.pgid.pool(), 0, op->get_oid());
+ sobject_t soid(op->get_oid(), CEPH_NOSNAP);
// balance-reads set?
#if 0
char v;
if ((op->get_op() != CEPH_OSD_OP_BALANCEREADS && op->get_op() != CEPH_OSD_OP_UNBALANCEREADS) &&
- (osd->store->getattr(info.pgid.to_coll(), poid, "balance-reads", &v, 1) >= 0 ||
- balancing_reads.count(poid.oid))) {
+ (osd->store->getattr(info.pgid.to_coll(), soid, "balance-reads", &v, 1) >= 0 ||
+ balancing_reads.count(soid.oid))) {
- if (!unbalancing_reads.count(poid.oid)) {
+ if (!unbalancing_reads.count(soid.oid)) {
// unbalance
- dout(-10) << "preprocess_op unbalancing-reads on " << poid.oid << dendl;
- unbalancing_reads.insert(poid.oid);
+ dout(-10) << "preprocess_op unbalancing-reads on " << soid.oid << dendl;
+ unbalancing_reads.insert(soid.oid);
ceph_object_layout layout;
layout.ol_pgid = info.pgid.u.pg64;
layout.ol_stripe_unit = 0;
MOSDOp *pop = new MOSDOp(0, osd->get_tid(),
- poid.oid,
+ soid.oid,
layout,
osd->osdmap->get_epoch(),
CEPH_OSD_OP_UNBALANCEREADS, 0);
}
// add to wait queue
- dout(-10) << "preprocess_op waiting for unbalance-reads on " << poid.oid << dendl;
- waiting_for_unbalanced_reads[poid.oid].push_back(op);
+ dout(-10) << "preprocess_op waiting for unbalance-reads on " << soid.oid << dendl;
+ waiting_for_unbalanced_reads[soid.oid].push_back(op);
return;
}
#endif
// get existing object info
- ProjectedObjectInfo *pinfo = get_projected_object(poid);
+ ProjectedObjectInfo *pinfo = get_projected_object(soid);
// --- locking ---
op->set_version(at_version);
dout(10) << "op_modify " << opname
- << " " << poid.oid
+ << " " << soid
<< " ov " << pinfo->oi.version << " av " << at_version
<< " snapc " << snapc
<< " snapset " << pinfo->oi.snapset
if ((op->get_flags() & CEPH_OSD_FLAG_ORDERSNAP) &&
snapc.seq < pinfo->oi.snapset.seq) {
dout(10) << " ORDERSNAP flag set and snapc seq " << snapc.seq << " < snapset seq " << pinfo->oi.snapset.seq
- << " on " << poid << dendl;
+ << " on " << soid << dendl;
osd->reply_op_error(op, -EOLDSNAPC);
return;
}
for (unsigned i=1; i<acting.size(); i++) {
int peer = acting[i];
if (peer_missing.count(peer) &&
- peer_missing[peer].is_missing(poid.oid)) {
+ peer_missing[peer].is_missing(soid)) {
// push it before this update.
// FIXME, this is probably extra much work (eg if we're about to overwrite)
- push_to_replica(poid, peer);
+ push_to_replica(soid, peer);
osd->start_recovery_op(this, 1);
}
}
// we are acker.
if (!noop) {
// log and update later.
- prepare_transaction(repop->t, op->get_reqid(), poid, op->ops, op->get_data(), repop->mtime,
+ prepare_transaction(repop->t, op->get_reqid(), soid, op->ops, op->get_data(), repop->mtime,
pinfo->exists, pinfo->size, pinfo->oi,
at_version, snapc,
trim_to);
void ReplicatedPG::sub_op_modify(MOSDSubOp *op)
{
- pobject_t poid = op->poid;
+ sobject_t soid = op->poid;
const char *opname;
if (op->noop)
opname = ceph_osd_op_name(op->ops[0].op);
dout(10) << "sub_op_modify " << opname
- << " " << poid
+ << " " << soid
<< " v " << op->version
<< dendl;
osd->take_peer_stat(fromosd, op->peer_stat);
// we better not be missing this.
- assert(!missing.is_missing(poid.oid));
+ assert(!missing.is_missing(soid));
// prepare our transaction
ObjectStore::Transaction t;
for (int j=snapset.clones.size()-1; j>=0; j--) {
pobject_t c = head;
- c.oid.snap = snapset.clones[j];
+ c.snap = snapset.clones[j];
prev.intersection_of(snapset.clone_overlap[snapset.clones[j]]);
- if (!missing.is_missing(c.oid)) {
+ if (!missing.is_missing(c)) {
dout(10) << "calc_head_subsets " << head << " has prev " << c
<< " overlap " << prev << dendl;
clone_subsets[c] = prev;
<< " clone_subsets " << clone_subsets << dendl;
}
-void ReplicatedPG::calc_clone_subsets(SnapSet& snapset, pobject_t poid,
+void ReplicatedPG::calc_clone_subsets(SnapSet& snapset, sobject_t soid,
Missing& missing,
interval_set<__u64>& data_subset,
map<pobject_t, interval_set<__u64> >& clone_subsets)
{
- dout(10) << "calc_clone_subsets " << poid
+ dout(10) << "calc_clone_subsets " << soid
<< " clone_overlap " << snapset.clone_overlap << dendl;
- __u64 size = snapset.clone_size[poid.oid.snap];
+ __u64 size = snapset.clone_size[soid.snap];
unsigned i;
for (i=0; i < snapset.clones.size(); i++)
- if (snapset.clones[i] == poid.oid.snap)
+ if (snapset.clones[i] == soid.snap)
break;
// any overlap with next older clone?
if (size)
prev.insert(0, size);
for (int j=i-1; j>=0; j--) {
- pobject_t c = poid;
- c.oid.snap = snapset.clones[j];
+ sobject_t c = soid;
+ c.snap = snapset.clones[j];
prev.intersection_of(snapset.clone_overlap[snapset.clones[j]]);
- if (!missing.is_missing(c.oid)) {
- dout(10) << "calc_clone_subsets " << poid << " has prev " << c
+ if (!missing.is_missing(c)) {
+ dout(10) << "calc_clone_subsets " << soid << " has prev " << c
<< " overlap " << prev << dendl;
clone_subsets[c] = prev;
cloning.union_of(prev);
break;
}
- dout(10) << "calc_clone_subsets " << poid << " does not have prev " << c
+ dout(10) << "calc_clone_subsets " << soid << " does not have prev " << c
<< " overlap " << prev << dendl;
}
if (size)
next.insert(0, size);
for (unsigned j=i+1; j<snapset.clones.size(); j++) {
- pobject_t c = poid;
- c.oid.snap = snapset.clones[j];
+ pobject_t c = soid;
+ c.snap = snapset.clones[j];
next.intersection_of(snapset.clone_overlap[snapset.clones[j-1]]);
- if (!missing.is_missing(c.oid)) {
- dout(10) << "calc_clone_subsets " << poid << " has next " << c
+ if (!missing.is_missing(c)) {
+ dout(10) << "calc_clone_subsets " << soid << " has next " << c
<< " overlap " << next << dendl;
clone_subsets[c] = next;
cloning.union_of(next);
break;
}
- dout(10) << "calc_clone_subsets " << poid << " does not have next " << c
+ dout(10) << "calc_clone_subsets " << soid << " does not have next " << c
<< " overlap " << next << dendl;
}
data_subset.insert(0, size);
data_subset.subtract(cloning);
- dout(10) << "calc_clone_subsets " << poid
+ dout(10) << "calc_clone_subsets " << soid
<< " data_subset " << data_subset
<< " clone_subsets " << clone_subsets << dendl;
}
/** pull - request object from a peer
*/
-bool ReplicatedPG::pull(pobject_t poid)
+bool ReplicatedPG::pull(sobject_t soid)
{
- eversion_t v = missing.missing[poid.oid].need;
+ eversion_t v = missing.missing[soid].need;
int fromosd = -1;
- assert(missing_loc.count(poid.oid));
- for (set<int>::iterator p = missing_loc[poid.oid].begin();
- p != missing_loc[poid.oid].end();
+ assert(missing_loc.count(soid));
+ for (set<int>::iterator p = missing_loc[soid].begin();
+ p != missing_loc[soid].end();
p++) {
if (osd->osdmap->is_up(*p)) {
fromosd = *p;
}
}
- dout(7) << "pull " << poid
+ dout(7) << "pull " << soid
<< " v " << v
- << " on osds " << missing_loc[poid.oid]
+ << " on osds " << missing_loc[soid]
<< " from osd" << fromosd
<< dendl;
interval_set<__u64> data_subset;
// is this a snapped object? if so, consult the snapset.. we may not need the entire object!
- if (poid.oid.snap && poid.oid.snap < CEPH_NOSNAP) {
- pobject_t head = poid;
- head.oid.snap = CEPH_NOSNAP;
+ if (soid.snap < CEPH_NOSNAP) {
+ pobject_t head = soid;
+ head.snap = CEPH_NOSNAP;
// do we have the head?
- if (missing.is_missing(head.oid)) {
- if (pulling.count(head.oid)) {
+ if (missing.is_missing(head)) {
+ if (pulling.count(head)) {
dout(10) << " missing but already pulling head " << head << dendl;
return false;
} else {
object_info_t oi(bl);
dout(10) << " snapset " << oi.snapset << dendl;
- calc_clone_subsets(oi.snapset, poid, missing,
+ calc_clone_subsets(oi.snapset, soid, missing,
data_subset, clone_subsets);
// FIXME: this may overestimate if we are pulling multiple clones in parallel...
dout(10) << " pulling " << data_subset << ", will clone " << clone_subsets
tid_t tid = osd->get_tid();
vector<ceph_osd_op> pull(1);
pull[0].op = CEPH_OSD_OP_PULL;
- MOSDSubOp *subop = new MOSDSubOp(rid, info.pgid, poid, pull, false, CEPH_OSD_FLAG_ACK,
+ MOSDSubOp *subop = new MOSDSubOp(rid, info.pgid, soid, pull, false, CEPH_OSD_FLAG_ACK,
osd->osdmap->get_epoch(), tid, v);
subop->data_subset.swap(data_subset);
// do not include clone_subsets in pull request; we will recalculate this
osd->messenger->send_message(subop, osd->osdmap->get_inst(fromosd));
// take note
- assert(pulling.count(poid.oid) == 0);
- pulling[poid.oid].first = v;
- pulling[poid.oid].second = fromosd;
+ assert(pulling.count(soid) == 0);
+ pulling[soid].first = v;
+ pulling[soid].second = fromosd;
return true;
}
* intelligently push an object to a replica. make use of existing
* clones/heads and dup data ranges where possible.
*/
-void ReplicatedPG::push_to_replica(pobject_t poid, int peer)
+void ReplicatedPG::push_to_replica(sobject_t soid, int peer)
{
- dout(10) << "push_to_replica " << poid << " osd" << peer << dendl;
+ dout(10) << "push_to_replica " << soid << " osd" << peer << dendl;
// get size
struct stat st;
- int r = osd->store->stat(info.pgid.to_coll(), poid, &st);
+ int r = osd->store->stat(info.pgid.to_coll(), soid, &st);
assert(r == 0);
map<pobject_t, interval_set<__u64> > clone_subsets;
interval_set<__u64> data_subset;
bufferlist bv;
- r = osd->store->getattr(info.pgid.to_coll(), poid, OI_ATTR, bv);
+ r = osd->store->getattr(info.pgid.to_coll(), soid, OI_ATTR, bv);
assert(r >= 0);
object_info_t oi(bv);
// are we doing a clone on the replica?
- if (poid.oid.snap && poid.oid.snap < CEPH_NOSNAP) {
- pobject_t head = poid;
- head.oid.snap = CEPH_NOSNAP;
- if (peer_missing[peer].is_missing(head.oid) &&
- peer_missing[peer].have_old(head.oid) == oi.prior_version) {
+ if (soid.snap < CEPH_NOSNAP) {
+ pobject_t head = soid;
+ head.snap = CEPH_NOSNAP;
+ if (peer_missing[peer].is_missing(head) &&
+ peer_missing[peer].have_old(head) == oi.prior_version) {
dout(10) << "push_to_replica osd" << peer << " has correct old " << head
<< " v" << oi.prior_version
- << ", pushing " << poid << " attrs as a clone op" << dendl;
+ << ", pushing " << soid << " attrs as a clone op" << dendl;
interval_set<__u64> data_subset;
map<pobject_t, interval_set<__u64> > clone_subsets;
clone_subsets[head].insert(0, st.st_size);
- push(poid, peer, data_subset, clone_subsets);
+ push(soid, peer, data_subset, clone_subsets);
return;
}
// try to base push off of clones that succeed/preceed poid
// we need the head (and current SnapSet) to do that.
- if (missing.is_missing(head.oid)) {
+ if (missing.is_missing(head)) {
dout(15) << "push_to_replica missing head " << head << ", pushing raw clone" << dendl;
- return push(poid, peer); // no head. push manually.
+ return push(soid, peer); // no head. push manually.
}
bufferlist bl;
object_info_t hoi(bl);
dout(15) << "push_to_replica head snapset is " << hoi.snapset << dendl;
- calc_clone_subsets(hoi.snapset, poid, peer_missing[peer],
+ calc_clone_subsets(hoi.snapset, soid, peer_missing[peer],
data_subset, clone_subsets);
} else {
// pushing head.
// base this on partially on replica's clones?
dout(15) << "push_to_replica head snapset is " << oi.snapset << dendl;
- calc_head_subsets(oi.snapset, poid, peer_missing[peer], data_subset, clone_subsets);
+ calc_head_subsets(oi.snapset, soid, peer_missing[peer], data_subset, clone_subsets);
}
- dout(10) << "push_to_replica " << poid << " pushing " << data_subset
+ dout(10) << "push_to_replica " << soid << " pushing " << data_subset
<< " cloning " << clone_subsets << dendl;
- push(poid, peer, data_subset, clone_subsets);
+ push(soid, peer, data_subset, clone_subsets);
}
/*
* push - send object to a peer
*/
-void ReplicatedPG::push(pobject_t poid, int peer)
+void ReplicatedPG::push(sobject_t soid, int peer)
{
interval_set<__u64> subset;
map<pobject_t, interval_set<__u64> > clone_subsets;
- push(poid, peer, subset, clone_subsets);
+ push(soid, peer, subset, clone_subsets);
}
-void ReplicatedPG::push(pobject_t poid, int peer,
+void ReplicatedPG::push(sobject_t soid, int peer,
interval_set<__u64> &data_subset,
map<pobject_t, interval_set<__u64> >& clone_subsets)
{
if (data_subset.size() || clone_subsets.size()) {
struct stat st;
- int r = osd->store->stat(info.pgid.to_coll(), poid, &st);
+ int r = osd->store->stat(info.pgid.to_coll(), soid, &st);
assert(r == 0);
size = st.st_size;
p != data_subset.m.end();
p++) {
bufferlist bit;
- osd->store->read(info.pgid.to_coll(), poid, p->first, p->second, bit);
+ osd->store->read(info.pgid.to_coll(), soid, p->first, p->second, bit);
bl.claim_append(bit);
}
} else {
- osd->store->read(info.pgid.to_coll(), poid, 0, 0, bl);
+ osd->store->read(info.pgid.to_coll(), soid, 0, 0, bl);
size = bl.length();
}
- osd->store->getattrs(info.pgid.to_coll(), poid, attrset);
+ osd->store->getattrs(info.pgid.to_coll(), soid, attrset);
bufferlist bv;
bv.push_back(attrset[OI_ATTR]);
object_info_t oi(bv);
// ok
- dout(7) << "push " << poid << " v " << oi.version
+ dout(7) << "push " << soid << " v " << oi.version
<< " size " << size
<< " subset " << data_subset
<< " data " << bl.length()
push[0].op = CEPH_OSD_OP_PUSH;
push[0].offset = 0;
push[0].length = size;
- MOSDSubOp *subop = new MOSDSubOp(rid, info.pgid, poid, push, false, 0,
+ MOSDSubOp *subop = new MOSDSubOp(rid, info.pgid, soid, push, false, 0,
osd->osdmap->get_epoch(), osd->get_tid(), oi.version);
subop->data_subset.swap(data_subset);
subop->clone_subsets.swap(clone_subsets);
osd->messenger->send_message(subop, osd->osdmap->get_inst(peer));
if (is_primary()) {
- peer_missing[peer].got(poid.oid, oi.version);
- pushing[poid.oid].insert(peer);
+ peer_missing[peer].got(soid, oi.version);
+ pushing[soid].insert(peer);
}
}
dout(10) << "sub_op_push_reply from " << reply->get_source() << " " << *reply << dendl;
int peer = reply->get_source().num();
- pobject_t poid = reply->get_poid();
+ sobject_t soid = reply->get_poid();
- if (pushing.count(poid.oid) &&
- pushing[poid.oid].count(peer)) {
- pushing[poid.oid].erase(peer);
+ if (pushing.count(soid) &&
+ pushing[soid].count(peer)) {
+ pushing[soid].erase(peer);
if (peer_missing.count(peer) == 0 ||
peer_missing[peer].num_missing() == 0)
update_stats();
- if (pushing[poid.oid].empty()) {
- dout(10) << "pushed " << poid << " to all replicas" << dendl;
+ if (pushing[soid].empty()) {
+ dout(10) << "pushed " << soid << " to all replicas" << dendl;
finish_recovery_op();
} else {
- dout(10) << "pushed " << poid << ", still waiting for push ack from "
- << pushing[poid.oid] << dendl;
+ dout(10) << "pushed " << soid << ", still waiting for push ack from "
+ << pushing[soid] << dendl;
}
} else {
- dout(10) << "huh, i wasn't pushing " << poid << dendl;
+ dout(10) << "huh, i wasn't pushing " << soid << dendl;
}
delete reply;
}
*/
void ReplicatedPG::sub_op_pull(MOSDSubOp *op)
{
- const pobject_t poid = op->poid;
+ const pobject_t soid = op->poid;
const eversion_t v = op->version;
- dout(7) << "op_pull " << poid << " v " << op->version
+ dout(7) << "op_pull " << soid << " v " << op->version
<< " from " << op->get_source()
<< dendl;
assert(!is_primary()); // we should be a replica or stray.
- push(poid, op->get_source().num(), op->data_subset, op->clone_subsets);
+ push(soid, op->get_source().num(), op->data_subset, op->clone_subsets);
delete op;
}
*/
void ReplicatedPG::sub_op_push(MOSDSubOp *op)
{
- pobject_t poid = op->poid;
+ sobject_t soid = op->poid;
eversion_t v = op->version;
ceph_osd_op& push = op->ops[0];
dout(7) << "op_push "
- << poid
+ << soid
<< " v " << v
<< " len " << push.length
<< " data_subset " << op->data_subset
// are we missing (this specific version)?
// (if version is wrong, it is either old (we don't want it) or
// newer (peering is buggy))
- if (!missing.is_missing(poid.oid, v)) {
- dout(7) << "sub_op_push not missing " << poid << " v" << v << dendl;
+ if (!missing.is_missing(soid, v)) {
+ dout(7) << "sub_op_push not missing " << soid << " v" << v << dendl;
dout(15) << " but i AM missing " << missing.missing << dendl;
delete op;
return;
clone_subsets = op->clone_subsets;
if (is_primary()) {
- if (poid.oid.snap && poid.oid.snap < CEPH_NOSNAP) {
+ if (soid.snap < CEPH_NOSNAP) {
// clone. make sure we have enough data.
- pobject_t head = poid;
- head.oid.snap = CEPH_NOSNAP;
- assert(!missing.is_missing(head.oid));
+ pobject_t head = soid;
+ head.snap = CEPH_NOSNAP;
+ assert(!missing.is_missing(head));
bufferlist bl;
int r = osd->store->getattr(info.pgid.to_coll(), head, OI_ATTR, bl);
clone_subsets.clear(); // forget what pusher said; recalculate cloning.
interval_set<__u64> data_needed;
- calc_clone_subsets(hoi.snapset, poid, missing, data_needed, clone_subsets);
+ calc_clone_subsets(hoi.snapset, soid, missing, data_needed, clone_subsets);
dout(10) << "sub_op_push need " << data_needed << ", got " << data_subset << dendl;
if (!data_needed.subset_of(data_subset)) {
- dout(0) << " we did not get enough of " << poid << " object data" << dendl;
+ dout(0) << " we did not get enough of " << soid << " object data" << dendl;
delete op;
return;
}
// write object and add it to the PG
ObjectStore::Transaction t;
- t.remove(info.pgid.to_coll(), poid); // in case old version exists
+ t.remove(info.pgid.to_coll(), soid); // in case old version exists
__u64 boff = 0;
for (map<pobject_t, interval_set<__u64> >::iterator p = clone_subsets.begin();
q != p->second.m.end();
q++) {
dout(15) << " clone_range " << p->first << " " << q->first << "~" << q->second << dendl;
- t.clone_range(info.pgid.to_coll(), poid, p->first, q->first, q->second);
+ t.clone_range(info.pgid.to_coll(), soid, p->first, q->first, q->second);
}
for (map<__u64,__u64>::iterator p = data_subset.m.begin();
p != data_subset.m.end();
p++) {
bufferlist bit;
bit.substr_of(data, boff, p->second);
- t.write(info.pgid.to_coll(), poid, p->first, p->second, bit);
+ t.write(info.pgid.to_coll(), soid, p->first, p->second, bit);
dout(15) << " write " << p->first << "~" << p->second << dendl;
boff += p->second;
}
if (data_subset.empty())
- t.touch(info.pgid.to_coll(), poid);
+ t.touch(info.pgid.to_coll(), soid);
- t.setattrs(info.pgid.to_coll(), poid, op->attrset);
- if (poid.oid.snap && poid.oid.snap != CEPH_NOSNAP &&
+ t.setattrs(info.pgid.to_coll(), soid, op->attrset);
+ if (soid.snap != CEPH_NOSNAP &&
op->attrset.count(OI_ATTR)) {
bufferlist bl;
bl.push_back(op->attrset[OI_ATTR]);
object_info_t oi(bl);
if (oi.snaps.size()) {
coll_t lc = make_snap_collection(t, oi.snaps[0]);
- t.collection_add(lc, info.pgid.to_coll(), poid);
+ t.collection_add(lc, info.pgid.to_coll(), soid);
if (oi.snaps.size() > 1) {
coll_t hc = make_snap_collection(t, oi.snaps[oi.snaps.size()-1]);
- t.collection_add(hc, info.pgid.to_coll(), poid);
+ t.collection_add(hc, info.pgid.to_coll(), soid);
}
}
}
- missing.got(poid.oid, v);
+ missing.got(soid, v);
// raise last_complete?
while (log.complete_to != log.log.end()) {
- if (missing.missing.count(log.complete_to->oid))
+ if (missing.missing.count(log.complete_to->soid))
break;
if (info.last_complete < log.complete_to->version)
info.last_complete = log.complete_to->version;
if (is_primary()) {
- missing_loc.erase(poid.oid);
+ missing_loc.erase(soid);
// close out pull op?
- if (pulling.count(poid.oid)) {
- pulling.erase(poid.oid);
+ if (pulling.count(soid)) {
+ pulling.erase(soid);
finish_recovery_op();
}
for (unsigned i=1; i<acting.size(); i++) {
int peer = acting[i];
assert(peer_missing.count(peer));
- if (peer_missing[peer].is_missing(poid.oid)) {
- push_to_replica(poid, peer); // ok, push it, and they (will) have it now.
+ if (peer_missing[peer].is_missing(soid)) {
+ push_to_replica(soid, peer); // ok, push it, and they (will) have it now.
osd->start_recovery_op(this, 1);
}
}
delete op;
// kick waiters
- if (waiting_for_missing_object.count(poid.oid)) {
- osd->take_waiters(waiting_for_missing_object[poid.oid]);
- waiting_for_missing_object.erase(poid.oid);
+ if (waiting_for_missing_object.count(soid)) {
+ osd->take_waiters(waiting_for_missing_object[soid]);
+ waiting_for_missing_object.erase(soid);
}
}
dout(10) << "on_role_change" << dendl;
// take object waiters
- for (hash_map<object_t, list<Message*> >::iterator it = waiting_for_missing_object.begin();
+ for (hash_map<sobject_t, list<Message*> >::iterator it = waiting_for_missing_object.begin();
it != waiting_for_missing_object.end();
it++)
osd->take_waiters(it->second);
int started = 0;
int skipped = 0;
- map<object_t, Missing::item>::iterator p = missing.missing.lower_bound(log.last_requested);
+ map<sobject_t, Missing::item>::iterator p = missing.missing.lower_bound(log.last_requested);
while (p != missing.missing.end()) {
assert(log.objects.count(p->first));
latest = log.objects[p->first];
assert(latest);
- pobject_t poid(info.pgid.pool(), 0, latest->oid);
- pobject_t head = poid;
- head.oid.snap = CEPH_NOSNAP;
+ sobject_t soid(latest->soid);
+ sobject_t head = soid;
+ head.snap = CEPH_NOSNAP;
dout(10) << "recover_primary "
<< *latest
<< (latest->is_update() ? " (update)":"")
- << (missing.is_missing(latest->oid) ? " (missing)":"")
- << (missing.is_missing(head.oid) ? " (missing head)":"")
- << (pulling.count(latest->oid) ? " (pulling)":"")
- << (pulling.count(head.oid) ? " (pulling head)":"")
+ << (missing.is_missing(latest->soid) ? " (missing)":"")
+ << (missing.is_missing(head) ? " (missing head)":"")
+ << (pulling.count(latest->soid) ? " (pulling)":"")
+ << (pulling.count(head) ? " (pulling head)":"")
<< dendl;
assert(latest->is_update());
- if (!pulling.count(latest->oid)) {
- if (pulling.count(head.oid)) {
+ if (!pulling.count(latest->soid)) {
+ if (pulling.count(head)) {
++skipped;
} else {
// is this a clone operation that we can do locally?
if (latest->op == Log::Entry::CLONE) {
- if (missing.is_missing(head.oid) &&
- missing.have_old(head.oid) == latest->prior_version) {
+ if (missing.is_missing(head) &&
+ missing.have_old(head) == latest->prior_version) {
dout(10) << "recover_primary cloning " << head << " v" << latest->prior_version
- << " to " << poid << " v" << latest->version
+ << " to " << soid << " v" << latest->version
<< " snaps " << latest->snaps << dendl;
vector<snapid_t> snaps;
::decode(snaps, latest->snaps);
ObjectStore::Transaction t;
- _make_clone(t, head, poid, latest->prior_version, latest->version, latest->reqid, latest->mtime, snaps);
+ _make_clone(t, head, soid, latest->prior_version, latest->version, latest->reqid, latest->mtime, snaps);
osd->store->apply_transaction(t);
- missing.got(latest->oid, latest->version);
- missing_loc.erase(latest->oid);
+ missing.got(latest->soid, latest->version);
+ missing_loc.erase(latest->soid);
continue;
}
}
- if (pull(poid))
+ if (pull(soid))
++started;
else
++skipped;
// only advance last_requested if we haven't skipped anything
if (!skipped)
- log.last_requested = latest->oid;
+ log.last_requested = latest->soid;
}
// done?
continue;
// oldest first!
- object_t oid = peer_missing[peer].rmissing.begin()->second;
- pobject_t poid(info.pgid.pool(), 0, oid);
+ sobject_t soid = peer_missing[peer].rmissing.begin()->second;
eversion_t v = peer_missing[peer].rmissing.begin()->first;
- push_to_replica(poid, peer);
+ push_to_replica(soid, peer);
// do other peers need it too?
for (i++; i<acting.size(); i++) {
int peer = acting[i];
if (peer_missing.count(peer) &&
- peer_missing[peer].is_missing(oid))
- push_to_replica(poid, peer);
+ peer_missing[peer].is_missing(soid))
+ push_to_replica(soid, peer);
}
if (++started >= max)
if (ls.size() != info.stats.num_objects)
dout(10) << " WARNING: " << ls.size() << " != num_objects " << info.stats.num_objects << dendl;
- set<object_t> s;
+ set<sobject_t> s;
for (vector<pobject_t>::iterator i = ls.begin();
i != ls.end();
i++)
- s.insert(i->oid);
+ s.insert(*i);
- set<object_t> did;
+ set<sobject_t> did;
for (list<Log::Entry>::reverse_iterator p = log.log.rbegin();
p != log.log.rend();
p++) {
- if (did.count(p->oid)) continue;
- did.insert(p->oid);
+ if (did.count(p->soid)) continue;
+ did.insert(p->soid);
if (p->is_delete()) {
- if (s.count(p->oid)) {
- pobject_t poid(info.pgid.pool(), 0, p->oid);
- dout(10) << " deleting " << poid
+ if (s.count(p->soid)) {
+ dout(10) << " deleting " << p->soid
<< " when " << p->version << dendl;
- t.remove(info.pgid.to_coll(), poid);
+ t.remove(info.pgid.to_coll(), p->soid);
}
- s.erase(p->oid);
+ s.erase(p->soid);
} else {
// just leave old objects.. they're missing or whatever
- s.erase(p->oid);
+ s.erase(p->soid);
}
}
- for (set<object_t>::iterator i = s.begin();
+ for (set<sobject_t>::iterator i = s.begin();
i != s.end();
i++) {
- pobject_t poid(info.pgid.pool(), 0, *i);
- dout(10) << " deleting stray " << poid << dendl;
- t.remove(info.pgid.to_coll(), poid);
+ dout(10) << " deleting stray " << *i << dendl;
+ t.remove(info.pgid.to_coll(), *i);
}
} else {
// just scan the log.
- set<object_t> did;
+ set<sobject_t> did;
for (list<Log::Entry>::reverse_iterator p = log.log.rbegin();
p != log.log.rend();
p++) {
- if (did.count(p->oid)) continue;
- did.insert(p->oid);
+ if (did.count(p->soid)) continue;
+ did.insert(p->soid);
if (p->is_delete()) {
- pobject_t poid(info.pgid.pool(), 0, p->oid);
- dout(10) << " deleting " << poid
+ dout(10) << " deleting " << p->soid
<< " when " << p->version << dendl;
- t.remove(info.pgid.to_coll(), poid);
+ t.remove(info.pgid.to_coll(), p->soid);
} else {
// keep old(+missing) objects, just for kicks.
}
coll_t c = info.pgid.to_coll();
// traverse in reverse order.
- pobject_t head;
+ sobject_t head;
SnapSet snapset;
unsigned curclone = 0;
for (vector<ScrubMap::object>::reverse_iterator p = scrubmap.objects.rbegin();
p != scrubmap.objects.rend();
p++) {
- pobject_t poid = p->poid;
+ sobject_t soid = p->poid;
stat.num_objects++;
// basic checks.
if (p->attrs.count(OI_ATTR) == 0) {
- dout(0) << "scrub no '" << OI_ATTR << "' attr on " << poid << dendl;
+ dout(0) << "scrub no '" << OI_ATTR << "' attr on " << soid << dendl;
errors++;
continue;
}
bv.push_back(p->attrs[OI_ATTR]);
object_info_t oi(bv);
- dout(20) << "scrub " << poid << " " << oi << dendl;
+ dout(20) << "scrub " << soid << " " << oi << dendl;
stat.num_bytes += p->size;
stat.num_kb += SHIFT_ROUND_UP(p->size, 10);
//assert(data.length() == p->size);
// new head?
- if (poid.oid.snap == CEPH_NOSNAP) {
+ if (soid.snap == CEPH_NOSNAP) {
// it's a head.
- if (head != pobject_t()) {
+ if (head != sobject_t()) {
derr(0) << " missing clone(s) for " << head << dendl;
- assert(head == pobject_t()); // we had better be done
+ assert(head == sobject_t()); // we had better be done
errors++;
}
// what will be next?
if (snapset.clones.empty())
- head = pobject_t(); // no clones.
+ head = sobject_t(); // no clones.
else
curclone = snapset.clones.size()-1;
}
}
- } else if (poid.oid.snap) {
+ } else if (soid.snap) {
// it's a clone
- assert(head != pobject_t());
+ assert(head != sobject_t());
stat.num_object_clones++;
- assert(poid.oid.snap == snapset.clones[curclone]);
+ assert(soid.snap == snapset.clones[curclone]);
assert(p->size == snapset.clone_size[curclone]);
// what's next?
curclone++;
if (curclone == snapset.clones.size())
- head = pobject_t();
+ head = sobject_t();
} else {
// it's unversioned.
*/
struct ProjectedObjectInfo {
int ref;
- pobject_t poid;
+ sobject_t poid;
bool exists;
__u64 size;
// projected object info
- map<pobject_t, ProjectedObjectInfo> projected_objects;
+ map<sobject_t, ProjectedObjectInfo> projected_objects;
- ProjectedObjectInfo *get_projected_object(pobject_t poid);
+ ProjectedObjectInfo *get_projected_object(sobject_t poid);
void put_projected_object(ProjectedObjectInfo *pinfo);
bool is_write_in_progress() {
}
// load balancing
- set<object_t> balancing_reads;
- set<object_t> unbalancing_reads;
- hash_map<object_t, list<Message*> > waiting_for_unbalanced_reads; // i.e. primary-lock
+ set<sobject_t> balancing_reads;
+ set<sobject_t> unbalancing_reads;
+ hash_map<sobject_t, list<Message*> > waiting_for_unbalanced_reads; // i.e. primary-lock
// push/pull
- map<object_t, pair<eversion_t, int> > pulling; // which objects are currently being pulled, and from where
- map<object_t, set<int> > pushing;
+ map<sobject_t, pair<eversion_t, int> > pulling; // which objects are currently being pulled, and from where
+ map<sobject_t, set<int> > pushing;
- void calc_head_subsets(SnapSet& snapset, pobject_t head,
+ void calc_head_subsets(SnapSet& snapset, sobject_t head,
Missing& missing,
interval_set<__u64>& data_subset,
- map<pobject_t, interval_set<__u64> >& clone_subsets);
- void calc_clone_subsets(SnapSet& snapset, pobject_t poid, Missing& missing,
+ map<sobject_t, interval_set<__u64> >& clone_subsets);
+ void calc_clone_subsets(SnapSet& snapset, sobject_t poid, Missing& missing,
interval_set<__u64>& data_subset,
- map<pobject_t, interval_set<__u64> >& clone_subsets);
- void push_to_replica(pobject_t oid, int dest);
- void push(pobject_t oid, int dest);
- void push(pobject_t oid, int dest, interval_set<__u64>& data_subset,
- map<pobject_t, interval_set<__u64> >& clone_subsets);
- bool pull(pobject_t oid);
+ map<sobject_t, interval_set<__u64> >& clone_subsets);
+ void push_to_replica(sobject_t oid, int dest);
+ void push(sobject_t oid, int dest);
+ void push(sobject_t oid, int dest, interval_set<__u64>& data_subset,
+ map<sobject_t, interval_set<__u64> >& clone_subsets);
+ bool pull(sobject_t oid);
// modify
void sub_op_modify_ondisk(MOSDSubOp *op, int ackerosd, eversion_t last_complete);
void _make_clone(ObjectStore::Transaction& t,
- pobject_t head, pobject_t coid,
+ sobject_t head, sobject_t coid,
eversion_t ov, eversion_t v, osd_reqid_t& reqid, utime_t mtime, vector<snapid_t>& snaps);
void prepare_clone(ObjectStore::Transaction& t, bufferlist& logbl, osd_reqid_t reqid, pg_stat_t& st,
- pobject_t poid, loff_t old_size, object_info_t& oi,
+ sobject_t poid, loff_t old_size, object_info_t& oi,
eversion_t& at_version, SnapContext& snapc);
void add_interval_usage(interval_set<__u64>& s, pg_stat_t& st);
int prepare_simple_op(ObjectStore::Transaction& t, osd_reqid_t reqid, pg_stat_t& st,
- pobject_t poid, __u64& old_size, bool& exists, object_info_t& oi,
+ sobject_t poid, __u64& old_size, bool& exists, object_info_t& oi,
vector<ceph_osd_op>& ops, int opn, bufferlist::iterator& bp, SnapContext& snapc);
void prepare_transaction(ObjectStore::Transaction& t, osd_reqid_t reqid,
- pobject_t poid,
+ sobject_t poid,
vector<ceph_osd_op>& ops, bufferlist& bl, utime_t mtime,
bool& exists, __u64& size, object_info_t& oi,
eversion_t at_version, SnapContext& snapc,
int recover_primary(int max);
int recover_replicas(int max);
- int pick_read_snap(pobject_t& poid, object_info_t& coi);
+ int pick_read_snap(sobject_t& poid, object_info_t& coi);
void op_read(MOSDOp *op);
void op_modify(MOSDOp *op);
bool same_for_modify_since(epoch_t e);
bool same_for_rep_modify_since(epoch_t e);
- bool is_missing_object(object_t oid);
- void wait_for_missing_object(object_t oid, Message *op);
+ bool is_missing_object(sobject_t oid);
+ void wait_for_missing_object(sobject_t oid, Message *op);
void on_osd_failure(int o);
void on_acker_change();
#include "msg/msg_types.h"
#include "include/types.h"
-#include "include/pobject.h"
#include "include/interval_set.h"
#include "include/nstring.h"
//#define CEPH_POOL(poolset, size) (((poolset) << 8) + (size))
-#define OSD_SUPERBLOCK_POBJECT pobject_t(CEPH_OSDMETADATA_NS, 0, object_t(0,0))
+#define OSD_SUPERBLOCK_POBJECT sobject_t(object_t(0,0), 0)
// placement group id
struct pg_t {
operator uint64_t() const { return u.pg64; }
pobject_t to_log_pobject() const {
- return pobject_t(CEPH_OSDMETADATA_NS,
- 0,
- object_t(u.pg64, 0));
+ return pobject_t(object_t(u.pg64, 0), CEPH_NOSNAP);
}
coll_t to_coll() const {
::encode(prior_version, bl);
::encode(last_reqid, bl);
::encode(mtime, bl);
- if (poid.oid.snap == CEPH_NOSNAP) {
+ if (poid.snap == CEPH_NOSNAP) {
::encode(snapset, bl);
::encode(wrlock_by, bl);
} else
::decode(prior_version, bl);
::decode(last_reqid, bl);
::decode(mtime, bl);
- if (poid.oid.snap == CEPH_NOSNAP) {
+ if (poid.snap == CEPH_NOSNAP) {
::decode(snapset, bl);
::decode(wrlock_by, bl);
} else
<< " " << oi.last_reqid;
if (oi.wrlock_by.tid)
out << " wrlock_by=" << oi.wrlock_by;
- if (oi.poid.oid.snap == CEPH_NOSNAP)
+ if (oi.poid.snap == CEPH_NOSNAP)
out << " " << oi.snapset << ")";
else
out << " " << oi.snaps << ")";
<< dendl;
// map range onto objects
- file_to_extents(probe->ino, &probe->layout, probe->snapid, probe->from, probe->probing_len, probe->probing);
+ file_to_extents(probe->ino, &probe->layout, probe->from, probe->probing_len, probe->probing);
for (vector<ObjectExtent>::iterator p = probe->probing.begin();
p != probe->probing.end();
p++) {
dout(10) << "_probe probing " << p->oid << dendl;
C_Probe *c = new C_Probe(this, probe, p->oid);
- probe->ops[p->oid] = objecter->stat(p->oid, p->layout, &c->size, probe->flags, c);
+ probe->ops[p->oid] = objecter->stat(p->oid, p->layout, probe->snapid, &c->size, probe->flags, c);
}
}
}
-void Filer::file_to_extents(inodeno_t ino, ceph_file_layout *layout, snapid_t snap,
+void Filer::file_to_extents(inodeno_t ino, ceph_file_layout *layout,
__u64 offset, size_t len,
vector<ObjectExtent>& extents)
{
// find oid, extent
ObjectExtent *ex = 0;
- object_t oid( ino, objectno, snap );
+ object_t oid( ino, objectno );
if (object_extents.count(oid))
ex = &object_extents[oid];
else {
* map (ino, layout, offset, len) to a (list of) OSDExtents (byte
* ranges in objects on (primary) osds)
*/
- void file_to_extents(inodeno_t ino, ceph_file_layout *layout, snapid_t snap,
- __u64 offset,
- size_t len,
+ void file_to_extents(inodeno_t ino, ceph_file_layout *layout,
+ __u64 offset, size_t len,
vector<ObjectExtent>& extents);
int read(inodeno_t ino,
ceph_file_layout *layout,
- snapid_t snapid,
+ snapid_t snap,
__u64 offset,
size_t len,
bufferlist *bl, // ptr to data
int flags,
Context *onfinish) {
- assert(snapid); // (until there is a non-NOSNAP write)
+ assert(snap); // (until there is a non-NOSNAP write)
vector<ObjectExtent> extents;
- file_to_extents(ino, layout, snapid, offset, len, extents);
- objecter->sg_read(extents, bl, flags, onfinish);
+ file_to_extents(ino, layout, offset, len, extents);
+ objecter->sg_read(extents, snap, bl, flags, onfinish);
return 0;
}
Context *onack,
Context *oncommit) {
vector<ObjectExtent> extents;
- file_to_extents(ino, layout, CEPH_NOSNAP, offset, len, extents);
+ file_to_extents(ino, layout, offset, len, extents);
objecter->sg_write(extents, snapc, bl, mtime, flags, onack, oncommit);
return 0;
}
Context *oncommit) {
bufferlist bl;
vector<ObjectExtent> extents;
- file_to_extents(ino, layout, CEPH_NOSNAP, offset, len, extents);
+ file_to_extents(ino, layout, offset, len, extents);
if (extents.size() == 1) {
vector<ceph_osd_op> ops(1);
memset(&ops[0], 0, sizeof(ops[0]));
Context *onack,
Context *oncommit) {
vector<ObjectExtent> extents;
- file_to_extents(ino, layout, CEPH_NOSNAP, offset, len, extents);
+ file_to_extents(ino, layout, offset, len, extents);
if (extents.size() == 1) {
objecter->zero(extents[0].oid, extents[0].layout, extents[0].offset, extents[0].length,
snapc, mtime, flags, onack, oncommit);
Context *onack,
Context *oncommit) {
vector<ObjectExtent> extents;
- file_to_extents(ino, layout, CEPH_NOSNAP, offset, len, extents);
+ file_to_extents(ino, layout, offset, len, extents);
if (extents.size() == 1) {
objecter->remove(extents[0].oid, extents[0].layout,
snapc, mtime, flags, onack, oncommit);
object_t oid(ino, 0);
ceph_object_layout ol = objecter->osdmap->make_object_layout(oid, pg_pool);
- objecter->read_full(oid, ol, &fin->bl, 0, fin);
+ objecter->read_full(oid, ol, CEPH_NOSNAP, &fin->bl, 0, fin);
}
void Journaler::_finish_read_head(int r, bufferlist& bl)
ex_it != rd->extents.end();
ex_it++) {
- if (ex_it->oid != oid) continue;
+ if (ex_it->oid != oid.oid) continue;
dout(10) << "map_read " << ex_it->oid
<< " " << ex_it->offset << "~" << ex_it->length << dendl;
ex_it != wr->extents.end();
ex_it++) {
- if (ex_it->oid != oid) continue;
+ if (ex_it->oid != oid.oid) continue;
dout(10) << "map_write oex " << ex_it->oid
<< " " << ex_it->offset << "~" << ex_it->length << dendl;
assert(ob->can_close());
// ok!
- objects.erase(ob->get_oid());
+ objects.erase(ob->get_soid());
objects_by_ino[ob->get_ino()].erase(ob);
if (objects_by_ino[ob->get_ino()].empty())
objects_by_ino.erase(ob->get_ino());
mark_rx(bh);
// finisher
- C_ReadFinish *onfinish = new C_ReadFinish(this, bh->ob->get_oid(), bh->start(), bh->length());
+ C_ReadFinish *onfinish = new C_ReadFinish(this, bh->ob->get_soid(), bh->start(), bh->length());
// go
objecter->read(bh->ob->get_oid(), bh->ob->get_layout(),
- bh->start(), bh->length(),
+ bh->start(), bh->length(), bh->ob->get_snap(),
&onfinish->bl, 0,
onfinish);
}
-void ObjectCacher::bh_read_finish(object_t oid, loff_t start, size_t length, bufferlist &bl)
+void ObjectCacher::bh_read_finish(sobject_t oid, loff_t start, size_t length, bufferlist &bl)
{
//lock.Lock();
dout(7) << "bh_read_finish "
- << oid
+ << oid
<< " " << start << "~" << length
<< " (bl is " << bl.length() << ")"
<< dendl;
dout(7) << "bh_write " << *bh << dendl;
// finishers
- C_WriteAck *onack = new C_WriteAck(this, bh->ob->get_oid(), bh->start(), bh->length());
- C_WriteCommit *oncommit = new C_WriteCommit(this, bh->ob->get_oid(), bh->start(), bh->length());
+ C_WriteAck *onack = new C_WriteAck(this, bh->ob->get_soid(), bh->start(), bh->length());
+ C_WriteCommit *oncommit = new C_WriteCommit(this, bh->ob->get_soid(), bh->start(), bh->length());
// go
tid_t tid = objecter->write(bh->ob->get_oid(), bh->ob->get_layout(),
- bh->start(), bh->length(),
+ bh->start(), bh->length(),
bh->snapc, bh->bl, bh->last_write, 0,
onack, oncommit);
mark_tx(bh);
}
-void ObjectCacher::lock_ack(list<object_t>& oids, tid_t tid)
+void ObjectCacher::lock_ack(list<sobject_t>& oids, tid_t tid)
{
- for (list<object_t>::iterator i = oids.begin();
+ for (list<sobject_t>::iterator i = oids.begin();
i != oids.end();
i++) {
- object_t oid = *i;
+ sobject_t oid = *i;
if (objects.count(oid) == 0) {
dout(7) << "lock_ack no object cache" << dendl;
}
}
-void ObjectCacher::bh_write_ack(object_t oid, loff_t start, size_t length, tid_t tid)
+void ObjectCacher::bh_write_ack(sobject_t oid, loff_t start, size_t length, tid_t tid)
{
//lock.Lock();
//lock.Unlock();
}
-void ObjectCacher::bh_write_commit(object_t oid, loff_t start, size_t length, tid_t tid)
+void ObjectCacher::bh_write_commit(sobject_t oid, loff_t start, size_t length, tid_t tid)
{
//lock.Lock();
dout(10) << "readx " << *ex_it << dendl;
// get Object cache
- Object *o = get_object(ex_it->oid, ino, ex_it->layout);
+ sobject_t soid(ex_it->oid, rd->snap);
+ Object *o = get_object(soid, ino, ex_it->layout);
// map extent into bufferheads
map<loff_t, BufferHead*> hits, missing, rx;
ex_it != wr->extents.end();
ex_it++) {
// get object cache
- Object *o = get_object(ex_it->oid, ino, ex_it->layout);
+ sobject_t soid(ex_it->oid, CEPH_NOSNAP);
+ Object *o = get_object(soid, ino, ex_it->layout);
// map it all into a single bufferhead.
BufferHead *bh = o->map_write(wr);
bool done = false;
objecter->read(rd->extents[0].oid, rd->extents[0].layout,
rd->extents[0].offset, rd->extents[0].length,
- rd->bl, 0,
+ rd->snap, rd->bl, 0,
new C_SafeCond(&flock, &cond, &done));
// block
for (map<object_t,ObjectExtent>::iterator i = by_oid.begin();
i != by_oid.end();
i++) {
- Object *o = get_object(i->first, ino, i->second.layout);
+ sobject_t soid(i->first, rd->snap);
+ Object *o = get_object(soid, ino, i->second.layout);
rdlock(o);
}
for (vector<ObjectExtent>::iterator ex_it = extents.begin();
ex_it != extents.end();
ex_it++) {
- assert(objects.count(ex_it->oid));
- Object *o = objects[ex_it->oid];
+ sobject_t soid(ex_it->oid, rd->snap);
+ assert(objects.count(soid));
+ Object *o = objects[soid];
rdunlock(o);
}
}
// single object.
// make sure we aren't already locking/locked...
- object_t oid = wr->extents.front().oid;
+ sobject_t oid(wr->extents.front().oid, CEPH_NOSNAP);
Object *o = 0;
if (objects.count(oid)) o = get_object(oid, ino, wr->extents.front().layout);
if (!o ||
for (map<object_t,ObjectExtent>::iterator i = by_oid.begin();
i != by_oid.end();
i++) {
- Object *o = get_object(i->first, ino, i->second.layout);
+ sobject_t soid(i->first, CEPH_NOSNAP);
+ Object *o = get_object(soid, ino, i->second.layout);
wrlock(o);
}
for (vector<ObjectExtent>::iterator ex_it = extents.begin();
ex_it != extents.end();
ex_it++) {
- assert(objects.count(ex_it->oid));
- Object *o = objects[ex_it->oid];
+ sobject_t soid(ex_it->oid, CEPH_NOSNAP);
+ assert(objects.count(soid));
+ Object *o = objects[soid];
wrunlock(o);
}
o->lock_state = Object::LOCK_RDLOCKING;
- C_LockAck *ack = new C_LockAck(this, o->get_oid());
- C_WriteCommit *commit = new C_WriteCommit(this, o->get_oid(), 0, 0);
+ C_LockAck *ack = new C_LockAck(this, o->get_soid());
+ C_WriteCommit *commit = new C_WriteCommit(this, o->get_soid(), 0, 0);
commit->tid =
ack->tid =
op = CEPH_OSD_OP_WRLOCK;
}
- C_LockAck *ack = new C_LockAck(this, o->get_oid());
- C_WriteCommit *commit = new C_WriteCommit(this, o->get_oid(), 0, 0);
+ C_LockAck *ack = new C_LockAck(this, o->get_soid());
+ C_WriteCommit *commit = new C_WriteCommit(this, o->get_soid(), 0, 0);
commit->tid =
ack->tid =
o->lock_state = Object::LOCK_RDUNLOCKING;
- C_LockAck *lockack = new C_LockAck(this, o->get_oid());
- C_WriteCommit *commit = new C_WriteCommit(this, o->get_oid(), 0, 0);
+ C_LockAck *lockack = new C_LockAck(this, o->get_soid());
+ C_WriteCommit *commit = new C_WriteCommit(this, o->get_soid(), 0, 0);
commit->tid =
lockack->tid =
o->last_write_tid = objecter->lock(o->get_oid(), o->get_layout(), CEPH_OSD_OP_RDUNLOCK, 0, lockack, commit);
o->lock_state = Object::LOCK_WRUNLOCKING;
}
- C_LockAck *lockack = new C_LockAck(this, o->get_oid());
- C_WriteCommit *commit = new C_WriteCommit(this, o->get_oid(), 0, 0);
+ C_LockAck *lockack = new C_LockAck(this, o->get_soid());
+ C_WriteCommit *commit = new C_WriteCommit(this, o->get_soid(), 0, 0);
commit->tid =
lockack->tid =
o->last_write_tid = objecter->lock(o->get_oid(), o->get_layout(), op, 0, lockack, commit);
p != exls.end();
++p) {
ObjectExtent &ex = *p;
- if (objects.count(ex.oid) == 0) continue;
- Object *ob = objects[ex.oid];
+ sobject_t soid(ex.oid, CEPH_NOSNAP);
+ if (objects.count(soid) == 0)
+ continue;
+ Object *ob = objects[soid];
// purge or truncate?
if (ex.offset == 0) {
// read scatter/gather
struct OSDRead {
vector<ObjectExtent> extents;
+ snapid_t snap;
map<object_t, bufferlist*> read_data; // bits of data as they come back
bufferlist *bl;
int flags;
- OSDRead(bufferlist *b, int f) : bl(b), flags(f) {}
+ OSDRead(snapid_t s, bufferlist *b, int f) : snap(s), bl(b), flags(f) {}
};
- OSDRead *prepare_read(bufferlist *b, int f) {
- return new OSDRead(b, f);
+ OSDRead *prepare_read(snapid_t snap, bufferlist *b, int f) {
+ return new OSDRead(snap, b, f);
}
// write scatter/gather
private:
// ObjectCacher::Object fields
ObjectCacher *oc;
- object_t oid; // this _always_ is oid.rev=0
+ sobject_t oid;
inodeno_t ino;
ceph_object_layout layout;
int rdlock_ref; // how many ppl want or are using a READ lock
public:
- Object(ObjectCacher *_oc, object_t o, inodeno_t i, ceph_object_layout& l) :
+ Object(ObjectCacher *_oc, sobject_t o, inodeno_t i, ceph_object_layout& l) :
oc(_oc),
oid(o), ino(i), layout(l),
last_write_tid(0), last_ack_tid(0), last_commit_tid(0),
assert(data.empty());
}
- object_t get_oid() { return oid; }
+ sobject_t get_soid() { return oid; }
+ object_t get_oid() { return oid.oid; }
+ snapid_t get_snap() { return oid.snap; }
inodeno_t get_ino() { return ino; }
ceph_object_layout& get_layout() { return layout; }
flush_set_callback_t flush_set_callback, commit_set_callback;
void *flush_set_callback_arg;
- hash_map<object_t, Object*> objects;
+ hash_map<sobject_t, Object*> objects;
hash_map<inodeno_t, set<Object*> > objects_by_ino;
hash_map<inodeno_t, int> dirty_tx_by_ino;
hash_map<inodeno_t, xlist<Object*> > uncommitted_by_ino;
// objects
- Object *get_object(object_t oid, inodeno_t ino, ceph_object_layout &l) {
+ Object *get_object(sobject_t oid, inodeno_t ino, ceph_object_layout &l) {
// have it?
if (objects.count(oid))
return objects[oid];
void wrunlock(Object *o);
public:
- void bh_read_finish(object_t oid, loff_t offset, size_t length, bufferlist &bl);
- void bh_write_ack(object_t oid, loff_t offset, size_t length, tid_t t);
- void bh_write_commit(object_t oid, loff_t offset, size_t length, tid_t t);
- void lock_ack(list<object_t>& oids, tid_t tid);
+ void bh_read_finish(sobject_t oid, loff_t offset, size_t length, bufferlist &bl);
+ void bh_write_ack(sobject_t oid, loff_t offset, size_t length, tid_t t);
+ void bh_write_commit(sobject_t oid, loff_t offset, size_t length, tid_t t);
+ void lock_ack(list<sobject_t>& oids, tid_t tid);
class C_ReadFinish : public Context {
ObjectCacher *oc;
- object_t oid;
+ sobject_t oid;
loff_t start;
size_t length;
public:
bufferlist bl;
- C_ReadFinish(ObjectCacher *c, object_t o, loff_t s, size_t l) : oc(c), oid(o), start(s), length(l) {}
+ C_ReadFinish(ObjectCacher *c, sobject_t o, loff_t s, size_t l) : oc(c), oid(o), start(s), length(l) {}
void finish(int r) {
oc->bh_read_finish(oid, start, length, bl);
}
class C_WriteAck : public Context {
ObjectCacher *oc;
- object_t oid;
+ sobject_t oid;
loff_t start;
size_t length;
public:
tid_t tid;
- C_WriteAck(ObjectCacher *c, object_t o, loff_t s, size_t l) : oc(c), oid(o), start(s), length(l) {}
+ C_WriteAck(ObjectCacher *c, sobject_t o, loff_t s, size_t l) : oc(c), oid(o), start(s), length(l) {}
void finish(int r) {
oc->bh_write_ack(oid, start, length, tid);
}
};
class C_WriteCommit : public Context {
ObjectCacher *oc;
- object_t oid;
+ sobject_t oid;
loff_t start;
size_t length;
public:
tid_t tid;
- C_WriteCommit(ObjectCacher *c, object_t o, loff_t s, size_t l) : oc(c), oid(o), start(s), length(l) {}
+ C_WriteCommit(ObjectCacher *c, sobject_t o, loff_t s, size_t l) : oc(c), oid(o), start(s), length(l) {}
void finish(int r) {
oc->bh_write_commit(oid, start, length, tid);
}
class C_LockAck : public Context {
ObjectCacher *oc;
public:
- list<object_t> oids;
+ list<sobject_t> oids;
tid_t tid;
- C_LockAck(ObjectCacher *c, object_t o) : oc(c) {
+ C_LockAck(ObjectCacher *c, sobject_t o) : oc(c) {
oids.push_back(o);
}
void finish(int r) {
bufferlist *bl,
int flags,
Context *onfinish) {
- OSDRead *rd = prepare_read(bl, flags);
- filer.file_to_extents(ino, layout, snapid, offset, len, rd->extents);
+ OSDRead *rd = prepare_read(snapid, bl, flags);
+ filer.file_to_extents(ino, layout, offset, len, rd->extents);
return readx(rd, ino, onfinish);
}
loff_t offset, size_t len,
bufferlist& bl, utime_t mtime, int flags) {
OSDWrite *wr = prepare_write(snapc, bl, mtime, flags);
- filer.file_to_extents(ino, layout, CEPH_NOSNAP, offset, len, wr->extents);
+ filer.file_to_extents(ino, layout, offset, len, wr->extents);
return writex(wr, ino);
}
loff_t offset, size_t len,
bufferlist *bl, int flags,
Mutex &lock) {
- OSDRead *rd = prepare_read(bl, flags);
- filer.file_to_extents(ino, layout, snapid, offset, len, rd->extents);
+ OSDRead *rd = prepare_read(snapid, bl, flags);
+ filer.file_to_extents(ino, layout, offset, len, rd->extents);
return atomic_sync_readx(rd, ino, lock);
}
bufferlist& bl, utime_t mtime, int flags,
Mutex &lock) {
OSDWrite *wr = prepare_write(snapc, bl, mtime, flags);
- filer.file_to_extents(ino, layout, CEPH_NOSNAP, offset, len, wr->extents);
+ filer.file_to_extents(ino, layout, offset, len, wr->extents);
return atomic_sync_writex(wr, ino, lock);
}
inline ostream& operator<<(ostream& out, ObjectCacher::Object &ob)
{
out << "object["
- << hex << ob.get_oid() << " ino " << ob.get_ino() << dec
+ << ob.get_soid() << " ino " << hex << ob.get_ino() << dec
<< " wr " << ob.last_write_tid << "/" << ob.last_ack_tid << "/" << ob.last_commit_tid;
switch (ob.lock_state) {
object_t oid;
ceph_object_layout layout;
vector<ceph_osd_op> ops;
+ snapid_t snap;
bufferlist bl;
bufferlist *pbl;
__u64 *psize;
bool paused;
- ReadOp(object_t o, ceph_object_layout& ol, vector<ceph_osd_op>& op, int f, Context *of) :
- oid(o), layout(ol),
+ ReadOp(object_t o, ceph_object_layout& ol, vector<ceph_osd_op>& op, snapid_t s, int f, Context *of) :
+ oid(o), layout(ol), snap(s),
pbl(0), psize(0), flags(f), onfinish(of),
tid(0), attempts(0), inc_lock(-1),
paused(false) {
tid_t read_submit(ReadOp *rd);
tid_t modify_submit(ModifyOp *wr);
- tid_t read(object_t oid, ceph_object_layout ol, vector<ceph_osd_op>& ops,
- bufferlist *pbl, __u64 *psize, int flags,
+ tid_t read(object_t oid, ceph_object_layout ol, vector<ceph_osd_op>& ops,
+ snapid_t snap, bufferlist *pbl, __u64 *psize, int flags,
Context *onfinish) {
- ReadOp *rd = new ReadOp(oid, ol, ops, flags, onfinish);
+ ReadOp *rd = new ReadOp(oid, ol, ops, snap, flags, onfinish);
rd->pbl = pbl;
rd->psize = psize;
return read_submit(rd);
}
tid_t read(object_t oid, ceph_object_layout ol,
- ObjectRead& read, bufferlist *pbl, int flags, Context *onfinish) {
- ReadOp *rd = new ReadOp(oid, ol, read.ops, flags, onfinish);
+ ObjectRead& read, snapid_t snap, bufferlist *pbl, int flags, Context *onfinish) {
+ ReadOp *rd = new ReadOp(oid, ol, read.ops, snap, flags, onfinish);
rd->bl = read.data;
rd->pbl = pbl;
return read_submit(rd);
}
// high-level helpers
- tid_t stat(object_t oid, ceph_object_layout ol,
+ tid_t stat(object_t oid, ceph_object_layout ol, snapid_t snap,
__u64 *psize, int flags,
Context *onfinish) {
vector<ceph_osd_op> ops(1);
memset(&ops[0], 0, sizeof(ops[0]));
ops[0].op = CEPH_OSD_OP_STAT;
- return read(oid, ol, ops, 0, psize, flags, onfinish);
+ return read(oid, ol, ops, snap, 0, psize, flags, onfinish);
}
- tid_t read(object_t oid, ceph_object_layout ol,
- __u64 off, size_t len, bufferlist *pbl, int flags,
+ tid_t read(object_t oid, ceph_object_layout ol,
+ __u64 off, size_t len, snapid_t snap, bufferlist *pbl, int flags,
Context *onfinish) {
vector<ceph_osd_op> ops(1);
memset(&ops[0], 0, sizeof(ops[0]));
ops[0].op = CEPH_OSD_OP_READ;
ops[0].offset = off;
ops[0].length = len;
- return read(oid, ol, ops, pbl, 0, flags, onfinish);
+ return read(oid, ol, ops, snap, pbl, 0, flags, onfinish);
}
tid_t read_full(object_t oid, ceph_object_layout ol,
- bufferlist *pbl, int flags,
+ snapid_t snap, bufferlist *pbl, int flags,
Context *onfinish) {
- return read(oid, ol, 0, 0, pbl, flags, onfinish);
+ return read(oid, ol, 0, 0, snap, pbl, flags, onfinish);
}
tid_t mutate(object_t oid, ceph_object_layout ol,
}
};
- void sg_read(vector<ObjectExtent>& extents, bufferlist *bl, int flags, Context *onfinish) {
+ void sg_read(vector<ObjectExtent>& extents, snapid_t snap, bufferlist *bl, int flags, Context *onfinish) {
if (extents.size() == 1) {
read(extents[0].oid, extents[0].layout, extents[0].offset, extents[0].length,
- bl, flags, onfinish);
+ snap, bl, flags, onfinish);
} else {
C_Gather *g = new C_Gather;
vector<bufferlist> resultbl(extents.size());
int i=0;
for (vector<ObjectExtent>::iterator p = extents.begin(); p != extents.end(); p++) {
read(p->oid, p->layout, p->offset, p->length,
- &resultbl[i++], flags, g->new_sub());
+ snap, &resultbl[i++], flags, g->new_sub());
}
g->set_finisher(new C_SGRead(this, extents, resultbl, bl, onfinish));
}
//cout << "stop at " << end << std::endl;
cout << "# offset\tack\tcommit" << std::endl;
while (now < end) {
- pobject_t poid(0, 0, object_t(1, 1));
+ pobject_t poid(object_t(1, 1), 0);
utime_t start = now;
set_start(pos, now);
ObjectStore::Transaction t;