百度云加速自定义证书与cyberpanel的let’s encrypt 证书搭配使用

个人认证后,弄了个百度云加速的免费版用用,可以上传自定义的证书。
这个玩意,还要人工操作,不方便,翻了下云加速的api:https://su.baidu.com/agency/api.html#/7_kaifazhinan/1_APIcankao/19_zhengshuxiangguan/9.7_zhengshuliebiao-zhengshushangchuan.md

发现可以用api上传。

我用的是cyberpanel的面板,自带的let’s encrypt证书,而且还会自动更新过期失效的证书。
于是乎,可以通过百度云加速提供的api自动上传部署let’s encrypt证书。
然后,开干:
通过搜索github的源码,定位renew.py的位置:https://github.com/usmannasir/cyberpanel/search?q=renew.py&unscoped_q=renew.py

plogical/renew.py

需要pip安装一些库:
pip install typing
pip install tld

修改后的代码如下:

#!/usr/local/CyberCP/bin/python2
import os
import os.path
import sys
import django

import requests
import json
import traceback
import urllib
import hashlib
import time
import string
import datetime
from hashlib import sha1
import base64
import hmac
from hmac import new as hmac
from tld import get_fld

sys.path.append('/usr/local/CyberCP')
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "CyberCP.settings")
django.setup()
from plogical.CyberCPLogFileWriter import CyberCPLogFileWriter as logging
from websiteFunctions.models import Websites, ChildDomains
from os import path
from datetime import datetime
import OpenSSL
from plogical.virtualHostUtilities import virtualHostUtilities

class Renew:
    OPENAPI_BASE_URL = "https://api.su.baidu.com/%s"
    ACCESS_KEY = "这里填ak"
    SECRET_KEY = "这里填sk"
    SIGN_METHOD = "HMAC-SHA1"

    def get_signature(self, sec_key, text):
        return str(hmac(sec_key, text, sha1).digest().encode('base64')[:-1])
        ##for python3
        ##return base64.b64encode(hmac(sec_key.encode(encoding='utf-8'), text.encode(encoding='utf-8'), sha1).digest())

    def get_inited_common_params(self,path):
        param_map = {}
        auth_timestamp = str(int(time.time()))
        param_map['X-Auth-Access-Key'] = self.ACCESS_KEY
        param_map['X-Auth-Nonce'] = auth_timestamp
        param_map['X-Auth-Path-Info'] = path
        param_map['X-Auth-Signature-Method'] = self.SIGN_METHOD
        param_map['X-Auth-Timestamp'] = auth_timestamp
        return param_map

    def get_parsed_all_params(self, params):
        param_list = []
        for k in params:
            param_str = '%s=%s' % (k, params[k])
            param_list.append(param_str)
        param_list.sort()
        return '&'.join(param_list)

    def get_request_header(self, path, biz_params):
        common_params = self.get_inited_common_params(path)
        all_params = {}
        headers = {}
        headers.update(common_params)
        all_params.update(common_params)
        all_params.update(biz_params)
        all_params_str = self.get_parsed_all_params(all_params)
        sign = self.get_signature(self.SECRET_KEY, all_params_str)
        headers["X-Auth-Sign"] = sign
        return headers

    def del_cert(self,domain):
        path = "v3/yjs/custom_certificates"
        biz_params = {}
        biz_params["domain"] = get_fld(domain, fix_protocol=True)
        biz_params["info"] = domain
        headers = self.get_request_header(path, biz_params)
        url = self.OPENAPI_BASE_URL % path
        querystring = {}
        querystring.update(biz_params)
        response = requests.request("DELETE", url, data=json.dumps(biz_params), headers=headers, params=querystring)
        logging.writeToFile(response.text)

    def post_cert(self,domain):
        path = "v3/yjs/custom_certificates"
        biz_params = {}
        biz_params["domain"] = get_fld(domain, fix_protocol=True)
        biz_params["info"] = domain
        keyfilepath = '/etc/letsencrypt/live/%s/privkey.pem' % (domain)
        f = open(keyfilepath, "r")
        biz_params["private_key"] = f.read()
        certfilepath = '/etc/letsencrypt/live/%s/fullchain.pem' % (domain)
        f = open(certfilepath, "r")
        biz_params["certificate"] = f.read()
        headers = self.get_request_header(path, biz_params)
        url = self.OPENAPI_BASE_URL % path
        querystring = {}
        querystring.update(biz_params)
        response = requests.request("POST", url, data=json.dumps(biz_params), headers=headers, params=querystring)
        logging.writeToFile(response.text)


    def SSLObtainer(self):
        try:
            logging.writeToFile('Running SSL Renew Utility')

            ## For websites

            for website in Websites.objects.all():
                logging.writeToFile('Checking SSL for %s.' % (website.domain))
                filePath = '/etc/letsencrypt/live/%s/fullchain.pem' % (website.domain)

                if path.exists(filePath):
                    logging.writeToFile('SSL exists for %s. Checking if SSL will expire in 15 days..' % (website.domain))
                    x509 = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
                                                           open(filePath, 'r').read())
                    expireData = x509.get_notAfter().decode('ascii')
                    finalDate = datetime.strptime(expireData, '%Y%m%d%H%M%SZ')
                    now = datetime.now()
                    diff = finalDate - now

                    if int(diff.days) >= 15:
                        logging.writeToFile(
                            'SSL exists for %s and is not ready to renew, skipping..' % (website.domain))
                    elif x509.get_issuer().get_components()[1][1] == 'Denial':
                        logging.writeToFile(
                            'SSL exists for %s and ready to renew..' % (website.domain))
                        logging.writeToFile(
                            'Renewing SSL for %s..' % (website.domain))

                        virtualHostUtilities.issueSSL(website.domain, '/home/%s/public_html' % (website.domain),
                                                      website.adminEmail)
                        self.del_cert('%s'%(website.domain))
                        self.post_cert('%s'%(website.domain))
                    else:
                        logging.writeToFile(
                            'SSL exists for %s and ready to renew..' % (website.domain))
                        logging.writeToFile(
                            'Renewing SSL for %s..' % (website.domain))

                        virtualHostUtilities.issueSSL(website.domain, '/home/%s/public_html' % (website.domain), website.adminEmail)
                        self.del_cert('%s'%(website.domain))
                        self.post_cert('%s'%(website.domain))
                else:
                    logging.writeToFile(
                        'SSL does not exist for %s. Obtaining now..' % (website.domain))
                    virtualHostUtilities.issueSSL(website.domain, '/home/%s/public_html' % (website.domain),
                                                  website.adminEmail)
                    self.del_cert('%s'%(website.domain))
                    self.post_cert('%s'%(website.domain))

            ## For child-domains

            for website in ChildDomains.objects.all():
                logging.writeToFile('Checking SSL for %s.' % (website.domain))
                filePath = '/etc/letsencrypt/live/%s/fullchain.pem' % (website.domain)

                if path.exists(filePath):
                    logging.writeToFile(
                        'SSL exists for %s. Checking if SSL will expire in 15 days..' % (website.domain))
                    x509 = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
                                                           open(filePath, 'r').read())
                    expireData = x509.get_notAfter().decode('ascii')
                    finalDate = datetime.strptime(expireData, '%Y%m%d%H%M%SZ')
                    now = datetime.now()
                    diff = finalDate - now

                    if int(diff.days) >= 15:
                        logging.writeToFile(
                            'SSL exists for %s and is not ready to renew, skipping..' % (website.domain))
                    elif x509.get_issuer().get_components()[1][1] == 'Denial':
                        logging.writeToFile(
                            'SSL exists for %s and ready to renew..' % (website.domain))
                        logging.writeToFile(
                            'Renewing SSL for %s..' % (website.domain))

                        virtualHostUtilities.issueSSL(website.domain, website.path,
                                                      website.master.adminEmail)
                        self.del_cert('%s'%(website.domain))
                        self.post_cert('%s'%(website.domain))
                    else:
                        logging.writeToFile(
                            'SSL exists for %s and ready to renew..' % (website.domain))
                        logging.writeToFile(
                            'Renewing SSL for %s..' % (website.domain))

                        virtualHostUtilities.issueSSL(website.domain, website.path,
                                                      website.master.adminEmail)
                        self.del_cert('%s'%(website.domain))
                        self.post_cert('%s'%(website.domain))
                else:
                    logging.writeToFile(
                        'SSL does not exist for %s. Obtaining now..' % (website.domain))
                    virtualHostUtilities.issueSSL(website.domain, website.path,
                                                  website.master.adminEmail)
                    self.del_cert('%s'%(website.domain))
                    self.post_cert('%s'%(website.domain))

        except BaseException, msg:
           logging.writeToFile(str(msg) + '. Renew.SSLObtainer')


if __name__ == "__main__":
    sslOB = Renew()
    sslOB.SSLObtainer()