function tripal_bulk_loader_load_data
2.x tripal_bulk_loader.loader.inc | tripal_bulk_loader_load_data($nid, $job_id) |
3.x tripal_bulk_loader.loader.inc | tripal_bulk_loader_load_data($nid, $job_id) |
1.x tripal_bulk_loader.loader.inc | tripal_bulk_loader_load_data($nid, $job_id) |
Tripal Bulk Loader
This is the function that's run by tripal_launch_jobs to bulk load chado data.
Parameters
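$nid: The Node ID of the bulk loading job node to be loaded. All other needed data is expected to be in the node (i.e., the template ID and file).
$job_id: The ID of the job being run; it is passed to tripal_bulk_loader_progress_file_track_job() to track progress.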
Note: Instead of returning a value this function updates the tripal_bulk_loader.status. Errors are thrown through watchdog and can be viewed at admin/reports/dblog.
Related topics
3 string references to 'tripal_bulk_loader_load_data'
- drush_tripal_bulk_loader_tripal_loader_submit in tripal_bulk_loader/
tripal_bulk_loader.drush.inc - Submit or Re-submit the given bulk loading job.
- tripal_bulk_loader_add_loader_job_form_submit in tripal_bulk_loader/
includes/ tripal_bulk_loader.loader.inc - Add Loader Job Form (Submit)
- tripal_bulk_loader_job_describe_args in tripal_bulk_loader/
tripal_bulk_loader.module - Implements hook_job_describe_args() Specifically to make viewing past tripal jobs more readable for jobs registered by this module
File
- tripal_bulk_loader/
includes/ tripal_bulk_loader.loader.inc, line 139 - Handles the actual loading of data.
Code
/**
 * Loads the data file attached to a bulk loader node into chado.
 *
 * The function marks the node's job_status as 'Loading...', counts the lines
 * of the input file, converts the node's template into a per-record
 * "default data" array, and then, once per constant set, reads the file line
 * by line and hands every enabled record to process_data_array_for_line().
 *
 * Depending on the 'tripal_bulk_loader_transactions' variable the work is
 * done without a transaction ("none"), in one transaction per constant set
 * ("all") or in one transaction per line ("row", the default).
 *
 * @param $nid
 *   The Node ID of the bulk loading job node to be loaded. The template and
 *   the file are read from the loaded node.
 * @param $job_id
 *   The job ID handed to tripal_bulk_loader_progress_file_track_job().
 *
 * Nothing is returned. The outcome is reported through
 * tripal_bulk_loader_finish_loading() and errors are reported through
 * tripal_bulk_loader_throw_error(). Loading stops at the first record that
 * fails; the open transaction (if any) is rolled back.
 */
function tripal_bulk_loader_load_data($nid, $job_id) {

  // ensure no timeout
  drupal_set_time_limit(0);

  // set the status of the job (in the node not the tripal jobs)
  db_update('tripal_bulk_loader')
    ->fields(array('job_status' => 'Loading...'))
    ->condition('nid', $nid)
    ->execute();

  $node = node_load($nid);
  print "Template: " . $node->template->name . " (" . $node->template_id . ")\n";

  // Determine the total number of lines in the file.
  $total_lines = 0;
  $handle = fopen($node->file, "r");
  if ($handle === FALSE) {
    // Without this guard feof() would never report the end of the (invalid)
    // handle and the counting loop below would not terminate.
    tripal_bulk_loader_throw_error('Could not open file %file',
      array('%file' => $node->file), TRIPAL_ERROR);
    return;
  }
  while (!feof($handle)) {
    fgets($handle);
    $total_lines++;
  }
  fclose($handle);

  // Correct for files with a single line and no enter character.
  $total_lines = ($total_lines == 0) ? 1 : $total_lines;
  print "File: " . $node->file . " (" . $total_lines . " lines)\n";

  //print "\nClearing all prepared statements from previous runs of this loader...\n";
  //tripal_core_chado_clear_prepared('_'.$node->nid.'_');

  // Prep Work ==================================================================================
  print "\nPreparing to load...\n";
  $loaded_without_errors = TRUE;

  // Generate default values array
  $default_data = array();
  $field2column = array();
  $record2priority = array();
  $tables = array();
  $template_array = $node->template->template_array;

  // first build the record2priority array (record_id => priority) so that
  // foreign key fields can look up the record they refer to.
  foreach ($template_array as $priority => $record_array) {
    if (!is_array($record_array)) {
      continue;
    }
    $record2priority[$record_array['record_id']] = $priority;
  }

  // then build the default data for each record of the template
  foreach ($template_array as $priority => $record_array) {
    if (!is_array($record_array)) {
      continue;
    }

    // Add tables being inserted into to a list to be treated differently
    // this is used to acquire locks on these tables
    if (!empty($record_array['mode']) && preg_match('/insert/', $record_array['mode'])) {
      $tables[$record_array['table']] = $record_array['table'];
    }

    // iterate through each of the fields for the current record and
    // set the default_data array
    foreach ($record_array['fields'] as $field_index => $field_array) {

      // Record level settings. Optional settings fall back to a default
      // when they are missing or empty.
      $default_data[$priority]['table'] = $record_array['table'];
      $default_data[$priority]['mode'] = !empty($record_array['mode']) ? $record_array['mode'] : 'insert';
      $default_data[$priority]['select_if_duplicate'] = !empty($record_array['select_if_duplicate']) ? $record_array['select_if_duplicate'] : 0;
      $default_data[$priority]['update_if_duplicate'] = !empty($record_array['update_if_duplicate']) ? $record_array['update_if_duplicate'] : 0;
      $default_data[$priority]['disabled'] = !empty($record_array['disable']) ? $record_array['disable'] : 0;
      $default_data[$priority]['optional'] = !empty($record_array['optional']) ? $record_array['optional'] : 0;
      $default_data[$priority]['select_optional'] = !empty($record_array['select_optional']) ? $record_array['select_optional'] : 0;
      $default_data[$priority]['record_id'] = $record_array['record_id'];
      $default_data[$priority]['required'][$field_array['field']] = $field_array['required'];

      if (isset($field_array['regex'])) {
        $default_data[$priority]['regex_transform'][$field_array['field']] = $field_array['regex'];
      }

      if (preg_match('/table field/', $field_array['type'])) {
        // The value comes from a column of the file; it is filled in per line.
        $default_data[$priority]['values_array'][$field_array['field']] = '';
        $default_data[$priority]['need_further_processing'] = TRUE;
        $field2column[$priority][$field_array['field']] = $field_array['spreadsheet column'];
      }
      elseif (preg_match('/constant/', $field_array['type'])) {
        $default_data[$priority]['values_array'][$field_array['field']] = $field_array['constant value'];
      }
      elseif (preg_match('/foreign key/', $field_array['type'])) {
        $default_data[$priority]['values_array'][$field_array['field']] = array();
        $default_data[$priority]['need_further_processing'] = TRUE;
        $default_data[$priority]['values_array'][$field_array['field']]['foreign record']['record'] = $field_array['foreign key'];

        // Add in the FK / Referral table
        $fk_priority = $record2priority[$field_array['foreign key']];
        $fk_table = $template_array[$fk_priority]['table'];
        $default_data[$priority]['values_array'][$field_array['field']]['foreign record']['table'] = $fk_table;

        // Add in the FK / Referral field
        // for backwards compatibility we need to get the FK relationship to find
        // out what field we're joining on. For templates created using a
        // previous version it was assumed that the FK field was always the field to join
        if (!array_key_exists('foreign field', $field_array)) {
          $tbl_description = chado_get_schema($record_array['table']);
          foreach ($tbl_description['foreign keys'] as $key_table => $key_array) {
            if ($key_table == $fk_table) {
              foreach ($key_array['columns'] as $left_field => $right_field) {
                if ($left_field == $field_array['field']) {
                  $field_array['foreign field'] = $right_field;
                }
              }
            }
          }
        }
        $default_data[$priority]['values_array'][$field_array['field']]['foreign record']['field'] = $field_array['foreign field'];
      }
      else {
        print 'WARNING: Unsupported type: ' . $field_array['type'] . ' for ' . $record_array['table'] . '.' . $field_array['field'] . "!\n";
      }
    } // end of foreach field
  } //end of foreach record

  ///////////////////////////////////////////////
  // For each set of constants
  ///////////////////////////////////////////////
  print "Loading...\n";
  $original_default_data = $default_data;
  $group_index = 0;
  $total_num_groups = sizeof($node->constants);

  // If there are no constant sets and no exposed fields
  // then create an empty constant set so loader runs
  if ($total_num_groups == 0 && empty($node->exposed_fields)) {
    $node->constants = array(
      0 => array()
    );
    $total_num_groups = 1;
  }

  foreach ($node->constants as $group_id => $set) {
    // revert default data array for next set of constants
    $default_data = $original_default_data;
    $group_index++;

    // Add constants
    if (!empty($set)) {
      print "Constants:\n";
      foreach ($set as $priority => $record) {
        foreach ($record as $field_id => $field) {

          print "\t- " . $field['chado_table'] . '.' . $field['chado_field'] . ' = ' . $field['value'] . "\n";

          if ($default_data[$priority]['table'] == $field['chado_table']) {
            if (isset($default_data[$priority]['values_array'][$field['chado_field']])) {
              if (isset($field2column[$priority][$field['chado_field']])) {
                $field2column[$priority][$field['chado_field']] = $field['value'];
              }
              else {
                $default_data[$priority]['values_array'][$field['chado_field']] = $field['value'];
              }
            }
            else {
              print "ERROR: Template has changed after constants were assigned!\n";
              tripal_bulk_loader_throw_error('Template has changed after constants were assigned', array(), TRIPAL_NOTICE);
              exit(1);
            }
          }
          else {
            print "ERROR: Template has changed after constants were assigned!\n";
            tripal_bulk_loader_throw_error('Template has changed after constants were assigned', array(), TRIPAL_NOTICE);
            exit(1);
          }
        }
      }
    }

    // Open File
    print "\tPreparing to load the current constant set...\n";
    print "\t\tOpen File...\n";
    try {
      $file = new SplFileObject($node->file, 'r');
    }
    catch (Exception $e) {
      tripal_bulk_loader_throw_error('Could not open file %file',
        array('%file' => $node->file), TRIPAL_ERROR);
      return;
    }

    // Set defaults
    $header = '';
    if (preg_match('/(t|true|1)/', $node->file_has_header)) {
      // Consume the header line so it is not loaded as data.
      $file->next();
      $header = $file->current();
    }
    $num_lines = 0;
    $interval = intval($total_lines * 0.0001);
    if ($interval == 0) {
      $interval = 1;
    }

    // Start Transaction
    // Both flags default to FALSE so that the "none" mode (and any
    // unexpected value of the variable) never reads an undefined variable.
    $transactions = FALSE;
    $new_transaction_per_row = FALSE;
    switch (variable_get('tripal_bulk_loader_transactions', 'row')) {
      case "none":
        break;
      case "all":
        print "\t\tStart Transaction...\n";
        $TRANSACTION = db_transaction();
        $transactions = TRUE;
        break;
      case "row":
        print "\t\tStart Transaction...\n";
        $TRANSACTION = db_transaction();
        $transactions = TRUE;
        $new_transaction_per_row = TRUE;
        break;
    }

    // Disable triggers
    if ($transactions AND variable_get('tripal_bulk_loader_disable_triggers', TRUE)) {
      print "\t\tDefer Constraints...\n";
      chado_query("SET CONSTRAINTS ALL DEFERRED");
    }

    // Acquire Locks
    if ($transactions) {
      print "\t\tAcquiring Table Locks...\n";
      $lockmode = variable_get('tripal_bulk_loader_lock', 'ROW EXCLUSIVE');
      foreach ($tables as $table) {
        print "\t\t\t$lockmode for $table\n";
        chado_query("LOCK TABLE {" . $table . "} IN " . $lockmode . " MODE");
      }
    }

    print "\tLoading the current constant set...\n";
    tripal_bulk_loader_progress_bar(0, $total_lines);

    // Set to TRUE as soon as one record of one line cannot be processed.
    $failed = FALSE;

    while (!$file->eof()) {
      $file->next();
      $raw_line = $file->current();
      $raw_line = trim($raw_line);
      if (empty($raw_line)) {
        continue;
      } // skips blank lines
      $line = explode("\t", $raw_line);
      $num_lines++;

      // update the job status every $interval lines (0.01% of the lines in
      // the file, but at least every line) processed for the current group
      if ($node->job_id and $num_lines % $interval == 0) {

        // percentage of lines processed for the current group
        $group_progress = round(($num_lines / $total_lines) * 100);
        tripal_bulk_loader_progress_bar($num_lines, $total_lines);

        // percentage of lines processed for all groups
        // <previous group index> * 100 + <current group progress>
        // --------------------------------------------------------
        //               <total number of groups>
        // For example, if you were in the third group of 3 constant sets
        // and had a group percentage of 50% then the job progress would be
        // (2*100 + 50%) / 3 = 250%/3 = 83%
        $job_progress = round(((($group_index - 1) * 100) + $group_progress) / $total_num_groups);
        tripal_set_job_progress($node->job_id, $job_progress);
      }

      $data = $default_data;

      // iterate through each record and process the line
      $data_keys = array_keys($data);
      foreach ($data_keys as $priority) {
        $options = array(
          'field2column' => $field2column,
          'record2priority' => $record2priority,
          'line' => $line,
          'line_num' => $num_lines,
          'group_index' => $group_index,
          'node' => $node,
          'nid' => $node->nid,
        );

        // execute all records that are not disabled
        $no_errors = FALSE;
        if (array_key_exists($priority, $data) and
            array_key_exists('disabled', $data[$priority]) and
            $data[$priority]['disabled'] == 0) {
          $no_errors = process_data_array_for_line($priority, $data, $default_data, $options);
        }
        else {
          // set status to true for skipped records
          $no_errors = TRUE;
        }

        tripal_bulk_loader_progress_file_track_job($job_id, $no_errors);

        if ($no_errors == FALSE) {
          // Encountered an error: remember it and stop processing the
          // remaining records of this line. The rollback happens once, below.
          $failed = TRUE;
          break;
        }
      } // end of foreach table in default data array

      tripal_bulk_loader_progress_file_track_job($job_id, FALSE, TRUE);

      if ($failed) {
        // Roll back exactly once, and only when a transaction was started.
        if ($transactions) {
          $TRANSACTION->rollback();
        }
        break;
      }
      else {
        // Row inserted successfully
        if ($transactions && $new_transaction_per_row) {
          // commit current transaction and start a new one
          unset($TRANSACTION);
          $TRANSACTION = db_transaction();
        }
      }
    } //end of foreach line of file

    // END Transaction
    if ($transactions) {
      unset($TRANSACTION);
    }

    if ($failed) {
      // Do not continue with further constant sets; the failure is reported
      // by the final tripal_bulk_loader_finish_loading() call.
      $loaded_without_errors = FALSE;
      break;
    }

    tripal_bulk_loader_progress_bar($total_lines, $total_lines);
    tripal_bulk_loader_progress_file_track_job($job_id, FALSE, FALSE, TRUE);
  } //end of foreach constant set

  tripal_bulk_loader_finish_loading($node->nid, $loaded_without_errors);
}