${CC} ${CFLAGS} ${LIBS} -lfuse $^ -o $@
+# code shipping experiments
+activemaster: active/activemaster.cc client.o osdc.o msg/SimpleMessenger.o common.o
+ ${CC} ${CFLAGS} ${LIBS} $^ -o $@
+
+activeslave: active/activeslave.cc client.o osdc.o msg/SimpleMessenger.o common.o
+ ${CC} ${CFLAGS} ${LIBS} $^ -o $@
+
+echotestclient: active/echotestclient.cc client.o osdc.o msg/SimpleMessenger.o common.o
+ ${CC} ${CFLAGS} ${LIBS} $^ -o $@
+
+msgtestclient: active/msgtestclient.cc client.o osdc.o msg/SimpleMessenger.o common.o
+ ${CC} ${CFLAGS} ${LIBS} $^ -o $@
+
+libtrivialtask.so: active/trivial_task.cc client.o osdc.o msg/SimpleMessenger.o common.o
+ ${CC} -fPIC -shared -Wl,-soname,$@.1 ${CFLAGS} ${LIBS} $^ -o $@
+
+#libhadoopcephfs.so: client/hadoop/CephFSInterface.cc client.o osdc.o msg/SimpleMessenger.o common.o
+# ${CC} -fPIC -shared -Wl,-soname,$@.1 ${CFLAGS} ${LIBS} $^ -o $@
+
+
# fake*
fakefuse: fakefuse.cc mon.o mds.o client.o osd.o osdc.o ebofs.o client/fuse.o client/fuse_ll.o msg/FakeMessenger.o common.o
${CC} -shared -fPIC ${CFLAGS} $< -o $@
%.o: %.cc
- ${CC} ${CFLAGS} -c $< -o $@
+ ${CC} -fPIC ${CFLAGS} -c $< -o $@
%.po: %.cc
${CC} -fPIC ${CFLAGS} -c $< -o $@
--- /dev/null
+/*
+ * Startup executable for
+ * Ceph Active Storage. See README for details.
+ *
+ */
+#include "activemaster.h"
+
+
+/*
+ * What main() must do:
+ *
+ * - start up a Ceph client
+ * - find the set of OSDs that the file is striped across
+ * - start up the Map task on each OSD, using ssh
+ * - eat lunch?
+ * - start up the Reduce task locally
+ */
+
+int main(int argc, const char* argv[]) {
+
+ if (argc < 4) {
+ usage(argv[0]);
+ exit(-1);
+ }
+
+ const char* input_filename = argv[1];
+ const char* map_command = argv[2];
+ //const char* reduce_command = argv[3];
+
+ // fire up the client
+ Client* client = startCephClient();
+
+ // open the file as read_only
+ int fh = client->open(input_filename, O_RDONLY);
+ if (fh < 0) {
+ cout << "The input file " << input_filename << " could not be opened." << endl;
+ exit(-1);
+ }
+
+ // How big is the file?
+ int filesize;
+ struct stat stbuf;
+ if (0 > client->lstat(input_filename, &stbuf)) {
+ cout << "Error: could not retrieve size of input file " << input_filename << endl;
+ exit(-1);
+ }
+ filesize = stbuf.st_size;
+ if (filesize < 1) {
+ cout << "Error: input file size is " << filesize << endl;
+ exit(-1);
+ }
+
+ // retrieve all the object extents
+ list<ObjectExtent> extents;
+ int offset = 0;
+ client->enumerate_layout(fh, extents, filesize, offset);
+
+ // for each object extent, retrieve the OSD IP address and start up a Map task
+ list<ObjectExtent>::iterator i;
+ map<size_t, size_t>::iterator j;
+ int osd;
+ int start, length;
+ tcpaddr_t tcpaddr;
+
+ for (i = extents.begin(); i != extents.end(); i++)
+ {
+ // find the primary and get its IP address
+ osd = client->osdmap->get_pg_primary(i->pgid);
+ entity_inst_t inst = client->osdmap->get_inst(osd);
+ entity_addr_t entity_addr = inst.addr;
+ entity_addr.make_addr(tcpaddr);
+
+ // iterate through each buffer_extent in the ObjectExtent
+ for (j = i->buffer_extents.begin();
+ j != i->buffer_extents.end(); j++)
+ {
+ // get the range of the buffer_extent
+ start = (*j).first;
+ length = (*j).second;
+ // fire up the Map task
+ start_map_task(map_command, input_filename, start, length, tcpaddr);
+ }
+ }
+ return 0;
+}
+
+// Fires up the map task.
+// For the moment, all it does is echo the command line, not run it.
+int start_map_task(const char* command, const char* input_filename,
+ long start, long length, sockaddr_in ip_address)
+{
+ string ip_addr_string(inet_ntoa(ip_address.sin_addr));
+
+
+
+
+
+ cout << "ssh " << ip_addr_string << " " << command
+ << " " << input_filename << " " << start << " " << length << endl;
+ return 0;
+}
+
+
+
+void usage(const char* name) {
+ cout << "usage: " << name << " inputfile map_task reduce_task" << endl;
+ cout << "inputfile must be a valid path in the running Ceph filesystem." << endl;
+ cout << "map_task should be given with an absolute path, and be present on ";
+ cout << "the REGULAR filesystem every node." << endl;
+ cout << "reduce_task need be present on this node only." << endl;
+}
+
+
+
+
--- /dev/null
+/*
+ * This is the master executable to start up
+ * a compute task across several nodes.
+ *
+ *
+ */
+
+
+//#include <sys/stat.h>
+#include "utility.h"
+
+int start_map_task(const char* command, const char* input_filename,
+ long start, long length, tcpaddr_t ip_address);
+
+void usage(const char* name);
+
+//Client* startCephClient();
+//void kill_client(Client* client);
--- /dev/null
+/*
+ * This is a slave for receiving and executing commands for
+ * compute tasks on an OSD. This supersedes the daemon
+ * version in activetaskd.h/cc, because it's easier to debug
+ * if it's not a daemon.
+ *
+ * Networking code is based off examples from Stevens' UNIX Network Programming.
+ */
+
+#include "activeslave.h"
+
+int main(int argc, const char* argv[]) {
+
+ /* Set up TCP server */
+ int sockfd, newsockfd, childpid;
+ socklen_t clilen;
+ struct sockaddr_in cli_addr, serv_addr;
+
+ //const char *pname = argv[0]; // process name
+
+ // Open a TCP socket
+ if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
+ cerr << "slave: can't open TCP socket. Exiting." << endl;
+ exit(-1);
+ }
+ cerr << "slave: opened TCP socket." << endl;
+
+ // set up the port
+ bzero((char*) &serv_addr, sizeof(serv_addr));
+ serv_addr.sin_family = AF_INET;
+ serv_addr.sin_addr.s_addr = htonl(INADDR_ANY);
+ serv_addr.sin_port = htons(SERV_TCP_PORT);
+
+ if (bind(sockfd, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) < 0) {
+ cerr << "slave: can't bind local address. Exiting." << endl;
+ exit(-1);
+ }
+
+ if(listen(sockfd, SOMAXCONN) < 0) {
+ cerr << "slave: listening error. Exiting." << endl;
+ exit(-1);
+ }
+
+
+ /* The Big Loop */
+ while (1) {
+
+ // wait for a message and fork off a child process to handle it
+ clilen = sizeof(cli_addr);
+ newsockfd = accept(sockfd,
+ (struct sockaddr *) &cli_addr,
+ &clilen);
+
+ if (newsockfd < 0) {
+ cerr << "slave: accept error. Exiting." << endl;
+ exit(-1);
+ }
+
+ if ((childpid = fork()) < 0) {
+ cerr << "slave: fork error. Exiting." << endl;
+ exit(-1);
+ }
+
+ else if (childpid == 0) { // child process
+ cerr << "Forked child process for incoming socket" << endl;
+ close(sockfd);
+ process_request(newsockfd);
+ cerr << "Finished processing request. Exiting child." << endl;
+ exit(0);
+ }
+
+ close (newsockfd); // parent
+
+ //sleep(30); /* wait 30 seconds */
+ }
+ exit(EXIT_SUCCESS);
+}
+
+
+/* This will process requests from the master.
+ * The protocol in a nutshell:
+ * Master opens a socket to slave, and sends
+ * one message.
+ * Slave replies with one message.
+ * Socket is closed.
+ */
+
+void process_request(int newsockfd) {
+
+ // first, read the message type.
+ int msg_type = readmsgtype(newsockfd);
+
+
+ // Second, call some function based on the message type to process
+ // the rest of the message. The function is responsible for the rest
+ // of the message; this includes checking the message footer.
+
+ switch(msg_type) {
+
+ case PING: // ping
+ process_ping(newsockfd);
+ break;
+ case STARTTASK: // start_task
+ process_start_task(newsockfd);
+ break;
+ case RETRIEVELOCALFILE: // get_local
+ process_get_local(newsockfd);
+ break;
+ case SHIPCODE:
+ process_shipcode(newsockfd);
+ break;
+
+ case PINGREPLY:
+ case FINISHEDTASK:
+ case TASKFAILED:
+ case SENDLOCALFILE:
+ case LOCALFILENOTFOUND:
+ case CODESAVED:
+ case SHIPFAILED:
+ cerr << "activeslave: BUG received message " << CMD_LIST[msg_type] <<
+ " from master; master should never send this message." << endl;
+ exit(-1);
+ break;
+
+
+ case -1:
+ cerr << "activeslave: message had an unidentifiable type. " <<
+ "Closing socket and discarding rest of message." << endl;
+ default:
+ cerr << "activeslave: BUG! received unexpected return value of" << msg_type <<
+ "from readmsgtype(). Closing socket and discarding rest of message." << endl;
+
+ exit(-1);
+ }
+}
+
+
+// Just write a ping_reply to the socket.
+void process_ping(int fd) {
+
+ // make sure the footer is valid
+ if (!check_footer(fd)) {
+ cerr << "process_ping warning: ping message has invalid or missing footer."
+ << endl;
+ }
+ // Even if the footer's invalid, send the reply.
+ cerr << "Replying to ping..." << endl;
+ send_msg_header(fd, PINGREPLY);
+ send_msg_footer(fd);
+ cerr << "Ping processing completed." << endl;
+}
+
+
+
+// Process a start_task message. This reads the incoming message and
+// starts the corresponding task.
+
+// Parameter format: taskID(int) command(string)
+// cephinputfile(string) offset(long) length(long) localoutputfile
+
+// WARNING: currently has the trivial task hardwired. It
+// ignores the command and the output file.
+void process_start_task(int fd) {
+
+ char command[MAX_STRING_SIZE + 1];
+ char cephinputfile[MAX_STRING_SIZE + 1];
+ char localoutputfile[MAX_STRING_SIZE + 1];
+
+ cout << "in process_start_task: ";
+ int taskID = read_positive_int(fd);
+ cout << "read taskID " << taskID;
+
+ read_string(fd, command);
+ cout << ", command " << command;
+
+ read_string(fd, cephinputfile);
+ cout << ", cephinputfile " << cephinputfile;
+ off_t offset = read_off_t(fd);
+ cout << ", offset " << offset;
+ off_t length = read_off_t(fd);
+ cout << ", length " << length;
+
+ read_string(fd, localoutputfile);
+ cout << ", localoutputfile " << localoutputfile << endl;
+
+ // make sure the footer is valid
+ if (!check_footer(fd)) {
+ cerr << "process_start_task warning: message has invalid or missing footer. "
+ << "Discarding message." << endl;
+ exit(-1);
+ }
+
+
+ // To do: modify to load the task from a library instead of just
+ // using the hardwired one.
+
+ void (*task)(const char*, const char*, off_t, off_t) = 0;
+ task = start_trivial_task;
+
+
+
+ // start a task; create an output filename that uses the task ID, 'cause we might
+ // end up with multiple pieces of a file on each OSD.
+ // WARNING: always does the trivial task; prints answer to stdout but
+ // does not write it to disk.
+ cerr << "starting task: " << endl;
+ //start_trivial_task(cephinputfile, localoutputfile, offset, length);
+ task(cephinputfile, localoutputfile, offset, length);
+ cerr << "returned from task! Sending reply:" << endl;
+
+
+
+ // send the reply
+ send_msg_header(fd, FINISHEDTASK);
+ write_positive_int(fd, taskID);
+ send_msg_footer(fd);
+
+ // done
+ cout << "Done sending reply for taskID " << taskID << endl;
+ return;
+}
+
+
+
+void start_trivial_task (const char* ceph_filename, const char* local_filename,
+ off_t offset, off_t length) {
+ // Don't bother to copy the file to disk. Read the file directly from Ceph,
+ // and add up all the bytes.
+ // Write the total to the local file as a string.
+ Client * client = startCephClient();
+
+ bufferptr bp(CHUNK);
+
+ // get the source file's size. Sanity-check the request range.
+ struct stat st;
+ int r = client->lstat(ceph_filename, &st);
+ assert (r == 0);
+
+ off_t src_total_size = st.st_size;
+ if (src_total_size < offset + length) {
+ cerr << "Error in copy ExtentToLocalFile: offset + length = " << offset << " + " << length
+ << " = " + (offset + length) << ", source file size is only " << src_total_size << endl;
+ exit(-1);
+ }
+ off_t remaining = length;
+
+ // open the file and seek to the start position
+ cerr << "start_trivial_task: opening the source file and seeking " << endl;
+
+ int fh_ceph = client->open(ceph_filename, O_RDONLY);
+ assert (fh_ceph > -1);
+ r = client->lseek(fh_ceph, offset, SEEK_SET);
+ assert (r == offset);
+
+ int counter = 0;
+ // read through the extent and add up the bytes
+ cerr << "start_trivial_task: counting up bytes" << endl;
+ char* bp_c = bp.c_str();
+ while (remaining > 0) {
+ off_t got = client->read(fh_ceph, bp_c, MIN(remaining,CHUNK), -1);
+ assert(got > 0);
+ remaining -= got;
+ for (off_t i = 0; i < got; ++i) {
+ counter += (unsigned int)(bp_c[i]);
+ }
+ }
+ cerr << "start_trivial_task: Done! Answer is " << counter << endl;
+ client->close(fh_ceph);
+
+ //assert(0);
+}
+
+
+// Starts a sloppy grep count of the hardwired search string over the
+// given Ceph file extent. It's sloppy because it copies the given
+// extent to a local file and runs "grep" on it, with no effort to take
+// care of boundary issues.
+void start_sloppy_grepcount (const char* ceph_filename, const char* local_filename,
+ long offset, long size) {
+
+ Client* client = startCephClient();
+ char* search_string = "the";
+ // copy the file to a local file.
+
+ copyExtentToLocalFile (client, ceph_filename, offset, size, local_filename);
+ // we want: grep -c search_string local_filename
+ // to get the number of occurrences of the string.
+ string command = "";
+ command.append("grep -c ");
+ command.append(search_string);
+ command.append(local_filename);
+
+ assert(0);
+
+}
+
+
+// Processes a SHIPCODE message. The message will have a shared
+// library attached to it, which must be stored locally.
+
+void process_shipcode(int fd) {
+
+
+ // get the size of the shared library
+ size_t library_size = read_size_t(fd);
+
+
+ // save the library to a file
+ cerr << "saving library..." << endl;
+ const char* libfile = "/tmp/libslavetask.so";
+ int local_fd = ::open(libfile, O_WRONLY | O_CREAT | O_TRUNC);
+ if (local_fd < 0) {
+ cerr << "Error opening " << libfile << " for writing." << endl;
+ exit(-1);
+ }
+
+ off_t remaining = library_size;
+
+ bufferptr bp(CHUNK);
+ char* bp_c = bp.c_str();
+ while (remaining > 0) {
+ off_t got = readn(fd, bp_c, MIN(remaining, CHUNK));
+ assert(got > 0);
+ remaining -= got;
+ ssize_t written = ::write(local_fd, bp_c, got);
+ assert (written == got);
+ }
+ cerr << "Received shared library and stored as " << libfile << endl;
+
+}
+
+
+// Processes a get_local message. The message
+// specifies the filename of a local file to
+// return to the sender.
+
+// Parameter format: requestID(int) localfilename(string)
+
+// INCOMPLETE: currently just reads the message.
+
+
+void process_get_local(int fd) {
+ cout << "in process_get_local: ";
+ int taskID = read_positive_int(fd);
+ cout << "read taskID " << taskID;
+
+ char localfilename[MAX_STRING_SIZE+1];
+ read_string(fd, localfilename);
+ cout << ", localfilename " << localfilename << endl;
+
+
+ // make sure the footer is valid
+ if (!check_footer(fd)) {
+ cerr << "process_get_local warning: message has invalid or missing footer."
+ << endl;
+ }
+
+ // not implemented
+ cerr << "Error: get_local command unimplemented." << endl;
+ assert(0);
+}
+
+
+// Retrieves a formatted message from the socket.
+// At the moment, this just reads and prints a fixed-
+// length message type.
+// DEPRECATED.
+void str_getmsg(int sockfd) {
+
+ int n;
+
+ // read message types until the connection dies
+ while(true) {
+ n = readmsgtype(sockfd);
+ if (n != 0) {
+ cerr << "from getmsg: some sort of error" << endl;
+ exit(-1);
+ }
+ }
+}
+
+// Echo a stream socket message back to the sender.
+// DEPRECATED.
+void str_echo(int sockfd) {
+
+ int n;
+ char line[MAXLINE];
+
+ while(true) {
+
+ // read from the stream
+ cerr << "str_echo: waiting for a line" << endl;
+ n = readline(sockfd, line, MAXLINE);
+ cerr << "str_echo: read a line" << endl;
+ if (0 == n) {
+ cerr << "str_echo: connection terminated" << endl;
+ return; // connection is terminated
+ }
+ else if (n < 0) {
+ cerr << "str_echo: readline error" << endl;
+ exit(-1);
+ }
+
+ // write back to the stream
+ if (n != writen(sockfd, line, n)) {
+ cerr << "str_echo: writen error" << endl;
+ exit(-1);
+ }
+ else
+ cerr << "Echoed line " << endl;
+ }
+}
+
+
+void str_ack(int sockfd) {
+
+ int n;
+ char line[MAXLINE];
+ //char *ack = "ack";
+
+ while(true) {
+
+ // read from the stream
+ n = readline(sockfd, line, MAXLINE);
+
+ if (0 == n)
+ return; // connection is terminated
+ else if (n < 0)
+ //err_dump("str_echo: readline error");
+ exit(-1);
+
+ // write back to the stream
+ if (4 != writen(sockfd, "ack\n", 4))
+ //err_dump("str_echo: writen error");
+ exit(-1);
+ }
+}
+
+
+
+// Read command lines from the socket and execute them
+
+void str_run(int sockfd) {
+
+ int n;
+ char line[MAXLINE];
+ char* error_msg = "str_run: No command interpreter found\n";
+ char* ack_msg = "Running command... ";
+ char* commit_msg = "Command executed!\n";
+
+ while(true) {
+
+ // read from the stream
+ n = readline(sockfd, line, MAXLINE);
+
+ if (0 == n)
+ return; // connection is terminated
+ else if (n < 0)
+ //err_dump("str_echo: readline error");
+ exit(-1);
+
+ if (system(NULL)) {
+ writen(sockfd, ack_msg, strlen(ack_msg));
+ system(line);
+ writen(sockfd, commit_msg, strlen(commit_msg));
+ }
+ else if ((int)strlen(error_msg) != writen(sockfd, error_msg, strlen(error_msg)))
+ //err_dump("str_echo: writen error");
+ exit(-1);
+ }
+}
+
+
+// take a filename and copy it from Ceph to a local directory.
+// Not completed.
+
+void str_copytolocal(int sockfd) {
+
+ int n;
+ char line[MAXLINE];
+ char* error_msg = "str_copy: No command interpreter found\n";
+ char* ack_msg = "Running command... ";
+ char* commit_msg = "Command executed!\n";
+ //char* temp_dir = "/tmp";
+
+
+ while(true) {
+
+ // read from the stream
+ n = readline(sockfd, line, MAXLINE);
+
+ if (0 == n)
+ return; // connection is terminated
+ else if (n < 0)
+ //err_dump("str_echo: readline error");
+ exit(-1);
+
+ if (system(NULL)) {
+ writen(sockfd, ack_msg, strlen(ack_msg));
+ system(line);
+ writen(sockfd, commit_msg, strlen(commit_msg));
+ }
+ else if ((int)strlen(error_msg) != writen(sockfd, error_msg, strlen(error_msg)))
+ //err_dump("str_echo: writen error");
+ exit(-1);
+ }
+}
+
+
+
--- /dev/null
+#include "inet.h"
+#include "common.h"
+#include "utility.h"
+#include "client/Client.h"
+
+
+// The port number is "osdd" on a telephone keypad.
+#define SERV_TCP_PORT 6733
+
+#define MAXLINE 512
+
+void str_echo(int sockfd);
+void str_ack(int sockfd);
+void str_run(int sockfd);
+void str_getmsg(int sockfd);
+void process_request(int newsockfd);
+void process_ping(int fd);
+void process_start_task(int fd);
+void process_get_local(int fd);
+void process_shipcode(int fd);
+
+void start_trivial_task(const char* ceph_filename, const char* local_filename,
+ long offset, long length);
--- /dev/null
+/*
+ * This is a daemon for receiving and executing commands for compute tasks on an OSD.
+ *
+ * The daemon uses skeleton code from
+ * http://www.linuxprofilm.com/articles/linux-daemon-howto.html. The
+ * site is no longer up, but can be seen through the archive.org.
+ * Networking code is based off examples from Stevens' UNIX Network Programming.
+ */
+
+#include "activetaskd.h"
+
+
+#define SERVER
+
+#undef SERVER
+
+int main(int argc, const char* argv[]) {
+
+ /* Our process ID and Session ID */
+ pid_t pid, sid;
+
+ /* Fork off the parent process */
+ pid = fork();
+ if (pid < 0) {
+ exit(EXIT_FAILURE);
+ }
+ /* If we got a good PID, then
+ we can exit the parent process. */
+ if (pid > 0) {
+ exit(EXIT_SUCCESS);
+ }
+
+ /* Change the file mode mask */
+ umask(0);
+
+ /* Open any logs here */
+
+ /* Create a new SID for the child process */
+ sid = setsid();
+ if (sid < 0) {
+ /* Log the failure */
+ exit(EXIT_FAILURE);
+ }
+
+
+ /* Change the current working directory */
+ if ((chdir("/")) < 0) {
+ /* Log the failure */
+ exit(EXIT_FAILURE);
+ }
+
+ /* Close out the standard file descriptors */
+ close(STDIN_FILENO);
+ close(STDOUT_FILENO);
+ close(STDERR_FILENO);
+
+ /* Daemon-specific initialization goes here */
+
+
+
+ /* Set up TCP server */
+ int sockfd, newsockfd, childpid;
+ socklen_t clilen;
+ struct sockaddr_in cli_addr, serv_addr;
+
+ const char *pname = argv[0]; // process name
+
+ // Open a TCP socket
+ if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0)
+ exit(-1);
+ //err_dump("server: can't open stream socket");
+
+ // set up the port
+ bzero((char*) &serv_addr, sizeof(serv_addr));
+ serv_addr.sin_family = AF_INET;
+ serv_addr.sin_addr.s_addr = htonl(INADDR_ANY);
+ serv_addr.sin_port = htons(SERV_TCP_PORT);
+
+ if (bind(sockfd, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) < 0)
+ exit(-1);
+ //err_dump("server: can't bind local address");
+
+ if(listen(sockfd, SOMAXCONN) < 0)
+ exit(-1);
+ //err_dump("server: listening error");
+
+ /* The Big Loop */
+ while (1) {
+
+ // wait for a message and fork off a child process to handle it
+ clilen = sizeof(cli_addr);
+ newsockfd = accept(sockfd,
+ (struct sockaddr *) &cli_addr,
+ &clilen);
+
+ if (newsockfd < 0)
+ exit(-1);
+ //err_dump("server: accept error");
+
+ if ( (childpid = fork()) < 0)
+ exit(-1);
+ // err_dump("server: fork error");
+
+ else if (childpid == 0) { // child process
+ close(sockfd);
+ //str_echo(newsockfd);
+ str_run(newsockfd);
+ // insert code to process the request
+ exit(0);
+ }
+
+ close (newsockfd); // parent
+
+ //sleep(30); /* wait 30 seconds */
+ }
+ exit(EXIT_SUCCESS);
+}
+
+
+// Echo a stream socket message back to the sender.
+
+void str_echo(int sockfd) {
+
+ int n;
+ char line[MAXLINE];
+
+ while(true) {
+
+ // read from the stream
+ n = readline(sockfd, line, MAXLINE);
+
+ if (0 == n)
+ return; // connection is terminated
+ else if (n < 0)
+ //err_dump("str_echo: readline error");
+ exit(-1);
+
+ // write back to the stream
+ if (n != writen(sockfd, line, n))
+ //err_dump("str_echo: writen error");
+ exit(-1);
+ }
+}
+
+
+void str_ack(int sockfd) {
+
+ int n;
+ char line[MAXLINE];
+ char *ack = "ack";
+
+ while(true) {
+
+ // read from the stream
+ n = readline(sockfd, line, MAXLINE);
+
+ if (0 == n)
+ return; // connection is terminated
+ else if (n < 0)
+ //err_dump("str_echo: readline error");
+ exit(-1);
+
+ // write back to the stream
+ if (4 != writen(sockfd, "ack\n", 4))
+ //err_dump("str_echo: writen error");
+ exit(-1);
+ }
+}
+
+
+
+
+// Read command lines from the socket and execute them
+
+void str_run(int sockfd) {
+
+ int n;
+ char line[MAXLINE];
+ char* error_msg = "str_run: No command interpreter found\n";
+ char* ack_msg = "Running command... ";
+ char* commit_msg = "Command executed!\n";
+
+ while(true) {
+
+ // read from the stream
+ n = readline(sockfd, line, MAXLINE);
+
+ if (0 == n)
+ return; // connection is terminated
+ else if (n < 0)
+ //err_dump("str_echo: readline error");
+ exit(-1);
+
+ if (system(NULL)) {
+ writen(sockfd, ack_msg, strlen(ack_msg));
+ system(line);
+ writen(sockfd, commit_msg, strlen(commit_msg));
+ }
+ else if (strlen(error_msg) != writen(sockfd, error_msg, strlen(error_msg)))
+ //err_dump("str_echo: writen error");
+ exit(-1);
+ }
+}
+
+
+// take a filename and copy it from Ceph to a local directory
+
+void str_copytolocal(int sockfd) {
+
+ int n;
+ char line[MAXLINE];
+ char* error_msg = "str_copy: No command interpreter found\n";
+ char* ack_msg = "Running command... ";
+ char* commit_msg = "Command executed!\n";
+ char* temp_dir = "/tmp";
+
+
+ while(true) {
+
+ // read from the stream
+ n = readline(sockfd, line, MAXLINE);
+
+ if (0 == n)
+ return; // connection is terminated
+ else if (n < 0)
+ //err_dump("str_echo: readline error");
+ exit(-1);
+
+ if (system(NULL)) {
+ writen(sockfd, ack_msg, strlen(ack_msg));
+ system(line);
+ writen(sockfd, commit_msg, strlen(commit_msg));
+ }
+ else if (strlen(error_msg) != writen(sockfd, error_msg, strlen(error_msg)))
+ //err_dump("str_echo: writen error");
+ exit(-1);
+ }
+}
+
+
+
--- /dev/null
+#include "inet.h"
+#include "common.h"
+#include "utility.h"
+#include "client/Client.h"
+
+
+// The port number is "osdd" on a telephone keypad.
+#define SERV_TCP_PORT 6733
+
+#define MAXLINE 512
+
+void str_echo(int sockfd);
+void str_ack(int sockfd);
+void str_run(int sockfd);
--- /dev/null
+#ifndef COMMON_H
+#define COMMON_H
+
+
+#include <sys/stat.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <fcntl.h>
+#include <errno.h>
+#include <unistd.h>
+#include <syslog.h>
+#include <string.h>
+#include <iostream>
+#include <string>
+
+// a bunch of string constants
+// for commands
+
+
+
+
+#define CMDLENGTH 10
+#define CMDCOUNT 11
+
+#define MAX_STRING_SIZE 255
+
+/*
+ * These are the various messages that can be sent between the master
+ * and slave. The slave sends one reply to each message from the master.
+
+ * PING/PINGREPLY: just what it sounds like.
+
+ * STARTTASK: starts a task. Needs to be reworked to allow code
+ * shipping. The slave attempts to perform the task, and replies with
+ * FINISHEDTASK or TASKFAILED.
+ *
+ * RETRIEVELOCALFILE: requests a file that the slave has stored
+ * locally. Slave replies with SENDLOCALFILE and the file, or with
+ * LOCALFILENOTFOUND.
+ *
+ * SHIPCODE: sends a shared library to the slave, containing a
+ * function that is to be executed later by the STARTTASK
+ * command. Slave replies with CODESAVED or SHIPFAILED.
+ *
+ */
+
+
+const off_t CHUNK = 1024 * 1024 * 4;
+
+#define PING 0
+#define STARTTASK 1
+#define RETRIEVELOCALFILE 2
+#define PINGREPLY 3
+#define FINISHEDTASK 4
+#define TASKFAILED 5
+#define SENDLOCALFILE 6
+#define LOCALFILENOTFOUND 7
+#define SHIPCODE 8
+#define CODESAVED 9
+#define SHIPFAILED 10
+
+
+#define FOOTER_LENGTH 7
+
+const char* CMD_LIST[CMDCOUNT] = {"______PING",
+ "START_TASK",
+ "_GET_LOCAL",
+ "PING_REPLY",
+ "_TASK_DONE",
+ "TASKFAILED",
+ "SEND_LOCAL",
+ "LOCAL_GONE",
+ "_SHIP_CODE",
+ "CODE_SAVED",
+ "SHIPFAILED"};
+
+const char FOOTER[FOOTER_LENGTH + 1] = "MSG_END";
+
+
+// const char* strArray[] = {"string1", "string2", "string3"};
+//const char commands[2][4] = {"foo", "bar"};
+
+
+
+// error codes
+#define ARGUMENTSINVALID 1001
+#define CEPHCLIENTSTARTUPFAILED 1002
+#define INPUTFILEREADFAILED 1003
+
+
+// const char* name = "Njal";
+
+
+
+#endif //COMMON_H
--- /dev/null
+/*
+ * This is merely a test of an echo server; it's an early step in
+ * building up the Ceph distributed compute service. This is
+ * discardable once the next stage is up and running.
+ *
+ * Code is based off examples in Stevens' "Unix Network Programming".
+ */
+
+#include "echotestclient.h"
+
+int main(int argc, char* argv[]) {
+
+ int sockfd;
+ struct sockaddr_in serv_addr;
+
+ char* pname = argv[0];
+
+ bzero((char *) &serv_addr, sizeof(serv_addr));
+ serv_addr.sin_family = AF_INET;
+ serv_addr.sin_addr.s_addr = inet_addr(SERV_HOST_ADDR);
+ serv_addr.sin_port = htons(SERV_TCP_PORT);
+
+
+ // open a TCP socket
+ if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
+ printf("client: can't open stream socket");
+ exit (-1);
+ }
+
+ // connect to the server.
+ if (connect(sockfd, (struct sockaddr *) &serv_addr,
+ sizeof(serv_addr)) < 0) {
+ printf("client: can't connect to server");
+ exit (-1);
+ }
+
+ // start the test echoer
+ str_cli(stdin, sockfd);
+
+
+ close (sockfd);
+ exit(0);
+}
+
+
+void str_cli(FILE *fp, int sockfd) {
+
+ int n;
+ char sendline[MAXLINE], recvline[MAXLINE + 1];
+
+ // read from the fp and write to the socket;
+ // then read from the socket and write to stdout
+ while (fgets(sendline, MAXLINE, fp) != NULL) {
+
+ n = strlen(sendline);
+ if (writen(sockfd, sendline, n) != n) {
+ printf("str_cli: writen error on socket");
+ exit(-1);
+ }
+ n = readline(sockfd, recvline, MAXLINE);
+ if (n < 0) {
+ printf("str_cli: readline error");
+ exit(-1);
+ }
+ recvline[n] = 0;
+ fputs(recvline, stdout);
+ }
+
+ if (ferror(fp)) {
+ printf("str_cli: error reading file");
+ exit(-1);
+ }
+
+}
--- /dev/null
+#include "inet.h"
+#include "common.h"
+#include "utility.h"
+
+#define SERV_HOST_ADDR "128.114.57.143" //issdm-8
+#define SERV_TCP_PORT 6733
+#define MAXLINE 512
+
+void str_cli(FILE *fp, int sockfd);
+
--- /dev/null
+/*
+ * Generic TCP/IP definitions
+ */
+
+#include <stdio.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
--- /dev/null
+/*
+ * This test client tests the sending of message headers to the slave.
+ *
+ * Code is based off examples in Stevens' "Unix Network Programming".
+ */
+
+#include "msgtestclient.h"
+#define REQUIRED_ARGS 2
+
+int main(int argc, char* argv[]) {
+
+
+ // make sure we have all the arguments we need
+ if (argc < REQUIRED_ARGS) { usage(argv[0]); exit(-1); }
+
+ // This file is rewired for running tests from a
+ // shell script. The first parameter specifies the
+ // name of the Ceph file that the test will be
+ // run on; the second parameter specifies which of
+ // four different tests will be run.
+ const char* input_filename = argv[1];
+ int test_number = atoi(argv[2]);
+ assert (test_number > 0);
+ assert (test_number < 4);
+
+ //const char* map_command = argv[2];
+ // These two variables aren't really used yet.
+ const char* map_command = "map_foo";
+ const char* output_filename = "out_foo";
+ //const char* output_filename = argv[3];
+ //const char* reduce_command = argv[4]; // not implemented yet
+
+ // start up a Ceph client
+ Client* client = startCephClient();
+
+ // open the input file as read_only
+ int fh = client->open(input_filename, O_RDONLY);
+ if (fh < 0) {
+ cerr << "The input file " << input_filename << " could not be opened." << endl;
+ exit(-1);
+ }
+
+ // How big is the file?
+ off_t filesize;
+ struct stat stbuf;
+ if (0 > client->lstat(input_filename, &stbuf)) {
+ cerr << "Error: could not retrieve size of input file " << input_filename << endl;
+ exit(-1);
+ }
+ filesize = stbuf.st_size;
+ if (filesize < 1) {
+ cerr << "Error: input file size is " << filesize << endl;
+ exit(-1);
+ }
+
+ // retrieve all the object extents
+ list<ObjectExtent> extents;
+ off_t offset = 0;
+ client->enumerate_layout(fh, extents, filesize, offset);
+
+ list<ObjectExtent>::iterator i;
+ map<size_t, size_t>::iterator j;
+ int osd;
+ int taskID = 0;
+
+ // Pull out all the extents, and make a vector of
+ // (ip_address, start, length).
+
+ vector<request_split> original_splits;
+
+ for (i = extents.begin(); i != extents.end(); i++) {
+
+ request_split split;
+ // find the primary and get its IP address
+ osd = client->osdmap->get_pg_primary(i->layout.pgid);
+ entity_inst_t inst = client->osdmap->get_inst(osd);
+ entity_addr_t entity_addr = inst.addr;
+ entity_addr.make_addr(split.ip_address);
+
+ // iterate through each buffer_extent in the ObjectExtent
+ for (j = i->buffer_extents.begin();
+ j != i->buffer_extents.end(); j++) {
+
+ // get the range of the buffer_extent
+ split.start = (*j).first;
+ split.length = (*j).second;
+ // throw the split onto the vector
+ original_splits.push_back(split);
+ }
+ }
+
+ // close the client - we're done with it
+ client->shutdown();
+
+ // sanity check: display the splits
+ cerr << "Listing original splits:" << endl;
+ for (vector<request_split>::iterator i = original_splits.begin();
+ i != original_splits.end(); i++) {
+ cerr << "Split: IP " << i->ip_address << ", start "
+ << i->start << ", length " << i->length << endl;
+ }
+
+ vector<request_split> test_splits;
+ // Now, modify the splits as needed for the test type.
+ // There are three types of tests.
+ // Test 1: regular test.
+ // Test 2: put all the tasks on the "wrong" OSD.
+ // Test 3: do the entire job off one node.
+
+ if (1 == test_number) {
+ cerr << "Test type 1: using original splits." << endl;
+ test_splits = original_splits;
+ }
+ else if (2 == test_number) {
+ cerr << "Test type 2: rotating split IP addresses. " << endl;
+ int split_count = original_splits.size();
+ for (int i = 0; i < split_count; ++i) {
+ request_split s;
+ s.start = original_splits.at(i).start;
+ s.length = original_splits.at(i).length;
+ s.ip_address = original_splits.at((i+1)%split_count).ip_address;
+ test_splits.push_back(s);
+ }
+ }
+ else if (3 == test_number) {
+ cerr << "Test type 3: one giant split." << endl;
+ request_split s;
+ s.start = 0;
+ s.length = filesize;
+ s.ip_address = original_splits.at(0).ip_address;
+ test_splits.push_back(s);
+ }
+ else {
+ cerr << "Error: received invalid test type " << test_number << endl;
+ exit(-1);
+ }
+
+ cerr << "Listing test splits:" << endl;
+ for (vector<request_split>::iterator i = test_splits.begin();
+ i != test_splits.end(); i++) {
+ cerr << "Split: IP " << i->ip_address << ", start "
+ << i->start << ", length " << i->length << endl;
+ }
+
+ // start the timer
+ utime_t start_time = g_clock.now();
+ int pending_tasks = 0;
+
+ // start up the tasks
+ for (vector<request_split>::iterator i = test_splits.begin();
+ i != test_splits.end(); i++) {
+ start_map_task(i->ip_address, taskID++, map_command, input_filename,
+ i->start, i->length, output_filename);
+ ++pending_tasks;
+ }
+
+
+ // wait for all the tasks to finish
+ while (pending_tasks > 0) {
+ int exit_status;
+ cerr << "Waiting for " << pending_tasks << " tasks to return..." << endl;
+ pid_t pid = wait(&exit_status);
+ if (pid < 0) {
+ cerr << "ERROR on wait(): result was " << pid << endl;
+ exit(-1);
+ }
+ --pending_tasks;
+ if (WIFEXITED(exit_status)) {
+ cerr << "Task with pid " << pid << " returned with exit status " <<
+ WEXITSTATUS(exit_status) << endl;
+ }
+ else { cerr << "WARNING: Task with pid " << pid << " exited abnormally" << endl; }
+ }
+
+ cerr << "All tasks have returned." << endl;
+ // report the time
+ double elapsed_time;
+ elapsed_time = (g_clock.now() - start_time);
+ cerr << "Elapsed time: " << elapsed_time << endl;
+ cerr << elapsed_time << " " << endl;
+ // send the time to stdout for the shell script
+ cout << elapsed_time << " ";
+ exit(0);
+}
+
+
+// sends a complete ping message
+// through the file descriptor
+// and waits for a reply. This
+// will hang if there's no reply.
+
+void ping_test(int fd) {
+
+ // send the message header and footer.
+ // A ping message has no body.
+ send_msg_header(fd, PING);
+ send_msg_footer(fd);
+
+ // receive the reply.
+ int msg_type = readmsgtype(fd);
+ if (msg_type < 0) {
+ cerr << "ping_test: Failed reading the ping reply. Exiting." << endl;
+ exit(-1);
+ }
+ if (PINGREPLY != msg_type) {
+ assert((msg_type <= 0) && (msg_type < CMDCOUNT) &&
+ "readmsgtype return value out of range");
+ cerr << "ping_test: slave sent invalid reply: replied to ping with message type" <<
+ msg_type << ": " << CMD_LIST[msg_type] << ". Exiting. " << endl;
+ exit(-1);
+ }
+ else {
+ cerr << "Received valid ping reply!" << endl;
+ }
+
+ if(!check_footer(fd)) {
+ cerr << "ping_test: message footer not found. Exiting." << endl;
+ exit(-1);
+ }
+}
+
+
+
+
+// send a test message for starting a task
+void start_task_test(int fd) {
+
+ // The test:
+ // TaskID 42
+ // command: "Burninate"
+ // input file: "countryside"
+ // offset: 8764 (TROG)
+ // length: 367 (DOR)
+
+ send_start_task_msg(fd, 42, strlen("Burninate"), "Burninate",
+ strlen("countryside"), "countryside",
+ 8764, 367,
+ strlen("toast"), "toast");
+}
+
+
+// sends a message to the fd telling it to start a task.
+// Remember: the message format requires any string to be
+// prefixed by its (unterminated) length.
+void send_start_task_msg(int fd,
+ int taskID,
+ int command_size, const char* command,
+ int inputfilenamesize, const char* inputfilename,
+ off_t offset,
+ off_t length,
+ int outputfilenamesize, const char* outputfilename) {
+
+ // write the header and the message to the file descriptor.
+
+ send_msg_header(fd, STARTTASK);
+
+ write_positive_int(fd, taskID);
+ write_positive_int(fd, command_size);
+ write_string(fd, command);
+ write_positive_int(fd, inputfilenamesize);
+ write_string(fd, inputfilename);
+ //write_long(fd, offset);
+ write_off_t (fd, offset);
+ //write_long(fd, length);
+ write_off_t (fd, length);
+ write_positive_int(fd, outputfilenamesize);
+ write_string(fd, outputfilename);
+
+ // terminate the message
+ send_msg_footer(fd);
+}
+
+
+
+
+// creates a new connection to the slave
+// at the given IP address and port.
+// Overloaded to take an IP address as a
+// string or as an in_addr_t.
+
+int create_new_connection(const char* ip_address, uint16_t port)
+{
+ in_addr_t ip = inet_addr(ip_address);
+ if ((in_addr_t)-1 == ip) {
+ cerr << "Error creating new connection: \"" << ip_address <<
+ "\" is not a valid IP address." << endl;
+ return -1;
+ }
+ else
+ //cerr << "Opening connection to " << ip_address << ":" << endl;
+ return create_new_connection(ip, port);
+}
+
+
+int create_new_connection(in_addr_t ip_address, uint16_t port) {
+
+ struct sockaddr_in serv_addr;
+ int sockfd;
+
+ bzero((char *) &serv_addr, sizeof(serv_addr));
+ serv_addr.sin_family = AF_INET;
+ //serv_addr.sin_addr.s_addr = inet_addr(SERV_HOST_ADDR);
+ serv_addr.sin_addr.s_addr = ip_address;
+ serv_addr.sin_port = htons(SERV_TCP_PORT);
+
+ // open a TCP socket
+ if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
+ cerr << "msgtestclient: can't open stream socket. Exiting." << endl;
+ exit (-1);
+ }
+
+ // connect to the server.
+ if (connect(sockfd, (struct sockaddr *) &serv_addr,
+ sizeof(serv_addr)) < 0) {
+ cerr << "msgtestclient: can't connect to server." << endl;
+ exit (-1);
+ }
+ //cerr << "opened connection!" << endl;
+ return sockfd;
+}
+
+void msg_type_sender(int sockfd) {
+
+ for (int i = 0; i < CMDCOUNT; ++i) {
+ send_msg_header(sockfd, i);
+ }
+
+}
+
+// Fires up the map task.
+// For the moment, all it does is echo the command line, not run it.
+int start_map_task(sockaddr_in ip_address, int taskID,
+ const char* command, const char* input_filename,
+ off_t start, off_t length,
+ const char* output_filename)
+{
+ int childpid;
+ // fork off a child process to do the work, and return
+ if ((childpid = fork()) < 0) {
+ cerr << "start_map_task: fork error. Exiting." << endl;
+ exit(-1);
+ }
+
+ else if (childpid != 0) { // parent
+ cerr << "start_map_task: forked child process "
+ << childpid << " to start task. " << endl;
+ return 0;
+ }
+
+
+ string ip_addr_string(inet_ntoa(ip_address.sin_addr));
+ // cerr << "command: " << ip_addr_string << " taskID "
+ // << taskID << ": " << command
+ // << " " << input_filename << " " << start << " " << length
+ // << " " << output_filename << endl;
+
+ // open a socket to the slave, and send the message
+ //cerr << "Sending message: " << endl;
+ int sockfd = create_new_connection(ip_addr_string.c_str(), SERV_TCP_PORT);
+ send_start_task_msg(sockfd, taskID, strlen(command), command,
+ strlen(input_filename), input_filename,
+ start, length,
+ strlen(output_filename), output_filename);
+
+ // wait for a reply
+ cerr << "Sent message for taskID " << taskID << ". Waiting for reply..." << endl;
+
+ // receive the reply.
+ int msg_type = readmsgtype(sockfd);
+ if (msg_type < 0) {
+ cerr << "start_map_task: Failed reading the reply. Exiting." << endl;
+ exit(-1);
+ }
+ if (FINISHEDTASK != msg_type) {
+ assert((msg_type <= 0) && (msg_type < CMDCOUNT));
+ cerr << "start_map_task: slave sent invalid reply: replied with message type" <<
+ msg_type << ": " << CMD_LIST[msg_type] << ". Exiting. " << endl;
+ exit(-1);
+ }
+ // read the taskID of the reply
+
+ int reply_taskID = read_positive_int(sockfd);
+
+ if(!check_footer(sockfd)) {
+ cerr << "ping_test: message footer not found. Exiting." << endl;
+ exit(-1);
+ }
+
+ // done!
+ close(sockfd);
+ cerr << "Task " << taskID << "/" << reply_taskID <<
+ " complete! Ending child process." << endl;
+ exit(0);
+ //_exit(0);
+ cerr << "exit(0) returned. Strange things are afoot." << endl;
+}
+
+
+
+
+void usage(const char* name) {
+ //cout << "usage: " << name << " inputfile map_task outputfile" << endl;
+ //cout << "inputfile must be a valid path in the running Ceph filesystem." << endl;
+ //cout << "map_task should be given with an absolute path, and be present on ";
+ //cout << "the REGULAR filesystem every node." << endl;
+ //cout << "output_file will be written locally to the node." << endl;
+
+ cout << "usage: " << name << " inputfile test_number" << endl;
+ cout << "inputfile must be a valid path in the running Ceph filesystem." << endl;
+ cout << "test_number must be 1, 2, or 3." << endl;
+ cout << " 1: run the test task normally (one slave per OSD)" << endl;
+ cout << " 2: run the test task on the \"wrong\" OSDs" << endl;
+ cout << " 3: run the entire task in a single process" << endl;
+}
+
+
+
--- /dev/null
+#include "inet.h"
+#include "common.h"
+#include "utility.h"
+#include "client/Client.h"
+
+// wait.h MUST NOT be #included before client/Client.h
+#include <sys/wait.h>
+#include <vector>
+
+ struct request_split {
+ tcpaddr_t ip_address;
+ off_t start;
+ off_t length;
+ };
+
+
+//#define SERV_HOST_ADDR "128.114.57.143" //issdm-8
+#define SERV_HOST_ADDR "128.114.57.166" //issdm-31
+
+#define SERV_TCP_PORT 6733
+#define MAXLINE 512
+
+void msg_type_sender(int sockfd);
+
+
+int create_new_connection(const char* ip_address, uint16_t port);
+int create_new_connection(in_addr_t ip_address, uint16_t port);
+void usage(const char* name);
+void ping_test(int fd);
+void start_task_test(int fd);
+
+int start_map_task(sockaddr_in ip_address, int taskID,
+ const char* map_command,
+ const char* input_filename,
+ off_t start, off_t length,
+ const char* output_filename);
+
+void send_start_task_msg(int fd,
+ int taskID,
+ int command_size, const char* command,
+ int inputfilenamesize, const char* inputfilename,
+ off_t offset,
+ off_t length,
+ int outputfilenamesize, const char* outputfilename);
--- /dev/null
+#include "trivial_task.h"
+
+void start_trivial_task (const char* ceph_filename, const char* local_filename,
+ off_t offset, off_t length) {
+ // Don't bother to copy the file to disk. Read the file directly from Ceph,
+ // and add up all the bytes.
+ // Write the total to the local file as a string.
+ Client * client = startCephClient();
+
+ bufferptr bp(CHUNK);
+
+ // get the source file's size. Sanity-check the request range.
+ struct stat st;
+ int r = client->lstat(ceph_filename, &st);
+ assert (r == 0);
+
+ off_t src_total_size = st.st_size;
+ if (src_total_size < offset + length) {
+ cerr << "Error in copy ExtentToLocalFile: offset + length = " << offset << " + " << length
+ << " = " + (offset + length) << ", source file size is only " << src_total_size << endl;
+ exit(-1);
+ }
+ off_t remaining = length;
+
+ // open the file and seek to the start position
+ cerr << "start_trivial_task: opening the source file and seeking " << endl;
+
+ int fh_ceph = client->open(ceph_filename, O_RDONLY);
+ assert (fh_ceph > -1);
+ r = client->lseek(fh_ceph, offset, SEEK_SET);
+ assert (r == offset);
+
+ int counter = 0;
+ // read through the extent and add up the bytes
+ cerr << "start_trivial_task: counting up bytes" << endl;
+ char* bp_c = bp.c_str();
+ while (remaining > 0) {
+ off_t got = client->read(fh_ceph, bp_c, MIN(remaining,CHUNK), -1);
+ assert(got > 0);
+ remaining -= got;
+ for (off_t i = 0; i < got; ++i) {
+ counter += (unsigned int)(bp_c[i]);
+ }
+ }
+ cerr << "start_trivial_task: Done! Answer is " << counter << endl;
+ client->close(fh_ceph);
+
+ //assert(0);
+}
+
--- /dev/null
+// Shared library for the trivial task of adding up all the bytes in a file
+
+//#include "inet.h"
+#include "common.h"
+#include "utility.h"
+#include "client/Client.h"
+
+
+extern "C" void start_trivial_task (const char* ceph_filename,
+ const char* local_filename,
+ off_t offset, off_t length);
+
--- /dev/null
+/*
+ * Miscellaneous Active OSD helper functions.
+ *
+ */
+
+//#include <sys/stat.h>
+#include "client/Client.h"
+#include "common.h"
+#include "config.h"
+#include "common/Timer.h"
+#include "msg/SimpleMessenger.h"
+
+
+Client* startCephClient();
+void kill_client(Client* client);
+
+
+int readline(int fd, char *ptr, int maxlen);
+int writen(int fd, const char *ptr, int nbytes);
+int send_msg_header(int fd, int header_ID);
+int readmsgtype(int fd);
+bool check_footer(int fd);
+int send_msg_header(int fd, int header_ID);
+int send_msg_footer(int fd);
+int read_positive_int(int fd);
+long read_long(int fd);
+string read_string(int fd, string *buf);
+bool write_positive_int(int fd, int value);
+off_t read_off_t(int fd);
+
+/*
+ * Fires up a Ceph client and returns a pointer to it.
+ */
+
+Client* startCephClient()
+{
+ dout(3) << "ActiveMaster: Initializing Ceph client:" << endl;
+
+ // parse args from CEPH_ARGS, not command line
+ vector<char*> args;
+ env_to_vec(args);
+ parse_config_options(args);
+
+ if (g_conf.clock_tare) g_clock.tare();
+
+ // be safe
+ g_conf.use_abspaths = true;
+
+ // load monmap
+ MonMap monmap;
+ int r = monmap.read(".ceph_monmap");
+ if (r < 0) {
+ dout(0) << "ActiveMaster: could not find .ceph_monmap" << endl;
+ return 0;
+ }
+ assert(r >= 0);
+
+ // start up network
+ rank.start_rank();
+
+ // start client
+ Client *client;
+ client = new Client(rank.register_entity(MSG_ADDR_CLIENT_NEW), &monmap);
+ client->init();
+
+ // mount
+ client->mount();
+
+ return client;
+}
+
+void kill_client (Client * client)
+{
+ client->unmount();
+ client->shutdown();
+ delete client;
+
+ // wait for messenger to finish
+ rank.wait();
+}
+
+
+// Read n bytes from a descriptor.
+int readn(int fd, char * ptr, int nbytes)
+{
+ int nleft, nread;
+
+ nleft = nbytes;
+ while (nleft > 0) {
+ nread = read(fd, ptr, nleft);
+ if (nread < 0)
+ return nread; // error
+ else if (nread == 0)
+ break;
+
+ nleft -= nread;
+ ptr += nread;
+ }
+ return (nbytes - nleft);
+}
+
+// Read a line from the socket. This is horrifically slow, as
+// it goes one character at a time to catch the newline.
+
+int readline(int fd, char *ptr, int maxlen) {
+
+ int n, rc;
+ char c;
+
+ for (n = 1; n < maxlen; ++n) {
+ if ( (rc = read(fd, &c, 1)) == 1) {
+ *ptr++ = c;
+ if (c == '\n')
+ break;
+ }
+ else if (rc == 0) {
+ if (n == 1)
+ return 0; // EOF, no data
+ else
+ break; // EOF, data read
+ }
+ else
+ return -1; // error
+
+ }
+
+ // null-terminate the string and return the number of bytes read
+ *ptr = 0;
+ return n;
+}
+
+// write n bytes to a stream file descriptor.
+
+int writen(int fd, const char *ptr, int nbytes) {
+ int nleft, nwritten;
+
+ nleft = nbytes;
+
+ // write until everything's sent
+ while (nleft > 0) {
+ nwritten = write(fd, ptr, nleft);
+ if (nwritten <= 0) {
+ cerr << "writen: error writing " << nbytes <<
+ "bytes to file descriptor " << fd << endl;
+ return nwritten; //error
+ }
+ nleft -= nwritten;
+ ptr += nwritten;
+ }
+ assert (0 == nleft);
+ return (nbytes - nleft);
+}
+
+
+
+// read a message type from the socket, and print it.
+
+int readmsgtype(int fd) {
+ int rc;
+ char typebuf[CMDLENGTH + 1];
+
+ rc = read(fd, &typebuf, CMDLENGTH);
+
+ // read a fixed-length text command
+ if (rc != CMDLENGTH) {
+ cerr << "in readmsgtype: read error: result is " << rc << endl;
+ return -1;
+ }
+
+ // null-terminate the string
+ typebuf[CMDLENGTH] = 0;
+
+ // print the command
+ //cerr << "readmsgtype: text type is " << typebuf << ", " ;
+
+ // figure out which one it is, by number
+ for (int i = 0; i < CMDCOUNT; ++i) {
+ if (!strcmp(typebuf, CMD_LIST[i])) {
+ //cerr << "which is identified as type " << i << endl;
+ return i;
+ }
+ }
+
+ // if we get here the type was invalid
+ cerr << "readmsgtype: unrecognized message type " << typebuf << endl;
+ return -1;
+}
+
+// Attempt to read the message footer off
+// the given stream.
+bool check_footer(int fd) {
+
+ // leave space for null termination
+ char footer_buf[FOOTER_LENGTH+1];
+
+ // read the footer
+ int rc = read(fd, &footer_buf, FOOTER_LENGTH);
+ if (rc != FOOTER_LENGTH) {
+ cerr << "in check_footer: read error: result is " << rc << endl;
+ return false;
+ }
+
+ // null-terminate the string
+ footer_buf[FOOTER_LENGTH] = 0;
+
+ // Is the footer correct?
+ if (0 == strcmp(footer_buf, FOOTER))
+ return true;
+ else
+ return false;
+}
+
+
+// Attempt to read a positive signed integer off the given stream.
+// This function assumes that the sender and receiver use the same
+// integer size. If this is false, weird stuff will happen.
+int read_positive_int(int fd) {
+
+ char buf[sizeof(int)];
+ int rc = read(fd, &buf, sizeof(int));
+ if (rc != sizeof(int)) {
+ cerr << "in read_positive_int: read error: result is " << rc <<
+ ". Exiting process. " << endl;
+ exit(-1);
+ }
+ return *((int*)buf);
+}
+
+long read_long(int fd) {
+ char buf[sizeof(long)];
+ int rc = read(fd, &buf, sizeof(long));
+ if (rc != sizeof(long)) {
+ cerr << "in read_long: read error: result is " << rc <<
+ ". Exiting process. " << endl;
+ exit(-1);
+ }
+ return *((long*)buf);
+}
+
+
+off_t read_off_t(int fd) {
+ char buf[sizeof(off_t)];
+ int rc = read(fd, &buf, sizeof(off_t));
+ if (rc != sizeof(off_t)) {
+ cerr << "in read_off_t: read error: result is " << rc <<
+ ". Exiting process. " << endl;
+ exit(-1);
+ }
+ return *((off_t*)buf);
+}
+
+size_t read_size_t(int fd) {
+ char buf[sizeof(size_t)];
+ int rc = read(fd, &buf, sizeof(size_t));
+ if (rc != sizeof(size_t)) {
+ cerr << "in read_size_t: read error: result is " << rc <<
+ ". Exiting process. " << endl;
+ exit(-1);
+ }
+ return *((size_t*)buf);
+}
+
+
+
+
+// attempt to write an integer to the given
+// file descriptor.
+bool write_positive_int(int fd, int value) {
+
+ char* buf = (char*)(&(value));
+ int rc = writen(fd, buf, sizeof(int));
+
+ if (rc != sizeof(int)) {
+ cerr << "in write_positive_int: write failed, result is " << rc <<
+ ", sizeof(int) is " << sizeof(int) << ". Exiting process." << endl;
+ exit(-1);
+ }
+
+ return true;
+}
+
+// attempt to write a long integer to the given
+// file descriptor.
+bool write_long(int fd, long value) {
+
+ char* buf = (char*)(&(value));
+ int rc = writen(fd, buf, sizeof(long));
+
+ if (rc != sizeof(long)) {
+ cerr << "in write_long: write failed, result is " << rc <<
+ ", sizeof(long) is " << sizeof(long) << ". Exiting process." << endl;
+ exit(-1);
+ }
+
+ return true;
+}
+
+
+// attempt to write a long integer to the given
+// file descriptor.
+bool write_off_t(int fd, off_t value) {
+
+ char* buf = (char*)(&(value));
+ int rc = writen(fd, buf, sizeof(off_t));
+
+ if (rc != sizeof(off_t)) {
+ cerr << "in writeoff_t: write failed, result is " << rc <<
+ ", sizeof(off_t) is " << sizeof(off_t) << ". Exiting process." << endl;
+ exit(-1);
+ }
+
+ return true;
+}
+
+
+
+
+// read a string from the given file descriptor.
+// The expected format is an int n denoting the
+// length of the string, followed by a series of n
+// bytes, not null-terminated.
+void read_string(int fd, char* buf) {
+
+ // get the size of the string
+ int size = read_positive_int(fd);
+ if (size < 1) {
+ cerr << "Error in read_string: invalid string size of " << size << endl;
+ exit(-1);
+ }
+ if (size > MAX_STRING_SIZE) {
+ cerr << "Error in read_string: string size of " << size << "is more than maximum of"
+ << MAX_STRING_SIZE << endl;
+ exit(-1);
+ }
+
+ // read the string
+ int result = readn(fd, buf, size);
+ if (result != size) {
+ cerr << "Error in read_string: attempted read size was " << size <<
+ ", result was " << result << endl;
+ exit(-1);
+ }
+ // null-terminate
+ buf[size] = 0;
+
+ cerr << "in read_string: read string \"" << buf << "\" of size " << size << endl;
+
+}
+
+
+// send a fixed-length message header
+// given the header's ID.
+int send_msg_header(int fd, int header_ID) {
+ if ((header_ID < 0) || (header_ID >= CMDCOUNT)) {
+ cerr << "In send_msg_header: received out-of-range header ID " << header_ID <<
+ ". Exiting process." << endl;
+ exit(-1);
+ }
+
+ //cerr << "attempting to send message " << CMD_LIST[header_ID] <<
+ // " with ID " << header_ID << endl;
+
+ if (CMDLENGTH != writen(fd, CMD_LIST[header_ID], CMDLENGTH)) {
+ cerr << "In send_msg_header: error writing header ID " << header_ID <<
+ "to file descriptor " << fd << ". Exiting process." << endl;
+ exit(-1);
+ }
+
+ return 0;
+}
+
+// send the fixed-length message footer.
+int send_msg_footer(int fd) {
+ //cerr << "attempting to send message footer: " << endl;
+ if (FOOTER_LENGTH != writen(fd, FOOTER, FOOTER_LENGTH)) {
+ cerr << "in send_msg_footer: error writing footer to file descriptor " <<
+ fd << ". Exiting process." << endl;
+ exit(-1);
+ } else {
+ //cerr << "Sent message footer!" << endl;
+ }
+ return 0;
+}
+
+
+// Writes a string to a stream file descriptor.
+// Dies loudly and horribly on any error.
+bool write_string(int fd, const char* buf) {
+
+ int length = strlen(buf);
+ assert (length >= 0);
+ int result = writen(fd, buf, length);
+ if (result != length) {
+ cerr << "Error in write_string: string length is " << length <<
+ ", result is " << result << endl;
+ exit(-1);
+ }
+
+ return true;
+}
+
+
+// Copy a given extent of a Ceph file to the local disk.
+// Requires a running Ceph client.
+void copyExtentToLocalFile (Client* client, const char* ceph_source,
+ long offset, long length,
+ const char* local_destination) {
+
+ // get the source file's size. Sanity-check the request range.
+ struct stat st;
+ int r = client->lstat(ceph_source, &st);
+ assert (r == 0);
+
+ off_t src_total_size = st.st_size;
+ if (src_total_size < offset + length) {
+ cerr << "Error in copy ExtentToLocalFile: offset + size = " << offset << " + " << length
+ << " = " + (offset + length) << ", source file size is only " << src_total_size << endl;
+ exit(-1);
+ }
+ off_t remaining = length;
+
+ // open the source and destination files. Advance the source
+ // file to the desired offset.
+ int fh_ceph = client->open(ceph_source, O_RDONLY);
+ assert (fh_ceph > -1);
+ r = client->lseek(fh_ceph, offset, SEEK_SET);
+ assert (r == offset);
+
+ int fh_local = ::open(local_destination, O_WRONLY|O_CREAT|O_TRUNC, 0644);
+ assert (fh_local > -1);
+
+ // copy the file 4 MB at a time
+ const int chunk = 4*1024*1024;
+ bufferptr bp(chunk);
+
+ while (remaining > 0) {
+ off_t got = client->read(fh_ceph, bp.c_str(), MIN(remaining,chunk), -1);
+ assert(got > 0);
+ remaining -= got;
+ off_t wrote = ::write(fh_local, bp.c_str(), got);
+ assert (got == wrote);
+ }
+ // close the files
+ client->close(fh_ceph);
+ ::close(fh_local);
+}
#define __THREAD_H
#include <pthread.h>
+#include <signal.h>
#include <errno.h>
class Thread {
bool is_started() { return thread_id != 0; }
bool am_self() { return (pthread_self() == thread_id); }
+ int kill(int signal) {
+ return pthread_kill(thread_id, signal);
+ }
int create() {
return pthread_create( &thread_id, NULL, _entry_func, (void*)this );
}
dirty_onodes.erase(on);
}
- if (on->get_ref_count() > 1) cout << "remove_onode **** will survive " << *on << endl;
+ if (on->get_ref_count() > 1) dout(10) << "remove_onode **** will survive " << *on << endl;
put_onode(on);
dirty = true;
+void noop_signal_handler(int s)
+{
+ //cout << "blah_handler got " << s << endl;
+}
int Rank::Accepter::start()
{
// set up signal handler
old_sigint_handler = signal(SIGINT, simplemessenger_sigint);
+ // set a harmless handle for SIGUSR1 (we'll use it to stop the accepter)
+ struct sigaction sa;
+ sa.sa_handler = noop_signal_handler;
+ sa.sa_flags = 0;
+ sigaction(SIGUSR1, &sa, NULL);
+
// start thread
create();
void *Rank::Accepter::entry()
{
dout(10) << "accepter starting" << endl;
-
+
+ fd_set fds;
while (!done) {
+ FD_ZERO(&fds);
+ FD_SET(listen_sd, &fds);
+ dout(20) << "accepter calling select" << endl;
+ int r = ::select(listen_sd+1, &fds, 0, &fds, 0);
+ dout(20) << "accepter select got " << r << endl;
+
+ if (done) break;
+
// accept
struct sockaddr_in addr;
socklen_t slen = sizeof(addr);
- int sd = ::accept(listen_sd, (sockaddr*)&addr, &slen);
+ int sd = ::accept(listen_sd, (sockaddr*)&addr, &slen); // FIXME: make this non-blocking.
if (sd > 0) {
dout(10) << "accepted incoming on sd " << sd << endl;
rank.lock.Unlock();
} else {
dout(10) << "no incoming connection?" << endl;
- break;
}
}
+ ::close(listen_sd);
+
return 0;
}
+void Rank::Accepter::stop()
+{
+ done = true;
+ this->kill(SIGUSR1);
+ join();
+}
/**************************************
lock.Unlock();
// done! clean up.
-
- //dout(10) << "wait: stopping accepter thread" << endl;
- //accepter.stop();
+ dout(-10) << "wait: stopping accepter thread" << endl;
+ accepter.stop();
+ dout(-10) << "wait: stopped accepter thread" << endl;
// stop dispatch thread
if (g_conf.ms_single_dispatch) {
-
/* Rank - per-process
*/
class Rank {
Accepter() : done(false) {}
void *entry();
- void stop() {
- done = true;
- ::close(listen_sd);
- join();
- }
+ void stop();
int start();
} accepter;
}
// do it now
- dout(-10) << "preprocess_op data is in cache, reading from cache" << *op << dendl;
+ dout(10) << "preprocess_op data is in cache, reading from cache" << *op << dendl;
do_op(op);
return true;
}