Introduction¶
In Coursework 1: "Sentiment Induction using the Bible as a Parallel Corpus", it was proposed that the Bible can be used as a parallel corpus to transfer sentiment ratings between languages. This was shown to be successful by the clearly differentiated wordclouds of positive and negative words.
In Coursework 1, some improvements were proposed, including:
- Using a larger primary and secondary corpus for the initial lexicon generation
- Trimming the 0 scoring verses from the primary corpus
Many other improvements were also made, including:
- The automatic download and extraction of the corpora
- The trimming of possible (calculated) stopwords from the lexicon
These improvements are made in this coursework.
In addition, Coursework 2 attempts to integrate the sentiment lexicon resulting from Coursework 1 into a sentiment analysis pipeline that can be used to evaluate the sentiment of text in the target language; in this case, comments extracted from the popular Afrikaans news website "Maroela Media". This is done in an attempt to demonstrate the feasibility of utilizing "Parallel Corpus Sentiment Induction" in a production environment for a language without a well-established sentiment lexicon.
Purpose: To implement a sentiment analysis pipeline capable of providing sentiment scores for the Afrikaans news website "Maroela Media".
Key Objectives:
- Implement and improve the lexicon generation pipeline from Coursework 1
- Fetch comments from the Wordpress API of "Maroela Media"
- Rate the sentiment of comments
- Generate a report on the sentiment scores, including:
- Correlations of sentiment scores between the article texts and the comments
- The change of sentiment scores over time
- Implement unit tests for the pipeline
- Implement logging of the pipeline actions and errors
Prepare Environment¶
Load Packages¶
# Reading from the configuration file
import configparser
# Unit testing
import unittest
from unittest.mock import patch, mock_open, MagicMock
# Logger
import logging
import json
from datetime import datetime
import platform
import inspect
#Read/Write Data
import os
from zipfile import ZipFile
import sqlite3
import sqlalchemy
from sqlalchemy import create_engine, select, update
from sqlalchemy.dialects.sqlite import insert
#Data Processing/Analysis
import pandas as pd
import numpy as np
from statistics import mean
import re
import string
import html
from itertools import groupby
#Graphing
import matplotlib.pyplot as plt
#Data Retrieval
import requests
#NLP
from nltk.stem import PorterStemmer
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer
#Miscellaneous
from datetime import datetime, timedelta
import math
import time
Python Libraries and Alternatives Table¶
Category | Libraries Used | Alternatives | Reasoning |
---|---|---|---|
Configuration | configparser | json, yaml, os.environ | Chosen for ease of use and ability to include comments in the configuration file. Also part of Python's standard library. |
Unit Testing | unittest | pytest, nose | Offers a robust, well-documented built-in framework with no additional dependencies. |
Logger | logging | loguru, sentry | Python's built-in logging framework, selected for its reliability and native support. |
Reading and Writing Data | os, zipfile, sqlite3, sqlalchemy | shutil, gzip, PostgreSQL, MySQL | Part of Python's standard libraries, chosen for simplicity, ease of use, and no extra dependencies. SQLite doesn't require additional software or hosting. |
Data Processing and Analysis | pandas, numpy | dask, scipy | Well-supported and widely used for data manipulation and math operations. Chosen for simplicity and not requiring large data sets to be effective. |
Graphing | matplotlib | seaborn, plotly | Robust and offers basic graphing capabilities. Chosen for its simplicity. |
NLP | nltk.stem, vaderSentiment | spaCy, TextBlob | Specialized focus on stemming and sentiment analysis. Lightweight and simple, with vaderSentiment offering positive-negative sentiment rating. |
Miscellaneous | datetime, timedelta, math, time | arrow, dateutil | Part of Python's standard library, offering simple support for date, time, and math functions. No additional dependencies required. |
This table summarizes the libraries used across the project, their alternatives, and the reasoning behind their selection. The focus has primarily been on simplicity and minimal dependencies.
Setup Config Reader¶
To improve the usability of the program, the settings are kept in a config.ini file, and a config reader has been set up to make working with it easier.
It allows configuration values to be retrieved from the file using a simple structure:
config['section']['variable']
This implementation also converts the variables to the appropriate types.
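The ConfigReader class itself is a thin wrapper around configparser. The following is a minimal sketch of how such a wrapper might look, assuming simple type-coercion rules; it is illustrative rather than the exact implementation used here.
class ConfigReader:
    #Thin wrapper around configparser supporting config['section']['variable'] access
    #and converting values to booleans, integers or floats where possible.
    def __init__(self, file_name):
        self._parser = configparser.ConfigParser()
        self._parser.optionxform = str  #preserve the case of option names, e.g. LexiconFileName
        self._parser.read(file_name)

    def __getitem__(self, section):
        return {key: self._convert(value) for key, value in self._parser[section].items()}

    @staticmethod
    def _convert(value):
        if value.lower() in ('true', 'false'):
            return value.lower() == 'true'
        for cast in (int, float):
            try:
                return cast(value)
            except ValueError:
                continue
        return value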
config = ConfigReader('config.ini')
Setup Logger¶
Using the logging library, as opposed to creating every piece of functionality from scratch, allows for an easier and more consistent implementation of logging.
The logger has been set up drawing much inspiration from the ideal messages that would be expected in AWS CloudWatch logs (JSON structures).
For now the error messages are logged to the .log files, which can be considered a local 'CloudWatch' for the purpose of this assignment. Less detailed error messages are also printed to the console, alongside messages of other log levels.
Some libraries, such as requests and sqlalchemy, provide their own logging at the INFO and DEBUG levels, which interferes with this implementation, so they have been set to only log at higher levels.
Define Classes¶
Bible¶
class Bible:
ConnectionPath: str
Corpus: str
Name: str
Source: str
NameShorthand: str
def __init__(self, connectionPath, corpus, source, name, nameShorthand =""):
self.ConnectionPath = connectionPath
self.Corpus = corpus
self.Name = name
self.Source = source
self.NameShorthand = nameShorthand
Setup SQLAlchemy¶
For the project, it was decided to implement SQLAlchemy as the ORM and connection provider. This was done to ease and support the development process, automate database creation, and showcase some more advanced software development practices.
The SQLAlchemy implementation has been through multiple iterations, both to standardize naming and to add more fields to the tables.
Best practice would be to implement and enforce foreign key constraints, but that would add additional complexity to an already complex project.
Comment Table:
- Contains information regarding the comments left on a post, and has a many-to-one relationship with the Post table.
Post Table:
- Contains information regarding the post
- Has a one-to-many relationship with the Comment table
- Has a many-to-many relationship with the Category table
- Has a many-to-many relationship with the Tag table
from sqlalchemy import create_engine, ForeignKey
from sqlalchemy import Column, DateTime, Integer, String
from sqlalchemy.orm import declarative_base
engine = create_engine(config["Database"]["WordPressSync"], echo=False)
Base = declarative_base()
class WP_Comment(Base):
__tablename__ = "Comment"
comment_id = Column(Integer, primary_key=True)
post_id = Column(Integer)
date = Column(sqlalchemy.DATETIME)
content = Column(String)
clean_content = Column(String)
score = Column(sqlalchemy.Float)
class WP_Post(Base):
__tablename__ = "Post"
post_id = Column(Integer, primary_key=True)
link = Column(String)
date = Column(sqlalchemy.DATETIME)
modified = Column(sqlalchemy.DATETIME)
author = Column(Integer)
title = Column(String)
title_clean = Column(String)
title_score = Column(sqlalchemy.Float)
content = Column(String)
content_clean = Column(String)
content_score = Column(sqlalchemy.Float)
class WP_PostCategory(Base):
__tablename__ = "PostCategory"
post_id = Column(Integer, primary_key=True)
category_id = Column(Integer, primary_key=True)
class WP_PostTag(Base):
__tablename__ = "PostTag"
post_id = Column(Integer, primary_key=True)
tag_id = Column(Integer, primary_key=True)
class WP_Category(Base):
__tablename__ = "Category"
category_id = Column(Integer, primary_key=True)
name = Column(String)
link = Column(String)
parent_id = Column(Integer)
class WP_Tag(Base):
__tablename__ = "Tag"
tag_id = Column(Integer, primary_key=True)
name = Column(String)
link = Column(String)
Base.metadata.create_all(engine)
Set Constants¶
The Vader sentiment analyzer is assigned to const_sia_obj, to be used in scoring the primary corpus in score_primary_corpus().
const_sia_obj = SentimentIntensityAnalyzer()
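For reference, the analyzer's polarity_scores() method returns a dictionary containing 'neg', 'neu', 'pos' and 'compound' scores; only the compound value (a normalized score between -1 and 1) is used in this pipeline. The sentence below is just an illustration:
example_scores = const_sia_obj.polarity_scores("This is a wonderful day!")
print(example_scores['compound'])  #a positive compound score for this sentence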
Clean Data - Helper Function¶
This function removes unwanted characters and words from the provided text.
Cleaning text is required in multiple places, and the function is therefore made to cater for all those possibilities.
def clean_text(
text,
stopwords = [],
remove_markup = False,
remove_bracketed = False,
remove_punctuation = False,
remove_special_characters = False,
replace_dashes = False
):
#remove html
if remove_markup:
text = html.unescape(text)
text = re.sub('<[^<]+?>', '', text)
text = text.strip()
text = text.replace("\n"," ")
#remove text within brackets eg. [2], as well as the brackets. Mostly used to denote sub text links.
if remove_bracketed:
text = re.sub(r'\[.*?\]', '', text)
#replaces the three types of dashes encountered in the data sources
if replace_dashes or remove_punctuation:
text = text.replace('-',' ')
text = text.replace('–',' ')
text = text.replace('—',' ')
#remove strange characters
if remove_special_characters:
text = "".join([char for char in text if ord(char) < 128])
#this is different from the remove_special_characters as this keeps non-ascii
if remove_punctuation:
text = "".join([i for i in text if (i not in string.punctuation and i not in ['…','“','”','‘','’'])])
#removes words if contained in the provided list
if (len(stopwords) > 0):
text = " ".join([w for w in text.split(' ') if w.lower() not in stopwords])
#removes consecutive empty spaces.
text = re.sub(' +', ' ', text)
return text
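For example, combining several of the options:
clean_text("<p>Hello [1] world!</p>", remove_markup = True, remove_bracketed = True, remove_punctuation = True)
#returns 'Hello world'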
Load or generate the Sentiment Lexicon¶
Read list of bible corpora¶
Function to read and map the list of Bible details from the specified CSV file.
The CSV file can be specified in config.ini, as LexiconGeneration -> CorpusDetailsFileName.
def get_corpus_list():
return np.array(
list(map(
lambda bible_details:
Bible(
connectionPath=bible_details[0],
corpus=bible_details[1],
source=bible_details[2],
name=bible_details[3],
nameShorthand=bible_details[4]
),
pd.read_csv(config["LexiconGeneration"]["CorpusDetailsFileName"]).values.tolist()
))
)
Download Corpora¶
Function to download the bible corpora from a list of Bibles; they are downloaded as zip files to the specified folder and then extracted to a separate specified folder.
- If the directories do not exist, they are created.
- If the zip files do not exist, they are downloaded.
- If the bibles have been downloaded but not extracted, they are extracted.
This function is executed, and the downloaded corpora are used in the generate_sentiment_lexicon() function as part of retrieving and scoring the primary corpus, as well as the ultimate transfer of the scores to the secondary corpus.
The function currently only caters for corpora downloaded from https://www.ph4.org/, which provides zip files containing SQLite databases in a specific file structure.
def download_corpus(bibles: list[Bible], zip_download_path = 'zipped', extracted_path = 'corpus'):
Logger.debug('Start - download_corpus()')
try:
#Download missing corpus files
if not os.path.exists(zip_download_path):
os.makedirs(zip_download_path)
zippedFiles = os.listdir(zip_download_path)
# Loop through the bibles to check if their corresponding zip files exist
for bible in filter(lambda x: f"{x.NameShorthand}.zip" not in zippedFiles, bibles):
Logger.info(f"Downloading: {bible.NameShorthand}.zip")
response = requests.get(bible.Source)
with open(f"{zip_download_path}/{bible.NameShorthand}.zip", "wb") as file:
file.write(response.content)
file.close()
#Extract un-extracted files
if not os.path.exists(extracted_path):
os.makedirs(extracted_path)
Logger.info(f"Created: {extracted_path}")
# Group the bibles by their corpus and process them
for corpusGroup in groupby(bibles, lambda x: x.Corpus):
corpusFolderName = f"{extracted_path}/{corpusGroup[0]}"
if not os.path.exists(corpusFolderName):
os.makedirs(corpusFolderName)
Logger.info(f"Created: {corpusFolderName}")
corpusFiles = os.listdir(extracted_path)
# Loop through each bible and extract its SQLite3 file if it doesn't already exist
for bible in bibles:
path = f"{extracted_path}/{bible.Corpus}"
if not os.path.exists(f"{extracted_path}/{bible.Corpus}/{bible.NameShorthand}.SQLite3"):
with ZipFile(f"{zip_download_path}/{bible.NameShorthand}.zip", 'r') as zObject:
Logger.info(f"Extracting: {bible.NameShorthand}.SQLite3")
zObject.extract(f"{bible.NameShorthand}.SQLite3",path=path)
except:
Logger.error("Could not download the corpora", exc_info=True)
finally:
        Logger.debug('End - download_corpus()')
Read Corpora¶
Function to read bible verses using a list of Bibles; these bibles must have been downloaded beforehand using the download_corpus() function.
As mentioned above, the project is currently reliant on a specific, shared structure of the corpus; in the future this could be expanded to cater for multiple sources of data.
def read_corpus(bibles: list[Bible], corpus_folder = "corpus"):
corpus = pd.DataFrame()
for bible in bibles:
filePath = f"{corpus_folder}/{bible.Corpus}/{bible.NameShorthand}.SQLite3"
con = sqlite3.connect(filePath)
df = pd.read_sql_query('SELECT * FROM verses', con)
con.close()
#To keep track of the corpus version for debug purposes
df["bible_version"] = bible.Name
corpus = pd.concat([corpus, df], ignore_index=True)
return corpus
Score Sentiment¶
A simple function that receives text and uses the Vader sentiment analyzer to determine the sentiment score, which is then returned.
This is vectorized to speed up execution. In Coursework 1 the swifter library was used, but after careful evaluation and experimentation, vectorization proved to have higher performance, as well as outperforming scoring without either the swifter library or vectorization.
def score_sentiment(text):
if text is None or text.strip() == "":
return 0.0
sentiment_dict = const_sia_obj.polarity_scores(text)
return sentiment_dict['compound']
Score Primary Corpus¶
This function's purpose is to compute the sentiment scores for the primary corpus.
- The function starts by checking whether the scores have already been computed.
- If they have, it checks whether the scores are required to be recomputed.
- If either the scores have not been computed or they should be recomputed, computation continues; otherwise the previously computed scores are returned.
- To compute new scores, the primary corpus is read by the read_corpus() function.
- The raw text is cleaned by the clean_text() function.
- The score_sentiment() function is vectorized and run on the text of the corpus.
- The entries scored 0.00 are removed from the data set.
- This is done to polarize the results of the scoring, which in turn results in a wider range of scores in the secondary corpus and, finally, the sentiment lexicon.
- The average scores of parallel entries are computed.
- Finally, the scores for the primary corpus are saved, read back, and returned.
def score_primary_corpus(corpus_list, primary_corpus_group, corpus_path, score_database, recalculate = True):
Logger.debug('Start - score_primary_corpus()')
try:
verse_scores = pd.DataFrame()
#connect to the provided database or create a new one
con = sqlite3.connect(score_database)
#get the amount of database entries
verse_scores_count = pd.read_sql_query('SELECT count(*) FROM score', con)['count(*)'][0]
Logger.debug(f'Primary corpus verse scores found: [{verse_scores_count}]')
#check if the scores should be recalculated, if no entries were found then also recalculate
if (recalculate or verse_scores_count == 0) :
Logger.info('Recalculating primary corpus sentiment scores')
#read and clean the corpus
data_frame_primary = read_corpus(filter(lambda x: f"{x.Corpus}" == primary_corpus_group, corpus_list), corpus_path)
data_frame_primary["text"] = data_frame_primary["text"].apply(lambda x: clean_text(
x,
remove_markup = True,
remove_bracketed = True
))
Logger.debug('Start scoring of text')
#score the text using a vectorized `score_sentiment`
vectorized_score_sentiment = np.vectorize(score_sentiment)
data_frame_primary[['compound']] = pd.DataFrame(vectorized_score_sentiment(data_frame_primary["text"]))
Logger.debug('End scoring of text')
Logger.debug('Remove 0.00 score verses')
#remove entries from the primary corpus if the score is 0.00
data_frame_primary = data_frame_primary[data_frame_primary['compound'] != 0.00]
#average out the scores for the parallel entries
verse_scores = data_frame_primary.groupby(['book_number','chapter','verse']).agg(
compound = pd.NamedAgg(column="compound", aggfunc = 'mean')
).reset_index()
#save to database
verse_scores.to_sql('score',con,if_exists='replace')
Logger.info('Saved verse scores to database')
verse_scores = pd.read_sql_query('SELECT * FROM score', con)
con.close()
return verse_scores
except:
Logger.error("Could not score the primary corpus", exc_info=True)
return None
finally:
Logger.debug("End - score_primary_corpus()")
Generate Sentiment Lexicon¶
This function, although heavily modified, is inspired by the work done in the preceding Coursework 1.
There have been many iterations of this function over the course of the project, adding features such as:
- the cleaning of text
- triggering of the corpus downloads
- lexicon trimming to remove words with low occurrence counts
- lexicon trimming to remove words occurring only with a capital first letter
- lexicon trimming to remove words with very high occurrence counts, later removed in favor of generating a list of stopwords
- generating a list of stopwords based on the occurrence frequency combined with the word score (a sketch of this heuristic follows the list):
- if a word has a high occurrence count but a low polarity, it is more likely to be a stopword
- stopword score = occurrences * (1 / |sentiment score|)
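A minimal sketch of this heuristic is shown below. It assumes the intermediate lexicon is a pandas DataFrame with word, compound (mean sentiment score) and occurrences columns; the column names and the small epsilon guarding against division by zero are assumptions for illustration, not taken from the generation code.
def rank_possible_stopwords(lexicon_df: pd.DataFrame, stopword_count = 250):
    #Words that occur often but carry little sentiment are the most likely stopwords.
    ranked = lexicon_df.copy()
    #occurrences * (1 / |score|); the epsilon avoids division by zero for 0.0 scores
    ranked['stopword_score'] = ranked['occurrences'] * (1 / (ranked['compound'].abs() + 1e-6))
    ranked = ranked.sort_values('stopword_score', ascending=False)
    return ranked['word'].head(stopword_count).tolist()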
Primary Function¶
- The function begins by logging its initiation and initializes a flag for lexicon regeneration to False.
- It evaluates conditions to determine whether lexicon regeneration is required or not.
- These conditions could include explicit instruction, absence of the lexicon file, or an empty lexicon.
- If lexicon regeneration is needed:
- Validates the presence of essential input variables.
- Calls a separate function to generate the lexicon and a list of potential stopwords.
- Saves the newly generated lexicon and list of stopwords to their respective files.
- If lexicon regeneration is not needed, reads the existing lexicon and list of stopwords from pre-specified files.
- Logs details about the number of lexicon entries and marks the end of function execution.
- Returns the lexicon and the list of stopwords.
- If an exception occurs, logs an error and returns a None value.
def load_sentiment_lexicon(
lexicon_file_name: str,
score_database: str,
primary_corpus_group,
secondary_corpus_group,
always_regenerate = False,
generation_minimum_word_occurrence = 5,
download_path = 'download',
extracted_path = 'corpus',
recalculate_primary_corpus_scores = False,
stopword_count = 250
):
Logger.debug("Start - load_sentiment_lexicon()")
try:
regenerate_lexicon = False
if(always_regenerate):
regenerate_lexicon = True
elif(not os.path.isfile(lexicon_file_name)):
Logger.info("Lexicon file not found")
regenerate_lexicon = True
elif(len(pd.read_csv(lexicon_file_name)) <= 1):
Logger.info("Lexicon is empty")
regenerate_lexicon = True
if(regenerate_lexicon):
Logger.info("Generating Lexicon")
if( not download_path or
not extracted_path):
raise Exception("The following input variables are also required to generate the lexicon: [download_path, extracted_path]")
(word_lexicon,possible_stopwords) = generate_sentiment_lexicon(
primary_corpus_group,
secondary_corpus_group,
download_path = download_path,
extracted_path = extracted_path,
score_database= score_database,
recalculate_primary_corpus_scores = recalculate_primary_corpus_scores,
generation_minimum_word_occurrence = generation_minimum_word_occurrence,
stopword_count = stopword_count
)
word_lexicon.to_csv(lexicon_file_name)
open('stopwords.txt', 'w', encoding='utf-8').write('\n'.join(possible_stopwords))
word_lexicon = pd.read_csv(lexicon_file_name)
possible_stopwords = open('stopwords.txt', 'r',encoding='utf-8').read().split('\n') if os.path.exists('stopwords.txt') else []
Logger.info("Populated Lexicon file found")
Logger.debug(f'Lexicon entries: {len(word_lexicon)}')
return (word_lexicon, possible_stopwords)
except:
Logger.error("Could not load sentiment lexicon", exc_info=True)
return None
finally:
Logger.debug("End - load_sentiment_lexicon()")
Load or Sync the WordPress data¶
The goal of this section of the project is to provide functions able to sync data of a WordPress website incrementally.
The incremental sync hinges on two types of state checks:
- Last-updated date checks for the Posts and Comments
- Checking for missing entries, implemented for the Category and Tag entities
Categories and Tags, having a many-to-many relationship with Posts, will first have their IDs synced into the intermediary tables, PostCategory and PostTag.
Then, referencing those, the sync checks which IDs exist in the intermediary tables but not in the full entity table.
The WordPress API allows for public access, negating the need for front-end crawling and allowing both bulk and specific calls to be made.
- For the Posts and Comments, the API can be called passing a date filter as a parameter.
- For Tags and Categories, the API can be called passing in the specific IDs that should be retrieved.
A challenge with passing the IDs is that the WordPress API cannot handle very long request URLs. This prevents calling all Tags or Categories by their IDs at once and paging through them all. To handle this, the ID lists also need to be split into batches of smaller chunks (a sketch of this follows below).
For all the syncs, a "ping" to the server is made first, primarily to retrieve the list length (the "x-wp-total" header) to set up the paging for the requests. It also checks that the site is live before attempting any further calls.
These syncs can be run independently, although if Posts have not been synced, the Category and Tag syncs will have no effect.
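A minimal sketch of the chunking and paging helpers described above is shown below; the helper names are illustrative, and the full, concrete versions appear in the download functions that follow.
def chunk_ids(ids: list[str], batch_size = 50):
    #Split a long list of IDs into smaller batches so the request URLs stay short.
    return [ids[i:i + batch_size] for i in range(0, len(ids), batch_size)]

def count_remote_items(request_url: str, query: str):
    #"Ping" the endpoint for a single item to read the 'x-wp-total' header,
    #which holds the total number of matching items and is used to set up paging.
    ping = requests.get(f'{request_url}?per_page=1&{query}')
    header_total = ping.headers.get('x-wp-total')
    return int(header_total) if header_total else 0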
Comments¶
Get last comment date¶
def get_last_comment_date():
last_comment_date = 0
with engine.connect() as conn:
last_comment_date = conn.execute(
select(WP_Comment.date).order_by(sqlalchemy.desc(WP_Comment.date)).limit(1)
).first()
if (last_comment_date == None) : last_comment_date = (datetime.utcnow() - timedelta(weeks=52)).isoformat()
else: last_comment_date = last_comment_date[0].isoformat()
conn.close()
return last_comment_date
Download new comments from WordPress¶
def download_new_word_press_comments(base_url: str, request_page_size = 100 , after_date = None):
Logger.debug("Start - download_new_word_press_comments()")
responseJson = []
try:
requestUrl = f'{base_url}/wp-json/wp/v2/comments'
last_comment = get_last_comment_date()
if (after_date == None): after_date = last_comment
elif(datetime.fromisoformat(after_date) < datetime.fromisoformat(last_comment)): after_date = last_comment
Logger.info(f'Latest comment created at [{last_comment}]')
#ping url
ping = requests.get(f'{requestUrl}?per_page=1&after={after_date}')
header_total = ping.headers.get('x-wp-total')
if (header_total == None or header_total == '0'):
Logger.info(f'No new comments to download')
return []
count = int(header_total)
pages = math.ceil(count/request_page_size)
page = 1
Logger.info(f'New comments to download: [{count}]')
while (page <= pages):
time.sleep(0.5)
Logger.debug(f"Getting Comments - Page [{page}/{pages}]")
requestUrl = f'{base_url}/wp-json/wp/v2/comments'
requestUrl += f'?per_page={request_page_size}&page={page}&orderby=date&order=asc'
requestUrl += f'&after={after_date}'
response = requests.get(requestUrl)
responseJson += response.json()
page += 1
except:
Logger.error("Could not load sentiment lexicon", exc_info=True)
finally:
Logger.debug("End - download_new_word_press_comments()")
return responseJson
Save comments to the database¶
def save_comments(comments_json):
Logger.debug("Start - save_comments()")
if(len(comments_json) < 1):
Logger.debug("No comments found to save to databases")
return
comments = list(map(
lambda x: {
'comment_id': x['id'],
'post_id': x['post'],
'date': datetime.fromisoformat(x['date']),
'content': x['content']['rendered']
}, comments_json))
with engine.connect() as conn:
query = insert(WP_Comment).values(
comments
)
query = query.on_conflict_do_update(
index_elements=['comment_id'],
set_=dict(query.excluded),
)
conn.execute(query)
conn.commit()
conn.close()
Logger.info(f'New comments saved to database')
Logger.debug("End - save_comments()")
Load comments from database¶
def load_comments_from_database(to_clean = False, to_rate = False):
Logger.debug("Start - load_comments_from_database()")
comments = []
with engine.connect() as conn:
statement = (select(WP_Comment).order_by(sqlalchemy.desc(WP_Comment.date)))
if (to_clean): statement = statement.where(sqlalchemy.or_(
WP_Comment.clean_content == None,
WP_Comment.clean_content == ''))
if (to_rate): statement = statement.where(sqlalchemy.or_(
WP_Comment.score == None,
WP_Comment.score == ''))
comments = conn.execute(
statement
).all()
conn.close()
Logger.debug("End - load_comments_from_database()")
return pd.DataFrame(comments)
Posts (News Articles)¶
Get last post date¶
def get_last_post_date():
last_post_date = 0
with engine.connect() as conn:
last_post_date = conn.execute(select(WP_Post.date).order_by(sqlalchemy.desc(WP_Post.date)).limit(1)).first()
if (last_post_date == None) : last_post_date = (datetime.utcnow() - timedelta(weeks=52)).isoformat()
else: last_post_date = last_post_date[0].isoformat()
conn.close()
return last_post_date
Download new posts from WordPress¶
def download_new_word_press_posts(base_url: str, request_page_size = 100, after_date = None):
Logger.debug("Start - download_new_word_press_posts()")
requestUrl = f'{base_url}/wp-json/wp/v2/posts'
last_post = get_last_post_date()
if (after_date == None): after_date = last_post
elif(datetime.fromisoformat(after_date) < datetime.fromisoformat(last_post)): after_date = last_post
Logger.info(f'Latest post created at [{last_post}]')
#ping url
ping = requests.get(f'{requestUrl}?per_page=1&after={after_date}')
header_total = ping.headers.get('x-wp-total')
if (header_total == None or header_total == '0'):
Logger.info(f'No new posts to download')
Logger.debug("End - download_new_word_press_posts()")
return []
count = int(header_total)
pages = math.ceil(count/request_page_size)
page = 1
Logger.info(f'New posts to download: [{count}]')
responseJson = []
while (page <= pages):
time.sleep(0.5)
Logger.debug(f"Getting Posts - Page [{page}/{pages}]")
requestUrl = f'{base_url}/wp-json/wp/v2/posts'
requestUrl += f'?per_page={request_page_size}&page={page}&orderby=date&order=asc'
requestUrl += f'&after={after_date}'
response = requests.get(requestUrl)
responseJson += response.json()
page += 1
Logger.debug("End - download_new_word_press_posts()")
return responseJson
Save posts to database¶
A major influence on the design of this function was the initial sync of posts, which poses a challenge because SQLite limits the number of variables that can be provided in a single INSERT statement.
def save_posts(posts_json):
Logger.debug("Start - save_posts()")
try:
if(len(posts_json) < 1):
Logger.debug("No posts found to save to databases")
return
posts = list(map(
lambda post: {
'post_id': post['id'],
'link': post['link'],
'date': datetime.fromisoformat(post['date']),
'modified': datetime.fromisoformat(post['modified']),
'author': post['author'],
'title': post['title']['rendered'],
'content': post['content']['rendered'],
}, posts_json))
with engine.connect() as conn:
query = insert(WP_Post).values(
posts
)
query = query.on_conflict_do_update(
index_elements=['post_id'],
set_=dict(query.excluded),
)
conn.execute(query.execution_options(insertmanyvalues_page_size=50))
conn.commit()
conn.close()
Logger.info(f'New posts saved to database')
except:
Logger.error("Could not save posts", exc_info=True)
finally:
Logger.debug("End - save_posts()")
def save_post_categories(posts_json):
Logger.debug("Start - save_post_categories()")
if(len(posts_json) < 1):
Logger.debug("No posts found to save to databases")
return
split_categories = lambda post: [{'post_id': post['id'], 'category_id': category_id} for category_id in post['categories']]
# Use the map function to split categories and flatten the result
mapped_list = list(map(split_categories, posts_json))
mapped_list = [obj for sublist in mapped_list for obj in sublist]
with engine.connect() as conn:
query = insert(WP_PostCategory).values(
mapped_list
)
query = query.on_conflict_do_nothing(
index_elements=['post_id', 'category_id'],
)
conn.execute(query)
conn.commit()
conn.close()
Logger.info(f'New post categories saved to database')
Logger.debug("End - save_post_categories()")
def save_post_tags(posts_json):
Logger.debug("Start - save_post_tags()")
if(len(posts_json) < 1):
Logger.debug("No posts found to save to databases")
return
split_tags = lambda post: [{'post_id': post['id'], 'tag_id': tag_id} for tag_id in post['tags']]
# Use the map function to split categories and flatten the result
mapped_list = list(map(split_tags, posts_json))
mapped_list = [obj for sublist in mapped_list for obj in sublist]
with engine.connect() as conn:
query = insert(WP_PostTag).values(
mapped_list
)
query = query.on_conflict_do_nothing(
index_elements=['post_id', 'tag_id'],
)
conn.execute(query)
conn.commit()
conn.close()
Logger.info(f'New post tags saved to database')
Logger.debug("End - save_post_tags()")
Load posts from database¶
def load_posts_from_database(to_clean = False, to_rate = False):
Logger.debug("Start - load_posts_from_database()")
posts = []
with engine.connect() as conn:
statement = (select(WP_Post).order_by(sqlalchemy.desc(WP_Post.date)))
if (to_clean): statement = statement.where(sqlalchemy.or_(
WP_Post.content_clean == None, WP_Post.title_clean == None,
WP_Post.content_clean == '', WP_Post.title_clean == ''))
if (to_rate): statement = statement.where(sqlalchemy.or_(
WP_Post.content_score == None, WP_Post.title_score == None,
WP_Post.content_score == '', WP_Post.title_score == ''))
posts = conn.execute(
statement
).all()
conn.close()
Logger.debug("End - load_posts_from_database()")
return pd.DataFrame(posts)
Categories¶
Get missing categories¶
def get_missing_categories():
categories = []
with engine.connect() as conn:
categories = conn.execute(
select(WP_PostCategory.category_id.distinct())
.where(WP_PostCategory.category_id.not_in(
select(WP_Category.category_id)
))
)
conn.close()
return list(map(lambda entry: str(entry[0]), categories.all()))
Download missing categories¶
def download_missing_categories(base_url: str, request_page_size = 100):
Logger.debug("Start - download_missing_categories()")
requestUrl = f'{base_url}/wp-json/wp/v2/categories'
missing_categories = get_missing_categories()
if (len(missing_categories) == 0):
Logger.info(f'No missing categories')
Logger.debug("End - download_missing_categories()")
return []
Logger.info(f'Missing categories: [{len(missing_categories)}]')
missing_categories_string = ','.join(missing_categories)
#ping url
ping = requests.get(f'{requestUrl}?per_page=1&include={missing_categories_string}')
count = int(ping.headers.get('x-wp-total'))
pages = math.ceil(count/request_page_size)
page = 1
Logger.info(f'New categories to download: [{count}]')
responseJson = []
while (page <= pages):
time.sleep(0.5)
Logger.debug(f"Getting Categories - Page [{page}/{pages}]")
        #rebuild the URL for each page so query strings do not accumulate
        requestUrl = f'{base_url}/wp-json/wp/v2/categories'
        requestUrl += f'?per_page={request_page_size}&page={page}'
requestUrl += f'&include={missing_categories_string}'
response = requests.get(requestUrl)
responseJson += response.json()
page += 1
Logger.debug("End - download_missing_categories()")
return responseJson
Save Categories¶
def save_categories(posts_json):
Logger.debug("Start - save_categories()")
if(len(posts_json) < 1):
Logger.debug("No categories found to save to database")
return
categories = list(map(
lambda post: {
'category_id': post['id'],
'link': post['link'],
'name': post['name'],
'parent_id': post['parent']
}, posts_json))
with engine.connect() as conn:
query = insert(WP_Category).values(
categories
)
query = query.on_conflict_do_update(
index_elements=['category_id'],
set_=dict(query.excluded),
)
conn.execute(query)
conn.commit()
conn.close()
Logger.info(f'New categories saved to database')
Logger.debug("End - save_categories()")
Tags¶
Get missing tags¶
def get_missing_tags():
tags = []
with engine.connect() as conn:
tags = conn.execute(
select(WP_PostTag.tag_id.distinct())
.where(WP_PostTag.tag_id.not_in(
select(WP_Tag.tag_id)
))
)
conn.close()
return list(map(lambda entry: str(entry[0]), tags.all()))
Download missing tags¶
def download_missing_tags(base_url: str, request_page_size = 100, batch_size = 50):
Logger.debug("Start - download_missing_tags()")
responseJson = []
requestUrl = f'{base_url}/wp-json/wp/v2/tags'
try:
missing_tags = get_missing_tags()
missing_tags_count = len(missing_tags)
if (missing_tags_count == 0):
Logger.info(f'No missing tags')
Logger.debug("End - downloadMissingTags")
return []
Logger.info(f'Missing tags to download: [{missing_tags_count}]')
batch_number = 1
batches = math.ceil(missing_tags_count/batch_size)
if (batch_size < request_page_size): request_page_size = batch_size
for i in range(0, missing_tags_count, batch_size):
batch = missing_tags[i:i + batch_size]
missing_tags_string = ','.join(batch)
#ping url
pages = 1
page = 1
requestUrl = f'{base_url}/wp-json/wp/v2/tags'
if (batch_size > request_page_size):
ping = requests.get(f'{requestUrl}?per_page=1&include={missing_tags_string}')
count = int(ping.headers.get('x-wp-total'))
pages = math.ceil(count/request_page_size)
while (page <= pages):
time.sleep(0.5)
Logger.debug(f"Getting Tags - Batch [{batch_number}/{batches}] - Page [{page}/{pages}]")
                #rebuild the URL for each page so query strings do not accumulate
                requestUrl = f'{base_url}/wp-json/wp/v2/tags'
                requestUrl += f'?per_page={request_page_size}&page={page}'
requestUrl += f'&include={missing_tags_string}'
response = requests.get(requestUrl)
responseJson += response.json()
page += 1
batch_number += 1
except:
Logger.error("Error while fetching Tags", exc_info=True, model = {requestUrl})
finally:
Logger.debug("End - download_missing_tags()")
return responseJson
Save Tags¶
def save_tags(posts_json):
Logger.debug("Start - save_tags()")
try:
if(len(posts_json) < 1):
Logger.debug("No tags found to save to database")
return
tags = list(map(
lambda post: {
'tag_id': post['id'],
'name': post['name'],
'link': post['link']
}, posts_json))
with engine.connect() as conn:
query = insert(WP_Tag).values(
tags
)
query = query.on_conflict_do_update(
index_elements=['tag_id'],
set_=dict(query.excluded),
)
conn.execute(query)
conn.commit()
conn.close()
except:
Logger.error("Could save tags", exc_info=True)
finally:
Logger.info(f'New tags saved to database')
Logger.debug("End - save_tags")
Clean HTML data¶
For the cleaning of both the Post and Comment data, the following pattern is used:
- Retrieve the data
- Clean
- Map cleaned field(s) and primary key
- Save to database
Cleaning the data can also be run independently of the other syncs
Clean Posts¶
def clean_posts():
Logger.debug("Start - clean_posts()")
try:
posts = load_posts_from_database(to_clean=True)
if (len(posts) == 0):
Logger.info("No posts found to clean")
return
posts['title_clean'] = posts['title'].apply(lambda x:
clean_text(x, remove_markup = True, remove_bracketed = True, remove_punctuation = True))
posts['content_clean'] = posts['content'].apply(lambda x:
clean_text(x, remove_markup = True, remove_bracketed = True, remove_punctuation = True))
posts = posts[['post_id','title_clean','content_clean']].to_dict(orient='records')
with engine.connect() as conn:
query = insert(WP_Post).values(
posts
)
query = query.on_conflict_do_update(
index_elements=['post_id'],
set_=dict(title_clean = query.excluded.title_clean, content_clean = query.excluded.content_clean),
)
conn.execute(query)
conn.commit()
conn.close()
except:
Logger.error("Could not clean posts", exc_info=True)
finally:
Logger.debug("End - clean_posts()")
Clean Comments¶
def clean_comments():
Logger.debug("Start - clean_comments()")
try:
comments = load_comments_from_database(to_clean=True)
if (len(comments) == 0):
Logger.info("No comments found to clean")
return
comments['clean_content'] = comments['content'].apply(lambda x:
clean_text(x,
remove_markup = True,
remove_bracketed = True,
replace_dashes = True,
#remove_special_characters = True,
remove_punctuation = True
))
comments = comments[['comment_id','clean_content']].to_dict(orient='records')
with engine.connect() as conn:
if(len(comments) > 0):
batches = [comments[i:i + 100] for i in range(0, len(comments), 100)]
for i, batch in enumerate(batches):
query = insert(WP_Comment).values(
batch
)
query = query.on_conflict_do_update(
index_elements=['comment_id'],
set_=dict(clean_content = query.excluded.clean_content),
)
conn.execute(query)
conn.commit()
conn.close()
except:
Logger.error("Could not clean comments", exc_info=True)
finally:
Logger.debug("End - clean_comments()")
Primary Function¶
This function triggers the downloading and saving of the data from the specified WordPress website.
The function provides control over which data should be downloaded; this is handy if only a specific subset of the data needs to be updated, and also for testing specific functionality.
Batch processing has been implemented specifically to handle large initial data downloads. It was added after an attempt to download a year's worth of data, at which point SQLite was no longer able to handle the amount of data.
After the initial data download all downloads are run incrementally, building on previous downloads.
def update_word_press_data(
base_url: str,
download_posts: bool = True,
download_comments: bool = True,
download_categories: bool = True,
download_tags: bool = True,
request_page_size = 100,
download_start_date_time = None,
clean_html_data: bool = True,
batch_insert_size = 100
):
try:
Logger.debug("Start - update_word_press_data()")
if(download_posts):
Logger.info("Downloading WordPress Post data")
json_list = download_new_word_press_posts(base_url, request_page_size, download_start_date_time)
if(len(json_list) > 0):
batches = [json_list[i:i + batch_insert_size] for i in range(0, len(json_list), batch_insert_size)]
for i, batch in enumerate(batches):
save_posts(batch)
save_post_categories(batch)
save_post_tags(batch)
if(download_categories):
Logger.info("Downloading WordPress Category data")
json_list = download_missing_categories(base_url, request_page_size)
if(len(json_list) > 0):
batches = [json_list[i:i + batch_insert_size] for i in range(0, len(json_list), batch_insert_size)]
for i, batch in enumerate(batches):
save_categories(batch)
if(download_tags):
Logger.info("Downloading WordPress Tag data")
json_list = download_missing_tags(base_url, request_page_size)
if(len(json_list) > 0):
batches = [json_list[i:i + batch_insert_size] for i in range(0, len(json_list), batch_insert_size)]
for i, batch in enumerate(batches):
save_tags(batch)
if(download_comments):
Logger.info("Downloading WordPress Comments data")
json_list = download_new_word_press_comments(base_url, request_page_size, download_start_date_time)
if(len(json_list) > 0):
batches = [json_list[i:i + batch_insert_size] for i in range(0, len(json_list), batch_insert_size)]
for i, batch in enumerate(batches):
save_comments(batch)
if(clean_html_data):
Logger.info("Cleaning HTML data")
clean_posts()
clean_comments()
except:
Logger.error("Could not load the wordpress data", exc_info=True)
return (None, None)
finally:
Logger.debug("End - update_word_press_data()")
Sentiment Scoring¶
This section's sentiment scoring works differently from the scoring of the primary corpus used in the lexicon generation.
In determining the optimal method of scoring the WordPress text, using the average score of each word provided the best (most reliable and polarized) results, while not having an unrealistic execution time.
Replacing the built-in lexicon of the Vader SentimentIntensityAnalyzer with the generated one was attempted, but this did not produce satisfactory results.
In calculating the average score, using the Levenshtein distance to determine related words, together with a regex search to try and cater for prefixes and suffixes, was also attempted; however, this was not satisfactory either, as it regularly misidentified words.
It was decided that a simple stemmer from the nltk library would be used to cater for suffixes, although in most cases the word to be scored is simply looked up in the lexicon.
Normalization of the scores was also experimented with, in an attempt to further polarize the results, but instead of the scores converging just above 0 this produced an unpredictable distribution, with the content scores clustering at the maximum.
Calculate Sentiment¶
def calculate_custom_sentiment(text: str, lexicon: dict, stopwords:list[str] = [], normalize = False):
sentiment_sum = 0.0
word_count = 0
stemmer = PorterStemmer()
for word in text.lower().split():
if stopwords is not None and word in stopwords:
continue
stemmed_word = stemmer.stem(word)
if stemmed_word in lexicon:
sentiment_sum += lexicon[stemmed_word]
word_count += 1
elif word in lexicon:
sentiment_sum += lexicon[word]
word_count += 1
if word_count == 0:
return 0.0
if normalize:
return math.tanh(sentiment_sum)
return sentiment_sum / word_count
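#Illustrative usage of calculate_custom_sentiment with a small, made-up Afrikaans
#lexicon; the words and scores below are hypothetical examples, not entries from
#the generated lexicon.
example_lexicon = {'wonderlik': 0.8, 'sleg': -0.6}
print(calculate_custom_sentiment("Die diens was wonderlik", example_lexicon))
#0.8 - only 'wonderlik' is found in the lexicon
print(calculate_custom_sentiment("Die diens was baie sleg", example_lexicon, stopwords = ['baie']))
#-0.6 - 'baie' is skipped as a stopword and only 'sleg' matches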
def levenshtein_distance(s1, s2):
if len(s1) > len(s2):
s1, s2 = s2, s1
distances = range(len(s1) + 1)
for i2, c2 in enumerate(s2):
distances_ = [i2 + 1]
for i1, c1 in enumerate(s1):
if c1 == c2:
distances_.append(distances[i1])
else:
distances_.append(1 + min((distances[i1], distances[i1 + 1], distances_[-1])))
distances = distances_
return distances[-1]
def calculate_custom_sentiment_levenshtein(text: str, lexicon: dict, normalize = False):
sentiment_sum = 0.0
word_count = 0
for word in text.lower().split():
found = False
if word in lexicon:
sentiment_sum += lexicon[word]
word_count += 1
found = True
else:
for lex_word in lexicon.keys():
if re.search(lex_word[:4], word) or levenshtein_distance(lex_word, word) <= 2:
sentiment_sum += lexicon[lex_word]
word_count += 1
found = True
break
if not found:
pass
if word_count == 0:
return 0.0
if normalize:
return math.tanh(sentiment_sum)
return sentiment_sum / word_count
def update_comment_sentiments(lexicon:pd.DataFrame, stopwords:list[str] = [], update_all = False):
Logger.debug("Start - update_comment_sentiments()")
try:
comments = load_comments_from_database(to_rate = not update_all)
if (len(comments)==0): return
lexicon_dict = lexicon.set_index('word').to_dict()['compound']
comments['score'] = comments['clean_content'].apply(lambda x: calculate_custom_sentiment(x, lexicon_dict, stopwords))
comments = comments[['comment_id','score']].to_dict(orient='records')
with engine.connect() as conn:
if(len(comments) > 0):
batches = [comments[i:i + 100] for i in range(0, len(comments), 100)]
for i, batch in enumerate(batches):
query = insert(WP_Comment).values(
batch
)
query = query.on_conflict_do_update(
index_elements=['comment_id'],
set_=dict(score = query.excluded.score),
)
conn.execute(query)
conn.commit()
conn.close()
except:
Logger.error("Failed to update comment sentiment scores", exc_info=True)
finally:
Logger.debug("End - update_comment_sentiments()")
def update_post_sentiments(lexicon:pd.DataFrame, stopwords:list[str] = [], update_all = False):
Logger.debug("Start - update_post_sentiments()")
try:
posts = load_posts_from_database(to_rate = not update_all)
if(len(posts) == 0): return
lexicon_dict = lexicon.set_index('word').to_dict()['compound']
posts['title_score'] = posts['title_clean'].apply(lambda x: calculate_custom_sentiment(x, lexicon_dict, stopwords))
posts['content_score'] = posts['content_clean'].apply(lambda x: calculate_custom_sentiment(x, lexicon_dict, stopwords))
posts = posts[['post_id','title_score','content_score']].to_dict(orient='records')
with engine.connect() as conn:
if(len(posts) > 0):
batches = [posts[i:i + 100] for i in range(0, len(posts), 100)]
for i, batch in enumerate(batches):
query = insert(WP_Post).values(
batch
)
query = query.on_conflict_do_update(
index_elements=['post_id'],
set_=dict(title_score = query.excluded.title_score, content_score = query.excluded.content_score),
)
conn.execute(query)
conn.commit()
conn.close()
except:
Logger.error("Failed to update post sentiment scores", exc_info=True)
finally:
Logger.debug("End - update_post_sentiments()")
Primary Function¶
def score_word_press_sentiment(lexicon:pd.DataFrame, stopwords:list[str] = [], always_rescore=False):
Logger.debug("Start - score_word_press_sentiment()")
try:
        update_comment_sentiments(lexicon, stopwords, update_all = always_rescore)
        update_post_sentiments(lexicon, stopwords, update_all = always_rescore)
except:
Logger.error("Failed to set wordpress sentiment scores", exc_info=True)
finally:
Logger.debug("End - score_word_press_sentiment()")
Setup Unit Tests¶
The unit tests have been implemented firstly to showcase test-driven development, as in the case of the TestCleanText test case, and secondly to demonstrate the use of mocking libraries for more complex tests, as in the case of TestReadCorpus and TestScoreSentiment.
Clean Text¶
class TestCleanText(unittest.TestCase):
@classmethod
def setUpClass(cls):
Logger.debug('Unit Tests Started - TestCleanText')
def test_remove_markup(self):
self.assertEqual(clean_text("<p>Hello</p>", remove_markup=True), "Hello")
def test_remove_bracketed(self):
self.assertEqual(clean_text("This is [useless] text.", remove_bracketed=True), "This is text.")
def test_replace_dashes(self):
self.assertEqual(clean_text("This is - a test.", replace_dashes=True), "This is a test.")
def test_remove_special_characters(self):
self.assertEqual(clean_text("This is a “test”", remove_special_characters=True), "This is a test")
def test_remove_punctuation(self):
self.assertEqual(clean_text("Hello, world!", remove_punctuation=True), "Hello world")
def test_remove_stopwords(self):
self.assertEqual(clean_text("This is a test", stopwords=["this", "is"], remove_punctuation=True), "a test")
def test_multiple_options(self):
self.assertEqual(
clean_text("<p>This is [not] a “test”.</p>",
remove_markup=True,
remove_bracketed=True,
remove_special_characters=True,
remove_punctuation = True),
"This is a test")
def test_no_options(self):
self.assertEqual(clean_text("This is a test"), "This is a test")
def test_empty_string(self):
self.assertEqual(clean_text(""), "")
@classmethod
def tearDownClass(cls):
cls.corpusList = None
Logger.debug('Unit Tests Ended - TestCleanText')
Lexicon Generation¶
class TestLexiconGeneration(unittest.TestCase):
@classmethod
def setUpClass(cls):
Logger.debug('Unit Tests Started - TestLexiconGeneration')
cls.corpusList = get_corpus_list()
def test_any_corpora(self):
self.assertTrue(TestLexiconGeneration.corpusList.any())
def test_corpora_of_type_bible(self):
self.assertTrue(all(isinstance(n, Bible) for n in TestLexiconGeneration.corpusList))
def test_more_than_one_corpora_groups(self):
self.assertGreaterEqual(len(np.unique(list(map(lambda x: x.Corpus, TestLexiconGeneration.corpusList)))), 2)
@classmethod
def tearDownClass(cls):
cls.corpusList = None
Logger.debug('Unit Tests Ended - TestLexiconGeneration')
Read Corpus¶
With fairly limited prior knowledge of mocking, this test case attempts to use the mock functionality of unit testing to specify the results returned by calls to a library (here, sqlite3.connect and pandas.read_sql_query).
class TestReadCorpus(unittest.TestCase):
@classmethod
def setUpClass(cls):
Logger.debug('Unit Tests Started - TestReadCorpus')
@patch('pandas.read_sql_query')
@patch('sqlite3.connect')
def test_read_single_corpus(self, mock_sqlite_connect, mock_read_sql_query):
# Arrange
mock_read_sql_query.return_value = pd.DataFrame({'col1': [1, 2], 'col2': [3, 4]})
mock_sqlite_connect.return_value = MagicMock()
bible_1 = Bible('connectionPath1', 'corpus1', 'source1', 'name1', 'shorthand1')
bibles = [bible_1]
# Act
result = read_corpus(bibles)
# Assert
self.assertIsNotNone(result)
self.assertTrue('bible_version' in result.columns)
self.assertEqual(len(result), 2)
self.assertEqual(result['bible_version'][0], 'name1')
@patch('pandas.read_sql_query')
@patch('sqlite3.connect')
def test_read_multiple_corpora(self, mock_sqlite_connect, mock_read_sql_query):
# Arrange
mock_read_sql_query.return_value = pd.DataFrame({'col1': [1, 2], 'col2': [3, 4]})
mock_sqlite_connect.return_value = MagicMock()
bible_1 = Bible('connectionPath1', 'corpus1', 'source1', 'name1', 'shorthand1')
bible_2 = Bible('connectionPath2', 'corpus2', 'source2', 'name2', 'shorthand2')
bibles = [bible_1, bible_2]
# Act
result = read_corpus(bibles)
# Assert
self.assertIsNotNone(result)
self.assertTrue('bible_version' in result.columns)
self.assertEqual(len(result), 4)
self.assertEqual(result['bible_version'][0], 'name1')
self.assertEqual(result['bible_version'][2], 'name2')
@classmethod
def tearDownClass(cls):
cls.corpusList = None
Logger.debug('Unit Tests Ended - TestReadCorpus')
Score Sentiment¶
class TestScoreSentiment(unittest.TestCase):
@classmethod
def setUpClass(cls):
Logger.debug('Unit Tests Started - TestScoreSentiment')
@patch.object(SentimentIntensityAnalyzer, 'polarity_scores')
def test_positive_sentiment(self, mock_polarity_scores):
mock_polarity_scores.return_value = {'compound': 0.5}
result = score_sentiment("This is good!")
self.assertEqual(result, 0.5)
@patch.object(SentimentIntensityAnalyzer, 'polarity_scores')
def test_negative_sentiment(self, mock_polarity_scores):
mock_polarity_scores.return_value = {'compound': -0.5}
result = score_sentiment("This is bad!")
self.assertEqual(result, -0.5)
def test_empty_string(self):
result = score_sentiment("")
self.assertEqual(result, 0.0)
def test_null_string(self):
result = score_sentiment(None)
self.assertEqual(result, 0.0)
@classmethod
def tearDownClass(cls):
cls.corpusList = None
Logger.debug('Unit Tests Ended - TestScoreSentiment')
score_sentiment(None)
0.0
Run Pipeline¶
1. Run Unit Tests¶
if(config['UnitTests']['Enabled']):
try:
if (not unittest.main(argv=['ignored', '-v'], exit= False).result.wasSuccessful()):
raise Exception('Unit test(s) failed')
except:
Logger.error("Problem during unit tests", exc_info=True)
raise
2023-09-17 16:41:59.453601 [DEBUG]: Unit Tests Started - TestCleanText test_empty_string (__main__.TestCleanText.test_empty_string) ... ok test_multiple_options (__main__.TestCleanText.test_multiple_options) ... ok test_no_options (__main__.TestCleanText.test_no_options) ... ok test_remove_bracketed (__main__.TestCleanText.test_remove_bracketed) ... ok test_remove_markup (__main__.TestCleanText.test_remove_markup) ... ok test_remove_punctuation (__main__.TestCleanText.test_remove_punctuation) ... ok test_remove_special_characters (__main__.TestCleanText.test_remove_special_characters) ... ok test_remove_stopwords (__main__.TestCleanText.test_remove_stopwords) ... ok test_replace_dashes (__main__.TestCleanText.test_replace_dashes) ... ok 2023-09-17 16:41:59.885597 [DEBUG]: Unit Tests Ended - TestCleanText 2023-09-17 16:41:59.905624 [DEBUG]: Unit Tests Started - TestLexiconGeneration test_any_corpora (__main__.TestLexiconGeneration.test_any_corpora) ... ok test_corpora_of_type_bible (__main__.TestLexiconGeneration.test_corpora_of_type_bible) ... ok test_more_than_one_corpora_groups (__main__.TestLexiconGeneration.test_more_than_one_corpora_groups) ... ok 2023-09-17 16:41:59.929652 [DEBUG]: Unit Tests Ended - TestLexiconGeneration 2023-09-17 16:41:59.950677 [DEBUG]: Unit Tests Started - TestReadCorpus test_read_multiple_corpora (__main__.TestReadCorpus.test_read_multiple_corpora) ... ok test_read_single_corpus (__main__.TestReadCorpus.test_read_single_corpus) ... ok 2023-09-17 16:41:59.976720 [DEBUG]: Unit Tests Ended - TestReadCorpus 2023-09-17 16:41:59.998744 [DEBUG]: Unit Tests Started - TestScoreSentiment test_empty_string (__main__.TestScoreSentiment.test_empty_string) ... ok test_negative_sentiment (__main__.TestScoreSentiment.test_negative_sentiment) ... ok test_null_string (__main__.TestScoreSentiment.test_null_string) ... ok test_positive_sentiment (__main__.TestScoreSentiment.test_positive_sentiment) ... ok 2023-09-17 16:42:00.026129 [DEBUG]: Unit Tests Ended - TestScoreSentiment ---------------------------------------------------------------------- Ran 18 tests in 0.592s OK
2. Run Lexicon Generation/Loading¶
(lexicon,stopwords) = load_sentiment_lexicon(
lexicon_file_name = config['LexiconGeneration']['LexiconFileName'],
score_database= config["Database"]["Scores"],
always_regenerate = config['LexiconGeneration']['AlwaysRegenerate'],
generation_minimum_word_occurrence = config['LexiconGeneration']['MinimumWordOccurrence'],
primary_corpus_group = config['LexiconGeneration']['PrimaryCorpusGroup'],
secondary_corpus_group = config['LexiconGeneration']['SecondaryCorpusGroup'],
download_path = config['LexiconGeneration']['DownloadPath'],
extracted_path = config['LexiconGeneration']['ExtractedPath'],
recalculate_primary_corpus_scores = config['LexiconGeneration']['RecalculatePrimaryScore'],
stopword_count = config['LexiconGeneration']['StopwordCount'],
)
2023-09-17 16:42:00.052866 [DEBUG]: Start - load_sentiment_lexicon() 2023-09-17 16:42:00.080903 [INFO]: Populated Lexicon file found 2023-09-17 16:42:00.105441 [DEBUG]: Lexicon entries: 4954 2023-09-17 16:42:00.123464 [DEBUG]: End - load_sentiment_lexicon()
3. Sync Wordpress Site¶
update_word_press_data(
base_url = config["WordPress"]["BaseURL"],
download_posts = config["WordPress"]["DownloadPosts"],
download_comments = config["WordPress"]["DownloadComments"],
download_categories = config["WordPress"]["DownloadCategories"],
download_tags = config["WordPress"]["DownloadTags"],
request_page_size = config["WordPress"]["RequestPageSize"],
download_start_date_time = config["WordPress"]["DownloadStartDateTime"],
clean_html_data = config["WordPress"]["CleanHtmlData"]
)
2023-09-17 16:42:00.147493 [DEBUG]: Start - update_word_press_data() 2023-09-17 16:42:00.166017 [DEBUG]: End - update_word_press_data()
4. Score Sentiment¶
score_word_press_sentiment(
lexicon = lexicon,
stopwords = stopwords,
always_rescore = config['WordPress']['AlwaysRescoreSentiment'],
)
2023-09-17 16:42:00.190354 [DEBUG]: Start - score_word_press_sentiment() 2023-09-17 16:42:00.208381 [DEBUG]: Start - update_comment_sentiments() 2023-09-17 16:42:00.227412 [DEBUG]: Start - load_comments_from_database() 2023-09-17 16:42:00.890308 [DEBUG]: End - load_comments_from_database() 2023-09-17 16:42:00.913932 [DEBUG]: End - update_comment_sentiments() 2023-09-17 16:42:00.932989 [DEBUG]: Start - update_post_sentiments() 2023-09-17 16:42:00.953123 [DEBUG]: Start - load_posts_from_database()
2023-09-17 16:42:02.797584 [DEBUG]: End - load_posts_from_database() 2023-09-17 16:42:02.818606 [DEBUG]: End - update_post_sentiments() 2023-09-17 16:42:02.838630 [DEBUG]: End - score_word_press_sentiment()
Generate Report¶
For the purpose of the project, in writing the report and generating the accompanying graphs, the focus is placed on two aspects.
Firstly, to discuss and demonstrate the ability of Sentiment Induction in the sentiment analysis of lesser-known languages; and secondly, to use the sentiment scoring to make relevant and interesting findings regarding the data.
In demonstrating the ability of Sentiment Induction, the following is used:
- Sentiment by Word Count.
- Sentiment Score distribution.
- Sentiment Score Box-Plots.
Interesting Findings include:
- Investigating the change in sentiment over time.
- Investigating the correlation between the sentiment of comments and that of post titles and content.
- Identifying how sentiment of comments relates to the categories and tags of articles.
- Identifying the most positively and negatively rated articles
1. Load Data¶
def load_post_categories(to_clean = False, to_rate = False):
Logger.debug("Start - load_post_categories()")
# Selecting a list of posts with the joined category
post_categories = []
with engine.connect() as conn:
post_categories = conn.execute(
sqlalchemy.text('SELECT PC.post_id, C.name `category` FROM PostCategory PC JOIN Category C on PC.category_id = C.category_id')
).all()
conn.close()
Logger.debug("End - load_post_categories()")
return pd.DataFrame(post_categories)
def load_post_tags(to_clean = False, to_rate = False):
Logger.debug("Start - load_post_tags()")
# Selecting a list of post ids joined with the linked tag
post_tags = []
with