D7net
Home
Console
Upload
information
Create File
Create Folder
About
Tools
:
/
usr
/
local
/
bin
/
Filename :
custom-ns
back
Copy
#!/opt/imh-python/bin/python3
"""custom-ns: sets up custom nameservers"""
from collections import OrderedDict
import argparse
import sys
import os
from subprocess import run, DEVNULL, PIPE, CalledProcessError
import re
from blessings import Terminal
import rads
from fix_reseller import fix_reseller, is_reseller, set_reseller_nameservers
import named
from select_menu import select_menu

TERM = Terminal()

# Form fields shown in the menu, in display order. The values are filled in
# from the command line or interactively and then consumed by do_setup().
MENU = OrderedDict(
    [
        ('Reseller', ''),
        ('Domain', ''),
        ('NS1 Name', 'ns1'),
        ('NS1 IP', ''),
        ('NS2 Name', 'ns2'),
        ('NS2 IP', ''),
    ]
)

# Index of the highlighted menu row; len(MENU) means the SETUP button.
SELECTION = 0


def do_setup():
    """Apply the values chosen in the MENU global.

    Runs, in order: reseller fix-up, NS record creation, reseller nameserver
    assignment (only if the user is a reseller), firewall adjustment, BIND
    enable/configuration/reload and finally a cPanel restart.
    """
    print('proceeding to set up custom nameservers:\n')
    fix_reseller(MENU['Reseller'])
    print('Adding ns1/ns2 A or AAAA records ...')
    named.set_ns_records(
        MENU['Domain'],
        MENU['NS1 Name'],
        MENU['NS1 IP'],
        MENU['NS2 Name'],
        MENU['NS2 IP'],
    )
    if is_reseller(MENU['Reseller']):
        print(f"Setting reseller nameservers for {MENU['Reseller']} ...")
        set_reseller_nameservers(
            MENU['Reseller'], MENU['Domain'], MENU['NS1 Name'], MENU['NS2 Name']
        )
    print('detecting firewall type ...', end=' ')
    fwtype, fwhook = detect_firewall()
    print(fwtype)
    if fwhook is not None:
        print(f'checking {fwtype} firewall configuration ...')
        fwhook()
    print('running /scripts/setupnameserver ...')
    named.enable_bind()
    print('enabling BIND in /etc/chkserv.d/chkservd.conf ...')
    named.enable_in_chkservd()
    print('Turning off DNS recursion in BIND ...')
    named.disable_recursion()
    print('Enabling all hosts to query nameservers ...')
    named.allow_query_all()
    print('Reloading BIND ...')
    named.rndc_reload()
    print('Restarting cPanel ...')
    try:
        run(['service', 'cpanel', 'restart'], stderr=DEVNULL, check=True)
    except CalledProcessError:
        print(TERM.bold_red('failed to start cpanel!'))


def parse_line(ltxt):
    """Parse a ``KEY = value`` config line.

    Returns ``(KEY, value)`` with the key upper-cased and one layer of
    surrounding quotes removed from the value. Lines that do not look like
    an assignment are returned as ``(None, ltxt)``.
    """
    toks = re.match(r'^\s*([^=]+)\s*=\s*(.*)\s*$', ltxt)
    if toks is None:
        return (None, ltxt)
    lkey = toks.group(1).strip().upper()
    lval = toks.group(2).strip()
    if lval[:1] in ('"', '\''):
        quote = lval[0]
        lval = lval[1:]
        # Only drop the last character when it really is the closing quote;
        # otherwise an unterminated value would lose its final character.
        if lval.endswith(quote):
            lval = lval[:-1]
        lval = lval.strip()
    return (lkey, lval)


def config_firewall(confpath, keylist, svcname):
    """Ensure port 53 is listed in each of ``keylist`` within ``confpath``.

    Args:
        confpath: path of the firewall configuration file to edit
        keylist: upper-case setting names holding comma-separated port lists
        svcname: firewall name, used only in status messages

    Returns:
        1 if the file was modified and written successfully, otherwise 0.
    """
    with open(confpath, encoding='utf-8') as f:
        alines = f.readlines()
    changes = 0
    for lnum, aline in enumerate(alines):
        ikey, ival = parse_line(aline)
        if ikey not in keylist:
            continue
        # split the port list, discarding empty entries
        ports = [port for port in (tport.strip() for tport in ival.split(',')) if port]
        if '53' in ports:
            print(f' -- port 53 already exists in {ikey}, no change required')
            continue
        ports.append('53')
        alines[lnum] = '{}="{}"\n'.format(ikey, ','.join(ports))
        changes += 1
        print(f' -- added port 53 to {ikey}')
    if changes == 0:
        print("firewall configuration up-to-date, no changes required")
        return 0
    try:
        with open(confpath, 'w', encoding='utf-8') as f:
            f.writelines(alines)
    except OSError as e:
        print(
            "ERROR: failed to write updated configuration to",
            f"{confpath}: {e}",
        )
        return 0
    print(f"updated {svcname} configuration written OK")
    return 1


def setup_firewall_apf():
    """Update APF configuration and reload APF if it changed"""
    if (
        config_firewall(
            '/etc/apf/conf.apf', ['IG_TCP_CPORTS', 'IG_UDP_CPORTS'], 'APF'
        )
        == 1
    ):
        print("reloading APF ...")
        run(['apf', '-r'], stdout=DEVNULL, check=False)


def setup_firewall_csf():
    """Update CSF configuration and reload CSF if it changed"""
    if config_firewall('/etc/csf/csf.conf', ['TCP_IN', 'UDP_IN'], 'CSF') == 1:
        print("reloading CSF ...")
        run(['csf', '-r'], stdout=DEVNULL, check=False)


def detect_firewall():
    """Determine installed firewall.

    Returns a ``(label, hook)`` tuple; ``hook`` is the matching setup
    function, or None when neither APF nor CSF config files exist.
    """
    if os.path.exists('/etc/apf/conf.apf'):
        return ("APF", setup_firewall_apf)
    if os.path.exists('/etc/csf/csf.conf'):
        return ("CSF", setup_firewall_csf)
    return ("None or unsupported", None)


def get_all_ips():
    """Get all global-scope IP addresses reported by ``ip addr``"""
    ips = []
    ip_re = re.compile(r'\s*inet6? (?P<ip>[^/]*)')
    ret = run(
        ['ip', 'addr', 'show', 'scope', 'global'],
        stdout=PIPE,
        encoding='utf-8',
        check=False,
    )
    for line in ret.stdout.splitlines():
        if match := ip_re.match(line):
            ips.append(match.groupdict()['ip'])
    return ips


def get_main_ip():
    """Return the ADDR value from wwwacct.conf if set, otherwise the first
    address returned by get_all_ips()"""
    with open('/etc/wwwacct.conf', encoding='utf-8') as wwwacct:
        for line in wwwacct.read().splitlines():
            if not line.startswith('ADDR '):
                continue
            try:
                return line.split()[1]
            except IndexError:
                # "ADDR " with no value after it
                return get_all_ips()[0]
    return get_all_ips()[0]


def show_main_menu():
    """Redraw the main menu, highlighting the row at SELECTION"""
    print(TERM.clear())
    for index, key in enumerate(MENU):
        print('%13s:' % key, end=' ')  # print the label, right justified
        if index == SELECTION:  # if selected, display highlighted
            print(TERM.bold_black_on_white, end=' ')
        else:
            print(TERM.normal, end=' ')
        print('%-60s' % MENU[key], end=' ')
        print(TERM.normal)
    print()
    # at the bottom of the menu, display a SETUP button
    with TERM.location(10, None):
        if SELECTION == len(MENU):
            print(TERM.bold_black_on_white, 'SETUP', TERM.normal)
        else:
            print(' SETUP')


def get_default_reseller():
    """Determine the default reseller to show in the UI.

    Exits the program if no valid cPanel user can be found.
    """
    # most servers will only have one reseller. grab the first valid reseller
    with open('/var/cpanel/resellers', encoding='utf-8') as res_file:
        for line in res_file.read().splitlines():
            user = line.split(':')[0]
            if user not in rads.SYS_USERS and rads.is_cpuser(user):
                return user
    # this server was provisioned wrong. Grab the first valid user
    for user in os.listdir('/var/cpanel/users'):
        if user not in rads.SYS_USERS and rads.is_cpuser(user):
            return user
    # no valid users at all? We can't proceed until someone is provisioned
    sys.exit('No cpanel users found')


def populate_form_from_reseller():
    """Populate Domain, NS1 IP, and NS2 IP given the chosen reseller.

    When the primary domain cannot be determined, those three fields are set
    to 'ERROR' so callers can detect the failure.
    """
    dom = rads.get_primary_domain(MENU['Reseller'])
    if dom is None:
        # Use the real MENU keys; writing to unknown keys would add extra
        # rows to the menu and move the SETUP button index.
        for key in ('Domain', 'NS1 IP', 'NS2 IP'):
            MENU[key] = 'ERROR'
    else:
        main_ip = get_main_ip()
        MENU['Domain'] = dom
        MENU['NS1 IP'] = main_ip
        MENU['NS2 IP'] = main_ip


def try_args():
    """Try to parse a reseller name from commandline arguments

    Return True if the user confirmed the derived values, or False if no
    user was given and the interactive menu should be used. Exits the
    program on an invalid user, an unresolvable primary domain, or when the
    confirmation prompt is declined.
    """
    parser = argparse.ArgumentParser(
        description='Setup Custom Nameservers',
        epilog='If no user is provided, an ncurses menu will display',
    )
    parser.add_argument(
        'user',
        metavar='USER',
        nargs='?',
        default=None,
        const=None,
        help='reseller on whos primary domain to setup custom nameservers',
    )
    user = parser.parse_args().user
    if user is None:
        # no user was chosen from commandline arguments
        return False
    if not rads.is_cpuser(user):
        sys.exit(f'{user} is not a cPanel user')
    MENU['Reseller'] = user
    populate_form_from_reseller()
    if 'ERROR' in MENU.values():
        sys.exit(
            f'unable to get primary domain for {user}. Please use ncurses menu '
            'instead to specify it manually by providing no command arguments'
        )
    question = 'Proceed with the following values?\n'
    for key, value in MENU.items():
        question = f'{question}\n{key}: {value}'
    question = f'{question}\n\n'
    if rads.prompt_y_n(question):
        return True
    sys.exit('Specify another reseller or none to use the ncurses menu')


def take_form_input():
    """Prompt for a new value for the currently selected free-text field"""
    with TERM.location(0, 10):
        key = list(MENU)[SELECTION]
        try:
            MENU[key] = input(f'Enter new value for {key}: ')
        except KeyboardInterrupt:
            pass  # exit menu


if __name__ == '__main__':
    # if we can collect a reseller from commandline arguments
    if try_args():
        # skip the ncurses stuff, go straight to the setup
        do_setup()
        sys.exit(0)
    # pre-populate the ncurses menu with stuff
    MENU['Reseller'] = get_default_reseller()
    populate_form_from_reseller()
    print(TERM.enter_fullscreen())
    show_main_menu()
    # main loop for the ncurses menu
    while True:
        try:
            keypress = rads.get_keypress()
            print(repr(keypress))
        except KeyboardInterrupt:
            print(TERM.exit_fullscreen())
            sys.exit(0)
        if keypress == 'ESC':
            print(TERM.exit_fullscreen())
            sys.exit(0)
        elif keypress == 'ENTER':
            if SELECTION == 0:
                users = [
                    x for x in os.listdir('/var/cpanel/users') if rads.is_cpuser(x)
                ]
                try:
                    MENU['Reseller'] = select_menu('Select a user', users)
                    populate_form_from_reseller()
                except KeyboardInterrupt:
                    pass  # go back up one menu
            elif SELECTION == 3:
                try:
                    MENU['NS1 IP'] = select_menu('Select an IP', get_all_ips())
                except KeyboardInterrupt:
                    pass  # cancel
            elif SELECTION == 5:
                try:
                    MENU['NS2 IP'] = select_menu('Select an IP', get_all_ips())
                except KeyboardInterrupt:
                    pass  # go back up one menu
            elif SELECTION == len(MENU):  # setup button
                print(TERM.exit_fullscreen())
                do_setup()
                break
            else:
                take_form_input()
        elif keypress == "ARROW_DN":
            SELECTION += 1
        elif keypress == "ARROW_UP":
            SELECTION -= 1
        elif keypress == 'TAB':
            SELECTION = len(MENU)  # jump to setup button
        # wrap around; the extra slot past the last field is the SETUP button
        SELECTION = SELECTION % (len(MENU) + 1)
        show_main_menu()