Write UnstructuredGrid in parallel

Hello,

I’m attempting write out an UnstructuredGrid using vtkXMLPUnstructuredGridWriter, but I’m hitting a wall. Specifically, I can’t seem to actually write out the file in N pieces and instead it writes out the entire UnstructuredGrid N times.

Using the vtu files here:

from pathlib import Path
import pyvista as pv
from vtk import vtkXMLPUnstructuredGridWriter, vtkXMLPUnstructuredGridReader

# read unstructured grid
grid = pv.read("./drivaer_data/run_1/volume_1.vtk")
subgrid = grid.extract_cells(range(1_000_000))  # speed up testing


# Write as parallel unstructured grid
out_path = Path("/tmp/vtk-test")
filename = out_path / "subgrid_test.pvtu"
writer = vtkXMLPUnstructuredGridWriter()
writer.SetInputData(subgrid)
writer.SetNumberOfPieces(8)
writer.SetStartPiece(0)
writer.SetEndPiece(7)
writer.SetFileName(str(filename))
writer.SetCompressorTypeToNone()
writer.Write()

# Read back the PVTU file
reader = vtkXMLPUnstructuredGridReader()
reader.SetFileName(str(filename))
reader.Update()

# Total size of pieces
total_bytes = sum(
    f.stat().st_size for f in out_path.glob("subgrid_test_*.vtu") if f.is_file()
)
print(f"Total size of all pieces: {total_bytes / (1024**2):.2f} MiB")

###############################################################################
# Write out a single file
out_path_single = Path("/tmp/single-file").expanduser()
out_path_single.mkdir(exist_ok=True)
subgrid.save(out_path_single / "single-file.vtu", compression=None)
total_bytes = (out_path_single / "single-file.vtu").stat().st_size
print(f"Single file size: {total_bytes / (1024**2):.2f} MiB")

Results in:

Total size of all pieces: 2235.55 MiB
Single file size: 279.44 MiB

Is there pre-processing that’s necessary to write out the file in parallel?

Partially figured out the problem thanks to adeak (Andras Deak) · GitHub

In https://examples.vtk.org/site/Cxx/IO/XMLPUnstructuredGridWriter/

changing:

writer.SetInputConnection(delaunay.GetOutputPort())

to:

writer.SetInputData(delaunay.GetOutput())

Results in the full grid being saved 4x.

This seems to work:

import time
from pathlib import Path
import pyvista as pv
import vtk

n = 100
grid = pv.ImageData(dimensions=(n, n, n)).cast_to_unstructured_grid()


parallel_dir = Path("/tmp/parallel-testing-new")
parallel_dir.mkdir(parents=True, exist_ok=True)

filename_multi_part = str(parallel_dir / "multi-part.pvtu")
n_pieces = 8

piece_extractor = vtk.vtkExtractUnstructuredGridPiece()
piece_extractor.SetInputData(grid)

writer = vtk.vtkXMLPUnstructuredGridWriter()
writer.SetInputConnection(piece_extractor.GetOutputPort())
writer.SetNumberOfPieces(n_pieces)
writer.SetFileName(filename_multi_part)
writer.SetCompressorTypeToNone()
writer.SetStartPiece(0)
writer.SetEndPiece(n_pieces - 1)
writer.SetDataModeToBinary()
writer.SetEncodeAppendedData(False)
writer.Write()

grid_in = pv.read(filename_multi_part)

However, the individual parts have extra points (even outside of the points that exist at the boundaries). These can be eliminated with grid.clean, but I’m guessing vtkExtractUnstructuredGridPiece is the wrong filter.

Here’s to write out in parallel in Python using threading using vtk==9.5.1 and pyvista==0.46.3. Note: there’s a risk for segmentation fault when using vtkExtractSelection as the filter has the side effect of creating the vtkOriginalPointIds and vtkOriginalCellIds arrays on the input dataset.

import numpy as np
import shutil
import time
from pathlib import Path
import pyvista as pv
import vtk
from concurrent.futures import ThreadPoolExecutor


# Dataset: Icosphere -> tetrahedralize
n = 300
grid = pv.ImageData(dimensions=(n, n, n)).cast_to_unstructured_grid()

expected_nbytes = (
    grid.points.nbytes
    + grid.offset.nbytes
    + grid.cell_connectivity.nbytes
    + grid.celltypes.nbytes
)
print(f"Expected size:       {expected_nbytes / 1024**2:.2f} MB")

parallel_dir = Path("/tmp/parallel-testing-new")
parallel_dir.mkdir(parents=True, exist_ok=True)


filename_multi_part = str(parallel_dir / "multi-part.pvtu")
try:
    shutil.rmdir(parallel_dir)
except:
    pass
n_pieces = 8


def patch_summary_file(pvtu_file: str, n_pieces: int):
    with open(pvtu_file, "r") as f:
        lines = f.readlines()

    new_lines = []
    for line in lines:
        if "<Piece Source=" in line:
            base = line.strip()
            for i in range(n_pieces):
                new_line = base.replace("_0.vtu", f"_{i}.vtu")
                new_lines.append(new_line + "\n")
        else:
            new_lines.append(line)

    with open(pvtu_file, "w") as f:
        f.writelines(new_lines)


def write_piece(piece):
    # extract a "piece" of the unstructured grid
    n_cells = grid.n_cells
    cells_per_piece = n_cells // n_pieces
    start = piece * cells_per_piece
    end = n_cells if piece == n_pieces - 1 else (piece + 1) * cells_per_piece
    piece_grid = grid.copy(deep=False).extract_cells(range(start, end))

    # write out that piece
    writer = vtk.vtkXMLPUnstructuredGridWriter()
    writer.SetInputData(piece_grid)
    writer.SetNumberOfPieces(n_pieces)
    writer.SetFileName(filename_multi_part)
    writer.SetCompressorTypeToNone()
    writer.SetStartPiece(piece)
    writer.SetEndPiece(piece)
    if piece != 0:
        # do not write the summary file
        writer.WriteSummaryFileOff()
    writer.Update()


tstart = time.time()
with ThreadPoolExecutor(max_workers=n_pieces) as ex:
    futures = []
    for piece in range(n_pieces):
        futures.append(ex.submit(write_piece, piece))
    for f in futures:
        f.result()


# patch the summary file since
# See: https://discourse.vtk.org/t/7418
patch_summary_file(filename_multi_part, n_pieces)

print(f"Parallel written in: {time.time() - tstart:.2f} seconds")

parallel_total_size = sum(f.stat().st_size for f in parallel_dir.glob("*.vtu"))
print(f"Parallel total size: {parallel_total_size / 1024**2:.2f} MB")

# Single write
single_dir = Path("/tmp/single-testing-new")
single_dir.mkdir(parents=True, exist_ok=True)

tstart = time.time()
single_file = "grid.vtu"
swriter = vtk.vtkXMLUnstructuredGridWriter()
swriter.SetInputData(grid)
swriter.SetFileName(str(single_dir / single_file))
swriter.SetCompressorTypeToNone()
swriter.Write()
print(f"Single written in    {time.time() - tstart:.2f} seconds")

# File size comparison
single_file_size = (single_dir / "grid.vtu").stat().st_size

print(f"Single file size:    {single_file_size / 1024**2:.2f} MB")
print(f"Parallel/single:     {parallel_total_size / single_file_size:.2f}")

tstart_total = time.time()
tstart = time.time()
grid_in = pv.read(filename_multi_part)
print(f"Loaded parallel file in {time.time() - tstart:.2f} seconds")

tstart = time.time()
grid_in.clean()
print(f"Cleaned grid in         {time.time() - tstart:.2f} seconds")
print(f"  Total time spent      {time.time() - tstart_total:.2f} seconds")

tstart = time.time()
grid_in = pv.read(str(single_dir / single_file))
print(f"Loaded single file in   {time.time() - tstart:.2f} seconds")

And here’s the output from that:

Expected size:       2169.95 MB
Parallel written in: 4.14 seconds
Parallel total size: 3455.93 MB
Single written in    10.24 seconds
Single file size:    2893.27 MB
Parallel/single:     1.19
Loaded parallel file in 13.18 seconds
Cleaned grid in         3.07 seconds
  Total time spent      16.25 seconds
Loaded single file in   10.97 seconds

A few observations here:

  • It’s faster to write in parallel, but not a linear speedup as the dataset must be partitioned.
  • Slower to read (probably could write a threaded XML reader and then append and clean in one step)

If you need more performance in parallel, you should give vtkHDFWriter/vtkHDFReader a try

Encountered a segmentation fault with the vtkHDFReader. I’ll open an issue.

please do. There are “known” crashes due to malformed VTKHDF data, but it should not have issues with valid data.