D7net
Home
Console
Upload
information
Create File
Create Folder
About
Tools
:
/
opt
/
imh-python
/
lib
/
python2.7
/
site-packages
/
rads
/
Filename :
shared.py
back
Copy
""" Shared RADS common functions """

import grp
import gzip
import json
import os
import platform
import pwd
import re

import arrow
import sh
import yaml

from rads.cpanel_api import api_success, whm_api

# Site configuration is read once, when this module is imported.
with open('/etc/rads/rads.json', 'r') as rads_conf:
    _rads_json = json.load(rads_conf)

SYS_USERS = _rads_json['users']['system']
SYS_MYSQL_USERS = _rads_json['users']['mysql']
STAFF_GROUPS = _rads_json['users']['our_groups']


def is_t1_safe(username):
    """
    Checks whether the user is safe for T1/T2C to operate on

    Returns False if the user is a member of any group in STAFF_GROUPS, is
    listed in SYS_USERS, is the secure user, or is not a valid cPanel user.
    Returns True otherwise.
    """
    # Work on a copy: appending to SYS_USERS itself would grow the shared
    # module-level list by one entry on every call.
    restricted_users = list(SYS_USERS)
    sec_user = get_secure_username()
    if sec_user is not None:  # will be None on VPS/Ded
        restricted_users.append(sec_user)
    user_groups = [g.gr_name for g in grp.getgrall() if username in g.gr_mem]
    if any(group in STAFF_GROUPS for group in user_groups):
        return False
    if username in restricted_users:
        return False
    return is_cpanel_user(username)


def acct_safe_size(user, max_mb=10240, skip_home=False):
    """Determine if an account is too large to package

    user:      account name; its databases are expected under
               /var/lib/mysql/<user>_*
    max_mb:    size limit the account is compared against
    skip_home: if True, only the MySQL data size is checked

    Returns True if the measured size does not exceed max_mb. Returns False
    if it does, or if the home usage could not be read from quota.
    """
    try:
        du_out = sh.du(
            sh.glob('/var/lib/mysql/%s_*' % user),
            c=True
        ).stdout
        # The last line of `du -c` output is the grand total. The division
        # assumes du reports kilobytes -- TODO confirm for this platform.
        mysql_mb = int(du_out.splitlines()[-1].split()[0]) // 1024
    except sh.ErrorReturnCode_1:
        mysql_mb = 0  # no databases
    if mysql_mb > max_mb:
        return False
    if skip_home:
        return True
    try:
        quota_out = sh.quota(user, v=True).stdout
        # Third line, second column of `quota -v` is used as the home usage.
        home_mb = int(quota_out.splitlines()[2].split()[1]) // 1024
    except (sh.ErrorReturnCode, ValueError, IndexError):
        # IndexError: quota printed fewer lines/columns than expected.
        # Treated the same as any other unreadable quota.
        return False
    return home_mb + mysql_mb <= max_mb


def get_secure_username():
    """get the secure user's username

    The secure hostname is the local hostname with its leading run of
    lowercase letters replaced by 'secure'. Returns the user that
    /etc/userdomains maps to that hostname, or None if there is no such
    entry or the file cannot be read.
    """
    hostname = platform.node()
    secure_hostname = re.sub('^[a-z]+', 'secure', hostname)
    try:
        with open('/etc/userdomains') as userdomains:
            for line in userdomains.read().splitlines():
                # partition() instead of split(): a blank or malformed line
                # must not raise ValueError out of this function.
                domain, sep, user = line.partition(': ')
                if not sep:
                    continue
                if domain == secure_hostname:
                    return user.rstrip()  # splitlines handled lstrip
    except IOError:
        return None
    return None


def read_historic_usage(offset):
    """
    Returns the contents of archived sa data collected by RADS.

    offset is an integer offset intended to specify the negative offset in
    days to retrieve data for. 1 is added to offset to get the day.

    Returns a list of lines from the most recently modified file whose name
    contains 'avg' in /var/log/sa/rads/<Mon>/<DD>, or None if the directory
    does not exist, holds no such file, or the file cannot be read.
    """
    offset = offset + 1
    # get the time offset days ago
    # NOTE(review): replace(days=...) as a relative shift is only accepted by
    # older arrow releases -- confirm against the installed version.
    offset_arw = arrow.now().replace(days=-offset)
    # month needs to be the short string representation of the month (i.e. Jun)
    month, day = offset_arw.format('MMM DD').split()
    sadatadir = os.path.join('/var/log/sa/rads', month, day)
    if not os.path.isdir(sadatadir):
        return None
    histfiles = [
        os.path.join(sadatadir, x)
        for x in os.listdir(sadatadir)
        if 'avg' in x
    ]
    if not histfiles:
        # Nothing archived for that day; indexing [-1] below would raise.
        return None
    histfile = sorted(histfiles, key=os.path.getmtime)[-1]
    try:
        with gzip.open(histfile) as filehandle:
            contents = filehandle.read().splitlines()
    except IOError:
        return None
    return contents


def historic_usage(historic_days=1):
    """Returns historic usage dictionary

    Reads the archived sa data for each of the last historic_days days and
    returns a dict mapping user name to a list of cp values (floats), one
    per matching line found. Days with no readable data are skipped.
    """
    hist_cp_dict = {}
    for offset in xrange(historic_days):
        day = read_historic_usage(offset)
        if day is None:  # error in read_historic_usage
            continue  # just read the next day's sa data
        for hist_data in day:
            sa_line = hist_data.split()
            # example format below. username and x.xxcp are needed
            # userna5 264 0.11% 0.21re 0.00% 0.16cp 0.45% 58176k
            if len(sa_line) != 8:
                continue
            try:
                usage = float(sa_line[5][:-2])  # strip off 'cp'
            except ValueError:
                # 8 columns but not a usage row; skip it rather than abort
                continue
            hist_cp_dict.setdefault(sa_line[0], []).append(usage)
    return hist_cp_dict


def is_cpanel_user(username):
    """Returns True if the user is a valid cPanel user, False otherwise

    A user is considered valid when it has a passwd entry, its home
    directory exists, and both /var/cpanel/users/<username> and
    /var/cpanel/userdata/<username> exist.
    """
    if not isinstance(username, str):
        if isinstance(username, unicode):
            username = username.encode('ascii', 'ignore')
        else:
            return False  # neither unicode nor str
    try:
        user_homedir = pwd.getpwnam(username).pw_dir
    except (KeyError, AttributeError):
        return False
    if not os.path.isdir(user_homedir):
        # User file exists, but home does not
        return False
    user_file = os.path.join('/var/cpanel/users', username)
    userdata_folder = os.path.join('/var/cpanel/userdata', username)
    return os.path.exists(user_file) and os.path.exists(userdata_folder)


def get_users():
    """Returns a list of cPanel users from /etc/trueuserowners

    Raises AssertionError if the file does not parse to a mapping.
    """
    with open('/etc/trueuserowners') as userowners:
        # safe_load replaces the original yaml.load(): this is a plain
        # mapping file, so arbitrary object construction is never wanted.
        userdict = yaml.safe_load(userowners)
    if not isinstance(userdict, dict):
        # Raised explicitly (instead of an assert statement) so the check
        # is not stripped when Python runs with -O.
        raise AssertionError(
            "userdict is not a dict! Check /etc/trueuserowners!"
        )
    return list(userdict.keys())


def get_homedir_path(username):
    """get home directory path. return string path or None on error"""
    try:
        homedir = pwd.getpwnam(username).pw_dir
    except (KeyError, AttributeError):
        return None
    if re.match(r'/home[0-9]*/\w+', homedir) is None:
        # Even though we fetched the homedir successfully from /etc/passwd,
        # treat this as an error due to unexpected output. If the result was
        # '/' for example, some calling programs might misbehave or even
        # rm -rf / depending on what it's being used for
        return None
    return homedir


def is_suspended(user):
    """Return True if the user is suspended, False otherwise

    Suspension is detected by the existence of /var/cpanel/suspended/<user>.
    """
    return os.path.exists('/var/cpanel/suspended/%s' % user)


def get_primary_domain(username):
    """Read primary domain from userdata or return None on error"""
    userdata_main_path = os.path.join(
        '/var/cpanel/userdata',
        username,
        'main'
    )
    try:
        with open(userdata_main_path, 'r') as userdata_filehandle:
            # safe_load replaces the original yaml.load(); see get_users().
            return yaml.safe_load(userdata_filehandle)['main_domain']
    except (yaml.YAMLError, KeyError, IOError, TypeError):
        # TypeError: the file parsed to something that is not a mapping
        # (an empty file parses to None).
        return None


def top_users(how_many=10):
    """
    Run sa to get a list of the top users and yield up to the integer value
    of 'how_many' tuples in the form (user, cp) where user is a cPanel user
    name and cp is the current cp value as printed by sa, with the trailing
    'cp' removed (yielded as a string, not converted to float).

    Lines beginning with the secure user or a name from SYS_USERS are
    skipped, as are users that are not valid cPanel users.
    """
    sec_usr = get_secure_username()
    skip_names = list(SYS_USERS)
    if sec_usr is not None:
        skip_names.insert(0, sec_usr)
    skip_user_re = None
    if skip_names:
        # Names are escaped so they are matched literally. With no names at
        # all there is nothing to skip, so no pattern is built.
        skip_user_re = re.compile(
            r'(?:%s)\b' % '|'.join(re.escape(name) for name in skip_names)
        )
    sa_output = sh.sa(m=True).stdout.splitlines()
    count = 0
    for line in sa_output:
        if count >= how_many:
            # A plain return ends the generator; raising StopIteration
            # inside a generator becomes RuntimeError under PEP 479.
            return
        if skip_user_re is not None and skip_user_re.match(line):
            continue
        # sa_record will be in the form of
        # ['userna5', '125', '3395.59re', '0.00cp', '25711k']
        sa_record = line.split()
        if len(sa_record) < 4:
            continue  # blank or truncated line
        user = sa_record[0]
        if not is_cpanel_user(user):
            continue
        yield (user, sa_record[3][:-2])
        count += 1


def switch_to_ip(username, ip_addr, debug=False):
    """
    Attempt to switch the user's IP via the WHM 'setsiteip' call.

    Returns the result of api_success() on the API response. If debug is
    True, returns a tuple of (that result, raw API response) instead.
    """
    ret = whm_api(
        'setsiteip',
        version=1,
        timeout=480,
        user=username,
        ip=ip_addr
    )
    success = api_success(ret)
    if debug:
        return success, ret
    return success