blob: ea4b8144619af5112857e02ab42f4b322d86719b (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
|
import json
from flask import Flask, send_from_directory
from flask_sockets import Sockets
from gevent import pywsgi
from geventwebsocket.handler import WebSocketHandler
try:
import unicornhathd as unicorn
print("unicorn hat hd detected")
except ImportError:
from unicorn_hat_sim import unicornhathd as unicorn
NO_OP = 'NO_OP'
SET_PIXEL = 'SET_PIXEL'
CLEAR = 'CLEAR'
all_clients = set()
app = Flask(__name__, static_folder='build/')
sockets = Sockets(app)
def send_state():
global all_clients
jsonPixels = json.dumps(unicorn.get_pixels())
for client in all_clients:
client.send(jsonPixels)
def execute_command(command):
cmd_type = command['type']
if cmd_type == NO_OP:
pass
elif cmd_type == SET_PIXEL:
x = int(command['x'])
y = int(command['y'])
r = int(command['r'])
g = int(command['g'])
b = int(command['b'])
unicorn.set_pixel(x, y, r, g, b)
unicorn.show()
elif cmd_type == CLEAR:
unicorn.clear()
unicorn.show()
@sockets.route('/ws')
def do_websocket(websocket):
global all_clients
all_clients.add(websocket)
try:
while not websocket.closed:
command = websocket.receive()
cmd = json.loads(command)
execute_command(cmd)
send_state()
finally:
all_clients.remove(websocket)
@app.route('/<path:path>')
def send_static(path):
return send_from_directory('build/', path)
def main():
try:
print("Serving on port 3001")
server = pywsgi.WSGIServer(('', 3001), app, handler_class=WebSocketHandler)
server.serve_forever()
except:
unicorn.off()
if __name__=="__main__":
main()
|