]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
merged r1566:1626 from trunk into branches/sage/mds
authorsageweil <sageweil@29311d96-e01e-0410-9327-a35deaab8ce9>
Mon, 13 Aug 2007 19:11:11 +0000 (19:11 +0000)
committersageweil <sageweil@29311d96-e01e-0410-9327-a35deaab8ce9>
Mon, 13 Aug 2007 19:11:11 +0000 (19:11 +0000)
git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@1627 29311d96-e01e-0410-9327-a35deaab8ce9

26 files changed:
branches/sage/mds/Makefile
branches/sage/mds/active/README [new file with mode: 0644]
branches/sage/mds/active/activemaster.cc [new file with mode: 0644]
branches/sage/mds/active/activemaster.h [new file with mode: 0644]
branches/sage/mds/active/activeslave.cc [new file with mode: 0644]
branches/sage/mds/active/activeslave.h [new file with mode: 0644]
branches/sage/mds/active/activetaskd [new file with mode: 0755]
branches/sage/mds/active/activetaskd.cc [new file with mode: 0644]
branches/sage/mds/active/activetaskd.h [new file with mode: 0644]
branches/sage/mds/active/client_init.cc [new file with mode: 0644]
branches/sage/mds/active/client_init.h [new file with mode: 0644]
branches/sage/mds/active/common.h [new file with mode: 0644]
branches/sage/mds/active/echotestclient [new file with mode: 0755]
branches/sage/mds/active/echotestclient.cc [new file with mode: 0644]
branches/sage/mds/active/echotestclient.h [new file with mode: 0644]
branches/sage/mds/active/inet.h [new file with mode: 0644]
branches/sage/mds/active/msgtestclient.cc [new file with mode: 0644]
branches/sage/mds/active/msgtestclient.h [new file with mode: 0644]
branches/sage/mds/active/trivial_task.cc [new file with mode: 0644]
branches/sage/mds/active/trivial_task.h [new file with mode: 0644]
branches/sage/mds/active/utility.h [new file with mode: 0644]
branches/sage/mds/common/Thread.h
branches/sage/mds/ebofs/Ebofs.cc
branches/sage/mds/msg/SimpleMessenger.cc
branches/sage/mds/msg/SimpleMessenger.h
branches/sage/mds/osd/ReplicatedPG.cc

index e15b4ab075ce2920598600256ede8578138c92be..acf52719b68bec2ee21375d44b2f1125047260ce 100644 (file)
@@ -158,6 +158,26 @@ cfuse: cfuse.cc client.o osdc.o client/fuse.o client/fuse_ll.o msg/SimpleMesseng
        ${CC} ${CFLAGS} ${LIBS} -lfuse $^ -o $@
 
 
+# code shipping experiments
+activemaster: active/activemaster.cc client.o osdc.o msg/SimpleMessenger.o common.o
+       ${CC} ${CFLAGS} ${LIBS} $^ -o $@
+
+activeslave: active/activeslave.cc client.o osdc.o msg/SimpleMessenger.o common.o
+       ${CC} ${CFLAGS} ${LIBS} $^ -o $@
+
+echotestclient: active/echotestclient.cc client.o osdc.o msg/SimpleMessenger.o common.o
+       ${CC} ${CFLAGS} ${LIBS} $^ -o $@
+
+msgtestclient: active/msgtestclient.cc client.o osdc.o msg/SimpleMessenger.o common.o
+       ${CC} ${CFLAGS} ${LIBS} $^ -o $@
+
+libtrivialtask.so: active/trivial_task.cc client.o osdc.o msg/SimpleMessenger.o common.o
+       ${CC} -fPIC -shared -Wl,-soname,$@.1 ${CFLAGS}  ${LIBS} $^ -o $@
+
+#libhadoopcephfs.so: client/hadoop/CephFSInterface.cc client.o osdc.o msg/SimpleMessenger.o common.o
+#      ${CC} -fPIC -shared -Wl,-soname,$@.1 ${CFLAGS}  ${LIBS} $^ -o $@
+
+
 
 # fake*
 fakefuse: fakefuse.cc mon.o mds.o client.o osd.o osdc.o ebofs.o client/fuse.o client/fuse_ll.o msg/FakeMessenger.o common.o
@@ -233,7 +253,7 @@ osbdb.o: ${OSBDB_OBJS}
        ${CC} -shared -fPIC ${CFLAGS} $< -o $@
 
 %.o: %.cc
-       ${CC} ${CFLAGS} -c $< -o $@
+       ${CC} -fPIC ${CFLAGS} -c $< -o $@
 
 %.po: %.cc
        ${CC} -fPIC ${CFLAGS} -c $< -o $@
diff --git a/branches/sage/mds/active/README b/branches/sage/mds/active/README
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/branches/sage/mds/active/activemaster.cc b/branches/sage/mds/active/activemaster.cc
new file mode 100644 (file)
index 0000000..b4dc742
--- /dev/null
@@ -0,0 +1,115 @@
+/*
+ * Startup executable for 
+ * Ceph Active Storage. See README for details.
+ *
+ */
+#include "activemaster.h"
+
+
+/*
+ * What main() must do:
+ *
+ *  - start up a Ceph client
+ *  - find the set of OSDs that the file is striped across
+ *  - start up the Map task on each OSD, using ssh
+ *  - eat lunch?
+ *  - start up the Reduce task locally
+ */
+
+int main(int argc, const char* argv[]) {  
+
+  if (argc < 4) {
+    usage(argv[0]);
+    exit(-1);
+  }
+
+  const char* input_filename = argv[1];
+  const char* map_command = argv[2];
+  //const char* reduce_command = argv[3];
+
+  // fire up the client
+  Client* client = startCephClient();
+
+  // open the file as read_only
+  int fh = client->open(input_filename, O_RDONLY);
+  if (fh < 0)    {
+    cout << "The input file " << input_filename << " could not be opened." << endl;
+    exit(-1);
+  }
+
+  // How big is the file?
+  int filesize;
+  struct stat stbuf;
+  if (0 > client->lstat(input_filename, &stbuf))    {
+    cout << "Error: could not retrieve size of input file " << input_filename << endl;
+    exit(-1);
+  }
+  filesize = stbuf.st_size;
+  if (filesize < 1) {
+    cout << "Error: input file size is " << filesize << endl;
+    exit(-1);
+  }
+
+  // retrieve all the object extents
+  list<ObjectExtent> extents;
+  int offset = 0;
+  client->enumerate_layout(fh, extents, filesize, offset);
+  
+  // for each object extent, retrieve the OSD IP address and start up a Map task
+  list<ObjectExtent>::iterator i;
+  map<size_t, size_t>::iterator j;
+  int osd;
+  int start, length;
+  tcpaddr_t tcpaddr;
+
+  for (i = extents.begin(); i != extents.end(); i++)
+    {
+      // find the primary and get its IP address
+      osd = client->osdmap->get_pg_primary(i->pgid);      
+      entity_inst_t inst = client->osdmap->get_inst(osd); 
+      entity_addr_t entity_addr = inst.addr;
+      entity_addr.make_addr(tcpaddr);
+      
+      // iterate through each buffer_extent in the ObjectExtent
+      for (j = i->buffer_extents.begin();
+          j != i->buffer_extents.end(); j++)
+       {
+         // get the range of the buffer_extent
+         start = (*j).first;
+         length = (*j).second;
+         // fire up the Map task
+         start_map_task(map_command, input_filename, start, length, tcpaddr);
+       }
+    }
+  return 0; 
+}
+
+// Fires up the map task.
+// For the moment, all it does is echo the command line, not run it.
+int start_map_task(const char* command, const char* input_filename,
+                  long start, long length,  sockaddr_in ip_address)
+{
+  string ip_addr_string(inet_ntoa(ip_address.sin_addr));
+  
+  
+
+
+
+  cout << "ssh " << ip_addr_string << " " << command 
+       << " " << input_filename << " " << start << " " << length << endl;
+  return 0;
+}
+
+
+
+void usage(const char* name) {
+  cout << "usage: " << name << " inputfile map_task reduce_task" << endl;
+  cout << "inputfile must be a valid path in the running Ceph filesystem." << endl;
+  cout << "map_task should be given with an absolute path, and be present on ";
+  cout << "the REGULAR filesystem every node." << endl;
+  cout << "reduce_task need be present on this node only." << endl;
+}
+
+
+
diff --git a/branches/sage/mds/active/activemaster.h b/branches/sage/mds/active/activemaster.h
new file mode 100644 (file)
index 0000000..524138e
--- /dev/null
@@ -0,0 +1,18 @@
+/*
+ * This is the master executable to start up
+ * a compute task across several nodes.
+ *
+ *
+ */  
+
+
+//#include <sys/stat.h>
+#include "utility.h"
+
+int start_map_task(const char* command, const char* input_filename, 
+                  long start, long length, tcpaddr_t ip_address);
+
+void usage(const char* name);
+
+//Client* startCephClient();
+//void kill_client(Client* client);
diff --git a/branches/sage/mds/active/activeslave.cc b/branches/sage/mds/active/activeslave.cc
new file mode 100644 (file)
index 0000000..d295349
--- /dev/null
@@ -0,0 +1,510 @@
+/*
+ * This is a slave for receiving and executing commands for 
+ * compute tasks on an OSD. This supersedes the daemon
+ * version in activetaskd.h/cc, because it's easier to debug
+ * if it's not a daemon.
+ *
+ * Networking code is based off examples from Stevens' UNIX Network Programming.
+ */
+
+#include "activeslave.h"
+
+int main(int argc, const char* argv[]) {
+
+  /* Set up TCP server */
+  int sockfd, newsockfd,  childpid;
+  socklen_t clilen;
+  struct sockaddr_in cli_addr, serv_addr;
+
+  //const char *pname = argv[0]; // process name
+
+  // Open a TCP socket
+  if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
+    cerr << "slave: can't open TCP socket. Exiting." << endl;
+    exit(-1);
+  }
+  cerr << "slave: opened TCP socket." << endl;
+
+  // set up the port
+  bzero((char*) &serv_addr, sizeof(serv_addr));
+  serv_addr.sin_family      = AF_INET;
+  serv_addr.sin_addr.s_addr = htonl(INADDR_ANY);
+  serv_addr.sin_port        = htons(SERV_TCP_PORT);
+
+  if (bind(sockfd, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) < 0) {
+    cerr << "slave: can't bind local address. Exiting." << endl;
+    exit(-1);
+  } 
+
+  if(listen(sockfd, SOMAXCONN) < 0) {
+    cerr << "slave: listening error. Exiting." << endl;
+    exit(-1);
+  }
+
+
+  /* The Big Loop */
+  while (1) {
+
+    // wait for a message and fork off a child process to handle it
+    clilen = sizeof(cli_addr);
+    newsockfd = accept(sockfd,
+                      (struct sockaddr *) &cli_addr,
+                      &clilen);
+
+    if (newsockfd < 0) {
+      cerr << "slave: accept error. Exiting." << endl;
+      exit(-1);
+    }
+
+    if ((childpid = fork()) < 0) {
+      cerr << "slave: fork error. Exiting." << endl;
+      exit(-1);
+    }
+
+    else if (childpid == 0) { // child process
+      cerr << "Forked child process for incoming socket" << endl;
+      close(sockfd);
+      process_request(newsockfd);
+      cerr << "Finished processing request. Exiting child." << endl;
+      exit(0);
+    }
+    
+    close (newsockfd); // parent
+
+    //sleep(30); /* wait 30 seconds */
+  }
+  exit(EXIT_SUCCESS);
+}
+
+
+/* This will process requests from the master.
+ * The protocol in a nutshell:
+ *   Master opens a socket to slave, and sends
+ * one message.
+ *   Slave replies with one message.
+ *   Socket is closed.
+ */
+
+void process_request(int newsockfd) {
+
+  // first, read the message type.
+  int msg_type = readmsgtype(newsockfd);
+  
+  
+  // Second, call some function based on the message type to process
+  // the rest of the message. The function is responsible for the rest
+  // of the message; this includes checking the message footer.
+
+  switch(msg_type) {
+
+  case PING: // ping
+    process_ping(newsockfd);
+    break;
+  case STARTTASK: // start_task
+    process_start_task(newsockfd);
+    break;
+  case RETRIEVELOCALFILE: // get_local
+    process_get_local(newsockfd);
+    break;
+  case SHIPCODE:
+    process_shipcode(newsockfd);
+    break;
+
+  case PINGREPLY:
+  case FINISHEDTASK:
+  case TASKFAILED:
+  case SENDLOCALFILE:
+  case LOCALFILENOTFOUND:
+  case CODESAVED:
+  case SHIPFAILED:
+    cerr << "activeslave: BUG received message " << CMD_LIST[msg_type] <<
+      " from master; master should never send this message." << endl;
+    exit(-1);
+    break;
+    
+
+  case -1:
+    cerr << "activeslave: message had an unidentifiable type. " <<
+      "Closing socket and discarding rest of message." << endl;
+  default:
+    cerr << "activeslave: BUG! received unexpected return value of" << msg_type <<
+      "from readmsgtype(). Closing socket and discarding rest of message." << endl;
+
+    exit(-1);
+  }
+}
+
+
+// Just write a ping_reply to the socket.
+void process_ping(int fd) {
+
+  // make sure the footer is valid
+  if (!check_footer(fd)) {
+    cerr << "process_ping warning: ping message has invalid or missing footer."
+        << endl;
+  }
+  // Even if the footer's invalid, send the reply. 
+  cerr << "Replying to ping..." << endl;
+  send_msg_header(fd, PINGREPLY);
+  send_msg_footer(fd);
+  cerr << "Ping processing completed." << endl;
+}
+
+
+
+// Process a start_task message. This reads the incoming message and
+// starts the corresponding task.
+
+// Parameter format: taskID(int) command(string) 
+// cephinputfile(string) offset(long) length(long) localoutputfile
+
+// WARNING: currently has the trivial task hardwired. It
+// ignores the command and the output file.
+void process_start_task(int fd) {
+
+  char command[MAX_STRING_SIZE + 1];
+  char cephinputfile[MAX_STRING_SIZE + 1];
+  char localoutputfile[MAX_STRING_SIZE + 1];
+
+  cout << "in process_start_task: ";
+  int taskID = read_positive_int(fd);
+  cout << "read taskID " << taskID;
+
+  read_string(fd, command);
+  cout << ", command " << command;
+
+  read_string(fd, cephinputfile);
+  cout << ", cephinputfile " << cephinputfile;
+  off_t offset = read_off_t(fd);
+  cout << ", offset " << offset;
+  off_t length = read_off_t(fd);
+  cout << ", length " << length;
+
+  read_string(fd, localoutputfile);
+  cout << ", localoutputfile " << localoutputfile << endl;
+
+  // make sure the footer is valid
+  if (!check_footer(fd)) {
+    cerr << "process_start_task warning: message has invalid or missing footer. "
+        << "Discarding message." << endl;
+    exit(-1);
+  }
+  
+
+  // To do: modify to load the task from a library instead of just
+  // using the hardwired one.
+
+  void (*task)(const char*, const char*, off_t, off_t) = 0;
+  task = start_trivial_task;
+
+
+  // start a task; create an output filename that uses the task ID, 'cause we might
+  // end up with multiple pieces of a file on each OSD.
+  // WARNING: always does the trivial task; prints answer to stdout but
+  // does not write it to disk.
+  cerr << "starting task: " << endl;
+  //start_trivial_task(cephinputfile, localoutputfile, offset, length);
+  task(cephinputfile, localoutputfile, offset, length);
+  cerr << "returned from task! Sending reply:" << endl;
+
+
+
+  // send the reply
+  send_msg_header(fd, FINISHEDTASK);
+  write_positive_int(fd, taskID);
+  send_msg_footer(fd);
+
+  // done
+  cout << "Done sending reply for taskID " << taskID << endl;
+  return;
+}
+
+
+
+void start_trivial_task (const char* ceph_filename, const char* local_filename, 
+                        off_t offset, off_t length) {
+  // Don't bother to copy the file to disk. Read the file directly from Ceph,
+  // and add up all the bytes.
+  // Write the total to the local file as a string.
+    Client * client = startCephClient();
+
+    bufferptr bp(CHUNK);
+
+    // get the source file's size. Sanity-check the request range.
+    struct stat st;
+    int r = client->lstat(ceph_filename, &st);
+    assert (r == 0);
+    
+    off_t src_total_size = st.st_size;
+    if (src_total_size < offset + length) {
+      cerr << "Error in copy ExtentToLocalFile: offset + length = " << offset << " + " << length
+          << " = " + (offset + length) << ", source file size is only " << src_total_size << endl;
+      exit(-1);
+    }
+    off_t remaining = length;
+    
+    // open the file and seek to the start position
+    cerr << "start_trivial_task: opening the source file and seeking " << endl;
+
+    int fh_ceph = client->open(ceph_filename, O_RDONLY);
+    assert (fh_ceph > -1); 
+    r = client->lseek(fh_ceph, offset, SEEK_SET);
+    assert (r == offset);
+    
+    int counter = 0;
+    // read through the extent and add up the bytes
+    cerr << "start_trivial_task: counting up bytes" << endl;
+    char* bp_c = bp.c_str();
+    while (remaining > 0) {
+      off_t got = client->read(fh_ceph, bp_c, MIN(remaining,CHUNK), -1);
+      assert(got > 0);
+      remaining -= got;
+      for (off_t i = 0; i < got; ++i) {
+       counter += (unsigned int)(bp_c[i]);
+      }
+    }
+    cerr << "start_trivial_task: Done! Answer is " << counter << endl;
+    client->close(fh_ceph);
+        
+    //assert(0);
+}
+
+
+// Starts a sloppy grep count of the hardwired search string over the
+// given Ceph file extent. It's sloppy because it copies the given
+// extent to a local file and runs "grep" on it, with no effort to take
+// care of boundary issues.
+void start_sloppy_grepcount (const char* ceph_filename, const char* local_filename,
+                            long offset, long size) {
+
+  Client* client = startCephClient();
+  char* search_string = "the";
+  // copy the file to a local file. 
+
+  copyExtentToLocalFile (client, ceph_filename, offset, size, local_filename);
+  // we want: grep -c search_string local_filename
+  // to get the number of occurrences of the string.
+  string command = "";
+  command.append("grep -c ");
+  command.append(search_string);
+  command.append(local_filename);
+
+  assert(0);
+
+}
+
+
+// Processes a SHIPCODE message. The message will have a shared
+// library attached to it, which must be stored locally.
+
+void process_shipcode(int fd) {
+
+
+  // get the size of the shared library
+  size_t library_size = read_size_t(fd);
+
+
+  // save the library to a file
+  cerr << "saving library..." << endl;
+  const char* libfile = "/tmp/libslavetask.so";
+  int local_fd = ::open(libfile, O_WRONLY | O_CREAT | O_TRUNC);
+  if (local_fd < 0) {
+    cerr << "Error opening " << libfile << " for writing." << endl;
+    exit(-1);
+  }
+
+  off_t remaining = library_size;
+
+  bufferptr bp(CHUNK);
+  char* bp_c = bp.c_str();
+  while (remaining > 0) {
+    off_t got = readn(fd, bp_c, MIN(remaining, CHUNK));
+    assert(got > 0);
+    remaining -= got;
+    ssize_t written = ::write(local_fd, bp_c, got);
+    assert (written == got);
+  }
+  cerr << "Received shared library and stored as " << libfile << endl;
+
+}
+
+
+// Processes a get_local message. The message
+// specifies the filename of a local file to
+// return to the sender.
+
+// Parameter format: requestID(int) localfilename(string)
+
+// INCOMPLETE: currently just reads the message.
+
+
+void process_get_local(int fd) {
+  cout << "in process_get_local: ";
+  int taskID = read_positive_int(fd);
+  cout << "read taskID " << taskID;
+
+  char localfilename[MAX_STRING_SIZE+1];
+  read_string(fd, localfilename);
+  cout << ", localfilename " << localfilename << endl;
+
+
+  // make sure the footer is valid
+  if (!check_footer(fd)) {
+    cerr << "process_get_local warning: message has invalid or missing footer."
+        << endl;
+  }
+
+  // not implemented
+  cerr << "Error: get_local command unimplemented." << endl;
+  assert(0);
+}
+
+
+// Retrieves a formatted message from the socket.
+// At the moment, this just reads and prints a fixed-
+// length message type.
+// DEPRECATED.
+void str_getmsg(int sockfd) {
+  
+  int  n;
+
+  // read message types until the connection dies
+  while(true) {
+    n = readmsgtype(sockfd);
+    if (n != 0) {
+      cerr << "from getmsg: some sort of error" << endl;
+      exit(-1);
+    }
+  }
+}
+
+// Echo a stream socket message back to the sender.
+// DEPRECATED.
+void str_echo(int sockfd) {
+  
+  int  n;
+  char line[MAXLINE];
+
+  while(true) {
+
+    // read from the stream
+    cerr << "str_echo: waiting for a line" << endl;
+    n = readline(sockfd, line, MAXLINE);
+    cerr << "str_echo: read a line" << endl;
+    if (0 == n) {
+      cerr << "str_echo: connection terminated" << endl;
+      return; // connection is terminated
+    }
+    else if (n < 0) {
+      cerr << "str_echo: readline error" << endl;
+      exit(-1);
+    }
+
+    // write back to the stream
+    if (n != writen(sockfd, line, n)) {
+      cerr << "str_echo: writen error" << endl;
+      exit(-1);
+    }
+    else
+      cerr << "Echoed line " << endl;
+  }
+}
+
+
+void str_ack(int sockfd) {
+  
+  int  n;
+  char line[MAXLINE];
+  //char *ack = "ack";
+
+  while(true) {
+
+    // read from the stream
+    n = readline(sockfd, line, MAXLINE);
+
+    if (0 == n)
+      return; // connection is terminated
+    else if (n < 0)
+      //err_dump("str_echo: readline error");
+      exit(-1);
+
+    // write back to the stream
+    if (4 != writen(sockfd, "ack\n", 4))
+      //err_dump("str_echo: writen error");
+      exit(-1);
+  }
+}
+
+
+
+// Read command lines from the socket and execute them
+
+void str_run(int sockfd) {
+  
+  int  n;
+  char line[MAXLINE];
+  char* error_msg = "str_run: No command interpreter found\n";
+  char* ack_msg = "Running command... ";
+  char* commit_msg = "Command executed!\n";
+
+  while(true) {
+
+    // read from the stream
+    n = readline(sockfd, line, MAXLINE);
+
+    if (0 == n)
+      return; // connection is terminated
+    else if (n < 0)
+      //err_dump("str_echo: readline error");
+      exit(-1);
+
+    if (system(NULL)) {
+      writen(sockfd, ack_msg, strlen(ack_msg));
+      system(line);
+      writen(sockfd, commit_msg, strlen(commit_msg));
+    }
+    else if ((int)strlen(error_msg) != writen(sockfd, error_msg, strlen(error_msg))) 
+      //err_dump("str_echo: writen error");
+      exit(-1);
+  }
+}
+
+
+// take a filename and copy it from Ceph to a local directory.
+// Not completed.
+
+void str_copytolocal(int sockfd) {
+  
+  int  n;
+  char line[MAXLINE];
+  char* error_msg = "str_copy: No command interpreter found\n";
+  char* ack_msg = "Running command... ";
+  char* commit_msg = "Command executed!\n";
+  //char* temp_dir = "/tmp";
+
+
+  while(true) {
+
+    // read from the stream
+    n = readline(sockfd, line, MAXLINE);
+
+    if (0 == n)
+      return; // connection is terminated
+    else if (n < 0)
+      //err_dump("str_echo: readline error");
+      exit(-1);
+
+    if (system(NULL)) {
+      writen(sockfd, ack_msg, strlen(ack_msg));
+      system(line);
+      writen(sockfd, commit_msg, strlen(commit_msg));
+    }
+    else if ((int)strlen(error_msg) != writen(sockfd, error_msg, strlen(error_msg))) 
+      //err_dump("str_echo: writen error");
+      exit(-1);
+  }
+}
+
+
+
diff --git a/branches/sage/mds/active/activeslave.h b/branches/sage/mds/active/activeslave.h
new file mode 100644 (file)
index 0000000..574824b
--- /dev/null
@@ -0,0 +1,23 @@
+#include "inet.h"
+#include "common.h"
+#include "utility.h"
+#include "client/Client.h"
+
+
+// The port number is "osdd" on a telephone keypad.
+#define SERV_TCP_PORT 6733
+
+#define MAXLINE 512
+
+void str_echo(int sockfd);
+void str_ack(int sockfd);
+void str_run(int sockfd);
+void str_getmsg(int sockfd);
+void process_request(int newsockfd);
+void process_ping(int fd);
+void process_start_task(int fd);
+void process_get_local(int fd);
+void process_shipcode(int fd);
+
+void start_trivial_task(const char* ceph_filename, const char* local_filename,
+                       long offset, long length);
diff --git a/branches/sage/mds/active/activetaskd b/branches/sage/mds/active/activetaskd
new file mode 100755 (executable)
index 0000000..7347ab6
Binary files /dev/null and b/branches/sage/mds/active/activetaskd differ
diff --git a/branches/sage/mds/active/activetaskd.cc b/branches/sage/mds/active/activetaskd.cc
new file mode 100644 (file)
index 0000000..ec9f290
--- /dev/null
@@ -0,0 +1,241 @@
+/*
+ * This is a daemon for receiving and executing commands for compute tasks on an OSD.
+ *
+ * The daemon uses skeleton code from
+ * http://www.linuxprofilm.com/articles/linux-daemon-howto.html. The
+ * site is no longer up, but can be seen through the archive.org.
+ * Networking code is based off examples from Stevens' UNIX Network Programming.
+ */
+
+#include "activetaskd.h"
+
+
+#define SERVER
+
+#undef SERVER
+
+int main(int argc, const char* argv[]) {
+        
+  /* Our process ID and Session ID */
+  pid_t pid, sid;
+        
+  /* Fork off the parent process */
+  pid = fork();
+  if (pid < 0) {
+    exit(EXIT_FAILURE);
+  }
+  /* If we got a good PID, then
+     we can exit the parent process. */
+  if (pid > 0) {
+    exit(EXIT_SUCCESS);
+  }
+
+  /* Change the file mode mask */
+  umask(0);
+                
+  /* Open any logs here */        
+                
+  /* Create a new SID for the child process */
+  sid = setsid();
+  if (sid < 0) {
+    /* Log the failure */
+    exit(EXIT_FAILURE);
+  }
+        
+        
+  /* Change the current working directory */
+  if ((chdir("/")) < 0) {
+    /* Log the failure */
+    exit(EXIT_FAILURE);
+  }
+        
+  /* Close out the standard file descriptors */
+  close(STDIN_FILENO);
+  close(STDOUT_FILENO);
+  close(STDERR_FILENO);
+        
+  /* Daemon-specific initialization goes here */
+
+
+
+  /* Set up TCP server */
+  int sockfd, newsockfd,  childpid;
+  socklen_t clilen;
+  struct sockaddr_in cli_addr, serv_addr;
+
+  const char *pname = argv[0]; // process name
+
+  // Open a TCP socket
+  if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0)
+    exit(-1);
+  //err_dump("server: can't open stream socket");
+
+  // set up the port
+  bzero((char*) &serv_addr, sizeof(serv_addr));
+  serv_addr.sin_family      = AF_INET;
+  serv_addr.sin_addr.s_addr = htonl(INADDR_ANY);
+  serv_addr.sin_port        = htons(SERV_TCP_PORT);
+
+  if (bind(sockfd, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) < 0)
+    exit(-1);
+    //err_dump("server: can't bind local address");
+
+  if(listen(sockfd, SOMAXCONN) < 0)
+    exit(-1);
+  //err_dump("server: listening error");
+
+  /* The Big Loop */
+  while (1) {
+
+    // wait for a message and fork off a child process to handle it
+    clilen = sizeof(cli_addr);
+    newsockfd = accept(sockfd,
+                      (struct sockaddr *) &cli_addr,
+                      &clilen);
+
+    if (newsockfd < 0)
+      exit(-1);
+      //err_dump("server: accept error");
+
+    if ( (childpid = fork()) < 0)
+      exit(-1);
+    // err_dump("server: fork error");
+
+    else if (childpid == 0) { // child process
+      close(sockfd);
+      //str_echo(newsockfd);
+      str_run(newsockfd);
+      // insert code to process the request
+      exit(0);
+    }
+    
+    close (newsockfd); // parent
+
+    //sleep(30); /* wait 30 seconds */
+  }
+  exit(EXIT_SUCCESS);
+}
+
+
+// Echo a stream socket message back to the sender.
+
+void str_echo(int sockfd) {
+  
+  int  n;
+  char line[MAXLINE];
+
+  while(true) {
+
+    // read from the stream
+    n = readline(sockfd, line, MAXLINE);
+
+    if (0 == n)
+      return; // connection is terminated
+    else if (n < 0)
+      //err_dump("str_echo: readline error");
+      exit(-1);
+
+    // write back to the stream
+    if (n != writen(sockfd, line, n)) 
+      //err_dump("str_echo: writen error");
+      exit(-1);
+  }
+}
+
+
+void str_ack(int sockfd) {
+  
+  int  n;
+  char line[MAXLINE];
+  char *ack = "ack";
+
+  while(true) {
+
+    // read from the stream
+    n = readline(sockfd, line, MAXLINE);
+
+    if (0 == n)
+      return; // connection is terminated
+    else if (n < 0)
+      //err_dump("str_echo: readline error");
+      exit(-1);
+
+    // write back to the stream
+    if (4 != writen(sockfd, "ack\n", 4))
+      //err_dump("str_echo: writen error");
+      exit(-1);
+  }
+}
+
+
+
+
+// Read command lines from the socket and execute them
+
+void str_run(int sockfd) {
+  
+  int  n;
+  char line[MAXLINE];
+  char* error_msg = "str_run: No command interpreter found\n";
+  char* ack_msg = "Running command... ";
+  char* commit_msg = "Command executed!\n";
+
+  while(true) {
+
+    // read from the stream
+    n = readline(sockfd, line, MAXLINE);
+
+    if (0 == n)
+      return; // connection is terminated
+    else if (n < 0)
+      //err_dump("str_echo: readline error");
+      exit(-1);
+
+    if (system(NULL)) {
+      writen(sockfd, ack_msg, strlen(ack_msg));
+      system(line);
+      writen(sockfd, commit_msg, strlen(commit_msg));
+    }
+    else if (strlen(error_msg) != writen(sockfd, error_msg, strlen(error_msg))) 
+      //err_dump("str_echo: writen error");
+      exit(-1);
+  }
+}
+
+
+// take a filename and copy it from Ceph to a local directory
+
+void str_copytolocal(int sockfd) {
+  
+  int  n;
+  char line[MAXLINE];
+  char* error_msg = "str_copy: No command interpreter found\n";
+  char* ack_msg = "Running command... ";
+  char* commit_msg = "Command executed!\n";
+  char* temp_dir = "/tmp";
+
+
+  while(true) {
+
+    // read from the stream
+    n = readline(sockfd, line, MAXLINE);
+
+    if (0 == n)
+      return; // connection is terminated
+    else if (n < 0)
+      //err_dump("str_echo: readline error");
+      exit(-1);
+
+    if (system(NULL)) {
+      writen(sockfd, ack_msg, strlen(ack_msg));
+      system(line);
+      writen(sockfd, commit_msg, strlen(commit_msg));
+    }
+    else if (strlen(error_msg) != writen(sockfd, error_msg, strlen(error_msg))) 
+      //err_dump("str_echo: writen error");
+      exit(-1);
+  }
+}
+
+
+
diff --git a/branches/sage/mds/active/activetaskd.h b/branches/sage/mds/active/activetaskd.h
new file mode 100644 (file)
index 0000000..fc5cec9
--- /dev/null
@@ -0,0 +1,14 @@
+#include "inet.h"
+#include "common.h"
+#include "utility.h"
+#include "client/Client.h"
+
+
+// The port number is "osdd" on a telephone keypad.
+#define SERV_TCP_PORT 6733
+
+#define MAXLINE 512
+
+void str_echo(int sockfd);
+void str_ack(int sockfd);
+void str_run(int sockfd);
diff --git a/branches/sage/mds/active/client_init.cc b/branches/sage/mds/active/client_init.cc
new file mode 100644 (file)
index 0000000..8b13789
--- /dev/null
@@ -0,0 +1 @@
+
diff --git a/branches/sage/mds/active/client_init.h b/branches/sage/mds/active/client_init.h
new file mode 100644 (file)
index 0000000..139597f
--- /dev/null
@@ -0,0 +1,2 @@
+
+
diff --git a/branches/sage/mds/active/common.h b/branches/sage/mds/active/common.h
new file mode 100644 (file)
index 0000000..36f862b
--- /dev/null
@@ -0,0 +1,95 @@
+#ifndef COMMON_H
+#define COMMON_H
+
+
+#include <sys/stat.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <fcntl.h>
+#include <errno.h>
+#include <unistd.h>
+#include <syslog.h>
+#include <string.h>
+#include <iostream>
+#include <string>
+
+// a bunch of string constants
+// for commands
+
+
+
+
+#define CMDLENGTH 10
+#define CMDCOUNT 11
+
+#define MAX_STRING_SIZE 255
+
+/*
+ * These are the various messages that can be sent between the master
+ * and slave. The slave sends one reply to each message from the master.
+
+ * PING/PINGREPLY: just what it sounds like.
+
+ * STARTTASK: starts a task. Needs to be reworked to allow code
+ * shipping. The slave attempts to perform the task, and replies with
+ * FINISHEDTASK or TASKFAILED.
+ *
+ * RETRIEVELOCALFILE: requests a file that the slave has stored
+ * locally. Slave replies with SENDLOCALFILE and the file, or with
+ * LOCALFILENOTFOUND.
+ * 
+ * SHIPCODE: sends a shared library to the slave, containing a
+ * function that is to be executed later by the STARTTASK
+ * command. Slave replies with CODESAVED or SHIPFAILED.
+ * 
+ */ 
+
+
+const off_t CHUNK = 1024 * 1024 * 4;
+
+#define PING              0
+#define STARTTASK         1
+#define RETRIEVELOCALFILE 2
+#define PINGREPLY         3
+#define FINISHEDTASK      4
+#define TASKFAILED        5
+#define SENDLOCALFILE     6
+#define LOCALFILENOTFOUND 7
+#define SHIPCODE          8
+#define CODESAVED         9
+#define SHIPFAILED        10
+
+
+#define FOOTER_LENGTH 7
+
+const char* CMD_LIST[CMDCOUNT] = {"______PING",
+                                    "START_TASK",
+                                    "_GET_LOCAL",
+                                    "PING_REPLY",
+                                    "_TASK_DONE",
+                                    "TASKFAILED",
+                                    "SEND_LOCAL",
+                                    "LOCAL_GONE",
+                                    "_SHIP_CODE",
+                                    "CODE_SAVED",
+                                    "SHIPFAILED"};
+
+const char FOOTER[FOOTER_LENGTH + 1] = "MSG_END";
+
+
+// const char* strArray[] = {"string1", "string2", "string3"};
+//const char commands[2][4]  = {"foo", "bar"};
+
+
+// error codes
+#define ARGUMENTSINVALID 1001
+#define CEPHCLIENTSTARTUPFAILED 1002
+#define INPUTFILEREADFAILED 1003
+
+
+// const char* name = "Njal";
+
+
+
+#endif //COMMON_H
diff --git a/branches/sage/mds/active/echotestclient b/branches/sage/mds/active/echotestclient
new file mode 100755 (executable)
index 0000000..60c44bd
Binary files /dev/null and b/branches/sage/mds/active/echotestclient differ
diff --git a/branches/sage/mds/active/echotestclient.cc b/branches/sage/mds/active/echotestclient.cc
new file mode 100644 (file)
index 0000000..2b2d15e
--- /dev/null
@@ -0,0 +1,74 @@
+/*
+ * This is merely a test of an echo server; it's an early step in
+ * building up the Ceph distributed compute service. This is
+ * discardable once the next stage is up and running.
+ *
+ * Code is based off examples in Stevens' "Unix Network Programming".
+ */
+
+#include "echotestclient.h"
+
+int main(int argc, char* argv[]) {
+  
+  int sockfd;
+  struct sockaddr_in serv_addr;
+
+  char* pname = argv[0];
+
+  bzero((char *) &serv_addr, sizeof(serv_addr));
+  serv_addr.sin_family = AF_INET;
+  serv_addr.sin_addr.s_addr = inet_addr(SERV_HOST_ADDR);
+  serv_addr.sin_port = htons(SERV_TCP_PORT);
+
+  
+  // open a TCP socket
+  if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
+    printf("client: can't open stream socket");
+    exit (-1);
+  }
+
+  // connect to the server.
+  if (connect(sockfd, (struct sockaddr *) &serv_addr,
+             sizeof(serv_addr)) < 0) {
+    printf("client: can't connect to server");
+    exit (-1);
+  }
+  
+  // start the test echoer
+  str_cli(stdin, sockfd);
+      
+  
+  close (sockfd);
+  exit(0);
+}
+
+
+void str_cli(FILE *fp, int sockfd) {
+
+  int n;
+  char sendline[MAXLINE], recvline[MAXLINE + 1];
+
+  // read from the fp and write to the socket;
+  // then read from the socket and write to stdout
+  while (fgets(sendline, MAXLINE, fp) != NULL) {
+
+    n = strlen(sendline);
+    if (writen(sockfd, sendline, n) != n) {
+      printf("str_cli: writen error on socket");
+      exit(-1);
+    }
+    n = readline(sockfd, recvline, MAXLINE);
+    if (n < 0) {
+      printf("str_cli: readline error");
+      exit(-1);
+    }
+    recvline[n] = 0;
+    fputs(recvline, stdout);
+  }
+
+  if (ferror(fp)) {
+    printf("str_cli: error reading file");
+    exit(-1);
+  }
+
+}
diff --git a/branches/sage/mds/active/echotestclient.h b/branches/sage/mds/active/echotestclient.h
new file mode 100644 (file)
index 0000000..dd59124
--- /dev/null
@@ -0,0 +1,10 @@
+#include "inet.h"
+#include "common.h"
+#include "utility.h"
+
+#define SERV_HOST_ADDR "128.114.57.143" //issdm-8
+#define SERV_TCP_PORT 6733
+#define MAXLINE 512
+
+void str_cli(FILE *fp, int sockfd);
+
diff --git a/branches/sage/mds/active/inet.h b/branches/sage/mds/active/inet.h
new file mode 100644 (file)
index 0000000..385fa91
--- /dev/null
@@ -0,0 +1,9 @@
+/*
+ *  Generic TCP/IP definitions
+ */
+
+#include <stdio.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
diff --git a/branches/sage/mds/active/msgtestclient.cc b/branches/sage/mds/active/msgtestclient.cc
new file mode 100644 (file)
index 0000000..48cb90c
--- /dev/null
@@ -0,0 +1,417 @@
+/*
+ * This test client tests the sending of message headers to the slave.
+ *
+ * Code is based off examples in Stevens' "Unix Network Programming".
+ */
+
+#include "msgtestclient.h"
+#define REQUIRED_ARGS 2
+
+int main(int argc, char* argv[]) {
+  
+
+  // make sure we have all the arguments we need
+  if (argc < REQUIRED_ARGS) { usage(argv[0]);  exit(-1); }
+
+  // This file is rewired for running tests from a
+  // shell script. The first parameter specifies the
+  // name of the Ceph file that the test will be
+  // run on; the second parameter specifies which of
+  // four different tests will be run.
+  const char* input_filename = argv[1];
+  int test_number = atoi(argv[2]);
+  assert (test_number > 0);
+  assert (test_number < 4);
+
+  //const char* map_command = argv[2];
+  // These two variables aren't really used yet.
+  const char* map_command = "map_foo";
+  const char* output_filename = "out_foo";
+  //const char* output_filename = argv[3];
+  //const char* reduce_command = argv[4]; // not implemented yet
+  
+  // start up a Ceph client
+  Client* client = startCephClient();
+
+  // open the input file as read_only
+  int fh = client->open(input_filename, O_RDONLY);
+  if (fh < 0)    {
+    cerr << "The input file " << input_filename << " could not be opened." << endl;
+    exit(-1);
+  }
+
+  // How big is the file?
+  off_t filesize;
+  struct stat stbuf;
+  if (0 > client->lstat(input_filename, &stbuf))    {
+    cerr << "Error: could not retrieve size of input file " << input_filename << endl;
+    exit(-1);
+  }
+  filesize = stbuf.st_size;
+  if (filesize < 1) {
+    cerr << "Error: input file size is " << filesize << endl;
+    exit(-1);
+  }
+
+  // retrieve all the object extents
+  list<ObjectExtent> extents;
+  off_t offset = 0;
+  client->enumerate_layout(fh, extents, filesize, offset);
+
+  list<ObjectExtent>::iterator i;
+  map<size_t, size_t>::iterator j;
+  int osd;
+  int taskID = 0;
+
+  // Pull out all the extents, and make a vector of
+  // (ip_address, start, length).
+
+  vector<request_split> original_splits;
+
+  for (i = extents.begin(); i != extents.end(); i++) {
+
+    request_split split;
+    // find the primary and get its IP address
+    osd = client->osdmap->get_pg_primary(i->layout.pgid);      
+    entity_inst_t inst = client->osdmap->get_inst(osd); 
+    entity_addr_t entity_addr = inst.addr;
+    entity_addr.make_addr(split.ip_address);        
+
+    // iterate through each buffer_extent in the ObjectExtent
+    for (j = i->buffer_extents.begin();
+        j != i->buffer_extents.end(); j++) {
+
+      // get the range of the buffer_extent
+      split.start = (*j).first;
+      split.length = (*j).second;
+      // throw the split onto the vector
+      original_splits.push_back(split);
+    }
+  }
+
+  // close the client - we're done with it
+  client->shutdown();
+
+  // sanity check: display the splits
+  cerr << "Listing original splits:" << endl;
+  for (vector<request_split>::iterator i = original_splits.begin();
+       i != original_splits.end(); i++) {
+    cerr << "Split: IP " << i->ip_address << ", start " 
+        << i->start << ", length " << i->length << endl;
+  }
+
+  vector<request_split> test_splits;
+  // Now, modify the splits as needed for the test type.
+  // There are three types of tests.
+  // Test 1: regular test.
+  // Test 2: put all the tasks on the "wrong" OSD.
+  // Test 3: do the entire job off one node.
+
+  if (1 == test_number) {
+    cerr << "Test type 1: using original splits." << endl;
+    test_splits = original_splits;
+  }
+  else if (2 == test_number) {
+    cerr << "Test type 2: rotating split IP addresses. " << endl;
+    int split_count = original_splits.size();
+     for (int i = 0; i < split_count; ++i) {
+       request_split s;
+       s.start = original_splits.at(i).start;
+       s.length = original_splits.at(i).length;
+       s.ip_address = original_splits.at((i+1)%split_count).ip_address;
+       test_splits.push_back(s);
+     }
+  }
+  else if (3 == test_number) {
+    cerr << "Test type 3: one giant split." << endl;
+    request_split s;
+    s.start = 0;
+    s.length = filesize;
+    s.ip_address = original_splits.at(0).ip_address;
+    test_splits.push_back(s);
+  }
+  else {
+    cerr << "Error: received invalid test type " << test_number << endl;
+    exit(-1);
+  }
+
+  cerr << "Listing test splits:" << endl;
+  for (vector<request_split>::iterator i = test_splits.begin();
+       i != test_splits.end(); i++) {
+    cerr << "Split: IP " << i->ip_address << ", start " 
+        << i->start << ", length " << i->length << endl;
+  }
+     
+  // start the timer
+  utime_t start_time = g_clock.now();
+  int pending_tasks = 0;
+
+  // start up the tasks
+  for (vector<request_split>::iterator i = test_splits.begin();
+       i != test_splits.end(); i++) {
+    start_map_task(i->ip_address, taskID++, map_command, input_filename,
+                  i->start, i->length, output_filename);
+    ++pending_tasks;
+  }
+
+
+  // wait for all the tasks to finish
+  while (pending_tasks > 0) {
+    int exit_status;
+    cerr << "Waiting for " << pending_tasks << " tasks to return..." << endl;
+    pid_t pid = wait(&exit_status);
+    if (pid < 0) {
+      cerr << "ERROR on wait(): result was " << pid << endl;
+      exit(-1);
+    }
+    --pending_tasks;
+    if (WIFEXITED(exit_status)) {
+      cerr << "Task with pid " << pid << " returned with exit status " << 
+      WEXITSTATUS(exit_status) << endl;
+    }
+    else { cerr << "WARNING: Task with pid " << pid << " exited abnormally" << endl; }
+  }
+
+  cerr << "All tasks have returned." << endl;
+  // report the time
+  double elapsed_time;
+  elapsed_time = (g_clock.now() - start_time);
+  cerr << "Elapsed time: " << elapsed_time << endl;
+  cerr << elapsed_time << " " << endl;
+  // send the time to stdout for the shell script
+  cout << elapsed_time << " ";
+  exit(0);
+}
+
+
+// sends a complete ping message
+// through the file descriptor
+// and waits for a reply. This
+// will hang if there's no reply.
+
+void ping_test(int fd) {
+
+  // send the message header and footer.
+  // A ping message has no body.
+  send_msg_header(fd, PING);
+  send_msg_footer(fd);
+
+  // receive the reply.
+  int msg_type = readmsgtype(fd);
+  if (msg_type < 0) {
+    cerr << "ping_test: Failed reading the ping reply. Exiting." << endl;
+    exit(-1);
+  }
+  if (PINGREPLY != msg_type) {
+    assert((msg_type <= 0) && (msg_type < CMDCOUNT) && 
+          "readmsgtype return value out of range");
+    cerr << "ping_test: slave sent invalid reply: replied to ping with message type" << 
+      msg_type << ": " << CMD_LIST[msg_type] << ". Exiting. " << endl;
+    exit(-1);
+  }
+  else {
+    cerr << "Received valid ping reply!" << endl;
+  }
+      
+  if(!check_footer(fd)) {
+    cerr << "ping_test: message footer not found. Exiting." << endl;
+    exit(-1);
+  }
+}
+
+
+
+
+// send a test message for starting a task
+void start_task_test(int fd) {
+
+  // The test:
+  // TaskID 42
+  // command: "Burninate"
+  // input file: "countryside"
+  // offset: 8764 (TROG)
+  // length: 367 (DOR)
+
+  send_start_task_msg(fd, 42, strlen("Burninate"), "Burninate",
+                     strlen("countryside"), "countryside",
+                     8764, 367,
+                     strlen("toast"), "toast");
+}
+
+
+// sends a message to the fd telling it to start a task.
+// Remember: the message format requires any string to be
+// prefixed by its (unterminated) length.
+void send_start_task_msg(int fd,
+                        int taskID,
+                        int command_size, const char* command,
+                        int inputfilenamesize, const char* inputfilename,
+                        off_t offset,
+                        off_t length,
+                        int outputfilenamesize, const char* outputfilename) {
+
+  // write the header and the message to the file descriptor.
+
+  send_msg_header(fd, STARTTASK);
+
+  write_positive_int(fd, taskID);
+  write_positive_int(fd, command_size);
+  write_string(fd, command);
+  write_positive_int(fd, inputfilenamesize);
+  write_string(fd, inputfilename);
+  //write_long(fd, offset);
+  write_off_t (fd, offset);
+  //write_long(fd, length);
+  write_off_t (fd, length);
+  write_positive_int(fd, outputfilenamesize);
+  write_string(fd, outputfilename);
+
+  // terminate the message
+  send_msg_footer(fd);
+}
+
+
+
+
+// creates a new connection to the slave
+// at the given IP address and port.
+// Overloaded to take an IP address as a
+// string or as an in_addr_t.
+
+int create_new_connection(const char* ip_address, uint16_t port)
+{
+  in_addr_t ip = inet_addr(ip_address);
+  if ((in_addr_t)-1 == ip) {
+    cerr << "Error creating new connection: \"" << ip_address << 
+      "\" is not a valid IP address." << endl;
+    return -1;
+  }
+  else
+    //cerr << "Opening connection to " << ip_address << ":" << endl;
+    return create_new_connection(ip, port);
+}
+
+
+int create_new_connection(in_addr_t ip_address, uint16_t port) {
+
+  struct sockaddr_in serv_addr;
+  int sockfd;
+
+  bzero((char *) &serv_addr, sizeof(serv_addr));
+  serv_addr.sin_family = AF_INET;
+  //serv_addr.sin_addr.s_addr = inet_addr(SERV_HOST_ADDR);
+  serv_addr.sin_addr.s_addr = ip_address;
+  serv_addr.sin_port = htons(SERV_TCP_PORT);
+  // open a TCP socket
+  if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
+    cerr << "msgtestclient: can't open stream socket. Exiting." << endl;
+    exit (-1);
+  }
+
+  // connect to the server.
+  if (connect(sockfd, (struct sockaddr *) &serv_addr,
+             sizeof(serv_addr)) < 0) {
+    cerr << "msgtestclient: can't connect to server." << endl;
+    exit (-1);
+  }
+  //cerr << "opened connection!" << endl;
+  return sockfd;
+}
+
+void msg_type_sender(int sockfd) {
+
+  for (int i = 0; i < CMDCOUNT; ++i) {
+    send_msg_header(sockfd, i);
+  }
+
+}
+
+// Fires up the map task.
+// For the moment, all it does is echo the command line, not run it.
+int start_map_task(sockaddr_in ip_address, int taskID, 
+                  const char* command, const char* input_filename,
+                  off_t start, off_t length, 
+                  const char* output_filename)
+{
+  int childpid;
+  // fork off a child process to do the work, and return
+  if ((childpid = fork()) < 0) {
+    cerr << "start_map_task: fork error. Exiting." << endl;
+    exit(-1);
+  }
+  
+  else if (childpid != 0) { // parent
+    cerr << "start_map_task: forked child process "
+        << childpid << " to start task. " << endl;
+    return 0;
+  }
+      
+  
+  string ip_addr_string(inet_ntoa(ip_address.sin_addr));
+  //  cerr << "command: " << ip_addr_string << " taskID " 
+  //   << taskID << ": " << command 
+  //   << " " << input_filename << " " << start << " " << length 
+  //   << " " << output_filename << endl;
+
+  // open a socket to the slave, and send the message
+  //cerr << "Sending message: " << endl;
+  int sockfd = create_new_connection(ip_addr_string.c_str(), SERV_TCP_PORT);
+  send_start_task_msg(sockfd, taskID, strlen(command), command,
+                     strlen(input_filename), input_filename,
+                     start, length,
+                     strlen(output_filename), output_filename);
+
+  // wait for a reply
+  cerr << "Sent message for taskID " << taskID << ". Waiting for reply..." << endl;
+
+  // receive the reply.
+  int msg_type = readmsgtype(sockfd);
+  if (msg_type < 0) {
+    cerr << "start_map_task: Failed reading the reply. Exiting." << endl;
+    exit(-1);
+  }
+  if (FINISHEDTASK != msg_type) {
+    assert((msg_type <= 0) && (msg_type < CMDCOUNT));
+    cerr << "start_map_task: slave sent invalid reply: replied with message type" << 
+      msg_type << ": " << CMD_LIST[msg_type] << ". Exiting. " << endl;
+    exit(-1);
+  }
+  // read the taskID of the reply
+  
+  int reply_taskID = read_positive_int(sockfd);
+      
+  if(!check_footer(sockfd)) {
+    cerr << "ping_test: message footer not found. Exiting." << endl;
+    exit(-1);
+  }  
+  
+  // done!
+  close(sockfd);
+  cerr << "Task " << taskID << "/" << reply_taskID << 
+    " complete! Ending child process." << endl;
+  exit(0);
+  //_exit(0);
+  cerr << "exit(0) returned. Strange things are afoot." << endl;
+}
+
+
+
+
+void usage(const char* name) {
+  //cout << "usage: " << name << " inputfile map_task outputfile" << endl;
+  //cout << "inputfile must be a valid path in the running Ceph filesystem." << endl;
+  //cout << "map_task should be given with an absolute path, and be present on ";
+  //cout << "the REGULAR filesystem every node." << endl;
+  //cout << "output_file will be written locally to the node." << endl;
+
+  cout << "usage: " << name << " inputfile test_number" << endl;
+  cout << "inputfile must be a valid path in the running Ceph filesystem." << endl;
+  cout << "test_number must be 1, 2, or 3." << endl;
+  cout << "    1: run the test task normally (one slave per OSD)" << endl;
+  cout << "    2: run the test task on the \"wrong\" OSDs" << endl;
+  cout << "    3: run the entire task in a single process" << endl;
+}
+
+
+
diff --git a/branches/sage/mds/active/msgtestclient.h b/branches/sage/mds/active/msgtestclient.h
new file mode 100644 (file)
index 0000000..568c905
--- /dev/null
@@ -0,0 +1,44 @@
+#include "inet.h"
+#include "common.h"
+#include "utility.h"
+#include "client/Client.h"
+
+// wait.h MUST NOT be #included before client/Client.h
+#include <sys/wait.h>
+#include <vector>
+
+  struct request_split {
+    tcpaddr_t ip_address;
+    off_t start;
+    off_t length;
+  };
+
+
+//#define SERV_HOST_ADDR "128.114.57.143" //issdm-8
+#define SERV_HOST_ADDR "128.114.57.166" //issdm-31
+
+#define SERV_TCP_PORT 6733
+#define MAXLINE 512
+
+void msg_type_sender(int sockfd);
+
+
+int create_new_connection(const char* ip_address, uint16_t port);
+int create_new_connection(in_addr_t ip_address, uint16_t port);
+void usage(const char* name);
+void ping_test(int fd);
+void start_task_test(int fd);
+
+int start_map_task(sockaddr_in ip_address, int taskID,
+                  const char* map_command, 
+                  const char* input_filename,
+                  off_t start, off_t length,
+                  const char* output_filename);
+
+void send_start_task_msg(int fd,
+                        int taskID,
+                        int command_size, const char* command,
+                        int inputfilenamesize, const char* inputfilename,
+                        off_t offset,
+                        off_t length,
+                        int outputfilenamesize, const char* outputfilename);
diff --git a/branches/sage/mds/active/trivial_task.cc b/branches/sage/mds/active/trivial_task.cc
new file mode 100644 (file)
index 0000000..7a72ecb
--- /dev/null
@@ -0,0 +1,50 @@
+#include "trivial_task.h"
+
+void start_trivial_task (const char* ceph_filename, const char* local_filename, 
+                        off_t offset, off_t length) {
+  // Don't bother to copy the file to disk. Read the file directly from Ceph,
+  // and add up all the bytes.
+  // Write the total to the local file as a string.
+    Client * client = startCephClient();
+
+    bufferptr bp(CHUNK);
+
+    // get the source file's size. Sanity-check the request range.
+    struct stat st;
+    int r = client->lstat(ceph_filename, &st);
+    assert (r == 0);
+    
+    off_t src_total_size = st.st_size;
+    if (src_total_size < offset + length) {
+      cerr << "Error in copy ExtentToLocalFile: offset + length = " << offset << " + " << length
+          << " = " + (offset + length) << ", source file size is only " << src_total_size << endl;
+      exit(-1);
+    }
+    off_t remaining = length;
+    
+    // open the file and seek to the start position
+    cerr << "start_trivial_task: opening the source file and seeking " << endl;
+
+    int fh_ceph = client->open(ceph_filename, O_RDONLY);
+    assert (fh_ceph > -1); 
+    r = client->lseek(fh_ceph, offset, SEEK_SET);
+    assert (r == offset);
+    
+    int counter = 0;
+    // read through the extent and add up the bytes
+    cerr << "start_trivial_task: counting up bytes" << endl;
+    char* bp_c = bp.c_str();
+    while (remaining > 0) {
+      off_t got = client->read(fh_ceph, bp_c, MIN(remaining,CHUNK), -1);
+      assert(got > 0);
+      remaining -= got;
+      for (off_t i = 0; i < got; ++i) {
+       counter += (unsigned int)(bp_c[i]);
+      }
+    }
+    cerr << "start_trivial_task: Done! Answer is " << counter << endl;
+    client->close(fh_ceph);
+        
+    //assert(0);
+}
+
diff --git a/branches/sage/mds/active/trivial_task.h b/branches/sage/mds/active/trivial_task.h
new file mode 100644 (file)
index 0000000..ce9b47c
--- /dev/null
@@ -0,0 +1,12 @@
+// Shared library for the trivial task of adding up all the bytes in a file
+
+//#include "inet.h"
+#include "common.h"
+#include "utility.h"
+#include "client/Client.h"
+
+
+extern "C" void start_trivial_task (const char* ceph_filename,
+                                   const char* local_filename, 
+                                   off_t offset, off_t length);
+
diff --git a/branches/sage/mds/active/utility.h b/branches/sage/mds/active/utility.h
new file mode 100644 (file)
index 0000000..6e219d7
--- /dev/null
@@ -0,0 +1,446 @@
+/*
+ * Miscellaneous Active OSD helper functions.
+ *
+ */
+
+//#include <sys/stat.h>
+#include "client/Client.h"
+#include "common.h"
+#include "config.h"
+#include "common/Timer.h"
+#include "msg/SimpleMessenger.h"
+
+
+Client* startCephClient();
+void kill_client(Client* client);
+
+
+int readline(int fd, char *ptr, int maxlen);
+int writen(int fd, const char *ptr, int nbytes);
+int send_msg_header(int fd, int header_ID);
+int readmsgtype(int fd);
+bool check_footer(int fd);
+int send_msg_header(int fd, int header_ID);
+int send_msg_footer(int fd);
+int read_positive_int(int fd);
+long read_long(int fd);
+string read_string(int fd, string *buf);
+bool write_positive_int(int fd, int value);
+off_t read_off_t(int fd);
+
+/*
+ * Fires up a Ceph client and returns a pointer to it.
+ */ 
+
+Client* startCephClient()
+{
+  dout(3) << "ActiveMaster: Initializing Ceph client:" << endl;
+  
+  // parse args from CEPH_ARGS, not command line 
+  vector<char*> args; 
+  env_to_vec(args);
+  parse_config_options(args);
+
+  if (g_conf.clock_tare) g_clock.tare();
+
+  // be safe
+  g_conf.use_abspaths = true;
+
+  // load monmap
+  MonMap monmap;
+  int r = monmap.read(".ceph_monmap");
+  if (r < 0) {
+    dout(0) << "ActiveMaster: could not find .ceph_monmap" << endl; 
+    return 0;
+  }
+  assert(r >= 0);
+
+  // start up network
+  rank.start_rank();
+
+  // start client
+  Client *client;
+  client = new Client(rank.register_entity(MSG_ADDR_CLIENT_NEW), &monmap);
+  client->init();
+    
+  // mount
+  client->mount();
+   
+  return client;
+}
+
+void kill_client (Client * client)
+{ 
+  client->unmount();
+  client->shutdown();
+  delete client;
+  
+  // wait for messenger to finish
+  rank.wait();
+}
+
+
+// Read n bytes from a descriptor.
+int readn(int fd, char * ptr, int nbytes)
+{
+  int nleft, nread;
+
+  nleft = nbytes;
+  while (nleft > 0) {
+    nread = read(fd, ptr, nleft);
+    if (nread < 0)
+      return nread; // error
+    else if (nread == 0)
+      break;
+
+    nleft -= nread;
+    ptr += nread;
+  }
+  return (nbytes - nleft);
+}
+
+// Read a line from the socket. This is horrifically slow, as
+// it goes one character at a time to catch the newline.
+
+int readline(int fd, char *ptr, int maxlen) {
+
+  int n, rc;
+  char c;
+
+  for (n = 1; n < maxlen; ++n) {
+    if ( (rc = read(fd, &c, 1)) == 1) {
+      *ptr++ = c;
+      if (c == '\n')
+       break;
+    } 
+    else if (rc == 0) {
+      if (n == 1)
+       return 0; // EOF, no data
+      else
+       break; // EOF, data read
+    }
+    else
+      return -1; // error
+
+  }
+
+  // null-terminate the string and return the number of bytes read
+  *ptr = 0;
+  return n;
+}
+
+// write n bytes to a stream file descriptor.
+
+int writen(int fd, const char *ptr, int nbytes) {
+  int nleft, nwritten;
+
+  nleft = nbytes;
+
+  // write until everything's sent
+  while (nleft > 0) {
+    nwritten = write(fd, ptr, nleft);
+    if (nwritten <= 0) {
+      cerr << "writen: error writing " << nbytes << 
+       "bytes to file descriptor " << fd << endl;
+      return nwritten; //error
+    }
+    nleft -= nwritten;
+    ptr += nwritten;
+  }
+  assert (0 == nleft);
+  return (nbytes - nleft);
+}
+
+
+
+// read a message type from the socket, and print it.
+
+int readmsgtype(int fd) {
+  int rc;
+  char typebuf[CMDLENGTH + 1];
+
+  rc = read(fd, &typebuf, CMDLENGTH);
+  
+  // read a fixed-length text command
+  if (rc != CMDLENGTH) {
+    cerr << "in readmsgtype: read error: result is " << rc << endl;
+    return -1;
+  }
+
+  // null-terminate the string
+  typebuf[CMDLENGTH] = 0;
+
+  // print the command
+  //cerr << "readmsgtype: text type is " << typebuf << ", " ;
+
+  // figure out which one it is, by number
+  for (int i = 0; i < CMDCOUNT; ++i) {
+    if (!strcmp(typebuf, CMD_LIST[i])) {
+      //cerr << "which is identified as type " << i << endl;
+      return i;
+    } 
+ }
+  
+  // if we get here the type was invalid
+  cerr << "readmsgtype: unrecognized message type " << typebuf << endl;
+  return -1;
+}
+
+// Attempt to read the message footer off
+// the given stream.
+bool check_footer(int fd) {
+
+  // leave space for null termination
+  char footer_buf[FOOTER_LENGTH+1];
+
+  // read the footer
+  int rc = read(fd, &footer_buf, FOOTER_LENGTH);
+  if (rc != FOOTER_LENGTH) {
+    cerr << "in check_footer: read error: result is " << rc << endl;
+    return false;
+  }
+
+  // null-terminate the string
+  footer_buf[FOOTER_LENGTH] = 0;
+
+  // Is the footer correct?
+  if (0 == strcmp(footer_buf, FOOTER))
+    return true;
+  else
+    return false;
+}
+
+
+// Attempt to read a positive signed integer off the given stream.
+// This function assumes that the sender and receiver use the same
+// integer size. If this is false, weird stuff will happen.
+int read_positive_int(int fd) {
+
+  char buf[sizeof(int)];
+  int rc = read(fd, &buf, sizeof(int));
+  if (rc != sizeof(int)) {
+    cerr << "in read_positive_int: read error: result is " << rc <<
+        ". Exiting process. " << endl;
+    exit(-1);
+  }
+  return *((int*)buf);
+}
+
+long read_long(int fd) {
+  char buf[sizeof(long)];
+  int rc = read(fd, &buf, sizeof(long));
+  if (rc != sizeof(long)) {
+    cerr << "in read_long: read error: result is " << rc <<
+        ". Exiting process. " << endl;
+    exit(-1);
+  }
+  return *((long*)buf);
+}
+
+
+off_t read_off_t(int fd) {
+  char buf[sizeof(off_t)];
+  int rc = read(fd, &buf, sizeof(off_t));
+  if (rc != sizeof(off_t)) {
+    cerr << "in read_off_t: read error: result is " << rc <<
+        ". Exiting process. " << endl;
+    exit(-1);
+  }
+  return *((off_t*)buf);
+}
+
+size_t read_size_t(int fd) {
+  char buf[sizeof(size_t)];
+  int rc = read(fd, &buf, sizeof(size_t));
+  if (rc != sizeof(size_t)) {
+    cerr << "in read_size_t: read error: result is " << rc <<
+        ". Exiting process. " << endl;
+    exit(-1);
+  }
+  return *((size_t*)buf);
+}
+
+
+
+
+// attempt to write an integer to the given
+// file descriptor.
+bool write_positive_int(int fd, int value) {
+  
+  char* buf = (char*)(&(value));
+  int rc = writen(fd, buf, sizeof(int));
+  
+  if (rc != sizeof(int)) {
+    cerr << "in write_positive_int: write failed, result is " << rc <<
+      ", sizeof(int) is " << sizeof(int) << ". Exiting process." << endl;
+    exit(-1);
+  }
+
+  return true;
+}
+
+// attempt to write a long integer to the given
+// file descriptor.
+bool write_long(int fd, long value) {
+  
+  char* buf = (char*)(&(value));
+  int rc = writen(fd, buf, sizeof(long));
+  
+  if (rc != sizeof(long)) {
+    cerr << "in write_long: write failed, result is " << rc <<
+      ", sizeof(long) is " << sizeof(long) << ". Exiting process." << endl;
+    exit(-1);
+  }
+
+  return true;
+}
+
+
+// attempt to write a long integer to the given
+// file descriptor.
+bool write_off_t(int fd, off_t value) {
+  
+  char* buf = (char*)(&(value));
+  int rc = writen(fd, buf, sizeof(off_t));
+  
+  if (rc != sizeof(off_t)) {
+    cerr << "in writeoff_t: write failed, result is " << rc <<
+      ", sizeof(off_t) is " << sizeof(off_t) << ". Exiting process." << endl;
+    exit(-1);
+  }
+
+  return true;
+}
+
+
+
+
+// read a string from the given file descriptor.
+// The expected format is an int n denoting the
+// length of the string, followed by a series of n
+// bytes, not null-terminated.
+void read_string(int fd, char* buf) {  
+
+  // get the size of the string
+  int size = read_positive_int(fd);
+  if (size < 1) {
+    cerr << "Error in read_string: invalid string size of " << size << endl;
+    exit(-1);
+      }
+  if (size > MAX_STRING_SIZE) {
+    cerr << "Error in read_string: string size of " << size << "is more than maximum of"
+        << MAX_STRING_SIZE << endl;
+    exit(-1);
+  }
+
+  // read the string
+  int result = readn(fd, buf, size);
+  if (result != size) {
+    cerr << "Error in read_string: attempted read size was " << size << 
+      ", result was " << result << endl;
+    exit(-1);
+  }
+  // null-terminate
+  buf[size] = 0;
+
+  cerr << "in read_string: read string \"" << buf << "\" of size " << size << endl;
+
+}
+
+
+// send a fixed-length message header
+// given the header's ID.
+int send_msg_header(int fd, int header_ID) {
+  if ((header_ID < 0) || (header_ID >= CMDCOUNT)) {
+    cerr << "In send_msg_header: received out-of-range header ID " << header_ID <<
+      ". Exiting process." << endl;
+    exit(-1);
+  }
+
+  //cerr << "attempting to send message " << CMD_LIST[header_ID] << 
+  //  " with ID " << header_ID << endl;
+
+  if (CMDLENGTH != writen(fd, CMD_LIST[header_ID], CMDLENGTH)) {
+    cerr << "In send_msg_header: error writing header ID " << header_ID << 
+      "to file descriptor " << fd << ". Exiting process." << endl;
+    exit(-1);
+  }
+
+  return 0;
+}
+
+// send the fixed-length message footer.
+int send_msg_footer(int fd) {
+  //cerr << "attempting to send message footer: " << endl;
+  if (FOOTER_LENGTH != writen(fd, FOOTER, FOOTER_LENGTH)) {
+    cerr << "in send_msg_footer: error writing footer to file descriptor " <<
+      fd << ". Exiting process." << endl;
+    exit(-1);
+  } else {
+    //cerr << "Sent message footer!" << endl; 
+  }
+  return 0;
+}
+
+
+// Writes a string to a stream file descriptor.
+// Dies loudly and horribly on any error.
+bool write_string(int fd, const char* buf) {
+
+  int length = strlen(buf);
+  assert (length >= 0);
+  int result = writen(fd, buf, length);
+  if (result != length) {
+    cerr << "Error in write_string: string length is " << length << 
+      ", result is " << result << endl;
+    exit(-1);
+  }
+
+  return true;
+}
+
+
+// Copy a given extent of a Ceph file to the local disk.
+// Requires a running Ceph client.
+void copyExtentToLocalFile (Client* client, const char* ceph_source,
+                           long offset, long length,
+                           const char* local_destination) {
+
+  // get the source file's size. Sanity-check the request range.
+  struct stat st;
+  int r = client->lstat(ceph_source, &st);
+  assert (r == 0);
+
+  off_t src_total_size = st.st_size;
+  if (src_total_size < offset + length) {
+    cerr << "Error in copy ExtentToLocalFile: offset + size = " << offset << " + " << length
+        << " = " + (offset + length) << ", source file size is only " << src_total_size << endl;
+    exit(-1);
+  }
+  off_t remaining = length;
+
+  // open the source and destination files. Advance the source
+  // file to the desired offset.
+  int fh_ceph = client->open(ceph_source, O_RDONLY);
+  assert (fh_ceph > -1); 
+  r = client->lseek(fh_ceph, offset, SEEK_SET);
+  assert (r == offset);
+  
+  int fh_local = ::open(local_destination, O_WRONLY|O_CREAT|O_TRUNC, 0644);
+  assert (fh_local > -1);
+
+  // copy the file 4 MB at a time
+  const int chunk = 4*1024*1024;
+  bufferptr bp(chunk);
+
+    while (remaining > 0) {
+      off_t got = client->read(fh_ceph, bp.c_str(), MIN(remaining,chunk), -1);
+      assert(got > 0);
+      remaining -= got;
+      off_t wrote = ::write(fh_local, bp.c_str(), got);
+      assert (got == wrote);
+    }
+    // close the files
+    client->close(fh_ceph);
+    ::close(fh_local);
+}
index 2fd81a621773309b404e9f5349caa5e217a81e48..2a4226bd0ef7c0a7787ecfe67b736838dcf5632a 100644 (file)
@@ -17,6 +17,7 @@
 #define __THREAD_H
 
 #include <pthread.h>
+#include <signal.h>
 #include <errno.h>
 
 class Thread {
@@ -40,6 +41,9 @@ class Thread {
   bool is_started() { return thread_id != 0; }
   bool am_self() { return (pthread_self() == thread_id); }
 
+  int kill(int signal) {
+    return pthread_kill(thread_id, signal);
+  }
   int create() {
     return pthread_create( &thread_id, NULL, _entry_func, (void*)this );
   }
index 9b2259a5ee6985033c806e82e441be999d422e66..1f9ca12b46ee35e27ef0e1d215f16a9aa003582f 100644 (file)
@@ -906,7 +906,7 @@ void Ebofs::remove_onode(Onode *on)
     dirty_onodes.erase(on);
   }
 
-  if (on->get_ref_count() > 1) cout << "remove_onode **** will survive " << *on << endl;
+  if (on->get_ref_count() > 1) dout(10) << "remove_onode **** will survive " << *on << endl;
   put_onode(on);
 
   dirty = true;
index 027826dc45cba73243ce20764d2a3e1867514915..a38e3eaf7bd88faffbad5e25b6daf0d203047a80 100644 (file)
@@ -73,6 +73,10 @@ void Rank::sigint()
 
 
 
+void noop_signal_handler(int s)
+{
+  //cout << "blah_handler got " << s << endl;
+}
 
 int Rank::Accepter::start()
 {
@@ -131,6 +135,12 @@ int Rank::Accepter::start()
   // set up signal handler
   old_sigint_handler = signal(SIGINT, simplemessenger_sigint);
 
+  // set a harmless handle for SIGUSR1 (we'll use it to stop the accepter)
+  struct sigaction sa;
+  sa.sa_handler = noop_signal_handler;
+  sa.sa_flags = 0;
+  sigaction(SIGUSR1, &sa, NULL);
+
   // start thread
   create();
 
@@ -140,12 +150,21 @@ int Rank::Accepter::start()
 void *Rank::Accepter::entry()
 {
   dout(10) << "accepter starting" << endl;
-
+  
+  fd_set fds;
   while (!done) {
+    FD_ZERO(&fds);
+    FD_SET(listen_sd, &fds);
+    dout(20) << "accepter calling select" << endl;
+    int r = ::select(listen_sd+1, &fds, 0, &fds, 0);
+    dout(20) << "accepter select got " << r << endl;
+    
+    if (done) break;
+
     // accept
     struct sockaddr_in addr;
     socklen_t slen = sizeof(addr);
-    int sd = ::accept(listen_sd, (sockaddr*)&addr, &slen);
+    int sd = ::accept(listen_sd, (sockaddr*)&addr, &slen);  // FIXME: make this non-blocking.
     if (sd > 0) {
       dout(10) << "accepted incoming on sd " << sd << endl;
       
@@ -157,13 +176,20 @@ void *Rank::Accepter::entry()
       rank.lock.Unlock();
     } else {
       dout(10) << "no incoming connection?" << endl;
-      break;
     }
   }
 
+  ::close(listen_sd);
+
   return 0;
 }
 
+void Rank::Accepter::stop()
+{
+  done = true;
+  this->kill(SIGUSR1);
+  join();
+}
 
 
 /**************************************
@@ -980,9 +1006,9 @@ void Rank::wait()
   lock.Unlock();
   
   // done!  clean up.
-
-  //dout(10) << "wait: stopping accepter thread" << endl;
-  //accepter.stop();
+  dout(-10) << "wait: stopping accepter thread" << endl;
+  accepter.stop();
+  dout(-10) << "wait: stopped accepter thread" << endl;
 
   // stop dispatch thread
   if (g_conf.ms_single_dispatch) {
index 0f49ce6a8882438cfc5312e33f319f8ad351ec65..85732307b543e0c9504c30166de020ff6eca7707 100644 (file)
@@ -36,7 +36,6 @@ using namespace __gnu_cxx;
 
 
 
-
 /* Rank - per-process
  */
 class Rank {
@@ -57,11 +56,7 @@ private:
     Accepter() : done(false) {}
     
     void *entry();
-    void stop() {
-      done = true;
-      ::close(listen_sd);
-      join();
-    }
+    void stop();
     int start();
   } accepter;
 
index ea6d123195e43df2dba1ac87e4d610fac87b8003..279c56b6f095e6ddedbe6bf27079e3b551fb7210 100644 (file)
@@ -273,7 +273,7 @@ bool ReplicatedPG::preprocess_op(MOSDOp *op)
       }
 
       // do it now
-      dout(-10) << "preprocess_op data is in cache, reading from cache" << *op <<  dendl;
+      dout(10) << "preprocess_op data is in cache, reading from cache" << *op <<  dendl;
       do_op(op);
       return true;
     }