Modifying DNS Records with Python [Not Approved]

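Adding DNS records by hand is tedious, so I wrote a Python script to streamline the process. It uses DNSPod as the DNS provider and calls the DNSPod API. You could also think of it as a small utility for working with a DNSPod API token.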


dnspod.py
(Download link: https://t.for-get.com/dnspod.py)

Environment: Python 3.7

Usage

Edit LOGIN_TOKEN on line 15 of the script. For how to generate an API Token, see:

A complete API Token consists of the ID and the Token, joined by an ASCII comma (,).
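
As a small illustration of the ID,Token format, the sketch below joins the two parts and rejects stray commas, including the full-width comma. build_login_token is a hypothetical helper that is not part of dnspod.py, and the sample values are the placeholders used in the script.

```python
def build_login_token(token_id, token):
    """Join a DNSPod API Token ID and Token into the "ID,Token" form.

    Hypothetical helper for illustration; dnspod.py itself simply stores
    the combined string in LOGIN_TOKEN.
    """
    token_id = str(token_id).strip()
    token = str(token).strip()
    if not token_id or not token:
        raise ValueError("both the ID and the Token are required")
    # A full-width comma (U+FF0C) is easy to type by accident with a Chinese IME
    for part in (token_id, token):
        if "," in part or "\uff0c" in part:
            raise ValueError("the ID and the Token must not contain commas")
    return f"{token_id},{token}"


# Placeholder values, same as the ones shipped in the script
LOGIN_TOKEN = build_login_token(123456, "aabbccddeeffaabbccddeeffaabbccddeeff")
print(LOGIN_TOKEN)
```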

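Run the script with python dnspod.py and press Ctrl+Z to exit (this sends end-of-file on Windows; on Linux and macOS the equivalent is Ctrl+D). Alternatively, run python -c "from dnspod import *;command()" to execute a single command.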

Function reference

login()

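Attempts to log in to check whether the token is valid. On success it prints "Login Success."; on failure it prints the error and exits with status -1.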

user_log(show = True)

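Fetches the user log, i.e. the record of operations such as adding or deleting a domain. When show is True the result is formatted and printed; when it is False the function only returns the data. The default is True, and show has the same meaning in the functions below.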

domain_log(domain, show = True)

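Fetches the log for a domain, i.e. operations such as adding an A record or deleting an MX record. domain is the domain name, e.g. domain = "for-get.com". The domain parameter is required wherever it appears and has the same meaning in the functions below.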

domains(show = True)

Fetches the domain list, i.e. all domains under the user's account.

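records(domain, sub_domain = "", record_type = "", show = True)

Queries the records under a domain. sub_domain is the host record (subdomain); for example, sub_domain = "@" means the root domain, and it has the same meaning in the functions below. record_type is the record type, e.g. record_type = "A" for A records (case-insensitive). When show is True the result is formatted and printed.

Eg. records("for-get.com","@","A")

queries only the A records of the root domain for-get.com.

records("for-get.com","@")

queries records of every type for the root domain for-get.com.

records("for-get.com","","A")

queries all A records under for-get.com (including all subdomains).

records("for-get.com")

queries all records under for-get.com.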
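
With show = False, records() returns the parsed JSON response as a dict. The sketch below shows one way to walk that dict. The field names (records, id, name, type, value) are the ones dnspod.py itself reads; the sample response is made up for illustration and is not real API output, and summarize_records is a hypothetical helper.

```python
def summarize_records(response):
    """Return (id, name, type, value) tuples from a records() result."""
    rows = []
    # An error response may have no "records" key, so default to an empty list
    for rec in response.get("records", []):
        rows.append((int(rec["id"]), rec["name"], rec["type"], rec["value"]))
    return rows


# Made-up sample shaped like the fields dnspod.py reads; not real API output
sample = {
    "status": {"code": "1"},
    "info": {"records_num": "2"},
    "records": [
        {"id": "233", "name": "@", "type": "A", "value": "127.0.0.1"},
        {"id": "666", "name": "sub", "type": "A", "value": "127.0.0.1"},
    ],
}

for row in summarize_records(sample):
    print(*row, sep="\t")
```

Inside the script, the same helper could be fed with records("for-get.com", "", "A", False).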

record(domain, record_id, show = True)

Fetches the details of a single record. record_id is the record ID, which identifies one record under the domain.

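create(domain, sub_domain, record_type, value, mx = "", record_line_id = "0")

Creates a record. mx is the MX priority and only takes effect when the record type is MX. record_line_id selects the resolution line; 0 is the default, and the other lines are listed in the official documentation. The function returns nothing, but prints the record ID when the record is created successfully.

Eg. create("for-get.com","sub","A","127.0.0.1") creates an A record for sub.for-get.com with the value 127.0.0.1.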

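modify(domain, record_id, sub_domain, record_type, value, mx = "", record_line_id = "0")

Modifies a record. The record is located by its ID, i.e. the record matching record_id is the one that gets modified. The remaining parameters work exactly as they do in create().

Eg. modify("for-get.com","233","sub","A","127.0.0.1") changes record 233 under for-get.com into an A record for sub.for-get.com with the value 127.0.0.1.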

remove(domain, record_id)

Deletes a record.

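batch_remove(domain, record_id)

Deletes records in bulk. Here record_id is a list or tuple, or a comma-separated string without spaces such as "233,666,777". The return value of get_record_id() is also accepted.

Eg. batch_remove("for-get.com", "233,666,777") deletes the records with IDs 233, 666 and 777 under for-get.com.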
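
The accepted input shapes can be sketched as a single normalization step. normalize_record_ids is a hypothetical helper, not a function in dnspod.py; it mirrors the script's heuristic of treating a list whose first element equals the number of remaining elements as a get_record_id() result. One consequence of that heuristic is that an input such as "1,5" is read as a count followed by the single ID 5.

```python
def normalize_record_ids(record_id):
    """Turn a string, list or tuple of record IDs into a list of ints.

    Hypothetical helper mirroring the detection logic in batch_remove().
    """
    if isinstance(record_id, str):
        record_id = record_id.split(",")
    ids = [int(x) for x in record_id]
    # get_record_id() returns [count, id1, id2, ...]; strip the leading count
    if ids and len(ids) == ids[0] + 1:
        return ids[1:]
    return ids


print(normalize_record_ids("233,666,777"))
print(normalize_record_ids([2, 233, 666]))
```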

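ddns(domain, record_id = "", sub_domain = "", record_line_id = "0")

Updates a dynamic DNS record. Supply either record_id or sub_domain; record_id is recommended. This command can only push the local machine's public IPv4 address to DNS, so it is of little use if the machine has no public IP. When only sub_domain is given and several A records match, the first one is modified by default. Before updating, the command compares the current record value with the local IP and skips the update if they are the same.

Eg. ddns("for-get.com", "233") updates record 233 under for-get.com, setting its value to the local public IP.

ddns("for-get.com", "","sub")

updates an A record of sub.for-get.com to the local public IP.

ddns("for-get.com", "233","sub")

updates record 233 under for-get.com, with the subdomain sub, to the local public IP.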
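
The IP check can be sketched as a pure function. Note that this is a stricter variant than what the script does: dnspod.py compares the two values as plain strings and still attempts the update when the lookup fails, whereas the hypothetical needs_update below refuses to update when the looked-up value is not a valid IPv4 address.

```python
import ipaddress


def needs_update(old_ip, new_ip):
    """Decide whether a DDNS update is worth sending.

    Hypothetical helper; returns False when new_ip is not a valid IPv4
    address (for example the "Error" placeholder from a failed lookup).
    """
    try:
        new_addr = ipaddress.IPv4Address(new_ip)
    except ValueError:
        # The lookup failed, so there is nothing trustworthy to publish
        return False
    try:
        return ipaddress.IPv4Address(old_ip) != new_addr
    except ValueError:
        # The stored value is not an IPv4 address, so replace it
        return True


print(needs_update("127.0.0.1", "127.0.0.1"))
print(needs_update("127.0.0.1", "Error"))
```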

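get_record_id(domain, sub_domain = "", record_type = "", value = "", show = True)

Looks up record IDs. It works like records(), but returns a list containing the IDs and can additionally filter by record value. The first element of the returned list, [0], is the number of record IDs (I forgot that len() could already tell me that, lol). When you pass value and the record type is NS, MX, CNAME or similar, note that the value ends with a dot (.), e.g. "for-get.com." (PS: you do not need to add the dot when creating a record, because it is appended automatically, but you do need it when querying).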
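
The two quirks described above, the leading count and the trailing dot, can be handled with a pair of small helpers. Both are hypothetical and not part of dnspod.py, and the set of dot-terminated types is an assumption limited to the three types named in the text.

```python
# Assumption: only the record types named in the text are dot-terminated
DOT_TERMINATED_TYPES = {"NS", "MX", "CNAME"}


def query_value(value, record_type):
    """Prepare a value for get_record_id(), adding the trailing dot if needed."""
    value = value.lower()
    if value and record_type.upper() in DOT_TERMINATED_TYPES and not value.endswith("."):
        value += "."
    return value


def split_id_result(result):
    """Split a get_record_id() style list into (count, list_of_ids)."""
    return result[0], result[1:]


print(query_value("for-get.com", "CNAME"))
print(split_id_result([2, 233, 666]))
```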

get_ipv4()

Gets the IPv4 address and returns it (as bytes). The lookup uses ip.sb.

get_ipv6()

Gets the IPv6 address and returns it (as bytes). The lookup uses ip.sb.

info()

Shows every domain under the account together with all of its records.

ipinfo(show = True)

Shows IP information by calling get_ipv4() and get_ipv6(), and returns a dict.
Example return value: {"ipv4":"127.0.0.1","ipv6":"::1"}

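command(s = "")

Command mode.

Command mode

Use the command() function. This is the interface you get from python dnspod.py (similar to a CLI shell). It works by splitting the input string with split() and dispatching the command through a function mapping.

How it works

1. Build the function_mappings dict, whose keys are command names and whose values are the functions.

2. Split the string: s = s.split()

3. Run the command: function_mappings[s[0]](*s[1 : ])

command() keeps input simple. In this script, a parameter that should be left empty must be written as ? (without quotes).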
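
The three steps and the ? placeholder can be sketched in a few lines. This is a simplified stand-alone version, not the script's command(): the records and ipinfo functions here are stand-ins that only echo their arguments, and dispatch is a hypothetical name.

```python
def records(domain, sub_domain="", record_type=""):
    """Stand-in for the real records(); just echoes normalized arguments."""
    return (domain, sub_domain.lower(), record_type.upper())


def ipinfo():
    """Stand-in for the real ipinfo(); returns fixed loopback addresses."""
    return {"ipv4": "127.0.0.1", "ipv6": "::1"}


# Step 1: map command names (and aliases) to functions
function_mappings = {"records": records, "find": records, "ip": ipinfo}


def dispatch(line):
    # Step 2: lowercase and split the input on whitespace
    parts = line.lower().split()
    if not parts:
        return None
    keyword = parts[0]
    # "?" stands for a parameter that is deliberately left empty
    args = ["" if p == "?" else p for p in parts[1:]]
    func = function_mappings[keyword]  # raises KeyError for unknown commands
    if len(args) > func.__code__.co_argcount:
        raise TypeError("too many arguments")
    # Step 3: call the mapped function with the remaining words
    return func(*args)


print(dispatch("find for-get.com ? A"))
```

Because every argument arrives as a string, each target function has to convert types itself, which is why the script calls int() and str() on record_id.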

Supported commands (all lowercase)

login
Calls login()
userlog
Calls user_log(); alias: user_log
domainlog
Calls domain_log(); alias: domain_log
domains
Calls domains()
records
Calls records(); alias: find
Eg. find for-get.com
find for-get.com @
find for-get.com ? A
...
record
Calls record(); alias: id
Eg. record for-get.com 2333
create
Calls create(); alias: add
Eg. add for-get.com sub A 127.0.0.1
modify
Calls modify(); alias: change
Eg. modify for-get.com 2333 @ A 127.0.0.1
remove
Calls remove(); aliases: del, delete
Eg. del for-get.com 2333
batch_remove
Calls batch_remove(); aliases: batchremove, batchdel, batch_del
Eg. batchremove for-get.com 233,666,777
ddns
Calls ddns()
Eg. ddns for-get.com 233
ddns for-get.com ? sub
getid
Calls get_record_id(); aliases: get_record_id, getrecordid
Eg. getid for-get.com @
getid for-get.com ? A
getid for-get.com ? ? 127.0.0.1
...
info
Calls info(); alias: show
ip
Calls ipinfo(); aliases: getip, ipinfo
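Afterword
If something misbehaves, try deleting the "__pycache__" directory next to the script; if there isn't one, skip this step.
There are quite a few print calls; feel free to remove or tidy them up.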

Source code

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#Author: for-get
#Site: https://blog.for-get.com
import re
import requests
import os
import time
import sys
try:
    import json
except Exception:
    import simplejson as json
# API Token in the form "ID,Token" (placeholder value; replace with your own)
LOGIN_TOKEN = "123456,aabbccddeeffaabbccddeeffaabbccddeeff"
def login():
    print(time.strftime("[%Y/%m/%d %H:%M:%S]\tlogin()"))
    print("Login token: " + LOGIN_TOKEN)
    api = "https://dnsapi.cn/User.Detail"
    data = {
        "login_token" : LOGIN_TOKEN,
        "format": "json"
    }
    try:
        r = requests.post(api, data, timeout=5)
    except requests.RequestException:
        print("Network Error!")
        sys.exit(-1)
    # Compare with ==; "is" tests object identity, not numeric equality
    if int(r.json()["status"]["code"]) == 1:
        print(time.strftime("[%Y/%m/%d %H:%M:%S]\tLogin Success."))
    else:
        print("Error:", r.json()["status"]["code"], r.json()["status"]["message"])
        sys.exit(-1)
def user_log(show = True):
    print(time.strftime("[%Y/%m/%d %H:%M:%S]\t") + f"user_log({show})")
    api = "https://dnsapi.cn/User.Log"
    data = {
        "login_token" : LOGIN_TOKEN,
        "format": "json",
    }
    r = requests.post(api, data, timeout=5)
    if int(r.json()["status"]["code"]) != 1:
        print("Error:", r.json()["status"]["code"], r.json()["status"]["message"])
    elif show:
        # Print the log entries in reverse order
        for entry in reversed(r.json()["log"]):
            print(entry)
    # An error response may carry no "log" key, so fall back to an empty list
    return r.json().get("log", [])
def domain_log(domain, show = True):
    print(time.strftime("[%Y/%m/%d %H:%M:%S]\t") + f"domain_log(\"{domain}\", {show})")
    api = "https://dnsapi.cn/Domain.Log"
    data = {
        "login_token" : LOGIN_TOKEN,
        "domain" : domain,
        "offset" : 0,
        "length" : 3000,
        "format": "json",
    }
    r = requests.post(api, data, timeout=5)
    if int(r.json()["status"]["code"]) != 1:
        print("Error:", r.json()["status"]["code"], r.json()["status"]["message"])
    elif show:
        # Print the log entries in reverse order
        for entry in reversed(r.json()["log"]):
            print(entry)
    # An error response may carry no "log" key, so fall back to an empty list
    return r.json().get("log", [])
def domains(show = True):
    print(time.strftime("[%Y/%m/%d %H:%M:%S]\t") + f"domains({show})")
    api = "https://dnsapi.cn/Domain.List"
    data = {
        "login_token" : LOGIN_TOKEN,
        "format": "json",
    }
    r = requests.post(api, data, timeout=5)
    if int(r.json()["status"]["code"]) != 1:
        print("Error:", r.json()["status"]["code"], r.json()["status"]["message"])
    elif show:
        print("Domain_ID\tDomain")
        for i in range(0, int(r.json()["info"]["domain_total"])):
            print(str(r.json()["domains"][i]["id"]) + "\t" + r.json()["domains"][i]["name"])
    return r.json()
def records(domain, sub_domain = "", record_type = "", show = True):
    print(time.strftime("[%Y/%m/%d %H:%M:%S]\t") + f"records(\"{domain}\", \"{sub_domain}\", \"{record_type}\", {show})")
    api = "https://dnsapi.cn/Record.List"
    record_type = record_type.upper()
    sub_domain = sub_domain.lower()
    data = {
        "login_token" : LOGIN_TOKEN,
        "format": "json",
        "domain" : domain,
        "offset" : 0,
        "length" : 3000,
        "sub_domain" : sub_domain,
        "record_type" : record_type
    }
    r = requests.post(api, data, timeout=5)
    if int(r.json()["status"]["code"]) != 1:
        print("Error:", r.json()["status"]["code"], r.json()["status"]["message"])
    elif show:
        print("Record_ID\tName\tType\tValue")
        for i in range(0, int(r.json()["info"]["records_num"])):
            if str(r.json()["records"][i]["type"]) == "MX":
                print(str(r.json()["records"][i]["id"]) + "\t" + r.json()["records"][i]["name"] + "\t" + "MX: " + r.json()["records"][i]["mx"] + "\t" + r.json()["records"][i]["value"])
            else:
                print(str(r.json()["records"][i]["id"]) + "\t" + r.json()["records"][i]["name"] + "\t" + r.json()["records"][i]["type"] + "\t" + r.json()["records"][i]["value"])
    return r.json()
def record(domain, record_id, show = True):
    print(time.strftime("[%Y/%m/%d %H:%M:%S]\t") + f"record(\"{domain}\", \"{record_id}\", {show})")
    api = "https://dnsapi.cn/Record.Info"
    record_id = str(record_id)
    data = {
        "login_token" : LOGIN_TOKEN,
        "format": "json",
        "domain" : domain,
        "record_id" : record_id
    }
    r = requests.post(api, data, timeout=5)
    if int(r.json()["status"]["code"]) != 1:
        print("Error:", r.json()["status"]["code"], r.json()["status"]["message"])
    elif show:
        if str(r.json()["record"]["record_type"]) == "MX":
            print("Record_ID\t" + str(r.json()["record"]["id"]))
            print("Sub_Domain\t" + r.json()["record"]["sub_domain"])
            print("MX\t" + r.json()["record"]["mx"])
            print("Value\t" + r.json()["record"]["value"])
        else:
            print("Record_ID\t" + str(r.json()["record"]["id"]))
            print("Sub_Domain\t" + r.json()["record"]["sub_domain"])
            print("Type\t" + r.json()["record"]["record_type"])
            print("Value\t" + r.json()["record"]["value"])
    return r.json()
def create(domain, sub_domain, record_type, value, mx = "", record_line_id = "0"):
    print(time.strftime("[%Y/%m/%d %H:%M:%S]\t") + f"create(\"{domain}\", \"{sub_domain}\", \"{record_type}\", \"{value}\", \"{mx}\", \"{record_line_id}\")")
    api = "https://dnsapi.cn/Record.Create"
    record_type = record_type.upper()
    sub_domain = sub_domain.lower()
    data = {
        "login_token" : LOGIN_TOKEN,
        "format": "json",
        "domain" : domain,
        "sub_domain" : sub_domain,
        "record_type" : record_type,
        "record_line_id" : record_line_id, #10=0,Telecom;10=1,Unicom
        "value" : value,
        "mx" : mx    # mx = [1,20], only works in mx, 0
    }
    r = requests.post(api, data, timeout=5)
    if int(r.json()["status"]["code"]) == 1:
        # str() keeps the concatenation safe whether the ID arrives as a string or an int
        print(time.strftime("[%Y/%m/%d %H:%M:%S]\t") + "Create completed successful, Record id: " + str(r.json()["record"]["id"]))
    else:
        print("Error:", r.json()["status"]["code"], r.json()["status"]["message"])
def modify(domain, record_id, sub_domain, record_type, value, mx = "", record_line_id = "0"):
    print(time.strftime("[%Y/%m/%d %H:%M:%S]\t") + f"modify(\"{domain}\", \"{record_id}\", \"{sub_domain}\", \"{record_type}\", \"{value}\", \"{mx}\", \"{record_line_id}\")")
    value = value.lower()
    record_type = record_type.upper()
    sub_domain = sub_domain.lower()
    api = "https://dnsapi.cn/Record.Modify"
    data = {
        "login_token" : LOGIN_TOKEN,
        "format": "json",
        "domain" : domain,
        "record_id" : record_id,
        "sub_domain" : sub_domain,
        "record_type" : record_type,
        "record_line_id" : record_line_id, #10=0,Telecom;10=1,Unicom
        "value" : value,
        "mx" : mx    # mx = [1,20], only works in mx
    }
    r = requests.post(api, data, timeout=5)
    if int(r.json()["status"]["code"]) == 1:
        print(time.strftime("[%Y/%m/%d %H:%M:%S]\t") + "Modify completed successful")
    else:
        print("Error:", r.json()["status"]["code"], r.json()["status"]["message"])
def remove(domain, record_id):
    print(time.strftime("[%Y/%m/%d %H:%M:%S]\t") + f"remove(\"{domain}\", \"{record_id}\")")
    api = "https://dnsapi.cn/Record.Remove"
    record_id = int(record_id)
    data = {
        "login_token" : LOGIN_TOKEN,
        "format": "json",
        "domain" : domain,
        "record_id" : record_id,
    }
    r = requests.post(api, data, timeout=5)
    if int(r.json()["status"]["code"]) == 1:
        print(time.strftime("[%Y/%m/%d %H:%M:%S]\t") + "Remove completed successful")
    else:
        print("Error:", r.json()["status"]["code"], r.json()["status"]["message"])
def batch_remove(domain, record_id):
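    # Accept a comma-separated string such as "233,666,777"
    if isinstance(record_id, str):
        record_id = record_id.split(',')
    # A list from get_record_id() starts with the ID count; detect that layout
    # (ambiguous for inputs like "1,5", which are read as one ID: 5)
    if len(record_id) == int(record_id[0]) + 1: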
        for i in range(0, int(record_id[0])):
            remove(domain, int(record_id[i + 1]))
    else:
        for i in range(0, len(record_id)):
            remove(domain, int(record_id[i]))
    print(time.strftime("[%Y/%m/%d %H:%M:%S]\t") + "Batch_remove completed successful")
def ddns(domain, record_id = "", sub_domain = "", record_line_id = "0"):
    print(time.strftime("[%Y/%m/%d %H:%M:%S]\t") + f"ddns(\"{domain}\", \"{record_id}\", \"{sub_domain}\", \"{record_line_id}\")")
    if (sub_domain == "") and (record_id==""):
        print("Too few parameters!")
        return 0
    old_ip = ""
    new_ip = ""
    if sub_domain == "":
        try:
            tamp = record(domain, record_id, False)
            sub_domain = tamp["record"]["sub_domain"]
            old_ip = tamp["record"]["value"]
        except:
            print("Record ID Error!")
            return 0
    if record_id == "":
        record_id = get_record_id(domain, sub_domain, "A", "", False)
        if record_id[0] == 0:
            print("No sub_domain in A record, Please create first.")
            return 0
        else:
            record_id = record_id[1]
    ###IP Check###
    try:
        if old_ip == "":
            old_ip = record(domain, record_id, False)["record"]["value"] 
    except:
        print("Network Error or Record ID Error")
        return 0
    try:
        new_ip = get_ipv4().decode(encoding="utf-8",errors="strict")
        if str(old_ip) == str(new_ip):
            print(time.strftime("[%Y/%m/%d %H:%M:%S]\t") + "No Change!")
            return 0
    except:
        print("Get ip timeout! Can't Check Change!")
    ##############
    api = "https://dnsapi.cn/Record.Ddns"
    record_id = int(record_id)
    sub_domain = sub_domain.lower()
    data = {
        "login_token" : LOGIN_TOKEN,
        "format": "json",
        "domain" : domain,
        "record_id" : record_id,
        "sub_domain" : sub_domain,
        "record_line_id" : record_line_id, #10=0,Telecom;10=1,Unicom
    }
    r = requests.post(api, data, timeout=5)
    if int(r.json()["status"]["code"]) == 1:
        print(time.strftime("[%Y/%m/%d %H:%M:%S]\t") + "Ddns completed successful")
    else:
        print("Error:", r.json()["status"]["code"], r.json()["status"]["message"])
def get_record_id(domain, sub_domain = "", record_type = "", value = "",show = True):
    print(time.strftime("[%Y/%m/%d %H:%M:%S]\t") + f"get_record_id(\"{domain}\", \"{sub_domain}\", \"{record_type}\", \"{value}\", {show})")
    value = value.lower()
    r = records(domain, sub_domain, record_type, False)
    id = []
    try:
        id.append(int(r["info"]["records_num"]))
    except:
        id.append(0)
    records_num = int(id[0])
    if records_num == 0:
        print("Record Not Found, Maybe sub_domain Error!")
        return id
    elif value != "":
        id[0] = 0
        for i in range(0, records_num):
            if value == str(r["records"][i]["value"]):
                id.append(int(r["records"][i]["id"]))
                sum = len(id)
                id[0] = sum - 1
                #if show == True:
                #    record(domain, id[sum - 1])    #record detail
        if id[0] == 0:
            print("Value Error!")
        elif show == True:
            print("Records_NUM\t" + str(id[0]))
            print("ID\t" + ''.join(str(id[1:]).split())[1:-1])
        return id
    else:
        for i in range(0, records_num):
            id.append(int(r["records"][i]["id"]))
            #if show == True:
            #    record(domain, id[i + 1])    #record detail
        if show == True:
            print("Records_NUM\t" + str(id[0]))
            print("ID\t" + ''.join(str(id[1:]).split())[1:-1])
        return id
def get_ipv4():
    api = "https://api-ipv4.ip.sb/ip"
    try:
        r = requests.get(api, timeout=5)
        return r.content.strip()
    except:
        r = "Error"
        return r.encode('utf-8')
def get_ipv6():
    api = "https://api-ipv6.ip.sb/ip"
    try:
        r = requests.get(api, timeout=5)
        return r.content.strip()
    except:
        r = "Error"
        return r.encode('utf-8')
def info():
    print(time.strftime("[%Y/%m/%d %H:%M:%S]\tShow all domains information..."))
    domainslist = domains()
    for i in range(0, int(domainslist["info"]["domain_total"])):
        domain = domainslist["domains"][i]["punycode"]
        records(domain)
def ipinfo(show = True):
    print(time.strftime("[%Y/%m/%d %H:%M:%S]\tTry to get IP..."))
    ipv4 = get_ipv4().decode(encoding="utf-8",errors="strict")
    ipv6 = get_ipv6().decode(encoding="utf-8",errors="strict")
    ipaddr = { "ipv4" : ipv4, "ipv6" : ipv6 }
    if show:
        print("Your IPv4 Address:\t", ipv4)
        print("Your IPv6 Address:\t", ipv6)
    return ipaddr
def command(s = ""):
    function_mappings = {
        'login': login,
        'userlog': user_log,
        'user_log': user_log,
        'domain_log': domain_log,
        'domainlog': domain_log,
        'domains': domains,
        'records': records,
        'find': records,
        'record': record,
        'id': record,
        'create': create,
        'add': create,
        'modify': modify,
        'change': modify,
        'remove': remove,
        'batch_remove': batch_remove,
        'batchremove': batch_remove,
        'batchdel': batch_remove,
        'batch_del': batch_remove,
        'del': remove,
        'delete': remove,
        'ddns': ddns,
        'getid' : get_record_id,
        'getrecordid' : get_record_id,
        'get_record_id' : get_record_id,
        'info': info,
        'show': info,
        'ipinfo': ipinfo,
        'getip': ipinfo,
        'ip': ipinfo
    }
    if s == "":
        s = input(">>> ")
    s = s.lower()
    s = s.split()
    if not s:
        # Blank input: nothing to dispatch
        return -1
    keyword = s[0]
    arg = len(s) - 1
    for i in range(0, len(s)):
        if s[i] == "?":
            s[i] = ""
    try:
        if arg <= int(function_mappings[keyword].__code__.co_argcount):
            function_mappings[keyword](*s[1 : arg + 1])
            return 0
        else:
            print("Too Many Arguments!")
            return -1
    except KeyError:
        print("Invalid function, try again.")
        return -1
    except:
        print("Too Few Parameters or Timeout or Unknown Error!")
        return -1
if __name__ == "__main__":
    print("DNSPOD SAMPLE CLIENT\n")
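    try:
        login()
        print("\nDnspod Sample Shell")
        print("Input Ctrl + Z to exit!")
        while True:
            command()
    except (EOFError, KeyboardInterrupt):
        # End-of-file (Ctrl+Z on Windows, Ctrl+D on Unix) or Ctrl+C: leave quietly
        print("\nBye!")
    except Exception:
        print("Error!")
        sys.exit(-1)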