Android通过socket长连接实现推送

工具:Android studio
软件方法及协议:socket、protobuf
实现原理:

通过本地建立一个socket,绑定服务器IP和port,然后connect,再开启另外线程定时心跳(注意这里的心跳不是自定义发送数据,而是采用socket本身的心跳功能sendUrgentData,否则有坑),心跳失败则自动重连,另一方面,启动循环从缓存获取socket数据。

大致框架

《Android通过socket长连接实现推送》 推送实现流程图

实现代码:

public class QpushClient implements Runnable {
protected static QpushClient mInstance;
protected Handler mHandler;
protected InetSocketAddress mAddress;
String mIMEI;
protected String TAG = "QpushClient";
//socket连接的超时时间
private final int CONNECT_TIME_OUT = 5 * 1000;
//巡检周期
private final int CHECK_PERIOD = 2 * 1000;
//连接尝试间隔时间
private final int CONNECT_PERIOD = 30 * 1000;
private final int HEARTBEART_PERIOD = 30 * 1000;
//若连接失败或响应失败,则尝试次数为9,若仍无效,则不再尝试
private final int CONNECT_TRY_TIMES = 9;

private final int SEND_MSG_TYPE_HEARTBEAT = 1; //心跳包
private final int SEND_MSG_TYPE_SOCKET_LOGIN = 2; //发送socket登录包
//连接尝试次数
private int mConnectCount;

Socket mClientSocket;
String mHost;
int mPort;
//设置是否去读取数据
boolean isStartRecieveMsg = false;
//开启心跳检测
boolean isKeepHeartBeat = false;

BufferedReader mReader;
ScheduledExecutorService executor;//定位定时器
HeartBeatTask mHeartBeatTask;

private QpushClient(Handler handler) {
    mHandler = handler;
}

public static QpushClient getInstance(Handler handler) {
    if (mInstance == null) {
        mInstance = new QpushClient(handler);
    }
    return mInstance;
}

public void init(String host, int port,String imei) {
    mHost = host;
    mPort = port;
    mIMEI = imei;
    new Thread(this).start();
    isStartRecieveMsg = true;
    isKeepHeartBeat = true;
}

@Override
public void run() {
    mAddress = new InetSocketAddress(getIP(mHost), mPort);

    //尝试连接,若未连接,则设置尝试次数
    while (mConnectCount < CONNECT_TRY_TIMES) {
        connect();
        if (!mClientSocket.isConnected()) {
            mConnectCount++;
            sleep(CONNECT_PERIOD);
        } else {
            mConnectCount = 0;//连接上,则恢复置0
            break;
        }
    }
    if (mClientSocket.isConnected()) {
        keepHeartBeat();
        recvProtobufMsg();
//            recvStringMsg();
    }
}

private void connect() {
    try {
        if (mClientSocket == null) {
            mClientSocket = new Socket();
        }
        mClientSocket.connect(mAddress, CONNECT_TIME_OUT);
        Driver.HeartBeat heartbeat = Driver.HeartBeat.newBuilder().setImei(mIMEI).build();
        Driver.ClientMessage socketLogin = Driver.ClientMessage.newBuilder().setType(1).setHeartBeat(heartbeat).build();
        sendMsg(socketLogin,SEND_MSG_TYPE_SOCKET_LOGIN);
    } catch (IOException e) {
        e.printStackTrace();
        Log.e(TAG, "连接失败 mClientSocket.connect fail ,ip=" + mAddress.getHostName() + ";port=" + mAddress.getPort() + ";detail:" + e.getMessage());
    }
}

/**
 * 心跳维护
 */
private void keepHeartBeat() {
    //设置心跳频率,启动心跳
    if (isKeepHeartBeat) {
        if (mHeartBeatTask == null) {
            mHeartBeatTask = new HeartBeatTask();
        }
        try {
            if (executor != null) {
                executor.shutdownNow();
                executor = null;
            }
            executor = Executors.newScheduledThreadPool(1);
            executor.scheduleAtFixedRate(
                    mHeartBeatTask,
                    1000,  //initDelay
                    HEARTBEART_PERIOD,  //period
                    TimeUnit.MILLISECONDS);
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }
}

/**
 * @param message
 * @param type    1=login;2=心跳;
 */
public void sendMsg(String message, int type) {
    PrintWriter writer;
    try {
        writer = new PrintWriter(new OutputStreamWriter(mClientSocket.getOutputStream(), "UTF-8"), true);
        writer.println(message);
        Log.e(TAG, "sendMsg  Socket.isClosed()=" + mClientSocket.isClosed() + ";connect=" + mClientSocket.isConnected());
    } catch (IOException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
        switch (type) {
            case SEND_MSG_TYPE_HEARTBEAT:
                mHandler.obtainMessage(QpushService.PUSH_TYPE_PROTO_DATA, "发送心跳异常").sendToTarget();
                break;
        }

    }
}

/**
 * @param message
 * @param type    1=login;2=心跳;
 */
public void sendMsg(Driver.ClientMessage message, int type) {
    try {
        message.writeTo(mClientSocket.getOutputStream());
        Log.e(TAG, "sendMsg success");
    } catch (IOException e) {
        // TODO Auto-generated catch block
        if (type == SEND_MSG_TYPE_HEARTBEAT) {
            //心跳失败
            Log.e(TAG, "心跳失败");
            if (mClientSocket.isClosed()) {
                connect();
            }
        } else {
            Log.e(TAG, "发送数据失败");
        }
        e.printStackTrace();
    }
}

/**
 * 不断的检测是否有服务器推送的数据过来
 */
public void recvStringMsg() {
    while (mClientSocket != null && mClientSocket.isConnected() && !mClientSocket.isClosed()) {
        try {
            mReader = new BufferedReader(new InputStreamReader(mClientSocket.getInputStream(), "UTF-8"));
            String data = mReader.readLine();
            Log.e(TAG, "recvStringMsg data=" + data);
        } catch (IOException e) {
            e.printStackTrace();
        } catch (Exception ex) {
            ex.printStackTrace();
        }
        sleep(2000);
    }
    sleep(CHECK_PERIOD);
}
/**
 * 不断的检测是否有服务器推送的数据过来
 */
public void recvProtobufMsg() {
    while (isStartRecieveMsg) {
        try {
            byte[] resultByte = recvByteMsg(mClientSocket.getInputStream());
            if (resultByte != null) {
                Driver.ClientMessage retMsg = Driver.ClientMessage.parseFrom(resultByte);
                mHandler.obtainMessage(QpushService.PUSH_TYPE_PROTO_DATA, retMsg).sendToTarget();
            } else {
                Log.e(TAG, "resultByte is null");
            }
        } catch (IOException e) {
            e.printStackTrace();
        } catch (Exception ex) {
            ex.printStackTrace();
        }
        sleep(5 * 1000);
    }
}
/**
 * 接收server的信息
 *
 * @return
 */
public byte[] recvByteMsg(InputStream inpustream) {
    try {
        byte len[] = new byte[1024];
        int count = inpustream.read(len);
        byte[] temp = new byte[count];
        for (int i = 0; i < count; i++) {
            temp[i] = len[i];
        }
        return temp;
    } catch (Exception localException) {
        localException.printStackTrace();
    }
    return null;
}

class HeartBeatTask implements Runnable {
    @Override
    public void run() {
        //执行发送心跳
        try {
            mClientSocket.sendUrgentData(65);
        } catch (IOException e) {
            e.printStackTrace();
            try {
                Log.e(TAG, "socket心跳异常,尝试断开,重连");
                mClientSocket.close();
                mClientSocket = null;
                //然后尝试重连
                connect();
            } catch (IOException e1) {
                e1.printStackTrace();
            }
        }
        Log.e(TAG, "发送心跳,Socket.isClosed()=" + mClientSocket.isClosed() + ";connect" + mClientSocket.isConnected());
    }
}

/**
 * 通过域名获取IP
 *
 * @param domain
 * @return
 */
public String getIP(String domain) {
    String IPAddress = "";
    InetAddress ReturnStr1 = null;
    try {
        ReturnStr1 = java.net.InetAddress.getByName(domain);
        IPAddress = ReturnStr1.getHostAddress();
    } catch (UnknownHostException e) {
        e.printStackTrace();
        Log.e(TAG, "获取IP失败" + e.getMessage());
    }
    return IPAddress;
//        return "192.168.3.121";
}

private void sleep(long sleepTime) {
    try {
        Thread.sleep(sleepTime);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

/**
 * 销毁socket
 */
public void onDestory() {
    if (mClientSocket != null) {
        try {
            mClientSocket.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
        mClientSocket = null;
    }
}

/*
 * Ready for use.
 */
public void close() {
    try {
        if (mClientSocket != null && !mClientSocket.isClosed())
            mClientSocket.close();
    } catch (IOException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
}
}

QpushService类:主要负责启动socket客户端及管理消息

public class QpushService extends Service {
//数据推送,不显示到通知栏
final static int PUSH_TYPE_DATA = 1;
final static int PUSH_TYPE_PROTO_DATA = 2;
Handler mHandler;
String TAG = "QpushService";
QpushClient mClient;

@Override
public void onCreate() {
    Log.e(TAG, "onCreate");
    initHandler();
    super.onCreate();
}

/**
 * 初始化handler,用于接收推送的消息
 */
private void initHandler() {
    mHandler = new Handler() {
        @Override
        public void handleMessage(Message msg) {
            switch (msg.what) {
                case PUSH_TYPE_DATA:
                    Log.e(TAG, "PUSH_TYPE_DATA");
                    String data = (String) msg.obj;
                    ToastUtil.showShort(QpushService.this.getApplicationContext(), data);
                case PUSH_TYPE_PROTO_DATA:
                    Driver.ClientMessage clientMessage = (Driver.ClientMessage) msg.obj;
                    Log.e(TAG, "PUSH_TYPE_PROTO_DATA");
                    Intent intentCancelRighr = new Intent();
                    intentCancelRighr.setAction(PushReceiverAction.PUSH_ACTION);
                    intentCancelRighr.putExtra("data", clientMessage);
                    getApplicationContext().sendBroadcast(intentCancelRighr);
                    break;
            }
        }
    };
}

@Nullable
@Override
public IBinder onBind(Intent intent) {
    Log.e(TAG, "onBind");
    return null;
}

@Override
public int onStartCommand(Intent intent, int flags, int startId) {
    mClient = QpushClient.getInstance(mHandler);
    mClient.init(HttpUrls.SOCKET_HOST, HttpUrls.SOCKET_PORT, AppUtils.getPesudoUniqueID());
    return super.onStartCommand(intent, flags, startId);
}

@Override
public void onDestroy() {
    Log.e(TAG, "onDestroy");
    super.onDestroy();
    mClient.onDestory();
}
}

PushReceiver类:接收service消息的广播,运行的主进程,防止接收不到数据

public class PushReceiver extends BroadcastReceiver {
Context mContext;
String TAG = "PushReceiver";

@Override
public void onReceive(Context context, Intent intent) {
    mContext = context;
    if (intent.getAction().equals(PushReceiverAction.PUSH_ACTION)) {
        //推送的通知消息
        Driver.ClientMessage clientMessage = (Driver.ClientMessage) intent.getSerializableExtra("data");
        if (clientMessage != null) {
            switch (clientMessage.getType()) {
                case 1: //socket登录,不做处理
                    Log.e(TAG, "socket登录消息,sid=" + clientMessage.getHeartBeat().getSid());
                    MyApplication.sid =     clientMessage.getHeartBeat().getSid();
// showNotification("socket","登录成功",1);
                    break;
                case 2://通知告知被迫取消领取红包资格
                    Log.e(TAG, "取消抢红包资格,title=" + clientMessage.getLogOut().getTitle() + ";message=" + clientMessage.getLogOut().getContent());
                    ToastUtil.showShort(mContext, "被迫取消抢红包资格");
                    Intent intentCancelRighr = new Intent();
                    intentCancelRighr.setAction(PushReceiverAction.CANCEL_REDPACKET_RIGHT);
                    intentCancelRighr.putExtra("data", clientMessage);
                    context.sendBroadcast(intentCancelRighr);
                    MyApplication.mLoginStatus = 1;
                    showNotification(clientMessage.getLogOut().getTitle(), clientMessage.getLogOut().getContent(), 1);
                    break;
                case 3: //领取红包/未领到红包消息
                    Intent intentGo = new Intent();
                    intentGo.setAction(clientMessage.getRedPacket().getResult() == 1 ? PushReceiverAction.GET_REDPACKET_ACTION : PushReceiverAction.DONT_GET_REDPACKET_ACTION);
                    intentGo.putExtra("data", clientMessage);
                    context.sendBroadcast(intentGo);
                    Log.e(TAG, "红包消息,content=" + clientMessage.getRedPacket().getContent());
                    showNotification(clientMessage.getRedPacket().getTitle(), clientMessage.getRedPacket().getContent(), 2);
                    break;
            }
        } else {
            Log.e(TAG, "clientMessage is null");
        }
    }
}

/**
 * 在状态栏显示通知
 */
@SuppressWarnings("deprecation")
private void showNotification(String title, String content, int type) {
    // 创建一个NotificationManager的引用
    NotificationManager notificationManager = (NotificationManager) mContext.getSystemService(android.content.Context
            .NOTIFICATION_SERVICE);
    NotificationCompat.Builder builder = new NotificationCompat.Builder(mContext);
    builder.setContentTitle(title)
            .setContentText(content)
            .setTicker(title)
//                .setContentIntent(getDefalutIntent(Notification.FLAG_AUTO_CANCEL))//点击通知栏设置意图
            .setWhen(System.currentTimeMillis())
            .setPriority(Notification.PRIORITY_HIGH)
//                .setAutoCancel(true)
            .setOngoing(false)//ture,设置他为一个正在进行的通知。他们通常是用来表示一个后台任务,用户积极参与(如播放音乐)或以某种方式正在等待,因此占用设备(如一个文件下载,同步操作,主动网络连接)
            .setSmallIcon(R.drawable.logo);
    Notification notification = builder.build();
    Log.e(TAG, "isSoundOn:" + SharePreferUtils.isSoundOn(mContext));
    if (SharePreferUtils.isSoundOn(mContext) && type == 2) {  //type==红包消息
        notification.sound = Uri.parse("android.resource://" + mContext.getPackageName() + "/" + R.raw.diaoluo_da);
    }
    Intent notificationIntent;
    if (type == 2) {
        //进入我的资产
        notificationIntent = HomeActivity.toWitchFragmentOfHome(mContext, Constants.TOWITCHOFHOME_MYASSETS);
    } else {
        //进入主页流量球
        notificationIntent = HomeActivity.toWitchFragmentOfHome(mContext, Constants.TOWITCHOFHOME_BALL);
    }
    Log.e(TAG, "type=" + type);
    // 点击该通知后要跳转的Activity
    PendingIntent contentItent = PendingIntent.getActivity(mContext, 0, notificationIntent, 0);
    notification.contentIntent = contentItent;
    notification.flags = Notification.FLAG_AUTO_CANCEL;
    // 把Notification传递给NotificationManager
    notificationManager.notify(type, notification);
}
}

Driver类:该类是通过proto编译出来的,代码量比较大,Driver.proto文件贴下来,Driver.java文件我附上下载链接,编译教程见:protobuf在Android推送中的使用方法(比json、xml都要好的数据结构)
Driver.proto文件:

syntax = "proto3";

//编译教程:http://www.jianshu.com/p/8036003cb849
message ClientMessage
{
int32     type      = 1;  //类型1=socket登录;2=取消抢红包资格;3=领取红包消息
HeartBeat heartBeat = 2;  //socket登录
LogOut    logOut    = 3;  //取消抢红包资格消息
RedPacket redPacket = 4;  //领取&未抢到红包消息
}

message HeartBeat
{
string sid  = 1;  //服务ID
string imei = 2;  //手机唯一编号imei
}

message LogOut
{
string sid      = 1;  //服务ID
int32 result    = 2;  //1成功,2失败
string title    = 3;  //消息标题
string content  = 4;  //消息内容
}

message RedPacket
{
string sid      = 1;  //服务ID
int32 result    = 2;  //1成功,2失败
int32 price     = 3;  //红包金额单位分
string title    = 4;  //消息标题
string content  = 5;  //消息内容
}

Driver.java类下载地址:
http://files.cnblogs.com/files/feijian/Driver.rar

代码下载地址:
https://github.com/hqfeijian/qdemopush

    原文作者:赖床的猫
    原文地址: https://www.jianshu.com/p/0776dac9e3a3
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞