鉴权算法

鉴权算法

讯飞星火鉴权流程

step 1、根据指定规则生成一串字符串

tmp = "host: " + "spark-api.xf-yun.com" + "\n"
tmp += "date: " + date + "\n"
tmp += "GET " + "/v1.1/chat" + " HTTP/1.1"

step 2、用 hmac 算法生成摘要

import hmac
import hashlib
# 此处假设APISecret = MjlmNzkzNmZkMDQ2OTc0ZDdmNGE2ZTZi 
tmp_sha = hmac.new(self.APISecret.encode('utf-8'), tmp.encode('utf-8'), 						digestmod=hashlib.sha256).digest()

step 3、将摘要用 base64转换成可见字符,形成签名

signature = base64.b64encode(tmp_sha).decode(encoding='utf-8')

step 4、在利用签名生成鉴权内容

authorization_origin = f"api_key='{APIKey}'', algorithm='hmac-sha256', headers='host date request-line', signature='{signature}''"

step 5、将鉴权内容转换成 base64 的可见字符串

authorization = base64.b64encode(authorization_origin.encode('utf-8')).decode(encoding='utf-8')

与平台间的数据交互

  1. APISecret:从平台获取,用于 hmac 加密算法的密钥。不用传输
  2. APIKey:从平台获取,用于认证鉴别应用。放在认证信息里边传输
  3. date:自定义生成。放在 http 请求参数中传输
  4. host:辨识参数。放在 http 请求参数中传输

鉴权

通过解码 authorization 可以拿到 APIKey 和 signature。
再通过 APIKey 拿到对应的 APISecret。
最后再通过上述方法进行加密得到签名,将签名与 signature 进行对比,相同则通过,否则不通过。

# 解码所需要的输出流
a = io.BytesIO()

# authorization的值
b = io.BytesIO(b"YXBpX2tleT0iYWRkZDIyNzJiNmQ4YjdjOGFiZGQ3OTUzMTQyMGNhM2IiLCBhbGdvcml0aG09ImhtYWMtc2hhMjU2IiwgaGVhZGVycz0iaG9zdCBkYXRlIHJlcXVlc3QtbGluZSIsIHNpZ25hdHVyZT0iejVnSGR1M3B4VlY0QURNeWs0Njd3T1dEUTlxNkJRelIzbmZNVGpjL0RhUT0i") 

base64.decode(b, a)

# 'api_key="addd2272b6d8b7c8abdd79531420ca3b", algorithm="hmac-sha256", headers="host date request-line", signature="z5gHdu3pxVV4ADMyk467wOWDQ9q6BQzR3nfMTjc/DaQ="'
print(a.getvalue())

百度云鉴权流程(明文传输,token 有效期长,不可取)

流程

通过向鉴权服务器发送 API Key 和 Secret Key 获取 access_token。
再将 access_token 放到 http 请求参数中进行传输。

鉴权

access_token 由服务器产生,因此可以直接匹配传输过来的 token 和 access_token 来进行判断是否通过。

微信鉴权流程

约定:服务器和客户端均保存相同的 WECHAT_TOKEN

  1. 微信在 http 请求中添加参数 signature、timestamp 和 nonce 分别表示签名,时间戳和 nonce。nonce 是为系随机生成的内容,类似加盐处理。
  2. 将[WECHAT_TOKEN, timestamp, nonce]组成列表
  3. 将这个列表升序排列
  4. 将这个列表的字符串进行拼接
  5. 使用 sha1对拼接好的字符串进行摘要生成。
  6. 生成的摘要与传输过来的 signature 进行对比。相同则通过验证,否则不通过。
    微信公众号服务器示例代码:
# coding:utf-8
from flask import Flask, request, abort, render_template
import hashlib
import xmltodict
import requests
import time
# 用它可以访问http请求地址
import urllib3.request as urllib2
import urllib3
import urllib
import re
import json
import io
import sys
sys.setdefaultencoding('utf8')   #python2版本
#sys.stdout = io.TextIOWrapper(sys.stdout.buffer,encoding='utf-8')  #python3版本
 
 
# 微信的token令牌
WECHAT_TOKEN = '你的token'
app = Flask(__name__)
 
@app.route("/zhuanfa", methods=["GET", "POST"])
def wechat():
    """验证服务器地址的有效性"""
    # 开发者提交信息后,微信服务器将发送GET请求到填写的服务器地址URL上,GET请求携带四个参数:
    # signature:微信加密, signature结合了开发者填写的token参数和请求中的timestamp参数 nonce参数
    # timestamp:时间戳(chuo这是拼音)
    # nonce: 随机数
    # echostr: 随机字符串
    # 接收微信服务器发送参数
    signature = request.args.get("signature")
    timestamp = request.args.get("timestamp")
    nonce = request.args.get("nonce")
 
    # 校验参数
    # 校验流程:
    # 将token、timestamp、nonce三个参数进行字典序排序
    # 将三个参数字符串拼接成一个字符串进行sha1加密
    # 开发者获得加密后的字符串可与signature对比,标识该请求来源于微信
    if not all([signature, timestamp, nonce]):
        # 抛出400错误
        abort(400)
 
    # 按照微信的流程计算签名
    li = [WECHAT_TOKEN, timestamp, nonce]
    # 排序
    li.sort()
    # 拼接字符串
    tmp_str = "".join(li)
    tmp_str = tmp_str.encode('utf-8')
 
    # 进行sha1加密, 得到正确的签名值
    sign = hashlib.sha1(tmp_str).hexdigest()
 
    # 将自己计算的签名值, 与请求的签名参数进行对比, 如果相同, 则证明请求来自微信
    if signature != sign:
        # 代表请求不是来自微信
        # 弹出报错信息, 身份有问题
        abort(403)
    else:
        # 表示是微信发送的请求
        if request.method == "GET":
            # 表示第一次接入微信服务器的验证
            echostr = request.args.get("echostr")
            # 校验echostr
            if not echostr:
                abort(400)
            return echostr
 
        elif request.method == "POST":
            # 表示微信服务器转发消息过来
            # 拿去xml的请求数据
            xml_str = request.data
 
            # 当xml_str为空时
            if not xml_str:
                abort(400)
 
            # 对xml字符串进行解析成字典
            xml_dict = xmltodict.parse(xml_str)
 
            xml_dict = xml_dict.get("xml")
 
            # MsgType是消息类型 这里是提取消息类型
            msg_type = xml_dict.get("MsgType")
 
            if msg_type == "text":
                # 表示发送文本消息
                # 够造返回值, 经由微信服务器回复给用户的消息内容
                # 回复消息
                # ToUsername: (必须传) 接收方账号(收到的OpenID)
                # FromUserName: (必须传) 开发者微信号
                # CreateTime: (必须传) 消息创建时间(整形)
                # MsgType: (必须传) 消息类型
                # Content: (必须传) 回复消息的内容(换行:在Content中能够换行, 微信客户端就支持换行显示)
                resp_dict = {
                    "xml": {
                        "ToUserName": xml_dict.get("FromUserName"),
                        "FromUserName": xml_dict.get("ToUserName"),
                        "CreateTime": int(time.time()),
                        "MsgType": "image",
                        "Content": "用户发送的是文字"
                    }
                }
            elif msg_type == "image":
                resp_dict = {
                    "xml": {
                        "ToUserName": xml_dict.get("FromUserName"),
                        "FromUserName": xml_dict.get("ToUserName"),
                        "CreateTime": int(time.time()),
                        "MsgType": "image",
                        "Content": "用户发送的是图片"
                    }
                }
            elif msg_type == "voice":
                resp_dict = {
                    "xml": {
                        "ToUserName": xml_dict.get("FromUserName"),
                        "FromUserName": xml_dict.get("ToUserName"),
                        "CreateTime": int(time.time()),
                        "MsgType": "text",
                        "Content": "用户发送的是语音"
                    }
                }
            elif msg_type == "link":
                resp_dict = {
                    "xml": {
                        "ToUserName": xml_dict.get("FromUserName"),
                        "FromUserName": xml_dict.get("ToUserName"),
                        "CreateTime": int(time.time()),
                        "MsgType": "text",
                        "Content": "用户发送的是链接"
                    }
                }
            elif msg_type == "vedio":
                resp_dict = {
                    "xml": {
                        "ToUserName": xml_dict.get("FromUserName"),
                        "FromUserName": xml_dict.get("ToUserName"),
                        "CreateTime": int(time.time()),
                        "MsgType": "text",
                        "Content": "用户发送的是视频"
                    }
                }
            elif msg_type == "event":
                if xml_dict.get('Event') == 'subscribe':
                    msg = '用户初次关注发送消息'
                    resp_dict = {
                    "xml": {
                        "ToUserName": xml_dict.get("FromUserName"),
                        "FromUserName": xml_dict.get("ToUserName"),
                        "CreateTime": int(time.time()),
                        "MsgType": "text",
                        "Content": msg
                    }
                }    
            # 将字典转换为xml字符串
            resp_xml_str = xmltodict.unparse(resp_dict)
            # 返回消息数据给微信服务器
            return resp_xml_str
 
 
if __name__ == '__main__':
    app.run(port=8000, debug=True)
LICENSED UNDER CC BY-NC-SA 4.0