Skip to content

Commit fa4fb35

Browse files
committed
1.8.0 - new Python 3 based agent version
0 parents commit fa4fb35

File tree

207 files changed

+19730
-0
lines changed

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

207 files changed

+19730
-0
lines changed

.gitignore

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
*.pyc
2+
*.pyo
3+
*.log
4+
*.idea
5+
*.swp
6+
*.un~
7+
*.sqlite
8+
stubs
9+
*.iml
10+
.project
11+
.vscode
12+
env
13+
env/*
14+
.pydevproject
15+
.settings/
16+
htmlcov/*
17+
htmlcov
18+
build/*
19+
nginx-repo.crt
20+
nginx-repo.key
21+
.coverage
22+
naas_agent.yaml
23+
.cache/*
24+
.cache
25+
*~
26+
.DS_Store
27+
.env
28+
.pytest_cache/
29+
key.priv
30+
key.pub
31+
node.pid
32+
.python-version
33+
Pipfile*

LICENSE

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
/*
2+
* Copyright (C) Nginx, Inc.
3+
* All rights reserved.
4+
*
5+
* Redistribution and use in source and binary forms, with or without
6+
* modification, are permitted provided that the following conditions
7+
* are met:
8+
* 1. Redistributions of source code must retain the above copyright
9+
* notice, this list of conditions and the following disclaimer.
10+
* 2. Redistributions in binary form must reproduce the above copyright
11+
* notice, this list of conditions and the following disclaimer in the
12+
* documentation and/or other materials provided with the distribution.
13+
*
14+
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
15+
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
16+
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
17+
* ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
18+
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
19+
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
20+
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
21+
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
22+
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
23+
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
24+
* SUCH DAMAGE.
25+
*/

MANIFEST.in

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
recursive-include etc *

README.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
# NGINX Amplify Agent
2+
3+
The NGINX Amplify Agent is a Python application that provides system and NGINX metric collection. It is part of NGINX Amplify — a free configuration monitoring tool for NGINX.
4+
5+
Please check the list of the supported operating systems [here](https://github.com/nginxinc/nginx-amplify-doc/blob/master/amplify-faq.md#21-what-operating-systems-are-supported).
6+
7+
This repository is not for installation purposes. To install the agent, please follow [this](https://github.com/nginxinc/nginx-amplify-doc/blob/master/amplify-guide.md#installing-and-managing-nginx-amplify-agent) document.
8+
9+
For more information about NGINX Amplify, please check the official documentation [here](https://github.com/nginxinc/nginx-amplify-doc).

amplify/__init__.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
# -*- coding: utf-8 -*-

# Package metadata for the top-level ``amplify`` package.
__author__ = "Mike Belov"
__copyright__ = "Copyright (C) Nginx, Inc. All rights reserved."
__license__ = ""
__maintainer__ = "Mike Belov"
__email__ = "dedm@nginx.com"

amplify/agent/__init__.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# -*- coding: utf-8 -*-


# Package metadata for the ``amplify.agent`` package.
__author__ = "Mike Belov"
__copyright__ = "Copyright (C) Nginx, Inc. All rights reserved."
__license__ = ""
__maintainer__ = "Mike Belov"
__email__ = "dedm@nginx.com"
9+
10+
11+
class Singleton(object):
    """
    Base class that enforces a single shared instance per subclass.

    Construction always yields the same object: ``__new__`` creates the
    instance once and hands back the cached reference on every later call.

    WARN: If you choose to use implied references (re-init), this object can
    still be marked for cleanup by the GC. Keep the reference counter > 0
    at all times, or an unexpected clean up may cause unexpected behavior.
    """
    _instance = None
    _init = True  # use this flag to skip future init calls if desirable

    def __new__(cls):
        # Lazily create the shared instance on first construction only.
        if cls._instance is None:
            cls._instance = super(Singleton, cls).__new__(cls)
        return cls._instance

    def __init__(self):
        # Mark (on the instance) that initialization already ran.
        self._init = False
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
# -*- coding: utf-8 -*-


# Module metadata (authorship/maintainer info).
__author__ = "Mike Belov"
__copyright__ = "Copyright (C) Nginx, Inc. All rights reserved."
__license__ = ""
__maintainer__ = "Mike Belov"
__email__ = "dedm@nginx.com"
Lines changed: 279 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,279 @@
1+
# -*- coding: utf-8 -*-
2+
import time
3+
4+
from abc import abstractproperty
5+
from collections import defaultdict
6+
from threading import current_thread
7+
from gevent import GreenletExit
8+
9+
from amplify.agent.common.context import context
10+
11+
__author__ = "Mike Belov"
12+
__copyright__ = "Copyright (C) Nginx, Inc. All rights reserved."
13+
__license__ = ""
14+
__maintainer__ = "Mike Belov"
15+
__email__ = "dedm@nginx.com"
16+
17+
18+
class AbstractCollector(object):
    """
    Abstract data collector.

    Runs in a thread/greenlet and collects specific data for a monitored
    object.  Subclasses register bound methods via register(); collect()
    invokes each registered method, which aggregates values into the
    counter/latest/gauge stores via the aggregate_* helpers.  The
    increment_counters()/finalize_latest()/finalize_gauges() helpers flush
    those stores to the object's statsd client and reset them.
    """
    # Short human-readable name; also used as the collector thread name.
    short_name = None

    # Counter metric names to pre-seed with value 0 when no data is found.
    zero_counters = tuple()

    def __init__(self, object=None, interval=None):
        # NOTE(review): parameter "object" shadows the builtin; kept as-is
        # because callers pass it by keyword.
        # :param object: monitored object — assumed to expose .in_container,
        #     .running, .definition_hash, .statsd — TODO confirm full contract
        # :param interval: seconds to sleep between collect cycles
        self.object = object
        self.in_container = self.object.in_container
        self.interval = interval
        self.previous_counters = defaultdict(dict)  # for deltas
        self.current_counters = defaultdict(int)  # for aggregating
        self.current_latest = defaultdict(int)  # for latest
        self.current_gauges = defaultdict(lambda: defaultdict(float))  # gauges
        self.methods = set()  # registered collect methods

        # stamp store organized by type - metric_name - stamp
        # (a missing stamp defaults to time.time() at first access)
        self.current_stamps = defaultdict(lambda: defaultdict(time.time))

    def init_counters(self, counters=None):
        """
        Helper function for sending 0 values when no data is found.

        :param counters: Iterable String values of names of counters to init as
            0 (default is self.zero_counters)
        """
        counters = counters or self.zero_counters
        for counter in counters:
            self.object.statsd.incr(counter, value=0)

    def run(self):
        """
        Common collector cycle.

        1. Collect data
        2. Sleep
        3. Stop if object stopped

        When the object stops running, GreenletExit is raised explicitly so
        the normal exit path (logging + thread-id teardown) runs; any other
        exception is logged with a traceback and re-raised.
        """
        # TODO: Standardize this with Managers.
        current_thread().name = self.short_name
        context.setup_thread_id()

        try:
            while True:
                context.inc_action_id()
                if self.object.running:
                    self._collect()
                    self._sleep()
                else:
                    break

            # Since kill signals won't work, we raise it ourselves.
            raise GreenletExit
        except GreenletExit:
            context.log.debug(
                '%s collector for %s received exit signal' % (
                    self.__class__.__name__,
                    self.object.definition_hash
                )
            )

            context.teardown_thread_id()

            context.log.debug(
                '%s collector for %s teardown complete' % (
                    self.__class__.__name__,
                    self.object.definition_hash
                )
            )
        except:
            # Bare except is deliberate here: log every failure (including
            # non-Exception raises) with a traceback, then re-raise.
            context.log.error(
                '%s collector run failed' % self.object.definition_hash,
                exc_info=True
            )
            raise

    def register(self, *methods):
        """
        Register methods for collecting.  Duplicates are ignored (set).
        """
        self.methods.update(methods)

    def _collect(self):
        """
        Wrapper for actual collect process.  Logs the elapsed collect time
        (even when collect() raises, via finally).
        """
        start_time = time.time()
        try:
            self.collect()
        finally:
            end_time = time.time()
            context.log.debug(
                '%s collect in %.3f' % (
                    self.object.definition_hash,
                    end_time - start_time
                )
            )

    def _sleep(self):
        """Sleep for the configured collect interval."""
        time.sleep(self.interval)

    def collect(self, *args, **kwargs):
        """
        Run one collect pass: pre-seed zero counters (if any), then call
        every registered method.  A failing method is logged via
        handle_exception() and does not abort the remaining methods.
        """
        if self.zero_counters:
            self.init_counters()

        for method in self.methods:
            try:
                method(*args, **kwargs)
            except Exception as e:
                self.handle_exception(method, e)

    def handle_exception(self, method, exception):
        """Log a one-line error for a failed collect method (debug traceback)."""
        context.log.error('%s failed to collect: %s raised %s%s' % (
            self.short_name, method.__name__, exception.__class__.__name__,
            ' (in container)' if self.in_container else ''
        ))
        context.log.debug('additional info:', exc_info=True)

    def increment_counters(self):
        """
        Increment counter method that takes the "current_values" dictionary of
        metric name - value pairs and increments statsd appropriately based on
        previous values.

        A delta is only sent when a previous numeric value and stamp exist
        and the delta is non-negative (a negative delta means the upstream
        counter was reset, so that sample is dropped).  After flushing,
        current stores are reset and each (stamp, value) pair is kept in
        previous_counters to re-base the next collect.
        """
        for metric_name, value in self.current_counters.items():
            prev_stamp, prev_value = self.previous_counters.get(
                metric_name, (None, None)
            )
            stamp = self.current_stamps['counters'][metric_name]
            value = self.current_counters[metric_name]

            if isinstance(prev_value, (int, float, complex)) and prev_stamp:
                value_delta = value - prev_value
                if value_delta >= 0:
                    # Only increment our statsd client and send data to backend
                    # if calculated value is non-negative.
                    self.object.statsd.incr(
                        metric_name, value_delta, stamp=stamp
                    )

            # Re-base the calculation for next collect
            self.previous_counters[metric_name] = (stamp, value)

        # reset counter stores
        self.current_counters = defaultdict(int)
        if self.current_stamps['counters']:
            del self.current_stamps['counters']

    def aggregate_counters(self, counted_vars, stamp=None):
        """
        Aggregate several counter metrics from multiple places and store their
        sums in a metric_name-value store.

        :param counted_vars: Dict Metric_name - Value dict
        :param stamp: Int Timestamp of Plus collect
        """
        for metric_name, value in counted_vars.items():
            self.current_counters[metric_name] += value
            if stamp:
                self.current_stamps['counters'][metric_name] = stamp

    def finalize_latest(self):
        """
        Go through stored latest variables and send them to the object statsd,
        then reset the latest stores.
        """
        for metric_name, value in self.current_latest.items():
            stamp = self.current_stamps['latest'][metric_name]
            self.object.statsd.latest(metric_name, value, stamp)

        # reset latest store
        self.current_latest = defaultdict(int)
        if self.current_stamps['latest']:
            del self.current_stamps['latest']

    def aggregate_latest(self, latest_vars, stamp=None):
        """
        Aggregate several latest metrics from multiple places and store the
        final value in a metric_name-value store.

        NOTE(review): this iterates metric *names* and increments each by 1,
        ignoring the values in latest_vars — the docstring suggests values
        should be stored; confirm against callers whether this is intended.

        :param latest_vars: Dict Metric_name - Value dict
        :param stamp: Int Timestamp of collect
        """
        for metric_name in latest_vars:
            self.current_latest[metric_name] += 1
            if stamp:
                self.current_stamps['latest'][metric_name] = stamp

    def aggregate_gauges(self, gauge_vars, stamp=None):
        """
        Aggregate several gauge metrics from multiple sources.  Track their
        values until collection/finalize and then send the cumulative to
        statsd.  A later value from the same source overrides the earlier one.

        Example gauge_vars:
            {
                'gauge_name': {
                    'source': value
                    'source2': value
                }
            }

        :param gauge_vars: Dict Metric_Name - Source - Value dict
        :param stamp: Int Timestamp of collect
        """
        for metric_name, value_map in gauge_vars.items():
            for source, value in value_map.items():
                # override current gauge from source with the passed value
                self.current_gauges[metric_name][source] = value

            # save this latest stamp
            if stamp:
                self.current_stamps['gauges'][metric_name] = stamp

    def finalize_gauges(self):
        """
        Iterate through the stored gauges in self.current_gauges, sum the
        per-source values for each metric, and send the totals to statsd,
        then reset the gauge stores.
        """
        for metric_name, value_map in self.current_gauges.items():
            total_gauge = 0
            for source, value in value_map.items():
                total_gauge += value

            self.object.statsd.gauge(
                metric_name,
                total_gauge,
                stamp=self.current_stamps['gauges'][metric_name]
            )

        # reset gauge stores
        # NOTE(review): reset uses defaultdict(int) while __init__ used
        # defaultdict(float) — harmless for sums, but inconsistent; confirm.
        self.current_gauges = defaultdict(lambda: defaultdict(int))
        if self.current_stamps['gauges']:
            del self.current_stamps['gauges']
255+
256+
257+
class AbstractMetaCollector(AbstractCollector):
    """
    Collector variant that reports metadata: each collect pass starts from
    default_meta, lets registered methods enrich self.meta, then pushes the
    result to the object's metad client.
    """
    # Subclasses must override with a dict of default metadata.
    default_meta = abstractproperty()

    def __init__(self, **kwargs):
        super(AbstractMetaCollector, self).__init__(**kwargs)
        self.meta = {}  # accumulated metadata for the current pass

    def collect(self, *args):
        """
        Refresh self.meta from default_meta, run registered collect methods,
        then send the metadata to the object's metad client.

        NOTE(review): drops **kwargs relative to the parent collect()
        signature — confirm no caller passes keyword arguments.
        """
        self.meta.update(self.default_meta)
        super(AbstractMetaCollector, self).collect(*args)
        self.object.metad.meta(self.meta)
268+
269+
270+
class AbstractMetricsCollector(AbstractCollector):
    """
    Collector variant that reports an object-status metric before each
    regular collect pass.
    """
    # Metric key used for status reporting; None disables status updates.
    status_metric_key = None

    def status_update(self):
        """Send an object-status metric when a status_metric_key is set."""
        if hasattr(self, 'object') and self.status_metric_key is not None:
            self.object.statsd.object_status(self.status_metric_key)

    def collect(self, *args, **kwargs):
        """Report object status, then run the regular collect cycle."""
        self.status_update()
        super(AbstractMetricsCollector, self).collect(*args, **kwargs)

0 commit comments

Comments
 (0)