Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions MANIFEST.in
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
include AUTHORS CHANGES LICENSE MANIFEST.in README.rst
include setup.py
include statsd/py.typed
recursive-include docs *
3 changes: 2 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
mock==1.0.1
nose==1.2.1
flake8==1.7.0
flake8
mypy==0.910
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Upgrade?

1 change: 0 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
license='MIT',
packages=find_packages(),
include_package_data=True,
package_data={'': ['README.rst']},
test_suite='nose.collector',
classifiers=[
'Development Status :: 5 - Production/Stable',
Expand Down
11 changes: 9 additions & 2 deletions statsd/client/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,11 @@
from __future__ import absolute_import, division, unicode_literals

from .stream import TCPStatsClient, UnixSocketStatsClient # noqa
from .udp import StatsClient # noqa
from .stream import TCPStatsClient, UnixSocketStatsClient
from .udp import StatsClient


__all__ = [
'TCPStatsClient',
'UnixSocketStatsClient',
'StatsClient',
]
59 changes: 39 additions & 20 deletions statsd/client/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,27 +3,37 @@
import random
from collections import deque
from datetime import timedelta
from typing import Deque, Optional, TypeVar, Union

from .timer import Timer


P = TypeVar('P', bound='PipelineBase')


class StatsClientBase(object):
"""A Base class for various statsd clients."""
_prefix: Optional[str]

def close(self):
def close(self) -> None:
"""Used to close and clean up any underlying resources."""
raise NotImplementedError()

def _send(self):
def _send(self) -> None:
raise NotImplementedError()

def pipeline(self):
def pipeline(self) -> 'PipelineBase':
raise NotImplementedError()

def timer(self, stat, rate=1):
def timer(self, stat: str, rate: float = 1) -> Timer:
return Timer(self, stat, rate)

def timing(self, stat, delta, rate=1):
def timing(
self,
stat: str,
delta: Union[timedelta, float],
rate: float = 1,
) -> None:
"""
Send new timing information.

Expand All @@ -34,15 +44,21 @@ def timing(self, stat, delta, rate=1):
delta = delta.total_seconds() * 1000.
self._send_stat(stat, '%0.6f|ms' % delta, rate)

def incr(self, stat, count=1, rate=1):
def incr(self, stat: str, count: int = 1, rate: float = 1) -> None:

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this might accept non-integer values, at least, there's a test for that case

cl.incr('foo', 1.2)

Suggested change
def incr(self, stat: str, count: int = 1, rate: float = 1) -> None:
def incr(self, stat: str, count: float = 1, rate: float = 1) -> None:

"""Increment a stat by `count`."""
self._send_stat(stat, '%s|c' % count, rate)

def decr(self, stat, count=1, rate=1):
def decr(self, stat: str, count: int = 1, rate: float = 1) -> None:
"""Decrement a stat by `count`."""
self.incr(stat, -count, rate)

def gauge(self, stat, value, rate=1, delta=False):
def gauge(
self,
stat: str,
value: float,
rate: float = 1,
delta: bool = False,
) -> None:
"""Set a gauge value."""
if value < 0 and not delta:
if rate < 1:
Expand All @@ -55,53 +71,56 @@ def gauge(self, stat, value, rate=1, delta=False):
prefix = '+' if delta and value >= 0 else ''
self._send_stat(stat, '%s%s|g' % (prefix, value), rate)

def set(self, stat, value, rate=1):
def set(self, stat: str, value: str, rate: float = 1) -> None:
"""Set a set value."""
self._send_stat(stat, '%s|s' % value, rate)

def _send_stat(self, stat, value, rate):
def _send_stat(self, stat: str, value: str, rate: float) -> None:
self._after(self._prepare(stat, value, rate))

def _prepare(self, stat, value, rate):
def _prepare(self, stat: str, value: str, rate: float) -> Optional[str]:
if rate < 1:
if random.random() > rate:
return
return None
value = '%s|@%s' % (value, rate)

if self._prefix:
stat = '%s.%s' % (self._prefix, stat)

return '%s:%s' % (stat, value)

def _after(self, data):
def _after(self, data: Optional[str]) -> None:
if data:
self._send(data)


class PipelineBase(StatsClientBase):
_prefix: Optional[str]
_stats: Deque[str]
_client: StatsClientBase

def __init__(self, client):
def __init__(self, client: StatsClientBase) -> None:
self._client = client
self._prefix = client._prefix
self._stats = deque()

def _send(self):
def _send(self) -> None:
raise NotImplementedError()

def _after(self, data):
def _after(self, data: Optional[str]) -> None:
if data is not None:
self._stats.append(data)

def __enter__(self):
def __enter__(self: P) -> P:
return self

def __exit__(self, typ, value, tb):
def __exit__(self, *exc_info) -> None:
self.send()

def send(self):
def send(self) -> None:
if not self._stats:
return
self._send()

def pipeline(self):
def pipeline(self: P) -> P:
return self.__class__(self)
53 changes: 39 additions & 14 deletions statsd/client/stream.py
Original file line number Diff line number Diff line change
@@ -1,47 +1,63 @@
from __future__ import absolute_import, division, unicode_literals

import socket
from typing import Optional, Union

from .base import StatsClientBase, PipelineBase


class StreamPipeline(PipelineBase):
def _send(self):
def _send(self) -> None:
self._client._after('\n'.join(self._stats))
self._stats.clear()


class StreamClientBase(StatsClientBase):
def connect(self):
_sock: Optional[socket.socket]

def connect(self) -> None:
raise NotImplementedError()

def close(self):
def close(self) -> None:
if self._sock and hasattr(self._sock, 'close'):
self._sock.close()
self._sock = None

def reconnect(self):
def reconnect(self) -> None:
self.close()
self.connect()

def pipeline(self):
def pipeline(self) -> StreamPipeline:
return StreamPipeline(self)

def _send(self, data):
def _send(self, data: str) -> None:
"""Send data to statsd."""
if not self._sock:
self.connect()
self._do_send(data)

def _do_send(self, data):
def _do_send(self, data: str) -> None:
assert self._sock is not None
self._sock.sendall(data.encode('ascii') + b'\n')


class TCPStatsClient(StreamClientBase):
"""TCP version of StatsClient."""

def __init__(self, host='localhost', port=8125, prefix=None,
timeout=None, ipv6=False):
_host: str
_port: int
_prefix: Optional[str]
_timeout: Optional[float]
_ipv6: bool
_sock: Optional[socket.socket]

def __init__(
self,
host: str = 'localhost',
port: int = 8125,
prefix: Optional[str] = None,
timeout: Optional[float] = None,
ipv6: bool = False,
) -> None:
"""Create a new client."""
self._host = host
self._port = port
Expand All @@ -50,7 +66,7 @@ def __init__(self, host='localhost', port=8125, prefix=None,
self._prefix = prefix
self._sock = None

def connect(self):
def connect(self) -> None:
fam = socket.AF_INET6 if self._ipv6 else socket.AF_INET
family, _, _, _, addr = socket.getaddrinfo(
self._host, self._port, fam, socket.SOCK_STREAM)[0]
Expand All @@ -61,15 +77,24 @@ def connect(self):

class UnixSocketStatsClient(StreamClientBase):
"""Unix domain socket version of StatsClient."""

def __init__(self, socket_path, prefix=None, timeout=None):
_socket_path: Union[str, bytes, tuple]
_prefix: Optional[str]
_timeout: Optional[float]
_sock: Optional[socket.socket]

def __init__(
self,
socket_path: Union[str, bytes, tuple],
prefix: Optional[str] = None,
timeout: Optional[float] = None,
) -> None:
"""Create a new client."""
self._socket_path = socket_path
self._timeout = timeout
self._prefix = prefix
self._sock = None

def connect(self):
def connect(self) -> None:
self._sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
self._sock.settimeout(self._timeout)
self._sock.connect(self._socket_path)
37 changes: 27 additions & 10 deletions statsd/client/timer.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import absolute_import, division, unicode_literals

import functools
from typing import TYPE_CHECKING, Callable, Optional, TypeVar

# Use timer that's not susceptible to time of day adjustments.
try:
Expand All @@ -11,25 +12,41 @@
from time import time as time_now


def safe_wraps(wrapper, *args, **kwargs):
if TYPE_CHECKING:
from .base import StatsClientBase


F = TypeVar('F', bound=Callable)
C = TypeVar('C', bound=Callable)
T = TypeVar('T', bound='Timer')


def safe_wraps(wrapper: F, *args, **kwargs) -> Callable[[C], C]:
"""Safely wraps partial functions."""
while isinstance(wrapper, functools.partial):
wrapper = wrapper.func
wrapper = wrapper.func # type: ignore[assignment]
return functools.wraps(wrapper, *args, **kwargs)


class Timer(object):
"""A context manager/decorator for statsd.timing()."""
client: 'StatsClientBase'
stat: str
rate: float
ms: Optional[float]
_sent: bool
_start_time: Optional[float]

def __init__(self, client, stat, rate=1):
def __init__(self, client: 'StatsClientBase',
stat: str, rate: float = 1) -> None:
self.client = client
self.stat = stat
self.rate = rate
self.ms = None
self._sent = False
self._start_time = None

def __call__(self, f):
def __call__(self, f: F) -> F:
"""Thread-safe timing function decorator."""
@safe_wraps(f)
def _wrapped(*args, **kwargs):
Expand All @@ -39,21 +56,21 @@ def _wrapped(*args, **kwargs):
finally:
elapsed_time_ms = 1000.0 * (time_now() - start_time)
self.client.timing(self.stat, elapsed_time_ms, self.rate)
return _wrapped
return _wrapped # type: ignore[return-value]

def __enter__(self):
def __enter__(self: T) -> T:
return self.start()

def __exit__(self, typ, value, tb):
def __exit__(self, *exc_info) -> None:
self.stop()

def start(self):
def start(self: T) -> T:
self.ms = None
self._sent = False
self._start_time = time_now()
return self

def stop(self, send=True):
def stop(self: T, send: bool = True) -> T:
if self._start_time is None:
raise RuntimeError('Timer has not started.')
dt = time_now() - self._start_time
Expand All @@ -62,7 +79,7 @@ def stop(self, send=True):
self.send()
return self

def send(self):
def send(self) -> None:
if self.ms is None:
raise RuntimeError('No data recorded.')
if self._sent:
Expand Down
Loading