Implementing Affinity Propagation with big data transformation for Apache Flink

I’ve just written the results of some work I’ve done as maybe some pieces of it could be useful to someone. The only tests I’ve done is comparing that results were the same than scikit implementation of AP. Generated messages are the same than scikit, and to have same clusters convergence parameters need to be tuned.


Affinity Propagation is a clustering algorithm based on message passing that differs from other clustering algorithms like K-means in that it does not previously need the number of clusters to be known. It was first published on Science in 2007 with a later paper with a different approximation but same results published on 2009 (if you read them you will see that original messages can be derived from the later publication).

Same results can be obtained with both approximations with a different implementation, so first thing was choosing one among them. I first worked with the binary approximation of AP (the one from 2009 paper), mainly because I continued an existing implementation of it for Giraph. While that is the main reason I think there are some other reasons that make it a good option:

  • Is easy to add constraints to the clustering. One example can be found in the Capacitated Affinity Propagation section (adding an upper bound to the number of clusters)
  • It seems to fit good to big data graph libraries like Giraph from Hadoop or Gelly from Flink

So the first implementation I worked with was for Giraph, and although I did some progress with it there were some parts like damping, convergence or send messages in a synchronously manner (see Could messages be passed asynchronously in affinity propagation? found in faq section) that made the implementation a little bit tricky. Then I saw that Flink were interested on having AP implemented and they had an unfinished implementation of it so I started adapting that implementation for Gelly. After some work and some recommendations from Flink contributors we decided to implement the original algorithm with regular data transformations instead of binary approach using Gelly. And that is the one explained here.

    Affinity propagation

AP is a message passing algorithm where two types of messages are exchanged between nodes with existing similarities applying a damping factor that stops once it has converged. The algorithm is quite clear and those are the three points to implement (what is not that clear to me is how the message calculations where derived, an explanation can be found in the Derivation of affinity propagation as the max-sum algorithm in a factor graph section though, but to me is easier to follow it with the 2009 paper).

The two messages exchanged are what they call responsibility and availability. From the paper the responsibility r(i,k), sent from data point i to candidate exemplar point k, reflects the accumulated evidence for how well-suited point k is to serve as the exemplar for point i, taking into account other potential exemplars for point i and is calculated as follows:

r(i,k) \leftarrow s(i,k) -\max_{k' s.t. k'\neq k}\{a(i,k')+s(i,k')\}

And availability a(i,k), sent from candidate exemplar point k to point i, reflects the accumulated evidence for how appropriate it would be for point i to choose point k as its exemplar, taking into account the support from other points that point k should be an exemplar. And is calculated as follows with different calculation for self availability:

a(i,k) \leftarrow -\min\Big\{0,r(k,k)+\sum_{i'\textrm{s.t.}i'\notin\{i,k\}}\max\{0,r(i',k)\}\Big\}

a(k,k) \leftarrow \sum_{i'\textrm{s.t.}i'\neq k}\max\{0,r(i',k)\}

And this is damping explained from the paper: when updating the messages, it is important that they be damped to avoid numerical oscillations that arise in some circumstances. Each message is set to l times its value from the previous iteration plus 1 – l times its prescribed updated value, where the damping factor l is between 0 and 1. Is not mandatory to implement damping but they recommend doing it. They also recommend adding some noise to avoid oscillations, I did not implement that though.

Reading faq section was useful to me for the implementation.

    Implementation graph


This github repository contains the code.