Skip to content

Commit

Permalink
Initial Commit and Updated Readme
Browse files Browse the repository at this point in the history
  • Loading branch information
davids60 committed Jun 18, 2024
1 parent 7451af4 commit 4a705a7
Show file tree
Hide file tree
Showing 2 changed files with 380 additions and 1 deletion.
270 changes: 270 additions & 0 deletions PrimerScore.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,270 @@
import math
import pandas as pd
import numpy as np
from os import path
from matplotlib import pyplot as plt

weights = {}
thresholds = []
replicates = 0
primerSetName = ""
rxnTypeDesignation = ""

MAX_INST_SIGNAL = 140000
threshold_perc = 0.1


# Initialization method to set weights and thresholds.
# Aruments
# set_weights := Array of length 5 which contains weights in the following order:
# Average Maximum Intensity
# Standard Deviation of Maximum Intensity
# Standard Deviation of Reaction Time
# Average Reaction Time
# False Positive
# set_thresholds := Array of threshold values to determine exponential phase beginning,
# plateau phase ending, positive reaction calculation threshold,
# and false positive threshold, respectively
# set_replicates := Number of replicates. Must be the same for all primer sets.
def initialize(set_weights = [5, 5, 10, 20, 60], set_thresholds = [3000, 200, 0.9, 0.2], set_replicates = 4):
if not (len(set_weights) == 5):
raise ValueError('Weights do not contain 5 values. Refer to readme for more information.')
if not (len(set_thresholds) == 4):
raise ValueError('thresholds do not contain 2 values. Refer to readme for moe information.')
if not (set_replicates >= 3):
raise ValueError('Number of replicates is not greater than or equal to 3.')

global weights, thresholds, replicates, primerSetName, rxnTypeDesignation
weights = {'Intensity_Avg': set_weights[0], 'Intensity_StdDev': set_weights[1], 'RxnTime_Std':set_weights[2],
'RxnTime_Avg': set_weights[3], 'False_Positives':set_weights[4]}
thresholds = set_thresholds
replicates = set_replicates

# Main primer scoring method. Call to generate output
# Arguments:
# primerData := Path to input file containing primer data formatted correctly. See readme for more details.
# output := Output file name containing .xlsx
def scorePrimers(primerData, output):
if not (len(thresholds) == 4):
raise BaseException('Not initialized. Initialize scoring')
if not path.exists(primerData):
raise ValueError('Primer Data file not found.')

if not (len(output) > 0):
raise ValueError('Ouput file name not properly defined.')

getPrimerSummary(primerData).to_csv(output, encoding='utf-8')



# Function to calculate average maximum intensity across the positive replicates of a single primer set
# Arguments:
# name := String, name of primer set to calculate maximum intensity for
# df := DataFrame, dataframe containing all primer set amplification data
def calc_max_avg(name, df):
df = df[df['Primer']==name]
pos_rxns = df[df['RxnType']=='+'].drop(['Primer', 'RxnType','ObservedType', 'RxnTime', 'RxnPenalty'], axis=1)
return pos_rxns.max(axis=1).mean()

# Function to calculate the standard deviation of the maximum intensity across the positive replicates of a single primer set
# Arguments:
# name := String, name of primer set to calculate standard deviation for
# df := DataFrame, dataframe containing all primer set amplification data
def calc_max_std(name, df):
df = df[df['Primer']==name]
pos_rxns = df[df['RxnType']=='+'].drop(['Primer', 'RxnType','ObservedType', 'RxnTime', 'RxnPenalty'], axis=1)
return pos_rxns.max(axis=1).std()

# Function to calculate average reaction time for a single primer set.
# Arguments:
# name := String, name of primer set to calculate average reaction time
# df := DataFrame, dataframe containing all primer set amplification data
def calc_rxn_time_avg(name, df):
df = df[df['Primer']==name]
pos_rxns = df[df['RxnType']=='+']
return pos_rxns['RxnTime'].mean()

# Function to calculate standard deviation of the reaction time across the positive replicates of a single primer set
# Arguments:
# name := String, name of primer set to calculate maximum intensity for
# df := DataFrame, dataframe containing all primer set amplification data
def calc_rxn_time_std(name, df):
curPrimer = df[df['Primer']==name]
pos_rxns = curPrimer[curPrimer['RxnType']=='+']
return pos_rxns['RxnTime'].to_numpy().std()

# Function to determine if a reaction is positive or not. Function deteremines a positive according to the following algorithm:
# 1. Determine the intersection point of the reaction and then the reaction reversed.
# 2. Determine the angle from the interxection to time point 0 of both the forward and reverse season.
# 3. If the angle is sufficiently close to 0 (0.95% error in the approximation of the cosine of theta), then mark as negative
# 4. Otherwise, take the derivative and if the lower quantile is greter than 0 (i.e. the signal is not noisy), return as positive
# 5. Otherwise, return negative.
# Arguments:
# rxnTimeSeries := DataSeries, time series of reaction fluoroescent data.
def determine_pos(rxnTimeSeries):
errorRateTan = 0.95

data = rxnTimeSeries
revData = rxnTimeSeries[::-1]

idxIntersect = np.where(np.greater(revData.values, data.values)==False)[0][0]

dataVec = np.array([idxIntersect, data[idxIntersect] - data[0]])
revDataVec = np.array([idxIntersect, revData[idxIntersect] - revData[0]])

magnitude = (np.linalg.norm(dataVec)*np.linalg.norm(revDataVec))

if magnitude == 0:
return 0
else:
theta = math.acos(np.inner(dataVec, revDataVec)/magnitude)

if((math.cos(theta) > errorRateTan) or math.isnan(theta)):
return 0
else:
if(data.max() > (threshold_perc * MAX_INST_SIGNAL)):
return 1
else:
return 0

# Function to determine true positives based on predicted amplification
# Arguments:
# name := String, name of primer set to calculate maximum intensity for
# df := DataFrame, dataframe containing all primer set amplification data
def calc_true_pos(name, df):
df = df[df['Primer']==name]
pos_rxns = df[df['RxnType']=='+']
true_pos_rxns = pos_rxns['ObservedType'].sum()

return true_pos_rxns

# Function to determine false positives based on predicted amplification
# Arguments:
# name := String, name of primer set to calculate maximum intensity for
# df := DataFrame, dataframe containing all primer set amplification data
def calc_false_pos(name, df):
df = df[df['Primer']==name]
neg_rxns = df[df['RxnType']=='-']
true_pos_rxns = neg_rxns['ObservedType'].sum()

return true_pos_rxns

# Function to calculate reaction time of a single reaction.
# Reaction time is determined as the maximum of the second derivative of the time series.
# Arguments:
# rxnTimeSeries := DataSeries, time series of reaction fluoroescent data.
def calc_rxn_time(rxnTimeSeries):
mat = rxnTimeSeries.to_numpy()
return np.argmax(np.gradient(np.gradient(mat)))

# Function to calculate scored value for a single item
# Arguments:
# row := DataFrame, results for a single primer set
# df_results := DataFrame, dataframe for results of all primer sets
# item := The reaction characteristic that is being scored
def calc_item_score(row, df_results, item):
item_min = df_results[item].min()
item_max = df_results[item].max()
item_range = item_max - item_min
value = row[item]
if item_range == 0:
score = 1
else:
if(item == "Intensity_Avg"):
score = 1 - ((item_max - value)/item_range)
else:
score = 1 - ((value - item_min)/item_range)
weighted_score = score * weights[item]
return weighted_score

# Function to calculate the overall score for a given primer set in water.
# row := DataFrame, results for a single primer set
# df_results := DataFrame, dataframe for results of all primer sets
def calc_overall_score(row, df_results):
overall_score = 0
for item in weights.keys():
if not item=="False_Positives":
overall_score += calc_item_score(row, df_results, item)
for i in range(0, replicates):
overall_score += row['FP_{}'.format(i+1)]
return overall_score

# Function to calculate the overall score for a given primer set in saliva.
# row := DataFrame, results for a single primer set
# df_results := DataFrame, dataframe for results of all primer sets
def calc_overall_score_saliva(row, df_results):
overall_score = 0
weights_sal = {'Intensity_Avg': 20, 'Intensity_StdDev': 10, 'RxnTime_Std':40, 'RxnTime_Avg': 30}
for item in weights_sal.keys():
overall_score += calc_item_score(row, df_results, item)
return overall_score

def calc_FP_cardinal(name, df, df_results, n):

# Calculate the total number of divisions given a number of replicates. This is just the sum of 1...num of replicates.
totalDivisions = 0
for i in range(0, replicates):
totalDivisions = totalDivisions + (i + 1)

# Allocate the total number of points for this order of false positives (i.e. 1x, 2x, etc.)
totalPoints = (weights['False_Positives'] / totalDivisions) * n

# Select only primer sets that have at least n false positives for consideration in ranking
subjectPrimerSets = df_results[df_results['False_Positives'] >= n]['PrimerSet']



if(name not in subjectPrimerSets.values):
return totalPoints
else:
subjectRxnPenalty = df[(df['RxnType']=='-') & (df['ObservedType']==1) & (df['Primer'].isin(subjectPrimerSets))].groupby('Primer')['RxnPenalty'].apply(lambda x: x.sort_values(ascending=False).iloc[n-1])
maxSubjectRxnPenalty = subjectRxnPenalty.max()

return (1 - (subjectRxnPenalty[name]/maxSubjectRxnPenalty)) * totalPoints

def calc_rxn_penalty(row):
rxnTimePenalty = 60 - row['RxnTime']
rxnIntensity = row.drop(['Primer', 'RxnType','ObservedType','RxnTime']).max()
return rxnIntensity*rxnTimePenalty

# Function to contstruct and return the results table for a set of reactions for a given file.
# Arguments:
# fileName: String, relative path to the file of interest containing time series data for primer sets.
def getPrimerSummary(fileName):
[df, primerNames] = format_df(fileName)
results = pd.DataFrame(data=primerNames, columns=['PrimerSet'])
results['TruePos'] = results.apply(lambda row: calc_true_pos(row['PrimerSet'], df), axis=1)
results['Intensity_Avg'] = results.apply(lambda row: 0 if(row['TruePos'] < replicates) else calc_max_avg(row['PrimerSet'], df), axis=1)
results['Intensity_StdDev'] = results.apply(lambda row: 0 if(row['TruePos'] < replicates) else calc_max_std(row['PrimerSet'], df), axis=1)
results['RxnTime_Avg'] = results.apply(lambda row: 0 if(row['TruePos'] < replicates) else calc_rxn_time_avg(row['PrimerSet'], df), axis=1)
results['RxnTime_Std'] = results.apply(lambda row: 0 if(row['TruePos'] < replicates) else calc_rxn_time_std(row['PrimerSet'], df), axis=1)
results['False_Positives'] = results.apply(lambda row: 0 if(row['TruePos'] < replicates) else calc_false_pos(row['PrimerSet'], df), axis=1)

for i in range(0, replicates):
results['FP_{}'.format(i+1)] = results.apply(lambda row: 0 if(row['TruePos'] < replicates) else calc_FP_cardinal(row['PrimerSet'], df, results, (i+1)), axis=1)

results['Overall_Score'] = results.apply(lambda row: 0 if(row['TruePos'] < replicates) else calc_overall_score(row, results[results['TruePos']==replicates]), axis=1)
return results

# Function to return primer names and DataFrame in a dictionary
# Arguments:
# fileName: String, relative path to the file of interest containing time series data for primer sets.
def get_primer_data(fileName):
[df, primerNames] = format_df(fileName)
return {"DataFrame": df, "primerNames": primerNames}

# Function to format time series data into DataFrame
# Arguments:
# fileName: String, relative path to the file of interest containing time series data for primer sets.
def format_df(filename):
df = pd.read_excel(filename, header=None)
primerNames = df.T[0].unique()
row_names = ['Primer', 'RxnType']
for i in range(1, 61):
row_names.append(str(i))
df.index = row_names
df = df.T
df['ObservedType'] = df.apply(lambda row: determine_pos(row.drop(['Primer', 'RxnType'])), axis=1)
df['RxnTime'] = df.apply(lambda row: calc_rxn_time(row.drop(['Primer', 'RxnType','ObservedType'])), axis=1)
df['RxnPenalty'] = df.apply(lambda row: calc_rxn_penalty(row), axis=1)
return df, primerNames
111 changes: 110 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,2 +1,111 @@
# PrimerScoring
Code used in the scoring of primers developed by the Verma Lab
Contains scripts used to score primer sets.

# Setup
## Dependencies
- Pandas
- Numpy
## Installation
Copy the PrimerScore.py file to either a valid location in the search path or to the current director.

# Usage

## Data formatting
The input data file must be in .xls format **NOT .XLSX**. Additionally, the file **MUST** be formmatted in the following manner, with each column representing the data for a single reaction for a specific primer set:
- First row should contain primer set IDs in the form of *{Target}*.*{SetID}*. For example, orf1ab.2.
- Second row should contain reaction type annotation.
- For negative reaction, put **'-'** in the cell.
- For positive reactions, put **'+'** in the cell.
- Rows three and onward should contain data.

**NOTE:**
1. The data should be rectangular; this means that all data is of the same length. This has not been tested for data of different lengths.
2. There should be an equal amount of positive and negatives for a speific primer set.
3. For proper scoring, each primer set should have the same number of replicates

## Execution
### Importing
If the PrimerScore.py file is located in a valid path or the current directory, simply execute the following line to import the module.
```
import PrimerScore
```

### Initialization
To use the primer scoring method, the initialization method must first be called to set weights and tolerances.

To initialize with default weights, simply execute the following line of code:
```
PrimerScore.intialize()
```
This will initialize weights to the following value:

| Metric | Weight |
|:---:|:---:|
| Average Maximum Intensity | 5 |
| Max. Intensity Std. Dev. | 5 |
| Reaction Time Std. Dev. | 10|
| Average Reaction Time | 20|
| Number of False Positives | 60|

Additionally, the thresholds will be set at:
| Threshold | Value |
|:---:|:---:|
| Exponential Phase | 3000 |
| Plateau Phase | 200 |
| Positive Reaction Threshold | 0.9 |
| False Positive Threshold | 0.2 |

Finally, the number of replicates will be set at 4.

The thresholds above will be used to the time at which the reactThe ion enters and exits the Exponential and Plateau phases, respectively. For a reaction to be considered a complete reaction, the plateau phase must occur after the exponential phase.


To use custom weightings, execute the following line of code with the given arguments.

```
PrimerScoring.initialize(set_weights, set_thresholds, set_replicates.)
```
The arguments in the above expression are defined as:
- set_weights: An array containing weights for the following metrics in the following order:
- Avg. Max. Intensity
- Max. Intensity Std. Dev.
- Reaction time Std. Dev.
- Avg. Reaction Time
- Number of False Positives
- **THE ARRAY MUST CONTAIN WEIGHTS FOR ALL OF THE ABOVE METRICS.**
- set_thresholds: An array contianing weights for the threshold tolerances of the exponential and plateau phases, positive reaction threshold, and false positive threshold, respectively.
- **THE ARRAY MUCH CONTAIN THRESHOLDS FOR BOTH PHASES AND BOTH THRESHOLDS**
- set_replicates: An integer value greater than 3 for the number of replicates. **Must be the same for all primer sets scored.**

An example to initialize to default settings as above would be as follows:
```
PrimerScoring.intialize([5,5,10,20,60], [3000, 200, 0.9, 0.2], 4)
```

### Scoring
To calculate the scores for each primer, execute the following line of code with the given arguments:

```
PrimerScore.scorePrimers(primerData, output)
```
The arguments in the above line of code are defined as follow:
- primerData: Path (relative or absolute) or file name of excel (.xls) file containing primer data formated properly as outlined above.
- output: Output excel file (.xlsx) name or path containing primer scores and metrics. Please include .xslx extension.

An example to take an input file "WaterScore.xls" and write values to "PrimerScores_Water.xlsx" is as follows:
```
PrimerScore.scorePrimer('WaterScore.xls', 'PrimerScores_Water.xlsx')
```

### Output
After scoring primers as outlined above, the output file will have the following column headers:
- Primer Set: Column contains Primer Set ID (copied from input)
- TruePos: Column contains the number of true positives (completed reaction)
- Intensity_Avg: Calculated average maximum intensity.
- Intensity_StdDev: Calculated maximum intensity standard deviation.
- RxnTime_Avg: Calculated average reaction time.
- RxnTime_StdDev: Calculated reaction time standard deviation.
- False_Positives: Number of false positives.
- Overall Score: Calculated overall score for each primer set.


0 comments on commit 4a705a7

Please sign in to comment.