TripalJob.inc

File

tripal/includes/TripalJob.inc
View source
  1. <?php
  /**
   * Represents a single Tripal job stored in the tripal_jobs table.
   *
   * An instance starts out "empty" and must be bound to a job record either
   * by calling load() for an existing job or create() for a new one. Once
   * bound, the job can be run, cancelled, queried and have its progress and
   * log updated.
   */
  class TripalJob {

    /**
     * The ID of the job. Populated by load().
     */
    protected $job_id = NULL;

    /**
     * Contains the job record for this job.
     */
    protected $job = NULL;

    /**
     * The number of items that this job needs to process. Progress is
     * calculated by dividing the number of items processed by this number.
     */
    private $total_items = 0;

    /**
     * The number of items that have been handled so far. This must never
     * be below 0 and never exceed $total_items.
     */
    private $num_handled = 0;

    /**
     * The interval when the job progress should be updated. Updating the job
     * progress incurs a database write which takes time and if it occurs too
     * frequently can slow down the loader. This should be a value between
     * 0 and 100 to indicate a percent interval (e.g. 1 means update the
     * progress every time the num_handled increases by 1%).
     *
     * Defaults to 1 so that the throttle is effective even when setInterval()
     * is never called.
     */
    private $interval = 1;

    /**
     * The percent complete that was reported at the last progress update. It
     * is used to calculate if the $interval has passed for the next update.
     */
    private $prev_update = 0;

    /**
     * Instantiates a new TripalJob object.
     *
     * By default the job object is "empty". It must be associated with
     * job details either by calling the load() function or the
     * create() function.
     */
    public function __construct() {
    }

    /**
     * Loads a job for this object.
     *
     * @param $job_id
     *   The numeric ID of the job. For convenience another TripalJob object
     *   may be passed, in which case the ID of the job it holds is used.
     *
     * @throws Exception
     *   If no usable job ID is provided or no job with that ID exists.
     */
    public function load($job_id) {
      // Accept a TripalJob object in place of a numeric ID.
      if ($job_id instanceof TripalJob) {
        $job_id = $job_id->job ? $job_id->job->job_id : NULL;
      }

      // I can't load a job if I don't know which one.
      if (!$job_id || !is_numeric($job_id)) {
        throw new Exception("You must provide the job_id to load the job.");
      }

      $sql = 'SELECT j.* FROM {tripal_jobs} j WHERE j.job_id = :job_id';
      $this->job = db_query($sql, array(':job_id' => $job_id))->fetchObject();
      if (!$this->job) {
        throw new Exception("Cannot find a job with this ID provided.");
      }
      $this->job_id = $this->job->job_id;

      // Add human readable versions of the date/time fields.
      $this->job->submit_date_string = $this->job->submit_date ? format_date($this->job->submit_date) : '';
      $this->job->start_time_string = $this->job->start_time ? format_date($this->job->start_time) : '';
      $this->job->end_time_string = $this->job->end_time ? format_date($this->job->end_time) : '';

      // Unserialize the includes. This value is written by create().
      $this->job->includes = unserialize($this->job->includes);

      // Arguments for jobs used to be stored as a plain string with a double
      // colon separating them. But as of Tripal v2.0 the arguments are stored
      // as a serialized array. To be backwards compatible the raw string is
      // kept so that it can still be split when it is not a serialized array.
      $raw_arguments = $this->job->arguments;
      $arguments = NULL;
      if (is_string($raw_arguments) && strncmp($raw_arguments, 'a:', 2) === 0) {
        $arguments = unserialize($raw_arguments);
      }
      if (!is_array($arguments)) {
        $arguments = explode("::", (string) $raw_arguments);
      }
      $this->job->arguments = $arguments;
    }

    /**
     * Creates a new job.
     *
     * @param $details
     *   An associative array of the job details. To bind this object to a job
     *   that already exists use load() instead. The following keys are
     *   allowed:
     *   - job_name: The human readable name for the job.
     *   - modulename: The name of the module adding the job.
     *   - callback: The name of a function to be called when the job is
     *     executed.
     *   - arguments: An array of arguments to be passed on to the callback.
     *   - uid: The uid of the user adding the job.
     *   - priority: (Optional). The priority at which to run the job where the
     *     highest priority is 10 and the lowest priority is 1. The default
     *     priority is 10.
     *   - includes: (Optional). An array of paths to files that should be
     *     included in order to execute the job. Use the module_load_include
     *     function to get a path for a given file.
     *   - ignore_duplicate: (Optional). Set to TRUE to ignore a job if it has
     *     the same name as another job which has not yet run. If TRUE and a
     *     job already exists then this object will reference the job already
     *     in the queue rather than a new submission. The default is FALSE.
     *
     * @throws Exception
     *   On failure an exception is thrown.
     *
     * @return
     *   Returns TRUE if the job was successfully created. Returns FALSE
     *   otherwise. A return of FALSE does not mean the job creation failed: if
     *   ignore_duplicate is set to TRUE and the job already is present in the
     *   queue then the existing job is loaded and the return value is FALSE.
     */
    public function create($details) {
      if (!is_array($details)) {
        throw new Exception("Must provide an array of job details to create a job.");
      }

      // Set some defaults. The union only fills in keys that are missing.
      $details += array(
        'priority' => 10,
        'includes' => array(),
        'ignore_duplicate' => FALSE,
      );

      // Make sure the arguments are correct.
      if (empty($details['job_name'])) {
        throw new Exception("Must provide a 'job_name' to create a job.");
      }
      if (empty($details['modulename'])) {
        throw new Exception("Must provide a 'modulename' to create a job.");
      }
      if (empty($details['callback'])) {
        throw new Exception("Must provide a 'callback' to create a job.");
      }
      if ($details['ignore_duplicate'] !== FALSE && $details['ignore_duplicate'] !== TRUE) {
        throw new Exception("Must provide either TRUE or FALSE for the ignore_duplicate option when creating a job.");
      }

      // Load the include files so that the callback can be validated. A path
      // may be given as-is or relative to the site's document root.
      $includes = $details['includes'];
      foreach ($includes as $path) {
        if (empty($path)) {
          continue;
        }
        $full_path = $_SERVER['DOCUMENT_ROOT'] . base_path() . $path;
        if (file_exists($path)) {
          require_once $path;
        }
        elseif (file_exists($full_path)) {
          require_once $full_path;
        }
        else {
          throw new Exception("Included files for Tripal Job must exist. This path ($full_path) doesn't exist.");
        }
      }

      if (!function_exists($details['callback'])) {
        throw new Exception("Must provide a valid callback function to the tripal_add_job() function.");
      }
      if (!isset($details['uid']) || !is_numeric($details['uid'])) {
        throw new Exception("Must provide a numeric \$uid argument to the tripal_add_job() function.");
      }
      $priority = $details['priority'];
      if (!$priority || !is_numeric($priority) || $priority < 1 || $priority > 10) {
        throw new Exception("Must provide a numeric \$priority argument between 1 and 10 to the tripal_add_job() function.");
      }
      $arguments = isset($details['arguments']) ? $details['arguments'] : NULL;
      if (!is_array($arguments)) {
        throw new Exception("Must provide an array as the \$arguments argument to the tripal_add_job() function.");
      }

      // Convert the arguments into a string for storage in the database.
      $args = serialize($arguments);

      try {
        // Before inserting a new record, and if ignore_duplicate is TRUE then
        // check to see if a job with this name is already waiting to start.
        if ($details['ignore_duplicate'] === TRUE) {
          $query = db_select('tripal_jobs', 'tj');
          $query->fields('tj', array('job_id'));
          $query->condition('job_name', $details['job_name']);
          $query->isNull('start_time');
          $existing_id = $query->execute()->fetchField();
          if ($existing_id) {
            $this->load($existing_id);
            return FALSE;
          }
        }

        $job_id = db_insert('tripal_jobs')
          ->fields(array(
            'job_name' => $details['job_name'],
            'modulename' => $details['modulename'],
            'callback' => $details['callback'],
            'status' => 'Waiting',
            'submit_date' => time(),
            'uid' => $details['uid'],
            'priority' => $priority,
            'arguments' => $args,
            'includes' => serialize($includes),
          ))
          ->execute();

        // Now load the job into this object.
        $this->load($job_id);
        return TRUE;
      }
      catch (Exception $e) {
        // Keep the original exception attached for debugging.
        throw new Exception('Cannot create job: ' . $e->getMessage(), 0, $e);
      }
    }

    /**
     * Cancels the job and prevents it from running.
     *
     * Only a job that has not yet started can be cancelled. A job that has a
     * start time but none of the statuses rejected below is left untouched.
     *
     * @throws Exception
     *   If no job is loaded, if the job is running, completed, in error or
     *   already cancelled, or if the database update fails.
     */
    public function cancel() {
      if (!$this->job) {
        throw new Exception("There is no job associated with this object. Cannot cancel");
      }

      // Statuses for which cancelling makes no sense, with the reason why.
      $not_cancellable = array(
        'Running' => "Job Cannot be cancelled it is currently running.",
        'Completed' => "Job Cannot be cancelled it has already finished.",
        'Error' => "Job Cannot be cancelled it is in an error state.",
        'Cancelled' => "Job Cannot be cancelled it is already cancelled.",
      );
      $status = $this->job->status;
      if (is_string($status) && isset($not_cancellable[$status])) {
        throw new Exception($not_cancellable[$status]);
      }

      if ($this->job->start_time != 0) {
        return;
      }

      // Mark the job as cancelled in the database.
      try {
        $record = new stdClass();
        $record->job_id = $this->job->job_id;
        $record->status = 'Cancelled';
        $record->progress = '0';
        $result = drupal_write_record('tripal_jobs', $record, 'job_id');
      }
      catch (Exception $e) {
        throw new Exception('Cannot cancel job: ' . $e->getMessage(), 0, $e);
      }

      // Keep the in-memory job consistent with what was just written so that
      // getStatus() does not continue to report the previous status.
      if ($result !== FALSE) {
        $this->job->status = 'Cancelled';
        $this->job->progress = '0';
      }
    }

    /**
     * Executes the job.
     *
     * The job's include files are loaded, the job is marked as 'Running', the
     * callback is invoked and the job is then marked as 'Completed'. If an
     * exception is thrown along the way the job is marked as 'Error' and a
     * Drupal message is set; the exception is not re-thrown.
     *
     * @throws Exception
     *   If no job is associated with this object.
     */
    public function run() {
      if (!$this->job) {
        throw new Exception('Cannot launch job as no job is associated with this object.');
      }

      try {
        // Include the necessary files needed to run the job.
        if (is_array($this->job->includes)) {
          foreach ($this->job->includes as $path) {
            if ($path) {
              require_once $path;
            }
          }
        }

        // Set the start time for this job.
        $started = new stdClass();
        $started->job_id = $this->job->job_id;
        $started->start_time = time();
        $started->status = 'Running';
        $started->pid = getmypid();
        drupal_write_record('tripal_jobs', $started, 'job_id');

        // Callback functions need the job in order to update
        // progress. But prior to Tripal v3 the job callback functions
        // only accepted a $job_id as the final argument. So, we need
        // to see if the callback is Tv3 compatible or older. If older
        // we want to still support it and pass the job_id.
        $arguments = $this->job->arguments;
        $callback = $this->job->callback;
        $reflection = new ReflectionFunction($callback);
        $parameters = $reflection->getParameters();
        if (count($parameters) > 0) {
          $last_parameter = end($parameters);
          $arguments[] = ($last_parameter->getName() == 'job_id') ? $this->job->job_id : $this;
        }

        // Launch the job.
        call_user_func_array($callback, $arguments);

        // Set the end time for this job. The log collected in memory by
        // logMessage() is written out here.
        $finished = new stdClass();
        $finished->job_id = $this->job->job_id;
        $finished->end_time = time();
        $finished->error_msg = $this->job->error_msg;
        $finished->progress = 100;
        $finished->status = 'Completed';
        $finished->pid = '';
        drupal_write_record('tripal_jobs', $finished, 'job_id');
        $this->load($this->job->job_id);
      }
      catch (Exception $e) {
        // Build a fresh record: the exception may have been thrown before any
        // record object existed.
        $failed = new stdClass();
        $failed->job_id = $this->job->job_id;
        $failed->end_time = time();
        $failed->error_msg = $this->job->error_msg;
        $failed->progress = $this->job->progress;
        $failed->status = 'Error';
        $failed->pid = '';
        drupal_write_record('tripal_jobs', $failed, 'job_id');

        // Keep the in-memory job consistent with the database.
        $this->job->status = 'Error';
        $this->job->end_time = $failed->end_time;
        $this->job->pid = '';

        drupal_set_message('Job execution failed: ' . $e->getMessage(), 'error');
      }
    }

    /**
     * Indicates if the job is running.
     *
     * The check asks the operating system (via the 'ps' command) whether a
     * process with the job's recorded process ID exists.
     *
     * @throws Exception
     *   If no job is associated with this object.
     *
     * @return
     *   TRUE if the job is running, FALSE otherwise.
     */
    public function isRunning() {
      if (!$this->job) {
        throw new Exception('Cannot check running status as no job is associated with this object.');
      }

      // Without a process ID there is nothing to look for, so don't shell out.
      $pid = $this->job->pid;
      if (!$pid) {
        return FALSE;
      }

      // 'ps' prints the pid when the process exists and nothing otherwise.
      $output = shell_exec('ps -p ' . escapeshellarg($pid) . ' -o pid=');
      return $output ? TRUE : FALSE;
    }

    /**
     * Retrieve the job object as if from a database query.
     */
    public function getJob() {
      return $this->job;
    }

    /**
     * Retrieves the job ID.
     */
    public function getJobID() {
      return $this->job->job_id;
    }

    /**
     * Retrieves the user ID of the user that submitted the job.
     */
    public function getUID() {
      return $this->job->uid;
    }

    /**
     * Retrieves the job name.
     */
    public function getJobName() {
      return $this->job->job_name;
    }

    /**
     * Retrieves the name of the module that submitted the job.
     */
    public function getModuleName() {
      return $this->job->modulename;
    }

    /**
     * Retrieves the callback function for the job.
     */
    public function getCallback() {
      return $this->job->callback;
    }

    /**
     * Retrieves the array of arguments for the job.
     */
    public function getArguments() {
      return $this->job->arguments;
    }

    /**
     * Retrieves the current percent complete (i.e. progress) of the job.
     */
    public function getProgress() {
      return $this->job->progress;
    }

    /**
     * Sets the current percent complete of a job.
     *
     * The value is kept as given on the in-memory job and written to the
     * database as an integer.
     *
     * @param $percent_done
     *   A value between 0 and 100 indicating the percentage complete of the
     *   job.
     *
     * @throws Exception
     *   If no job is associated with this object.
     */
    public function setProgress($percent_done) {
      if (!$this->job) {
        throw new Exception('Cannot set progress as no job is associated with this object.');
      }

      $this->job->progress = $percent_done;
      db_update('tripal_jobs')
        ->fields(array(
          'progress' => sprintf("%d", $percent_done),
        ))
        ->condition('job_id', $this->job->job_id)
        ->execute();
    }

    /**
     * Sets the total number of items to be processed.
     *
     * This should typically be called near the beginning of the loading
     * process to indicate the number of items that must be processed.
     *
     * @param $total_items
     *   The total number of items to process.
     */
    public function setTotalItems($total_items) {
      $this->total_items = $total_items;
    }

    /**
     * Adds to the count of the total number of items that have been handled.
     *
     * @param $num_handled
     *   The number of additional items that have been handled since the last
     *   call.
     */
    public function addItemsHandled($num_handled) {
      $this->setItemsHandled($this->num_handled + $num_handled);
    }

    /**
     * Sets the number of items that have been processed.
     *
     * This should be called anytime the loader wants to indicate how many
     * items have been processed. The amount of progress will be
     * calculated using this number. If the progress made since the last
     * report is at least the interval specified then the progress is printed
     * and the job progress is updated in the database.
     *
     * @param $total_handled
     *   The total number of items that have been processed.
     */
    public function setItemsHandled($total_handled) {
      // First set the number of items handled.
      $this->num_handled = $total_handled;

      if ($total_handled == 0) {
        $memory = number_format(memory_get_usage());
        print "Percent complete: 0%. Memory: " . $memory . " bytes.\r";
        return;
      }

      // A percentage cannot be calculated until setTotalItems() has been
      // called with a non-zero value, so avoid dividing by zero.
      if (!$this->total_items) {
        return;
      }

      // Now see if we need to report to the user the percent done. A message
      // will be printed on the command-line if the job is run there.
      $percent = sprintf("%.2f", ($this->num_handled / $this->total_items) * 100);
      if (($percent - $this->prev_update) >= $this->interval) {
        $memory = number_format(memory_get_usage());
        print "Percent complete: " . $percent . "%. Memory: " . $memory . " bytes.\r";
        // Remember the percent that was reported (not the difference) so the
        // next report happens one full interval after this one.
        $this->prev_update = $percent;
        $this->setProgress($percent);
      }
    }

    /**
     * Updates the percent interval when the job progress is updated.
     *
     * Updating the job progress incurs a database write which takes time and
     * if it occurs too frequently can slow down the loader. This should be a
     * value between 0 and 100 to indicate a percent interval (e.g. 1 means
     * update the progress every time the num_handled increases by 1%).
     *
     * @param $interval
     *   A number between 0 and 100.
     */
    public function setInterval($interval) {
      $this->interval = $interval;
    }

    /**
     * Retrieves the status of the job.
     */
    public function getStatus() {
      return $this->job->status;
    }

    /**
     * Retrieves the time the job was submitted.
     */
    public function getSubmitTime() {
      return $this->job->submit_date;
    }

    /**
     * Retrieves the time the job began execution (i.e. the start time).
     */
    public function getStartTime() {
      return $this->job->start_time;
    }

    /**
     * Retrieves the time the job completed execution (i.e. the end time).
     */
    public function getEndTime() {
      return $this->job->end_time;
    }

    /**
     * Retrieves the log for the job.
     *
     * @return
     *   A large string containing the text of the job log. It contains both
     *   status updates and errors.
     */
    public function getLog() {
      return $this->job->error_msg;
    }

    /**
     * Retrieves the process ID of the job.
     */
    public function getPID() {
      return $this->job->pid;
    }

    /**
     * Retrieves the priority that is currently set for the job.
     */
    public function getPriority() {
      return $this->job->priority;
    }

    /**
     * Get the MLock value of the job.
     *
     * The MLock value indicates if no other jobs from a given module
     * should be executed while this job is running.
     */
    public function getMLock() {
      return $this->job->mlock;
    }

    /**
     * Get the lock value of the job.
     *
     * The lock value indicates if no other jobs from any module
     * should be executed while this job is running.
     */
    public function getLock() {
      return $this->job->lock;
    }

    /**
     * Get the list of files that must be included prior to job execution.
     */
    public function getIncludes() {
      return $this->job->includes;
    }

    /**
     * Logs a message for the job.
     *
     * There is no distinction between status messages and error logs. Any
     * message that is intended for the user to review the status of the job
     * can be provided here.
     *
     * The message is printed, then appended to the job's in-memory log. This
     * function does not itself write to the tripal_jobs table; run() writes
     * the collected log when the job finishes or fails.
     *
     * Messages that are of severity TRIPAL_CRITICAL or TRIPAL_ERROR
     * are also passed to tripal_report_error() and set the in-memory job
     * status to 'Error'.
     *
     * @param $message
     *   The message to store in the log. Keep $message translatable by not
     *   concatenating dynamic values into it! Variables in the message should
     *   be added by using placeholder strings alongside the variables argument
     *   to declare the value of the placeholders. See t() for documentation on
     *   how $message and $variables interact.
     * @param $variables
     *   Array of variables to replace in the message on display or NULL if
     *   message is already translated or not possible to translate.
     * @param $severity
     *   The severity of the message; one of the following values:
     *   - TRIPAL_CRITICAL: Critical conditions.
     *   - TRIPAL_ERROR: Error conditions.
     *   - TRIPAL_WARNING: Warning conditions.
     *   - TRIPAL_NOTICE: Normal but significant conditions.
     *   - TRIPAL_INFO: (default) Informational messages.
     *   - TRIPAL_DEBUG: Debug-level messages.
     */
    public function logMessage($message, $variables = array(), $severity = TRIPAL_INFO) {
      // Generate a translated message.
      $tmessage = t($message, $variables);

      // For the sake of the command-line user, print the message to the
      // terminal.
      print $tmessage . "\n";

      // Add this message to the job's log.
      $this->job->error_msg .= "\n" . $tmessage;

      // Only critical and error messages are reported further.
      if ($severity != TRIPAL_CRITICAL && $severity != TRIPAL_ERROR) {
        return;
      }
      tripal_report_error('tripal_job', $severity, $message, $variables);
      // NOTE(review): run() writes 'Completed' when the callback returns
      // normally, which replaces this status - confirm that is intended.
      $this->job->status = 'Error';
    }

  }