From a33dac084a8fcde963ed9e5fcd91a9248b0c61e9 Mon Sep 17 00:00:00 2001 From: Zaar Hai Date: Wed, 11 Jul 2018 16:07:35 +1000 Subject: [PATCH 1/8] Multithreading support for time() decorators Fixes #287 --- prometheus_client/core.py | 56 +++++++--------------------------- tests/test_core.py | 64 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 75 insertions(+), 45 deletions(-) diff --git a/prometheus_client/core.py b/prometheus_client/core.py index 0f1d0144..7adaf9c2 100644 --- a/prometheus_client/core.py +++ b/prometheus_client/core.py @@ -13,7 +13,7 @@ import time import types -from threading import Lock +from threading import local, Lock from timeit import default_timer from .decorator import decorate @@ -765,7 +765,7 @@ def time(self): Can be used as a function decorator or context manager. ''' - return _GaugeTimer(self) + return _Timer(self.set) def set_function(self, f): '''Call the provided function to return the Gauge value. @@ -829,7 +829,7 @@ def time(self): Can be used as a function decorator or context manager. ''' - return _SummaryTimer(self) + return _Timer(self.observe) def _samples(self): return ( @@ -919,7 +919,7 @@ def time(self): Can be used as a function decorator or context manager. ''' - return _HistogramTimer(self) + return _Timer(self.observe) def _samples(self): samples = [] @@ -932,24 +932,6 @@ def _samples(self): return tuple(samples) -class _HistogramTimer(object): - def __init__(self, histogram): - self._histogram = histogram - - def __enter__(self): - self._start = default_timer() - - def __exit__(self, typ, value, traceback): - # Time can go backwards. - self._histogram.observe(max(default_timer() - self._start, 0)) - - def __call__(self, f): - def wrapped(func, *args, **kwargs): - with self: - return func(*args, **kwargs) - return decorate(f, wrapped) - - class _ExceptionCounter(object): def __init__(self, counter, exception): self._counter = counter @@ -986,34 +968,18 @@ def wrapped(func, *args, **kwargs): return decorate(f, wrapped) -class _SummaryTimer(object): - def __init__(self, summary): - self._summary = summary - - def __enter__(self): - self._start = default_timer() - - def __exit__(self, typ, value, traceback): - # Time can go backwards. - self._summary.observe(max(default_timer() - self._start, 0)) - - def __call__(self, f): - def wrapped(func, *args, **kwargs): - with self: - return func(*args, **kwargs) - return decorate(f, wrapped) - - -class _GaugeTimer(object): - def __init__(self, gauge): - self._gauge = gauge +class _Timer(object): + def __init__(self, callback): + self._callback = callback + self._storage = local() def __enter__(self): - self._start = default_timer() + self._storage.start = default_timer() def __exit__(self, typ, value, traceback): # Time can go backwards. - self._gauge.set(max(default_timer() - self._start, 0)) + duration = max(default_timer() - self._storage.start, 0) + self._callback(duration) def __call__(self, f): def wrapped(func, *args, **kwargs): diff --git a/tests/test_core.py b/tests/test_core.py index c949b623..fea563fe 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -3,6 +3,8 @@ import inspect import time import unittest +from concurrent.futures import ThreadPoolExecutor + from prometheus_client.core import ( CollectorRegistry, @@ -124,6 +126,26 @@ def f(): f() self.assertNotEqual(0, self.registry.get_sample_value('g')) + def test_function_decorator_multithread(self): + self.assertEqual(0, self.registry.get_sample_value('g')) + workers = 2 + pool = ThreadPoolExecutor(max_workers=workers) + + @self.gauge.time() + def f(duration): + time.sleep(duration) + + expected_duration = 1 + pool.submit(f, expected_duration) + time.sleep(0.7 * expected_duration) + pool.submit(f, expected_duration * 2) + time.sleep(expected_duration) + + rounding_coefficient = 0.9 + adjusted_expected_duration = expected_duration * rounding_coefficient + self.assertLess(adjusted_expected_duration, self.registry.get_sample_value('g')) + pool.shutdown(wait=True) + def test_time_block_decorator(self): self.assertEqual(0, self.registry.get_sample_value('g')) with self.gauge.time(): @@ -155,6 +177,27 @@ def f(): f() self.assertEqual(1, self.registry.get_sample_value('s_count')) + def test_function_decorator_multithread(self): + self.assertEqual(0, self.registry.get_sample_value('s_count')) + workers = 3 + duration = 0.1 + pool = ThreadPoolExecutor(max_workers=workers) + + @self.summary.time() + def f(): + time.sleep(duration) + + jobs = workers * 3 + for i in range(jobs): + pool.submit(f) + pool.shutdown(wait=True) + + self.assertEqual(jobs, self.registry.get_sample_value('s_count')) + + rounding_coefficient = 0.9 + total_expected_duration = jobs * duration * rounding_coefficient + self.assertLess(total_expected_duration, self.registry.get_sample_value('s_sum')) + def test_block_decorator(self): self.assertEqual(0, self.registry.get_sample_value('s_count')) with self.summary.time(): @@ -234,6 +277,27 @@ def f(): self.assertEqual(1, self.registry.get_sample_value('h_count')) self.assertEqual(1, self.registry.get_sample_value('h_bucket', {'le': '+Inf'})) + def test_function_decorator_multithread(self): + self.assertEqual(0, self.registry.get_sample_value('h_count')) + workers = 3 + duration = 0.1 + pool = ThreadPoolExecutor(max_workers=workers) + + @self.histogram.time() + def f(): + time.sleep(duration) + + jobs = workers * 3 + for i in range(jobs): + pool.submit(f) + pool.shutdown(wait=True) + + self.assertEqual(jobs, self.registry.get_sample_value('h_count')) + + rounding_coefficient = 0.9 + total_expected_duration = jobs * duration * rounding_coefficient + self.assertLess(total_expected_duration, self.registry.get_sample_value('h_sum')) + def test_block_decorator(self): self.assertEqual(0, self.registry.get_sample_value('h_count')) self.assertEqual(0, self.registry.get_sample_value('h_bucket', {'le': '+Inf'})) From 263e2b451b885dad71f779889bea7d41584fe540 Mon Sep 17 00:00:00 2001 From: Zaar Hai Date: Wed, 11 Jul 2018 16:08:16 +1000 Subject: [PATCH 2/8] Installing futures package for Python 2.x Required for multithreading tests. --- tox.ini | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/tox.ini b/tox.ini index c9c1138f..04a84389 100644 --- a/tox.ini +++ b/tox.ini @@ -9,11 +9,17 @@ deps = [testenv:py26] ; Last pytest and py version supported on py26 . -deps = +deps = unittest2 py==1.4.31 pytest==2.9.2 coverage + futures + +[testenv:py27] +deps = + {[base]deps} + futures [testenv] deps = From e4969a5856c90d0c843834a9b96d1f447072dbbd Mon Sep 17 00:00:00 2001 From: Zaar Hai Date: Wed, 11 Jul 2018 16:29:56 +1000 Subject: [PATCH 3/8] Fixing text environment for Py27 multithread --- tox.ini | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tox.ini b/tox.ini index 04a84389..e42fc8eb 100644 --- a/tox.ini +++ b/tox.ini @@ -30,7 +30,9 @@ commands = coverage run --parallel -m pytest {posargs} ; Ensure test suite passes if no optional dependencies are present. [testenv:py27-nooptionals] -deps = {[base]deps} +deps = + {[base]deps} + futures commands = coverage run --parallel -m pytest {posargs} [testenv:py36-nooptionals] From 40fdecc3eb73aff0ad87c4342f1fb4f43a49557e Mon Sep 17 00:00:00 2001 From: Zaar Hai Date: Wed, 11 Jul 2018 16:32:49 +1000 Subject: [PATCH 4/8] Py2.x unittest compatability --- tests/test_core.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/tests/test_core.py b/tests/test_core.py index fea563fe..81c1461f 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -2,9 +2,13 @@ import inspect import time -import unittest from concurrent.futures import ThreadPoolExecutor +try: + import unittest2 as unittest +except ImportError: + import unittest + from prometheus_client.core import ( CollectorRegistry, From 2bb344dcb5eeb5f9b2d019fd9a86dd24b5be4422 Mon Sep 17 00:00:00 2001 From: Zaar Hai Date: Wed, 11 Jul 2018 16:34:47 +1000 Subject: [PATCH 5/8] pypy needs futures for testing as well It's a 2.7 version of Python lang. --- tox.ini | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/tox.ini b/tox.ini index e42fc8eb..2b0d0bbb 100644 --- a/tox.ini +++ b/tox.ini @@ -21,6 +21,11 @@ deps = {[base]deps} futures +[testenv:pypy] +deps = + {[base]deps} + futures + [testenv] deps = {[base]deps} From 1807d224dc36cdae1a228d7b0b2cc813e7b38568 Mon Sep 17 00:00:00 2001 From: Zaar Hai Date: Thu, 12 Jul 2018 18:33:36 +1000 Subject: [PATCH 6/8] Ensuring that different instances of timer do not interfere Signed-off-by: Zaar Hai --- prometheus_client/core.py | 6 ++++-- tests/test_core.py | 7 ++++++- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/prometheus_client/core.py b/prometheus_client/core.py index 7adaf9c2..9ca6cd63 100644 --- a/prometheus_client/core.py +++ b/prometheus_client/core.py @@ -972,13 +972,15 @@ class _Timer(object): def __init__(self, callback): self._callback = callback self._storage = local() + self.key = "k_{}".format(id(self)) def __enter__(self): - self._storage.start = default_timer() + setattr(self._storage, self.key, default_timer()) def __exit__(self, typ, value, traceback): + start = getattr(self._storage, self.key) # Time can go backwards. - duration = max(default_timer() - self._storage.start, 0) + duration = max(default_timer() - start, 0) self._callback(duration) def __call__(self, f): diff --git a/tests/test_core.py b/tests/test_core.py index 81c1461f..a732dc21 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -183,13 +183,17 @@ def f(): def test_function_decorator_multithread(self): self.assertEqual(0, self.registry.get_sample_value('s_count')) + summary2 = Summary('s2', 'help', registry=self.registry) + workers = 3 duration = 0.1 pool = ThreadPoolExecutor(max_workers=workers) @self.summary.time() def f(): - time.sleep(duration) + time.sleep(duration / 2) + # Testing that different instances of timer do not interfere + summary2.time()(lambda : time.sleep(duration / 2))() jobs = workers * 3 for i in range(jobs): @@ -201,6 +205,7 @@ def f(): rounding_coefficient = 0.9 total_expected_duration = jobs * duration * rounding_coefficient self.assertLess(total_expected_duration, self.registry.get_sample_value('s_sum')) + self.assertLess(total_expected_duration / 2 , self.registry.get_sample_value('s2_sum')) def test_block_decorator(self): self.assertEqual(0, self.registry.get_sample_value('s_count')) From 6f32ab99c4cd04b769c389533949220ca3eb0190 Mon Sep 17 00:00:00 2001 From: Zaar Hai Date: Thu, 12 Jul 2018 18:37:23 +1000 Subject: [PATCH 7/8] Python2.6 compliance Signed-off-by: Zaar Hai --- prometheus_client/core.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/prometheus_client/core.py b/prometheus_client/core.py index 9ca6cd63..fd6b041f 100644 --- a/prometheus_client/core.py +++ b/prometheus_client/core.py @@ -972,7 +972,7 @@ class _Timer(object): def __init__(self, callback): self._callback = callback self._storage = local() - self.key = "k_{}".format(id(self)) + self.key = "k_{1}".format(id(self)) def __enter__(self): setattr(self._storage, self.key, default_timer()) From b95fc8350a70683fee938137be596ff7acb75430 Mon Sep 17 00:00:00 2001 From: Zaar Hai Date: Thu, 12 Jul 2018 18:41:03 +1000 Subject: [PATCH 8/8] Python2.6 compliance take 2 Signed-off-by: Zaar Hai --- prometheus_client/core.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/prometheus_client/core.py b/prometheus_client/core.py index fd6b041f..706e0234 100644 --- a/prometheus_client/core.py +++ b/prometheus_client/core.py @@ -972,7 +972,7 @@ class _Timer(object): def __init__(self, callback): self._callback = callback self._storage = local() - self.key = "k_{1}".format(id(self)) + self.key = "k_{0}".format(id(self)) def __enter__(self): setattr(self._storage, self.key, default_timer())