/usr/include/arc/data-staging/DTR.h is in nordugrid-arc-dev 4.0.0-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 
528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 | // Summary page of data staging for doxygen
namespace DataStaging {
// This namespace block declares nothing; it only hosts the doxygen group page
// for the data staging library.
/**
* \defgroup datastaging ARC data staging (libarcdatastaging)
*
* ARC data staging components form a complete data transfer management system.
* Whereas \ref data is a library for data access, enabling several types of
* operation on data files on the Grid using a variety of access protocols,
* \ref datastaging is a framework for managed data transfer to and from the
* Grid. The data staging system is designed to run as a persistent process, to
* execute data transfers on demand. Data transfers are defined and fed into
* the system, and then notification is given when they complete. No knowledge
* is required of the internal workings of the Grid; a user only needs to
* specify URLs representing the source and destination of the transfer.
*
* The system is highly configurable and features an intelligent priority,
* fair-share and error handling mechanism, as well as the ability to spread
* data transfer across multiple hosts using ARC's DataDelivery service. It is
* used by ARC's Computing Element (A-REX) for pre- and post-job data transfer
* of input and output files. Note that this system is primarily for data
* transfer to and from local files and that third-party transfer is not
* supported. It is designed for the case of pulling or pushing data between
* the Grid and a local file system, rather than a service for transfer between
* two Grid storage elements. It is possible to transfer data between two
* remote endpoints, but all data flows through the client.
*
* Simple examples of how to use libarcdatastaging are shown for several
* languages in the \ref dtrgenerator "DTR examples page". In all the examples
* a Generator class receives as input a source and destination, and creates
* a DTR which describes the data transfer. It is then passed to the Scheduler
* and the Generator defines a receiveDTR() method for the Scheduler to call
* to notify that the transfer has finished. The examples all allow using the
* Generator as a basic copy tool from the command line to copy a single file.
*
* For more information see http://wiki.nordugrid.org/index.php/Data_Staging
*/
} // namespace DataStaging
#ifndef DTR_H_
#define DTR_H_
#include <arc/data/DataHandle.h>
#include <arc/CheckSum.h>
#include <arc/data/URLMap.h>
#include <arc/DateTime.h>
#include <arc/Logger.h>
#include <arc/User.h>
#include <arc/UserConfig.h>
#include <arc/Thread.h>
#include "DTRStatus.h"
#ifdef WIN32
// Windows builds: uid_t is a POSIX type used in this header's signatures
// (the DTR constructor and createDTRPtr take a `const uid_t&`). Provide an
// int fallback when no uid_t/gid_t macro is defined. gid_t is not used in
// this header and is presumably defined alongside it for symmetry.
// NOTE(review): #ifndef only detects macros, not typedefs. If a header
// included after this point declares uid_t/gid_t as a typedef, the macro
// would rewrite that declaration. Confirm this holds for supported toolchains.
#ifndef uid_t
#define uid_t int
#endif
#ifndef gid_t
#define gid_t int
#endif
#endif
/// DataStaging contains all components for data transfer scheduling and execution.
namespace DataStaging {
// Forward declaration: lets DTR_ptr below name DTR before the class is
// defined (plain comment so doxygen does not merge it into DTR's docs).
class DTR;
/// Provides automatic memory management of DTRs and thread-safe destruction.
/** \ingroup datastaging */
typedef Arc::ThreadedPointer<DTR> DTR_ptr;
/// The DTR's Logger object can be used outside the DTR object with DTRLogger.
/** \ingroup datastaging */
typedef Arc::ThreadedPointer<Arc::Logger> DTRLogger;
/// Components of the data staging framework
/**
 * Each enumerator names one stage a DTR can be owned by. The values are
 * written out explicitly; they are exactly the implicit 0-based values.
 * \ingroup datastaging
 */
enum StagingProcesses {
  /// Creator of new DTRs and receiver of completed DTRs
  GENERATOR = 0,
  /// Controls queues and moves DTRs between other components when necessary
  SCHEDULER = 1,
  /// Performs all pre-transfer operations
  PRE_PROCESSOR = 2,
  /// Performs physical transfer
  DELIVERY = 3,
  /// Performs all post-transfer operations
  POST_PROCESSOR = 4
};
/// Internal state of StagingProcesses
/**
 * Lifecycle of a staging component. The values are written out explicitly;
 * they are exactly the implicit 0-based values.
 * \ingroup datastaging
 */
enum ProcessState {
  /// Process is ready to start
  INITIATED = 0,
  /// Process is running
  RUNNING = 1,
  /// Process has been instructed to stop
  TO_STOP = 2,
  /// Process has stopped
  STOPPED = 3
};
/// Represents limits and properties of a DTR transfer. These generally apply to all DTRs.
/**
 * Plain container of transfer limits; every field is public and starts out
 * as zero when default-constructed.
 * \ingroup datastaging
 * \headerfile DTR.h arc/data-staging/DTR.h
 */
class TransferParameters {
public:
  /// Constructor. Sets every limit to zero.
  TransferParameters()
    : min_average_bandwidth(0),
      max_inactivity_time(0),
      min_current_bandwidth(0),
      averaging_time(0) {}

  /// Minimum average bandwidth in bytes/sec.
  /**
   * The transfer is killed when the bandwidth averaged over the whole
   * transfer falls below this level.
   */
  unsigned long long int min_average_bandwidth;

  /// Maximum inactivity time in sec.
  /**
   * The transfer is killed when it stalls for longer than this time.
   */
  unsigned int max_inactivity_time;

  /// Minimum current bandwidth in bytes/sec.
  /**
   * The transfer is killed when the bandwidth averaged over the last
   * averaging_time seconds falls below min_current_bandwidth. This catches
   * transfers that slow down sooner than min_average_bandwidth would.
   */
  unsigned long long int min_current_bandwidth;

  /// Window in seconds over which min_current_bandwidth is averaged.
  unsigned int averaging_time;
};
/// The configured cache directories
/**
 * \ingroup datastaging
 * \headerfile DTR.h arc/data-staging/DTR.h
 */
class DTRCacheParameters {
public:
  /// Constructor leaving all three lists empty
  DTRCacheParameters() {}

  /// Constructor with supplied cache lists
  /**
   * Defined out of line; presumably stores the lists into cache_dirs,
   * remote_cache_dirs and drain_cache_dirs respectively (confirm in the
   * implementation).
   * @param caches        entries of the form "cache dir [link dir]"
   * @param remote_caches entries of the form "cache dir [link dir]" for remote caches
   * @param drain_caches  draining cache directories
   */
  DTRCacheParameters(std::vector<std::string> caches,
                     std::vector<std::string> remote_caches,
                     std::vector<std::string> drain_caches);

  /// List of (cache dir [link dir])
  std::vector<std::string> cache_dirs;
  /// List of (cache dir [link dir]) for remote caches
  std::vector<std::string> remote_cache_dirs;
  /// List of draining caches. Not needed by data staging itself; kept for completeness.
  std::vector<std::string> drain_cache_dirs;
};
/// Represents possible cache states of this DTR
/**
 * The values are written out explicitly; they are exactly the implicit
 * 0-based values.
 * \ingroup datastaging
 */
enum CacheState {
  /// Source should be cached
  CACHEABLE = 0,
  /// Source should not be cached
  NON_CACHEABLE = 1,
  /// Source is available in cache from before
  CACHE_ALREADY_PRESENT = 2,
  /// Source has just been downloaded and put in cache
  CACHE_DOWNLOADED = 3,
  /// Cache file is locked
  CACHE_LOCKED = 4,
  /// Source is cacheable but should not be cached because of some problem
  CACHE_SKIP = 5,
  /// Cache was started but was not used
  CACHE_NOT_USED = 6
};
/// The base class from which all callback-enabled classes should be derived.
/**
* This class is a container for a callback method which is called when a
* DTR is to be passed to a component. Several components in data staging
* (eg Scheduler, Generator) are subclasses of DTRCallback, which allows
* them to receive DTRs through the callback system. Callback objects are
* attached to a DTR with DTR::registerCallback().
* \ingroup datastaging
* \headerfile DTR.h arc/data-staging/DTR.h
*/
class DTRCallback {
public:
/// Empty virtual destructor, so subclasses can be destroyed through a base pointer.
virtual ~DTRCallback() {};
// NOTE: under common C++ ABIs (e.g. Itanium) the declaration order of the
// virtual functions defines the vtable layout. Reordering or inserting
// virtuals breaks binary compatibility with already-compiled subclasses.
/// Defines the callback method called when a DTR is pushed to this object.
/**
* The automatic memory management of DTR_ptr ensures that the DTR object
* is only deleted when the last copy is deleted.
* @param dtr The DTR being handed over to this component.
*/
virtual void receiveDTR(DTR_ptr dtr) = 0;
// TODO: possible future callbacks, not yet part of the interface
//virtual void suspendDTR(DTR& dtr) = 0;
//virtual void cancelDTR(DTR& dtr) = 0;
};
/// Data Transfer Request.
/**
* DTR stands for Data Transfer Request and a DTR describes a data transfer
* between two endpoints, a source and a destination. There are several
* parameters and options relating to the transfer contained in a DTR.
* The normal workflow is for a Generator to create a DTR and send it to the
* Scheduler for processing using DTR::push(dtr, SCHEDULER). If the Generator
* is a subclass of DTRCallback, when the Scheduler has finished with the DTR
* the DTRCallback::receiveDTR() callback method is called.
*
* DTRs should always be used through the Arc::ThreadedPointer DTR_ptr. This
* ensures proper memory management when passing DTRs among various threads.
* To enforce this policy the copy constructor and assignment operator are
* private.
*
* A lock protects member variables that are likely to be accessed and
* modified by multiple threads. Methods documented as "Protected by lock"
* take it; the inline getters and setters defined in this header do not.
* \ingroup datastaging
* \headerfile DTR.h arc/data-staging/DTR.h
*/
class DTR {
private:
/// Identifier. An empty ID marks the DTR as invalid (see operator bool()).
std::string DTR_ID;
/* Members are constructed in declaration order and destroyed in reverse
* order. The URLs and cfg must therefore stay declared before
* source_endpoint/destination_endpoint, so the objects the DataHandles
* reference exist before them and outlive them. */
/// UserConfig and URL objects. Needed as DataHandle keeps a reference to them.
Arc::URL source_url;
Arc::URL destination_url;
Arc::UserConfig cfg;
/// Source file
Arc::DataHandle source_endpoint;
/// Destination file
Arc::DataHandle destination_endpoint;
/// Source file as a string
std::string source_url_str;
/// Destination file as a string
std::string destination_url_str;
/// Endpoint of cached file.
/* Kept as string so we don't need to duplicate DataHandle properties
* of destination. Delivery should check if this is set and if so use
* it as destination. */
std::string cache_file;
/// Cache configuration
DTRCacheParameters cache_parameters;
/// Cache state for this DTR
CacheState cache_state;
/// Local user information
Arc::User user;
/// Whether the credentials for this DTR are of type RFC proxy (and
/// hence remote delivery service can be used for transfer)
bool rfc_proxy;
/// Job that requested the transfer. Could be used as a generic way of grouping DTRs.
std::string parent_job_id;
/// A flattened number set by the scheduler
int priority;
/// Transfer share this DTR belongs to
std::string transfershare;
/// This string can be used to form sub-sets of transfer shares.
/** It is appended to transfershare. It can be used by the Generator
* for example to split uploads and downloads into separate shares or
* make shares for different endpoints. */
std::string sub_share;
/// Number of attempts left to complete this DTR
unsigned int tries_left;
/// Initial number of attempts
unsigned int initial_tries;
/// A flag to say whether the DTR is replicating inside the same LFN of an index service
bool replication;
/// A flag to say whether to forcibly register the destination in an index service.
/** Even if the source is not the same file, the destination will be
* registered to an existing LFN. It should be set to true in
* the case where an output file is uploaded to several locations but
* with the same index service LFN */
bool force_registration;
/// The file that the current source is mapped to.
/** Delivery should check if this is set and if so use this as source. */
std::string mapped_source;
/// Status of the DTR
DTRStatus status;
/// Error status of the DTR
DTRErrorStatus error_status;
/// Number of bytes transferred so far
unsigned long long int bytes_transferred; // TODO and/or offset?
/* Timing variables */
/// When should we finish the current action
Arc::Time timeout;
/// Creation time
Arc::Time created;
/// Modification time
Arc::Time last_modified;
/// Wait until this time before doing more processing
Arc::Time next_process_time;
/// True if some process requested cancellation
bool cancel_request;
/// Bulk start flag
bool bulk_start;
/// Bulk end flag
bool bulk_end;
/// Whether bulk operations are supported for the source
bool source_supports_bulk;
/// Endpoint of delivery service this DTR is scheduled for.
/** By default it is LOCAL_DELIVERY so local Delivery is used. */
Arc::URL delivery_endpoint;
/// List of problematic endpoints - those which the DTR definitely cannot use
std::vector<Arc::URL> problematic_delivery_endpoints;
/// Whether to use host instead of user credentials for contacting remote delivery services.
bool use_host_cert_for_remote_delivery;
/// The process in charge of this DTR right now
StagingProcesses current_owner;
/// Logger object.
/** Creation and deletion of this object should be managed
* in the Generator and a pointer passed in the DTR constructor. */
DTRLogger logger;
/// Log Destinations.
/** This list is kept here so that the Logger can be connected and
* disconnected in threads which have their own root logger
* to avoid duplicate messages */
std::list<Arc::LogDestination*> log_destinations;
/// List of callback methods called when DTR moves between processes
std::map<StagingProcesses,std::list<DTRCallback*> > proc_callback;
/// Lock to avoid collisions while changing DTR properties
Arc::SimpleCondition lock;
// Disabled design notes (not compiled). Kept as a plain comment so doxygen
// does not attach them to the next documented member.
/* Possible fields (types, names and so on are subject to change) **
/// DTRs that are grouped must have the same number here
int affiliation;
/// History of recent statuses
DTRStatus::DTRStatusType *history_of_statuses;
**/
/* Methods */
/// Set the modification time to the current time.
void mark_modification () { last_modified.SetTime(time(NULL)); };
/// Get the list of callbacks for this owner. Protected by lock.
std::list<DTRCallback*> get_callbacks(const std::map<StagingProcesses, std::list<DTRCallback*> >& proc_callback,
StagingProcesses owner);
/// Private and not implemented because DTR_ptr should always be used.
DTR& operator=(const DTR& dtr);
DTR(const DTR& dtr);
DTR();
public:
/// URL that is used to denote local Delivery should be used
static const Arc::URL LOCAL_DELIVERY;
/// Log level for all DTR activity
static Arc::LogLevel LOG_LEVEL;
/// Normal constructor.
/** Construct a new DTR.
* @param source Endpoint from which to read data
* @param destination Endpoint to which to write data
* @param usercfg Provides some user configuration information
* @param jobid ID of the job associated with this data transfer
* @param uid UID to use when accessing local file system if source
* or destination is a local file. If this is different to the current
* uid then the current uid must have sufficient privileges to change uid.
* @param log ThreadedPointer containing log object. If NULL the root
* logger is used.
*/
DTR(const std::string& source,
const std::string& destination,
const Arc::UserConfig& usercfg,
const std::string& jobid,
const uid_t& uid,
DTRLogger log);
/// Empty destructor
~DTR() {};
/// Is DTR valid? True when the DTR has a non-empty ID.
operator bool() const {
return (!DTR_ID.empty());
}
/// Is DTR not valid? True when the DTR ID is empty.
bool operator!() const {
return (DTR_ID.empty());
}
/// Register callback objects to be used during DTR processing.
/**
* Objects deriving from DTRCallback can be registered with this method.
* The callback method of these objects will then be called when the DTR
* is passed to the specified owner. Protected by lock.
* @param cb Callback object; stored as a raw pointer, so the caller keeps
* ownership (presumably it must outlive the DTR - confirm).
* @param owner Process for which cb is invoked.
*/
void registerCallback(DTRCallback* cb, StagingProcesses owner);
/// Reset information held on this DTR, such as resolved replicas, error state etc.
/**
* Useful when a failed DTR is to be retried.
*/
void reset();
/// Set the ID of this DTR. Useful when passing DTR between processes.
void set_id(const std::string& id);
/// Get the ID of this DTR
std::string get_id() const { return DTR_ID; };
/// Get an abbreviated version of the DTR ID - useful to reduce logging verbosity
std::string get_short_id() const;
/// Get source handle. Return by reference since DataHandle cannot be copied
Arc::DataHandle& get_source() { return source_endpoint; };
/// Get destination handle. Return by reference since DataHandle cannot be copied
Arc::DataHandle& get_destination() { return destination_endpoint; };
/// Get source as a string
std::string get_source_str() const { return source_url_str; };
/// Get destination as a string
std::string get_destination_str() const { return destination_url_str; };
/// Get the UserConfig object associated with this DTR
const Arc::UserConfig& get_usercfg() const { return cfg; };
/// Set the timeout for processing this DTR to the current time plus value seconds
void set_timeout(time_t value) { timeout.SetTime(Arc::Time().GetTime() + value); };
/// Get the timeout for processing this DTR
Arc::Time get_timeout() const { return timeout; };
/// Set the next processing time to current time + given time
void set_process_time(const Arc::Period& process_time);
/// Get the next processing time for the DTR
Arc::Time get_process_time() const { return next_process_time; };
/// Get the creation time
Arc::Time get_creation_time() const { return created; };
/// Get the modification time
Arc::Time get_modification_time() const { return last_modified; };
/// Get the parent job ID
std::string get_parent_job_id() const { return parent_job_id; };
/// Set the priority
void set_priority(int pri);
/// Get the priority
int get_priority() const { return priority; };
/// Set whether credentials are type RFC proxy
void set_rfc_proxy(bool rfc) { rfc_proxy = rfc; };
/// Get whether credentials are type RFC proxy
bool is_rfc_proxy() const { return rfc_proxy; };
/// Set the transfer share. sub_share is automatically added to transfershare.
void set_transfer_share(const std::string& share_name);
/// Get the transfer share. sub_share is automatically added to transfershare.
std::string get_transfer_share() const { return transfershare; };
/// Set sub-share
void set_sub_share(const std::string& share) { sub_share = share; };
/// Get sub-share
std::string get_sub_share() const { return sub_share; };
/// Set the number of attempts remaining
void set_tries_left(unsigned int tries);
/// Get the number of attempts remaining
unsigned int get_tries_left() const { return tries_left; };
/// Get the initial number of attempts (set by set_tries_left())
unsigned int get_initial_tries() const { return initial_tries; }
/// Decrease attempt number
void decrease_tries_left();
/// Set the status. Protected by lock.
void set_status(DTRStatus stat);
/// Get the status. Protected by lock.
DTRStatus get_status();
/// Set the error status.
/**
* The DTRErrorStatus last error state field is set to the current status
* of the DTR. Protected by lock.
*/
void set_error_status(DTRErrorStatus::DTRErrorStatusType error_stat,
DTRErrorStatus::DTRErrorLocation error_loc,
const std::string& desc="");
/// Set the error status back to NONE_ERROR and clear other fields
void reset_error_status();
/// Get the error status.
DTRErrorStatus get_error_status();
/// Set bytes transferred (should be set by whatever is controlling the transfer)
void set_bytes_transferred(unsigned long long int bytes);
/// Get current number of bytes transferred
unsigned long long int get_bytes_transferred() const { return bytes_transferred; };
/// Set the DTR to be cancelled
void set_cancel_request();
/// Returns true if cancellation has been requested
bool cancel_requested() const { return cancel_request; };
/// Set delivery endpoint
void set_delivery_endpoint(const Arc::URL& endpoint) { delivery_endpoint = endpoint; };
/// Returns delivery endpoint
const Arc::URL& get_delivery_endpoint() const { return delivery_endpoint; };
/// Add problematic endpoint.
/**
* Should only be those endpoints where there is a problem with the service
* itself and not the transfer. Appends to the list without taking the lock.
*/
void add_problematic_delivery_service(const Arc::URL& endpoint) { problematic_delivery_endpoints.push_back(endpoint); };
/// Get all problematic endpoints
const std::vector<Arc::URL>& get_problematic_delivery_services() const { return problematic_delivery_endpoints; };
/// Set the flag for using host certificate for contacting remote delivery services
void host_cert_for_remote_delivery(bool host) { use_host_cert_for_remote_delivery = host; };
/// Get the flag for using host certificate for contacting remote delivery services
bool host_cert_for_remote_delivery() const { return use_host_cert_for_remote_delivery; };
/// Set cache filename
void set_cache_file(const std::string& filename);
/// Get cache filename
std::string get_cache_file() const { return cache_file; };
/// Set cache parameters
void set_cache_parameters(const DTRCacheParameters& param) { cache_parameters = param; };
/// Get cache parameters
const DTRCacheParameters& get_cache_parameters() const { return cache_parameters; };
/// Set the cache state
void set_cache_state(CacheState state);
/// Get the cache state
CacheState get_cache_state() const { return cache_state; };
/// Set the mapped file. Calling with no argument clears it (sets it to "").
void set_mapped_source(const std::string& file = "") { mapped_source = file; };
/// Get the mapped file
std::string get_mapped_source() const { return mapped_source; };
/// Find the DTR owner
StagingProcesses get_owner() const { return current_owner; };
/// Get the local user information
Arc::User get_local_user() const { return user; };
/// Set replication flag
void set_replication(bool rep) { replication = rep; };
/// Get replication flag
bool is_replication() const { return replication; };
/// Set force registration flag
void set_force_registration(bool force) { force_registration = force; };
/// Get force registration flag
bool is_force_registration() const { return force_registration; };
/// Set bulk start flag
void set_bulk_start(bool value) { bulk_start = value; };
/// Get bulk start flag
bool get_bulk_start() const { return bulk_start; };
/// Set bulk end flag
void set_bulk_end(bool value) { bulk_end = value; };
/// Get bulk end flag
bool get_bulk_end() const { return bulk_end; };
/// Whether bulk operation is possible according to current state and src/dest
bool bulk_possible();
/// Get Logger object, so that processes can log to this DTR's log
const DTRLogger& get_logger() const { return logger; };
/// Connect log destinations to logger (no-op if there is no logger). Only needs to be done after disconnect_logger()
void connect_logger() { if (logger) logger->setDestinations(log_destinations); };
/// Disconnect log destinations from logger (no-op if there is no logger).
void disconnect_logger() { if (logger) logger->removeDestinations(); };
/// Pass the DTR from one process to another. Protected by lock.
static void push(DTR_ptr dtr, StagingProcesses new_owner);
/// Suspend the DTR while it is being transferred by the delivery process
bool suspend();
/// Did an error happen? True when error_status is anything other than NONE_ERROR.
bool error() const { return (error_status != DTRErrorStatus::NONE_ERROR); }
/// Returns true if this DTR is about to go into the pre-processor
bool is_destined_for_pre_processor() const;
/// Returns true if this DTR is about to go into the post-processor
bool is_destined_for_post_processor() const;
/// Returns true if this DTR is about to go into delivery
bool is_destined_for_delivery() const;
/// Returns true if this DTR just came from the pre-processor
bool came_from_pre_processor() const;
/// Returns true if this DTR just came from the post-processor
bool came_from_post_processor() const;
/// Returns true if this DTR just came from delivery
bool came_from_delivery() const;
/// Returns true if this DTR just came from the generator
bool came_from_generator() const;
/// Returns true if this DTR is in a final state (finished, failed or cancelled)
bool is_in_final_state() const;
};
/// Helper function to create a DTR smart pointer, only for swig bindings.
/**
* Takes the same arguments, in the same order, as the DTR constructor.
* NOTE(review): the definition is not in this header; presumably it wraps a
* newly constructed DTR in a DTR_ptr - confirm.
*/
DTR_ptr createDTRPtr(const std::string& source,
const std::string& destination,
const Arc::UserConfig& usercfg,
const std::string& jobid,
const uid_t& uid,
DTRLogger log);
/// Helper function to create a DTRLogger (logger smart pointer), only for swig bindings.
/**
* @param parent Logger to derive the new logger from (presumably its parent - confirm).
* @param subdomain Subdomain name for the new logger.
*/
DTRLogger createDTRLogger(Arc::Logger& parent,
const std::string& subdomain);
} // namespace DataStaging
#endif /*DTR_H_*/
|