/usr/include/arc/data-staging/Scheduler.h is in nordugrid-arc-dev 4.0.0-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
#ifndef SCHEDULER_H_
#define SCHEDULER_H_
#include <arc/Thread.h>
#include <arc/Logger.h>
#include <arc/URL.h>
#include <arc/data/URLMap.h>
#include "DTR.h"
#include "DTRList.h"
#include "Processor.h"
#include "DataDelivery.h"
#include "TransferShares.h"
namespace DataStaging {
/// The Scheduler is the control centre of the data staging framework.
/**
* The Scheduler manages a global list of DTRs and schedules when they should
* go into the next state or be sent to other processes. The DTR priority is
* used to decide each DTR's position in a queue.
* \ingroup datastaging
* \headerfile Scheduler.h arc/data-staging/Scheduler.h
*/
class Scheduler: public DTRCallback {
private:
// NOTE: the order of the non-static data members below determines the object
// layout and the order in which they are constructed; do not reorder them
// without checking the out-of-line constructor and any binary compatibility
// requirements.
/// All the DTRs the scheduler is aware of.
/** The DTR comes to this list once received from the generator
* and leaves the list only when pushed back to the generator. */
DTRList DtrList;
/// A list of jobs that have been requested to be cancelled.
/** External threads add items to this list, and the Scheduler
* processes it during the main loop. Access is guarded by
* cancelled_jobs_lock. */
std::list<std::string> cancelled_jobs;
/// A list of DTRs to process. Access is guarded by event_lock.
std::list<DTR_ptr> events;
/// Map of transfer shares to staged DTRs. Filled each event processing loop
std::map<std::string, std::list<DTR_ptr> > staged_queue;
/// A lock for the cancelled jobs list
Arc::SimpleCondition cancelled_jobs_lock;
/// Configuration of transfer shares
TransferSharesConf transferSharesConf;
/// URLMap containing information on any local mappings defined in the configuration
Arc::URLMap url_map;
/// Preferred pattern to match replicas defined in configuration.
/** The pattern syntax is described at SetPreferredPattern(). */
std::string preferred_pattern;
/// Lock to protect multi-threaded access to start() and stop()
Arc::SimpleCondition state_lock;
/// Lock for events list
Arc::SimpleCondition event_lock;
/// Condition to signal end of running
Arc::SimpleCondition run_signal;
/// Condition to signal end of dump thread
Arc::SimpleCondition dump_signal;
/// Limit on number of DTRs in pre-processor
unsigned int PreProcessorSlots;
/// Limit on number of DTRs in delivery
unsigned int DeliverySlots;
/// Limit on number of DTRs in post-processor
unsigned int PostProcessorSlots;
/// Limit on number of emergency DTRs in each state
unsigned int EmergencySlots;
/// Limit on number of staged-prepared files, per share
unsigned int StagedPreparedSlots;
/// Where to dump DTR state. Currently only a path to a file is supported.
std::string dumplocation;
/// Endpoints of delivery services from configuration
std::vector<Arc::URL> configured_delivery_services;
/// Map of delivery services and directories they can access, filled after
/// querying all services when the first DTR is processed
std::map<Arc::URL, std::vector<std::string> > usable_delivery_services;
/// Timestamp of last check of delivery services
Arc::Time delivery_last_checked;
/// File size limit (in bytes) under which local transfer is used
unsigned long long int remote_size_limit;
/// Logger object, shared by all Scheduler instances (static)
static Arc::Logger logger;
/// Root logger destinations, to use when logging non-DTR specific messages
std::list<Arc::LogDestination*> root_destinations;
/// Flag describing scheduler state. Used to decide whether to keep running main loop.
ProcessState scheduler_state;
/// Processor object
Processor processor;
/// Delivery object
DataDelivery delivery;
/// Static instance of Scheduler, handed out by getInstance()
static Scheduler* scheduler_instance;
/// Lock for multiple threads getting static Scheduler instance
static Glib::Mutex instance_lock;
/// Copy constructor is private because Scheduler should not be copied
Scheduler(const Scheduler&); // should not happen
/// Assignment operator is private because Scheduler should not be copied
Scheduler& operator=(const Scheduler&); // should not happen
/* Functions to process every state of the DTR during normal workflow.
 * There is one method per DTR state; map_state_and_process() selects
 * which one to call. */
/// Process a DTR in the NEW state
void ProcessDTRNEW(DTR_ptr request);
/// Process a DTR in the CACHE_WAIT state
void ProcessDTRCACHE_WAIT(DTR_ptr request);
/// Process a DTR in the CACHE_CHECKED state
void ProcessDTRCACHE_CHECKED(DTR_ptr request);
/// Process a DTR in the RESOLVED state
void ProcessDTRRESOLVED(DTR_ptr request);
/// Process a DTR in the REPLICA_QUERIED state
void ProcessDTRREPLICA_QUERIED(DTR_ptr request);
/// Process a DTR in the PRE_CLEANED state
void ProcessDTRPRE_CLEANED(DTR_ptr request);
/// Process a DTR in the STAGING_PREPARING_WAIT state
void ProcessDTRSTAGING_PREPARING_WAIT(DTR_ptr request);
/// Process a DTR in the STAGED_PREPARED state
void ProcessDTRSTAGED_PREPARED(DTR_ptr request);
/// Process a DTR in the TRANSFERRED state
void ProcessDTRTRANSFERRED(DTR_ptr request);
/// Process a DTR in the REQUEST_RELEASED state
void ProcessDTRREQUEST_RELEASED(DTR_ptr request);
/// Process a DTR in the REPLICA_REGISTERED state
void ProcessDTRREPLICA_REGISTERED(DTR_ptr request);
/// Process a DTR in the CACHE_PROCESSED state
void ProcessDTRCACHE_PROCESSED(DTR_ptr request);
/// Process a DTR in a final state
/** This is a special function to deal with states after which
* the DTR is returned to the generator, i.e. DONE, ERROR, CANCELLED */
void ProcessDTRFINAL_STATE(DTR_ptr request);
/// Log a message to the root logger. This sends the message to the log
/// destinations attached to the root logger at the point the Scheduler
/// was started.
void log_to_root_logger(Arc::LogLevel level, const std::string& message);
/// Call the appropriate Process method depending on the DTR state
void map_state_and_process(DTR_ptr request);
/// Maps the DTR to the appropriate state when it is cancelled.
/** This is a separate function, since a cancellation request
* can arrive at any time, breaking the normal workflow. */
void map_cancel_state(DTR_ptr request);
/// Map a DTR stuck in a processing state to a new state from which it can
/// recover and retry.
void map_stuck_state(DTR_ptr request);
/// Choose a delivery service for the DTR, based on the file system paths
/// each service can access. These paths are determined by calling all the
/// configured services when the first DTR is received.
void choose_delivery_service(DTR_ptr request);
/// Go through all DTRs waiting to go into a processing state and decide
/// whether to push them into that state, depending on shares and limits.
void revise_queues();
/// Add a new event for the Scheduler to process. Used in receiveDTR().
void add_event(DTR_ptr event);
/// Process the pool of DTRs which have arrived from other processes
void process_events(void);
/// Move to the next replica in the DTR.
/** Utility function which should be called in the case of error
* if the next replica should be tried. It takes care of sending
* the DTR to the appropriate state, depending on whether or not
* there are more replicas to try.
*/
void next_replica(DTR_ptr request);
/// Handle a DTR whose source is mapped to another URL.
/** If a file is mapped, this method should be called to deal
* with the mapping. It sets the mapped_file attribute of
* request to mapped_url. Returns true if the processing was
* successful.
* Note that mapped_url is taken by non-const reference.
*/
bool handle_mapped_source(DTR_ptr request, Arc::URL& mapped_url);
/// Thread method for dumping state
// NOTE(review): arg is presumably the Scheduler instance - confirm in the implementation.
static void dump_thread(void* arg);
/// Static version of main_thread, used when thread is created
// NOTE(review): arg is presumably the Scheduler instance - confirm in the implementation.
static void main_thread(void* arg);
/// Main thread, which runs until stopped
void main_thread(void);
public:
/// Get static instance of Scheduler, to share one Scheduler instance between multiple generators.
/**
* Configuration of Scheduler by Set* methods can only be done before
* start() is called, so unpredictable behaviour can result from multiple
* threads simultaneously calling Set* then start(). It is safer to make
* sure that all threads use the same configuration (calling start() twice
* is harmless). It is also better to make sure that threads call stop() in
* a roughly coordinated way, i.e. all generators stop at the same time.
*/
static Scheduler* getInstance();
/// Constructor, to be used when only one Generator uses this Scheduler.
Scheduler();
/// Destructor calls stop(), which cancels all DTRs and waits for them to complete
// The bool returned by stop() is ignored here. The ';' after the closing
// brace is a redundant empty declaration.
~Scheduler() { stop(); };
/* The following Set/Add methods are only effective when called before start() */
/// Set number of slots for processor and delivery stages
/**
* Note the parameter order: post_processor comes before delivery, which
* differs from the order in which the corresponding slot members are
* declared. All parameters default to 0; the effect of passing 0 is not
* visible in this header - check the implementation.
*/
void SetSlots(int pre_processor = 0, int post_processor = 0,
int delivery = 0, int emergency = 0, int staged_prepared = 0);
/// Add URL mapping entry. See Arc::URLMap.
void AddURLMapping(const Arc::URL& template_url, const Arc::URL& replacement_url,
const Arc::URL& access_url = Arc::URL());
/// Replace all URL mapping entries. The default argument is an empty Arc::URLMap.
void SetURLMapping(const Arc::URLMap& mapping = Arc::URLMap());
/// Set the preferred pattern for ordering replicas.
/**
* This pattern will be used in the case of an index service URL with
* multiple physical replicas and allows sorting of those replicas in order
* of preference. It consists of one or more patterns separated by a pipe
* character (|) listed in order of preference. If the dollar character ($)
* is used at the end of a pattern, the pattern will be matched to the end
* of the hostname of the replica. Example: "srm://myhost.org|.uk$|.ch$"
*/
void SetPreferredPattern(const std::string& pattern);
/// Set TransferShares configuration
void SetTransferSharesConf(const TransferSharesConf& share_conf);
/// Set transfer limits
void SetTransferParameters(const TransferParameters& params);
/// Set the list of delivery services. DTR::LOCAL_DELIVERY means local Delivery.
void SetDeliveryServices(const std::vector<Arc::URL>& endpoints);
/// Set the remote transfer size limit
// NOTE(review): presumably in bytes, matching remote_size_limit - confirm in the implementation.
void SetRemoteSizeLimit(unsigned long long int limit);
/// Set location for periodic dump of DTR state (only file paths currently supported)
void SetDumpLocation(const std::string& location);
/// Start scheduling activity.
/**
* This method must be called after all configuration parameters are set
* properly. Scheduler can be stopped either by calling stop() method or
* by destroying its instance.
*/
// NOTE(review): the meaning of the bool result is not documented in this
// header - confirm in the implementation.
bool start(void);
/// Callback method implemented from DTRCallback.
/**
* This method is called by the generator when it wants to pass a DTR
* to the scheduler and when other processes send a DTR back to the
* scheduler after processing.
*/
virtual void receiveDTR(DTR_ptr dtr);
/// Tell the Scheduler to cancel all the DTRs in the given job description
// NOTE(review): jobid presumably identifies the job whose DTRs are cancelled;
// the meaning of the bool result is not documented in this header.
bool cancelDTRs(const std::string& jobid);
/// Tell the Scheduler to shut down all threads and exit.
/**
* All active DTRs are cancelled and this method waits until they finish
* (all DTRs go to CANCELLED state)
*/
// NOTE(review): the meaning of the bool result is not documented in this
// header - confirm in the implementation.
bool stop();
};
} // namespace DataStaging
#endif /*SCHEDULER_H_*/
|