tripal.jobs.api.inc

Tripal offers a job management subsystem for managing tasks that may require an extended period of time for completion.

File

tripal/api/tripal.jobs.api.inc
View source
  1. <?php
  2. /**
  3. * @file
  4. * Tripal offers a job management subsystem for managing tasks that may require
  5. * an extended period of time for completion.
  6. */
  7. /**
  8. * @defgroup tripal_jobs_api Jobs
  9. * @ingroup tripal_api
  10. * @{
  11. * Tripal offers a job management subsystem for managing tasks that may require
  12. * an extended period of time for completion. Tripal provides several
  13. * administrative tasks that may time out and not complete due to limitations
  14. * of the web server. To circumvent this, as well as provide more fine-grained
  15. * control and monitoring, Tripal uses a jobs management system.
  16. *
  17. * The Tripal jobs management system allows administrators to submit tasks
  18. * to be performed which can then be launched through a UNIX command-line PHP
  19. * script or cron job. This command-line script can be added to a cron
  20. * entry alongside the Drupal cron entry for automatic, regular launching of
  21. * Tripal jobs. The order of execution of waiting jobs is determined first by
  22. * priority and second by the order the jobs were entered.
  23. *
  24. * @}
  25. */
  /**
   * Adds a job to the Tripal Job queue.
   *
   * @param $job_name
   *   The human readable name for the job.
   * @param $modulename
   *   The name of the module adding the job.
   * @param $callback
   *   The name of a function to be called when the job is executed.
   * @param $arguments
   *   An array of arguments to be passed on to the callback.
   * @param $uid
   *   The uid of the user adding the job.
   * @param $priority
   *   The priority at which to run the job, a value between 1 and 10. The
   *   default priority is 10.
   *   NOTE(review): this documentation historically stated that 10 is the
   *   highest priority, but tripal_launch_job() selects waiting jobs with
   *   "ORDER BY priority ASC", i.e. lower numbers are launched first. Confirm
   *   the intended scale.
   * @param $includes
   *   An array of paths to files that should be included in order to execute
   *   the job. Use the module_load_include function to get a path for a given
   *   file.
   * @param $ignore_duplicate
   *   Set to TRUE to not add the job if it has the same name as another job
   *   which has not yet run. The default is FALSE.
   *
   * @return
   *   The job_id reported by the TripalJob object (also returned when the job
   *   was detected as a duplicate and not re-submitted), or FALSE if an
   *   exception was thrown.
   *
   * Example usage:
   *
   * @code
   *   $args = array($dfile, $organism_id, $type, $library_id, $re_name, $re_uname,
   *     $re_accession, $db_id, $rel_type, $re_subject, $parent_type, $method,
   *     $user->uid, $analysis_id, $match_type);
   *
   *   $includes = array();
   *   $includes[] = module_load_include('inc', 'tripal_chado',
   *     'includes/loaders/tripal_chado.fasta_loader');
   *
   *   tripal_add_job("Import FASTA file: $dfile", 'tripal_feature',
   *     'tripal_feature_load_fasta', $args, $user->uid, 10, $includes);
   * @endcode
   *
   * The code above is copied from the tripal_feature/fasta_loader.php file. The
   * snippet first builds an array of arguments that will then be passed to the
   * tripal_add_job function. The number of arguments provided in the $arguments
   * variable should match the argument set for the callback function provided
   * as the third argument.
   *
   * @ingroup tripal_jobs_api
   */
  function tripal_add_job($job_name, $modulename, $callback, $arguments, $uid,
      $priority = 10, $includes = array(), $ignore_duplicate = FALSE) {

    // user_load() returns FALSE when the uid does not match an account, so the
    // name is resolved defensively instead of dereferencing the result blindly.
    $user = user_load($uid);
    $uname = ($user && isset($user->name)) ? $user->name : '';

    try {
      $job = new TripalJob();
      $is_created = $job->create(array(
        'job_name' => $job_name,
        'modulename' => $modulename,
        'callback' => $callback,
        'arguments' => $arguments,
        'uid' => $uid,
        'priority' => $priority,
        'includes' => $includes,
        'ignore_duplicate' => $ignore_duplicate,
      ));

      if ($is_created) {
        // If no exceptions were thrown then we know the creation worked. So
        // let the user know!
        drupal_set_message(t("Job '%job_name' submitted.", array('%job_name' => $job_name)));

        // If the current user may administer Tripal then give a bit more
        // information about how to run the job.
        if (user_access('administer tripal')) {
          $jobs_url = url("admin/tripal/tripal_jobs");
          drupal_set_message(t("Check the <a href='!jobs_url'>jobs page</a> for status.",
            array('!jobs_url' => $jobs_url)));
          drupal_set_message(t("You can execute the job queue manually on the command line " .
            "using the following Drush command: <br>drush trp-run-jobs --username=%uname --root=%base_path",
            array('%base_path' => DRUPAL_ROOT, '%uname' => $uname)));
        }
      }
      else {
        drupal_set_message(t("Job '%job_name' already exists in the queue and was not re-submitted.", array('%job_name' => $job_name)), 'warning');
      }
      return $job->getJobID();
    }
    catch (Exception $e) {
      tripal_report_error('tripal', TRIPAL_ERROR, $e->getMessage());
      drupal_set_message($e->getMessage(), 'error');
      return FALSE;
    }
  }
  /**
   * Looks up a single Tripal job.
   *
   * @param $job_id
   *   The unique identifier of the job to fetch.
   *
   * @return
   *   Whatever TripalJob::getJob() yields for the loaded job (an object
   *   representing a record from the tripal_job table), or FALSE when loading
   *   the job raised an exception.
   *
   * @ingroup tripal_jobs_api
   */
  function tripal_get_job($job_id) {
    $record = FALSE;

    try {
      $tripal_job = new TripalJob();
      $tripal_job->load($job_id);
      $record = $tripal_job->getJob();
    }
    catch (Exception $exception) {
      // Log the failure and surface it to the user; the caller gets FALSE.
      $message = $exception->getMessage();
      tripal_report_error('tripal', TRIPAL_ERROR, $message);
      drupal_set_message($message, 'error');
      $record = FALSE;
    }

    return $record;
  }
  /**
   * Indicates if any jobs are running.
   *
   * This function walks the jobs that have a start time but no end time and
   * asks the operating system (via "ps") whether the recorded process ID is
   * still alive. Jobs whose process is gone (or that never recorded a process
   * ID) are marked with status 'Error' and given an end time. The walk stops
   * at the first job found to be running, so stale jobs that come later in the
   * result set are not cleaned up by that call.
   *
   * @return
   *   Returns TRUE if any job is running or FALSE otherwise.
   *
   * @ingroup tripal_jobs_api
   */
  function tripal_is_job_running() {

    // Select every job that has started but not ended.
    $sql = "SELECT * FROM {tripal_jobs} TJ " .
           "WHERE TJ.end_time IS NULL and NOT TJ.start_time IS NULL ";
    $jobs = db_query($sql);

    foreach ($jobs as $job) {
      // Only probe the process table when a PID was actually recorded;
      // "ps -p ''" is an invalid invocation that can never report a process.
      $status = NULL;
      if ($job->pid) {
        $status = shell_exec('ps -p ' . escapeshellarg($job->pid) . ' -o pid=');
      }

      if ($job->pid && $status) {
        // The job is still running, so report that right away.
        return TRUE;
      }

      // The job is not running, so close it out as failed.
      $record = new stdClass();
      $record->job_id = $job->job_id;
      $record->end_time = time();
      $record->status = 'Error';
      $record->error_msg = 'Job has terminated unexpectedly.';
      drupal_write_record('tripal_jobs', $record, 'job_id');
    }

    // No started-but-unfinished job has a live process.
    return FALSE;
  }
  /**
   * Check for too many concurrent jobs.
   *
   * Counts the jobs that have a start time, no end time and a process ID that
   * "ps" still reports. Any such job whose process is gone (or that never
   * recorded a process ID) is marked with status 'Error' and given an end
   * time as a side effect.
   *
   * @param $max_jobs
   *   The maximum number of concurrent jobs to allow; -1 = no limit
   *
   * @return
   *   FALSE when $max_jobs is negative (no limit). Otherwise TRUE when the
   *   number of running jobs is greater than or equal to $max_jobs, and FALSE
   *   when it is lower.
   *
   * @ingroup tripal_jobs_api
   */
  function tripal_max_jobs_exceeded($max_jobs) {
    if ($max_jobs < 0) {
      // No limit on concurrent jobs.
      return FALSE;
    }

    $num_jobs_running = 0;

    // Select every job that has started but not ended.
    $sql = "SELECT * FROM {tripal_jobs} TJ " .
           "WHERE TJ.end_time IS NULL and NOT TJ.start_time IS NULL ";
    $jobs = db_query($sql);

    foreach ($jobs as $job) {
      // Only probe the process table when a PID was actually recorded;
      // "ps -p ''" is an invalid invocation that can never report a process.
      $status = NULL;
      if ($job->pid) {
        $status = shell_exec('ps -p ' . escapeshellarg($job->pid) . ' -o pid=');
      }

      if ($job->pid && $status) {
        // The job is still running.
        $num_jobs_running++;
      }
      else {
        // The job is not running, so close it out as failed.
        $record = new stdClass();
        $record->job_id = $job->job_id;
        $record->end_time = time();
        $record->status = 'Error';
        $record->error_msg = 'Job has terminated unexpectedly.';
        drupal_write_record('tripal_jobs', $record, 'job_id');
      }
    }

    return ($num_jobs_running >= $max_jobs);
  }
  /**
   * Set a job to be re-run (ie: add it back into the job queue).
   *
   * The existing job is loaded and a new job is created from its name, module,
   * callback, arguments, priority and includes. The new job is owned by the
   * currently logged-in user. Any exception raised while loading or creating
   * the job is reported to the user and logged; no redirect happens then.
   *
   * @param $job_id
   *   The job_id of the job to be re-run.
   * @param $goto_jobs_page
   *   If set to TRUE then after the re-run job is added Drupal will redirect
   *   to the jobs page.
   *
   * @ingroup tripal_jobs_api
   */
  function tripal_rerun_job($job_id, $goto_jobs_page = TRUE) {
    global $user;
    $user_id = $user->uid;

    try {
      // Loading happens inside the try block so that an unknown job_id is
      // reported like every other failure instead of escaping as an uncaught
      // exception.
      $job = new TripalJob();
      $job->load($job_id);

      $arguments = $job->getArguments();
      $includes = $job->getIncludes();

      $job->create(array(
        'job_name' => $job->getJobName(),
        'modulename' => $job->getModuleName(),
        'callback' => $job->getCallback(),
        'arguments' => $arguments,
        'uid' => $user_id,
        'priority' => $job->getPriority(),
        'includes' => $includes
      ));

      // If no exceptions were thrown then we know the creation worked. So
      // let the user know!
      drupal_set_message(t("Job '%job_name' submitted.", array('%job_name' => $job->getJobName())));

      // If the current user may administer Tripal then give a bit more
      // information about how to run the job.
      if (user_access('administer tripal')) {
        $jobs_url = url("admin/tripal/tripal_jobs");
        drupal_set_message(t("Check the <a href='!jobs_url'>jobs page</a> for status.",
          array('!jobs_url' => $jobs_url)));
        drupal_set_message(t("You can execute the job queue manually on the command line " .
          "using the following Drush command: <br>drush trp-run-jobs --username=%uname --root=%base_path",
          array('%base_path' => DRUPAL_ROOT, '%uname' => $user->name)));
      }

      if ($goto_jobs_page) {
        drupal_goto("admin/tripal/tripal_jobs");
      }
    }
    catch (Exception $e) {
      drupal_set_message($e->getMessage(), 'error');
      tripal_report_error('tripal', TRIPAL_ERROR, $e->getMessage());
    }
  }
  /**
   * Cancel a Tripal Job currently waiting in the job queue.
   *
   * @param $job_id
   *   The job_id of the job to be cancelled.
   * @param $redirect
   *   If TRUE (the default) Drupal redirects to the jobs page once the
   *   cancellation has been attempted, whether it succeeded or failed. Pass
   *   FALSE to stay on the current request and use the return value instead.
   *
   * @return
   *   FALSE if an error occurred or the job could not be cancelled, TRUE
   *   otherwise. The value is only observable when no redirect takes place:
   *   that is, when $redirect is FALSE or $job_id fails validation.
   *
   * @ingroup tripal_jobs_api
   */
  function tripal_cancel_job($job_id, $redirect = TRUE) {

    if (!$job_id or !is_numeric($job_id)) {
      watchdog('tripal', "Must provide a numeric \$job_id to the tripal_cancel_job() function.");
      return FALSE;
    }

    try {
      $job = new TripalJob();
      $job->load($job_id);
      $job->cancel();

      drupal_set_message('Job is now cancelled.');
      // Honor the caller's choice: previously the redirect was unconditional,
      // which made the $redirect argument a no-op.
      if ($redirect) {
        drupal_goto("admin/tripal/tripal_jobs");
      }
      return TRUE;
    }
    catch (Exception $e) {
      tripal_report_error('tripal', TRIPAL_ERROR, $e->getMessage());
      drupal_set_message($e->getMessage(), 'error');
      if ($redirect) {
        drupal_goto("admin/tripal/tripal_jobs");
      }
      return FALSE;
    }
  }
  /**
   * A function used to manually launch all queued tripal jobs.
   *
   * Waiting jobs (no start time and no end time) are selected in ascending
   * order of priority and then ascending order of job_id, and run one after
   * another. Progress is printed to standard output.
   *
   * @param $do_parallel
   *   A boolean indicating whether jobs should be attempted to run in parallel.
   *   When FALSE, nothing is launched while another job is still running.
   * @param $job_id
   *   To launch a specific job provide the job id. This option should be
   *   used sparingly as the jobs queue management system should launch jobs
   *   based on order and priority. However there are times when a specific
   *   job needs to be launched and this argument will allow it. Only jobs
   *   which have not been run previously will run.
   * @param $max_jobs
   *   The maximum number of jobs that should be run concurrently. If -1 then
   *   unlimited.
   * @param $single
   *   Ensures only a single job is run rather than the entire queue.
   *
   * @ingroup tripal_jobs_api
   */
  function tripal_launch_job($do_parallel = 0, $job_id = NULL, $max_jobs = -1, $single = 0) {

    // First check if any jobs are currently running. If they are, don't
    // continue: we don't want more than one job script running at a time.
    if (!$do_parallel and tripal_is_job_running()) {
      print date('Y-m-d H:i:s') . ": Jobs are still running. Use the --parallel=1 option with the Drush command to run jobs in parallel.\n";
      return;
    }

    if ($do_parallel && tripal_max_jobs_exceeded($max_jobs)) {
      print date('Y-m-d H:i:s') . ": More than $max_jobs jobs are still running. At least one of these jobs must complete before a new job can start.\n";
      return;
    }

    // Get all jobs that have not started and order them such that they are
    // processed in a FIFO manner.
    if ($job_id) {
      $sql = "
        SELECT TJ.job_id
        FROM {tripal_jobs} TJ
        WHERE
          TJ.start_time IS NULL AND
          TJ.end_time IS NULL AND
          TJ.job_id = :job_id
        ORDER BY priority ASC, job_id ASC
      ";
      $jobs = db_query($sql, array(':job_id' => $job_id));
    }
    else {
      $sql = "
        SELECT TJ.job_id
        FROM {tripal_jobs} TJ
        WHERE
          TJ.start_time IS NULL AND
          TJ.end_time IS NULL AND
          NOT TJ.status = 'Cancelled'
        ORDER BY priority ASC,job_id ASC
      ";
      $jobs = db_query($sql);
    }

    if ($jobs) {
      print date('Y-m-d H:i:s') . ": There are " . $jobs->rowCount() . " jobs queued.\n";
    }

    foreach ($jobs as $jid) {
      $job_id = $jid->job_id;

      // Create the TripalJob object.
      $job = new TripalJob();
      $job->load($job_id);

      // We need to do some additional processing for printing since the switch
      // to serialized arrays now allows nested arrays which cause errors when
      // printed using implode alone.
      $args = $job->getArguments();
      $string_args = array();
      foreach ($args as $k => $a) {
        if (is_array($a)) {
          $string_args[$k] = 'Array';
        }
        elseif (is_object($a)) {
          $string_args[$k] = 'Object';
        }
        else {
          $string_args[$k] = $a;
        }
      }

      // Run the job.
      $callback = $job->getCallback();
      print date('Y-m-d H:i:s') .": Calling: $callback(" . implode(", ", $string_args) . ")\n";
      try {
        $job->run();
      }
      catch (Exception $e) {
        // A failing job is logged against the job itself and does not stop
        // the remaining queue from being processed.
        $job->logMessage($e->getMessage(), array(), TRIPAL_ERROR);
        drupal_set_message($e->getMessage(), 'error');
      }

      if ($single) {
        // Don't start any more jobs.
        break;
      }
      if (tripal_max_jobs_exceeded($max_jobs)) {
        break;
      }

      // TODO: Send an email to the user advising that the job has finished
    }
  }
  /**
   * Internal helper that records how far along a job is.
   *
   * @param $job_id
   *   The job_id whose progress should be updated.
   * @param $percentage
   *   The progress value to store for the job.
   *
   * @return
   *   TRUE when the progress was stored, FALSE when loading the job or setting
   *   its progress raised an exception.
   *
   * @ingroup tripal_jobs_api
   */
  function tripal_set_job_progress($job_id, $percentage) {
    try {
      $tripal_job = new TripalJob();
      $tripal_job->load($job_id);
      $tripal_job->setProgress($percentage);

      return TRUE;
    }
    catch (Exception $exception) {
      // Log the failure and show it to the user before signalling it.
      $message = $exception->getMessage();
      tripal_report_error('tripal', TRIPAL_ERROR, $message);
      drupal_set_message($message, 'error');
    }

    return FALSE;
  }
  /**
   * Retrieves the current progress of a job.
   *
   * @param $job_id
   *   The job_id to look the progress up for.
   *
   * @return
   *   The value reported by TripalJob::getProgress() for the job (a value
   *   between 0 and 100 indicating the percentage complete), or FALSE when an
   *   exception was raised.
   *
   * @ingroup tripal_jobs_api
   */
  function tripal_get_job_progress($job_id) {
    try {
      $tripal_job = new TripalJob();
      $tripal_job->load($job_id);

      return $tripal_job->getProgress();
    }
    catch (Exception $exception) {
      // Log the failure and show it to the user before signalling it.
      $message = $exception->getMessage();
      tripal_report_error('tripal', TRIPAL_ERROR, $message);
      drupal_set_message($message, 'error');
    }

    return FALSE;
  }
  /**
   * Returns a list of jobs that are active.
   *
   * A job is treated as active when its status is neither 'Completed' nor
   * 'Cancelled'.
   *
   * @param $modulename
   *   Limit the list returned to those that were added by a specific module. If
   *   no module name is provided then all active jobs are returned.
   *
   * @return
   *   An array of objects where each object describes a tripal job. If no
   *   jobs were found then an empty array is returned. Each object will have
   *   the following members:
   *   - job_id: The unique ID number for the job.
   *   - uid: The ID of the user that submitted the job.
   *   - job_name: The human-readable name of the job.
   *   - modulename: The name of the module that submitted the job.
   *   - callback: The callback function to be called when the job is run.
   *   - arguments: An array of arguments to be passed to the callback function.
   *   - progress: The percent progress of completion if the job is running.
   *   - status: The status of the job: Waiting, Completed, Running or Cancelled.
   *   - submit_date: The UNIX timestamp when the job was submitted.
   *   - start_time: The UNIX timestamp for when the job started running.
   *   - end_time: The UNIX timestamp when the job completed running.
   *   - error_msg: Any error message that occurred during execution of the job.
   *   - priority: The execution priority of the job (value between 1 and 10)
   *
   * @ingroup tripal_jobs_api
   */
  function tripal_get_active_jobs($modulename = NULL) {
    $query = db_select('tripal_jobs', 'TJ')
      ->fields('TJ', array('job_id', 'uid', 'job_name', 'modulename', 'callback',
        'arguments', 'progress', 'status', 'submit_date', 'start_time',
        'end_time', 'error_msg', 'priority'));

    // The "active" restriction applies whether or not a module was named;
    // previously it was skipped when $modulename was empty, so finished and
    // cancelled jobs leaked into the result.
    $query->where("NOT (TJ.status = 'Completed' or TJ.status = 'Cancelled')");
    if ($modulename) {
      $query->where("TJ.modulename = :modulename", array(':modulename' => $modulename));
    }

    $results = $query->execute();

    $jobs = array();
    while ($job = $results->fetchObject()) {
      // Expand the stored arguments on the row itself (not on the result
      // array) so callers receive them as an array.
      $job->arguments = unserialize($job->arguments);
      $jobs[] = $job;
    }
    return $jobs;
  }
  /**
   * Runs one particular Tripal job on demand.
   *
   * The job is only launched when both its start time and its end time are
   * still zero/empty; otherwise a message is shown saying it cannot be
   * executed.
   *
   * @param $job_id
   *   The id of the job to be executed.
   * @param bool $redirect [optional]
   *   Whether to redirect to the job's page afterwards or not.
   *
   * @ingroup tripal_jobs_api
   */
  function tripal_execute_job($job_id, $redirect = TRUE) {
    $tripal_job = new TripalJob();
    $tripal_job->load($job_id);

    $placeholders = array('%job_id' => $job_id);
    $never_run = ($tripal_job->getStartTime() == 0 && $tripal_job->getEndTime() == 0);

    if (!$never_run) {
      drupal_set_message(t("Job %job_id cannot be executed. It has already finished.", $placeholders));
    }
    else {
      // Launch in parallel mode, targeting just this job.
      tripal_launch_job(1, $job_id);
      drupal_set_message(t("Job %job_id has finished executing. See below for more information.", $placeholders));
    }

    if ($redirect) {
      drupal_goto("admin/tripal/tripal_jobs/view/$job_id");
    }
  }