]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
remove ancient active/ stuff
authorSage Weil <sage.weil@dreamhost.com>
Tue, 1 Feb 2011 05:09:15 +0000 (21:09 -0800)
committerSage Weil <sage.weil@dreamhost.com>
Tue, 1 Feb 2011 05:09:15 +0000 (21:09 -0800)
18 files changed:
src/active/README [deleted file]
src/active/activemaster.cc [deleted file]
src/active/activemaster.h [deleted file]
src/active/activeslave.cc [deleted file]
src/active/activeslave.h [deleted file]
src/active/activetaskd.cc [deleted file]
src/active/activetaskd.h [deleted file]
src/active/client_init.cc [deleted file]
src/active/client_init.h [deleted file]
src/active/common.h [deleted file]
src/active/echotestclient.cc [deleted file]
src/active/echotestclient.h [deleted file]
src/active/inet.h [deleted file]
src/active/msgtestclient.cc [deleted file]
src/active/msgtestclient.h [deleted file]
src/active/trivial_task.cc [deleted file]
src/active/trivial_task.h [deleted file]
src/active/utility.h [deleted file]

diff --git a/src/active/README b/src/active/README
deleted file mode 100644 (file)
index bcf9fab..0000000
+++ /dev/null
@@ -1,13 +0,0 @@
-
-
-activeslave.h/cc: OSD slave process. Must be running on each OSD.
-
-msgtestclient.h/cc: OSD master process. Run ./msgtestclient with no parameters for help.
-
-trivial_task_ipc.h/cc: A trivial task to be built as a library. Uses
-client interface from ../ceph_ipc; server from ../ceph_ipc must be
-running on each OSD.
-
-common.h, inet.h, utility.h, socket_utility.h: Helper functions.
-
-activemaster, activetaskd, trivial_task_test, trivial_task: obsolete.
diff --git a/src/active/activemaster.cc b/src/active/activemaster.cc
deleted file mode 100644 (file)
index b4dc742..0000000
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Startup executable for 
- * Ceph Active Storage. See README for details.
- *
- */
-#include "activemaster.h"
-
-
-/*
- * What main() must do:
- *
- *  - start up a Ceph client
- *  - find the set of OSDs that the file is striped across
- *  - start up the Map task on each OSD, using ssh
- *  - eat lunch?
- *  - start up the Reduce task locally
- */
-
-int main(int argc, const char* argv[]) {  
-
-  if (argc < 4) {
-    usage(argv[0]);
-    exit(-1);
-  }
-
-  const char* input_filename = argv[1];
-  const char* map_command = argv[2];
-  //const char* reduce_command = argv[3];
-
-  // fire up the client
-  Client* client = startCephClient();
-
-  // open the file as read_only
-  int fh = client->open(input_filename, O_RDONLY);
-  if (fh < 0)    {
-    cout << "The input file " << input_filename << " could not be opened." << endl;
-    exit(-1);
-  }
-
-  // How big is the file?
-  int filesize;
-  struct stat stbuf;
-  if (0 > client->lstat(input_filename, &stbuf))    {
-    cout << "Error: could not retrieve size of input file " << input_filename << endl;
-    exit(-1);
-  }
-  filesize = stbuf.st_size;
-  if (filesize < 1) {
-    cout << "Error: input file size is " << filesize << endl;
-    exit(-1);
-  }
-
-  // retrieve all the object extents
-  list<ObjectExtent> extents;
-  int offset = 0;
-  client->enumerate_layout(fh, extents, filesize, offset);
-  
-  // for each object extent, retrieve the OSD IP address and start up a Map task
-  list<ObjectExtent>::iterator i;
-  map<size_t, size_t>::iterator j;
-  int osd;
-  int start, length;
-  tcpaddr_t tcpaddr;
-
-  for (i = extents.begin(); i != extents.end(); i++)
-    {
-      // find the primary and get its IP address
-      osd = client->osdmap->get_pg_primary(i->pgid);      
-      entity_inst_t inst = client->osdmap->get_inst(osd); 
-      entity_addr_t entity_addr = inst.addr;
-      entity_addr.make_addr(tcpaddr);
-      
-      // iterate through each buffer_extent in the ObjectExtent
-      for (j = i->buffer_extents.begin();
-          j != i->buffer_extents.end(); j++)
-       {
-         // get the range of the buffer_extent
-         start = (*j).first;
-         length = (*j).second;
-         // fire up the Map task
-         start_map_task(map_command, input_filename, start, length, tcpaddr);
-       }
-    }
-  return 0; 
-}
-
-// Fires up the map task.
-// For the moment, all it does is echo the command line, not run it.
-int start_map_task(const char* command, const char* input_filename,
-                  long start, long length,  sockaddr_in ip_address)
-{
-  string ip_addr_string(inet_ntoa(ip_address.sin_addr));
-  
-  
-
-
-
-  cout << "ssh " << ip_addr_string << " " << command 
-       << " " << input_filename << " " << start << " " << length << endl;
-  return 0;
-}
-
-
-
-void usage(const char* name) {
-  cout << "usage: " << name << " inputfile map_task reduce_task" << endl;
-  cout << "inputfile must be a valid path in the running Ceph filesystem." << endl;
-  cout << "map_task should be given with an absolute path, and be present on ";
-  cout << "the REGULAR filesystem every node." << endl;
-  cout << "reduce_task need be present on this node only." << endl;
-}
-
-
-
diff --git a/src/active/activemaster.h b/src/active/activemaster.h
deleted file mode 100644 (file)
index 524138e..0000000
+++ /dev/null
@@ -1,18 +0,0 @@
-/*
- * This is the master executable to start up
- * a compute task across several nodes.
- *
- *
- */  
-
-
-//#include <sys/stat.h>
-#include "utility.h"
-
-int start_map_task(const char* command, const char* input_filename, 
-                  long start, long length, tcpaddr_t ip_address);
-
-void usage(const char* name);
-
-//Client* startCephClient();
-//void kill_client(Client* client);
diff --git a/src/active/activeslave.cc b/src/active/activeslave.cc
deleted file mode 100644 (file)
index a57263a..0000000
+++ /dev/null
@@ -1,456 +0,0 @@
-/*
- * This is a slave for receiving and executing commands for 
- * compute tasks on an OSD. This supersedes the daemon
- * version in activetaskd.h/cc, because it's easier to debug
- * if it's not a daemon.
- *
- * Networking code is based off examples from Stevens' UNIX Network Programming.
- */
-
-#include "activeslave.h"
-
-int main(int argc, const char* argv[]) {
-
-  /* Set up TCP server */
-  int sockfd, newsockfd,  childpid;
-  socklen_t clilen;
-  struct sockaddr_in cli_addr, serv_addr;
-
-  // Open a TCP socket
-  if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
-    cerr << "slave: can't open TCP socket. Exiting." << endl;
-    exit(-1);
-  }
-  cerr << "slave: opened TCP socket." << endl;
-
-  // set up the port
-  bzero((char*) &serv_addr, sizeof(serv_addr));
-  serv_addr.sin_family      = AF_INET;
-  serv_addr.sin_addr.s_addr = htonl(INADDR_ANY);
-  serv_addr.sin_port        = htons(SERV_TCP_PORT);
-
-  if (bind(sockfd, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) < 0) {
-    cerr << "slave: can't bind local address. Exiting." << endl;
-    exit(-1);
-  } 
-
-  if(listen(sockfd, SOMAXCONN) < 0) {
-    cerr << "slave: listening error. Exiting." << endl;
-    exit(-1);
-  }
-
-
-  /* The Big Loop */
-  while (1) {
-
-    // wait for a message and fork off a child process to handle it
-    clilen = sizeof(cli_addr);
-    newsockfd = accept(sockfd,
-                      (struct sockaddr *) &cli_addr,
-                      &clilen);
-
-    if (newsockfd < 0) {
-      cerr << "slave: accept error. Exiting." << endl;
-      exit(-1);
-    }
-
-    if ((childpid = fork()) < 0) {
-      cerr << "slave: fork error. Exiting." << endl;
-      exit(-1);
-    }
-
-    else if (childpid == 0) { // child process
-      cerr << "Forked child process for incoming socket" << endl;
-      close(sockfd);
-      process_request(newsockfd);
-      cerr << "Finished processing request. Exiting child." << endl;
-      exit(0);
-    }
-    
-    close (newsockfd); // parent
-
-    //sleep(30); /* wait 30 seconds */
-  }
-  exit(EXIT_SUCCESS);
-}
-
-
-/* This will process requests from the master.
- * The protocol in a nutshell:
- *   Master opens a socket to slave, and sends
- * one message.
- *   Slave replies with one message.
- *   Socket is closed.
- */
-
-void process_request(int newsockfd) {
-
-  // first, read the message type.
-  int msg_type = readmsgtype(newsockfd);
-    
-  // Second, call some function based on the message type to process
-  // the rest of the message. The function is responsible for the rest
-  // of the message; this includes checking the message footer.
-
-  switch(msg_type) {
-
-  case PING: // ping
-    process_ping(newsockfd);
-    break;
-  case STARTTASK: // start_task
-    process_start_task(newsockfd);
-    break;
-  case RETRIEVELOCALFILE: // get_local
-    process_get_local(newsockfd);
-    break;
-  case SHIPCODE:
-    assert(0); // obsolete
-    //process_shipcode(newsockfd);
-    break;
-
-  case PINGREPLY:
-  case FINISHEDTASK:
-  case TASKFAILED:
-  case SENDLOCALFILE:
-  case LOCALFILENOTFOUND:
-  case CODESAVED:
-  case SHIPFAILED:
-    cerr << "activeslave: BUG received message " << CMD_LIST[msg_type] <<
-      " from master; master should never send this message." << endl;
-    exit(-1);
-    break;
-    
-
-  case -1:
-    cerr << "activeslave: message had an unidentifiable type. " <<
-      "Closing socket and discarding rest of message." << endl;
-  default:
-    cerr << "activeslave: BUG! received unexpected return value of" << msg_type <<
-      "from readmsgtype(). Closing socket and discarding rest of message." << endl;
-
-    exit(-1);
-  }
-}
-
-
-// Just write a ping_reply to the socket.
-void process_ping(int fd) {
-
-  // make sure the footer is valid
-  if (!check_footer(fd)) {
-    cerr << "process_ping warning: ping message has invalid or missing footer."
-        << endl;
-  }
-  // Even if the footer's invalid, send the reply. 
-  cerr << "Replying to ping..." << endl;
-  send_msg_header(fd, PINGREPLY);
-  send_msg_footer(fd);
-  cerr << "Ping processing completed." << endl;
-}
-
-
-// Process a start_task message. This reads the incoming message,
-// retrieves the necessary library, and starts the corresponding task.
-
-// Parameter format: taskID(int) library(string) 
-// cephinputfile(string) offset(long) length(long) localoutputfile
-
-void process_start_task(int fd) {
-
-  char libraryname[MAX_STRING_SIZE + 1];
-  char cephinputfile[MAX_STRING_SIZE + 1];
-  char localoutputfile[MAX_STRING_SIZE + 1];
-  char options[MAX_STRING_SIZE + 1];
-
-  cout << "in process_start_task: ";
-  int taskID = read_positive_int(fd);
-  cout << "read taskID " << taskID;
-
-  // There may be multiple instances running on the same OSD. Cheap
-  // and dirty hack: append the taskID to the filename to avoid contention.
-
-  read_string(fd, libraryname);
-  string libraryfilename("lib");
-  libraryfilename += libraryname;
-  libraryfilename += ".so";
-  cout << ", library name " << libraryname << " -> " << libraryfilename;
-
-  read_string(fd, cephinputfile);
-  cout << ", cephinputfile " << cephinputfile;
-  off_t offset = read_off_t(fd);
-  cout << ", offset " << offset;
-  off_t length = read_off_t(fd);
-  cout << ", length " << length;
-
-  read_string(fd, localoutputfile);
-  cout << ", localoutputfile " << localoutputfile;
-  read_string(fd, options);
-  cout << ", options: " << options << endl;
-
-
-  // make sure the footer is valid
-  if (!check_footer(fd)) {
-    cerr << "process_start_task warning: message has invalid or missing footer. "
-        << "Discarding message." << endl;
-    exit(-1);
-  }
-
-
-  // copy the library over from Ceph
-
-  ostringstream locallibraryfilename;
-  locallibraryfilename << "/tmp/lib" << libraryname << "_" << taskID << ".so";
-
-  //string locallibraryfilename("lib");
-  //locallibraryfilename += libraryname;
-  //locallibraryfilename += "_";
-  //locallibraryfilename += taskID;
-  //locallibraryfilename += ".so";
-  cout << "Naming local library copy "  << locallibraryfilename.str() << endl;
-
-  Client* client = startCephClient();
-  copyCephFileToLocalFile(client, libraryfilename.c_str(), locallibraryfilename.str().c_str());
-  kill_client(client);
-  cout << "Local library copy acquired" << endl;
-
-  // load the task from the shared library
-  void (*task)(const char*, const char*, int,
-              off_t, off_t, const char*) = 0;
-  void* dl_h = dlopen(locallibraryfilename.str().c_str(), RTLD_LAZY);
-  if (NULL == dl_h) {
-    cerr << "Dynamic linking error: " << dlerror() << endl;
-    exit(-1);
-  }
-  task = (void (*)(const char*, const char*, int, 
-                  off_t, off_t, const char*)
-         ) dlsym(dl_h, "run_task");
-  if (NULL == dl_h) {
-    cerr << "Symbol lookup error: " << dlerror() << endl;
-    exit(-1);
-  }
-
-  // start a task; create an output filename that uses the task ID,
-  // 'cause we might end up with multiple pieces of a file on each
-  // OSD.
-  cerr << "starting task: " << endl;
-  task(cephinputfile, localoutputfile, taskID, offset, length, options);
-  cerr << "returned from task! Sending reply:" << endl;
-
-  // send the reply
-  send_msg_header(fd, FINISHEDTASK);
-  write_positive_int(fd, taskID);
-  send_msg_footer(fd);
-
-  // done
-  cout << "Done sending reply for taskID " << taskID << endl;
-  return;
-}
-
-
-// Starts a sloppy grep count of the hardwired search string over the
-// given Ceph file extent. It's sloppy because it copies the given
-// extent to a local file and runs "grep" on it, with no effort to take
-// care of boundary issues.
-void start_sloppy_grepcount (const char* ceph_filename, const char* local_filename,
-                            long offset, long size) {
-
-  Client* client = startCephClient();
-  char* search_string = "the";
-  // copy the file to a local file. 
-
-  copyExtentToLocalFile (client, ceph_filename, offset, size, local_filename);
-  // we want: grep -c search_string local_filename
-  // to get the number of occurrences of the string.
-  string command = "";
-  command.append("grep -c ");
-  command.append(search_string);
-  command.append(local_filename);
-
-  assert(0);
-}
-
-
-// SHIPCODE messages have been removed.
-
-void process_shipcode(int fd) { assert(0); }
-
-
-// Processes a get_local message. The message
-// specifies the filename of a local file to
-// return to the sender.
-
-// Parameter format: requestID(int) localfilename(string)
-
-// INCOMPLETE: currently just reads the message.
-
-
-void process_get_local(int fd) {
-  cout << "in process_get_local: ";
-  int taskID = read_positive_int(fd);
-  cout << "read taskID " << taskID;
-
-  char localfilename[MAX_STRING_SIZE+1];
-  read_string(fd, localfilename);
-  cout << ", localfilename " << localfilename << endl;
-
-
-  // make sure the footer is valid
-  if (!check_footer(fd)) {
-    cerr << "process_get_local warning: message has invalid or missing footer."
-        << endl;
-  }
-
-  // not implemented
-  cerr << "Error: get_local command unimplemented." << endl;
-  assert(0);
-}
-
-
-// Retrieves a formatted message from the socket.
-// At the moment, this just reads and prints a fixed-
-// length message type.
-// DEPRECATED.
-void str_getmsg(int sockfd) {
-  
-  int  n;
-
-  // read message types until the connection dies
-  while(true) {
-    n = readmsgtype(sockfd);
-    if (n != 0) {
-      cerr << "from getmsg: some sort of error" << endl;
-      exit(-1);
-    }
-  }
-}
-
-// Echo a stream socket message back to the sender.
-// DEPRECATED.
-void str_echo(int sockfd) {
-  
-  int  n;
-  char line[MAXLINE];
-
-  while(true) {
-
-    // read from the stream
-    cerr << "str_echo: waiting for a line" << endl;
-    n = readline(sockfd, line, MAXLINE);
-    cerr << "str_echo: read a line" << endl;
-    if (0 == n) {
-      cerr << "str_echo: connection terminated" << endl;
-      return; // connection is terminated
-    }
-    else if (n < 0) {
-      cerr << "str_echo: readline error" << endl;
-      exit(-1);
-    }
-
-    // write back to the stream
-    if (n != writen(sockfd, line, n)) {
-      cerr << "str_echo: writen error" << endl;
-      exit(-1);
-    }
-    else
-      cerr << "Echoed line " << endl;
-  }
-}
-
-
-void str_ack(int sockfd) {
-  
-  int  n;
-  char line[MAXLINE];
-  //char *ack = "ack";
-
-  while(true) {
-
-    // read from the stream
-    n = readline(sockfd, line, MAXLINE);
-
-    if (0 == n)
-      return; // connection is terminated
-    else if (n < 0)
-      //err_dump("str_echo: readline error");
-      exit(-1);
-
-    // write back to the stream
-    if (4 != writen(sockfd, "ack\n", 4))
-      //err_dump("str_echo: writen error");
-      exit(-1);
-  }
-}
-
-
-
-// Read command lines from the socket and execute them
-
-void str_run(int sockfd) {
-  
-  int  n;
-  char line[MAXLINE];
-  char* error_msg = "str_run: No command interpreter found\n";
-  char* ack_msg = "Running command... ";
-  char* commit_msg = "Command executed!\n";
-
-  while(true) {
-
-    // read from the stream
-    n = readline(sockfd, line, MAXLINE);
-
-    if (0 == n)
-      return; // connection is terminated
-    else if (n < 0)
-      //err_dump("str_echo: readline error");
-      exit(-1);
-
-    if (system(NULL)) {
-      writen(sockfd, ack_msg, strlen(ack_msg));
-      system(line);
-      writen(sockfd, commit_msg, strlen(commit_msg));
-    }
-    else if ((int)strlen(error_msg) != writen(sockfd, error_msg, strlen(error_msg))) 
-      //err_dump("str_echo: writen error");
-      exit(-1);
-  }
-}
-
-
-// take a filename and copy it from Ceph to a local directory.
-// Not completed.
-
-void str_copytolocal(int sockfd) {
-  
-  int  n;
-  char line[MAXLINE];
-  char* error_msg = "str_copy: No command interpreter found\n";
-  char* ack_msg = "Running command... ";
-  char* commit_msg = "Command executed!\n";
-  //char* temp_dir = "/tmp";
-
-
-  while(true) {
-
-    // read from the stream
-    n = readline(sockfd, line, MAXLINE);
-
-    if (0 == n)
-      return; // connection is terminated
-    else if (n < 0)
-      //err_dump("str_echo: readline error");
-      exit(-1);
-
-    if (system(NULL)) {
-      writen(sockfd, ack_msg, strlen(ack_msg));
-      system(line);
-      writen(sockfd, commit_msg, strlen(commit_msg));
-    }
-    else if ((int)strlen(error_msg) != writen(sockfd, error_msg, strlen(error_msg))) 
-      //err_dump("str_echo: writen error");
-      exit(-1);
-  }
-}
-
-
-
diff --git a/src/active/activeslave.h b/src/active/activeslave.h
deleted file mode 100644 (file)
index 8fe268c..0000000
+++ /dev/null
@@ -1,25 +0,0 @@
-#include <dlfcn.h>
-#include <string>
-#include <sstream>
-#include "inet.h"
-#include "common.h"
-#include "utility.h"
-#include "client/Client.h"
-
-// The port number is "osdd" on a telephone keypad.
-#define SERV_TCP_PORT 6733
-
-#define MAXLINE 512
-
-void str_echo(int sockfd);
-void str_ack(int sockfd);
-void str_run(int sockfd);
-void str_getmsg(int sockfd);
-void process_request(int newsockfd);
-void process_ping(int fd);
-void process_start_task(int fd);
-void process_get_local(int fd);
-void process_shipcode(int fd);
-
-void start_trivial_task(const char* ceph_filename, const char* local_filename,
-                       long offset, long length);
diff --git a/src/active/activetaskd.cc b/src/active/activetaskd.cc
deleted file mode 100644 (file)
index ec9f290..0000000
+++ /dev/null
@@ -1,241 +0,0 @@
-/*
- * This is a daemon for receiving and executing commands for compute tasks on an OSD.
- *
- * The daemon uses skeleton code from
- * http://www.linuxprofilm.com/articles/linux-daemon-howto.html. The
- * site is no longer up, but can be seen through the archive.org.
- * Networking code is based off examples from Stevens' UNIX Network Programming.
- */
-
-#include "activetaskd.h"
-
-
-#define SERVER
-
-#undef SERVER
-
-int main(int argc, const char* argv[]) {
-        
-  /* Our process ID and Session ID */
-  pid_t pid, sid;
-        
-  /* Fork off the parent process */
-  pid = fork();
-  if (pid < 0) {
-    exit(EXIT_FAILURE);
-  }
-  /* If we got a good PID, then
-     we can exit the parent process. */
-  if (pid > 0) {
-    exit(EXIT_SUCCESS);
-  }
-
-  /* Change the file mode mask */
-  umask(0);
-                
-  /* Open any logs here */        
-                
-  /* Create a new SID for the child process */
-  sid = setsid();
-  if (sid < 0) {
-    /* Log the failure */
-    exit(EXIT_FAILURE);
-  }
-        
-        
-  /* Change the current working directory */
-  if ((chdir("/")) < 0) {
-    /* Log the failure */
-    exit(EXIT_FAILURE);
-  }
-        
-  /* Close out the standard file descriptors */
-  close(STDIN_FILENO);
-  close(STDOUT_FILENO);
-  close(STDERR_FILENO);
-        
-  /* Daemon-specific initialization goes here */
-
-
-
-  /* Set up TCP server */
-  int sockfd, newsockfd,  childpid;
-  socklen_t clilen;
-  struct sockaddr_in cli_addr, serv_addr;
-
-  const char *pname = argv[0]; // process name
-
-  // Open a TCP socket
-  if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0)
-    exit(-1);
-  //err_dump("server: can't open stream socket");
-
-  // set up the port
-  bzero((char*) &serv_addr, sizeof(serv_addr));
-  serv_addr.sin_family      = AF_INET;
-  serv_addr.sin_addr.s_addr = htonl(INADDR_ANY);
-  serv_addr.sin_port        = htons(SERV_TCP_PORT);
-
-  if (bind(sockfd, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) < 0)
-    exit(-1);
-    //err_dump("server: can't bind local address");
-
-  if(listen(sockfd, SOMAXCONN) < 0)
-    exit(-1);
-  //err_dump("server: listening error");
-
-  /* The Big Loop */
-  while (1) {
-
-    // wait for a message and fork off a child process to handle it
-    clilen = sizeof(cli_addr);
-    newsockfd = accept(sockfd,
-                      (struct sockaddr *) &cli_addr,
-                      &clilen);
-
-    if (newsockfd < 0)
-      exit(-1);
-      //err_dump("server: accept error");
-
-    if ( (childpid = fork()) < 0)
-      exit(-1);
-    // err_dump("server: fork error");
-
-    else if (childpid == 0) { // child process
-      close(sockfd);
-      //str_echo(newsockfd);
-      str_run(newsockfd);
-      // insert code to process the request
-      exit(0);
-    }
-    
-    close (newsockfd); // parent
-
-    //sleep(30); /* wait 30 seconds */
-  }
-  exit(EXIT_SUCCESS);
-}
-
-
-// Echo a stream socket message back to the sender.
-
-void str_echo(int sockfd) {
-  
-  int  n;
-  char line[MAXLINE];
-
-  while(true) {
-
-    // read from the stream
-    n = readline(sockfd, line, MAXLINE);
-
-    if (0 == n)
-      return; // connection is terminated
-    else if (n < 0)
-      //err_dump("str_echo: readline error");
-      exit(-1);
-
-    // write back to the stream
-    if (n != writen(sockfd, line, n)) 
-      //err_dump("str_echo: writen error");
-      exit(-1);
-  }
-}
-
-
-void str_ack(int sockfd) {
-  
-  int  n;
-  char line[MAXLINE];
-  char *ack = "ack";
-
-  while(true) {
-
-    // read from the stream
-    n = readline(sockfd, line, MAXLINE);
-
-    if (0 == n)
-      return; // connection is terminated
-    else if (n < 0)
-      //err_dump("str_echo: readline error");
-      exit(-1);
-
-    // write back to the stream
-    if (4 != writen(sockfd, "ack\n", 4))
-      //err_dump("str_echo: writen error");
-      exit(-1);
-  }
-}
-
-
-
-
-// Read command lines from the socket and execute them
-
-void str_run(int sockfd) {
-  
-  int  n;
-  char line[MAXLINE];
-  char* error_msg = "str_run: No command interpreter found\n";
-  char* ack_msg = "Running command... ";
-  char* commit_msg = "Command executed!\n";
-
-  while(true) {
-
-    // read from the stream
-    n = readline(sockfd, line, MAXLINE);
-
-    if (0 == n)
-      return; // connection is terminated
-    else if (n < 0)
-      //err_dump("str_echo: readline error");
-      exit(-1);
-
-    if (system(NULL)) {
-      writen(sockfd, ack_msg, strlen(ack_msg));
-      system(line);
-      writen(sockfd, commit_msg, strlen(commit_msg));
-    }
-    else if (strlen(error_msg) != writen(sockfd, error_msg, strlen(error_msg))) 
-      //err_dump("str_echo: writen error");
-      exit(-1);
-  }
-}
-
-
-// take a filename and copy it from Ceph to a local directory
-
-void str_copytolocal(int sockfd) {
-  
-  int  n;
-  char line[MAXLINE];
-  char* error_msg = "str_copy: No command interpreter found\n";
-  char* ack_msg = "Running command... ";
-  char* commit_msg = "Command executed!\n";
-  char* temp_dir = "/tmp";
-
-
-  while(true) {
-
-    // read from the stream
-    n = readline(sockfd, line, MAXLINE);
-
-    if (0 == n)
-      return; // connection is terminated
-    else if (n < 0)
-      //err_dump("str_echo: readline error");
-      exit(-1);
-
-    if (system(NULL)) {
-      writen(sockfd, ack_msg, strlen(ack_msg));
-      system(line);
-      writen(sockfd, commit_msg, strlen(commit_msg));
-    }
-    else if (strlen(error_msg) != writen(sockfd, error_msg, strlen(error_msg))) 
-      //err_dump("str_echo: writen error");
-      exit(-1);
-  }
-}
-
-
-
diff --git a/src/active/activetaskd.h b/src/active/activetaskd.h
deleted file mode 100644 (file)
index fc5cec9..0000000
+++ /dev/null
@@ -1,14 +0,0 @@
-#include "inet.h"
-#include "common.h"
-#include "utility.h"
-#include "client/Client.h"
-
-
-// The port number is "osdd" on a telephone keypad.
-#define SERV_TCP_PORT 6733
-
-#define MAXLINE 512
-
-void str_echo(int sockfd);
-void str_ack(int sockfd);
-void str_run(int sockfd);
diff --git a/src/active/client_init.cc b/src/active/client_init.cc
deleted file mode 100644 (file)
index 8b13789..0000000
+++ /dev/null
@@ -1 +0,0 @@
-
diff --git a/src/active/client_init.h b/src/active/client_init.h
deleted file mode 100644 (file)
index 139597f..0000000
+++ /dev/null
@@ -1,2 +0,0 @@
-
-
diff --git a/src/active/common.h b/src/active/common.h
deleted file mode 100644 (file)
index 9b2742f..0000000
+++ /dev/null
@@ -1,86 +0,0 @@
-#ifndef CEPH_COMMON_H
-#define CEPH_COMMON_H
-
-
-#include <sys/stat.h>
-#include <stdio.h>
-#include <stdlib.h>
-#include <fcntl.h>
-#include <errno.h>
-#include <unistd.h>
-#include <syslog.h>
-#include <string.h>
-#include <iostream>
-#include <string>
-
-
-
-
-#define CMDLENGTH 10
-#define CMDCOUNT 11
-
-#define MAX_STRING_SIZE 255
-
-/*
- * These are the various messages that can be sent between the master
- * and slave. The slave sends one reply to each message from the master.
-
- * PING/PINGREPLY: just what it sounds like.
-
- * STARTTASK: starts a task from the given library. The slave attempts
- * to perform the task, and replies with FINISHEDTASK or TASKFAILED.
- *
- * RETRIEVELOCALFILE: requests a file that the slave has stored
- * locally. Slave replies with SENDLOCALFILE and the file, or with
- * LOCALFILENOTFOUND.
- * 
- * (deprecated) SHIPCODE: sends a shared library to the slave, containing a
- * function that is to be executed later by the STARTTASK
- * command. Slave replies with CODESAVED or SHIPFAILED.
- *
- * SHIPCODE is replaced by transport using Ceph.
- */ 
-
-
-const off_t CHUNK = 1024 * 1024 * 4;
-
-// a bunch of string constants for commands
-
-#define PING              0
-#define STARTTASK         1
-#define RETRIEVELOCALFILE 2
-#define PINGREPLY         3
-#define FINISHEDTASK      4
-#define TASKFAILED        5
-#define SENDLOCALFILE     6
-#define LOCALFILENOTFOUND 7
-#define SHIPCODE          8
-#define CODESAVED         9
-#define SHIPFAILED        10
-
-#define FOOTER_LENGTH 7
-
-const char* CMD_LIST[CMDCOUNT] = {"______PING",
-                                    "START_TASK",
-                                    "_GET_LOCAL",
-                                    "PING_REPLY",
-                                    "_TASK_DONE",
-                                    "TASKFAILED",
-                                    "SEND_LOCAL",
-                                    "LOCAL_GONE",
-                                    "_SHIP_CODE",
-                                    "CODE_SAVED",
-                                    "SHIPFAILED"};
-
-const char FOOTER[FOOTER_LENGTH + 1] = "MSG_END";
-
-// const char* strArray[] = {"string1", "string2", "string3"};
-//const char commands[2][4]  = {"foo", "bar"};
-
-// error codes
-#define ARGUMENTSINVALID 1001
-#define CEPHCLIENTSTARTUPFAILED 1002
-#define INPUTFILEREADFAILED 1003
-
-#endif //COMMON_H
diff --git a/src/active/echotestclient.cc b/src/active/echotestclient.cc
deleted file mode 100644 (file)
index 2b2d15e..0000000
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * This is merely a test of an echo server; it's an early step in
- * building up the Ceph distributed compute service. This is
- * discardable once the next stage is up and running.
- *
- * Code is based off examples in Stevens' "Unix Network Programming".
- */
-
-#include "echotestclient.h"
-
-int main(int argc, char* argv[]) {
-  
-  int sockfd;
-  struct sockaddr_in serv_addr;
-
-  char* pname = argv[0];
-
-  bzero((char *) &serv_addr, sizeof(serv_addr));
-  serv_addr.sin_family = AF_INET;
-  serv_addr.sin_addr.s_addr = inet_addr(SERV_HOST_ADDR);
-  serv_addr.sin_port = htons(SERV_TCP_PORT);
-
-  
-  // open a TCP socket
-  if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
-    printf("client: can't open stream socket");
-    exit (-1);
-  }
-
-  // connect to the server.
-  if (connect(sockfd, (struct sockaddr *) &serv_addr,
-             sizeof(serv_addr)) < 0) {
-    printf("client: can't connect to server");
-    exit (-1);
-  }
-  
-  // start the test echoer
-  str_cli(stdin, sockfd);
-      
-  
-  close (sockfd);
-  exit(0);
-}
-
-
-void str_cli(FILE *fp, int sockfd) {
-
-  int n;
-  char sendline[MAXLINE], recvline[MAXLINE + 1];
-
-  // read from the fp and write to the socket;
-  // then read from the socket and write to stdout
-  while (fgets(sendline, MAXLINE, fp) != NULL) {
-
-    n = strlen(sendline);
-    if (writen(sockfd, sendline, n) != n) {
-      printf("str_cli: writen error on socket");
-      exit(-1);
-    }
-    n = readline(sockfd, recvline, MAXLINE);
-    if (n < 0) {
-      printf("str_cli: readline error");
-      exit(-1);
-    }
-    recvline[n] = 0;
-    fputs(recvline, stdout);
-  }
-
-  if (ferror(fp)) {
-    printf("str_cli: error reading file");
-    exit(-1);
-  }
-
-}
diff --git a/src/active/echotestclient.h b/src/active/echotestclient.h
deleted file mode 100644 (file)
index 9b26416..0000000
+++ /dev/null
@@ -1,10 +0,0 @@
-#include "inet.h"
-#include "common.h"
-#include "socket_utility.h"
-
-#define SERV_HOST_ADDR "128.114.57.143" //issdm-8
-#define SERV_TCP_PORT 6733
-#define MAXLINE 512
-
-void str_cli(FILE *fp, int sockfd);
-
diff --git a/src/active/inet.h b/src/active/inet.h
deleted file mode 100644 (file)
index 385fa91..0000000
+++ /dev/null
@@ -1,9 +0,0 @@
-/*
- *  Generic TCP/IP definitions
- */
-
-#include <stdio.h>
-#include <sys/types.h>
-#include <sys/socket.h>
-#include <netinet/in.h>
-#include <arpa/inet.h>
diff --git a/src/active/msgtestclient.cc b/src/active/msgtestclient.cc
deleted file mode 100644 (file)
index 0d8a418..0000000
+++ /dev/null
@@ -1,414 +0,0 @@
-/*
- * This client invokes a distributed task across OSDs.
- *
- * Networking code is based off examples in Stevens' "Unix Network Programming".
- */
-#include "msgtestclient.h"
-#define REQUIRED_ARGS 5
-
-int main(int argc, char* argv[]) {  
-
-  // make sure we have all the arguments we need
-  if (argc < REQUIRED_ARGS) { usage(argv[0]);  exit(-1); }
-
-  // This file is rewired for running tests from a
-  // shell script. The first parameter specifies the
-  // name of the Ceph file that the test will be
-  // run on; the second parameter specifies which of
-  // four different tests will be run.
-  const char* library_name = argv[1];
-  const char* input_filename = argv[2];
-  const char* output_filename = argv[3];
-  int test_number = atoi(argv[4]);
-  assert (test_number > 0);
-  assert (test_number < 4);
-  const char* job_options = argv[5];
-
-  string library_filename("lib");
-  library_filename += library_name;
-  library_filename += ".so";
-  
-  // start up a Ceph client
-  Client* client = startCephClient();
-  cerr << "loaded test client" << endl;
-
-  // generate the file extent tuples for each OSD
-  vector<request_split> original_splits;
-  off_t filesize = generate_splits(client, input_filename, original_splits);
-
-  // copy the library to Ceph
-  copyLocalFileToCeph(client, library_filename.c_str(), library_filename.c_str());
-
-  // close the client - we're done with it
-  kill_client(client);
-  cerr << "Closed Ceph client instance" << endl;
-  
-  // sanity check: display the splits
-  cerr << "Listing original splits:" << endl;
-  for (vector<request_split>::iterator i = original_splits.begin();
-       i != original_splits.end(); i++) {
-    cerr << "Split: IP " << i->ip_address << ", start " 
-        << i->start << ", length " << i->length << endl;
-  }
-
-  vector<request_split> test_splits;
-  // Now, modify the splits as needed for the test type.
-  // There are three types of tests.
-  // Test 1: regular test.
-  // Test 2: put all the tasks on the "wrong" OSD.
-  // Test 3: do the entire job off one node.
-
-  if (1 == test_number) {
-    cerr << "Test type 1: using original splits." << endl;
-    test_splits = original_splits;
-  }
-  else if (2 == test_number) {
-    cerr << "Test type 2: rotating split IP addresses. " << endl;
-    int split_count = original_splits.size();
-     for (int i = 0; i < split_count; ++i) {
-       request_split s;
-       s.start = original_splits.at(i).start;
-       s.length = original_splits.at(i).length;
-       s.ip_address = original_splits.at((i+1)%split_count).ip_address;
-       test_splits.push_back(s);
-     }
-  }
-  else if (3 == test_number) {
-    cerr << "Test type 3: one giant split." << endl;
-    request_split s;
-    s.start = 0;
-    s.length = filesize;
-    s.ip_address = original_splits.at(0).ip_address;
-    test_splits.push_back(s);
-  }
-  else {
-    cerr << "Error: received invalid test type " << test_number << endl;
-    exit(-1);
-  }
-
-  cerr << "Listing test splits:" << endl;
-  for (vector<request_split>::iterator i = test_splits.begin();
-       i != test_splits.end(); i++) {
-    cerr << "Split: IP " << i->ip_address << ", start " 
-        << i->start << ", length " << i->length << endl;
-  }
-     
-  // start the timer
-  utime_t start_time = g_clock.now();
-  int pending_tasks = 0;
-
-  int taskID = 0;
-  // start up the tasks
-  for (vector<request_split>::iterator i = test_splits.begin();
-       i != test_splits.end(); i++) {
-    start_map_task(i->ip_address, ++taskID, library_name, input_filename,
-                  i->start, i->length, output_filename, job_options);
-    ++pending_tasks;
-  }
-
-  cerr << "Waiting for " << pending_tasks << " tasks to return..." << endl;
-
-  // wait for all the tasks to finish
-  while (pending_tasks > 0) {
-    int exit_status;
-    cerr << "Waiting for " << pending_tasks << " tasks to return..." << endl;
-    pid_t pid = wait(&exit_status);
-    if (pid < 0) {
-      cerr << "ERROR on wait(): result was " << pid << endl;
-      exit(-1);
-    }
-    --pending_tasks;
-    if (WIFEXITED(exit_status)) {
-      cerr << "Task with pid " << pid << " returned with exit status " << 
-      WEXITSTATUS(exit_status) << endl;
-    }
-    else { cerr << "WARNING: Task with pid " << pid << " exited abnormally" << endl; }
-  }
-
-  cerr << "All tasks have returned." << endl;
-  // report the time
-  double elapsed_time;
-  elapsed_time = (g_clock.now() - start_time);
-  cerr << "Elapsed time: " << elapsed_time << endl;
-  cerr << elapsed_time << " " << endl;
-  // send the time to stdout for the shell script
-  cout << elapsed_time << " ";
-  exit(0);
-}
-
-
-// sends a complete ping message
-// through the file descriptor
-// and waits for a reply. This
-// will hang if there's no reply.
-
-void ping_test(int fd) {
-
-  // send the message header and footer.
-  // A ping message has no body.
-  send_msg_header(fd, PING);
-  send_msg_footer(fd);
-
-  // receive the reply.
-  int msg_type = readmsgtype(fd);
-  if (msg_type < 0) {
-    cerr << "ping_test: Failed reading the ping reply. Exiting." << endl;
-    exit(-1);
-  }
-  if (PINGREPLY != msg_type) {
-    assert((msg_type <= 0) && (msg_type < CMDCOUNT) && 
-          "readmsgtype return value out of range");
-    cerr << "ping_test: slave sent invalid reply: replied to ping with message type" << 
-      msg_type << ": " << CMD_LIST[msg_type] << ". Exiting. " << endl;
-    exit(-1);
-  }
-  else {
-    cerr << "Received valid ping reply!" << endl;
-  }
-      
-  if(!check_footer(fd)) {
-    cerr << "ping_test: message footer not found. Exiting." << endl;
-    exit(-1);
-  }
-}
-
-// sends a message to the fd telling it to start a task.
-// Remember: the message format requires any string to be
-// prefixed by its (unterminated) length.
-void send_start_task_msg(int fd,
-                        int taskID,
-                        int library_name_size, const char* library_name,
-                        int inputfilenamesize, const char* inputfilename,
-                        off_t offset,
-                        off_t length,
-                        int outputfilenamesize, const char* outputfilename,
-                        int options_size, const char* options) {
-
-  // write the header and the message to the file descriptor.
-
-  send_msg_header(fd, STARTTASK);
-
-  write_positive_int(fd, taskID);
-  write_positive_int(fd, library_name_size);
-  write_string(fd, library_name);
-  write_positive_int(fd, inputfilenamesize);
-  write_string(fd, inputfilename);
-  //write_long(fd, offset);
-  write_off_t (fd, offset);
-  write_off_t (fd, length);
-  write_positive_int(fd, outputfilenamesize);
-  write_string(fd, outputfilename);
-  write_positive_int(fd, options_size);
-  write_string(fd, options);
-
-  // terminate the message
-  send_msg_footer(fd);
-}
-
-
-
-
-// creates a new connection to the slave
-// at the given IP address and port.
-// Overloaded to take an IP address as a
-// string or as an in_addr_t.
-
-int create_new_connection(const char* ip_address, uint16_t port)
-{
-  in_addr_t ip = inet_addr(ip_address);
-  if ((in_addr_t)-1 == ip) {
-    cerr << "Error creating new connection: \"" << ip_address << 
-      "\" is not a valid IP address." << endl;
-    return -1;
-  }
-  else
-    //cerr << "Opening connection to " << ip_address << ":" << endl;
-    return create_new_connection(ip, port);
-}
-
-
-int create_new_connection(in_addr_t ip_address, uint16_t port) {
-
-  struct sockaddr_in serv_addr;
-  int sockfd;
-
-  bzero((char *) &serv_addr, sizeof(serv_addr));
-  serv_addr.sin_family = AF_INET;
-  //serv_addr.sin_addr.s_addr = inet_addr(SERV_HOST_ADDR);
-  serv_addr.sin_addr.s_addr = ip_address;
-  serv_addr.sin_port = htons(SERV_TCP_PORT);
-  // open a TCP socket
-  if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
-    cerr << "msgtestclient: can't open stream socket. Exiting." << endl;
-    exit (-1);
-  }
-
-  // connect to the server.
-  if (connect(sockfd, (struct sockaddr *) &serv_addr,
-             sizeof(serv_addr)) < 0) {
-    cerr << "msgtestclient: can't connect to server." << endl;
-    exit (-1);
-  }
-  //cerr << "opened connection!" << endl;
-  return sockfd;
-}
-
-void msg_type_sender(int sockfd) {
-
-  for (int i = 0; i < CMDCOUNT; ++i) {
-    send_msg_header(sockfd, i);
-  }
-
-}
-
-// Fires up a task on a single OSD.
-int start_map_task(sockaddr_in ip_address, int taskID, 
-                  const char* library_name, const char* input_filename,
-                  off_t start, off_t length, 
-                  const char* output_filename,
-                  const char* job_options)
-{
-  int childpid;
-  // fork off a child process to do the work, and return
-  if ((childpid = fork()) < 0) {
-    cerr << "start_map_task: fork error. Exiting." << endl;
-    exit(-1);
-  }
-  
-  else if (childpid != 0) { // parent
-    cerr << "start_map_task: forked child process "
-        << childpid << " to start task. " << endl;
-    return 0;
-  }
-
-  string ip_addr_string(inet_ntoa(ip_address.sin_addr));
-  //  cerr << "command: " << ip_addr_string << " taskID " 
-  //   << taskID << ": " << command 
-  //   << " " << input_filename << " " << start << " " << length 
-  //   << " " << output_filename << endl;
-
-  // open a socket to the slave, and send the message
-  //cerr << "Sending message: " << endl;
-  int sockfd = create_new_connection(ip_addr_string.c_str(), SERV_TCP_PORT);
-  send_start_task_msg(sockfd, taskID, strlen(library_name), library_name,
-                     strlen(input_filename), input_filename,
-                     start, length,
-                     strlen(output_filename), output_filename,
-                     strlen(job_options), job_options);
-
-  // wait for a reply
-  cerr << "Sent message for taskID " << taskID << ". Waiting for reply..." << endl;
-
-  // receive the reply.
-  int msg_type = readmsgtype(sockfd);
-  if (msg_type < 0) {
-    cerr << "start_map_task: Failed reading the reply. Exiting." << endl;
-    exit(-1);
-  }
-  if (FINISHEDTASK != msg_type) {
-    assert((msg_type <= 0) && (msg_type < CMDCOUNT));
-    cerr << "start_map_task: slave sent invalid reply: replied with message type" << 
-      msg_type << ": " << CMD_LIST[msg_type] << ". Exiting. " << endl;
-    exit(-1);
-  }
-  // read the taskID of the reply
-  
-  int reply_taskID = read_positive_int(sockfd);
-      
-  if(!check_footer(sockfd)) {
-    cerr << "ping_test: message footer not found. Exiting." << endl;
-    exit(-1);
-  }  
-  
-  // done!
-  close(sockfd);
-  cerr << "Task " << taskID << "/" << reply_taskID << 
-    " complete! Ending child process." << endl;
-  //exit(0);
-  _exit(0);
-  cerr << "exit(0) returned. Strange things are afoot." << endl;
-}
-
-
-// Creates a set of (ip address, start position, length) tuples giving
-// the location of each piece of the given Ceph file. Returns the size
-// of the file.
-
-
-off_t generate_splits(Client* client, const char* input_filename,
-                                     vector<request_split>& original_splits) {
-
-  // Open the file and get its size
-  int fh = client->open(input_filename, O_RDONLY);
-  if (fh < 0)    {
-    cerr << "The input file " << input_filename << " could not be opened." << endl;
-    exit(-1);
-  }
-  cerr << "Opened file " << input_filename << endl;
-  off_t filesize;
-  struct stat stbuf;
-  if (0 > client->lstat(input_filename, &stbuf)) {
-    cerr << "Error: could not retrieve size of input file " << input_filename << endl;
-    exit(-1);
-  }
-  filesize = stbuf.st_size;
-  if (filesize < 1) {
-    cerr << "Error: input file size is " << filesize << endl;
-    exit(-1);
-  }
-
-  // grab all the object extents
-  list<ObjectExtent> extents;
-  off_t offset = 0;
-  client->enumerate_layout(fh, extents, filesize, offset);
-  client->close(fh);
-  cerr << "Retrieved all object extents" << endl;
-
-
-  // generate the tuples
-  list<ObjectExtent>::iterator i;
-  map<size_t, size_t>::iterator j;
-  int osd;
-
-  for (i = extents.begin(); i != extents.end(); i++) {
-
-    request_split split;
-    // find the primary and get its IP address
-    osd = client->osdmap->get_pg_primary(i->layout.pgid);      
-    entity_inst_t inst = client->osdmap->get_inst(osd); 
-    entity_addr_t entity_addr = inst.addr;
-    entity_addr.make_addr(split.ip_address);        
-
-    // iterate through each buffer_extent in the ObjectExtent
-    for (j = i->buffer_extents.begin();
-        j != i->buffer_extents.end(); j++) {
-
-      // get the range of the buffer_extent
-      split.start = (*j).first;
-      split.length = (*j).second;
-      // throw the split onto the vector
-      original_splits.push_back(split);
-    }
-  }
-  return filesize;
-}
-
-
-void usage(const char* name) {
-  cout << "usage: " << name << " libraryname inputfile outputfile test_number" << endl;
-  cout << "libraryname is the name of a proper shared task library in the _local_ filesystem. "
-       << "e.g. entering \"foo\" requires the presence of libfoo.so in the " 
-       << "working directory." << endl;
-  cout << "inputfile must be a valid path in the running Ceph filesystem." << endl;
-  cout << "outputfile is a Ceph filename prefix for writing results. e.g. \"bar\" will give "
-       << "output files bar.1, bar.2, &c." << endl;
-  cout << "test_number must be 1, 2, or 3." << endl;
-  cout << "    1: run the test task normally (one slave per OSD file extent)" << endl;
-  cout << "    2: run the test task on the \"wrong\" OSDs" << endl;
-  cout << "    3: run the entire task in a single process" << endl;
-}
-
-
-
diff --git a/src/active/msgtestclient.h b/src/active/msgtestclient.h
deleted file mode 100644 (file)
index 85ac7fb..0000000
+++ /dev/null
@@ -1,53 +0,0 @@
-#include "inet.h"
-#include "common.h"
-#include "utility.h"
-#include "client/Client.h"
-
-// wait.h MUST NOT be #included before client/Client.h
-#include <sys/wait.h>
-#include <vector>
-#include <string>
-
-struct request_split {
-  tcpaddr_t ip_address;
-  off_t start;
-  off_t length;
-};
-
-
-//#define SERV_HOST_ADDR "128.114.57.143" //issdm-8
-#define SERV_HOST_ADDR "128.114.57.166" //issdm-31
-
-#define SERV_TCP_PORT 6733
-#define MAXLINE 512
-
-void msg_type_sender(int sockfd);
-
-
-
-int create_new_connection(const char* ip_address, uint16_t port);
-int create_new_connection(in_addr_t ip_address, uint16_t port);
-void usage(const char* name);
-void ping_test(int fd);
-void start_task_test(int fd);
-
-
-off_t generate_splits(Client* client, const char* input_filename,
-                     vector<request_split>& original_splits);
-
-
-int start_map_task(sockaddr_in ip_address, int taskID,
-                  const char* map_command, 
-                  const char* input_filename,
-                  off_t start, off_t length,
-                  const char* output_filename,
-                  const char* job_options);
-
-void send_start_task_msg(int fd,
-                        int taskID,
-                        int command_size, const char* command,
-                        int inputfilenamesize, const char* inputfilename,
-                        off_t offset,
-                        off_t length,
-                        int outputfilenamesize, const char* outputfilename,
-                        int options_size, const char* options);
diff --git a/src/active/trivial_task.cc b/src/active/trivial_task.cc
deleted file mode 100644 (file)
index 7a72ecb..0000000
+++ /dev/null
@@ -1,50 +0,0 @@
-#include "trivial_task.h"
-
-void start_trivial_task (const char* ceph_filename, const char* local_filename, 
-                        off_t offset, off_t length) {
-  // Don't bother to copy the file to disk. Read the file directly from Ceph,
-  // and add up all the bytes.
-  // Write the total to the local file as a string.
-    Client * client = startCephClient();
-
-    bufferptr bp(CHUNK);
-
-    // get the source file's size. Sanity-check the request range.
-    struct stat st;
-    int r = client->lstat(ceph_filename, &st);
-    assert (r == 0);
-    
-    off_t src_total_size = st.st_size;
-    if (src_total_size < offset + length) {
-      cerr << "Error in copy ExtentToLocalFile: offset + length = " << offset << " + " << length
-          << " = " + (offset + length) << ", source file size is only " << src_total_size << endl;
-      exit(-1);
-    }
-    off_t remaining = length;
-    
-    // open the file and seek to the start position
-    cerr << "start_trivial_task: opening the source file and seeking " << endl;
-
-    int fh_ceph = client->open(ceph_filename, O_RDONLY);
-    assert (fh_ceph > -1); 
-    r = client->lseek(fh_ceph, offset, SEEK_SET);
-    assert (r == offset);
-    
-    int counter = 0;
-    // read through the extent and add up the bytes
-    cerr << "start_trivial_task: counting up bytes" << endl;
-    char* bp_c = bp.c_str();
-    while (remaining > 0) {
-      off_t got = client->read(fh_ceph, bp_c, MIN(remaining,CHUNK), -1);
-      assert(got > 0);
-      remaining -= got;
-      for (off_t i = 0; i < got; ++i) {
-       counter += (unsigned int)(bp_c[i]);
-      }
-    }
-    cerr << "start_trivial_task: Done! Answer is " << counter << endl;
-    client->close(fh_ceph);
-        
-    //assert(0);
-}
-
diff --git a/src/active/trivial_task.h b/src/active/trivial_task.h
deleted file mode 100644 (file)
index ce9b47c..0000000
+++ /dev/null
@@ -1,12 +0,0 @@
-// Shared library for the trivial task of adding up all the bytes in a file
-
-//#include "inet.h"
-#include "common.h"
-#include "utility.h"
-#include "client/Client.h"
-
-
-extern "C" void start_trivial_task (const char* ceph_filename,
-                                   const char* local_filename, 
-                                   off_t offset, off_t length);
-
diff --git a/src/active/utility.h b/src/active/utility.h
deleted file mode 100644 (file)
index d303655..0000000
+++ /dev/null
@@ -1,270 +0,0 @@
-/*
- * Miscellaneous Active OSD helper functions.
- *
- */
-
-//#include <sys/stat.h>
-#include "client/Client.h"
-#include "common.h"
-#include "config.h"
-#include "common/Timer.h"
-#include "msg/SimpleMessenger.h"
-#include "socket_utility.h"
-
-Client* startCephClient();
-void kill_client(Client* client);
-
-int send_msg_header(int fd, int header_ID);
-int readmsgtype(int fd);
-bool check_footer(int fd);
-int send_msg_header(int fd, int header_ID);
-int send_msg_footer(int fd);
-
-/*
- * Fires up a Ceph client and returns a pointer to it.
- */ 
-
-Client* startCephClient()
-{
-  cout << "ActiveMaster: Initializing Ceph client:" << endl;
-  
-  // parse args from CEPH_ARGS, not command line 
-  vector<char*> args; 
-  env_to_vec(args);
-  parse_config_options(args);
-
-  if (g_conf.clock_tare) g_clock.tare();
-
-  // be safe
-  g_conf.use_abspaths = true;
-
-  // load monmap
-  MonMap* monmap = new MonMap();
-  int r = monmap->read(".ceph_monmap");
-  if (r < 0) {
-    cout << "ActiveMaster: could not find .ceph_monmap" << endl; 
-    return 0;
-  }
-  assert(r >= 0);
-
-  // start up network
-  rank.start_rank();
-
-  // start client
-  Client *client;
-  client = new Client(rank.register_entity(MSG_ADDR_CLIENT_NEW), monmap);
-  client->init();
-    
-  // mount
-  client->mount();
-   
-  return client;
-}
-
-void kill_client (Client * client)
-{ 
-  client->unmount();
-  client->shutdown();
-  delete client;
-  
-  // wait for messenger to finish
-  rank.wait();
-}
-
-
-
-// reads a message type from the socket, and prints it.
-
-int readmsgtype(int fd) {
-  int rc;
-  char typebuf[CMDLENGTH + 1];
-
-  rc = read(fd, &typebuf, CMDLENGTH);
-  
-  // read a fixed-length text command
-  if (rc != CMDLENGTH) {
-    cerr << "in readmsgtype: read error: result is " << rc << endl;
-    return -1;
-  }
-
-  // null-terminate the string
-  typebuf[CMDLENGTH] = 0;
-
-  // print the command
-  //cerr << "readmsgtype: text type is " << typebuf << ", " ;
-
-  // figure out which one it is, by number
-  for (int i = 0; i < CMDCOUNT; ++i) {
-    if (!strcmp(typebuf, CMD_LIST[i])) {
-      //cerr << "which is identified as type " << i << endl;
-      return i;
-    } 
- }
-  
-  // if we get here the type was invalid
-  cerr << "readmsgtype: unrecognized message type " << typebuf << endl;
-  return -1;
-}
-
-// Attempt to read the message footer off
-// the given stream.
-bool check_footer(int fd) {
-
-  // leave space for null termination
-  char footer_buf[FOOTER_LENGTH+1];
-
-  // read the footer
-  int rc = read(fd, &footer_buf, FOOTER_LENGTH);
-  if (rc != FOOTER_LENGTH) {
-    cerr << "in check_footer: read error: result is " << rc << endl;
-    return false;
-  }
-
-  // null-terminate the string
-  footer_buf[FOOTER_LENGTH] = 0;
-
-  // Is the footer correct?
-  if (0 == strcmp(footer_buf, FOOTER))
-    return true;
-  else
-    return false;
-}
-
-
-// send a fixed-length message header
-// given the header's ID.
-int send_msg_header(int fd, int header_ID) {
-  if ((header_ID < 0) || (header_ID >= CMDCOUNT)) {
-    cerr << "In send_msg_header: received out-of-range header ID " << header_ID <<
-      ". Exiting process." << endl;
-    exit(-1);
-  }
-
-  //cerr << "attempting to send message " << CMD_LIST[header_ID] << 
-  //  " with ID " << header_ID << endl;
-
-  if (CMDLENGTH != writen(fd, CMD_LIST[header_ID], CMDLENGTH)) {
-    cerr << "In send_msg_header: error writing header ID " << header_ID << 
-      "to file descriptor " << fd << ". Exiting process." << endl;
-    exit(-1);
-  }
-
-  return 0;
-}
-
-// send the fixed-length message footer.
-int send_msg_footer(int fd) {
-  //cerr << "attempting to send message footer: " << endl;
-  if (FOOTER_LENGTH != writen(fd, FOOTER, FOOTER_LENGTH)) {
-    cerr << "in send_msg_footer: error writing footer to file descriptor " <<
-      fd << ". Exiting process." << endl;
-    exit(-1);
-  } else {
-    //cerr << "Sent message footer!" << endl; 
-  }
-  return 0;
-}
-
-
-
-// Copy a given extent of a Ceph file to the local disk.
-// Requires a running Ceph client.
-void copyExtentToLocalFile (Client* client, const char* ceph_source,
-                           long offset, long length,
-                           const char* local_destination) {
-
-  // get the source file's size. Sanity-check the request range.
-  struct stat st;
-  int r = client->lstat(ceph_source, &st);
-  assert (r == 0);
-
-  off_t src_total_size = st.st_size;
-  if (src_total_size < offset + length) {
-    cerr << "Error in copy ExtentToLocalFile: offset + size = " << offset << " + " << length
-        << " = " + (offset + length) << ", source file size is only " << src_total_size << endl;
-    exit(-1);
-  }
-  off_t remaining = length;
-
-  // open the source and destination files. Advance the source
-  // file to the desired offset.
-  int fh_ceph = client->open(ceph_source, O_RDONLY);
-  assert (fh_ceph > -1); 
-  r = client->lseek(fh_ceph, offset, SEEK_SET);
-  assert (r == offset);
-  
-  int fh_local = ::open(local_destination, O_WRONLY|O_CREAT|O_TRUNC, 0644);
-  assert (fh_local > -1);
-
-  // copy the file 4 MB at a time
-  const int chunk = 4*1024*1024;
-  bufferptr bp(chunk);
-
-  while (remaining > 0) {
-    off_t got = client->read(fh_ceph, bp.c_str(), MIN(remaining,chunk), -1);
-    assert(got > 0);
-    remaining -= got;
-    off_t wrote = ::write(fh_local, bp.c_str(), got);
-    assert (got == wrote);
-  }
-  // close the files
-  client->close(fh_ceph);
-  ::close(fh_local);
-}
-
-
-// Copy a Ceph file to the local disk. Requires a running Ceph client.
-// Overwrites the destination if it exists.
-void copyCephFileToLocalFile (Client* client, const char* ceph_source,
-                             const char* local_destination) {
-
-  // get the source file's size.
-  struct stat st;
-  int r = client->lstat(ceph_source, &st);
-  assert (r == 0);
-  
-  copyExtentToLocalFile(client, ceph_source, 0, st.st_size,
-                       local_destination);
-  
-
-}
-// Copies a file from the local disk to Ceph. Annihilates the
-// destination if it exists.
-
-void copyLocalFileToCeph(Client* client, const char* local_source,
-                        const char* ceph_destination) {
-
-  // Get the source file's size.
-  struct stat st;
-  int r = ::lstat(local_source, &st);
-  if (0 != r) {
-    cerr << "in copyLocalFileToCeph: error retrieving size for file " << local_source
-        << ": is the file missing?" << endl;
-    assert(0);
-  }
-
-  off_t remaining = st.st_size;
-
-  // Open the source and destination files.
-  int fh_source = ::open(local_source, O_RDONLY);
-  assert (fh_source > -1);
-  int fh_dest = client->open(ceph_destination, O_WRONLY|O_CREAT|O_TRUNC, 0644);
-  assert (fh_dest > -1);
-
-  // Copy the file.
-  const int chunk = 4 * 1024* 1024; // 4 MB
-  bufferptr bp(chunk);
-
-  while(remaining > 0) {
-    off_t got = ::read(fh_source, bp.c_str(), MIN(remaining, chunk));
-    assert(got > 0);
-    remaining -= got;
-    off_t wrote = client->write(fh_dest, bp.c_str(), got);
-    assert (got == wrote);
-  }
-
-  // close the files
-  ::close(fh_source);
-  client->close(fh_dest);
-
-}