tripal_core_jobs.api.inc

Contains functions related to the Tripal Jobs API

tripal_jobs_api Jobs API

Tripal offers a job management subsystem for managing tasks that may require an extended period of time for completion. Drupal uses a UNIX-based cron job to handle tasks such as checking the availability of updates, indexing new nodes for searching, etc. Drupal's cron uses the web interface for launching these tasks; however, Tripal provides several administrative tasks that may time out and not complete due to limitations of the web server. Examples include syncing a large number of features between Chado and Drupal. To circumvent this, as well as to provide more fine-grained control and monitoring, Tripal uses a jobs management subsystem built into the Tripal Core module. It is anticipated that this functionality will be used for managing analysis jobs provided by future tools, with eventual support for distributed computing.

The Tripal jobs management system allows administrators to submit tasks to be performed, which can then be launched through a UNIX command-line PHP script or cron job. This command-line script can be added to a cron entry alongside the Drupal cron entry for automatic, regular launching of Tripal jobs. The order of execution of waiting jobs is determined first by priority and second by the order in which the jobs were entered.

The API functions described below provide a programmatic interface for adding, checking and viewing jobs.

File

tripal_core/api/tripal_core_jobs.api.inc
View source
  1. <?php
  2. /**
  3. * @file
  4. * Contains functions related to the Tripal Jobs API
  5. *
  6. * @defgroup tripal_jobs_api Jobs API
  7. * @ingroup tripal_core_api
  8. * @{
  9. * Tripal offers a job management subsystem for managing tasks that may require an extended period of time for
  10. * completion. Drupal uses a UNIX-based cron job to handle tasks such as checking the availability of updates,
  11. * indexing new nodes for searching, etc. Drupal's cron uses the web interface for launching these tasks, however,
  12. * Tripal provides several administrative tasks that may time out and not complete due to limitations of the web
  13. * server. Examples include syncing a large number of features between Chado and Drupal. To circumvent this,
  14. * as well as provide more fine-grained control and monitoring, Tripal uses a jobs management sub-system built into
  15. * the Tripal Core module. It is anticipated that this functionality will be used for managing analysis jobs provided by
  16. * future tools, with eventual support for distributed computing.
  17. *
  18. * The Tripal jobs management system allows administrators to submit tasks to be performed which can then be
  19. * launched through a UNIX command-line PHP script or cron job. This command-line script can be added to a cron
  20. * entry alongside the Drupal cron entry for automatic, regular launching of Tripal jobs. The order of execution of
  21. * waiting jobs is determined first by priority and second by the order the jobs were entered.
  22. *
  23. * The API functions described below provide a programmatic interface for adding, checking and viewing jobs.
  24. * @}
  25. */
  /**
   * Adds a job to the Tripal job queue.
   *
   * @param $job_name
   *   The human readable name for the job
   * @param $modulename
   *   The name of the module adding the job
   * @param $callback
   *   The name of a function to be called when the job is executed
   * @param $arguments
   *   An array of arguments to be passed on to the callback. The values are
   *   joined with "::" for storage, so an individual argument must not itself
   *   contain "::". A single scalar is accepted and treated as a one-element
   *   array; NULL or an empty string is treated as "no arguments".
   * @param $uid
   *   The uid of the user adding the job
   * @param $priority
   *   The priority at which to run the job. Waiting jobs are launched in
   *   ascending order of this value (see the ORDER BY in tripal_jobs_launch()),
   *   so the lower the number the sooner the job runs. The default is 10.
   *
   * @return
   *   The job_id of the registered job, or NULL if the job could not be saved.
   *
   * Example usage:
   * @code
   *   $args = array($dfile, $organism_id, $type, $library_id, $re_name, $re_uname,
   *     $re_accession, $db_id, $rel_type, $re_subject, $parent_type, $method,
   *     $user->uid, $analysis_id, $match_type);
   *
   *   tripal_add_job("Import FASTA file: $dfile", 'tripal_feature',
   *     'tripal_feature_load_fasta', $args, $user->uid);
   * @endcode
   * The code above is copied from the tripal_feature/fasta_loader.php file. The
   * snippet first builds an array of arguments that will then be passed to the
   * tripal_add_job function. The number of arguments provided in the $arguments
   * variable should match the argument set for the callback function provided
   * as the third argument.
   *
   * @ingroup tripal_jobs_api
   */
  function tripal_add_job($job_name, $modulename, $callback, $arguments, $uid, $priority = 10) {
    // Normalise the arguments so implode() always receives an array.
    if (!is_array($arguments)) {
      $arguments = ($arguments === NULL || $arguments === '') ? array() : array($arguments);
    }

    // Convert the arguments into a string for storage in the database.
    $args = implode("::", $arguments);

    $record = new stdClass();
    $record->job_name = $job_name;
    $record->modulename = $modulename;
    $record->callback = $callback;
    $record->status = 'Waiting';
    $record->submit_date = time();
    $record->uid = $uid;
    // The lower the number the higher the priority.
    $record->priority = $priority;

    // Compare against the empty string rather than testing truthiness: a job
    // whose only argument is 0 serialises to "0", which PHP treats as FALSE.
    if ($args !== '') {
      $record->arguments = $args;
    }

    if (drupal_write_record('tripal_jobs', $record)) {
      $jobs_url = url("admin/tripal/tripal_jobs");
      drupal_set_message(t("Job '%job_name' submitted. Check the <a href='!jobs_url'>jobs page</a> for status", array('%job_name' => $job_name, '!jobs_url' => $jobs_url)));
    }
    else {
      drupal_set_message(t("Failed to add job %job_name.", array('%job_name' => $job_name)), 'error');
    }

    // job_id is only present on the record when the write populated it; avoid
    // reading an undefined property after a failed write.
    return isset($record->job_id) ? $record->job_id : NULL;
  }
  /**
   * Checks whether any started-but-unfinished Tripal job is still running.
   *
   * Every job that has a start time but no end time is examined in turn. If the
   * process recorded for the job is still alive the function stops and reports
   * that a job is running. Otherwise the job is closed out: its end time is set
   * and its status is changed to 'Error'.
   *
   * @return
   *   TRUE as soon as a job with a live process is found, FALSE if no such job
   *   exists.
   *
   * @ingroup tripal_jobs_api
   */
  function tripal_jobs_check_running() {
    $sql = "SELECT * FROM {tripal_jobs} TJ ".
           "WHERE TJ.end_time IS NULL and NOT TJ.start_time IS NULL ";
    $jobs = db_query($sql);

    while ($job = db_fetch_object($jobs)) {
      // Only ask the operating system about the process when a usable pid was
      // recorded. Casting to an integer also guarantees that nothing but digits
      // is ever placed on the shell command line.
      $pid = (int) $job->pid;
      $status = '';
      if ($pid > 0) {
        $status = trim((string) shell_exec('ps -p ' . $pid . ' -o pid='));
      }

      if ($status !== '') {
        // The job is still running so let it go. We return TRUE to indicate
        // that a job is running.
        return TRUE;
      }

      // The job is not running but was never given an end time, so record it
      // as having terminated with an error.
      $record = new stdClass();
      $record->job_id = $job->job_id;
      $record->end_time = time();
      $record->status = 'Error';
      $record->error_msg = 'Job has terminated unexpectedly.';
      drupal_write_record('tripal_jobs', $record, 'job_id');
    }

    // Return FALSE to indicate that no jobs are currently running.
    return FALSE;
  }
  /**
   * Returns the start time for a given job
   *
   * @param $job
   *   An object describing the job. The job status is read from the
   *   'job_status' property when present and from 'status' otherwise.
   *
   * @return
   *   The formatted start time of the job if it has been started, and either
   *   "Cancelled" or "Not Yet Started" otherwise
   *
   * @ingroup tripal_jobs_api
   */
  function tripal_jobs_get_start_time($job) {
    if ($job->start_time > 0) {
      return format_date($job->start_time);
    }

    // NOTE(review): 'job_status' is presumably a column alias used by the query
    // of some caller -- confirm. The tripal_jobs records written in this file
    // use 'status', so fall back to it to handle rows fetched with SELECT *.
    $status = '';
    if (isset($job->job_status)) {
      $status = $job->job_status;
    }
    elseif (isset($job->status)) {
      $status = $job->status;
    }

    if (strcmp($status, 'Cancelled') == 0) {
      return 'Cancelled';
    }
    return 'Not Yet Started';
  }
  /**
   * Provides the display value for the time at which a job ended.
   *
   * @param $job
   *   An object describing the job; its end_time property is read.
   *
   * @return
   *   The end time passed through format_date() when end_time is greater than
   *   zero, and an empty string otherwise
   *
   * @ingroup tripal_jobs_api
   */
  function tripal_jobs_get_end_time($job) {
    // A job without a positive end time has nothing to display.
    return ($job->end_time > 0) ? format_date($job->end_time) : '';
  }
  /**
   * Set a job to be re-run (i.e. add a copy of it back into the job queue)
   *
   * The name, module, callback, arguments and priority are copied from the
   * existing job; the new job is submitted on behalf of the current user.
   *
   * @param $job_id
   *   The job_id of the job to be re-run
   * @param $goto_jobs_page
   *   If TRUE (the default) the user is redirected to the jobs page afterwards
   *
   * @return
   *   The job_id of the newly queued job, or FALSE if the original job could
   *   not be found. Only reached when $goto_jobs_page is FALSE.
   *
   * @ingroup tripal_jobs_api
   */
  function tripal_jobs_rerun($job_id, $goto_jobs_page = TRUE) {
    global $user;

    $sql = "SELECT * FROM {tripal_jobs} WHERE job_id = %d";
    $job = db_fetch_object(db_query($sql, $job_id));

    if (!$job) {
      // Without this guard a non-existent job_id would queue a blank job.
      drupal_set_message(t("Job %job_id cannot be re-run because it does not exist.", array('%job_id' => $job_id)), 'error');
      $new_job_id = FALSE;
    }
    else {
      // Arguments are stored as a single "::"-delimited string. A job that was
      // saved without arguments is re-submitted without arguments.
      $args = array();
      if ($job->arguments !== NULL && $job->arguments !== '') {
        $args = explode("::", $job->arguments);
      }
      $new_job_id = tripal_add_job(
        $job->job_name,
        $job->modulename,
        $job->callback,
        $args,
        $user->uid,
        $job->priority
      );
    }

    if ($goto_jobs_page) {
      drupal_goto("admin/tripal/tripal_jobs");
    }
    return $new_job_id;
  }
  /**
   * Cancel a Tripal Job currently waiting in the job queue
   *
   * Only a job that has not been started can be cancelled. Cancelling sets the
   * end time of the job, changes its status to 'Cancelled' and resets its
   * progress to 0.
   *
   * @param $job_id
   *   The job_id of the job to be cancelled
   * @param $redirect
   *   If TRUE (the default) the user is redirected to the jobs page afterwards
   *
   * @ingroup tripal_jobs_api
   */
  function tripal_jobs_cancel($job_id, $redirect = TRUE) {
    $sql = "SELECT * FROM {tripal_jobs} WHERE job_id = %d";
    $job = db_fetch_object(db_query($sql, $job_id));

    if (!$job) {
      // A missing row must not fall through to the "not started" branch below:
      // the start_time of a non-object also compares equal to 0.
      drupal_set_message(t("Job %job_id cannot be cancelled because it does not exist.", array('%job_id' => $job_id)), 'error');
    }
    elseif ($job->start_time == 0) {
      // The job has not started yet, so close it out as cancelled.
      $record = new stdClass();
      $record->job_id = $job->job_id;
      $record->end_time = time();
      $record->status = 'Cancelled';
      $record->progress = '0';
      drupal_write_record('tripal_jobs', $record, 'job_id');
      drupal_set_message(t("Job #%job_id cancelled", array('%job_id' => $job_id)));
    }
    else {
      drupal_set_message(t("Job %job_id cannot be cancelled. It is in progress or has finished.", array('%job_id' => $job_id)));
    }

    if ($redirect) {
      drupal_goto("admin/tripal/tripal_jobs");
    }
  }
  /**
   * A function used to manually launch all queued tripal jobs
   *
   * Jobs that have neither a start time nor an end time are executed one after
   * another inside the current process, ordered first by priority (ascending)
   * and then by job_id (ascending).
   *
   * @param $do_parallel
   *   When FALSE (the default) nothing is launched if
   *   tripal_jobs_check_running() reports that a job is already running. When
   *   TRUE that check is skipped, so several launch scripts may run side by
   *   side.
   *
   * @param $job_id
   *   To launch a specific job provide the job id. This option should be
   *   used sparingly as the jobs queue management system should launch jobs
   *   based on order and priority. However there are times when a specific
   *   job needs to be launched and this argument will allow it. Only jobs
   *   which have not been run previously will run.
   *
   * @ingroup tripal_jobs_api
   */
  function tripal_jobs_launch($do_parallel = 0, $job_id = NULL) {
    // First check if any jobs are currently running. If they are, don't
    // continue: we don't want more than one job script running at a time.
    if (!$do_parallel and tripal_jobs_check_running()) {
      print "Jobs are still running. Use the --parallel=1 option with the Drush command to run jobs in parallel.";
      return;
    }

    // Get all jobs that have not started, highest priority (lowest number)
    // first and, within the same priority, in the order they were submitted.
    if ($job_id) {
      $sql = "SELECT * FROM {tripal_jobs} TJ ".
             "WHERE TJ.start_time IS NULL and TJ.end_time IS NULL and TJ.job_id = %d ".
             "ORDER BY priority ASC,job_id ASC";
      $job_res = db_query($sql, $job_id);
    }
    else {
      $sql = "SELECT * FROM {tripal_jobs} TJ ".
             "WHERE TJ.start_time IS NULL and TJ.end_time IS NULL ".
             "ORDER BY priority ASC,job_id ASC";
      $job_res = db_query($sql);
    }

    while ($job = db_fetch_object($job_res)) {
      // Set the start time for this job and record the pid of this process so
      // that tripal_jobs_check_running() can tell whether it is still alive.
      $record = new stdClass();
      $record->job_id = $job->job_id;
      $record->start_time = time();
      $record->status = 'Running';
      $record->pid = getmypid();
      drupal_write_record('tripal_jobs', $record, 'job_id');

      // Call the function provided in the callback column. Add the job_id as
      // the last item in the list of arguments. All callback functions should
      // support this argument. explode() replaces split(), which was removed
      // from PHP in version 7.
      $callback = $job->callback;
      $args = explode("::", (string) $job->arguments);
      $args[] = $job->job_id;

      if (!is_callable($callback)) {
        // A callback that cannot be invoked must not be reported as completed.
        $record->end_time = time();
        $record->status = 'Error';
        $record->error_msg = "The callback '$callback' could not be found.";
        drupal_write_record('tripal_jobs', $record, 'job_id');
        print "ERROR: cannot call $callback() for job " . $job->job_id . "\n";
        continue;
      }

      print "Calling: $callback(" . implode(", ", $args) . ")\n";
      call_user_func_array($callback, $args);

      // Set the end time for this job.
      $record->end_time = time();
      $record->status = 'Completed';
      $record->progress = '100';
      drupal_write_record('tripal_jobs', $record, 'job_id');
    }
  }
  /**
   * An internal function for setting the progress for a current job
   *
   * @param $job_id
   *   The job_id to set the progress for
   * @param $percentage
   *   The progress to set the job to: a whole number from 0 to 100. Any other
   *   value is rejected and nothing is written.
   *
   * @return
   *   True on success and False otherwise
   *
   * @ingroup tripal_core
   */
  function tripal_job_set_progress($job_id, $percentage) {
    // Accept one or two digits, or exactly 100. The previous pattern used \d+,
    // which let any non-negative integer (e.g. 250) through.
    if (!preg_match("/^(100|\d{1,2})$/", $percentage)) {
      return FALSE;
    }

    $record = new stdClass();
    $record->job_id = $job_id;
    $record->progress = $percentage;
    if (drupal_write_record('tripal_jobs', $record, 'job_id')) {
      return TRUE;
    }
    return FALSE;
  }
  /**
   * Looks up an unfinished job belonging to the given module
   *
   * The query selects the rows of the tripal_jobs table that have no end time
   * and whose modulename matches; only the first row of that result is fetched.
   *
   * @param $modulename
   *   The module to look up a job for
   *
   * @return
   *   The value of db_fetch_object() for the first matching row: an object
   *   describing one tripal job, or whatever that function yields when no row
   *   matches
   *
   * @ingroup tripal_jobs_api
   */
  function tripal_get_module_active_jobs($modulename) {
    $query = "SELECT * FROM {tripal_jobs} TJ ";
    $query .= "WHERE TJ.end_time IS NULL and TJ.modulename = '%s' ";

    $result = db_query($query, $modulename);
    $first_job = db_fetch_object($result);
    return $first_job;
  }
  /**
   * Provides the display value for the date a job was added to the queue
   *
   * @param $job
   *   An object describing the job; its submit_date property is read.
   *
   * @return
   *   The submit date of the job passed through format_date()
   *
   * @ingroup tripal_jobs_api
   */
  function tripal_jobs_get_submit_date($job) {
    $submitted = $job->submit_date;
    return format_date($submitted);
  }

Related topics