-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathutils.py
More file actions
114 lines (100 loc) · 2.67 KB
/
utils.py
File metadata and controls
114 lines (100 loc) · 2.67 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
import threading
import queue
import asyncio
from enum import Enum
# UUID string for the "buttons" characteristic.
# NOTE(review): by its name this is presumably a Bluetooth GATT characteristic
# exposed by the devices listed below — confirm against the code that uses it.
buttons_characteristic_uuid = '1A270002-C2ED-4D11-AD1E-FC06D8A02D37'
# Maps a short nickname to a "WAC-XXXX" device identifier string.
# Nicknames from "thud" onward map to the empty string, presumably because no
# identifier has been assigned to them yet — callers should expect "" here.
# NOTE(review): "pop" and "pow" have three-character suffixes while every other
# populated entry has four; confirm these identifiers are not truncated.
device_name_map = {
"beep": "WAC-2463",
"boop": "WAC-7F36",
"buzz": "WAC-98CE",
"bzzt": "WAC-D329",
"chirp": "WAC-1A74",
"click": "WAC-27B7",
"clonk": "WAC-9776",
"clunk": "WAC-7740",
"crash": "WAC-22F2",
"dink": "WAC-6BC7",
"doot": "WAC-47F1",
"fizz": "WAC-121A",
"honk": "WAC-5613",
"hoot": "WAC-9717",
"jolt": "WAC-A466",
"noot": "WAC-EC1C",
"oink": "WAC-7236",
"pew": "WAC-2C37",
"ping": "WAC-9F29",
"pong": "WAC-C9B9",
"pop": "WAC-E5C",
"pow": "WAC-5F6",
"purr": "WAC-F8D0",
"quark": "WAC-B45E",
"ring": "WAC-60F4",
"roar": "WAC-6BE7",
"sigh": "WAC-CE8B",
"snip": "WAC-2E11",
"sput": "WAC-D6A0",
"swsh": "WAC-383E",
"tape": "WAC-FCBA",
"thud": "",
"thum": "",
"tik": "",
"tok": "",
"tong": "",
"vroom": "",
"whim": "",
"whir": "",
"whiz": "",
"whoop": "",
"whum": "",
"wizz": "",
"wow": "",
"yip": "",
"zap": "",
"zip": "",
"zot": "",
}
class DynamicObject:
    """Bare attribute container.

    Any attribute assigned on an instance is written straight into the
    instance dictionary, so arbitrary names can be attached at runtime.
    """

    def __init__(self):
        """Create an instance that starts with no attributes."""

    def __setattr__(self, name, value):
        """Store ``value`` under ``name`` in the instance dictionary."""
        vars(self)[name] = value
class RobotState(Enum):
    """States a robot can be in, as described by the member names.

    Values are consecutive integers starting at 0, in declaration order.
    """

    DISCONNECTED = 0
    CONNECTING = 1
    CONNECTED_IDLE = 2
    RUNNING = 3
    DONE = 4
def copy_queue(original_queue):
    """Return a new ``queue.Queue`` holding the same items as ``original_queue``.

    The original queue is only read, never drained: its contents, ordering and
    unfinished-task count are unchanged, so a later ``original_queue.join()``
    is unaffected by taking a copy. The copy is shallow; both queues reference
    the same item objects.

    Args:
        original_queue: A ``queue.Queue`` (``LifoQueue`` and ``PriorityQueue``
            are also handled).

    Returns:
        A FIFO ``queue.Queue`` with the same ``maxsize`` as the original,
        filled with the items in the order ``original_queue.get()`` would
        have returned them.
    """
    # Queue.mutex is the lock every Queue method acquires internally; holding
    # it gives a consistent snapshot of the underlying container. (A lock
    # created locally inside this function would not exclude anyone.)
    with original_queue.mutex:
        items = list(original_queue.queue)

    # `.queue` is in storage order; rearrange into retrieval order for the
    # queue flavours whose storage order differs from it.
    if isinstance(original_queue, queue.PriorityQueue):
        items.sort()
    elif isinstance(original_queue, queue.LifoQueue):
        items.reverse()

    # Reuse the original bound: the snapshot can never exceed it, and the copy
    # stays usable for further puts instead of being created already full.
    new_queue = queue.Queue(maxsize=original_queue.maxsize)
    for item in items:
        new_queue.put_nowait(item)
    return new_queue
async def copy_asyncio_queue(original_queue: asyncio.Queue) -> asyncio.Queue:
    """Return a new ``asyncio.Queue`` holding the same items as ``original_queue``.

    The items are drained from the original and immediately put back, using
    only non-blocking calls, so the function never yields to the event loop
    while the original is temporarily empty. The copy is shallow; both queues
    reference the same item objects.

    Args:
        original_queue: The queue to copy.

    Returns:
        A plain ``asyncio.Queue`` with the same ``maxsize`` as the original,
        filled with the items in the order they were retrieved from it.
    """
    new_queue = asyncio.Queue(maxsize=original_queue.maxsize)

    drained = []
    while True:
        try:
            drained.append(original_queue.get_nowait())
        except asyncio.QueueEmpty:
            break

    for item in drained:
        new_queue.put_nowait(item)
        original_queue.put_nowait(item)
        # Every put bumps the unfinished-task counter. Balance it so that
        # copying does not leave phantom tasks that would make a later
        # `original_queue.join()` wait forever.
        original_queue.task_done()
    return new_queue
async def empty_asyncio_queue(queue: asyncio.Queue):
    """Discard every item currently in ``queue``.

    Each removed item is also marked as done, so the queue's unfinished-task
    count drops by one per discarded item.
    """
    while True:
        if queue.empty():
            return
        await queue.get()
        queue.task_done()