This article explains how to write and read RCFile using the MapReduce API. The walkthrough is fairly detailed and should serve as a useful reference, so interested readers are encouraged to read it to the end. RCFile is a row-columnar storage format developed by Facebook that offers a high compression ratio and efficient reads. In Hive, a Text table can usually be converted directly with an insert-select statement, but sometimes you want to read and write RCFile from your own MapReduce jobs.
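For comparison, the Hive-side insert-select conversion mentioned above usually amounts to creating an RCFile-backed table and filling it from the text table. The following is only a minimal sketch issued over Hive JDBC: the HiveServer2 URL, the table names demo_text and demo_rcfile, their columns, and the hive-jdbc driver (which is separate from the dependency list that follows) are all assumptions for illustration.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class HiveInsertSelectSketch {
    public static void main(String[] args) throws Exception {
        // Register the HiveServer2 JDBC driver (assumes hive-jdbc is on the classpath).
        Class.forName("org.apache.hive.jdbc.HiveDriver");
        try (Connection conn = DriverManager.getConnection("jdbc:hive2://localhost:10000/default");
             Statement stmt = conn.createStatement()) {
            // Create a table stored as RCFile, then convert the existing text table with insert-select.
            stmt.execute("CREATE TABLE demo_rcfile (id INT, name STRING) STORED AS RCFILE");
            stmt.execute("INSERT OVERWRITE TABLE demo_rcfile SELECT id, name FROM demo_text");
        }
    }
}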
The MapReduce code below needs the following dependencies (Maven artifactIds; the corresponding groupIds are org.apache.hadoop, org.apache.hive, and org.apache.hive.hcatalog):
hadoop-client
hive-serde
hive-hcatalog-core
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
import org.apache.hadoop.hive.serde2.columnar.BytesRefWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hive.hcatalog.rcfile.RCFileMapReduceInputFormat;

import java.io.IOException;

public class RcFileReaderJob {
static class RcFileMapper extends Mapper<Object, BytesRefArrayWritable, Text, NullWritable> {
@Override
protected void map(Object key, BytesRefArrayWritable value, Context context)
throws IOException, InterruptedException {
// Each RCFile row arrives as a BytesRefArrayWritable with one BytesRefWritable per column.
// Rebuild the row as tab-separated text (the tab join is an assumption for this example).
Text field = new Text();
StringBuilder row = new StringBuilder();
for (int i = 0; i < value.size(); i++) {
BytesRefWritable col = value.get(i);
field.set(col.getData(), col.getStart(), col.getLength());
row.append(field.toString());
if (i < value.size() - 1) {
row.append("\t");
}
}
context.write(new Text(row.toString()), NullWritable.get());
}

@Override
protected void cleanup(Context context) throws IOException,
InterruptedException {
super.cleanup(context);
}

@Override
protected void setup(Context context) throws IOException,
InterruptedException {
super.setup(context);
}
}

static class RcFileReduce extends Reducer<Text, NullWritable, Text, NullWritable> {
@Override
protected void reduce(Text key, Iterable<NullWritable> values,
Context context) throws IOException, InterruptedException {
context.write(key, NullWritable.get());
}
}
public static boolean runLoadMapReducue(Configuration conf, Path input, Path output) throws IOException,
ClassNotFoundException, InterruptedException {
Job job = Job.getInstance(conf);
job.setJarByClass(RcFileReaderJob.class);
job.setJobName("RcFileReaderJob");
job.setNumReduceTasks(1);
job.setMapperClass(RcFileMapper.class);
job.setReducerClass(RcFileReduce.class);
job.setInputFormatClass(RCFileMapReduceInputFormat.class);
// MultipleInputs.addInputPath(job, input, RCFileInputFormat.class);
RCFileMapReduceInputFormat.addInputPath(job, input);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
FileOutputFormat.setOutputPath(job, output);
return job.waitForCompletion(true);
}

public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
if (args.length != 2) {
System.err.println("Usage: rcfile <input path> <output path>");
System.exit(2);
}
RcFileReaderJob.runLoadMapReducue(conf, new Path(args[0]), new Path(args[1]));
}
}
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
import org.apache.hadoop.hive.serde2.columnar.BytesRefWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hive.hcatalog.rcfile.RCFileMapReduceOutputFormat;

import java.io.IOException;

public class RcFileWriterJob extends Configured implements Tool {
public static class Map extends Mapper<Object, Text, NullWritable, BytesRefArrayWritable> {
// Number of columns in the target RCFile and the reusable row buffer (one entry per column).
private int numCols;
private BytesRefArrayWritable bytes;
@Override
protected void setup(Context context) throws IOException, InterruptedException {
numCols = context.getConfiguration().getInt("hive.io.rcfile.column.number.conf", 0);
bytes = new BytesRefArrayWritable(numCols);
}
public void map(Object key, Text line, Context context
) throws IOException, InterruptedException {
bytes.clear();
// Input rows are expected to be tab-delimited text.
String[] cols = line.toString().split("\t", -1);
System.out.println("SIZE : " + cols.length);
for (int i = 0; i < cols.length; i++) {
byte[] fieldData = cols[i].getBytes("UTF-8");
BytesRefWritable cu = new BytesRefWritable(fieldData, 0, fieldData.length);
bytes.set(i, cu);
}
context.write(NullWritable.get(), bytes);
}
}
public int run(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length < 2) {
System.out.println("Usage: " +
"hadoop jar RCFileLoader.jar <main class> " +
"-tableName <table name> -numCols <number of columns> " +
"-input <input path> -output <output path> " +
"-rowGroupSize <row group size> -ioBufferSize <io buffer size>");
System.exit(2);
}
String tableName = "";
int numCols = 0;
String inputPath = "";
String outputPath = "";
int rowGroupSize = 16 * 1024 * 1024;
int ioBufferSize = 128 * 1024;
for (int i = 0; i < otherArgs.length - 1; i++) {
if ("-tableName".equals(otherArgs[i])) {
tableName = otherArgs[i+1];
} else if ("-numCols".equals(otherArgs[i])) {
numCols = Integer.parseInt(otherArgs[i+1]);
} else if ("-input".equals(otherArgs[i])) {
inputPath = otherArgs[i+1];
} else if ("-output".equals(otherArgs[i])) {
outputPath = otherArgs[i+1];
} else if ("-rowGroupSize".equals(otherArgs[i])) {
rowGroupSize = Integer.parseInt(otherArgs[i+1]);
} else if ("-ioBufferSize".equals(otherArgs[i])) {
ioBufferSize = Integer.parseInt(otherArgs[i+1]);
}
}
conf.setInt("hive.io.rcfile.record.buffer.size", rowGroupSize);
conf.setInt("io.file.buffer.size", ioBufferSize);

Job job = Job.getInstance(conf);
job.setJobName("RcFileWriterJob");
job.setJarByClass(RcFileWriterJob.class);
job.setMapperClass(Map.class);
job.setMapOutputKeyClass(NullWritable.class);
job.setMapOutputValueClass(BytesRefArrayWritable.class);
// job.setNumReduceTasks(0);
FileInputFormat.addInputPath(job, new Path(inputPath));
job.setOutputFormatClass(RCFileMapReduceOutputFormat.class);
RCFileMapReduceOutputFormat.setColumnNumber(job.getConfiguration(), numCols);
RCFileMapReduceOutputFormat.setOutputPath(job, new Path(outputPath));
RCFileMapReduceOutputFormat.setCompressOutput(job, false);

System.out.println("Loading table " + tableName + " from " + inputPath + " to RCFile located at " + outputPath);
System.out.println("number of columns: " + job.getConfiguration().get("hive.io.rcfile.column.number.conf"));
System.out.println("RCFile row group size: " + job.getConfiguration().get("hive.io.rcfile.record.buffer.size"));
System.out.println("io buffer size: " + job.getConfiguration().get("io.file.buffer.size"));
return (job.waitForCompletion(true) ? 0 : 1);
}
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new Configuration(), new RcFileWriterJob(), args);
System.exit(res);
}
}

The above is the full content of "How to write and read RCFile with the MapReduce API". Thanks for reading, and I hope the content shared here is helpful.