]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
tools: Create cephfs-journal-tool
authorJohn Spray <john.spray@inktank.com>
Tue, 25 Mar 2014 13:30:57 +0000 (13:30 +0000)
committerJohn Spray <john.spray@inktank.com>
Tue, 20 May 2014 13:07:49 +0000 (14:07 +0100)
This is for debugging/repairing CephFS journals.

Signed-off-by: John Spray <john.spray@inktank.com>
26 files changed:
src/ceph_mds.cc
src/mds/Dumper.cc [deleted file]
src/mds/Dumper.h [deleted file]
src/mds/MDSUtility.cc [deleted file]
src/mds/MDSUtility.h [deleted file]
src/mds/Makefile.am
src/mds/Resetter.cc [deleted file]
src/mds/Resetter.h [deleted file]
src/mds/events/ENoOp.h
src/mds/journal.cc
src/tools/Makefile.am
src/tools/cephfs/Dumper.cc [new file with mode: 0644]
src/tools/cephfs/Dumper.h [new file with mode: 0644]
src/tools/cephfs/EventOutput.cc [new file with mode: 0644]
src/tools/cephfs/EventOutput.h [new file with mode: 0644]
src/tools/cephfs/JournalFilter.cc [new file with mode: 0644]
src/tools/cephfs/JournalFilter.h [new file with mode: 0644]
src/tools/cephfs/JournalScanner.cc [new file with mode: 0644]
src/tools/cephfs/JournalScanner.h [new file with mode: 0644]
src/tools/cephfs/JournalTool.cc [new file with mode: 0644]
src/tools/cephfs/JournalTool.h [new file with mode: 0644]
src/tools/cephfs/MDSUtility.cc [new file with mode: 0644]
src/tools/cephfs/MDSUtility.h [new file with mode: 0644]
src/tools/cephfs/Resetter.cc [new file with mode: 0644]
src/tools/cephfs/Resetter.h [new file with mode: 0644]
src/tools/cephfs/cephfs-journal-tool.cc [new file with mode: 0644]

index 1f1fbd319d7d7fed45056c3768c2e4eb31c3d30f..3e9fb223bd5bbb84da045189966a35b6b0675d4f 100644 (file)
@@ -27,8 +27,6 @@ using namespace std;
 
 #include "mon/MonMap.h"
 #include "mds/MDS.h"
-#include "mds/Dumper.h"
-#include "mds/Resetter.h"
 
 #include "msg/Messenger.h"
 
@@ -55,65 +53,14 @@ void usage()
        << "        connect to monitor at given address\n"
        << "  --debug_mds n\n"
        << "        debug MDS level (e.g. 10)\n"
-       << "  --dump-journal rank filename\n"
-       << "        dump the MDS journal (binary) for rank.\n"
-       << "  --dump-journal-entries rank filename\n"
-       << "        dump the MDS journal (JSON) for rank.\n"
        << "  --journal-check rank\n"
        << "        replay the journal for rank, then exit\n"
        << "  --hot-standby rank\n"
        << "        start up as a hot standby for rank\n"
-       << "  --reset-journal rank\n"
-       << "        discard the MDS journal for rank, and replace it with a single\n"
-       << "        event that updates/resets inotable and sessionmap on replay.\n"
        << dendl;
   generic_server_usage();
 }
 
-static int do_cmds_special_action(const std::string &action,
-                                 const std::string &dump_file, int rank)
-{
-  common_init_finish(g_ceph_context, CINIT_FLAG_NO_DAEMON_ACTIONS);
-
-  if (action == "dump-journal") {
-    dout(0) << "dumping journal for mds." << rank << " to " << dump_file << dendl;
-    Dumper journal_dumper;
-    journal_dumper.init(rank);
-    journal_dumper.dump(dump_file.c_str());
-    journal_dumper.shutdown();
-  } else if (action == "dump-journal-entries") {
-    Dumper journal_dumper;
-    journal_dumper.init(rank);
-    journal_dumper.dump_entries();
-    journal_dumper.shutdown();
-  } else if (action == "undump-journal") {
-    dout(0) << "undumping journal for mds." << rank << " from " << dump_file << dendl;
-    Dumper journal_dumper;
-    journal_dumper.init(rank);
-    journal_dumper.undump(dump_file.c_str());
-    journal_dumper.shutdown();
-  } else if (action == "reset-journal") {
-    dout(0) << "resetting journal" << dendl;
-    Resetter resetter;
-    resetter.init(rank);
-    resetter.reset();
-    resetter.shutdown();
-  } else {
-    assert(0);
-  }
-  return 0;
-}
-
-static void set_special_action(std::string &dest, const std::string &act)
-{
-  if (!dest.empty()) {
-    derr << "Parse error! Can't specify more than one action. You "
-        << "specified both " << act << " and " << dest << "\n" << dendl;
-    usage();
-    exit(1);
-  }
-  dest = act;
-}
 
 static int parse_rank(const char *opt_name, const std::string &val)
 {
@@ -148,7 +95,6 @@ int main(int argc, const char **argv)
 
   // mds specific args
   int shadow = 0;
-  int rank = -1;
   std::string dump_file;
 
   std::string val, action;
@@ -156,36 +102,6 @@ int main(int argc, const char **argv)
     if (ceph_argparse_double_dash(args, i)) {
       break;
     }
-    else if (ceph_argparse_witharg(args, i, &val, "--dump-journal", (char*)NULL)) {
-      set_special_action(action, "dump-journal");
-      rank = parse_rank("dump-journal", val);
-      if (i == args.end()) {
-       derr << "error parsing --dump-journal: you must give a second "
-            << "dump-journal argument: the filename to dump the journal to. "
-            << "\n" << dendl;
-       usage();
-      }
-      dump_file = *i++;
-    }
-    else if (ceph_argparse_witharg(args, i, &val, "--undump-journal", (char*)NULL)) {
-      set_special_action(action, "undump-journal");
-      rank = parse_rank("undump-journal", val);
-      if (i == args.end()) {
-       derr << "error parsing --undump-journal: you must give a second "
-            << "undump-journal argument: the filename to undump the journal from. "
-            << "\n" << dendl;
-       usage();
-      }
-      dump_file = *i++;
-    }
-    else if (ceph_argparse_witharg(args, i, &val, "--dump-journal-entries", (char*)NULL)){
-      set_special_action(action, "dump-journal-entries");
-      rank = parse_rank("dump-journal-entries", val);
-    }
-    else if (ceph_argparse_witharg(args, i, &val, "--reset-journal", (char*)NULL)) {
-      set_special_action(action, "reset-journal");
-      rank = parse_rank("reset-journal", val);
-    }
     else if (ceph_argparse_witharg(args, i, &val, "--journal-check", (char*)NULL)) {
       int r = parse_rank("journal-check", val);
       if (shadow) {
@@ -220,11 +136,6 @@ int main(int argc, const char **argv)
 
   pick_addresses(g_ceph_context, CEPH_PICK_ADDRESS_PUBLIC);
 
-  // Check for special actions
-  if (!action.empty()) {
-    return do_cmds_special_action(action, dump_file, rank);
-  }
-
   // Normal startup
   if (g_conf->name.has_default_id()) {
     derr << "must specify '-i name' with the ceph-mds instance name" << dendl;
diff --git a/src/mds/Dumper.cc b/src/mds/Dumper.cc
deleted file mode 100644 (file)
index 1db68bc..0000000
+++ /dev/null
@@ -1,278 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2010 Greg Farnum <gregf@hq.newdream.net>
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software 
- * Foundation.  See file COPYING.
- * 
- */
-
-#ifndef _BACKWARD_BACKWARD_WARNING_H
-#define _BACKWARD_BACKWARD_WARNING_H   // make gcc 4.3 shut up about hash_*
-#endif
-
-#include "include/compat.h"
-#include "common/entity_name.h"
-#include "common/errno.h"
-#include "common/safe_io.h"
-#include "mds/Dumper.h"
-#include "mds/mdstypes.h"
-#include "mds/LogEvent.h"
-#include "osdc/Journaler.h"
-
-#define dout_subsys ceph_subsys_mds
-
-
-int Dumper::init(int rank_)
-{
-  rank = rank_;
-
-  int r = MDSUtility::init();
-  if (r < 0) {
-    return r;
-  }
-
-  inodeno_t ino = MDS_INO_LOG_OFFSET + rank;
-  journaler = new Journaler(ino, mdsmap->get_metadata_pool(), CEPH_FS_ONDISK_MAGIC,
-                                       objecter, 0, 0, &timer);
-  return 0;
-}
-
-
-int Dumper::recover_journal()
-{
-  bool done = false;
-  Cond cond;
-  Mutex localLock("dump:recover_journal");
-  int r;
-
-  lock.Lock();
-  journaler->recover(new C_SafeCond(&localLock, &cond, &done, &r));
-  lock.Unlock();
-  localLock.Lock();
-  while (!done)
-    cond.Wait(localLock);
-  localLock.Unlock();
-
-  if (r < 0) { // Error
-    derr << "error on recovery: " << cpp_strerror(r) << dendl;
-    return r;
-  } else {
-    dout(10) << "completed journal recovery" << dendl;
-    return 0;
-  }
-}
-
-
-void Dumper::dump(const char *dump_file)
-{
-  bool done = false;
-  int r = 0;
-  Cond cond;
-  Mutex localLock("dump:lock");
-
-  r = recover_journal();
-  if (r) {
-    return;
-  }
-  uint64_t start = journaler->get_read_pos();
-  uint64_t end = journaler->get_write_pos();
-  uint64_t len = end-start;
-  inodeno_t ino = MDS_INO_LOG_OFFSET + rank;
-
-  cout << "journal is " << start << "~" << len << std::endl;
-
-  Filer filer(objecter);
-  bufferlist bl;
-
-  lock.Lock();
-  filer.read(ino, &journaler->get_layout(), CEPH_NOSNAP,
-             start, len, &bl, 0, new C_SafeCond(&localLock, &cond, &done));
-  lock.Unlock();
-  localLock.Lock();
-  while (!done)
-    cond.Wait(localLock);
-  localLock.Unlock();
-
-  cout << "read " << bl.length() << " bytes at offset " << start << std::endl;
-
-  int fd = ::open(dump_file, O_WRONLY|O_CREAT|O_TRUNC, 0644);
-  if (fd >= 0) {
-    // include an informative header
-    char buf[200];
-    memset(buf, 0, sizeof(buf));
-    sprintf(buf, "Ceph mds%d journal dump\n start offset %llu (0x%llx)\n       length %llu (0x%llx)\n%c",
-           rank, 
-           (unsigned long long)start, (unsigned long long)start,
-           (unsigned long long)bl.length(), (unsigned long long)bl.length(),
-           4);
-    int r = safe_write(fd, buf, sizeof(buf));
-    if (r)
-      ceph_abort();
-
-    // write the data
-    ::lseek64(fd, start, SEEK_SET);
-    bl.write_fd(fd);
-    ::close(fd);
-
-    cout << "wrote " << bl.length() << " bytes at offset " << start << " to " << dump_file << "\n"
-        << "NOTE: this is a _sparse_ file; you can\n"
-        << "\t$ tar cSzf " << dump_file << ".tgz " << dump_file << "\n"
-        << "      to efficiently compress it while preserving sparseness." << std::endl;
-  } else {
-    int err = errno;
-    derr << "unable to open " << dump_file << ": " << cpp_strerror(err) << dendl;
-  }
-}
-
-void Dumper::undump(const char *dump_file)
-{
-  Mutex localLock("undump:lock");
-  cout << "undump " << dump_file << std::endl;
-  
-  int fd = ::open(dump_file, O_RDONLY);
-  if (fd < 0) {
-    derr << "couldn't open " << dump_file << ": " << cpp_strerror(errno) << dendl;
-    return;
-  }
-
-  // Ceph mds0 journal dump
-  //  start offset 232401996 (0xdda2c4c)
-  //        length 1097504 (0x10bf20)
-
-  char buf[200];
-  int r = safe_read(fd, buf, sizeof(buf));
-  if (r < 0) {
-    VOID_TEMP_FAILURE_RETRY(::close(fd));
-    return;
-  }
-
-  long long unsigned start, len;
-  sscanf(strstr(buf, "start offset"), "start offset %llu", &start);
-  sscanf(strstr(buf, "length"), "length %llu", &len);
-
-  cout << "start " << start << " len " << len << std::endl;
-  
-  inodeno_t ino = MDS_INO_LOG_OFFSET + rank;
-
-  Journaler::Header h;
-  h.trimmed_pos = start;
-  h.expire_pos = start;
-  h.write_pos = start+len;
-  h.magic = CEPH_FS_ONDISK_MAGIC;
-
-  h.layout = g_default_file_layout;
-  h.layout.fl_pg_pool = mdsmap->get_metadata_pool();
-  
-  bufferlist hbl;
-  ::encode(h, hbl);
-
-  object_t oid = file_object_t(ino, 0);
-  object_locator_t oloc(mdsmap->get_metadata_pool());
-  SnapContext snapc;
-
-  bool done = false;
-  Cond cond;
-  
-  cout << "writing header " << oid << std::endl;
-  lock.Lock();
-  objecter->write_full(oid, oloc, snapc, hbl, ceph_clock_now(g_ceph_context), 0, 
-                      NULL, 
-                      new C_SafeCond(&localLock, &cond, &done));
-  lock.Unlock();
-
-  localLock.Lock();
-  while (!done)
-    cond.Wait(localLock);
-  localLock.Unlock();
-  
-  // read
-  Filer filer(objecter);
-  uint64_t pos = start;
-  uint64_t left = len;
-  while (left > 0) {
-    bufferlist j;
-    lseek64(fd, pos, SEEK_SET);
-    uint64_t l = MIN(left, 1024*1024);
-    j.read_fd(fd, l);
-    cout << " writing " << pos << "~" << l << std::endl;
-    lock.Lock();
-    filer.write(ino, &h.layout, snapc, pos, l, j, ceph_clock_now(g_ceph_context), 0, NULL,
-        new C_SafeCond(&localLock, &cond, &done));
-    lock.Unlock();
-
-    localLock.Lock();
-    while (!done)
-      cond.Wait(localLock);
-    localLock.Unlock();
-      
-    pos += l;
-    left -= l;
-  }
-
-  VOID_TEMP_FAILURE_RETRY(::close(fd));
-  cout << "done." << std::endl;
-}
-
-
-/**
- * Write JSON-formatted log entries to standard out.
- */
-void Dumper::dump_entries()
-{
-  Mutex localLock("dump_entries");
-  JSONFormatter jf(true);
-
-  if (recover_journal()) {
-    return;
-  }
-
-  jf.open_array_section("log");
-  lock.Lock();
-  // Until the journal is empty, pop an event or wait for one to
-  // be available.
-  dout(10) << "Journaler read/write/size: "
-      << journaler->get_read_pos() << "/" << journaler->get_write_pos()
-      << "/" << journaler->get_write_pos() - journaler->get_read_pos() << dendl;
-  while (journaler->get_read_pos() != journaler->get_write_pos()) {
-    bufferlist entry_bl;
-    bool got_data = journaler->try_read_entry(entry_bl);
-    dout(10) << "try_read_entry: " << got_data << dendl;
-    if (got_data) {
-      LogEvent *le = LogEvent::decode(entry_bl);
-      if (!le) {
-       dout(0) << "Error decoding LogEvent" << dendl;
-       break;
-      } else {
-       jf.open_object_section("log_event");
-       jf.dump_unsigned("type", le->get_type());
-       jf.dump_unsigned("start_off", le->get_start_off());
-       jf.dump_unsigned("stamp_sec", le->get_stamp().tv.tv_sec);
-       jf.dump_unsigned("stamp_nsec", le->get_stamp().tv.tv_nsec);
-       le->dump(&jf);
-       jf.close_section();
-       delete le;
-      }
-    } else {
-      bool done = false;
-      Cond cond;
-
-      journaler->wait_for_readable(new C_SafeCond(&localLock, &cond, &done));
-      lock.Unlock();
-      localLock.Lock();
-      while (!done)
-        cond.Wait(localLock);
-      localLock.Unlock();
-      lock.Lock();
-    }
-  }
-  lock.Unlock();
-  jf.close_section();
-  jf.flush(cout);
-  return;
-}
diff --git a/src/mds/Dumper.h b/src/mds/Dumper.h
deleted file mode 100644 (file)
index 6218ef4..0000000
+++ /dev/null
@@ -1,47 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2010 Greg Farnum <gregf@hq.newdream.net>
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation.  See file COPYING.
- */
-
-#ifndef JOURNAL_DUMPER_H_
-#define JOURNAL_DUMPER_H_
-
-
-#include "mds/MDSUtility.h"
-#include "osdc/Journaler.h"
-
-/**
- * This class lets you dump out an mds journal for troubleshooting or whatever.
- *
- * It was built to work with cmds so some of the design choices are random.
- * To use, create a Dumper, call init(), and then call dump() with the name
- * of the file to dump to.
- */
-
-class Dumper : public MDSUtility {
-private:
-  Journaler *journaler;
-  int rank;
-
-public:
-  Dumper() : journaler(NULL), rank(-1)
-  {}
-
-  void handle_mds_map(MMDSMap* m);
-
-  int init(int rank);
-  int recover_journal();
-  void dump(const char *dumpfile);
-  void undump(const char *dumpfile);
-  void dump_entries();
-};
-
-#endif /* JOURNAL_DUMPER_H_ */
diff --git a/src/mds/MDSUtility.cc b/src/mds/MDSUtility.cc
deleted file mode 100644 (file)
index 09be280..0000000
+++ /dev/null
@@ -1,160 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2014 John Spray <john.spray@inktank.com>
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation.  See file COPYING.
- */
-
-#include "mds/MDSUtility.h"
-#include "mon/MonClient.h"
-
-#define dout_subsys ceph_subsys_mds
-
-
-MDSUtility::MDSUtility() :
-  Dispatcher(g_ceph_context),
-  objecter(NULL),
-  lock("MDSUtility::lock"),
-  timer(g_ceph_context, lock),
-  waiting_for_mds_map(NULL)
-{
-  monc = new MonClient(g_ceph_context);
-  messenger = Messenger::create(g_ceph_context, entity_name_t::CLIENT(), "mds", getpid());
-  mdsmap = new MDSMap();
-  osdmap = new OSDMap();
-  objecter = new Objecter(g_ceph_context, messenger, monc, osdmap, lock, timer, 0, 0);
-}
-
-
-MDSUtility::~MDSUtility()
-{
-  delete objecter;
-  delete monc;
-  delete messenger;
-  delete osdmap;
-  delete mdsmap;
-  assert(waiting_for_mds_map == NULL);
-}
-
-
-int MDSUtility::init()
-{
-  // Initialize Messenger
-  int r = messenger->bind(g_conf->public_addr);
-  if (r < 0)
-    return r;
-
-  messenger->add_dispatcher_head(this);
-  messenger->start();
-
-  // Initialize MonClient
-  if (monc->build_initial_monmap() < 0)
-    return -1;
-
-  monc->set_want_keys(CEPH_ENTITY_TYPE_MON|CEPH_ENTITY_TYPE_OSD|CEPH_ENTITY_TYPE_MDS);
-  monc->set_messenger(messenger);
-  monc->init();
-  r = monc->authenticate();
-  if (r < 0) {
-    derr << "Authentication failed, did you specify an MDS ID with a valid keyring?" << dendl;
-    return r;
-  }
-
-  client_t whoami = monc->get_global_id();
-  messenger->set_myname(entity_name_t::CLIENT(whoami.v));
-
-  // Initialize Objecter and wait for OSD map
-  objecter->set_client_incarnation(0);
-  objecter->init_unlocked();
-  lock.Lock();
-  objecter->init_locked();
-  lock.Unlock();
-  objecter->wait_for_osd_map();
-  timer.init();
-
-  // Prepare to receive MDS map and request it
-  Mutex init_lock("MDSUtility:init");
-  Cond cond;
-  bool done = false;
-  assert(!mdsmap->get_epoch());
-  lock.Lock();
-  waiting_for_mds_map = new C_SafeCond(&init_lock, &cond, &done, NULL);
-  lock.Unlock();
-  monc->sub_want("mdsmap", 0, CEPH_SUBSCRIBE_ONETIME);
-  monc->renew_subs();
-
-  // Wait for MDS map
-  dout(4) << "waiting for MDS map..." << dendl;
-  init_lock.Lock();
-  while (!done)
-    cond.Wait(init_lock);
-  init_lock.Unlock();
-  dout(4) << "Got MDS map " << mdsmap->get_epoch() << dendl;
-
-  return 0;
-}
-
-
-void MDSUtility::shutdown()
-{
-  lock.Lock();
-  timer.shutdown();
-  objecter->shutdown_locked();
-  lock.Unlock();
-  objecter->shutdown_unlocked();
-  monc->shutdown();
-  messenger->shutdown();
-  messenger->wait();
-}
-
-
-bool MDSUtility::ms_dispatch(Message *m)
-{
-   Mutex::Locker locker(lock);
-   switch (m->get_type()) {
-   case CEPH_MSG_OSD_OPREPLY:
-     objecter->handle_osd_op_reply((MOSDOpReply *)m);
-     break;
-   case CEPH_MSG_OSD_MAP:
-     objecter->handle_osd_map((MOSDMap*)m);
-     break;
-   case CEPH_MSG_MDS_MAP:
-     handle_mds_map((MMDSMap*)m);
-     break;
-   default:
-     return false;
-   }
-   return true;
-}
-
-
-void MDSUtility::handle_mds_map(MMDSMap* m)
-{
-  mdsmap->decode(m->get_encoded());
-  if (waiting_for_mds_map) {
-    waiting_for_mds_map->complete(0);
-    waiting_for_mds_map = NULL;
-  }
-}
-
-
-bool MDSUtility::ms_get_authorizer(int dest_type, AuthAuthorizer **authorizer,
-                         bool force_new)
-{
-  if (dest_type == CEPH_ENTITY_TYPE_MON)
-    return true;
-
-  if (force_new) {
-    if (monc->wait_auth_rotating(10) < 0)
-      return false;
-  }
-
-  *authorizer = monc->auth->build_authorizer(dest_type);
-  return *authorizer != NULL;
-}
diff --git a/src/mds/MDSUtility.h b/src/mds/MDSUtility.h
deleted file mode 100644 (file)
index d3f938e..0000000
+++ /dev/null
@@ -1,58 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2014 John Spray <john.spray@inktank.com>
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation.  See file COPYING.
- */
-
-#ifndef MDS_UTILITY_H_
-#define MDS_UTILITY_H_
-
-#include "osd/OSDMap.h"
-#include "osdc/Objecter.h"
-#include "mds/MDSMap.h"
-#include "messages/MMDSMap.h"
-#include "msg/Dispatcher.h"
-#include "msg/Messenger.h"
-#include "auth/Auth.h"
-
-/// MDS Utility
-/**
- * This class is the parent for MDS utilities, i.e. classes that
- * need access the objects belonging to the MDS without actually
- * acting as an MDS daemon themselves.
- */
-class MDSUtility : public Dispatcher {
-protected:
-  Objecter *objecter;
-  OSDMap *osdmap;
-  MDSMap *mdsmap;
-  Messenger *messenger;
-  MonClient *monc;
-
-  Mutex lock;
-  SafeTimer timer;
-
-  Context *waiting_for_mds_map;
-
-public:
-  MDSUtility();
-  ~MDSUtility();
-
-  void handle_mds_map(MMDSMap* m);
-  bool ms_dispatch(Message *m);
-  bool ms_handle_reset(Connection *con) { return false; }
-  void ms_handle_remote_reset(Connection *con) {}
-  bool ms_get_authorizer(int dest_type, AuthAuthorizer **authorizer,
-                         bool force_new);
-  int init();
-  void shutdown();
-};
-
-#endif /* MDS_UTILITY_H_ */
index be30eb2c4b8f96e8a456fc16488afcdae2ad3412..8d6ad2db055b706e079406613a7d6f17eb7d74c3 100644 (file)
@@ -1,7 +1,5 @@
 libmds_la_SOURCES = \
        mds/Capability.cc \
-       mds/Dumper.cc \
-       mds/Resetter.cc \
        mds/MDS.cc \
        mds/flock.cc \
        mds/locks.c \
@@ -25,7 +23,6 @@ libmds_la_SOURCES = \
        mds/snap.cc \
        mds/SessionMap.cc \
        mds/MDLog.cc \
-       mds/MDSUtility.cc \
        common/TrackedOp.cc
 libmds_la_LIBADD = $(LIBOSDC)
 noinst_LTLIBRARIES += libmds.la
@@ -39,7 +36,6 @@ noinst_HEADERS += \
        mds/CDir.h \
        mds/CInode.h \
        mds/Capability.h \
-       mds/Dumper.h \
        mds/InoTable.h \
        mds/LocalLock.h \
        mds/Locker.h \
@@ -53,10 +49,8 @@ noinst_HEADERS += \
        mds/MDSTable.h \
        mds/MDSTableServer.h \
        mds/MDSTableClient.h \
-       mds/MDSUtility.h \
        mds/Mutation.h \
        mds/Migrator.h \
-       mds/Resetter.h \
        mds/ScatterLock.h \
        mds/Server.h \
        mds/SessionMap.h \
diff --git a/src/mds/Resetter.cc b/src/mds/Resetter.cc
deleted file mode 100644 (file)
index 22839ec..0000000
+++ /dev/null
@@ -1,114 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2010 Greg Farnum <gregf@hq.newdream.net>
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software 
- * Foundation.  See file COPYING.
- * 
- */
-
-#include "mds/Resetter.h"
-#include "osdc/Journaler.h"
-#include "mds/mdstypes.h"
-#include "mon/MonClient.h"
-#include "mds/events/EResetJournal.h"
-
-
-int Resetter::init(int rank)
-{
-  int r = MDSUtility::init();
-  if (r < 0) {
-    return r;
-  }
-
-  inodeno_t ino = MDS_INO_LOG_OFFSET + rank;
-  journaler = new Journaler(ino,
-      mdsmap->get_metadata_pool(),
-      CEPH_FS_ONDISK_MAGIC,
-      objecter, 0, 0, &timer);
-
-  return 0;
-}
-
-void Resetter::reset()
-{
-  Mutex mylock("Resetter::reset::lock");
-  Cond cond;
-  bool done;
-  int r;
-
-  lock.Lock();
-  journaler->recover(new C_SafeCond(&mylock, &cond, &done, &r));
-  lock.Unlock();
-
-  mylock.Lock();
-  while (!done)
-    cond.Wait(mylock);
-  mylock.Unlock();
-
-  if (r != 0) {
-    if (r == -ENOENT) {
-      cerr << "journal does not exist on-disk. Did you set a bad rank?"
-          << std::endl;
-      shutdown();
-      return;
-    } else {
-      cerr << "got error " << r << "from Journaler, failling" << std::endl;
-      shutdown();
-      return;
-    }
-  }
-
-  lock.Lock();
-  uint64_t old_start = journaler->get_read_pos();
-  uint64_t old_end = journaler->get_write_pos();
-  uint64_t old_len = old_end - old_start;
-  cout << "old journal was " << old_start << "~" << old_len << std::endl;
-
-  uint64_t new_start = ROUND_UP_TO(old_end+1, journaler->get_layout_period());
-  cout << "new journal start will be " << new_start
-       << " (" << (new_start - old_end) << " bytes past old end)" << std::endl;
-
-  journaler->set_read_pos(new_start);
-  journaler->set_write_pos(new_start);
-  journaler->set_expire_pos(new_start);
-  journaler->set_trimmed_pos(new_start);
-  journaler->set_writeable();
-
-  cout << "writing journal head" << std::endl;
-  journaler->write_head(new C_SafeCond(&mylock, &cond, &done, &r));
-  lock.Unlock();
-
-  mylock.Lock();
-  while (!done)
-    cond.Wait(mylock);
-  mylock.Unlock();
-    
-  lock.Lock();
-  assert(r == 0);
-
-  LogEvent *le = new EResetJournal;
-
-  bufferlist bl;
-  le->encode_with_header(bl);
-  
-  cout << "writing EResetJournal entry" << std::endl;
-  journaler->append_entry(bl);
-  journaler->flush(new C_SafeCond(&mylock, &cond, &done,&r));
-
-  lock.Unlock();
-
-  mylock.Lock();
-  while (!done)
-    cond.Wait(mylock);
-  mylock.Unlock();
-
-  assert(r == 0);
-
-  cout << "done" << std::endl;
-}
diff --git a/src/mds/Resetter.h b/src/mds/Resetter.h
deleted file mode 100644 (file)
index e967b24..0000000
+++ /dev/null
@@ -1,37 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2010 Greg Farnum <gregf@hq.newdream.net>
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation.  See file COPYING.
- */
-
-#ifndef JOURNAL_RESETTER_H_
-#define JOURNAL_RESETTER_H_
-
-
-#include "osdc/Journaler.h"
-#include "mds/MDSUtility.h"
-
-/**
- * This class lets you reset an mds journal for troubleshooting or whatever.
- *
- * To use, create a Resetter, call init(), and then call reset() with the name
- * of the file to dump to.
- */
-class Resetter : public MDSUtility {
-public:
-  Journaler *journaler;
-
-  Resetter() : journaler(NULL) {}
-
-  int init(int rank);
-  void reset();
-};
-
-#endif /* JOURNAL_RESETTER_H_ */
index dfba5c414625c4fbaa68b1ab2c322761d9ab111a..a66d540502a36c4ddde8743dc5822b09bb3afba8 100644 (file)
 #include "../LogEvent.h"
 
 class ENoOp : public LogEvent {
-  uint32_t size;
+  uint32_t pad_size;
 
 public:
-  ENoOp() : LogEvent(EVENT_NOOP), size(0) { }
-  ENoOp(uint32_t size_) : LogEvent(EVENT_NOOP), size(size){ }
+  ENoOp() : LogEvent(EVENT_NOOP), pad_size(0) { }
+  ENoOp(uint32_t size_) : LogEvent(EVENT_NOOP), pad_size(size_){ }
 
   void encode(bufferlist& bl) const;
   void decode(bufferlist::iterator& bl);
index 83e2f665efb9af282b80fc07c3baaaeb3e46ee2c..9f2829f36d563578cb221b80632b800c59425fb5 100644 (file)
@@ -857,9 +857,8 @@ void EMetaBlob::get_inodes(
 void EMetaBlob::get_dentries(std::map<dirfrag_t, std::set<std::string> > &dentries)
 {
   for (std::map<dirfrag_t, dirlump>::iterator i = lump_map.begin(); i != lump_map.end(); ++i) {
-    inodeno_t const dir_ino = i->first.ino;
     dirlump &dl = i->second;
-    dirfrag_t &df = i->first;
+    dirfrag_t const &df = i->first;
 
     // Get all bits
     dl._decode_bits();
@@ -2996,27 +2995,32 @@ void EResetJournal::replay(MDS *mds)
 
 void ENoOp::encode(bufferlist &bl) const
 {
-  ::encode(size, bl);
-  uint32_t const pad_bytes = size - sizeof(size);
-  uint32_t const pad = 0xdeadbeef;
-  for (unsigned int i = 0; i < pad_bytes / sizeof(uint32_t); ++i) {
+  ENCODE_START(2, 2, bl);
+  ::encode(pad_size, bl);
+  uint8_t const pad = 0xff;
+  for (unsigned int i = 0; i < pad_size; ++i) {
     ::encode(pad, bl);
   }
+  ENCODE_FINISH(bl);
 }
 
 
 void ENoOp::decode(bufferlist::iterator &bl)
 {
-  ::decode(size, bl);
-  if (bl.get_remaining() != (size - sizeof(size))) {
+  DECODE_START(2, bl);
+  ::decode(pad_size, bl);
+  if (bl.get_remaining() != pad_size) {
     // This is spiritually an assertion, but expressing in a way that will let
     // journal debug tools catch it and recognise a malformed entry.
     throw buffer::end_of_buffer();
+  } else {
+    bl.advance(pad_size);
   }
+  DECODE_FINISH(bl);
 }
 
 
 void ENoOp::replay(MDS *mds)
 {
-  dout(4) << "ENoOp::replay, " << size << " bytes skipped in journal" << dendl;
+  dout(4) << "ENoOp::replay, " << pad_size << " bytes skipped in journal" << dendl;
 }
index 6078eb47b8f4ef708e959e8e8cdf6d8d92b0bafe..003c82918db2ddedf4c0eae0f6176b66488764c6 100644 (file)
@@ -71,6 +71,18 @@ rados_SOURCES += common/obj_bencher.cc # needs cleanup so it can go in libcommon
 rados_LDADD = libcls_lock_client.la $(LIBRADOS) $(CEPH_GLOBAL)
 bin_PROGRAMS += rados
 
+cephfs_journal_tool_SOURCES = \
+       tools/cephfs/cephfs-journal-tool.cc \
+       tools/cephfs/JournalTool.cc \
+       tools/cephfs/JournalFilter.cc \
+       tools/cephfs/JournalScanner.cc \
+       tools/cephfs/EventOutput.cc \
+       tools/cephfs/Dumper.cc \
+       tools/cephfs/Resetter.cc \
+       tools/cephfs/MDSUtility.cc
+cephfs_journal_tool_LDADD = $(LIBMDS) $(LIBRADOS) $(CEPH_GLOBAL)
+bin_PROGRAMS += cephfs-journal-tool
+
 if WITH_REST_BENCH
 rest_bench_SOURCES = tools/rest_bench.cc
 rest_bench_SOURCES += common/obj_bencher.cc # needs cleanup so it can go in libcommon.la
@@ -99,6 +111,13 @@ ceph_mon_store_converter_LDADD = $(LIBMON) $(LIBOS) $(CEPH_GLOBAL)
 bin_PROGRAMS += ceph_mon_store_converter
 
 noinst_HEADERS += \
+       tools/cephfs/JournalTool.h \
+       tools/cephfs/JournalScanner.h \
+       tools/cephfs/JournalFilter.h \
+       tools/cephfs/EventOutput.h \
+       tools/cephfs/Resetter.h \
+       tools/cephfs/Dumper.h \
+       tools/cephfs/MDSUtility.h \
        tools/rados/rados_sync.h \
        tools/common.h
 
diff --git a/src/tools/cephfs/Dumper.cc b/src/tools/cephfs/Dumper.cc
new file mode 100644 (file)
index 0000000..c47f88b
--- /dev/null
@@ -0,0 +1,222 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2010 Greg Farnum <gregf@hq.newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software 
+ * Foundation.  See file COPYING.
+ * 
+ */
+
+#ifndef _BACKWARD_BACKWARD_WARNING_H
+#define _BACKWARD_BACKWARD_WARNING_H   // make gcc 4.3 shut up about hash_*
+#endif
+
+#include "include/compat.h"
+#include "common/entity_name.h"
+#include "common/errno.h"
+#include "common/safe_io.h"
+#include "mds/mdstypes.h"
+#include "mds/LogEvent.h"
+#include "osdc/Journaler.h"
+
+#include "Dumper.h"
+
+#define dout_subsys ceph_subsys_mds
+
+
+int Dumper::init(int rank_)
+{
+  rank = rank_;
+
+  int r = MDSUtility::init();
+  if (r < 0) {
+    return r;
+  }
+
+  inodeno_t ino = MDS_INO_LOG_OFFSET + rank;
+  journaler = new Journaler(ino, mdsmap->get_metadata_pool(), CEPH_FS_ONDISK_MAGIC,
+                                       objecter, 0, 0, &timer);
+  return 0;
+}
+
+
+int Dumper::recover_journal()
+{
+  bool done = false;
+  Cond cond;
+  Mutex localLock("dump:recover_journal");
+  int r;
+
+  lock.Lock();
+  journaler->recover(new C_SafeCond(&localLock, &cond, &done, &r));
+  lock.Unlock();
+  localLock.Lock();
+  while (!done)
+    cond.Wait(localLock);
+  localLock.Unlock();
+
+  if (r < 0) { // Error
+    derr << "error on recovery: " << cpp_strerror(r) << dendl;
+    return r;
+  } else {
+    dout(10) << "completed journal recovery" << dendl;
+    return 0;
+  }
+}
+
+
+void Dumper::dump(const char *dump_file)
+{
+  bool done = false;
+  int r = 0;
+  Cond cond;
+  Mutex localLock("dump:lock");
+
+  r = recover_journal();
+  if (r) {
+    return;
+  }
+  uint64_t start = journaler->get_read_pos();
+  uint64_t end = journaler->get_write_pos();
+  uint64_t len = end-start;
+  inodeno_t ino = MDS_INO_LOG_OFFSET + rank;
+
+  cout << "journal is " << start << "~" << len << std::endl;
+
+  Filer filer(objecter);
+  bufferlist bl;
+
+  lock.Lock();
+  filer.read(ino, &journaler->get_layout(), CEPH_NOSNAP,
+             start, len, &bl, 0, new C_SafeCond(&localLock, &cond, &done));
+  lock.Unlock();
+  localLock.Lock();
+  while (!done)
+    cond.Wait(localLock);
+  localLock.Unlock();
+
+  cout << "read " << bl.length() << " bytes at offset " << start << std::endl;
+
+  int fd = ::open(dump_file, O_WRONLY|O_CREAT|O_TRUNC, 0644);
+  if (fd >= 0) {
+    // include an informative header
+    char buf[200];
+    memset(buf, 0, sizeof(buf));
+    sprintf(buf, "Ceph mds%d journal dump\n start offset %llu (0x%llx)\n       length %llu (0x%llx)\n%c",
+           rank, 
+           (unsigned long long)start, (unsigned long long)start,
+           (unsigned long long)bl.length(), (unsigned long long)bl.length(),
+           4);
+    int r = safe_write(fd, buf, sizeof(buf));
+    if (r)
+      ceph_abort();
+
+    // write the data
+    ::lseek64(fd, start, SEEK_SET);
+    bl.write_fd(fd);
+    ::close(fd);
+
+    cout << "wrote " << bl.length() << " bytes at offset " << start << " to " << dump_file << "\n"
+        << "NOTE: this is a _sparse_ file; you can\n"
+        << "\t$ tar cSzf " << dump_file << ".tgz " << dump_file << "\n"
+        << "      to efficiently compress it while preserving sparseness." << std::endl;
+  } else {
+    int err = errno;
+    derr << "unable to open " << dump_file << ": " << cpp_strerror(err) << dendl;
+  }
+}
+
+void Dumper::undump(const char *dump_file)
+{
+  Mutex localLock("undump:lock");
+  cout << "undump " << dump_file << std::endl;
+  
+  int fd = ::open(dump_file, O_RDONLY);
+  if (fd < 0) {
+    derr << "couldn't open " << dump_file << ": " << cpp_strerror(errno) << dendl;
+    return;
+  }
+
+  // Ceph mds0 journal dump
+  //  start offset 232401996 (0xdda2c4c)
+  //        length 1097504 (0x10bf20)
+
+  char buf[200];
+  int r = safe_read(fd, buf, sizeof(buf));
+  if (r < 0) {
+    VOID_TEMP_FAILURE_RETRY(::close(fd));
+    return;
+  }
+
+  long long unsigned start, len;
+  sscanf(strstr(buf, "start offset"), "start offset %llu", &start);
+  sscanf(strstr(buf, "length"), "length %llu", &len);
+
+  cout << "start " << start << " len " << len << std::endl;
+  
+  inodeno_t ino = MDS_INO_LOG_OFFSET + rank;
+
+  Journaler::Header h;
+  h.trimmed_pos = start;
+  h.expire_pos = start;
+  h.write_pos = start+len;
+  h.magic = CEPH_FS_ONDISK_MAGIC;
+
+  h.layout = g_default_file_layout;
+  h.layout.fl_pg_pool = mdsmap->get_metadata_pool();
+  
+  bufferlist hbl;
+  ::encode(h, hbl);
+
+  object_t oid = file_object_t(ino, 0);
+  object_locator_t oloc(mdsmap->get_metadata_pool());
+  SnapContext snapc;
+
+  bool done = false;
+  Cond cond;
+  
+  cout << "writing header " << oid << std::endl;
+  lock.Lock();
+  objecter->write_full(oid, oloc, snapc, hbl, ceph_clock_now(g_ceph_context), 0, 
+                      NULL, 
+                      new C_SafeCond(&localLock, &cond, &done));
+  lock.Unlock();
+
+  localLock.Lock();
+  while (!done)
+    cond.Wait(localLock);
+  localLock.Unlock();
+  
+  // read
+  Filer filer(objecter);
+  uint64_t pos = start;
+  uint64_t left = len;
+  while (left > 0) {
+    bufferlist j;
+    lseek64(fd, pos, SEEK_SET);
+    uint64_t l = MIN(left, 1024*1024);
+    j.read_fd(fd, l);
+    cout << " writing " << pos << "~" << l << std::endl;
+    lock.Lock();
+    filer.write(ino, &h.layout, snapc, pos, l, j, ceph_clock_now(g_ceph_context), 0, NULL,
+        new C_SafeCond(&localLock, &cond, &done));
+    lock.Unlock();
+
+    localLock.Lock();
+    while (!done)
+      cond.Wait(localLock);
+    localLock.Unlock();
+      
+    pos += l;
+    left -= l;
+  }
+
+  VOID_TEMP_FAILURE_RETRY(::close(fd));
+  cout << "done." << std::endl;
+}
+
diff --git a/src/tools/cephfs/Dumper.h b/src/tools/cephfs/Dumper.h
new file mode 100644 (file)
index 0000000..0df861e
--- /dev/null
@@ -0,0 +1,46 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2010 Greg Farnum <gregf@hq.newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ */
+
+#ifndef JOURNAL_DUMPER_H_
+#define JOURNAL_DUMPER_H_
+
+
+#include "MDSUtility.h"
+#include "osdc/Journaler.h"
+
+/**
+ * This class lets you dump out an mds journal for troubleshooting or whatever.
+ *
+ * It was built to work with cmds so some of the design choices are random.
+ * To use, create a Dumper, call init(), and then call dump() with the name
+ * of the file to dump to.
+ */
+
+class Dumper : public MDSUtility {
+private:
+  Journaler *journaler;
+  int rank;
+
+public:
+  Dumper() : journaler(NULL), rank(-1)
+  {}
+
+  void handle_mds_map(MMDSMap* m);
+
+  int init(int rank);
+  int recover_journal();
+  void dump(const char *dumpfile);
+  void undump(const char *dumpfile);
+};
+
+#endif /* JOURNAL_DUMPER_H_ */
diff --git a/src/tools/cephfs/EventOutput.cc b/src/tools/cephfs/EventOutput.cc
new file mode 100644 (file)
index 0000000..c762558
--- /dev/null
@@ -0,0 +1,112 @@
+// -*- mode:c++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+// vim: ts=8 sw=2 smarttab
+/*
+ * ceph - scalable distributed file system
+ *
+ * copyright (c) 2014 john spray <john.spray@inktank.com>
+ *
+ * this is free software; you can redistribute it and/or
+ * modify it under the terms of the gnu lesser general public
+ * license version 2.1, as published by the free software
+ * foundation.  see file copying.
+ */
+
+
+#include <iostream>
+#include <fstream>
+
+#include "common/errno.h"
+#include "mds/mdstypes.h"
+#include "mds/events/EUpdate.h"
+#include "mds/LogEvent.h"
+#include "JournalScanner.h"
+
+#include "EventOutput.h"
+
+
+void EventOutput::binary() const
+{
+  // Binary output, files
+  int r = ::mkdir(path.c_str(), 0755);
+  if (r != 0) {
+    std::cerr << "Error creating output directory: " << cpp_strerror(r) << std::endl;
+    assert(r == 0);
+  }
+
+  for (JournalScanner::EventMap::const_iterator i = scan.events.begin(); i != scan.events.end(); ++i) {
+    LogEvent *le = i->second.log_event;
+    bufferlist le_bin;
+    le->encode(le_bin);
+
+    std::stringstream filename;
+    filename << "0x" << std::hex << i->first << std::dec << "_" << le->get_type_str() << ".bin";
+    std::string const file_path = path + std::string("/") + filename.str();
+    std::ofstream bin_file(file_path.c_str(), std::ofstream::out | std::ofstream::binary);
+    le_bin.write_stream(bin_file);
+    bin_file.close();
+  }
+  std::cerr << "Wrote output to binary files in directory '" << path << "'" << std::endl;
+}
+
+void EventOutput::json() const
+{
+  JSONFormatter jf(true);
+  std::ofstream out_file(path.c_str(), std::ofstream::out);
+  jf.open_array_section("journal");
+  {
+    for (JournalScanner::EventMap::const_iterator i = scan.events.begin(); i != scan.events.end(); ++i) {
+      LogEvent *le = i->second.log_event;
+      jf.open_object_section("log_event");
+      {
+        le->dump(&jf);
+      }
+      jf.close_section();  // log_event
+    }
+  }
+  jf.close_section();  // journal
+  jf.flush(out_file);
+  out_file.close();
+  std::cerr << "Wrote output to JSON file '" << path << "'" << std::endl;
+}
+
+void EventOutput::list() const
+{
+  for (JournalScanner::EventMap::const_iterator i = scan.events.begin(); i != scan.events.end(); ++i) {
+    std::vector<std::string> ev_paths;
+    EMetaBlob *emb = i->second.log_event->get_metablob();
+    if (emb) {
+      emb->get_paths(ev_paths);
+    }
+
+    std::string detail;
+    if (i->second.log_event->get_type() == EVENT_UPDATE) {
+      EUpdate *eu = reinterpret_cast<EUpdate*>(i->second.log_event);
+      detail = eu->type;
+    }
+
+    std::cerr << "0x"
+      << std::hex << i->first << std::dec << " "
+      << i->second.log_event->get_type_str() << ": "
+      << " (" << detail << ")" << std::endl;
+    for (std::vector<std::string>::iterator i = ev_paths.begin(); i != ev_paths.end(); ++i) {
+        std::cerr << "  " << *i << std::endl;
+    }
+  }
+}
+
+void EventOutput::summary() const
+{
+  std::map<std::string, int> type_count;
+  for (JournalScanner::EventMap::const_iterator i = scan.events.begin(); i != scan.events.end(); ++i) {
+    std::string const type = i->second.log_event->get_type_str();
+    if (type_count.count(type) == 0) {
+      type_count[type] = 0;
+    }
+    type_count[type] += 1;
+  }
+
+  std::cerr << "Events by type:" << std::endl;
+  for (std::map<std::string, int>::iterator i = type_count.begin(); i != type_count.end(); ++i) {
+      std::cerr << "  " << i->first << ": " << i->second << std::endl;
+  }
+}
diff --git a/src/tools/cephfs/EventOutput.h b/src/tools/cephfs/EventOutput.h
new file mode 100644 (file)
index 0000000..2c93794
--- /dev/null
@@ -0,0 +1,42 @@
+// -*- mode:c++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+// vim: ts=8 sw=2 smarttab
+/*
+ * ceph - scalable distributed file system
+ *
+ * copyright (c) 2014 john spray <john.spray@inktank.com>
+ *
+ * this is free software; you can redistribute it and/or
+ * modify it under the terms of the gnu lesser general public
+ * license version 2.1, as published by the free software
+ * foundation.  see file copying.
+ */
+
+
+#ifndef EVENT_OUTPUT_H
+#define EVENT_OUTPUT_H
+
+#include <string>
+
+class JournalScanner;
+
+/**
+ * Different output formats for the results of a journal scan
+ */
+class EventOutput
+{
+  private:
+    JournalScanner const &scan;
+    std::string const path;
+
+  public:
+    EventOutput(JournalScanner const &scan_, std::string const &path_)
+      : scan(scan_), path(path_) {}
+
+    void summary() const;
+    void list() const;
+    void json() const;
+    void binary() const;
+};
+
+#endif // EVENT_OUTPUT_H
+
diff --git a/src/tools/cephfs/JournalFilter.cc b/src/tools/cephfs/JournalFilter.cc
new file mode 100644 (file)
index 0000000..9299252
--- /dev/null
@@ -0,0 +1,278 @@
+// -*- mode:c++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+// vim: ts=8 sw=2 smarttab
+/*
+ * ceph - scalable distributed file system
+ *
+ * copyright (c) 2014 john spray <john.spray@inktank.com>
+ *
+ * this is free software; you can redistribute it and/or
+ * modify it under the terms of the gnu lesser general public
+ * license version 2.1, as published by the free software
+ * foundation.  see file copying.
+ */
+
+
+#include "JournalFilter.h"
+
+#include "common/ceph_argparse.h"
+
+#include "mds/events/ESession.h"
+#include "mds/events/EUpdate.h"
+
+#define dout_subsys ceph_subsys_mds
+
+
+const string JournalFilter::range_separator("..");
+
+
+/*
+ * Return whether a LogEvent is to be included or excluded.
+ *
+ * The filter parameters are applied on an AND basis: if any
+ * condition is not met, the event is excluded.  Try to do
+ * the fastest checks first.
+ */
+bool JournalFilter::apply(uint64_t pos, LogEvent &le) const
+{
+  /* Filtering by journal offset range */
+  if (pos < range_start || pos >= range_end) {
+    return false;
+  }
+
+  /* Filtering by event type */
+  if (event_type != 0) {
+    if (le.get_type() != event_type) {
+      return false;
+    }
+  }
+
+  /* Filtering by client */
+  if (client_name.num()) {
+    EMetaBlob *metablob = le.get_metablob();
+    if (metablob) {
+      if (metablob->get_client_name() != client_name) {
+        return false;
+      }
+    } else if (le.get_type() == EVENT_SESSION) {
+      ESession *es = reinterpret_cast<ESession*>(&le);
+      if (es->get_client_inst().name != client_name) {
+        return false;
+      }
+    } else {
+      return false;
+    }
+  }
+
+  /* Filtering by inode */
+  if (inode) {
+    EMetaBlob *metablob = le.get_metablob();
+    if (metablob) {
+      std::set<inodeno_t> inodes;
+      metablob->get_inodes(inodes);
+      bool match_any = false;
+      for (std::set<inodeno_t>::iterator i = inodes.begin(); i != inodes.end(); ++i) {
+        if (*i == inode) {
+          match_any = true;
+          break;
+        }
+      }
+      if (!match_any) {
+        return false;
+      }
+    } else {
+      return false;
+    }
+  }
+
+  /* Filtering by frag and dentry */
+  if (!frag_dentry.empty() || frag.ino) {
+    EMetaBlob *metablob = le.get_metablob();
+    if (metablob) {
+      std::map<dirfrag_t, std::set<std::string> > dentries;
+      metablob->get_dentries(dentries);
+
+      if (frag.ino) {
+        bool match_any = false;
+        for (std::map<dirfrag_t, std::set<std::string> >::iterator i = dentries.begin();
+            i != dentries.end(); ++i) {
+          if (i->first == frag) {
+            match_any = true;
+            break;
+          }
+        }
+        if (!match_any) {
+          return false;
+        }
+      }
+
+      if (!frag_dentry.empty()) {
+        bool match_any = false;
+        for (std::map<dirfrag_t, std::set<std::string> >::iterator i = dentries.begin();
+            i != dentries.end() && !match_any; ++i) {
+          std::set<std::string> const &names = i->second;
+          for (std::set<std::string>::iterator j = names.begin();
+              j != names.end() && !match_any; ++j) {
+            if (*j == frag_dentry) {
+              match_any = true;
+            }
+          }
+        }
+        if (!match_any) {
+          return false;
+        }
+      }
+
+    } else {
+      return false;
+    }
+  }
+
+  /* Filtering by file path */
+  if (!path_expr.empty()) {
+    EMetaBlob *metablob = le.get_metablob();
+    if (metablob) {
+      std::vector<std::string> paths;
+      metablob->get_paths(paths);
+      bool match_any = false;
+      for (std::vector<std::string>::iterator p = paths.begin(); p != paths.end(); ++p) {
+        if ((*p).find(path_expr) != std::string::npos) {
+          match_any = true;
+          break;
+        }
+      }
+      if (!match_any) {
+        return false;
+      }
+    } else {
+      return false;
+    }
+  }
+
+  return true;
+}
+
+
+int JournalFilter::parse_args(
+  std::vector<const char*> &argv, 
+  std::vector<const char*>::iterator &arg)
+{
+  while(arg != argv.end()) {
+    std::string arg_str;
+    if (ceph_argparse_witharg(argv, arg, &arg_str, "--range", (char*)NULL)) {
+      size_t sep_loc = arg_str.find(JournalFilter::range_separator);
+      if (sep_loc == std::string::npos || arg_str.size() <= JournalFilter::range_separator.size()) {
+        derr << "Invalid range '" << arg_str << "'" << dendl;
+        return -EINVAL;
+      }
+
+      // We have a lower bound
+      if (sep_loc > 0) {
+        std::string range_start_str = arg_str.substr(0, sep_loc); 
+        std::string parse_err;
+        range_start = strict_strtoll(range_start_str.c_str(), 0, &parse_err);
+        if (!parse_err.empty()) {
+          derr << "Invalid lower bound '" << range_start_str << "': " << parse_err << dendl;
+          return -EINVAL;
+        }
+      }
+
+      if (sep_loc < arg_str.size() - JournalFilter::range_separator.size()) {
+        std::string range_end_str = arg_str.substr(sep_loc + range_separator.size()); 
+        std::string parse_err;
+        range_end = strict_strtoll(range_end_str.c_str(), 0, &parse_err);
+        if (!parse_err.empty()) {
+          derr << "Invalid upper bound '" << range_end_str << "': " << parse_err << dendl;
+          return -EINVAL;
+        }
+      }
+    } else if (ceph_argparse_witharg(argv, arg, &arg_str, "--path", (char*)NULL)) {
+      dout(4) << "Filtering by path '" << arg_str << "'" << dendl;
+      path_expr = arg_str;
+    } else if (ceph_argparse_witharg(argv, arg, &arg_str, "--inode", (char*)NULL)) {
+      dout(4) << "Filtering by inode '" << arg_str << "'" << dendl;
+      std::string parse_err;
+      inode = strict_strtoll(arg_str.c_str(), 0, &parse_err);
+      if (!parse_err.empty()) {
+        derr << "Invalid inode '" << arg_str << "': " << parse_err << dendl;
+        return -EINVAL;
+      }
+    } else if (ceph_argparse_witharg(argv, arg, &arg_str, "--type", (char*)NULL)) {
+      std::string parse_err;
+      event_type = LogEvent::str_to_type(arg_str);
+      if (event_type == LogEvent::EventType(-1)) {
+        derr << "Invalid event type '" << arg_str << "': " << parse_err << dendl;
+        return -EINVAL;
+      }
+
+    } else if (ceph_argparse_witharg(argv, arg, &arg_str, "--frag", (char*)NULL)) {
+      std::string const frag_sep = ".";
+      size_t sep_loc = arg_str.find(frag_sep);
+      std::string inode_str;
+      std::string frag_str;
+      if (sep_loc != std::string::npos) {
+        inode_str = arg_str.substr(0, sep_loc);
+        frag_str = arg_str.substr(sep_loc + 1);
+      } else {
+        inode_str = arg_str;
+        frag_str = "0";
+      }
+
+      std::string parse_err;
+      inodeno_t frag_ino = strict_strtoll(inode_str.c_str(), 0, &parse_err);
+      if (!parse_err.empty()) {
+        derr << "Invalid inode '" << inode_str << "': " << parse_err << dendl;
+        return -EINVAL;
+      }
+
+      uint32_t frag_enc = strict_strtoll(frag_str.c_str(), 0, &parse_err);
+      if (!parse_err.empty()) {
+        derr << "Invalid frag '" << frag_str << "': " << parse_err << dendl;
+        return -EINVAL;
+      }
+
+      frag = dirfrag_t(frag_ino, frag_t(frag_enc));
+      dout(4) << "dirfrag filter: '" << frag << "'" << dendl;
+    } else if (ceph_argparse_witharg(argv, arg, &arg_str, "--dname", (char*)NULL)) {
+      frag_dentry = arg_str;
+      dout(4) << "dentry filter: '" << frag_dentry << "'" << dendl;
+    } else if (ceph_argparse_witharg(argv, arg, &arg_str, "--client", (char*)NULL)) {
+      std::string parse_err;
+      int64_t client_num = strict_strtoll(arg_str.c_str(), 0, &parse_err);
+      if (!parse_err.empty()) {
+        derr << "Invalid client number " << arg_str << dendl;
+        return -EINVAL;
+      }
+      client_name = entity_name_t::CLIENT(client_num);
+      
+      dout(4) << "dentry filter: '" << frag_dentry << "'" << dendl;
+    } else {
+      // We're done with args the filter understands
+      break;
+    }
+  }
+
+  return 0;
+}
+
+/**
+ * If the filter params are only range, then return
+ * true and set start & end.  Else return false.
+ *
+ * Use this to discover if the user has requested a contiguous range
+ * rather than any per-event filtering.
+ */
+bool JournalFilter::get_range(uint64_t &start, uint64_t &end) const
+{
+  if (!path_expr.empty()
+      || inode != 0
+      || event_type != 0
+      || frag.ino != 0
+      || client_name.num() != 0
+      || (range_start == 0 && range_end == (uint64_t)(-1))) {
+    return false;
+  } else {
+    start = range_start;
+    end = range_end;
+    return true;
+  }
+}
diff --git a/src/tools/cephfs/JournalFilter.h b/src/tools/cephfs/JournalFilter.h
new file mode 100644 (file)
index 0000000..4fd8fff
--- /dev/null
@@ -0,0 +1,64 @@
+// -*- mode:c++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+// vim: ts=8 sw=2 smarttab
+/*
+ * ceph - scalable distributed file system
+ *
+ * copyright (c) 2014 john spray <john.spray@inktank.com>
+ *
+ * this is free software; you can redistribute it and/or
+ * modify it under the terms of the gnu lesser general public
+ * license version 2.1, as published by the free software
+ * foundation.  see file copying.
+ */
+
+
+#ifndef JOURNAL_FILTER_H
+#define JOURNAL_FILTER_H
+
+#include "mds/mdstypes.h"
+#include "mds/LogEvent.h"
+
+/**
+ * A set of conditions for narrowing down a search through the journal
+ */
+class JournalFilter
+{
+  private:
+
+  /* Filtering by journal offset range */
+  uint64_t range_start;
+  uint64_t range_end;
+  static const std::string range_separator;
+
+  /* Filtering by file (sub) path */
+  std::string path_expr;
+
+  /* Filtering by inode */
+  inodeno_t inode;
+
+  /* Filtering by type */
+  LogEvent::EventType event_type;
+
+  /* Filtering by dirfrag */
+  dirfrag_t frag;
+  std::string frag_dentry;  //< optional, filter dentry name within fragment
+
+  /* Filtering by metablob client name */
+  entity_name_t client_name;
+
+  public:
+  JournalFilter() : 
+    range_start(0),
+    range_end(-1),
+    inode(0),
+    event_type(0) {}
+
+  bool get_range(uint64_t &start, uint64_t &end) const;
+  bool apply(uint64_t pos, LogEvent &le) const;
+  int parse_args(
+    std::vector<const char*> &argv, 
+    std::vector<const char*>::iterator &arg);
+};
+
+#endif // JOURNAL_FILTER_H
+
diff --git a/src/tools/cephfs/JournalScanner.cc b/src/tools/cephfs/JournalScanner.cc
new file mode 100644 (file)
index 0000000..3eb482d
--- /dev/null
@@ -0,0 +1,315 @@
+// -*- mode:c++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+// vim: ts=8 sw=2 smarttab
+/*
+ * ceph - scalable distributed file system
+ *
+ * copyright (c) 2014 john spray <john.spray@inktank.com>
+ *
+ * this is free software; you can redistribute it and/or
+ * modify it under the terms of the gnu lesser general public
+ * license version 2.1, as published by the free software
+ * foundation.  see file copying.
+ */
+
+
+#include "include/rados/librados.hpp"
+
+#include "JournalScanner.h"
+
+#define dout_subsys ceph_subsys_mds
+
+/**
+ * Read journal header, followed by sequential scan through journal space.
+ *
+ * Return 0 on success, else error code.  Note that success has the special meaning
+ * that we were able to apply our checks, it does *not* mean that the journal is
+ * healthy.
+ */
+int JournalScanner::scan(bool const full)
+{
+  int r = 0;
+
+  r = scan_header();
+  if (r < 0) {
+    return r;
+  }
+  if (full) {
+    r = scan_events();
+    if (r < 0) {
+      return r;
+    }
+  }
+
+  return 0;
+}
+
+int JournalScanner::scan_header()
+{
+  int r;
+
+  bufferlist header_bl;
+  std::string header_name = obj_name(0, rank);
+  dout(4) << "JournalScanner::scan: reading header object '" << header_name << "'" << dendl;
+  r = io.read(header_name, header_bl, INT_MAX, 0);
+  if (r < 0) {
+    derr << "Header " << header_name << " is unreadable" << dendl;
+    return 0;  // "Successfully" found an error
+  } else {
+    header_present = true;
+  }
+
+  bufferlist::iterator header_bl_i = header_bl.begin();
+  header = new Journaler::Header();
+  try
+  {
+    header->decode(header_bl_i);
+  }
+  catch (buffer::error e)
+  {
+    derr << "Header is corrupt (" << e.what() << ")" << dendl;
+    return 0;  // "Successfully" found an error
+  }
+
+  if (header->magic != std::string(CEPH_FS_ONDISK_MAGIC)) {
+    derr << "Header is corrupt (bad magic)" << dendl;
+    return 0;  // "Successfully" found an error
+  }
+  if (!((header->trimmed_pos <= header->expire_pos) && (header->expire_pos <= header->write_pos))) {
+    derr << "Header is corrupt (inconsistent offsets)" << dendl;
+    return 0;  // "Successfully" found an error
+  }
+  header_valid = true;
+
+  return 0;
+}
+
+
+int JournalScanner::scan_events()
+{
+  int r;
+
+  uint64_t object_size = g_conf->mds_log_segment_size;
+  if (object_size == 0) {
+    // Default layout object size
+    object_size = g_default_file_layout.fl_object_size;
+  }
+
+  uint64_t read_offset = header->expire_pos;
+  dout(10) << std::hex << "Header 0x"
+    << header->trimmed_pos << " 0x"
+    << header->expire_pos << " 0x"
+    << header->write_pos << std::dec << dendl;
+  dout(10) << "Starting journal scan from offset 0x" << std::hex << read_offset << std::dec << dendl;
+
+  // TODO also check for extraneous objects before the trimmed pos or after the write pos,
+  // which would indicate a bogus header.
+
+  bufferlist read_buf;
+  bool gap = false;
+  uint64_t gap_start = -1;
+  for (uint64_t obj_offset = (read_offset / object_size); ; obj_offset++) {
+    // Read this journal segment
+    bufferlist this_object;
+    std::string const oid = obj_name(obj_offset, rank);
+    r = io.read(oid, this_object, INT_MAX, 0);
+
+    // Handle absent journal segments
+    if (r < 0) {
+      if (obj_offset > (header->write_pos / object_size)) {
+        dout(4) << "Reached end of journal objects" << dendl;
+        break;
+      } else {
+        derr << "Missing object " << oid << dendl;
+      }
+
+      objects_missing.push_back(obj_offset);
+      gap = true;
+      gap_start = read_offset;
+      continue;
+    } else {
+      dout(4) << "Read 0x" << std::hex << this_object.length() << std::dec
+              << " bytes from " << oid << " gap=" << gap << dendl;
+      objects_valid.push_back(oid);
+      this_object.copy(0, this_object.length(), read_buf);
+    }
+
+    if (gap) {
+      // No valid data at the current read offset, scan forward until we find something valid looking
+      // or have to drop out to load another object.
+      dout(4) << "Searching for sentinel from 0x" << std::hex << read_offset
+              << ", 0x" << read_buf.length() << std::dec << " bytes available" << dendl;
+
+      do {
+        bufferlist::iterator p = read_buf.begin();
+        uint64_t candidate_sentinel;
+        ::decode(candidate_sentinel, p);
+
+        dout(4) << "Data at 0x" << std::hex << read_offset << " = 0x" << candidate_sentinel << std::dec << dendl;
+
+        if (candidate_sentinel == JournalStream::sentinel) {
+          dout(4) << "Found sentinel at 0x" << std::hex << read_offset << std::dec << dendl;
+          ranges_invalid.push_back(Range(gap_start, read_offset));
+          gap = false;
+          break;
+        } else {
+          // No sentinel, discard this byte
+          read_buf.splice(0, 1);
+          read_offset += 1;
+        }
+      } while (read_buf.length() >= sizeof(JournalStream::sentinel));
+      dout(4) << "read_buf size is " << read_buf.length() << dendl;
+    } else {
+      dout(10) << "Parsing data, 0x" << std::hex << read_buf.length() << std::dec << " bytes available" << dendl;
+      while(true) {
+        // TODO: detect and handle legacy format journals: can do many things
+        // on them but on read errors have to give up instead of searching
+        // for sentinels.
+        JournalStream journal_stream(JOURNAL_FORMAT_RESILIENT);
+        bool readable = false;
+        try {
+          uint64_t need;
+          readable = journal_stream.readable(read_buf, need);
+        } catch (buffer::error e) {
+          readable = false;
+          dout(4) << "Invalid container encoding at 0x" << std::hex << read_offset << std::dec << dendl;
+          gap = true;
+          gap_start = read_offset;
+          read_buf.splice(0, 1);
+          read_offset += 1;
+          break;
+        }
+
+        if (!readable) {
+          // Out of data, continue to read next object
+          break;
+        }
+
+        bufferlist le_bl;  //< Serialized LogEvent blob
+        dout(10) << "Attempting decode at 0x" << std::hex << read_offset << std::dec << dendl;
+        // This cannot fail to decode because we pre-checked that a serialized entry
+        // blob would be readable.
+        uint64_t start_ptr = 0;
+        uint64_t consumed = journal_stream.read(read_buf, le_bl, start_ptr);
+        dout(10) << "Consumed 0x" << std::hex << consumed << std::dec << " bytes" << dendl;
+        if (start_ptr != read_offset) {
+          derr << "Bad entry start ptr (0x" << std::hex << start_ptr << ") at 0x"
+              << read_offset << std::dec << dendl;
+          gap = true;
+          gap_start = read_offset;
+          // FIXME: given that entry was invalid, should we be skipping over it?
+          // maybe push bytes back onto start of read_buf and just advance one byte
+          // to start scanning instead.  e.g. if a bogus size value is found it can
+          // cause us to consume and thus skip a bunch of following valid events.
+          read_offset += consumed;
+          break;
+        }
+
+        LogEvent *le = LogEvent::decode(le_bl);
+        if (le) {
+          dout(10) << "Valid entry at 0x" << std::hex << read_offset << std::dec << dendl;
+
+          if (filter.apply(read_offset, *le)) {
+            events[read_offset] = EventRecord(le, consumed);
+          } else {
+            delete le;
+          }
+          events_valid.push_back(read_offset);
+          read_offset += consumed;
+        } else {
+          dout(10) << "Invalid entry at 0x" << std::hex << read_offset << std::dec << dendl;
+          gap = true;
+          gap_start = read_offset;
+          read_offset += consumed;
+        }
+      }
+    }
+  }
+
+  if (gap) {
+    // Ended on a gap, assume it ran to end
+    ranges_invalid.push_back(Range(gap_start, -1));
+  }
+
+  dout(4) << "Scanned objects, " << objects_missing.size() << " missing, " << objects_valid.size() << " valid" << dendl;
+  dout(4) << "Events scanned, " << ranges_invalid.size() << " gaps" << dendl;
+  dout(4) << "Found " << events_valid.size() << " valid events" << dendl;
+  dout(4) << "Selected " << events.size() << " events events for processing" << dendl;
+
+  return 0;
+}
+
+
+JournalScanner::~JournalScanner()
+{
+  if (header) {
+    delete header;
+    header = NULL;
+  }
+  dout(4) << events.size() << " events" << dendl;
+  for (EventMap::iterator i = events.begin(); i != events.end(); ++i) {
+    delete i->second.log_event;
+  }
+  events.clear();
+}
+
+
+/**
+ * Whether the journal data looks valid and replayable
+ */
+bool JournalScanner::is_healthy() const
+{
+  return (header_present && header_valid && ranges_invalid.empty() && objects_missing.empty());
+}
+
+
+/**
+ * Whether the journal data can be read from RADOS
+ */
+bool JournalScanner::is_readable() const
+{
+  return (header_present && header_valid && objects_missing.empty());
+}
+
+
+/**
+ * Calculate the object name for a given offset in a particular MDS's journal
+ */
+std::string JournalScanner::obj_name(uint64_t offset, int const rank)
+{
+  char header_name[60];
+  snprintf(header_name, sizeof(header_name), "%llx.%08llx",
+      (unsigned long long)(MDS_INO_LOG_OFFSET + rank),
+      (unsigned long long)offset);
+  return std::string(header_name);
+}
+
+
+void JournalScanner::report(std::ostream &out) const
+{
+  out << "Overall journal integrity: " << (is_healthy() ? "OK" : "DAMAGED") << std::endl;
+
+  if (!header_present) {
+    out << "Header not found" << std::endl;
+  }
+
+  if (header_present && !header_valid) {
+    out << "Header could not be decoded" << std::endl;
+  }
+
+  if (objects_missing.size()) {
+    out << "Objects missing:" << std::endl;
+    for (std::vector<uint64_t>::const_iterator om = objects_missing.begin();
+         om != objects_missing.end(); ++om) {
+      out << "  0x" << std::hex << *om << std::dec << std::endl;
+    }
+  }
+
+  if (ranges_invalid.size()) {
+    out << "Corrupt regions:" << std::endl;
+    for (std::vector<Range>::const_iterator r = ranges_invalid.begin();
+         r != ranges_invalid.end(); ++r) {
+      out << "  0x" << std::hex << r->first << "-" << r->second << std::dec << std::endl;
+    }
+  }
+}
+
diff --git a/src/tools/cephfs/JournalScanner.h b/src/tools/cephfs/JournalScanner.h
new file mode 100644 (file)
index 0000000..8e579a0
--- /dev/null
@@ -0,0 +1,101 @@
+// -*- mode:c++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+// vim: ts=8 sw=2 smarttab
+/*
+ * ceph - scalable distributed file system
+ *
+ * copyright (c) 2014 john spray <john.spray@inktank.com>
+ *
+ * this is free software; you can redistribute it and/or
+ * modify it under the terms of the gnu lesser general public
+ * license version 2.1, as published by the free software
+ * foundation.  see file copying.
+ */
+
+#ifndef JOURNAL_SCANNER_H
+#define JOURNAL_SCANNER_H
+
+// For Journaler::Header, can't forward-declare nested classes
+#include <osdc/Journaler.h>
+
+namespace librados {
+  class IoCtx;
+}
+
+#include "JournalFilter.h"
+
+/**
+ * A simple sequential reader for metadata journals.  Unlike
+ * the MDS Journaler class, this is written to detect, record,
+ * and read past corruptions and missing objects.  It is also
+ * less efficient but more plainly written.
+ */
+class JournalScanner
+{
+  private:
+  librados::IoCtx &io;
+
+  // Input constraints
+  int const rank;
+  JournalFilter const filter;
+
+  void gap_advance();
+
+  public:
+  JournalScanner(
+      librados::IoCtx &io_,
+      int rank_,
+      JournalFilter const &filter_) :
+    io(io_),
+    rank(rank_),
+    filter(filter_),
+    header_present(false),
+    header_valid(false),
+    header(NULL) {};
+
+  JournalScanner(
+      librados::IoCtx &io_,
+      int rank_) :
+    io(io_),
+    rank(rank_),
+    header_present(false),
+    header_valid(false),
+    header(NULL) {};
+
+  ~JournalScanner();
+
+  int scan(bool const full=true);
+  int scan_header();
+  int scan_events();
+  void report(std::ostream &out) const;
+
+  static std::string obj_name(uint64_t offset, int const rank);
+
+  // The results of the scan
+  class EventRecord {
+    public:
+    EventRecord() : log_event(NULL), raw_size(0) {}
+    EventRecord(LogEvent *le, uint32_t rs) : log_event(le), raw_size(rs) {}
+    LogEvent *log_event;
+    uint32_t raw_size;  //< Size from start offset including all encoding overhead
+  };
+  typedef std::map<uint64_t, EventRecord> EventMap;
+  typedef std::pair<uint64_t, uint64_t> Range;
+  bool header_present;
+  bool header_valid;
+  Journaler::Header *header;
+
+  bool is_healthy() const;
+  bool is_readable() const;
+  std::vector<std::string> objects_valid;
+  std::vector<uint64_t> objects_missing;
+  std::vector<Range> ranges_invalid;
+  std::vector<uint64_t> events_valid;
+  EventMap events;
+
+  private:
+  // Forbid copy construction because I have ptr members
+  JournalScanner(const JournalScanner &rhs);
+};
+
+#endif // JOURNAL_SCANNER_H
+
diff --git a/src/tools/cephfs/JournalTool.cc b/src/tools/cephfs/JournalTool.cc
new file mode 100644 (file)
index 0000000..141aa64
--- /dev/null
@@ -0,0 +1,714 @@
+// -*- mode:c++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+// vim: ts=8 sw=2 smarttab
+/*
+ * ceph - scalable distributed file system
+ *
+ * copyright (c) 2014 john spray <john.spray@inktank.com>
+ *
+ * this is free software; you can redistribute it and/or
+ * modify it under the terms of the gnu lesser general public
+ * license version 2.1, as published by the free software
+ * foundation.  see file copying.
+ */
+
+
+#include <sstream>
+
+#include "common/ceph_argparse.h"
+#include "common/errno.h"
+#include "osdc/Journaler.h"
+#include "mds/mdstypes.h"
+#include "mds/LogEvent.h"
+
+#include "mds/events/ENoOp.h"
+#include "mds/events/EUpdate.h"
+
+#include "JournalScanner.h"
+#include "EventOutput.h"
+#include "Dumper.h"
+#include "Resetter.h"
+
+#include "JournalTool.h"
+
+
+#define dout_subsys ceph_subsys_mds
+
+
+
+void JournalTool::usage()
+{
+  std::cout << "Usage: \n"
+    << "  cephfs-journal-tool [options] journal [inspect|import|export]\n"
+    << "  cephfs-journal-tool [options] header <get|set <field> <value>\n"
+    << "  cephfs-journal-tool [options] event <selector> <effect> <output>\n"
+    << "    <selector>:  [--by-type=<metablob|client|mds|...?>|--by-inode=<inode>|--by-path=<path>|by-tree=<path>|by-range=<N>..<M>|by-dirfrag-name=<dirfrag id>,<name>]\n"
+    << "    <effect>: [get|splice]\n"
+    << "    <output>: [summary|binary|json] [-o <path>] [--latest]\n"
+    << "\n"
+    << "Options:\n"
+    << "  --rank=<int>  Journal rank (default 0)\n";
+
+  generic_client_usage();
+}
+
+
+/**
+ * Handle arguments and hand off to journal/header/event mode
+ */
+int JournalTool::main(std::vector<const char*> &argv)
+{
+  int r;
+
+  dout(10) << "JournalTool::main " << dendl;
+  // Common arg parsing
+  // ==================
+  if (argv.empty()) {
+    usage();
+    return -EINVAL;
+  }
+
+  std::vector<const char*>::iterator arg = argv.begin();
+  std::string rank_str;
+  if(ceph_argparse_witharg(argv, arg, &rank_str, "--rank", (char*)NULL)) {
+    std::string rank_err;
+    rank = strict_strtol(rank_str.c_str(), 10, &rank_err);
+    if (!rank_err.empty()) {
+        derr << "Bad rank '" << rank_str << "'" << dendl;
+        usage();
+    }
+  }
+
+  std::string mode;
+  if (arg == argv.end()) {
+    derr << "Missing mode [journal|header|event]" << dendl;
+    return -EINVAL;
+  }
+  mode = std::string(*arg);
+  arg = argv.erase(arg);
+
+  // RADOS init
+  // ==========
+  r = rados.init_with_context(g_ceph_context);
+  if (r < 0) {
+    derr << "RADOS unavailable, cannot scan filesystem journal" << dendl;
+    return r;
+  }
+
+  dout(4) << "JournalTool: connecting to RADOS..." << dendl;
+  rados.connect();
+  int const pool_id = mdsmap->get_metadata_pool();
+  dout(4) << "JournalTool: resolving pool " << pool_id << dendl;
+  std::string pool_name;
+  r = rados.pool_reverse_lookup(pool_id, &pool_name);
+  if (r < 0) {
+    derr << "Pool " << pool_id << " named in MDS map not found in RADOS!" << dendl;
+    return r;
+  }
+
+  dout(4) << "JournalTool: creating IoCtx.." << dendl;
+  r = rados.ioctx_create(pool_name.c_str(), io);
+  assert(r == 0);
+
+  // Execution
+  // =========
+  dout(4) << "Executing for rank " << rank << dendl;
+  if (mode == std::string("journal")) {
+    return main_journal(argv);
+  } else if (mode == std::string("header")) {
+    return main_header(argv);
+  } else if (mode == std::string("event")) {
+    return main_event(argv);
+  } else {
+    derr << "Bad command '" << mode << "'" << dendl;
+    usage();
+    return -EINVAL;
+  }
+}
+
+
+/**
+ * Handle arguments for 'journal' mode
+ *
+ * This is for operations that act on the journal as a whole.
+ */
+int JournalTool::main_journal(std::vector<const char*> &argv)
+{
+  std::string command = argv[0];
+  if (command == "inspect") {
+    return journal_inspect();
+  } else if (command == "export" || command == "import") {
+    if (argv.size() >= 2) {
+      std::string const path = argv[1];
+      return journal_export(path, command == "import");
+    } else {
+      derr << "Missing path" << dendl;
+      return -EINVAL;
+    }
+  } else if (command == "reset") {
+      return journal_reset();
+  } else {
+    derr << "Bad journal command '" << command << "'" << dendl;
+    return -EINVAL;
+  }
+}
+
+
+/**
+ * Parse arguments and execute for 'header' mode
+ *
+ * This is for operations that act on the header only.
+ */
+int JournalTool::main_header(std::vector<const char*> &argv)
+{
+  JournalFilter filter;
+  JournalScanner js(io, rank, filter);
+  int r = js.scan(false);
+  if (r < 0) {
+    derr << "Unable to scan journal" << dendl;
+    return r;
+  }
+
+  if (!js.header_present) {
+    derr << "Header object not found!" << dendl;
+    return -ENOENT;
+  } else if (!js.header_valid) {
+    derr << "Header invalid!" << dendl;
+    return -ENOENT;
+  } else {
+    assert(js.header != NULL);
+  }
+
+  if (argv.size() == 0) {
+    derr << "Invalid header command, must be [get|set]" << dendl;
+    return -EINVAL;
+  }
+  std::vector<const char *>::iterator arg = argv.begin();
+  std::string const command = *arg;
+  arg = argv.erase(arg);
+
+  if (command == std::string("get")) {
+    // Write JSON journal dump to stdout
+    JSONFormatter jf(true);
+    js.header->dump(&jf);
+    jf.flush(std::cout);
+    std::cout << std::endl;
+  } else if (command == std::string("set")) {
+    // Need two more args <key> <val>
+    if (argv.size() != 2) {
+      derr << "'set' requires two arguments <trimmed_pos|expire_pos|write_pos> <value>" << dendl;
+      return -EINVAL;
+    }
+
+    std::string const field_name = *arg;
+    arg = argv.erase(arg);
+
+    std::string const value_str = *arg;
+    arg = argv.erase(arg);
+    assert(argv.empty());
+
+    std::string parse_err;
+    uint64_t new_val = strict_strtoll(value_str.c_str(), 0, &parse_err);
+    if (!parse_err.empty()) {
+      derr << "Invalid value '" << value_str << "': " << parse_err << dendl;
+      return -EINVAL;
+    }
+
+    uint64_t *field = NULL;
+    if (field_name == "trimmed_pos") {
+      field = &(js.header->trimmed_pos);
+    } else if (field_name == "expire_pos") {
+      field = &(js.header->expire_pos);
+    } else if (field_name == "write_pos") {
+      field = &(js.header->write_pos);
+    } else {
+      derr << "Invalid field '" << field_name << "'" << dendl;
+      return -EINVAL;
+    }
+
+    std::cout << "Updating " << field_name << std::hex << " 0x" << *field << " -> 0x" << new_val << std::dec << std::endl;
+    *field = new_val;
+
+    dout(4) << "Writing object..." << dendl;
+    bufferlist header_bl;
+    ::encode(*(js.header), header_bl);
+    io.write_full(JournalScanner::obj_name(0, rank), header_bl);
+    dout(4) << "Write complete." << dendl;
+    std::cout << "Successfully updated header." << std::endl;
+  } else {
+    derr << "Bad header command '" << command << "'" << dendl;
+    return -EINVAL;
+  }
+
+  return 0;
+}
+
+
+/**
+ * Parse arguments and execute for 'event' mode
+ *
+ * This is for operations that act on LogEvents within the log
+ */
+int JournalTool::main_event(std::vector<const char*> &argv)
+{
+  int r;
+
+  std::vector<const char*>::iterator arg = argv.begin();
+
+  std::string command = *(arg++);
+  if (command != "get" && command != "apply" && command != "splice") {
+    derr << "Unknown argument '" << command << "'" << dendl;
+    usage();
+    return -EINVAL;
+  }
+
+  if (arg == argv.end()) {
+    derr << "Incomplete command line" << dendl;
+    usage();
+    return -EINVAL;
+  }
+
+  // Parse filter options
+  // ====================
+  JournalFilter filter;
+  r = filter.parse_args(argv, arg);
+  if (r) {
+    return r;
+  }
+
+  // Parse output options
+  // ====================
+  if (arg == argv.end()) {
+    derr << "Missing output command" << dendl;
+    usage();
+  }
+  std::string output_style = *(arg++);
+  if (output_style != "binary" && output_style != "json" &&
+      output_style != "summary" && output_style != "list") {
+      derr << "Unknown argument: '" << output_style << "'" << dendl;
+      usage();
+      return -EINVAL;
+  }
+
+  std::string output_path = "dump";
+  while(arg != argv.end()) {
+    std::string arg_str;
+    if (ceph_argparse_witharg(argv, arg, &arg_str, "--path", (char*)NULL)) {
+      output_path = arg_str;
+    } else {
+      derr << "Unknown argument: '" << *arg << "'" << dendl;
+      usage();
+      return -EINVAL;
+    }
+  }
+
+  // Execute command
+  // ===============
+  JournalScanner js(io, rank, filter);
+  if (command == "get") {
+    r = js.scan();
+    if (r) {
+      derr << "Failed to scan journal (" << cpp_strerror(r) << ")" << dendl;
+      return r;
+    }
+  } else if (command == "apply") {
+    r = js.scan();
+    if (r) {
+      derr << "Failed to scan journal (" << cpp_strerror(r) << ")" << dendl;
+      return r;
+    }
+
+    bool dry_run = false;
+    if (arg != argv.end() && ceph_argparse_flag(argv, arg, "--dry_run", (char*)NULL)) {
+      dry_run = true;
+    }
+
+    for (JournalScanner::EventMap::iterator i = js.events.begin(); i != js.events.end(); ++i) {
+      LogEvent *le = i->second.log_event;
+      EMetaBlob *mb = le->get_metablob();
+      if (mb) {
+        replay_offline(*mb, dry_run);
+      }
+    }
+  } else if (command == "splice") {
+    r = js.scan();
+    if (r) {
+      derr << "Failed to scan journal (" << cpp_strerror(r) << ")" << dendl;
+      return r;
+    }
+
+    uint64_t start, end;
+    if (filter.get_range(start, end)) {
+      // Special case for range filter: erase a numeric range in the log
+      uint64_t range = end - start;
+      int r = erase_region(start, range);
+      if (r) {
+        derr << "Failed to erase region 0x" << std::hex << start << "~0x" << range << std::dec
+             << ": " << cpp_strerror(r) << dendl;
+        return r;
+      }
+    } else {
+      // General case: erase a collection of individual entries in the log
+      for (JournalScanner::EventMap::iterator i = js.events.begin(); i != js.events.end(); ++i) {
+        dout(4) << "Erasing offset 0x" << std::hex << i->first << std::dec << dendl;
+
+        int r = erase_region(i->first, i->second.raw_size);
+        if (r) {
+          derr << "Failed to erase event 0x" << std::hex << i->first << std::dec
+               << ": " << cpp_strerror(r) << dendl;
+          return r;
+        }
+      }
+    }
+
+
+  } else {
+    derr << "Unknown argument '" << command << "'" << dendl;
+    usage();
+    return -EINVAL;
+  }
+
+  // Generate output
+  // ===============
+  EventOutput output(js, output_path);
+  if (output_style == "binary") {
+      output.binary();
+  } else if (output_style == "json") {
+      output.json();
+  } else if (output_style == "summary") {
+      output.summary();
+  } else if (output_style == "list") {
+      output.list();
+  } else {
+    derr << "Bad output command '" << output_style << "'" << dendl;
+    return -EINVAL;
+  }
+
+  return 0;
+}
+
+/**
+ * Provide the user with information about the condition of the journal,
+ * especially indicating what range of log events is available and where
+ * any gaps or corruptions in the journal are.
+ */
+int JournalTool::journal_inspect()
+{
+  int r;
+
+  JournalFilter filter;
+  JournalScanner js(io, rank, filter);
+  r = js.scan();
+  if (r) {
+    std::cerr << "Failed to scan journal (" << cpp_strerror(r) << ")" << std::endl;
+    return r;
+  }
+
+  js.report(std::cout);
+
+  return 0;
+}
+
+
+/**
+ * Attempt to export a binary dump of the journal.
+ *
+ * This is allowed to fail if the header is malformed or there are
+ * objects inaccessible, in which case the user would have to fall
+ * back to manually listing RADOS objects and extracting them, which
+ * they can do with the ``rados`` CLI.
+ */
+int JournalTool::journal_export(std::string const &path, bool import)
+{
+  int r = 0;
+  Dumper dumper;
+  JournalScanner js(io, rank);
+
+  /*
+   * Check that the header is valid and no objects are missing before
+   * trying to dump
+   */
+  r = js.scan();
+  if (r < 0) {
+    derr << "Unable to scan journal, assuming badly damaged" << dendl;
+    return r;
+  }
+  if (!js.is_readable()) {
+    derr << "Journal not readable, attempt object-by-object dump with `rados`" << dendl;
+    return -EIO;
+  }
+
+  /*
+   * Assuming we can cleanly read the journal data, dump it out to a file
+   */
+  r = dumper.init(rank);
+  if (r < 0) {
+    derr << "dumper::init failed: " << cpp_strerror(r) << dendl;
+    return r;
+  }
+  if (import) {
+    dumper.undump(path.c_str());
+  } else {
+    dumper.dump(path.c_str());
+  }
+  dumper.shutdown();
+
+  return r;
+}
+
+
+/**
+ * Truncate journal and insert EResetJournal
+ */
+int JournalTool::journal_reset()
+{
+  int r = 0;
+  Resetter resetter;
+  r = resetter.init(rank);
+  if (r < 0) {
+    derr << "resetter::init failed: " << cpp_strerror(r) << dendl;
+    return r;
+  }
+  resetter.reset();
+  resetter.shutdown();
+
+  return r;
+}
+
+
+int JournalTool::replay_offline(EMetaBlob &metablob, bool const dry_run)
+{
+  int r;
+
+  // Replay roots
+  for (list<ceph::shared_ptr<EMetaBlob::fullbit> >::iterator p = metablob.roots.begin(); p != metablob.roots.end(); ++p) {
+    EMetaBlob::fullbit &fb = *(*p);
+    inodeno_t ino = fb.inode.ino;
+    dout(4) << __func__ << ": updating root 0x" << std::hex << ino << std::dec << dendl;
+
+    object_t root_oid = InodeStore::get_object_name(ino, frag_t(), ".inode");
+    dout(4) << __func__ << ": object id " << root_oid.name << dendl;
+
+    bufferlist inode_bl;
+    r = io.read(root_oid.name, inode_bl, (1<<22), 0);
+    InodeStore inode;
+    if (r == -ENOENT) {
+      dout(4) << __func__ << ": root does not exist, will create" << dendl;
+    } else {
+      dout(4) << __func__ << ": root exists, will modify (" << inode_bl.length() << ")" << dendl;
+      // TODO: add some kind of force option so that we can overwrite bad inodes
+      // from the journal if needed
+      bufferlist::iterator inode_bl_iter = inode_bl.begin(); 
+      std::string magic;
+      ::decode(magic, inode_bl_iter);
+      if (magic == CEPH_FS_ONDISK_MAGIC) {
+        dout(4) << "magic ok" << dendl;
+      } else {
+        dout(4) << "magic bad: '" << magic << "'" << dendl;
+      }
+      inode.decode(inode_bl_iter);
+    }
+
+    // This is a distant cousin of EMetaBlob::fullbit::update_inode, but for use
+    // on an offline InodeStore instance.  It's way simpler, because we are just
+    // uncritically hauling the data between structs.
+    inode.inode = fb.inode;
+    inode.xattrs = fb.xattrs;
+    inode.dirfragtree = fb.dirfragtree;
+    inode.snap_blob = fb.snapbl;
+    inode.symlink = fb.symlink;
+    inode.old_inodes = fb.old_inodes;
+
+    inode_bl.clear();
+    std::string magic = CEPH_FS_ONDISK_MAGIC;
+    ::encode(magic, inode_bl);
+    inode.encode(inode_bl);
+
+    if (!dry_run) {
+      r = io.write_full(root_oid.name, inode_bl);
+      assert(r == 0);
+    }
+  }
+
+  // TODO: respect metablob.renamed_dirino (cues us as to which dirlumps
+  // indicate renamed directories)
+
+  // Replay fullbits (dentry+inode)
+  for (list<dirfrag_t>::iterator lp = metablob.lump_order.begin(); lp != metablob.lump_order.end(); ++lp) {
+    dirfrag_t &frag = *lp;
+    EMetaBlob::dirlump &lump = metablob.lump_map[frag];
+    lump._decode_bits();
+    object_t frag_object_id = InodeStore::get_object_name(frag.ino, frag.frag, "");
+
+    // Check for presence of dirfrag object
+    uint64_t psize;
+    time_t pmtime;
+    r = io.stat(frag_object_id.name, &psize, &pmtime);
+    if (r == -ENOENT) {
+      dout(4) << "Frag object " << frag_object_id.name << " did not exist, will create" << dendl;
+    } else if (r != 0) {
+      // FIXME: what else can happen here?  can I deal?
+      assert(r == 0);
+    } else {
+      dout(4) << "Frag object " << frag_object_id.name << " exists, will modify" << dendl;
+    }
+
+    // Write fnode to omap header of dirfrag object
+    bufferlist fnode_bl;
+    lump.fnode.encode(fnode_bl);
+    if (!dry_run) {
+      r = io.omap_set_header(frag_object_id.name, fnode_bl);
+      if (r != 0) {
+        derr << "Failed to write fnode for frag object " << frag_object_id.name << dendl;
+        return r;
+      }
+    }
+
+    // Try to get the existing dentry
+    list<ceph::shared_ptr<EMetaBlob::fullbit> > &fb_list = lump.get_dfull();
+    for (list<ceph::shared_ptr<EMetaBlob::fullbit> >::iterator fbi = fb_list.begin(); fbi != fb_list.end(); ++fbi) {
+      EMetaBlob::fullbit &fb = *(*fbi);
+
+      // Get a key like "foobar_head"
+      std::string key;
+      dentry_key_t dn_key(fb.dnlast, fb.dn.c_str());
+      dn_key.encode(key);
+
+      // See if the dentry is present
+      std::set<std::string> keys;
+      keys.insert(key);
+      std::map<std::string, bufferlist> vals;
+      r = io.omap_get_vals_by_keys(frag_object_id.name, keys, &vals);
+      assert (r == 0);  // I assume success because I checked object existed and absence of 
+                        // dentry gives me empty map instead of failure
+                        // FIXME handle failures so we can replay other events
+                        // if this one is causing some unexpected issue
+    
+      if (vals.find(key) == vals.end()) {
+        dout(4) << "dentry " << key << " does not exist, will be created" << dendl;
+      } else {
+        dout(4) << "dentry " << key << " existed already" << dendl;
+        // TODO: read out existing dentry before adding new one so that
+        // we can print a bit of info about what we're overwriting
+      }
+    
+      bufferlist dentry_bl;
+      ::encode(fb.dnfirst, dentry_bl);
+      ::encode('I', dentry_bl);
+
+      InodeStore inode;
+      inode.inode = fb.inode;
+      inode.xattrs = fb.xattrs;
+      inode.dirfragtree = fb.dirfragtree;
+      inode.snap_blob = fb.snapbl;
+      inode.symlink = fb.symlink;
+      inode.old_inodes = fb.old_inodes;
+      inode.encode_bare(dentry_bl);
+      
+      vals[key] = dentry_bl;
+      if (!dry_run) {
+        r = io.omap_set(frag_object_id.name, vals);
+        assert(r == 0);  // FIXME handle failures
+      }
+    }
+
+    list<EMetaBlob::nullbit> &nb_list = lump.get_dnull();
+    for (list<EMetaBlob::nullbit>::const_iterator
+       iter = nb_list.begin(); iter != nb_list.end(); ++iter) {
+      EMetaBlob::nullbit const &nb = *iter;
+
+      // Get a key like "foobar_head"
+      std::string key;
+      dentry_key_t dn_key(nb.dnlast, nb.dn.c_str());
+      dn_key.encode(key);
+
+      // Remove it from the dirfrag
+      dout(4) << "Removing dentry " << key << dendl;
+      std::set<std::string> keys;
+      keys.insert(key);
+      if (!dry_run) {
+        r = io.omap_rm_keys(frag_object_id.name, keys);
+        assert(r == 0);
+      }
+    }
+  }
+
+  for (std::vector<inodeno_t>::iterator i = metablob.destroyed_inodes.begin();
+       i != metablob.destroyed_inodes.end(); ++i) {
+    dout(4) << "Destroyed inode: " << *i << dendl;
+    // TODO: if it was a dir, then delete its dirfrag objects
+  }
+
+  return 0;
+}
+
+
+/**
+ * Erase a region of the log by overwriting it with ENoOp
+ *
+ */
+int JournalTool::erase_region(uint64_t const pos, uint64_t const length)
+{
+  // To erase this region, we use our preamble, the encoding overhead
+  // of an ENoOp, and our trailing start ptr.  Calculate how much padding
+  // is needed inside the ENoOp to make up the difference.
+  bufferlist tmp;
+  ENoOp enoop(0);
+  enoop.encode_with_header(tmp);
+
+  dout(4) << "erase_region " << pos << " len=" << length << dendl;
+
+  // FIXME: get the preamble/postamble length via JournalStream
+  int32_t padding = length - tmp.length() - sizeof(uint32_t) - sizeof(uint64_t) - sizeof(uint64_t);
+  dout(4) << "erase_region padding=0x" << std::hex << padding << std::dec << dendl;
+
+  if (padding < 0) {
+    derr << "Erase region " << length << " too short" << dendl;
+    return -EINVAL;
+  }
+
+  // Serialize an ENoOp with the correct amount of padding
+  enoop = ENoOp(padding);
+  bufferlist entry;
+  enoop.encode_with_header(entry);
+  JournalStream stream(JOURNAL_FORMAT_RESILIENT);
+
+  // Serialize region of log stream
+  bufferlist log_data;
+  stream.write(entry, log_data, pos);
+
+  dout(4) << "erase_region data length " << log_data.length() << dendl;
+  assert(log_data.length() == length);
+
+  // Write log stream region to RADOS
+  // FIXME: get object size somewhere common to scan_events
+  uint32_t object_size = g_conf->mds_log_segment_size;
+  if (object_size == 0) {
+    // Default layout object size
+    object_size = g_default_file_layout.fl_object_size;
+  }
+
+  uint64_t write_offset = pos;
+  uint64_t obj_offset = (pos / object_size);
+  int r = 0;
+  while(log_data.length()) {
+    std::string const oid = JournalScanner::obj_name(obj_offset, rank);
+    uint32_t offset_in_obj = write_offset % object_size;
+    uint32_t write_len = min(log_data.length(), object_size - offset_in_obj);
+
+    r = io.write(oid, log_data, write_len, offset_in_obj);
+    if (r < 0) {
+      return r;
+    } else {
+      dout(4) << "Wrote " << write_len << " bytes to " << oid << dendl;
+      r = 0;
+    }
+     
+    log_data.splice(0, write_len);
+    write_offset += write_len;
+    obj_offset++;
+  }
+
+  return r;
+}
+
diff --git a/src/tools/cephfs/JournalTool.h b/src/tools/cephfs/JournalTool.h
new file mode 100644 (file)
index 0000000..96683b7
--- /dev/null
@@ -0,0 +1,68 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2014 John Spray <john.spray@inktank.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ */
+
+#include "MDSUtility.h"
+#include <vector>
+
+#include "mds/mdstypes.h"
+#include "mds/LogEvent.h"
+
+#include "include/rados/librados.hpp"
+
+#include "JournalFilter.h"
+
+class EMetaBlob;
+
+
+/**
+ * Command line tool for investigating and repairing filesystems
+ * with damaged metadata logs
+ */
+class JournalTool : public MDSUtility
+{
+  private:
+    int rank;
+
+    // Entry points
+    int main_journal(std::vector<const char*> &argv);
+    int main_header(std::vector<const char*> &argv);
+    int main_event(std::vector<const char*> &argv);
+
+    // Shared functionality
+    int recover_journal();
+
+    // Journal operations
+    int journal_inspect();
+    int journal_export(std::string const &path, bool import);
+    int journal_reset();
+
+    // Header operations
+    int header_set();
+
+    // I/O handles
+    librados::Rados rados;
+    librados::IoCtx io;
+
+    // Metadata backing store manipulation
+    int replay_offline(EMetaBlob &metablob, bool const dry_run);
+
+    // Splicing
+    int erase_region(uint64_t const pos, uint64_t const length);
+
+  public:
+    void usage();
+    JournalTool() :
+      rank(0) {}
+    int main(std::vector<const char*> &argv);
+};
+
diff --git a/src/tools/cephfs/MDSUtility.cc b/src/tools/cephfs/MDSUtility.cc
new file mode 100644 (file)
index 0000000..5633c92
--- /dev/null
@@ -0,0 +1,160 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2014 John Spray <john.spray@inktank.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ */
+
+#include "MDSUtility.h"
+#include "mon/MonClient.h"
+
+#define dout_subsys ceph_subsys_mds
+
+
+MDSUtility::MDSUtility() :
+  Dispatcher(g_ceph_context),
+  objecter(NULL),
+  lock("MDSUtility::lock"),
+  timer(g_ceph_context, lock),
+  waiting_for_mds_map(NULL)
+{
+  monc = new MonClient(g_ceph_context);
+  messenger = Messenger::create(g_ceph_context, entity_name_t::CLIENT(), "mds", getpid());
+  mdsmap = new MDSMap();
+  osdmap = new OSDMap();
+  objecter = new Objecter(g_ceph_context, messenger, monc, osdmap, lock, timer, 0, 0);
+}
+
+
+MDSUtility::~MDSUtility()
+{
+  delete objecter;
+  delete monc;
+  delete messenger;
+  delete osdmap;
+  delete mdsmap;
+  assert(waiting_for_mds_map == NULL);
+}
+
+
+int MDSUtility::init()
+{
+  // Initialize Messenger
+  int r = messenger->bind(g_conf->public_addr);
+  if (r < 0)
+    return r;
+
+  messenger->add_dispatcher_head(this);
+  messenger->start();
+
+  // Initialize MonClient
+  if (monc->build_initial_monmap() < 0)
+    return -1;
+
+  monc->set_want_keys(CEPH_ENTITY_TYPE_MON|CEPH_ENTITY_TYPE_OSD|CEPH_ENTITY_TYPE_MDS);
+  monc->set_messenger(messenger);
+  monc->init();
+  r = monc->authenticate();
+  if (r < 0) {
+    derr << "Authentication failed, did you specify an MDS ID with a valid keyring?" << dendl;
+    return r;
+  }
+
+  client_t whoami = monc->get_global_id();
+  messenger->set_myname(entity_name_t::CLIENT(whoami.v));
+
+  // Initialize Objecter and wait for OSD map
+  objecter->set_client_incarnation(0);
+  objecter->init_unlocked();
+  lock.Lock();
+  objecter->init_locked();
+  lock.Unlock();
+  objecter->wait_for_osd_map();
+  timer.init();
+
+  // Prepare to receive MDS map and request it
+  Mutex init_lock("MDSUtility:init");
+  Cond cond;
+  bool done = false;
+  assert(!mdsmap->get_epoch());
+  lock.Lock();
+  waiting_for_mds_map = new C_SafeCond(&init_lock, &cond, &done, NULL);
+  lock.Unlock();
+  monc->sub_want("mdsmap", 0, CEPH_SUBSCRIBE_ONETIME);
+  monc->renew_subs();
+
+  // Wait for MDS map
+  dout(4) << "waiting for MDS map..." << dendl;
+  init_lock.Lock();
+  while (!done)
+    cond.Wait(init_lock);
+  init_lock.Unlock();
+  dout(4) << "Got MDS map " << mdsmap->get_epoch() << dendl;
+
+  return 0;
+}
+
+
+void MDSUtility::shutdown()
+{
+  lock.Lock();
+  timer.shutdown();
+  objecter->shutdown_locked();
+  lock.Unlock();
+  objecter->shutdown_unlocked();
+  monc->shutdown();
+  messenger->shutdown();
+  messenger->wait();
+}
+
+
+bool MDSUtility::ms_dispatch(Message *m)
+{
+   Mutex::Locker locker(lock);
+   switch (m->get_type()) {
+   case CEPH_MSG_OSD_OPREPLY:
+     objecter->handle_osd_op_reply((MOSDOpReply *)m);
+     break;
+   case CEPH_MSG_OSD_MAP:
+     objecter->handle_osd_map((MOSDMap*)m);
+     break;
+   case CEPH_MSG_MDS_MAP:
+     handle_mds_map((MMDSMap*)m);
+     break;
+   default:
+     return false;
+   }
+   return true;
+}
+
+
+void MDSUtility::handle_mds_map(MMDSMap* m)
+{
+  mdsmap->decode(m->get_encoded());
+  if (waiting_for_mds_map) {
+    waiting_for_mds_map->complete(0);
+    waiting_for_mds_map = NULL;
+  }
+}
+
+
+bool MDSUtility::ms_get_authorizer(int dest_type, AuthAuthorizer **authorizer,
+                         bool force_new)
+{
+  if (dest_type == CEPH_ENTITY_TYPE_MON)
+    return true;
+
+  if (force_new) {
+    if (monc->wait_auth_rotating(10) < 0)
+      return false;
+  }
+
+  *authorizer = monc->auth->build_authorizer(dest_type);
+  return *authorizer != NULL;
+}
diff --git a/src/tools/cephfs/MDSUtility.h b/src/tools/cephfs/MDSUtility.h
new file mode 100644 (file)
index 0000000..d3f938e
--- /dev/null
@@ -0,0 +1,58 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2014 John Spray <john.spray@inktank.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ */
+
+#ifndef MDS_UTILITY_H_
+#define MDS_UTILITY_H_
+
+#include "osd/OSDMap.h"
+#include "osdc/Objecter.h"
+#include "mds/MDSMap.h"
+#include "messages/MMDSMap.h"
+#include "msg/Dispatcher.h"
+#include "msg/Messenger.h"
+#include "auth/Auth.h"
+
+/// MDS Utility
+/**
+ * This class is the parent for MDS utilities, i.e. classes that
+ * need access the objects belonging to the MDS without actually
+ * acting as an MDS daemon themselves.
+ */
+class MDSUtility : public Dispatcher {
+protected:
+  Objecter *objecter;
+  OSDMap *osdmap;
+  MDSMap *mdsmap;
+  Messenger *messenger;
+  MonClient *monc;
+
+  Mutex lock;
+  SafeTimer timer;
+
+  Context *waiting_for_mds_map;
+
+public:
+  MDSUtility();
+  ~MDSUtility();
+
+  void handle_mds_map(MMDSMap* m);
+  bool ms_dispatch(Message *m);
+  bool ms_handle_reset(Connection *con) { return false; }
+  void ms_handle_remote_reset(Connection *con) {}
+  bool ms_get_authorizer(int dest_type, AuthAuthorizer **authorizer,
+                         bool force_new);
+  int init();
+  void shutdown();
+};
+
+#endif /* MDS_UTILITY_H_ */
diff --git a/src/tools/cephfs/Resetter.cc b/src/tools/cephfs/Resetter.cc
new file mode 100644 (file)
index 0000000..0191bef
--- /dev/null
@@ -0,0 +1,115 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2010 Greg Farnum <gregf@hq.newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software 
+ * Foundation.  See file COPYING.
+ * 
+ */
+
+#include "osdc/Journaler.h"
+#include "mds/mdstypes.h"
+#include "mon/MonClient.h"
+#include "mds/events/EResetJournal.h"
+
+#include "Resetter.h"
+
+
+int Resetter::init(int rank)
+{
+  int r = MDSUtility::init();
+  if (r < 0) {
+    return r;
+  }
+
+  inodeno_t ino = MDS_INO_LOG_OFFSET + rank;
+  journaler = new Journaler(ino,
+      mdsmap->get_metadata_pool(),
+      CEPH_FS_ONDISK_MAGIC,
+      objecter, 0, 0, &timer);
+
+  return 0;
+}
+
+void Resetter::reset()
+{
+  Mutex mylock("Resetter::reset::lock");
+  Cond cond;
+  bool done;
+  int r;
+
+  lock.Lock();
+  journaler->recover(new C_SafeCond(&mylock, &cond, &done, &r));
+  lock.Unlock();
+
+  mylock.Lock();
+  while (!done)
+    cond.Wait(mylock);
+  mylock.Unlock();
+
+  if (r != 0) {
+    if (r == -ENOENT) {
+      cerr << "journal does not exist on-disk. Did you set a bad rank?"
+          << std::endl;
+      shutdown();
+      return;
+    } else {
+      cerr << "got error " << r << "from Journaler, failling" << std::endl;
+      shutdown();
+      return;
+    }
+  }
+
+  lock.Lock();
+  uint64_t old_start = journaler->get_read_pos();
+  uint64_t old_end = journaler->get_write_pos();
+  uint64_t old_len = old_end - old_start;
+  cout << "old journal was " << old_start << "~" << old_len << std::endl;
+
+  uint64_t new_start = ROUND_UP_TO(old_end+1, journaler->get_layout_period());
+  cout << "new journal start will be " << new_start
+       << " (" << (new_start - old_end) << " bytes past old end)" << std::endl;
+
+  journaler->set_read_pos(new_start);
+  journaler->set_write_pos(new_start);
+  journaler->set_expire_pos(new_start);
+  journaler->set_trimmed_pos(new_start);
+  journaler->set_writeable();
+
+  cout << "writing journal head" << std::endl;
+  journaler->write_head(new C_SafeCond(&mylock, &cond, &done, &r));
+  lock.Unlock();
+
+  mylock.Lock();
+  while (!done)
+    cond.Wait(mylock);
+  mylock.Unlock();
+    
+  lock.Lock();
+  assert(r == 0);
+
+  LogEvent *le = new EResetJournal;
+
+  bufferlist bl;
+  le->encode_with_header(bl);
+  
+  cout << "writing EResetJournal entry" << std::endl;
+  journaler->append_entry(bl);
+  journaler->flush(new C_SafeCond(&mylock, &cond, &done,&r));
+
+  lock.Unlock();
+
+  mylock.Lock();
+  while (!done)
+    cond.Wait(mylock);
+  mylock.Unlock();
+
+  assert(r == 0);
+
+  cout << "done" << std::endl;
+}
diff --git a/src/tools/cephfs/Resetter.h b/src/tools/cephfs/Resetter.h
new file mode 100644 (file)
index 0000000..5664de3
--- /dev/null
@@ -0,0 +1,37 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2010 Greg Farnum <gregf@hq.newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ */
+
+#ifndef JOURNAL_RESETTER_H_
+#define JOURNAL_RESETTER_H_
+
+
+#include "osdc/Journaler.h"
+#include "MDSUtility.h"
+
+/**
+ * This class lets you reset an mds journal for troubleshooting or whatever.
+ *
+ * To use, create a Resetter, call init(), and then call reset() with the name
+ * of the file to dump to.
+ */
+class Resetter : public MDSUtility {
+public:
+  Journaler *journaler;
+
+  Resetter() : journaler(NULL) {}
+
+  int init(int rank);
+  void reset();
+};
+
+#endif /* JOURNAL_RESETTER_H_ */
diff --git a/src/tools/cephfs/cephfs-journal-tool.cc b/src/tools/cephfs/cephfs-journal-tool.cc
new file mode 100644 (file)
index 0000000..7b0f778
--- /dev/null
@@ -0,0 +1,49 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2014 John Spray <john.spray@inktank.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ */
+
+
+#include "include/types.h"
+#include "common/config.h"
+#include "common/ceph_argparse.h"
+#include "common/errno.h"
+#include "global/global_init.h"
+
+#include "JournalTool.h"
+
+
+int main(int argc, const char **argv)
+{
+  vector<const char*> args;
+  argv_to_vec(argc, argv, args);
+  env_to_vec(args);
+
+  global_init(NULL, args, CEPH_ENTITY_TYPE_CLIENT, CODE_ENVIRONMENT_UTILITY, 0);
+  common_init_finish(g_ceph_context);
+
+  JournalTool jt;
+  int rc = jt.init();
+  if (rc != 0) {
+      std::cerr << "Error in initialization: " << cpp_strerror(rc) << std::endl;
+      return rc;
+  }
+
+  rc = jt.main(args);
+  if (rc != 0) {
+    std::cerr << "Error (" << cpp_strerror(rc) << ")" << std::endl;
+  }
+
+  jt.shutdown();
+
+  return rc;
+}
+