API:
Python
示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 |
# -*- coding: utf-8 -*- import hashlib import time import requests import prettytable # 宝塔面板操作类 class BtPanel: __BTURL = '' __APIKEY = '' __REQ = requests.session() # 初始化宝塔面板 def __init__(self, host: str, apisk: str): ''' :param host: 宝塔面板地址(末尾不加/) :param apisk: 宝塔面板API密钥 ''' self.__BTURL = host self.__APIKEY = apisk # 计算MD5 def __GetMD5(self, s: str): ''' 计算字符串的MD5值 :param s: 待计算的字符串 :return: MD5值 ''' m = hashlib.md5() m.update(s.encode('utf-8')) return m.hexdigest() # 签名计算 # 签名计算 def __GetToken(self): request_time = int(time.time()) # 获取请求时间戳 # 注意以下行中的改动:去除了逗号 request_token = self.__GetMD5(str(request_time) + '' + self.__GetMD5(self.__APIKEY)) return {'request_time': request_time, 'request_token': request_token} # 获取站点列表 def GetSites(self, showlog=True): ''' 获取宝塔面板站点列表 :showlog: 是否输出结果 :return: 站点列表数据 ''' if showlog: print('\n### 获取站点列表...') tk = self.__GetToken() # 获取签名 playload = { 'request_time': tk['request_time'], 'request_token': tk['request_token'], 'p': 1, 'limit': 100, 'order': 'id' } print(playload) res = self.__REQ.post(url=self.__BTURL + '/data?action=getData&table=sites', data=playload).json() # 判断请求是否成功 if 'status' in res and 'msg' in res and not res['status']: print('>>> 获取站点列表失败:', res['msg']) return False # 使用prettytable输出站点列表 if showlog: tb = prettytable.PrettyTable() tb.field_names = ['网站名', '站点类型', '备注', 'SSL域名', 'SSL到期时间', 'SSL剩余天数', '证书品牌', '状态'] tb.align['网站名'] = 'l' tb.align['备注'] = 'l' tb.align['SSL域名'] = 'l' tb.align['证书品牌'] = 'l' for site in res['data']: site_status = '运行' if site['status'] == '1' else '停止' if site['ssl'] == -1: sslinfo = {'notAfter': '-', 'endtime': '-', 'subject': '-', 'issuer': '-'} else: sslinfo = {'notAfter': site['ssl']['notAfter'], 'endtime': site['ssl']['endtime'], 'subject': site['ssl']['subject'], 'issuer': site['ssl']['issuer']} # 安全访问字典键,使用get方法提供默认值 project_type = site.get('project_type', '未知类型') tb.add_row( [site['name'], project_type, site.get('ps', '无备注'), sslinfo['subject'], sslinfo['notAfter'], sslinfo['endtime'], sslinfo['issuer'], site_status]) print(tb) return res['data'] # 设置站点SSL证书 def SetSSL(self, site_name: str, ssl_cert_content: str, ssl_key_content: str): ''' 设置站点SSL证书 :param site_name: 站点名称(域名) :param ssl_cert_content: ssl证书内容 :param ssl_key_content: ssl私钥内容 :return: 设置结果 ''' print('\n### 设置站点SSL证书...') tk = self.__GetToken() # 获取签名 playload = { 'request_time': tk['request_time'], 'request_token': tk['request_token'], 'type': 0, 'siteName': site_name, 'key': ssl_key_content, 'csr': ssl_cert_content } res = self.__REQ.post(url=self.__BTURL + '/site?action=SetSSL', data=playload).json() # 判断请求是否成功 if 'status' in res and 'msg' in res and not res['status']: print('>>> 设置站点SSL证书失败:', res['msg']) return False print('>>>', res['msg']) # 输出结果 return res['status'] # 获取证书夹列表 def GetCertList(self, showlog=True): if showlog: print('\n### 获取证书夹列表...') tk = self.__GetToken() # 获取签名 playload = { 'request_time': tk['request_time'], 'request_token': tk['request_token'], 'force_refresh': 1 # 0:获取本地证书 1:获取云端证书 } print("请求参数:", playload) # 打印请求参数用于调试 res = self.__REQ.post(url=self.__BTURL + '/ssl?action=getcertlist', data=playload) print("响应内容:", res.text) # 打印响应内容用于调试 if res.status_code == 200: data = res.json() if 'status' in data and 'msg' in data and not data['status']: print('>>> 获取证书夹列表失败:', data['msg']) return False # 使用prettytable输出证书夹列表 if showlog and 'data' in data: tb = prettytable.PrettyTable() tb.field_names = ['域名', 'SSL到期时间', 'SSL剩余天数', '证书品牌', '可选域名'] tb.align['域名'] = 'l' tb.align['证书品牌'] = 'l' tb.align['可选域名'] = 'l' for cert in data['data']: tb.add_row([cert['subject'], cert['info']['notAfter'], cert['endtime'], cert['info']['issuer'], cert['dns']]) print(tb) return data else: print('>>> API请求失败,状态码:', res.status_code) return False # 删除过期SSL证书 def DelExpiredSSL(self): ''' 删除过期SSL证书 :showlog: 是否输出结果 :return: 删除结果 ''' print('\n### 删除过期SSL证书...') certs = self.GetCertList(showlog=False) # 获取证书列表 if not certs: return False expiredcerts = [] # 过期证书列表 for cert in certs: if cert['endtime'] < 0: # 证书已过期 expiredcerts.append(cert['subject']) # 加入过期证书列表 tk = self.__GetToken() # 获取签名 playload = { 'request_time': tk['request_time'], 'request_token': tk['request_token'], 'local': 1, 'ssl_hash': cert['hash'] } res = self.__REQ.post(url=self.__BTURL + '/ssl?action=remove_cloud_cert', data=playload).json() # 判断请求是否成功 if 'status' in res and 'msg' in res and not res['status']: print('>>> 删除过期SSL证书失败:', res['msg']) return False print('>>> 过期证书', cert['subject'], res['msg']) return res['status'] if len(expiredcerts) == 0: print('>>> 证书夹内未发现过期证书') return expiredcerts # 重启Nginx def RestartNginx(self): ''' 重启Nginx :return: 重启结果 ''' tk = self.__GetToken() # 获取签名 playload = { 'request_time': tk['request_time'], 'request_token': tk['request_token'], 'name': 'nginx', 'type': 'restart' } res = self.__REQ.post(url=self.__BTURL + '/system?action=ServiceAdmin', data=playload).json() # 判断请求是否成功 if 'status' in res and 'msg' in res and not res['status']: print('>>> 重启Nginx失败:', res['msg']) return False return res.json()['status'] # 获取系统基本信息 def GetSystemInfo(self, showlog=True): ''' 获取系统基本信息 :showlog: 是否输出结果 :return: 系统基本信息 ''' if showlog: print('\n### 获取系统基本信息...') tk = self.__GetToken() # 获取签名 playload = { 'request_time': tk['request_time'], 'request_token': tk['request_token'] } res = self.__REQ.post(url=self.__BTURL + '/system?action=GetSystemTotal', data=playload).json() # 判断请求是否成功 # 判断res是否存在status字段,若不存在则说明请求失败 if 'status' in res and 'msg' in res and not res['status']: print('>>> 获取系统基本信息失败:', res['msg']) return False res['memUtilization'] = round(res['memRealUsed'] / res['memTotal'], 2) * 100 # 内存使用率 if showlog: print('>>> 面板地址:', self.__BTURL) print('>>> 操作系统:', res['system']) print('>>> 面板版本:', res['version']) print('>>> 运行时间:', res['time']) print('>>> CPU使用率:', res['cpuRealUsed'], '%') print('>>> 内存使用率:', res['memUtilization'], '%') print('>>> 内存总量:', round(res['memTotal'] / 1024, 2), 'GB') return res if __name__ == '__main__': bt = BtPanel('http://xxxx:xxx', 'xxxxx') bt.GetSystemInfo() bt.GetSites() |