利用Hadoop實現求共同好友的示例詳解
前言
在很多社交APP中,比如大傢熟悉的QQ好友列表中,打開會話框,經常可以看到下面有一欄共同好友的推薦列表,用戶通過這種方式,可以添加潛在的關聯好友
這種功能該如何實現呢?對redis比較瞭解的同學應該能很快想到,可以使用redis來實現這個功能。沒錯,redis確實是個不錯的可以實現這個功能的方案。
但redis的實現有一定的局限性,因為redis存儲和數據和計算時需要耗費較多的內存資源,設想一下,像騰訊QQ這樣的規模,如果用這種方式做的話,估計Redis服務器的投入成本將是一筆不小的開銷。
利用hadoop中的MapReduce同樣可以實現這個功能,該如何實現呢?
業務分析
下面是原始的數據文件,第一欄可理解為本人,第二行為該用戶的好友列表,以逗號分割,比如A用戶的好友包括:B,C,D,F,E,O這幾個,後面的行依次類推
A:B,C,D,F,E,O
B:A,C,E,K
C:F,A,D,I
D:A,E,F,L
E:B,C,D,M,L
F:A,B,C,D,E,O,M
G:A,C,D,E,F
H:A,C,D,E,O
I:A,O
J:B,O
K:A,C,D
L:D,E,F
M:E,F,G
O:A,H,I,J
現在的需求是:通過原始的數據文件,輸出該文件中所有用戶中哪些人兩兩之間存在共同好友並輸出,格式如下:
A-B C,E
A-C F,D
A-D E,F
……
實現思路分析
步驟一:將原始數據拆分為如下格式
通過這一步,得到一組K/V,可以清晰的反映出一個用戶的所有好友
B:A #B是A的好友
C:A #C是A的好友
D:A #D是A的好友
F:A
E:A
O:AA:B
C:B
E:B
K:BF:C
A:C
D:C
I:CB:E
C:E
D:E
M:E
L:E
步驟二、對第一步的數據進一步處理成如下格式
從第一步格式完畢後的數據,可以很明顯看出並總結出一個規律,那就是左邊那些用戶的好友列表,以C用戶為例,可以看出C這個用戶有A,B,E三個好友,反過來講,ABE這三個用戶,他們有一個共同的好友A
其他的類推進行理解
C A-B-E #C是A和B和E的共同好友
D A-C #D是A和B的共同好友
A B-C #A是B和C的共同好友
B A-E #A是E和B的共同好友
……
步驟三、將步驟二中的數據調換位置
從步驟2中我們得知,C的好友有ABE,反過來說,ABE他們的共同好友有C,針對這種超過3個的,可以考慮下一步進行兩兩組合即可
A-B-E C #A、B、E有共同好友C
A-C D #A與C有共同好友D
B-C A #B與C有共同好友A
A-E B #A與E有共同好友B
步驟四、將步驟三得到的數據繼續拆分
步驟三中,像 : A-B-E C 這種數據,顯然需要進一步拆分,因為最終的結果是求取兩兩好友之間的共同好友,所以可以拆為: A-B C,A-E C,B-E C,為下一步數據組合做最後的準備
A-B C
A-E C
B-E C
A-C D
B-C A
A-E B
……
步驟五、將步驟四得到的數據合並
在使用MapReduce編程中我們知道,Map階段出去的數據,進入reduce方法中的數據都是key相同的,以第四步中的: A-E 這個key為例,就有2個,這樣通過 reduce方法最終輸出的結果就是: A-E C,B ,即A-E 這兩個用戶的共同好友為 C和B
A-B C #A,B共同好友有C
A-E C,B #A,E有共同好友 C,B
B-E C #B,E有共同好友 C
A-C D #A,C有共同好友 D
B-C A #B,C有共同好友 A
……
通過以上的數據分析,最終可以達到預期的效果,同時也可以看出,上面的步驟劃分到MapRedcue中,顯然一個MapReduce肯定是無法完成的,至少需要2個
下面是結合上面的步驟分析,得出需要兩個MapReduce的數據流程圖,參考這個圖來協助我們分析編寫代碼邏輯做參考
編碼實現
1、第一個map類
public class FirstMapper extends Mapper<LongWritable,Text,Text,Text> { @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String val = value.toString(); String[] split = val.split(":"); //A:B,C,D,F,E,O 拆分後,左邊是原用戶,右邊是好友 String user = split[0]; String friends = split[1]; String[] friendLists = friends.split(","); //Map1 輸出的結果為 : /** * B A * C A * D A * F A * E A */ for(String str :friendLists ){ context.write(new Text(str),new Text(user)); } } }
2、第一個Reduce類
public class FirstReducer extends Reducer<Text,Text,Text,Text> { @Override protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { StringBuffer stringBuffer = new StringBuffer(); for (Text text : values){ stringBuffer.append(text).append("-"); } //最終寫出去的數據格式為: A-E B ...... context.write(new Text(stringBuffer.toString()),key); } }
3、第一個Job類
public class FirstJob { public static void main(String[] args) throws Exception { //1、獲取job Configuration configuration = new Configuration(); Job job = Job.getInstance(configuration); //2、設置jar路徑 job.setJarByClass(FirstJob.class); //3、關聯mapper 和 Reducer job.setMapperClass(FirstMapper.class); job.setReducerClass(FirstReducer.class); //4、設置 map輸出的 key/val 的類型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); //5、設置最終輸出的key / val 類型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); //6、設置最終的輸出路徑 String inputPath = "F:\\網盤\\csv\\friends.txt"; String outPath = "F:\\網盤\\csv\\friends1"; FileInputFormat.setInputPaths(job,new Path(inputPath)); FileOutputFormat.setOutputPath(job,new Path(outPath)); // 7 提交job boolean result = job.waitForCompletion(true); System.exit(result ? 0 : 1); } }
運行上面的Job代碼,然後打開運行完畢後的第一個階段的文件,從內容格式上看,符合第一階段的輸出結果要求的, 即下面的這種數據格式
4、第二個map類
public class SecondMapper extends Mapper<LongWritable,Text,Text,Text> { @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // I-K-C-B-G-F-H-O-D- A 階段1的文件輸出格式 /** * 最終輸出格式: * I-K A * I-C A * I-B A * ...... */ //需要將左邊的數據進行兩兩拆分,與V進行組合輸出 String val = value.toString(); String[] split = val.split("\t"); String v2 = split[1]; String[] allUsers = split[0].split("-"); Arrays.sort(allUsers); for(int i=0;i<allUsers.length-1;i++){ for(int j=i+1;j<allUsers.length;j++){ context.write(new Text(allUsers[i] + "-" + allUsers[j]),new Text(v2)); } } } }
5、第二個Reducer類
public class SecondReducer extends Reducer<Text,Text,Text,Text> { @Override protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { //上一步輸出的結果: /** * A-B C * A-B D * A-E C * A-E D * ...... */ //隻需要將相同的key的Val進行組合即可,即 : A-B C-D,A-E C-D StringBuffer stringBuffer = new StringBuffer(); for (Text text :values ){ stringBuffer.append(text.toString()).append("-"); } context.write(key,new Text(stringBuffer.toString())); } }
6、第二個Job類
public class SecondJob { public static void main(String[] args) throws Exception { //1、獲取job Configuration configuration = new Configuration(); Job job = Job.getInstance(configuration); //2、設置jar路徑 job.setJarByClass(SecondJob.class); //3、關聯mapper 和 Reducer job.setMapperClass(SecondMapper.class); job.setReducerClass(SecondReducer.class); //4、設置 map輸出的 key/val 的類型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); //5、設置最終輸出的key / val 類型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); //6、設置最終的輸出路徑 String inputPath = "F:\\網盤\\csv\\friends1\\part-r-00000"; String outPath = "F:\\網盤\\csv\\friends2"; FileInputFormat.setInputPaths(job,new Path(inputPath)); FileOutputFormat.setOutputPath(job,new Path(outPath)); // 7 提交job boolean result = job.waitForCompletion(true); System.exit(result ? 0 : 1); } }
運行上面的Job代碼,查看最終的輸出結果,可以看到,也是符合我們預期的業務的
以上就是利用Hadoop實現求共同好友的示例詳解的詳細內容,更多關於Hadoop求共同好友的資料請關註WalkonNet其它相關文章!
推薦閱讀:
- Java基礎之MapReduce框架總結與擴展知識點
- 雲計算實驗:Java MapReduce編程
- 教你怎麼使用hadoop來提取文件中的指定內容
- hadoop 全面解讀自定義分區
- Hadoop中的壓縮與解壓縮案例詳解