Class to create the corpus txt file for the semantic search model from a dataframe.
The class contains the following methods:
- concat_columns: Concatenate the columns to create the corpus from the dataframe. This will take all the columns in the dataframe and concatenate them to create the corpus.
- write_corpus_to_file: Write the corpus to a file from the concatenated columns.
Example
from fleming.discovery.corpus_creation import CorpusTextCreation
from pyspark.sql import SparkSession
# Not required if using Databricks
spark = SparkSession.builder.appName("corpus_creation").getOrCreate()
corpus_df = spark.read.csv("/tmp/corpus.csv", header=True, inferSchema=True)
corpus_file_path = "/tmp/search_corpus.txt"
corpus_creation = CorpusTextCreation(spark, corpus_df, corpus_file_path)
corpus = corpus_creation.concat_columns()
corpus_creation.write_corpus_to_file(corpus)
Parameters:
Name |
Type |
Description |
Default |
spark |
SparkSession
|
Spark Session
|
required
|
corpus_df |
df
|
Source dataframe of the corpus
|
required
|
corpus_file_path |
str
|
File path to write the corpus
|
required
|
Source code in src/fleming/discovery/corpus_creation.py
class CorpusTextCreation:
    """
    Class to create the corpus txt file for the semantic search model from a dataframe.

    The class contains the following methods:

    1. concat_columns: Concatenate the columns to create the corpus from the dataframe. This will take all the columns in the dataframe and concatenate them to create the corpus.
    2. write_corpus_to_file: Write the corpus to a file from the concatenated columns.

    Example
    --------
    ```python
    from fleming.discovery.corpus_creation import CorpusTextCreation
    from pyspark.sql import SparkSession

    # Not required if using Databricks
    spark = SparkSession.builder.appName("corpus_creation").getOrCreate()

    corpus_df = spark.read.csv("/tmp/corpus.csv", header=True, inferSchema=True)
    corpus_file_path = "/tmp/search_corpus.txt"

    corpus_creation = CorpusTextCreation(spark, corpus_df, corpus_file_path)
    corpus = corpus_creation.concat_columns()
    corpus_creation.write_corpus_to_file(corpus)
    ```

    Parameters:
        spark (SparkSession): Spark Session
        corpus_df (df): Source dataframe of the corpus
        corpus_file_path (str): File path to write the corpus
    """

    spark: SparkSession
    corpus_df: DataFrame
    corpus_file_path: str

    def __init__(
        self, spark: SparkSession, corpus_df: DataFrame, corpus_file_path: str
    ) -> None:
        self.spark = spark
        self.corpus_df = corpus_df
        self.corpus_file_path = corpus_file_path

    def concat_columns(self) -> list:
        """
        Concatenate the columns to create the corpus

        Every column of ``corpus_df`` is joined, row by row, into one string
        using a single space as the separator.

        Parameters:
            None

        Returns:
            corpus(list): List of concatenated columns
        """
        # Project only the concatenated column so that collect() does not
        # also ship every source column back to the driver.
        concatenated = self.corpus_df.select(
            concat_ws(" ", *self.corpus_df.columns).alias("ConcatColumns")
        )
        return [row["ConcatColumns"] for row in concatenated.collect()]

    def write_corpus_to_file(self, corpus: list) -> None:
        """
        Write the corpus to a file

        Each entry is written on its own line to ``corpus_file_path`` (the
        file is truncated first) and is also echoed to stdout.

        Parameters:
            corpus(list): List of concatenated columns

        Returns:
            None

        Raises:
            Exception: Any error raised while writing an entry is logged and
                re-raised unchanged.
        """
        # Pin the encoding: the platform default is not UTF-8 everywhere and
        # corpus text may contain non-ASCII characters.
        with open(self.corpus_file_path, "w", encoding="utf-8") as file:
            try:
                for sentence in corpus:
                    file.write(sentence + "\n")
                    print(sentence)
            except Exception as e:
                logging.exception(str(e))
                # Bare raise keeps the original traceback intact.
                raise
|
concat_columns()
Concatenate the columns to create the corpus
Parameters:
None
Returns:
corpus(list): List of concatenated columns
Source code in src/fleming/discovery/corpus_creation.py
def concat_columns(self) -> list:
    """
    Concatenate the columns to create the corpus

    Every column of ``corpus_df`` is joined, row by row, into one string
    using a single space as the separator.

    Parameters:
        None

    Returns:
        corpus(list): List of concatenated columns
    """
    # Project only the concatenated column so that collect() does not also
    # ship every source column back to the driver.
    concatenated = self.corpus_df.select(
        concat_ws(" ", *self.corpus_df.columns).alias("ConcatColumns")
    )
    return [row["ConcatColumns"] for row in concatenated.collect()]
|
write_corpus_to_file(corpus)
Write the corpus to a file
Parameters:
corpus(list): List of concatenated columns
Returns:
None
Source code in src/fleming/discovery/corpus_creation.py
def write_corpus_to_file(self, corpus: list) -> None:
    """
    Write the corpus to a file

    Each entry is written on its own line to ``corpus_file_path`` (the file
    is truncated first) and is also echoed to stdout.

    Parameters:
        corpus(list): List of concatenated columns

    Returns:
        None

    Raises:
        Exception: Any error raised while writing an entry is logged and
            re-raised unchanged.
    """
    # Pin the encoding: the platform default is not UTF-8 everywhere and
    # corpus text may contain non-ASCII characters.
    with open(self.corpus_file_path, "w", encoding="utf-8") as file:
        try:
            for sentence in corpus:
                file.write(sentence + "\n")
                print(sentence)
        except Exception as e:
            logging.exception(str(e))
            # Bare raise keeps the original traceback intact.
            raise
|