Python-OSC, how to share blinks/jaw clenches with other functions?
Posted: Sat Sep 17, 2022 4:37 am
This is most probably more a Python related question than directly related to MindMonitor or Python-OSC, but as I've not found an answer on the web, I thought I'd ask here.
Below is skeletal code for a very simple Pong-game which I'm ultimately trying to control with blinks and jaw clenches instead of from the keyboard. Where I'm stuck is how to share the amount of blinks from the function def blink_handler to the calling function pong. The error I get is NameError: name 'blinks' is not defined on row 137 (print(blinks)).
Might this be because threading is being used, and global variables are not shared? Any clue how to fix this, or can I avoid threading somehow?
Below is skeletal code for a very simple Pong-game which I'm ultimately trying to control with blinks and jaw clenches instead of from the keyboard. Where I'm stuck is how to share the amount of blinks from the function def blink_handler to the calling function pong. The error I get is NameError: name 'blinks' is not defined on row 137 (print(blinks)).
Might this be because threading is being used, and global variables are not shared? Any clue how to fix this, or can I avoid threading somehow?
Code: Select all
from pythonosc.dispatcher import Dispatcher
from pythonosc.osc_server import BlockingOSCUDPServer
from tkinter import *
import tkinter as tk
import time
import random
import threading
global isFailed
IP = "0.0.0.0"
PORT = 5000
def muse_handler(address, *args):
"""handle all muse data"""
print(f"{address}: {args}")
def eeg_handler(address, *args):
"""handle raw eeg data"""
print(f"{address}: {args}")
def blink_handler(address, *args):
global blinks
"""handle blink data"""
blinks = blinks + 1
print("Blink detected ", blinks)
def jaw_handler(address, *args):
"""handle jaw clench data"""
print("Jaw Clench detected")
def get_dispatcher():
dispatcher = Dispatcher()
dispatcher.map("/muse/elements/blink", blink_handler)
dispatcher.map("/muse/elements/jaw_clench", jaw_handler)
return dispatcher
def start_blocking_server(ip, port):
server = BlockingOSCUDPServer((ip, port), dispatcher)
server.serve_forever() # Blocks forever
def moveBaseLR(base, dir, x, y = 0):
x1, y1, x2, y2 = c.coords(base)
if ((x1 > 0 and dir == 'l') or (x2 < 400 and dir == 'r')):
c.move(base, x, y)
c.update()
def dispatch():
global dispatcher
dispatcher = get_dispatcher()
start_blocking_server(IP, PORT)
def startBall(ball, sp):
s = random.randint(-sp, sp)
x, y = s, 0-sp #Start. Ball to move in random direction. 0-sp is used to get negative value
c.move(ball, x, y)
for p in range(1, 500000):
l, t, r, b = c.coords(ball)
txtS.delete(0, END)
txtS.insert(0, str(p))
#Need to change direction on hitting wall. Eight options are there.
if(r >= 400 and x >= 0 and y < 0): #Ball moving ↗ and hit right wall
x, y = 0-sp, 0-sp
elif(r >= 400 and x >= 0 and y >= 0): #Ball moving ↘ and hit right wall
x, y = 0-sp, sp
elif(l <= 0 and x < 0 and y < 0): #Ball moving ↖ and hit left wall
x, y = sp, 0-sp
elif(l <= 0 and x < 0 and y >= 0): #Ball moving ↙ and hit left wall
x, y = sp, sp
elif(t <= 0 and x >= 0 and y < 0): #Ball moving ↗ and hit top wall
x, y = sp, sp
elif(t <= 0 and x < 0 and y < 0): #Ball moving ↖ and hit top wall
x, y = 0-sp, sp
elif(b >= 385): #Ball reached base level. Check if base touches ball
tchPt = l + 10 #Size is 20. Half of it.
bsl, bst, bsr, bsb = c.coords(base)
if(tchPt >= bsl and tchPt <= bsr): #Ball touch base
n = random.randint(-sp, sp)
x, y = n, 0-sp
else: #Hit bottom. Failed
c.itemconfigure(lblID, state='normal')
global isFailed
isFailed = True
break
time.sleep(.025)
c.move(ball, x, y)
c.update()
def restart():
global isFailed
if(isFailed == True):
isFailed = False
c.itemconfigure(lblID, state='hidden')
c.moveto(base, 150, 385)
c.moveto(ball, 190, 365)
startBall(ball, ballsp)
def pong():
global c, ball, txtS, base, lblID, ballsp, blinks
root = Tk()
root.minsize(400,400)
basesp = 10
ballsp = 5
global isFailed
isFailed = False
thread = threading.Thread(target=dispatch)
thread.daemon = True
thread.start()
c = Canvas(width=400, height=400, background='#a0aa00')
c.pack()
base = c.create_rectangle(150, 385, 250, 400, fill='blue', outline='blue')
ball = c.create_oval(190, 365, 210, 385, fill='red', outline='red')
txtS = tk.Entry(c, text='0')
txtScore = c.create_window(350, 0, anchor='nw', window=txtS)
lblM = tk.Label(c, text='Failed!!!Press Enter key to start again')
lblID = c.create_window(100, 190, anchor='nw', window=lblM)
c.itemconfigure(lblID, state='hidden')
root.bind("<KeyPress-Left>", lambda event: moveBaseLR(base, 'l', 0-basesp))
root.bind("<KeyPress-Right>", lambda event: moveBaseLR(base, 'r', basesp))
root.bind("<Return>", lambda event: restart())
print(blinks)
startBall(ball, ballsp)
root.mainloop()
if __name__ == "__main__":
pong()