讯飞星火鉴权流程
step 1、根据指定规则生成一串字符串
tmp = "host: " + "spark-api.xf-yun.com" + "\n"
tmp += "date: " + date + "\n"
tmp += "GET " + "/v1.1/chat" + " HTTP/1.1"
step 2、用 hmac 算法生成摘要
import hmac
import hashlib
# 此处假设APISecret = MjlmNzkzNmZkMDQ2OTc0ZDdmNGE2ZTZi
tmp_sha = hmac.new(self.APISecret.encode('utf-8'), tmp.encode('utf-8'), digestmod=hashlib.sha256).digest()
step 3、将摘要用 base64转换成可见字符,形成签名
signature = base64.b64encode(tmp_sha).decode(encoding='utf-8')
step 4、在利用签名生成鉴权内容
authorization_origin = f"api_key='{APIKey}'', algorithm='hmac-sha256', headers='host date request-line', signature='{signature}''"
step 5、将鉴权内容转换成 base64 的可见字符串
authorization = base64.b64encode(authorization_origin.encode('utf-8')).decode(encoding='utf-8')
与平台间的数据交互
- APISecret:从平台获取,用于 hmac 加密算法的密钥。不用传输
- APIKey:从平台获取,用于认证鉴别应用。放在认证信息里边传输
- date:自定义生成。放在 http 请求参数中传输
- host:辨识参数。放在 http 请求参数中传输
鉴权
通过解码 authorization 可以拿到 APIKey 和 signature。
再通过 APIKey 拿到对应的 APISecret。
最后再通过上述方法进行加密得到签名,将签名与 signature 进行对比,相同则通过,否则不通过。
# 解码所需要的输出流
a = io.BytesIO()
# authorization的值
b = io.BytesIO(b"YXBpX2tleT0iYWRkZDIyNzJiNmQ4YjdjOGFiZGQ3OTUzMTQyMGNhM2IiLCBhbGdvcml0aG09ImhtYWMtc2hhMjU2IiwgaGVhZGVycz0iaG9zdCBkYXRlIHJlcXVlc3QtbGluZSIsIHNpZ25hdHVyZT0iejVnSGR1M3B4VlY0QURNeWs0Njd3T1dEUTlxNkJRelIzbmZNVGpjL0RhUT0i")
base64.decode(b, a)
# 'api_key="addd2272b6d8b7c8abdd79531420ca3b", algorithm="hmac-sha256", headers="host date request-line", signature="z5gHdu3pxVV4ADMyk467wOWDQ9q6BQzR3nfMTjc/DaQ="'
print(a.getvalue())
百度云鉴权流程(明文传输,token 有效期长,不可取)
流程
通过向鉴权服务器发送 API Key 和 Secret Key 获取 access_token。
再将 access_token 放到 http 请求参数中进行传输。
鉴权
access_token 由服务器产生,因此可以直接匹配传输过来的 token 和 access_token 来进行判断是否通过。
微信鉴权流程
约定:服务器和客户端均保存相同的 WECHAT_TOKEN
- 微信在 http 请求中添加参数 signature、timestamp 和 nonce 分别表示签名,时间戳和 nonce。nonce 是为系随机生成的内容,类似加盐处理。
- 将[WECHAT_TOKEN, timestamp, nonce]组成列表
- 将这个列表升序排列
- 将这个列表的字符串进行拼接
- 使用 sha1对拼接好的字符串进行摘要生成。
- 生成的摘要与传输过来的 signature 进行对比。相同则通过验证,否则不通过。
微信公众号服务器示例代码:
# coding:utf-8
from flask import Flask, request, abort, render_template
import hashlib
import xmltodict
import requests
import time
# 用它可以访问http请求地址
import urllib3.request as urllib2
import urllib3
import urllib
import re
import json
import io
import sys
sys.setdefaultencoding('utf8') #python2版本
#sys.stdout = io.TextIOWrapper(sys.stdout.buffer,encoding='utf-8') #python3版本
# 微信的token令牌
WECHAT_TOKEN = '你的token'
app = Flask(__name__)
@app.route("/zhuanfa", methods=["GET", "POST"])
def wechat():
"""验证服务器地址的有效性"""
# 开发者提交信息后,微信服务器将发送GET请求到填写的服务器地址URL上,GET请求携带四个参数:
# signature:微信加密, signature结合了开发者填写的token参数和请求中的timestamp参数 nonce参数
# timestamp:时间戳(chuo这是拼音)
# nonce: 随机数
# echostr: 随机字符串
# 接收微信服务器发送参数
signature = request.args.get("signature")
timestamp = request.args.get("timestamp")
nonce = request.args.get("nonce")
# 校验参数
# 校验流程:
# 将token、timestamp、nonce三个参数进行字典序排序
# 将三个参数字符串拼接成一个字符串进行sha1加密
# 开发者获得加密后的字符串可与signature对比,标识该请求来源于微信
if not all([signature, timestamp, nonce]):
# 抛出400错误
abort(400)
# 按照微信的流程计算签名
li = [WECHAT_TOKEN, timestamp, nonce]
# 排序
li.sort()
# 拼接字符串
tmp_str = "".join(li)
tmp_str = tmp_str.encode('utf-8')
# 进行sha1加密, 得到正确的签名值
sign = hashlib.sha1(tmp_str).hexdigest()
# 将自己计算的签名值, 与请求的签名参数进行对比, 如果相同, 则证明请求来自微信
if signature != sign:
# 代表请求不是来自微信
# 弹出报错信息, 身份有问题
abort(403)
else:
# 表示是微信发送的请求
if request.method == "GET":
# 表示第一次接入微信服务器的验证
echostr = request.args.get("echostr")
# 校验echostr
if not echostr:
abort(400)
return echostr
elif request.method == "POST":
# 表示微信服务器转发消息过来
# 拿去xml的请求数据
xml_str = request.data
# 当xml_str为空时
if not xml_str:
abort(400)
# 对xml字符串进行解析成字典
xml_dict = xmltodict.parse(xml_str)
xml_dict = xml_dict.get("xml")
# MsgType是消息类型 这里是提取消息类型
msg_type = xml_dict.get("MsgType")
if msg_type == "text":
# 表示发送文本消息
# 够造返回值, 经由微信服务器回复给用户的消息内容
# 回复消息
# ToUsername: (必须传) 接收方账号(收到的OpenID)
# FromUserName: (必须传) 开发者微信号
# CreateTime: (必须传) 消息创建时间(整形)
# MsgType: (必须传) 消息类型
# Content: (必须传) 回复消息的内容(换行:在Content中能够换行, 微信客户端就支持换行显示)
resp_dict = {
"xml": {
"ToUserName": xml_dict.get("FromUserName"),
"FromUserName": xml_dict.get("ToUserName"),
"CreateTime": int(time.time()),
"MsgType": "image",
"Content": "用户发送的是文字"
}
}
elif msg_type == "image":
resp_dict = {
"xml": {
"ToUserName": xml_dict.get("FromUserName"),
"FromUserName": xml_dict.get("ToUserName"),
"CreateTime": int(time.time()),
"MsgType": "image",
"Content": "用户发送的是图片"
}
}
elif msg_type == "voice":
resp_dict = {
"xml": {
"ToUserName": xml_dict.get("FromUserName"),
"FromUserName": xml_dict.get("ToUserName"),
"CreateTime": int(time.time()),
"MsgType": "text",
"Content": "用户发送的是语音"
}
}
elif msg_type == "link":
resp_dict = {
"xml": {
"ToUserName": xml_dict.get("FromUserName"),
"FromUserName": xml_dict.get("ToUserName"),
"CreateTime": int(time.time()),
"MsgType": "text",
"Content": "用户发送的是链接"
}
}
elif msg_type == "vedio":
resp_dict = {
"xml": {
"ToUserName": xml_dict.get("FromUserName"),
"FromUserName": xml_dict.get("ToUserName"),
"CreateTime": int(time.time()),
"MsgType": "text",
"Content": "用户发送的是视频"
}
}
elif msg_type == "event":
if xml_dict.get('Event') == 'subscribe':
msg = '用户初次关注发送消息'
resp_dict = {
"xml": {
"ToUserName": xml_dict.get("FromUserName"),
"FromUserName": xml_dict.get("ToUserName"),
"CreateTime": int(time.time()),
"MsgType": "text",
"Content": msg
}
}
# 将字典转换为xml字符串
resp_xml_str = xmltodict.unparse(resp_dict)
# 返回消息数据给微信服务器
return resp_xml_str
if __name__ == '__main__':
app.run(port=8000, debug=True)