# Python对接支付宝支付自实现 (a self-contained Alipay payment integration in Python)
# -*- coding: utf-8 -*-
import base64
import json
import urllib.parse
from datetime import datetime
import requests
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import padding
class AliPayException(Exception):
    """Base error of this module; keeps the offending payload in ``data``.

    :param data: whatever describes the failure (e.g. the response node
        returned by the gateway); it is stored unchanged on ``self.data``.
    """

    def __init__(self, data):
        # Forward the payload to Exception so that ``args`` and ``repr()``
        # carry it too (the base class was previously initialised empty).
        super(AliPayException, self).__init__(data)
        self.data = data

    def __str__(self):
        return "alipay - {}".format(self.data)

    def __unicode__(self):
        # Only consulted by Python 2; kept so the class interface is unchanged.
        return u"alipay - {}".format(self.data)
class AliPayVerifyException(AliPayException):
    """Verification-related error (going by its name).

    ``msg`` and ``data`` are folded into a single text which is handed to
    ``AliPayException`` as its ``data``.
    """

    def __init__(self, msg, data):
        detail = 'alipay verify except - {}:{}'.format(msg, data)
        super(AliPayVerifyException, self).__init__(detail)
class AliPay:
    """Small Alipay gateway client: builds signed requests, sends them and
    verifies the signatures of responses and notifications."""

    def __init__(self, **kwargs):
        """
        Store the gateway settings and load both PEM keys from disk.

        :param kwargs:
            app_id: application ID assigned to the developer by Alipay (required)
            seller_id: seller identifier (required); only stored and exposed
                through the ``seller_id`` property
            gateway_url: address the requests are sent to (required)
            notify_url: http/https path on the merchant server that the Alipay
                server actively notifies (optional)
            sign_type: signature algorithm used by the merchant, ``RSA2``
                (default, recommended) or ``RSA``
            app_private_key: path of the PEM file holding the private key used
                to sign requests (required)
            public_key: path of the PEM file holding the public key used to
                verify responses/notifications (required)
            timeout: seconds to wait for the gateway, passed to
                ``requests.get`` (optional, default 30)
        :raises KeyError: when a required entry is missing
        :raises ValueError: when ``sign_type`` is neither ``RSA`` nor ``RSA2``
        """
        self._app_id = kwargs['app_id']
        self._seller_id = kwargs['seller_id']
        self._gateway_url = kwargs['gateway_url']
        self._notify_url = kwargs.get('notify_url')
        self._sign_type = kwargs.get('sign_type', 'RSA2')
        if self._sign_type not in ('RSA', 'RSA2'):
            raise ValueError('alipay sign_type must `RSA` or `RSA2`')
        self._charset = 'utf-8'
        self._format = 'json'
        # Without a timeout a stalled gateway would block the caller forever.
        self._timeout = kwargs.get('timeout', 30)
        # The PEM loaders want bytes, so read the files in binary mode instead
        # of decoding with the locale encoding and re-encoding.
        with open(kwargs['app_private_key'], 'rb') as f:
            self._app_private_key = serialization.load_pem_private_key(
                f.read(),
                None,
                default_backend()
            )
        with open(kwargs['public_key'], 'rb') as f:
            self._public_key = serialization.load_pem_public_key(
                f.read(),
                default_backend()
            )

    @property
    def app_id(self):
        """Application ID given at construction."""
        return self._app_id

    @property
    def seller_id(self):
        """Seller ID given at construction."""
        return self._seller_id

    def _hash_algorithm(self):
        """Return the digest that belongs to the configured sign type:
        SHA1 for ``RSA``, SHA256 otherwise (``RSA2``)."""
        if self._sign_type == 'RSA':
            return hashes.SHA1()
        return hashes.SHA256()

    @classmethod
    def _sign_content(cls, data):
        """Join the key-sorted entries of ``data`` as ``k=v&k=v`` (values
        unquoted); this is the text that gets signed / verified."""
        return '&'.join('{}={}'.format(k, v) for k, v in cls.sort_data(data))

    @classmethod
    def _query_string(cls, data, sign):
        """Build the request query: key-sorted, URL-quoted entries of ``data``
        followed by the ``sign`` parameter."""
        pairs = cls.sort_data(data) + [('sign', sign)]
        return '&'.join(
            '{}={}'.format(k, urllib.parse.quote(v)) for k, v in pairs)

    @staticmethod
    def _split_sync_response(method, raw_data):
        """
        Cut a raw response body into ``(signed_text, signature_bytes)``.

        The signed text is the JSON value of the ``<method>_response`` node
        exactly as it appears in the body; it is sliced out of the raw text
        rather than re-serialised so that not a single character changes.

        :param method: API method name, e.g. ``alipay.trade.precreate``
        :param raw_data: response body as bytes
        :raises AliPayVerifyException: when the node or the ``sign`` entry
            cannot be located
        """
        text = raw_data.decode('utf8')
        node = '"{}_response"'.format(method.replace('.', '_'))
        try:
            signature = base64.b64decode(json.loads(text)['sign'])
            start = text.index(':', text.index(node) + len(node)) + 1
            while text[start] in ' \t\r\n':
                start += 1
            # raw_decode reports where the JSON value ends, which copes with
            # nested braces and with further keys following the node.
            _, end = json.JSONDecoder().raw_decode(text, start)
        except (ValueError, KeyError, IndexError, TypeError):
            raise AliPayVerifyException('malformed sync response', text)
        return text[start:end], signature

    def app_private_sign(self, data):
        """
        Sign ``data`` with the application private key.

        :param data: text to sign
        :return: base64 text of the PKCS#1 v1.5 signature (SHA1 for ``RSA``,
            SHA256 for ``RSA2``)
        """
        signature = self._app_private_key.sign(
            data.encode('utf8'),
            padding.PKCS1v15(),
            self._hash_algorithm())
        return base64.b64encode(signature).decode('utf8')

    def sync_verify(self, method, raw_data):
        """
        Verify the signature of a synchronous response.

        :param method: API method name the response belongs to
        :param raw_data: raw response body (bytes)
        :return: None; the public key's ``verify`` raises when the signature
            does not match
        :raises AliPayVerifyException: when the body lacks the response node
            or the ``sign`` entry
        """
        content, signature = self._split_sync_response(method, raw_data)
        self._public_key.verify(
            signature,
            content.encode('utf8'),
            padding.PKCS1v15(),
            self._hash_algorithm())

    def async_verify(self, data):
        """
        Verify the signature of an asynchronous notification.

        :param data: mapping of the notification parameters; ``sign`` holds
            the base64 signature, ``sign`` and ``sign_type`` are left out of
            the signed text
        :return: None; the public key's ``verify`` raises when the signature
            does not match
        :raises KeyError: when ``data`` has no ``sign`` entry
        """
        signed_fields = {
            k: v for k, v in data.items() if k not in ('sign', 'sign_type')
        }
        self._public_key.verify(
            base64.b64decode(data['sign']),
            self._sign_content(signed_fields).encode('utf8'),
            padding.PKCS1v15(),
            self._hash_algorithm())

    @staticmethod
    def sort_data(data):
        """Return the ``(key, value)`` pairs of ``data`` ordered by key."""
        return [(k, data[k]) for k in sorted(data)]

    def params(self, method, biz_content):
        """
        Build the signed query string for one API call.

        :param method: API method name
        :param biz_content: JSON-serialisable business parameters
        :return: URL-quoted query string including the ``sign`` parameter
        """
        data = {
            'app_id': self._app_id,
            'method': method,
            'format': self._format,
            'charset': self._charset,
            'sign_type': self._sign_type,
            'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),  # yyyy-MM-dd HH:mm:ss
            'version': '1.0',
            'biz_content': json.dumps(biz_content, separators=(',', ':'))
        }
        if self._notify_url:
            data['notify_url'] = self._notify_url
        # The signature covers the unquoted values; quoting happens afterwards.
        sign = self.app_private_sign(self._sign_content(data))
        return self._query_string(data, sign)

    def command(self, method, biz_content):
        """
        Call ``method`` on the gateway and return its response node.

        :param method: API method name
        :param biz_content: JSON-serialisable business parameters
        :return: the decoded ``<method>_response`` node
        :raises AliPayException: when the node carries a ``code`` other than
            ``'10000'``
        """
        query = self.params(method, biz_content)
        response = requests.get(
            '{}?{}'.format(self._gateway_url, query), timeout=self._timeout)
        response_data = response.json()
        alipay_response_data = response_data[method.replace('.', '_') + '_response']
        if alipay_response_data.get('code', '10000') != '10000':
            raise AliPayException(alipay_response_data)
        # Verification needs the untouched bytes, not the decoded JSON.
        self.sync_verify(method, response.content)
        return alipay_response_data
if __name__ == '__main__':
    # Demo: create a client from placeholder settings and issue one
    # `alipay.trade.precreate` call; the returned node is not used.
    alipay = AliPay(
        app_id='...',
        seller_id='...',
        gateway_url='https://openapi.alipaydev.com/gateway.do',
        notify_url='...',
        app_private_key='path to private_key.pem',
        public_key='path to public_key.pem',
    )
    biz_content = dict(
        out_trade_no="111",
        total_amount=0.01,
        subject="test",
    )
    alipay.command('alipay.trade.precreate', biz_content)