Locality-Sensitive Hashing on Spark with Clojure/Flambo

Record Linkage is the process of finding similar entities in a dataset. Using this technique one can implement systems like plagiarism detectors (which are able to identify fraudulent scientific papers or articles), document similarity (finding similar articles on the internet), fingerprint matching, etc. The possibilities are endless. But the topic we are focusing on in this article is De-Duplication [*], which is the process of finding (and removing, if need be) duplicates in a dataset.

Why do we need this? The answer is simple: removing, or at least identifying, redundant data saves space (memory, disk space, etc.) and time (no unnecessary, repeated computation on duplicate data). One can simply go about doing this by comparing each entity in the dataset with every other entity, computing a similarity score for each pair, and then doing whatever further computation the application you are building needs.

But think of it this way: if there are six strings in a dataset, [“aa”, “ab”, “ac”, “xx”, “xy”, “xz”], and I want to find possible duplicates of “aa”, I’d rather compare it with only “ab” and “ac” than with the entire dataset, because [“aa”, “ab”, “ac”] are clearly similar to each other and very different from [“xx”, “xy”, “xz”]. There is no merit in comparing everyone with everyone.

The computation time of the compare-everything approach is O(n*(n-1)), i.e. quadratic, where ‘n’ is the number of entities in the dataset. This is all good when the dataset is very small. But when the data becomes very “Big”, the awful computation time of O(n*(n-1)) just won’t cut it. So we need a technique that reduces the number of candidates compared against a particular entity: generate “similar” subsets from the main dataset and then run separate de-duplication tasks on those smaller datasets in a distributed manner, which greatly reduces the overall running time of the program.
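To put numbers on that, here is a minimal sketch in plain Clojure (no Spark needed). The helper name all-pairs is made up for this illustration. It enumerates every unordered pair, which is n*(n-1)/2 comparisons, and contrasts that with comparing only inside the two “similar” groups from the example above.

```clojure
(defn all-pairs
  "Every unordered pair of distinct positions in items:
  n*(n-1)/2 pairs for n items."
  [items]
  (let [v (vec items)]
    (for [i (range (count v))
          j (range (inc i) (count v))]
      [(v i) (v j)])))

;; Comparing everyone with everyone: 6*5/2 = 15 comparisons.
(def exhaustive
  (count (all-pairs ["aa" "ab" "ac" "xx" "xy" "xz"])))

;; Comparing only within each "similar" subset: 3 + 3 = 6 comparisons.
(def grouped
  (+ (count (all-pairs ["aa" "ab" "ac"]))
     (count (all-pairs ["xx" "xy" "xz"]))))
```

The gap looks small for six strings, but it grows quadratically with the size of the dataset, which is why the grouping step is worth the trouble.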

So how do we achieve this? How do we create smaller “similar” datasets from the “bigger” main dataset without wasting too much time on pre-processing?

LSH to the rescue! It stands for Locality-Sensitive Hashing, and it is one of the most common and convenient algorithms for Document Similarity (in my opinion, of course). The best part about this algorithm is that when one hashes the entities (documents or just strings) using LSH, “similar” entities tend to end up with similar hashes. From then on, it is just a matter of grouping the entities by their hash values, which gives you smaller datasets, and then finding duplicates within them in a distributed manner. Enough explanation, let’s jump straight into the code. The language of choice is Clojure, and we will be writing it for Apache Spark using a Clojure DSL called flambo.

We will be using two external libraries for this: flambo, and aesahaettr (required below as æsahættr), a hashing library written on top of Google’s Guava.

You can use this test file (contains restaurant details from two guides, fodors and zagats, and has about 112 duplicates in it) to play around with.

(require '[flambo.api :as f]
         '[flambo.conf :as conf])

(def c (-> (conf/spark-conf)
           (conf/master "local[2]")
           (conf/app-name "De-Duplication")))

(def sc (f/spark-context c))

(def rdd (f/text-file sc "/path/to/file"))

First things first: we need to generate shingles (overlapping substrings of a fixed length) from each row (a string) of the RDD. The reason for doing this is that two similar strings are much more likely to share some shingles, and therefore some matching shingle hash values, than they are to produce the same hash for the entire string.

(defn k-shingles
  "Returns every substring of length n in s, as strings.
  Returns an empty seq when s is shorter than n."
  [n s]
  ;; Slide a window of n characters over s, one character at a time,
  ;; and join each window back into a string.
  (map #(apply str %) (partition n 1 s)))

The function above takes two arguments: ‘n’, the shingle size, and ‘s’, the string.

NOTE: If your strings are very short, e.g. if the rows of the RDD are just first names, you can simply use single-character shingles:
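To make the benefit of shingling concrete, here is a small sketch in plain Clojure. Both shingle-set (a set-returning stand-in for k-shingles) and jaccard are names invented for this illustration, and Jaccard similarity is just one common way to score the overlap between two sets. Two names that differ as whole strings still share a good part of their 2-shingles:

```clojure
(require '[clojure.set :as set])

(defn shingle-set
  "Set of all n-character substrings of s."
  [n s]
  (set (map #(apply str %) (partition n 1 s))))

(defn jaccard
  "Size of the intersection divided by size of the union.
  Returns 0 when both sets are empty."
  [a b]
  (let [u (count (set/union a b))]
    (if (zero? u)
      0
      (/ (count (set/intersection a b)) u))))

(shingle-set 2 "punit")
;; => #{"pu" "un" "ni" "it"} (printed order may vary)

;; "punit" and "puneet" share the shingles "pu" and "un"
;; out of 7 distinct shingles overall.
(jaccard (shingle-set 2 "punit") (shingle-set 2 "puneet"))
;; => 2/7
```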

(map str (k-shingles 1 "punit"))

After this we need to hash each generated shingle of the string a fixed number of times (‘n’ in the code), each time with a differently seeded hash function. This is again done to improve the chances of hash values matching.

(require '[æsahættr :as hasher])

(defn gen-hash-with-seeds
 [seeds]
 (map #(hasher/murmur3-32 %) seeds))

(defn hash-n-times
 [n shingles-list]
 (let [hash-fns (gen-hash-with-seeds (range n))]
   (map
     (fn [x]
       (map
         (fn [y] (hasher/hash->int (hasher/hash-string y x)))
         hash-fns))
     shingles-list)))
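If you want to see the shape of the result without pulling in the hashing library, the sketch below mimics hash-n-times in plain Clojure. Note the assumption: seeded-hash is a made-up stand-in that feeds a [seed value] pair to Clojure's built-in hash, so the numbers it produces are not the murmur3 values the real code would give. Only the structure is the same: one list per shingle, each holding n hash values.

```clojure
;; Stand-in for a seeded hash function (an assumption for illustration,
;; NOT murmur3): hash the [seed value] pair with Clojure's built-in hash.
(defn seeded-hash
  [seed s]
  (hash [seed s]))

(defn hash-n-times-sketch
  "For every shingle, returns a list of n hash values,
  one per seed in 0..n-1."
  [n shingles]
  (map (fn [shingle]
         (map (fn [seed] (seeded-hash seed shingle)) (range n)))
       shingles))

(def hashed (hash-n-times-sketch 3 ["pu" "un" "ni" "it"]))

(count hashed)      ; => 4, one list per shingle
(map count hashed)  ; => (3 3 3 3), n hash values per shingle
```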

Now it is time to generate the MinHash signature of that string (or document). We take the lists of hash values (one list per shingle, all of the same size) and, for every position ‘i’, keep the minimum hash value found at that position across all the lists. The result is a single list of hash values, which is the MinHash signature of that string.

(defn min-hash
 [l]
 (reduce (fn [x y] (map (fn [a b] (min a b)) x y)) l))
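A tiny worked example may help here. The input values below are made up; in the real pipeline they would be the per-shingle hash lists. The function is repeated (in a slightly shorter form) so that the snippet runs on its own.

```clojure
(defn min-hash
  "Position-wise minimum over a collection of equally sized lists."
  [l]
  (reduce (fn [x y] (map min x y)) l))

;; Three shingles, each hashed three times:
;;   position 0: min(5, 4, 6) = 4
;;   position 1: min(3, 7, 2) = 2
;;   position 2: min(9, 1, 8) = 1
(min-hash [[5 3 9]
           [4 7 1]
           [6 2 8]])
;; => (4 2 1)
```

However many shingles a string has, its signature always has as many entries as there are hash functions.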

Now we partition the MinHash signature into smaller ‘bands’ and then hash each of them one final time. The purpose of doing this is that candidate (i.e. similar) strings will have at least one matching ‘hashed’ band, so we can group the strings by their ‘hashed’ bands and generate candidate lists.

(defn partition-into-bands
 [band-size min-hashed-list]
 (partition-all band-size min-hashed-list))

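(require '[clojure.string])

(defn band-hash-generator
 [banded-list]
 (let [; One seed per band, counting down from -1: -1, -2, ...
       ; The band count is incremented because the range starts
       ; at -1 and its end bound is exclusive.
       r (range -1
           (unchecked-negate-int (inc (count banded-list))) -1)
       hash-fns (gen-hash-with-seeds r)
       ; Join each band into one "-"-separated string and hash it
       ; with that band's own hash function.
       hashed-banded-list (map
                            (fn [x y]
                              (hasher/hash->int
                                (hasher/hash-string x
                                  (clojure.string/join "-" y))))
                            hash-fns banded-list)]
   hashed-banded-list))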

; Output of "partition-into-bands" is the input
; for "band-hash-generator"
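To see why banding produces candidates, here is a small sketch in plain Clojure. The helper shared-bands is invented for this illustration, and it compares the bands directly instead of hashing them, which is enough to show the idea: two signatures that differ in one position still agree on every band that position does not touch.

```clojure
(defn shared-bands
  "Indices of the bands on which two signatures agree exactly."
  [band-size sig-a sig-b]
  (->> (map vector
            (range)
            (partition-all band-size sig-a)
            (partition-all band-size sig-b))
       (filter (fn [[_ band-a band-b]] (= band-a band-b)))
       (map first)))

(partition-all 2 [1 2 3 4 5 6])
;; => ((1 2) (3 4) (5 6))

;; The signatures differ only at position 2, which sits in band 1,
;; so bands 0 and 2 still match and the two strings become candidates.
(shared-bands 2 [1 2 3 4 5 6] [1 2 9 4 5 6])
;; => (0 2)
```

Smaller bands make a match more likely (more candidates, more false positives); larger bands make it less likely.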

After this we will have our RDD in the following form:

RDD[String, List(Int)]

After this we have to write code that maps over the list of hash values and emits a key-value pair of [hash value, String] for each of them. Then you use the “combine-by-key” function of flambo to gather all the strings (or docs) with the same hash value. The only minor issue is that two strings with multiple matching bands will show up together in several candidate lists, so you still have to collect the candidate lists for all the bands and then apply a distinct on the sorted candidate lists.

Now it is only a matter of comparing the strings within each candidate list. The method that we generally use to compare strings is Levenshtein Distance. You can also set a threshold parameter so that strings are classified as duplicates only if their Levenshtein Distance is below it (the smaller the distance, the more similar the strings).
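Here is a rough sketch of that last stage in plain Clojure, without Spark, so it can be tried at the REPL. Everything in it is an illustration rather than the code we run in production: candidate-groups uses group-by in place of flambo's combine-by-key, the function names are made up, and the band hashes in the example are invented numbers.

```clojure
(defn candidate-groups
  "rows is a seq of [string band-hashes] pairs. Returns the distinct,
  sorted candidate lists that contain more than one string."
  [rows]
  (->> (for [[s hashes] rows, h hashes] [h s])      ; [hash value, String]
       (group-by first)                             ; gather by hash value
       vals
       (map #(vec (sort (distinct (map second %)))))
       (filter #(> (count %) 1))
       distinct))

(defn levenshtein
  "Edit distance between a and b, computed row by row."
  [a b]
  (last
    (reduce
      (fn [prev [i ca]]
        (reduce
          (fn [row [j cb]]
            (conj row (min (inc (peek row))           ; insertion
                           (inc (nth prev (inc j)))   ; deletion
                           (+ (nth prev j)            ; substitution
                              (if (= ca cb) 0 1)))))
          [(inc i)]
          (map-indexed vector b)))
      (vec (range (inc (count b))))
      (map-indexed vector a))))

(defn duplicate-pairs
  "Pairs within one candidate list whose distance is at most threshold."
  [threshold group]
  (for [[i a] (map-indexed vector group)
        b (drop (inc i) group)
        :when (<= (levenshtein a b) threshold)]
    [a b]))

(def groups
  (candidate-groups [["aa" [1 2]] ["ab" [1 2]]
                     ["xx" [3 4]] ["xy" [3 5]]
                     ["zz" [9 9]]]))
;; groups holds ["aa" "ab"] and ["xx" "xy"], each exactly once

(mapcat #(duplicate-pairs 1 %) groups)
;; yields the pairs ["aa" "ab"] and ["xx" "xy"]
```

Note that “aa” and “ab” match on two bands but still produce only one candidate list, thanks to the distinct at the end.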

[*] De-Duplication is a subset of a larger topic, Document Similarity, which is mentioned above.

REFERENCES:

  1. Clojure for Data Science book – https://www.packtpub.com/big-data-and-business-intelligence/clojure-data-science
  2. spark-hash intro – https://github.com/mrsqueeze/spark-hash/blob/master/README.md