Skip to content
2 changes: 2 additions & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ endif::[]

* Fix url argument fetching in aiohttp_client instrumentation {pull}1890[#1890]
* Fix a bug in the AWS Lambda instrumentation when `event["headers"] is None` {pull}1907[#1907]
* Fix a bug in AWS Lambda where metadata could be incomplete, causing validation errors with the APM Server {pull}1914[#1914]
* Fix a bug in AWS Lambda where sending the partial transaction would be recorded as an extra span {pull}1914[#1914]



Expand Down
29 changes: 29 additions & 0 deletions elasticapm/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ def __init__(self, config=None, **inline):
# setting server_version here is mainly used for testing
self.server_version = inline.pop("server_version", None)
self.activation_method = elasticapm._activation_method
self._extra_metadata = {}

self.check_python_version()

Expand Down Expand Up @@ -475,8 +476,36 @@ def build_metadata(self):
data.pop("cloud")
if self.config.global_labels:
data["labels"] = enforce_label_format(self.config.global_labels)
if self._extra_metadata:
# Merge one key deep
for key, val in self._extra_metadata.items():
if isinstance(val, dict) and key in data and isinstance(data[key], dict):
data[key].update(val)
else:
data[key] = val
return data

def add_extra_metadata(self, data):
"""
Add additional metadata to future metadata dictionaries.

Only used in specific instances where metadata relies on data we only
have at request time, such as for lambda metadata.

Metadata is only merged one key deep at build time.
"""
# Merge one key deep
for key, val in data.items():
if isinstance(val, dict) and key in self._extra_metadata and isinstance(self._extra_metadata[key], dict):
self._extra_metadata[key].update(val)
else:
self._extra_metadata[key] = val

# Ensure that the extra metadata is added, even if we've already
# generated metadata for this process
if self._transport._metadata:
self._transport._metadata = self.build_metadata()

def _build_msg_for_logging(
self, event_type, date=None, context=None, custom=None, stack=None, handled=True, **kwargs
):
Expand Down
17 changes: 10 additions & 7 deletions elasticapm/contrib/serverless/aws.py
Original file line number Diff line number Diff line change
Expand Up @@ -400,10 +400,9 @@ def set_metadata_and_context(self, coldstart: bool) -> None:
metadata["service"]["node"] = {"configured_name": os.environ.get("AWS_LAMBDA_LOG_STREAM_NAME")}
# This is the one piece of metadata that requires deep merging. We add it manually
# here to avoid having to deep merge in _transport.add_metadata()
if self.client._transport._metadata:
node_name = nested_key(self.client._transport._metadata, "service", "node", "name")
if node_name:
metadata["service"]["node"]["name"] = node_name
node = nested_key(self.client.get_service_info(), "node")
if node:
metadata["service"]["node"] = node

metadata["cloud"] = {}
metadata["cloud"]["provider"] = "aws"
Expand All @@ -417,7 +416,7 @@ def set_metadata_and_context(self, coldstart: bool) -> None:
elasticapm.set_context(faas, "faas")
if message_context:
elasticapm.set_context(message_context, "message")
self.client._transport.add_metadata(metadata)
self.client.add_extra_metadata(metadata)

def send_partial_transaction(self):
"""
Expand All @@ -434,17 +433,19 @@ def send_partial_transaction(self):
and os.environ.get("ELASTIC_APM_LAMBDA_APM_SERVER")
and ("localhost" in self.client.config.server_url or "127.0.0.1" in self.client.config.server_url)
):
transaction = execution_context.get_transaction()
transport = self.client._transport
logger.debug("Sending partial transaction and early metadata to the lambda extension...")
data = transport._json_serializer({"metadata": transport._metadata}) + "\n"
data += transport._json_serializer({"transaction": execution_context.get_transaction().to_dict()})
data = transport._json_serializer({"metadata": self.client.build_metadata()}) + "\n"
data += transport._json_serializer({"transaction": transaction.to_dict()})
partial_transaction_url = urllib.parse.urljoin(
self.client.config.server_url
if self.client.config.server_url.endswith("/")
else self.client.config.server_url + "/",
"register/transaction",
)
try:
transaction.pause_sampling = True
transport.send(
data,
custom_url=partial_transaction_url,
Expand All @@ -462,6 +463,8 @@ def send_partial_transaction(self):
)
else:
logger.warning("Failed to send partial transaction to APM Lambda Extension", exc_info=True)
finally:
transaction.pause_sampling = False


def get_data_from_request(event: dict, capture_body: bool = False, capture_headers: bool = True) -> dict:
Expand Down
2 changes: 1 addition & 1 deletion elasticapm/instrumentation/packages/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ def call_if_sampling(self, module, method, wrapped, instance, args, kwargs):
transaction = execution_context.get_transaction()
if not transaction:
return wrapped(*args, **kwargs)
elif not transaction.is_sampled:
elif not transaction.is_sampled or transaction.pause_sampling:
args, kwargs = self.mutate_unsampled_call_args(module, method, wrapped, instance, args, kwargs, transaction)
return wrapped(*args, **kwargs)
else:
Expand Down
7 changes: 7 additions & 0 deletions elasticapm/traces.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,13 @@ def __init__(
if not trace_parent:
trace_parent = TraceParent.new(self.id, is_sampled)

# While this is set to True, underlying instrumentations will not create
# spans. The same outcome could be reached via `is_sampled`, but
# that requires a user to correctly restore the previous value. This
# setting works independent of the is_sampled setting, and is assumed to
# be False unless a user sets it to True.
self.pause_sampling = False

self.trace_parent: TraceParent = trace_parent
self.timestamp = start if start is not None else time.time()
self.name: Optional[str] = None
Expand Down
19 changes: 0 additions & 19 deletions elasticapm/transport/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,25 +226,6 @@ def _write_metadata(self, buffer):
data = (self._json_serializer({"metadata": self._metadata}) + "\n").encode("utf-8")
buffer.write(data)

def add_metadata(self, data):
"""
Add additional metadata do the dictionary

Only used in specific instances where metadata relies on data we only
have at request time, such as for lambda metadata

Metadata is only merged one key deep.
"""
if self._metadata is not None:
# Merge one key deep
for key, val in data.items():
if isinstance(val, dict) and key in self._metadata and isinstance(self._metadata[key], dict):
self._metadata[key].update(val)
else:
self._metadata[key] = val
else:
self._metadata = data

def _init_event_queue(self, chill_until, max_chill_time):
# some libraries like eventlet monkeypatch queue.Queue and switch out the implementation.
# In those cases we can't rely on internals of queue.Queue to be there, so we simply use
Expand Down
9 changes: 8 additions & 1 deletion tests/contrib/serverless/aws_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,7 @@ def test_func(event, context):

test_func(event_api, context)

assert elasticapm_client._transport._metadata["service"]["name"] == "override"
assert elasticapm_client.build_metadata()["service"]["name"] == "override"


def test_capture_serverless_list_event(event_list, context, elasticapm_client):
Expand Down Expand Up @@ -424,10 +424,16 @@ def test_func(event, context):
test_func(event_api, context)

assert len(sending_elasticapm_client.httpserver.requests) == 2

# There should be no spans from the partial transaction
for payload in sending_elasticapm_client.httpserver.payloads[1]:
assert "span" not in payload

request = sending_elasticapm_client.httpserver.requests[0]
assert request.full_path == "/register/transaction?"
assert request.content_type == "application/vnd.elastic.apm.transaction+ndjson"
assert b"metadata" in request.data
assert b"AWS Lambda" in request.data
assert b"transaction" in request.data
sending_elasticapm_client.close()

Expand All @@ -453,6 +459,7 @@ def test_func(event, context):
assert request.full_path == "/register/transaction?"
assert request.content_type == "application/vnd.elastic.apm.transaction+ndjson"
assert b"metadata" in request.data
assert b"AWS Lambda" in request.data
assert b"transaction" in request.data
sending_elasticapm_client.close()

Expand Down
1 change: 1 addition & 0 deletions tests/fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,7 @@ def sending_elasticapm_client(request, validating_httpserver):
client_config.setdefault("span_compression_same_kind_max_duration", "0ms")
client_config.setdefault("include_paths", ("*/tests/*",))
client_config.setdefault("metrics_interval", "0ms")
client_config.setdefault("cloud_provider", False)
client_config.setdefault("central_config", "false")
client_config.setdefault("server_version", (8, 0, 0))
client = Client(**client_config)
Expand Down