Skip to content

Commit

Permalink
Add shared diglabtools utilities
Browse files Browse the repository at this point in the history
  • Loading branch information
JuliaSprenger committed Jun 14, 2023
1 parent ad7c828 commit a9730d6
Show file tree
Hide file tree
Showing 6 changed files with 905 additions and 1 deletion.
2 changes: 1 addition & 1 deletion .github/workflows/run_redcap_check.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,4 @@ jobs:
- name: Test with pytest
env:
REDCAP_API_TOKEN: ${{ secrets.REDCAP_API_TOKEN }}
run: python redcap_bridge/test_redcap/check_connectivity.py
run: python redcap_bridge/tests/check_connectivity.py
175 changes: 175 additions & 0 deletions diglab_utils/conversion.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
import pandas as pd
import warnings
import re

def conversion_csv_to_json(csv_file):
    """
    Convert a RedCap data dictionary (csv) into an ElabFTW extra-fields dict

    Each csv row describes one RedCap field. Rows that carry branching
    logic and the 'record_id' row are skipped. 'text', 'dropdown', 'notes',
    'radio' and 'checkbox' rows are converted by the matching `*_to_dict`
    helper; rows of any other field type are ignored.

    Parameters
    ----------
    csv_file:
        csv source, handed to `pandas.read_csv` unchanged

    Returns
    -------
    dict
        {"extra_fields": {<field label>: {...}, ...}} where every entry
        additionally carries a 1-based "position" in insertion order
    """
    validation_column = 'Text Validation Type OR Show Slider Number'
    converters = {
        'dropdown': dropdown_to_dict,
        'notes': notes_to_dict,
        'radio': radio_to_dict,
        'checkbox': checkbox_to_dict,
    }

    table = pd.read_csv(csv_file, na_filter=False, dtype='str')
    extra_fields = {}

    for row in table.to_dict('records'):
        # Skip the logic fields because ElabFTW does not understand them
        if row['Branching Logic (Show field only if...)'] != '':
            continue
        if row['Variable / Field Name'] == 'record_id':
            continue

        field_type = row['Field Type']
        if field_type == 'text':
            # RedCap 'text' fields are refined by their validation type
            validation = row[validation_column]
            if validation in ('number', 'integer'):
                converter = number_to_dict
            elif validation == 'date_dmy':
                converter = date_to_dict
            else:
                converter = text_to_dict
        elif field_type in converters:
            converter = converters[field_type]
        else:
            # field types without a converter contribute nothing
            continue

        extra_fields.update(converter(row))

    # number the entries in the order they ended up in the dict
    for position, entry in enumerate(extra_fields.values(), start=1):
        entry.update({"position": position})

    return {"extra_fields": extra_fields}


def text_to_dict(redcap_field_dict):
    """
    Build the ElabFTW entry of type "text" for a RedCap field

    Parameters
    ----------
    redcap_field_dict: dict
        RedCap field description; 'Field Label' and 'Field Note' are read

    Returns
    -------
    dict
        single-entry dict mapping the field label to its ElabFTW properties
    """
    label = redcap_field_dict['Field Label']
    properties = dict(type="text", value="", description=redcap_field_dict['Field Note'])
    return {label: properties}


def number_to_dict(redcap_field_dict):
    """
    Build the ElabFTW entry of type "number" for a RedCap field

    Parameters
    ----------
    redcap_field_dict: dict
        RedCap field description; 'Field Label' and 'Field Note' are read

    Returns
    -------
    dict
        single-entry dict mapping the field label to its ElabFTW properties
    """
    # A RedCap text field can stand for several json types; this helper
    # produces the numeric variant.
    elab_entry = {}
    elab_entry[redcap_field_dict['Field Label']] = {
        "type": "number",
        "value": "",
        "description": redcap_field_dict['Field Note'],
    }
    return elab_entry


def date_to_dict(redcap_field_dict):
    """
    Build the ElabFTW entry of type "date" for a RedCap field

    Parameters
    ----------
    redcap_field_dict: dict
        RedCap field description; 'Field Label' and 'Field Note' are read

    Returns
    -------
    dict
        single-entry dict mapping the field label to its ElabFTW properties
    """
    field_label = redcap_field_dict['Field Label']
    field_note = redcap_field_dict['Field Note']
    return {field_label: {"type": "date", "value": "", "description": field_note}}


def radio_to_dict(redcap_field_dict):
    """
    Build the ElabFTW entry of type "radio" for a RedCap radio field

    The selectable options and the preselected value are derived from the
    field's choices and annotation via `parse_choices`.

    Parameters
    ----------
    redcap_field_dict: dict
        RedCap field description with 'Field Type' equal to 'radio'

    Returns
    -------
    dict
        single-entry dict mapping the field label to its ElabFTW properties
    """
    assert redcap_field_dict["Field Type"] == "radio"

    options, preselected = parse_choices(
        redcap_field_dict["Choices, Calculations, OR Slider Labels"],
        redcap_field_dict["Field Annotation"],
    )

    label = redcap_field_dict['Field Label']
    properties = {
        "type": "radio",
        "value": preselected,
        "options": options,
        "description": redcap_field_dict['Field Note'],
    }
    return {label: properties}


def checkbox_to_dict(redcap_field_dict):
    """
    Build the ElabFTW entry for a RedCap checkbox field

    A checkbox is represented as a "select" entry that allows multiple
    values. Options and preselected value come from `parse_choices`.

    Parameters
    ----------
    redcap_field_dict: dict
        RedCap field description with 'Field Type' equal to 'checkbox'

    Returns
    -------
    dict
        single-entry dict mapping the field label to its ElabFTW properties
    """
    assert redcap_field_dict["Field Type"] == "checkbox"

    options, preselected = parse_choices(
        redcap_field_dict["Choices, Calculations, OR Slider Labels"],
        redcap_field_dict["Field Annotation"],
    )

    label = redcap_field_dict['Field Label']
    properties = {
        "type": "select",
        "value": preselected,
        "options": options,
        "description": redcap_field_dict['Field Note'],
        "allow_multi_values": True,
    }
    return {label: properties}


def dropdown_to_dict(redcap_field_dict):
    """
    Build the ElabFTW entry for a RedCap dropdown field

    Options and preselected value come from `parse_choices`.

    Parameters
    ----------
    redcap_field_dict: dict
        RedCap field description with 'Field Type' equal to 'dropdown'

    Returns
    -------
    dict
        single-entry dict mapping the field label to its ElabFTW properties
    """
    assert redcap_field_dict["Field Type"] == "dropdown"

    options, preselected = parse_choices(
        redcap_field_dict["Choices, Calculations, OR Slider Labels"],
        redcap_field_dict["Field Annotation"],
    )

    label = redcap_field_dict['Field Label']
    # a dropdown always becomes a "select" entry in the json
    properties = {
        "type": "select",
        "value": preselected,
        "options": options,
        "description": redcap_field_dict['Field Note'],
    }
    return {label: properties}


def notes_to_dict(redcap_field_dict):
    """
    Build the ElabFTW entry for a RedCap notes field

    Notes are represented as an entry of type "text".

    Parameters
    ----------
    redcap_field_dict: dict
        RedCap field description; 'Field Label' and 'Field Note' are read

    Returns
    -------
    dict
        single-entry dict mapping the field label to its ElabFTW properties
    """
    key = redcap_field_dict['Field Label']
    entry = {"type": "text", "value": ""}
    entry["description"] = redcap_field_dict['Field Note']
    return {key: entry}


def parse_choices(choice_str, annotation_str):
    """
    Extract choice labels and default choice label from redcap
    "Choices, Calculations, OR Slider Labels" and "Annotations"

    Parameters
    ----------
    choice_str: str
        '|'-separated "<key>, <label>" pairs, e.g. "1, Yes | 2, No"
    annotation_str: str
        field annotation; a default is read from a `@DEFAULT="<key>"` (or
        single-quoted) tag anywhere in the string

    Returns
    -------
    (list, str)
        first entry is the list of all choice labels
        second entry is the default choice label (one of the labels of the
        first entry), or '' if no default is defined or it can not be
        resolved to one of the choices (a warning is issued in that case)
    """
    # default return values
    choice_labels = []
    default_choice_label = ''

    # raw string: '\|', '\s' and '\w' are regex escapes, not string escapes
    choice_match = re.findall(r'(?:\|?)\s?(?P<choice>\w+)\s?,\s?(?P<label>[^,|]+?)\s*(?:\||$)', choice_str)
    if choice_match:
        choice_keys, choice_labels = zip(*choice_match)
        if '@DEFAULT=' in annotation_str:
            choice_selector = '|'.join(re.escape(key) for key in choice_keys)
            quote = '["\']'
            # search (not match): the tag may be preceded by other content
            # in the annotation, e.g. further action tags
            match = re.search(f'@DEFAULT={quote}({choice_selector}){quote}', annotation_str)
            if match:
                default_choice_key = match.group(1)
                default_choice_label = choice_labels[choice_keys.index(default_choice_key)]
            else:
                warnings.warn(f'Could not determine default choice for {annotation_str}')

    # Removal of embedded fields used in RedCap ( {...} ) as there is no equivalent in ElabFTW
    choice_labels = [re.sub(r'\{.*?\}', '', label) for label in choice_labels]
    default_choice_label = re.sub(r'\{.*?\}', '', default_choice_label)

    return choice_labels, default_choice_label
40 changes: 40 additions & 0 deletions diglab_utils/provenance.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import pathlib
import git


def get_repo_state(path):
    """
    Extract the latest commit hash of the git repository containing a path

    The repository root is the closest folder, starting at `path` itself
    and walking up through its parents, that contains a `.git` entry.

    Args:
        path: Path to (or inside of) the git repository
    Returns:
        2-tuple (str, bool or None)
            latest commit id, '' if it can not be determined
            repo status: True if repository is in a clean state,
                False if it is dirty or the commit can not be determined,
                None if path is not part of a git repository
    """

    repo_root = None
    # find repository root folder
    path = pathlib.Path(path)
    for parent in [path, *path.parents]:
        if (parent / '.git').exists():
            repo_root = parent
            break

    if repo_root is None:
        return '', None

    # Open the existing repository read-only. `git.Repo.init` is avoided on
    # purpose: it runs `git init` on the folder, i.e. it writes to a
    # repository that should only be inspected here.
    try:
        repo = git.Repo(str(repo_root))
    except git.exc.InvalidGitRepositoryError:
        # a `.git` entry exists but is no usable repository: keep the
        # non-raising contract and report an undeterminable, unclean state
        return '', False

    clean = not repo.is_dirty()
    try:
        commit_hash = repo.head.commit.hexsha
    except ValueError:
        # NOTE(review): presumably raised when HEAD does not point to a
        # commit yet (repository without commits) - confirm with GitPython
        commit_hash = ''
        clean = False

    return commit_hash, clean
14 changes: 14 additions & 0 deletions diglab_utils/test_conversion.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import json

from diglab_utils.conversion import conversion_csv_to_json
from diglab_utils.test_utils import test_directory, initialize_test_dir


def test_conversion_csv_to_json(initialize_test_dir):
    """
    Converting the packaged RedCap csv must reproduce the packaged
    reference ElabFTW json ('elabFinal.json').
    """
    test_dir = test_directory / 'testfiles_redcap' / 'elabConversion'

    # context manager guarantees the reference file is closed again
    with open(test_dir / 'elabFinal.json') as f:
        elab_final = json.load(f)

    elab_conversion = conversion_csv_to_json(test_dir / 'csvRecord.csv')
    assert elab_conversion == elab_final
36 changes: 36 additions & 0 deletions diglab_utils/test_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import shutil
import os
import pathlib
import tempfile
import pytest

# Scratch folder (inside the system temp dir) that receives copies of the test files
test_directory = pathlib.Path(tempfile.gettempdir(), 'diglabtools_testfiles')
# Location of the RedCap test project inside the scratch folder
project_dir = test_directory.joinpath('testfiles_redcap', 'TestProject')

@pytest.fixture
def initialize_test_dir(clean=True):
    """
    Set up the main test folder and fill it with the packaged test files

    Parameters
    ----------
    clean: (bool)
        Delete an already existing test folder before setting it up.

    Returns
    -------
    path
        path of the test directory
    """
    if clean and os.path.exists(test_directory):
        shutil.rmtree(test_directory)
    if not os.path.exists(test_directory):
        os.mkdir(test_directory)

    # copy the test files shipped with each bridge package
    package_root = pathlib.Path(__file__).parents[1]
    packaged_sources = (
        ('redcap_bridge', 'testfiles_redcap'),
        ('elab_bridge', 'testfiles_elab'),
    )
    for bridge_package, folder_name in packaged_sources:
        source = package_root / bridge_package / 'tests' / folder_name
        shutil.copytree(source, test_directory / source.name)

    return test_directory
Loading

0 comments on commit a9730d6

Please sign in to comment.