import json
from random import shuffle, sample
import math
from collections import defaultdict
import logging

'''
A spelled out, simple implementation of Multinomial Naive Bayes for IN1140.

This code shows how to write a NB classifier from scratch, with a use case of
document source identification. The code does not depend on any external
libraries and does not use anything beyond scripting (no classes etc).

If you feel like it, try to improve the code! Here are some suggestions:
- Try to implement the classifier as a Class object, with the train and
  predict functions as internal methods.
- Try to experiment with using lemmatised data or even adding PoS tags!
- Try to experiment with other types of smoothing.
- Try to experiment with unbalanced data, sample an uneven split from the raw corpora.
- Try to run the experiment multiple times and report the mean and std.dev
  of the performance over n runs.
- Try to experiment with other features.
- Lastly, open a new file and try to re-implement the entire pipeline yourself!
'''


def get_stop_words():
    '''Load the stop word list from disk and extend it with punctuation.

    Reads one stop word per line from ``stop_words.txt`` (UTF-8).

    Returns:
        list[str]: stop words followed by single punctuation characters.
    '''
    with open('stop_words.txt', 'r', encoding='utf-8') as f:
        stop_words = f.read().splitlines()
    # For convenience, adding punctuation to the stop word list
    for punct in '!"#$%&\'()*+,-./:;<=>?@[\\]^_`{|}~??¨C':
        stop_words.append(punct)
    return stop_words


def load_corpora(db_path, dn_path, n):
    '''Load the two JSON corpora from disk, truncated to n articles each.

    Args:
        db_path (str): path to the Dagbladet JSON corpus.
        dn_path (str): path to the Dagens Naeringsliv JSON corpus.
        n (int): number of articles to keep from each corpus.

    Returns:
        tuple[list, list]: (db articles, dn articles), each of length <= n.
    '''
    logging.info('Loading corpora from disk... This might take a while...')
    with open(db_path, "r", encoding='utf-8') as f:
        db = json.load(f)
    with open(dn_path, "r", encoding='utf-8') as f:
        dn = json.load(f)
    return db[:n], dn[:n]


def get_n_clean_docs(corpus, stop_words):
    '''Lower-case and stop-word-filter every article in a corpus.

    Each article is flattened from its per-sentence token lists (under the
    'stanza' key) into a single list of lower-cased tokens, with stop words
    and punctuation removed.

    Args:
        corpus (list): articles, each a dict with article['stanza'] being a
            list of sentences, each sentence a dict with a 'tokens' list.
        stop_words (list[str]): tokens to discard (compared lower-cased).

    Returns:
        list[list[str]]: one flat token list per article.
    '''
    docs = list()
    for article in corpus:
        sentences = []
        for sentence in article['stanza']:
            sentences.append(word.lower() for word in sentence['tokens']
                             if word.lower() not in stop_words)
        # Flatten the per-sentence generators into one token list.
        doc = [item for sublist in sentences for item in sublist]
        docs.append(doc)
    return docs


def get_features(doc_list, most_common, label):
    '''Turn documents into Bag-of-Words count features over a fixed vocabulary.

    Every document is represented as a dict over the feature vocabulary
    (the keys of ``most_common``), mapping each feature word to
    1 + its count in the document (Laplace add-1 smoothing, so no feature
    ever has a zero count and log() is always defined downstream).

    Args:
        doc_list (list[list[str]]): tokenised documents.
        most_common (dict): feature vocabulary; only its keys are used.
        label (str): class label attached to every document ('db' or 'dn').

    Returns:
        list[tuple[dict, str]]: (feature dict, label) pairs.
    '''
    features = list()
    for doc in doc_list:
        doc_feature = dict.fromkeys(most_common.keys(), 1)  # Laplace add 1 smoothing.
        for word in doc:
            if word in doc_feature.keys():
                doc_feature[word] += 1
        features.append((doc_feature, label))
    return features


def train(train_set):
    '''
    In a Multinomial Naive Bayes, training is just calculating all the posterior
    probabilities. For each word, we get the posterior, P(c | d), by Maximum
    Likelihood Estimation: Count(w_i, c)/Count(w, c) for all words i that appear
    for a class.

    E.g: If we want the posterior for the word "helse", we count how often
    "helse" appear in documents of class c, divided by all the word frequences
    for that class. Then we do the same for the other class.

    In this training function, we calculate this for all words in the training
    set and save them in one dictionary for each class. We only do this for the
    words that we previously chose as our features (the top n most frequent words).

    Args:
        train_set (List): Each item in the list is a tuple of (document, label).

    Returns:
        tuple[dict, dict]: per-word likelihoods for the 'db' and 'dn' classes.
    '''
    logging.info('Training...')
    db_likelihood = defaultdict(int)
    dn_likelihod = defaultdict(int)
    for document in train_set:
        if document[-1] == 'db':
            for word, count in document[0].items():
                db_likelihood[word] += count  # Sum up counts (Count(w_i, c))
        else:
            for word, count in document[0].items():
                dn_likelihod[word] += count
    db_x = sum(db_likelihood.values())  # Calculate Count(w, c) for the Dagbladet class
    dn_x = sum(dn_likelihod.values())  # Calculate Count(w, c) for the Dagens N?ringsliv class
    # Do the division: Count(w_i, c)/Count(w, c)
    db_likelihood = {key: value/db_x for key, value in db_likelihood.items()}
    dn_likelihod = {key: value/dn_x for key, value in dn_likelihod.items()}
    return db_likelihood, dn_likelihod


def predict_test(documents, db_likelihood, dn_likelihod, db_prior, dn_prior):
    '''
    Now it is time to evaluate our classifier. We iterate over the features in
    our test set and take the product of their posteriors. In order to avoid
    numerical underflow, we sum their log likelihoods. Remember, adding in log
    space = multiplying in linear space.

    The final probability for a document and a class is
    P(c | d) = P(w1 | c) * ... * P(w_n | c) * P(c) for all words in d.

    Args:
        documents (list): (feature dict, label) pairs to classify.
        db_likelihood (dict): per-word likelihoods for the 'db' class.
        dn_likelihod (dict): per-word likelihoods for the 'dn' class.
        db_prior (float): prior probability P('db').
        dn_prior (float): prior probability P('dn').

    Returns:
        float: accuracy over the documents (0.0 for an empty input).
    '''
    preds = []
    gold = []
    for doc in documents:
        label = 0 if doc[-1] == 'db' else 1  # We let Dagbladet be represented by the class 0, DN with 1.
        # Start at 0.0: we accumulate log probabilities, and log(1) = 0.
        db_posterior = 0.0
        dn_posterior = 0.0
        for word, value in doc[0].items():
            db_posterior += (math.log(db_likelihood[word])*value)
            dn_posterior += (math.log(dn_likelihod[word])*value)
        # BUGFIX: the original used a bare '+' here, discarding the result,
        # so the priors were silently ignored. '+=' actually adds them.
        db_posterior += math.log(db_prior)  # Add the prior
        dn_posterior += math.log(dn_prior)  # Add the prior
        # Choose the class with the highest probability.
        if db_posterior > dn_posterior:
            pred = 0
        else:
            pred = 1
        preds.append(pred)
        gold.append(label)
    if not preds:  # Avoid ZeroDivisionError on an empty document list.
        return 0.0
    return sum(1 for x, y in zip(preds, gold) if x == y) / len(preds)  # Accuracy calculation


if __name__ == '__main__':
    logging.basicConfig(
        format="%(asctime)s - %(levelname)s - %(name)s - %(message)s",
        datefmt="%m/%d/%Y %H:%M:%S",
        level=logging.INFO,
    )
    # 0: Hyperparameters
    total_number_of_documents = 4000  # Max 2k for the provided data files, 2k of each.
    number_of_features = 50
    logging.info(f'Multinomial Naive Bayes for two-class news source classification! Is the article from Dagbladet or Dagens N?ringsliv?')
    logging.info(f'Hyperparameters: N-documents: {total_number_of_documents}, number of features: {number_of_features}')

    # 1: Preprocessing
    stop_words = get_stop_words()
    db, dn = load_corpora('./data/db_corpus.json', './data/dn_corpus.json',
                          n=int(total_number_of_documents/2))
    db_documents = get_n_clean_docs(db, stop_words)
    dn_documents = get_n_clean_docs(dn, stop_words)

    # 2: Feature extraction (trekkuthenting)
    # We use the top n most frequent words in the combined corpus as our feature set (trekkliste)
    word_frequencies = defaultdict(int)
    for doc in dn_documents + db_documents:
        for word in doc:
            word_frequencies[word] += 1
    top_words = dict()
    for word in sorted(word_frequencies, key=word_frequencies.get, reverse=True)[:number_of_features]:
        top_words[word] = word_frequencies[word]

    # 2.1: Transform our documents into features using a Bag-of-Words model.
    '''
    Each document is represented with a list of length=number_of_features.
    The value at a position in this list is the number of times that word
    occur in the document.
    '''
    db_features = get_features(db_documents, top_words, label="db")
    dn_features = get_features(dn_documents, top_words, label="dn")

    # 3: Split and shuffle our data
    data = db_features + dn_features
    #logging.info(f'Example document: {sample(data, 1)}')
    shuffle(data)
    train_set = data[:int(len(data)*0.7)]
    dev_set = data[int(len(data)*0.7):int(len(data)*0.8)]
    test_set = data[int(len(data)*0.8):]
    logging.info(f'Train size: {len(train_set)}, Dev size: {len(dev_set)}, Test size: {len(test_set)}')

    # 4: Training!
    db_prior = len(db_documents) / total_number_of_documents  # Calculating the prior probability: P(C1) = N_c1/N), P(C2) = N_c2/N)
    dn_prior = len(dn_documents) / total_number_of_documents
    db_likelihood, dn_likelihod = train(train_set)

    # 5: Evaluating!
    logging.info('Evaluating...')
    result = predict_test(dev_set, db_likelihood, dn_likelihod, db_prior, dn_prior)
    logging.info(f'Accuracy: {result}')