import io
import os
import sys
import time
import re
import hashlib
import itertools
import logging
import json
import configparser
import toml
from box import Box
import numpy as np
import pyarrow as pa
import pyarrow.parquet as pq
from dae.utils.variant_utils import GENOTYPE_TYPE
from dae.variants.attributes import TransmissionType
from dae.variants.family_variant import (
FamilyAllele,
FamilyVariant,
calculate_simple_best_state,
)
from dae.variants.variant import SummaryVariant, SummaryAllele
from dae.backends.schema2.serializers import AlleleParquetSerializer
logger = logging.getLogger(__name__)
[docs]class PartitionDescriptor:
"""Abstract class for partition description."""
family_alleles_dirname: str = "family"
summary_alleles_dirname: str = "summary"
def __init__(self):
pass
[docs] def has_partitions(self):
raise NotImplementedError()
[docs] def build_impala_partitions(self):
raise NotImplementedError()
@property
def chromosomes(self):
raise NotImplementedError()
@property
def region_length(self):
raise NotImplementedError()
[docs] def summary_filename(self, summary_allele):
raise NotImplementedError()
[docs] def family_filename(self, family_allele):
raise NotImplementedError()
[docs] def write_partition_configuration(self):
raise NotImplementedError()
[docs]class NoPartitionDescriptor(PartitionDescriptor):
"""Defines class for missing partition description."""
def __init__(self, root_dirname=""):
super().__init__()
self.output = root_dirname
[docs] def has_partitions(self):
return False
[docs] def build_impala_partitions(self):
raise ValueError("unexpected use of build_impala_partitions")
@property
def chromosomes(self):
return []
@property
def region_length(self):
return 3_000_000_000
[docs] def summary_filename(self, summary_allele):
bucket_index = summary_allele.get_attribute("bucket_index")
filename = f"nopart_{bucket_index:0>6}_summary_alleles.parquet"
return os.path.join(
self.output, self.summary_alleles_dirname, filename
)
[docs] def family_filename(self, family_allele):
bucket_index = family_allele.get_attribute("bucket_index")
filename = f"nopart_{bucket_index:0>6}_family_alleles.parquet"
return os.path.join(self.output, self.family_alleles_dirname, filename)
[docs] def write_partition_configuration(self):
partition_desc_table = pa.Table.from_pydict(
{"key": [], "value": []},
schema=pa.schema({"key": pa.string(), "value": pa.string()}),
)
pq.write_table(
partition_desc_table, os.path.join(self.output, "meta.parquet")
)
[docs] @staticmethod
def generate_file_access_glob():
"""Return a glob for accessing every parquet file in the partition."""
return "*variants.parquet"
[docs] @staticmethod
def variants_filename_basedir(filename):
"""Extract the variants basedir from filename."""
regexp = re.compile("^(?P<basedir>.+)/(?P<prefix>.+)variants.parquet$")
match = regexp.match(filename)
if not match:
return None
assert "basedir" in match.groupdict()
basedir = match.groupdict()["basedir"]
if basedir and basedir[-1] != "/":
basedir += "/"
return basedir
[docs]class ParquetPartitionDescriptor(PartitionDescriptor):
"""Defines partition description used for parquet datasets."""
def __init__(
self,
chromosomes,
region_length,
family_bin_size=0,
coding_effect_types=None,
rare_boundary=0,
root_dirname="",
):
super().__init__()
self.output = root_dirname
self._chromosomes = chromosomes
self._region_length = region_length
self._family_bin_size = family_bin_size
self._coding_effect_types = \
coding_effect_types if coding_effect_types else []
self._rare_boundary = rare_boundary
[docs] def has_partitions(self):
return True
[docs] def build_impala_partitions(self):
partitions = ["region_bin string"]
if self.rare_boundary > 0:
partitions.append("frequency_bin tinyint")
if self.coding_effect_types:
partitions.append("coding_bin tinyint")
if self.family_bin_size > 0:
partitions.append("family_bin tinyint")
return ", ".join(partitions)
@property
def chromosomes(self):
return self._chromosomes
@property
def region_length(self):
return self._region_length
@property
def family_bin_size(self):
return self._family_bin_size
@property
def coding_effect_types(self):
return self._coding_effect_types
@property
def rare_boundary(self):
return self._rare_boundary
# reoganize to path
# def from_string
[docs] @staticmethod
def from_config(config_path, root_dirname=""):
"""Create a partition description from the provided config file."""
assert os.path.exists(config_path), config_path
config = configparser.ConfigParser()
config.read(config_path)
assert config["region_bin"] is not None
return ParquetPartitionDescriptor.from_dict(config, root_dirname)
[docs] @staticmethod
def from_dict(config, root_dirname=""):
"""Create a partition description from the provided dictionary."""
chromosomes = list(
map(str.strip, config["region_bin"]["chromosomes"].split(","))
)
region_length = int(config["region_bin"]["region_length"])
family_bin_size = 0
coding_effect_types = []
rare_boundary = 0
if "family_bin" in config:
family_bin_size = int(config["family_bin"]["family_bin_size"])
if "coding_bin" in config:
coding_effect_types = config["coding_bin"]["coding_effect_types"]
coding_effect_types = [
et.strip() for et in coding_effect_types.split(",")
]
coding_effect_types = [et for et in coding_effect_types if et]
if "frequency_bin" in config:
rare_boundary = int(config["frequency_bin"]["rare_boundary"])
return ParquetPartitionDescriptor(
chromosomes,
region_length,
family_bin_size,
coding_effect_types,
rare_boundary,
root_dirname=root_dirname,
)
def _evaluate_region_bin(self, summary_allele):
chromosome = summary_allele.chromosome
pos = summary_allele.position // self._region_length
if chromosome in self._chromosomes:
return f"{chromosome}_{pos}"
return f"other_{pos}"
def _family_bin_from_id(self, family_id):
sha256 = hashlib.sha256()
sha256.update(family_id.encode())
digest = int(sha256.hexdigest(), 16)
return digest % self.family_bin_size
def _evaluate_family_bin(self, family_allele):
return self._family_bin_from_id(family_allele.family_id)
def _evaluate_coding_bin(self, summary_allele):
if summary_allele.is_reference_allele:
return 0
variant_effects = set(summary_allele.effect_types)
coding_effect_types = set(self._coding_effect_types)
result = variant_effects.intersection(coding_effect_types)
if len(result) == 0:
return 0
return 1
def _evaluate_frequency_bin(self, summary_allele):
count = summary_allele.get_attribute("af_allele_count")
frequency = summary_allele.get_attribute("af_allele_freq")
transmission_type = summary_allele.transmission_type
if transmission_type == TransmissionType.denovo:
frequency_bin = 0
elif count and int(count) == 1: # Ultra rare
frequency_bin = 1
elif frequency and float(frequency) < self._rare_boundary: # Rare
frequency_bin = 2
else: # Common
frequency_bin = 3
return frequency_bin
[docs] def summary_filename(self, summary_allele):
"""Return filename that summary_allele should be appended to."""
# TODO eliminated copy/pasted code with family_filename
filepath = os.path.join(self.output, self.summary_alleles_dirname)
filename = "summary"
current_bin = self._evaluate_region_bin(summary_allele)
filepath = os.path.join(filepath, f"region_bin={current_bin}")
filename += f"_region_bin_{current_bin}"
if self._rare_boundary > 0:
current_bin = self._evaluate_frequency_bin(summary_allele)
filepath = os.path.join(filepath, f"frequency_bin={current_bin}")
filename += f"_frequency_bin_{current_bin}"
if len(self._coding_effect_types) > 0:
current_bin = self._evaluate_coding_bin(summary_allele)
filepath = os.path.join(filepath, f"coding_bin={current_bin}")
filename += f"_coding_bin_{current_bin}"
bucket_index = summary_allele.get_attribute("bucket_index")
filename += f"_bucket_index_{bucket_index:0>6}"
filename += ".parquet"
return os.path.join(filepath, filename)
[docs] def family_filename(self, family_allele):
"""Return filename that family_allele should be appended to."""
filepath = os.path.join(self.output, self.family_alleles_dirname)
filename = "family"
summary_allele = family_allele
current_bin = self._evaluate_region_bin(summary_allele)
filepath = os.path.join(filepath, f"region_bin={current_bin}")
filename += f"_region_bin_{current_bin}"
if self._rare_boundary > 0:
current_bin = self._evaluate_frequency_bin(summary_allele)
filepath = os.path.join(filepath, f"frequency_bin={current_bin}")
filename += f"_frequency_bin_{current_bin}"
if len(self._coding_effect_types) > 0:
current_bin = self._evaluate_coding_bin(summary_allele)
filepath = os.path.join(filepath, f"coding_bin={current_bin}")
filename += f"_coding_bin_{current_bin}"
if self._family_bin_size > 0:
current_bin = self._evaluate_family_bin(family_allele)
filepath = os.path.join(filepath, f"family_bin={current_bin}")
filename += f"_family_bin_{current_bin}"
bucket_index = family_allele.get_attribute("bucket_index")
filename += f"_bucket_index_{bucket_index:0>6}"
filename += ".parquet"
return os.path.join(filepath, filename)
def _variants_partition_bins(self):
partition_bins = []
region_bins = [
("region_bin", f"{chrom}_0") for chrom in self._chromosomes
]
partition_bins.append(region_bins)
if self._rare_boundary > 0:
frequency_bins = [
("frequency_bin", "1"),
("frequency_bin", "2"),
("frequency_bin", "3"),
]
partition_bins.append(frequency_bins)
if len(self._coding_effect_types) > 0:
coding_bins = [("coding_bin", "0"), ("coding_bin", "1")]
partition_bins.append(coding_bins)
if self._family_bin_size > 0:
family_bins = [
("family_bin", f"{fb}") for fb in range(self._family_bin_size)
]
partition_bins.append(family_bins)
return partition_bins
def _variants_filenames_regexp(self):
partition_bins = self._variants_partition_bins()
product = next(itertools.product(*partition_bins))
dirname_parts = [f"{p[0]}=(?P<{p[0]}>.+)" for p in product]
filename_parts = [f"{p[0]}_(?P={p[0]})" for p in product]
dirname = "/".join(dirname_parts)
dirname = os.path.join("^(?P<basedir>.+)", dirname)
filename = "_".join(filename_parts)
filename = f"variants_{filename}\\.parquet$"
filename = os.path.join(dirname, filename)
return re.compile(filename, re.VERBOSE)
[docs] def variants_filename_basedir(self, filename):
"""Extract the variants basedir from filename."""
regexp = self._variants_filenames_regexp()
match = regexp.match(filename)
if not match:
return None
assert "basedir" in match.groupdict()
basedir = match.groupdict()["basedir"]
if basedir and basedir[-1] != "/":
basedir += "/"
return basedir
[docs] def write_partition_configuration(self):
config = configparser.ConfigParser()
config.add_section("region_bin")
config["region_bin"]["chromosomes"] = ", ".join(self._chromosomes)
config["region_bin"]["region_length"] = str(self._region_length)
config.add_section("family_bin")
config["family_bin"]["family_bin_size"] = str(
self._family_bin_size
)
config.add_section("coding_bin")
config["coding_bin"]["coding_effect_types"] = ", ".join(
self._coding_effect_types
)
config.add_section("frequency_bin")
config["frequency_bin"]["rare_boundary"] = str(self._rare_boundary)
filename = os.path.join(self.output, "_PARTITION_DESCRIPTION")
os.makedirs(os.path.dirname(filename), exist_ok=True)
config_buffer = io.StringIO()
config.write(config_buffer)
with open(filename, "w") as configfile:
configfile.write(config_buffer.getvalue())
partition_desc_table = pa.Table.from_pydict(
{
"key": ["partition_description"],
"value": [config_buffer.getvalue()],
},
schema=pa.schema({"key": pa.string(), "value": pa.string()}),
)
pq.write_table(
partition_desc_table, os.path.join(self.output, "meta.parquet")
)
[docs] def generate_file_access_glob(self):
"""Return a glob for accessing every parquet file in the partition."""
glob = "*/"
if self.family_bin_size != 0:
glob += "*/"
if self.coding_effect_types:
glob += "*/"
if self.rare_boundary != 0:
glob += "*/"
glob += "*.parquet"
return glob
[docs] def add_family_bins_to_families(self, families):
for family in families.values():
family_bin = self._family_bin_from_id(family.family_id)
for person in family.persons.values():
person.set_attr("family_bin", family_bin)
families._ped_df = None # pylint: disable=protected-access
return families
[docs]class ContinuousParquetFileWriter:
"""A continous parquet writer.
Class that automatically writes to a given parquet file when supplied
enough data. Automatically dumps leftover data when closing into the file
"""
def __init__(
self,
filepath,
variant_loader,
filesystem=None,
rows=100_000,
schema="schema",
):
self.filepath = filepath
annotation_schema = variant_loader.get_attribute("annotation_schema")
extra_attributes = variant_loader.get_attribute("extra_attributes")
self.serializer = AlleleParquetSerializer(
annotation_schema, extra_attributes
)
self.schema = getattr(self.serializer, schema)
dirname = os.path.dirname(filepath)
if dirname and not os.path.exists(dirname):
os.makedirs(dirname)
self.dirname = dirname
self._writer = pq.ParquetWriter(
filepath, self.schema, compression="snappy", filesystem=filesystem
)
self.rows = rows
self._data = None
self.data_reset()
[docs] def data_reset(self):
self._data = {name: [] for name in self.schema.names}
[docs] def size(self):
# chromosome is not guaranteed across all schemas - Kevin
# return len(self._data["chromosome"])
return max([len(self._data[k]) for k in self._data.keys()])
[docs] def build_table(self):
table = pa.Table.from_pydict(self._data, self.schema)
return table
def _write_table(self):
self._writer.write_table(self.build_table())
self.data_reset()
[docs] def append_summary_allele(self, allele, json_data):
"""Append the data for an entire variant to the correct file."""
data = self.serializer.build_summary_allele_batch_dict(
allele, json_data
)
for k, v in self._data.items():
v.extend(data[k])
if self.size() >= self.rows:
logger.info(
"parquet writer %s data flushing at len %s",
self.filepath, self.size())
self._write_table()
[docs] def append_family_allele(self, allele, json_data):
"""Append the data for an entire variant to the correct file."""
data = self.serializer.build_family_allele_batch_dict(
allele, json_data
)
for k, v in self._data.items():
v.extend(data[k])
if self.size() >= self.rows:
logger.info(
"parquet writer %s data flushing at len %s",
self.filepath, self.size())
self._write_table()
[docs] def close(self):
logger.info(
"closing parquet writer %s at len %d", self.dirname, self.size())
if self.size() > 0:
self._write_table()
self._writer.close()
[docs]class VariantsParquetWriter:
"""Provide functions for storing variants into parquet dataset."""
def __init__(
self,
variants_loader,
partition_descriptor,
bucket_index=1,
rows=100_000,
include_reference=True,
filesystem=None,
):
self.variants_loader = variants_loader
self.families = variants_loader.families
self.full_variants_iterator = variants_loader.full_variants_iterator()
self.bucket_index = bucket_index
self.rows = rows
self.filesystem = filesystem
self.include_reference = include_reference
self.start = time.time()
self.data_writers = {}
assert isinstance(partition_descriptor, PartitionDescriptor)
self.partition_descriptor = partition_descriptor
annotation_schema = self.variants_loader.get_attribute(
"annotation_schema"
)
extra_attributes = self.variants_loader.get_attribute(
"extra_attributes"
)
self.serializer = AlleleParquetSerializer(
annotation_schema, extra_attributes
)
@staticmethod
def _setup_reference_allele(summary_variant, family):
genotype = -1 * np.ones(shape=(2, len(family)), dtype=GENOTYPE_TYPE)
best_state = calculate_simple_best_state(
genotype, summary_variant.allele_count
)
ref_allele = summary_variant.ref_allele
reference_allele = FamilyAllele(ref_allele, family, genotype,
best_state)
return reference_allele
@staticmethod
def _setup_all_unknown_allele(summary_variant, family):
genotype = -1 * np.ones(shape=(2, len(family)), dtype=GENOTYPE_TYPE)
best_state = calculate_simple_best_state(
genotype, summary_variant.allele_count
)
ref_allele = summary_variant.ref_allele
unknown_allele = FamilyAllele(
SummaryAllele(
ref_allele.chromosome,
ref_allele.position,
ref_allele.reference,
ref_allele.reference,
summary_index=ref_allele.summary_index,
allele_index=-1,
transmission_type=ref_allele.transmission_type,
attributes={},
),
family,
genotype,
best_state,
)
return unknown_allele
def _setup_all_unknown_variant(self, summary_variant, family_id):
family = self.families[family_id]
genotype = -1 * np.ones(shape=(2, len(family)), dtype=GENOTYPE_TYPE)
alleles = [
self._setup_reference_allele(summary_variant, family),
self._setup_all_unknown_allele(summary_variant, family),
]
best_state = -1 * np.ones(
shape=(len(alleles), len(family)), dtype=GENOTYPE_TYPE
)
return FamilyVariant(
SummaryVariant(alleles), family, genotype, best_state
)
def _get_bin_writer_family(self, allele):
filename = self.partition_descriptor.family_filename(allele)
if filename not in self.data_writers:
self.data_writers[filename] = ContinuousParquetFileWriter(
filename,
self.variants_loader,
filesystem=self.filesystem,
rows=self.rows,
schema="schema_family",
)
return self.data_writers[filename]
def _get_bin_writer_summary(self, allele):
filename = self.partition_descriptor.summary_filename(allele)
if filename not in self.data_writers:
self.data_writers[filename] = ContinuousParquetFileWriter(
filename,
self.variants_loader,
filesystem=self.filesystem,
rows=self.rows,
schema="schema_summary",
)
return self.data_writers[filename]
def _write_internal(self):
family_variant_index = 0
report = False
for summary_variant_index, (
summary_variant,
family_variants,
) in enumerate(self.full_variants_iterator):
num_fam_alleles_written = 0
for fv in family_variants:
family_variant_index += 1
if fv.is_unknown() or fv.is_reference():
continue
fv.summary_index = summary_variant_index
fv.family_index = family_variant_index
for family_allele in fv.alleles:
extra_atts = {
"bucket_index": self.bucket_index,
"family_index": family_variant_index,
}
family_allele.update_attributes(extra_atts)
family_variant_data_json = json.dumps(fv.to_record,
sort_keys=True)
for family_allele in fv.alt_alleles:
family_bin_writer = self._get_bin_writer_family(
family_allele
)
family_bin_writer.append_family_allele(
family_allele, family_variant_data_json
)
num_fam_alleles_written += 1
# don't store summary alleles withouth family ones
if num_fam_alleles_written > 0:
# build summary json blob (concat all other alleles)
# INSIDE summary_variant
summary_blobs_json = json.dumps(
summary_variant.to_record, sort_keys=True
)
for summary_allele in summary_variant.alleles:
extra_atts = {
"bucket_index": self.bucket_index,
}
summary_allele.summary_index = summary_variant_index
summary_allele.update_attributes(extra_atts)
summary_writer = self._get_bin_writer_summary(
summary_allele
)
summary_writer.append_summary_allele(
summary_allele, summary_blobs_json
)
if summary_variant_index % 1000 == 0:
report = True
if report:
elapsed = time.time() - self.start
print(
f"Bucket {self.bucket_index}; "
f"{summary_variant.chromosome}:"
f"{summary_variant.position}: "
f"{summary_variant_index}/"
f"{family_variant_index} variants imported "
f"for {elapsed:.2f} sec ({len(self.data_writers)} files)",
file=sys.stderr,
)
report = False
filenames = list(self.data_writers.keys())
for bin_writer in self.data_writers.values():
bin_writer.close()
print("-------------------------------------------", file=sys.stderr)
print("Bucket:", self.bucket_index, file=sys.stderr)
print("-------------------------------------------", file=sys.stderr)
elapsed = time.time() - self.start
print(
f"DONE: {family_variant_index} family variants imported "
f"for {elapsed:.2f} sec ({len(self.data_writers)} files)",
file=sys.stderr,
)
print("-------------------------------------------", file=sys.stderr)
return filenames
[docs] def write_schema(self):
"""Write the schema to a separate file."""
config = {}
schema_summary = self.serializer.schema_summary
schema_family = self.serializer.schema_family
config["summary_schema"] = {}
config["family_schema"] = {}
for k in schema_summary.names:
v = schema_summary.field(k)
config["summary_schema"][k] = str(v.type)
for k in schema_family.names:
v = schema_family.field(k)
config["family_schema"][k] = str(v.type)
if os.path.isdir(self.partition_descriptor.output):
path = self.partition_descriptor.output
else:
path = os.path.dirname(self.partition_descriptor.output)
filename = os.path.join(path, "_VARIANTS_SCHEMA")
config["extra_attributes"] = {}
extra_attributes = self.serializer.extra_attributes
for attr in extra_attributes:
config["extra_attributes"][attr] = "string"
with open(filename, "w") as configfile:
content = toml.dumps(config)
configfile.write(content)
[docs] def write_dataset(self):
filenames = self._write_internal()
self.partition_descriptor.write_partition_configuration()
self.write_schema()
return filenames
[docs]class ParquetManager:
"""Provide function for producing variants and pedigree parquet files."""
[docs] @staticmethod
def build_parquet_filenames(
prefix, study_id=None, bucket_index=0, suffix=None
):
"""Build parquet filenames."""
assert bucket_index >= 0
basename = os.path.basename(os.path.abspath(prefix))
if study_id is None:
study_id = basename
assert study_id
if suffix is None and bucket_index == 0:
filesuffix = ""
elif bucket_index > 0 and suffix is None:
filesuffix = f"_{bucket_index:0>6}"
elif bucket_index == 0 and suffix is not None:
filesuffix = f"{suffix}"
else:
filesuffix = f"_{bucket_index:0>6}{suffix}"
variants_dirname = os.path.join(prefix, "variants")
summary_filename = os.path.join(
variants_dirname, f"{study_id}{filesuffix}_summary_alleles.parquet"
)
family_filename = os.path.join(
variants_dirname, f"{study_id}{filesuffix}_family_alleles.parquet"
)
pedigree_filename = os.path.join(
prefix, "pedigree", f"{study_id}{filesuffix}_pedigree.parquet"
)
conf = {
"variants_dirname": variants_dirname,
"family_alleles": family_filename,
"summary_alleles": summary_filename,
"pedigree": pedigree_filename,
}
return Box(conf, default_box=True)
[docs] @staticmethod
def families_to_parquet(families, pedigree_filename):
dirname = os.path.dirname(pedigree_filename)
if dirname:
os.makedirs(dirname, exist_ok=True)
save_ped_df_to_parquet(families.ped_df, pedigree_filename)
[docs] @staticmethod
def variants_to_parquet(
variants_loader,
partition_descriptor,
bucket_index=1,
rows=100_000,
include_reference=False,
):
"""Read variants from variant_loader and store them in parquet."""
# assert variants_loader.get_attribute("annotation_schema") is not None
start = time.time()
variants_writer = VariantsParquetWriter(
variants_loader,
partition_descriptor,
bucket_index=bucket_index,
rows=rows,
include_reference=include_reference,
)
variants_writer.write_dataset()
elapsed = time.time() - start
print(f"DONE: for {elapsed:.2f} sec", file=sys.stderr)
[docs]def pedigree_parquet_schema():
"""Return the schema for pedigree parquet file."""
fields = [
pa.field("family_id", pa.string()),
pa.field("person_id", pa.string()),
pa.field("dad_id", pa.string()),
pa.field("mom_id", pa.string()),
pa.field("sex", pa.int8()),
pa.field("status", pa.int8()),
pa.field("role", pa.int32()),
pa.field("sample_id", pa.string()),
pa.field("generated", pa.bool_()),
pa.field("layout", pa.string()),
pa.field("not_sequenced", pa.bool_()),
]
return pa.schema(fields)
[docs]def add_missing_parquet_fields(pps, ped_df):
"""Add missing parquet fields."""
missing_fields = set(ped_df.columns.values) - set(pps.names)
if "family_bin" in missing_fields:
pps = pps.append(pa.field("family_bin", pa.int8()))
missing_fields = missing_fields - set(["family_bin"])
rename = {}
for column in missing_fields:
name = column.lower().replace(".", "_")
pps = pps.append(pa.field(name, pa.string()))
rename[column] = name
ped_df = ped_df.rename(columns=rename)
missing_fields = [rename[col] for col in missing_fields]
for column in missing_fields:
ped_df[column] = ped_df[column].apply(str)
return ped_df, pps
[docs]def save_ped_df_to_parquet(ped_df, filename, filesystem=None):
"""Save ped_df as a parquet file named filename."""
ped_df = ped_df.copy()
ped_df.role = ped_df.role.apply(lambda r: r.value)
ped_df.sex = ped_df.sex.apply(lambda s: s.value)
ped_df.status = ped_df.status.apply(lambda s: s.value)
if "generated" not in ped_df:
ped_df["generated"] = False
if "layout" not in ped_df:
ped_df["layout"] = None
if "not_sequenced" not in ped_df:
ped_df["not_sequenced"] = False
pps = pedigree_parquet_schema()
ped_df, pps = add_missing_parquet_fields(pps, ped_df)
table = pa.Table.from_pandas(ped_df, schema=pps)
pq.write_table(table, filename, filesystem=filesystem)