function tripal_bulk_loader_load_data

2.x tripal_bulk_loader.loader.inc tripal_bulk_loader_load_data($nid, $job_id)
3.x tripal_bulk_loader.loader.inc tripal_bulk_loader_load_data($nid, $job_id)
1.x tripal_bulk_loader.loader.inc tripal_bulk_loader_load_data($nid, $job_id)

Tripal Bulk Loader

This is the function that's run by tripal_launch_jobs to bulk load chado data.

Parameters

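$nid: The node ID of the bulk loading job node to be loaded. All other needed data is expected to be in the node (i.e. the template ID and the file).

$job_id: The ID of the Tripal job running this load; it is passed through to the loader's progress tracking.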

Note: Instead of returning a value, this function updates tripal_bulk_loader.job_status. Errors are reported through watchdog and can be viewed at admin/reports/dblog.

Related topics

3 string references to 'tripal_bulk_loader_load_data'
drush_tripal_bulk_loader_tripal_loader_submit in tripal_bulk_loader/tripal_bulk_loader.drush.inc
Submit or Re-submit the given bulk loading job.
tripal_bulk_loader_add_loader_job_form_submit in tripal_bulk_loader/includes/tripal_bulk_loader.loader.inc
Add Loader Job Form (Submit)
tripal_bulk_loader_job_describe_args in tripal_bulk_loader/tripal_bulk_loader.module
Implements hook_job_describe_args(). Specifically, this makes viewing past Tripal jobs more readable for jobs registered by this module.

File

tripal_bulk_loader/includes/tripal_bulk_loader.loader.inc, line 139
Handles the actual loading of data.

Code

/**
 * Bulk loads the data file attached to a bulk loader node into chado.
 *
 * This is the callback run by tripal_launch_jobs. The template and the file
 * to load are read from the node. The file is loaded once for every set of
 * constants attached to the node (or once if there are no constant sets and
 * no exposed fields).
 *
 * Nothing is returned. The job status stored in the tripal_bulk_loader table
 * is set to 'Loading...' at the start and the final outcome is handed to
 * tripal_bulk_loader_finish_loading(). Loading stops at the first record that
 * fails; if transactions are enabled the current transaction is rolled back.
 *
 * @param $nid
 *   The node ID of the bulk loading job node to be loaded.
 * @param $job_id
 *   The ID of the Tripal job; passed to
 *   tripal_bulk_loader_progress_file_track_job().
 */
function tripal_bulk_loader_load_data($nid, $job_id) {

  // ensure no timeout
  drupal_set_time_limit(0);

  // set the status of the job (in the node not the tripal jobs)
  db_update('tripal_bulk_loader')
    ->fields(array('job_status' => 'Loading...'))
    ->condition('nid', $nid)
    ->execute();

  $node = node_load($nid);
  print "Template: " . $node->template->name . " (" . $node->template_id . ")\n";

  // Determine the total number of lines in the file.
  $total_lines = 0;
  $handle = fopen($node->file, "r");
  if ($handle === FALSE) {
    // Without this check the counting loop below would run on an invalid
    // handle. Report the failure and record it against the node instead.
    tripal_bulk_loader_throw_error('Could not open file %file',
      array('%file' => $node->file), TRIPAL_ERROR);
    tripal_bulk_loader_finish_loading($node->nid, FALSE);
    return;
  }
  // Count only lines that were actually read; testing feof() before fgets()
  // counts one line too many for files that end with a newline.
  while (fgets($handle) !== FALSE) {
    $total_lines++;
  }
  fclose($handle);

  // An empty file is treated as one line so the progress calculations below
  // never divide by zero.
  $total_lines = ($total_lines == 0) ? 1 : $total_lines;
  print "File: " . $node->file . " (" . $total_lines . " lines)\n";

  // Prep Work ==================================================================================
  print "\nPreparing to load...\n";
  $loaded_without_errors = TRUE;

  // Generate default values array
  $default_data = array();
  $field2column = array();
  $record2priority = array();
  $tables = array();
  $template_array = $node->template->template_array;

  // First build the record2priority array (record_id => priority) so that
  // foreign key fields can look up the record they refer to.
  foreach ($template_array as $priority => $record_array) {
    if (!is_array($record_array) || !isset($record_array['record_id'])) {
      continue;
    }
    $record2priority[$record_array['record_id']] = $priority;
  }

  // Then build the default data array: one entry per template record.
  foreach ($template_array as $priority => $record_array) {
    if (!is_array($record_array)) {
      continue;
    }

    // A record without a mode is treated as an insert.
    $mode = !empty($record_array['mode']) ? $record_array['mode'] : 'insert';

    // Add tables being inserted into to a list to be treated differently
    // this is used to acquire locks on these tables
    if (preg_match('/insert/', $mode)) {
      $tables[$record_array['table']] = $record_array['table'];
    }

    // A record only gets an entry in the default data array if it has fields.
    if (empty($record_array['fields']) || !is_array($record_array['fields'])) {
      continue;
    }

    // Record-level settings; these do not depend on the individual fields.
    $default_data[$priority]['table'] = $record_array['table'];
    $default_data[$priority]['mode'] = $mode;
    $default_data[$priority]['select_if_duplicate'] = !empty($record_array['select_if_duplicate']) ? $record_array['select_if_duplicate'] : 0;
    $default_data[$priority]['update_if_duplicate'] = !empty($record_array['update_if_duplicate']) ? $record_array['update_if_duplicate'] : 0;
    // Note: the template key is 'disable' while the data array key is 'disabled'.
    $default_data[$priority]['disabled'] = !empty($record_array['disable']) ? $record_array['disable'] : 0;
    $default_data[$priority]['optional'] = !empty($record_array['optional']) ? $record_array['optional'] : 0;
    $default_data[$priority]['select_optional'] = !empty($record_array['select_optional']) ? $record_array['select_optional'] : 0;
    $default_data[$priority]['record_id'] = $record_array['record_id'];

    // iterate through each of the fields for the current record and
    // set the default_data array
    foreach ($record_array['fields'] as $field_array) {

      $default_data[$priority]['required'][$field_array['field']] = $field_array['required'];

      if (isset($field_array['regex'])) {
        $default_data[$priority]['regex_transform'][$field_array['field']] = $field_array['regex'];
      }

      if (preg_match('/table field/', $field_array['type'])) {
        // Value comes from a column of the file; filled in per line.
        $default_data[$priority]['values_array'][$field_array['field']] = '';
        $default_data[$priority]['need_further_processing'] = TRUE;
        $field2column[$priority][$field_array['field']] = $field_array['spreadsheet column'];

      }
      elseif (preg_match('/constant/', $field_array['type'])) {
        $default_data[$priority]['values_array'][$field_array['field']] = $field_array['constant value'];

      }
      elseif (preg_match('/foreign key/', $field_array['type'])) {
        $default_data[$priority]['values_array'][$field_array['field']] = array();
        $default_data[$priority]['need_further_processing'] = TRUE;
        $default_data[$priority]['values_array'][$field_array['field']]['foreign record']['record'] = $field_array['foreign key'];

        // Add in the FK / Referral table
        $fk_priority = $record2priority[$field_array['foreign key']];
        $fk_table = $template_array[$fk_priority]['table'];
        $default_data[$priority]['values_array'][$field_array['field']]['foreign record']['table'] = $fk_table;

        // Add in the FK / Referral field
        // for backwards compatibility we need to get the FK relationship to find
        // out what field we're joining on.  For templates created using a
        // previous version it was assumed that the FK field was always the field to join
        if (!array_key_exists('foreign field', $field_array)) {
          $tbl_description = chado_get_schema($record_array['table']);
          foreach ($tbl_description['foreign keys'] as $key_table => $key_array) {
            if ($key_table == $fk_table) {
              foreach ($key_array['columns'] as $left_field => $right_field) {
                if ($left_field == $field_array['field']) {
                  $field_array['foreign field'] = $right_field;
                }
              }
            }
          }
        }
        $default_data[$priority]['values_array'][$field_array['field']]['foreign record']['field'] = $field_array['foreign field'];
      }
      else {
        print 'WARNING: Unsupported type: ' . $field_array['type'] . ' for ' . $record_array['table'] . '.' . $field_array['field'] . "!\n";
      }

    } // end of foreach field
  } //end of foreach record

  ///////////////////////////////////////////////
  // For each set of constants
  ///////////////////////////////////////////////
  print "Loading...\n";
  $original_default_data = $default_data;
  $group_index = 0;
  $total_num_groups = sizeof($node->constants);
  // If there are no constant sets and no exposed fields
  // then create an empty constant set so loader runs
  if ($total_num_groups == 0 && empty($node->exposed_fields)) {
    $node->constants = array(
      0 => array()
    );
    $total_num_groups = 1;
  }
  foreach ($node->constants as $group_id => $set) {
    // revert default data array for next set of constants
    $default_data = $original_default_data;
    $group_index++;

    // Add constants
    if (!empty($set)) {
      print "Constants:\n";
      foreach ($set as $priority => $record) {
        foreach ($record as $field_id => $field) {

          print "\t- " . $field['chado_table'] . '.' . $field['chado_field'] . ' = ' . $field['value'] . "\n";

          if ($default_data[$priority]['table'] == $field['chado_table']) {
            if (isset($default_data[$priority]['values_array'][$field['chado_field']])) {
              if (isset($field2column[$priority][$field['chado_field']])) {
                // The field is mapped to a file column: the constant value
                // replaces the column mapping.
                $field2column[$priority][$field['chado_field']] = $field['value'];
              }
              else {
                $default_data[$priority]['values_array'][$field['chado_field']] = $field['value'];
              }
            }
            else {
              print "ERROR: Template has changed after constants were assigned!\n";
              tripal_bulk_loader_throw_error('Template has changed after constants were assigned', array(), TRIPAL_NOTICE);
              exit(1);
            }
          }
          else {
            print "ERROR: Template has changed after constants were assigned!\n";
            tripal_bulk_loader_throw_error('Template has changed after constants were assigned', array(), TRIPAL_NOTICE);
            exit(1);
          }
        }
      }
    }

    // Open File
    print "\tPreparing to load the current constant set...\n";
    print "\t\tOpen File...\n";
    try {
      $file = new SplFileObject($node->file, 'r');
    }
    catch (Exception $e) {
      tripal_bulk_loader_throw_error('Could not open file %file',
        array('%file' => $node->file), TRIPAL_ERROR);
      // Record the failure so the node is not left with a 'Loading...' status.
      tripal_bulk_loader_finish_loading($node->nid, FALSE);
      return;
    }

    // Skip over the header line, if the file has one.
    if (preg_match('/(t|true|1)/', $node->file_has_header)) {
      $file->next();
      // The header content itself is not used; current() is kept from the
      // original code so the file position is advanced in the same way.
      $file->current();
    }
    $num_records = 0;
    $num_lines = 0;
    $num_errors = 0;
    $interval = intval($total_lines * 0.0001);
    if ($interval == 0) {
      $interval = 1;
    }

    // Start Transaction
    // Initialise the flags first: the "none" and "all" modes do not set all
    // of them, and they are read unconditionally further down.
    $TRANSACTION = NULL;
    $transactions = FALSE;
    $new_transaction_per_row = FALSE;
    switch (variable_get('tripal_bulk_loader_transactions', 'row')) {
      case "none":
        break;
      case "all":
        print "\t\tStart Transaction...\n";
        $TRANSACTION = db_transaction();
        $transactions = TRUE;
        break;
      case "row":
        print "\t\tStart Transaction...\n";
        $TRANSACTION = db_transaction();
        $transactions = TRUE;
        $new_transaction_per_row = TRUE;
        break;
    }

    // Defer constraints
    if ($transactions && variable_get('tripal_bulk_loader_disable_triggers', TRUE)) {
      print "\t\tDefer Constraints...\n";
      chado_query("SET CONSTRAINTS ALL DEFERRED");
    }

    // Acquire Locks
    if ($transactions) {
      print "\t\tAcquiring Table Locks...\n";
      $lockmode = variable_get('tripal_bulk_loader_lock', 'ROW EXCLUSIVE');
      foreach ($tables as $table) {
        print "\t\t\t$lockmode for $table\n";
        chado_query("LOCK TABLE {" . $table . "} IN " . $lockmode . " MODE");
      }
    }

    print "\tLoading the current constant set...\n";
    tripal_bulk_loader_progress_bar(0, $total_lines);

    // Set to TRUE as soon as one record fails; stops the load.
    $failed = FALSE;
    while (!$file->eof()) {
      $file->next();
      // NOTE(review): trim() also strips leading/trailing tabs, so a line
      // whose first column is empty will have its columns shifted. Left
      // unchanged here; confirm whether that is intended.
      $raw_line = trim($file->current());
      // Skip blank lines. A strict comparison is used because empty() would
      // also skip a line consisting solely of "0".
      if ($raw_line === '') {
        continue;
      }
      $line = explode("\t", $raw_line);
      $num_lines++;

      // update the job status every $interval lines (0.01% of the file, at
      // least every line) processed for the current group
      if ($node->job_id && $num_lines % $interval == 0) {

        // percentage of lines processed for the current group
        $group_progress = round(($num_lines / $total_lines) * 100);
        tripal_bulk_loader_progress_bar($num_lines, $total_lines);

        // percentage of lines processed for all groups
        // <previous group index> * 100 + <current group progress>
        // --------------------------------------------------------
        //               <total number of groups>
        // For example, if you were in the third group of 3 constant sets
        // and had a group percentage of 50% then the job progress would be
        // (2*100 + 50%) / 3 = 250%/3 = 83%
        $job_progress = round(((($group_index - 1) * 100) + $group_progress) / $total_num_groups);
        tripal_set_job_progress($node->job_id, $job_progress);
      }

      $data = $default_data;

      // iterate through each record and process the line
      $data_keys = array_keys($data);
      foreach ($data_keys as $priority) {
        $options = array(
          'field2column' => $field2column,
          'record2priority' => $record2priority,
          'line' => $line,
          'line_num' => $num_lines,
          'group_index' => $group_index,
          'node' => $node,
          'nid' => $node->nid,
        );

        // execute all records that are not disabled;
        // skipped records count as successful
        $no_errors = TRUE;
        if (array_key_exists($priority, $data) &&
          array_key_exists('disabled', $data[$priority]) &&
          $data[$priority]['disabled'] == 0) {
          $no_errors = process_data_array_for_line($priority, $data, $default_data, $options);
        }

        tripal_bulk_loader_progress_file_track_job($job_id, $no_errors);

        if ($no_errors == FALSE) {
          // Encountered an error: undo the current transaction and stop
          // processing this line. The flag stops the line loop and the
          // constant set loop as well.
          $failed = TRUE;
          if ($transactions) {
            $TRANSACTION->rollback();
          }
          break;
        }
      } // end of foreach table in default data array

      tripal_bulk_loader_progress_file_track_job($job_id, FALSE, TRUE);

      if ($failed) {
        break;
      }

      // Row inserted successfully
      if ($transactions && $new_transaction_per_row) {
        // commit current transaction and start a new one
        unset($TRANSACTION);
        $TRANSACTION = db_transaction();
      }
    } //end of foreach line of file

    // END Transaction
    if ($transactions) {
      unset($TRANSACTION);
    }

    if ($failed) {
      $loaded_without_errors = FALSE;
      break;
    }
    tripal_bulk_loader_progress_bar($total_lines, $total_lines);
    tripal_bulk_loader_progress_file_track_job($job_id, FALSE, FALSE, TRUE);
  } //end of foreach constant set

  // Record the final outcome exactly once.
  tripal_bulk_loader_finish_loading($node->nid, $loaded_without_errors);

}