Skip to content

XMLConverter

A class to convert data from document tree format (nested dict) to and from XML.

Parameters:

Name Type Description Default
data_model DataModel

The DataModel object used to parse XML files

required
document_tree dict

Data in the document tree format (optional, can be built later by the parse_xml method)

None
Source code in xml2db/xml_converter.py
def __init__(self, data_model: "DataModel", document_tree: dict = None):
    """Create a converter between the document tree format (nested dict) and XML.

    Args:
        data_model: The [`DataModel`](./data_model.md#xml2db.model.DataModel) object used to parse XML files
        document_tree: Data in the document tree format; optional, as the `parse_xml` method can build it later
    """
    self.model, self.document_tree = data_model, document_tree

_compute_hash_deduplicate(node, hash_maps)

A function to compute hash for a document tree node and deduplicate its content

Parameters:

Name Type Description Default
node tuple

A tuple of (node_type, content) representing a node

required
hash_maps dict

A dict of dicts storing reference to deduplicated nodes keyed by their type and hash value

required

Returns:

Type Description
tuple

A tuple of (node_type, content, hash) representing a node after deduplication

Source code in xml2db/xml_converter.py
def _compute_hash_deduplicate(self, node: tuple, hash_maps: dict) -> tuple:
    """
    Hash a document tree node and return its deduplicated representation

    The hash is fed, in the order of the table fields, with the string representation of
    each column value, the hash of each single child and the sorted hashes of list children.

    Args:
        node: A tuple of (node_type, content) representing a node
        hash_maps: A dict of dicts storing reference to deduplicated nodes keyed by their type and hash value

    Returns:
        A tuple of (node_type, content, hash) representing a node after deduplication
    """
    node_type, content = node
    tables = self.model.tables
    if node_type not in tables:
        # node types without a table get an empty placeholder result
        return "", None, b""

    config = self.model.model_config
    hasher = config["record_hash_constructor"]()
    for field_type, name, field in tables[node_type].fields:
        if field_type == "col":
            if field.is_attr:
                # attribute values are stored under "<name>__attr"; suffixed column
                # names lose their last 5 characters first
                base_name = name[:-5] if field.has_suffix else name
                lookup_key = f"{base_name}__attr"
            else:
                lookup_key = name
            hasher.update(str(content.get(lookup_key)).encode("utf-8"))
        elif field_type == "rel1":
            hasher.update(content[name][0][2] if name in content else b"")
        elif field_type == "reln":
            # children hashes are sorted, so their order does not change the parent hash
            for child_hash in sorted(child[2] for child in content.get(name, [])):
                hasher.update(child_hash)
    node_hash = hasher.digest()

    known_nodes = hash_maps.setdefault(node_type, {})
    if node_hash in known_nodes:
        return known_nodes[node_hash]

    result = (node_type, content, node_hash)
    hook = config["document_tree_node_hook"]
    if hook is not None:
        # whatever the hook returns is what gets stored and returned for this hash
        result = hook(result)

    known_nodes[node_hash] = result
    return result

_parse_element_tree(xt)

Parse an etree.ElementTree recursively

Parameters:

Name Type Description Default
xt ElementTree

an XML ElementTree object

required

Returns:

Type Description
tuple

The parsed document tree (nested dict)

Source code in xml2db/xml_converter.py
def _parse_element_tree(self, xt: etree.ElementTree) -> tuple:
    """Parse an etree.ElementTree recursively

    Args:
        xt: an XML ElementTree object

    Returns:
        The parsed document tree (nested dict)
    """
    root_table = self.model.root_table
    if self.model.tables[root_table].is_virtual_node:
        # wrap the XML root into an element named after the virtual root table
        doc = etree.Element(root_table)
        doc.append(xt.getroot())
    else:
        doc = xt.getroot()

    # start from an empty deduplication map and always hash the root node
    return self._parse_xml_node(root_table, doc, True, {})

_parse_iterative(xml_file, recover=False)

Parse an XML file into a document tree (nested dict) in an iterative fashion.

This method uses etree.iterparse and does not load the entire XML document in memory. It saves memory, especially if you decide to filter out nodes using 'document_tree_node_hook' hook.

Parameters:

Name Type Description Default
xml_file Union[str, BytesIO]

an XML file to parse

required
recover bool

should we try to parse incorrect XML?

False

Returns:

Type Description
tuple

A tuple of node_type, content (dict), hash

Source code in xml2db/xml_converter.py
def _parse_iterative(
    self, xml_file: Union[str, BytesIO], recover: bool = False
) -> tuple:
    """Parse an XML file into a document tree (nested dict) in an iterative fashion.

    This method uses etree.iterparse and does not load the entire XML document in memory.
    It saves memory, especially if you decide to filter out nodes using 'document_tree_node_hook' hook.

    Elements are processed on their "start" and "end" events: complex nodes are pushed on a stack
    when they start and are transformed, hashed and attached to their parent when they end.
    Elements which are not in the data model (`fields_transforms`) are skipped with their whole subtree.

    Args:
        xml_file: an XML file to parse
        recover: should we try to parse incorrect XML?

    Returns:
        A tuple of node_type, content (dict), hash
    """
    # Stack of (node_type, content) pairs for the nodes currently being built. The first entry is an
    # outer container: its type is the root table if this table is a "virtual" node, and None otherwise.
    nodes_stack = [
        (
            (
                self.model.root_table
                if self.model.tables[self.model.root_table].is_virtual_node
                else None
            ),
            {},
        )
    ]
    # deduplication maps passed to _compute_hash_deduplicate
    hash_maps = {}

    # True when the last processed "start" event was for a field with the "join" transform
    joined_values = False
    # nesting depth within a subtree that is being skipped (0 when not skipping)
    skipped_nodes = 0
    for event, element in etree.iterparse(
        xml_file,
        recover=recover,
        events=["start", "end"],
        remove_blank_text=True,
    ):
        # tag name without its "{namespace}" prefix, if any
        key = element.tag.split("}")[1] if "}" in element.tag else element.tag

        # inside a skipped subtree: only keep track of the nesting depth
        if event == "start" and skipped_nodes > 0:
            skipped_nodes += 1

        elif event == "start":
            if nodes_stack[-1][0]:
                # the node type is resolved from the parent node type and the tag name
                node_type_key = (nodes_stack[-1][0], key)
                if node_type_key not in self.model.fields_transforms:
                    # not in the data model: start skipping this element and its descendants
                    skipped_nodes += 1
                    continue
                node_type, transform = self.model.fields_transforms[node_type_key]
            else:
                # the parent is the untyped outer container, so this is the XML root element
                node_type, transform = self.model.root_table, None
            joined_values = transform == "join"
            if not joined_values:
                # complex node: collect its attributes (stored as single-item lists under
                # "<name>__attr") and push it on the stack until its "end" event
                content = {}
                for attrib_key, attrib_val in element.attrib.items():
                    if (
                        attrib_key
                        != "{http://www.w3.org/2001/XMLSchema-instance}noNamespaceSchemaLocation"
                    ):
                        # values are stripped, unless stripping would leave an empty string
                        content[f"{attrib_key}__attr"] = [
                            attrib_val.strip() if attrib_val.strip() else attrib_val
                        ]
                nodes_stack.append((node_type, content))

        # leaving an element of a skipped subtree
        elif event == "end" and skipped_nodes > 0:
            skipped_nodes -= 1

        elif event == "end":
            # joined_values was set with the previous "start" event just before and corresponds to lists of simple
            # type elements
            if joined_values:
                # the value is the stripped text, the raw text if it is whitespace only, or None without text
                value = None
                if element.text:
                    if element.text.strip():
                        value = element.text.strip()
                    else:
                        value = element.text
                # values (including None) are appended to the content of the enclosing node
                if key in nodes_stack[-1][1]:
                    nodes_stack[-1][1][key].append(value)
                else:
                    nodes_stack[-1][1][key] = [value]

            # else, we have completed a complex type node
            else:
                node = nodes_stack.pop()
                # look up the transform again, as it is not kept on the stack
                if nodes_stack[-1][0]:
                    node_type_key = (nodes_stack[-1][0], key)
                    node_type, transform = self.model.fields_transforms[
                        node_type_key
                    ]
                else:
                    node_type, transform = self.model.root_table, None
                if element.text and element.text.strip():
                    node[1]["value"] = [element.text.strip()]
                node = self._transform_node(*node)
                # elevated nodes are not hashed: they stay (node_type, content) tuples
                if transform not in ["elevate", "elevate_wo_prefix"]:
                    node = self._compute_hash_deduplicate(node, hash_maps)
                # falsy results are not attached to the parent
                # (presumably a 'document_tree_node_hook' returning None to drop the node, to be confirmed)
                if node:
                    if key in nodes_stack[-1][1]:
                        nodes_stack[-1][1][key].append(node)
                    else:
                        nodes_stack[-1][1][key] = [node]
            joined_values = False
            # clear the processed element (only done for elements which are not skipped),
            # presumably so that the parsed XML does not accumulate in memory
            element.clear(keep_tail=True)

    # return the outer container only if root table is a "virtual" node, else return the XML root node
    if nodes_stack[0][0]:
        res = self._transform_node(*nodes_stack[0])
        return self._compute_hash_deduplicate(res, hash_maps)
    # first node stored under the first key of the outer container; if the container is empty,
    # the loop body never runs and the method implicitly returns None
    for k, v in nodes_stack[0][1].items():
        return v[0]

_parse_xml_node(node_type, node, compute_hash, hash_maps)

Parse nodes of an XML document into a dict recursively

Parameters:

Name Type Description Default
node_type str

type of the node to parse

required
node Element

lxml node object

required
compute_hash bool

should we compute hash and deduplicate?

required
hash_maps dict

a dict referencing nodes based on their hash

required

Returns:

Type Description
tuple

A tuple of node_type, content (dict), hash

Source code in xml2db/xml_converter.py
def _parse_xml_node(
    self, node_type: str, node: etree.Element, compute_hash: bool, hash_maps: dict
) -> tuple:
    """Recursively convert an XML node and its children into a document tree node

    Args:
        node_type: type of the node to parse
        node: lxml node object
        compute_hash: should we compute hash and deduplicate?
        hash_maps: a dict referencing nodes based on their hash

    Returns:
        A tuple of node_type, content (dict), hash
    """
    schema_location_attr = (
        "{http://www.w3.org/2001/XMLSchema-instance}noNamespaceSchemaLocation"
    )

    # attributes are stored as single-item lists under "<name>__attr"; values are
    # stripped, unless stripping would leave an empty string
    content = {
        f"{attr_name}__attr": [attr_val.strip() or attr_val]
        for attr_name, attr_val in node.attrib.items()
        if attr_name != schema_location_attr
    }

    own_text = node.text
    if own_text and own_text.strip():
        content["value"] = [own_text.strip()]

    transforms = self.model.fields_transforms
    for child in node.iterchildren():
        tag = child.tag
        if not isinstance(tag, str):
            # children without a string tag are ignored
            # (presumably comments or processing instructions, to be confirmed)
            continue
        key = tag.split("}")[1] if "}" in tag else tag
        lookup = (node_type, key)
        if lookup not in transforms:
            # skip the node if it is not in the data model
            continue
        child_type, transform = transforms[lookup][0], transforms[lookup][1]
        if transform == "join":
            # simple value: stripped text, raw text if whitespace only, None without text
            text = child.text
            value = (text.strip() or text) if text else None
        else:
            # complex child: elevated children are not hashed
            value = self._parse_xml_node(
                child_type,
                child,
                transform not in ["elevate", "elevate_wo_prefix"],
                hash_maps,
            )
        if value is not None:
            content.setdefault(key, []).append(value)

    transformed = self._transform_node(node_type, content)
    if not compute_hash:
        return transformed
    return self._compute_hash_deduplicate(transformed, hash_maps)

_transform_node(node_type, content)

Apply transformations to a given node

Parameters:

Name Type Description Default
node_type str

The node type to transform

required
content dict

The node content dict to transform

required

Returns:

Type Description
tuple

A tuple of (node_type, content) for the transformed node

Source code in xml2db/xml_converter.py
def _transform_node(self, node_type: str, content: dict) -> tuple:
    """Apply transformations to a given node

    Three steps are applied in turn:

    * children whose field transform is "elevate" or "elevate_wo_prefix" are replaced by their
      own content, with keys prefixed by the child key and an underscore for "elevate";
    * if the node type has the "choice" transform, the content is replaced by a "type" / "value"
      pair built from its first item;
    * if the node type has a table, values of numeric and boolean columns are converted to
      python types. `None` values are left untouched by numeric conversions (they used to raise
      a `TypeError`), and are converted to `False` for boolean columns.

    Args:
        node_type: The node type to transform
        content: The node content dict to transform (modified in place)

    Returns:
        A tuple of (node_type, content) for the transformed node
    """
    float_types = ("decimal", "float")
    integer_types = (
        "integer",
        "int",
        "nonPositiveInteger",
        "nonNegativeInteger",
        "positiveInteger",
        "negativeInteger",
        "short",
        "byte",
        "long",
    )

    fields_transforms = self.model.fields_transforms
    # iterate over a snapshot of the keys, as elevating a child adds and removes keys
    for key in list(content.keys()):
        node_type_key = (node_type, key)
        if node_type_key not in fields_transforms:
            continue
        transform = fields_transforms[node_type_key][1]
        if transform in ("elevate", "elevate_wo_prefix"):
            prefix = f"{key}_" if transform == "elevate" else ""
            # only the content of the first child is pulled up
            child_content = content[key][0][1]
            del content[key]
            for child_key, val in child_content.items():
                content[f"{prefix}{child_key}"] = val

    if node_type in self.model.types_transforms:
        if self.model.types_transforms[node_type] == "choice":
            child_key, val = list(content.items())[0]
            content = {"type": [child_key], "value": val}

    # convert some simple types to python types
    if node_type in self.model.tables:
        table = self.model.tables[node_type]
        for key in table.columns:
            column = table.columns[key]
            if column.is_attr:
                # attributes are stored under "<name>__attr"; suffixed column names
                # lose their last 5 characters first
                base_name = key[:-5] if column.has_suffix else key
                content_key = f"{base_name}__attr"
            else:
                content_key = key
            if content_key not in content:
                continue

            data_type = column.data_type
            values = content[content_key]
            if data_type in float_types:
                # None stands for an element without text: keep it rather than fail on float(None)
                content[content_key] = [
                    None if v is None else float(v) for v in values
                ]
            elif data_type in integer_types:
                content[content_key] = [
                    None if v is None else int(v) for v in values
                ]
            elif data_type == "boolean":
                content[content_key] = [v == "true" or v == "1" for v in values]

    return node_type, content

parse_xml(xml_file, file_path=None, skip_validation=False, recover=False, iterparse=True)

Parse an XML document into a nested dict and perform the simplifications defined in the DataModel object ("pull" child to upper level, transform a choice model into "type" and "value" fields or concatenate children as string).

Parameters:

Name Type Description Default
xml_file Union[str, BytesIO]

An XML file path or file content to be converted

required
file_path str

The file path to be printed in logs

None
skip_validation bool

Whether we should validate XML against the schema before parsing

False
recover bool

Try to process malformed XML (lxml option)

False
iterparse bool

Parse XML using iterative parsing, which is a bit slower but uses less memory

True

Returns:

Type Description
tuple

The parsed data in the document tree format (nested dict)

Source code in xml2db/xml_converter.py
def parse_xml(
    self,
    xml_file: Union[str, BytesIO],
    file_path: str = None,
    skip_validation: bool = False,
    recover: bool = False,
    iterparse: bool = True,
) -> tuple:
    """Parse an XML document into a nested dict and perform the simplifications defined in the
    DataModel object ("pull" child to upper level, transform a choice model into "type" and "value"
    fields or concatenate children as string).

    When `xml_file` is a seekable file-like object, it is rewound to the position it had when this
    method was called before each read, because it may be read twice (once for validation and once
    for iterative parsing).

    Args:
        xml_file: An XML file path or file content to be converted
        file_path: The file path to be printed in logs
        skip_validation: Whether we should skip the validation of XML against the schema before parsing
        recover: Try to process malformed XML (lxml option)
        iterparse: Parse XML using iterative parsing, which is a bit slower but uses less memory

    Returns:
        The parsed data in the document tree format (nested dict)

    Raises:
        ValueError: if the XML file is validated and does not conform with the schema
    """

    # Remember where a seekable stream starts: a previous read leaves it at its end, so the
    # next reader would otherwise find no content. File paths have no `seekable` method.
    start_offset = None
    seekable = getattr(xml_file, "seekable", None)
    if callable(seekable) and seekable():
        start_offset = xml_file.tell()

    xt = None
    # the whole tree is needed when not parsing iteratively, or to validate a recovered document
    if not iterparse or (not skip_validation and recover):
        logger.info("Parsing XML file")
        xt = etree.parse(xml_file, parser=etree.XMLParser(recover=recover))

    if skip_validation:
        logger.info("Skipping XML file validation")
    else:
        logger.info("Validating XML file against the schema")
        if xt is None:
            if start_offset is not None:
                xml_file.seek(start_offset)
            tree_to_validate = etree.parse(xml_file)
        else:
            tree_to_validate = xt
        if not self.model.lxml_schema.validate(tree_to_validate):
            logger.error(f"XML file {file_path} does not conform with the schema")
            raise ValueError(
                f"XML file {file_path} does not conform with the schema"
            )
        logger.info("XML file conforms with the schema")

    if iterparse:
        if start_offset is not None:
            xml_file.seek(start_offset)
        self.document_tree = self._parse_iterative(xml_file, recover)
    else:
        self.document_tree = self._parse_element_tree(xt)

    return self.document_tree

to_xml(out_file=None, nsmap=None, indent='  ')

Convert a document tree (nested dict) into an XML file

Parameters:

Name Type Description Default
out_file str

If provided, write output to a file.

None
nsmap dict

An optional namespace mapping.

None
indent str

A string used as indent in XML output.

'  '

Returns:

Type Description
Element

The etree object corresponding to the root XML node.

Source code in xml2db/xml_converter.py
def to_xml(
    self, out_file: str = None, nsmap: dict = None, indent: str = "  "
) -> etree.Element:
    """Convert a document tree (nested dict) into an XML file

    Args:
        out_file: If provided, write output to a file (always encoded in UTF-8, as stated in
            the XML declaration).
        nsmap: An optional namespace mapping.
        indent: A string used as indent in XML output.

    Returns:
        The etree object corresponding to the root XML node.
    """
    doc = self._make_xml_node(
        self.document_tree,
        self.model.tables[self.document_tree[0]].name,
        nsmap,
    )
    if self.model.tables[self.model.root_table].is_virtual_node:
        # the element built for a virtual root table is only a wrapper: keep its first
        # child (None if it has no child)
        doc = next(iter(doc), None)
    if out_file:
        etree.indent(doc, space=indent)
        # serialize before opening the file, so that a serialization error does not
        # leave an empty file behind
        serialized = etree.tostring(
            doc,
            pretty_print=True,
            encoding="utf-8",
            xml_declaration=True,
        ).decode("utf-8")
        # the XML declaration states UTF-8, so do not rely on the platform default encoding
        with open(out_file, "wt", encoding="utf-8") as f:
            f.write(serialized)
    return doc