These are my solutions to Part 2 of mini-project #1 of the Spring 2018 rendition of Stanford's CS168: The Modern Algorithmic Toolbox. This assignment covers some applications of hashing. Be warned that I am a learner myself, so the answers are not guaranteed to be error-free by any means, and as I am not a Stanford student I do not have access to the official solutions.
import numpy as np
import sys
import hashlib
class CountMinSketch(object):
    """
    Implementation of the CountMinSketch data structure as specified in Part 2(a)
    of the assignment, including the conservative update mode specified in Part 2(f).
    """
    def __init__(self, trial, n_tables, n_counters, mode='all'):
        """
        Initialises a CountMinSketch object with the specified parameters.
        Params:
            trial: i in the assignment
            n_tables: l in the notes (parameter of the CMS)
            n_counters: b in the notes (parameter of the CMS)
            mode: how the counters are updated, 'all' or 'conservative'
        """
        self.trial = trial
        self.n_counters = n_counters
        self.n_tables = n_tables
        self.table_inds = np.arange(self.n_tables)[:, np.newaxis]
        self.data = np.zeros((n_tables, n_counters)).astype('int')
        self.mode = mode

    def _hash_fn(self, x, table):
        # One byte of the md5 digest per (trial, table) pair, a value in [0, 256)
        i = self.trial
        j = table
        return np.array(hashlib.md5((str(x) + str(i - 1)).encode('utf-8')).digest()[j - 1]).astype('int')

    def inc(self, x):
        """
        Increments the count for a subset of the counters identified by the hash of x
        in the present trial. The subset is all the counters in the default 'all'
        mode, or only those with the minimum count across all counters if the mode
        is 'conservative'.
        Params:
            x: element whose count will be incremented
        """
        if self.mode == 'conservative':
            assert np.size(x) == 1
            inds = self._get_min_inds(x)
        else:
            inds = (self.table_inds, self.get_hashes(x))
        self.data[inds] += 1

    def _get_min_inds(self, x):
        """
        Obtains indices for the minimum counters.
        Params:
            x: element for which the counters with the lowest count will be identified
        Returns:
            The tables and indexing hashes for the subset of counters
            with minimum count
        """
        min_counts, counts, hashes = self.count(x, True)
        min_rows = counts == min_counts
        return self.table_inds[min_rows], hashes[min_rows]

    def count(self, x, _return_counts_and_hashes=False):
        """
        Obtains the count for x.
        Params:
            x: element for which the minimum count will be returned
            _return_counts_and_hashes: whether to also return the counts and hashes
                used to find the minimum count; used internally by _get_min_inds
        Returns:
            The minimum count across all tables for x, which is guaranteed never to
            be an underestimate of the true frequency of x
        """
        hashes = self.get_hashes(x)
        counts = self.data[self.table_inds, hashes]
        min_counts = np.min(counts, axis=0)
        if _return_counts_and_hashes:
            return min_counts, counts, hashes
        return min_counts

    def get_hashes(self, vec):
        """
        Applies _hash_fn to each element in each table.
        Params:
            vec: vector of elements to hash
        Returns:
            n_tables x len(vec) matrix of hashes
        """
        vec = np.reshape(vec, [-1])
        n_elems = len(vec)
        tables = self.table_inds.ravel()
        result = np.zeros((self.n_tables, n_elems)).astype('int')
        for i, table in enumerate(tables):
            for j, elem in enumerate(vec):
                result[i][j] = self._hash_fn(elem, table)
        return result
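As a quick sanity check of the hashing scheme, each per-table md5 digest byte always lands in $[0, 256)$, matching the $b = 256$ counters used later. (`md5_byte_hash` below is a hypothetical standalone helper that mirrors `_hash_fn`.)

```python
import hashlib

def md5_byte_hash(x, trial, table):
    # Hypothetical helper mirroring CountMinSketch._hash_fn:
    # one byte of md5(str(x) + str(trial - 1)) per table
    return hashlib.md5((str(x) + str(trial - 1)).encode('utf-8')).digest()[table - 1]

vals = [md5_byte_hash(x, trial=1, table=1) for x in range(1, 9051)]
print(min(vals), max(vals))  # every hash is within [0, 255]
```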
Functions for data generation that will be useful later
def freq(x):
    """
    Functionalises the frequencies for the datastreams.
    Params:
        x: element or vector whose frequency will be returned; must be in [1, 9050]
    Returns:
        frequency as specified in Part 2(a)
    Raises:
        ValueError if x is not in the specified range
    """
    if not np.all(np.logical_and(x >= 1, x <= 9050)):
        raise ValueError('Values must be between 1 and 9050 inclusive')
    i = (x - 1) // 1000 + 1
    return np.where(i <= 9, i, (x - 9000) ** 2)
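As a quick check, freq reproduces the specification at the block boundaries (the function is restated here so the snippet is self-contained):

```python
import numpy as np

def freq(x):
    # restated from above: elements 1..9000 in blocks of 1000 have
    # frequencies 1..9; element 9000 + i has frequency i^2
    i = (x - 1) // 1000 + 1
    return np.where(i <= 9, i, (x - 9000) ** 2)

# boundary elements -> frequencies 1, 1, 2, 9, 1, 2500
print(freq(np.array([1, 1000, 1001, 9000, 9001, 9050])))
```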
def data_stream(order):
    """
    Generates a datastream with elements in [1, 9050] with
    frequencies given by freq, in either ascending or descending
    order of value (and in fact frequency), or in random order.
    Params:
        order: how the elements are to be ordered by value; one
            of 'forward', 'backward', or 'random'
    Returns:
        Array of elements in [1, 9050] in the specified order, with
        frequencies given by freq
    Raises:
        ValueError if order is not one of 'forward', 'backward', or 'random'
    """
    if order not in {'forward', 'backward', 'random'}:
        raise ValueError("order must be one of 'forward', 'backward', or 'random'")
    if order == 'backward':
        x = np.arange(9050, 0, -1)
    else:
        x = np.arange(1, 9051)
    f = freq(x)
    stream = np.concatenate([[xi]*fi for xi, fi in zip(x, f)])
    if order == 'random':
        stream = np.random.permutation(stream)
    return stream
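As an aside, the concatenate-a-list-comprehension idiom used to build the stream is equivalent to np.repeat with a per-element repeat count; a minimal illustration:

```python
import numpy as np

x = np.array([1, 2, 3])
f = np.array([2, 1, 3])  # hypothetical per-element frequencies
stream = np.repeat(x, f)  # each x[i] repeated f[i] times
print(stream)  # [1 1 2 3 3 3]
```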
To determine the number of heavy hitters, first sum the frequencies for each element; there is no need to invoke freq for this purpose. For the first 9000 elements, the frequency increases by one for every 1000 elements, hence:
counts_first9k = 1000*(np.arange(1,10))
For the last 50 elements, the count is $(i - 9000)^2$:
counts_last50 = np.arange(1,51)**2
Now find the total and heavy hitter threshold frequency:
total = counts_first9k.sum() + counts_last50.sum()
freq_hh = total*0.01
total, freq_hh
Since the frequency of values $\leq 9000$ is upper bounded by $9$, heavy hitters must lie among the values $\gt 9000$. Since the frequency of element $9000 + i$ is $i^2$, the heavy hitters are
$$\{9000 + i: i \geq \sqrt{879.25}\}$$
thresh = np.sqrt(freq_hh)
print(thresh)
heavy_hitters = 9000 + np.arange(np.ceil(thresh).astype(np.int64), 51)
print(len(heavy_hitters))
print(heavy_hitters)
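These numbers can be double-checked against the closed-form sum of squares $\sum_{i=1}^{50} i^2 = \frac{50 \cdot 51 \cdot 101}{6} = 42925$:

```python
import numpy as np

# total stream length: 1000 elements at each frequency 1..9, plus i^2 for i=1..50
total = 1000 * sum(range(1, 10)) + 50 * 51 * 101 // 6  # 45000 + 42925
thresh = np.sqrt(0.01 * total)
n_hh = 50 - int(np.ceil(thresh)) + 1  # elements 9000 + i with i >= ceil(thresh)
print(total, round(thresh, 2), n_hh)  # 87925 29.65 21
```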
def run_trial(n_trials, n_tables, n_counters, mode='all'):
    """
    Generates datastreams in forward, backward and random orders
    and updates and analyses a CountMinSketch with the given mode for each.
    Params:
        n_trials: number of trials per datastream order and mode
        n_tables: parameter l for the CMS
        n_counters: parameter b for the CMS
        mode: update mode for the CMS
    Returns:
        A dictionary of results for each datastream order
    """
    results = {}
    n_elems = None
    for order in ['forward', 'backward', 'random']:
        print(order.capitalize())
        if order != 'random':
            stream = data_stream(order)
        results[order] = {'count_9050': [], 'n_heavy_hitters': []}
        for trial in range(1, n_trials+1):
            if order == 'random':
                stream = data_stream(order)
            print('Trial {}/{}'.format(trial, n_trials))
            CMS = CountMinSketch(trial, n_tables, n_counters, mode)
            if n_elems is None:
                n_elems = len(stream)
            for i, elem in enumerate(stream):
                if not i % 10:
                    sys.stdout.write('\r{}/{}'.format(i, n_elems))
                CMS.inc(elem)
            results[order]['count_9050'].append(CMS.count(9050)[0])
            counts = CMS.count(np.arange(1, 9051))
            n_heavy_hitters = np.sum(counts >= (len(stream) * 0.01))
            results[order]['n_heavy_hitters'].append(n_heavy_hitters)
            print()
        print(results[order])
    return results
results = {}
for mode in ['all', 'conservative']:
    print(mode.capitalize())
    results[mode] = run_trial(10, 4, 256, mode)
Results of trials
reps = 50
for k, v in results.items():
    print('='*reps)
    print('Mode:', k)
    print('='*reps)
    for order in ['forward', 'backward', 'random']:
        print('Order:', order)
        print('-'*reps)
        print('true count_9050:', freq(9050), '| CMS count_9050:', np.mean(v[order]['count_9050']))
        print('true heavy hitters:', len(heavy_hitters), '| CMS heavy hitters:', np.mean(v[order]['n_heavy_hitters']))
        print()
    print('*'*reps)
    print()
Note that for the default update mode, the order does not matter: each update is independent of the previous ones, and whatever the order of the elements, all tables get updated on every increment, so the minimum count ends up the same. But with conservative updates, as the results indicate, order matters, since whether or not a counter gets updated depends on the history of updates.
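A minimal toy example makes the order dependence concrete. (This is a hypothetical 2-table, 2-counter sketch with hand-picked hashes, chosen so that a collides with b in table 0 and with c in table 1.) The same three elements in two different orders leave the sketch in different states: one order estimates a exactly, the other overestimates it.

```python
import numpy as np

# hand-picked (table-0 index, table-1 index) for each element
HASHES = {'a': (0, 0), 'b': (0, 1), 'c': (1, 0)}

def conservative_sketch(stream):
    T = np.zeros((2, 2), dtype=int)
    for x in stream:
        rows = np.arange(2)
        cols = np.array(HASHES[x])
        counts = T[rows, cols]
        is_min = counts == counts.min()
        # conservative update: only bump the counters currently at the minimum
        T[rows[is_min], cols[is_min]] += 1
    return T

def estimate(T, x):
    return min(T[0, HASHES[x][0]], T[1, HASHES[x][1]])

T1 = conservative_sketch(['a', 'b', 'c'])
T2 = conservative_sketch(['b', 'c', 'a'])
print(estimate(T1, 'a'), estimate(T2, 'a'))  # 1 2 -- same stream, different order
```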
(e) Let $n_x \in \{0, 1, 2, \ldots, f_x\}$ denote the number of times element $x$ has been seen in the stream so far; equivalently, this is the number of times the CMS function inc has been called for $x$. We will prove that the CMS never underestimates by showing that after $n_x$ calls of inc for $x$, $\min_i CMS[i][h_i(x)] \ge n_x$, or equivalently $\forall i,\ CMS[i][h_i(x)] \ge n_x$. We rely on the fact that the counts of $x$ cannot decrease between occurrences of $x$, since counters can never be decremented.
Base case, no updates, $n_x = 0$
Before the element is first seen, $\min_i CMS[i][h_i(x)] \ge 0 = n_x$. (The counters may already be positive if there have been previous collisions with elements $y \neq x$.)
Base case, first update, $n_x = 1$
Before calling inc, $\min_i CMS[i][h_i(x)] \ge 0$. Therefore the subset of counters which are strictly greater than the minimum must have a count of at least 1 i.e.
$$\forall n \notin \{m: CMS[m][h_m(x)] = \min CMS[i][h_i(x)]\},\text{ } CMS[n][h_n(x)] \ge 1$$
After calling inc, the subset of counters with minimum count increases by 1 so that now all counters have a count of at least 1, $CMS[i][h_i(x)] \ge 1 = n_x$.
Inductive step, $n_x = k$
Assume that before the $k$-th call of inc, $\min_i CMS[i][h_i(x)] \ge k-1$. After calling inc, we want to show that the minimum is at least $k$. Similar to the reasoning for the base case $n_x = 1$, note the following for the non-minimum counters:
$$\forall n \notin \{m: CMS[m][h_m(x)] = \min CMS[i][h_i(x)]\},\text{ } CMS[n][h_n(x)] \ge k$$
Again similar to before, the counters at the minimum are incremented from at least $k-1$ to at least $k$, so after calling inc, $\forall i,\ CMS[i][h_i(x)] \ge k = n_x$.
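The invariant can also be checked empirically. The sketch below is a hypothetical, stripped-down conservative CMS using the same md5-byte hashing idea; it verifies that no estimate falls below the true frequency on a random stream.

```python
import hashlib
from collections import Counter
import numpy as np

def h(x, table):
    # one md5 digest byte per table, a value in [0, 256)
    return hashlib.md5(str(x).encode('utf-8')).digest()[table]

rng = np.random.default_rng(0)
stream = rng.integers(0, 50, size=2000).tolist()

T = np.zeros((4, 256), dtype=int)  # 4 tables x 256 counters
for x in stream:
    cells = [(t, h(x, t)) for t in range(4)]
    m = min(T[t, c] for t, c in cells)
    for t, c in cells:
        if T[t, c] == m:  # conservative update: only the minimum counters
            T[t, c] += 1

truth = Counter(stream)
assert all(min(T[t, h(x, t)] for t in range(4)) >= n for x, n in truth.items())
print('no underestimates')
```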