Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 13 additions & 45 deletions prometheus_client/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import time
import types

from threading import Lock
from threading import local, Lock
from timeit import default_timer

from .decorator import decorate
Expand Down Expand Up @@ -765,7 +765,7 @@ def time(self):

Can be used as a function decorator or context manager.
'''
return _GaugeTimer(self)
return _Timer(self.set)

def set_function(self, f):
'''Call the provided function to return the Gauge value.
Expand Down Expand Up @@ -829,7 +829,7 @@ def time(self):

Can be used as a function decorator or context manager.
'''
return _SummaryTimer(self)
return _Timer(self.observe)

def _samples(self):
return (
Expand Down Expand Up @@ -919,7 +919,7 @@ def time(self):

Can be used as a function decorator or context manager.
'''
return _HistogramTimer(self)
return _Timer(self.observe)

def _samples(self):
samples = []
Expand All @@ -932,24 +932,6 @@ def _samples(self):
return tuple(samples)


class _HistogramTimer(object):
def __init__(self, histogram):
self._histogram = histogram

def __enter__(self):
self._start = default_timer()

def __exit__(self, typ, value, traceback):
# Time can go backwards.
self._histogram.observe(max(default_timer() - self._start, 0))

def __call__(self, f):
def wrapped(func, *args, **kwargs):
with self:
return func(*args, **kwargs)
return decorate(f, wrapped)


class _ExceptionCounter(object):
def __init__(self, counter, exception):
self._counter = counter
Expand Down Expand Up @@ -986,34 +968,20 @@ def wrapped(func, *args, **kwargs):
return decorate(f, wrapped)


class _SummaryTimer(object):
def __init__(self, summary):
self._summary = summary

def __enter__(self):
self._start = default_timer()

def __exit__(self, typ, value, traceback):
# Time can go backwards.
self._summary.observe(max(default_timer() - self._start, 0))

def __call__(self, f):
def wrapped(func, *args, **kwargs):
with self:
return func(*args, **kwargs)
return decorate(f, wrapped)


class _GaugeTimer(object):
def __init__(self, gauge):
self._gauge = gauge
class _Timer(object):
def __init__(self, callback):
self._callback = callback
self._storage = local()

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this safe for reentrant use within the same thread?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You have a point here - local() returns reference to the same object within a thread and therefore it will work incorrectly, particularly for nested timers or regular timers in async code. I'm thinking about using id(self) as a local storage key.
What do you think?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If id(self) would work, then we would be able to store in the self directly as it's just the memory address of self.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will bring us back to the original problem when a single instance of timer used in different threads.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My point is that id(self) doesn't help, as you wouldn't be seeing a race in the first place if that worked. So this doesn't fix reentrancy. We may need a stack here.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, lets define the problem :)
Having

s1 = Summary("s1",...)
s2 = Summary("s2",...)

The original issue is that:

@s1.time()
def f(): pass

Produces wrong timings when f() is called simultaneously from different threads. My last code solves inter thread issue using TLS, and uses id(self) as TSL key to support multiple timer instances within a thread. Such that both f() and g() (below) work

@s1.time()
def g():
    s2.time()(lambda : time.sleep(1))()

And both f() and g() cases are covered with tests.

Now, what do you refer as reentrancy? This? -

@s1.time()
def h():
    @s1.time()
    def i(): pass
    i()

This will work as well, since each call to time() will produce different instances of the _Time() with distinct id(self).

P.S. I actually think that the better and simpler idea is to rework both decorator and context manager to use function closure to record the start time. It will be also a bit faster IMHO. I need to think whether its possible to implement it without changing the time() semantics.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reentrant means that the function calls itself, e.g.:

def f():
  @s1.time():
     f()

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the use case?
Recursion like this?

@s1.time()
def f(i=0):
    f(i+1) if i > 5 else True

Then no, _Timer is not reentrant. But neither is the current implementation (as in master).

self.key = "k_{0}".format(id(self))

def __enter__(self):
self._start = default_timer()
setattr(self._storage, self.key, default_timer())

def __exit__(self, typ, value, traceback):
start = getattr(self._storage, self.key)
# Time can go backwards.
self._gauge.set(max(default_timer() - self._start, 0))
duration = max(default_timer() - start, 0)
self._callback(duration)

def __call__(self, f):
def wrapped(func, *args, **kwargs):
Expand Down
75 changes: 74 additions & 1 deletion tests/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,13 @@

import inspect
import time
import unittest
from concurrent.futures import ThreadPoolExecutor

try:
import unittest2 as unittest
except ImportError:
import unittest


from prometheus_client.core import (
CollectorRegistry,
Expand Down Expand Up @@ -124,6 +130,26 @@ def f():
f()
self.assertNotEqual(0, self.registry.get_sample_value('g'))

def test_function_decorator_multithread(self):
self.assertEqual(0, self.registry.get_sample_value('g'))
workers = 2
pool = ThreadPoolExecutor(max_workers=workers)

@self.gauge.time()
def f(duration):
time.sleep(duration)

expected_duration = 1
pool.submit(f, expected_duration)
time.sleep(0.7 * expected_duration)
pool.submit(f, expected_duration * 2)
time.sleep(expected_duration)

rounding_coefficient = 0.9
adjusted_expected_duration = expected_duration * rounding_coefficient
self.assertLess(adjusted_expected_duration, self.registry.get_sample_value('g'))
pool.shutdown(wait=True)

def test_time_block_decorator(self):
self.assertEqual(0, self.registry.get_sample_value('g'))
with self.gauge.time():
Expand Down Expand Up @@ -155,6 +181,32 @@ def f():
f()
self.assertEqual(1, self.registry.get_sample_value('s_count'))

def test_function_decorator_multithread(self):
self.assertEqual(0, self.registry.get_sample_value('s_count'))
summary2 = Summary('s2', 'help', registry=self.registry)

workers = 3
duration = 0.1
pool = ThreadPoolExecutor(max_workers=workers)

@self.summary.time()
def f():
time.sleep(duration / 2)
# Testing that different instances of timer do not interfere
summary2.time()(lambda : time.sleep(duration / 2))()

jobs = workers * 3
for i in range(jobs):
pool.submit(f)
pool.shutdown(wait=True)

self.assertEqual(jobs, self.registry.get_sample_value('s_count'))

rounding_coefficient = 0.9
total_expected_duration = jobs * duration * rounding_coefficient
self.assertLess(total_expected_duration, self.registry.get_sample_value('s_sum'))
self.assertLess(total_expected_duration / 2 , self.registry.get_sample_value('s2_sum'))

def test_block_decorator(self):
self.assertEqual(0, self.registry.get_sample_value('s_count'))
with self.summary.time():
Expand Down Expand Up @@ -234,6 +286,27 @@ def f():
self.assertEqual(1, self.registry.get_sample_value('h_count'))
self.assertEqual(1, self.registry.get_sample_value('h_bucket', {'le': '+Inf'}))

def test_function_decorator_multithread(self):
self.assertEqual(0, self.registry.get_sample_value('h_count'))
workers = 3
duration = 0.1
pool = ThreadPoolExecutor(max_workers=workers)

@self.histogram.time()
def f():
time.sleep(duration)

jobs = workers * 3
for i in range(jobs):
pool.submit(f)
pool.shutdown(wait=True)

self.assertEqual(jobs, self.registry.get_sample_value('h_count'))

rounding_coefficient = 0.9
total_expected_duration = jobs * duration * rounding_coefficient
self.assertLess(total_expected_duration, self.registry.get_sample_value('h_sum'))

def test_block_decorator(self):
self.assertEqual(0, self.registry.get_sample_value('h_count'))
self.assertEqual(0, self.registry.get_sample_value('h_bucket', {'le': '+Inf'}))
Expand Down
17 changes: 15 additions & 2 deletions tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,22 @@ deps =

[testenv:py26]
; Last pytest and py version supported on py26 .
deps =
deps =
unittest2
py==1.4.31
pytest==2.9.2
coverage
futures

[testenv:py27]
deps =
{[base]deps}
futures

[testenv:pypy]
deps =
{[base]deps}
futures

[testenv]
deps =
Expand All @@ -24,7 +35,9 @@ commands = coverage run --parallel -m pytest {posargs}

; Ensure test suite passes if no optional dependencies are present.
[testenv:py27-nooptionals]
deps = {[base]deps}
deps =
{[base]deps}
futures
commands = coverage run --parallel -m pytest {posargs}

[testenv:py36-nooptionals]
Expand Down