// ConnectionService.js


connectionModule = angular.module('ConnectionServiceModule', [])

/**
 * The connectionService is used throughout the entire app, from file transfer to login authentication.
 * It uses a mixture of protocols (SSH command execution and SFTP) to accomplish these tasks.
 *
 * @ngdoc service
 * @memberof HCCGo
 * @class connectionService
 * @requires $log
 * @requires $q
 * @requires $routeParams
 * @requires $location
 * @requires notifierService
 * @requires analyticsService
 * @requires $rootScope
 * @requires async
 * @requires path
 * @requires fs
 * @requires ssh2
 */
connectionModule.factory('connectionService',['$log', '$q', '$routeParams', '$location', 'notifierService', 'analyticsService', '$rootScope', function($log, $q, $routeParams, $location, notifierService, analyticsService, $rootScope) {

   const async = require('async');
   const path = require('path');
   const fs = require('fs');
   var connectionDetails = {
     "username": null,
     "hostname": null,
     "shorthost": null
   }
   var connection = null;


   /**
   * To initiate ssh connections to remote clusters.
   *
   */


  /**
    * Build a random 5-character alphanumeric string, used as a suffix for
    * temporary file names.
    *
    * @returns {String} Five characters drawn from A-Z, a-z and 0-9
    */
  function makeid() {
    var alphabet = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789";
    var picked = [];

    // One Math.random() draw per character.
    while (picked.length < 5) {
      var index = Math.floor(Math.random() * alphabet.length);
      picked.push(alphabet.charAt(index));
    }

    return picked.join("");
  }


  /**
   * Check whether the remote directory that would contain `file` is writable.
   * The check opens an SFTP session, creates an empty ".hccgo-test<random>"
   * file next to `file`, closes it and deletes it again.
   *
   * @param {String} file - Remote path whose parent directory is probed
   * @returns {Promise} Resolves with false when `file` is falsy, resolves with
   *   true when the probe file could be created and removed, and rejects with
   *   the error of the failing step otherwise.
   */
  var checkWritable = function(file) {

    var deferred = $q.defer();
    if (!file) {
      // Nothing to probe: settle now and skip the SFTP round trip entirely.
      deferred.resolve(false);
      return deferred.promise;
    }

    // Write to the file to test
    async.waterfall([
      // Get the sftp module
      function(callback) {
        // This step has two possible outcomes (the falsy-return check below and
        // the sftp callback), but the waterfall callback may only fire once.
        var settled = false;
        var settle = function(err, sftp) {
          if (settled) {
            // A session arriving after failure was already reported is unused.
            if (!err && sftp) {
              sftp.end();
            }
            return;
          }
          settled = true;
          if (err) {
            return callback(err);
          }
          return callback(null, sftp);
        };

        var sftp_return = connection.sftp(function (err, sftp) {
          $log.log("Got sftp now");
          settle(err, sftp);
        });

        // NOTE(review): a falsy return value is still treated as a failure, as
        // before -- confirm this matches the ssh2 version in use.
        if (!sftp_return) {
          $log.log("Unable to get sftp handle");
          return settle("Unable to get sftp handle");
        }

      }, function(sftp, callback){

        // Check for writeable directory
        // Try to write to a test file
        var dirname_path = path.posix.dirname(file);
        var test_path = path.posix.join(dirname_path, ".hccgo-test" + makeid());

        sftp.open(test_path, 'w', function(err, handle) {
          if (err){
            sftp.end();
            return callback(err);
          }
          sftp.close(handle, function(err) {
            if (err) {
              sftp.end();
              return callback(err);
            }
            return callback(null, sftp, test_path);
          });
        });
      },
      // Now, delete the file
      function(sftp, test_path, callback) {

        sftp.unlink(test_path, function(err) {
          // The session is no longer needed whether or not the unlink worked.
          sftp.end();
          if (err) {
            return callback(test_path + ": " + err);
          }
          return callback(null);
        });

      }
    ], function(err) {
      // Now, the end results
      if (err) {
        deferred.reject(err);
      } else {
        deferred.resolve(true);
      }

    });

    return deferred.promise;
  }

  /**
   * Write the given job script contents to a file on the remote host via SFTP.
   *
   * @param {String} jobFile - Contents to write to the remote file
   * @param {String} remotePath - Remote path of the file to create
   * @returns {Promise} Resolves with "Job successfully uploaded" once the
   *   remote write stream has closed; rejects with the SFTP or stream error.
   */
  var uploadJobFile = function(jobFile, remotePath) {

    var deferred = $q.defer();

    // Starts the connection
    connection.sftp(function (err, sftp) {
      if (err) {
        deferred.reject(err);
        return;
      }

      // Process to console
      $log.debug( "SFTP has begun");
      $log.debug( "Value of remotePath: " + remotePath );

      // Both the 'error' and the 'close' handler release the session; make
      // sure it is only ended once.
      var sessionEnded = false;
      var endSession = function() {
        if (!sessionEnded) {
          sessionEnded = true;
          sftp.end();
        }
      };

      // Setting the I/O streams
      var writeStream = sftp.createWriteStream ( remotePath );
      var failed = false;

      // Catch writestream errors
      writeStream.on('error', function (err) {
        failed = true;
        endSession();
        deferred.reject(err);
      });

      // Sets logic for finishing of process
      writeStream.on('close', function () {
        endSession();
        if (failed) {
          return;
        }
        $log.debug("File has been transferred");
        deferred.resolve("Job successfully uploaded");
      });

      // end() writes the data and then finishes the stream; a bare write()
      // would leave the stream (and with it the SFTP session) open forever.
      writeStream.end(jobFile);
    });
    return deferred.promise;
  }

   /**
    * Submit a job script with sbatch, running it from the script's directory.
    *
    * @param {String} location - Remote path of the job script
    * @returns {Promise} Resolves with the command output; rejects with a
    *   generic message when the command fails (the detail is logged).
    */
   var submitJob = function(location) {
      var submission = $q.defer();

      function onSubmitted(output) {
        submission.resolve(output);
      }

      // thrown on failure
      function onFailed(reason) {
        $log.log("Error log: " + reason);
        return submission.reject("An error occurred when submitting the job.");
      }

      $log.debug("Running command: " + 'sbatch ' + location);

      var commandLine = 'cd ' + path.dirname(location) + '; sbatch ' + location;
      runCommand(commandLine).then(onSubmitted, onFailed);

      return submission.promise;
   }

   // Ends the current SSH connection, if one has been set.
   var closeStream = function() {
     if (connection == null) {
       return;
     }
     connection.end();
   };

   /**
    * Priority queue (concurrency 1) that executes shell commands on the remote
    * host. Each task is an object whose `name` is the command line. The task
    * callback receives (err, output), where output is everything received on
    * the exec stream's 'data' events.
    */
   var runCommandQueue = async.priorityQueue(function (task, callback) {
      // Starts Command session
      // Always source /etc/bashrc on the remote cluster before
      // running commands
      // TODO: How are we going to handle those people whose default shell is not bash?
      var realTask = "if [ -f /etc/bashrc ]; then . /etc/bashrc; fi; if [ -f ~/.bashrc ]; then . ~/.bashrc; fi; " + task.name;
      connection.exec(realTask, function(err, stream) {

         // Local to this command so runs can never share or clobber output.
         var cumulData = "";

         if (err) {
           $log.error("Error running command " + task.name + ": "+ err);
           callback(err, cumulData);
           return;
         }

         stream.on('data', function(data) {

           $log.debug("Got data: " + data);
           cumulData += data;

         }).on('close', function(code, signal) {

           $log.debug('Stream :: close :: code: ' + code + ', signal: ' + signal);
           callback(null, cumulData);   // Once the command actually completes full data stored here

         });
      });
   }, 1);
   
   /**
    * Run a command on the remote cluster and get the output
    * @param {String} command - Command to execute
    * @param {Integer} [priority=1] - Priority of the command, default: 1.  Higher numbers have lower priority.  0 is the highest priority.
    * @returns {Promise} Resolves with the accumulated command output; rejects with an error message string
    * @memberof HCCGo.connectionService
    *
    */
   var runCommand = function(command, priority) {
      var pending = $q.defer();         // Settled once the queued command has finished

      // Only a missing priority falls back to the default; any passed value is kept.
      var effectivePriority = (typeof priority === 'undefined') ? 1 : priority;

      function onTaskDone(err, output) {
          if (!err) {
              pending.resolve(output);
              return;
          }
          pending.reject("Error running command " + command + ": " + err);
      }

      runCommandQueue.push({name: command}, effectivePriority, onTaskDone);

      return pending.promise;
   }

   /**
    * Ask the remote host for the login name by running `whoami`.
    *
    * @returns {Promise} Resolves with the trimmed command output; rejects with
    *   the reason reported by runCommand.
    */
   var getUsername = function() {
      var deferred = $q.defer();

      runCommand('whoami').then(function(data) {

         deferred.resolve(data.trim());

      }, function(err) {

         // Without this the returned promise would never settle on failure.
         deferred.reject(err);

      });
      return deferred.promise;

   }

  // Debug helper: shows an alert dialog with the text "Success".
  var test = function() {
    var message = "Success";
    alert(message);
  }

   /**
    * Look up the remote home directory by echoing $HOME.
    *
    * @returns {Promise} Resolves with the trimmed value of $HOME; rejects with
    *   the reason reported by runCommand.
    */
   var getHomeWD = function() {
     var deferred = $q.defer();
     runCommand('echo $HOME').then(function(data) {
         $log.debug("Home dir: " + data);
         deferred.resolve(data.trim());
     }, function(err) {
         // Without this the returned promise would never settle on failure.
         deferred.reject(err);
     });
     return deferred.promise;
   }

   /**
    * Look up the remote work directory by echoing $WORK.
    *
    * @returns {Promise} Resolves with the trimmed value of $WORK; rejects with
    *   the reason reported by runCommand.
    */
   var getWorkWD = function() {
     var deferred = $q.defer();
     runCommand('echo $WORK').then(function(data) {
         $log.debug("Work dir: " + data);
         deferred.resolve(data.trim());
     }, function(err) {
         // Without this the returned promise would never settle on failure.
         deferred.reject(err);
     });
     return deferred.promise;
   }

   /**
    * Cargo queue (payload 20) that lists remote directories. All tasks of one
    * batch share a single SFTP session. Each task is an object with `name`
    * (the remote directory) and `caller` (invoked with the listing on success).
    */
   var readDirQueue = async.cargo(function (task, callback) {
      // Starts SFTP session
      connection.sftp(function (err, sftp) {
         if (err) {
            // No session was opened, so there is nothing to read or to end.
            $log.debug("Unable to start SFTP session for readdir");
            $log.debug(err);
            return callback(err);
         }

         // Read directory
         async.each(task, function(worker, done) {
            sftp.readdir(worker.name, function(err, list) {
               if (err) {
                  // `task` is the whole batch (an array); the path lives on the worker.
                  $log.debug("Failure on directory: " + worker.name);
                  $log.debug(err);
                  done(err);
               } else {
                  worker.caller(list);
                  done(null);
               }
            });
         }, function(err){
            sftp.end();
            if (err) {
                callback(err);
            } else {
                callback(null);
            }
         });
      });
   }, 20);

   /**
    * List a directory on the server by queueing a request on readDirQueue.
    *
    * @param {String} directory - Remote directory to list
    * @returns {Promise} Resolves with the listing handed to the task's
    *   `caller`; rejects when the queue reports an error.
    */
   var readDir = function(directory) {
      var listing = $q.defer();

      var request = {
          name: directory,
          caller: function(entries) {
              listing.resolve(entries);
              return 0;
          }
      };

      function onBatchDone(err) {
          if (err) {
              listing.reject(err);
          }
      }

      readDirQueue.push(request, onBatchDone);

      return listing.promise;
   }

   /**
    * Creates directories on the server (publicly available).
    *
    * Every entry of `dirList` is mapped to `dest + relative(root, entry)`. For
    * each mapped path the code walks up until it finds an ancestor that can be
    * stat'ed, then creates the missing directories from the top down.
    *
    * @param {String[]} dirList - Source directories to mirror on the server
    * @param {String} root - Prefix of the `dirList` entries to strip
    * @param {String} dest - Remote prefix the relative paths are appended to
    * @param {Function} callback - Called with an error, or a falsy value on success
    */
   var makeDir = function(dirList, root, dest, callback) {
      connection.sftp(function (err, sftp) {
      if (err) {
          // No session was opened, so there is nothing to create or to end.
          $log.debug("Unable to start SFTP session for mkdir");
          $log.debug(err);
          return callback(err);
      }
      async.eachSeries(dirList, function(dir, done) {
          var dirs = [];
          var exists = false;
          dir = dest + path.posix.relative(root,dir);
          $log.debug("Creating folder: " + dir);
          async.until(function() {
              return exists;
          }, function(innerDone) {
              sftp.stat(dir, function(err, stats) {
                  if (err) {
                      $log.debug("STAT :: SFTP :: " + dir);
                      var parent = path.posix.dirname(dir);
                      if (parent === dir) {
                          // Even the top-most ancestor cannot be stat'ed: give up
                          // instead of retrying the same path forever.
                          return innerDone(err);
                      }
                      dirs.push(dir);
                      dir = parent;
                  } else {
                      exists = true;
                  }
                  innerDone();
              });
          }, function(err) {
              if (err) {
                  done(err)
              } else {
                  // dirs was collected deepest-first; create parents first.
                  async.eachSeries(dirs.reverse(), function(curr, mkdone) {
                      sftp.mkdir(curr, function(err) {
                          if(err) $log.debug("curr: " + curr);
                          mkdone(err);
                      });
                  }, function(err){
                      done(err);
                  });
              }
          });
      }, function(err) {
              sftp.end();
              callback(err);
      });
      });
   }

   /**
    * Makes local directories.
    *
    * Every entry of `dirList` is mapped to `dest + relative(root, entry)`. For
    * each mapped path the code walks up until it finds an ancestor that can be
    * stat'ed, then creates the missing directories from the top down.
    *
    * @param {String[]} dirList - Source directories to mirror locally
    * @param {String} root - Prefix of the `dirList` entries to strip
    * @param {String} dest - Local prefix the relative paths are appended to
    * @param {Function} callback - Called with an error, or a falsy value on success
    */
   var lmakeDir = function(dirList, root, dest, callback) {
      async.eachSeries(dirList, function(dir, done) {
          var dirs = [];
          var exists = false;
          dir = dest + path.relative(root,dir);
          $log.debug("Creating folder: " + dir);
          async.until(function() {
              return exists;
          }, function(innerDone) {
              fs.stat(dir, function(err, stats) {
                  if (err) {
                      $log.debug("STAT :: LOCAL :: " + dir);
                      var parent = path.dirname(dir);
                      if (parent === dir) {
                          // Even the top-most ancestor cannot be stat'ed: give up
                          // instead of retrying the same path forever.
                          return innerDone(err);
                      }
                      dirs.push(dir);
                      dir = parent;
                  } else {
                      exists = true;
                  }
                  innerDone();
              });
          }, function(err) {
              if (err) {
                  done(err)
              } else {
                  // dirs was collected deepest-first; create parents first.
                  async.eachSeries(dirs.reverse(), function(curr, mkdone) {
                      fs.mkdir(curr, function(err) {
                          if(err) $log.debug("curr: " + curr);
                          mkdone(err);
                      });
                  }, function(err){
                      done(err);
                  });
              }
          });
      }, function(err) {
              callback(err);
      });
   }

   /**
    * Compute the size in bytes of a local file, or the summed size of all
    * regular files below a local directory.
    *
    * Errors inside sub-directories are logged and skipped; an error on `dir`
    * itself rejects the promise.
    *
    * @param {String} dir - Local file or directory path
    * @returns {Promise} Resolves with the size in bytes (0 for anything that
    *   is neither a file nor a directory); rejects with the fs error for `dir`.
    */
   var localSize = function(dir) {
        var deferred = $q.defer();
        var sizeTotal = 0;

        // Recursively walks the directory tree, adding file sizes to sizeTotal
        var BFSCounter = function(currDir, bfs) {
            fs.readdir(currDir, function(err, files) {
                if (err) {
                    return bfs(err);
                }
                async.each(files, function(file, done) {
                    fs.stat(currDir + '/' + file, function(err, stats) {
                        if(err){
                            done(err);
                        } else if (stats.isFile()) {
                            sizeTotal += stats.size;
                            done(null);
                        } else if (stats.isDirectory()) {
                            BFSCounter(currDir + '/' + file, function(err) {
                                if(err) {
                                    $log.debug("BFS Error on: " + currDir + "/" + file);
                                    $log.debug(err);
                                }
                                done(null);
                            });
                        } else {
                            // Not a file or directory: contributes nothing, but
                            // must still signal completion or the walk never ends.
                            done(null);
                        }
                    });
                }, function(err) {
                    bfs(err);
                });
            });
        };
        fs.stat(dir, function(err, stats) {
            if (err) {
                return deferred.reject(err);
            }
            if (stats.isFile()) {
                deferred.resolve(stats.size);
            } else if (stats.isDirectory()) {
                BFSCounter(dir, function(err) {
                    // Throwing here would escape as an uncaught exception and
                    // leave the promise pending; reject instead.
                    if (err) {
                        return deferred.reject(err);
                    }
                    deferred.resolve(sizeTotal);
                });
            } else {
                deferred.resolve(0);
            }
        });

        return deferred.promise;
   }

   /**
    * Get the size in bytes of a single remote file, using an SFTP stat.
    *
    * @param {String} filePath - Remote path of the file
    * @returns {Promise} Resolves with the size from the stat result; rejects
    *   with a message string when the session or the stat fails.
    */
   var getFileSize = function(filePath) {
     var deferred = $q.defer();

     // Use stat from sftp
     connection.sftp(function (err, sftp) {
       if(err) {
         // No session was opened, so there is nothing to end here.
         return deferred.reject("Error getting SFTP object: " + err);
       }
       sftp.stat(filePath, function(err, stats) {
         // The session is done after the single stat, whatever its outcome.
         sftp.end();
         if(err) {
           return deferred.reject("Error getting stat: " + err);
         }
         return deferred.resolve(stats.size);
       });
     });

     return deferred.promise;
   }

   /**
    * Read the complete contents of a remote file as text over SFTP.
    *
    * @param {String} filePath - Remote path of the file
    * @returns {Promise} Resolves with the file contents decoded as one UTF-8
    *   string; rejects when the session cannot be opened or the read fails.
    */
   var getFileText = function(filePath) {
     var deferred = $q.defer();

     connection.sftp(function (err, sftp) {
       if(err) {
         return deferred.reject("Error getting SFTP object: " + err);
       }
       var readStream = sftp.createReadStream(filePath);

       // Keep the raw chunks and decode once at the end: decoding each chunk
       // separately garbles multi-byte characters split across chunk boundaries.
       var chunks = [];
       readStream.on('error', function (err) {
         sftp.end();
         return deferred.reject(err);
       });

       readStream.on('data', function(chunk) {
         chunks.push(Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk));
       });

       readStream.on('end', function() {
         sftp.end();
         return deferred.resolve(Buffer.concat(chunks).toString());
       });
     });

     return deferred.promise;
   }

   /**
    * Cargo queue (payload 10) that uploads files with sftp.fastPut. All tasks
    * of one batch share a single SFTP session. Each task is an object with:
    *   name   - label used in log messages
    *   local  - local source path
    *   remote - remote destination path
    *   data   - called with the bytes transferred so far on every step
    *   finish - called once per file with the file's total size as reported
    *            by the first step (0 if no step ran), on success and failure
    */
   var uploaderQueue = async.cargo(function (task, callback) {
      // Starts SFTP session
      connection.sftp(function (err, sftp) {
        if (err) {
            // No session was opened, so there is nothing to transfer or to end.
            $log.debug("Unable to start SFTP session for upload");
            $log.debug(err);
            return callback(err);
        }
        async.each(task, function(worker, done) {
            var parityCheck = true;
            var totalCollector = 0;
            sftp.fastPut(worker.local, worker.remote,
                {step:function(total_transferred,chunk,total){
                       // Remember the file's total size from the first step only.
                       if (parityCheck) {
                           totalCollector = total;
                           parityCheck = false;
                       }
                       worker.data(total_transferred);
            },concurrency:25},
            function(err){
                // Cleans up processing
                worker.finish(totalCollector);

                // Processes errors
                if (err) {
                    $log.debug("upload error: " + worker.name);
                    $log.debug("worker.local: " + worker.local);
                    $log.debug("worker.remote: " + worker.remote);
                    $log.debug(err);
                    done(err);
                } else {
                    done(null);
                }
            });
        }, function(err) {
            sftp.end();
            callback(err);
        });
      });
   }, 10);

    /**
     * Upload a local file or a whole local directory tree to the server.
     *
     * For a directory, the tree is scanned first, the needed remote folders are
     * created with makeDir, and every file is then queued on uploaderQueue.
     *
     * @param {String} src - Local file or directory to upload
     * @param {String} dest - Remote destination prefix; relative paths are appended to it
     * @param {Function} callback - Progress callback, called as
     *   callback(bytesOfCurrentFile, filesDone, filesTotal, bytesDone, bytesTotal)
     * @param {Function} finished - Called without arguments when everything was uploaded
     * @param {Function} error - Called with the error when any step fails
     */
    var uploadFile = function (src, dest, callback, finished, error) {
        var localFiles = [];
        var mkFolders = [];
        var filesTotal = 0;
        var currentTotal = 0;
        var sizeTotal = 0;
        var counter = 0;
        var BFSFolders = function(currDir, bfs) {
            //Recursively builds directory structure
            fs.readdir(currDir, function(err, files) {
                if (err) {
                    return bfs(err);
                }
                async.each(files, function(file, done) {
                     fs.stat(currDir + '/' + file, function(err, stats) {
                         if(err){
                             done(err);
                         } else if (stats.isFile()) {
                             localFiles.push(currDir + '/' + file);
                             sizeTotal += stats.size;
                             filesTotal += 1;
                             done(null);
                         } else if (stats.isDirectory()) {
                             // A listed parent is replaced by its child; otherwise the
                             // child is appended to the folder list.
                             if (mkFolders.indexOf(currDir) > -1) {
                                 mkFolders[mkFolders.indexOf(currDir)] = currDir + '/' + file;
                             } else {
                                 mkFolders.push(currDir + '/' + file);
                             }
                             BFSFolders(currDir + '/' + file, function(err) {
                                 if(err) {
                                     $log.debug("BFS Error on: " + currDir + "/" + file);
                                     $log.debug(err);
                                 }
                                 done(null);
                             });
                         } else {
                             // Not a file or directory: nothing to upload, but the
                             // scan must still be told this entry is done.
                             done(null);
                         }
                     });
                }, function(err) {
                    bfs(err);
                });
            });
        }
        // Starts the connection
        async.waterfall([
            function(water) {
               fs.stat(src.replace(/\/$/, ''), function(err, stats) {
                   if (err) {
                       // Source is missing or unreadable: report it instead of
                       // dereferencing an undefined stats object.
                       return water(err);
                   }
                   if(stats.isDirectory()){
                       mkFolders.push(src);

                       BFSFolders(src.replace(/\/$/, ''), function(err) {
                           $log.debug("New Folders: ");
                           $log.debug(mkFolders);
                           // Set destination directory setting
                           dest = dest + path.posix.basename(src) + '/';
                           water(err, true);
                       });
                   } else if (stats.isFile()) {
                       localFiles.push(src);
                       sizeTotal += stats.size;
                       filesTotal += 1;
                       src = path.posix.dirname(src);
                       water(null, false);
                   } else {
                       water(new Error("Unsupported file type: " + src));
                   }
               });
            },
            function(arg, water) {
               // arg is true when src is a directory and remote folders are needed
               if (arg) {
                   makeDir(mkFolders, src, dest, function(err) {
                       water(err);
                   });
               } else {
                   water(null);
               }
            },
            function(water) {
               // Setting the I/O streams
               async.each(localFiles, function(file, done) {
                  uploaderQueue.push({
                      name: file, local: file,
                      remote: dest + path.posix.relative(src,file),
                      data: function(total_transferred) {
                          callback(total_transferred, counter, filesTotal, currentTotal, sizeTotal);
                          return 0;
                      }, finish: function(finishTotal) {
                          currentTotal += finishTotal;
                          counter += 1;
                          callback(0, counter, filesTotal, currentTotal, sizeTotal);
                      }}, function(err) {
                          done(err);
                      });

               }, function(err) {
                  water(err);
               });
            }],
            function(err) {
                if(err) {
                    analyticsService.event('file upload', 'fail');
                    $log.debug(err);
                    error(err);
                } else {
                    analyticsService.event('file upload', 'success', '', sizeTotal);
                    finished();
                }
       });
    }

   /**
    * Cargo queue (payload 20) that stats remote paths. All tasks of one batch
    * share a single SFTP session. Each task is an object with `name` (the
    * remote path) and `caller` (invoked with the stat result on success).
    */
   var remoteStatQueue = async.cargo(function (task, callback) {
      // Starts SFTP session
      connection.sftp(function (err, sftp) {
         if (err) {
            // No session was opened, so there is nothing to stat or to end.
            $log.debug("Unable to start SFTP session for stat");
            $log.debug(err);
            return callback(err);
         }

         // Stat every path of the batch
         async.each(task, function(worker, done) {
            sftp.stat(worker.name, function(err, stats) {
               if (err) {
                  // `task` is the whole batch (an array); the path lives on the worker.
                  $log.debug("Failure on directory: " + worker.name);
                  $log.debug(err);
                  done(err);
               } else {
                  worker.caller(stats);
                  done(null);
               }
            });
         }, function(err) {
            sftp.end();
            if (err) {
                callback(err);
            } else {
                callback(null);
            }
         });
      });
   }, 20);

   /**
    * Stat a path on the server by queueing a request on remoteStatQueue.
    *
    * @param {String} directory - Remote path to stat
    * @returns {Promise} Resolves with the stat result handed to the task's
    *   `caller`; rejects when the queue reports an error.
    */
   var remoteStat = function(directory) {
      var lookup = $q.defer();

      var request = {
          name: directory,
          caller: function(info){
              lookup.resolve(info);
              return 0;
          }
      };

      function onBatchDone(err) {
          if (err) {
              lookup.reject(err);
          }
      }

      remoteStatQueue.push(request, onBatchDone);

      return lookup.promise;
   }

    /**
     * Cargo queue (payload 10) that downloads files with sftp.fastGet. All
     * tasks of one batch share a single SFTP session. Each task is an object with:
     *   name   - label used in log messages
     *   local  - passed as fastGet's first argument; downloadFile stores the
     *            remote source path here
     *   remote - passed as fastGet's second argument; downloadFile stores the
     *            local destination path here
     *   data   - called with the bytes transferred so far on every step
     *   finish - called once per file with the file's total size as reported
     *            by the first step (0 if no step ran), on success and failure
     */
    var downloaderQueue = async.cargo(function (task, callback) {
      // Starts SFTP session
     connection.sftp(function (err, sftp) {
        if (err) {
           // No session was opened, so there is nothing to transfer or to end.
           $log.debug("Unable to start SFTP session for download");
           $log.debug(err);
           return callback(err);
        }
        async.each(task, function(worker, done) {
          var totalCollector = 0;
          var parityCheck = true;
          sftp.fastGet(worker.local, worker.remote,
             {step:function(total_transferred,chunk,total){
                       // Remember the file's total size from the first step only.
                       if (parityCheck) {
                           totalCollector = total;
                           parityCheck = false;
                       }
                       worker.data(total_transferred);
             },concurrency:25},
             function(err){
                // Finishes processing and sends total
                worker.finish(totalCollector);

                // Processes errors
                if (err) {
                   $log.debug("download error: " + worker.name);
                   $log.debug("task.local: " + worker.local);
                   $log.debug("task.remote: " + worker.remote);
                   $log.debug(err);
                   done(err);
                } else {
                   done(null);
                }
          });
        }, function(err) {
            sftp.end();
            callback(err);
        });
      });
   }, 10);

   /**
    * Download a remote file or a whole remote directory tree from the server.
    *
    * For a directory, the tree is scanned with readDir/remoteStat, the needed
    * local folders are created with lmakeDir, and every file is then queued on
    * downloaderQueue.
    *
    * @param {String} localPath - Local destination prefix; relative paths are appended to it
    * @param {String} remotePath - Remote file or directory to download
    * @param {Function} callback - Progress callback, called as
    *   callback(bytesOfCurrentFile, filesDone, filesTotal, bytesDone, bytesTotal)
    * @param {Function} finished - Called without arguments when everything was downloaded
    * @param {Function} error - Called with the error when any step fails
    */
   var downloadFile = function(localPath, remotePath, callback, finished, error) {
        var remoteFiles = [];
        var mkFolders = [];
        var filesTotal = 0;
        var currentTotal = 0;
        var sizeTotal = 0;
        var counter = 0;

        // Recursively collects remote files and folders below currDir
        var BFSFolders = function(currDir, bfs) {
            readDir(currDir).then(function(data) {
                async.each(data, function(file, done) {
                    // Entries whose longname starts with 'd' are treated as directories
                    if (file.longname.charAt(0) != 'd') {
                        remoteStat(currDir + '/' + file.filename).then(function(stats) {
                            remoteFiles.push(currDir + '/' + file.filename);
                            sizeTotal += stats.size;
                            filesTotal += 1;
                            done(null);
                        }, function(err) {
                            // Without this a failed stat would stall the whole scan.
                            done(err);
                        });
                    } else if (file.longname.charAt(0) == 'd') {
                        // A listed parent is replaced by its child; otherwise the
                        // child is appended to the folder list.
                        if (mkFolders.indexOf(currDir) > -1) {
                            mkFolders[mkFolders.indexOf(currDir)] = currDir + '/' + file.filename;
                        } else {
                            mkFolders.push(currDir + '/' + file.filename);
                        }
                        BFSFolders(currDir + '/' + file.filename, function(err) {
                            if(err) {
                                $log.debug("BFS Error on: " + currDir + '/' + file.filename);
                                $log.debug(err);
                            }
                            done(null);
                        });
                    }
                }, function(err) {
                    bfs(err);
                });
            }, function(err) {
                // Without this a failed listing would stall the whole scan.
                bfs(err);
            });
        }
        // Starts the connection
        async.waterfall([
            function(water) {
               remoteStat(remotePath.replace(/\/$/, '')).then(function(data) {
                   if (data.isDirectory()) {
                       mkFolders.push(remotePath);
                       BFSFolders(remotePath.replace(/\/$/, ''), function(err) {
                           $log.debug("New Folders: ");
                           $log.debug(mkFolders);
                           // Set destination directory setting
                           localPath = localPath + path.basename(remotePath) + '/';
                           localPath = path.normalize(localPath);
                           water(err, true);
                       });
                   } else if (data.isFile()) {
                       remoteFiles.push(remotePath);
                       sizeTotal += data.size;
                       filesTotal += 1;
                       remotePath = path.dirname(remotePath);
                       water(null, false);
                   } else {
                       water(new Error("Unsupported file type: " + remotePath));
                   }
               }, function(err) {
                   // Remote path missing or session failure: finish the waterfall
                   // so the error callback fires instead of hanging forever.
                   water(err);
               });
            },
            function(arg, water) {
               // arg is true when remotePath is a directory and local folders are needed
               if (arg) {
                   lmakeDir(mkFolders, remotePath, localPath, function(err) {
                       water(err);
                   });
               } else {
                   water(null);
               }
            },
            function(water) {
               // Setting the I/O streams
               async.each(remoteFiles, function(file, done) {
                  // downloaderQueue passes `local` then `remote` to fastGet, so the
                  // remote source goes in `local` and the local target in `remote`.
                  downloaderQueue.push({
                      name: file, local: file,
                      remote: localPath + path.relative(remotePath,file),
                      data: function(total_transferred) {
                          callback(total_transferred, counter, filesTotal, currentTotal, sizeTotal);
                          return 0;
                      }, finish: function(finishTotal) {
                          currentTotal += finishTotal;
                          counter += 1;
                          callback(0, counter, filesTotal, currentTotal, sizeTotal);
                          return 0;
                      }}, function(err) {
                          done(err);
                      });
               }, function(err) {
                  water(err);
               });
            }],
            function(err) {
                if(err) {
                    analyticsService.event('file download', 'fail');
                    $log.debug(err);
                    error(err);
                } else {
                    analyticsService.event('file download', 'success', '', sizeTotal);
                    finished();
                }
       });

   };

   /**
    * Allows a user to quickly download a file from the server
    * @method quickDownload
    * @memberof HCCGo.connectionService
    * @param {String} remotePath - Where the file resides on the server
    * @param {String} localPath - Location where the file will be sent
    * @returns {Promise} Resolves with true when the transfer succeeded; rejects
    *   with the error when the session or the transfer failed
    */
   var quickDownload = function(remotePath, localPath) {
       var deferred = $q.defer();
       connection.sftp(function (err, sftp) {
           if (err) {
               deferred.reject(err);
               return;
           }
           sftp.fastGet(remotePath, localPath, function(err) {
               // The session is done after the single transfer, whatever its outcome.
               sftp.end();
               if (err) {
                   // A failed transfer must not be reported as success.
                   deferred.reject(err);
                   return;
               }
               console.log("File transfer successful!");
               deferred.resolve(true);
           });
       });
       return deferred.promise;
   };
   return {
   runCommand: runCommand,
   getUsername: getUsername,
   uploadFile: uploadFile,
   downloadFile: downloadFile,
   quickDownload: quickDownload,
   submitJob: submitJob,
   closeStream: closeStream,
   readDir: readDir,
   makeDir: makeDir,
   localSize: localSize,
   getHomeWD: getHomeWD,
   getWorkWD: getWorkWD,
   uploadJobFile: uploadJobFile,
   checkWritable: checkWritable,
   getFileText: getFileText,
   getFileSize: getFileSize,
   connectionDetails: connectionDetails,
   initiateConnection: function initiateConnection(username, password, hostname, cluster, needInput, completed) {

     var Client = require('ssh2').Client;
     var conn = new Client();
     try {

     if (username.length === 0 || password.length === 0)
     {
         completed("0 Length username or password given");
     }

     conn.on('ready', function() {
      connectionDetails['username'] = username;
      connectionDetails['hostname'] = hostname;
      connectionDetails['shorthost'] = cluster;
      completed(null);
      $rootScope.$broadcast("login");
      $log.log('Client :: ready');
     }).on('error', function(err) {
      $log.error(err);
      completed(err);


     }).on('keyboard-interactive', function(name, instructions,  instructionsLang, prompts, finishFunc) {
      $log.log("Name: " + name + ", instructions: " + instructions + "prompts" + prompts);


      if (prompts[0].prompt == "Password: ") {
        finishFunc([password]);
      } else {
        $log.log(prompts[0].prompt);
        needInput(prompts[0].prompt, function(input) {
            if (input < 1 || input > 3)
            {
                completed("Duo error");
            }
         finishFunc([input]);
        });
      }
      }).on('close', function(hadError) {
        $log.error("Connection closed");
        if (hadError) $log.error("Error while closing connection");
        notifierService.error("Disconnected from cluster", "Disconnection");
        $location.path("/");
      }).on('end', function() {
        $log.error("Connection ended");

     }).connect({
      host: hostname,
      username: username,
      tryKeyboard: true,
      readyTimeout: 99999999,
      debug: function(message) {
      //$log.log(message);
      }

    });
     }
     catch(err) {
         completed(err);
     }

     connection = conn;

   }

   }

}]);