+++ /dev/null
-
-
-activeslave.h/cc: OSD slave process. Must be running on each OSD.
-
-msgtestclient.h/cc: OSD master process. Run ./msgtestclient with no parameters for help.
-
-trivial_task_ipc.h/cc: A trivial task to be built as a library. Uses
-client interface from ../ceph_ipc; server from ../ceph_ipc must be
-running on each OSD.
-
-common.h, inet.h, utility.h, socket_utility.h: Helper functions.
-
-activemaster, activetaskd, trivial_task_test, trivial_task: obsolete.
+++ /dev/null
-/*
- * Startup executable for
- * Ceph Active Storage. See README for details.
- *
- */
-#include "activemaster.h"
-
-
-/*
- * What main() must do:
- *
- * - start up a Ceph client
- * - find the set of OSDs that the file is striped across
- * - start up the Map task on each OSD, using ssh
- * - eat lunch?
- * - start up the Reduce task locally
- */
-
-int main(int argc, const char* argv[]) {
-
- if (argc < 4) {
- usage(argv[0]);
- exit(-1);
- }
-
- const char* input_filename = argv[1];
- const char* map_command = argv[2];
- //const char* reduce_command = argv[3];
-
- // fire up the client
- Client* client = startCephClient();
-
- // open the file as read_only
- int fh = client->open(input_filename, O_RDONLY);
- if (fh < 0) {
- cout << "The input file " << input_filename << " could not be opened." << endl;
- exit(-1);
- }
-
- // How big is the file?
- int filesize;
- struct stat stbuf;
- if (0 > client->lstat(input_filename, &stbuf)) {
- cout << "Error: could not retrieve size of input file " << input_filename << endl;
- exit(-1);
- }
- filesize = stbuf.st_size;
- if (filesize < 1) {
- cout << "Error: input file size is " << filesize << endl;
- exit(-1);
- }
-
- // retrieve all the object extents
- list<ObjectExtent> extents;
- int offset = 0;
- client->enumerate_layout(fh, extents, filesize, offset);
-
- // for each object extent, retrieve the OSD IP address and start up a Map task
- list<ObjectExtent>::iterator i;
- map<size_t, size_t>::iterator j;
- int osd;
- int start, length;
- tcpaddr_t tcpaddr;
-
- for (i = extents.begin(); i != extents.end(); i++)
- {
- // find the primary and get its IP address
- osd = client->osdmap->get_pg_primary(i->pgid);
- entity_inst_t inst = client->osdmap->get_inst(osd);
- entity_addr_t entity_addr = inst.addr;
- entity_addr.make_addr(tcpaddr);
-
- // iterate through each buffer_extent in the ObjectExtent
- for (j = i->buffer_extents.begin();
- j != i->buffer_extents.end(); j++)
- {
- // get the range of the buffer_extent
- start = (*j).first;
- length = (*j).second;
- // fire up the Map task
- start_map_task(map_command, input_filename, start, length, tcpaddr);
- }
- }
- return 0;
-}
-
-// Fires up the map task.
-// For the moment, all it does is echo the command line, not run it.
-int start_map_task(const char* command, const char* input_filename,
- long start, long length, sockaddr_in ip_address)
-{
- string ip_addr_string(inet_ntoa(ip_address.sin_addr));
-
-
-
-
-
- cout << "ssh " << ip_addr_string << " " << command
- << " " << input_filename << " " << start << " " << length << endl;
- return 0;
-}
-
-
-
-void usage(const char* name) {
- cout << "usage: " << name << " inputfile map_task reduce_task" << endl;
- cout << "inputfile must be a valid path in the running Ceph filesystem." << endl;
- cout << "map_task should be given with an absolute path, and be present on ";
- cout << "the REGULAR filesystem every node." << endl;
- cout << "reduce_task need be present on this node only." << endl;
-}
-
-
-
-
+++ /dev/null
-/*
- * This is the master executable to start up
- * a compute task across several nodes.
- *
- *
- */
-
-
-//#include <sys/stat.h>
-#include "utility.h"
-
-int start_map_task(const char* command, const char* input_filename,
- long start, long length, tcpaddr_t ip_address);
-
-void usage(const char* name);
-
-//Client* startCephClient();
-//void kill_client(Client* client);
+++ /dev/null
-/*
- * This is a slave for receiving and executing commands for
- * compute tasks on an OSD. This supersedes the daemon
- * version in activetaskd.h/cc, because it's easier to debug
- * if it's not a daemon.
- *
- * Networking code is based off examples from Stevens' UNIX Network Programming.
- */
-
-#include "activeslave.h"
-
-int main(int argc, const char* argv[]) {
-
- /* Set up TCP server */
- int sockfd, newsockfd, childpid;
- socklen_t clilen;
- struct sockaddr_in cli_addr, serv_addr;
-
- // Open a TCP socket
- if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
- cerr << "slave: can't open TCP socket. Exiting." << endl;
- exit(-1);
- }
- cerr << "slave: opened TCP socket." << endl;
-
- // set up the port
- bzero((char*) &serv_addr, sizeof(serv_addr));
- serv_addr.sin_family = AF_INET;
- serv_addr.sin_addr.s_addr = htonl(INADDR_ANY);
- serv_addr.sin_port = htons(SERV_TCP_PORT);
-
- if (bind(sockfd, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) < 0) {
- cerr << "slave: can't bind local address. Exiting." << endl;
- exit(-1);
- }
-
- if(listen(sockfd, SOMAXCONN) < 0) {
- cerr << "slave: listening error. Exiting." << endl;
- exit(-1);
- }
-
-
- /* The Big Loop */
- while (1) {
-
- // wait for a message and fork off a child process to handle it
- clilen = sizeof(cli_addr);
- newsockfd = accept(sockfd,
- (struct sockaddr *) &cli_addr,
- &clilen);
-
- if (newsockfd < 0) {
- cerr << "slave: accept error. Exiting." << endl;
- exit(-1);
- }
-
- if ((childpid = fork()) < 0) {
- cerr << "slave: fork error. Exiting." << endl;
- exit(-1);
- }
-
- else if (childpid == 0) { // child process
- cerr << "Forked child process for incoming socket" << endl;
- close(sockfd);
- process_request(newsockfd);
- cerr << "Finished processing request. Exiting child." << endl;
- exit(0);
- }
-
- close (newsockfd); // parent
-
- //sleep(30); /* wait 30 seconds */
- }
- exit(EXIT_SUCCESS);
-}
-
-
-/* This will process requests from the master.
- * The protocol in a nutshell:
- * Master opens a socket to slave, and sends
- * one message.
- * Slave replies with one message.
- * Socket is closed.
- */
-
-void process_request(int newsockfd) {
-
- // first, read the message type.
- int msg_type = readmsgtype(newsockfd);
-
- // Second, call some function based on the message type to process
- // the rest of the message. The function is responsible for the rest
- // of the message; this includes checking the message footer.
-
- switch(msg_type) {
-
- case PING: // ping
- process_ping(newsockfd);
- break;
- case STARTTASK: // start_task
- process_start_task(newsockfd);
- break;
- case RETRIEVELOCALFILE: // get_local
- process_get_local(newsockfd);
- break;
- case SHIPCODE:
- assert(0); // obsolete
- //process_shipcode(newsockfd);
- break;
-
- case PINGREPLY:
- case FINISHEDTASK:
- case TASKFAILED:
- case SENDLOCALFILE:
- case LOCALFILENOTFOUND:
- case CODESAVED:
- case SHIPFAILED:
- cerr << "activeslave: BUG received message " << CMD_LIST[msg_type] <<
- " from master; master should never send this message." << endl;
- exit(-1);
- break;
-
-
- case -1:
- cerr << "activeslave: message had an unidentifiable type. " <<
- "Closing socket and discarding rest of message." << endl;
- default:
- cerr << "activeslave: BUG! received unexpected return value of" << msg_type <<
- "from readmsgtype(). Closing socket and discarding rest of message." << endl;
-
- exit(-1);
- }
-}
-
-
-// Just write a ping_reply to the socket.
-void process_ping(int fd) {
-
- // make sure the footer is valid
- if (!check_footer(fd)) {
- cerr << "process_ping warning: ping message has invalid or missing footer."
- << endl;
- }
- // Even if the footer's invalid, send the reply.
- cerr << "Replying to ping..." << endl;
- send_msg_header(fd, PINGREPLY);
- send_msg_footer(fd);
- cerr << "Ping processing completed." << endl;
-}
-
-
-// Process a start_task message. This reads the incoming message,
-// retrieves the necessary library, and starts the corresponding task.
-
-// Parameter format: taskID(int) library(string)
-// cephinputfile(string) offset(long) length(long) localoutputfile
-
-void process_start_task(int fd) {
-
- char libraryname[MAX_STRING_SIZE + 1];
- char cephinputfile[MAX_STRING_SIZE + 1];
- char localoutputfile[MAX_STRING_SIZE + 1];
- char options[MAX_STRING_SIZE + 1];
-
- cout << "in process_start_task: ";
- int taskID = read_positive_int(fd);
- cout << "read taskID " << taskID;
-
- // There may be multiple instances running on the same OSD. Cheap
- // and dirty hack: append the taskID to the filename to avoid contention.
-
- read_string(fd, libraryname);
- string libraryfilename("lib");
- libraryfilename += libraryname;
- libraryfilename += ".so";
- cout << ", library name " << libraryname << " -> " << libraryfilename;
-
- read_string(fd, cephinputfile);
- cout << ", cephinputfile " << cephinputfile;
- off_t offset = read_off_t(fd);
- cout << ", offset " << offset;
- off_t length = read_off_t(fd);
- cout << ", length " << length;
-
- read_string(fd, localoutputfile);
- cout << ", localoutputfile " << localoutputfile;
- read_string(fd, options);
- cout << ", options: " << options << endl;
-
-
- // make sure the footer is valid
- if (!check_footer(fd)) {
- cerr << "process_start_task warning: message has invalid or missing footer. "
- << "Discarding message." << endl;
- exit(-1);
- }
-
-
- // copy the library over from Ceph
-
- ostringstream locallibraryfilename;
- locallibraryfilename << "/tmp/lib" << libraryname << "_" << taskID << ".so";
-
- //string locallibraryfilename("lib");
- //locallibraryfilename += libraryname;
- //locallibraryfilename += "_";
- //locallibraryfilename += taskID;
- //locallibraryfilename += ".so";
- cout << "Naming local library copy " << locallibraryfilename.str() << endl;
-
- Client* client = startCephClient();
- copyCephFileToLocalFile(client, libraryfilename.c_str(), locallibraryfilename.str().c_str());
- kill_client(client);
- cout << "Local library copy acquired" << endl;
-
- // load the task from the shared library
- void (*task)(const char*, const char*, int,
- off_t, off_t, const char*) = 0;
- void* dl_h = dlopen(locallibraryfilename.str().c_str(), RTLD_LAZY);
- if (NULL == dl_h) {
- cerr << "Dynamic linking error: " << dlerror() << endl;
- exit(-1);
- }
- task = (void (*)(const char*, const char*, int,
- off_t, off_t, const char*)
- ) dlsym(dl_h, "run_task");
- if (NULL == dl_h) {
- cerr << "Symbol lookup error: " << dlerror() << endl;
- exit(-1);
- }
-
-
- // start a task; create an output filename that uses the task ID,
- // 'cause we might end up with multiple pieces of a file on each
- // OSD.
- cerr << "starting task: " << endl;
- task(cephinputfile, localoutputfile, taskID, offset, length, options);
- cerr << "returned from task! Sending reply:" << endl;
-
- // send the reply
- send_msg_header(fd, FINISHEDTASK);
- write_positive_int(fd, taskID);
- send_msg_footer(fd);
-
- // done
- cout << "Done sending reply for taskID " << taskID << endl;
- return;
-}
-
-
-// Starts a sloppy grep count of the hardwired search string over the
-// given Ceph file extent. It's sloppy because it copies the given
-// extent to a local file and runs "grep" on it, with no effort to take
-// care of boundary issues.
-void start_sloppy_grepcount (const char* ceph_filename, const char* local_filename,
- long offset, long size) {
-
- Client* client = startCephClient();
- char* search_string = "the";
- // copy the file to a local file.
-
- copyExtentToLocalFile (client, ceph_filename, offset, size, local_filename);
- // we want: grep -c search_string local_filename
- // to get the number of occurrences of the string.
- string command = "";
- command.append("grep -c ");
- command.append(search_string);
- command.append(local_filename);
-
- assert(0);
-}
-
-
-// SHIPCODE messages have been removed.
-
-void process_shipcode(int fd) { assert(0); }
-
-
-// Processes a get_local message. The message
-// specifies the filename of a local file to
-// return to the sender.
-
-// Parameter format: requestID(int) localfilename(string)
-
-// INCOMPLETE: currently just reads the message.
-
-
-void process_get_local(int fd) {
- cout << "in process_get_local: ";
- int taskID = read_positive_int(fd);
- cout << "read taskID " << taskID;
-
- char localfilename[MAX_STRING_SIZE+1];
- read_string(fd, localfilename);
- cout << ", localfilename " << localfilename << endl;
-
-
- // make sure the footer is valid
- if (!check_footer(fd)) {
- cerr << "process_get_local warning: message has invalid or missing footer."
- << endl;
- }
-
- // not implemented
- cerr << "Error: get_local command unimplemented." << endl;
- assert(0);
-}
-
-
-// Retrieves a formatted message from the socket.
-// At the moment, this just reads and prints a fixed-
-// length message type.
-// DEPRECATED.
-void str_getmsg(int sockfd) {
-
- int n;
-
- // read message types until the connection dies
- while(true) {
- n = readmsgtype(sockfd);
- if (n != 0) {
- cerr << "from getmsg: some sort of error" << endl;
- exit(-1);
- }
- }
-}
-
-// Echo a stream socket message back to the sender.
-// DEPRECATED.
-void str_echo(int sockfd) {
-
- int n;
- char line[MAXLINE];
-
- while(true) {
-
- // read from the stream
- cerr << "str_echo: waiting for a line" << endl;
- n = readline(sockfd, line, MAXLINE);
- cerr << "str_echo: read a line" << endl;
- if (0 == n) {
- cerr << "str_echo: connection terminated" << endl;
- return; // connection is terminated
- }
- else if (n < 0) {
- cerr << "str_echo: readline error" << endl;
- exit(-1);
- }
-
- // write back to the stream
- if (n != writen(sockfd, line, n)) {
- cerr << "str_echo: writen error" << endl;
- exit(-1);
- }
- else
- cerr << "Echoed line " << endl;
- }
-}
-
-
-void str_ack(int sockfd) {
-
- int n;
- char line[MAXLINE];
- //char *ack = "ack";
-
- while(true) {
-
- // read from the stream
- n = readline(sockfd, line, MAXLINE);
-
- if (0 == n)
- return; // connection is terminated
- else if (n < 0)
- //err_dump("str_echo: readline error");
- exit(-1);
-
- // write back to the stream
- if (4 != writen(sockfd, "ack\n", 4))
- //err_dump("str_echo: writen error");
- exit(-1);
- }
-}
-
-
-
-// Read command lines from the socket and execute them
-
-void str_run(int sockfd) {
-
- int n;
- char line[MAXLINE];
- char* error_msg = "str_run: No command interpreter found\n";
- char* ack_msg = "Running command... ";
- char* commit_msg = "Command executed!\n";
-
- while(true) {
-
- // read from the stream
- n = readline(sockfd, line, MAXLINE);
-
- if (0 == n)
- return; // connection is terminated
- else if (n < 0)
- //err_dump("str_echo: readline error");
- exit(-1);
-
- if (system(NULL)) {
- writen(sockfd, ack_msg, strlen(ack_msg));
- system(line);
- writen(sockfd, commit_msg, strlen(commit_msg));
- }
- else if ((int)strlen(error_msg) != writen(sockfd, error_msg, strlen(error_msg)))
- //err_dump("str_echo: writen error");
- exit(-1);
- }
-}
-
-
-// take a filename and copy it from Ceph to a local directory.
-// Not completed.
-
-void str_copytolocal(int sockfd) {
-
- int n;
- char line[MAXLINE];
- char* error_msg = "str_copy: No command interpreter found\n";
- char* ack_msg = "Running command... ";
- char* commit_msg = "Command executed!\n";
- //char* temp_dir = "/tmp";
-
-
- while(true) {
-
- // read from the stream
- n = readline(sockfd, line, MAXLINE);
-
- if (0 == n)
- return; // connection is terminated
- else if (n < 0)
- //err_dump("str_echo: readline error");
- exit(-1);
-
- if (system(NULL)) {
- writen(sockfd, ack_msg, strlen(ack_msg));
- system(line);
- writen(sockfd, commit_msg, strlen(commit_msg));
- }
- else if ((int)strlen(error_msg) != writen(sockfd, error_msg, strlen(error_msg)))
- //err_dump("str_echo: writen error");
- exit(-1);
- }
-}
-
-
-
+++ /dev/null
-#include <dlfcn.h>
-#include <string>
-#include <sstream>
-#include "inet.h"
-#include "common.h"
-#include "utility.h"
-#include "client/Client.h"
-
-// The port number is "osdd" on a telephone keypad.
-#define SERV_TCP_PORT 6733
-
-#define MAXLINE 512
-
-void str_echo(int sockfd);
-void str_ack(int sockfd);
-void str_run(int sockfd);
-void str_getmsg(int sockfd);
-void process_request(int newsockfd);
-void process_ping(int fd);
-void process_start_task(int fd);
-void process_get_local(int fd);
-void process_shipcode(int fd);
-
-void start_trivial_task(const char* ceph_filename, const char* local_filename,
- long offset, long length);
+++ /dev/null
-/*
- * This is a daemon for receiving and executing commands for compute tasks on an OSD.
- *
- * The daemon uses skeleton code from
- * http://www.linuxprofilm.com/articles/linux-daemon-howto.html. The
- * site is no longer up, but can be seen through the archive.org.
- * Networking code is based off examples from Stevens' UNIX Network Programming.
- */
-
-#include "activetaskd.h"
-
-
-#define SERVER
-
-#undef SERVER
-
-int main(int argc, const char* argv[]) {
-
- /* Our process ID and Session ID */
- pid_t pid, sid;
-
- /* Fork off the parent process */
- pid = fork();
- if (pid < 0) {
- exit(EXIT_FAILURE);
- }
- /* If we got a good PID, then
- we can exit the parent process. */
- if (pid > 0) {
- exit(EXIT_SUCCESS);
- }
-
- /* Change the file mode mask */
- umask(0);
-
- /* Open any logs here */
-
- /* Create a new SID for the child process */
- sid = setsid();
- if (sid < 0) {
- /* Log the failure */
- exit(EXIT_FAILURE);
- }
-
-
- /* Change the current working directory */
- if ((chdir("/")) < 0) {
- /* Log the failure */
- exit(EXIT_FAILURE);
- }
-
- /* Close out the standard file descriptors */
- close(STDIN_FILENO);
- close(STDOUT_FILENO);
- close(STDERR_FILENO);
-
- /* Daemon-specific initialization goes here */
-
-
-
- /* Set up TCP server */
- int sockfd, newsockfd, childpid;
- socklen_t clilen;
- struct sockaddr_in cli_addr, serv_addr;
-
- const char *pname = argv[0]; // process name
-
- // Open a TCP socket
- if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0)
- exit(-1);
- //err_dump("server: can't open stream socket");
-
- // set up the port
- bzero((char*) &serv_addr, sizeof(serv_addr));
- serv_addr.sin_family = AF_INET;
- serv_addr.sin_addr.s_addr = htonl(INADDR_ANY);
- serv_addr.sin_port = htons(SERV_TCP_PORT);
-
- if (bind(sockfd, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) < 0)
- exit(-1);
- //err_dump("server: can't bind local address");
-
- if(listen(sockfd, SOMAXCONN) < 0)
- exit(-1);
- //err_dump("server: listening error");
-
- /* The Big Loop */
- while (1) {
-
- // wait for a message and fork off a child process to handle it
- clilen = sizeof(cli_addr);
- newsockfd = accept(sockfd,
- (struct sockaddr *) &cli_addr,
- &clilen);
-
- if (newsockfd < 0)
- exit(-1);
- //err_dump("server: accept error");
-
- if ( (childpid = fork()) < 0)
- exit(-1);
- // err_dump("server: fork error");
-
- else if (childpid == 0) { // child process
- close(sockfd);
- //str_echo(newsockfd);
- str_run(newsockfd);
- // insert code to process the request
- exit(0);
- }
-
- close (newsockfd); // parent
-
- //sleep(30); /* wait 30 seconds */
- }
- exit(EXIT_SUCCESS);
-}
-
-
-// Echo a stream socket message back to the sender.
-
-void str_echo(int sockfd) {
-
- int n;
- char line[MAXLINE];
-
- while(true) {
-
- // read from the stream
- n = readline(sockfd, line, MAXLINE);
-
- if (0 == n)
- return; // connection is terminated
- else if (n < 0)
- //err_dump("str_echo: readline error");
- exit(-1);
-
- // write back to the stream
- if (n != writen(sockfd, line, n))
- //err_dump("str_echo: writen error");
- exit(-1);
- }
-}
-
-
-void str_ack(int sockfd) {
-
- int n;
- char line[MAXLINE];
- char *ack = "ack";
-
- while(true) {
-
- // read from the stream
- n = readline(sockfd, line, MAXLINE);
-
- if (0 == n)
- return; // connection is terminated
- else if (n < 0)
- //err_dump("str_echo: readline error");
- exit(-1);
-
- // write back to the stream
- if (4 != writen(sockfd, "ack\n", 4))
- //err_dump("str_echo: writen error");
- exit(-1);
- }
-}
-
-
-
-
-// Read command lines from the socket and execute them
-
-void str_run(int sockfd) {
-
- int n;
- char line[MAXLINE];
- char* error_msg = "str_run: No command interpreter found\n";
- char* ack_msg = "Running command... ";
- char* commit_msg = "Command executed!\n";
-
- while(true) {
-
- // read from the stream
- n = readline(sockfd, line, MAXLINE);
-
- if (0 == n)
- return; // connection is terminated
- else if (n < 0)
- //err_dump("str_echo: readline error");
- exit(-1);
-
- if (system(NULL)) {
- writen(sockfd, ack_msg, strlen(ack_msg));
- system(line);
- writen(sockfd, commit_msg, strlen(commit_msg));
- }
- else if (strlen(error_msg) != writen(sockfd, error_msg, strlen(error_msg)))
- //err_dump("str_echo: writen error");
- exit(-1);
- }
-}
-
-
-// take a filename and copy it from Ceph to a local directory
-
-void str_copytolocal(int sockfd) {
-
- int n;
- char line[MAXLINE];
- char* error_msg = "str_copy: No command interpreter found\n";
- char* ack_msg = "Running command... ";
- char* commit_msg = "Command executed!\n";
- char* temp_dir = "/tmp";
-
-
- while(true) {
-
- // read from the stream
- n = readline(sockfd, line, MAXLINE);
-
- if (0 == n)
- return; // connection is terminated
- else if (n < 0)
- //err_dump("str_echo: readline error");
- exit(-1);
-
- if (system(NULL)) {
- writen(sockfd, ack_msg, strlen(ack_msg));
- system(line);
- writen(sockfd, commit_msg, strlen(commit_msg));
- }
- else if (strlen(error_msg) != writen(sockfd, error_msg, strlen(error_msg)))
- //err_dump("str_echo: writen error");
- exit(-1);
- }
-}
-
-
-
+++ /dev/null
-#include "inet.h"
-#include "common.h"
-#include "utility.h"
-#include "client/Client.h"
-
-
-// The port number is "osdd" on a telephone keypad.
-#define SERV_TCP_PORT 6733
-
-#define MAXLINE 512
-
-void str_echo(int sockfd);
-void str_ack(int sockfd);
-void str_run(int sockfd);
+++ /dev/null
-#ifndef CEPH_COMMON_H
-#define CEPH_COMMON_H
-
-
-#include <sys/stat.h>
-#include <stdio.h>
-#include <stdlib.h>
-#include <fcntl.h>
-#include <errno.h>
-#include <unistd.h>
-#include <syslog.h>
-#include <string.h>
-#include <iostream>
-#include <string>
-
-
-
-
-#define CMDLENGTH 10
-#define CMDCOUNT 11
-
-#define MAX_STRING_SIZE 255
-
-/*
- * These are the various messages that can be sent between the master
- * and slave. The slave sends one reply to each message from the master.
-
- * PING/PINGREPLY: just what it sounds like.
-
- * STARTTASK: starts a task from the given library. The slave attempts
- * to perform the task, and replies with FINISHEDTASK or TASKFAILED.
- *
- * RETRIEVELOCALFILE: requests a file that the slave has stored
- * locally. Slave replies with SENDLOCALFILE and the file, or with
- * LOCALFILENOTFOUND.
- *
- * (deprecated) SHIPCODE: sends a shared library to the slave, containing a
- * function that is to be executed later by the STARTTASK
- * command. Slave replies with CODESAVED or SHIPFAILED.
- *
- * SHIPCODE is replaced by transport using Ceph.
- */
-
-
-const off_t CHUNK = 1024 * 1024 * 4;
-
-// a bunch of string constants for commands
-
-#define PING 0
-#define STARTTASK 1
-#define RETRIEVELOCALFILE 2
-#define PINGREPLY 3
-#define FINISHEDTASK 4
-#define TASKFAILED 5
-#define SENDLOCALFILE 6
-#define LOCALFILENOTFOUND 7
-#define SHIPCODE 8
-#define CODESAVED 9
-#define SHIPFAILED 10
-
-#define FOOTER_LENGTH 7
-
-const char* CMD_LIST[CMDCOUNT] = {"______PING",
- "START_TASK",
- "_GET_LOCAL",
- "PING_REPLY",
- "_TASK_DONE",
- "TASKFAILED",
- "SEND_LOCAL",
- "LOCAL_GONE",
- "_SHIP_CODE",
- "CODE_SAVED",
- "SHIPFAILED"};
-
-const char FOOTER[FOOTER_LENGTH + 1] = "MSG_END";
-
-// const char* strArray[] = {"string1", "string2", "string3"};
-//const char commands[2][4] = {"foo", "bar"};
-
-
-// error codes
-#define ARGUMENTSINVALID 1001
-#define CEPHCLIENTSTARTUPFAILED 1002
-#define INPUTFILEREADFAILED 1003
-
-#endif //COMMON_H
+++ /dev/null
-/*
- * This is merely a test of an echo server; it's an early step in
- * building up the Ceph distributed compute service. This is
- * discardable once the next stage is up and running.
- *
- * Code is based off examples in Stevens' "Unix Network Programming".
- */
-
-#include "echotestclient.h"
-
-int main(int argc, char* argv[]) {
-
- int sockfd;
- struct sockaddr_in serv_addr;
-
- char* pname = argv[0];
-
- bzero((char *) &serv_addr, sizeof(serv_addr));
- serv_addr.sin_family = AF_INET;
- serv_addr.sin_addr.s_addr = inet_addr(SERV_HOST_ADDR);
- serv_addr.sin_port = htons(SERV_TCP_PORT);
-
-
- // open a TCP socket
- if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
- printf("client: can't open stream socket");
- exit (-1);
- }
-
- // connect to the server.
- if (connect(sockfd, (struct sockaddr *) &serv_addr,
- sizeof(serv_addr)) < 0) {
- printf("client: can't connect to server");
- exit (-1);
- }
-
- // start the test echoer
- str_cli(stdin, sockfd);
-
-
- close (sockfd);
- exit(0);
-}
-
-
-void str_cli(FILE *fp, int sockfd) {
-
- int n;
- char sendline[MAXLINE], recvline[MAXLINE + 1];
-
- // read from the fp and write to the socket;
- // then read from the socket and write to stdout
- while (fgets(sendline, MAXLINE, fp) != NULL) {
-
- n = strlen(sendline);
- if (writen(sockfd, sendline, n) != n) {
- printf("str_cli: writen error on socket");
- exit(-1);
- }
- n = readline(sockfd, recvline, MAXLINE);
- if (n < 0) {
- printf("str_cli: readline error");
- exit(-1);
- }
- recvline[n] = 0;
- fputs(recvline, stdout);
- }
-
- if (ferror(fp)) {
- printf("str_cli: error reading file");
- exit(-1);
- }
-
-}
+++ /dev/null
-#include "inet.h"
-#include "common.h"
-#include "socket_utility.h"
-
-#define SERV_HOST_ADDR "128.114.57.143" //issdm-8
-#define SERV_TCP_PORT 6733
-#define MAXLINE 512
-
-void str_cli(FILE *fp, int sockfd);
-
+++ /dev/null
-/*
- * Generic TCP/IP definitions
- */
-
-#include <stdio.h>
-#include <sys/types.h>
-#include <sys/socket.h>
-#include <netinet/in.h>
-#include <arpa/inet.h>
+++ /dev/null
-/*
- * This client invokes a distributed task across OSDs.
- *
- * Networking code is based off examples in Stevens' "Unix Network Programming".
- */
-#include "msgtestclient.h"
-#define REQUIRED_ARGS 5
-
-int main(int argc, char* argv[]) {
-
- // make sure we have all the arguments we need
- if (argc < REQUIRED_ARGS) { usage(argv[0]); exit(-1); }
-
- // This file is rewired for running tests from a
- // shell script. The first parameter specifies the
- // name of the Ceph file that the test will be
- // run on; the second parameter specifies which of
- // four different tests will be run.
- const char* library_name = argv[1];
- const char* input_filename = argv[2];
- const char* output_filename = argv[3];
- int test_number = atoi(argv[4]);
- assert (test_number > 0);
- assert (test_number < 4);
- const char* job_options = argv[5];
-
- string library_filename("lib");
- library_filename += library_name;
- library_filename += ".so";
-
- // start up a Ceph client
- Client* client = startCephClient();
- cerr << "loaded test client" << endl;
-
- // generate the file extent tuples for each OSD
- vector<request_split> original_splits;
- off_t filesize = generate_splits(client, input_filename, original_splits);
-
- // copy the library to Ceph
- copyLocalFileToCeph(client, library_filename.c_str(), library_filename.c_str());
-
- // close the client - we're done with it
- kill_client(client);
- cerr << "Closed Ceph client instance" << endl;
-
- // sanity check: display the splits
- cerr << "Listing original splits:" << endl;
- for (vector<request_split>::iterator i = original_splits.begin();
- i != original_splits.end(); i++) {
- cerr << "Split: IP " << i->ip_address << ", start "
- << i->start << ", length " << i->length << endl;
- }
-
- vector<request_split> test_splits;
- // Now, modify the splits as needed for the test type.
- // There are three types of tests.
- // Test 1: regular test.
- // Test 2: put all the tasks on the "wrong" OSD.
- // Test 3: do the entire job off one node.
-
- if (1 == test_number) {
- cerr << "Test type 1: using original splits." << endl;
- test_splits = original_splits;
- }
- else if (2 == test_number) {
- cerr << "Test type 2: rotating split IP addresses. " << endl;
- int split_count = original_splits.size();
- for (int i = 0; i < split_count; ++i) {
- request_split s;
- s.start = original_splits.at(i).start;
- s.length = original_splits.at(i).length;
- s.ip_address = original_splits.at((i+1)%split_count).ip_address;
- test_splits.push_back(s);
- }
- }
- else if (3 == test_number) {
- cerr << "Test type 3: one giant split." << endl;
- request_split s;
- s.start = 0;
- s.length = filesize;
- s.ip_address = original_splits.at(0).ip_address;
- test_splits.push_back(s);
- }
- else {
- cerr << "Error: received invalid test type " << test_number << endl;
- exit(-1);
- }
-
- cerr << "Listing test splits:" << endl;
- for (vector<request_split>::iterator i = test_splits.begin();
- i != test_splits.end(); i++) {
- cerr << "Split: IP " << i->ip_address << ", start "
- << i->start << ", length " << i->length << endl;
- }
-
- // start the timer
- utime_t start_time = g_clock.now();
- int pending_tasks = 0;
-
- int taskID = 0;
- // start up the tasks
- for (vector<request_split>::iterator i = test_splits.begin();
- i != test_splits.end(); i++) {
- start_map_task(i->ip_address, ++taskID, library_name, input_filename,
- i->start, i->length, output_filename, job_options);
- ++pending_tasks;
- }
-
- cerr << "Waiting for " << pending_tasks << " tasks to return..." << endl;
-
- // wait for all the tasks to finish
- while (pending_tasks > 0) {
- int exit_status;
- cerr << "Waiting for " << pending_tasks << " tasks to return..." << endl;
- pid_t pid = wait(&exit_status);
- if (pid < 0) {
- cerr << "ERROR on wait(): result was " << pid << endl;
- exit(-1);
- }
- --pending_tasks;
- if (WIFEXITED(exit_status)) {
- cerr << "Task with pid " << pid << " returned with exit status " <<
- WEXITSTATUS(exit_status) << endl;
- }
- else { cerr << "WARNING: Task with pid " << pid << " exited abnormally" << endl; }
- }
-
- cerr << "All tasks have returned." << endl;
- // report the time
- double elapsed_time;
- elapsed_time = (g_clock.now() - start_time);
- cerr << "Elapsed time: " << elapsed_time << endl;
- cerr << elapsed_time << " " << endl;
- // send the time to stdout for the shell script
- cout << elapsed_time << " ";
- exit(0);
-}
-
-
-// sends a complete ping message
-// through the file descriptor
-// and waits for a reply. This
-// will hang if there's no reply.
-
-void ping_test(int fd) {
-
- // send the message header and footer.
- // A ping message has no body.
- send_msg_header(fd, PING);
- send_msg_footer(fd);
-
- // receive the reply.
- int msg_type = readmsgtype(fd);
- if (msg_type < 0) {
- cerr << "ping_test: Failed reading the ping reply. Exiting." << endl;
- exit(-1);
- }
- if (PINGREPLY != msg_type) {
- assert((msg_type <= 0) && (msg_type < CMDCOUNT) &&
- "readmsgtype return value out of range");
- cerr << "ping_test: slave sent invalid reply: replied to ping with message type" <<
- msg_type << ": " << CMD_LIST[msg_type] << ". Exiting. " << endl;
- exit(-1);
- }
- else {
- cerr << "Received valid ping reply!" << endl;
- }
-
- if(!check_footer(fd)) {
- cerr << "ping_test: message footer not found. Exiting." << endl;
- exit(-1);
- }
-}
-
-// sends a message to the fd telling it to start a task.
-// Remember: the message format requires any string to be
-// prefixed by its (unterminated) length.
-void send_start_task_msg(int fd,
- int taskID,
- int library_name_size, const char* library_name,
- int inputfilenamesize, const char* inputfilename,
- off_t offset,
- off_t length,
- int outputfilenamesize, const char* outputfilename,
- int options_size, const char* options) {
-
- // write the header and the message to the file descriptor.
-
- send_msg_header(fd, STARTTASK);
-
- write_positive_int(fd, taskID);
- write_positive_int(fd, library_name_size);
- write_string(fd, library_name);
- write_positive_int(fd, inputfilenamesize);
- write_string(fd, inputfilename);
- //write_long(fd, offset);
- write_off_t (fd, offset);
- write_off_t (fd, length);
- write_positive_int(fd, outputfilenamesize);
- write_string(fd, outputfilename);
- write_positive_int(fd, options_size);
- write_string(fd, options);
-
- // terminate the message
- send_msg_footer(fd);
-}
-
-
-
-
-// creates a new connection to the slave
-// at the given IP address and port.
-// Overloaded to take an IP address as a
-// string or as an in_addr_t.
-
-int create_new_connection(const char* ip_address, uint16_t port)
-{
- in_addr_t ip = inet_addr(ip_address);
- if ((in_addr_t)-1 == ip) {
- cerr << "Error creating new connection: \"" << ip_address <<
- "\" is not a valid IP address." << endl;
- return -1;
- }
- else
- //cerr << "Opening connection to " << ip_address << ":" << endl;
- return create_new_connection(ip, port);
-}
-
-
-int create_new_connection(in_addr_t ip_address, uint16_t port) {
-
- struct sockaddr_in serv_addr;
- int sockfd;
-
- bzero((char *) &serv_addr, sizeof(serv_addr));
- serv_addr.sin_family = AF_INET;
- //serv_addr.sin_addr.s_addr = inet_addr(SERV_HOST_ADDR);
- serv_addr.sin_addr.s_addr = ip_address;
- serv_addr.sin_port = htons(SERV_TCP_PORT);
-
- // open a TCP socket
- if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
- cerr << "msgtestclient: can't open stream socket. Exiting." << endl;
- exit (-1);
- }
-
- // connect to the server.
- if (connect(sockfd, (struct sockaddr *) &serv_addr,
- sizeof(serv_addr)) < 0) {
- cerr << "msgtestclient: can't connect to server." << endl;
- exit (-1);
- }
- //cerr << "opened connection!" << endl;
- return sockfd;
-}
-
-void msg_type_sender(int sockfd) {
-
- for (int i = 0; i < CMDCOUNT; ++i) {
- send_msg_header(sockfd, i);
- }
-
-}
-
-// Fires up a task on a single OSD.
-int start_map_task(sockaddr_in ip_address, int taskID,
- const char* library_name, const char* input_filename,
- off_t start, off_t length,
- const char* output_filename,
- const char* job_options)
-{
- int childpid;
- // fork off a child process to do the work, and return
- if ((childpid = fork()) < 0) {
- cerr << "start_map_task: fork error. Exiting." << endl;
- exit(-1);
- }
-
- else if (childpid != 0) { // parent
- cerr << "start_map_task: forked child process "
- << childpid << " to start task. " << endl;
- return 0;
- }
-
- string ip_addr_string(inet_ntoa(ip_address.sin_addr));
- // cerr << "command: " << ip_addr_string << " taskID "
- // << taskID << ": " << command
- // << " " << input_filename << " " << start << " " << length
- // << " " << output_filename << endl;
-
- // open a socket to the slave, and send the message
- //cerr << "Sending message: " << endl;
- int sockfd = create_new_connection(ip_addr_string.c_str(), SERV_TCP_PORT);
- send_start_task_msg(sockfd, taskID, strlen(library_name), library_name,
- strlen(input_filename), input_filename,
- start, length,
- strlen(output_filename), output_filename,
- strlen(job_options), job_options);
-
- // wait for a reply
- cerr << "Sent message for taskID " << taskID << ". Waiting for reply..." << endl;
-
- // receive the reply.
- int msg_type = readmsgtype(sockfd);
- if (msg_type < 0) {
- cerr << "start_map_task: Failed reading the reply. Exiting." << endl;
- exit(-1);
- }
- if (FINISHEDTASK != msg_type) {
- assert((msg_type <= 0) && (msg_type < CMDCOUNT));
- cerr << "start_map_task: slave sent invalid reply: replied with message type" <<
- msg_type << ": " << CMD_LIST[msg_type] << ". Exiting. " << endl;
- exit(-1);
- }
- // read the taskID of the reply
-
- int reply_taskID = read_positive_int(sockfd);
-
- if(!check_footer(sockfd)) {
- cerr << "ping_test: message footer not found. Exiting." << endl;
- exit(-1);
- }
-
- // done!
- close(sockfd);
- cerr << "Task " << taskID << "/" << reply_taskID <<
- " complete! Ending child process." << endl;
- //exit(0);
- _exit(0);
- cerr << "exit(0) returned. Strange things are afoot." << endl;
-}
-
-
-// Creates a set of (ip address, start position, length) tuples giving
-// the location of each piece of the given Ceph file. Returns the size
-// of the file.
-
-
-off_t generate_splits(Client* client, const char* input_filename,
- vector<request_split>& original_splits) {
-
- // Open the file and get its size
- int fh = client->open(input_filename, O_RDONLY);
- if (fh < 0) {
- cerr << "The input file " << input_filename << " could not be opened." << endl;
- exit(-1);
- }
- cerr << "Opened file " << input_filename << endl;
- off_t filesize;
- struct stat stbuf;
- if (0 > client->lstat(input_filename, &stbuf)) {
- cerr << "Error: could not retrieve size of input file " << input_filename << endl;
- exit(-1);
- }
- filesize = stbuf.st_size;
- if (filesize < 1) {
- cerr << "Error: input file size is " << filesize << endl;
- exit(-1);
- }
-
- // grab all the object extents
- list<ObjectExtent> extents;
- off_t offset = 0;
- client->enumerate_layout(fh, extents, filesize, offset);
- client->close(fh);
- cerr << "Retrieved all object extents" << endl;
-
-
- // generate the tuples
- list<ObjectExtent>::iterator i;
- map<size_t, size_t>::iterator j;
- int osd;
-
- for (i = extents.begin(); i != extents.end(); i++) {
-
- request_split split;
- // find the primary and get its IP address
- osd = client->osdmap->get_pg_primary(i->layout.pgid);
- entity_inst_t inst = client->osdmap->get_inst(osd);
- entity_addr_t entity_addr = inst.addr;
- entity_addr.make_addr(split.ip_address);
-
- // iterate through each buffer_extent in the ObjectExtent
- for (j = i->buffer_extents.begin();
- j != i->buffer_extents.end(); j++) {
-
- // get the range of the buffer_extent
- split.start = (*j).first;
- split.length = (*j).second;
- // throw the split onto the vector
- original_splits.push_back(split);
- }
- }
- return filesize;
-}
-
-
-void usage(const char* name) {
-
- cout << "usage: " << name << " libraryname inputfile outputfile test_number" << endl;
- cout << "libraryname is the name of a proper shared task library in the _local_ filesystem. "
- << "e.g. entering \"foo\" requires the presence of libfoo.so in the "
- << "working directory." << endl;
- cout << "inputfile must be a valid path in the running Ceph filesystem." << endl;
- cout << "outputfile is a Ceph filename prefix for writing results. e.g. \"bar\" will give "
- << "output files bar.1, bar.2, &c." << endl;
- cout << "test_number must be 1, 2, or 3." << endl;
- cout << " 1: run the test task normally (one slave per OSD file extent)" << endl;
- cout << " 2: run the test task on the \"wrong\" OSDs" << endl;
- cout << " 3: run the entire task in a single process" << endl;
-}
-
-
-
+++ /dev/null
-#include "inet.h"
-#include "common.h"
-#include "utility.h"
-#include "client/Client.h"
-
-// wait.h MUST NOT be #included before client/Client.h
-#include <sys/wait.h>
-#include <vector>
-#include <string>
-
-struct request_split {
- tcpaddr_t ip_address;
- off_t start;
- off_t length;
-};
-
-
-//#define SERV_HOST_ADDR "128.114.57.143" //issdm-8
-#define SERV_HOST_ADDR "128.114.57.166" //issdm-31
-
-#define SERV_TCP_PORT 6733
-#define MAXLINE 512
-
-void msg_type_sender(int sockfd);
-
-
-
-int create_new_connection(const char* ip_address, uint16_t port);
-int create_new_connection(in_addr_t ip_address, uint16_t port);
-void usage(const char* name);
-void ping_test(int fd);
-void start_task_test(int fd);
-
-
-off_t generate_splits(Client* client, const char* input_filename,
- vector<request_split>& original_splits);
-
-
-int start_map_task(sockaddr_in ip_address, int taskID,
- const char* map_command,
- const char* input_filename,
- off_t start, off_t length,
- const char* output_filename,
- const char* job_options);
-
-void send_start_task_msg(int fd,
- int taskID,
- int command_size, const char* command,
- int inputfilenamesize, const char* inputfilename,
- off_t offset,
- off_t length,
- int outputfilenamesize, const char* outputfilename,
- int options_size, const char* options);
+++ /dev/null
-#include "trivial_task.h"
-
-void start_trivial_task (const char* ceph_filename, const char* local_filename,
- off_t offset, off_t length) {
- // Don't bother to copy the file to disk. Read the file directly from Ceph,
- // and add up all the bytes.
- // Write the total to the local file as a string.
- Client * client = startCephClient();
-
- bufferptr bp(CHUNK);
-
- // get the source file's size. Sanity-check the request range.
- struct stat st;
- int r = client->lstat(ceph_filename, &st);
- assert (r == 0);
-
- off_t src_total_size = st.st_size;
- if (src_total_size < offset + length) {
- cerr << "Error in copy ExtentToLocalFile: offset + length = " << offset << " + " << length
- << " = " + (offset + length) << ", source file size is only " << src_total_size << endl;
- exit(-1);
- }
- off_t remaining = length;
-
- // open the file and seek to the start position
- cerr << "start_trivial_task: opening the source file and seeking " << endl;
-
- int fh_ceph = client->open(ceph_filename, O_RDONLY);
- assert (fh_ceph > -1);
- r = client->lseek(fh_ceph, offset, SEEK_SET);
- assert (r == offset);
-
- int counter = 0;
- // read through the extent and add up the bytes
- cerr << "start_trivial_task: counting up bytes" << endl;
- char* bp_c = bp.c_str();
- while (remaining > 0) {
- off_t got = client->read(fh_ceph, bp_c, MIN(remaining,CHUNK), -1);
- assert(got > 0);
- remaining -= got;
- for (off_t i = 0; i < got; ++i) {
- counter += (unsigned int)(bp_c[i]);
- }
- }
- cerr << "start_trivial_task: Done! Answer is " << counter << endl;
- client->close(fh_ceph);
-
- //assert(0);
-}
-
+++ /dev/null
-// Shared library for the trivial task of adding up all the bytes in a file
-
-//#include "inet.h"
-#include "common.h"
-#include "utility.h"
-#include "client/Client.h"
-
-
-extern "C" void start_trivial_task (const char* ceph_filename,
- const char* local_filename,
- off_t offset, off_t length);
-
+++ /dev/null
-/*
- * Miscellaneous Active OSD helper functions.
- *
- */
-
-//#include <sys/stat.h>
-#include "client/Client.h"
-#include "common.h"
-#include "config.h"
-#include "common/Timer.h"
-#include "msg/SimpleMessenger.h"
-#include "socket_utility.h"
-
-Client* startCephClient();
-void kill_client(Client* client);
-
-int send_msg_header(int fd, int header_ID);
-int readmsgtype(int fd);
-bool check_footer(int fd);
-int send_msg_header(int fd, int header_ID);
-int send_msg_footer(int fd);
-
-/*
- * Fires up a Ceph client and returns a pointer to it.
- */
-
-Client* startCephClient()
-{
- cout << "ActiveMaster: Initializing Ceph client:" << endl;
-
- // parse args from CEPH_ARGS, not command line
- vector<char*> args;
- env_to_vec(args);
- parse_config_options(args);
-
- if (g_conf.clock_tare) g_clock.tare();
-
- // be safe
- g_conf.use_abspaths = true;
-
- // load monmap
- MonMap* monmap = new MonMap();
- int r = monmap->read(".ceph_monmap");
- if (r < 0) {
- cout << "ActiveMaster: could not find .ceph_monmap" << endl;
- return 0;
- }
- assert(r >= 0);
-
- // start up network
- rank.start_rank();
-
- // start client
- Client *client;
- client = new Client(rank.register_entity(MSG_ADDR_CLIENT_NEW), monmap);
- client->init();
-
- // mount
- client->mount();
-
- return client;
-}
-
-void kill_client (Client * client)
-{
- client->unmount();
- client->shutdown();
- delete client;
-
- // wait for messenger to finish
- rank.wait();
-}
-
-
-
-// reads a message type from the socket, and prints it.
-
-int readmsgtype(int fd) {
- int rc;
- char typebuf[CMDLENGTH + 1];
-
- rc = read(fd, &typebuf, CMDLENGTH);
-
- // read a fixed-length text command
- if (rc != CMDLENGTH) {
- cerr << "in readmsgtype: read error: result is " << rc << endl;
- return -1;
- }
-
- // null-terminate the string
- typebuf[CMDLENGTH] = 0;
-
- // print the command
- //cerr << "readmsgtype: text type is " << typebuf << ", " ;
-
- // figure out which one it is, by number
- for (int i = 0; i < CMDCOUNT; ++i) {
- if (!strcmp(typebuf, CMD_LIST[i])) {
- //cerr << "which is identified as type " << i << endl;
- return i;
- }
- }
-
- // if we get here the type was invalid
- cerr << "readmsgtype: unrecognized message type " << typebuf << endl;
- return -1;
-}
-
-// Attempt to read the message footer off
-// the given stream.
-bool check_footer(int fd) {
-
- // leave space for null termination
- char footer_buf[FOOTER_LENGTH+1];
-
- // read the footer
- int rc = read(fd, &footer_buf, FOOTER_LENGTH);
- if (rc != FOOTER_LENGTH) {
- cerr << "in check_footer: read error: result is " << rc << endl;
- return false;
- }
-
- // null-terminate the string
- footer_buf[FOOTER_LENGTH] = 0;
-
- // Is the footer correct?
- if (0 == strcmp(footer_buf, FOOTER))
- return true;
- else
- return false;
-}
-
-
-// send a fixed-length message header
-// given the header's ID.
-int send_msg_header(int fd, int header_ID) {
- if ((header_ID < 0) || (header_ID >= CMDCOUNT)) {
- cerr << "In send_msg_header: received out-of-range header ID " << header_ID <<
- ". Exiting process." << endl;
- exit(-1);
- }
-
- //cerr << "attempting to send message " << CMD_LIST[header_ID] <<
- // " with ID " << header_ID << endl;
-
- if (CMDLENGTH != writen(fd, CMD_LIST[header_ID], CMDLENGTH)) {
- cerr << "In send_msg_header: error writing header ID " << header_ID <<
- "to file descriptor " << fd << ". Exiting process." << endl;
- exit(-1);
- }
-
- return 0;
-}
-
-// send the fixed-length message footer.
-int send_msg_footer(int fd) {
- //cerr << "attempting to send message footer: " << endl;
- if (FOOTER_LENGTH != writen(fd, FOOTER, FOOTER_LENGTH)) {
- cerr << "in send_msg_footer: error writing footer to file descriptor " <<
- fd << ". Exiting process." << endl;
- exit(-1);
- } else {
- //cerr << "Sent message footer!" << endl;
- }
- return 0;
-}
-
-
-
-// Copy a given extent of a Ceph file to the local disk.
-// Requires a running Ceph client.
-void copyExtentToLocalFile (Client* client, const char* ceph_source,
- long offset, long length,
- const char* local_destination) {
-
- // get the source file's size. Sanity-check the request range.
- struct stat st;
- int r = client->lstat(ceph_source, &st);
- assert (r == 0);
-
- off_t src_total_size = st.st_size;
- if (src_total_size < offset + length) {
- cerr << "Error in copy ExtentToLocalFile: offset + size = " << offset << " + " << length
- << " = " + (offset + length) << ", source file size is only " << src_total_size << endl;
- exit(-1);
- }
- off_t remaining = length;
-
- // open the source and destination files. Advance the source
- // file to the desired offset.
- int fh_ceph = client->open(ceph_source, O_RDONLY);
- assert (fh_ceph > -1);
- r = client->lseek(fh_ceph, offset, SEEK_SET);
- assert (r == offset);
-
- int fh_local = ::open(local_destination, O_WRONLY|O_CREAT|O_TRUNC, 0644);
- assert (fh_local > -1);
-
- // copy the file 4 MB at a time
- const int chunk = 4*1024*1024;
- bufferptr bp(chunk);
-
- while (remaining > 0) {
- off_t got = client->read(fh_ceph, bp.c_str(), MIN(remaining,chunk), -1);
- assert(got > 0);
- remaining -= got;
- off_t wrote = ::write(fh_local, bp.c_str(), got);
- assert (got == wrote);
- }
- // close the files
- client->close(fh_ceph);
- ::close(fh_local);
-}
-
-
-// Copy a Ceph file to the local disk. Requires a running Ceph client.
-// Overwrites the destination if it exists.
-void copyCephFileToLocalFile (Client* client, const char* ceph_source,
- const char* local_destination) {
-
- // get the source file's size.
- struct stat st;
- int r = client->lstat(ceph_source, &st);
- assert (r == 0);
-
- copyExtentToLocalFile(client, ceph_source, 0, st.st_size,
- local_destination);
-
-
-}
-// Copies a file from the local disk to Ceph. Annihilates the
-// destination if it exists.
-
-void copyLocalFileToCeph(Client* client, const char* local_source,
- const char* ceph_destination) {
-
- // Get the source file's size.
- struct stat st;
- int r = ::lstat(local_source, &st);
- if (0 != r) {
- cerr << "in copyLocalFileToCeph: error retrieving size for file " << local_source
- << ": is the file missing?" << endl;
- assert(0);
- }
-
- off_t remaining = st.st_size;
-
- // Open the source and destination files.
- int fh_source = ::open(local_source, O_RDONLY);
- assert (fh_source > -1);
- int fh_dest = client->open(ceph_destination, O_WRONLY|O_CREAT|O_TRUNC, 0644);
- assert (fh_dest > -1);
-
- // Copy the file.
- const int chunk = 4 * 1024* 1024; // 4 MB
- bufferptr bp(chunk);
-
- while(remaining > 0) {
- off_t got = ::read(fh_source, bp.c_str(), MIN(remaining, chunk));
- assert(got > 0);
- remaining -= got;
- off_t wrote = client->write(fh_dest, bp.c_str(), got);
- assert (got == wrote);
- }
-
- // close the files
- ::close(fh_source);
- client->close(fh_dest);
-
-}