Skip to content

Code style

This page contains some guidance on code style.

Info

Additional information will be added to this page later.

Rust

Naming conventions

Naming conventions for variables:

let s: Series = ...
let ca: ChunkedArray = ...
let arr: ArrayRef = ...
let arr: PrimitiveArray = ...
let dtype: DataType = ...
let data_type: ArrowDataType = ...

Code example

use std::{
    fmt::Display,
    path::{Path, PathBuf},
    sync::Arc,
};

use arrow::array::{Array, Int32Builder, ListBuilder, RecordBatch, StringBuilder};
use arrow::datatypes::{DataType, Field, Schema};

use bstr::BString;
use derive_builder::Builder;
use log::info;
use serde::{Deserialize, Serialize};

use crate::{io::write_parquet, types::Element};

use super::{triat::Encoder, FqEncoderOption, RecordData};
use anyhow::{Context, Result};
use pyo3::prelude::*;
use rayon::prelude::*;

/// One encoded fastq record in columnar form, ready to be appended to the
/// Arrow builders in `ParquetEncoder::generate_batch`. Fields map 1:1 to the
/// output parquet columns `id`, `seq`, `qual`, `target`.
#[derive(Debug, Builder, Default)]
pub struct ParquetData {
    pub id: BString,          // read id -> "id" (Utf8) column
    pub seq: BString,         // k-mer sequence -> "seq" (Utf8) column
    pub qual: Vec<Element>,   // quality values -> "qual" (List<Int32>) column; Element presumably i32-compatible — confirm against `types::Element`
    pub target: Vec<Element>, // target values -> "target" (List<Int32>) column
}

/// Encodes fastq records into chunked parquet files; all behavior is driven
/// by `option` (e.g. `kmer_size` is read in `encode_chunk`).
/// Exposed to Python via pyo3 (`#[pyclass]`).
#[pyclass]
#[derive(Debug, Builder, Default, Clone, Serialize, Deserialize)]
pub struct ParquetEncoder {
    pub option: FqEncoderOption, // encoder configuration (project type)
}

impl ParquetEncoder {
    pub fn new(option: FqEncoderOption) -> Self {
        Self { option }
    }

    fn generate_schema(&self) -> Arc<Schema> {
        Arc::new(Schema::new(vec![
            Field::new("id", DataType::Utf8, false),
            Field::new("seq", DataType::Utf8, false),
            Field::new(
                "qual",
                DataType::List(Box::new(Field::new("item", DataType::Int32, true)).into()),
                false,
            ),
            Field::new(
                "target",
                DataType::List(Box::new(Field::new("item", DataType::Int32, true)).into()),
                false,
            ),
        ]))
    }

    fn generate_batch(&self, records: &[RecordData], schema: &Arc<Schema>) -> Result<RecordBatch> {
        let data: Vec<ParquetData> = records
            .into_par_iter()
            .filter_map(|data| {
                let id = data.id.as_ref();
                let seq = data.seq.as_ref();
                let qual = data.qual.as_ref();
                match self.encode_record(id, seq, qual).context(format!(
                    "encode fq read id {} error",
                    String::from_utf8_lossy(id)
                )) {
                    Ok(result) => Some(result),
                    Err(_e) => None,
                }
            })
            .collect();

        // Create builders for each field
        let mut id_builder = StringBuilder::new();
        let mut seq_builder = StringBuilder::new();
        let mut qual_builder = ListBuilder::new(Int32Builder::new());
        let mut target_builder = ListBuilder::new(Int32Builder::new());

        // Populate builders
        data.into_iter().for_each(|parquet_record| {
            id_builder.append_value(parquet_record.id.to_string());
            seq_builder.append_value(parquet_record.seq.to_string());

            parquet_record.qual.into_iter().for_each(|qual| {
                qual_builder.values().append_value(qual);
            });
            qual_builder.append(true);

            parquet_record.target.into_iter().for_each(|target| {
                target_builder.values().append_value(target);
            });
            target_builder.append(true);
        });

        // Build arrays
        let id_array = Arc::new(id_builder.finish());
        let seq_array = Arc::new(seq_builder.finish());
        let qual_array = Arc::new(qual_builder.finish());
        let target_array = Arc::new(target_builder.finish());

        // Create a RecordBatch
        let record_batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                id_array as Arc<dyn Array>,
                seq_array as Arc<dyn Array>,
                qual_array as Arc<dyn Array>,
                target_array as Arc<dyn Array>,
            ],
        )?;
        Ok(record_batch)
    }

    pub fn encode_chunk<P: AsRef<Path>>(
        &mut self,
        path: P,
        chunk_size: usize,
        parallel: bool,
    ) -> Result<()> {
        let schema = self.generate_schema();
        let records = self.fetch_records(&path, self.option.kmer_size)?;
        info!("Encoding records with chunk size {} ", chunk_size);

        // create a folder for the chunk parquet files
        let file_name = path.as_ref().file_name().unwrap().to_str().unwrap();
        let chunks_folder = path
            .as_ref()
            .parent()
            .unwrap()
            .join(format!("{}_{}", file_name, "chunks"))
            .to_path_buf();
        // create the folder
        std::fs::create_dir_all(&chunks_folder).context("Failed to create folder for chunks")?;

        if parallel {
            records
                // .chunks(chunk_size)
                .par_chunks(chunk_size)
                .enumerate()
                .for_each(|(idx, record)| {
                    let record_batch = self
                        .generate_batch(record, &schema)
                        .context(format!("Failed to generate record batch for chunk {}", idx))
                        .unwrap();
                    let parquet_path = chunks_folder.join(format!("{}_{}.parquet", file_name, idx));
                    write_parquet(parquet_path, record_batch, schema.clone())
                        .context(format!("Failed to write parquet file for chunk {}", idx))
                        .unwrap();
                });
        } else {
            records
                .chunks(chunk_size)
                .enumerate()
                .for_each(|(idx, record)| {
                    let record_batch = self
                        .generate_batch(record, &schema)
                        .context(format!("Failed to generate record batch for chunk {}", idx))
                        .unwrap();
                    let parquet_path = chunks_folder.join(format!("{}_{}.parquet", file_name, idx));
                    write_parquet(parquet_path, record_batch, schema.clone())
                        .context(format!("Failed to write parquet file for chunk {}", idx))
                        .unwrap();
                });
        }

        Ok(())
    }
}