int r = cbl.read_file(fn.c_str(), &err);
if (r < 0)
return r;
- CollectionRef c(new Collection);
+ CollectionRef c(new Collection(cct));
bufferlist::iterator p = cbl.begin();
c->decode(p);
coll_map[*q] = c;
return o->read(offset, l, bl);
}
+
int MemStore::fiemap(coll_t cid, const ghobject_t& oid,
uint64_t offset, size_t len, bufferlist& bl)
{
if (!c)
return -ENOENT;
- ObjectRef o = c->get_object(oid);
- if (!o) {
- o.reset(new BufferlistObject);
- c->object_map[oid] = o;
- c->object_hash[oid] = o;
- }
+ c->get_or_create_object(oid);
return 0;
}
if (!c)
return -ENOENT;
- ObjectRef o = c->get_object(oid);
- if (!o) {
- // write implicitly creates a missing object
- o.reset(new BufferlistObject);
- c->object_map[oid] = o;
- c->object_hash[oid] = o;
- }
-
+ ObjectRef o = c->get_or_create_object(oid);
const ssize_t old_size = o->get_size();
o->write(offset, bl);
used_bytes += (o->get_size() - old_size);
ObjectRef oo = c->get_object(oldoid);
if (!oo)
return -ENOENT;
- ObjectRef no = c->get_object(newoid);
- if (!no) {
- no.reset(new BufferlistObject);
- c->object_map[newoid] = no;
- c->object_hash[newoid] = no;
- }
+ ObjectRef no = c->get_or_create_object(newoid);
used_bytes += oo->get_size() - no->get_size();
no->clone(oo.get(), 0, oo->get_size(), 0);
ObjectRef oo = c->get_object(oldoid);
if (!oo)
return -ENOENT;
- ObjectRef no = c->get_object(newoid);
- if (!no) {
- no.reset(new BufferlistObject);
- c->object_map[newoid] = no;
- c->object_hash[newoid] = no;
- }
+ ObjectRef no = c->get_or_create_object(newoid);
if (srcoff >= oo->get_size())
return 0;
if (srcoff + len >= oo->get_size())
{
dout(10) << __func__ << " " << cid << dendl;
RWLock::WLocker l(coll_lock);
- ceph::unordered_map<coll_t,CollectionRef>::iterator cp = coll_map.find(cid);
- if (cp != coll_map.end())
+ auto result = coll_map.insert(std::make_pair(cid, CollectionRef()));
+ if (!result.second)
return -EEXIST;
- coll_map[cid].reset(new Collection);
+ result.first->second.reset(new Collection(cct));
return 0;
}
}
return 0;
}
+
+// PageSetObject
+int MemStore::PageSetObject::read(uint64_t offset, uint64_t len, bufferlist& bl)
+{
+ const auto start = offset;
+ const auto end = offset + len;
+ auto remaining = len;
+
+ PageSet::page_vector pages;
+ data.get_range(offset, len, pages);
+
+ // allocate a buffer for the data
+ buffer::ptr buf(len);
+
+ auto p = pages.begin();
+ while (remaining) {
+ // no more pages in range
+ if (p == pages.end() || (*p)->offset >= end) {
+ buf.zero(offset - start, remaining);
+ break;
+ }
+ auto page = *p;
+
+ // fill any holes between pages with zeroes
+ if (page->offset > offset) {
+ const auto count = std::min(remaining, page->offset - offset);
+ buf.zero(offset - start, count);
+ remaining -= count;
+ offset = page->offset;
+ if (!remaining)
+ break;
+ }
+
+ // read from page
+ const auto page_offset = offset - page->offset;
+ const auto count = min(remaining, data.get_page_size() - page_offset);
+
+ buf.copy_in(offset - start, count, page->data + page_offset);
+
+ remaining -= count;
+ offset += count;
+
+ ++p;
+ }
+
+ bl.append(buf);
+ return len;
+}
+
+int MemStore::PageSetObject::write(uint64_t offset, const bufferlist &src)
+{
+ unsigned len = src.length();
+
+ // make sure the page range is allocated
+ PageSet::page_vector pages;
+ data.alloc_range(offset, src.length(), pages);
+
+ auto page = pages.begin();
+
+ // XXX: cast away the const because bufferlist doesn't have a const_iterator
+ auto p = const_cast<bufferlist&>(src).begin();
+ while (len > 0) {
+ unsigned page_offset = offset - (*page)->offset;
+ unsigned pageoff = data.get_page_size() - page_offset;
+ unsigned count = min(len, pageoff);
+ p.copy(count, (*page)->data + page_offset);
+ offset += count;
+ len -= count;
+ if (count == pageoff)
+ ++page;
+ }
+ if (data_len < offset)
+ data_len = offset;
+ return 0;
+}
+
+int MemStore::PageSetObject::clone(Object *src, uint64_t srcoff,
+ uint64_t len, uint64_t dstoff)
+{
+ const int64_t delta = dstoff - srcoff;
+
+ auto &src_data = static_cast<PageSetObject*>(src)->data;
+ const auto src_page_size = src_data.get_page_size();
+
+ auto &dst_data = data;
+ const auto dst_page_size = dst_data.get_page_size();
+
+ PageSet::page_vector src_pages, dst_pages;
+
+ while (len) {
+ const auto count = std::min(len, src_page_size * 16);
+ src_data.get_range(srcoff, count, src_pages);
+
+ for (auto &src_page : src_pages) {
+ auto sbegin = std::max(srcoff, src_page->offset);
+ auto send = std::min(srcoff + count, src_page->offset + src_page_size);
+ dst_data.alloc_range(sbegin + delta, send - sbegin, dst_pages);
+
+ // copy data from src page to dst pages
+ for (auto &dst_page : dst_pages) {
+ auto dbegin = std::max(sbegin + delta, dst_page->offset);
+ auto dend = std::min(send + delta, dst_page->offset + dst_page_size);
+
+ std::copy(src_page->data + (dbegin - delta) - src_page->offset,
+ src_page->data + (dend - delta) - src_page->offset,
+ dst_page->data + dbegin - dst_page->offset);
+ }
+ dst_pages.clear(); // drop page refs
+ srcoff += count;
+ dstoff += count;
+ len -= count;
+ }
+ src_pages.clear(); // drop page refs
+ }
+
+ // update object size
+ if (data_len < dstoff + len)
+ data_len = dstoff + len;
+ return 0;
+}
+
+int MemStore::PageSetObject::truncate(uint64_t size)
+{
+ data.free_pages_after(size);
+ data_len = size;
+
+ const auto page_size = data.get_page_size();
+ const auto page_offset = size & ~(page_size-1);
+ if (page_offset == size)
+ return 0;
+
+ // write zeroes to the rest of the last page
+ PageSet::page_vector pages;
+ data.get_range(page_offset, page_size, pages);
+ if (pages.empty())
+ return 0;
+
+ auto page = pages.begin();
+ auto data = (*page)->data;
+ std::fill(data + (size - page_offset), data + page_size, 0);
+ return 0;
+}
#include <mutex>
#include <boost/intrusive_ptr.hpp>
-#include "include/assert.h"
#include "include/unordered_map.h"
#include "include/memory.h"
#include "include/Spinlock.h"
#include "common/RefCountedObj.h"
#include "common/RWLock.h"
#include "ObjectStore.h"
+#include "PageSet.h"
+#include "include/assert.h"
class MemStore : public ObjectStore {
+private:
+ CephContext *const cct;
+
public:
struct Object : public RefCountedObject {
std::mutex xattr_mutex;
}
};
+ struct PageSetObject : public Object {
+ PageSet data;
+ size_t data_len;
+
+ PageSetObject(size_t page_size) : data(page_size), data_len(0) {}
+
+ size_t get_size() const override { return data_len; }
+
+ int read(uint64_t offset, uint64_t len, bufferlist &bl) override;
+ int write(uint64_t offset, const bufferlist &bl) override;
+ int clone(Object *src, uint64_t srcoff, uint64_t len,
+ uint64_t dstoff) override;
+ int truncate(uint64_t offset) override;
+
+ void encode(bufferlist& bl) const override {
+ ENCODE_START(1, 1, bl);
+ ::encode(data_len, bl);
+ data.encode(bl);
+ encode_base(bl);
+ ENCODE_FINISH(bl);
+ }
+ void decode(bufferlist::iterator& p) override {
+ DECODE_START(1, p);
+ ::decode(data_len, p);
+ data.decode(p);
+ decode_base(p);
+ DECODE_FINISH(p);
+ }
+ };
+
struct Collection : public RefCountedObject {
+ CephContext *cct;
+ bool use_page_set;
ceph::unordered_map<ghobject_t, ObjectRef> object_hash; ///< for lookup
map<ghobject_t, ObjectRef,ghobject_t::BitwiseComparator> object_map; ///< for iteration
map<string,bufferptr> xattr;
friend void intrusive_ptr_add_ref(Collection *c) { c->get(); }
friend void intrusive_ptr_release(Collection *c) { c->put(); }
+ ObjectRef create_object() const {
+ if (use_page_set)
+ return new PageSetObject(cct->_conf->memstore_page_size);
+ return new BufferlistObject();
+ }
+
// NOTE: The lock only needs to protect the object_map/hash, not the
// contents of individual objects. The osd is already sequencing
// reads and writes, so we will never see them concurrently at this
return o->second;
}
+ ObjectRef get_or_create_object(ghobject_t oid) {
+ RWLock::WLocker l(lock);
+ auto result = object_hash.emplace(oid, ObjectRef());
+ if (result.second)
+ object_map[oid] = result.first->second = create_object();
+ return result.first->second;
+ }
+
void encode(bufferlist& bl) const {
ENCODE_START(1, 1, bl);
::encode(xattr, bl);
+ ::encode(use_page_set, bl);
uint32_t s = object_map.size();
::encode(s, bl);
for (map<ghobject_t, ObjectRef,ghobject_t::BitwiseComparator>::const_iterator p = object_map.begin();
void decode(bufferlist::iterator& p) {
DECODE_START(1, p);
::decode(xattr, p);
+ ::decode(use_page_set, p);
uint32_t s;
::decode(s, p);
while (s--) {
ghobject_t k;
::decode(k, p);
- ObjectRef o(new BufferlistObject);
+ auto o = create_object();
o->decode(p);
object_map.insert(make_pair(k, o));
object_hash.insert(make_pair(k, o));
return result;
}
- Collection() : lock("MemStore::Collection::lock") {}
+ Collection(CephContext *cct)
+ : cct(cct), use_page_set(cct->_conf->memstore_page_set),
+ lock("MemStore::Collection::lock") {}
};
typedef Collection::Ref CollectionRef;
void _do_transaction(Transaction& t);
- void _write_into_bl(const bufferlist& src, unsigned offset, bufferlist *dst);
-
int _touch(coll_t cid, const ghobject_t& oid);
int _write(coll_t cid, const ghobject_t& oid, uint64_t offset, size_t len,
const bufferlist& bl, uint32_t fadvsie_flags = 0);
public:
MemStore(CephContext *cct, const string& path)
: ObjectStore(path),
+ cct(cct),
coll_lock("MemStore::coll_lock"),
apply_lock("MemStore::apply_lock"),
finisher(cct),