From: Sage Weil Date: Tue, 1 Feb 2011 05:09:15 +0000 (-0800) Subject: remove ancient active/ stuff X-Git-Tag: v0.25~231^2~39 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=a45e8f2dd574b2b935a9bb0954360f82f6bf9dcd;p=ceph.git remove ancient active/ stuff --- diff --git a/src/active/README b/src/active/README deleted file mode 100644 index bcf9fab6b2f2..000000000000 --- a/src/active/README +++ /dev/null @@ -1,13 +0,0 @@ - - -activeslave.h/cc: OSD slave process. Must be running on each OSD. - -msgtestclient.h/cc: OSD master process. Run ./msgtestclient with no parameters for help. - -trivial_task_ipc.h/cc: A trivial task to be built as a library. Uses -client interface from ../ceph_ipc; server from ../ceph_ipc must be -running on each OSD. - -common.h, inet.h, utility.h, socket_utility.h: Helper functions. - -activemaster, activetaskd, trivial_task_test, trivial_task: obsolete. diff --git a/src/active/activemaster.cc b/src/active/activemaster.cc deleted file mode 100644 index b4dc742c414a..000000000000 --- a/src/active/activemaster.cc +++ /dev/null @@ -1,115 +0,0 @@ -/* - * Startup executable for - * Ceph Active Storage. See README for details. - * - */ -#include "activemaster.h" - - -/* - * What main() must do: - * - * - start up a Ceph client - * - find the set of OSDs that the file is striped across - * - start up the Map task on each OSD, using ssh - * - eat lunch? - * - start up the Reduce task locally - */ - -int main(int argc, const char* argv[]) { - - if (argc < 4) { - usage(argv[0]); - exit(-1); - } - - const char* input_filename = argv[1]; - const char* map_command = argv[2]; - //const char* reduce_command = argv[3]; - - // fire up the client - Client* client = startCephClient(); - - // open the file as read_only - int fh = client->open(input_filename, O_RDONLY); - if (fh < 0) { - cout << "The input file " << input_filename << " could not be opened." << endl; - exit(-1); - } - - // How big is the file? - int filesize; - struct stat stbuf; - if (0 > client->lstat(input_filename, &stbuf)) { - cout << "Error: could not retrieve size of input file " << input_filename << endl; - exit(-1); - } - filesize = stbuf.st_size; - if (filesize < 1) { - cout << "Error: input file size is " << filesize << endl; - exit(-1); - } - - // retrieve all the object extents - list extents; - int offset = 0; - client->enumerate_layout(fh, extents, filesize, offset); - - // for each object extent, retrieve the OSD IP address and start up a Map task - list::iterator i; - map::iterator j; - int osd; - int start, length; - tcpaddr_t tcpaddr; - - for (i = extents.begin(); i != extents.end(); i++) - { - // find the primary and get its IP address - osd = client->osdmap->get_pg_primary(i->pgid); - entity_inst_t inst = client->osdmap->get_inst(osd); - entity_addr_t entity_addr = inst.addr; - entity_addr.make_addr(tcpaddr); - - // iterate through each buffer_extent in the ObjectExtent - for (j = i->buffer_extents.begin(); - j != i->buffer_extents.end(); j++) - { - // get the range of the buffer_extent - start = (*j).first; - length = (*j).second; - // fire up the Map task - start_map_task(map_command, input_filename, start, length, tcpaddr); - } - } - return 0; -} - -// Fires up the map task. -// For the moment, all it does is echo the command line, not run it. -int start_map_task(const char* command, const char* input_filename, - long start, long length, sockaddr_in ip_address) -{ - string ip_addr_string(inet_ntoa(ip_address.sin_addr)); - - - - - - cout << "ssh " << ip_addr_string << " " << command - << " " << input_filename << " " << start << " " << length << endl; - return 0; -} - - - -void usage(const char* name) { - cout << "usage: " << name << " inputfile map_task reduce_task" << endl; - cout << "inputfile must be a valid path in the running Ceph filesystem." << endl; - cout << "map_task should be given with an absolute path, and be present on "; - cout << "the REGULAR filesystem every node." << endl; - cout << "reduce_task need be present on this node only." << endl; -} - - - - diff --git a/src/active/activemaster.h b/src/active/activemaster.h deleted file mode 100644 index 524138e253c7..000000000000 --- a/src/active/activemaster.h +++ /dev/null @@ -1,18 +0,0 @@ -/* - * This is the master executable to start up - * a compute task across several nodes. - * - * - */ - - -//#include -#include "utility.h" - -int start_map_task(const char* command, const char* input_filename, - long start, long length, tcpaddr_t ip_address); - -void usage(const char* name); - -//Client* startCephClient(); -//void kill_client(Client* client); diff --git a/src/active/activeslave.cc b/src/active/activeslave.cc deleted file mode 100644 index a57263a9987c..000000000000 --- a/src/active/activeslave.cc +++ /dev/null @@ -1,456 +0,0 @@ -/* - * This is a slave for receiving and executing commands for - * compute tasks on an OSD. This supersedes the daemon - * version in activetaskd.h/cc, because it's easier to debug - * if it's not a daemon. - * - * Networking code is based off examples from Stevens' UNIX Network Programming. - */ - -#include "activeslave.h" - -int main(int argc, const char* argv[]) { - - /* Set up TCP server */ - int sockfd, newsockfd, childpid; - socklen_t clilen; - struct sockaddr_in cli_addr, serv_addr; - - // Open a TCP socket - if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) { - cerr << "slave: can't open TCP socket. Exiting." << endl; - exit(-1); - } - cerr << "slave: opened TCP socket." << endl; - - // set up the port - bzero((char*) &serv_addr, sizeof(serv_addr)); - serv_addr.sin_family = AF_INET; - serv_addr.sin_addr.s_addr = htonl(INADDR_ANY); - serv_addr.sin_port = htons(SERV_TCP_PORT); - - if (bind(sockfd, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) < 0) { - cerr << "slave: can't bind local address. Exiting." << endl; - exit(-1); - } - - if(listen(sockfd, SOMAXCONN) < 0) { - cerr << "slave: listening error. Exiting." << endl; - exit(-1); - } - - - /* The Big Loop */ - while (1) { - - // wait for a message and fork off a child process to handle it - clilen = sizeof(cli_addr); - newsockfd = accept(sockfd, - (struct sockaddr *) &cli_addr, - &clilen); - - if (newsockfd < 0) { - cerr << "slave: accept error. Exiting." << endl; - exit(-1); - } - - if ((childpid = fork()) < 0) { - cerr << "slave: fork error. Exiting." << endl; - exit(-1); - } - - else if (childpid == 0) { // child process - cerr << "Forked child process for incoming socket" << endl; - close(sockfd); - process_request(newsockfd); - cerr << "Finished processing request. Exiting child." << endl; - exit(0); - } - - close (newsockfd); // parent - - //sleep(30); /* wait 30 seconds */ - } - exit(EXIT_SUCCESS); -} - - -/* This will process requests from the master. - * The protocol in a nutshell: - * Master opens a socket to slave, and sends - * one message. - * Slave replies with one message. - * Socket is closed. - */ - -void process_request(int newsockfd) { - - // first, read the message type. - int msg_type = readmsgtype(newsockfd); - - // Second, call some function based on the message type to process - // the rest of the message. The function is responsible for the rest - // of the message; this includes checking the message footer. - - switch(msg_type) { - - case PING: // ping - process_ping(newsockfd); - break; - case STARTTASK: // start_task - process_start_task(newsockfd); - break; - case RETRIEVELOCALFILE: // get_local - process_get_local(newsockfd); - break; - case SHIPCODE: - assert(0); // obsolete - //process_shipcode(newsockfd); - break; - - case PINGREPLY: - case FINISHEDTASK: - case TASKFAILED: - case SENDLOCALFILE: - case LOCALFILENOTFOUND: - case CODESAVED: - case SHIPFAILED: - cerr << "activeslave: BUG received message " << CMD_LIST[msg_type] << - " from master; master should never send this message." << endl; - exit(-1); - break; - - - case -1: - cerr << "activeslave: message had an unidentifiable type. " << - "Closing socket and discarding rest of message." << endl; - default: - cerr << "activeslave: BUG! received unexpected return value of" << msg_type << - "from readmsgtype(). Closing socket and discarding rest of message." << endl; - - exit(-1); - } -} - - -// Just write a ping_reply to the socket. -void process_ping(int fd) { - - // make sure the footer is valid - if (!check_footer(fd)) { - cerr << "process_ping warning: ping message has invalid or missing footer." - << endl; - } - // Even if the footer's invalid, send the reply. - cerr << "Replying to ping..." << endl; - send_msg_header(fd, PINGREPLY); - send_msg_footer(fd); - cerr << "Ping processing completed." << endl; -} - - -// Process a start_task message. This reads the incoming message, -// retrieves the necessary library, and starts the corresponding task. - -// Parameter format: taskID(int) library(string) -// cephinputfile(string) offset(long) length(long) localoutputfile - -void process_start_task(int fd) { - - char libraryname[MAX_STRING_SIZE + 1]; - char cephinputfile[MAX_STRING_SIZE + 1]; - char localoutputfile[MAX_STRING_SIZE + 1]; - char options[MAX_STRING_SIZE + 1]; - - cout << "in process_start_task: "; - int taskID = read_positive_int(fd); - cout << "read taskID " << taskID; - - // There may be multiple instances running on the same OSD. Cheap - // and dirty hack: append the taskID to the filename to avoid contention. - - read_string(fd, libraryname); - string libraryfilename("lib"); - libraryfilename += libraryname; - libraryfilename += ".so"; - cout << ", library name " << libraryname << " -> " << libraryfilename; - - read_string(fd, cephinputfile); - cout << ", cephinputfile " << cephinputfile; - off_t offset = read_off_t(fd); - cout << ", offset " << offset; - off_t length = read_off_t(fd); - cout << ", length " << length; - - read_string(fd, localoutputfile); - cout << ", localoutputfile " << localoutputfile; - read_string(fd, options); - cout << ", options: " << options << endl; - - - // make sure the footer is valid - if (!check_footer(fd)) { - cerr << "process_start_task warning: message has invalid or missing footer. " - << "Discarding message." << endl; - exit(-1); - } - - - // copy the library over from Ceph - - ostringstream locallibraryfilename; - locallibraryfilename << "/tmp/lib" << libraryname << "_" << taskID << ".so"; - - //string locallibraryfilename("lib"); - //locallibraryfilename += libraryname; - //locallibraryfilename += "_"; - //locallibraryfilename += taskID; - //locallibraryfilename += ".so"; - cout << "Naming local library copy " << locallibraryfilename.str() << endl; - - Client* client = startCephClient(); - copyCephFileToLocalFile(client, libraryfilename.c_str(), locallibraryfilename.str().c_str()); - kill_client(client); - cout << "Local library copy acquired" << endl; - - // load the task from the shared library - void (*task)(const char*, const char*, int, - off_t, off_t, const char*) = 0; - void* dl_h = dlopen(locallibraryfilename.str().c_str(), RTLD_LAZY); - if (NULL == dl_h) { - cerr << "Dynamic linking error: " << dlerror() << endl; - exit(-1); - } - task = (void (*)(const char*, const char*, int, - off_t, off_t, const char*) - ) dlsym(dl_h, "run_task"); - if (NULL == dl_h) { - cerr << "Symbol lookup error: " << dlerror() << endl; - exit(-1); - } - - - // start a task; create an output filename that uses the task ID, - // 'cause we might end up with multiple pieces of a file on each - // OSD. - cerr << "starting task: " << endl; - task(cephinputfile, localoutputfile, taskID, offset, length, options); - cerr << "returned from task! Sending reply:" << endl; - - // send the reply - send_msg_header(fd, FINISHEDTASK); - write_positive_int(fd, taskID); - send_msg_footer(fd); - - // done - cout << "Done sending reply for taskID " << taskID << endl; - return; -} - - -// Starts a sloppy grep count of the hardwired search string over the -// given Ceph file extent. It's sloppy because it copies the given -// extent to a local file and runs "grep" on it, with no effort to take -// care of boundary issues. -void start_sloppy_grepcount (const char* ceph_filename, const char* local_filename, - long offset, long size) { - - Client* client = startCephClient(); - char* search_string = "the"; - // copy the file to a local file. - - copyExtentToLocalFile (client, ceph_filename, offset, size, local_filename); - // we want: grep -c search_string local_filename - // to get the number of occurrences of the string. - string command = ""; - command.append("grep -c "); - command.append(search_string); - command.append(local_filename); - - assert(0); -} - - -// SHIPCODE messages have been removed. - -void process_shipcode(int fd) { assert(0); } - - -// Processes a get_local message. The message -// specifies the filename of a local file to -// return to the sender. - -// Parameter format: requestID(int) localfilename(string) - -// INCOMPLETE: currently just reads the message. - - -void process_get_local(int fd) { - cout << "in process_get_local: "; - int taskID = read_positive_int(fd); - cout << "read taskID " << taskID; - - char localfilename[MAX_STRING_SIZE+1]; - read_string(fd, localfilename); - cout << ", localfilename " << localfilename << endl; - - - // make sure the footer is valid - if (!check_footer(fd)) { - cerr << "process_get_local warning: message has invalid or missing footer." - << endl; - } - - // not implemented - cerr << "Error: get_local command unimplemented." << endl; - assert(0); -} - - -// Retrieves a formatted message from the socket. -// At the moment, this just reads and prints a fixed- -// length message type. -// DEPRECATED. -void str_getmsg(int sockfd) { - - int n; - - // read message types until the connection dies - while(true) { - n = readmsgtype(sockfd); - if (n != 0) { - cerr << "from getmsg: some sort of error" << endl; - exit(-1); - } - } -} - -// Echo a stream socket message back to the sender. -// DEPRECATED. -void str_echo(int sockfd) { - - int n; - char line[MAXLINE]; - - while(true) { - - // read from the stream - cerr << "str_echo: waiting for a line" << endl; - n = readline(sockfd, line, MAXLINE); - cerr << "str_echo: read a line" << endl; - if (0 == n) { - cerr << "str_echo: connection terminated" << endl; - return; // connection is terminated - } - else if (n < 0) { - cerr << "str_echo: readline error" << endl; - exit(-1); - } - - // write back to the stream - if (n != writen(sockfd, line, n)) { - cerr << "str_echo: writen error" << endl; - exit(-1); - } - else - cerr << "Echoed line " << endl; - } -} - - -void str_ack(int sockfd) { - - int n; - char line[MAXLINE]; - //char *ack = "ack"; - - while(true) { - - // read from the stream - n = readline(sockfd, line, MAXLINE); - - if (0 == n) - return; // connection is terminated - else if (n < 0) - //err_dump("str_echo: readline error"); - exit(-1); - - // write back to the stream - if (4 != writen(sockfd, "ack\n", 4)) - //err_dump("str_echo: writen error"); - exit(-1); - } -} - - - -// Read command lines from the socket and execute them - -void str_run(int sockfd) { - - int n; - char line[MAXLINE]; - char* error_msg = "str_run: No command interpreter found\n"; - char* ack_msg = "Running command... "; - char* commit_msg = "Command executed!\n"; - - while(true) { - - // read from the stream - n = readline(sockfd, line, MAXLINE); - - if (0 == n) - return; // connection is terminated - else if (n < 0) - //err_dump("str_echo: readline error"); - exit(-1); - - if (system(NULL)) { - writen(sockfd, ack_msg, strlen(ack_msg)); - system(line); - writen(sockfd, commit_msg, strlen(commit_msg)); - } - else if ((int)strlen(error_msg) != writen(sockfd, error_msg, strlen(error_msg))) - //err_dump("str_echo: writen error"); - exit(-1); - } -} - - -// take a filename and copy it from Ceph to a local directory. -// Not completed. - -void str_copytolocal(int sockfd) { - - int n; - char line[MAXLINE]; - char* error_msg = "str_copy: No command interpreter found\n"; - char* ack_msg = "Running command... "; - char* commit_msg = "Command executed!\n"; - //char* temp_dir = "/tmp"; - - - while(true) { - - // read from the stream - n = readline(sockfd, line, MAXLINE); - - if (0 == n) - return; // connection is terminated - else if (n < 0) - //err_dump("str_echo: readline error"); - exit(-1); - - if (system(NULL)) { - writen(sockfd, ack_msg, strlen(ack_msg)); - system(line); - writen(sockfd, commit_msg, strlen(commit_msg)); - } - else if ((int)strlen(error_msg) != writen(sockfd, error_msg, strlen(error_msg))) - //err_dump("str_echo: writen error"); - exit(-1); - } -} - - - diff --git a/src/active/activeslave.h b/src/active/activeslave.h deleted file mode 100644 index 8fe268c20123..000000000000 --- a/src/active/activeslave.h +++ /dev/null @@ -1,25 +0,0 @@ -#include -#include -#include -#include "inet.h" -#include "common.h" -#include "utility.h" -#include "client/Client.h" - -// The port number is "osdd" on a telephone keypad. -#define SERV_TCP_PORT 6733 - -#define MAXLINE 512 - -void str_echo(int sockfd); -void str_ack(int sockfd); -void str_run(int sockfd); -void str_getmsg(int sockfd); -void process_request(int newsockfd); -void process_ping(int fd); -void process_start_task(int fd); -void process_get_local(int fd); -void process_shipcode(int fd); - -void start_trivial_task(const char* ceph_filename, const char* local_filename, - long offset, long length); diff --git a/src/active/activetaskd.cc b/src/active/activetaskd.cc deleted file mode 100644 index ec9f29054309..000000000000 --- a/src/active/activetaskd.cc +++ /dev/null @@ -1,241 +0,0 @@ -/* - * This is a daemon for receiving and executing commands for compute tasks on an OSD. - * - * The daemon uses skeleton code from - * http://www.linuxprofilm.com/articles/linux-daemon-howto.html. The - * site is no longer up, but can be seen through the archive.org. - * Networking code is based off examples from Stevens' UNIX Network Programming. - */ - -#include "activetaskd.h" - - -#define SERVER - -#undef SERVER - -int main(int argc, const char* argv[]) { - - /* Our process ID and Session ID */ - pid_t pid, sid; - - /* Fork off the parent process */ - pid = fork(); - if (pid < 0) { - exit(EXIT_FAILURE); - } - /* If we got a good PID, then - we can exit the parent process. */ - if (pid > 0) { - exit(EXIT_SUCCESS); - } - - /* Change the file mode mask */ - umask(0); - - /* Open any logs here */ - - /* Create a new SID for the child process */ - sid = setsid(); - if (sid < 0) { - /* Log the failure */ - exit(EXIT_FAILURE); - } - - - /* Change the current working directory */ - if ((chdir("/")) < 0) { - /* Log the failure */ - exit(EXIT_FAILURE); - } - - /* Close out the standard file descriptors */ - close(STDIN_FILENO); - close(STDOUT_FILENO); - close(STDERR_FILENO); - - /* Daemon-specific initialization goes here */ - - - - /* Set up TCP server */ - int sockfd, newsockfd, childpid; - socklen_t clilen; - struct sockaddr_in cli_addr, serv_addr; - - const char *pname = argv[0]; // process name - - // Open a TCP socket - if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) - exit(-1); - //err_dump("server: can't open stream socket"); - - // set up the port - bzero((char*) &serv_addr, sizeof(serv_addr)); - serv_addr.sin_family = AF_INET; - serv_addr.sin_addr.s_addr = htonl(INADDR_ANY); - serv_addr.sin_port = htons(SERV_TCP_PORT); - - if (bind(sockfd, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) < 0) - exit(-1); - //err_dump("server: can't bind local address"); - - if(listen(sockfd, SOMAXCONN) < 0) - exit(-1); - //err_dump("server: listening error"); - - /* The Big Loop */ - while (1) { - - // wait for a message and fork off a child process to handle it - clilen = sizeof(cli_addr); - newsockfd = accept(sockfd, - (struct sockaddr *) &cli_addr, - &clilen); - - if (newsockfd < 0) - exit(-1); - //err_dump("server: accept error"); - - if ( (childpid = fork()) < 0) - exit(-1); - // err_dump("server: fork error"); - - else if (childpid == 0) { // child process - close(sockfd); - //str_echo(newsockfd); - str_run(newsockfd); - // insert code to process the request - exit(0); - } - - close (newsockfd); // parent - - //sleep(30); /* wait 30 seconds */ - } - exit(EXIT_SUCCESS); -} - - -// Echo a stream socket message back to the sender. - -void str_echo(int sockfd) { - - int n; - char line[MAXLINE]; - - while(true) { - - // read from the stream - n = readline(sockfd, line, MAXLINE); - - if (0 == n) - return; // connection is terminated - else if (n < 0) - //err_dump("str_echo: readline error"); - exit(-1); - - // write back to the stream - if (n != writen(sockfd, line, n)) - //err_dump("str_echo: writen error"); - exit(-1); - } -} - - -void str_ack(int sockfd) { - - int n; - char line[MAXLINE]; - char *ack = "ack"; - - while(true) { - - // read from the stream - n = readline(sockfd, line, MAXLINE); - - if (0 == n) - return; // connection is terminated - else if (n < 0) - //err_dump("str_echo: readline error"); - exit(-1); - - // write back to the stream - if (4 != writen(sockfd, "ack\n", 4)) - //err_dump("str_echo: writen error"); - exit(-1); - } -} - - - - -// Read command lines from the socket and execute them - -void str_run(int sockfd) { - - int n; - char line[MAXLINE]; - char* error_msg = "str_run: No command interpreter found\n"; - char* ack_msg = "Running command... "; - char* commit_msg = "Command executed!\n"; - - while(true) { - - // read from the stream - n = readline(sockfd, line, MAXLINE); - - if (0 == n) - return; // connection is terminated - else if (n < 0) - //err_dump("str_echo: readline error"); - exit(-1); - - if (system(NULL)) { - writen(sockfd, ack_msg, strlen(ack_msg)); - system(line); - writen(sockfd, commit_msg, strlen(commit_msg)); - } - else if (strlen(error_msg) != writen(sockfd, error_msg, strlen(error_msg))) - //err_dump("str_echo: writen error"); - exit(-1); - } -} - - -// take a filename and copy it from Ceph to a local directory - -void str_copytolocal(int sockfd) { - - int n; - char line[MAXLINE]; - char* error_msg = "str_copy: No command interpreter found\n"; - char* ack_msg = "Running command... "; - char* commit_msg = "Command executed!\n"; - char* temp_dir = "/tmp"; - - - while(true) { - - // read from the stream - n = readline(sockfd, line, MAXLINE); - - if (0 == n) - return; // connection is terminated - else if (n < 0) - //err_dump("str_echo: readline error"); - exit(-1); - - if (system(NULL)) { - writen(sockfd, ack_msg, strlen(ack_msg)); - system(line); - writen(sockfd, commit_msg, strlen(commit_msg)); - } - else if (strlen(error_msg) != writen(sockfd, error_msg, strlen(error_msg))) - //err_dump("str_echo: writen error"); - exit(-1); - } -} - - - diff --git a/src/active/activetaskd.h b/src/active/activetaskd.h deleted file mode 100644 index fc5cec923c4b..000000000000 --- a/src/active/activetaskd.h +++ /dev/null @@ -1,14 +0,0 @@ -#include "inet.h" -#include "common.h" -#include "utility.h" -#include "client/Client.h" - - -// The port number is "osdd" on a telephone keypad. -#define SERV_TCP_PORT 6733 - -#define MAXLINE 512 - -void str_echo(int sockfd); -void str_ack(int sockfd); -void str_run(int sockfd); diff --git a/src/active/client_init.cc b/src/active/client_init.cc deleted file mode 100644 index 8b137891791f..000000000000 --- a/src/active/client_init.cc +++ /dev/null @@ -1 +0,0 @@ - diff --git a/src/active/client_init.h b/src/active/client_init.h deleted file mode 100644 index 139597f9cb07..000000000000 --- a/src/active/client_init.h +++ /dev/null @@ -1,2 +0,0 @@ - - diff --git a/src/active/common.h b/src/active/common.h deleted file mode 100644 index 9b2742f8895e..000000000000 --- a/src/active/common.h +++ /dev/null @@ -1,86 +0,0 @@ -#ifndef CEPH_COMMON_H -#define CEPH_COMMON_H - - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - - - - -#define CMDLENGTH 10 -#define CMDCOUNT 11 - -#define MAX_STRING_SIZE 255 - -/* - * These are the various messages that can be sent between the master - * and slave. The slave sends one reply to each message from the master. - - * PING/PINGREPLY: just what it sounds like. - - * STARTTASK: starts a task from the given library. The slave attempts - * to perform the task, and replies with FINISHEDTASK or TASKFAILED. - * - * RETRIEVELOCALFILE: requests a file that the slave has stored - * locally. Slave replies with SENDLOCALFILE and the file, or with - * LOCALFILENOTFOUND. - * - * (deprecated) SHIPCODE: sends a shared library to the slave, containing a - * function that is to be executed later by the STARTTASK - * command. Slave replies with CODESAVED or SHIPFAILED. - * - * SHIPCODE is replaced by transport using Ceph. - */ - - -const off_t CHUNK = 1024 * 1024 * 4; - -// a bunch of string constants for commands - -#define PING 0 -#define STARTTASK 1 -#define RETRIEVELOCALFILE 2 -#define PINGREPLY 3 -#define FINISHEDTASK 4 -#define TASKFAILED 5 -#define SENDLOCALFILE 6 -#define LOCALFILENOTFOUND 7 -#define SHIPCODE 8 -#define CODESAVED 9 -#define SHIPFAILED 10 - -#define FOOTER_LENGTH 7 - -const char* CMD_LIST[CMDCOUNT] = {"______PING", - "START_TASK", - "_GET_LOCAL", - "PING_REPLY", - "_TASK_DONE", - "TASKFAILED", - "SEND_LOCAL", - "LOCAL_GONE", - "_SHIP_CODE", - "CODE_SAVED", - "SHIPFAILED"}; - -const char FOOTER[FOOTER_LENGTH + 1] = "MSG_END"; - -// const char* strArray[] = {"string1", "string2", "string3"}; -//const char commands[2][4] = {"foo", "bar"}; - - -// error codes -#define ARGUMENTSINVALID 1001 -#define CEPHCLIENTSTARTUPFAILED 1002 -#define INPUTFILEREADFAILED 1003 - -#endif //COMMON_H diff --git a/src/active/echotestclient.cc b/src/active/echotestclient.cc deleted file mode 100644 index 2b2d15e7ca5c..000000000000 --- a/src/active/echotestclient.cc +++ /dev/null @@ -1,74 +0,0 @@ -/* - * This is merely a test of an echo server; it's an early step in - * building up the Ceph distributed compute service. This is - * discardable once the next stage is up and running. - * - * Code is based off examples in Stevens' "Unix Network Programming". - */ - -#include "echotestclient.h" - -int main(int argc, char* argv[]) { - - int sockfd; - struct sockaddr_in serv_addr; - - char* pname = argv[0]; - - bzero((char *) &serv_addr, sizeof(serv_addr)); - serv_addr.sin_family = AF_INET; - serv_addr.sin_addr.s_addr = inet_addr(SERV_HOST_ADDR); - serv_addr.sin_port = htons(SERV_TCP_PORT); - - - // open a TCP socket - if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) { - printf("client: can't open stream socket"); - exit (-1); - } - - // connect to the server. - if (connect(sockfd, (struct sockaddr *) &serv_addr, - sizeof(serv_addr)) < 0) { - printf("client: can't connect to server"); - exit (-1); - } - - // start the test echoer - str_cli(stdin, sockfd); - - - close (sockfd); - exit(0); -} - - -void str_cli(FILE *fp, int sockfd) { - - int n; - char sendline[MAXLINE], recvline[MAXLINE + 1]; - - // read from the fp and write to the socket; - // then read from the socket and write to stdout - while (fgets(sendline, MAXLINE, fp) != NULL) { - - n = strlen(sendline); - if (writen(sockfd, sendline, n) != n) { - printf("str_cli: writen error on socket"); - exit(-1); - } - n = readline(sockfd, recvline, MAXLINE); - if (n < 0) { - printf("str_cli: readline error"); - exit(-1); - } - recvline[n] = 0; - fputs(recvline, stdout); - } - - if (ferror(fp)) { - printf("str_cli: error reading file"); - exit(-1); - } - -} diff --git a/src/active/echotestclient.h b/src/active/echotestclient.h deleted file mode 100644 index 9b26416640bc..000000000000 --- a/src/active/echotestclient.h +++ /dev/null @@ -1,10 +0,0 @@ -#include "inet.h" -#include "common.h" -#include "socket_utility.h" - -#define SERV_HOST_ADDR "128.114.57.143" //issdm-8 -#define SERV_TCP_PORT 6733 -#define MAXLINE 512 - -void str_cli(FILE *fp, int sockfd); - diff --git a/src/active/inet.h b/src/active/inet.h deleted file mode 100644 index 385fa915f9dc..000000000000 --- a/src/active/inet.h +++ /dev/null @@ -1,9 +0,0 @@ -/* - * Generic TCP/IP definitions - */ - -#include -#include -#include -#include -#include diff --git a/src/active/msgtestclient.cc b/src/active/msgtestclient.cc deleted file mode 100644 index 0d8a41884281..000000000000 --- a/src/active/msgtestclient.cc +++ /dev/null @@ -1,414 +0,0 @@ -/* - * This client invokes a distributed task across OSDs. - * - * Networking code is based off examples in Stevens' "Unix Network Programming". - */ -#include "msgtestclient.h" -#define REQUIRED_ARGS 5 - -int main(int argc, char* argv[]) { - - // make sure we have all the arguments we need - if (argc < REQUIRED_ARGS) { usage(argv[0]); exit(-1); } - - // This file is rewired for running tests from a - // shell script. The first parameter specifies the - // name of the Ceph file that the test will be - // run on; the second parameter specifies which of - // four different tests will be run. - const char* library_name = argv[1]; - const char* input_filename = argv[2]; - const char* output_filename = argv[3]; - int test_number = atoi(argv[4]); - assert (test_number > 0); - assert (test_number < 4); - const char* job_options = argv[5]; - - string library_filename("lib"); - library_filename += library_name; - library_filename += ".so"; - - // start up a Ceph client - Client* client = startCephClient(); - cerr << "loaded test client" << endl; - - // generate the file extent tuples for each OSD - vector original_splits; - off_t filesize = generate_splits(client, input_filename, original_splits); - - // copy the library to Ceph - copyLocalFileToCeph(client, library_filename.c_str(), library_filename.c_str()); - - // close the client - we're done with it - kill_client(client); - cerr << "Closed Ceph client instance" << endl; - - // sanity check: display the splits - cerr << "Listing original splits:" << endl; - for (vector::iterator i = original_splits.begin(); - i != original_splits.end(); i++) { - cerr << "Split: IP " << i->ip_address << ", start " - << i->start << ", length " << i->length << endl; - } - - vector test_splits; - // Now, modify the splits as needed for the test type. - // There are three types of tests. - // Test 1: regular test. - // Test 2: put all the tasks on the "wrong" OSD. - // Test 3: do the entire job off one node. - - if (1 == test_number) { - cerr << "Test type 1: using original splits." << endl; - test_splits = original_splits; - } - else if (2 == test_number) { - cerr << "Test type 2: rotating split IP addresses. " << endl; - int split_count = original_splits.size(); - for (int i = 0; i < split_count; ++i) { - request_split s; - s.start = original_splits.at(i).start; - s.length = original_splits.at(i).length; - s.ip_address = original_splits.at((i+1)%split_count).ip_address; - test_splits.push_back(s); - } - } - else if (3 == test_number) { - cerr << "Test type 3: one giant split." << endl; - request_split s; - s.start = 0; - s.length = filesize; - s.ip_address = original_splits.at(0).ip_address; - test_splits.push_back(s); - } - else { - cerr << "Error: received invalid test type " << test_number << endl; - exit(-1); - } - - cerr << "Listing test splits:" << endl; - for (vector::iterator i = test_splits.begin(); - i != test_splits.end(); i++) { - cerr << "Split: IP " << i->ip_address << ", start " - << i->start << ", length " << i->length << endl; - } - - // start the timer - utime_t start_time = g_clock.now(); - int pending_tasks = 0; - - int taskID = 0; - // start up the tasks - for (vector::iterator i = test_splits.begin(); - i != test_splits.end(); i++) { - start_map_task(i->ip_address, ++taskID, library_name, input_filename, - i->start, i->length, output_filename, job_options); - ++pending_tasks; - } - - cerr << "Waiting for " << pending_tasks << " tasks to return..." << endl; - - // wait for all the tasks to finish - while (pending_tasks > 0) { - int exit_status; - cerr << "Waiting for " << pending_tasks << " tasks to return..." << endl; - pid_t pid = wait(&exit_status); - if (pid < 0) { - cerr << "ERROR on wait(): result was " << pid << endl; - exit(-1); - } - --pending_tasks; - if (WIFEXITED(exit_status)) { - cerr << "Task with pid " << pid << " returned with exit status " << - WEXITSTATUS(exit_status) << endl; - } - else { cerr << "WARNING: Task with pid " << pid << " exited abnormally" << endl; } - } - - cerr << "All tasks have returned." << endl; - // report the time - double elapsed_time; - elapsed_time = (g_clock.now() - start_time); - cerr << "Elapsed time: " << elapsed_time << endl; - cerr << elapsed_time << " " << endl; - // send the time to stdout for the shell script - cout << elapsed_time << " "; - exit(0); -} - - -// sends a complete ping message -// through the file descriptor -// and waits for a reply. This -// will hang if there's no reply. - -void ping_test(int fd) { - - // send the message header and footer. - // A ping message has no body. - send_msg_header(fd, PING); - send_msg_footer(fd); - - // receive the reply. - int msg_type = readmsgtype(fd); - if (msg_type < 0) { - cerr << "ping_test: Failed reading the ping reply. Exiting." << endl; - exit(-1); - } - if (PINGREPLY != msg_type) { - assert((msg_type <= 0) && (msg_type < CMDCOUNT) && - "readmsgtype return value out of range"); - cerr << "ping_test: slave sent invalid reply: replied to ping with message type" << - msg_type << ": " << CMD_LIST[msg_type] << ". Exiting. " << endl; - exit(-1); - } - else { - cerr << "Received valid ping reply!" << endl; - } - - if(!check_footer(fd)) { - cerr << "ping_test: message footer not found. Exiting." << endl; - exit(-1); - } -} - -// sends a message to the fd telling it to start a task. -// Remember: the message format requires any string to be -// prefixed by its (unterminated) length. -void send_start_task_msg(int fd, - int taskID, - int library_name_size, const char* library_name, - int inputfilenamesize, const char* inputfilename, - off_t offset, - off_t length, - int outputfilenamesize, const char* outputfilename, - int options_size, const char* options) { - - // write the header and the message to the file descriptor. - - send_msg_header(fd, STARTTASK); - - write_positive_int(fd, taskID); - write_positive_int(fd, library_name_size); - write_string(fd, library_name); - write_positive_int(fd, inputfilenamesize); - write_string(fd, inputfilename); - //write_long(fd, offset); - write_off_t (fd, offset); - write_off_t (fd, length); - write_positive_int(fd, outputfilenamesize); - write_string(fd, outputfilename); - write_positive_int(fd, options_size); - write_string(fd, options); - - // terminate the message - send_msg_footer(fd); -} - - - - -// creates a new connection to the slave -// at the given IP address and port. -// Overloaded to take an IP address as a -// string or as an in_addr_t. - -int create_new_connection(const char* ip_address, uint16_t port) -{ - in_addr_t ip = inet_addr(ip_address); - if ((in_addr_t)-1 == ip) { - cerr << "Error creating new connection: \"" << ip_address << - "\" is not a valid IP address." << endl; - return -1; - } - else - //cerr << "Opening connection to " << ip_address << ":" << endl; - return create_new_connection(ip, port); -} - - -int create_new_connection(in_addr_t ip_address, uint16_t port) { - - struct sockaddr_in serv_addr; - int sockfd; - - bzero((char *) &serv_addr, sizeof(serv_addr)); - serv_addr.sin_family = AF_INET; - //serv_addr.sin_addr.s_addr = inet_addr(SERV_HOST_ADDR); - serv_addr.sin_addr.s_addr = ip_address; - serv_addr.sin_port = htons(SERV_TCP_PORT); - - // open a TCP socket - if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) { - cerr << "msgtestclient: can't open stream socket. Exiting." << endl; - exit (-1); - } - - // connect to the server. - if (connect(sockfd, (struct sockaddr *) &serv_addr, - sizeof(serv_addr)) < 0) { - cerr << "msgtestclient: can't connect to server." << endl; - exit (-1); - } - //cerr << "opened connection!" << endl; - return sockfd; -} - -void msg_type_sender(int sockfd) { - - for (int i = 0; i < CMDCOUNT; ++i) { - send_msg_header(sockfd, i); - } - -} - -// Fires up a task on a single OSD. -int start_map_task(sockaddr_in ip_address, int taskID, - const char* library_name, const char* input_filename, - off_t start, off_t length, - const char* output_filename, - const char* job_options) -{ - int childpid; - // fork off a child process to do the work, and return - if ((childpid = fork()) < 0) { - cerr << "start_map_task: fork error. Exiting." << endl; - exit(-1); - } - - else if (childpid != 0) { // parent - cerr << "start_map_task: forked child process " - << childpid << " to start task. " << endl; - return 0; - } - - string ip_addr_string(inet_ntoa(ip_address.sin_addr)); - // cerr << "command: " << ip_addr_string << " taskID " - // << taskID << ": " << command - // << " " << input_filename << " " << start << " " << length - // << " " << output_filename << endl; - - // open a socket to the slave, and send the message - //cerr << "Sending message: " << endl; - int sockfd = create_new_connection(ip_addr_string.c_str(), SERV_TCP_PORT); - send_start_task_msg(sockfd, taskID, strlen(library_name), library_name, - strlen(input_filename), input_filename, - start, length, - strlen(output_filename), output_filename, - strlen(job_options), job_options); - - // wait for a reply - cerr << "Sent message for taskID " << taskID << ". Waiting for reply..." << endl; - - // receive the reply. - int msg_type = readmsgtype(sockfd); - if (msg_type < 0) { - cerr << "start_map_task: Failed reading the reply. Exiting." << endl; - exit(-1); - } - if (FINISHEDTASK != msg_type) { - assert((msg_type <= 0) && (msg_type < CMDCOUNT)); - cerr << "start_map_task: slave sent invalid reply: replied with message type" << - msg_type << ": " << CMD_LIST[msg_type] << ". Exiting. " << endl; - exit(-1); - } - // read the taskID of the reply - - int reply_taskID = read_positive_int(sockfd); - - if(!check_footer(sockfd)) { - cerr << "ping_test: message footer not found. Exiting." << endl; - exit(-1); - } - - // done! - close(sockfd); - cerr << "Task " << taskID << "/" << reply_taskID << - " complete! Ending child process." << endl; - //exit(0); - _exit(0); - cerr << "exit(0) returned. Strange things are afoot." << endl; -} - - -// Creates a set of (ip address, start position, length) tuples giving -// the location of each piece of the given Ceph file. Returns the size -// of the file. - - -off_t generate_splits(Client* client, const char* input_filename, - vector& original_splits) { - - // Open the file and get its size - int fh = client->open(input_filename, O_RDONLY); - if (fh < 0) { - cerr << "The input file " << input_filename << " could not be opened." << endl; - exit(-1); - } - cerr << "Opened file " << input_filename << endl; - off_t filesize; - struct stat stbuf; - if (0 > client->lstat(input_filename, &stbuf)) { - cerr << "Error: could not retrieve size of input file " << input_filename << endl; - exit(-1); - } - filesize = stbuf.st_size; - if (filesize < 1) { - cerr << "Error: input file size is " << filesize << endl; - exit(-1); - } - - // grab all the object extents - list extents; - off_t offset = 0; - client->enumerate_layout(fh, extents, filesize, offset); - client->close(fh); - cerr << "Retrieved all object extents" << endl; - - - // generate the tuples - list::iterator i; - map::iterator j; - int osd; - - for (i = extents.begin(); i != extents.end(); i++) { - - request_split split; - // find the primary and get its IP address - osd = client->osdmap->get_pg_primary(i->layout.pgid); - entity_inst_t inst = client->osdmap->get_inst(osd); - entity_addr_t entity_addr = inst.addr; - entity_addr.make_addr(split.ip_address); - - // iterate through each buffer_extent in the ObjectExtent - for (j = i->buffer_extents.begin(); - j != i->buffer_extents.end(); j++) { - - // get the range of the buffer_extent - split.start = (*j).first; - split.length = (*j).second; - // throw the split onto the vector - original_splits.push_back(split); - } - } - return filesize; -} - - -void usage(const char* name) { - - cout << "usage: " << name << " libraryname inputfile outputfile test_number" << endl; - cout << "libraryname is the name of a proper shared task library in the _local_ filesystem. " - << "e.g. entering \"foo\" requires the presence of libfoo.so in the " - << "working directory." << endl; - cout << "inputfile must be a valid path in the running Ceph filesystem." << endl; - cout << "outputfile is a Ceph filename prefix for writing results. e.g. \"bar\" will give " - << "output files bar.1, bar.2, &c." << endl; - cout << "test_number must be 1, 2, or 3." << endl; - cout << " 1: run the test task normally (one slave per OSD file extent)" << endl; - cout << " 2: run the test task on the \"wrong\" OSDs" << endl; - cout << " 3: run the entire task in a single process" << endl; -} - - - diff --git a/src/active/msgtestclient.h b/src/active/msgtestclient.h deleted file mode 100644 index 85ac7fbaf8ec..000000000000 --- a/src/active/msgtestclient.h +++ /dev/null @@ -1,53 +0,0 @@ -#include "inet.h" -#include "common.h" -#include "utility.h" -#include "client/Client.h" - -// wait.h MUST NOT be #included before client/Client.h -#include -#include -#include - -struct request_split { - tcpaddr_t ip_address; - off_t start; - off_t length; -}; - - -//#define SERV_HOST_ADDR "128.114.57.143" //issdm-8 -#define SERV_HOST_ADDR "128.114.57.166" //issdm-31 - -#define SERV_TCP_PORT 6733 -#define MAXLINE 512 - -void msg_type_sender(int sockfd); - - - -int create_new_connection(const char* ip_address, uint16_t port); -int create_new_connection(in_addr_t ip_address, uint16_t port); -void usage(const char* name); -void ping_test(int fd); -void start_task_test(int fd); - - -off_t generate_splits(Client* client, const char* input_filename, - vector& original_splits); - - -int start_map_task(sockaddr_in ip_address, int taskID, - const char* map_command, - const char* input_filename, - off_t start, off_t length, - const char* output_filename, - const char* job_options); - -void send_start_task_msg(int fd, - int taskID, - int command_size, const char* command, - int inputfilenamesize, const char* inputfilename, - off_t offset, - off_t length, - int outputfilenamesize, const char* outputfilename, - int options_size, const char* options); diff --git a/src/active/trivial_task.cc b/src/active/trivial_task.cc deleted file mode 100644 index 7a72ecb277c4..000000000000 --- a/src/active/trivial_task.cc +++ /dev/null @@ -1,50 +0,0 @@ -#include "trivial_task.h" - -void start_trivial_task (const char* ceph_filename, const char* local_filename, - off_t offset, off_t length) { - // Don't bother to copy the file to disk. Read the file directly from Ceph, - // and add up all the bytes. - // Write the total to the local file as a string. - Client * client = startCephClient(); - - bufferptr bp(CHUNK); - - // get the source file's size. Sanity-check the request range. - struct stat st; - int r = client->lstat(ceph_filename, &st); - assert (r == 0); - - off_t src_total_size = st.st_size; - if (src_total_size < offset + length) { - cerr << "Error in copy ExtentToLocalFile: offset + length = " << offset << " + " << length - << " = " + (offset + length) << ", source file size is only " << src_total_size << endl; - exit(-1); - } - off_t remaining = length; - - // open the file and seek to the start position - cerr << "start_trivial_task: opening the source file and seeking " << endl; - - int fh_ceph = client->open(ceph_filename, O_RDONLY); - assert (fh_ceph > -1); - r = client->lseek(fh_ceph, offset, SEEK_SET); - assert (r == offset); - - int counter = 0; - // read through the extent and add up the bytes - cerr << "start_trivial_task: counting up bytes" << endl; - char* bp_c = bp.c_str(); - while (remaining > 0) { - off_t got = client->read(fh_ceph, bp_c, MIN(remaining,CHUNK), -1); - assert(got > 0); - remaining -= got; - for (off_t i = 0; i < got; ++i) { - counter += (unsigned int)(bp_c[i]); - } - } - cerr << "start_trivial_task: Done! Answer is " << counter << endl; - client->close(fh_ceph); - - //assert(0); -} - diff --git a/src/active/trivial_task.h b/src/active/trivial_task.h deleted file mode 100644 index ce9b47c82ceb..000000000000 --- a/src/active/trivial_task.h +++ /dev/null @@ -1,12 +0,0 @@ -// Shared library for the trivial task of adding up all the bytes in a file - -//#include "inet.h" -#include "common.h" -#include "utility.h" -#include "client/Client.h" - - -extern "C" void start_trivial_task (const char* ceph_filename, - const char* local_filename, - off_t offset, off_t length); - diff --git a/src/active/utility.h b/src/active/utility.h deleted file mode 100644 index d303655a5a0e..000000000000 --- a/src/active/utility.h +++ /dev/null @@ -1,270 +0,0 @@ -/* - * Miscellaneous Active OSD helper functions. - * - */ - -//#include -#include "client/Client.h" -#include "common.h" -#include "config.h" -#include "common/Timer.h" -#include "msg/SimpleMessenger.h" -#include "socket_utility.h" - -Client* startCephClient(); -void kill_client(Client* client); - -int send_msg_header(int fd, int header_ID); -int readmsgtype(int fd); -bool check_footer(int fd); -int send_msg_header(int fd, int header_ID); -int send_msg_footer(int fd); - -/* - * Fires up a Ceph client and returns a pointer to it. - */ - -Client* startCephClient() -{ - cout << "ActiveMaster: Initializing Ceph client:" << endl; - - // parse args from CEPH_ARGS, not command line - vector args; - env_to_vec(args); - parse_config_options(args); - - if (g_conf.clock_tare) g_clock.tare(); - - // be safe - g_conf.use_abspaths = true; - - // load monmap - MonMap* monmap = new MonMap(); - int r = monmap->read(".ceph_monmap"); - if (r < 0) { - cout << "ActiveMaster: could not find .ceph_monmap" << endl; - return 0; - } - assert(r >= 0); - - // start up network - rank.start_rank(); - - // start client - Client *client; - client = new Client(rank.register_entity(MSG_ADDR_CLIENT_NEW), monmap); - client->init(); - - // mount - client->mount(); - - return client; -} - -void kill_client (Client * client) -{ - client->unmount(); - client->shutdown(); - delete client; - - // wait for messenger to finish - rank.wait(); -} - - - -// reads a message type from the socket, and prints it. - -int readmsgtype(int fd) { - int rc; - char typebuf[CMDLENGTH + 1]; - - rc = read(fd, &typebuf, CMDLENGTH); - - // read a fixed-length text command - if (rc != CMDLENGTH) { - cerr << "in readmsgtype: read error: result is " << rc << endl; - return -1; - } - - // null-terminate the string - typebuf[CMDLENGTH] = 0; - - // print the command - //cerr << "readmsgtype: text type is " << typebuf << ", " ; - - // figure out which one it is, by number - for (int i = 0; i < CMDCOUNT; ++i) { - if (!strcmp(typebuf, CMD_LIST[i])) { - //cerr << "which is identified as type " << i << endl; - return i; - } - } - - // if we get here the type was invalid - cerr << "readmsgtype: unrecognized message type " << typebuf << endl; - return -1; -} - -// Attempt to read the message footer off -// the given stream. -bool check_footer(int fd) { - - // leave space for null termination - char footer_buf[FOOTER_LENGTH+1]; - - // read the footer - int rc = read(fd, &footer_buf, FOOTER_LENGTH); - if (rc != FOOTER_LENGTH) { - cerr << "in check_footer: read error: result is " << rc << endl; - return false; - } - - // null-terminate the string - footer_buf[FOOTER_LENGTH] = 0; - - // Is the footer correct? - if (0 == strcmp(footer_buf, FOOTER)) - return true; - else - return false; -} - - -// send a fixed-length message header -// given the header's ID. -int send_msg_header(int fd, int header_ID) { - if ((header_ID < 0) || (header_ID >= CMDCOUNT)) { - cerr << "In send_msg_header: received out-of-range header ID " << header_ID << - ". Exiting process." << endl; - exit(-1); - } - - //cerr << "attempting to send message " << CMD_LIST[header_ID] << - // " with ID " << header_ID << endl; - - if (CMDLENGTH != writen(fd, CMD_LIST[header_ID], CMDLENGTH)) { - cerr << "In send_msg_header: error writing header ID " << header_ID << - "to file descriptor " << fd << ". Exiting process." << endl; - exit(-1); - } - - return 0; -} - -// send the fixed-length message footer. -int send_msg_footer(int fd) { - //cerr << "attempting to send message footer: " << endl; - if (FOOTER_LENGTH != writen(fd, FOOTER, FOOTER_LENGTH)) { - cerr << "in send_msg_footer: error writing footer to file descriptor " << - fd << ". Exiting process." << endl; - exit(-1); - } else { - //cerr << "Sent message footer!" << endl; - } - return 0; -} - - - -// Copy a given extent of a Ceph file to the local disk. -// Requires a running Ceph client. -void copyExtentToLocalFile (Client* client, const char* ceph_source, - long offset, long length, - const char* local_destination) { - - // get the source file's size. Sanity-check the request range. - struct stat st; - int r = client->lstat(ceph_source, &st); - assert (r == 0); - - off_t src_total_size = st.st_size; - if (src_total_size < offset + length) { - cerr << "Error in copy ExtentToLocalFile: offset + size = " << offset << " + " << length - << " = " + (offset + length) << ", source file size is only " << src_total_size << endl; - exit(-1); - } - off_t remaining = length; - - // open the source and destination files. Advance the source - // file to the desired offset. - int fh_ceph = client->open(ceph_source, O_RDONLY); - assert (fh_ceph > -1); - r = client->lseek(fh_ceph, offset, SEEK_SET); - assert (r == offset); - - int fh_local = ::open(local_destination, O_WRONLY|O_CREAT|O_TRUNC, 0644); - assert (fh_local > -1); - - // copy the file 4 MB at a time - const int chunk = 4*1024*1024; - bufferptr bp(chunk); - - while (remaining > 0) { - off_t got = client->read(fh_ceph, bp.c_str(), MIN(remaining,chunk), -1); - assert(got > 0); - remaining -= got; - off_t wrote = ::write(fh_local, bp.c_str(), got); - assert (got == wrote); - } - // close the files - client->close(fh_ceph); - ::close(fh_local); -} - - -// Copy a Ceph file to the local disk. Requires a running Ceph client. -// Overwrites the destination if it exists. -void copyCephFileToLocalFile (Client* client, const char* ceph_source, - const char* local_destination) { - - // get the source file's size. - struct stat st; - int r = client->lstat(ceph_source, &st); - assert (r == 0); - - copyExtentToLocalFile(client, ceph_source, 0, st.st_size, - local_destination); - - -} -// Copies a file from the local disk to Ceph. Annihilates the -// destination if it exists. - -void copyLocalFileToCeph(Client* client, const char* local_source, - const char* ceph_destination) { - - // Get the source file's size. - struct stat st; - int r = ::lstat(local_source, &st); - if (0 != r) { - cerr << "in copyLocalFileToCeph: error retrieving size for file " << local_source - << ": is the file missing?" << endl; - assert(0); - } - - off_t remaining = st.st_size; - - // Open the source and destination files. - int fh_source = ::open(local_source, O_RDONLY); - assert (fh_source > -1); - int fh_dest = client->open(ceph_destination, O_WRONLY|O_CREAT|O_TRUNC, 0644); - assert (fh_dest > -1); - - // Copy the file. - const int chunk = 4 * 1024* 1024; // 4 MB - bufferptr bp(chunk); - - while(remaining > 0) { - off_t got = ::read(fh_source, bp.c_str(), MIN(remaining, chunk)); - assert(got > 0); - remaining -= got; - off_t wrote = client->write(fh_dest, bp.c_str(), got); - assert (got == wrote); - } - - // close the files - ::close(fh_source); - client->close(fh_dest); - -}