Python 服务端集成 腾讯云 IM 服务

最近做的项目,需要接入腾讯云 IM,翻看了一下文档,iOS、Android 以及 Web 端基本上都有 SDK 可以集成。我使用的服务端是用 Python 写的,腾讯 IM 暂时还没有 Python 的官方文档。但是在腾讯云的官方论坛上找到了解决方法。
服务端最基本的一个需求是:使用接入用户的 identifier 和应用申请的腾讯云 appid、私钥等信息,通过指定算法,生成用户用来登录腾讯云 IM 的 usersig。

源代码:

#! /usr/bin/python
# coding:utf-8

import OpenSSL, base64, zlib, json, time

ecdsa_pri_key = """
自己的私钥
"""

def list_all_curves():
    list = OpenSSL.crypto.get_elliptic_curves()
    for element in list:
        print element

def get_secp256k1():
    print OpenSSL.crypto.get_elliptic_curve('secp256k1');


def base64_encode_url(data):
    base64_data = base64.b64encode(data)
    base64_data = base64_data.replace('+', '*')
    base64_data = base64_data.replace('/', '-')
    base64_data = base64_data.replace('=', '_')
    return base64_data

def base64_decode_url(base64_data):
    base64_data = base64_data.replace('*', '+')
    base64_data = base64_data.replace('-', '/')
    base64_data = base64_data.replace('_', '=')
    raw_data = base64.b64decode(base64_data)
    return raw_data

class TLSSigAPI:
    """"""    
    __acctype = 0
    __identifier = ""
    __appid3rd = ""
    __sdkappid = 0
    __version = 20151204
    __expire = 3600*24*30       # 默认一个月,需要调整请自行修改
    __pri_key = ""
    __pub_key = ""
    _err_msg = "ok"

    def __get_pri_key(self):
        return OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM, self.__pri_key);

    def __init__(self, sdkappid, pri_key):
        self.__sdkappid = sdkappid
        self.__pri_key = pri_key

    def __create_dict(self):
        m = {}
        m["TLS.account_type"] = "%d" % self.__acctype
        m["TLS.identifier"] = "%s" % self.__identifier
        m["TLS.appid_at_3rd"] = "%s" % self.__appid3rd
        m["TLS.sdk_appid"] = "%d" % self.__sdkappid
        m["TLS.expire_after"] = "%d" % self.__expire
        m["TLS.version"] = "%d" % self.__version
        m["TLS.time"] = "%d" % time.time()
        return m

    def __encode_to_fix_str(self, m):
        fix_str = "TLS.appid_at_3rd:"+m["TLS.appid_at_3rd"]+"\n" \
                  +"TLS.account_type:"+m["TLS.account_type"]+"\n" \
                  +"TLS.identifier:"+m["TLS.identifier"]+"\n" \
                  +"TLS.sdk_appid:"+m["TLS.sdk_appid"]+"\n" \
                  +"TLS.time:"+m["TLS.time"]+"\n" \
                  +"TLS.expire_after:"+m["TLS.expire_after"]+"\n"
        return fix_str

    def tls_gen_sig(self, identifier):
        self.__identifier = identifier

        m = self.__create_dict()
        fix_str = self.__encode_to_fix_str(m)
        pk_loaded = self.__get_pri_key()
        sig_field = OpenSSL.crypto.sign(pk_loaded, fix_str, "sha256");
        sig_field_base64 = base64.b64encode(sig_field)
        m["TLS.sig"] = sig_field_base64
        json_str = json.dumps(m)
        sig_cmpressed = zlib.compress(json_str)
        base64_sig = base64_encode_url(sig_cmpressed)
        return base64_sig 

def main():
    api = TLSSigAPI(1400001052, ecdsa_pri_key)
    sig = api.tls_gen_sig("xiaojun")
    print sig

if __name__ == "__main__":
    main()

过程:

  1. 将用户的信息组装成一个字符串(json格式的,是直接拼装的,因为顺序不能乱),是哪些信息,可以看 __encode_to_fix_str;
  2. 使用 sha256 将字符串 hash,然后再用私钥签名,一般加密接口都会一把搞定,加密曲线使用的是 secp256k1
  3. 把第2步得到的缓冲区进行 base64;
  4. 将所有用户的信息以及第3步得到签名写进一个 json 串,此时可以不论顺序;
  5. 将 json 进行序列化,再 zlib 压缩,最后 base64(替换了某些字符,具体哪些看代码)。

需要注意的是,加密曲线使用的是 secp256k1, 有些系统的 openssl 是不支持这个曲线的。查看系统所支持的所有曲线,可参考 list_all_curves, 这个方法会打印出所有支持的曲线。如果你的服务器恰好支持这个曲线,那经过上面的过程就可以正常使用了。

如果你的服务器不支持 secp256k1,可以使用 python ecdsa 。

  1. python ecdsa 开发库 下载地址
  2. 使用 pip 安装:pip install ecdsa

由于 python ecdsa 这个开发库仅支持 ec 格式的私钥,从腾讯云下载的私钥格式是 pk #8 的格式,需要使用 openssl 命令进行转换。转换命令如下:

openssl ec -outform PEM -inform PEM -in private.pem -out private_ec.pem

-in 后面的传入下载的私钥 -out 后面是转换后的私钥文件

最终实现代码:

#! /usr/bin/python
# coding:utf-8

import OpenSSL, base64, zlib, json, time, hashlib
from ecdsa import SigningKey,util

# 这里请填写应用自己的私钥
ecdsa_pri_key = """
请填上应用自己的私钥
"""

def base64_encode_url(data):
    base64_data = base64.b64encode(data)
    base64_data = base64_data.replace('+', '*')
    base64_data = base64_data.replace('/', '-')
    base64_data = base64_data.replace('=', '_')
    return base64_data

def base64_decode_url(base64_data):
    base64_data = base64_data.replace('*', '+')
    base64_data = base64_data.replace('-', '/')
    base64_data = base64_data.replace('_', '=')
    raw_data = base64.b64decode(base64_data)
    return raw_data

class TLSSigAPI:
    """"""    
    __acctype = 0
    __identifier = ""
    __appid3rd = ""
    __sdkappid = 0
    __version = 20151204
    __expire = 3600*24*30       # 默认一个月,需要调整请自行修改
    __pri_key = ""
    __pub_key = ""
    _err_msg = "ok"
    

    def __get_pri_key(self):
        return self.__pri_key_loaded

    def __init__(self, sdkappid, pri_key):
        self.__sdkappid = sdkappid
        self.__pri_key = pri_key
        self.__pri_key_loaded = SigningKey.from_pem(self.__pri_key)

    def __create_dict(self):
        m = {}
        m["TLS.account_type"] = "%d" % self.__acctype
        m["TLS.identifier"] = "%s" % self.__identifier
        m["TLS.appid_at_3rd"] = "%s" % self.__appid3rd
        m["TLS.sdk_appid"] = "%d" % self.__sdkappid
        m["TLS.expire_after"] = "%d" % self.__expire
        m["TLS.version"] = "%d" % self.__version
        m["TLS.time"] = "%d" % time.time()
        return m

    def __encode_to_fix_str(self, m):
        fix_str = "TLS.appid_at_3rd:"+m["TLS.appid_at_3rd"]+"\n" \
                  +"TLS.account_type:"+m["TLS.account_type"]+"\n" \
                  +"TLS.identifier:"+m["TLS.identifier"]+"\n" \
                  +"TLS.sdk_appid:"+m["TLS.sdk_appid"]+"\n" \
                  +"TLS.time:"+m["TLS.time"]+"\n" \
                  +"TLS.expire_after:"+m["TLS.expire_after"]+"\n"
        return fix_str

    def tls_gen_sig(self, identifier):
        self.__identifier = identifier
        m = self.__create_dict()
        fix_str = self.__encode_to_fix_str(m)
        pk_loaded = self.__get_pri_key()
        sig_field = pk_loaded.sign(fix_str, hashfunc=hashlib.sha256, sigencode=util.sigencode_der)
        sig_field_base64 = base64.b64encode(sig_field)
        m["TLS.sig"] = sig_field_base64
        json_str = json.dumps(m)
        sig_cmpressed = zlib.compress(json_str)
        base64_sig = base64_encode_url(sig_cmpressed)
        return base64_sig 

def main():
    api = TLSSigAPI(1400001052, ecdsa_pri_key)
    sig = api.tls_gen_sig("xiaojun")
    print sig

if __name__ == "__main__":
    main()

参考链接,腾讯云官方论坛:
http://bbs.qcloud.com/thread-14366-1-1.html
http://bbs.qcloud.com/thread-23280-1-1.html

    原文作者:莫17
    原文地址: https://www.jianshu.com/p/f7bf639a554e
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞