if ENABLE_SERVER
libos_a_SOURCES = \
- os/fs/FS.cc \
os/bluestore/kv.cc \
os/bluestore/Allocator.cc \
os/bluestore/BlockDevice.cc \
os/filestore/JournalingObjectStore.cc \
os/filestore/LFNIndex.cc \
os/filestore/WBThrottle.cc \
+ os/fs/FS.cc \
os/kstore/kv.cc \
os/kstore/KStore.cc \
+ os/memstore/MemStore.cc \
os/GenericObjectMap.cc \
- os/MemStore.cc \
os/KeyValueStore.cc \
os/ObjectStore.cc
os/filestore/WBThrottle.h \
os/filestore/XfsFileStoreBackend.h \
os/filestore/ZFSFileStoreBackend.h
+ os/fs/FS.h \
+ os/fs/XFS.h \
os/kstore/kstore_types.h \
os/kstore/KStore.h \
os/kstore/kv.h \
+ os/memstore/MemStore.h \
+ os/memstore/PageSet.h \
os/GenericObjectMap.h \
- os/fs/FS.h \
- os/fs/XFS.h \
- os/MemStore.h \
os/KeyValueStore.h \
os/ObjectMap.h \
- os/ObjectStore.h \
- os/PageSet.h
+ os/ObjectStore.h
if WITH_LIBZFS
libos_zfs_a_SOURCES = os/ZFS.cc
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2013 Inktank
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation. See file COPYING.
- *
- */
-#include "acconfig.h"
-
-#ifdef HAVE_SYS_MOUNT_H
-#include <sys/mount.h>
-#endif
-
-#ifdef HAVE_SYS_PARAM_H
-#include <sys/param.h>
-#endif
-
-#include "include/types.h"
-#include "include/stringify.h"
-#include "include/unordered_map.h"
-#include "include/memory.h"
-#include "common/errno.h"
-#include "MemStore.h"
-#include "include/compat.h"
-
-#define dout_subsys ceph_subsys_filestore
-#undef dout_prefix
-#define dout_prefix *_dout << "memstore(" << path << ") "
-
-// Orders two collection references by the numeric value of the raw pointer
-// each one holds; used when collections are compared for lock ordering.
-bool operator>(const MemStore::CollectionRef& l,
-	       const MemStore::CollectionRef& r)
-{
-  const unsigned long lhs_addr = (unsigned long)l.get();
-  const unsigned long rhs_addr = (unsigned long)r.get();
-  return lhs_addr > rhs_addr;
-}
-
-
-// Loads the saved state with _load() and, when that succeeds, starts the
-// finisher.  A negative result from _load() is returned unchanged.
-int MemStore::mount()
-{
-  const int load_result = _load();
-  if (load_result >= 0) {
-    finisher.start();
-    return 0;
-  }
-  return load_result;
-}
-
-// Stops the finisher, then writes the store out with _save() and returns
-// whatever _save() reported.
-int MemStore::umount()
-{
-  finisher.stop();
-  const int save_result = _save();
-  return save_result;
-}
-
-// Writes each collection to "<path>/<collection id>" and then the set of
-// collection ids to "<path>/collections".  Returns 0 on success or the first
-// negative write_file() result.
-int MemStore::_save()
-{
-  dout(10) << __func__ << dendl;
-  Mutex::Locker l(apply_lock); // block any writer
-  dump_all();
-
-  set<coll_t> saved_ids;
-  for (auto& entry : coll_map) {
-    dout(20) << __func__ << " coll " << entry.first << " " << entry.second << dendl;
-    saved_ids.insert(entry.first);
-    bufferlist coll_bl;
-    assert(entry.second);
-    entry.second->encode(coll_bl);
-    string coll_fn = path + "/" + stringify(entry.first);
-    const int wr = coll_bl.write_file(coll_fn.c_str());
-    if (wr < 0)
-      return wr;
-  }
-
-  // the index of collection ids is written last
-  string index_fn = path + "/collections";
-  bufferlist index_bl;
-  ::encode(saved_ids, index_bl);
-  const int wr = index_bl.write_file(index_fn.c_str());
-  return wr < 0 ? wr : 0;
-}
-
-// Formats the whole store through a "json-pretty" Formatter inside a
-// "store" object section and streams it into a level-0 debug message.
-void MemStore::dump_all()
-{
-  Formatter *fmt = Formatter::create("json-pretty");
-  fmt->open_object_section("store");
-  dump(fmt);
-  fmt->close_section();
-
-  dout(0) << "dump:";
-  fmt->flush(*_dout);
-  *_dout << dendl;
-
-  delete fmt;
-}
-
-// Emits a "collections" array into f.  Each collection entry carries its
-// name, an "xattrs" array (name and length per attribute) and an "objects"
-// array (name plus the object's own dump() output when the reference is set).
-void MemStore::dump(Formatter *f)
-{
-  f->open_array_section("collections");
-  for (auto& coll_entry : coll_map) {
-    CollectionRef& coll = coll_entry.second;
-    f->open_object_section("collection");
-    f->dump_string("name", stringify(coll_entry.first));
-
-    f->open_array_section("xattrs");
-    for (auto& attr : coll->xattr) {
-      f->open_object_section("xattr");
-      f->dump_string("name", attr.first);
-      f->dump_int("length", attr.second.length());
-      f->close_section();
-    }
-    f->close_section();
-
-    f->open_array_section("objects");
-    for (auto& obj : coll->object_map) {
-      f->open_object_section("object");
-      f->dump_string("name", stringify(obj.first));
-      if (obj.second)
-	obj.second->dump(f);
-      f->close_section();
-    }
-    f->close_section();
-
-    f->close_section();
-  }
-  f->close_section();
-}
-
-// Reads the set of collection ids from "<path>/collections", then decodes
-// each collection from "<path>/<collection id>" into coll_map and adds its
-// used_bytes() to the store total.  Returns 0, or the first negative
-// read_file() result.
-int MemStore::_load()
-{
-  dout(10) << __func__ << dendl;
-  string err;
-
-  bufferlist index_bl;
-  string index_fn = path + "/collections";
-  const int index_r = index_bl.read_file(index_fn.c_str(), &err);
-  if (index_r < 0)
-    return index_r;
-
-  set<coll_t> collections;
-  bufferlist::iterator index_it = index_bl.begin();
-  ::decode(collections, index_it);
-
-  for (const coll_t& cid : collections) {
-    string coll_fn = path + "/" + stringify(cid);
-    bufferlist coll_bl;
-    const int coll_r = coll_bl.read_file(coll_fn.c_str(), &err);
-    if (coll_r < 0)
-      return coll_r;
-    CollectionRef c(new Collection(cct));
-    bufferlist::iterator coll_it = coll_bl.begin();
-    c->decode(coll_it);
-    coll_map[cid] = c;
-    used_bytes += c->used_bytes();
-  }
-
-  dump_all();
-
-  return 0;
-}
-
-// Stores the stringified uuid under the "fs_fsid" meta key; asserts that
-// write_meta() did not fail.
-void MemStore::set_fsid(uuid_d u)
-{
-  const string fsid_text = stringify(u);
-  int r = write_meta("fs_fsid", fsid_text);
-  assert(r >= 0);
-}
-
-// Reads the "fs_fsid" meta key and parses it into a uuid_d.  Asserts that
-// both the read and the parse succeed.
-uuid_d MemStore::get_fsid()
-{
-  string stored;
-  int r = read_meta("fs_fsid", &stored);
-  assert(r >= 0);
-
-  uuid_d parsed;
-  bool ok = parsed.parse(stored.c_str());
-  assert(ok);
-  return parsed;
-}
-
-// Initialises the store directory:
-//  - generates and records a random "fs_fsid" when none is stored yet,
-//  - writes an empty collection set to "<path>/collections",
-//  - records the "type" meta key as "memstore".
-// Returns 0 on success or the first negative error encountered.
-int MemStore::mkfs()
-{
-  string fsid_str;
-  int r = read_meta("fs_fsid", &fsid_str);
-  if (r == -ENOENT) {
-    uuid_d fsid;
-    fsid.generate_random();
-    fsid_str = stringify(fsid);
-    r = write_meta("fs_fsid", fsid_str);
-    if (r < 0)
-      return r;
-    dout(1) << __func__ << " new fsid " << fsid_str << dendl;
-  } else if (r < 0) {
-    // Any other failure means fs_fsid could not be read; report it instead
-    // of logging "had fsid" with an empty string and carrying on.
-    return r;
-  } else {
-    dout(1) << __func__ << " had fsid " << fsid_str << dendl;
-  }
-
-  string fn = path + "/collections";
-  derr << path << dendl;
-  bufferlist bl;
-  set<coll_t> collections;
-  ::encode(collections, bl);
-  r = bl.write_file(fn.c_str());
-  if (r < 0)
-    return r;
-
-  r = write_meta("type", "memstore");
-  if (r < 0)
-    return r;
-
-  return 0;
-}
-
-// Fills *st with a 1024-byte block size, a block count derived from the
-// configured memstore_device_bytes, and free/available block counts derived
-// from used_bytes (never below zero).  Always returns 0.
-int MemStore::statfs(struct statfs *st)
-{
-  dout(10) << __func__ << dendl;
-  st->f_bsize = 1024;
-
-  // Device size is a configured constant
-  st->f_blocks = g_conf->memstore_device_bytes / st->f_bsize;
-
-  dout(10) << __func__ << ": used_bytes: " << used_bytes << "/" << g_conf->memstore_device_bytes << dendl;
-
-  const long total_blocks = long(st->f_blocks);
-  const long used_blocks = long(used_bytes / st->f_bsize);
-  long free_blocks = total_blocks - used_blocks;
-  if (free_blocks < 0)
-    free_blocks = 0;
-  st->f_bfree = st->f_bavail = free_blocks;
-
-  return 0;
-}
-
-// Returns a default-constructed stats object; no real statistics are
-// gathered here.
-objectstore_perf_stat_t MemStore::get_cur_stats()
-{
-  // fixme
-  objectstore_perf_stat_t empty_stats = objectstore_perf_stat_t();
-  return empty_stats;
-}
-
-// Looks cid up in coll_map while holding coll_lock for reading.  Returns
-// the stored reference, or an empty CollectionRef when cid is unknown.
-MemStore::CollectionRef MemStore::get_collection(coll_t cid)
-{
-  RWLock::RLocker l(coll_lock);
-  auto found = coll_map.find(cid);
-  if (found != coll_map.end())
-    return found->second;
-  return CollectionRef();
-}
-
-
-// ---------------
-// read operations
-
-// True when both the collection and the object inside it can be found.
-bool MemStore::exists(coll_t cid, const ghobject_t& oid)
-{
-  dout(10) << __func__ << " " << cid << " " << oid << dendl;
-  CollectionRef c = get_collection(cid);
-  if (c) {
-    // The object reference is converted to bool explicitly; this is the
-    // equivalent of testing the returned pointer against NULL.
-    ObjectRef o = c->get_object(oid);
-    return (bool)o;
-  }
-  return false;
-}
-
-// Fills st_size, st_blksize (4096), st_blocks (size rounded up to whole
-// blocks) and st_nlink (1) for the object.  Returns -ENOENT when the
-// collection or the object is missing, 0 otherwise.  allow_eio is not used.
-int MemStore::stat(
-    coll_t cid,
-    const ghobject_t& oid,
-    struct stat *st,
-    bool allow_eio)
-{
-  dout(10) << __func__ << " " << cid << " " << oid << dendl;
-  CollectionRef c = get_collection(cid);
-  ObjectRef o;
-  if (c)
-    o = c->get_object(oid);
-  if (!o)
-    return -ENOENT;
-
-  st->st_nlink = 1;
-  st->st_blksize = 4096;
-  st->st_size = o->get_size();
-  st->st_blocks = (st->st_size + st->st_blksize - 1) / st->st_blksize;
-  return 0;
-}
-
-// Reads up to len bytes of the object starting at offset into bl.
-//
-//  - Returns -ENOENT when the collection or the object does not exist.
-//  - Returns 0 (bl untouched) when offset is at or beyond the object size.
-//  - len == 0 means "read through the end of the object".
-//  - Otherwise the request is clamped to the bytes available after offset
-//    and the result of the object's read() is returned.
-//
-// op_flags and allow_eio are not used.
-int MemStore::read(
-    coll_t cid,
-    const ghobject_t& oid,
-    uint64_t offset,
-    size_t len,
-    bufferlist& bl,
-    uint32_t op_flags,
-    bool allow_eio)
-{
-  dout(10) << __func__ << " " << cid << " " << oid << " "
-	   << offset << "~" << len << dendl;
-  CollectionRef c = get_collection(cid);
-  if (!c)
-    return -ENOENT;
-
-  ObjectRef o = c->get_object(oid);
-  if (!o)
-    return -ENOENT;
-
-  const uint64_t size = o->get_size();
-  if (offset >= size)
-    return 0;
-
-  // Bytes that exist at or after offset.  Comparing against this instead of
-  // computing offset + len avoids wrap-around for very large len, and it
-  // also bounds the len == 0 case, which previously asked for get_size()
-  // bytes starting at a non-zero offset (a range past the end of the object).
-  const uint64_t remaining = size - offset;
-  size_t l = len;
-  if (l == 0 || l > remaining)
-    l = remaining;
-  bl.clear();
-  return o->read(offset, l, bl);
-}
-
-// Encodes a single extent {offset -> length} into bl, with the length
-// clamped to the end of the object.  Returns -ENOENT when the collection or
-// object is missing; returns 0 without encoding anything when offset is at
-// or past the object size.
-int MemStore::fiemap(coll_t cid, const ghobject_t& oid,
-		     uint64_t offset, size_t len, bufferlist& bl)
-{
-  dout(10) << __func__ << " " << cid << " " << oid << " " << offset << "~"
-	   << len << dendl;
-  CollectionRef c = get_collection(cid);
-  ObjectRef o;
-  if (c)
-    o = c->get_object(oid);
-  if (!o)
-    return -ENOENT;
-
-  if (offset >= o->get_size())
-    return 0;
-
-  size_t extent_len = len;
-  if (offset + extent_len > o->get_size())
-    extent_len = o->get_size() - offset;
-
-  map<uint64_t, uint64_t> extents;
-  extents[offset] = extent_len;
-  ::encode(extents, bl);
-  return 0;
-}
-
-// Copies the named xattr of the object into value while holding the
-// object's xattr_mutex.  Returns -ENOENT when the collection or object is
-// missing, -ENODATA when the attribute is not set, 0 on success.
-int MemStore::getattr(coll_t cid, const ghobject_t& oid,
-		      const char *name, bufferptr& value)
-{
-  dout(10) << __func__ << " " << cid << " " << oid << " " << name << dendl;
-  CollectionRef c = get_collection(cid);
-  ObjectRef o;
-  if (c)
-    o = c->get_object(oid);
-  if (!o)
-    return -ENOENT;
-
-  string key(name);
-  std::lock_guard<std::mutex> lock(o->xattr_mutex);
-  auto found = o->xattr.find(key);
-  if (found == o->xattr.end())
-    return -ENODATA;
-  value = found->second;
-  return 0;
-}
-
-// Replaces aset with a copy of all xattrs of the object, taken under the
-// object's xattr_mutex.  Returns -ENOENT when the collection or object is
-// missing, 0 otherwise.
-int MemStore::getattrs(coll_t cid, const ghobject_t& oid,
-		       map<string,bufferptr>& aset)
-{
-  dout(10) << __func__ << " " << cid << " " << oid << dendl;
-  CollectionRef c = get_collection(cid);
-  ObjectRef o;
-  if (c)
-    o = c->get_object(oid);
-  if (!o)
-    return -ENOENT;
-
-  std::lock_guard<std::mutex> lock(o->xattr_mutex);
-  aset = o->xattr;
-  return 0;
-}
-
-// Appends the id of every collection in coll_map to ls while holding
-// coll_lock for reading.  Always returns 0.
-int MemStore::list_collections(vector<coll_t>& ls)
-{
-  dout(10) << __func__ << dendl;
-  RWLock::RLocker l(coll_lock);
-  for (auto& entry : coll_map)
-    ls.push_back(entry.first);
-  return 0;
-}
-
-// True when cid is present in coll_map (checked under coll_lock for reading).
-bool MemStore::collection_exists(coll_t cid)
-{
-  dout(10) << __func__ << " " << cid << dendl;
-  RWLock::RLocker l(coll_lock);
-  const bool present = coll_map.count(cid) != 0;
-  return present;
-}
-
-// True when the collection holds no objects (checked under the collection's
-// lock for reading).  A collection that does not exist is also reported as
-// empty.
-bool MemStore::collection_empty(coll_t cid)
-{
-  dout(10) << __func__ << " " << cid << dendl;
-  CollectionRef c = get_collection(cid);
-  if (!c) {
-    // This used to be "return -ENOENT", which a bool return type silently
-    // turns into true; spell the resulting value out instead of hiding it
-    // behind an errno constant.
-    return true;
-  }
-  RWLock::RLocker l(c->lock);
-
-  return c->object_map.empty();
-}
-
-// Appends to *ls the object ids in [start, end) in the collection's
-// object_map order, stopping once ls holds max entries.  When next is
-// non-NULL it receives the first id that was not listed, or
-// ghobject_t::get_max() when the map was exhausted.
-// Returns -EOPNOTSUPP unless sort_bitwise is set, -ENOENT when the
-// collection is missing, 0 otherwise.
-int MemStore::collection_list(coll_t cid, ghobject_t start, ghobject_t end,
-			      bool sort_bitwise, int max,
-			      vector<ghobject_t> *ls, ghobject_t *next)
-{
-  if (!sort_bitwise)
-    return -EOPNOTSUPP;
-  CollectionRef c = get_collection(cid);
-  if (!c)
-    return -ENOENT;
-  RWLock::RLocker l(c->lock);
-
-  auto cursor = c->object_map.lower_bound(start);
-  for (; cursor != c->object_map.end(); ++cursor) {
-    if (ls->size() >= (unsigned)max)
-      break;
-    if (cmp_bitwise(cursor->first, end) >= 0)
-      break;
-    ls->push_back(cursor->first);
-  }
-
-  if (next != NULL) {
-    if (cursor != c->object_map.end())
-      *next = cursor->first;
-    else
-      *next = ghobject_t::get_max();
-  }
-  return 0;
-}
-
-// Copies the object's omap header and its complete key/value map out under
-// the object's omap_mutex.  Returns -ENOENT when the collection or object
-// is missing, 0 otherwise.
-int MemStore::omap_get(
-    coll_t cid,                ///< [in] Collection containing oid
-    const ghobject_t &oid,   ///< [in] Object containing omap
-    bufferlist *header,      ///< [out] omap header
-    map<string, bufferlist> *out /// < [out] Key to value map
-    )
-{
-  dout(10) << __func__ << " " << cid << " " << oid << dendl;
-  CollectionRef c = get_collection(cid);
-  ObjectRef o;
-  if (c)
-    o = c->get_object(oid);
-  if (!o)
-    return -ENOENT;
-
-  std::lock_guard<std::mutex> lock(o->omap_mutex);
-  *header = o->omap_header;
-  *out = o->omap;
-  return 0;
-}
-
-// Copies only the object's omap header out under the object's omap_mutex.
-// Returns -ENOENT when the collection or object is missing, 0 otherwise.
-// allow_eio is not used.
-int MemStore::omap_get_header(
-    coll_t cid,                ///< [in] Collection containing oid
-    const ghobject_t &oid,   ///< [in] Object containing omap
-    bufferlist *header,      ///< [out] omap header
-    bool allow_eio ///< [in] don't assert on eio
-    )
-{
-  dout(10) << __func__ << " " << cid << " " << oid << dendl;
-  CollectionRef c = get_collection(cid);
-  ObjectRef o;
-  if (c)
-    o = c->get_object(oid);
-  if (!o)
-    return -ENOENT;
-
-  std::lock_guard<std::mutex> lock(o->omap_mutex);
-  *header = o->omap_header;
-  return 0;
-}
-
-// Inserts every omap key of the object into *keys, under the object's
-// omap_mutex.  Returns -ENOENT when the collection or object is missing,
-// 0 otherwise.
-int MemStore::omap_get_keys(
-    coll_t cid,              ///< [in] Collection containing oid
-    const ghobject_t &oid, ///< [in] Object containing omap
-    set<string> *keys      ///< [out] Keys defined on oid
-    )
-{
-  dout(10) << __func__ << " " << cid << " " << oid << dendl;
-  CollectionRef c = get_collection(cid);
-  ObjectRef o;
-  if (c)
-    o = c->get_object(oid);
-  if (!o)
-    return -ENOENT;
-
-  std::lock_guard<std::mutex> lock(o->omap_mutex);
-  for (const auto& kv : o->omap)
-    keys->insert(kv.first);
-  return 0;
-}
-
-// For each requested key that exists in the object's omap, inserts the
-// key/value pair into *out; keys that are absent are skipped.  Runs under
-// the object's omap_mutex.  Returns -ENOENT when the collection or object
-// is missing, 0 otherwise.
-int MemStore::omap_get_values(
-    coll_t cid,                    ///< [in] Collection containing oid
-    const ghobject_t &oid,       ///< [in] Object containing omap
-    const set<string> &keys,     ///< [in] Keys to get
-    map<string, bufferlist> *out ///< [out] Returned keys and values
-    )
-{
-  dout(10) << __func__ << " " << cid << " " << oid << dendl;
-  CollectionRef c = get_collection(cid);
-  ObjectRef o;
-  if (c)
-    o = c->get_object(oid);
-  if (!o)
-    return -ENOENT;
-
-  std::lock_guard<std::mutex> lock(o->omap_mutex);
-  for (const string& wanted : keys) {
-    auto hit = o->omap.find(wanted);
-    if (hit == o->omap.end())
-      continue;
-    out->insert(*hit);
-  }
-  return 0;
-}
-
-// Inserts into *out every requested key that is present in the object's
-// omap.  Runs under the object's omap_mutex.  Returns -ENOENT when the
-// collection or object is missing, 0 otherwise.
-int MemStore::omap_check_keys(
-    coll_t cid,                ///< [in] Collection containing oid
-    const ghobject_t &oid,   ///< [in] Object containing omap
-    const set<string> &keys, ///< [in] Keys to check
-    set<string> *out         ///< [out] Subset of keys defined on oid
-    )
-{
-  dout(10) << __func__ << " " << cid << " " << oid << dendl;
-  CollectionRef c = get_collection(cid);
-  ObjectRef o;
-  if (c)
-    o = c->get_object(oid);
-  if (!o)
-    return -ENOENT;
-
-  std::lock_guard<std::mutex> lock(o->omap_mutex);
-  for (const string& candidate : keys) {
-    if (o->omap.find(candidate) == o->omap.end())
-      continue;
-    out->insert(candidate);
-  }
-  return 0;
-}
-
-// Returns a new OmapIteratorImpl over the object, or a default-constructed
-// ObjectMapIterator when the collection or the object cannot be found.
-ObjectMap::ObjectMapIterator MemStore::get_omap_iterator(coll_t cid,
-							 const ghobject_t& oid)
-{
-  dout(10) << __func__ << " " << cid << " " << oid << dendl;
-  CollectionRef c = get_collection(cid);
-  ObjectRef o;
-  if (c)
-    o = c->get_object(oid);
-  if (!o)
-    return ObjectMap::ObjectMapIterator();
-  return ObjectMap::ObjectMapIterator(new OmapIteratorImpl(c, o));
-}
-
-
-// ---------------
-// write operations
-
-// Applies every transaction in tls synchronously, then hands the collected
-// completion contexts on: on_apply_sync is completed inline, on_apply and
-// on_commit are queued on the finisher.  Always returns 0.  op is not used.
-int MemStore::queue_transactions(Sequencer *osr,
-				 list<Transaction*>& tls,
-				 TrackedOpRef op,
-				 ThreadPool::TPHandle *handle)
-{
-  // because memstore operations are synchronous, we can implement the
-  // Sequencer with a mutex.  this guarantees ordering on a given sequencer,
-  // while allowing operations on different sequencers to happen in parallel
-  struct OpSequencer : public Sequencer_impl {
-    std::mutex mutex;
-    void flush() override {}
-    bool flush_commit(Context*) override { return true; }
-  };
-
-  // Stays unlocked (owns nothing) when no sequencer was supplied.
-  std::unique_lock<std::mutex> lock;
-  if (osr) {
-    // The sequencer's implementation slot is created on first use.
-    OpSequencer **slot = reinterpret_cast<OpSequencer**>(&osr->p);
-    if (!*slot)
-      *slot = new OpSequencer;
-    lock = std::unique_lock<std::mutex>((*slot)->mutex);
-  }
-
-  for (Transaction *txn : tls) {
-    // poke the TPHandle heartbeat just to exercise that code path
-    if (handle)
-      handle->reset_tp_timeout();
-
-    _do_transaction(*txn);
-  }
-
-  Context *on_apply = NULL, *on_apply_sync = NULL, *on_commit = NULL;
-  ObjectStore::Transaction::collect_contexts(tls, &on_apply, &on_commit,
-					     &on_apply_sync);
-  if (on_apply_sync)
-    on_apply_sync->complete(0);
-  if (on_apply)
-    finisher.queue(on_apply);
-  if (on_commit)
-    finisher.queue(on_commit);
-  return 0;
-}
-
-void MemStore::_do_transaction(Transaction& t) // decode every op in t in order and dispatch it to the matching _xxx() helper
-{
- Transaction::iterator i = t.begin();
- int pos = 0; // index of the current op; only used in the error log below
-
- while (i.have_op()) {
- Transaction::Op *op = i.decode_op();
- int r = 0; // result of the current op; negative values are examined after the switch
-
- switch (op->op) {
- case Transaction::OP_NOP:
- break;
- case Transaction::OP_TOUCH:
- {
- coll_t cid = i.get_cid(op->cid);
- ghobject_t oid = i.get_oid(op->oid);
- r = _touch(cid, oid);
- }
- break;
-
- case Transaction::OP_WRITE:
- {
- coll_t cid = i.get_cid(op->cid);
- ghobject_t oid = i.get_oid(op->oid);
- uint64_t off = op->off;
- uint64_t len = op->len;
- uint32_t fadvise_flags = i.get_fadvise_flags();
- bufferlist bl;
- i.decode_bl(bl);
- r = _write(cid, oid, off, len, bl, fadvise_flags);
- }
- break;
-
- case Transaction::OP_ZERO:
- {
- coll_t cid = i.get_cid(op->cid);
- ghobject_t oid = i.get_oid(op->oid);
- uint64_t off = op->off;
- uint64_t len = op->len;
- r = _zero(cid, oid, off, len);
- }
- break;
-
- case Transaction::OP_TRIMCACHE:
- {
- // deprecated, no-op
- }
- break;
-
- case Transaction::OP_TRUNCATE:
- {
- coll_t cid = i.get_cid(op->cid);
- ghobject_t oid = i.get_oid(op->oid);
- uint64_t off = op->off;
- r = _truncate(cid, oid, off);
- }
- break;
-
- case Transaction::OP_REMOVE:
- {
- coll_t cid = i.get_cid(op->cid);
- ghobject_t oid = i.get_oid(op->oid);
- r = _remove(cid, oid);
- }
- break;
-
- case Transaction::OP_SETATTR:
- {
- coll_t cid = i.get_cid(op->cid);
- ghobject_t oid = i.get_oid(op->oid);
- string name = i.decode_string();
- bufferlist bl;
- i.decode_bl(bl);
- map<string, bufferptr> to_set; // single-entry map so the op can reuse _setattrs()
- to_set[name] = bufferptr(bl.c_str(), bl.length());
- r = _setattrs(cid, oid, to_set);
- }
- break;
-
- case Transaction::OP_SETATTRS:
- {
- coll_t cid = i.get_cid(op->cid);
- ghobject_t oid = i.get_oid(op->oid);
- map<string, bufferptr> aset;
- i.decode_attrset(aset);
- r = _setattrs(cid, oid, aset);
- }
- break;
-
- case Transaction::OP_RMATTR:
- {
- coll_t cid = i.get_cid(op->cid);
- ghobject_t oid = i.get_oid(op->oid);
- string name = i.decode_string();
- r = _rmattr(cid, oid, name.c_str());
- }
- break;
-
- case Transaction::OP_RMATTRS:
- {
- coll_t cid = i.get_cid(op->cid);
- ghobject_t oid = i.get_oid(op->oid);
- r = _rmattrs(cid, oid);
- }
- break;
-
- case Transaction::OP_CLONE:
- {
- coll_t cid = i.get_cid(op->cid);
- ghobject_t oid = i.get_oid(op->oid);
- ghobject_t noid = i.get_oid(op->dest_oid);
- r = _clone(cid, oid, noid);
- }
- break;
-
- case Transaction::OP_CLONERANGE:
- {
- coll_t cid = i.get_cid(op->cid);
- ghobject_t oid = i.get_oid(op->oid);
- ghobject_t noid = i.get_oid(op->dest_oid);
- uint64_t off = op->off;
- uint64_t len = op->len;
- r = _clone_range(cid, oid, noid, off, len, off); // same offset used for source and destination
- }
- break;
-
- case Transaction::OP_CLONERANGE2:
- {
- coll_t cid = i.get_cid(op->cid);
- ghobject_t oid = i.get_oid(op->oid);
- ghobject_t noid = i.get_oid(op->dest_oid);
- uint64_t srcoff = op->off;
- uint64_t len = op->len;
- uint64_t dstoff = op->dest_off;
- r = _clone_range(cid, oid, noid, srcoff, len, dstoff);
- }
- break;
-
- case Transaction::OP_MKCOLL:
- {
- coll_t cid = i.get_cid(op->cid);
- r = _create_collection(cid);
- }
- break;
-
- case Transaction::OP_COLL_HINT:
- {
- coll_t cid = i.get_cid(op->cid);
- uint32_t type = op->hint_type;
- bufferlist hint;
- i.decode_bl(hint);
- bufferlist::iterator hiter = hint.begin();
- if (type == Transaction::COLL_HINT_EXPECTED_NUM_OBJECTS) {
- uint32_t pg_num;
- uint64_t num_objs;
- ::decode(pg_num, hiter);
- ::decode(num_objs, hiter);
- r = _collection_hint_expected_num_objs(cid, pg_num, num_objs);
- } else {
- // Ignore the hint
- dout(10) << "Unrecognized collection hint type: " << type << dendl;
- }
- }
- break;
-
- case Transaction::OP_RMCOLL:
- {
- coll_t cid = i.get_cid(op->cid);
- r = _destroy_collection(cid);
- }
- break;
-
- case Transaction::OP_COLL_ADD:
- {
- coll_t ocid = i.get_cid(op->cid);
- coll_t ncid = i.get_cid(op->dest_cid);
- ghobject_t oid = i.get_oid(op->oid);
- r = _collection_add(ncid, ocid, oid); // destination collection is passed first
- }
- break;
-
- case Transaction::OP_COLL_REMOVE:
- {
- coll_t cid = i.get_cid(op->cid);
- ghobject_t oid = i.get_oid(op->oid);
- r = _remove(cid, oid); // handled exactly like OP_REMOVE
- }
- break;
-
- case Transaction::OP_COLL_MOVE:
- assert(0 == "deprecated");
- break;
-
- case Transaction::OP_COLL_MOVE_RENAME:
- {
- coll_t oldcid = i.get_cid(op->cid);
- ghobject_t oldoid = i.get_oid(op->oid);
- coll_t newcid = i.get_cid(op->dest_cid);
- ghobject_t newoid = i.get_oid(op->dest_oid);
- r = _collection_move_rename(oldcid, oldoid, newcid, newoid);
- }
- break;
-
- case Transaction::OP_COLL_SETATTR:
- {
- assert(0 == "not implemented");
- }
- break;
-
- case Transaction::OP_COLL_RMATTR:
- {
- assert(0 == "not implemented");
- }
- break;
-
- case Transaction::OP_COLL_RENAME:
- {
- assert(0 == "not implemented");
- }
- break;
-
- case Transaction::OP_OMAP_CLEAR:
- {
- coll_t cid = i.get_cid(op->cid);
- ghobject_t oid = i.get_oid(op->oid);
- r = _omap_clear(cid, oid);
- }
- break;
- case Transaction::OP_OMAP_SETKEYS:
- {
- coll_t cid = i.get_cid(op->cid);
- ghobject_t oid = i.get_oid(op->oid);
- bufferlist aset_bl;
- i.decode_attrset_bl(&aset_bl);
- r = _omap_setkeys(cid, oid, aset_bl);
- }
- break;
- case Transaction::OP_OMAP_RMKEYS:
- {
- coll_t cid = i.get_cid(op->cid);
- ghobject_t oid = i.get_oid(op->oid);
- bufferlist keys_bl;
- i.decode_keyset_bl(&keys_bl);
- r = _omap_rmkeys(cid, oid, keys_bl);
- }
- break;
- case Transaction::OP_OMAP_RMKEYRANGE:
- {
- coll_t cid = i.get_cid(op->cid);
- ghobject_t oid = i.get_oid(op->oid);
- string first, last;
- first = i.decode_string();
- last = i.decode_string();
- r = _omap_rmkeyrange(cid, oid, first, last);
- }
- break;
- case Transaction::OP_OMAP_SETHEADER:
- {
- coll_t cid = i.get_cid(op->cid);
- ghobject_t oid = i.get_oid(op->oid);
- bufferlist bl;
- i.decode_bl(bl);
- r = _omap_setheader(cid, oid, bl);
- }
- break;
- case Transaction::OP_SPLIT_COLLECTION:
- assert(0 == "deprecated");
- break;
- case Transaction::OP_SPLIT_COLLECTION2:
- {
- coll_t cid = i.get_cid(op->cid);
- uint32_t bits = op->split_bits;
- uint32_t rem = op->split_rem;
- coll_t dest = i.get_cid(op->dest_cid);
- r = _split_collection(cid, bits, rem, dest);
- }
- break;
-
- case Transaction::OP_SETALLOCHINT:
- {
- r = 0; // the allocation hint is accepted and otherwise ignored
- }
- break;
-
- default:
- derr << "bad op " << op->op << dendl;
- assert(0);
- }
-
- if (r < 0) { // decide whether this failure is tolerated or fatal
- bool ok = false;
-
- if (r == -ENOENT && !(op->op == Transaction::OP_CLONERANGE ||
- op->op == Transaction::OP_CLONE ||
- op->op == Transaction::OP_CLONERANGE2 ||
- op->op == Transaction::OP_COLL_ADD))
- // -ENOENT is usually okay
- ok = true;
- if (r == -ENODATA) // -ENODATA is tolerated for every op
- ok = true;
-
- if (!ok) {
- const char *msg = "unexpected error code";
-
- if (r == -ENOENT && (op->op == Transaction::OP_CLONERANGE ||
- op->op == Transaction::OP_CLONE ||
- op->op == Transaction::OP_CLONERANGE2))
- msg = "ENOENT on clone suggests osd bug";
-
- if (r == -ENOSPC)
- // For now, if we hit _any_ ENOSPC, crash, before we do any damage
- // by partially applying transactions.
- msg = "ENOSPC handling not implemented";
-
- if (r == -ENOTEMPTY) {
- msg = "ENOTEMPTY suggests garbage data in osd data dir";
- dump_all();
- }
-
- dout(0) << " error " << cpp_strerror(r) << " not handled on operation " << op->op
- << " (op " << pos << ", counting from 0)" << dendl;
- dout(0) << msg << dendl;
- dout(0) << " transaction dump:\n";
- JSONFormatter f(true);
- f.open_object_section("transaction");
- t.dump(&f);
- f.close_section();
- f.flush(*_dout);
- *_dout << dendl;
- assert(0 == "unexpected error"); // every untolerated error stops here after the dump
- }
- }
-
- ++pos;
- }
-}
-
-// Makes sure the object exists in the collection by calling
-// get_or_create_object().  Returns -ENOENT when the collection is missing,
-// 0 otherwise.
-int MemStore::_touch(coll_t cid, const ghobject_t& oid)
-{
-  dout(10) << __func__ << " " << cid << " " << oid << dendl;
-  CollectionRef c = get_collection(cid);
-  if (c) {
-    c->get_or_create_object(oid);
-    return 0;
-  }
-  return -ENOENT;
-}
-
-// Writes bl into the object at offset, creating the object when needed, and
-// adjusts used_bytes by the change in object size.  Asserts that len equals
-// bl.length().  Returns -ENOENT when the collection is missing, 0 otherwise.
-// fadvise_flags is not used.
-int MemStore::_write(coll_t cid, const ghobject_t& oid,
-		     uint64_t offset, size_t len, const bufferlist& bl,
-		     uint32_t fadvise_flags)
-{
-  dout(10) << __func__ << " " << cid << " " << oid << " "
-	   << offset << "~" << len << dendl;
-  assert(len == bl.length());
-
-  CollectionRef c = get_collection(cid);
-  if (!c)
-    return -ENOENT;
-
-  ObjectRef target = c->get_or_create_object(oid);
-  const ssize_t size_before = target->get_size();
-  target->write(offset, bl);
-  // account for whatever the write added to the object
-  used_bytes += (target->get_size() - size_before);
-
-  return 0;
-}
-
-// Zeroes len bytes at offset by building a zero-filled buffer of that
-// length and passing it to _write(); returns _write()'s result.
-int MemStore::_zero(coll_t cid, const ghobject_t& oid,
-		    uint64_t offset, size_t len)
-{
-  dout(10) << __func__ << " " << cid << " " << oid << " " << offset << "~"
-	   << len << dendl;
-  bufferlist zeros;
-  {
-    bufferptr chunk(len);
-    chunk.zero();
-    zeros.push_back(chunk);
-  }
-  return _write(cid, oid, offset, len, zeros);
-}
-
-// Truncates the object to size and adjusts used_bytes by the change in
-// object size.  Returns -ENOENT when the collection or object is missing,
-// otherwise the result of the object's truncate().
-int MemStore::_truncate(coll_t cid, const ghobject_t& oid, uint64_t size)
-{
-  dout(10) << __func__ << " " << cid << " " << oid << " " << size << dendl;
-  CollectionRef c = get_collection(cid);
-  ObjectRef o;
-  if (c)
-    o = c->get_object(oid);
-  if (!o)
-    return -ENOENT;
-
-  const ssize_t size_before = o->get_size();
-  const int result = o->truncate(size);
-  used_bytes += (o->get_size() - size_before);
-  return result;
-}
-
-// Removes the object from both of the collection's indexes (object_hash and
-// object_map) under the collection's write lock and subtracts its size from
-// used_bytes.  Returns -ENOENT when the collection or object is missing,
-// 0 otherwise.
-int MemStore::_remove(coll_t cid, const ghobject_t& oid)
-{
-  dout(10) << __func__ << " " << cid << " " << oid << dendl;
-  CollectionRef c = get_collection(cid);
-  if (!c)
-    return -ENOENT;
-  RWLock::WLocker l(c->lock);
-
-  auto victim = c->object_hash.find(oid);
-  if (victim != c->object_hash.end()) {
-    used_bytes -= victim->second->get_size();
-    c->object_hash.erase(victim);
-    c->object_map.erase(oid);
-    return 0;
-  }
-  return -ENOENT;
-}
-
-// Sets (or overwrites) every attribute in aset on the object, under the
-// object's xattr_mutex.  Returns -ENOENT when the collection or object is
-// missing, 0 otherwise.
-int MemStore::_setattrs(coll_t cid, const ghobject_t& oid,
-			map<string,bufferptr>& aset)
-{
-  dout(10) << __func__ << " " << cid << " " << oid << dendl;
-  CollectionRef c = get_collection(cid);
-  ObjectRef o;
-  if (c)
-    o = c->get_object(oid);
-  if (!o)
-    return -ENOENT;
-
-  std::lock_guard<std::mutex> lock(o->xattr_mutex);
-  for (const auto& attr : aset)
-    o->xattr[attr.first] = attr.second;
-  return 0;
-}
-
-// Removes one named attribute from the object, under the object's
-// xattr_mutex.  Returns -ENOENT when the collection or object is missing,
-// -ENODATA when the attribute is not set, 0 on success.
-int MemStore::_rmattr(coll_t cid, const ghobject_t& oid, const char *name)
-{
-  dout(10) << __func__ << " " << cid << " " << oid << " " << name << dendl;
-  CollectionRef c = get_collection(cid);
-  ObjectRef o;
-  if (c)
-    o = c->get_object(oid);
-  if (!o)
-    return -ENOENT;
-
-  std::lock_guard<std::mutex> lock(o->xattr_mutex);
-  auto found = o->xattr.find(name);
-  if (found != o->xattr.end()) {
-    o->xattr.erase(found);
-    return 0;
-  }
-  return -ENODATA;
-}
-
-// Clears all attributes of the object, under the object's xattr_mutex.
-// Returns -ENOENT when the collection or object is missing, 0 otherwise.
-int MemStore::_rmattrs(coll_t cid, const ghobject_t& oid)
-{
-  dout(10) << __func__ << " " << cid << " " << oid << dendl;
-  CollectionRef c = get_collection(cid);
-  ObjectRef o;
-  if (c)
-    o = c->get_object(oid);
-  if (!o)
-    return -ENOENT;
-
-  std::lock_guard<std::mutex> lock(o->xattr_mutex);
-  o->xattr.clear();
-  return 0;
-}
-
-// Makes newoid a copy of oldoid inside one collection: data, omap header,
-// omap and xattrs.  The destination is created when needed and used_bytes
-// is adjusted by the size difference.  Returns -ENOENT when the collection
-// or the source object is missing, 0 otherwise.
-int MemStore::_clone(coll_t cid, const ghobject_t& oldoid,
-		     const ghobject_t& newoid)
-{
-  dout(10) << __func__ << " " << cid << " " << oldoid
-	   << " -> " << newoid << dendl;
-  CollectionRef c = get_collection(cid);
-  if (!c)
-    return -ENOENT;
-
-  ObjectRef src = c->get_object(oldoid);
-  if (!src)
-    return -ENOENT;
-  ObjectRef dst = c->get_or_create_object(newoid);
-  used_bytes += src->get_size() - dst->get_size();
-  dst->clone(src.get(), 0, src->get_size(), 0);
-
-  // take xattr and omap locks with std::lock()
-  // NOTE(review): if oldoid and newoid name the same object, the same
-  // mutexes would be passed twice here - confirm callers never do that.
-  std::unique_lock<std::mutex>
-    src_xattr_lock(src->xattr_mutex, std::defer_lock),
-    dst_xattr_lock(dst->xattr_mutex, std::defer_lock),
-    src_omap_lock(src->omap_mutex, std::defer_lock),
-    dst_omap_lock(dst->omap_mutex, std::defer_lock);
-  std::lock(src_xattr_lock, dst_xattr_lock, src_omap_lock, dst_omap_lock);
-
-  dst->omap_header = src->omap_header;
-  dst->omap = src->omap;
-  dst->xattr = src->xattr;
-  return 0;
-}
-
-// Copies len bytes at srcoff of oldoid to dstoff of newoid within one
-// collection, creating newoid when needed.  The length is clamped to the
-// end of the source, and used_bytes is adjusted by the destination's growth.
-// Returns -ENOENT when the collection or the source object is missing,
-// 0 when srcoff is at or past the end of the source, otherwise the
-// (possibly clamped) length.
-int MemStore::_clone_range(coll_t cid, const ghobject_t& oldoid,
-			   const ghobject_t& newoid,
-			   uint64_t srcoff, uint64_t len, uint64_t dstoff)
-{
-  dout(10) << __func__ << " " << cid << " "
-	   << oldoid << " " << srcoff << "~" << len << " -> "
-	   << newoid << " " << dstoff << "~" << len
-	   << dendl;
-  CollectionRef c = get_collection(cid);
-  if (!c)
-    return -ENOENT;
-
-  ObjectRef src = c->get_object(oldoid);
-  if (!src)
-    return -ENOENT;
-  // the destination is created before the range checks below
-  ObjectRef dst = c->get_or_create_object(newoid);
-  if (srcoff >= src->get_size())
-    return 0;
-  if (srcoff + len >= src->get_size())
-    len = src->get_size() - srcoff;
-
-  const ssize_t dst_size_before = dst->get_size();
-  dst->clone(src.get(), srcoff, len, dstoff);
-  used_bytes += (dst->get_size() - dst_size_before);
-
-  return len;
-}
-
-// Clears the object's omap and its omap header, under the object's
-// omap_mutex.  Returns -ENOENT when the collection or object is missing,
-// 0 otherwise.
-int MemStore::_omap_clear(coll_t cid, const ghobject_t &oid)
-{
-  dout(10) << __func__ << " " << cid << " " << oid << dendl;
-  CollectionRef c = get_collection(cid);
-  ObjectRef o;
-  if (c)
-    o = c->get_object(oid);
-  if (!o)
-    return -ENOENT;
-
-  std::lock_guard<std::mutex> lock(o->omap_mutex);
-  o->omap.clear();
-  o->omap_header.clear();
-  return 0;
-}
-
-// Decodes a count followed by that many key/value pairs from aset_bl and
-// stores each value under its key in the object's omap, under the object's
-// omap_mutex.  Returns -ENOENT when the collection or object is missing,
-// 0 otherwise.
-int MemStore::_omap_setkeys(coll_t cid, const ghobject_t &oid,
-			    bufferlist& aset_bl)
-{
-  dout(10) << __func__ << " " << cid << " " << oid << dendl;
-  CollectionRef c = get_collection(cid);
-  ObjectRef o;
-  if (c)
-    o = c->get_object(oid);
-  if (!o)
-    return -ENOENT;
-
-  std::lock_guard<std::mutex> lock(o->omap_mutex);
-  bufferlist::iterator cursor = aset_bl.begin();
-  __u32 num;
-  ::decode(num, cursor);
-  for (__u32 n = 0; n < num; ++n) {
-    string key;
-    ::decode(key, cursor);
-    // the value is decoded straight into the map slot for this key
-    ::decode(o->omap[key], cursor);
-  }
-  return 0;
-}
-
-// Decodes a count followed by that many keys from keys_bl and erases each
-// one from the object's omap, under the object's omap_mutex.  Returns
-// -ENOENT when the collection or object is missing, 0 otherwise.
-int MemStore::_omap_rmkeys(coll_t cid, const ghobject_t &oid,
-			   bufferlist& keys_bl)
-{
-  dout(10) << __func__ << " " << cid << " " << oid << dendl;
-  CollectionRef c = get_collection(cid);
-  ObjectRef o;
-  if (c)
-    o = c->get_object(oid);
-  if (!o)
-    return -ENOENT;
-
-  std::lock_guard<std::mutex> lock(o->omap_mutex);
-  bufferlist::iterator cursor = keys_bl.begin();
-  __u32 num;
-  ::decode(num, cursor);
-  for (__u32 n = 0; n < num; ++n) {
-    string key;
-    ::decode(key, cursor);
-    o->omap.erase(key);
-  }
-  return 0;
-}
-
-// Erases every omap key k of the object with first <= k < last, under the
-// object's omap_mutex.  An empty or reversed range (first >= last) removes
-// nothing.  Returns -ENOENT when the collection or object is missing,
-// 0 otherwise.
-int MemStore::_omap_rmkeyrange(coll_t cid, const ghobject_t &oid,
-			       const string& first, const string& last)
-{
-  dout(10) << __func__ << " " << cid << " " << oid << " " << first
-	   << " " << last << dendl;
-  CollectionRef c = get_collection(cid);
-  if (!c)
-    return -ENOENT;
-
-  ObjectRef o = c->get_object(oid);
-  if (!o)
-    return -ENOENT;
-  std::lock_guard<std::mutex> lock(o->omap_mutex);
-
-  // With first > last the two lower_bound() results can come out in reverse
-  // order, and erase(p, e) requires p to be at or before e.  Nothing lies
-  // in such a range, so stop here.
-  if (!(first < last))
-    return 0;
-
-  map<string,bufferlist>::iterator p = o->omap.lower_bound(first);
-  map<string,bufferlist>::iterator e = o->omap.lower_bound(last);
-  o->omap.erase(p, e);
-  return 0;
-}
-
-// Replaces the object's omap header with bl, under the object's omap_mutex.
-// Returns -ENOENT when the collection or object is missing, 0 otherwise.
-int MemStore::_omap_setheader(coll_t cid, const ghobject_t &oid,
-			      const bufferlist &bl)
-{
-  dout(10) << __func__ << " " << cid << " " << oid << dendl;
-  CollectionRef c = get_collection(cid);
-  ObjectRef o;
-  if (c)
-    o = c->get_object(oid);
-  if (!o)
-    return -ENOENT;
-
-  std::lock_guard<std::mutex> lock(o->omap_mutex);
-  o->omap_header = bl;
-  return 0;
-}
-
-// Registers a new, empty Collection under cid while holding coll_lock for
-// writing.  Returns -EEXIST when cid is already present, 0 otherwise.
-int MemStore::_create_collection(coll_t cid)
-{
-  dout(10) << __func__ << " " << cid << dendl;
-  RWLock::WLocker l(coll_lock);
-  // Insert an empty reference first; the Collection itself is only
-  // allocated when the insertion actually happened.
-  auto inserted = coll_map.insert(std::make_pair(cid, CollectionRef()));
-  if (inserted.second) {
-    inserted.first->second.reset(new Collection(cct));
-    return 0;
-  }
-  return -EEXIST;
-}
-
-// Removes an empty collection from coll_map while holding coll_lock for
-// writing, and subtracts the collection's used_bytes() from the store
-// total.  Returns -ENOENT when cid is unknown, -ENOTEMPTY when the
-// collection still holds objects, 0 otherwise.
-int MemStore::_destroy_collection(coll_t cid)
-{
-  dout(10) << __func__ << " " << cid << dendl;
-  RWLock::WLocker l(coll_lock);
-  auto doomed = coll_map.find(cid);
-  if (doomed == coll_map.end())
-    return -ENOENT;
-
-  bool has_objects;
-  {
-    // the collection's own lock is held only for the emptiness check
-    RWLock::RLocker l2(doomed->second->lock);
-    has_objects = !doomed->second->object_map.empty();
-  }
-  if (has_objects)
-    return -ENOTEMPTY;
-
-  used_bytes -= doomed->second->used_bytes();
-  coll_map.erase(doomed);
-  return 0;
-}
-
-// Makes the object oid of collection ocid also reachable from collection
-// cid; both collections then reference the same object.
-// Returns -ENOENT when either collection or the source object is missing,
-// -EEXIST when cid already holds oid, 0 on success.
-int MemStore::_collection_add(coll_t cid, coll_t ocid, const ghobject_t& oid)
-{
-  dout(10) << __func__ << " " << cid << " " << ocid << " " << oid << dendl;
-  CollectionRef c = get_collection(cid);
-  if (!c)
-    return -ENOENT;
-  CollectionRef oc = get_collection(ocid);
-  if (!oc)
-    return -ENOENT;
-
-  if (&(*c) == &(*oc)) {
-    // Source and destination are one and the same collection, so its lock
-    // must be taken only once (the code below would construct two WLockers
-    // on the same lock; _collection_move_rename special-cases this too).
-    // The outcome matches the checks below: the object is either already
-    // present (-EEXIST) or absent from the source (-ENOENT).
-    RWLock::WLocker l(c->lock);
-    return c->object_hash.count(oid) ? -EEXIST : -ENOENT;
-  }
-
-  // lock both collections, lower address first, for a consistent order
-  RWLock::WLocker l1(MIN(&(*c), &(*oc))->lock);
-  RWLock::WLocker l2(MAX(&(*c), &(*oc))->lock);
-
-  if (c->object_hash.count(oid))
-    return -EEXIST;
-  auto src = oc->object_hash.find(oid);
-  if (src == oc->object_hash.end())
-    return -ENOENT;
-  ObjectRef o = src->second;
-  c->object_map[oid] = o;
-  c->object_hash[oid] = o;
-  return 0;
-}
-
-// Moves object oldoid of collection oldcid to the name oid in collection
-// cid, updating both indexes of each collection.  Write locks are taken in
-// address order (once only when both ids name the same collection).
-// Returns -ENOENT when either collection or the source object is missing,
-// -EEXIST when the destination name is already taken, 0 on success.
-int MemStore::_collection_move_rename(coll_t oldcid, const ghobject_t& oldoid,
-				      coll_t cid, const ghobject_t& oid)
-{
-  dout(10) << __func__ << " " << oldcid << " " << oldoid << " -> "
-	   << cid << " " << oid << dendl;
-  CollectionRef c = get_collection(cid);
-  if (!c)
-    return -ENOENT;
-  CollectionRef oc = get_collection(oldcid);
-  if (!oc)
-    return -ENOENT;
-
-  // note: c and oc may be the same
-  const bool same_collection = (&(*c) == &(*oc));
-  if (same_collection) {
-    c->lock.get_write();
-  } else if (&(*c) < &(*oc)) {
-    c->lock.get_write();
-    oc->lock.get_write();
-  } else {
-    oc->lock.get_write();
-    c->lock.get_write();
-  }
-
-  int r;
-  if (c->object_hash.count(oid)) {
-    r = -EEXIST;
-  } else if (oc->object_hash.count(oldoid) == 0) {
-    r = -ENOENT;
-  } else {
-    ObjectRef moved = oc->object_hash[oldoid];
-    c->object_map[oid] = moved;
-    c->object_hash[oid] = moved;
-    oc->object_map.erase(oldoid);
-    oc->object_hash.erase(oldoid);
-    r = 0;
-  }
-
-  // every path above falls through to the unlock
-  c->lock.put_write();
-  if (!same_collection)
-    oc->lock.put_write();
-  return r;
-}
-
-int MemStore::_split_collection(coll_t cid, uint32_t bits, uint32_t match,
- coll_t dest)
-{
- dout(10) << __func__ << " " << cid << " " << bits << " " << match << " "
- << dest << dendl;
- CollectionRef sc = get_collection(cid);
- if (!sc)
- return -ENOENT;
- CollectionRef dc = get_collection(dest);
- if (!dc)
- return -ENOENT;
- RWLock::WLocker l1(MIN(&(*sc), &(*dc))->lock);
- RWLock::WLocker l2(MAX(&(*sc), &(*dc))->lock);
-
- map<ghobject_t,ObjectRef,ghobject_t::BitwiseComparator>::iterator p = sc->object_map.begin();
- while (p != sc->object_map.end()) {
- if (p->first.match(bits, match)) {
- dout(20) << " moving " << p->first << dendl;
- dc->object_map.insert(make_pair(p->first, p->second));
- dc->object_hash.insert(make_pair(p->first, p->second));
- sc->object_hash.erase(p->first);
- sc->object_map.erase(p++);
- } else {
- ++p;
- }
- }
-
- return 0;
-}
-
-// BufferlistObject
-int MemStore::BufferlistObject::read(uint64_t offset, uint64_t len,
- bufferlist &bl)
-{
- std::lock_guard<Spinlock> lock(mutex);
- bl.substr_of(data, offset, len);
- return bl.length();
-}
-
-int MemStore::BufferlistObject::write(uint64_t offset, const bufferlist &src)
-{
- unsigned len = src.length();
-
- std::lock_guard<Spinlock> lock(mutex);
-
- // before
- bufferlist newdata;
- if (get_size() >= offset) {
- newdata.substr_of(data, 0, offset);
- } else {
- newdata.substr_of(data, 0, get_size());
- bufferptr bp(offset - get_size());
- bp.zero();
- newdata.append(bp);
- }
-
- newdata.append(src);
-
- // after
- if (get_size() > offset + len) {
- bufferlist tail;
- tail.substr_of(data, offset + len, get_size() - (offset + len));
- newdata.append(tail);
- }
-
- data.claim(newdata);
- return 0;
-}
-
-int MemStore::BufferlistObject::clone(Object *src, uint64_t srcoff,
- uint64_t len, uint64_t dstoff)
-{
- auto srcbl = dynamic_cast<BufferlistObject*>(src);
- if (srcbl == nullptr)
- return -ENOTSUP;
-
- bufferlist bl;
- {
- std::lock_guard<Spinlock> lock(srcbl->mutex);
- if (srcoff == dstoff && len == src->get_size()) {
- data = srcbl->data;
- return 0;
- }
- bl.substr_of(srcbl->data, srcoff, len);
- }
- return write(dstoff, bl);
-}
-
-int MemStore::BufferlistObject::truncate(uint64_t size)
-{
- std::lock_guard<Spinlock> lock(mutex);
- if (get_size() > size) {
- bufferlist bl;
- bl.substr_of(data, 0, size);
- data.claim(bl);
- } else if (get_size() == size) {
- // do nothing
- } else {
- bufferptr bp(size - get_size());
- bp.zero();
- data.append(bp);
- }
- return 0;
-}
-
-// PageSetObject
-
-#if defined(__GLIBCXX__)
-// use a thread-local vector for the pages returned by PageSet, so we
-// can avoid allocations in read/write()
-thread_local PageSet::page_vector MemStore::PageSetObject::tls_pages;
-#define DEFINE_PAGE_VECTOR(name)
-#else
-#define DEFINE_PAGE_VECTOR(name) PageSet::page_vector name;
-#endif
-
-int MemStore::PageSetObject::read(uint64_t offset, uint64_t len, bufferlist& bl)
-{
- const auto start = offset;
- const auto end = offset + len;
- auto remaining = len;
-
- DEFINE_PAGE_VECTOR(tls_pages);
- data.get_range(offset, len, tls_pages);
-
- // allocate a buffer for the data
- buffer::ptr buf(len);
-
- auto p = tls_pages.begin();
- while (remaining) {
- // no more pages in range
- if (p == tls_pages.end() || (*p)->offset >= end) {
- buf.zero(offset - start, remaining);
- break;
- }
- auto page = *p;
-
- // fill any holes between pages with zeroes
- if (page->offset > offset) {
- const auto count = std::min(remaining, page->offset - offset);
- buf.zero(offset - start, count);
- remaining -= count;
- offset = page->offset;
- if (!remaining)
- break;
- }
-
- // read from page
- const auto page_offset = offset - page->offset;
- const auto count = min(remaining, data.get_page_size() - page_offset);
-
- buf.copy_in(offset - start, count, page->data + page_offset);
-
- remaining -= count;
- offset += count;
-
- ++p;
- }
-
- tls_pages.clear(); // drop page refs
-
- bl.append(buf);
- return len;
-}
-
-int MemStore::PageSetObject::write(uint64_t offset, const bufferlist &src)
-{
- unsigned len = src.length();
-
- DEFINE_PAGE_VECTOR(tls_pages);
- // make sure the page range is allocated
- data.alloc_range(offset, src.length(), tls_pages);
-
- auto page = tls_pages.begin();
-
- // XXX: cast away the const because bufferlist doesn't have a const_iterator
- auto p = const_cast<bufferlist&>(src).begin();
- while (len > 0) {
- unsigned page_offset = offset - (*page)->offset;
- unsigned pageoff = data.get_page_size() - page_offset;
- unsigned count = min(len, pageoff);
- p.copy(count, (*page)->data + page_offset);
- offset += count;
- len -= count;
- if (count == pageoff)
- ++page;
- }
- if (data_len < offset)
- data_len = offset;
- tls_pages.clear(); // drop page refs
- return 0;
-}
-
-int MemStore::PageSetObject::clone(Object *src, uint64_t srcoff,
- uint64_t len, uint64_t dstoff)
-{
- const int64_t delta = dstoff - srcoff;
-
- auto &src_data = static_cast<PageSetObject*>(src)->data;
- const uint64_t src_page_size = src_data.get_page_size();
-
- auto &dst_data = data;
- const auto dst_page_size = dst_data.get_page_size();
-
- DEFINE_PAGE_VECTOR(tls_pages);
- PageSet::page_vector dst_pages;
-
- while (len) {
- const auto count = std::min(len, (uint64_t)src_page_size * 16);
- src_data.get_range(srcoff, count, tls_pages);
-
- for (auto &src_page : tls_pages) {
- auto sbegin = std::max(srcoff, src_page->offset);
- auto send = std::min(srcoff + count, src_page->offset + src_page_size);
- dst_data.alloc_range(sbegin + delta, send - sbegin, dst_pages);
-
- // copy data from src page to dst pages
- for (auto &dst_page : dst_pages) {
- auto dbegin = std::max(sbegin + delta, dst_page->offset);
- auto dend = std::min(send + delta, dst_page->offset + dst_page_size);
-
- std::copy(src_page->data + (dbegin - delta) - src_page->offset,
- src_page->data + (dend - delta) - src_page->offset,
- dst_page->data + dbegin - dst_page->offset);
- }
- dst_pages.clear(); // drop page refs
- }
- srcoff += count;
- dstoff += count;
- len -= count;
- tls_pages.clear(); // drop page refs
- }
-
- // update object size
- if (data_len < dstoff + len)
- data_len = dstoff + len;
- return 0;
-}
-
-int MemStore::PageSetObject::truncate(uint64_t size)
-{
- data.free_pages_after(size);
- data_len = size;
-
- const auto page_size = data.get_page_size();
- const auto page_offset = size & ~(page_size-1);
- if (page_offset == size)
- return 0;
-
- DEFINE_PAGE_VECTOR(tls_pages);
- // write zeroes to the rest of the last page
- data.get_range(page_offset, page_size, tls_pages);
- if (tls_pages.empty())
- return 0;
-
- auto page = tls_pages.begin();
- auto data = (*page)->data;
- std::fill(data + (size - page_offset), data + page_size, 0);
- tls_pages.clear(); // drop page ref
- return 0;
-}
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2013- Sage Weil <sage@inktank.com>
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation. See file COPYING.
- *
- */
-
-
-#ifndef CEPH_MEMSTORE_H
-#define CEPH_MEMSTORE_H
-
-#include <mutex>
-#include <boost/intrusive_ptr.hpp>
-
-#include "include/unordered_map.h"
-#include "include/memory.h"
-#include "include/Spinlock.h"
-#include "common/Finisher.h"
-#include "common/RefCountedObj.h"
-#include "common/RWLock.h"
-#include "ObjectStore.h"
-#include "PageSet.h"
-#include "include/assert.h"
-
-class MemStore : public ObjectStore {
-private:
- CephContext *const cct;
-
-public:
- struct Object : public RefCountedObject {
- std::mutex xattr_mutex;
- std::mutex omap_mutex;
- map<string,bufferptr> xattr;
- bufferlist omap_header;
- map<string,bufferlist> omap;
-
- typedef boost::intrusive_ptr<Object> Ref;
- friend void intrusive_ptr_add_ref(Object *o) { o->get(); }
- friend void intrusive_ptr_release(Object *o) { o->put(); }
-
- // interface for object data
- virtual size_t get_size() const = 0;
- virtual int read(uint64_t offset, uint64_t len, bufferlist &bl) = 0;
- virtual int write(uint64_t offset, const bufferlist &bl) = 0;
- virtual int clone(Object *src, uint64_t srcoff, uint64_t len,
- uint64_t dstoff) = 0;
- virtual int truncate(uint64_t offset) = 0;
- virtual void encode(bufferlist& bl) const = 0;
- virtual void decode(bufferlist::iterator& p) = 0;
-
- void encode_base(bufferlist& bl) const {
- ::encode(xattr, bl);
- ::encode(omap_header, bl);
- ::encode(omap, bl);
- }
- void decode_base(bufferlist::iterator& p) {
- ::decode(xattr, p);
- ::decode(omap_header, p);
- ::decode(omap, p);
- }
-
- void dump(Formatter *f) const {
- f->dump_int("data_len", get_size());
- f->dump_int("omap_header_len", omap_header.length());
-
- f->open_array_section("xattrs");
- for (map<string,bufferptr>::const_iterator p = xattr.begin();
- p != xattr.end();
- ++p) {
- f->open_object_section("xattr");
- f->dump_string("name", p->first);
- f->dump_int("length", p->second.length());
- f->close_section();
- }
- f->close_section();
-
- f->open_array_section("omap");
- for (map<string,bufferlist>::const_iterator p = omap.begin();
- p != omap.end();
- ++p) {
- f->open_object_section("pair");
- f->dump_string("key", p->first);
- f->dump_int("length", p->second.length());
- f->close_section();
- }
- f->close_section();
- }
- };
- typedef Object::Ref ObjectRef;
-
- struct BufferlistObject : public Object {
- Spinlock mutex;
- bufferlist data;
-
- size_t get_size() const override { return data.length(); }
-
- int read(uint64_t offset, uint64_t len, bufferlist &bl) override;
- int write(uint64_t offset, const bufferlist &bl) override;
- int clone(Object *src, uint64_t srcoff, uint64_t len,
- uint64_t dstoff) override;
- int truncate(uint64_t offset) override;
-
- void encode(bufferlist& bl) const override {
- ENCODE_START(1, 1, bl);
- ::encode(data, bl);
- encode_base(bl);
- ENCODE_FINISH(bl);
- }
- void decode(bufferlist::iterator& p) override {
- DECODE_START(1, p);
- ::decode(data, p);
- decode_base(p);
- DECODE_FINISH(p);
- }
- };
-
- struct PageSetObject : public Object {
- PageSet data;
- uint64_t data_len;
-#if defined(__GLIBCXX__)
- // use a thread-local vector for the pages returned by PageSet, so we
- // can avoid allocations in read/write()
- static thread_local PageSet::page_vector tls_pages;
-#endif
-
- PageSetObject(size_t page_size) : data(page_size), data_len(0) {}
-
- size_t get_size() const override { return data_len; }
-
- int read(uint64_t offset, uint64_t len, bufferlist &bl) override;
- int write(uint64_t offset, const bufferlist &bl) override;
- int clone(Object *src, uint64_t srcoff, uint64_t len,
- uint64_t dstoff) override;
- int truncate(uint64_t offset) override;
-
- void encode(bufferlist& bl) const override {
- ENCODE_START(1, 1, bl);
- ::encode(data_len, bl);
- data.encode(bl);
- encode_base(bl);
- ENCODE_FINISH(bl);
- }
- void decode(bufferlist::iterator& p) override {
- DECODE_START(1, p);
- ::decode(data_len, p);
- data.decode(p);
- decode_base(p);
- DECODE_FINISH(p);
- }
- };
-
- struct Collection : public RefCountedObject {
- CephContext *cct;
- bool use_page_set;
- ceph::unordered_map<ghobject_t, ObjectRef> object_hash; ///< for lookup
- map<ghobject_t, ObjectRef,ghobject_t::BitwiseComparator> object_map; ///< for iteration
- map<string,bufferptr> xattr;
- RWLock lock; ///< for object_{map,hash}
-
- typedef boost::intrusive_ptr<Collection> Ref;
- friend void intrusive_ptr_add_ref(Collection *c) { c->get(); }
- friend void intrusive_ptr_release(Collection *c) { c->put(); }
-
- ObjectRef create_object() const {
- if (use_page_set)
- return new PageSetObject(cct->_conf->memstore_page_size);
- return new BufferlistObject();
- }
-
- // NOTE: The lock only needs to protect the object_map/hash, not the
- // contents of individual objects. The osd is already sequencing
- // reads and writes, so we will never see them concurrently at this
- // level.
-
- ObjectRef get_object(ghobject_t oid) {
- RWLock::RLocker l(lock);
- auto o = object_hash.find(oid);
- if (o == object_hash.end())
- return ObjectRef();
- return o->second;
- }
-
- ObjectRef get_or_create_object(ghobject_t oid) {
- RWLock::WLocker l(lock);
- auto result = object_hash.emplace(oid, ObjectRef());
- if (result.second)
- object_map[oid] = result.first->second = create_object();
- return result.first->second;
- }
-
- void encode(bufferlist& bl) const {
- ENCODE_START(1, 1, bl);
- ::encode(xattr, bl);
- ::encode(use_page_set, bl);
- uint32_t s = object_map.size();
- ::encode(s, bl);
- for (map<ghobject_t, ObjectRef,ghobject_t::BitwiseComparator>::const_iterator p = object_map.begin();
- p != object_map.end();
- ++p) {
- ::encode(p->first, bl);
- p->second->encode(bl);
- }
- ENCODE_FINISH(bl);
- }
- void decode(bufferlist::iterator& p) {
- DECODE_START(1, p);
- ::decode(xattr, p);
- ::decode(use_page_set, p);
- uint32_t s;
- ::decode(s, p);
- while (s--) {
- ghobject_t k;
- ::decode(k, p);
- auto o = create_object();
- o->decode(p);
- object_map.insert(make_pair(k, o));
- object_hash.insert(make_pair(k, o));
- }
- DECODE_FINISH(p);
- }
-
- uint64_t used_bytes() const {
- uint64_t result = 0;
- for (map<ghobject_t, ObjectRef,ghobject_t::BitwiseComparator>::const_iterator p = object_map.begin();
- p != object_map.end();
- ++p) {
- result += p->second->get_size();
- }
-
- return result;
- }
-
- Collection(CephContext *cct)
- : cct(cct), use_page_set(cct->_conf->memstore_page_set),
- lock("MemStore::Collection::lock") {}
- };
- typedef Collection::Ref CollectionRef;
-
-private:
- class OmapIteratorImpl : public ObjectMap::ObjectMapIteratorImpl {
- CollectionRef c;
- ObjectRef o;
- map<string,bufferlist>::iterator it;
- public:
- OmapIteratorImpl(CollectionRef c, ObjectRef o)
- : c(c), o(o), it(o->omap.begin()) {}
-
- int seek_to_first() {
- std::lock_guard<std::mutex>(o->omap_mutex);
- it = o->omap.begin();
- return 0;
- }
- int upper_bound(const string &after) {
- std::lock_guard<std::mutex>(o->omap_mutex);
- it = o->omap.upper_bound(after);
- return 0;
- }
- int lower_bound(const string &to) {
- std::lock_guard<std::mutex>(o->omap_mutex);
- it = o->omap.lower_bound(to);
- return 0;
- }
- bool valid() {
- std::lock_guard<std::mutex>(o->omap_mutex);
- return it != o->omap.end();
- }
- int next(bool validate=true) {
- std::lock_guard<std::mutex>(o->omap_mutex);
- ++it;
- return 0;
- }
- string key() {
- std::lock_guard<std::mutex>(o->omap_mutex);
- return it->first;
- }
- bufferlist value() {
- std::lock_guard<std::mutex>(o->omap_mutex);
- return it->second;
- }
- int status() {
- return 0;
- }
- };
-
-
- ceph::unordered_map<coll_t, CollectionRef> coll_map;
- RWLock coll_lock; ///< rwlock to protect coll_map
- Mutex apply_lock; ///< serialize all updates
-
- CollectionRef get_collection(coll_t cid);
-
- Finisher finisher;
-
- uint64_t used_bytes;
-
- void _do_transaction(Transaction& t);
-
- int _touch(coll_t cid, const ghobject_t& oid);
- int _write(coll_t cid, const ghobject_t& oid, uint64_t offset, size_t len,
- const bufferlist& bl, uint32_t fadvsie_flags = 0);
- int _zero(coll_t cid, const ghobject_t& oid, uint64_t offset, size_t len);
- int _truncate(coll_t cid, const ghobject_t& oid, uint64_t size);
- int _remove(coll_t cid, const ghobject_t& oid);
- int _setattrs(coll_t cid, const ghobject_t& oid, map<string,bufferptr>& aset);
- int _rmattr(coll_t cid, const ghobject_t& oid, const char *name);
- int _rmattrs(coll_t cid, const ghobject_t& oid);
- int _clone(coll_t cid, const ghobject_t& oldoid, const ghobject_t& newoid);
- int _clone_range(coll_t cid, const ghobject_t& oldoid,
- const ghobject_t& newoid,
- uint64_t srcoff, uint64_t len, uint64_t dstoff);
- int _omap_clear(coll_t cid, const ghobject_t &oid);
- int _omap_setkeys(coll_t cid, const ghobject_t &oid, bufferlist& aset_bl);
- int _omap_rmkeys(coll_t cid, const ghobject_t &oid, bufferlist& keys_bl);
- int _omap_rmkeyrange(coll_t cid, const ghobject_t &oid,
- const string& first, const string& last);
- int _omap_setheader(coll_t cid, const ghobject_t &oid, const bufferlist &bl);
-
- int _collection_hint_expected_num_objs(coll_t cid, uint32_t pg_num,
- uint64_t num_objs) const { return 0; }
- int _create_collection(coll_t c);
- int _destroy_collection(coll_t c);
- int _collection_add(coll_t cid, coll_t ocid, const ghobject_t& oid);
- int _collection_move_rename(coll_t oldcid, const ghobject_t& oldoid,
- coll_t cid, const ghobject_t& o);
- int _split_collection(coll_t cid, uint32_t bits, uint32_t rem, coll_t dest);
-
- int _save();
- int _load();
-
- void dump(Formatter *f);
- void dump_all();
-
-public:
- MemStore(CephContext *cct, const string& path)
- : ObjectStore(path),
- cct(cct),
- coll_lock("MemStore::coll_lock"),
- apply_lock("MemStore::apply_lock"),
- finisher(cct),
- used_bytes(0) {}
- ~MemStore() { }
-
- bool test_mount_in_use() {
- return false;
- }
-
- int mount();
- int umount();
-
- unsigned get_max_object_name_length() {
- return 4096;
- }
- unsigned get_max_attr_name_length() {
- return 256; // arbitrary; there is no real limit internally
- }
-
- int mkfs();
- int mkjournal() {
- return 0;
- }
- bool wants_journal() {
- return false;
- }
- bool allows_journal() {
- return false;
- }
- bool needs_journal() {
- return false;
- }
-
- int statfs(struct statfs *buf);
-
- bool exists(coll_t cid, const ghobject_t& oid);
- int stat(
- coll_t cid,
- const ghobject_t& oid,
- struct stat *st,
- bool allow_eio = false); // struct stat?
- int read(
- coll_t cid,
- const ghobject_t& oid,
- uint64_t offset,
- size_t len,
- bufferlist& bl,
- uint32_t op_flags = 0,
- bool allow_eio = false);
- int fiemap(coll_t cid, const ghobject_t& oid, uint64_t offset, size_t len, bufferlist& bl);
- int getattr(coll_t cid, const ghobject_t& oid, const char *name, bufferptr& value);
- int getattrs(coll_t cid, const ghobject_t& oid, map<string,bufferptr>& aset);
-
- int list_collections(vector<coll_t>& ls);
- bool collection_exists(coll_t c);
- bool collection_empty(coll_t c);
- int collection_list(coll_t cid, ghobject_t start, ghobject_t end,
- bool sort_bitwise, int max,
- vector<ghobject_t> *ls, ghobject_t *next);
-
- int omap_get(
- coll_t cid, ///< [in] Collection containing oid
- const ghobject_t &oid, ///< [in] Object containing omap
- bufferlist *header, ///< [out] omap header
- map<string, bufferlist> *out /// < [out] Key to value map
- );
-
- /// Get omap header
- int omap_get_header(
- coll_t cid, ///< [in] Collection containing oid
- const ghobject_t &oid, ///< [in] Object containing omap
- bufferlist *header, ///< [out] omap header
- bool allow_eio = false ///< [in] don't assert on eio
- );
-
- /// Get keys defined on oid
- int omap_get_keys(
- coll_t cid, ///< [in] Collection containing oid
- const ghobject_t &oid, ///< [in] Object containing omap
- set<string> *keys ///< [out] Keys defined on oid
- );
-
- /// Get key values
- int omap_get_values(
- coll_t cid, ///< [in] Collection containing oid
- const ghobject_t &oid, ///< [in] Object containing omap
- const set<string> &keys, ///< [in] Keys to get
- map<string, bufferlist> *out ///< [out] Returned keys and values
- );
-
- /// Filters keys into out which are defined on oid
- int omap_check_keys(
- coll_t cid, ///< [in] Collection containing oid
- const ghobject_t &oid, ///< [in] Object containing omap
- const set<string> &keys, ///< [in] Keys to check
- set<string> *out ///< [out] Subset of keys defined on oid
- );
-
- ObjectMap::ObjectMapIterator get_omap_iterator(
- coll_t cid, ///< [in] collection
- const ghobject_t &oid ///< [in] object
- );
-
- void set_fsid(uuid_d u);
- uuid_d get_fsid();
-
- objectstore_perf_stat_t get_cur_stats();
-
- int queue_transactions(
- Sequencer *osr, list<Transaction*>& tls,
- TrackedOpRef op = TrackedOpRef(),
- ThreadPool::TPHandle *handle = NULL);
-};
-
-
-
-
-#endif
#include "common/safe_io.h"
#include "filestore/FileStore.h"
-#include "MemStore.h"
+#include "memstore/MemStore.h"
#include "KeyValueStore.h"
#if defined(HAVE_LIBAIO)
#include "bluestore/BlueStore.h"
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2013- Sage Weil <sage@inktank.com>
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation. See file COPYING.
- *
- */
-
-#ifndef CEPH_PAGESET_H
-#define CEPH_PAGESET_H
-
-#include <algorithm>
-#include <atomic>
-#include <cassert>
-#include <mutex>
-#include <vector>
-#include <boost/intrusive/avl_set.hpp>
-#include <boost/intrusive_ptr.hpp>
-
-#include "include/encoding.h"
-#include "include/Spinlock.h"
-
-
-struct Page {
- char *const data;
- boost::intrusive::avl_set_member_hook<> hook;
- uint64_t offset;
-
- // avoid RefCountedObject because it has a virtual destructor
- std::atomic<uint16_t> nrefs;
- void get() { ++nrefs; }
- void put() { if (--nrefs == 0) delete this; }
-
- typedef boost::intrusive_ptr<Page> Ref;
- friend void intrusive_ptr_add_ref(Page *p) { p->get(); }
- friend void intrusive_ptr_release(Page *p) { p->put(); }
-
- // key-value comparison functor for avl
- struct Less {
- bool operator()(uint64_t offset, const Page &page) const {
- return offset < page.offset;
- }
- bool operator()(const Page &page, uint64_t offset) const {
- return page.offset < offset;
- }
- bool operator()(const Page &lhs, const Page &rhs) const {
- return lhs.offset < rhs.offset;
- }
- };
- void encode(bufferlist &bl, size_t page_size) const {
- bl.append(buffer::copy(data, page_size));
- ::encode(offset, bl);
- }
- void decode(bufferlist::iterator &p, size_t page_size) {
- ::decode_array_nohead(data, page_size, p);
- ::decode(offset, p);
- }
-
- static Ref create(size_t page_size, uint64_t offset = 0) {
- // allocate the Page and its data in a single buffer
- auto buffer = new char[page_size + sizeof(Page)];
- // place the Page structure at the end of the buffer
- return new (buffer + page_size) Page(buffer, offset);
- }
-
- // copy disabled
- Page(const Page&) = delete;
- const Page& operator=(const Page&) = delete;
-
- private: // private constructor, use create() instead
- Page(char *data, uint64_t offset) : data(data), offset(offset), nrefs(1) {}
-
- static void operator delete(void *p) {
- delete[] reinterpret_cast<Page*>(p)->data;
- }
-};
-
-class PageSet {
- public:
- // alloc_range() and get_range() return page refs in a vector
- typedef std::vector<Page::Ref> page_vector;
-
- private:
- // store pages in a boost intrusive avl_set
- typedef Page::Less page_cmp;
- typedef boost::intrusive::member_hook<Page,
- boost::intrusive::avl_set_member_hook<>,
- &Page::hook> member_option;
- typedef boost::intrusive::avl_set<Page,
- boost::intrusive::compare<page_cmp>, member_option> page_set;
-
- typedef typename page_set::iterator iterator;
-
- page_set pages;
- uint64_t page_size;
-
- typedef Spinlock lock_type;
- lock_type mutex;
-
- void free_pages(iterator cur, iterator end) {
- while (cur != end) {
- Page *page = &*cur;
- cur = pages.erase(cur);
- page->put();
- }
- }
-
- int count_pages(uint64_t offset, uint64_t len) const {
- // count the overlapping pages
- int count = 0;
- if (offset % page_size) {
- count++;
- size_t rem = page_size - offset % page_size;
- len = len <= rem ? 0 : len - rem;
- }
- count += len / page_size;
- if (len % page_size)
- count++;
- return count;
- }
-
- public:
- PageSet(size_t page_size) : page_size(page_size) {}
- PageSet(PageSet &&rhs)
- : pages(std::move(rhs.pages)), page_size(rhs.page_size) {}
- ~PageSet() {
- free_pages(pages.begin(), pages.end());
- }
-
- // disable copy
- PageSet(const PageSet&) = delete;
- const PageSet& operator=(const PageSet&) = delete;
-
- bool empty() const { return pages.empty(); }
- size_t size() const { return pages.size(); }
- size_t get_page_size() const { return page_size; }
-
- // allocate all pages that intersect the range [offset,length)
- void alloc_range(uint64_t offset, uint64_t length, page_vector &range) {
- // loop in reverse so we can provide hints to avl_set::insert_check()
- // and get O(1) insertions after the first
- uint64_t position = offset + length - 1;
-
- range.resize(count_pages(offset, length));
- auto out = range.rbegin();
-
- std::lock_guard<lock_type> lock(mutex);
- iterator cur = pages.end();
- while (length) {
- const uint64_t page_offset = position & ~(page_size-1);
-
- typename page_set::insert_commit_data commit;
- auto insert = pages.insert_check(cur, page_offset, page_cmp(), commit);
- if (insert.second) {
- auto page = Page::create(page_size, page_offset);
- cur = pages.insert_commit(*page, commit);
-
- // assume that the caller will write to the range [offset,length),
- // so we only need to zero memory outside of this range
-
- // zero end of page past offset + length
- if (offset + length < page->offset + page_size)
- std::fill(page->data + offset + length - page->offset,
- page->data + page_size, 0);
- // zero front of page between page_offset and offset
- if (offset > page->offset)
- std::fill(page->data, page->data + offset - page->offset, 0);
- } else { // exists
- cur = insert.first;
- }
- // add a reference to output vector
- out->reset(&*cur);
- ++out;
-
- auto c = std::min(length, (position & (page_size-1)) + 1);
- position -= c;
- length -= c;
- }
- // make sure we sized the vector correctly
- assert(out == range.rend());
- }
-
- // return all allocated pages that intersect the range [offset,length)
- void get_range(uint64_t offset, uint64_t length, page_vector &range) {
- auto cur = pages.lower_bound(offset & ~(page_size-1), page_cmp());
- while (cur != pages.end() && cur->offset < offset + length)
- range.push_back(&*cur++);
- }
-
- void free_pages_after(uint64_t offset) {
- std::lock_guard<lock_type> lock(mutex);
- auto cur = pages.lower_bound(offset & ~(page_size-1), page_cmp());
- if (cur == pages.end())
- return;
- if (cur->offset < offset)
- cur++;
- free_pages(cur, pages.end());
- }
-
- void encode(bufferlist &bl) const {
- ::encode(page_size, bl);
- unsigned count = pages.size();
- ::encode(count, bl);
- for (auto p = pages.rbegin(); p != pages.rend(); ++p)
- p->encode(bl, page_size);
- }
- void decode(bufferlist::iterator &p) {
- assert(empty());
- ::decode(page_size, p);
- unsigned count;
- ::decode(count, p);
- auto cur = pages.end();
- for (unsigned i = 0; i < count; i++) {
- auto page = Page::create(page_size);
- page->decode(p, page_size);
- cur = pages.insert_before(cur, *page);
- }
- }
-};
-
-#endif // CEPH_PAGESET_H
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2013 Inktank
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+#include "acconfig.h"
+
+#ifdef HAVE_SYS_MOUNT_H
+#include <sys/mount.h>
+#endif
+
+#ifdef HAVE_SYS_PARAM_H
+#include <sys/param.h>
+#endif
+
+#include "include/types.h"
+#include "include/stringify.h"
+#include "include/unordered_map.h"
+#include "include/memory.h"
+#include "common/errno.h"
+#include "MemStore.h"
+#include "include/compat.h"
+
+#define dout_subsys ceph_subsys_filestore
+#undef dout_prefix
+#define dout_prefix *_dout << "memstore(" << path << ") "
+
+// Orders two collection refs by the numeric address of the Collection each
+// one points at; used to pick a consistent lock order between collections.
+bool operator>(const MemStore::CollectionRef& l,
+               const MemStore::CollectionRef& r)
+{
+  const unsigned long lhs_addr = reinterpret_cast<unsigned long>(l.get());
+  const unsigned long rhs_addr = reinterpret_cast<unsigned long>(r.get());
+  return lhs_addr > rhs_addr;
+}
+
+
+// Loads the persisted state with _load() and, if that succeeds, starts the
+// finisher. Returns 0 on success or the negative value returned by _load().
+int MemStore::mount()
+{
+  const int load_rc = _load();
+  if (load_rc >= 0) {
+    finisher.start();
+    return 0;
+  }
+  return load_rc;
+}
+
+// Stops the finisher, then persists the store with _save() and returns
+// whatever _save() returned.
+int MemStore::umount()
+{
+  finisher.stop();
+  const int save_rc = _save();
+  return save_rc;
+}
+
+// Persists the store under `path` while holding apply_lock:
+//  - each collection is encoded and written to "<path>/<collection id>";
+//  - the set of collection ids is then written to "<path>/collections".
+// Returns 0 on success, or the first negative value from write_file().
+int MemStore::_save()
+{
+  dout(10) << __func__ << dendl;
+  Mutex::Locker apply_guard(apply_lock); // block any writer
+  dump_all();
+
+  set<coll_t> collections;
+  for (auto p = coll_map.begin(), last = coll_map.end(); p != last; ++p) {
+    dout(20) << __func__ << " coll " << p->first << " " << p->second << dendl;
+    collections.insert(p->first);
+    assert(p->second);
+
+    // one file per collection, named after the stringified collection id
+    bufferlist coll_bl;
+    p->second->encode(coll_bl);
+    const string coll_fn = path + "/" + stringify(p->first);
+    const int wr = coll_bl.write_file(coll_fn.c_str());
+    if (wr < 0)
+      return wr;
+  }
+
+  // index file listing every collection written above
+  bufferlist index_bl;
+  ::encode(collections, index_bl);
+  const string index_fn = path + "/collections";
+  const int r = index_bl.write_file(index_fn.c_str());
+  return r < 0 ? r : 0;
+}
+
+// Logs a JSON dump of the whole store: creates a "json-pretty" Formatter,
+// has dump() fill a top-level "store" section, and flushes the result into
+// the dout(0) log stream.
+void MemStore::dump_all()
+{
+ // raw owning pointer; released by the delete at the end of this function
+ Formatter *f = Formatter::create("json-pretty");
+ f->open_object_section("store");
+ dump(f);
+ f->close_section();
+ // the dout(0) statement is deliberately left without dendl so that the
+ // formatter output can be written to *_dout before dendl ends the entry
+ // NOTE(review): relies on _dout remaining usable after the statement ends;
+ // confirm against the dout macro definition
+ dout(0) << "dump:";
+ f->flush(*_dout);
+ *_dout << dendl;
+ delete f;
+}
+
+// Writes a "collections" array into f. Each entry of coll_map becomes one
+// "collection" object holding its name, an "xattrs" array (name and length
+// of every collection xattr) and an "objects" array (name of every object,
+// plus whatever the object's own dump(f) adds when the ref is non-null).
+void MemStore::dump(Formatter *f)
+{
+  f->open_array_section("collections");
+  for (auto& coll_entry : coll_map) {
+    CollectionRef& coll = coll_entry.second;
+
+    f->open_object_section("collection");
+    f->dump_string("name", stringify(coll_entry.first));
+
+    f->open_array_section("xattrs");
+    for (auto& attr : coll->xattr) {
+      f->open_object_section("xattr");
+      f->dump_string("name", attr.first);
+      f->dump_int("length", attr.second.length());
+      f->close_section();
+    }
+    f->close_section();  // xattrs
+
+    f->open_array_section("objects");
+    for (auto& obj_entry : coll->object_map) {
+      f->open_object_section("object");
+      f->dump_string("name", stringify(obj_entry.first));
+      if (obj_entry.second)
+        obj_entry.second->dump(f);
+      f->close_section();
+    }
+    f->close_section();  // objects
+
+    f->close_section();  // collection
+  }
+  f->close_section();  // collections
+}
+
+// Rebuilds coll_map from the files under `path`:
+//  - reads the set of collection ids from "<path>/collections";
+//  - for each id, reads "<path>/<id>", decodes it into a new Collection,
+//    stores it in coll_map and adds its used_bytes() to used_bytes.
+// Finishes by logging the loaded state with dump_all().
+// Returns 0 on success, or the negative value from the read_file() call
+// that failed; the failure text is logged before returning.
+int MemStore::_load()
+{
+  dout(10) << __func__ << dendl;
+
+  string err;  // receives the failure description from read_file()
+  bufferlist bl;
+  const string index_fn = path + "/collections";
+  int r = bl.read_file(index_fn.c_str(), &err);
+  if (r < 0) {
+    // report why the read failed instead of silently dropping the message
+    derr << __func__ << " " << err << dendl;
+    return r;
+  }
+
+  set<coll_t> collections;
+  bufferlist::iterator p = bl.begin();
+  ::decode(collections, p);
+
+  for (const coll_t& cid : collections) {
+    const string coll_fn = path + "/" + stringify(cid);
+    bufferlist cbl;
+    r = cbl.read_file(coll_fn.c_str(), &err);
+    if (r < 0) {
+      derr << __func__ << " " << err << dendl;
+      return r;
+    }
+    CollectionRef c(new Collection(cct));
+    bufferlist::iterator cp = cbl.begin();
+    c->decode(cp);
+    coll_map[cid] = c;
+    used_bytes += c->used_bytes();
+  }
+
+  dump_all();
+
+  return 0;
+}
+
+// Stores the stringified uuid under the "fs_fsid" meta key and asserts that
+// write_meta() did not return a negative value.
+void MemStore::set_fsid(uuid_d u)
+{
+  const string fsid_text = stringify(u);
+  int r = write_meta("fs_fsid", fsid_text);
+  assert(r >= 0);
+}
+
+// Reads the "fs_fsid" meta key and parses it into a uuid_d. Asserts both
+// that read_meta() did not fail and that the stored text parsed.
+uuid_d MemStore::get_fsid()
+{
+  uuid_d uuid;
+  string fsid_str;
+
+  int r = read_meta("fs_fsid", &fsid_str);
+  assert(r >= 0);
+
+  const char *fsid_cstr = fsid_str.c_str();
+  bool b = uuid.parse(fsid_cstr);
+  assert(b);
+
+  return uuid;
+}
+
+// Initializes an empty store under `path`:
+//  - keeps the "fs_fsid" meta value if one can be read, or generates and
+//    stores a random one when the key is absent (-ENOENT);
+//  - writes an empty collection set to "<path>/collections";
+//  - records "memstore" under the "type" meta key.
+// Returns 0 on success or the first negative error encountered, including a
+// read_meta() failure other than -ENOENT.
+int MemStore::mkfs()
+{
+  string fsid_str;
+  int r = read_meta("fs_fsid", &fsid_str);
+  if (r == -ENOENT) {
+    uuid_d fsid;
+    fsid.generate_random();
+    fsid_str = stringify(fsid);
+    r = write_meta("fs_fsid", fsid_str);
+    if (r < 0)
+      return r;
+    dout(1) << __func__ << " new fsid " << fsid_str << dendl;
+  } else if (r < 0) {
+    // any other read failure is a real error: fsid_str is not trustworthy,
+    // so do not report it as an existing fsid and carry on
+    return r;
+  } else {
+    dout(1) << __func__ << " had fsid " << fsid_str << dendl;
+  }
+
+  string fn = path + "/collections";
+  derr << path << dendl;
+  bufferlist bl;
+  set<coll_t> collections;  // intentionally empty: a new store has no collections
+  ::encode(collections, bl);
+  r = bl.write_file(fn.c_str());
+  if (r < 0)
+    return r;
+
+  r = write_meta("type", "memstore");
+  if (r < 0)
+    return r;
+
+  return 0;
+}
+
+ // Fill in block size, total blocks and free/available blocks of *st.
+ //
+ // The block size is fixed at 1024, the total comes from the configured
+ // memstore_device_bytes, and the free count is the total minus the blocks
+ // covered by used_bytes, clamped at zero.  Always returns 0.
+ int MemStore::statfs(struct statfs *st)
+ {
+   dout(10) << __func__ << dendl;
+
+   st->f_bsize = 1024;
+   // Device size is a configured constant
+   st->f_blocks = g_conf->memstore_device_bytes / st->f_bsize;
+
+   dout(10) << __func__ << ": used_bytes: " << used_bytes << "/" << g_conf->memstore_device_bytes << dendl;
+
+   const long total_blocks = long(st->f_blocks);
+   const long used_blocks = long(used_bytes / st->f_bsize);
+   st->f_bfree = st->f_bavail = MAX((total_blocks - used_blocks), 0);
+
+   return 0;
+ }
+
+ // Placeholder: returns a value-initialised objectstore_perf_stat_t.
+ objectstore_perf_stat_t MemStore::get_cur_stats()
+ {
+   // fixme
+   objectstore_perf_stat_t stats = objectstore_perf_stat_t();
+   return stats;
+ }
+
+ // Look up a collection by id under a read lock on coll_lock.
+ // Returns an empty CollectionRef when the id is not in coll_map.
+ MemStore::CollectionRef MemStore::get_collection(coll_t cid)
+ {
+   RWLock::RLocker guard(coll_lock);
+   auto found = coll_map.find(cid);
+   if (found != coll_map.end())
+     return found->second;
+   return CollectionRef();
+ }
+
+
+// ---------------
+// read operations
+
+ // True when both the collection and the object inside it are present.
+ bool MemStore::exists(coll_t cid, const ghobject_t& oid)
+ {
+   dout(10) << __func__ << " " << cid << " " << oid << dendl;
+   CollectionRef coll = get_collection(cid);
+   if (coll) {
+     // Equivalent of c->get_object(oid) != NULL.  In C++11 the shared_ptr
+     // has to be converted to bool explicitly.
+     return static_cast<bool>(coll->get_object(oid));
+   }
+   return false;
+ }
+
+ // Report size information for an object.
+ //
+ // Sets st_size, st_blksize (4096), st_blocks and st_nlink (1) in *st; no
+ // other field of *st is written.  Returns -ENOENT when the collection or
+ // the object is missing, 0 otherwise.  allow_eio is not used here.
+ int MemStore::stat(
+     coll_t cid,
+     const ghobject_t& oid,
+     struct stat *st,
+     bool allow_eio)
+ {
+   dout(10) << __func__ << " " << cid << " " << oid << dendl;
+   CollectionRef coll = get_collection(cid);
+   if (!coll)
+     return -ENOENT;
+
+   ObjectRef obj = coll->get_object(oid);
+   if (!obj)
+     return -ENOENT;
+
+   st->st_size = obj->get_size();
+   st->st_blksize = 4096;
+   // number of st_blksize-sized blocks, rounded up
+   st->st_blocks = (st->st_size + st->st_blksize - 1) / st->st_blksize;
+   st->st_nlink = 1;
+   return 0;
+ }
+
+ // Read up to `len` bytes of an object starting at `offset` into `bl`.
+ //
+ // len == 0 means "read to the end of the object".  The range is always
+ // clamped to the object's current size, so the object is never asked for
+ // bytes past its end.  Returns -ENOENT when the collection or object is
+ // missing, 0 when offset is at or past the end (bl is left untouched in
+ // that case), otherwise the result of the object's read().
+ // op_flags and allow_eio are not used here.
+ int MemStore::read(
+     coll_t cid,
+     const ghobject_t& oid,
+     uint64_t offset,
+     size_t len,
+     bufferlist& bl,
+     uint32_t op_flags,
+     bool allow_eio)
+ {
+   dout(10) << __func__ << " " << cid << " " << oid << " "
+            << offset << "~" << len << dendl;
+   CollectionRef c = get_collection(cid);
+   if (!c)
+     return -ENOENT;
+
+   ObjectRef o = c->get_object(oid);
+   if (!o)
+     return -ENOENT;
+
+   const uint64_t size = o->get_size();
+   if (offset >= size)
+     return 0;
+
+   // offset < size here, so the subtraction cannot underflow; comparing
+   // against `avail` also avoids overflow in offset + len.
+   const uint64_t avail = size - offset;
+   size_t l = len;
+   if (l == 0 || l > avail)  // note: len == 0 means read the rest of the object
+     l = avail;
+   bl.clear();
+   return o->read(offset, l, bl);
+ }
+
+ // Encode a single-extent map {offset -> length} for the requested range
+ // into bl, with the length clamped to the end of the object.
+ // Returns -ENOENT when the collection or object is missing; returns 0
+ // without encoding anything when offset is at or past the object's end.
+ int MemStore::fiemap(coll_t cid, const ghobject_t& oid,
+                      uint64_t offset, size_t len, bufferlist& bl)
+ {
+   dout(10) << __func__ << " " << cid << " " << oid << " " << offset << "~"
+            << len << dendl;
+   CollectionRef coll = get_collection(cid);
+   if (!coll)
+     return -ENOENT;
+
+   ObjectRef obj = coll->get_object(oid);
+   if (!obj)
+     return -ENOENT;
+   if (offset >= obj->get_size())
+     return 0;
+
+   size_t extent_len = len;
+   if (offset + extent_len > obj->get_size())
+     extent_len = obj->get_size() - offset;
+
+   map<uint64_t, uint64_t> extents;
+   extents[offset] = extent_len;
+   ::encode(extents, bl);
+   return 0;
+ }
+
+ // Copy the named xattr of an object into `value`.
+ // Returns -ENOENT when the collection or object is missing and -ENODATA
+ // when the object has no attribute of that name.
+ int MemStore::getattr(coll_t cid, const ghobject_t& oid,
+                       const char *name, bufferptr& value)
+ {
+   dout(10) << __func__ << " " << cid << " " << oid << " " << name << dendl;
+   CollectionRef coll = get_collection(cid);
+   if (!coll)
+     return -ENOENT;
+
+   ObjectRef obj = coll->get_object(oid);
+   if (!obj)
+     return -ENOENT;
+
+   const string key(name);
+   std::lock_guard<std::mutex> guard(obj->xattr_mutex);
+   auto found = obj->xattr.find(key);
+   if (found == obj->xattr.end())
+     return -ENODATA;
+   value = found->second;
+   return 0;
+ }
+
+ // Replace `aset` with a copy of all xattrs of an object, taken while
+ // holding the object's xattr_mutex.
+ // Returns -ENOENT when the collection or object is missing.
+ int MemStore::getattrs(coll_t cid, const ghobject_t& oid,
+                        map<string,bufferptr>& aset)
+ {
+   dout(10) << __func__ << " " << cid << " " << oid << dendl;
+   CollectionRef coll = get_collection(cid);
+   if (!coll)
+     return -ENOENT;
+
+   ObjectRef obj = coll->get_object(oid);
+   if (!obj)
+     return -ENOENT;
+
+   std::lock_guard<std::mutex> guard(obj->xattr_mutex);
+   aset = obj->xattr;
+   return 0;
+ }
+
+ // Append the id of every collection in coll_map to `ls` (existing entries
+ // of `ls` are kept).  Holds coll_lock for read while iterating.
+ int MemStore::list_collections(vector<coll_t>& ls)
+ {
+   dout(10) << __func__ << dendl;
+   RWLock::RLocker guard(coll_lock);
+   for (const auto& entry : coll_map)
+     ls.push_back(entry.first);
+   return 0;
+ }
+
+ // True when coll_map holds an entry for cid (checked under a read lock).
+ bool MemStore::collection_exists(coll_t cid)
+ {
+   dout(10) << __func__ << " " << cid << dendl;
+   RWLock::RLocker guard(coll_lock);
+   return coll_map.find(cid) != coll_map.end();
+ }
+
+ // True when the collection holds no objects.
+ //
+ // A missing collection is also reported as empty: the return type is bool,
+ // so the previous `return -ENOENT` converted to `true`.  That value is
+ // kept, but written out explicitly so it is not mistaken for an error code
+ // reaching the caller.
+ bool MemStore::collection_empty(coll_t cid)
+ {
+   dout(10) << __func__ << " " << cid << dendl;
+   CollectionRef c = get_collection(cid);
+   if (!c)
+     return true;
+   RWLock::RLocker l(c->lock);
+
+   return c->object_map.empty();
+ }
+
+ // List object ids of a collection in bitwise order.
+ //
+ // Starting at the first id not less than `start`, appends ids to *ls until
+ // *ls holds `max` entries, an id not below `end` is reached, or the
+ // collection is exhausted.  When `next` is non-null it receives the id at
+ // which the scan stopped, or ghobject_t::get_max() when the scan ran off
+ // the end of the collection.
+ // Returns -EOPNOTSUPP unless sort_bitwise is set and -ENOENT when the
+ // collection is missing.
+ int MemStore::collection_list(coll_t cid, ghobject_t start, ghobject_t end,
+                               bool sort_bitwise, int max,
+                               vector<ghobject_t> *ls, ghobject_t *next)
+ {
+   if (!sort_bitwise)
+     return -EOPNOTSUPP;
+   CollectionRef coll = get_collection(cid);
+   if (!coll)
+     return -ENOENT;
+   RWLock::RLocker guard(coll->lock);
+
+   auto it = coll->object_map.lower_bound(start);
+   for (; it != coll->object_map.end(); ++it) {
+     if (!(ls->size() < (unsigned)max))
+       break;
+     if (!(cmp_bitwise(it->first, end) < 0))
+       break;
+     ls->push_back(it->first);
+   }
+
+   if (next != NULL) {
+     if (it != coll->object_map.end())
+       *next = it->first;
+     else
+       *next = ghobject_t::get_max();
+   }
+   return 0;
+ }
+
+ // Copy an object's omap header and its complete key/value map to the
+ // caller while holding the object's omap_mutex.
+ // Returns -ENOENT when the collection or object is missing.
+ int MemStore::omap_get(
+     coll_t cid,                ///< [in] Collection containing oid
+     const ghobject_t &oid,     ///< [in] Object containing omap
+     bufferlist *header,        ///< [out] omap header
+     map<string, bufferlist> *out ///< [out] Key to value map
+     )
+ {
+   dout(10) << __func__ << " " << cid << " " << oid << dendl;
+   CollectionRef coll = get_collection(cid);
+   if (!coll)
+     return -ENOENT;
+
+   ObjectRef obj = coll->get_object(oid);
+   if (!obj)
+     return -ENOENT;
+
+   std::lock_guard<std::mutex> guard(obj->omap_mutex);
+   *header = obj->omap_header;
+   *out = obj->omap;
+   return 0;
+ }
+
+ // Copy an object's omap header to *header while holding its omap_mutex.
+ // Returns -ENOENT when the collection or object is missing.
+ // allow_eio is not used here.
+ int MemStore::omap_get_header(
+     coll_t cid,                ///< [in] Collection containing oid
+     const ghobject_t &oid,     ///< [in] Object containing omap
+     bufferlist *header,        ///< [out] omap header
+     bool allow_eio             ///< [in] don't assert on eio
+     )
+ {
+   dout(10) << __func__ << " " << cid << " " << oid << dendl;
+   CollectionRef coll = get_collection(cid);
+   if (!coll)
+     return -ENOENT;
+
+   ObjectRef obj = coll->get_object(oid);
+   if (!obj)
+     return -ENOENT;
+
+   std::lock_guard<std::mutex> guard(obj->omap_mutex);
+   *header = obj->omap_header;
+   return 0;
+ }
+
+ // Insert every omap key of an object into *keys (existing entries of
+ // *keys are kept).  Returns -ENOENT when the collection or object is
+ // missing.
+ int MemStore::omap_get_keys(
+     coll_t cid,              ///< [in] Collection containing oid
+     const ghobject_t &oid,   ///< [in] Object containing omap
+     set<string> *keys        ///< [out] Keys defined on oid
+     )
+ {
+   dout(10) << __func__ << " " << cid << " " << oid << dendl;
+   CollectionRef coll = get_collection(cid);
+   if (!coll)
+     return -ENOENT;
+
+   ObjectRef obj = coll->get_object(oid);
+   if (!obj)
+     return -ENOENT;
+
+   std::lock_guard<std::mutex> guard(obj->omap_mutex);
+   for (const auto& kv : obj->omap)
+     keys->insert(kv.first);
+   return 0;
+ }
+
+ // For each requested key that exists in the object's omap, insert the
+ // key/value pair into *out; keys that are absent are skipped.
+ // Returns -ENOENT when the collection or object is missing.
+ int MemStore::omap_get_values(
+     coll_t cid,                    ///< [in] Collection containing oid
+     const ghobject_t &oid,         ///< [in] Object containing omap
+     const set<string> &keys,       ///< [in] Keys to get
+     map<string, bufferlist> *out   ///< [out] Returned keys and values
+     )
+ {
+   dout(10) << __func__ << " " << cid << " " << oid << dendl;
+   CollectionRef coll = get_collection(cid);
+   if (!coll)
+     return -ENOENT;
+
+   ObjectRef obj = coll->get_object(oid);
+   if (!obj)
+     return -ENOENT;
+
+   std::lock_guard<std::mutex> guard(obj->omap_mutex);
+   for (const string& key : keys) {
+     auto found = obj->omap.find(key);
+     if (found == obj->omap.end())
+       continue;
+     out->insert(*found);
+   }
+   return 0;
+ }
+
+ // Insert into *out each requested key that is present in the object's
+ // omap.  Returns -ENOENT when the collection or object is missing.
+ int MemStore::omap_check_keys(
+     coll_t cid,                ///< [in] Collection containing oid
+     const ghobject_t &oid,     ///< [in] Object containing omap
+     const set<string> &keys,   ///< [in] Keys to check
+     set<string> *out           ///< [out] Subset of keys defined on oid
+     )
+ {
+   dout(10) << __func__ << " " << cid << " " << oid << dendl;
+   CollectionRef coll = get_collection(cid);
+   if (!coll)
+     return -ENOENT;
+
+   ObjectRef obj = coll->get_object(oid);
+   if (!obj)
+     return -ENOENT;
+
+   std::lock_guard<std::mutex> guard(obj->omap_mutex);
+   for (const string& key : keys) {
+     if (obj->omap.find(key) != obj->omap.end())
+       out->insert(key);
+   }
+   return 0;
+ }
+
+ // Build an omap iterator (OmapIteratorImpl) over an object.
+ // Returns a default-constructed ObjectMapIterator when the collection or
+ // the object is missing.
+ ObjectMap::ObjectMapIterator MemStore::get_omap_iterator(coll_t cid,
+                                                          const ghobject_t& oid)
+ {
+   dout(10) << __func__ << " " << cid << " " << oid << dendl;
+   CollectionRef coll = get_collection(cid);
+   if (coll) {
+     ObjectRef obj = coll->get_object(oid);
+     if (obj)
+       return ObjectMap::ObjectMapIterator(new OmapIteratorImpl(coll, obj));
+   }
+   return ObjectMap::ObjectMapIterator();
+ }
+
+
+// ---------------
+// write operations
+
+ // Apply a list of transactions synchronously, then dispatch their
+ // completion contexts.
+ //
+ // When a Sequencer is supplied, a per-sequencer mutex (created on first
+ // use and stored in osr->p) is held for the whole call.  The on_apply_sync
+ // context is completed inline; on_apply and on_commit are handed to the
+ // finisher.  Always returns 0.  `op` is not used here.
+ int MemStore::queue_transactions(Sequencer *osr,
+                                  list<Transaction*>& tls,
+                                  TrackedOpRef op,
+                                  ThreadPool::TPHandle *handle)
+ {
+   // because memstore operations are synchronous, we can implement the
+   // Sequencer with a mutex.  this guarantees ordering on a given sequencer,
+   // while allowing operations on different sequencers to happen in parallel
+   struct OpSequencer : public Sequencer_impl {
+     std::mutex mutex;
+     void flush() override {}
+     bool flush_commit(Context*) override { return true; }
+   };
+
+   std::unique_lock<std::mutex> seq_lock;
+   if (osr) {
+     auto slot = reinterpret_cast<OpSequencer**>(&osr->p);
+     if (*slot == nullptr)
+       *slot = new OpSequencer;
+     seq_lock = std::unique_lock<std::mutex>((*slot)->mutex);
+   }
+
+   for (Transaction *txn : tls) {
+     // poke the TPHandle heartbeat just to exercise that code path
+     if (handle)
+       handle->reset_tp_timeout();
+
+     _do_transaction(*txn);
+   }
+
+   Context *on_apply = NULL;
+   Context *on_apply_sync = NULL;
+   Context *on_commit = NULL;
+   ObjectStore::Transaction::collect_contexts(tls, &on_apply, &on_commit,
+                                              &on_apply_sync);
+   if (on_apply_sync)
+     on_apply_sync->complete(0);
+   if (on_apply)
+     finisher.queue(on_apply);
+   if (on_commit)
+     finisher.queue(on_commit);
+   return 0;
+ }
+
+ void MemStore::_do_transaction(Transaction& t)
+ { // Decode every op in t and dispatch it to the matching _xxx helper; any helper error that is not tolerated below is logged with a transaction dump and then asserted on.
+ Transaction::iterator i = t.begin();
+ int pos = 0; // index of the op being applied, reported in the error log
+
+ while (i.have_op()) {
+ Transaction::Op *op = i.decode_op();
+ int r = 0; // result of the helper called for this op
+
+ switch (op->op) {
+ case Transaction::OP_NOP:
+ break;
+ case Transaction::OP_TOUCH:
+ {
+ coll_t cid = i.get_cid(op->cid);
+ ghobject_t oid = i.get_oid(op->oid);
+ r = _touch(cid, oid);
+ }
+ break;
+
+ case Transaction::OP_WRITE:
+ {
+ coll_t cid = i.get_cid(op->cid);
+ ghobject_t oid = i.get_oid(op->oid);
+ uint64_t off = op->off;
+ uint64_t len = op->len;
+ uint32_t fadvise_flags = i.get_fadvise_flags();
+ bufferlist bl;
+ i.decode_bl(bl);
+ r = _write(cid, oid, off, len, bl, fadvise_flags);
+ }
+ break;
+
+ case Transaction::OP_ZERO:
+ {
+ coll_t cid = i.get_cid(op->cid);
+ ghobject_t oid = i.get_oid(op->oid);
+ uint64_t off = op->off;
+ uint64_t len = op->len;
+ r = _zero(cid, oid, off, len);
+ }
+ break;
+
+ case Transaction::OP_TRIMCACHE:
+ {
+ // deprecated, no-op
+ }
+ break;
+
+ case Transaction::OP_TRUNCATE:
+ {
+ coll_t cid = i.get_cid(op->cid);
+ ghobject_t oid = i.get_oid(op->oid);
+ uint64_t off = op->off;
+ r = _truncate(cid, oid, off);
+ }
+ break;
+
+ case Transaction::OP_REMOVE:
+ {
+ coll_t cid = i.get_cid(op->cid);
+ ghobject_t oid = i.get_oid(op->oid);
+ r = _remove(cid, oid);
+ }
+ break;
+
+ case Transaction::OP_SETATTR:
+ {
+ coll_t cid = i.get_cid(op->cid);
+ ghobject_t oid = i.get_oid(op->oid);
+ string name = i.decode_string();
+ bufferlist bl;
+ i.decode_bl(bl);
+ map<string, bufferptr> to_set; // single-entry map so the multi-attribute helper can be reused
+ to_set[name] = bufferptr(bl.c_str(), bl.length());
+ r = _setattrs(cid, oid, to_set);
+ }
+ break;
+
+ case Transaction::OP_SETATTRS:
+ {
+ coll_t cid = i.get_cid(op->cid);
+ ghobject_t oid = i.get_oid(op->oid);
+ map<string, bufferptr> aset;
+ i.decode_attrset(aset);
+ r = _setattrs(cid, oid, aset);
+ }
+ break;
+
+ case Transaction::OP_RMATTR:
+ {
+ coll_t cid = i.get_cid(op->cid);
+ ghobject_t oid = i.get_oid(op->oid);
+ string name = i.decode_string();
+ r = _rmattr(cid, oid, name.c_str());
+ }
+ break;
+
+ case Transaction::OP_RMATTRS:
+ {
+ coll_t cid = i.get_cid(op->cid);
+ ghobject_t oid = i.get_oid(op->oid);
+ r = _rmattrs(cid, oid);
+ }
+ break;
+
+ case Transaction::OP_CLONE:
+ {
+ coll_t cid = i.get_cid(op->cid);
+ ghobject_t oid = i.get_oid(op->oid);
+ ghobject_t noid = i.get_oid(op->dest_oid);
+ r = _clone(cid, oid, noid);
+ }
+ break;
+
+ case Transaction::OP_CLONERANGE:
+ {
+ coll_t cid = i.get_cid(op->cid);
+ ghobject_t oid = i.get_oid(op->oid);
+ ghobject_t noid = i.get_oid(op->dest_oid);
+ uint64_t off = op->off;
+ uint64_t len = op->len;
+ r = _clone_range(cid, oid, noid, off, len, off); // same offset used for source and destination
+ }
+ break;
+
+ case Transaction::OP_CLONERANGE2:
+ {
+ coll_t cid = i.get_cid(op->cid);
+ ghobject_t oid = i.get_oid(op->oid);
+ ghobject_t noid = i.get_oid(op->dest_oid);
+ uint64_t srcoff = op->off;
+ uint64_t len = op->len;
+ uint64_t dstoff = op->dest_off;
+ r = _clone_range(cid, oid, noid, srcoff, len, dstoff);
+ }
+ break;
+
+ case Transaction::OP_MKCOLL:
+ {
+ coll_t cid = i.get_cid(op->cid);
+ r = _create_collection(cid);
+ }
+ break;
+
+ case Transaction::OP_COLL_HINT:
+ {
+ coll_t cid = i.get_cid(op->cid);
+ uint32_t type = op->hint_type;
+ bufferlist hint;
+ i.decode_bl(hint);
+ bufferlist::iterator hiter = hint.begin();
+ if (type == Transaction::COLL_HINT_EXPECTED_NUM_OBJECTS) {
+ uint32_t pg_num;
+ uint64_t num_objs;
+ ::decode(pg_num, hiter);
+ ::decode(num_objs, hiter);
+ r = _collection_hint_expected_num_objs(cid, pg_num, num_objs);
+ } else {
+ // Ignore the hint
+ dout(10) << "Unrecognized collection hint type: " << type << dendl;
+ }
+ }
+ break;
+
+ case Transaction::OP_RMCOLL:
+ {
+ coll_t cid = i.get_cid(op->cid);
+ r = _destroy_collection(cid);
+ }
+ break;
+
+ case Transaction::OP_COLL_ADD:
+ {
+ coll_t ocid = i.get_cid(op->cid);
+ coll_t ncid = i.get_cid(op->dest_cid);
+ ghobject_t oid = i.get_oid(op->oid);
+ r = _collection_add(ncid, ocid, oid); // helper takes the destination collection first
+ }
+ break;
+
+ case Transaction::OP_COLL_REMOVE:
+ {
+ coll_t cid = i.get_cid(op->cid);
+ ghobject_t oid = i.get_oid(op->oid);
+ r = _remove(cid, oid); // handled exactly like OP_REMOVE
+ }
+ break;
+
+ case Transaction::OP_COLL_MOVE:
+ assert(0 == "deprecated");
+ break;
+
+ case Transaction::OP_COLL_MOVE_RENAME:
+ {
+ coll_t oldcid = i.get_cid(op->cid);
+ ghobject_t oldoid = i.get_oid(op->oid);
+ coll_t newcid = i.get_cid(op->dest_cid);
+ ghobject_t newoid = i.get_oid(op->dest_oid);
+ r = _collection_move_rename(oldcid, oldoid, newcid, newoid);
+ }
+ break;
+
+ case Transaction::OP_COLL_SETATTR:
+ {
+ assert(0 == "not implemented");
+ }
+ break;
+
+ case Transaction::OP_COLL_RMATTR:
+ {
+ assert(0 == "not implemented");
+ }
+ break;
+
+ case Transaction::OP_COLL_RENAME:
+ {
+ assert(0 == "not implemented");
+ }
+ break;
+
+ case Transaction::OP_OMAP_CLEAR:
+ {
+ coll_t cid = i.get_cid(op->cid);
+ ghobject_t oid = i.get_oid(op->oid);
+ r = _omap_clear(cid, oid);
+ }
+ break;
+ case Transaction::OP_OMAP_SETKEYS:
+ {
+ coll_t cid = i.get_cid(op->cid);
+ ghobject_t oid = i.get_oid(op->oid);
+ bufferlist aset_bl; // still-encoded key/value set; the helper decodes it
+ i.decode_attrset_bl(&aset_bl);
+ r = _omap_setkeys(cid, oid, aset_bl);
+ }
+ break;
+ case Transaction::OP_OMAP_RMKEYS:
+ {
+ coll_t cid = i.get_cid(op->cid);
+ ghobject_t oid = i.get_oid(op->oid);
+ bufferlist keys_bl; // still-encoded key set; the helper decodes it
+ i.decode_keyset_bl(&keys_bl);
+ r = _omap_rmkeys(cid, oid, keys_bl);
+ }
+ break;
+ case Transaction::OP_OMAP_RMKEYRANGE:
+ {
+ coll_t cid = i.get_cid(op->cid);
+ ghobject_t oid = i.get_oid(op->oid);
+ string first, last;
+ first = i.decode_string();
+ last = i.decode_string();
+ r = _omap_rmkeyrange(cid, oid, first, last);
+ }
+ break;
+ case Transaction::OP_OMAP_SETHEADER:
+ {
+ coll_t cid = i.get_cid(op->cid);
+ ghobject_t oid = i.get_oid(op->oid);
+ bufferlist bl;
+ i.decode_bl(bl);
+ r = _omap_setheader(cid, oid, bl);
+ }
+ break;
+ case Transaction::OP_SPLIT_COLLECTION:
+ assert(0 == "deprecated");
+ break;
+ case Transaction::OP_SPLIT_COLLECTION2:
+ {
+ coll_t cid = i.get_cid(op->cid);
+ uint32_t bits = op->split_bits;
+ uint32_t rem = op->split_rem;
+ coll_t dest = i.get_cid(op->dest_cid);
+ r = _split_collection(cid, bits, rem, dest);
+ }
+ break;
+
+ case Transaction::OP_SETALLOCHINT:
+ {
+ r = 0; // the hint is accepted and nothing is done with it
+ }
+ break;
+
+ default:
+ derr << "bad op " << op->op << dendl;
+ assert(0);
+ }
+
+ if (r < 0) {
+ bool ok = false; // set when the error is one of the tolerated cases below
+
+ if (r == -ENOENT && !(op->op == Transaction::OP_CLONERANGE ||
+ op->op == Transaction::OP_CLONE ||
+ op->op == Transaction::OP_CLONERANGE2 ||
+ op->op == Transaction::OP_COLL_ADD))
+ // -ENOENT is usually okay
+ ok = true;
+ if (r == -ENODATA) // -ENODATA is tolerated for every op
+ ok = true;
+
+ if (!ok) {
+ const char *msg = "unexpected error code";
+
+ if (r == -ENOENT && (op->op == Transaction::OP_CLONERANGE ||
+ op->op == Transaction::OP_CLONE ||
+ op->op == Transaction::OP_CLONERANGE2))
+ msg = "ENOENT on clone suggests osd bug";
+
+ if (r == -ENOSPC)
+ // For now, if we hit _any_ ENOSPC, crash, before we do any damage
+ // by partially applying transactions.
+ msg = "ENOSPC handling not implemented";
+
+ if (r == -ENOTEMPTY) {
+ msg = "ENOTEMPTY suggests garbage data in osd data dir";
+ dump_all();
+ }
+
+ dout(0) << " error " << cpp_strerror(r) << " not handled on operation " << op->op
+ << " (op " << pos << ", counting from 0)" << dendl;
+ dout(0) << msg << dendl;
+ dout(0) << " transaction dump:\n"; // no dendl here: the JSON dump below is streamed into the same log entry
+ JSONFormatter f(true);
+ f.open_object_section("transaction");
+ t.dump(&f);
+ f.close_section();
+ f.flush(*_dout);
+ *_dout << dendl;
+ assert(0 == "unexpected error");
+ }
+ }
+
+ ++pos;
+ }
+ }
+
+ // Make sure an object exists in the collection, creating it if necessary.
+ // Returns -ENOENT when the collection is missing.
+ int MemStore::_touch(coll_t cid, const ghobject_t& oid)
+ {
+   dout(10) << __func__ << " " << cid << " " << oid << dendl;
+   CollectionRef coll = get_collection(cid);
+   if (coll) {
+     coll->get_or_create_object(oid);
+     return 0;
+   }
+   return -ENOENT;
+ }
+
+ // Write `bl` into an object at `offset`, creating the object if needed,
+ // and adjust used_bytes by the change in the object's size.
+ // Asserts that len matches bl.length().  Returns -ENOENT when the
+ // collection is missing.  fadvise_flags is not used here.
+ int MemStore::_write(coll_t cid, const ghobject_t& oid,
+                      uint64_t offset, size_t len, const bufferlist& bl,
+                      uint32_t fadvise_flags)
+ {
+   dout(10) << __func__ << " " << cid << " " << oid << " "
+            << offset << "~" << len << dendl;
+   assert(len == bl.length());
+
+   CollectionRef coll = get_collection(cid);
+   if (!coll)
+     return -ENOENT;
+
+   ObjectRef obj = coll->get_or_create_object(oid);
+   const ssize_t size_before = obj->get_size();
+   obj->write(offset, bl);
+   used_bytes += (obj->get_size() - size_before);
+
+   return 0;
+ }
+
+ // Zero a byte range of an object by writing a zero-filled buffer of `len`
+ // bytes through _write(); returns whatever _write() returns.
+ int MemStore::_zero(coll_t cid, const ghobject_t& oid,
+                     uint64_t offset, size_t len)
+ {
+   dout(10) << __func__ << " " << cid << " " << oid << " " << offset << "~"
+            << len << dendl;
+   bufferptr zeros(len);
+   zeros.zero();
+   bufferlist zero_bl;
+   zero_bl.push_back(zeros);
+   return _write(cid, oid, offset, len, zero_bl);
+ }
+
+ // Resize an existing object to `size` and adjust used_bytes by the change
+ // in its size.  Returns -ENOENT when the collection or object is missing,
+ // otherwise the result of the object's truncate().
+ int MemStore::_truncate(coll_t cid, const ghobject_t& oid, uint64_t size)
+ {
+   dout(10) << __func__ << " " << cid << " " << oid << " " << size << dendl;
+   CollectionRef coll = get_collection(cid);
+   if (!coll)
+     return -ENOENT;
+
+   ObjectRef obj = coll->get_object(oid);
+   if (!obj)
+     return -ENOENT;
+
+   const ssize_t size_before = obj->get_size();
+   int rc = obj->truncate(size);
+   used_bytes += (obj->get_size() - size_before);
+   return rc;
+ }
+
+ // Remove an object from both of the collection's indexes (object_hash and
+ // object_map) under the collection's write lock, subtracting its size
+ // from used_bytes.  Returns -ENOENT when the collection or object is
+ // missing.
+ int MemStore::_remove(coll_t cid, const ghobject_t& oid)
+ {
+   dout(10) << __func__ << " " << cid << " " << oid << dendl;
+   CollectionRef coll = get_collection(cid);
+   if (!coll)
+     return -ENOENT;
+   RWLock::WLocker guard(coll->lock);
+
+   auto entry = coll->object_hash.find(oid);
+   if (entry == coll->object_hash.end())
+     return -ENOENT;
+
+   used_bytes -= entry->second->get_size();
+   coll->object_hash.erase(entry);
+   coll->object_map.erase(oid);
+
+   return 0;
+ }
+
+ // Set (or overwrite) each attribute in `aset` on an object while holding
+ // its xattr_mutex.  Returns -ENOENT when the collection or object is
+ // missing.
+ int MemStore::_setattrs(coll_t cid, const ghobject_t& oid,
+                         map<string,bufferptr>& aset)
+ {
+   dout(10) << __func__ << " " << cid << " " << oid << dendl;
+   CollectionRef coll = get_collection(cid);
+   if (!coll)
+     return -ENOENT;
+
+   ObjectRef obj = coll->get_object(oid);
+   if (!obj)
+     return -ENOENT;
+
+   std::lock_guard<std::mutex> guard(obj->xattr_mutex);
+   for (const auto& attr : aset)
+     obj->xattr[attr.first] = attr.second;
+   return 0;
+ }
+
+ // Remove one named attribute from an object.
+ // Returns -ENOENT when the collection or object is missing and -ENODATA
+ // when the object has no attribute of that name.
+ int MemStore::_rmattr(coll_t cid, const ghobject_t& oid, const char *name)
+ {
+   dout(10) << __func__ << " " << cid << " " << oid << " " << name << dendl;
+   CollectionRef coll = get_collection(cid);
+   if (!coll)
+     return -ENOENT;
+
+   ObjectRef obj = coll->get_object(oid);
+   if (!obj)
+     return -ENOENT;
+
+   std::lock_guard<std::mutex> guard(obj->xattr_mutex);
+   const string key(name);
+   // erase-by-key reports how many entries were removed; zero means absent
+   if (obj->xattr.erase(key) == 0)
+     return -ENODATA;
+   return 0;
+ }
+
+ // Drop every attribute of an object while holding its xattr_mutex.
+ // Returns -ENOENT when the collection or object is missing.
+ int MemStore::_rmattrs(coll_t cid, const ghobject_t& oid)
+ {
+   dout(10) << __func__ << " " << cid << " " << oid << dendl;
+   CollectionRef coll = get_collection(cid);
+   if (!coll)
+     return -ENOENT;
+
+   ObjectRef obj = coll->get_object(oid);
+   if (!obj)
+     return -ENOENT;
+
+   std::lock_guard<std::mutex> guard(obj->xattr_mutex);
+   obj->xattr.clear();
+   return 0;
+ }
+
+ // Clone an object's data, omap (header and keys) and xattrs onto another
+ // object in the same collection, creating the destination if needed.
+ // used_bytes is adjusted by the size difference between source and
+ // destination as measured before the data is cloned.
+ // Returns -ENOENT when the collection or the source object is missing.
+ int MemStore::_clone(coll_t cid, const ghobject_t& oldoid,
+                      const ghobject_t& newoid)
+ {
+   dout(10) << __func__ << " " << cid << " " << oldoid
+            << " -> " << newoid << dendl;
+   CollectionRef coll = get_collection(cid);
+   if (!coll)
+     return -ENOENT;
+
+   ObjectRef src = coll->get_object(oldoid);
+   if (!src)
+     return -ENOENT;
+   ObjectRef dst = coll->get_or_create_object(newoid);
+   used_bytes += src->get_size() - dst->get_size();
+   dst->clone(src.get(), 0, src->get_size(), 0);
+
+   // take xattr and omap locks with std::lock()
+   std::unique_lock<std::mutex> src_xattr(src->xattr_mutex, std::defer_lock);
+   std::unique_lock<std::mutex> dst_xattr(dst->xattr_mutex, std::defer_lock);
+   std::unique_lock<std::mutex> src_omap(src->omap_mutex, std::defer_lock);
+   std::unique_lock<std::mutex> dst_omap(dst->omap_mutex, std::defer_lock);
+   std::lock(src_xattr, dst_xattr, src_omap, dst_omap);
+
+   dst->omap_header = src->omap_header;
+   dst->omap = src->omap;
+   dst->xattr = src->xattr;
+   return 0;
+ }
+
+ // Clone a byte range from one object to another in the same collection.
+ //
+ // The destination object is created if needed (even when nothing ends up
+ // being copied).  The range is clamped to the end of the source, and
+ // used_bytes is adjusted by the change in the destination's size.
+ // Returns -ENOENT when the collection or source is missing, 0 when srcoff
+ // is at or past the end of the source, otherwise the clamped length.
+ int MemStore::_clone_range(coll_t cid, const ghobject_t& oldoid,
+                            const ghobject_t& newoid,
+                            uint64_t srcoff, uint64_t len, uint64_t dstoff)
+ {
+   dout(10) << __func__ << " " << cid << " "
+            << oldoid << " " << srcoff << "~" << len << " -> "
+            << newoid << " " << dstoff << "~" << len
+            << dendl;
+   CollectionRef coll = get_collection(cid);
+   if (!coll)
+     return -ENOENT;
+
+   ObjectRef src = coll->get_object(oldoid);
+   if (!src)
+     return -ENOENT;
+   ObjectRef dst = coll->get_or_create_object(newoid);
+
+   if (srcoff >= src->get_size())
+     return 0;
+   if (srcoff + len >= src->get_size())
+     len = src->get_size() - srcoff;
+
+   const ssize_t dst_size_before = dst->get_size();
+   dst->clone(src.get(), srcoff, len, dstoff);
+   used_bytes += (dst->get_size() - dst_size_before);
+
+   return len;
+ }
+
+ // Clear an object's omap keys and omap header while holding its
+ // omap_mutex.  Returns -ENOENT when the collection or object is missing.
+ int MemStore::_omap_clear(coll_t cid, const ghobject_t &oid)
+ {
+   dout(10) << __func__ << " " << cid << " " << oid << dendl;
+   CollectionRef coll = get_collection(cid);
+   if (!coll)
+     return -ENOENT;
+
+   ObjectRef obj = coll->get_object(oid);
+   if (!obj)
+     return -ENOENT;
+
+   std::lock_guard<std::mutex> guard(obj->omap_mutex);
+   obj->omap.clear();
+   obj->omap_header.clear();
+   return 0;
+ }
+
+ // Decode an entry count followed by that many key/value pairs from
+ // aset_bl, storing each value directly into the object's omap.
+ // Returns -ENOENT when the collection or object is missing.
+ int MemStore::_omap_setkeys(coll_t cid, const ghobject_t &oid,
+                             bufferlist& aset_bl)
+ {
+   dout(10) << __func__ << " " << cid << " " << oid << dendl;
+   CollectionRef coll = get_collection(cid);
+   if (!coll)
+     return -ENOENT;
+
+   ObjectRef obj = coll->get_object(oid);
+   if (!obj)
+     return -ENOENT;
+
+   std::lock_guard<std::mutex> guard(obj->omap_mutex);
+   bufferlist::iterator in = aset_bl.begin();
+   __u32 count;
+   ::decode(count, in);
+   for (__u32 n = 0; n < count; ++n) {
+     string key;
+     ::decode(key, in);
+     ::decode(obj->omap[key], in);
+   }
+   return 0;
+ }
+
+ // Decode an entry count followed by that many keys from keys_bl and erase
+ // each of them from the object's omap (absent keys are ignored).
+ // Returns -ENOENT when the collection or object is missing.
+ int MemStore::_omap_rmkeys(coll_t cid, const ghobject_t &oid,
+                            bufferlist& keys_bl)
+ {
+   dout(10) << __func__ << " " << cid << " " << oid << dendl;
+   CollectionRef coll = get_collection(cid);
+   if (!coll)
+     return -ENOENT;
+
+   ObjectRef obj = coll->get_object(oid);
+   if (!obj)
+     return -ENOENT;
+
+   std::lock_guard<std::mutex> guard(obj->omap_mutex);
+   bufferlist::iterator in = keys_bl.begin();
+   __u32 count;
+   ::decode(count, in);
+   for (__u32 n = 0; n < count; ++n) {
+     string key;
+     ::decode(key, in);
+     obj->omap.erase(key);
+   }
+   return 0;
+ }
+
+ // Erase the omap keys in the half-open range [first, last): both bounds
+ // are located with lower_bound, so `last` itself is kept.
+ // Returns -ENOENT when the collection or object is missing.
+ int MemStore::_omap_rmkeyrange(coll_t cid, const ghobject_t &oid,
+                                const string& first, const string& last)
+ {
+   dout(10) << __func__ << " " << cid << " " << oid << " " << first
+            << " " << last << dendl;
+   CollectionRef coll = get_collection(cid);
+   if (!coll)
+     return -ENOENT;
+
+   ObjectRef obj = coll->get_object(oid);
+   if (!obj)
+     return -ENOENT;
+
+   std::lock_guard<std::mutex> guard(obj->omap_mutex);
+   auto range_begin = obj->omap.lower_bound(first);
+   auto range_end = obj->omap.lower_bound(last);
+   obj->omap.erase(range_begin, range_end);
+   return 0;
+ }
+
+ // Replace an object's omap header with `bl` while holding its omap_mutex.
+ // Returns -ENOENT when the collection or object is missing.
+ int MemStore::_omap_setheader(coll_t cid, const ghobject_t &oid,
+                               const bufferlist &bl)
+ {
+   dout(10) << __func__ << " " << cid << " " << oid << dendl;
+   CollectionRef coll = get_collection(cid);
+   if (!coll)
+     return -ENOENT;
+
+   ObjectRef obj = coll->get_object(oid);
+   if (!obj)
+     return -ENOENT;
+
+   std::lock_guard<std::mutex> guard(obj->omap_mutex);
+   obj->omap_header = bl;
+   return 0;
+ }
+
+ // Add a new, empty collection under cid while holding coll_lock for write.
+ // Returns -EEXIST when a collection with that id is already present.
+ int MemStore::_create_collection(coll_t cid)
+ {
+   dout(10) << __func__ << " " << cid << dendl;
+   RWLock::WLocker guard(coll_lock);
+   // insert an empty ref first so the Collection is only allocated when the
+   // id was not already taken
+   auto inserted = coll_map.insert(std::make_pair(cid, CollectionRef()));
+   if (inserted.second) {
+     inserted.first->second.reset(new Collection(cct));
+     return 0;
+   }
+   return -EEXIST;
+ }
+
+ // Remove an empty collection from coll_map, subtracting its used_bytes()
+ // from the store total.  Returns -ENOENT when the collection is missing
+ // and -ENOTEMPTY when it still holds objects.
+ int MemStore::_destroy_collection(coll_t cid)
+ {
+   dout(10) << __func__ << " " << cid << dendl;
+   RWLock::WLocker guard(coll_lock);
+   auto found = coll_map.find(cid);
+   if (found == coll_map.end())
+     return -ENOENT;
+
+   CollectionRef& coll = found->second;
+   {
+     RWLock::RLocker coll_guard(coll->lock);
+     if (!coll->object_map.empty())
+       return -ENOTEMPTY;
+   }
+   used_bytes -= coll->used_bytes();
+   // `coll` refers into the map entry and must not be used after this erase
+   coll_map.erase(found);
+   return 0;
+ }
+
+ // Make object `oid` of collection `ocid` also reachable from collection
+ // `cid`; both collections then share the same ObjectRef.
+ //
+ // Returns -ENOENT when either collection is missing or the object is not
+ // in `ocid`, and -EEXIST when `cid` already holds an object with that id.
+ int MemStore::_collection_add(coll_t cid, coll_t ocid, const ghobject_t& oid)
+ {
+   dout(10) << __func__ << " " << cid << " " << ocid << " " << oid << dendl;
+   CollectionRef c = get_collection(cid);
+   if (!c)
+     return -ENOENT;
+   CollectionRef oc = get_collection(ocid);
+   if (!oc)
+     return -ENOENT;
+
+   if (&(*c) == &(*oc)) {
+     // Source and destination are the same collection.  Taking its lock for
+     // write twice would block forever, and the result only depends on
+     // whether the object is present: present -> already exists in the
+     // destination, absent -> missing from the source.
+     RWLock::RLocker l(c->lock);
+     return c->object_hash.count(oid) ? -EEXIST : -ENOENT;
+   }
+
+   // distinct collections: lock in address order for a consistent ordering
+   RWLock::WLocker l1(MIN(&(*c), &(*oc))->lock);
+   RWLock::WLocker l2(MAX(&(*c), &(*oc))->lock);
+
+   if (c->object_hash.count(oid))
+     return -EEXIST;
+   auto src = oc->object_hash.find(oid);
+   if (src == oc->object_hash.end())
+     return -ENOENT;
+   ObjectRef o = src->second;
+   c->object_map[oid] = o;
+   c->object_hash[oid] = o;
+   return 0;
+ }
+
+ // Move object `oldoid` out of collection `oldcid` and register it as `oid`
+ // in collection `cid`.
+ //
+ // Both collection locks are taken for write in address order; when the
+ // two ids name the same collection its lock is taken only once.
+ // Returns -ENOENT when either collection or the source object is missing
+ // and -EEXIST when the destination id is already in use.
+ int MemStore::_collection_move_rename(coll_t oldcid, const ghobject_t& oldoid,
+                                       coll_t cid, const ghobject_t& oid)
+ {
+   dout(10) << __func__ << " " << oldcid << " " << oldoid << " -> "
+            << cid << " " << oid << dendl;
+   CollectionRef c = get_collection(cid);
+   if (!c)
+     return -ENOENT;
+   CollectionRef oc = get_collection(oldcid);
+   if (!oc)
+     return -ENOENT;
+
+   // note: c and oc may be the same
+   const bool same_coll = (&(*c) == &(*oc));
+   if (same_coll) {
+     c->lock.get_write();
+   } else if (&(*c) < &(*oc)) {
+     c->lock.get_write();
+     oc->lock.get_write();
+   } else {
+     oc->lock.get_write();
+     c->lock.get_write();
+   }
+
+   int r;
+   if (c->object_hash.count(oid)) {
+     r = -EEXIST;
+   } else if (oc->object_hash.count(oldoid) == 0) {
+     r = -ENOENT;
+   } else {
+     ObjectRef o = oc->object_hash[oldoid];
+     c->object_map[oid] = o;
+     c->object_hash[oid] = o;
+     oc->object_map.erase(oldoid);
+     oc->object_hash.erase(oldoid);
+     r = 0;
+   }
+
+   c->lock.put_write();
+   if (!same_coll)
+     oc->lock.put_write();
+   return r;
+ }
+
+ // Move every object of collection `cid` whose id matches (bits, match)
+ // into collection `dest`.
+ //
+ // Returns -ENOENT when either collection is missing, 0 otherwise.
+ int MemStore::_split_collection(coll_t cid, uint32_t bits, uint32_t match,
+                                 coll_t dest)
+ {
+   dout(10) << __func__ << " " << cid << " " << bits << " " << match << " "
+            << dest << dendl;
+   CollectionRef sc = get_collection(cid);
+   if (!sc)
+     return -ENOENT;
+   CollectionRef dc = get_collection(dest);
+   if (!dc)
+     return -ENOENT;
+
+   // Source and destination are the same collection: every matching object
+   // is already where it should end up, and taking the same lock for write
+   // twice below would block forever.
+   if (&(*sc) == &(*dc))
+     return 0;
+
+   // distinct collections: lock in address order for a consistent ordering
+   RWLock::WLocker l1(MIN(&(*sc), &(*dc))->lock);
+   RWLock::WLocker l2(MAX(&(*sc), &(*dc))->lock);
+
+   map<ghobject_t,ObjectRef,ghobject_t::BitwiseComparator>::iterator p = sc->object_map.begin();
+   while (p != sc->object_map.end()) {
+     if (p->first.match(bits, match)) {
+       dout(20) << " moving " << p->first << dendl;
+       dc->object_map.insert(make_pair(p->first, p->second));
+       dc->object_hash.insert(make_pair(p->first, p->second));
+       sc->object_hash.erase(p->first);
+       // post-increment: advance before the erased node is invalidated
+       sc->object_map.erase(p++);
+     } else {
+       ++p;
+     }
+   }
+
+   return 0;
+ }
+
+// BufferlistObject
+ // Make `bl` refer to the [offset, offset + len) slice of the object's data
+ // (taken under the object's mutex) and return the resulting length.
+ int MemStore::BufferlistObject::read(uint64_t offset, uint64_t len,
+                                      bufferlist &bl)
+ {
+   std::lock_guard<Spinlock> guard(mutex);
+   bl.substr_of(data, offset, len);
+   const int nread = bl.length();
+   return nread;
+ }
+
+ // Overwrite the object's data with `src` at `offset`.
+ //
+ // A new bufferlist is assembled from three parts -- the existing bytes
+ // before `offset` (zero-padded when the object is shorter than `offset`),
+ // `src` itself, and any existing bytes after the written range -- and
+ // then claimed as the object's data.  Always returns 0.
+ int MemStore::BufferlistObject::write(uint64_t offset, const bufferlist &src)
+ {
+   const unsigned src_len = src.length();
+
+   std::lock_guard<Spinlock> guard(mutex);
+
+   bufferlist merged;
+
+   // part 1: bytes in front of the write
+   if (get_size() < offset) {
+     merged.substr_of(data, 0, get_size());
+     bufferptr pad(offset - get_size());
+     pad.zero();
+     merged.append(pad);
+   } else {
+     merged.substr_of(data, 0, offset);
+   }
+
+   // part 2: the new bytes
+   merged.append(src);
+
+   // part 3: bytes behind the write, if the object extends past it
+   const uint64_t write_end = offset + src_len;
+   if (get_size() > write_end) {
+     bufferlist rest;
+     rest.substr_of(data, write_end, get_size() - write_end);
+     merged.append(rest);
+   }
+
+   data.claim(merged);
+   return 0;
+ }
+
+ // Copy a byte range from another BufferlistObject into this one.
+ //
+ // When the range starts at the same offset in both objects and its length
+ // equals the source's size, the source's bufferlist is assigned wholesale.
+ // Otherwise the slice is taken under the source's mutex and written via
+ // write().  Returns -ENOTSUP when `src` is not a BufferlistObject.
+ int MemStore::BufferlistObject::clone(Object *src, uint64_t srcoff,
+                                       uint64_t len, uint64_t dstoff)
+ {
+   auto from = dynamic_cast<BufferlistObject*>(src);
+   if (from == nullptr)
+     return -ENOTSUP;
+
+   bufferlist piece;
+   {
+     std::lock_guard<Spinlock> guard(from->mutex);
+     const bool whole_object = (srcoff == dstoff && len == src->get_size());
+     if (whole_object) {
+       data = from->data;
+       return 0;
+     }
+     piece.substr_of(from->data, srcoff, len);
+   }
+   // source lock released; write() takes this object's own mutex
+   return write(dstoff, piece);
+ }
+
+ // Resize the object's data to `size`: keep only the first `size` bytes
+ // when shrinking, append zeroes when growing, do nothing when the size
+ // already matches.  Always returns 0.
+ int MemStore::BufferlistObject::truncate(uint64_t size)
+ {
+   std::lock_guard<Spinlock> guard(mutex);
+   const auto cur_size = get_size();
+   if (cur_size < size) {
+     bufferptr pad(size - cur_size);
+     pad.zero();
+     data.append(pad);
+   } else if (cur_size > size) {
+     bufferlist kept;
+     kept.substr_of(data, 0, size);
+     data.claim(kept);
+   }
+   return 0;
+ }
+
+// PageSetObject
+
+ #if defined(__GLIBCXX__)
+ // Use a thread-local vector for the pages returned by PageSet, so we
+ // can avoid allocations in read/write().  DEFINE_PAGE_VECTOR then expands
+ // to nothing and callers use the shared thread-local tls_pages directly.
+ thread_local PageSet::page_vector MemStore::PageSetObject::tls_pages;
+ #define DEFINE_PAGE_VECTOR(name)
+ #else // without __GLIBCXX__: DEFINE_PAGE_VECTOR declares a fresh local page_vector named `name`
+ #define DEFINE_PAGE_VECTOR(name) PageSet::page_vector name;
+ #endif // defined(__GLIBCXX__)
+
+ // Read `len` bytes starting at `offset` into a freshly allocated buffer
+ // that is appended to `bl`.
+ //
+ // Bytes covered by a page are copied from it; gaps between pages and the
+ // region after the last page in range are filled with zeroes.
+ // Returns len.
+ int MemStore::PageSetObject::read(uint64_t offset, uint64_t len, bufferlist& bl)
+ {
+   const auto range_begin = offset;
+   const auto range_end = offset + len;
+   auto todo = len;
+
+   DEFINE_PAGE_VECTOR(tls_pages);
+   data.get_range(offset, len, tls_pages);
+
+   // destination buffer for the whole request
+   buffer::ptr out(len);
+
+   for (auto it = tls_pages.begin(); todo; ++it) {
+     // ran out of pages inside the range: the rest reads as zeroes
+     if (it == tls_pages.end() || (*it)->offset >= range_end) {
+       out.zero(offset - range_begin, todo);
+       break;
+     }
+     auto page = *it;
+
+     // zero-fill the hole in front of this page, if any
+     if (page->offset > offset) {
+       const auto gap = std::min(todo, page->offset - offset);
+       out.zero(offset - range_begin, gap);
+       todo -= gap;
+       offset = page->offset;
+       if (!todo)
+         break;
+     }
+
+     // copy the part of this page that lies inside the range
+     const auto in_page = offset - page->offset;
+     const auto chunk = min(todo, data.get_page_size() - in_page);
+
+     out.copy_in(offset - range_begin, chunk, page->data + in_page);
+
+     todo -= chunk;
+     offset += chunk;
+   }
+
+   tls_pages.clear(); // drop page refs
+
+   bl.append(out);
+   return len;
+ }
+
+ // Copy `src` into the object's pages starting at `offset`.
+ //
+ // The page range is allocated up front, then the source is copied page by
+ // page.  data_len is raised to the end of the written range when that
+ // lies beyond the current length.  Always returns 0.
+ int MemStore::PageSetObject::write(uint64_t offset, const bufferlist &src)
+ {
+   unsigned todo = src.length();
+
+   DEFINE_PAGE_VECTOR(tls_pages);
+   // make sure the page range is allocated
+   data.alloc_range(offset, src.length(), tls_pages);
+
+   auto cur_page = tls_pages.begin();
+
+   // XXX: cast away the const because bufferlist doesn't have a const_iterator
+   auto src_it = const_cast<bufferlist&>(src).begin();
+   while (todo > 0) {
+     const unsigned in_page = offset - (*cur_page)->offset;
+     const unsigned room = data.get_page_size() - in_page;
+     const unsigned chunk = min(todo, room);
+     src_it.copy(chunk, (*cur_page)->data + in_page);
+     offset += chunk;
+     todo -= chunk;
+     // the page was filled to its end: continue on the next one
+     if (chunk == room)
+       ++cur_page;
+   }
+
+   // offset now points just past the last byte written
+   if (data_len < offset)
+     data_len = offset;
+   tls_pages.clear(); // drop page refs
+   return 0;
+ }
+
+ int MemStore::PageSetObject::clone(Object *src, uint64_t srcoff,
+ uint64_t len, uint64_t dstoff)
+ { // Copy the pages backing [srcoff, srcoff+len) of src to the same range shifted to dstoff in this object; always returns 0.
+ const int64_t delta = dstoff - srcoff; // shift that maps a source offset to its destination offset
+
+ auto &src_data = static_cast<PageSetObject*>(src)->data; // unchecked downcast: src is assumed to be a PageSetObject -- TODO confirm callers guarantee this
+ const uint64_t src_page_size = src_data.get_page_size();
+
+ auto &dst_data = data;
+ const auto dst_page_size = dst_data.get_page_size();
+
+ DEFINE_PAGE_VECTOR(tls_pages);
+ PageSet::page_vector dst_pages;
+
+ while (len) {
+ const auto count = std::min(len, (uint64_t)src_page_size * 16); // process at most 16 source pages' worth of bytes per pass
+ src_data.get_range(srcoff, count, tls_pages);
+
+ for (auto &src_page : tls_pages) {
+ auto sbegin = std::max(srcoff, src_page->offset); // part of this source page that lies inside the current pass
+ auto send = std::min(srcoff + count, src_page->offset + src_page_size);
+ dst_data.alloc_range(sbegin + delta, send - sbegin, dst_pages);
+
+ // copy data from src page to dst pages
+ for (auto &dst_page : dst_pages) {
+ auto dbegin = std::max(sbegin + delta, dst_page->offset); // overlap of the shifted source span with this destination page
+ auto dend = std::min(send + delta, dst_page->offset + dst_page_size);
+
+ std::copy(src_page->data + (dbegin - delta) - src_page->offset,
+ src_page->data + (dend - delta) - src_page->offset,
+ dst_page->data + dbegin - dst_page->offset);
+ }
+ dst_pages.clear(); // drop page refs
+ }
+ srcoff += count;
+ dstoff += count;
+ len -= count;
+ tls_pages.clear(); // drop page refs
+ }
+
+ // update object size
+ if (data_len < dstoff + len) // len is 0 here and dstoff was advanced by the loop, so this is the end of the destination range
+ data_len = dstoff + len;
+ return 0;
+ }
+
+// Set the object's length to `size`: release the pages PageSet frees
+// after that offset and, when size falls inside a page that is still
+// allocated, zero that page from size to its end.  Always returns 0.
+int MemStore::PageSetObject::truncate(uint64_t size)
+{
+  data.free_pages_after(size);
+  data_len = size;
+
+  const auto page_size = data.get_page_size();
+  // start of the page that contains the new end of the object
+  const auto tail_start = size & ~(page_size-1);
+  if (tail_start != size) {
+    DEFINE_PAGE_VECTOR(tls_pages);
+    data.get_range(tail_start, page_size, tls_pages);
+    // nothing to clear when the last page was never allocated
+    if (!tls_pages.empty()) {
+      char *const bytes = tls_pages.front()->data;
+      std::fill(bytes + (size - tail_start), bytes + page_size, 0);
+      tls_pages.clear(); // drop page ref
+    }
+  }
+  return 0;
+}
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2013- Sage Weil <sage@inktank.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+
+#ifndef CEPH_MEMSTORE_H
+#define CEPH_MEMSTORE_H
+
+#include <mutex>
+#include <boost/intrusive_ptr.hpp>
+
+#include "include/unordered_map.h"
+#include "include/memory.h"
+#include "include/Spinlock.h"
+#include "common/Finisher.h"
+#include "common/RefCountedObj.h"
+#include "common/RWLock.h"
+#include "os/ObjectStore.h"
+#include "PageSet.h"
+#include "include/assert.h"
+
+class MemStore : public ObjectStore {
+private:
+ CephContext *const cct;
+
+public:
+ // Base class for a stored object.  It holds the xattrs, omap header and
+ // omap shared by every object type and declares the interface the
+ // concrete types implement for the object's data payload.
+ struct Object : public RefCountedObject {
+   std::mutex xattr_mutex;
+   std::mutex omap_mutex;
+   map<string,bufferptr> xattr;
+   bufferlist omap_header;
+   map<string,bufferlist> omap;
+
+   using Ref = boost::intrusive_ptr<Object>;
+   friend void intrusive_ptr_add_ref(Object *o) { o->get(); }
+   friend void intrusive_ptr_release(Object *o) { o->put(); }
+
+   // data payload interface
+   virtual size_t get_size() const = 0;
+   virtual int read(uint64_t offset, uint64_t len, bufferlist &bl) = 0;
+   virtual int write(uint64_t offset, const bufferlist &bl) = 0;
+   virtual int clone(Object *src, uint64_t srcoff, uint64_t len,
+                     uint64_t dstoff) = 0;
+   virtual int truncate(uint64_t offset) = 0;
+   virtual void encode(bufferlist& bl) const = 0;
+   virtual void decode(bufferlist::iterator& p) = 0;
+
+   // serialize the members common to all object types, in the order
+   // xattr, omap_header, omap
+   void encode_base(bufferlist& bl) const {
+     ::encode(xattr, bl);
+     ::encode(omap_header, bl);
+     ::encode(omap, bl);
+   }
+   // inverse of encode_base(); reads the members in the same order
+   void decode_base(bufferlist::iterator& p) {
+     ::decode(xattr, p);
+     ::decode(omap_header, p);
+     ::decode(omap, p);
+   }
+
+   // emit the data size, the omap header length and the name/length of
+   // every xattr and omap entry
+   void dump(Formatter *f) const {
+     f->dump_int("data_len", get_size());
+     f->dump_int("omap_header_len", omap_header.length());
+
+     f->open_array_section("xattrs");
+     for (const auto &attr : xattr) {
+       f->open_object_section("xattr");
+       f->dump_string("name", attr.first);
+       f->dump_int("length", attr.second.length());
+       f->close_section();
+     }
+     f->close_section();
+
+     f->open_array_section("omap");
+     for (const auto &entry : omap) {
+       f->open_object_section("pair");
+       f->dump_string("key", entry.first);
+       f->dump_int("length", entry.second.length());
+       f->close_section();
+     }
+     f->close_section();
+   }
+ };
+ typedef Object::Ref ObjectRef;
+
+ // Object whose data payload is kept in a single bufferlist.
+ struct BufferlistObject : public Object {
+ Spinlock mutex;
+ bufferlist data;
+
+ // the object size is simply the length of the bufferlist
+ size_t get_size() const override { return data.length(); }
+
+ int read(uint64_t offset, uint64_t len, bufferlist &bl) override;
+ int write(uint64_t offset, const bufferlist &bl) override;
+ int clone(Object *src, uint64_t srcoff, uint64_t len,
+ uint64_t dstoff) override;
+ int truncate(uint64_t offset) override;
+
+ // encoding: data first, then the common members via encode_base()
+ void encode(bufferlist& bl) const override {
+ ENCODE_START(1, 1, bl);
+ ::encode(data, bl);
+ encode_base(bl);
+ ENCODE_FINISH(bl);
+ }
+ // must read the fields in the order encode() wrote them
+ void decode(bufferlist::iterator& p) override {
+ DECODE_START(1, p);
+ ::decode(data, p);
+ decode_base(p);
+ DECODE_FINISH(p);
+ }
+ };
+
+ // Object whose data payload is kept in a PageSet.  data_len records the
+ // logical size of the object separately from the pages.
+ struct PageSetObject : public Object {
+   PageSet data;
+   uint64_t data_len;
+#if defined(__GLIBCXX__)
+   // use a thread-local vector for the pages returned by PageSet, so we
+   // can avoid allocations in read/write()
+   static thread_local PageSet::page_vector tls_pages;
+#endif
+
+   // explicit: a bare size_t must never convert silently into an object
+   explicit PageSetObject(size_t page_size) : data(page_size), data_len(0) {}
+
+   size_t get_size() const override { return data_len; }
+
+   int read(uint64_t offset, uint64_t len, bufferlist &bl) override;
+   int write(uint64_t offset, const bufferlist &bl) override;
+   int clone(Object *src, uint64_t srcoff, uint64_t len,
+             uint64_t dstoff) override;
+   int truncate(uint64_t offset) override;
+
+   // encoding: data_len, the page set, then the common members via
+   // encode_base()
+   void encode(bufferlist& bl) const override {
+     ENCODE_START(1, 1, bl);
+     ::encode(data_len, bl);
+     data.encode(bl);
+     encode_base(bl);
+     ENCODE_FINISH(bl);
+   }
+   // must read the fields in the order encode() wrote them
+   void decode(bufferlist::iterator& p) override {
+     DECODE_START(1, p);
+     ::decode(data_len, p);
+     data.decode(p);
+     decode_base(p);
+     DECODE_FINISH(p);
+   }
+ };
+
+ // A collection of objects.  Every object is indexed twice: in
+ // object_hash for lookup and in object_map for ordered iteration.
+ struct Collection : public RefCountedObject {
+   CephContext *cct;
+   bool use_page_set;
+   ceph::unordered_map<ghobject_t, ObjectRef> object_hash; ///< for lookup
+   map<ghobject_t, ObjectRef,ghobject_t::BitwiseComparator> object_map; ///< for iteration
+   map<string,bufferptr> xattr;
+   RWLock lock; ///< for object_{map,hash}
+
+   using Ref = boost::intrusive_ptr<Collection>;
+   friend void intrusive_ptr_add_ref(Collection *c) { c->get(); }
+   friend void intrusive_ptr_release(Collection *c) { c->put(); }
+
+   // build an empty object of the type selected by use_page_set
+   ObjectRef create_object() const {
+     if (!use_page_set)
+       return new BufferlistObject();
+     return new PageSetObject(cct->_conf->memstore_page_size);
+   }
+
+   // NOTE: The lock only needs to protect the object_map/hash, not the
+   // contents of individual objects. The osd is already sequencing
+   // reads and writes, so we will never see them concurrently at this
+   // level.
+
+   // look up oid under the read lock; returns an empty ref when absent
+   ObjectRef get_object(ghobject_t oid) {
+     RWLock::RLocker guard(lock);
+     auto found = object_hash.find(oid);
+     return found != object_hash.end() ? found->second : ObjectRef();
+   }
+
+   // look up oid under the write lock, creating the object and adding it
+   // to both indexes when it does not exist yet
+   ObjectRef get_or_create_object(ghobject_t oid) {
+     RWLock::WLocker guard(lock);
+     auto inserted = object_hash.emplace(oid, ObjectRef());
+     ObjectRef &slot = inserted.first->second;
+     if (inserted.second) {
+       slot = create_object();
+       object_map[oid] = slot;
+     }
+     return slot;
+   }
+
+   // encoding: xattr, use_page_set, object count, then one (id, object)
+   // pair per entry in object_map order
+   void encode(bufferlist& bl) const {
+     ENCODE_START(1, 1, bl);
+     ::encode(xattr, bl);
+     ::encode(use_page_set, bl);
+     uint32_t s = object_map.size();
+     ::encode(s, bl);
+     for (const auto &entry : object_map) {
+       ::encode(entry.first, bl);
+       entry.second->encode(bl);
+     }
+     ENCODE_FINISH(bl);
+   }
+   // use_page_set is decoded before the objects, so create_object()
+   // builds each one with the type that was encoded
+   void decode(bufferlist::iterator& p) {
+     DECODE_START(1, p);
+     ::decode(xattr, p);
+     ::decode(use_page_set, p);
+     uint32_t s;
+     ::decode(s, p);
+     for (uint32_t i = 0; i < s; ++i) {
+       ghobject_t k;
+       ::decode(k, p);
+       auto o = create_object();
+       o->decode(p);
+       object_map.insert(make_pair(k, o));
+       object_hash.insert(make_pair(k, o));
+     }
+     DECODE_FINISH(p);
+   }
+
+   // sum of get_size() over every object in the collection
+   uint64_t used_bytes() const {
+     uint64_t total = 0;
+     for (const auto &entry : object_map)
+       total += entry.second->get_size();
+     return total;
+   }
+
+   Collection(CephContext *cct)
+     : cct(cct), use_page_set(cct->_conf->memstore_page_set),
+       lock("MemStore::Collection::lock") {}
+ };
+ typedef Collection::Ref CollectionRef;
+
+private:
+ // Iterator over one object's omap.  Every operation that touches the
+ // map or the iterator holds the object's omap_mutex while it does so.
+ //
+ // Each guard below is a *named* object on purpose: an unnamed
+ // std::lock_guard<std::mutex>(m) is a temporary that is destroyed at the
+ // end of its own statement, releasing the mutex before the map is used.
+ class OmapIteratorImpl : public ObjectMap::ObjectMapIteratorImpl {
+   CollectionRef c;
+   ObjectRef o;
+   map<string,bufferlist>::iterator it;
+ public:
+   OmapIteratorImpl(CollectionRef c, ObjectRef o)
+     : c(c), o(o), it(o->omap.begin()) {}
+
+   // position on the first key; always returns 0
+   int seek_to_first() {
+     std::lock_guard<std::mutex> lock(o->omap_mutex);
+     it = o->omap.begin();
+     return 0;
+   }
+   // position on the first key greater than `after`; always returns 0
+   int upper_bound(const string &after) {
+     std::lock_guard<std::mutex> lock(o->omap_mutex);
+     it = o->omap.upper_bound(after);
+     return 0;
+   }
+   // position on the first key not less than `to`; always returns 0
+   int lower_bound(const string &to) {
+     std::lock_guard<std::mutex> lock(o->omap_mutex);
+     it = o->omap.lower_bound(to);
+     return 0;
+   }
+   // true while the iterator has not reached the end of the omap
+   bool valid() {
+     std::lock_guard<std::mutex> lock(o->omap_mutex);
+     return it != o->omap.end();
+   }
+   // advance by one key; the validate argument is not used
+   int next(bool validate=true) {
+     std::lock_guard<std::mutex> lock(o->omap_mutex);
+     ++it;
+     return 0;
+   }
+   // copy of the current key, taken while the mutex is held
+   string key() {
+     std::lock_guard<std::mutex> lock(o->omap_mutex);
+     return it->first;
+   }
+   // copy of the current value, taken while the mutex is held
+   bufferlist value() {
+     std::lock_guard<std::mutex> lock(o->omap_mutex);
+     return it->second;
+   }
+   int status() {
+     return 0;
+   }
+ };
+
+
+ ceph::unordered_map<coll_t, CollectionRef> coll_map;
+ RWLock coll_lock; ///< rwlock to protect coll_map
+ Mutex apply_lock; ///< serialize all updates
+
+ CollectionRef get_collection(coll_t cid);
+
+ Finisher finisher;
+
+ uint64_t used_bytes;
+
+ void _do_transaction(Transaction& t);
+
+ int _touch(coll_t cid, const ghobject_t& oid);
+ int _write(coll_t cid, const ghobject_t& oid, uint64_t offset, size_t len,
+ const bufferlist& bl, uint32_t fadvsie_flags = 0);
+ int _zero(coll_t cid, const ghobject_t& oid, uint64_t offset, size_t len);
+ int _truncate(coll_t cid, const ghobject_t& oid, uint64_t size);
+ int _remove(coll_t cid, const ghobject_t& oid);
+ int _setattrs(coll_t cid, const ghobject_t& oid, map<string,bufferptr>& aset);
+ int _rmattr(coll_t cid, const ghobject_t& oid, const char *name);
+ int _rmattrs(coll_t cid, const ghobject_t& oid);
+ int _clone(coll_t cid, const ghobject_t& oldoid, const ghobject_t& newoid);
+ int _clone_range(coll_t cid, const ghobject_t& oldoid,
+ const ghobject_t& newoid,
+ uint64_t srcoff, uint64_t len, uint64_t dstoff);
+ int _omap_clear(coll_t cid, const ghobject_t &oid);
+ int _omap_setkeys(coll_t cid, const ghobject_t &oid, bufferlist& aset_bl);
+ int _omap_rmkeys(coll_t cid, const ghobject_t &oid, bufferlist& keys_bl);
+ int _omap_rmkeyrange(coll_t cid, const ghobject_t &oid,
+ const string& first, const string& last);
+ int _omap_setheader(coll_t cid, const ghobject_t &oid, const bufferlist &bl);
+
+ // The hint is ignored: none of the arguments are used and 0 is returned.
+ int _collection_hint_expected_num_objs(coll_t cid, uint32_t pg_num,
+ uint64_t num_objs) const { return 0; }
+ int _create_collection(coll_t c);
+ int _destroy_collection(coll_t c);
+ int _collection_add(coll_t cid, coll_t ocid, const ghobject_t& oid);
+ int _collection_move_rename(coll_t oldcid, const ghobject_t& oldoid,
+ coll_t cid, const ghobject_t& o);
+ int _split_collection(coll_t cid, uint32_t bits, uint32_t rem, coll_t dest);
+
+ int _save();
+ int _load();
+
+ void dump(Formatter *f);
+ void dump_all();
+
+public:
+ // Construct a store for `path` (forwarded to ObjectStore).  The locks get
+ // their debug names, the finisher is bound to cct and used_bytes starts
+ // at zero.
+ MemStore(CephContext *cct, const string& path)
+ : ObjectStore(path),
+ cct(cct),
+ coll_lock("MemStore::coll_lock"),
+ apply_lock("MemStore::apply_lock"),
+ finisher(cct),
+ used_bytes(0) {}
+ ~MemStore() { }
+
+ // Always reports that the store is not in use.
+ bool test_mount_in_use() {
+ return false;
+ }
+
+ int mount();
+ int umount();
+
+ // Fixed limit reported for object name length.
+ unsigned get_max_object_name_length() {
+ return 4096;
+ }
+ // Fixed limit reported for attribute name length.
+ unsigned get_max_attr_name_length() {
+ return 256; // arbitrary; there is no real limit internally
+ }
+
+ int mkfs();
+ // Journal hooks: mkjournal() does nothing and returns 0, and the three
+ // queries below all answer false.
+ int mkjournal() {
+ return 0;
+ }
+ bool wants_journal() {
+ return false;
+ }
+ bool allows_journal() {
+ return false;
+ }
+ bool needs_journal() {
+ return false;
+ }
+
+ int statfs(struct statfs *buf);
+
+ bool exists(coll_t cid, const ghobject_t& oid);
+ int stat(
+ coll_t cid,
+ const ghobject_t& oid,
+ struct stat *st,
+ bool allow_eio = false); // struct stat?
+ int read(
+ coll_t cid,
+ const ghobject_t& oid,
+ uint64_t offset,
+ size_t len,
+ bufferlist& bl,
+ uint32_t op_flags = 0,
+ bool allow_eio = false);
+ int fiemap(coll_t cid, const ghobject_t& oid, uint64_t offset, size_t len, bufferlist& bl);
+ int getattr(coll_t cid, const ghobject_t& oid, const char *name, bufferptr& value);
+ int getattrs(coll_t cid, const ghobject_t& oid, map<string,bufferptr>& aset);
+
+ int list_collections(vector<coll_t>& ls);
+ bool collection_exists(coll_t c);
+ bool collection_empty(coll_t c);
+ int collection_list(coll_t cid, ghobject_t start, ghobject_t end,
+ bool sort_bitwise, int max,
+ vector<ghobject_t> *ls, ghobject_t *next);
+
+ /// Get omap header and key/value contents
+ int omap_get(
+ coll_t cid, ///< [in] Collection containing oid
+ const ghobject_t &oid, ///< [in] Object containing omap
+ bufferlist *header, ///< [out] omap header
+ map<string, bufferlist> *out ///< [out] Key to value map
+ );
+
+ /// Get omap header
+ int omap_get_header(
+ coll_t cid, ///< [in] Collection containing oid
+ const ghobject_t &oid, ///< [in] Object containing omap
+ bufferlist *header, ///< [out] omap header
+ bool allow_eio = false ///< [in] don't assert on eio
+ );
+
+ /// Get keys defined on oid
+ int omap_get_keys(
+ coll_t cid, ///< [in] Collection containing oid
+ const ghobject_t &oid, ///< [in] Object containing omap
+ set<string> *keys ///< [out] Keys defined on oid
+ );
+
+ /// Get key values
+ int omap_get_values(
+ coll_t cid, ///< [in] Collection containing oid
+ const ghobject_t &oid, ///< [in] Object containing omap
+ const set<string> &keys, ///< [in] Keys to get
+ map<string, bufferlist> *out ///< [out] Returned keys and values
+ );
+
+ /// Filters keys into out which are defined on oid
+ int omap_check_keys(
+ coll_t cid, ///< [in] Collection containing oid
+ const ghobject_t &oid, ///< [in] Object containing omap
+ const set<string> &keys, ///< [in] Keys to check
+ set<string> *out ///< [out] Subset of keys defined on oid
+ );
+
+ ObjectMap::ObjectMapIterator get_omap_iterator(
+ coll_t cid, ///< [in] collection
+ const ghobject_t &oid ///< [in] object
+ );
+
+ void set_fsid(uuid_d u);
+ uuid_d get_fsid();
+
+ objectstore_perf_stat_t get_cur_stats();
+
+ int queue_transactions(
+ Sequencer *osr, list<Transaction*>& tls,
+ TrackedOpRef op = TrackedOpRef(),
+ ThreadPool::TPHandle *handle = NULL);
+};
+
+
+
+
+#endif
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2013- Sage Weil <sage@inktank.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef CEPH_PAGESET_H
+#define CEPH_PAGESET_H
+
+#include <algorithm>
+#include <atomic>
+#include <cassert>
+#include <mutex>
+#include <vector>
+#include <boost/intrusive/avl_set.hpp>
+#include <boost/intrusive_ptr.hpp>
+
+#include "include/encoding.h"
+#include "include/Spinlock.h"
+
+
+// A reference-counted page of data.  The Page header and its data area
+// live in one heap buffer: the data comes first and the Page structure is
+// placed after it (see create()).  `offset` is the page's key in the
+// intrusive avl set.
+struct Page {
+  char *const data;
+  boost::intrusive::avl_set_member_hook<> hook;
+  uint64_t offset;
+
+  // avoid RefCountedObject because it has a virtual destructor
+  std::atomic<uint16_t> nrefs;
+  void get() { ++nrefs; }
+  void put() { if (--nrefs == 0) delete this; }
+
+  typedef boost::intrusive_ptr<Page> Ref;
+  friend void intrusive_ptr_add_ref(Page *p) { p->get(); }
+  friend void intrusive_ptr_release(Page *p) { p->put(); }
+
+  // key-value comparison functor for avl
+  struct Less {
+    bool operator()(uint64_t offset, const Page &page) const {
+      return offset < page.offset;
+    }
+    bool operator()(const Page &page, uint64_t offset) const {
+      return page.offset < offset;
+    }
+    bool operator()(const Page &lhs, const Page &rhs) const {
+      return lhs.offset < rhs.offset;
+    }
+  };
+
+  // encoding: page_size raw data bytes followed by the offset
+  void encode(bufferlist &bl, size_t page_size) const {
+    bl.append(buffer::copy(data, page_size));
+    ::encode(offset, bl);
+  }
+  // reads page_size raw data bytes and then the offset, as encode() wrote
+  void decode(bufferlist::iterator &p, size_t page_size) {
+    ::decode_array_nohead(data, page_size, p);
+    ::decode(offset, p);
+  }
+
+  // Allocate a page with page_size bytes of data keyed at `offset`.
+  // The new Page starts with nrefs == 1.
+  static Ref create(size_t page_size, uint64_t offset = 0) {
+    // The Page is constructed right after the data area, so that area is
+    // rounded up to a multiple of alignof(Page); otherwise a page_size
+    // that is not such a multiple would put the Page (and its atomic
+    // counter) at a misaligned address.
+    const size_t align = alignof(Page);
+    const size_t data_size = (page_size + align - 1) / align * align;
+    // allocate the Page and its data in a single buffer
+    auto buffer = new char[data_size + sizeof(Page)];
+    // place the Page structure at the end of the buffer
+    return new (buffer + data_size) Page(buffer, offset);
+  }
+
+  // copy disabled
+  Page(const Page&) = delete;
+  const Page& operator=(const Page&) = delete;
+
+ private: // private constructor, use create() instead
+  Page(char *data, uint64_t offset) : data(data), offset(offset), nrefs(1) {}
+
+  // the Page lives inside the buffer that `data` points to, so releasing
+  // the Page means freeing that whole buffer
+  static void operator delete(void *p) {
+    delete[] reinterpret_cast<Page*>(p)->data;
+  }
+};
+
+// Sparse set of fixed-size pages keyed by their starting byte offset.
+// Page offsets are computed with `x & ~(page_size-1)`, which only rounds
+// down correctly when page_size is a power of two.
+class PageSet {
+ public:
+  // alloc_range() and get_range() return page refs in a vector
+  typedef std::vector<Page::Ref> page_vector;
+
+ private:
+  // store pages in a boost intrusive avl_set
+  typedef Page::Less page_cmp;
+  typedef boost::intrusive::member_hook<Page,
+          boost::intrusive::avl_set_member_hook<>,
+          &Page::hook> member_option;
+  typedef boost::intrusive::avl_set<Page,
+          boost::intrusive::compare<page_cmp>, member_option> page_set;
+
+  typedef typename page_set::iterator iterator;
+
+  page_set pages;
+  uint64_t page_size;
+
+  typedef Spinlock lock_type;
+  lock_type mutex;
+
+  // unlink every page in [cur,end) from the set and drop one reference
+  // on each
+  void free_pages(iterator cur, iterator end) {
+    while (cur != end) {
+      Page *page = &*cur;
+      cur = pages.erase(cur);
+      page->put();
+    }
+  }
+
+  // number of pages that intersect the byte range [offset, offset+len)
+  int count_pages(uint64_t offset, uint64_t len) const {
+    // an empty range touches no page, even when offset is unaligned;
+    // without this check alloc_range() would size its output for a page
+    // that its loop never fills in
+    if (len == 0)
+      return 0;
+    // count the overlapping pages
+    int count = 0;
+    if (offset % page_size) {
+      // partial first page
+      count++;
+      size_t rem = page_size - offset % page_size;
+      len = len <= rem ? 0 : len - rem;
+    }
+    count += len / page_size;
+    // partial last page
+    if (len % page_size)
+      count++;
+    return count;
+  }
+
+ public:
+  explicit PageSet(size_t page_size) : page_size(page_size) {}
+  PageSet(PageSet &&rhs)
+    : pages(std::move(rhs.pages)), page_size(rhs.page_size) {}
+  ~PageSet() {
+    free_pages(pages.begin(), pages.end());
+  }
+
+  // disable copy
+  PageSet(const PageSet&) = delete;
+  const PageSet& operator=(const PageSet&) = delete;
+
+  bool empty() const { return pages.empty(); }
+  size_t size() const { return pages.size(); }
+  size_t get_page_size() const { return page_size; }
+
+  // allocate all pages that intersect the range [offset,length)
+  // and return a reference to each of them in `range`, in ascending
+  // offset order.  A zero-length request leaves `range` empty.
+  void alloc_range(uint64_t offset, uint64_t length, page_vector &range) {
+    // loop in reverse so we can provide hints to avl_set::insert_check()
+    // and get O(1) insertions after the first
+    uint64_t position = offset + length - 1;
+
+    range.resize(count_pages(offset, length));
+    auto out = range.rbegin();
+
+    std::lock_guard<lock_type> lock(mutex);
+    iterator cur = pages.end();
+    while (length) {
+      const uint64_t page_offset = position & ~(page_size-1);
+
+      typename page_set::insert_commit_data commit;
+      auto insert = pages.insert_check(cur, page_offset, page_cmp(), commit);
+      if (insert.second) {
+        auto page = Page::create(page_size, page_offset);
+        cur = pages.insert_commit(*page, commit);
+
+        // assume that the caller will write to the range [offset,length),
+        // so we only need to zero memory outside of this range
+
+        // zero end of page past offset + length
+        // (length shrinks as the loop walks backwards, so offset + length
+        // only falls inside a page on the first iteration, i.e. for the
+        // last page of the range)
+        if (offset + length < page->offset + page_size)
+          std::fill(page->data + offset + length - page->offset,
+                    page->data + page_size, 0);
+        // zero front of page between page_offset and offset
+        if (offset > page->offset)
+          std::fill(page->data, page->data + offset - page->offset, 0);
+      } else { // exists
+        cur = insert.first;
+      }
+      // add a reference to output vector
+      out->reset(&*cur);
+      ++out;
+
+      // step back to the last byte of the previous page
+      auto c = std::min(length, (position & (page_size-1)) + 1);
+      position -= c;
+      length -= c;
+    }
+    // make sure we sized the vector correctly
+    assert(out == range.rend());
+  }
+
+  // return all allocated pages that intersect the range [offset,length)
+  // by appending a reference to each onto `range`; `range` is not cleared
+  // first and the mutex is not taken here
+  void get_range(uint64_t offset, uint64_t length, page_vector &range) {
+    auto cur = pages.lower_bound(offset & ~(page_size-1), page_cmp());
+    while (cur != pages.end() && cur->offset < offset + length)
+      range.push_back(&*cur++);
+  }
+
+  // free every page that starts at or after `offset`; a page that starts
+  // before offset is kept
+  void free_pages_after(uint64_t offset) {
+    std::lock_guard<lock_type> lock(mutex);
+    auto cur = pages.lower_bound(offset & ~(page_size-1), page_cmp());
+    if (cur == pages.end())
+      return;
+    if (cur->offset < offset)
+      cur++;
+    free_pages(cur, pages.end());
+  }
+
+  // encoding: page_size, page count, then every page from the highest
+  // offset down to the lowest
+  void encode(bufferlist &bl) const {
+    ::encode(page_size, bl);
+    unsigned count = pages.size();
+    ::encode(count, bl);
+    for (auto p = pages.rbegin(); p != pages.rend(); ++p)
+      p->encode(bl, page_size);
+  }
+  // decode into an empty set; pages arrive in descending offset order, so
+  // each one is inserted in front of the previously decoded page
+  void decode(bufferlist::iterator &p) {
+    assert(empty());
+    ::decode(page_size, p);
+    unsigned count;
+    ::decode(count, p);
+    auto cur = pages.end();
+    for (unsigned i = 0; i < count; i++) {
+      auto page = Page::create(page_size);
+      page->decode(p, page_size);
+      cur = pages.insert_before(cur, *page);
+    }
+  }
+};
+
+#endif // CEPH_PAGESET_H
// vim: ts=8 sw=2 smarttab
#include "gtest/gtest.h"
-#include "os/PageSet.h"
+#include "os/memstore/PageSet.h"
TEST(PageSet, AllocAligned)
{