Spark+Kafka+WebSocket+eCharts实时分析-完全记录(4)

本系列内容:

  1. Kafka环境搭建与测试
  2. Python生产者/消费者测试
  3. Spark接收Kafka消息处理,然后回传到Kafka
  4. Flask引入消费者
  5. WebSocket实时显示

版本:

spark-2.4.3-bin-hadoop2.7.tgz

kafka_2.11-2.1.0.tgz

—————————–第4小节:Flask引入消费者———————————

步骤01:使用pip安装flask与flask_socketio

步骤02:编写后台处理程序

import json
from flask import Flask, render_template
from flask_socketio import SocketIO
from kafka import KafkaConsumer

app = Flask(__name__)
app.config['SECRET_KEY'] = 'secret!'
socketio = SocketIO(app)
thread = None
consumer = KafkaConsumer('result', bootstrap_servers=['192.168.147.128:9092'])

# 接收到消息就调用test_message方法,test_message是定义在web_socket对象上的js函数
def background_thread():
    for msg in consumer:
        data_json = msg.value.decode('utf8')
        socketio.emit('test_message', {'data': data_json})

# JS代码中可以调用这个装饰器下的视图函数,以初始化消费者监听kafka
@socketio.on('test_connect')
def connect(message):
    print(message)
    global thread
    if thread is None:
        thread = socketio.start_background_task(target=background_thread)
    socketio.emit('connected', {'data': 'Connected'})

# 返回一个html页面
@app.route("/")
def handle_mes():
    return render_template("index.html")


if __name__ == '__main__':
    socketio.run(app, debug=True)

    原文作者:陋室
    原文地址: https://zhuanlan.zhihu.com/p/71591407
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞