`
rose-zh
  • 浏览: 2468 次
  • 性别: Icon_minigender_1
  • 来自: 北京
最近访客 更多访客>>
社区版块
存档分类
最新评论

Hadoop Core 学习笔记(一) SequenceFile文件写入和读取Writable数据

阅读更多
[color=blue]

刚接触Hadoop时,对SequenceFile和Writable还产生了一点联想,以为是什么神奇的东西.后来也明白,不过就是自己IO的一些协议,用于自己的输入输出.这里介绍下如何从sequence file中读出和写入Writable数据.

Writable类似传输的数据,相对于Java来说等同于对象,只是引用到Hadoop中需要一套协议去进行传输转换这个对象.于是有了里面的 public void write(DataOutput out) throws IOException 和public void readFields(DataInput in) throws IOException方法,一个怎么写入,一个怎么读取.如此这些对象才可以在整个Hadoop集群无障碍的通行.至于Hadoop为什么要另起炉灶自己搞一套序列化的东西,之前也看过一些介绍,但还没有心得,日后再慢慢领会.

所以这个例子就是自己构造一个Writable对象,然后写入到sequence file以及读出.最后将读出的数据进行对比,是否正确.具体看代码吧:
1.package com.guoyun.hadoop.io.study;  
2. 
3.import java.io.DataInput;  
4.import java.io.DataOutput;  
5.import java.io.IOException;  
6.import java.util.ArrayList;  
7.import java.util.Collection;  
8.import java.util.HashSet;  
9.import java.util.List;  
10.import java.util.Set;  
11. 
12.import org.apache.hadoop.conf.Configuration;  
13.import org.apache.hadoop.fs.FileSystem;  
14.import org.apache.hadoop.fs.Path;  
15.import org.apache.hadoop.io.IOUtils;  
16.import org.apache.hadoop.io.LongWritable;  
17.import org.apache.hadoop.io.SequenceFile;  
18.import org.apache.hadoop.io.Writable;  
19.import org.apache.hadoop.util.ReflectionUtils;  
20. 
21.public class SequenceFileStudy {  
22.    
23.  public static class UserWritable implements Writable,Comparable{  
24.    private long userId;  
25.    private String userName;  
26.    private int userAge;  
27.      
28.      
29.    public long getUserId() {  
30.      return userId;  
31.    }  
32. 
33.    public void setUserId(long userId) {  
34.      this.userId = userId;  
35.    }  
36. 
37.    public String getUserName() {  
38.      return userName;  
39.    }  
40. 
41.    public void setUserName(String userName) {  
42.      this.userName = userName;  
43.    }  
44. 
45.    public int getUserAge() {  
46.      return userAge;  
47.    }  
48. 
49.    public void setUserAge(int userAge) {  
50.      this.userAge = userAge;  
51.    }  
52. 
53.    public UserWritable(long userId, String userName, int userAge) {  
54.      super();  
55.      this.userId = userId;  
56.      this.userName = userName;  
57.      this.userAge = userAge;  
58.    }  
59. 
60.    public UserWritable() {  
61.      super();  
62.    }  
63. 
64.    @Override 
65.    public void write(DataOutput out) throws IOException {  
66.      out.writeLong(this.userId);  
67.      out.writeUTF(this.userName);  
68.      out.writeInt(this.userAge);  
69.    }  
70. 
71.    @Override 
72.    public void readFields(DataInput in) throws IOException {  
73.      this.userId=in.readLong();  
74.      this.userName=in.readUTF();  
75.      this.userAge=in.readInt();  
76.    }  
77. 
78.    @Override 
79.    public String toString() {  
80.     return this.userId+"\t"+this.userName+"\t"+this.userAge;  
81.    }  
82. 
83.    /** 
84.     * 只对比userId 
85.     */ 
86.    @Override 
87.    public boolean equals(Object obj) {  
88.      if(obj instanceof UserWritable){  
89.        UserWritable u1=(UserWritable)obj;  
90.        return u1.getUserId()==this.getUserId();  
91.      }  
92.      return false;  
93.    }  
94.      
95.    /** 
96.     * 只对比userId 
97.     */ 
98.    @Override 
99.    public int compareTo(Object obj) {  
100.      int result=-1;  
101.      if(obj instanceof UserWritable){  
102.       UserWritable u1=(UserWritable)obj;  
103.       if(this.userId>u1.userId){  
104.         result=1;  
105.       }else if(this.userId==u1.userId){  
106.         result=1;  
107.       }  
108.      }  
109.      return result;   
110.    }  
111.      
112.    @Override 
113.    public int hashCode() {  
114.      return (int)this.userId&Integer.MAX_VALUE;  
115.    }  
116.      
117.  }  
118.    
119.  /** 
120.   * 写入到sequence file 
121.   *  
122.   * @param filePath 
123.   * @param conf 
124.   * @param datas 
125.   */ 
126.  public static void write2SequenceFile(String filePath,Configuration conf,Collection<UserWritable> datas){  
127.    FileSystem fs=null;  
128.    SequenceFile.Writer writer=null;  
129.    Path path=null;  
130.    LongWritable idKey=new LongWritable(0);  
131.      
132.    try {  
133.      fs=FileSystem.get(conf);  
134.      path=new Path(filePath);  
135.      writer=SequenceFile.createWriter(fs, conf, path, LongWritable.class, UserWritable.class);  
136.        
137.      for(UserWritable user:datas){  
138.        idKey.set(user.getUserId());  // userID为Key  
139.        writer.append(idKey, user);  
140.      }  
141.    } catch (IOException e) {  
142.      // TODO Auto-generated catch block  
143.      e.printStackTrace();  
144.    }finally{  
145.      IOUtils.closeStream(writer);  
146.    }  
147.  }  
148.    
149.  /** 
150.   * 从sequence file文件中读取数据 
151.   *  
152.   * @param sequeceFilePath 
153.   * @param conf 
154.   * @return 
155.   */ 
156.  public static List<UserWritable> readSequenceFile(String sequeceFilePath,Configuration conf){  
157.    List<UserWritable> result=null;  
158.    FileSystem fs=null;  
159.    SequenceFile.Reader reader=null;  
160.    Path path=null;  
161.    Writable key=null;  
162.    UserWritable value=new UserWritable();  
163.      
164.    try {  
165.      fs=FileSystem.get(conf);  
166.      result=new ArrayList<UserWritable>();  
167.      path=new Path(sequeceFilePath);  
168.      reader=new SequenceFile.Reader(fs, path, conf);   
169.      key=(Writable)ReflectionUtils.newInstance(reader.getKeyClass(), conf); // 获得Key,也就是之前写入的userId  
170.      while(reader.next(key, value)){  
171.        result.add(value);  
172.        value=new UserWritable();  
173.      }  
174.        
175.    } catch (IOException e) {  
176.      // TODO Auto-generated catch block  
177.      e.printStackTrace();  
178.    }catch (Exception e){  
179.      e.printStackTrace();  
180.    }finally{  
181.        IOUtils.closeStream(reader);  
182.    }  
183.    return result;  
184.  }  
185.    
186.  private  static Configuration getDefaultConf(){  
187.    Configuration conf=new Configuration();  
188.    conf.set("mapred.job.tracker", "local");  
189.    conf.set("fs.default.name", "file:///");  
190.    //conf.set("io.compression.codecs", "com.hadoop.compression.lzo.LzoCodec");  
191.    return conf;  
192.  }  
193.    
194.  /** 
195.   * @param args 
196.   */ 
197.  public static void main(String[] args) {  
198.    String filePath="data/user.sequence"; // 文件路径  
199.    Set<UserWritable> users=new HashSet<UserWritable>();  
200.    UserWritable user=null;  
201.    // 生成数据  
202.    for(int i=1;i<=10;i++){  
203.      user=new UserWritable(i+(int)(Math.random()*100000),"name-"+(i+1),(int)(Math.random()*50)+10);  
204.      users.add(user);  
205.    }  
206.    // 写入到sequence file  
207.    write2SequenceFile(filePath,getDefaultConf(),users);  
208.    //从sequence file中读取  
209.    List<UserWritable> readDatas=readSequenceFile(filePath,getDefaultConf());  
210.      
211.    // 对比数据是否正确并输出  
212.    for(UserWritable u:readDatas){  
213.      if(users.contains(u)){  
214.        System.out.println(u.toString());  
215.      }else{  
216.        System.err.println("Error data:"+u.toString());  
217.      }  
218.    }  
219.      
220.  }  
221. 
222.} 
[/color]
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics