From: Casey Bodley Date: Wed, 21 Mar 2018 17:34:16 +0000 (-0400) Subject: Merge commit 'b7c8aeb691c07f356d7cb042b51afd4759ff2f96' into wip-dmclock-remove-by-ptr X-Git-Tag: v13.1.0~400^2~2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=5334539ca69cacd4f0e4e7c4c8ff82b7058a6ae9;p=ceph.git Merge commit 'b7c8aeb691c07f356d7cb042b51afd4759ff2f96' into wip-dmclock-remove-by-ptr --- 5334539ca69cacd4f0e4e7c4c8ff82b7058a6ae9 diff --cc src/dmclock/COPYING index 000000000000,000000000000..6f1dfff1f687 new file mode 100644 --- /dev/null +++ b/src/dmclock/COPYING @@@ -1,0 -1,0 +1,3 @@@ ++Files: * ++Copyright: (C) 2016-2018 by Red Hat Inc. ++License: LGPL2.1 (see COPYING-LGPL2.1) diff --cc src/dmclock/COPYING-LGPL2.1 index 000000000000,000000000000..5ab7695ab8ca new file mode 100644 --- /dev/null +++ b/src/dmclock/COPYING-LGPL2.1 @@@ -1,0 -1,0 +1,504 @@@ ++ GNU LESSER GENERAL PUBLIC LICENSE ++ Version 2.1, February 1999 ++ ++ Copyright (C) 1991, 1999 Free Software Foundation, Inc. ++ 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA ++ Everyone is permitted to copy and distribute verbatim copies ++ of this license document, but changing it is not allowed. ++ ++[This is the first released version of the Lesser GPL. It also counts ++ as the successor of the GNU Library Public License, version 2, hence ++ the version number 2.1.] ++ ++ Preamble ++ ++ The licenses for most software are designed to take away your ++freedom to share and change it. By contrast, the GNU General Public ++Licenses are intended to guarantee your freedom to share and change ++free software--to make sure the software is free for all its users. ++ ++ This license, the Lesser General Public License, applies to some ++specially designated software packages--typically libraries--of the ++Free Software Foundation and other authors who decide to use it. You ++can use it too, but we suggest you first think carefully about whether ++this license or the ordinary General Public License is the better ++strategy to use in any particular case, based on the explanations below. ++ ++ When we speak of free software, we are referring to freedom of use, ++not price. Our General Public Licenses are designed to make sure that ++you have the freedom to distribute copies of free software (and charge ++for this service if you wish); that you receive source code or can get ++it if you want it; that you can change the software and use pieces of ++it in new free programs; and that you are informed that you can do ++these things. ++ ++ To protect your rights, we need to make restrictions that forbid ++distributors to deny you these rights or to ask you to surrender these ++rights. These restrictions translate to certain responsibilities for ++you if you distribute copies of the library or if you modify it. ++ ++ For example, if you distribute copies of the library, whether gratis ++or for a fee, you must give the recipients all the rights that we gave ++you. You must make sure that they, too, receive or can get the source ++code. If you link other code with the library, you must provide ++complete object files to the recipients, so that they can relink them ++with the library after making changes to the library and recompiling ++it. And you must show them these terms so they know their rights. ++ ++ We protect your rights with a two-step method: (1) we copyright the ++library, and (2) we offer you this license, which gives you legal ++permission to copy, distribute and/or modify the library. ++ ++ To protect each distributor, we want to make it very clear that ++there is no warranty for the free library. Also, if the library is ++modified by someone else and passed on, the recipients should know ++that what they have is not the original version, so that the original ++author's reputation will not be affected by problems that might be ++introduced by others. ++ ++ Finally, software patents pose a constant threat to the existence of ++any free program. We wish to make sure that a company cannot ++effectively restrict the users of a free program by obtaining a ++restrictive license from a patent holder. Therefore, we insist that ++any patent license obtained for a version of the library must be ++consistent with the full freedom of use specified in this license. ++ ++ Most GNU software, including some libraries, is covered by the ++ordinary GNU General Public License. This license, the GNU Lesser ++General Public License, applies to certain designated libraries, and ++is quite different from the ordinary General Public License. We use ++this license for certain libraries in order to permit linking those ++libraries into non-free programs. ++ ++ When a program is linked with a library, whether statically or using ++a shared library, the combination of the two is legally speaking a ++combined work, a derivative of the original library. The ordinary ++General Public License therefore permits such linking only if the ++entire combination fits its criteria of freedom. The Lesser General ++Public License permits more lax criteria for linking other code with ++the library. ++ ++ We call this license the "Lesser" General Public License because it ++does Less to protect the user's freedom than the ordinary General ++Public License. It also provides other free software developers Less ++of an advantage over competing non-free programs. These disadvantages ++are the reason we use the ordinary General Public License for many ++libraries. However, the Lesser license provides advantages in certain ++special circumstances. ++ ++ For example, on rare occasions, there may be a special need to ++encourage the widest possible use of a certain library, so that it becomes ++a de-facto standard. To achieve this, non-free programs must be ++allowed to use the library. A more frequent case is that a free ++library does the same job as widely used non-free libraries. In this ++case, there is little to gain by limiting the free library to free ++software only, so we use the Lesser General Public License. ++ ++ In other cases, permission to use a particular library in non-free ++programs enables a greater number of people to use a large body of ++free software. For example, permission to use the GNU C Library in ++non-free programs enables many more people to use the whole GNU ++operating system, as well as its variant, the GNU/Linux operating ++system. ++ ++ Although the Lesser General Public License is Less protective of the ++users' freedom, it does ensure that the user of a program that is ++linked with the Library has the freedom and the wherewithal to run ++that program using a modified version of the Library. ++ ++ The precise terms and conditions for copying, distribution and ++modification follow. Pay close attention to the difference between a ++"work based on the library" and a "work that uses the library". The ++former contains code derived from the library, whereas the latter must ++be combined with the library in order to run. ++ ++ GNU LESSER GENERAL PUBLIC LICENSE ++ TERMS AND CONDITIONS FOR COPYING, DISTRIBUTION AND MODIFICATION ++ ++ 0. This License Agreement applies to any software library or other ++program which contains a notice placed by the copyright holder or ++other authorized party saying it may be distributed under the terms of ++this Lesser General Public License (also called "this License"). ++Each licensee is addressed as "you". ++ ++ A "library" means a collection of software functions and/or data ++prepared so as to be conveniently linked with application programs ++(which use some of those functions and data) to form executables. ++ ++ The "Library", below, refers to any such software library or work ++which has been distributed under these terms. A "work based on the ++Library" means either the Library or any derivative work under ++copyright law: that is to say, a work containing the Library or a ++portion of it, either verbatim or with modifications and/or translated ++straightforwardly into another language. (Hereinafter, translation is ++included without limitation in the term "modification".) ++ ++ "Source code" for a work means the preferred form of the work for ++making modifications to it. For a library, complete source code means ++all the source code for all modules it contains, plus any associated ++interface definition files, plus the scripts used to control compilation ++and installation of the library. ++ ++ Activities other than copying, distribution and modification are not ++covered by this License; they are outside its scope. The act of ++running a program using the Library is not restricted, and output from ++such a program is covered only if its contents constitute a work based ++on the Library (independent of the use of the Library in a tool for ++writing it). Whether that is true depends on what the Library does ++and what the program that uses the Library does. ++ ++ 1. You may copy and distribute verbatim copies of the Library's ++complete source code as you receive it, in any medium, provided that ++you conspicuously and appropriately publish on each copy an ++appropriate copyright notice and disclaimer of warranty; keep intact ++all the notices that refer to this License and to the absence of any ++warranty; and distribute a copy of this License along with the ++Library. ++ ++ You may charge a fee for the physical act of transferring a copy, ++and you may at your option offer warranty protection in exchange for a ++fee. ++ ++ 2. You may modify your copy or copies of the Library or any portion ++of it, thus forming a work based on the Library, and copy and ++distribute such modifications or work under the terms of Section 1 ++above, provided that you also meet all of these conditions: ++ ++ a) The modified work must itself be a software library. ++ ++ b) You must cause the files modified to carry prominent notices ++ stating that you changed the files and the date of any change. ++ ++ c) You must cause the whole of the work to be licensed at no ++ charge to all third parties under the terms of this License. ++ ++ d) If a facility in the modified Library refers to a function or a ++ table of data to be supplied by an application program that uses ++ the facility, other than as an argument passed when the facility ++ is invoked, then you must make a good faith effort to ensure that, ++ in the event an application does not supply such function or ++ table, the facility still operates, and performs whatever part of ++ its purpose remains meaningful. ++ ++ (For example, a function in a library to compute square roots has ++ a purpose that is entirely well-defined independent of the ++ application. Therefore, Subsection 2d requires that any ++ application-supplied function or table used by this function must ++ be optional: if the application does not supply it, the square ++ root function must still compute square roots.) ++ ++These requirements apply to the modified work as a whole. If ++identifiable sections of that work are not derived from the Library, ++and can be reasonably considered independent and separate works in ++themselves, then this License, and its terms, do not apply to those ++sections when you distribute them as separate works. But when you ++distribute the same sections as part of a whole which is a work based ++on the Library, the distribution of the whole must be on the terms of ++this License, whose permissions for other licensees extend to the ++entire whole, and thus to each and every part regardless of who wrote ++it. ++ ++Thus, it is not the intent of this section to claim rights or contest ++your rights to work written entirely by you; rather, the intent is to ++exercise the right to control the distribution of derivative or ++collective works based on the Library. ++ ++In addition, mere aggregation of another work not based on the Library ++with the Library (or with a work based on the Library) on a volume of ++a storage or distribution medium does not bring the other work under ++the scope of this License. ++ ++ 3. You may opt to apply the terms of the ordinary GNU General Public ++License instead of this License to a given copy of the Library. To do ++this, you must alter all the notices that refer to this License, so ++that they refer to the ordinary GNU General Public License, version 2, ++instead of to this License. (If a newer version than version 2 of the ++ordinary GNU General Public License has appeared, then you can specify ++that version instead if you wish.) Do not make any other change in ++these notices. ++ ++ Once this change is made in a given copy, it is irreversible for ++that copy, so the ordinary GNU General Public License applies to all ++subsequent copies and derivative works made from that copy. ++ ++ This option is useful when you wish to copy part of the code of ++the Library into a program that is not a library. ++ ++ 4. You may copy and distribute the Library (or a portion or ++derivative of it, under Section 2) in object code or executable form ++under the terms of Sections 1 and 2 above provided that you accompany ++it with the complete corresponding machine-readable source code, which ++must be distributed under the terms of Sections 1 and 2 above on a ++medium customarily used for software interchange. ++ ++ If distribution of object code is made by offering access to copy ++from a designated place, then offering equivalent access to copy the ++source code from the same place satisfies the requirement to ++distribute the source code, even though third parties are not ++compelled to copy the source along with the object code. ++ ++ 5. A program that contains no derivative of any portion of the ++Library, but is designed to work with the Library by being compiled or ++linked with it, is called a "work that uses the Library". Such a ++work, in isolation, is not a derivative work of the Library, and ++therefore falls outside the scope of this License. ++ ++ However, linking a "work that uses the Library" with the Library ++creates an executable that is a derivative of the Library (because it ++contains portions of the Library), rather than a "work that uses the ++library". The executable is therefore covered by this License. ++Section 6 states terms for distribution of such executables. ++ ++ When a "work that uses the Library" uses material from a header file ++that is part of the Library, the object code for the work may be a ++derivative work of the Library even though the source code is not. ++Whether this is true is especially significant if the work can be ++linked without the Library, or if the work is itself a library. The ++threshold for this to be true is not precisely defined by law. ++ ++ If such an object file uses only numerical parameters, data ++structure layouts and accessors, and small macros and small inline ++functions (ten lines or less in length), then the use of the object ++file is unrestricted, regardless of whether it is legally a derivative ++work. (Executables containing this object code plus portions of the ++Library will still fall under Section 6.) ++ ++ Otherwise, if the work is a derivative of the Library, you may ++distribute the object code for the work under the terms of Section 6. ++Any executables containing that work also fall under Section 6, ++whether or not they are linked directly with the Library itself. ++ ++ 6. As an exception to the Sections above, you may also combine or ++link a "work that uses the Library" with the Library to produce a ++work containing portions of the Library, and distribute that work ++under terms of your choice, provided that the terms permit ++modification of the work for the customer's own use and reverse ++engineering for debugging such modifications. ++ ++ You must give prominent notice with each copy of the work that the ++Library is used in it and that the Library and its use are covered by ++this License. You must supply a copy of this License. If the work ++during execution displays copyright notices, you must include the ++copyright notice for the Library among them, as well as a reference ++directing the user to the copy of this License. Also, you must do one ++of these things: ++ ++ a) Accompany the work with the complete corresponding ++ machine-readable source code for the Library including whatever ++ changes were used in the work (which must be distributed under ++ Sections 1 and 2 above); and, if the work is an executable linked ++ with the Library, with the complete machine-readable "work that ++ uses the Library", as object code and/or source code, so that the ++ user can modify the Library and then relink to produce a modified ++ executable containing the modified Library. (It is understood ++ that the user who changes the contents of definitions files in the ++ Library will not necessarily be able to recompile the application ++ to use the modified definitions.) ++ ++ b) Use a suitable shared library mechanism for linking with the ++ Library. A suitable mechanism is one that (1) uses at run time a ++ copy of the library already present on the user's computer system, ++ rather than copying library functions into the executable, and (2) ++ will operate properly with a modified version of the library, if ++ the user installs one, as long as the modified version is ++ interface-compatible with the version that the work was made with. ++ ++ c) Accompany the work with a written offer, valid for at ++ least three years, to give the same user the materials ++ specified in Subsection 6a, above, for a charge no more ++ than the cost of performing this distribution. ++ ++ d) If distribution of the work is made by offering access to copy ++ from a designated place, offer equivalent access to copy the above ++ specified materials from the same place. ++ ++ e) Verify that the user has already received a copy of these ++ materials or that you have already sent this user a copy. ++ ++ For an executable, the required form of the "work that uses the ++Library" must include any data and utility programs needed for ++reproducing the executable from it. However, as a special exception, ++the materials to be distributed need not include anything that is ++normally distributed (in either source or binary form) with the major ++components (compiler, kernel, and so on) of the operating system on ++which the executable runs, unless that component itself accompanies ++the executable. ++ ++ It may happen that this requirement contradicts the license ++restrictions of other proprietary libraries that do not normally ++accompany the operating system. Such a contradiction means you cannot ++use both them and the Library together in an executable that you ++distribute. ++ ++ 7. You may place library facilities that are a work based on the ++Library side-by-side in a single library together with other library ++facilities not covered by this License, and distribute such a combined ++library, provided that the separate distribution of the work based on ++the Library and of the other library facilities is otherwise ++permitted, and provided that you do these two things: ++ ++ a) Accompany the combined library with a copy of the same work ++ based on the Library, uncombined with any other library ++ facilities. This must be distributed under the terms of the ++ Sections above. ++ ++ b) Give prominent notice with the combined library of the fact ++ that part of it is a work based on the Library, and explaining ++ where to find the accompanying uncombined form of the same work. ++ ++ 8. You may not copy, modify, sublicense, link with, or distribute ++the Library except as expressly provided under this License. Any ++attempt otherwise to copy, modify, sublicense, link with, or ++distribute the Library is void, and will automatically terminate your ++rights under this License. However, parties who have received copies, ++or rights, from you under this License will not have their licenses ++terminated so long as such parties remain in full compliance. ++ ++ 9. You are not required to accept this License, since you have not ++signed it. However, nothing else grants you permission to modify or ++distribute the Library or its derivative works. These actions are ++prohibited by law if you do not accept this License. Therefore, by ++modifying or distributing the Library (or any work based on the ++Library), you indicate your acceptance of this License to do so, and ++all its terms and conditions for copying, distributing or modifying ++the Library or works based on it. ++ ++ 10. Each time you redistribute the Library (or any work based on the ++Library), the recipient automatically receives a license from the ++original licensor to copy, distribute, link with or modify the Library ++subject to these terms and conditions. You may not impose any further ++restrictions on the recipients' exercise of the rights granted herein. ++You are not responsible for enforcing compliance by third parties with ++this License. ++ ++ 11. If, as a consequence of a court judgment or allegation of patent ++infringement or for any other reason (not limited to patent issues), ++conditions are imposed on you (whether by court order, agreement or ++otherwise) that contradict the conditions of this License, they do not ++excuse you from the conditions of this License. If you cannot ++distribute so as to satisfy simultaneously your obligations under this ++License and any other pertinent obligations, then as a consequence you ++may not distribute the Library at all. For example, if a patent ++license would not permit royalty-free redistribution of the Library by ++all those who receive copies directly or indirectly through you, then ++the only way you could satisfy both it and this License would be to ++refrain entirely from distribution of the Library. ++ ++If any portion of this section is held invalid or unenforceable under any ++particular circumstance, the balance of the section is intended to apply, ++and the section as a whole is intended to apply in other circumstances. ++ ++It is not the purpose of this section to induce you to infringe any ++patents or other property right claims or to contest validity of any ++such claims; this section has the sole purpose of protecting the ++integrity of the free software distribution system which is ++implemented by public license practices. Many people have made ++generous contributions to the wide range of software distributed ++through that system in reliance on consistent application of that ++system; it is up to the author/donor to decide if he or she is willing ++to distribute software through any other system and a licensee cannot ++impose that choice. ++ ++This section is intended to make thoroughly clear what is believed to ++be a consequence of the rest of this License. ++ ++ 12. If the distribution and/or use of the Library is restricted in ++certain countries either by patents or by copyrighted interfaces, the ++original copyright holder who places the Library under this License may add ++an explicit geographical distribution limitation excluding those countries, ++so that distribution is permitted only in or among countries not thus ++excluded. In such case, this License incorporates the limitation as if ++written in the body of this License. ++ ++ 13. The Free Software Foundation may publish revised and/or new ++versions of the Lesser General Public License from time to time. ++Such new versions will be similar in spirit to the present version, ++but may differ in detail to address new problems or concerns. ++ ++Each version is given a distinguishing version number. If the Library ++specifies a version number of this License which applies to it and ++"any later version", you have the option of following the terms and ++conditions either of that version or of any later version published by ++the Free Software Foundation. If the Library does not specify a ++license version number, you may choose any version ever published by ++the Free Software Foundation. ++ ++ 14. If you wish to incorporate parts of the Library into other free ++programs whose distribution conditions are incompatible with these, ++write to the author to ask for permission. For software which is ++copyrighted by the Free Software Foundation, write to the Free ++Software Foundation; we sometimes make exceptions for this. Our ++decision will be guided by the two goals of preserving the free status ++of all derivatives of our free software and of promoting the sharing ++and reuse of software generally. ++ ++ NO WARRANTY ++ ++ 15. BECAUSE THE LIBRARY IS LICENSED FREE OF CHARGE, THERE IS NO ++WARRANTY FOR THE LIBRARY, TO THE EXTENT PERMITTED BY APPLICABLE LAW. ++EXCEPT WHEN OTHERWISE STATED IN WRITING THE COPYRIGHT HOLDERS AND/OR ++OTHER PARTIES PROVIDE THE LIBRARY "AS IS" WITHOUT WARRANTY OF ANY ++KIND, EITHER EXPRESSED OR IMPLIED, INCLUDING, BUT NOT LIMITED TO, THE ++IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR ++PURPOSE. THE ENTIRE RISK AS TO THE QUALITY AND PERFORMANCE OF THE ++LIBRARY IS WITH YOU. SHOULD THE LIBRARY PROVE DEFECTIVE, YOU ASSUME ++THE COST OF ALL NECESSARY SERVICING, REPAIR OR CORRECTION. ++ ++ 16. IN NO EVENT UNLESS REQUIRED BY APPLICABLE LAW OR AGREED TO IN ++WRITING WILL ANY COPYRIGHT HOLDER, OR ANY OTHER PARTY WHO MAY MODIFY ++AND/OR REDISTRIBUTE THE LIBRARY AS PERMITTED ABOVE, BE LIABLE TO YOU ++FOR DAMAGES, INCLUDING ANY GENERAL, SPECIAL, INCIDENTAL OR ++CONSEQUENTIAL DAMAGES ARISING OUT OF THE USE OR INABILITY TO USE THE ++LIBRARY (INCLUDING BUT NOT LIMITED TO LOSS OF DATA OR DATA BEING ++RENDERED INACCURATE OR LOSSES SUSTAINED BY YOU OR THIRD PARTIES OR A ++FAILURE OF THE LIBRARY TO OPERATE WITH ANY OTHER SOFTWARE), EVEN IF ++SUCH HOLDER OR OTHER PARTY HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH ++DAMAGES. ++ ++ END OF TERMS AND CONDITIONS ++ ++ How to Apply These Terms to Your New Libraries ++ ++ If you develop a new library, and you want it to be of the greatest ++possible use to the public, we recommend making it free software that ++everyone can redistribute and change. You can do so by permitting ++redistribution under these terms (or, alternatively, under the terms of the ++ordinary General Public License). ++ ++ To apply these terms, attach the following notices to the library. It is ++safest to attach them to the start of each source file to most effectively ++convey the exclusion of warranty; and each file should have at least the ++"copyright" line and a pointer to where the full notice is found. ++ ++ ++ Copyright (C) ++ ++ This library is free software; you can redistribute it and/or ++ modify it under the terms of the GNU Lesser General Public ++ License as published by the Free Software Foundation; either ++ version 2.1 of the License, or (at your option) any later version. ++ ++ This library is distributed in the hope that it will be useful, ++ but WITHOUT ANY WARRANTY; without even the implied warranty of ++ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU ++ Lesser General Public License for more details. ++ ++ You should have received a copy of the GNU Lesser General Public ++ License along with this library; if not, write to the Free Software ++ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA ++ ++Also add information on how to contact you by electronic and paper mail. ++ ++You should also get your employer (if you work as a programmer) or your ++school, if any, to sign a "copyright disclaimer" for the library, if ++necessary. Here is a sample; alter the names: ++ ++ Yoyodyne, Inc., hereby disclaims all copyright interest in the ++ library `Frob' (a library for tweaking knobs) written by James Random Hacker. ++ ++ , 1 April 1990 ++ Ty Coon, President of Vice ++ ++That's all there is to it! ++ ++ diff --cc src/dmclock/sim/src/config.cc index a55ba9a47bcc,000000000000..79a7b284651e mode 100644,000000..100644 --- a/src/dmclock/sim/src/config.cc +++ b/src/dmclock/sim/src/config.cc @@@ -1,173 -1,0 +1,182 @@@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + ++/* ++ * Copyright (C) 2016 Red Hat Inc. ++ * ++ * This is free software; you can redistribute it and/or modify it ++ * under the terms of the GNU Lesser General Public License version ++ * 2.1, as published by the Free Software Foundation. See file ++ * COPYING. ++ */ ++ + +#include +#include +#include + +#include +#include +#include + +#include "config.h" +#include "str_list.h" + + +static void dashes_to_underscores(const char *input, char *output) { + char c = 0; + char *o = output; + const char *i = input; + // first two characters are copied as-is + *o = *i++; + if (*o++ == '\0') + return; + *o = *i++; + if (*o++ == '\0') + return; + for (; ((c = *i)); ++i) { + if (c == '=') { + strcpy(o, i); + return; + } + if (c == '-') + *o++ = '_'; + else + *o++ = c; + } + *o++ = '\0'; +} + +static int va_ceph_argparse_witharg(std::vector &args, + std::vector::iterator &i, std::string *ret, + std::ostream &oss, va_list ap) { + const char *first = *i; + char tmp[strlen(first)+1]; + dashes_to_underscores(first, tmp); + first = tmp; + + // does this argument match any of the possibilities? + while (1) { + const char *a = va_arg(ap, char*); + if (a == NULL) + return 0; + int strlen_a = strlen(a); + char a2[strlen_a+1]; + dashes_to_underscores(a, a2); + if (strncmp(a2, first, strlen(a2)) == 0) { + if (first[strlen_a] == '=') { + *ret = first + strlen_a + 1; + i = args.erase(i); + return 1; + } + else if (first[strlen_a] == '\0') { + // find second part (or not) + if (i+1 == args.end()) { + oss << "Option " << *i << " requires an argument." << std::endl; + i = args.erase(i); + return -EINVAL; + } + i = args.erase(i); + *ret = *i; + i = args.erase(i); + return 1; + } + } + } +} + +bool crimson::qos_simulation::ceph_argparse_witharg(std::vector &args, + std::vector::iterator &i, std::string *ret, ...) { + int r; + va_list ap; + va_start(ap, ret); + r = va_ceph_argparse_witharg(args, i, ret, std::cerr, ap); + va_end(ap); + if (r < 0) + _exit(1); + return r != 0; +} + +void crimson::qos_simulation::ceph_argparse_early_args(std::vector& args, std::string *conf_file_list) { + std::string val; + + std::vector orig_args = args; + + for (std::vector::iterator i = args.begin(); i != args.end(); ) { + if (ceph_argparse_witharg(args, i, &val, "--conf", "-c", (char*)NULL)) { + *conf_file_list = val; + } + else { + // ignore + ++i; + } + } + return; +} + +static bool stobool(const std::string & v) { + return !v.empty () && + (strcasecmp (v.c_str (), "true") == 0 || + atoi (v.c_str ()) != 0); +} + +int crimson::qos_simulation::parse_config_file(const std::string &fname, sim_config_t &g_conf) { + ConfFile cf; + std::deque err; + std::ostringstream warn; + int ret = cf.parse_file(fname.c_str(), &err, &warn); + if (ret) { + // error + return ret; + } + + std::string val; + if (!cf.read("global", "server_groups", val)) + g_conf.server_groups = std::stoul(val); + if (!cf.read("global", "client_groups", val)) + g_conf.client_groups = std::stoul(val); + if (!cf.read("global", "server_random_selection", val)) + g_conf.server_random_selection = stobool(val); + if (!cf.read("global", "server_soft_limit", val)) + g_conf.server_soft_limit = stobool(val); + if (!cf.read("global", "anticipation_timeout", val)) + g_conf.anticipation_timeout = stod(val); + + for (uint i = 0; i < g_conf.server_groups; i++) { + srv_group_t st; + std::string section = "server." + std::to_string(i); + if (!cf.read(section, "server_count", val)) + st.server_count = std::stoul(val); + if (!cf.read(section, "server_iops", val)) + st.server_iops = std::stoul(val); + if (!cf.read(section, "server_threads", val)) + st.server_threads = std::stoul(val); + g_conf.srv_group.push_back(st); + } + + for (uint i = 0; i < g_conf.client_groups; i++) { + cli_group_t ct; + std::string section = "client." + std::to_string(i); + if (!cf.read(section, "client_count", val)) + ct.client_count = std::stoul(val); + if (!cf.read(section, "client_wait", val)) + ct.client_wait = std::chrono::seconds(std::stoul(val)); + if (!cf.read(section, "client_total_ops", val)) + ct.client_total_ops = std::stoul(val); + if (!cf.read(section, "client_server_select_range", val)) + ct.client_server_select_range = std::stoul(val); + if (!cf.read(section, "client_iops_goal", val)) + ct.client_iops_goal = std::stoul(val); + if (!cf.read(section, "client_outstanding_ops", val)) + ct.client_outstanding_ops = std::stoul(val); + if (!cf.read(section, "client_reservation", val)) + ct.client_reservation = std::stod(val); + if (!cf.read(section, "client_limit", val)) + ct.client_limit = std::stod(val); + if (!cf.read(section, "client_weight", val)) + ct.client_weight = std::stod(val); + g_conf.cli_group.push_back(ct); + } + + return 0; +} diff --cc src/dmclock/sim/src/config.h index e85c69d07451,000000000000..41a675e714c3 mode 100644,000000..100644 --- a/src/dmclock/sim/src/config.h +++ b/src/dmclock/sim/src/config.h @@@ -1,143 -1,0 +1,152 @@@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + ++/* ++ * Copyright (C) 2016 Red Hat Inc. ++ * ++ * This is free software; you can redistribute it and/or modify it ++ * under the terms of the GNU Lesser General Public License version ++ * 2.1, as published by the Free Software Foundation. See file ++ * COPYING. ++ */ ++ + +#pragma once + + +#include + +#include +#include +#include +#include + +#include "ConfUtils.h" + + +namespace crimson { + namespace qos_simulation { + + struct cli_group_t { + uint client_count; + std::chrono::seconds client_wait; + uint client_total_ops; + uint client_server_select_range; + uint client_iops_goal; + uint client_outstanding_ops; + double client_reservation; + double client_limit; + double client_weight; + + cli_group_t(uint _client_count = 100, + uint _client_wait = 0, + uint _client_total_ops = 1000, + uint _client_server_select_range = 10, + uint _client_iops_goal = 50, + uint _client_outstanding_ops = 100, + double _client_reservation = 20.0, + double _client_limit = 60.0, + double _client_weight = 1.0) : + client_count(_client_count), + client_wait(std::chrono::seconds(_client_wait)), + client_total_ops(_client_total_ops), + client_server_select_range(_client_server_select_range), + client_iops_goal(_client_iops_goal), + client_outstanding_ops(_client_outstanding_ops), + client_reservation(_client_reservation), + client_limit(_client_limit), + client_weight(_client_weight) + { + // empty + } + + friend std::ostream& operator<<(std::ostream& out, + const cli_group_t& cli_group) { + out << + "client_count = " << cli_group.client_count << "\n" << + "client_wait = " << cli_group.client_wait.count() << "\n" << + "client_total_ops = " << cli_group.client_total_ops << "\n" << + "client_server_select_range = " << cli_group.client_server_select_range << "\n" << + "client_iops_goal = " << cli_group.client_iops_goal << "\n" << + "client_outstanding_ops = " << cli_group.client_outstanding_ops << "\n" << + std::fixed << std::setprecision(1) << + "client_reservation = " << cli_group.client_reservation << "\n" << + "client_limit = " << cli_group.client_limit << "\n" << + "client_weight = " << cli_group.client_weight; + return out; + } + }; // class cli_group_t + + + struct srv_group_t { + uint server_count; + uint server_iops; + uint server_threads; + + srv_group_t(uint _server_count = 100, + uint _server_iops = 40, + uint _server_threads = 1) : + server_count(_server_count), + server_iops(_server_iops), + server_threads(_server_threads) + { + // empty + } + + friend std::ostream& operator<<(std::ostream& out, + const srv_group_t& srv_group) { + out << + "server_count = " << srv_group.server_count << "\n" << + "server_iops = " << srv_group.server_iops << "\n" << + "server_threads = " << srv_group.server_threads; + return out; + } + }; // class srv_group_t + + + struct sim_config_t { + uint server_groups; + uint client_groups; + bool server_random_selection; + bool server_soft_limit; + double anticipation_timeout; + + std::vector cli_group; + std::vector srv_group; + + sim_config_t(uint _server_groups = 1, + uint _client_groups = 1, + bool _server_random_selection = false, + bool _server_soft_limit = true, + double _anticipation_timeout = 0.0) : + server_groups(_server_groups), + client_groups(_client_groups), + server_random_selection(_server_random_selection), + server_soft_limit(_server_soft_limit), + anticipation_timeout(_anticipation_timeout) + { + srv_group.reserve(server_groups); + cli_group.reserve(client_groups); + } + + friend std::ostream& operator<<(std::ostream& out, + const sim_config_t& sim_config) { + out << + "server_groups = " << sim_config.server_groups << "\n" << + "client_groups = " << sim_config.client_groups << "\n" << + "server_random_selection = " << sim_config.server_random_selection << "\n" << + "server_soft_limit = " << sim_config.server_soft_limit << "\n" << + std::fixed << std::setprecision(3) << + "anticipation_timeout = " << sim_config.anticipation_timeout; + return out; + } + }; // class sim_config_t + + + bool ceph_argparse_witharg(std::vector &args, + std::vector::iterator &i, std::string *ret, ...); + void ceph_argparse_early_args(std::vector& args, std::string *conf_file_list); + int parse_config_file(const std::string &fname, sim_config_t &g_conf); + + }; // namespace qos_simulation +}; // namespace crimson diff --cc src/dmclock/sim/src/sim_client.h index fd4a81c76dac,000000000000..328fe184745b mode 100644,000000..100644 --- a/src/dmclock/sim/src/sim_client.h +++ b/src/dmclock/sim/src/sim_client.h @@@ -1,330 -1,0 +1,337 @@@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +/* + * Copyright (C) 2016 Red Hat Inc. ++ * ++ * Author: J. Eric Ivancich ++ * ++ * This is free software; you can redistribute it and/or modify it ++ * under the terms of the GNU Lesser General Public License version ++ * 2.1, as published by the Free Software Foundation. See file ++ * COPYING. + */ + + +#pragma once + + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "sim_recs.h" + + +namespace crimson { + namespace qos_simulation { + + struct req_op_t {}; + struct wait_op_t {}; + constexpr struct req_op_t req_op {}; + constexpr struct wait_op_t wait_op {}; + + + enum class CliOp { req, wait }; + struct CliInst { + CliOp op; + union { + std::chrono::milliseconds wait_time; + struct { + uint32_t count; + std::chrono::microseconds time_bw_reqs; + uint16_t max_outstanding; + } req_params; + } args; + + // D is a duration type + template + CliInst(wait_op_t, D duration) : + op(CliOp::wait) + { + args.wait_time = + std::chrono::duration_cast(duration); + } + + CliInst(req_op_t, + uint32_t count, double ops_per_sec, uint16_t max_outstanding) : + op(CliOp::req) + { + args.req_params.count = count; + args.req_params.max_outstanding = max_outstanding; + uint32_t us = uint32_t(0.5 + 1.0 / ops_per_sec * 1000000); + args.req_params.time_bw_reqs = std::chrono::microseconds(us); + } + }; + + + using ServerSelectFunc = std::function; + + + template + class SimulatedClient { + public: + + struct InternalStats { + std::mutex mtx; + std::chrono::nanoseconds track_resp_time; + std::chrono::nanoseconds get_req_params_time; + uint32_t track_resp_count; + uint32_t get_req_params_count; + + InternalStats() : + track_resp_time(0), + get_req_params_time(0), + track_resp_count(0), + get_req_params_count(0) + { + // empty + } + }; + + using SubmitFunc = + std::function; + + using ClientAccumFunc = std::function; + + typedef std::chrono::time_point TimePoint; + + static TimePoint now() { return std::chrono::steady_clock::now(); } + + protected: + + struct RespQueueItem { + TestResponse response; + ServerId server_id; + RespPm resp_params; + }; + + const ClientId id; + const SubmitFunc submit_f; + const ServerSelectFunc server_select_f; + const ClientAccumFunc accum_f; + + std::vector instructions; + + SvcTrk service_tracker; + + // TODO: use lock rather than atomic??? + std::atomic_ulong outstanding_ops; + std::atomic_bool requests_complete; + + std::deque resp_queue; + + std::mutex mtx_req; + std::condition_variable cv_req; + + std::mutex mtx_resp; + std::condition_variable cv_resp; + + using RespGuard = std::lock_guard; + using Lock = std::unique_lock; + + // data collection + + std::vector op_times; + Accum accumulator; + InternalStats internal_stats; + + std::thread thd_req; + std::thread thd_resp; + + public: + + SimulatedClient(ClientId _id, + const SubmitFunc& _submit_f, + const ServerSelectFunc& _server_select_f, + const ClientAccumFunc& _accum_f, + const std::vector& _instrs) : + id(_id), + submit_f(_submit_f), + server_select_f(_server_select_f), + accum_f(_accum_f), + instructions(_instrs), + service_tracker(), + outstanding_ops(0), + requests_complete(false) + { + size_t op_count = 0; + for (auto i : instructions) { + if (CliOp::req == i.op) { + op_count += i.args.req_params.count; + } + } + op_times.reserve(op_count); + + thd_resp = std::thread(&SimulatedClient::run_resp, this); + thd_req = std::thread(&SimulatedClient::run_req, this); + } + + + SimulatedClient(ClientId _id, + const SubmitFunc& _submit_f, + const ServerSelectFunc& _server_select_f, + const ClientAccumFunc& _accum_f, + uint16_t _ops_to_run, + double _iops_goal, + uint16_t _outstanding_ops_allowed) : + SimulatedClient(_id, + _submit_f, _server_select_f, _accum_f, + {{req_op, _ops_to_run, _iops_goal, _outstanding_ops_allowed}}) + { + // empty + } + + + SimulatedClient(const SimulatedClient&) = delete; + SimulatedClient(SimulatedClient&&) = delete; + SimulatedClient& operator=(const SimulatedClient&) = delete; + SimulatedClient& operator=(SimulatedClient&&) = delete; + + virtual ~SimulatedClient() { + wait_until_done(); + } + + void receive_response(const TestResponse& resp, + const ServerId& server_id, + const RespPm& resp_params) { + RespGuard g(mtx_resp); + resp_queue.push_back(RespQueueItem{resp, server_id, resp_params}); + cv_resp.notify_one(); + } + + const std::vector& get_op_times() const { return op_times; } + + void wait_until_done() { + if (thd_req.joinable()) thd_req.join(); + if (thd_resp.joinable()) thd_resp.join(); + } + + const Accum& get_accumulator() const { return accumulator; } + + const InternalStats& get_internal_stats() const { return internal_stats; } + + protected: + + void run_req() { + size_t ops_count = 0; + for (auto i : instructions) { + if (CliOp::wait == i.op) { + std::this_thread::sleep_for(i.args.wait_time); + } else if (CliOp::req == i.op) { + Lock l(mtx_req); + for (uint64_t o = 0; o < i.args.req_params.count; ++o) { + while (outstanding_ops >= i.args.req_params.max_outstanding) { + cv_req.wait(l); + } + + l.unlock(); + auto now = std::chrono::steady_clock::now(); + const ServerId& server = server_select_f(o); + + ReqPm rp = + time_stats_w_return(internal_stats.mtx, + internal_stats.get_req_params_time, + [&]() -> ReqPm { + return service_tracker.get_req_params(server); + }); + count_stats(internal_stats.mtx, + internal_stats.get_req_params_count); + + submit_f(server, + TestRequest{server, static_cast(o), 12}, + id, rp); + ++outstanding_ops; + l.lock(); // lock for return to top of loop + + auto delay_time = now + i.args.req_params.time_bw_reqs; + while (std::chrono::steady_clock::now() < delay_time) { + cv_req.wait_until(l, delay_time); + } // while + } // for + ops_count += i.args.req_params.count; + } else { + assert(false); + } + } // for loop + + requests_complete = true; + + // all requests made, thread ends + } + + + void run_resp() { + std::chrono::milliseconds delay(1000); + int op = 0; + + Lock l(mtx_resp); + + // since the following code would otherwise be repeated (except for + // the call to notify_one) in the two loops below; let's avoid + // repetition and define it once. + const auto proc_resp = [this, &op, &l](const bool notify_req_cv) { + if (!resp_queue.empty()) { + RespQueueItem item = resp_queue.front(); + resp_queue.pop_front(); + + l.unlock(); + + // data collection + + op_times.push_back(now()); + accum_f(accumulator, item.resp_params); + + // processing + +#if 0 // not needed + TestResponse& resp = item.response; +#endif + + time_stats(internal_stats.mtx, + internal_stats.track_resp_time, + [&](){ + service_tracker.track_resp(item.server_id, item.resp_params); + }); + count_stats(internal_stats.mtx, + internal_stats.track_resp_count); + + --outstanding_ops; + if (notify_req_cv) { + cv_req.notify_one(); + } + + l.lock(); + } + }; + + while(!requests_complete.load()) { + while(resp_queue.empty() && !requests_complete.load()) { + cv_resp.wait_for(l, delay); + } + proc_resp(true); + } + + while(outstanding_ops.load() > 0) { + while(resp_queue.empty() && outstanding_ops.load() > 0) { + cv_resp.wait_for(l, delay); + } + proc_resp(false); // don't call notify_one as all requests are complete + } + + // all responses received, thread ends + } + }; // class SimulatedClient + + + }; // namespace qos_simulation +}; // namespace crimson diff --cc src/dmclock/sim/src/sim_recs.h index 759ab4e14134,000000000000..29369a7226e6 mode 100644,000000..100644 --- a/src/dmclock/sim/src/sim_recs.h +++ b/src/dmclock/sim/src/sim_recs.h @@@ -1,122 -1,0 +1,129 @@@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +/* + * Copyright (C) 2016 Red Hat Inc. ++ * ++ * Author: J. Eric Ivancich ++ * ++ * This is free software; you can redistribute it and/or modify it ++ * under the terms of the GNU Lesser General Public License version ++ * 2.1, as published by the Free Software Foundation. See file ++ * COPYING. + */ + + +#pragma once + + +#include +#include +#include +#include + +#include + +#include +#include +#include +#include +#include +#include + + +using ClientId = uint; +using ServerId = uint; + + +namespace crimson { + namespace qos_simulation { + + inline void debugger() { + raise(SIGCONT); + } + + template + void time_stats(std::mutex& mtx, + T& time_accumulate, + std::function code) { + auto t1 = std::chrono::steady_clock::now(); + code(); + auto t2 = std::chrono::steady_clock::now(); + auto duration = t2 - t1; + auto cast_duration = std::chrono::duration_cast(duration); + std::lock_guard lock(mtx); + time_accumulate += cast_duration; + } + + // unfortunately it's hard for the compiler to infer the types, + // and therefore when called the template params might have to be + // explicit + template + R time_stats_w_return(std::mutex& mtx, + T& time_accumulate, + std::function code) { + auto t1 = std::chrono::steady_clock::now(); + R result = code(); + auto t2 = std::chrono::steady_clock::now(); + auto duration = t2 - t1; + auto cast_duration = std::chrono::duration_cast(duration); + std::lock_guard lock(mtx); + time_accumulate += cast_duration; + return result; + } + + template + void count_stats(std::mutex& mtx, + T& counter) { + std::lock_guard lock(mtx); + ++counter; + } + + struct TestRequest { + ServerId server; // allows debugging + uint32_t epoch; + uint32_t op; + + TestRequest(ServerId _server, + uint32_t _epoch, + uint32_t _op) : + server(_server), + epoch(_epoch), + op(_op) + { + // empty + } + + TestRequest(const TestRequest& r) : + TestRequest(r.server, r.epoch, r.op) + { + // empty + } + }; // struct TestRequest + + + struct TestResponse { + uint32_t epoch; + + TestResponse(uint32_t _epoch) : + epoch(_epoch) + { + // empty + } + + TestResponse(const TestResponse& r) : + epoch(r.epoch) + { + // empty + } + + friend std::ostream& operator<<(std::ostream& out, const TestResponse& resp) { + out << "{ "; + out << "epoch:" << resp.epoch; + out << " }"; + return out; + } + }; // class TestResponse + + }; // namespace qos_simulation +}; // namespace crimson diff --cc src/dmclock/sim/src/sim_server.h index 42b5269d7803,000000000000..aecaa8ed3364 mode 100644,000000..100644 --- a/src/dmclock/sim/src/sim_server.h +++ b/src/dmclock/sim/src/sim_server.h @@@ -1,227 -1,0 +1,234 @@@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +/* + * Copyright (C) 2016 Red Hat Inc. ++ * ++ * Author: J. Eric Ivancich ++ * ++ * This is free software; you can redistribute it and/or modify it ++ * under the terms of the GNU Lesser General Public License version ++ * 2.1, as published by the Free Software Foundation. See file ++ * COPYING. + */ + + +#pragma once + + +#include +#include +#include +#include +#include + +#include "sim_recs.h" + + +namespace crimson { + namespace qos_simulation { + + template + class SimulatedServer { + + struct QueueItem { + ClientId client; + std::unique_ptr request; + RespPm additional; + + QueueItem(const ClientId& _client, + std::unique_ptr&& _request, + const RespPm& _additional) : + client(_client), + request(std::move(_request)), + additional(_additional) + { + // empty + } + }; // QueueItem + + public: + + struct InternalStats { + std::mutex mtx; + std::chrono::nanoseconds add_request_time; + std::chrono::nanoseconds request_complete_time; + uint32_t add_request_count; + uint32_t request_complete_count; + + InternalStats() : + add_request_time(0), + request_complete_time(0), + add_request_count(0), + request_complete_count(0) + { + // empty + } + }; + + using ClientRespFunc = std::function; + + using ServerAccumFunc = std::function; + + protected: + + const ServerId id; + Q* priority_queue; + ClientRespFunc client_resp_f; + int iops; + size_t thread_pool_size; + + bool finishing; + std::chrono::microseconds op_time; + + std::mutex inner_queue_mtx; + std::condition_variable inner_queue_cv; + std::deque inner_queue; + + std::thread* threads; + + using InnerQGuard = std::lock_guard; + using Lock = std::unique_lock; + + // data collection + + ServerAccumFunc accum_f; + Accum accumulator; + + InternalStats internal_stats; + + public: + + using CanHandleRequestFunc = std::function; + using HandleRequestFunc = + std::function,const RespPm&)>; + using CreateQueueF = std::function; + + + SimulatedServer(ServerId _id, + int _iops, + size_t _thread_pool_size, + const ClientRespFunc& _client_resp_f, + const ServerAccumFunc& _accum_f, + CreateQueueF _create_queue_f) : + id(_id), + priority_queue(_create_queue_f(std::bind(&SimulatedServer::has_avail_thread, + this), + std::bind(&SimulatedServer::inner_post, + this, + std::placeholders::_1, + std::placeholders::_2, + std::placeholders::_3))), + client_resp_f(_client_resp_f), + iops(_iops), + thread_pool_size(_thread_pool_size), + finishing(false), + accum_f(_accum_f) + { + op_time = + std::chrono::microseconds((int) (0.5 + + thread_pool_size * 1000000.0 / iops)); + std::chrono::milliseconds delay(1000); + threads = new std::thread[thread_pool_size]; + for (size_t i = 0; i < thread_pool_size; ++i) { + threads[i] = std::thread(&SimulatedServer::run, this, delay); + } + } + + virtual ~SimulatedServer() { + Lock l(inner_queue_mtx); + finishing = true; + inner_queue_cv.notify_all(); + l.unlock(); + + for (size_t i = 0; i < thread_pool_size; ++i) { + threads[i].join(); + } + + delete[] threads; + + delete priority_queue; + } + + void post(TestRequest&& request, + const ClientId& client_id, + const ReqPm& req_params) + { + time_stats(internal_stats.mtx, + internal_stats.add_request_time, + [&](){ + priority_queue->add_request(std::move(request), + client_id, req_params); + }); + count_stats(internal_stats.mtx, + internal_stats.add_request_count); + } + + bool has_avail_thread() { + InnerQGuard g(inner_queue_mtx); + return inner_queue.size() <= thread_pool_size; + } + + const Accum& get_accumulator() const { return accumulator; } + const Q& get_priority_queue() const { return *priority_queue; } + const InternalStats& get_internal_stats() const { return internal_stats; } + + protected: + + void inner_post(const ClientId& client, + std::unique_ptr request, + const RespPm& additional) { + Lock l(inner_queue_mtx); + assert(!finishing); + accum_f(accumulator, additional); + inner_queue.emplace_back(QueueItem(client, + std::move(request), + additional)); + inner_queue_cv.notify_one(); + } + + void run(std::chrono::milliseconds check_period) { + Lock l(inner_queue_mtx); + while(true) { + while(inner_queue.empty() && !finishing) { + inner_queue_cv.wait_for(l, check_period); + } + if (!inner_queue.empty()) { + auto& front = inner_queue.front(); + auto client = front.client; + auto req = std::move(front.request); + auto additional = front.additional; + inner_queue.pop_front(); + + l.unlock(); + + // simulation operation by sleeping; then call function to + // notify server of completion + std::this_thread::sleep_for(op_time); + + // TODO: rather than assuming this constructor exists, perhaps + // pass in a function that does this mapping? + client_resp_f(client, TestResponse{req->epoch}, id, additional); + + time_stats(internal_stats.mtx, + internal_stats.request_complete_time, + [&](){ + priority_queue->request_completed(); + }); + count_stats(internal_stats.mtx, + internal_stats.request_complete_count); + + l.lock(); // in prep for next iteration of loop + } else { + break; + } + } + } + }; // class SimulatedServer + + }; // namespace qos_simulation +}; // namespace crimson diff --cc src/dmclock/sim/src/simulate.h index a967d004b6dc,000000000000..4d61ddf92312 mode 100644,000000..100644 --- a/src/dmclock/sim/src/simulate.h +++ b/src/dmclock/sim/src/simulate.h @@@ -1,441 -1,0 +1,448 @@@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +/* + * Copyright (C) 2016 Red Hat Inc. ++ * ++ * Author: J. Eric Ivancich ++ * ++ * This is free software; you can redistribute it and/or modify it ++ * under the terms of the GNU Lesser General Public License version ++ * 2.1, as published by the Free Software Foundation. See file ++ * COPYING. + */ + + +#pragma once + + +#include + +#include +#include +#include +#include +#include +#include +#include + + +namespace crimson { + namespace qos_simulation { + + template + class Simulation { + + public: + + using TimePoint = std::chrono::time_point; + + protected: + + using ClientMap = std::map; + using ServerMap = std::map; + + uint server_count = 0; + uint client_count = 0; + + ServerMap servers; + ClientMap clients; + std::vector server_ids; + + TimePoint early_time; + TimePoint servers_created_time; + TimePoint clients_created_time; + TimePoint clients_finished_time; + TimePoint late_time; + + std::default_random_engine prng; + + bool has_run = false; + + + public: + + double fmt_tp(const TimePoint& t) { + auto c = t.time_since_epoch().count(); + return uint64_t(c / 1000000.0 + 0.5) % 100000 / 1000.0; + } + + TimePoint now() { + return std::chrono::steady_clock::now(); + } + + using ClientBasedServerSelectFunc = + std::function; + + using ClientFilter = std::function; + + using ServerFilter = std::function; + + using ServerDataOutF = + std::function; + + using ClientDataOutF = + std::function; + + Simulation() : + early_time(now()), + prng(std::chrono::system_clock::now().time_since_epoch().count()) + { + // empty + } + + ~Simulation() { + for (auto c : clients) { + TC* cp = c.second; + delete cp; + } + + for (auto s : servers) { + delete s.second; + } + } + + uint get_client_count() const { return client_count; } + uint get_server_count() const { return server_count; } + TC& get_client(ClientId id) { return *clients[id]; } + TS& get_server(ServerId id) { return *servers[id]; } + const ServerId& get_server_id(uint index) const { + return server_ids[index]; + } + + + void add_servers(uint count, + std::function create_server_f) { + uint i = server_count; + + // increment server_count before creating servers since they + // will start running immediately and may use the server_count + // value; NB: this could still be an issue if servers are + // added with multiple add_servers calls; consider using a + // separate start function after all servers (and clients?) + // have been added + server_count += count; + + for (; i < server_count; ++i) { + server_ids.push_back(i); + servers[i] = create_server_f(i); + } + + servers_created_time = now(); + } + + + void add_clients(uint count, + std::function create_client_f) { + uint i = client_count; + + // increment client_count before creating clients since they + // will start running immediately and may use the client_count + // value (e.g., in the server selection function); NB: this could + // still be an issue if clients are added with multiple + // add_clients calls; consider using a separate start function + // after all clients have been added + client_count += count; + + for (; i < client_count; ++i) { + clients[i] = create_client_f(i); + } + + clients_created_time = now(); + } + + + void run() { + assert(server_count > 0); + assert(client_count > 0); + + std::cout << "simulation started" << std::endl; + + // clients are now running; wait for all to finish + + for (auto const &i : clients) { + i.second->wait_until_done(); + } + + late_time = clients_finished_time = now(); + + std::cout << "simulation completed in " << + std::chrono::duration_cast(clients_finished_time - servers_created_time).count() << + " millisecs" << std::endl; + + has_run = true; + } // run + + + void display_stats(std::ostream& out, + ServerDataOutF server_out_f, ClientDataOutF client_out_f, + ServerFilter server_filter = + [] (const ServerId&) { return true; }, + ClientFilter client_filter = + [] (const ClientId&) { return true; }, + int head_w = 12, int data_w = 7, int data_prec = 2) { + assert(has_run); + + // skip first 2 secondsd of data + const std::chrono::seconds skip_amount(0); + // calculate in groups of 5 seconds + const std::chrono::seconds measure_unit(2); + // unit to output reports in + const std::chrono::seconds report_unit(1); + + // compute and display stats + + TimePoint earliest_start = late_time; + TimePoint latest_start = early_time; + TimePoint earliest_finish = late_time; + TimePoint latest_finish = early_time; + + for (auto const &c : clients) { + auto start = c.second->get_op_times().front(); + auto end = c.second->get_op_times().back(); + + if (start < earliest_start) { earliest_start = start; } + if (start > latest_start) { latest_start = start; } + if (end < earliest_finish) { earliest_finish = end; } + if (end > latest_finish) { latest_finish = end; } + } + + double ops_factor = + std::chrono::duration_cast>(measure_unit) / + std::chrono::duration_cast>(report_unit); + + const auto start_edge = clients_created_time + skip_amount; + + std::map> ops_data; + + for (auto const &c : clients) { + auto it = c.second->get_op_times().begin(); + const auto end = c.second->get_op_times().end(); + while (it != end && *it < start_edge) { ++it; } + + for (auto time_edge = start_edge + measure_unit; + time_edge <= latest_finish + measure_unit; + time_edge += measure_unit) { + int count = 0; + for (; it != end && *it < time_edge; ++count, ++it) { /* empty */ } + double ops_per_second = double(count) / ops_factor; + ops_data[c.first].push_back(ops_per_second); + } + } + + out << "==== Client Data ====" << std::endl; + + out << std::setw(head_w) << "client:"; + for (auto const &c : clients) { + if (!client_filter(c.first)) continue; + out << " " << std::setw(data_w) << c.first; + } + out << std::setw(data_w) << "total" << std::endl; + + { + bool has_data; + size_t i = 0; + do { + std::string line_header = "t_" + std::to_string(i) + ":"; + out << std::setw(head_w) << line_header; + has_data = false; + double total = 0.0; + for (auto const &c : clients) { + double data = 0.0; + if (i < ops_data[c.first].size()) { + data = ops_data[c.first][i]; + has_data = true; + } + total += data; + + if (!client_filter(c.first)) continue; + + out << " " << std::setw(data_w) << std::setprecision(data_prec) << + std::fixed << data; + } + out << " " << std::setw(data_w) << std::setprecision(data_prec) << + std::fixed << total << std::endl; + ++i; + } while(has_data); + } + + client_out_f(out, this, client_filter, head_w, data_w, data_prec); + + display_client_internal_stats(out, + "nanoseconds"); + + out << std::endl << "==== Server Data ====" << std::endl; + + out << std::setw(head_w) << "server:"; + for (auto const &s : servers) { + if (!server_filter(s.first)) continue; + out << " " << std::setw(data_w) << s.first; + } + out << " " << std::setw(data_w) << "total" << std::endl; + + server_out_f(out, this, server_filter, head_w, data_w, data_prec); + + display_server_internal_stats(out, + "nanoseconds"); + + // clean up clients then servers + + for (auto i = clients.begin(); i != clients.end(); ++i) { + delete i->second; + i->second = nullptr; + } + + for (auto i = servers.begin(); i != servers.end(); ++i) { + delete i->second; + i->second = nullptr; + } + } // display_stats + + + template + void display_server_internal_stats(std::ostream& out, + std::string time_unit) { + T add_request_time(0); + T request_complete_time(0); + uint32_t add_request_count = 0; + uint32_t request_complete_count = 0; + + for (uint i = 0; i < get_server_count(); ++i) { + const auto& server = get_server(i); + const auto& is = server.get_internal_stats(); + add_request_time += + std::chrono::duration_cast(is.add_request_time); + request_complete_time += + std::chrono::duration_cast(is.request_complete_time); + add_request_count += is.add_request_count; + request_complete_count += is.request_complete_count; + } + + double add_request_time_per_unit = + double(add_request_time.count()) / add_request_count ; + out << "total time to add requests: " << + std::fixed << add_request_time.count() << " " << time_unit << + ";" << std::endl << + " count: " << add_request_count << ";" << std::endl << + " average: " << add_request_time_per_unit << + " " << time_unit << " per request/response" << std::endl; + + double request_complete_time_unit = + double(request_complete_time.count()) / request_complete_count ; + out << "total time to note requests complete: " << std::fixed << + request_complete_time.count() << " " << time_unit << ";" << + std::endl << + " count: " << request_complete_count << ";" << std::endl << + " average: " << request_complete_time_unit << + " " << time_unit << " per request/response" << std::endl; + + out << std::endl; + + assert(add_request_count == request_complete_count); + out << "server timing for QOS algorithm: " << + add_request_time_per_unit + request_complete_time_unit << + " " << time_unit << " per request/response" << std::endl; + } + + + template + void display_client_internal_stats(std::ostream& out, + std::string time_unit) { + T track_resp_time(0); + T get_req_params_time(0); + uint32_t track_resp_count = 0; + uint32_t get_req_params_count = 0; + + for (uint i = 0; i < get_client_count(); ++i) { + const auto& client = get_client(i); + const auto& is = client.get_internal_stats(); + track_resp_time += + std::chrono::duration_cast(is.track_resp_time); + get_req_params_time += + std::chrono::duration_cast(is.get_req_params_time); + track_resp_count += is.track_resp_count; + get_req_params_count += is.get_req_params_count; + } + + double track_resp_time_unit = + double(track_resp_time.count()) / track_resp_count; + out << "total time to track responses: " << + std::fixed << track_resp_time.count() << " " << time_unit << ";" << + std::endl << + " count: " << track_resp_count << ";" << std::endl << + " average: " << track_resp_time_unit << " " << time_unit << + " per request/response" << std::endl; + + double get_req_params_time_unit = + double(get_req_params_time.count()) / get_req_params_count; + out << "total time to get request parameters: " << + std::fixed << get_req_params_time.count() << " " << time_unit << + ";" << std::endl << + " count: " << get_req_params_count << ";" << std::endl << + " average: " << get_req_params_time_unit << " " << time_unit << + " per request/response" << std::endl; + + out << std::endl; + + assert(track_resp_count == get_req_params_count); + out << "client timing for QOS algorithm: " << + track_resp_time_unit + get_req_params_time_unit << " " << + time_unit << " per request/response" << std::endl; + } + + + // **** server selection functions **** + + + const ServerId& server_select_alternate(uint64_t seed, + uint16_t client_idx) { + uint index = (client_idx + seed) % server_count; + return server_ids[index]; + } + + + // returns a lambda using the range specified as servers_per (client) + ClientBasedServerSelectFunc + make_server_select_alt_range(uint16_t servers_per) { + return [servers_per,this](uint64_t seed, uint16_t client_idx) + -> const ServerId& { + double factor = double(server_count) / client_count; + uint offset = seed % servers_per; + uint index = (uint(0.5 + client_idx * factor) + offset) % server_count; + return server_ids[index]; + }; + } + + + // function to choose a server randomly + const ServerId& server_select_random(uint64_t seed, uint16_t client_idx) { + uint index = prng() % server_count; + return server_ids[index]; + } + + + // function to choose a server randomly + ClientBasedServerSelectFunc + make_server_select_ran_range(uint16_t servers_per) { + return [servers_per,this](uint64_t seed, uint16_t client_idx) + -> const ServerId& { + double factor = double(server_count) / client_count; + uint offset = prng() % servers_per; + uint index = (uint(0.5 + client_idx * factor) + offset) % server_count; + return server_ids[index]; + }; + } + + + // function to always choose the first server + const ServerId& server_select_0(uint64_t seed, uint16_t client_idx) { + return server_ids[0]; + } + }; // class Simulation + + }; // namespace qos_simulation +}; // namespace crimson diff --cc src/dmclock/sim/src/ssched/ssched_client.h index dcbe0771de5e,000000000000..89ff148fccfc mode 100644,000000..100644 --- a/src/dmclock/sim/src/ssched/ssched_client.h +++ b/src/dmclock/sim/src/ssched/ssched_client.h @@@ -1,44 -1,0 +1,51 @@@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +/* + * Copyright (C) 2016 Red Hat Inc. ++ * ++ * Author: J. Eric Ivancich ++ * ++ * This is free software; you can redistribute it and/or modify it ++ * under the terms of the GNU Lesser General Public License version ++ * 2.1, as published by the Free Software Foundation. See file ++ * COPYING. + */ + + +#pragma once + +#include "ssched_recs.h" + + +namespace crimson { + namespace simple_scheduler { + + // S is server identifier type + template + class ServiceTracker { + + public: + + // we have to start the counters at 1, as 0 is used in the + // cleaning process + ServiceTracker() + { + // emptry + } + + + void track_resp(const S& server_id, const NullData& ignore) { + // empty + } + + + /* + * Returns the ReqParams for the given server. + */ + ReqParams get_req_params(const S& server) { + return ReqParams(); + } // get_req_params + }; // class ServiceTracker + } // namespace simple_scheduler +} // namespace crimson diff --cc src/dmclock/sim/src/ssched/ssched_recs.h index 3332d5a49333,000000000000..935e678c1ef3 mode 100644,000000..100644 --- a/src/dmclock/sim/src/ssched/ssched_recs.h +++ b/src/dmclock/sim/src/ssched/ssched_recs.h @@@ -1,37 -1,0 +1,44 @@@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +/* + * Copyright (C) 2016 Red Hat Inc. ++ * ++ * Author: J. Eric Ivancich ++ * ++ * This is free software; you can redistribute it and/or modify it ++ * under the terms of the GNU Lesser General Public License version ++ * 2.1, as published by the Free Software Foundation. See file ++ * COPYING. + */ + + +#pragma once + + +#include +#include + + +namespace crimson { + namespace simple_scheduler { + + // since we send no additional data out + // NOTE: Change name to RespParams? Is it used elsewhere? + struct NullData { + friend std::ostream& operator<<(std::ostream& out, const NullData& n) { + out << "NullData{ EMPTY }"; + return out; + } + }; // struct NullData + + + struct ReqParams { + friend std::ostream& operator<<(std::ostream& out, const ReqParams& rp) { + out << "ReqParams{ EMPTY }"; + return out; + } + }; + + } +} diff --cc src/dmclock/sim/src/ssched/ssched_server.h index fcc7055450a4,000000000000..dc496286a135 mode 100644,000000..100644 --- a/src/dmclock/sim/src/ssched/ssched_server.h +++ b/src/dmclock/sim/src/ssched/ssched_server.h @@@ -1,184 -1,0 +1,192 @@@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +/* + * Copyright (C) 2016 Red Hat Inc. ++ * ++ * Author: J. Eric Ivancich ++ * ++ * This is free software; you can redistribute it and/or modify it ++ * under the terms of the GNU Lesser General Public License version ++ * 2.1, as published by the Free Software Foundation. See file ++ * COPYING. + */ + ++ +#pragma once + +#include +#include +#include +#include + +#include "boost/variant.hpp" + +#include "ssched_recs.h" + +#ifdef PROFILE +#include "profile.h" +#endif + +namespace crimson { + + namespace simple_scheduler { + + template + class SimpleQueue { + + public: + + using RequestRef = std::unique_ptr; + + // a function to see whether the server can handle another request + using CanHandleRequestFunc = std::function; + + // a function to submit a request to the server; the second + // parameter is a callback when it's completed + using HandleRequestFunc = + std::function; + + struct PullReq { + enum class Type { returning, none }; + + struct Retn { + C client; + RequestRef request; + }; + + Type type; + boost::variant data; + }; + + protected: + + enum class Mechanism { push, pull }; + + struct QRequest { + C client; + RequestRef request; + }; + + bool finishing = false; + Mechanism mechanism; + + CanHandleRequestFunc can_handle_f; + HandleRequestFunc handle_f; + + mutable std::mutex queue_mtx; + using DataGuard = std::lock_guard; + + std::deque queue; + +#ifdef PROFILE + public: + ProfileTimer pull_request_timer; + ProfileTimer add_request_timer; + ProfileTimer request_complete_timer; + protected: +#endif + + public: + + // push full constructor + SimpleQueue(CanHandleRequestFunc _can_handle_f, + HandleRequestFunc _handle_f) : + mechanism(Mechanism::push), + can_handle_f(_can_handle_f), + handle_f(_handle_f) + { + // empty + } + + SimpleQueue() : + mechanism(Mechanism::pull) + { + // empty + } + + ~SimpleQueue() { + finishing = true; + } + + void add_request(R&& request, + const C& client_id, + const ReqParams& req_params) { + add_request(RequestRef(new R(std::move(request))), + client_id, req_params); + } + + void add_request(RequestRef&& request, + const C& client_id, + const ReqParams& req_params) { + DataGuard g(queue_mtx); + +#ifdef PROFILE + add_request_timer.start(); +#endif + queue.emplace_back(QRequest{client_id, std::move(request)}); + + if (Mechanism::push == mechanism) { + schedule_request(); + } + +#ifdef PROFILE + add_request_timer.stop(); +#endif + } // add_request + + void request_completed() { + assert(Mechanism::push == mechanism); + DataGuard g(queue_mtx); + +#ifdef PROFILE + request_complete_timer.start(); +#endif + schedule_request(); + +#ifdef PROFILE + request_complete_timer.stop(); +#endif + } // request_completed + + PullReq pull_request() { + assert(Mechanism::pull == mechanism); + PullReq result; + DataGuard g(queue_mtx); + +#ifdef PROFILE + pull_request_timer.start(); +#endif + + if (queue.empty()) { + result.type = PullReq::Type::none; + } else { + auto front = queue.front(); + result.type = PullReq::Type::returning; + result.data = + typename PullReq::Retn{front.client, std::move(front.request)}; + queue.pop(); + } + +#ifdef PROFILE + pull_request_timer.stop(); +#endif + + return result; + } + + protected: + + // queue_mtx should be held when called; should only be called + // when mechanism is push + void schedule_request() { + if (!queue.empty() && can_handle_f()) { + auto& front = queue.front(); + static NullData null_data; + handle_f(front.client, std::move(front.request), null_data); + queue.pop_front(); + } + } + }; + }; +}; diff --cc src/dmclock/sim/src/str_list.h index 4ba0cadd960c,000000000000..0d2e3bad71c2 mode 100644,000000..100644 --- a/src/dmclock/sim/src/str_list.h +++ b/src/dmclock/sim/src/str_list.h @@@ -1,94 -1,0 +1,109 @@@ ++// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- ++// vim: ts=8 sw=2 smarttab ++ ++/* ++ * Copyright (C) 2009 Red Hat Inc. ++ * ++ * Forked from Red Hat's Ceph project. ++ * ++ * This is free software; you can redistribute it and/or modify it ++ * under the terms of the GNU Lesser General Public License version ++ * 2.1, as published by the Free Software Foundation. See file ++ * COPYING. ++ */ ++ ++ +#ifndef CEPH_STRLIST_H +#define CEPH_STRLIST_H + +#include +#include +#include +#include +#include + +/** + * Split **str** into a list of strings, using the ";,= \t" delimiters and output the result in **str_list**. + * + * @param [in] str String to split and save as list + * @param [out] str_list List modified containing str after it has been split +**/ +extern void get_str_list(const std::string& str, + std::list& str_list); + +/** + * Split **str** into a list of strings, using the **delims** delimiters and output the result in **str_list**. + * + * @param [in] str String to split and save as list + * @param [in] delims characters used to split **str** + * @param [out] str_list List modified containing str after it has been split +**/ +extern void get_str_list(const std::string& str, + const char *delims, + std::list& str_list); + +/** + * Split **str** into a list of strings, using the ";,= \t" delimiters and output the result in **str_vec**. + * + * @param [in] str String to split and save as Vector + * @param [out] str_vec Vector modified containing str after it has been split +**/ +extern void get_str_vec(const std::string& str, + std::vector& str_vec); + +/** + * Split **str** into a list of strings, using the **delims** delimiters and output the result in **str_vec**. + * + * @param [in] str String to split and save as Vector + * @param [in] delims characters used to split **str** + * @param [out] str_vec Vector modified containing str after it has been split +**/ +extern void get_str_vec(const std::string& str, + const char *delims, + std::vector& str_vec); + +/** + * Split **str** into a list of strings, using the ";,= \t" delimiters and output the result in **str_list**. + * + * @param [in] str String to split and save as Set + * @param [out] str_list Set modified containing str after it has been split +**/ +extern void get_str_set(const std::string& str, + std::set& str_list); + +/** + * Split **str** into a list of strings, using the **delims** delimiters and output the result in **str_list**. + * + * @param [in] str String to split and save as Set + * @param [in] delims characters used to split **str** + * @param [out] str_list Set modified containing str after it has been split +**/ +extern void get_str_set(const std::string& str, + const char *delims, + std::set& str_list); + +/** + * Return a String containing the vector **v** joined with **sep** + * + * If **v** is empty, the function returns an empty string + * For each element in **v**, + * it will concatenate this element and **sep** with result + * + * @param [in] v Vector to join as a String + * @param [in] sep String used to join each element from **v** + * @return empty string if **v** is empty or concatenated string +**/ +inline std::string str_join(const std::vector& v, std::string sep) +{ + if (v.empty()) + return std::string(); + std::vector::const_iterator i = v.begin(); + std::string r = *i; + for (++i; i != v.end(); ++i) { + r += sep; + r += *i; + } + return r; +} + +#endif diff --cc src/dmclock/sim/src/test_dmclock.cc index 8e7aa4ab2199,000000000000..db981da8ae69 mode 100644,000000..100644 --- a/src/dmclock/sim/src/test_dmclock.cc +++ b/src/dmclock/sim/src/test_dmclock.cc @@@ -1,40 -1,0 +1,47 @@@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +/* + * Copyright (C) 2016 Red Hat Inc. ++ * ++ * Author: J. Eric Ivancich ++ * ++ * This is free software; you can redistribute it and/or modify it ++ * under the terms of the GNU Lesser General Public License version ++ * 2.1, as published by the Free Software Foundation. See file ++ * COPYING. + */ + + +#include "dmclock_recs.h" +#include "dmclock_server.h" +#include "dmclock_client.h" + +#include "sim_recs.h" +#include "sim_server.h" +#include "sim_client.h" + +#include "test_dmclock.h" + + +namespace test = crimson::test_dmc; + + +void test::dmc_server_accumulate_f(test::DmcAccum& a, + const test::dmc::PhaseType& phase) { + if (test::dmc::PhaseType::reservation == phase) { + ++a.reservation_count; + } else { + ++a.proportion_count; + } +} + + +void test::dmc_client_accumulate_f(test::DmcAccum& a, + const test::dmc::PhaseType& phase) { + if (test::dmc::PhaseType::reservation == phase) { + ++a.reservation_count; + } else { + ++a.proportion_count; + } +} diff --cc src/dmclock/sim/src/test_dmclock.h index 9728b45f935a,000000000000..1bbe5f7c98a1 mode 100644,000000..100644 --- a/src/dmclock/sim/src/test_dmclock.h +++ b/src/dmclock/sim/src/test_dmclock.h @@@ -1,57 -1,0 +1,64 @@@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +/* + * Copyright (C) 2016 Red Hat Inc. ++ * ++ * Author: J. Eric Ivancich ++ * ++ * This is free software; you can redistribute it and/or modify it ++ * under the terms of the GNU Lesser General Public License version ++ * 2.1, as published by the Free Software Foundation. See file ++ * COPYING. + */ + + +#include "dmclock_recs.h" +#include "dmclock_server.h" +#include "dmclock_client.h" + +#include "sim_recs.h" +#include "sim_server.h" +#include "sim_client.h" + +#include "simulate.h" + + +namespace crimson { + namespace test_dmc { + + namespace dmc = crimson::dmclock; + namespace sim = crimson::qos_simulation; + + struct DmcAccum { + uint64_t reservation_count = 0; + uint64_t proportion_count = 0; + }; + + using DmcQueue = dmc::PushPriorityQueue; + using DmcServiceTracker = dmc::ServiceTracker; + + using DmcServer = sim::SimulatedServer; + + using DmcClient = sim::SimulatedClient; + + using CreateQueueF = std::function; + + using MySim = sim::Simulation; + + using SubmitFunc = DmcClient::SubmitFunc; + + extern void dmc_server_accumulate_f(DmcAccum& a, + const dmc::PhaseType& phase); + + extern void dmc_client_accumulate_f(DmcAccum& a, + const dmc::PhaseType& phase); + } // namespace test_dmc +} // namespace crimson diff --cc src/dmclock/sim/src/test_dmclock_main.cc index ce9a31e404e8,000000000000..8983d0349c94 mode 100644,000000..100644 --- a/src/dmclock/sim/src/test_dmclock_main.cc +++ b/src/dmclock/sim/src/test_dmclock_main.cc @@@ -1,329 -1,0 +1,336 @@@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +/* + * Copyright (C) 2016 Red Hat Inc. ++ * ++ * Author: J. Eric Ivancich ++ * ++ * This is free software; you can redistribute it and/or modify it ++ * under the terms of the GNU Lesser General Public License version ++ * 2.1, as published by the Free Software Foundation. See file ++ * COPYING. + */ + + +#include "test_dmclock.h" +#include "config.h" + +#ifdef PROFILE +#include "profile.h" +#endif + + +namespace dmc = crimson::dmclock; +namespace test = crimson::test_dmc; +namespace sim = crimson::qos_simulation; + +using namespace std::placeholders; + + +namespace crimson { + namespace test_dmc { + void server_data(std::ostream& out, + test::MySim* sim, + test::MySim::ServerFilter server_disp_filter, + int head_w, int data_w, int data_prec); + + void client_data(std::ostream& out, + test::MySim* sim, + test::MySim::ClientFilter client_disp_filter, + int head_w, int data_w, int data_prec); + } +} + + +int main(int argc, char* argv[]) { + std::vector args; + for (int i = 1; i < argc; ++i) { + args.push_back(argv[i]); + } + + std::string conf_file_list; + sim::ceph_argparse_early_args(args, &conf_file_list); + + sim::sim_config_t g_conf; + std::vector &cli_group = g_conf.cli_group; + std::vector &srv_group = g_conf.srv_group; + + if (!conf_file_list.empty()) { + int ret; + ret = sim::parse_config_file(conf_file_list, g_conf); + if (ret) { + // error + _exit(1); + } + } else { + // default simulation parameter + g_conf.client_groups = 2; + + sim::srv_group_t st; + srv_group.push_back(st); + + sim::cli_group_t ct1(99, 0); + cli_group.push_back(ct1); + + sim::cli_group_t ct2(1, 10); + cli_group.push_back(ct2); + } + + const uint server_groups = g_conf.server_groups; + const uint client_groups = g_conf.client_groups; + const bool server_random_selection = g_conf.server_random_selection; + const bool server_soft_limit = g_conf.server_soft_limit; + const double anticipation_timeout = g_conf.anticipation_timeout; + uint server_total_count = 0; + uint client_total_count = 0; + + for (uint i = 0; i < client_groups; ++i) { + client_total_count += cli_group[i].client_count; + } + + for (uint i = 0; i < server_groups; ++i) { + server_total_count += srv_group[i].server_count; + } + + std::vector client_info; + for (uint i = 0; i < client_groups; ++i) { + client_info.push_back(test::dmc::ClientInfo + { cli_group[i].client_reservation, + cli_group[i].client_weight, + cli_group[i].client_limit } ); + } + + auto ret_client_group_f = [&](const ClientId& c) -> uint { + uint group_max = 0; + uint i = 0; + for (; i < client_groups; ++i) { + group_max += cli_group[i].client_count; + if (c < group_max) { + break; + } + } + return i; + }; + + auto ret_server_group_f = [&](const ServerId& s) -> uint { + uint group_max = 0; + uint i = 0; + for (; i < server_groups; ++i) { + group_max += srv_group[i].server_count; + if (s < group_max) { + break; + } + } + return i; + }; + + auto client_info_f = [=](const ClientId& c) -> const test::dmc::ClientInfo* { + return &client_info[ret_client_group_f(c)]; + }; + + auto client_disp_filter = [=] (const ClientId& i) -> bool { + return i < 3 || i >= (client_total_count - 3); + }; + + auto server_disp_filter = [=] (const ServerId& i) -> bool { + return i < 3 || i >= (server_total_count - 3); + }; + + + test::MySim *simulation; + + + // lambda to post a request to the identified server; called by client + test::SubmitFunc server_post_f = + [&simulation](const ServerId& server, + sim::TestRequest&& request, + const ClientId& client_id, + const test::dmc::ReqParams& req_params) { + test::DmcServer& s = simulation->get_server(server); + s.post(std::move(request), client_id, req_params); + }; + + std::vector> cli_inst; + for (uint i = 0; i < client_groups; ++i) { + if (cli_group[i].client_wait == std::chrono::seconds(0)) { + cli_inst.push_back( + { { sim::req_op, + (uint32_t)cli_group[i].client_total_ops, + (double)cli_group[i].client_iops_goal, + (uint16_t)cli_group[i].client_outstanding_ops } } ); + } else { + cli_inst.push_back( + { { sim::wait_op, cli_group[i].client_wait }, + { sim::req_op, + (uint32_t)cli_group[i].client_total_ops, + (double)cli_group[i].client_iops_goal, + (uint16_t)cli_group[i].client_outstanding_ops } } ); + } + } + + simulation = new test::MySim(); + + test::DmcServer::ClientRespFunc client_response_f = + [&simulation](ClientId client_id, + const sim::TestResponse& resp, + const ServerId& server_id, + const dmc::PhaseType& phase) { + simulation->get_client(client_id).receive_response(resp, + server_id, + phase); + }; + + test::CreateQueueF create_queue_f = + [&](test::DmcQueue::CanHandleRequestFunc can_f, + test::DmcQueue::HandleRequestFunc handle_f) -> test::DmcQueue* { + return new test::DmcQueue(client_info_f, + can_f, + handle_f, + server_soft_limit, + anticipation_timeout); + }; + + + auto create_server_f = [&](ServerId id) -> test::DmcServer* { + uint i = ret_server_group_f(id); + return new test::DmcServer(id, + srv_group[i].server_iops, + srv_group[i].server_threads, + client_response_f, + test::dmc_server_accumulate_f, + create_queue_f); + }; + + auto create_client_f = [&](ClientId id) -> test::DmcClient* { + uint i = ret_client_group_f(id); + test::MySim::ClientBasedServerSelectFunc server_select_f; + uint client_server_select_range = cli_group[i].client_server_select_range; + if (!server_random_selection) { + server_select_f = simulation->make_server_select_alt_range(client_server_select_range); + } else { + server_select_f = simulation->make_server_select_ran_range(client_server_select_range); + } + return new test::DmcClient(id, + server_post_f, + std::bind(server_select_f, _1, id), + test::dmc_client_accumulate_f, + cli_inst[i]); + }; + +#if 1 + std::cout << "[global]" << std::endl << g_conf << std::endl; + for (uint i = 0; i < client_groups; ++i) { + std::cout << std::endl << "[client." << i << "]" << std::endl; + std::cout << cli_group[i] << std::endl; + } + for (uint i = 0; i < server_groups; ++i) { + std::cout << std::endl << "[server." << i << "]" << std::endl; + std::cout << srv_group[i] << std::endl; + } + std::cout << std::endl; +#endif + + simulation->add_servers(server_total_count, create_server_f); + simulation->add_clients(client_total_count, create_client_f); + + simulation->run(); + simulation->display_stats(std::cout, + &test::server_data, &test::client_data, + server_disp_filter, client_disp_filter); + + delete simulation; +} // main + + +void test::client_data(std::ostream& out, + test::MySim* sim, + test::MySim::ClientFilter client_disp_filter, + int head_w, int data_w, int data_prec) { + // report how many ops were done by reservation and proportion for + // each client + + int total_r = 0; + out << std::setw(head_w) << "res_ops:"; + for (uint i = 0; i < sim->get_client_count(); ++i) { + const auto& client = sim->get_client(i); + auto r = client.get_accumulator().reservation_count; + total_r += r; + if (!client_disp_filter(i)) continue; + out << " " << std::setw(data_w) << r; + } + out << " " << std::setw(data_w) << std::setprecision(data_prec) << + std::fixed << total_r << std::endl; + + int total_p = 0; + out << std::setw(head_w) << "prop_ops:"; + for (uint i = 0; i < sim->get_client_count(); ++i) { + const auto& client = sim->get_client(i); + auto p = client.get_accumulator().proportion_count; + total_p += p; + if (!client_disp_filter(i)) continue; + out << " " << std::setw(data_w) << p; + } + out << " " << std::setw(data_w) << std::setprecision(data_prec) << + std::fixed << total_p << std::endl; +} + + +void test::server_data(std::ostream& out, + test::MySim* sim, + test::MySim::ServerFilter server_disp_filter, + int head_w, int data_w, int data_prec) { + out << std::setw(head_w) << "res_ops:"; + int total_r = 0; + for (uint i = 0; i < sim->get_server_count(); ++i) { + const auto& server = sim->get_server(i); + auto rc = server.get_accumulator().reservation_count; + total_r += rc; + if (!server_disp_filter(i)) continue; + out << " " << std::setw(data_w) << rc; + } + out << " " << std::setw(data_w) << std::setprecision(data_prec) << + std::fixed << total_r << std::endl; + + out << std::setw(head_w) << "prop_ops:"; + int total_p = 0; + for (uint i = 0; i < sim->get_server_count(); ++i) { + const auto& server = sim->get_server(i); + auto pc = server.get_accumulator().proportion_count; + total_p += pc; + if (!server_disp_filter(i)) continue; + out << " " << std::setw(data_w) << pc; + } + out << " " << std::setw(data_w) << std::setprecision(data_prec) << + std::fixed << total_p << std::endl; + + const auto& q = sim->get_server(0).get_priority_queue(); + out << std::endl << + " k-way heap: " << q.get_heap_branching_factor() << std::endl + << std::endl; + +#ifdef PROFILE + crimson::ProfileCombiner art_combiner; + crimson::ProfileCombiner rct_combiner; + for (uint i = 0; i < sim->get_server_count(); ++i) { + const auto& q = sim->get_server(i).get_priority_queue(); + const auto& art = q.add_request_timer; + art_combiner.combine(art); + const auto& rct = q.request_complete_timer; + rct_combiner.combine(rct); + } + out << "Server add_request_timer: count:" << art_combiner.get_count() << + ", mean:" << art_combiner.get_mean() << + ", std_dev:" << art_combiner.get_std_dev() << + ", low:" << art_combiner.get_low() << + ", high:" << art_combiner.get_high() << std::endl; + out << "Server request_complete_timer: count:" << rct_combiner.get_count() << + ", mean:" << rct_combiner.get_mean() << + ", std_dev:" << rct_combiner.get_std_dev() << + ", low:" << rct_combiner.get_low() << + ", high:" << rct_combiner.get_high() << std::endl; + out << "Server combined mean: " << + (art_combiner.get_mean() + rct_combiner.get_mean()) << + std::endl; +#endif +} diff --cc src/dmclock/sim/src/test_ssched.cc index e28b015cbdbf,000000000000..b06273dc0a82 mode 100644,000000..100644 --- a/src/dmclock/sim/src/test_ssched.cc +++ b/src/dmclock/sim/src/test_ssched.cc @@@ -1,33 -1,0 +1,40 @@@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +/* + * Copyright (C) 2016 Red Hat Inc. ++ * ++ * Author: J. Eric Ivancich ++ * ++ * This is free software; you can redistribute it and/or modify it ++ * under the terms of the GNU Lesser General Public License version ++ * 2.1, as published by the Free Software Foundation. See file ++ * COPYING. + */ + + +#include "ssched_recs.h" +#include "ssched_server.h" +#include "ssched_client.h" + +#include "sim_recs.h" +#include "sim_server.h" +#include "sim_client.h" + +#include "test_ssched.h" + + +namespace test = crimson::test_simple_scheduler; +namespace ssched = crimson::simple_scheduler; + + +void test::simple_server_accumulate_f(test::SimpleAccum& a, + const ssched::NullData& add_info) { + ++a.request_count; +} + + +void test::simple_client_accumulate_f(test::SimpleAccum& a, + const ssched::NullData& ignore) { + // empty +} diff --cc src/dmclock/sim/src/test_ssched.h index 96ac33ff376f,000000000000..0d778709afe1 mode 100644,000000..100644 --- a/src/dmclock/sim/src/test_ssched.h +++ b/src/dmclock/sim/src/test_ssched.h @@@ -1,57 -1,0 +1,64 @@@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +/* + * Copyright (C) 2016 Red Hat Inc. ++ * ++ * Author: J. Eric Ivancich ++ * ++ * This is free software; you can redistribute it and/or modify it ++ * under the terms of the GNU Lesser General Public License version ++ * 2.1, as published by the Free Software Foundation. See file ++ * COPYING. + */ + + +#include "ssched_server.h" +#include "ssched_client.h" + +#include "sim_recs.h" +#include "sim_server.h" +#include "sim_client.h" + +#include "simulate.h" + + +namespace crimson { + namespace test_simple_scheduler { + + namespace ssched = crimson::simple_scheduler; + namespace sim = crimson::qos_simulation; + + using Time = double; + + struct SimpleAccum { + uint32_t request_count = 0; + }; + + using SimpleQueue = ssched::SimpleQueue; + + using SimpleServer = sim::SimulatedServer; + using SimpleClient = sim::SimulatedClient, + ssched::ReqParams, + ssched::NullData, + SimpleAccum>; + + using CreateQueueF = + std::function; + + + using MySim = sim::Simulation; + + using SubmitFunc = SimpleClient::SubmitFunc; + + extern void simple_server_accumulate_f(SimpleAccum& a, + const ssched::NullData& add_info); + + extern void simple_client_accumulate_f(SimpleAccum& a, + const ssched::NullData& ignore); + } // namespace test_simple +} // namespace crimson diff --cc src/dmclock/sim/src/test_ssched_main.cc index 14ff7e9b4704,000000000000..82bc381eba61 mode 100644,000000..100644 --- a/src/dmclock/sim/src/test_ssched_main.cc +++ b/src/dmclock/sim/src/test_ssched_main.cc @@@ -1,187 -1,0 +1,194 @@@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +/* + * Copyright (C) 2016 Red Hat Inc. ++ * ++ * Author: J. Eric Ivancich ++ * ++ * This is free software; you can redistribute it and/or modify it ++ * under the terms of the GNU Lesser General Public License version ++ * 2.1, as published by the Free Software Foundation. See file ++ * COPYING. + */ + + +#include "test_ssched.h" + + +#ifdef PROFILE +#include "profile.h" +#endif + + +namespace test = crimson::test_simple_scheduler; +namespace ssched = crimson::simple_scheduler; +namespace sim = crimson::qos_simulation; + +using namespace std::placeholders; + + +namespace crimson { + namespace test_simple_scheduler { + void client_data(std::ostream& out, + test::MySim* sim, + test::MySim::ClientFilter client_disp_filter, + int head_w, int data_w, int data_prec); + + void server_data(std::ostream& out, + test::MySim* sim, + test::MySim::ServerFilter server_disp_filter, + int head_w, int data_w, int data_prec); + } // namespace test_simple +} // namespace crimson + + +int main(int argc, char* argv[]) { + // server params + + const uint server_count = 100; + const uint server_iops = 40; + const uint server_threads = 1; + + // client params + + const uint client_total_ops = 1000; + const uint client_count = 100; + const uint client_server_select_range = 10; + const uint client_wait_count = 1; + const uint client_iops_goal = 50; + const uint client_outstanding_ops = 100; + const std::chrono::seconds client_wait(10); + + auto client_disp_filter = [=] (const ClientId& i) -> bool { + return i < 3 || i >= (client_count - 3); + }; + + auto server_disp_filter = [=] (const ServerId& i) -> bool { + return i < 3 || i >= (server_count - 3); + }; + + + test::MySim *simulation; + + // lambda to post a request to the identified server; called by client + test::SubmitFunc server_post_f = + [&simulation](const ServerId& server_id, + sim::TestRequest&& request, + const ClientId& client_id, + const ssched::ReqParams& req_params) { + auto& server = simulation->get_server(server_id); + server.post(std::move(request), client_id, req_params); + }; + + static std::vector no_wait = + { { sim::req_op, client_total_ops, client_iops_goal, client_outstanding_ops } }; + static std::vector wait = + { { sim::wait_op, client_wait }, + { sim::req_op, client_total_ops, client_iops_goal, client_outstanding_ops } }; + + simulation = new test::MySim(); + +#if 1 + test::MySim::ClientBasedServerSelectFunc server_select_f = + simulation->make_server_select_alt_range(client_server_select_range); +#elif 0 + test::MySim::ClientBasedServerSelectFunc server_select_f = + std::bind(&test::MySim::server_select_random, simulation, _1, _2); +#else + test::MySim::ClientBasedServerSelectFunc server_select_f = + std::bind(&test::MySim::server_select_0, simulation, _1, _2); +#endif + + test::SimpleServer::ClientRespFunc client_response_f = + [&simulation](ClientId client_id, + const sim::TestResponse& resp, + const ServerId& server_id, + const ssched::NullData& resp_params) { + simulation->get_client(client_id).receive_response(resp, + server_id, + resp_params); + }; + + test::CreateQueueF create_queue_f = + [&](test::SimpleQueue::CanHandleRequestFunc can_f, + test::SimpleQueue::HandleRequestFunc handle_f) -> test::SimpleQueue* { + return new test::SimpleQueue(can_f, handle_f); + }; + + auto create_server_f = [&](ServerId id) -> test::SimpleServer* { + return new test::SimpleServer(id, + server_iops, server_threads, + client_response_f, + test::simple_server_accumulate_f, + create_queue_f); + }; + + auto create_client_f = [&](ClientId id) -> test::SimpleClient* { + return new test::SimpleClient(id, + server_post_f, + std::bind(server_select_f, _1, id), + test::simple_client_accumulate_f, + id < (client_count - client_wait_count) + ? no_wait : wait); + }; + + simulation->add_servers(server_count, create_server_f); + simulation->add_clients(client_count, create_client_f); + + simulation->run(); + simulation->display_stats(std::cout, + &test::server_data, &test::client_data, + server_disp_filter, client_disp_filter); +} // main + + +void test::client_data(std::ostream& out, + test::MySim* sim, + test::MySim::ClientFilter client_disp_filter, + int head_w, int data_w, int data_prec) { + // empty +} + + +void test::server_data(std::ostream& out, + test::MySim* sim, + test::MySim::ServerFilter server_disp_filter, + int head_w, int data_w, int data_prec) { + out << std::setw(head_w) << "requests:"; + int total_req = 0; + for (uint i = 0; i < sim->get_server_count(); ++i) { + const auto& server = sim->get_server(i); + auto req_count = server.get_accumulator().request_count; + total_req += req_count; + if (!server_disp_filter(i)) continue; + out << std::setw(data_w) << req_count; + } + out << std::setw(data_w) << std::setprecision(data_prec) << + std::fixed << total_req << std::endl; + +#ifdef PROFILE + crimson::ProfileCombiner art_combiner; + crimson::ProfileCombiner rct_combiner; + for (uint i = 0; i < sim->get_server_count(); ++i) { + const auto& q = sim->get_server(i).get_priority_queue(); + const auto& art = q.add_request_timer; + art_combiner.combine(art); + const auto& rct = q.request_complete_timer; + rct_combiner.combine(rct); + } + out << "Server add_request_timer: count:" << art_combiner.get_count() << + ", mean:" << art_combiner.get_mean() << + ", std_dev:" << art_combiner.get_std_dev() << + ", low:" << art_combiner.get_low() << + ", high:" << art_combiner.get_high() << std::endl; + out << "Server request_complete_timer: count:" << rct_combiner.get_count() << + ", mean:" << rct_combiner.get_mean() << + ", std_dev:" << rct_combiner.get_std_dev() << + ", low:" << rct_combiner.get_low() << + ", high:" << rct_combiner.get_high() << std::endl; + out << "Server combined mean: " << + (art_combiner.get_mean() + rct_combiner.get_mean()) << + std::endl; +#endif +} diff --cc src/dmclock/src/dmclock_client.h index e0280ab311c1,000000000000..7cebd9e01087 mode 100644,000000..100644 --- a/src/dmclock/src/dmclock_client.h +++ b/src/dmclock/src/dmclock_client.h @@@ -1,274 -1,0 +1,281 @@@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +/* + * Copyright (C) 2017 Red Hat Inc. ++ * ++ * Author: J. Eric Ivancich ++ * ++ * This is free software; you can redistribute it and/or modify it ++ * under the terms of the GNU Lesser General Public License version ++ * 2.1, as published by the Free Software Foundation. See file ++ * COPYING. + */ + + +#pragma once + +#include +#include +#include +#include +#include +#include + +#include "run_every.h" +#include "dmclock_util.h" +#include "dmclock_recs.h" + + +namespace crimson { + namespace dmclock { + + // OrigTracker is a best-effort implementation of the the original + // dmClock calculations of delta and rho. It adheres to an + // interface, implemented via a template type, that allows it to + // be replaced with an alternative. The interface consists of the + // static create, prepare_req, resp_update, and get_last_delta + // functions. + class OrigTracker { + Counter delta_prev_req; + Counter rho_prev_req; + uint32_t my_delta; + uint32_t my_rho; + + public: + + OrigTracker(Counter global_delta, + Counter global_rho) : + delta_prev_req(global_delta), + rho_prev_req(global_rho), + my_delta(0), + my_rho(0) + { /* empty */ } + + static inline OrigTracker create(Counter the_delta, Counter the_rho) { + return OrigTracker(the_delta, the_rho); + } + + inline ReqParams prepare_req(Counter& the_delta, Counter& the_rho) { + Counter delta_out = 1 + the_delta - delta_prev_req - my_delta; + Counter rho_out = 1 + the_rho - rho_prev_req - my_rho; + delta_prev_req = the_delta; + rho_prev_req = the_rho; + my_delta = 0; + my_rho = 0; + return ReqParams(uint32_t(delta_out), uint32_t(rho_out)); + } + + inline void resp_update(PhaseType phase, + Counter& the_delta, + Counter& the_rho) { + ++the_delta; + ++my_delta; + if (phase == PhaseType::reservation) { + ++the_rho; + ++my_rho; + } + } + + inline Counter get_last_delta() const { + return delta_prev_req; + } + }; // struct OrigTracker + + + // BorrowingTracker always returns a positive delta and rho. If + // not enough responses have come in to allow that, we will borrow + // a future response and repay it later. + class BorrowingTracker { + Counter delta_prev_req; + Counter rho_prev_req; + Counter delta_borrow; + Counter rho_borrow; + + public: + + BorrowingTracker(Counter global_delta, Counter global_rho) : + delta_prev_req(global_delta), + rho_prev_req(global_rho), + delta_borrow(0), + rho_borrow(0) + { /* empty */ } + + static inline BorrowingTracker create(Counter the_delta, + Counter the_rho) { + return BorrowingTracker(the_delta, the_rho); + } + + inline Counter calc_with_borrow(const Counter& global, + const Counter& previous, + Counter& borrow) { + Counter result = global - previous; + if (0 == result) { + // if no replies have come in, borrow one from the future + ++borrow; + return 1; + } else if (result > borrow) { + // if we can give back all of what we borrowed, do so + result -= borrow; + borrow = 0; + return result; + } else { + // can only return part of what was borrowed in order to + // return positive + borrow = borrow - result + 1; + return 1; + } + } + + inline ReqParams prepare_req(Counter& the_delta, Counter& the_rho) { + Counter delta_out = + calc_with_borrow(the_delta, delta_prev_req, delta_borrow); + Counter rho_out = + calc_with_borrow(the_rho, rho_prev_req, rho_borrow); + delta_prev_req = the_delta; + rho_prev_req = the_rho; + return ReqParams(uint32_t(delta_out), uint32_t(rho_out)); + } + + inline void resp_update(PhaseType phase, + Counter& the_delta, + Counter& the_rho) { + ++the_delta; + if (phase == PhaseType::reservation) { + ++the_rho; + } + } + + inline Counter get_last_delta() const { + return delta_prev_req; + } + }; // struct BorrowingTracker + + + // S is server identifier type + // T is the server info class that adheres to ServerTrackerIfc interface + template + class ServiceTracker { + // we don't want to include gtest.h just for FRIEND_TEST + friend class dmclock_client_server_erase_Test; + + using TimePoint = decltype(std::chrono::steady_clock::now()); + using Duration = std::chrono::milliseconds; + using MarkPoint = std::pair; + + Counter delta_counter; // # reqs completed + Counter rho_counter; // # reqs completed via reservation + std::map server_map; + mutable std::mutex data_mtx; // protects Counters and map + + using DataGuard = std::lock_guard; + + // clean config + + std::deque clean_mark_points; + Duration clean_age; // age at which server tracker cleaned + + // NB: All threads declared at end, so they're destructed firs! + + std::unique_ptr cleaning_job; + + + public: + + // we have to start the counters at 1, as 0 is used in the + // cleaning process + template + ServiceTracker(std::chrono::duration _clean_every, + std::chrono::duration _clean_age) : + delta_counter(1), + rho_counter(1), + clean_age(std::chrono::duration_cast(_clean_age)) + { + cleaning_job = + std::unique_ptr( + new RunEvery(_clean_every, + std::bind(&ServiceTracker::do_clean, this))); + } + + + // the reason we're overloading the constructor rather than + // using default values for the arguments is so that callers + // have to either use all defaults or specify all timings; with + // default arguments they could specify some without others + ServiceTracker() : + ServiceTracker(std::chrono::minutes(5), std::chrono::minutes(10)) + { + // empty + } + + + /* + * Incorporates the RespParams received into the various counter. + */ + void track_resp(const S& server_id, const PhaseType& phase) { + DataGuard g(data_mtx); + + auto it = server_map.find(server_id); + if (server_map.end() == it) { + // this code can only run if a request did not precede the + // response or if the record was cleaned up b/w when + // the request was made and now + auto i = server_map.emplace(server_id, + T::create(delta_counter, rho_counter)); + it = i.first; + } + it->second.resp_update(phase, delta_counter, rho_counter); + } + + /* + * Returns the ReqParams for the given server. + */ + ReqParams get_req_params(const S& server) { + DataGuard g(data_mtx); + auto it = server_map.find(server); + if (server_map.end() == it) { + server_map.emplace(server, + T::create(delta_counter, rho_counter)); + return ReqParams(1, 1); + } else { + return it->second.prepare_req(delta_counter, rho_counter); + } + } + + private: + + /* + * This is being called regularly by RunEvery. Every time it's + * called it notes the time and delta counter (mark point) in a + * deque. It also looks at the deque to find the most recent + * mark point that is older than clean_age. It then walks the + * map and delete all server entries that were last used before + * that mark point. + */ + void do_clean() { + TimePoint now = std::chrono::steady_clock::now(); + DataGuard g(data_mtx); + clean_mark_points.emplace_back(MarkPoint(now, delta_counter)); + + Counter earliest = 0; + auto point = clean_mark_points.front(); + while (point.first <= now - clean_age) { + earliest = point.second; + clean_mark_points.pop_front(); + point = clean_mark_points.front(); + } + + if (earliest > 0) { + for (auto i = server_map.begin(); + i != server_map.end(); + /* empty */) { + auto i2 = i++; + if (i2->second.get_last_delta() <= earliest) { + server_map.erase(i2); + } + } + } + } // do_clean + }; // class ServiceTracker + } +} diff --cc src/dmclock/src/dmclock_recs.h index 50b129f67fc8,000000000000..4b574f8dbe09 mode 100644,000000..100644 --- a/src/dmclock/src/dmclock_recs.h +++ b/src/dmclock/src/dmclock_recs.h @@@ -1,61 -1,0 +1,68 @@@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +/* + * Copyright (C) 2017 Red Hat Inc. ++ * ++ * Author: J. Eric Ivancich ++ * ++ * This is free software; you can redistribute it and/or modify it ++ * under the terms of the GNU Lesser General Public License version ++ * 2.1, as published by the Free Software Foundation. See file ++ * COPYING. + */ + + +#pragma once + + +#include +#include + + +namespace crimson { + namespace dmclock { + using Counter = uint64_t; + + enum class PhaseType : uint8_t { reservation, priority }; + + inline std::ostream& operator<<(std::ostream& out, const PhaseType& phase) { + out << (PhaseType::reservation == phase ? "reservation" : "priority"); + return out; + } + + struct ReqParams { + // count of all replies since last request; MUSTN'T BE 0 + uint32_t delta; + + // count of reservation replies since last request; MUSTN'T BE 0 + uint32_t rho; + + ReqParams(uint32_t _delta, uint32_t _rho) : + delta(_delta), + rho(_rho) + { + assert(0 != delta && 0 != rho && rho <= delta); + } + + ReqParams() : + ReqParams(1, 1) + { + // empty + } + + ReqParams(const ReqParams& other) : + delta(other.delta), + rho(other.rho) + { + // empty + } + + friend std::ostream& operator<<(std::ostream& out, const ReqParams& rp) { + out << "ReqParams{ delta:" << rp.delta << + ", rho:" << rp.rho << " }"; + return out; + } + }; // class ReqParams + } +} diff --cc src/dmclock/src/dmclock_server.h index dd9dd2b734f3,000000000000..39a6b781d422 mode 100644,000000..100644 --- a/src/dmclock/src/dmclock_server.h +++ b/src/dmclock/src/dmclock_server.h @@@ -1,1660 -1,0 +1,1667 @@@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +/* + * Copyright (C) 2017 Red Hat Inc. ++ * ++ * Author: J. Eric Ivancich ++ * ++ * This is free software; you can redistribute it and/or modify it ++ * under the terms of the GNU Lesser General Public License version ++ * 2.1, as published by the Free Software Foundation. See file ++ * COPYING. + */ + + +#pragma once + +/* COMPILATION OPTIONS + * + * By default we include an optimization over the originally published + * dmclock algorithm using not the values of rho and delta that were + * sent in with a request but instead the most recent rho and delta + * values from the requests's client. To restore the algorithm's + * original behavior, define DO_NOT_DELAY_TAG_CALC (i.e., compiler + * argument -DDO_NOT_DELAY_TAG_CALC). + * + * The prop_heap does not seem to be necessary. The only thing it + * would help with is quickly finding the mininum proportion/prioity + * when an idle client became active. To have the code maintain the + * proportional heap, define USE_PROP_HEAP (i.e., compiler argument + * -DUSE_PROP_HEAP). + */ + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +#include "indirect_intrusive_heap.h" +#include "run_every.h" +#include "dmclock_util.h" +#include "dmclock_recs.h" + +#ifdef PROFILE +#include "profile.h" +#endif + + +namespace crimson { + + namespace dmclock { + + namespace c = crimson; + + constexpr double max_tag = std::numeric_limits::is_iec559 ? + std::numeric_limits::infinity() : + std::numeric_limits::max(); + constexpr double min_tag = std::numeric_limits::is_iec559 ? + -std::numeric_limits::infinity() : + std::numeric_limits::lowest(); + constexpr uint tag_modulo = 1000000; + + struct ClientInfo { + double reservation; // minimum + double weight; // proportional + double limit; // maximum + + // multiplicative inverses of above, which we use in calculations + // and don't want to recalculate repeatedly + double reservation_inv; + double weight_inv; + double limit_inv; + + // order parameters -- min, "normal", max + ClientInfo(double _reservation, double _weight, double _limit) : + reservation(_reservation), + weight(_weight), + limit(_limit), + reservation_inv(0.0 == reservation ? 0.0 : 1.0 / reservation), + weight_inv( 0.0 == weight ? 0.0 : 1.0 / weight), + limit_inv( 0.0 == limit ? 0.0 : 1.0 / limit) + { + // empty + } + + + friend std::ostream& operator<<(std::ostream& out, + const ClientInfo& client) { + out << + "{ ClientInfo:: r:" << client.reservation << + " w:" << std::fixed << client.weight << + " l:" << std::fixed << client.limit << + " 1/r:" << std::fixed << client.reservation_inv << + " 1/w:" << std::fixed << client.weight_inv << + " 1/l:" << std::fixed << client.limit_inv << + " }"; + return out; + } + }; // class ClientInfo + + + struct RequestTag { + double reservation; + double proportion; + double limit; + bool ready; // true when within limit + Time arrival; + + RequestTag(const RequestTag& prev_tag, + const ClientInfo& client, + const uint32_t delta, + const uint32_t rho, + const Time time, + const double cost = 0.0, + const double anticipation_timeout = 0.0) : + ready(false), + arrival(time) + { + Time max_time = time; + if (time - anticipation_timeout < prev_tag.arrival) + max_time -= anticipation_timeout; + + reservation = cost + tag_calc(max_time, + prev_tag.reservation, + client.reservation_inv, + rho, + true); + proportion = tag_calc(max_time, + prev_tag.proportion, + client.weight_inv, + delta, + true); + limit = tag_calc(max_time, + prev_tag.limit, + client.limit_inv, + delta, + false); + + assert(reservation < max_tag || proportion < max_tag); + } + + RequestTag(const RequestTag& prev_tag, + const ClientInfo& client, + const ReqParams req_params, + const Time time, + const double cost = 0.0, + const double anticipation_timeout = 0.0) : + RequestTag(prev_tag, client, req_params.delta, req_params.rho, time, + cost, anticipation_timeout) + { /* empty */ } + + RequestTag(double _res, double _prop, double _lim, const Time _arrival) : + reservation(_res), + proportion(_prop), + limit(_lim), + ready(false), + arrival(_arrival) + { + assert(reservation < max_tag || proportion < max_tag); + } + + RequestTag(const RequestTag& other) : + reservation(other.reservation), + proportion(other.proportion), + limit(other.limit), + ready(other.ready), + arrival(other.arrival) + { + // empty + } + + static std::string format_tag_change(double before, double after) { + if (before == after) { + return std::string("same"); + } else { + std::stringstream ss; + ss << format_tag(before) << "=>" << format_tag(after); + return ss.str(); + } + } + + static std::string format_tag(double value) { + if (max_tag == value) { + return std::string("max"); + } else if (min_tag == value) { + return std::string("min"); + } else { + return format_time(value, tag_modulo); + } + } + + private: + + static double tag_calc(const Time time, + double prev, + double increment, + uint32_t dist_req_val, + bool extreme_is_high) { + if (0.0 == increment) { + return extreme_is_high ? max_tag : min_tag; + } else { + if (0 != dist_req_val) { + increment *= dist_req_val; + } + return std::max(time, prev + increment); + } + } + + friend std::ostream& operator<<(std::ostream& out, + const RequestTag& tag) { + out << + "{ RequestTag:: ready:" << (tag.ready ? "true" : "false") << + " r:" << format_tag(tag.reservation) << + " p:" << format_tag(tag.proportion) << + " l:" << format_tag(tag.limit) << +#if 0 // try to resolve this to make sure Time is operator<<'able. +#ifndef DO_NOT_DELAY_TAG_CALC + " arrival:" << tag.arrival << +#endif +#endif + " }"; + return out; + } + }; // class RequestTag + + + // C is client identifier type, R is request type, + // U1 determines whether to use client information function dynamically, + // B is heap branching factor + template + class PriorityQueueBase { + // we don't want to include gtest.h just for FRIEND_TEST + friend class dmclock_server_client_idle_erase_Test; + + public: + + using RequestRef = std::unique_ptr; + + protected: + + using TimePoint = decltype(std::chrono::steady_clock::now()); + using Duration = std::chrono::milliseconds; + using MarkPoint = std::pair; + + enum class ReadyOption {ignore, lowers, raises}; + + // forward decl for friend decls + template + struct ClientCompare; + + class ClientReq { + friend PriorityQueueBase; + + RequestTag tag; + C client_id; + RequestRef request; + + public: + + ClientReq(const RequestTag& _tag, + const C& _client_id, + RequestRef&& _request) : + tag(_tag), + client_id(_client_id), + request(std::move(_request)) + { + // empty + } + + friend std::ostream& operator<<(std::ostream& out, const ClientReq& c) { + out << "{ ClientReq:: tag:" << c.tag << " client:" << + c.client_id << " }"; + return out; + } + }; // class ClientReq + + public: + + // NOTE: ClientRec is in the "public" section for compatibility + // with g++ 4.8.4, which complains if it's not. By g++ 6.3.1 + // ClientRec could be "protected" with no issue. [See comments + // associated with function submit_top_request.] + class ClientRec { + friend PriorityQueueBase; + + C client; + RequestTag prev_tag; + std::deque requests; + + // amount added from the proportion tag as a result of + // an idle client becoming unidle + double prop_delta = 0.0; + + c::IndIntruHeapData reserv_heap_data {}; + c::IndIntruHeapData lim_heap_data {}; + c::IndIntruHeapData ready_heap_data {}; +#if USE_PROP_HEAP + c::IndIntruHeapData prop_heap_data {}; +#endif + + public: + + const ClientInfo* info; + bool idle; + Counter last_tick; + uint32_t cur_rho; + uint32_t cur_delta; + + ClientRec(C _client, + const ClientInfo* _info, + Counter current_tick) : + client(_client), + prev_tag(0.0, 0.0, 0.0, TimeZero), + info(_info), + idle(true), + last_tick(current_tick), + cur_rho(1), + cur_delta(1) + { + // empty + } + + inline const RequestTag& get_req_tag() const { + return prev_tag; + } + + static inline void assign_unpinned_tag(double& lhs, const double rhs) { + if (rhs != max_tag && rhs != min_tag) { + lhs = rhs; + } + } + + inline void update_req_tag(const RequestTag& _prev, + const Counter& _tick) { + assign_unpinned_tag(prev_tag.reservation, _prev.reservation); + assign_unpinned_tag(prev_tag.limit, _prev.limit); + assign_unpinned_tag(prev_tag.proportion, _prev.proportion); + prev_tag.arrival = _prev.arrival; + last_tick = _tick; + } + + inline void add_request(const RequestTag& tag, + const C& client_id, + RequestRef&& request) { + requests.emplace_back(ClientReq(tag, client_id, std::move(request))); + } + + inline const ClientReq& next_request() const { + return requests.front(); + } + + inline ClientReq& next_request() { + return requests.front(); + } + + inline void pop_request() { + requests.pop_front(); + } + + inline bool has_request() const { + return !requests.empty(); + } + + inline size_t request_count() const { + return requests.size(); + } + + // NB: because a deque is the underlying structure, this + // operation might be expensive - bool remove_by_req_filter_fw(std::function filter_accum) { ++ bool remove_by_req_filter_fw(std::function filter_accum) { + bool any_removed = false; + for (auto i = requests.begin(); + i != requests.end(); + /* no inc */) { - if (filter_accum(std::move(*i->request))) { ++ if (filter_accum(std::move(i->request))) { + any_removed = true; + i = requests.erase(i); + } else { + ++i; + } + } + return any_removed; + } + + // NB: because a deque is the underlying structure, this + // operation might be expensive - bool remove_by_req_filter_bw(std::function filter_accum) { ++ bool remove_by_req_filter_bw(std::function filter_accum) { + bool any_removed = false; + for (auto i = requests.rbegin(); + i != requests.rend(); + /* no inc */) { - if (filter_accum(std::move(*i->request))) { ++ if (filter_accum(std::move(i->request))) { + any_removed = true; + i = decltype(i){ requests.erase(std::next(i).base()) }; + } else { + ++i; + } + } + return any_removed; + } + + inline bool - remove_by_req_filter(std::function filter_accum, ++ remove_by_req_filter(std::function filter_accum, + bool visit_backwards) { + if (visit_backwards) { + return remove_by_req_filter_bw(filter_accum); + } else { + return remove_by_req_filter_fw(filter_accum); + } + } + + friend std::ostream& + operator<<(std::ostream& out, + const typename PriorityQueueBase::ClientRec& e) { + out << "{ ClientRec::" << + " client:" << e.client << + " prev_tag:" << e.prev_tag << + " req_count:" << e.requests.size() << + " top_req:"; + if (e.has_request()) { + out << e.next_request(); + } else { + out << "none"; + } + out << " }"; + + return out; + } + }; // class ClientRec + + using ClientRecRef = std::shared_ptr; + + // when we try to get the next request, we'll be in one of three + // situations -- we'll have one to return, have one that can + // fire in the future, or not have any + enum class NextReqType { returning, future, none }; + + // specifies which queue next request will get popped from + enum class HeapId { reservation, ready }; + + // this is returned from next_req to tell the caller the situation + struct NextReq { + NextReqType type; + union { + HeapId heap_id; + Time when_ready; + }; + + inline explicit NextReq() : + type(NextReqType::none) + { } + + inline NextReq(HeapId _heap_id) : + type(NextReqType::returning), + heap_id(_heap_id) + { } + + inline NextReq(Time _when_ready) : + type(NextReqType::future), + when_ready(_when_ready) + { } + + // calls to this are clearer than calls to the default + // constructor + static inline NextReq none() { + return NextReq(); + } + }; + + + // a function that can be called to look up client information + using ClientInfoFunc = std::function; + + + bool empty() const { + DataGuard g(data_mtx); + return (resv_heap.empty() || ! resv_heap.top().has_request()); + } + + + size_t client_count() const { + DataGuard g(data_mtx); + return resv_heap.size(); + } + + + size_t request_count() const { + DataGuard g(data_mtx); + size_t total = 0; + for (auto i = resv_heap.cbegin(); i != resv_heap.cend(); ++i) { + total += i->request_count(); + } + return total; + } + + - bool remove_by_req_filter(std::function filter_accum, ++ bool remove_by_req_filter(std::function filter_accum, + bool visit_backwards = false) { + bool any_removed = false; + DataGuard g(data_mtx); + for (auto i : client_map) { + bool modified = + i.second->remove_by_req_filter(filter_accum, visit_backwards); + if (modified) { + resv_heap.adjust(*i.second); + limit_heap.adjust(*i.second); + ready_heap.adjust(*i.second); +#if USE_PROP_HEAP + prop_heap.adjust(*i.second); +#endif + any_removed = true; + } + } + return any_removed; + } + + + // use as a default value when no accumulator is provide - static void request_sink(R&& req) { ++ static void request_sink(RequestRef&& req) { + // do nothing + } + + + void remove_by_client(const C& client, + bool reverse = false, - std::function accum = request_sink) { ++ std::function accum = request_sink) { + DataGuard g(data_mtx); + + auto i = client_map.find(client); + + if (i == client_map.end()) return; + + if (reverse) { + for (auto j = i->second->requests.rbegin(); + j != i->second->requests.rend(); + ++j) { - accum(std::move(*j->request)); ++ accum(std::move(j->request)); + } + } else { + for (auto j = i->second->requests.begin(); + j != i->second->requests.end(); + ++j) { - accum(std::move(*j->request)); ++ accum(std::move(j->request)); + } + } + + i->second->requests.clear(); + + resv_heap.adjust(*i->second); + limit_heap.adjust(*i->second); + ready_heap.adjust(*i->second); +#if USE_PROP_HEAP + prop_heap.adjust(*i->second); +#endif + } + + + uint get_heap_branching_factor() const { + return B; + } + + + void update_client_info(const C& client_id) { + DataGuard g(data_mtx); + auto client_it = client_map.find(client_id); + if (client_map.end() != client_it) { + ClientRec& client = (*client_it->second); + client.info = client_info_f(client_id); + } + } + + + void update_client_infos() { + DataGuard g(data_mtx); + for (auto i : client_map) { + i.second->info = client_info_f(i.second->client); + } + } + + + friend std::ostream& operator<<(std::ostream& out, + const PriorityQueueBase& q) { + std::lock_guard guard(q.data_mtx); + + out << "{ PriorityQueue::"; + for (const auto& c : q.client_map) { + out << " { client:" << c.first << ", record:" << *c.second << + " }"; + } + if (!q.resv_heap.empty()) { + const auto& resv = q.resv_heap.top(); + out << " { reservation_top:" << resv << " }"; + const auto& ready = q.ready_heap.top(); + out << " { ready_top:" << ready << " }"; + const auto& limit = q.limit_heap.top(); + out << " { limit_top:" << limit << " }"; + } else { + out << " HEAPS-EMPTY"; + } + out << " }"; + + return out; + } + + // for debugging + void display_queues(std::ostream& out, + bool show_res = true, + bool show_lim = true, + bool show_ready = true, + bool show_prop = true) const { + auto filter = [](const ClientRec& e)->bool { return true; }; + DataGuard g(data_mtx); + if (show_res) { + resv_heap.display_sorted(out << "RESER:", filter); + } + if (show_lim) { + limit_heap.display_sorted(out << "LIMIT:", filter); + } + if (show_ready) { + ready_heap.display_sorted(out << "READY:", filter); + } +#if USE_PROP_HEAP + if (show_prop) { + prop_heap.display_sorted(out << "PROPO:", filter); + } +#endif + } // display_queues + + + protected: + + // The ClientCompare functor is essentially doing a precedes? + // operator, returning true if and only if the first parameter + // must precede the second parameter. If the second must precede + // the first, or if they are equivalent, false should be + // returned. The reason for this behavior is that it will be + // called to test if two items are out of order and if true is + // returned it will reverse the items. Therefore false is the + // default return when it doesn't matter to prevent unnecessary + // re-ordering. + // + // The template is supporting variations in sorting based on the + // heap in question and allowing these variations to be handled + // at compile-time. + // + // tag_field determines which tag is being used for comparison + // + // ready_opt determines how the ready flag influences the sort + // + // use_prop_delta determines whether the proportional delta is + // added in for comparison + template + struct ClientCompare { + bool operator()(const ClientRec& n1, const ClientRec& n2) const { + if (n1.has_request()) { + if (n2.has_request()) { + const auto& t1 = n1.next_request().tag; + const auto& t2 = n2.next_request().tag; + if (ReadyOption::ignore == ready_opt || t1.ready == t2.ready) { + // if we don't care about ready or the ready values are the same + if (use_prop_delta) { + return (t1.*tag_field + n1.prop_delta) < + (t2.*tag_field + n2.prop_delta); + } else { + return t1.*tag_field < t2.*tag_field; + } + } else if (ReadyOption::raises == ready_opt) { + // use_ready == true && the ready fields are different + return t1.ready; + } else { + return t2.ready; + } + } else { + // n1 has request but n2 does not + return true; + } + } else if (n2.has_request()) { + // n2 has request but n1 does not + return false; + } else { + // both have none; keep stable w false + return false; + } + } + }; + + ClientInfoFunc client_info_f; + static constexpr bool is_dynamic_cli_info_f = U1; + + mutable std::mutex data_mtx; + using DataGuard = std::lock_guard; + + // stable mapping between client ids and client queues + std::map client_map; + + c::IndIntruHeap, + B> resv_heap; +#if USE_PROP_HEAP + c::IndIntruHeap, + B> prop_heap; +#endif + c::IndIntruHeap, + B> limit_heap; + c::IndIntruHeap, + B> ready_heap; + + // if all reservations are met and all other requestes are under + // limit, this will allow the request next in terms of + // proportion to still get issued + bool allow_limit_break; + double anticipation_timeout; + + std::atomic_bool finishing; + + // every request creates a tick + Counter tick = 0; + + // performance data collection + size_t reserv_sched_count = 0; + size_t prop_sched_count = 0; + size_t limit_break_sched_count = 0; + + Duration idle_age; + Duration erase_age; + Duration check_time; + std::deque clean_mark_points; + + // NB: All threads declared at end, so they're destructed first! + + std::unique_ptr cleaning_job; + + + // COMMON constructor that others feed into; we can accept three + // different variations of durations + template + PriorityQueueBase(ClientInfoFunc _client_info_f, + std::chrono::duration _idle_age, + std::chrono::duration _erase_age, + std::chrono::duration _check_time, + bool _allow_limit_break, + double _anticipation_timeout) : + client_info_f(_client_info_f), + allow_limit_break(_allow_limit_break), + anticipation_timeout(_anticipation_timeout), + finishing(false), + idle_age(std::chrono::duration_cast(_idle_age)), + erase_age(std::chrono::duration_cast(_erase_age)), + check_time(std::chrono::duration_cast(_check_time)) + { + assert(_erase_age >= _idle_age); + assert(_check_time < _idle_age); + cleaning_job = + std::unique_ptr( + new RunEvery(check_time, + std::bind(&PriorityQueueBase::do_clean, this))); + } + + + ~PriorityQueueBase() { + finishing = true; + } + + + inline const ClientInfo* get_cli_info(ClientRec& client) const { + if (is_dynamic_cli_info_f) { + client.info = client_info_f(client.client); + } + return client.info; + } + + + // data_mtx must be held by caller + void do_add_request(RequestRef&& request, + const C& client_id, + const ReqParams& req_params, + const Time time, + const double cost = 0.0) { + ++tick; + + // this pointer will help us create a reference to a shared + // pointer, no matter which of two codepaths we take + ClientRec* temp_client; + + auto client_it = client_map.find(client_id); + if (client_map.end() != client_it) { + temp_client = &(*client_it->second); // address of obj of shared_ptr + } else { + const ClientInfo* info = client_info_f(client_id); + ClientRecRef client_rec = + std::make_shared(client_id, info, tick); + resv_heap.push(client_rec); +#if USE_PROP_HEAP + prop_heap.push(client_rec); +#endif + limit_heap.push(client_rec); + ready_heap.push(client_rec); + client_map[client_id] = client_rec; + temp_client = &(*client_rec); // address of obj of shared_ptr + } + + // for convenience, we'll create a reference to the shared pointer + ClientRec& client = *temp_client; + + if (client.idle) { + // We need to do an adjustment so that idle clients compete + // fairly on proportional tags since those tags may have + // drifted from real-time. Either use the lowest existing + // proportion tag -- O(1) -- or the client with the lowest + // previous proportion tag -- O(n) where n = # clients. + // + // So we don't have to maintain a propotional queue that + // keeps the minimum on proportional tag alone (we're + // instead using a ready queue), we'll have to check each + // client. + // + // The alternative would be to maintain a proportional queue + // (define USE_PROP_TAG) and do an O(1) operation here. + + // Was unable to confirm whether equality testing on + // std::numeric_limits::max() is guaranteed, so + // we'll use a compile-time calculated trigger that is one + // third the max, which should be much larger than any + // expected organic value. + constexpr double lowest_prop_tag_trigger = + std::numeric_limits::max() / 3.0; + + double lowest_prop_tag = std::numeric_limits::max(); + for (auto const &c : client_map) { + // don't use ourselves (or anything else that might be + // listed as idle) since we're now in the map + if (!c.second->idle) { + double p; + // use either lowest proportion tag or previous proportion tag + if (c.second->has_request()) { + p = c.second->next_request().tag.proportion + + c.second->prop_delta; + } else { + p = c.second->get_req_tag().proportion + c.second->prop_delta; + } + + if (p < lowest_prop_tag) { + lowest_prop_tag = p; + } + } + } + + // if this conditional does not fire, it + if (lowest_prop_tag < lowest_prop_tag_trigger) { + client.prop_delta = lowest_prop_tag - time; + } + client.idle = false; + } // if this client was idle + +#ifndef DO_NOT_DELAY_TAG_CALC + RequestTag tag(0, 0, 0, time); + + if (!client.has_request()) { + const ClientInfo* client_info = get_cli_info(client); + assert(client_info); + tag = RequestTag(client.get_req_tag(), + *client_info, + req_params, + time, + cost, + anticipation_timeout); + + // copy tag to previous tag for client + client.update_req_tag(tag, tick); + } +#else + const ClientInfo* client_info = get_cli_info(client); + assert(client_info); + RequestTag tag(client.get_req_tag(), + *client_info, + req_params, + time, + cost, + anticipation_timeout); + + // copy tag to previous tag for client + client.update_req_tag(tag, tick); +#endif + + client.add_request(tag, client.client, std::move(request)); + if (1 == client.requests.size()) { + // NB: can the following 4 calls to adjust be changed + // promote? Can adding a request ever demote a client in the + // heaps? + resv_heap.adjust(client); + limit_heap.adjust(client); + ready_heap.adjust(client); +#if USE_PROP_HEAP + prop_heap.adjust(client); +#endif + } + + client.cur_rho = req_params.rho; + client.cur_delta = req_params.delta; + + resv_heap.adjust(client); + limit_heap.adjust(client); + ready_heap.adjust(client); +#if USE_PROP_HEAP + prop_heap.adjust(client); +#endif + } // add_request + + + // data_mtx should be held when called; top of heap should have + // a ready request + template + void pop_process_request(IndIntruHeap& heap, + std::function process) { + // gain access to data + ClientRec& top = heap.top(); + + RequestRef request = std::move(top.next_request().request); +#ifndef DO_NOT_DELAY_TAG_CALC + RequestTag tag = top.next_request().tag; +#endif + + // pop request and adjust heaps + top.pop_request(); + +#ifndef DO_NOT_DELAY_TAG_CALC + if (top.has_request()) { + ClientReq& next_first = top.next_request(); + const ClientInfo* client_info = get_cli_info(top); + assert(client_info); + next_first.tag = RequestTag(tag, *client_info, + top.cur_delta, top.cur_rho, + next_first.tag.arrival, + 0.0, anticipation_timeout); + + // copy tag to previous tag for client + top.update_req_tag(next_first.tag, tick); + } +#endif + + resv_heap.demote(top); + limit_heap.adjust(top); +#if USE_PROP_HEAP + prop_heap.demote(top); +#endif + ready_heap.demote(top); + + // process + process(top.client, request); + } // pop_process_request + + + // data_mtx should be held when called + void reduce_reservation_tags(ClientRec& client) { + for (auto& r : client.requests) { + r.tag.reservation -= client.info->reservation_inv; + +#ifndef DO_NOT_DELAY_TAG_CALC + // reduce only for front tag. because next tags' value are invalid + break; +#endif + } + // don't forget to update previous tag + client.prev_tag.reservation -= client.info->reservation_inv; + resv_heap.promote(client); + } + + + // data_mtx should be held when called + void reduce_reservation_tags(const C& client_id) { + auto client_it = client_map.find(client_id); + + // means the client was cleaned from map; should never happen + // as long as cleaning times are long enough + assert(client_map.end() != client_it); + reduce_reservation_tags(*client_it->second); + } + + + // data_mtx should be held when called + NextReq do_next_request(Time now) { + // if reservation queue is empty, all are empty (i.e., no + // active clients) + if(resv_heap.empty()) { + return NextReq::none(); + } + + // try constraint (reservation) based scheduling + + auto& reserv = resv_heap.top(); + if (reserv.has_request() && + reserv.next_request().tag.reservation <= now) { + return NextReq(HeapId::reservation); + } + + // no existing reservations before now, so try weight-based + // scheduling + + // all items that are within limit are eligible based on + // priority + auto limits = &limit_heap.top(); + while (limits->has_request() && + !limits->next_request().tag.ready && + limits->next_request().tag.limit <= now) { + limits->next_request().tag.ready = true; + ready_heap.promote(*limits); + limit_heap.demote(*limits); + + limits = &limit_heap.top(); + } + + auto& readys = ready_heap.top(); + if (readys.has_request() && + readys.next_request().tag.ready && + readys.next_request().tag.proportion < max_tag) { + return NextReq(HeapId::ready); + } + + // if nothing is schedulable by reservation or + // proportion/weight, and if we allow limit break, try to + // schedule something with the lowest proportion tag or + // alternatively lowest reservation tag. + if (allow_limit_break) { + if (readys.has_request() && + readys.next_request().tag.proportion < max_tag) { + return NextReq(HeapId::ready); + } else if (reserv.has_request() && + reserv.next_request().tag.reservation < max_tag) { + return NextReq(HeapId::reservation); + } + } + + // nothing scheduled; make sure we re-run when next + // reservation item or next limited item comes up + + Time next_call = TimeMax; + if (resv_heap.top().has_request()) { + next_call = + min_not_0_time(next_call, + resv_heap.top().next_request().tag.reservation); + } + if (limit_heap.top().has_request()) { + const auto& next = limit_heap.top().next_request(); + assert(!next.tag.ready || max_tag == next.tag.proportion); + next_call = min_not_0_time(next_call, next.tag.limit); + } + if (next_call < TimeMax) { + return NextReq(next_call); + } else { + return NextReq::none(); + } + } // do_next_request + + + // if possible is not zero and less than current then return it; + // otherwise return current; the idea is we're trying to find + // the minimal time but ignoring zero + static inline const Time& min_not_0_time(const Time& current, + const Time& possible) { + return TimeZero == possible ? current : std::min(current, possible); + } + + + /* + * This is being called regularly by RunEvery. Every time it's + * called it notes the time and delta counter (mark point) in a + * deque. It also looks at the deque to find the most recent + * mark point that is older than clean_age. It then walks the + * map and delete all server entries that were last used before + * that mark point. + */ + void do_clean() { + TimePoint now = std::chrono::steady_clock::now(); + DataGuard g(data_mtx); + clean_mark_points.emplace_back(MarkPoint(now, tick)); + + // first erase the super-old client records + + Counter erase_point = 0; + auto point = clean_mark_points.front(); + while (point.first <= now - erase_age) { + erase_point = point.second; + clean_mark_points.pop_front(); + point = clean_mark_points.front(); + } + + Counter idle_point = 0; + for (auto i : clean_mark_points) { + if (i.first <= now - idle_age) { + idle_point = i.second; + } else { + break; + } + } + + if (erase_point > 0 || idle_point > 0) { + for (auto i = client_map.begin(); i != client_map.end(); /* empty */) { + auto i2 = i++; + if (erase_point && i2->second->last_tick <= erase_point) { + delete_from_heaps(i2->second); + client_map.erase(i2); + } else if (idle_point && i2->second->last_tick <= idle_point) { + i2->second->idle = true; + } + } // for + } // if + } // do_clean + + + // data_mtx must be held by caller + template + void delete_from_heap(ClientRecRef& client, + c::IndIntruHeap& heap) { + auto i = heap.rfind(client); + heap.remove(i); + } + + + // data_mtx must be held by caller + void delete_from_heaps(ClientRecRef& client) { + delete_from_heap(client, resv_heap); +#if USE_PROP_HEAP + delete_from_heap(client, prop_heap); +#endif + delete_from_heap(client, limit_heap); + delete_from_heap(client, ready_heap); + } + }; // class PriorityQueueBase + + + template + class PullPriorityQueue : public PriorityQueueBase { + using super = PriorityQueueBase; + + public: + + // When a request is pulled, this is the return type. + struct PullReq { + struct Retn { + C client; + typename super::RequestRef request; + PhaseType phase; + }; + + typename super::NextReqType type; + boost::variant data; + + bool is_none() const { return type == super::NextReqType::none; } + + bool is_retn() const { return type == super::NextReqType::returning; } + Retn& get_retn() { + return boost::get(data); + } + + bool is_future() const { return type == super::NextReqType::future; } + Time getTime() const { return boost::get