# async_python.py
# asyncio subprocess experiments: piping lines through `tee`, and driving
# interactive bash / python children over stdin/stdout pipes.
import asyncio
async def read_stdout(stdout):
    """Echo everything read from *stdout* until it reports end of stream.

    Data is pulled in chunks of at most 10 bytes; each non-empty chunk is
    printed with a ``stdout:`` prefix. The coroutine returns as soon as a
    read yields an empty (falsy) result.

    Args:
        stdout: Object with an awaitable ``read(n)`` method; ``run`` passes
            the child process's stdout stream.
    """
    print('read_stdout')
    # Prime the loop with a first read so the loop condition is the EOF test.
    chunk = await stdout.read(10)
    while chunk:
        print(f'stdout: { chunk }')
        chunk = await stdout.read(10)
async def read_stderr(stderr):
    """Echo everything read from *stderr* until it reports end of stream.

    Each call to ``read()`` is made without a size argument; every non-empty
    result is printed with a ``stderr:`` prefix, and the coroutine returns on
    the first empty (falsy) result.

    Args:
        stderr: Object with an awaitable ``read()`` method; ``run`` passes
            the child process's stderr stream.
    """
    print('read_stderr')
    # Prime the loop with a first read so the loop condition is the EOF test.
    data = await stderr.read()
    while data:
        print(f'stderr: { data }')
        data = await stderr.read()
async def write_stdin(stdin, count=100, delay=0.5):
    """Write numbered lines to *stdin*, then close it to signal EOF.

    For each ``i`` in ``range(count)`` the bytes ``b'line: <i>\\n'`` are
    printed, written, and drained, followed by a pause of *delay* seconds.

    The stream is closed when the loop finishes (or fails). Without that the
    process on the other end of the pipe never sees end-of-input, so it keeps
    its own output open and any reader waiting for EOF blocks forever.

    Args:
        stdin: Writer with ``write(bytes)``, awaitable ``drain()`` and
            ``close()``; ``run`` passes the child process's stdin stream.
        count: Number of lines to send (default 100, the original behavior).
        delay: Seconds to sleep after each line (default 0.5, the original
            behavior).
    """
    print('write_stdin')
    try:
        for i in range(count):
            buf = f'line: { i }\n'.encode()
            print(f'stdin: { buf }')
            stdin.write(buf)
            # Let the transport flush before queueing more data.
            await stdin.drain()
            await asyncio.sleep(delay)
    finally:
        # Closing the write end is what delivers EOF to the reader side.
        stdin.close()
async def run():
    """Spawn ``/usr/bin/tee`` on pipes and run the three stream workers.

    The child's stdin, stdout and stderr are all attached to pipes. The
    stderr reader, stdout reader and stdin writer coroutines are then handed
    to ``asyncio.gather`` and awaited together.
    """
    pipe = asyncio.subprocess.PIPE
    child = await asyncio.create_subprocess_exec(
        '/usr/bin/tee', stdin=pipe, stdout=pipe, stderr=pipe)
    workers = [
        read_stderr(child.stderr),
        read_stdout(child.stdout),
        write_stdin(child.stdin),
    ]
    await asyncio.gather(*workers)
# Runs the tee demo at module level. There is no __main__ guard on this call,
# so it executes on import as well as when the file is run as a script.
asyncio.run(run())
from subprocess import Popen, PIPE
from threading import Thread
# Not referenced by any code in this file.
SHELL = False
# Argument vectors for the interactive children. main() and main2() join the
# elements with spaces to form the command line given to the shell.
CMD = [b'/bin/bash', b'-i']
CMD2 = [b'/usr/local/bin/python3.7', b'-i']
# Keyword arguments that main() and main2() forward to
# create_subprocess_shell(). The name was used there but never defined, which
# raised NameError. Both coroutines write to proc.stdin and read from
# proc.stdout, so those two must be pipes; stderr is left inherited because
# nothing in this file reads it.
POPEN_ARGS = {'stdin': PIPE, 'stdout': PIPE}
async def main():
    """Drive the ``CMD`` child over pipes: ``echo ONE``, ``ls``, then ``exit``.

    The elements of ``CMD`` are joined with spaces and started through
    ``create_subprocess_shell`` with the module-level ``POPEN_ARGS`` keyword
    arguments. After each of the first two commands, one read from the
    child's stdout is printed; after ``exit`` the coroutine waits for the
    child to terminate.

    Requires ``POPEN_ARGS`` to exist at module level and to make the child's
    stdin and stdout pipes, since both are used directly here.
    """
    command_line = b' '.join(CMD)
    child = await asyncio.subprocess.create_subprocess_shell(
        command_line, **POPEN_ARGS)
    # (command to send, maximum number of bytes to read back)
    exchanges = (
        (b'echo ONE\n', 1024),
        (b'ls\n', 2048),
    )
    for request, limit in exchanges:
        child.stdin.write(request)
        print(await child.stdout.read(limit))
    child.stdin.write(b'exit\n')
    await child.wait()
async def main2():
    """Drive the ``CMD2`` child over pipes: print, ``dir()``, then ``quit()``.

    The elements of ``CMD2`` are joined with spaces and started through
    ``create_subprocess_shell`` with the module-level ``POPEN_ARGS`` keyword
    arguments. One read from the child's stdout is printed after each of the
    first two statements; after ``quit()`` the coroutine waits for the child
    to terminate.

    Requires ``POPEN_ARGS`` to exist at module level and to make the child's
    stdin and stdout pipes, since both are used directly here.
    """
    command_line = b' '.join(CMD2)
    interpreter = await asyncio.subprocess.create_subprocess_shell(
        command_line, **POPEN_ARGS)
    to_child = interpreter.stdin
    from_child = interpreter.stdout

    to_child.write(b'print("test")\n')
    first_reply = await from_child.read(1024)
    print(first_reply)

    to_child.write(b'dir()\n')
    second_reply = await from_child.read(2048)
    print(second_reply)

    to_child.write(b'quit()\n')
    await interpreter.wait()
# Script entry point: only main2() is run here; main() is defined above but
# is not invoked by this guard.
if __name__ == '__main__':
    asyncio.run(main2())