Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Accommodate Typed Relation Display name Fields #810

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions workbench
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,13 @@ def create():
config, field_definitions, node, row, custom_field
)

# Typed relation with display name fields.
elif field_definitions[custom_field]["field_type"] == "typed_relation_display_name":
typed_relation_display_name_field = workbench_fields.TypedRelationDisplayNameField()
node = typed_relation_display_name_field.create(
config, field_definitions, node, row, custom_field
)

# Geolocation fields.
elif field_definitions[custom_field]["field_type"] == "geolocation":
geolocation_field = workbench_fields.GeolocationField()
Expand Down Expand Up @@ -800,6 +807,18 @@ def update():
node_field_values[custom_field],
)

# Typed relation display name fields (currently, only taxonomy term).
elif field_definitions[custom_field]["field_type"] == "typed_relation_display_name":
typed_relation_display_name_field = workbench_fields.TypedRelationDisplayNameField()
node = typed_relation_display_name_field.update(
config,
field_definitions,
node,
row,
custom_field,
node_field_values[custom_field],
)

# Geolocation fields.
elif field_definitions[custom_field]["field_type"] == "geolocation":
geolocation_field = workbench_fields.GeolocationField()
Expand Down Expand Up @@ -1978,6 +1997,18 @@ def update_media() -> None:
media_field_values[custom_field],
)

# Typed relation display name fields (currently, only taxonomy term).
elif field_definitions[custom_field]["field_type"] == "typed_relation_display_name":
typed_relation_display_name_field = workbench_fields.TypedRelationDisplayNameField()
patch_request_json = typed_relation_display_name_field.update(
config,
field_definitions,
patch_request_json,
row,
custom_field,
media_field_values[custom_field],
)

# Geolocation fields.
elif field_definitions[custom_field]["field_type"] == "geolocation":
geolocation_field = workbench_fields.GeolocationField()
Expand Down Expand Up @@ -3239,6 +3270,18 @@ def update_terms():
term_field_values[custom_field],
)

# Typed relation display name fields (currently, only taxonomy term).
elif field_definitions[custom_field]["field_type"] == "typed_relation_display_name":
typed_relation_display_name_field = workbench_fields.TypedRelationDisplaynameField()
term = typed_relation_display_name_field.update(
config,
field_definitions,
term,
row,
custom_field,
term_field_values[custom_field],
)

# Geolocation fields.
elif field_definitions[custom_field]["field_type"] == "geolocation":
geolocation_field = workbench_fields.GeolocationField()
Expand Down
199 changes: 199 additions & 0 deletions workbench_fields.py
Original file line number Diff line number Diff line change
Expand Up @@ -1255,6 +1255,205 @@ def serialize(self, config, field_definitions, field_name, field_data):
else:
return subvalues[0]

class TypedRelationDisplayNameField(TypedRelationField):
"""Functions for handling fields with 'typed_relation_display_name' Drupal field data type.
All functions return an "entity" dictionary that is passed to Requests' "json"
parameter.

Currently this field type only supports Typed Relation Taxonomies (not other
Typed Relation entity types).

Note: this class assumes that the entity has the field identified in 'field_name'.
Callers should pre-emptively confirm that. For an example, see code near the top
of workbench.update().
"""

def create(self, config, field_definitions, entity, row, field_name):
"""Parameters
----------
config : dict
The configuration settings defined by workbench_config.get_config().
field_definitions : dict
The field definitions object defined by get_field_definitions().
entity : dict
The dict that will be POSTed to Drupal as JSON.
row : OrderedDict.
The current CSV record.
field_name : string
The Drupal fieldname/CSV column header.
Returns
-------
dictionary
A dictionary representing the entity that is POSTed to Drupal as JSON.
"""
if not row[field_name]:
return entity

id_field = row.get(config.get("id_field", "not_applicable"), "not_applicable")
# Currently only supports Typed Relation Display Name with taxonomy entities.
if field_definitions[field_name]["target_type"] == "taxonomy_term":
target_type = "taxonomy_term"
field_vocabs = get_field_vocabularies(config, field_definitions, field_name)
field_values = []
subvalues = split_typed_relation_display_name_string(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried to make this class as short as possible as it inherits from TypedRelationField. This Class could be much shorter if we made the "split into subvalues" function a parameter of the class. In the create() and update() functions, the ONLY difference is using my custom "split_typed_relation_display_name_string" function.

config, row[field_name], target_type
)
subvalues = self.dedupe_values(subvalues)
cardinality = int(field_definitions[field_name].get("cardinality", -1))
if -1 < cardinality < len(subvalues):
log_field_cardinality_violation(field_name, id_field, str(cardinality))
subvalues = subvalues[:cardinality]
for subvalue in subvalues:
subvalue["target_id"] = prepare_term_id(
config, field_vocabs, field_name, subvalue["target_id"]
)
field_values.append(subvalue)
entity[field_name] = field_values

return entity

def update(
self, config, field_definitions, entity, row, field_name, entity_field_values
):
"""Note: this method appends incoming CSV values to existing values, replaces existing field
values with incoming values, or deletes all values from fields, depending on whether
config['update_mode'] is 'append', 'replace', or 'delete'. It doesn not replace individual
values within fields.
"""
"""Parameters
----------
config : dict
The configuration settings defined by workbench_config.get_config().
field_definitions : dict
The field definitions object defined by get_field_definitions().
entity : dict
The dict that will be POSTed to Drupal as JSON.
row : OrderedDict.
The current CSV record.
field_name : string
The Drupal fieldname/CSV column header.
entity_field_values : list
List of dictionaries containing existing value(s) for field_name in the entity being updated.
Returns
-------
dictionary
A dictionary represeting the entity that is PATCHed to Drupal as JSON.
"""
if config["update_mode"] == "delete":
entity[field_name] = []
return entity

if not row[field_name]:
return entity

if field_name not in entity:
entity[field_name] = []

if config["task"] == "update_terms":
entity_id_field = "term_id"
if config["task"] == "update":
entity_id_field = "node_id"
if config["task"] == "update_media":
entity_id_field = "media_id"

# Currently only supports Typed Relation taxonomy entities.
if field_definitions[field_name]["target_type"] == "taxonomy_term":
target_type = "taxonomy_term"
field_vocabs = get_field_vocabularies(config, field_definitions, field_name)

cardinality = int(field_definitions[field_name].get("cardinality", -1))
if config["update_mode"] == "replace":
subvalues = split_typed_relation_display_name_string(
config, row[field_name], target_type
)
subvalues = self.dedupe_values(subvalues)
field_values = []
for subvalue in subvalues:
subvalue["target_id"] = prepare_term_id(
config, field_vocabs, field_name, subvalue["target_id"]
)
field_values.append(subvalue)
if -1 < cardinality < len(field_values):
field_values = field_values[:cardinality]
log_field_cardinality_violation(
field_name, row[entity_id_field], str(cardinality)
)
entity[field_name] = field_values
if config["update_mode"] == "append":
field_values = []
subvalues = split_typed_relation_display_name_string(
config, row[field_name], target_type
)
for subvalue in subvalues:
subvalue["target_id"] = prepare_term_id(
config, field_vocabs, field_name, subvalue["target_id"]
)
entity_field_values.append(subvalue)
entity_field_values = self.dedupe_values(entity_field_values)
if -1 < cardinality < len(entity_field_values):
entity[field_name] = entity_field_values[:cardinality]
log_field_cardinality_violation(
field_name, row[entity_id_field], str(cardinality)
)
else:
entity[field_name] = entity_field_values

return entity


def serialize(self, config, field_definitions, field_name, field_data):
"""Serialized values into a format consistent with Workbench's CSV-field input format."""
"""Parameters
----------
config : dict
The configuration settings defined by workbench_config.get_config().
field_definitions : dict
The field definitions object defined by get_field_definitions().
field_name : string
The Drupal fieldname/CSV column header.
field_data : string
Raw JSON from the field named 'field_name'.
Returns
-------
string
A string structured same as the Workbench CSV field data for this field type,
or None if there is nothing to return.
"""
if "field_type" not in field_definitions[field_name]:
return None

subvalues = list()
for subvalue in field_data:
display_name = str(subvalue["display_name"])
if config["export_csv_term_mode"] == "name":
vocab_id = get_term_vocab(config, subvalue["target_id"])
term_name = get_term_name(config, subvalue["target_id"])
if display_name:
subvalues.append(
str(subvalue["rel_type"]) + ":" + vocab_id + ":" + term_name + "~" + display_name
)
else:
subvalues.append(
str(subvalue["rel_type"]) + ":" + vocab_id + ":" + term_name
)
else:
# Term IDs.
if display_name:
subvalues.append(
str(subvalue["rel_type"]) + ":" + str(subvalue["target_id"]) + "~" + display_name
)
else:
subvalues.append(
str(subvalue["rel_type"]) + ":" + str(subvalue["target_id"])
)

if len(subvalues) > 1:
return config["subdelimiter"].join(subvalues)
elif len(subvalues) == 0:
return None
else:
return subvalues[0]


class AuthorityLinkField:
"""Functions for handling fields with 'authority_link' Drupal field data type.
Expand Down
74 changes: 74 additions & 0 deletions workbench_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -4256,6 +4256,50 @@ def split_typed_relation_string(config, typed_relation_string, target_type):

return return_list

def split_typed_relation_display_name_string(config, typed_relation_display_name_string, target_type):
"""Fields of type 'typed_relation_display_name' are represented in the CSV file
using a structured string, specifically namespace:property:id~display_name,
e.g., 'relators:pht:5~Jane Smith', with the final display_name parameter
(separated by a tilde ~) optional. 'id' is either a term ID or a node ID. This
function takes one of those strings (optionally with a multivalue
subdelimiter) and returns a list of dictionaries in the form they
take in existing node values. ID values can also be term names (strings)
and term URIs (also strings, but in the form 'http....').

Also, these values can (but don't need to) have an optional namespace
in the term ID segment, which is the vocabulary ID string. These
typed relation strings look like 'relators:pht:person:Jordan, Mark~Mark Jordan'.
However, since we split the typed relation strings only on the first
two :, the entire third segment (minus the display name) is considered,
for the purposes of splitting the value, to be the term.
"""
typed_relation_display_name_string = typed_relation_display_name_string.strip()

return_list = []
if len(typed_relation_display_name_string) == 0:
return return_list

temp_list = typed_relation_display_name_string.split(config["subdelimiter"])
for item in temp_list:
display_name = None
item_list = item.split(":", 2)
term_and_display_name = item_list[2].split("~",1)
if term_and_display_name[1]:
display_name = term_and_display_name[1]
if value_is_numeric(term_and_display_name[0]):
target_id = int(term_and_display_name[0])
else:
target_id = term_and_display_name[0]

item_dict = {
"target_id": target_id,
"rel_type": item_list[0] + ":" + item_list[1],
"target_type": target_type,
"display_name": display_name,
}
return_list.append(item_dict)

return return_list

def split_geolocation_string(config, geolocation_string):
"""Fields of type 'geolocation' are represented in the CSV file using a
Expand Down Expand Up @@ -6489,6 +6533,17 @@ def get_term_field_data(config, vocab_id, term_name, term_csv_row):
field_name,
)

# Typed relation display name fields.
elif vocab_field_definitions[field_name]["field_type"] == "typed_relation_display_name":
typed_relation_display_name_field = workbench_fields.TypedRelationDisplayNameField()
term_field_data = typed_relation_display_name_field.create(
config,
vocab_field_definitions,
term_field_data,
term_csv_row,
field_name,
)

# Geolocation fields.
elif vocab_field_definitions[field_name]["field_type"] == "geolocation":
geolocation_field = workbench_fields.GeolocationField()
Expand Down Expand Up @@ -8605,6 +8660,19 @@ def create_children_from_directory(config, parent_csv_record, parent_node_id):
inherited_field,
)

# Typed relation fields.
elif (
field_definitions[inherited_field]["field_type"] == "typed_relation_display_name"
):
typed_relation_display_name_field = workbench_fields.TypedRelationDisplayNameField()
node_json = typed_relation_display_name_field.create(
config,
field_definitions,
node_json,
csv_row_to_apply_to_paged_children,
inherited_field,
)

# Geolocation fields.
elif field_definitions[inherited_field]["field_type"] == "geolocation":
geolocation_field = workbench_fields.GeolocationField()
Expand Down Expand Up @@ -9977,6 +10045,12 @@ def serialize_field_json(config, field_definitions, field_name, field_data):
csv_field_data = serialized_field.serialize(
config, field_definitions, field_name, field_data
)
# Typed relation display name fields (currently, only taxonomy term)
elif field_definitions[field_name]["field_type"] == "typed_relation_display_name":
serialized_field = workbench_fields.TypedRelationDisplayNameField()
csv_field_data = serialized_field.serialize(
config, field_definitions, field_name, field_data
)
# Geolocation fields.
elif field_definitions[field_name]["field_type"] == "geolocation":
serialized_field = workbench_fields.GeolocationField()
Expand Down
Loading