server.py (2729B)
"""Websocket server that lets browser clients draw on a Unicorn HAT HD.

Clients connect to ``/ws`` and send JSON commands (see the action constants
below).  After each successfully executed command the full state (saved
drawings and the current pixel buffer) is broadcast to every connected client.
"""
import json
import os
import os.path
import pickle

from flask import Flask, send_from_directory
from flask_sockets import Sockets
from gevent import pywsgi
from geventwebsocket.handler import WebSocketHandler

try:
    import unicornhathd as unicorn
    print("unicorn hat hd detected")
except ImportError:
    from unicorn_hat_sim import unicornhathd as unicorn

# Actions that can be sent
NO_OP = 'NO_OP'
SET_PIXEL = 'SET_PIXEL'
CLEAR = 'CLEAR'
SAVE = 'SAVE'
LOAD = 'LOAD'

# Constants
SAVES_DIR = 'saves/'

# Errors caused by a bad client command (malformed JSON, missing or
# non-numeric fields, invalid or missing save file).  These are reported and
# skipped instead of tearing down the connection.
_COMMAND_ERRORS = (ValueError, LookupError, TypeError, OSError,
                   pickle.UnpicklingError)

# Global references to app & clients
all_clients = set()
app = Flask(__name__, static_folder='')
sockets = Sockets(app)


def send_state():
    """Broadcast the saved-drawing names and current pixels to all clients.

    Clients whose socket fails during the send are dropped from
    ``all_clients`` so one dead connection cannot abort the broadcast.
    """
    state = {
        "saves": os.listdir(SAVES_DIR),
        "pixels": unicorn.get_pixels().tolist()
    }
    stateJson = json.dumps(state)
    # Iterate over a snapshot: send() may yield to other greenlets that add
    # or remove clients, which would otherwise raise
    # "Set changed size during iteration".
    for client in list(all_clients):
        try:
            client.send(stateJson)
        except OSError:
            # NOTE(review): assumes geventwebsocket's WebSocketError derives
            # from socket.error (OSError) - confirm against the installed
            # version.
            all_clients.discard(client)


def execute_command(command):
    """Apply one decoded client command to the display.

    ``command`` is a dict with a ``'type'`` key holding one of the action
    constants, plus the fields that action needs (``x``/``y``/``r``/``g``/``b``
    for SET_PIXEL, ``saveName`` for SAVE and LOAD).  Unknown types are ignored.

    Raises:
        KeyError: a required field is missing.
        ValueError: a pixel field is not an integer, or the save name is
            invalid.
        OSError: the save file cannot be opened.
    """
    cmd_type = command['type']
    if cmd_type == NO_OP:  # Used just to get state without doing an action
        pass
    elif cmd_type == SET_PIXEL:
        x = int(command['x'])
        y = int(command['y'])
        r = int(command['r'])
        g = int(command['g'])
        b = int(command['b'])
        unicorn.set_pixel(x, y, r, g, b)
        unicorn.show()
    elif cmd_type == CLEAR:
        unicorn.clear()
        unicorn.show()
    elif cmd_type == SAVE:
        saveName = command["saveName"]
        save(saveName)
    elif cmd_type == LOAD:
        saveName = command["saveName"]
        load(saveName)


def _save_path(saveName):
    """Return the path of ``saveName`` inside SAVES_DIR.

    The name comes from an untrusted websocket client, so anything that is
    not a plain file name (empty, ``.``/``..``, or containing a path
    separator) is rejected to keep reads and writes inside SAVES_DIR.

    Raises:
        ValueError: ``saveName`` is not a plain, non-empty file name.
    """
    if (not isinstance(saveName, str)
            or not saveName
            or saveName in (os.curdir, os.pardir)
            or os.path.basename(saveName) != saveName):
        raise ValueError("invalid save name: {!r}".format(saveName))
    return os.path.join(SAVES_DIR, saveName)


def save(saveName):
    """Pickle the current pixel buffer to ``SAVES_DIR/saveName``."""
    with open(_save_path(saveName), "wb") as f:
        pickle.dump(unicorn.get_pixels(), f)


def load(saveName):
    """Restore the display from the drawing stored as ``SAVES_DIR/saveName``."""
    with open(_save_path(saveName), "rb") as f:
        # SECURITY: pickle.load executes arbitrary code from the file.  This
        # is only acceptable because _save_path confines reads to SAVES_DIR;
        # anything else able to write there can still plant a malicious file.
        # Consider a non-executable format (e.g. JSON) for saves.
        pixels = pickle.load(f)
    for x, row in enumerate(pixels):
        for y, pixel in enumerate(row):
            unicorn.set_pixel(x, y, *pixel)
    unicorn.show()


@sockets.route('/ws')
def do_websocket(websocket):
    """Serve one websocket client until it disconnects.

    Each received message is decoded as JSON and executed; on success the new
    state is broadcast to all clients.  A bad command is reported and skipped
    without closing the connection.
    """
    all_clients.add(websocket)
    try:
        while not websocket.closed:
            command = websocket.receive()
            if not command:
                break
            try:
                execute_command(json.loads(command))
            except _COMMAND_ERRORS as err:
                print("Ignoring bad command: {!r}".format(err))
                continue
            send_state()
    finally:
        # discard(): send_state() may already have dropped this socket.
        all_clients.discard(websocket)


@app.route('/<path:path>')
def send_static(path):
    """Serve a file from the ``build/`` directory."""
    return send_from_directory('build/', path)


def main():
    """Create the saves directory, then serve on port 3001 until stopped."""
    try:
        os.mkdir(SAVES_DIR)
    except OSError:
        # An already existing directory is fine; any other failure
        # (permissions, a plain file in the way) must not be hidden.
        if not os.path.isdir(SAVES_DIR):
            raise
    try:
        print("Serving on port 3001")
        server = pywsgi.WSGIServer(('', 3001), app, handler_class=WebSocketHandler)
        server.serve_forever()
    finally:
        # Always turn the display off when the server stops.
        unicorn.off()


if __name__ == "__main__":
    main()