-
Notifications
You must be signed in to change notification settings - Fork 19
Expand file tree
/
Copy pathcommunicate.py
More file actions
83 lines (64 loc) · 2 KB
/
Copy pathcommunicate.py
File metadata and controls
83 lines (64 loc) · 2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
"""
Author: Steffen Vogel <post@steffenvogel.de>
SPDX-FileCopyrightText: 2014-2023 Institute for Automation of Complex Power Systems, RWTH Aachen University
SPDX-License-Identifier: Apache-2.0
""" # noqa: E501
import logging
import sys
from threading import Thread
from typing import Callable
import linuxfd # type: ignore[import]
from villas.node.formats import VillasHuman
from villas.node.sample import Sample
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Receive callback: invoked with every Sample parsed from stdin.
RecvCallback = Callable[[Sample], None]
# Send callback: invoked with the current sequence number; returns the Sample to emit.
SendCallback = Callable[[int], Sample]
class RecvThread(Thread):
    """Daemon thread that reads samples from stdin and hands them to a callback.

    Every line read from ``sys.stdin`` is parsed with the ``VillasHuman``
    format; lines for which the parser returns ``None`` are skipped.
    """

    def __init__(self, cb: RecvCallback):
        """Create the thread.

        Args:
            cb: Callable invoked with each successfully parsed sample.
        """
        # Daemon thread: it must not keep the interpreter alive on exit.
        super().__init__(daemon=True)
        self.format = VillasHuman()
        self.cb = cb

    def run(self):
        """Consume stdin line by line until it is exhausted."""
        for raw_line in sys.stdin:
            logger.debug(f"RecvThread: {raw_line}")

            parsed = self.format.load_sample(raw_line)
            if parsed is None:
                # Nothing usable on this line; move on to the next one.
                continue

            self.cb(parsed)
class SendThread(Thread):
    """Daemon thread that periodically emits samples on stdout.

    On every timer tick the callback is asked for a sample, which is then
    serialized with the ``VillasHuman`` format and written to ``sys.stdout``.
    """

    def __init__(self, cb: SendCallback, rate: float):
        """Create the thread.

        Args:
            cb: Callable invoked with the current sequence number; its
                result is the sample to send, or ``None`` to send nothing
                on this tick.
            rate: Number of ticks per second; the timer interval is
                ``1.0 / rate``.
        """
        # Daemon thread: it must not keep the interpreter alive on exit.
        super().__init__(daemon=True)
        self.format = VillasHuman()
        self.cb = cb
        self.rate = rate
        self.sequence = 0

    def run(self):
        """Wait on a timerfd forever, sending one sample per tick."""
        timer = linuxfd.timerfd()
        period = 1.0 / self.rate
        # NOTE(review): presumably (initial delay, repeat interval) in
        # seconds -- confirm against the linuxfd documentation.
        timer.settime(1.0, period)

        while True:
            # Blocks until the timer fires.
            timer.read()

            produced = self.cb(self.sequence)
            if produced is not None:
                encoded = self.format.dump_sample(produced)
                sys.stdout.write(encoded)
                sys.stdout.flush()
                # The sequence number only advances for samples actually sent.
                self.sequence += 1
def communicate(
    rate: float,
    recv_cb: RecvCallback | None = None,
    send_cb: SendCallback | None = None,
    wait: bool = True,
):
    """Start the stdin receive and/or stdout send threads.

    Args:
        rate: Send rate passed to ``SendThread``; only used when
            ``send_cb`` is given.
        recv_cb: If not ``None``, a ``RecvThread`` is started that calls
            this for every sample parsed from stdin.
        send_cb: If not ``None``, a ``SendThread`` is started that calls
            this to obtain the samples to write to stdout.
        wait: If true, block until the started threads have finished or
            the user presses Ctrl+C. If false, return right after
            starting the threads.

    Returns:
        None.
    """
    # Track only the threads that were actually started, so that waiting
    # never touches an unbound name when one (or both) callbacks are omitted.
    started: list[Thread] = []

    if recv_cb is not None:
        rt = RecvThread(recv_cb)
        rt.start()
        started.append(rt)

    if send_cb is not None:
        st = SendThread(send_cb, rate)
        st.start()
        started.append(st)

    if not wait:
        return

    try:
        # Joined in start order: receiver first, then sender.
        for thread in started:
            thread.join()
    except KeyboardInterrupt:
        # Both thread classes are daemons, so they end with the interpreter;
        # nothing has to be stopped explicitly here.
        logger.info("Received Ctrl+C. Stopping send/recv threads")