本系列内容:
- Kafka环境搭建与测试
- Python生产者/消费者测试
- Spark接收Kafka消息处理,然后回传到Kafka
- Flask引入消费者
- WebSocket实时显示
版本:
spark-2.4.3-bin-hadoop2.7.tgz
kafka_2.11-2.1.0.tgz
—————————–第4小节:Flask引入消费者———————————
步骤01:使用pip安装flask与flask_socketio
步骤02:编写后台处理程序
import json
from flask import Flask, render_template
from flask_socketio import SocketIO
from kafka import KafkaConsumer
app = Flask(__name__)
app.config['SECRET_KEY'] = 'secret!'
socketio = SocketIO(app)
thread = None
consumer = KafkaConsumer('result', bootstrap_servers=['192.168.147.128:9092'])
# 接收到消息就调用test_message方法,test_message是定义在web_socket对象上的js函数
def background_thread():
for msg in consumer:
data_json = msg.value.decode('utf8')
socketio.emit('test_message', {'data': data_json})
# JS代码中可以调用这个装饰器下的视图函数,以初始化消费者监听kafka
@socketio.on('test_connect')
def connect(message):
print(message)
global thread
if thread is None:
thread = socketio.start_background_task(target=background_thread)
socketio.emit('connected', {'data': 'Connected'})
# 返回一个html页面
@app.route("/")
def handle_mes():
return render_template("index.html")
if __name__ == '__main__':
socketio.run(app, debug=True)