Binary DataArray in XML using python/numpy: what are the leading int32 values?

I’ll have a look at your code tomorrow (it is too late for today). In the meantime, I found some material in the Meshio project (see the _vtu.py file); in addition to your file, it might be interesting to look into it for additional information.

Here is some code I still need to have a look at.

Thanks for your support; let's keep in touch.

Paul

# -*- coding: utf-8 -*-

import numpy as np
import os, base64, zlib, lzma  
    
################################
# Lookup from the VTU ``type`` attribute of a DataArray to the NumPy dtype used
# to decode it. Each VTU name is the capitalised spelling of the NumPy scalar
# type name, so the dtype is resolved from the lowercased name.
vtu_to_numpy_type = {
    vtu_name: np.dtype(getattr(np, vtu_name.lower()))
    for vtu_name in (
        "Float32",
        "Float64",
        "Int8",
        "Int16",
        "Int32",
        "Int64",
        "UInt8",
        "UInt16",
        "UInt32",
        "UInt64",
    )
}
# Inverse lookup: NumPy dtype -> VTU type name.
numpy_to_vtu_type = dict(
    (np_dtype, vtu_name) for vtu_name, np_dtype in vtu_to_numpy_type.items()
)


################################
def num_bytes_to_num_base64_chars(num_bytes):
    """Return the number of base64 characters that encode ``num_bytes`` bytes.

    Base64 emits 4 characters for every group of 3 input bytes, and a partial
    trailing group still takes a full 4 characters, so the group count is
    ``num_bytes / 3`` rounded up.
    """
    full_groups, leftover = divmod(num_bytes, 3)
    # A non-zero remainder means one more (partially filled) group.
    group_count = full_groups + (1 if leftover else 0)
    return group_count * 4


################################
def read_data(self, c):
    """Decode the payload of one VTU ``DataArray`` element into a NumPy array.

    Parameters
    ----------
    self
        Reader object. For the ``binary`` and ``appended`` formats it must
        provide ``compression``, ``read_uncompressed_binary`` and
        ``read_compressed_binary``; ``appended`` additionally reads
        ``appended_data``. The ``ascii`` format uses no reader state.
    c
        XML element exposing ``attrib`` and ``text``
        (presumably an ElementTree element -- confirm against the caller).

    Returns
    -------
    numpy.ndarray
        The decoded values. When the element has a ``NumberOfComponents``
        attribute the array is reshaped to ``(-1, NumberOfComponents)``; if
        that reshape is impossible a message is printed and the flat array is
        returned unchanged.

    Raises
    ------
    ValueError
        If the ``type`` attribute is not a known VTU type, the ``format``
        attribute is not ``ascii``/``binary``/``appended``, or the format is
        ``appended`` but no appended data is available.
    """
    fmt = c.attrib.get("format", "ascii")

    data_type = c.attrib["type"]
    try:
        dtype = vtu_to_numpy_type[data_type]
    except KeyError:
        # Fail here with a clear message instead of printing and then
        # crashing later on the unbound ``dtype``.
        raise ValueError(f"Illegal data type '{data_type}'.") from None

    if fmt == "ascii":
        # An empty element (``<DataArray .../>``) has ``text`` set to None.
        text = c.text or ""
        if text.strip() == "":
            # https://github.com/numpy/numpy/issues/18435
            data = np.empty((0,), dtype=dtype)
        else:
            data = np.fromstring(text, dtype=dtype, sep=" ")
    elif fmt in ("binary", "appended"):
        # Both formats share the same decoders; only the payload source differs.
        reader = (
            self.read_uncompressed_binary
            if self.compression is None
            else self.read_compressed_binary
        )
        if fmt == "binary":
            payload = c.text.strip()
        else:
            offset = int(c.attrib["offset"])
            if self.appended_data is None:
                raise ValueError(
                    "Data format is 'appended' but no appended data is available."
                )
            payload = self.appended_data[offset:]
        data = reader(payload, dtype)
    else:
        raise ValueError(f"Unknown data format '{fmt}'.")

    if "NumberOfComponents" in c.attrib:
        nc = int(c.attrib["NumberOfComponents"])
        try:
            data = data.reshape(-1, nc)
        except ValueError:
            # Best effort: report the mismatch and hand back the flat array.
            name = c.attrib["Name"]
            print(f"VTU file corrupt. The size of the data array '{name}' is {data.size} which doesn't fit the number of components {nc}"
            )
    return data
    

################################
def read_compressed_binary(self, data, dtype):
    """Decode a base64-encoded, block-compressed VTU data array.

    The payload starts with a base64-encoded header of ``3 + num_blocks``
    integers of type ``self.header_type``: the number of blocks, two size
    fields that are not used here, and then one compressed size per block.
    The base64-encoded compressed blocks follow the header.

    Parameters
    ----------
    self
        Reader object providing ``header_type`` (a key of
        ``vtu_to_numpy_type``), ``byte_order`` (``None``, ``"LittleEndian"``,
        or anything else, which is treated as big-endian) and ``compression``
        (``"vtkLZMADataCompressor"`` or ``"vtkZLibDataCompressor"``).
    data
        Base64 text of header plus blocks (sliceable; passed to
        ``base64.b64decode``).
    dtype : numpy.dtype
        Element type of the decompressed values.

    Returns
    -------
    numpy.ndarray
        One-dimensional array of all decompressed values; empty when the
        header declares zero blocks.

    Raises
    ------
    ValueError
        If ``self.compression`` is not one of the supported compressors.
    """
    decompressors = {"vtkLZMADataCompressor": lzma, "vtkZLibDataCompressor": zlib}
    try:
        codec = decompressors[self.compression]
    except KeyError:
        raise ValueError(
            f"Unsupported compression '{self.compression}'."
        ) from None

    header_dtype = vtu_to_numpy_type[self.header_type]
    if self.byte_order is not None:
        # The file's byte order applies to both the header and the payload.
        order = "<" if self.byte_order == "LittleEndian" else ">"
        header_dtype = header_dtype.newbyteorder(order)
        dtype = dtype.newbyteorder(order)

    # First read only the block count; it determines the size of the header.
    item_size = header_dtype.itemsize
    num_chars = num_bytes_to_num_base64_chars(item_size)
    first_item = base64.b64decode(data[:num_chars])[:item_size]
    num_blocks = int(np.frombuffer(first_item, header_dtype)[0])

    # With no blocks there is nothing to decompress, and np.concatenate
    # rejects an empty list, so return an empty array directly.
    if num_blocks == 0:
        return np.empty((0,), dtype=dtype)

    # Read the entire header:
    #   header[0] = num_blocks
    #   header[1] = max_uncompressed_block_size
    #   header[2] = last_compressed_block_size
    #   header[3:] = compressed size of each block
    num_header_items = 3 + num_blocks
    num_header_chars = num_bytes_to_num_base64_chars(item_size * num_header_items)
    header = np.frombuffer(
        base64.b64decode(data[:num_header_chars]),
        header_dtype,
        count=num_header_items,
    )
    block_sizes = header[3:]

    # Everything after the header is the concatenation of the compressed blocks.
    byte_array = base64.b64decode(data[num_header_chars:])

    # Accumulate offsets as int64 so a narrow header type cannot overflow,
    # and convert to plain ints for slicing.
    ends = np.cumsum(block_sizes, dtype=np.int64).tolist()
    starts = [0] + ends[:-1]

    blocks = [
        np.frombuffer(codec.decompress(byte_array[start:end]), dtype=dtype)
        for start, end in zip(starts, ends)
    ]
    return np.concatenate(blocks)