forked from seung-lab/python-task-queue
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtaskqueue_cli.py
More file actions
142 lines (114 loc) · 3.33 KB
/
taskqueue_cli.py
File metadata and controls
142 lines (114 loc) · 3.33 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
import os
import math
import importlib
import click
from tqdm import tqdm
from taskqueue import TaskQueue, __version__, QueueEmptyError
from taskqueue.lib import toabs
from taskqueue.paths import get_protocol
def normalize_path(queuepath):
    """Return ``queuepath`` with an explicit protocol prefix.

    A path for which ``get_protocol`` reports a protocol is returned
    unchanged. Otherwise the path is passed through ``toabs`` and prefixed
    with ``fq://``.
    """
    if get_protocol(queuepath):
        return queuepath
    # No protocol detected: fall back to the fq:// scheme.
    # NOTE(review): toabs presumably yields an absolute filesystem path -- confirm.
    return "fq://" + toabs(queuepath)
# Root command group: every subcommand in this file attaches itself
# through the @main.command() decorator.
@click.group()
@click.version_option(version=__version__)
def main():
    """
    CLI tool for managing python-task-queue queues.
    https://github.com/seung-lab/python-task-queue
    """
@main.command()
def license():
    """Prints the license for this library and cli tool."""
    # The LICENSE file is looked up next to this module.
    path = os.path.join(os.path.dirname(__file__), 'LICENSE')
    # Pin the encoding so decoding does not depend on the platform's
    # locale default (e.g. cp1252 on Windows).
    with open(path, 'rt', encoding='utf-8') as f:
        print(f.read())
@main.command()
@click.argument("queuepath")
def rezero(queuepath):
    """Reset collected statistics for queue."""
    # Resolve the path first, then delegate to the queue's own rezero().
    resolved = normalize_path(queuepath)
    queue = TaskQueue(resolved)
    queue.rezero()
@main.command()
@click.argument("queuepath")
def status(queuepath):
    """Print vital statistics for queue."""
    tq = TaskQueue(normalize_path(queuepath))
    ins = tq.inserted
    enq = tq.enqueued
    comp = tq.completed
    leased = tq.leased

    # The inserted/completed counters may be NaN (hence the isnan checks);
    # a NaN counter's line is simply omitted.
    if not math.isnan(ins):
        print(f"Inserted: {ins}")

    # Percentages are relative to the inserted count. When that count is
    # zero (or NaN, which also fails the > 0 comparison) a "--" placeholder
    # is printed instead of dividing.
    if ins > 0:
        print(f"Enqueued: {enq} ({enq / ins * 100:.1f}% left)")
        if not math.isnan(comp):
            print(f"Completed: {comp} ({comp / ins * 100:.1f}%)")
    else:
        print(f"Enqueued: {enq} (--% left)")
        if not math.isnan(comp):
            print(f"Completed: {comp} (--%)")

    # Leased tasks are reported as a share of what is currently enqueued.
    if enq > 0:
        print(f"Leased: {leased} ({leased / enq * 100:.1f}% of queue)")
    else:
        # Same layout as the computed branch: "of queue" stays inside
        # the parentheses.
        print(f"Leased: {leased} (--% of queue)")
@main.command()
@click.argument("queuepath")
def release(queuepath):
    """Release all tasks from their leases."""
    # Resolve the path first, then delegate to the queue's release_all().
    resolved = normalize_path(queuepath)
    queue = TaskQueue(resolved)
    queue.release_all()
@main.command()
@click.argument("src")
@click.argument("dest")
@click.option('--load', default=None, help="Load a module to get task definitions.", show_default=True)
def cp(src, dest, load):
    """
    Copy the contents of a queue to another
    service or location. Do not run this
    process while a queue is being worked.
    Currently sqs queues are not copiable,
    but you can copy an fq to sqs. The mv
    command supports sqs queues.
    """
    # The module is imported only for its side effects; the return value
    # of import_module is discarded.
    if load:
        importlib.import_module(load)

    src, dest = normalize_path(src), normalize_path(dest)

    # sqs:// sources are rejected up front with a message, not an exception.
    if get_protocol(src) == "sqs":
        print("ptq: cp does not support sqs:// as a source.")
        return

    # The destination queue is constructed first, then the source queue,
    # which is handed to insert() as the collection of tasks to copy.
    TaskQueue(dest).insert(TaskQueue(src))
@main.command()
@click.argument("src")
@click.argument("dest")
def mv(src, dest):
    """
    Moves the contents of a queue to another
    service or location. Do not run this
    process while a queue is being worked.
    Moving an sqs queue to a file queue
    may result in duplicated tasks.
    """
    src = normalize_path(src)
    dest = normalize_path(dest)

    # Per-queue progress output is switched off; the single tqdm bar
    # below reports progress for the whole move.
    tqd = TaskQueue(dest, progress=False)
    tqs = TaskQueue(src, progress=False)

    with tqdm(total=tqs.enqueued, desc="Moving") as pbar:
        while True:
            try:
                batch = tqs.lease(num_tasks=10, seconds=10)
            except QueueEmptyError:
                # Source is drained; nothing more to move.
                break
            else:
                # Insert into the destination before deleting from the
                # source, so a task is never removed before it is copied.
                tqd.insert(batch)
                tqs.delete(batch)
                pbar.update(len(batch))
@main.command()
@click.argument("queuepath")
def purge(queuepath):
    """Delete all queued messages and zero out queue statistics."""
    # Resolve the path and delegate straight to the queue's purge().
    TaskQueue(normalize_path(queuepath)).purge()