#!/home/trna/bin/perl5.new

# INTRODUCTION
# ============
# Daemon to monitor ABI sequencing machine gel arrival and call
# getlanes on each new arrival.
# The complications are that it must not crash when getlanes
# cannot run, or does not finish; it must also watch for new
# gels that have not completely arrived yet.

# Algorithm
# =========
# Use the classic client/server technique of spawning one process per
# gel, and look for processes that do not terminate, or do not produce
# an output file when they have finished. Use UNIX message queues
# to get results back from spawned processes.

# Author: Jeremy Parsons
# Date: 950918
# VERSION 1.02

require 'sys/ipc.ph';
require 'sys/msg.ph';

# ===========================================================
# General Constants
# -----------------
$TRUE = 1;
$FALSE = 0;
$SECONDS_PER_MINUTE = 60;

$HOSTNAME = `uname -n`;
chomp($HOSTNAME);       # chomp only strips a trailing newline, never a real character

$CYCLE_TIME = 30;               # Seconds per wake up call
$TRACKING_TIMEOUT = 10;         # Maximum minutes allowed to complete one getlanes
$MAX_NUM_PROCESSES = 2;         # Any more would probably be a waste
$TRACKING_PROGRAM_NAME = "getlanes";

# Name of this script without its directory part (used for messages and
# for the status log file name).
$THIS_SCRIPT_NAME = "${0}";
$THIS_SCRIPT_NAME =~ s/^.*\///;

$MIN_GEL_SIZE = 15000000;       # Gels are at least 15MBytes in size.

# Where to look for new gels: host "est" keeps them under /home, every
# other host under /home1.
if (${HOSTNAME} eq "est"){
    $GELDIR = "/home/${HOSTNAME}/shotgun/gels";
}
else {
    $GELDIR = "/home1/${HOSTNAME}/shotgun/gels";
}

$STATUS_LOGFILE = "/tmp/${THIS_SCRIPT_NAME}.status";        # Process monitoring
$OUTPUT_LOGDIR = "/home1/watson/seqmgr/logs/getlanesd";     # Success statistics
$IMAGE_EXTENSION = ".lanes.tif";
$RETRACKED_EXTENSION = ".munge";        # Getlanes makes a new resource fork
$RESOURCE_FORK_SUBDIR = ".resource";    # Subdirectory where CAP stores r-forks

# ===========================================================
# Initialisation
$| = 1;                         # Un-buffer standard output

# Check that not already running
#$existing_count = `ps -efl | grep -v grep | grep -c $THIS_SCRIPT_NAME`;
#chop($existing_count);
#if ($existing_count > 1){
#    warn "There seems to be a $THIS_SCRIPT_NAME running already \n";
#    }

# Start up the message queue that the per-gel child processes report to.
$channel_id = msgget(0x5678, (&IPC_CREAT | 0644));
die "Can't get message queue: $!\n" unless defined($channel_id);

# Remove the message queue on an orderly shutdown.  SIGSTOP can never be
# trapped, so it is not listed here; SIGTERM (the default signal sent by
# kill) is handled instead so that the queue is not left behind.
$SIG{'INT'} = $SIG{'QUIT'} = $SIG{'TERM'} = "leave";

# Finished children are not waited for by this script.
$SIG{'CHLD'} = 'IGNORE';

if ($#ARGV != -1){
    die "Usage: $THIS_SCRIPT_NAME (The program takes no options currently) \n";
}

# Open all the log files for appending
open (STATUSLOG, ">>$STATUS_LOGFILE")
    || die "Cannot open $STATUS_LOGFILE: $!\n";
open (OUTPUTLOG, ">>${OUTPUT_LOGDIR}/${HOSTNAME}")
    || die "Cannot open ${OUTPUT_LOGDIR}/${HOSTNAME}: $!\n";

# Unbuffer file writing to avoid losing, or duplicating
# the output when crashing or forking.
select(STATUSLOG); $| = 1;
select(OUTPUTLOG); $| = 1;
select(STDOUT);

$ascii_start_time = `date '+%y%m%d %T'`;
chop($ascii_start_time);
print STATUSLOG "Starting processing on $HOSTNAME at $ascii_start_time\n";

# ==========================================================================
# Run the main loop sleeping and then starting processes as necessary
# -------------------------------------------------------------------
# %pid_aarr maps a gel name to "pid:start_time" for every running job;
# $pid_count is the number of jobs believed to be running.
$pid_count = 0;
while ($TRUE){

    # Collect final outputs from finished jobs
    if ($pid_count > 0){
        &any_finished(*channel_id, *pid_aarr, *pid_count);
    }

    # Kill jobs that have gone on too long
    if ($pid_count > 0){
        &any_overdue(($TRACKING_TIMEOUT * $SECONDS_PER_MINUTE),
                     *pid_aarr, *pid_count);
    }

    # To help make things robust put in a sanity check
    if ($pid_count < 0){
        $pid_count = 0;
    }

    # If possible, start any newly arrived gels
    if ($pid_count < $MAX_NUM_PROCESSES){
        # Get list of gels to process
        @new_gel_list = ();
        &any_new_gels(*new_gel_list, *pid_aarr);
        if (@new_gel_list){
            &start_new_jobs(*new_gel_list, *channel_id, *pid_aarr, *pid_count);
            @new_gel_list = ();
        }
    }

    # Reset the script's start time: the -M file test in any_new_gels
    # measures file age relative to $^T, so this is what makes recently
    # modified files show up with a negative age on the next pass.
    $^T = (time - $CYCLE_TIME);
    sleep($CYCLE_TIME);
}
exit;   # should never get here

# ==========================================================================
# Check all kinds of properties of the files in $GELDIR to determine
# if any of them are new stable gel files.
#
# Arguments (typeglobs):
#   *new_gel_list - array that receives the names of the gels to start
#   *pid_aarr     - gel name => "pid:start_time" table of running jobs
#
# A file qualifies when it is large and binary, has no image file yet,
# was not modified recently, has a writeable resource fork, and is not
# already being processed.  Dies if $GELDIR cannot be listed.
sub any_new_gels{
    local (*new_gel_list, *pid_aarr) = @_;
    # @mac_files has to be a package (local) array, not a lexical, because
    # it is handed to remove_screwy_characters as a typeglob.
    local (@names, @biggies, @new_gels, @unchanging, @mac_files, @ready_gels);

    # Cannot easily use time to discover new gels because of queue blocks
    # Look for large files as a first step instead.
    unless (opendir(GELNAMES, "$GELDIR")){
        print STATUSLOG "Cannot list $GELDIR contents\n";
        die "Cannot list $GELDIR contents\n";
    }
    @names = grep(!/^\./, readdir(GELNAMES));
    closedir(GELNAMES);
    # print "DEBUG: Found files in geldir: names= @names\n";

    # Keep only files that are large and binary
    @biggies = grep {
        ((-s "${GELDIR}/$_") > $MIN_GEL_SIZE) && (-B "${GELDIR}/$_")
    } @names;

    # Now check to see if these files have already been processed
    # print "DEBUG: Found files in geldir: biggies= @biggies\n";
    @new_gels = grep { !(-e "${GELDIR}/$_${IMAGE_EXTENSION}") } @biggies;

    # Now check to see if these files are still growing
    # Negative ages are files that were modified within last cycle
    # print "DEBUG: Found files in geldir: new_gels= @new_gels\n";
    @unchanging = grep { (-M "${GELDIR}/$_") > 0.0 } @new_gels;

    # Now check to see if these files have a .resource file (really Mac)
    # print "DEBUG: Found files in geldir: unchanging= @unchanging\n";
    @mac_files = grep { -w "${GELDIR}/${RESOURCE_FORK_SUBDIR}/$_" } @unchanging;

    # Some groups use crazy characters "*" in filenames - must be removed
    &remove_screwy_characters(*mac_files);

    # Now check to see if these files are already being processed
    # print "DEBUG: Found files in geldir: mac_files= @mac_files\n";
    @ready_gels = grep { !defined($pid_aarr{$_}) } @mac_files;

    # print "DEBUG: Found files in geldir: ready_gels= @ready_gels\n";
    # (An empty @ready_gels simply yields an empty result list; testing an
    # array with defined() is a compile-time error in current Perl.)
    @new_gel_list = @ready_gels;
    return;
}

# ==========================================================================
# Fork off a process (if not too many already) to do the actual getlanes code
#
# Arguments (typeglobs):
#   *new_gel_list - names of the gels that are ready to be tracked
#   *channel_id   - message queue id handed on to each child
#   *pid_aarr     - gel name => "pid:start_time" table, updated here
#   *pid_count    - number of running jobs, incremented per started job
sub start_new_jobs{
    local (*new_gel_list, *channel_id, *pid_aarr, *pid_count) = @_;
    local ($complained, $temp_pid, $start_time, $ascii_start_time);

    $complained = $FALSE;
    foreach $gel (@new_gel_list){
        # Gels beyond the process limit are left for a later cycle
        next unless ($pid_count < $MAX_NUM_PROCESSES);

        #print "DEBUG: About to start $gel in start_new_jobs\n";
        MACHINE_FORK:{
            if ($temp_pid = fork){
                # Parent Process: remember which pid works on which gel
                $start_time = time;
                $ascii_start_time = `date '+%y%m%d %T'`;
                chop($ascii_start_time);
                $pid_count ++;
                $pid_aarr{$gel} = join(':', $temp_pid, $start_time);
                print STATUSLOG "$temp_pid processing $gel at $ascii_start_time \n";
            }
            elsif (defined ($temp_pid)){
                # Child Process
                &do_one_server($gel, $channel_id);
                exit;
            }
            elsif ($! =~ /No more process|Resource temporarily unavailable/){
                # EAGAIN, supposedly recoverable fork error.  The text of
                # the message differs between systems, so both common
                # spellings are accepted.  Complain once, then keep trying.
                if (!(${complained})){
                    print STATUSLOG "Cannot fork: $!\n";
                    $complained = $TRUE;
                }
                sleep(10);
                redo MACHINE_FORK;
            }
            else {
                # Weird fork error
                print STATUSLOG "Cannot fork: $!\n";
            }
        } # end of MACHINE_FORK Label code block
    } # end foreach
    return;
}

# ==========================================================================
# Listen to incoming messages from slave getlanes processes.
# If any have finished, then update process id lists, and write to logfile
# Messages are buffered so no need to spawn a listening process.
# Arguments to any_finished (typeglobs):
#   *channel_id - message queue id to read from
#   *pid_aarr   - gel name => "pid:start_time" table of running jobs
#   *pid_count  - number of running jobs, decremented per finished job
sub any_finished{
    local (*channel_id, *pid_aarr, *pid_count) = @_;
    local ($message, $junk, $finished_process, $stats_line,
           $time_ended, $gel_name);

    # Do a non-blocking receive
    while (msgrcv($channel_id, $message, 512, 0, &IPC_NOWAIT)){
        # want a process id, and text.  The message starts with the
        # native long message type, hence "l!" (must match do_one_server).
        ($junk, $finished_process, $stats_line) = unpack("l! I A*", $message);
        # print "DEBUG: received stats_line= $stats_line\n";

        $time_ended = `date '+%y%m%d %T'`;
        chop($time_ended);

        $gel_name = &get_gel_name(%pid_aarr, $finished_process);
        if (defined($pid_aarr{$gel_name})){
            # A job we started: make sure the process has really gone,
            # and record that it has gone.
            kill(9, $finished_process);
            ${pid_count}--;
            delete $pid_aarr{$gel_name};
        }
        # Otherwise the message is from a process we are not tracking (for
        # example one left in the queue by an earlier run).  Do not signal
        # that pid and do not touch the job counter; just log it below.

        print STATUSLOG "$finished_process finished $gel_name before $time_ended\n";
        &record_statistics($stats_line, $gel_name);
    }
    return;
}

# ==========================================================================
# Kill any process that has been going too long
#
# Arguments:
#   $timeout   - maximum allowed run time in seconds
#   *pid_aarr  - gel name => "pid:start_time" table of running jobs
#   *pid_count - number of running jobs, decremented per killed job
sub any_overdue{
    local ($timeout, *pid_aarr, *pid_count) = @_;
    local ($gel, $curr_pid, $started_at, $now, $ascii_time_ended);

    # Check start time of each running process
    $now = time;
    foreach $gel (keys(%pid_aarr)){
        ($curr_pid, $started_at) = split(':', $pid_aarr{$gel});
        #print STATUSLOG "DEBUG: now= $now, curr_pid= $curr_pid started_at= $started_at\n";
        next unless (($started_at + $timeout) < $now);

        # Kill the hung process
        kill(9, $curr_pid);
        $ascii_time_ended = `date '+%y%m%d %T'`;
        chop($ascii_time_ended);

        # Record that it has gone
        ${pid_count}--;
        print STATUSLOG "$curr_pid hung_on $gel at $ascii_time_ended\n";
        delete $pid_aarr{$gel};

        # Make a fake output file to prevent same mistake.  List-form
        # system() keeps the file name away from the shell.
        system("touch", "${GELDIR}/${gel}${IMAGE_EXTENSION}");
    }
    return;
}

# ==========================================================================
# Get the name of the gel being worked on by a process
#
# Called as  &get_gel_name(%pid_aarr, $pid) : the table is flattened into
# the argument list, so the pid is always the LAST argument and has to be
# taken off before the rest is turned back into a hash.
# Returns the gel name, or "LOSTGELNAME" if no running job has that pid.
sub get_gel_name{
    my (@args) = @_;
    my ($unknown_process) = pop(@args);
    my (%running) = @args;
    my ($curr_pid, $started_at);

    return "LOSTGELNAME" unless defined($unknown_process);

    foreach my $gel (keys(%running)){
        next unless defined($running{$gel});
        ($curr_pid, $started_at) = split(':', $running{$gel});
        if (defined($curr_pid) && ($curr_pid == $unknown_process)){
            return ($gel);
        }
    }
    return "LOSTGELNAME";
}

# ==========================================================================
# Append one line of tracking statistics for a gel to OUTPUTLOG.
#
# Arguments:
#   $stats_line - space separated statistics as sent by do_one_server
#   $gel_name   - name of the gel the statistics belong to
#
# NOTE: a statistic that getlanes did not print arrives as an empty field,
# and split(' ') collapses it, so the later columns shift left.
sub record_statistics{
    local ($stats_line, $gel_name) = @_;
    local ($num_of_rows, $starting_row, $left_boundary, $right_boundary,
           $percentage_guessed, $signal_to_noise_ratio, $peak_height,
           $percent_correlation, $lane_width, $gap_width);
    local ($astro_date, $tabbed_text);

    ($num_of_rows, $starting_row, $left_boundary, $right_boundary,
     $percentage_guessed, $signal_to_noise_ratio, $peak_height,
     $percent_correlation, $lane_width, $gap_width) = split(' ', $stats_line);

    # The format string needs its closing quote, otherwise the shell
    # rejects the command and the date column comes out empty.
    $astro_date = `date '+%y%m%d'`;
    chop($astro_date);

    $tabbed_text = join(' ', $astro_date, $num_of_rows, $starting_row,
                        $left_boundary, $right_boundary, $percentage_guessed,
                        $signal_to_noise_ratio, $peak_height,
                        $percent_correlation, $lane_width, $gap_width,
                        $gel_name);
    print OUTPUTLOG "$tabbed_text \n";
    return;
}

# ==========================================================================
# Cut a number down to at most $places digits after the decimal point
# ($places of 0 drops the point as well).  A value without a decimal point
# is returned unchanged.
sub truncate_decimals{
    my ($value, $places) = @_;
    my ($point);

    return $value unless defined($value);
    $point = index($value, ".");
    return $value if ($point < 0);
    return substr($value, 0, $point) if ($places <= 0);
    return substr($value, 0, ($point + $places + 1));
}

# ==========================================================================
# Make the retracked (.munge) resource fork of a gel the default one by
# swapping the two file names via a temporary name.  Problems are logged
# to STATUSLOG and the swap is abandoned.
sub swap_resource_forks{
    my ($gel) = @_;
    my ($munge_file, $default_rfork, $temp_file);

    $munge_file = "${GELDIR}/${RESOURCE_FORK_SUBDIR}/${gel}${RETRACKED_EXTENSION}";
    $default_rfork = "${GELDIR}/${RESOURCE_FORK_SUBDIR}/${gel}";
    $temp_file = "${GELDIR}/${RESOURCE_FORK_SUBDIR}/${gel}.temp";

    if (!(-e $munge_file)){
        print STATUSLOG "$gel did not have a new rfork file\n";
        return;
    }
    if ((!(-w $munge_file)) || (!(-w $default_rfork))){
        print STATUSLOG "$gel did not have a writeable rfork file\n";
        return;
    }
    if (rename($munge_file, $temp_file) != 1){
        print STATUSLOG "$gel had non-renameable rfork file\n";
        return;
    }
    if (rename($default_rfork, $munge_file) != 1){
        print STATUSLOG "$gel had non-renameable rfork file\n";
        return;
    }
    if (rename($temp_file, $default_rfork) != 1){
        print STATUSLOG "$gel had non-renameable rfork file\n";
        return;
    }
    return;
}

# ==========================================================================
# Body of one child process: run getlanes on a gel, pick the statistics
# out of its output, swap the resource forks and report back to the
# parent through the message queue.
#
# Arguments:
#   $gel        - name of the gel file (relative to $GELDIR)
#   $channel_id - message queue id to report to
#
# Exits when done; only returns (after a long sleep) when getlanes could
# not be started at all.
sub do_one_server{
    local ($gel, $channel_id) = @_;
    local ($num_of_rows, $starting_row, $left_boundary, $right_boundary,
           $percentage_guessed, $signal_to_noise_ratio, $peak_height,
           $percent_correlation, $lane_width, $gap_width, $output_message);
    local (@words, $error_time, $packed_message);

    close(STDIN);
    close(STDOUT);

    unless (open (SCREEN_OUTPUT,
                  "cd ${GELDIR}; $TRACKING_PROGRAM_NAME ${gel} 2>&1 |")){
        # Just slow things down and complain a lot
        $error_time = `date`;
        chop($error_time);
        print STATUSLOG "$error_time Cannot run $TRACKING_PROGRAM_NAME \n";
        sleep (10 * $CYCLE_TIME);
        return;
    }

    # Pick the interesting numbers out of the getlanes screen output
    while (<SCREEN_OUTPUT>){
        @words = split;
        if (/rows in the image/){
            $num_of_rows = $words[8];
        }
        if (/^Starting Row/){
            $starting_row = $words[3];
        }
        if (/^Left Boundary/){
            $left_boundary = $words[3];
        }
        if (/^Right Boundary/){
            $right_boundary = $words[3];
        }
        if (/^Percentage:/){
            # Only need two positions after decimal point so truncate
            $percentage_guessed = &truncate_decimals($words[1], 2);
        }
        if (/^Average SNR:/){
            $signal_to_noise_ratio = &truncate_decimals($words[2], 2);
        }
        if (/Peak Height/){
            # Whole number part only
            $peak_height = &truncate_decimals($words[4], 0);
        }
        if (/^Adaptive Correlation:/){
            $percent_correlation = $words[5];
        }
        if (/Lane Width/){
            $lane_width = &truncate_decimals($words[4], 2);
        }
        if (/Gap Width/){
            $gap_width = &truncate_decimals($words[4], 2);
        }
    }
    close(SCREEN_OUTPUT);

    # Need to change resource fork names so the retracked one is default.
    # This is done BEFORE reporting, because the parent kills this process
    # as soon as it has read the message.
    &swap_resource_forks($gel);

    $output_message = join(" ", $num_of_rows, $starting_row, $left_boundary,
                           $right_boundary, $percentage_guessed,
                           $signal_to_noise_ratio, $peak_height,
                           $percent_correlation, $lane_width, $gap_width);

    # Message layout: native long message type (1), our pid, then the text
    # (must match the unpack in any_finished).
    $packed_message = pack("l! I A*", 1, $$, $output_message);
    unless (msgsnd($channel_id, $packed_message, &IPC_NOWAIT)){
        print STATUSLOG "Can't send message about $gel: $!\n";
        exit;
    }
    exit;
}

# ==========================================================================
# Cannot undef an
# entry from an associative array so need to fake it
# (that was the original reasoning; delete() removes a single entry, and
# the rest of this script already relies on it, so it is used here too).
#
# Arguments (typeglobs):
#   *gel_name - scalar holding the key to remove
#   *pid_aarr - associative array to remove it from
sub delete_from_aarr{
    local (*gel_name, *pid_aarr) = @_;

    delete $pid_aarr{$gel_name};
    return;
}

# ==========================================================================
# Change the names of any files that have UNIX unfriendly characters
#
# Argument (typeglob):
#   *file_list - array of gel names in $GELDIR; rewritten in place
#
# Every name is reduced to the characters 0-9 a-z A-Z _ and "." and both
# the data fork and the resource fork are renamed to match.  A gel that
# cannot be renamed gets a fake image file (so that it is not looked at
# again) and is dropped from the list, so that no unsafe name is ever
# handed on to the tracking program.
sub remove_screwy_characters{
    local (*file_list) = @_;
    my (@cleaned_list, $temp_name, $data_fork, $new_data_fork,
        $resource_rfork, $new_resource_rfork);

    foreach my $gel (@file_list){
        $temp_name = $gel;
        $temp_name =~ tr/0-9a-zA-Z_\.//cd;
        if ($temp_name eq $gel){
            # Nothing to clean
            push(@cleaned_list, $gel);
            next;
        }

        # Found a crazy character so need to change 2 filenames
        # (Data fork, Resource fork) and change name in array.
        $data_fork = "${GELDIR}/${gel}";
        $new_data_fork = "${GELDIR}/${temp_name}";
        $resource_rfork = "${GELDIR}/${RESOURCE_FORK_SUBDIR}/${gel}";
        $new_resource_rfork = "${GELDIR}/${RESOURCE_FORK_SUBDIR}/${temp_name}";

        # Never rename on top of an existing file, and never to an empty name
        if (($temp_name eq "") || (-e $new_data_fork)
            || (-e $new_resource_rfork)){
            print STATUSLOG "Unable to clean (rename) $gel\n";
            &fake_done_file($gel);
            next;
        }
        if (rename($data_fork, $new_data_fork) != 1){
            print STATUSLOG "Unable to clean (rename) $gel\n";
            &fake_done_file($gel);
            next;
        }
        if (rename($resource_rfork, $new_resource_rfork) != 1){
            # Put the data fork back (best effort) so the pair keeps
            # matching names.
            rename($new_data_fork, $data_fork);
            print STATUSLOG "Unable to clean (rename) $gel resource fork\n";
            &fake_done_file($gel);
            next;
        }
        push(@cleaned_list, $temp_name);
        print STATUSLOG "Cleaned name of $gel to $temp_name\n";
    }
    @file_list = @cleaned_list;
    return;
}

# ==========================================================================
# Create an empty image file for a gel so that it looks already processed.
#
# Argument:
#   $gel - name of the gel (relative to $GELDIR)
sub fake_done_file{
    local ($gel) = @_;
    local ($done_file);

    $done_file = "${GELDIR}/${gel}${IMAGE_EXTENSION}";
    # List-form system(): the name may contain "*" or other characters
    # that the shell would expand.
    system("touch", $done_file);
    print STATUSLOG "Faked a tif file for $gel\n";
    return;
}

# ==========================================================================
# Signal handler: close the log files, remove the message queue and exit.
# Dies if the message queue cannot be removed.
sub leave {
    local ($x);

    close(OUTPUTLOG);
    close(STATUSLOG);
    $x = msgctl($channel_id, &IPC_RMID, 0);
    if (!defined($x) || $x < 0) {
        die "Can't remove message queue: $!\n";
    }
    exit;
}

# ==========================================================================