Daily Rounds to Observations

Field level migration status tracking: https://docs.google.com/spreadsheets/d/1ZhwTAT4e3-i9S9DjwoC4n2fUHBeH6SZFn4aJYSCu6PQ

CSV to field mappings

import csv
import json
import logging
from pathlib import Path

import requests

logger = logging.getLogger(__name__)

"""
This is a helper script to generate the field mapping for the daily_round to observations migration.

Fields in the CSV:

Any one of value_key or notes_key should be present

- value_key: The key to access the value in the daily_round object
- notes_key: The key to access the notes in the daily_round object

- value_type: The type of value for the observation:
    group | boolean | decimal | integer | string | text | display | date | dateTime |
    time | choice | open-choice | attachment | reference | quantity | structured

- value_map: A json mapping of values to be used for the value field, useful for mapping values to codes
eg: for rhythm
{
    0: {system: "<http://loinc.org/>", code: "8867-4", display_name: "Normal sinus rhythm"},
    1: {system: "<http://loinc.org/>", code: "271594007", display_name: "Atrial fibrillation"}
    ...
}

Below are the fields that are used to generate Coding objects:

- category_code: The code for the category of the observation, uses the HL7 observation system by default
- main_code: The main code for the observation, uses the LOINC system by default
- alternate_coding: The alternate coding for the observation, uses the SNOMED system by default
- body_site: The body site for the observation
- method: The method used for the observation
- unit_code: The code for the unit of the observation
- quantity_code: The code for the quantity of the observation

unit_code and quantity_code are only used when the value_type is quantifiable

All code fields have a corresponding display_name field which is used to
generate the display field in the Coding object, the system value is
inferred as LOINC or SNOMED if the code is a URL

"""

def make_codeable_concept(code, display_name, system=None):
    code = str(code)
    if not code:
        return {}

    # try to infer the system from the code if its a url
    if "loinc" in code:
        code = [x for x in code.split("/") if x][-1]
        system = "<http://loinc.org/>"
    elif "snomed" in code:
        code = [x for x in code.split("/") if x][-1]
        system = "<http://www.snomed.info/sct>"

    if not system:
        system = "<http://example.com>"

    return {
        "system": system,
        "code": code,
        "display_name": display_name,
        # TODO: add fields like version, display, userSelected, etc.
    }

def clean_csv_and_save_as_json(sheet_id):
    csv_url = (
        f"<https://docs.google.com/spreadsheets/d/{sheet_id}/export?gid=0&format=csv>"
    )
    logger.info("Downloading CSV from %s", csv_url)

    # Download the CSV content
    response = requests.get(csv_url, timeout=10)
    response.raise_for_status()
    csv_data = response.text.splitlines()

    reader = csv.DictReader(
        csv_data,
        fieldnames=[
            "value_key",
            "notes_key",
            "category_code",
            "category_code_display_name",
            "main_code",
            "main_code_display_name",
            "alternate_coding",
            "alternate_coding_display_name",
            "value_type",
            "unit_code",
            "unit_code_display_name",
            "quantity_code",
            "quantity_code_display_name",
            "value_map",
            "body_site",
            "body_site_display_name",
            "method",
            "method_display_name",
        ],
    )

    cleaned_rows = []

    logger.info("Cleaning CSV")
    for row in reader:
        # Ensure we skip the header line if it's repeated (in case of specifying fieldnames)
        if row["value_key"] == "value_key":
            continue

        row["notes_key"] = ""

        row["value_type"] = row["value_type"].lower() or "string"

        # Convert codes to coding objects
        row["category_code"] = make_codeable_concept(
            row["category_code"],
            row["category_code_display_name"],
            "<http://terminology.hl7.org/CodeSystem/observation-category>",
        )

        row["main_code"] = make_codeable_concept(
            row["main_code"],
            row["main_code_display_name"],
            "<http://loinc.org/>",
        )

        row["alternate_coding"] = make_codeable_concept(
            row["alternate_coding"],
            row["alternate_coding_display_name"],
            "<http://www.snomed.info/sct>",
        )

        row["unit_code"] = make_codeable_concept(
            row["unit_code"], row["unit_code_display_name"]
        )

        row["quantity_code"] = make_codeable_concept(
            row["quantity_code"], row["quantity_code_display_name"]
        )

        row["body_site"] = make_codeable_concept(
            row["body_site"], row["body_site_display_name"]
        )

        row["method"] = make_codeable_concept(row["method"], row["method_display_name"])

        if row["value_map"]:
            row["value_map"] = json.loads(row["value_map"])

        # Remove display_name fields
        for col in [
            "category_code_display_name",
            "main_code_display_name",
            "alternate_coding_display_name",
            "unit_code_display_name",
            "quantity_code_display_name",
            "body_site_display_name",
            "method_display_name",
        ]:
            row.pop(col, None)

        cleaned_rows.append(row)

    # Convert to JSON lines
    json_file_path = (
        Path.cwd()
        / "care/emr/migrations"
        / "daily_round_to_observations_field_mapping.json"
    )
    with json_file_path.open("w", encoding="utf-8") as f:
        json.dump(cleaned_rows, f, ensure_ascii=False, indent=2)

    logger.info("Cleaned data saved to %s", json_file_path)

# <https://docs.google.com/spreadsheets/d/1xgR5G1NFAKo0mODKUULOZapCtuaP-wd5kEacky0RNJA/edit>
sheet_id = "1xgR5G1NFAKo0mODKUULOZapCtuaP-wd5kEacky0RNJA"
clean_csv_and_save_as_json(sheet_id)

Migration file

# Generated by Django 5.1.1 on 2024-12-11 08:29

from django.db import migrations
from django.core.paginator import Paginator
from django.db.models import F
from pathlib import Path
import json
from operator import attrgetter

migration_id = 31

quantity_value_types = {"decimal", "integer", "quantity"}

def get_nested_value(obj, key):
    # jq
    keys = key.split(".")
    value = getattr(obj, keys[0], None)
    for key in keys[1:]:
        if isinstance(value, dict):
            value = value.get(key, None)
        else:
            return None
    return value

def get_observations(field_mapping, daily_round):
    observations = []

    if not daily_round.created_by_id:
        # bad data in staging
        print(f"Daily round {daily_round.id} has missing values {daily_round.created_by_id=}")
        return observations

    default_observation = {
        "status": "final",
        "subject_type": "patient",
        "subject_id": daily_round.patient_external_id,
        "patient_id": daily_round.patient_id,
        "encounter_id": daily_round.consultation_id,
        "effective_datetime": daily_round.taken_at or daily_round.created_date,
        "created_date": daily_round.created_date,
        "data_entered_by_id": daily_round.created_by_id,
        "performer": {},
        "meta": {
            "is_migration": True,
            "migration_id": migration_id,
            "orignal_model": "DailyRound",
            "orignal_model_id": daily_round.id,
            "created_by_telemedicine": daily_round.created_by_telemedicine,
            "round_type": daily_round.rounds_type,
            "in_prone_position": daily_round.in_prone_position,
        }
    }

    for field in field_mapping:
        try:
            daily_round_value = None
            daily_round_note = None

            #TODO: instead of always using the helper we can ask it explicitly from the field mapping data
            if field["value_key"]:
                daily_round_value = get_nested_value(daily_round, field["value_key"])
            if field["notes_key"]:
                daily_round_note = get_nested_value(daily_round, field["value_key"])

            if daily_round_value or daily_round_note:
                value = {}
                if daily_round_value:
                    value["value"] = str(daily_round_value)

                    if field["value_map"]:
                        value["value_code"] = field["value_map"].get(daily_round_value, {})

                    if field["value_type"] in quantity_value_types:
                        value["value_quantity"] = {
                            "value": float(daily_round_value),
                        }
                        if field["unit_code"]:
                            value["value_quantity"]["unit"] = field["unit_code"]
                        # if field["quantity_code"]:
                        #     TODO: maybe this needs to be derived from the value?
                        #     value["value_quantity"]["code"] = field["quantity_code"]

                observations.append(dict(
                    **default_observation,
                    category = field["category_code"],
                    main_code = field["main_code"],
                    alternate_coding = field["alternate_coding"],
                    value_type = field["value_type"],
                    value = value,
                    body_site = field["body_site"],
                    method = field["method"],
                ))
        except Exception as e:
            print(f"Error while processing {field['value_key']} for {daily_round.id=}: {e}")
    return observations

def migrate_daily_rounds_to_observation(apps, schema_editor):
    DailyRound = apps.get_model("facility", "DailyRound")
    Observation = apps.get_model("emr", "Observation")

    #load filed mapping from json
    field_mapping_file = Path(__file__).parent / "daily_round_to_observations_field_mapping.json"
    with field_mapping_file.open() as f:
        field_mapping = json.load(f)

    queryset = DailyRound.objects.all().order_by("id").annotate(
        patient_id=F("consultation__patient_id"),
        patient_external_id=F("consultation__patient__external_id"),
    )
    paginator = Paginator(queryset, 3000) # TODO: come up with a better batch size
    for page_num in paginator.page_range:
        bulk_observations = []
        daily_rounds = paginator.get_page(page_num)
        for daily_round in daily_rounds:
            bulk_observations.extend(get_observations(field_mapping, daily_round))
        Observation.objects.bulk_create(
            [Observation(**observation) for observation in bulk_observations],
        )

def reverse_migration(apps, schema_editor):
    # drop all observations created by this migration
    schema_editor.execute("DELETE FROM emr_observation WHERE meta->>'migration_id' = %s", [str(migration_id)])

class Migration(migrations.Migration):

    dependencies = [
        ('emr', '0030_alter_questionnaireresponse_encounter'),
    ]

    operations = [
        migrations.RunPython(migrate_daily_rounds_to_observation,
                             reverse_code=reverse_migration),
    ]