Skip to content

Commit 1dfcd05

Browse files
committed
add tests with async grpc server
1 parent 704e1f5 commit 1dfcd05

File tree

2 files changed

+69
-9
lines changed

2 files changed

+69
-9
lines changed

tests/contrib/grpc/grpc_app/server.py

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,9 @@
2828
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
2929
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
3030

31+
import asyncio
3132
import logging
33+
import os
3234
import sys
3335
from concurrent import futures
3436

@@ -67,6 +69,30 @@ def GetServerResponseException(self, request, context):
6769
raise Exception("oh no")
6870

6971

72+
class TestServiceAsync(pb2_grpc.TestServiceServicer):
73+
def __init__(self, *args, **kwargs):
74+
pass
75+
76+
async def GetServerResponse(self, request, context):
77+
message = request.message
78+
result = f'Hello I am up and running received "{message}" message from you'
79+
result = {"message": result, "received": True}
80+
81+
return pb2.MessageResponse(**result)
82+
83+
async def GetServerResponseAbort(self, request, context):
84+
await context.abort(grpc.StatusCode.INTERNAL, "foo")
85+
86+
async def GetServerResponseUnavailable(self, request, context):
87+
"""Missing associated documentation comment in .proto file."""
88+
context.set_code(grpc.StatusCode.UNAVAILABLE)
89+
context.set_details("Method not available")
90+
return pb2.MessageResponse(message="foo", received=True)
91+
92+
async def GetServerResponseException(self, request, context):
93+
raise Exception("oh no")
94+
95+
7096
def serve(port):
7197
apm_client = GRPCApmClient(
7298
service_name="grpc-server", disable_metrics="*", api_request_time="100ms", central_config="False"
@@ -78,10 +104,24 @@ def serve(port):
78104
server.wait_for_termination()
79105

80106

107+
async def serve_async(port):
108+
apm_client = GRPCApmClient(
109+
service_name="grpc-server", disable_metrics="*", api_request_time="100ms", central_config="False"
110+
)
111+
server = grpc.aio.server()
112+
pb2_grpc.add_TestServiceServicer_to_server(TestServiceAsync(), server)
113+
server.add_insecure_port(f"[::]:{port}")
114+
await server.start()
115+
await server.wait_for_termination()
116+
117+
81118
if __name__ == "__main__":
82119
if len(sys.argv) > 1:
83120
port = sys.argv[1]
84121
else:
85122
port = "50051"
86123
logging.basicConfig()
87-
serve(port)
124+
if os.environ.get("GRPC_SERVER_ASYNC") == "1":
125+
asyncio.run(serve_async(port))
126+
else:
127+
serve(port)

tests/contrib/grpc/grpc_client_tests.py

Lines changed: 28 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -49,11 +49,14 @@
4949
from .grpc_app.testgrpc_pb2_grpc import TestServiceStub
5050

5151

52-
@pytest.fixture()
53-
def grpc_server(validating_httpserver, request):
52+
def setup_env(request, validating_httpserver):
5453
config = getattr(request, "param", {})
5554
env = {f"ELASTIC_APM_{k.upper()}": str(v) for k, v in config.items()}
5655
env.setdefault("ELASTIC_APM_SERVER_URL", validating_httpserver.url)
56+
return env
57+
58+
59+
def setup_grpc_server(env):
5760
free_port = get_free_port()
5861
server_proc = subprocess.Popen(
5962
[os.path.join(sys.prefix, "bin", "python"), "-m", "tests.contrib.grpc.grpc_app.server", str(free_port)],
@@ -62,15 +65,32 @@ def grpc_server(validating_httpserver, request):
6265
env=env,
6366
)
6467
wait_for_open_port(free_port)
65-
yield f"localhost:{free_port}"
66-
server_proc.terminate()
68+
return server_proc, free_port
6769

6870

6971
@pytest.fixture()
70-
def grpc_client_and_server_url(grpc_server):
71-
test_channel = grpc.insecure_channel(grpc_server)
72+
def env_fixture(validating_httpserver, request):
73+
env = setup_env(request, validating_httpserver)
74+
return env
75+
76+
77+
if hasattr(grpc, "aio"):
78+
grpc_server_fixture_params = ["async", "sync"]
79+
else:
80+
grpc_server_fixture_params = ["sync"]
81+
82+
83+
@pytest.fixture(params=grpc_server_fixture_params)
84+
def grpc_client_and_server_url(env_fixture, request):
85+
env = {k: v for k, v in env_fixture.items()}
86+
if request.param == "async":
87+
env["GRPC_SERVER_ASYNC"] = "1"
88+
server_proc, free_port = setup_grpc_server(env)
89+
server_addr = f"localhost:{free_port}"
90+
test_channel = grpc.insecure_channel(server_addr)
7291
test_client = TestServiceStub(test_channel)
73-
yield test_client, grpc_server
92+
yield test_client, server_addr
93+
server_proc.terminate()
7494

7595

7696
def test_grpc_client_server_instrumentation(instrument, sending_elasticapm_client, grpc_client_and_server_url):
@@ -200,7 +220,7 @@ def test_grpc_client_unsampled_span(instrument, sending_elasticapm_client, grpc_
200220

201221

202222
@pytest.mark.parametrize(
203-
"grpc_server",
223+
"env_fixture",
204224
[
205225
{
206226
"recording": "False",

0 commit comments

Comments
 (0)