Skip to content
24 changes: 24 additions & 0 deletions apps/accounts/fixtures/scopes.json
Original file line number Diff line number Diff line change
Expand Up @@ -80,5 +80,29 @@
"protected_resources": "[[\"POST\", \"/v[12]/o/introspect\"]]",
"default": "False"
}
},
{
"model": "capabilities.protectedcapability",
"pk": 7,
"fields": {
"title": "My Medicare partially adjudicated claims.",
"slug": "patient/Claim.read",
"group": 5,
"description": "Claim FHIR Resource",
"protected_resources": "[\n [\n \"POST\",\n \"/v2/fhir/Claim/_search$\"\n ],\n [\n \"GET\",\n \"/v2/fhir/Claim/(?P<resource_id>[^/]+)$\"\n ]\n]",
"default": "True"
}
},
{
"model": "capabilities.protectedcapability",
"pk": 8,
"fields": {
"title": "My Medicare partially adjudicated claim responses.",
"slug": "patient/ClaimResponse.read",
"group": 5,
"description": "ClaimResponse FHIR Resource",
"protected_resources": "[\n [\n \"POST\",\n \"/v2/fhir/ClaimResponse/_search$\"\n ],\n [\n \"GET\",\n \"/v2/fhir/ClaimResponse/(?P<resource_id>[^/]+)$\"\n ]\n]",
"default": "True"
}
}
]
31 changes: 28 additions & 3 deletions apps/authorization/permissions.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,10 @@ def has_object_permission(self, request, view, obj):
# Patient resources were taken care of above
# Return 404 on error to avoid notifying unauthorized user the object exists

return is_resource_for_patient(obj, request.crosswalk.fhir_id)
return is_resource_for_patient(obj, request.crosswalk.fhir_id, request.crosswalk.user_mbi)


def is_resource_for_patient(obj, patient_id):
def is_resource_for_patient(obj, patient_id, user_mbi):
try:
if obj['resourceType'] == 'Coverage':
reference = obj['beneficiary']['reference']
Expand All @@ -51,9 +51,15 @@ def is_resource_for_patient(obj, patient_id):
reference_id = obj['id']
if reference_id != patient_id:
raise exceptions.NotFound()
elif obj['resourceType'] == 'Claim':
if not _check_mbi(obj, user_mbi):
raise exceptions.NotFound()
elif obj['resourceType'] == 'ClaimResponse':
if not _check_mbi(obj, user_mbi):
raise exceptions.NotFound()
elif obj['resourceType'] == 'Bundle':
for entry in obj.get('entry', []):
is_resource_for_patient(entry['resource'], patient_id)
is_resource_for_patient(entry['resource'], patient_id, user_mbi)
else:
raise exceptions.NotFound()

Expand All @@ -62,3 +68,22 @@ def is_resource_for_patient(obj, patient_id):
except Exception:
return False
return True


# helper verify mbi of a claim or claim response resource
def _check_mbi(obj, mbi):
matched = False
try:
if obj['contained']:
for c in obj['contained']:
if c['resourceType'] == 'Patient':
identifiers = c['identifier']
if len(identifiers) > 0:
if identifiers[0]['value'] == mbi:
matched = True
break
except KeyError as ke:
# log error and return false
print(ke)
pass
return matched
36 changes: 36 additions & 0 deletions apps/capabilities/management/commands/create_blue_button_scopes.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,42 @@ def create_coverage_capability(group,
return c


def create_claim_capability(group,
fhir_prefix,
title="My Medicare partially adjudicated claims."):
c = None
description = "Claim FHIR Resource"
smart_scope_string = "patient/Claim.read"
pr = []
pr.append(["GET", "%sClaim/" % fhir_prefix])
pr.append(["GET", "%sClaim/[id]" % fhir_prefix])
if not ProtectedCapability.objects.filter(slug=smart_scope_string).exists():
c = ProtectedCapability.objects.create(group=group,
title=title,
description=description,
slug=smart_scope_string,
protected_resources=json.dumps(pr, indent=4))
return c


def create_claimresponse_capability(group,
fhir_prefix,
title="My Medicare partially adjudicated claim responses."):
c = None
description = "ClaimResponse FHIR Resource"
smart_scope_string = "patient/ClaimResponse.read"
pr = []
pr.append(["GET", "%sClaimResponse/" % fhir_prefix])
pr.append(["GET", "%sClaimResponse/[id]" % fhir_prefix])
if not ProtectedCapability.objects.filter(slug=smart_scope_string).exists():
c = ProtectedCapability.objects.create(group=group,
title=title,
description=description,
slug=smart_scope_string,
protected_resources=json.dumps(pr, indent=4))
return c


class Command(BaseCommand):
help = 'Create BlueButton Group and Scopes'

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# Generated by Django 4.2.17 on 2025-02-19 23:35

from django.db import migrations, models


class Migration(migrations.Migration):

dependencies = [
('dot_ext', '0008_internalapplicationlabels_and_more'),
]

operations = [
migrations.CreateModel(
name='InternalApplicationLabelsProxy',
fields=[
],
options={
'verbose_name': 'Internal Category',
'verbose_name_plural': 'Internal Categories',
'proxy': True,
'indexes': [],
'constraints': [],
},
bases=('dot_ext.internalapplicationlabels',),
),
migrations.AlterField(
model_name='application',
name='internal_application_labels',
field=models.ManyToManyField(blank=True, to='dot_ext.internalapplicationlabels'),
),
]
2 changes: 1 addition & 1 deletion apps/fhir/bluebutton/constants.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
ALLOWED_RESOURCE_TYPES = ['Patient', 'Coverage', 'ExplanationOfBenefit']
ALLOWED_RESOURCE_TYPES = ['Patient', 'Coverage', 'ExplanationOfBenefit', 'Claim', 'ClaimResponse']
DEFAULT_PAGE_SIZE = 10
MAX_PAGE_SIZE = 50
18 changes: 18 additions & 0 deletions apps/fhir/bluebutton/migrations/0005_crosswalk__user_mbi.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Generated by Django 4.2.17 on 2025-02-19 23:35

from django.db import migrations, models


class Migration(migrations.Migration):

dependencies = [
('bluebutton', '0004_createnewapplication_mycredentialingrequest'),
]

operations = [
migrations.AddField(
model_name='crosswalk',
name='_user_mbi',
field=models.CharField(db_column='user_mbi', db_index=True, default=None, max_length=32, null=True, unique=True, verbose_name='User MBI ID'),
),
]
20 changes: 20 additions & 0 deletions apps/fhir/bluebutton/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,18 @@ class Crosswalk(models.Model):
db_index=True,
)

# This stores the MBI value.
# Can be null for backwards migration compatibility.
_user_mbi = models.CharField(
max_length=32,
verbose_name="User MBI ID",
unique=True,
null=True,
default=None,
db_column="user_mbi",
db_index=True,
)

objects = models.Manager() # Default manager
real_objects = RealCrosswalkManager() # Real bene manager
synth_objects = SynthCrosswalkManager() # Synth bene manager
Expand All @@ -145,6 +157,10 @@ def user_hicn_hash(self):
def user_mbi_hash(self):
return self._user_mbi_hash

@property
def user_mbi(self):
return self._user_mbi

@user_hicn_hash.setter
def user_hicn_hash(self, value):
self._user_id_hash = value
Expand All @@ -153,6 +169,10 @@ def user_hicn_hash(self, value):
def user_mbi_hash(self, value):
self._user_mbi_hash = value

@user_mbi.setter
def user_mbi(self, value):
self._user_mbi = value


class ArchivedCrosswalk(models.Model):
"""
Expand Down
26 changes: 25 additions & 1 deletion apps/fhir/bluebutton/permissions.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,12 @@ def has_object_permission(self, request, view, obj):
reference_id = reference.split("/")[1]
if reference_id != request.crosswalk.fhir_id:
raise exceptions.NotFound()
elif request.resource_type == "Claim":
if not _check_mbi(obj, request.crosswalk.user_mbi):
raise exceptions.NotFound()
elif request.resource_type == "ClaimResponse":
if not _check_mbi(obj, request.crosswalk.user_mbi):
raise exceptions.NotFound()
else:
reference_id = obj["id"]
if reference_id != request.crosswalk.fhir_id:
Expand All @@ -68,7 +74,6 @@ def has_object_permission(self, request, view, obj):
class SearchCrosswalkPermission(HasCrosswalk):
def has_object_permission(self, request, view, obj):
patient_id = request.crosswalk.fhir_id

if "patient" in request.GET and request.GET["patient"] != patient_id:
return False

Expand Down Expand Up @@ -98,3 +103,22 @@ def has_permission(self, request, view):
)

return True


# helper verify mbi of a claim or claim response resource
def _check_mbi(obj, mbi):
matched = False
try:
if obj['contained']:
for c in obj['contained']:
if c['resourceType'] == 'Patient':
identifiers = c['identifier']
if len(identifiers) > 0:
if identifiers[0]['value'] == mbi:
matched = True
break
except KeyError as ke:
# log error and return false
print(ke)
pass
return matched
33 changes: 33 additions & 0 deletions apps/fhir/bluebutton/v2/urls.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,15 @@
ReadViewCoverage,
ReadViewExplanationOfBenefit,
ReadViewPatient,
ReadViewClaim,
ReadViewClaimResponse,
)
from apps.fhir.bluebutton.views.search import (
SearchViewCoverage,
SearchViewExplanationOfBenefit,
SearchViewPatient,
SearchViewClaim,
SearchViewClaimResponse,
)

admin.autodiscover()
Expand Down Expand Up @@ -51,4 +55,33 @@
SearchViewExplanationOfBenefit.as_view(version=2),
name="bb_oauth_fhir_eob_search_v2",
),
# Claim SearchView
re_path(
r"Claim/_search$",
SearchViewClaim.as_view(version=2),
name="bb_oauth_fhir_claim_search",
),
re_path(
r"ClaimJSON/_search$",
SearchViewClaim.as_view(version=2),
name="bb_oauth_fhir_claimjson_search",
),
# Claim ReadView
re_path(
r"Claim/(?P<resource_id>[^/]+)",
ReadViewClaim.as_view(version=2),
name="bb_oauth_fhir_claim_read",
),
# ClaimResponse SearchView
re_path(
r"ClaimResponse/_search$",
SearchViewClaimResponse.as_view(version=2),
name="bb_oauth_fhir_claimresponse_search",
),
# ClaimResponse ReadView
re_path(
r"ClaimResponse/(?P<resource_id>[^/]+)",
ReadViewClaimResponse.as_view(version=2),
name="bb_oauth_fhir_claimresponse_read",
),
]
Loading
Loading