// Registers the 'jobStatusService' Angular module with an empty dependency list.
// NOTE(review): the variable is assigned without var/let/const, so it becomes an
// implicit global (and would throw in strict mode) — confirm whether other scripts
// rely on that global before adding a declaration keyword.
jobStatusService = angular.module('jobStatusService', []);
/**
* The jobStatusService periodically loads the status of the jobs reported by the clusters.
*
* A Promise for the data is returned; the controller applies the data once the Promise resolves.
*
* @ngdoc service
* @memberof HCCGo
* @class jobStatusService
*/
jobStatusService.service('jobStatusService',['$log','$q','notifierService', 'dbService', 'connectionService', function($log, $q, notifierService, dbService, connectionService) {
// Control-flow helpers (parallel / series / map / each) used by refreshDatabase
var async = require('async');
// NOTE(review): not referenced anywhere else in the visible source — possibly unused
var oldData = null;
// Date.now() timestamp (ms) of the moment the most recent refresh was started
var lastRequestedTime = 0;
// Deferred created by the most recent refresh; its promise is handed to every requester
var lastPromise = null;
/**
 * Adds a job to the submitted-jobs DB, then back-fills its stdout path,
 * stderr path and command location from the cluster.
 *
 * The job object is mutated before insertion: it is marked as not complete,
 * tagged with the short host name of the current connection, timestamped with
 * Date.now(), and its `runtime` is copied from `timelimit`.
 *
 * The callback fires as soon as the insert finishes; the three attribute
 * lookups run in the background and update the stored document whenever they
 * resolve. A failed lookup is logged and does not affect the callback.
 *
 * @method addJobToDb
 * @memberof HCCGo.jobStatusService
 * @param {Object} clusterInterface - Used to grab job attributes
 * @param {Object} job - Job information to add to DB
 * @param {Function} callback - Node-style callback: `callback(err)` when the DB
 *   cannot be opened or the insert fails, `callback(null, newDoc)` otherwise
 */
var addJobToDb = function(clusterInterface, job, callback) {
  // [cluster attribute name, DB field that stores its value]
  const attributeFields = [
    ['stdout', 'outputPath'],
    ['stderr', 'errorPath'],
    ['command', 'location']
  ];

  job.complete = false;
  job.cluster = connectionService.connectionDetails.shorthost;
  job.timestamp = Date.now();
  job.runtime = job.timelimit;

  // Now, add the job to the db
  dbService.getSubmittedJobsDB().then(function(submittedJobsDB) {
    submittedJobsDB.insert(job, function(err, newDoc) {
      if (err) {
        // Without a stored document there is no _id to update, so stop here
        // instead of dereferencing an undefined newDoc.
        $log.error(err);
        return callback(err);
      }

      // We have the _id from the doc, so issue the commands that fetch the
      // output/error/command locations and update the DB as each one arrives.
      attributeFields.forEach(function(pair) {
        const attribute = pair[0];
        const field = pair[1];
        clusterInterface.getJobAttribute(newDoc.jobId, attribute).then(function(value) {
          const fields = {};
          fields[field] = value;
          submittedJobsDB.update({ _id: newDoc._id }, { $set: fields });
        }, function(reason) {
          // Best effort: a missing attribute must not break the job listing.
          $log.error("Unable to get attribute '" + attribute + "' of job " + newDoc.jobId, reason);
        });
      });

      return callback(null, newDoc);
    });
  }, function(reason) {
    // Always answer the caller, otherwise the surrounding async flow never finishes.
    $log.error("Unable to open the submitted jobs DB", reason);
    return callback(reason || "Unable to open the submitted jobs DB");
  });
};
return {
/**
* Makes asynchronous calls to check for job statuses within the database
* @method refreshDatabase
* @memberof HCCGo.jobStatusService
* @param {GenericClusterInterface} clusterInterface - Used to grab uncompleted jobs from the cluster
* @param {integer} clusterId - Unique ID of the cluster for querying the database
* @param {boolean} force - Flag denoting if the user wants to force update the database
* @returns {Promise} Promise object to be resolved in the controller
*/
refreshDatabase: function(clusterInterface, clusterId, force=false) {
// The lastPromise is a single promise that we will hand out to all requesters
// If this is the first run, or if it is time for new data
if (lastPromise == null || ((Date.now() - lastRequestedTime > 15000) || force)) {
lastPromise = $q.defer();
lastRequestedTime = Date.now();
async.parallel([
// Query all the uncompleted jobs in the DB
function(callback) {
dbService.getSubmittedJobsDB().then(function(db) {
db.find({complete: false, cluster: clusterId}, function (err, docs) {
if (err) {
$log.error("Error querying the DB for job states");
return callback("Error querying the DB for job states");
}
return callback(null, docs);
});
});
},
function(callback) {
dbService.getSubmittedJobsDB().then(function(db) {
db.find({complete: true, cluster: clusterId}, function (err, docs) {
if (err) {
$log.error("Error querying the DB for job states");
return callback("Error querying the DB for job states");
}
return callback(null, docs);
});
});
},
// Query for all of the jobs that are not completed:
function(callback) {
clusterInterface.getJobs().then(function(data) {
return callback(null, data);
});
}],
// Here is where we combine the results from the DB and the getting of jobs
function(err, results) {
// results[0] is jobs from the DB that have not completed
// results[1] is jobs completed in the DB
// results[2] is jobs from the cluster
var db_jobs = results[0];
var completed_jobs = results[1];
var cluster_jobs = results[2].jobs;
// For each job in the db_jobs, match it and update the status from squeue
var recent_completed = [];
for (var i = 0; i < db_jobs.length; i++) {
if (!cluster_jobs.hasOwnProperty(db_jobs[i].jobId) ) {
// Recenty completed job (or disappeared from the squeue output)
db_jobs[i].status = 'COMPLETE';
recent_completed.push(db_jobs[i]);
db_jobs.splice(i, 1);
i--;
} else {
// Job showed up in the cluster jobs output, update it's status
cluster_job = cluster_jobs[db_jobs[i].jobId];
// Remove the cluster job from the list so we can see externally submitted jobs
delete cluster_jobs[db_jobs[i].jobId];
if (cluster_job.running) {
db_jobs[i].status = 'RUNNING';
} else if (cluster_job.idle) {
db_jobs[i].status = 'IDLE';
}
db_jobs[i] = Object.assign(db_jobs[i], cluster_job);
// For some reason, I can't update the entire document
// Have to do a weird anonymous function for the db_jobs[i] because
// by the time the function is executed, db_jobs could be different (splicing)
// I also added cluster_job, but it probably isn't necessary
dbService.getSubmittedJobsDB().then( (function(db_job, cluster_job) {
return function(db) {
db.update(
{ _id: db_job._id },
{ $set:
{
"running": cluster_job.running,
"idle": cluster_job.idle,
"error": cluster_job.error,
"status": db_job.status,
"elapsed": cluster_job.runTime
}
},
{},
function(err, numAffected, affectedDocuments, upsert) {
if (err) $log.error(err);
}
)};
// Call the function I just created above, it will return a
// new anonymous function.
})(db_jobs[i], cluster_job));
}
} // End for loop through db_jobs
// Now everything in cluster_jobs should be newly discovered external jobs
// Now, recent_completed are jobs that are in the DB as running, but
// not in the list of running or idle jobs. So they must
// have completed
// Update the DB
async.series([
function(callback) {
// This function will update all of the cluster externally submitted jobs
// For each external job, add it to the DB
async.map(cluster_jobs, addJobToDb.bind(null, clusterInterface), function(err, results) {
callback(err, results);
});
}
,function(callback) {
if (recent_completed.length < 1) {
return callback(null, null);
}
clusterInterface.getCompletedJobs(recent_completed).then(
function(jobs) {
$log.debug("Got " + jobs.length + " completed jobs");
var recent_completed_jobs = [];
async.each(jobs, function(job, each_callback) {
$log.debug(job);
dbService.getSubmittedJobsDB().then(function(db) {
db.update(
{ _id: job._id },
{ $set:
{
"complete": true,
"idle": false,
"error": job.State != "COMPLETED" && job.State != "RUNNING" && job.State != "IDLE" && !job.State.startsWith("CANCELLED"),
"running": false,
"cancelled" : job.State.startsWith("CANCELLED"),
"elapsed": job.Elapsed,
"reqMem": job.ReqMem,
"jobName": job.JobName,
"status": "COMPLETE",
"reportedStatus": job.State,
"maxMemory": job.MaxRSS
}
},
{ returnUpdatedDocs: true },
function (err, numReplaced, affectedDocuments) {
// update db with data so it doesn't have to be queried again
if (!err && !affectedDocuments.cancelled) {
notifierService.success('Your job, ' + affectedDocuments.jobName + ', has been completed', 'Job Completed!');
recent_completed_jobs.push(affectedDocuments);
return each_callback(null);
}
}
)});
}, function(err) {
// After the for loop, return all of the recently completed jobs.
return callback(null, recent_completed_jobs);
});
}
, function(msg) {
$log.debug("No jobs returned by completed job");
return callback(null, null);
});
}
],
function(err, series_results) {
var updatedData = {
numRunning: results[2].numRunning,
numIdle: results[2].numIdle,
numError: results[2].numError,
jobs: null
};
$log.debug("Concat all the things!");
// Ok, now concat everything together. Running jobs, completed jobs, and recently completed jobs.
if (series_results[0] == null) {
updatedData.jobs = completed_jobs.concat(db_jobs);
} else {
updatedData.jobs = series_results[0].concat(completed_jobs, db_jobs);
}
if (series_results[1] != null) {
updatedData.jobs = updatedData.jobs.concat(series_results[1]);
}
lastPromise.resolve(updatedData);
});
}
);
}
return lastPromise.promise;
}
};
}]);