00001 /* 00002 * Copyright 2015 CERN 00003 * 00004 * Licensed under the Apache License, Version 2.0 (the "License"); 00005 * you may not use this file except in compliance with the License. 00006 * You may obtain a copy of the License at 00007 * 00008 * http://www.apache.org/licenses/LICENSE-2.0 00009 * 00010 * Unless required by applicable law or agreed to in writing, software 00011 * distributed under the License is distributed on an "AS IS" BASIS, 00012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 00013 * See the License for the specific language governing permissions and 00014 * limitations under the License. 00015 * 00016 */ 00017 #ifndef DMLITE_TASKEXEC_H 00018 #define DMLITE_TASKEXEC_H 00019 00020 /** @file dmTaskExec.h 00021 * @brief A class that spawns commands that perform actions 00022 * @author Fabrizio Furano 00023 * @date Dec 2015 00024 */ 00025 00026 00027 #include <boost/thread.hpp> 00028 #include <signal.h> 00029 #include <vector> 00030 #include <string> 00031 #include <algorithm> 00032 #include <sstream> 00033 #include <iterator> 00034 #include <iostream> 00035 #include <dmlite/cpp/utils/logger.h> 00036 00037 namespace dmlite { 00038 class dmTaskExec; 00039 00040 class dmTask: public boost::mutex { 00041 00042 protected: 00043 /// Threads waiting for result about this task will wait and synchronize here 00044 /// using something like 00045 /// boost::lock_guard< boost::mutex > l(workmutex); 00046 /// 00047 boost::condition_variable condvar; 00048 public: 00049 dmTask(dmTaskExec *wheretolog); 00050 dmTask(const dmTask &o) { 00051 key = o.key; 00052 cmd = o.cmd; 00053 for(unsigned int i = 0; i < 64; i++) parms[i] = NULL; 00054 resultcode = o.resultcode; 00055 starttime = o.starttime; 00056 endtime = o.endtime; 00057 finished = o.finished; 00058 fd[0] = 0; fd[1] = 0; fd[2] = 0; 00059 this->stdout = o.stdout; 00060 this->loggerinst = o.loggerinst; 00061 } 00062 00063 ~dmTask(); 00064 int key; 00065 00066 std::string cmd; 00067 const char *parms[64]; 00068 00069 int resultcode; 00070 00071 time_t starttime, endtime; 00072 bool finished; 00073 00074 int fd[3]; 00075 pid_t pid; 00076 std::string stdout; 00077 00078 /// Split che command string into the single parms 00079 void splitCmd(); 00080 00081 /// Wait until the task has finished or the timeout is expired 00082 int waitFinished(int tmout=5); 00083 00084 void notifyAll() { 00085 condvar.notify_all(); 00086 } 00087 00088 dmTaskExec *loggerinst; 00089 }; 00090 00091 00092 /// Allows to spawn commands, useful for checksum calculations or file pulling 00093 /// The spawned commands are pollable, i.e. in a given moment it's possible to 00094 /// know the list of commands that are still running. 00095 /// Objects belonging to this class in general are created in the disk nodes, 00096 /// e.g. for running checksums or file copies and pulls 00097 class dmTaskExec: public boost::recursive_mutex { 00098 00099 public: 00100 dmTaskExec(); 00101 ~dmTaskExec(); 00102 std::string instance; 00103 /// Executes a command. Returns a positive integer as a key to reference 00104 /// the execution status and the result 00105 /// The mechanics is that a detached thread is started. This guy invokes popen3 00106 /// and blocks waiting for the process to end. Upon end it updates the corresponding 00107 /// instance of dmTask with the result and the stdout 00108 int submitCmd(std::string cmd); 00109 00110 00111 /// Executes a command. Returns a positive integer as a key to reference 00112 // the execution status and the result 00113 // The mechanics is that a detached thread is started. This guy invokes popen3 00114 // and blocks waiting for the process to end. Upon end it updates the corresponding 00115 // instance of dmTask with the result and the stdout 00116 // -1 is returned in case of error in the submission 00117 int submitCmd(std::vector<std::string> &args); 00118 00119 /// Actually starts the thread corresponding to a command that was just submitted 00120 /// Avoids race conditions 00121 void goCmd(int id); 00122 00123 /// Split che command string into the single parms 00124 void assignCmd(dmTask *task, std::vector<std::string> &args); 00125 00126 /// Get the results of a task. 00127 /// Wait at max tmout seconds until the task finishes 00128 /// Return 0 if the task has finished and there is a result 00129 /// Return nonzero if the task is still running 00130 int waitResult(int taskID, int tmout=5); 00131 00132 //kill a specific task given the id 00133 int killTask(int taskID); 00134 00135 //get a dmTask given the id ( mainly for testing) 00136 dmTask* getTask(int taskID); 00137 00138 //get the current stdout of a task which may be running 00139 int getTaskStdout(int taskID, std::string &stdout); 00140 00141 /// Loops over all the tasks and: 00142 /// - send a notification to the head node about all the processes that are running or that have finished 00143 /// - garbage collect the task list. 00144 /// - Task that are finished since long (e.g. 1 hour) 00145 /// - Tasks that are stuck (e.g. 1 day) 00146 void tick(); 00147 00148 int getTaskCounters(int &tot, int &running); 00149 00150 00151 /// Event invoked internally to log stuff 00152 virtual void onLoggingRequest(Logger::Level lvl, std::string const & msg) = 0; 00153 /// Event invoked internally to log stuff 00154 virtual void onErrLoggingRequest(std::string const & msg) = 0; 00155 00156 protected: 00157 00158 /// event for immediate notifications when a task finishes 00159 /// Subclasses can specialize this and apply app-dependent behavior to 00160 /// perform actions when something has finished running 00161 /// NOTE the signature. This passes copies of Task objects, not the originals 00162 virtual void onTaskCompleted(dmTask &task); 00163 00164 // event that notifies that a task is running 00165 // This event can be invoked multiple times during the life of a task 00166 /// NOTE the signature. This passes copies of Task objects, not the originals 00167 virtual void onTaskRunning(dmTask &task); 00168 00169 00170 private: 00171 00172 int popen3(int fd[3], pid_t *pid, const char ** argv ); 00173 00174 /// Used to create keys to be inserted into the map. This has to be treated modulo MAXINT or similar big number 00175 int taskcnt; 00176 /// This map works like a sparse array :-) 00177 std::map<int, dmTask*> tasks; 00178 00179 00180 /// Here we invoke popen3 00181 /// and block waiting for the process to end. Upon end it updates the corresponding 00182 /// instance of dmTask with the result and the stdout 00183 virtual void run(int key); 00184 00185 //kill a specific task 00186 int killTask(dmTask *task); 00187 }; 00188 00189 00190 00191 } 00192 00193 00194 00195 00196 00197 00198 00199 00200 00201 00202 00203 00204 00205 #endif 00206