Skip to content
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ endif::[]

* Fix an issue where compressed spans would count against `transaction_max_spans` {pull}1377[#1377]
* Make sure HTTP connections are not re-used after a process fork {pull}1374[#1374]
* Fix an issue with psycopg2 instrumentation when multiple hosts are defined {pull}1386[#1386]
* Update the `User-Agent` header to the new https://github.com/elastic/apm/pull/514[spec] {pull}1378[#1378]
* Improve status_code handling in AWS Lambda integration {pull}1382[#1382]
* Fix `aiohttp` exception handling to allow for non-500 responses including `HTTPOk` {pull}1384[#1384]
Expand Down
4 changes: 2 additions & 2 deletions elasticapm/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -653,11 +653,11 @@ def send(self, url, **kwargs):
return None


def get_client():
def get_client() -> Client:
return CLIENT_SINGLETON


def set_client(client):
def set_client(client: Client):
global CLIENT_SINGLETON
if CLIENT_SINGLETON:
logger = get_logger("elasticapm")
Expand Down
37 changes: 22 additions & 15 deletions elasticapm/instrumentation/packages/psycopg2.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
from __future__ import absolute_import

from typing import Optional, Union

from elasticapm.instrumentation.packages.dbapi2 import (
ConnectionProxy,
CursorProxy,
Expand Down Expand Up @@ -76,22 +78,11 @@ class Psycopg2Instrumentation(DbApi2Instrumentation):
def call(self, module, method, wrapped, instance, args, kwargs):
signature = "psycopg2.connect"

host = kwargs.get("host")
if host:
signature += " " + compat.text_type(host)

port = kwargs.get("port")
if port:
port = str(port)
if int(port) != default_ports.get("postgresql"):
host += ":" + port
signature += " " + compat.text_type(host)
else:
# Parse connection string and extract host/port
pass
host, port = get_destination_info(kwargs.get("host"), kwargs.get("port"))
signature = f"{signature} {host}:{port}"
destination_info = {
"address": kwargs.get("host", "localhost"),
"port": int(kwargs.get("port", default_ports.get("postgresql"))),
"address": host,
"port": port,
}
with capture_span(
signature,
Expand Down Expand Up @@ -140,3 +131,19 @@ def call(self, module, method, wrapped, instance, args, kwargs):
kwargs["scope"] = kwargs["scope"].__wrapped__

return wrapped(*args, **kwargs)


def get_destination_info(host: Optional[str], port: Union[None, str, int]) -> tuple:
if host:
if "," in host: # multiple hosts defined, take first
host = host.split(",")[0]
else:
host = "localhost"
if port:
port = str(port)
if "," in port: # multiple ports defined, take first
port = port.split(",")[0]
port = int(port)
else:
port = default_ports.get("postgresql")
return host, port
10 changes: 10 additions & 0 deletions tests/fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,16 @@ def elasticapm_client(request):
assert not client._transport.validation_errors


@pytest.fixture()
def elasticapm_transaction(elasticapm_client):
"""
Useful fixture if spans from other fixtures should be captured.
This can be achieved by listing this fixture first.
"""
transaction = elasticapm_client.begin_transaction("test")
yield transaction


@pytest.fixture()
def elasticapm_client_log_file(request):
original_exceptionhook = sys.excepthook
Expand Down
38 changes: 34 additions & 4 deletions tests/instrumentation/psycopg2_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,15 @@
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

import os
from typing import cast

import pytest

from elasticapm.conf.constants import TRANSACTION
from elasticapm.instrumentation.packages.psycopg2 import PGCursorProxy, extract_signature
from elasticapm import get_client
from elasticapm.conf.constants import SPAN, TRANSACTION
from elasticapm.instrumentation.packages.psycopg2 import PGCursorProxy, extract_signature, get_destination_info
from elasticapm.utils import default_ports
from tests.fixtures import TempStoreClient

psycopg2 = pytest.importorskip("psycopg2")

Expand Down Expand Up @@ -162,10 +165,10 @@ def test_select_with_dollar_quotes_custom_token():


def test_select_with_difficult_table_name():
sql_statement = u"""SELECT id FROM "myta\n-æøåble" WHERE id = 2323"""
sql_statement = """SELECT id FROM "myta\n-æøåble" WHERE id = 2323"""
actual = extract_signature(sql_statement)

assert u"SELECT FROM myta\n-æøåble" == actual
assert "SELECT FROM myta\n-æøåble" == actual


def test_select_subselect():
Expand Down Expand Up @@ -507,3 +510,30 @@ def test_psycopg2_execute_values(instrument, postgres_connection, elasticapm_cli
spans = elasticapm_client.spans_for_transaction(transactions[0])
assert spans[0]["name"] == "INSERT INTO test"
assert len(spans[0]["context"]["db"]["statement"]) == 10000, spans[0]["context"]["db"]["statement"]


@pytest.mark.integrationtest
def test_psycopg2_connection(instrument, elasticapm_transaction, postgres_connection):
# elastciapm_client.events is only available on `TempStoreClient`, this keeps the type checkers happy
elasticapm_client = cast(TempStoreClient, get_client())
elasticapm_client.end_transaction("test", "success")
span = elasticapm_client.events[SPAN][0]
host = os.environ.get("POSTGRES_HOST", "localhost")
assert span["name"] == f"psycopg2.connect {host}:5432"
assert span["action"] == "connect"


@pytest.mark.parametrize(
"host_in,port_in,expected_host_out,expected_port_out",
(
(None, None, "localhost", 5432),
(None, 5432, "localhost", 5432),
("localhost", "5432", "localhost", 5432),
("foo.bar", "5432", "foo.bar", 5432),
("localhost,foo.bar", "5432,1234", "localhost", 5432), # multiple hosts
),
)
def test_get_destination(host_in, port_in, expected_host_out, expected_port_out):
host, port = get_destination_info(host_in, port_in)
assert host == expected_host_out
assert port == expected_port_out