D7net
Home
Console
Upload
information
Create File
Create Folder
About
Tools
:
/
opt
/
imh-python
/
lib
/
python2.7
/
site-packages
/
cmu_ded
/
Libs
/
Filename :
common.py
back
Copy
""" Functions and definitions that don't really belong anywhere. Created by Alex K. """ import os import string import shutil import random import subprocess from shutil import move from logthis import LogThis from tempfile import mkstemp from datetime import datetime from os import fdopen, remove from distutils.dir_util import copy_tree def tailored_move(func): """ Decorator Main focus of this is to move files depending on some of the variables set in the function. The files we are moving is dependant on what is returned by func. I recommend yielding the files you want to move in the function you are decorating however a list will work. The first case is assuming we are in vz, whereas the second case assumes we are in cmu_ded with an ssh connection For the second case, we then assume its an instance on tbSSH. :param func: The function we are decorating. :return: Moves the files. True if everything is kosher. False if not. """ def wrapper(self, *args, **kwargs): files_to_move = func(self, *args, **kwargs) if self.rootdir1 and self.rootdir2 and not self.ssh_connection: # We need to move files via VZ in this case. for iteration in files_to_move: if not iteration: continue self.info("Migrating %s via VZ" % iteration) if os.path.exists(self.rootdir2 + iteration): shutil.move(self.rootdir2 + iteration, self.rootdir2 + iteration + "-" + datetime.now().strftime('%y%m%d%s')) try: shutil.copy(self.rootdir1 + iteration, self.rootdir2 + iteration) except IOError as e: if e.errno == 21: copy_tree(self.rootdir1 + iteration, self.rootdir2 + iteration, preserve_symlinks=0) return True elif self.ssh_connection: # Checking the connection to validate everything is working. if self.ssh_connection.check_connection(): for iteration in files_to_move: self.info("Migrating %s via ssh" % iteration) # pull_files uses ftpget so you have to give pull_files the directory and then the file. split_iteration = iteration.split("/") file_name = split_iteration[-1] directory = '/'.join(split_iteration[:-1]) self.debug("directory = %s :: file_name = %s" % (directory, file_name)) self.ssh_connection.pull_files( remote_dir=self.rootdir1 + directory, local_dir=self.rootdir2 + directory, file_to_transfer=file_name, recursive=True ) # Validating all the files/folders have been moved. did_not_fail = True for iteration in files_to_move: if not os.path.exists(self.rootdir2 + iteration): self.warning("Failed to transfer %s" % iteration) did_not_fail = False return did_not_fail else: self.error("Something went wrong with tailored_move.") return False return wrapper def tailored_exec(new_server=True): """Required to pass new_server.""" def real_tailored_exec(func): """ Same conecept as tailored_move however we are doing this for commands we want to run. In the function you are decorating, yield the commands you want to run or return a list of commands. If you want to run the command on a old server, pass False into the decorator. :param func: Function we want to exec. :return: """ def wrapper(self, *args, **kwargs): self.debug("Entering wrapper for tailored_exec.") commands_to_run = func(self, *args, **kwargs) if self.rootdir1 and self.rootdir2: if new_server: veid = self.rootdir2.split('/')[-1] else: veid = self.rootdir1.split('/')[-1] self.info("Exec via vzctl for %s" % veid) return_string = '' for command in commands_to_run: run_command = subprocess.Popen( ['vzctl', 'exec', veid, command], stdout=subprocess.PIPE, stderr=subprocess.PIPE, stdin=subprocess.PIPE ) return_string += run_command.communicate()[0] return return_string elif self.ssh_connection: self.debug("Exec new_server=%s, via cmu_ded" % new_server) if new_server: return_string = "" for command in commands_to_run: command = command.split() run_command = subprocess.Popen( command, stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.PIPE ) return_string += run_command.communicate()[0] return return_string else: """Doing everything on the old server.""" return_string = "" if self.ssh_connection.check_connection(): for command in commands_to_run: self.debug("Running %s" % command) return_string += self.ssh_connection.string_exec(command) return return_string else: self.warning("Class isn't setup properly.") return False return wrapper return real_tailored_exec class Migration(LogThis): """ Items that are there during a migration. Also functions that might be useful. """ def __init__(self, logger, dev_mode=False): super(Migration, self).__init__(logger=logger, dev_mode=dev_mode) self.rootdir1 = '' self.rootdir2 = '' self.ssh_connection = None # Supporting functions: @tailored_exec() def local_cmd_exec(self, cmd): """ Runs a specific command locally. :return: Returns the output of the command. """ if isinstance(cmd, list): for command in cmd: yield command else: yield cmd @tailored_exec(False) def remote_cmd_exec(self, cmd): """ Runs a specific command remotely. :return: Returns the output of the command. """ if isinstance(cmd, list): for command in cmd: yield command else: yield cmd @tailored_move def move_file(self, file): if isinstance(file, list): for item in file: yield file else: yield file def file_replace_line(file_path, search, replacement): """ Replacing any line that contains the a pattern with replacement :param file_path: File path. :param search: Pattern to search for. :param replacement: Replacement line. :return: Nothing. """ # Create temp file fh, abs_path = mkstemp() with fdopen(fh, 'w') as new_file: with open(file_path) as old_file: for line in old_file: if search in line: new_file.write(replacement) else: new_file.write(line) # Remove original file remove(file_path) # Move new file move(abs_path, file_path) def to_string(array): """ Changing an Array into a string for printing. Mainly everything is formatting. :param array: Can be either a list or tuple. :return: Returns a string of each element. """ return_string = '\n\n\t\t\t' # Translated into if Array is none or empty, set the return string to "None!" if array is None: return_string = "\nAn exception has occured during the execution of the program. Just in an run --resume to see what the error is."\ "You can also look in the /opt/cmu_ded/logs/cmu.debug log to find out what the error is. It looks like a normal python"\ "traceback.\n" elif array is True or not array: return_string = "None!" elif isinstance(array, tuple): for list_item in array: try: return_string += ''.join(list_item).decode('utf-8') + '\n\t\t\t' except (UnicodeDecodeError, UnicodeEncodeError, AttributeError): return_string += ''.join(list_item) + '\n\t\t\t' elif isinstance(array, list): for list_item in array: try: return_string += ''.join(list_item).decode('utf-8') + '\n\t\t\t' except (UnicodeDecodeError, UnicodeEncodeError, AttributeError): return_string += ''.join(list_item) + '\n\t\t\t' elif isinstance(array, dict): for dict_key, dict_value in array.iteritems(): try: return_string += ''.join(dict_key).decode('utf-8') + ":\t" +\ ''.join(dict_value).decode('utf-8') + "\n\t\t\t" except (UnicodeDecodeError, UnicodeEncodeError, AttributeError): return_string += ''.join(dict_key).strip() + ":\t" + ''.join(dict_value).strip() + "\n\t\t\t" return return_string def gen_random_string(string_length): """ Generating a random string. Randomly picks if the character should be a special character, number, or letter It then randomly chooses between a predefined list set in string. :param string_length: How long you want the string to be. :return: Psuedo - Random string. ( To the best of its ability. ) """ result = "" for _ in range(string_length): rand_result = random.randint(0, 2) if rand_result == 0: result += str(random.randint(0, 9)) elif rand_result == 1: result += random.choice(string.ascii_letters) elif rand_result == 2: result += random.choice(string.punctuation) return result def generate_remote_ssh_key(sup_server, sup_access_hash): """ Generates an SSH key on the server All values are attempting to be random :param sup_server: Server we are connecting too. :param sup_access_hash: Access hash we are using to auth. :return: Returns the access hash and its passphrase. The flow should go along the lines of: Generate an SSH Key. Import the SSH key on the remote host. Lastly authorize the SSH key. """ with open(sup_access_hash) as access_hash: cleaned_access_hash = ''.join(access_hash.read().splitlines()) keyname = gen_random_string(12) key_passphrase = gen_random_string(15) # Random test WHM_API test # print whm_api('listaccts', version=1, whmhost=sup_server, user='root', access_hash=cleaned_access_hash) # Generate the SSH key. Don't want to do this over WHM as it's only for the root user. # https://stackoverflow.com/questions/2466401/how-to-generate-ssh-key-pairs-with-python # https://cryptography.io/en/latest/hazmat/primitives/asymmetric/rsa/ print("Generated SSH key with the following information: keyname: %s PassPhrase: %s" % (keyname, key_passphrase)) print("[!!] Generate Remote SSH Key Not setup yet.")