How to Read a 100 GB File (Continued), Article #307
Contents
1. Reading a Large File: Splitting the File
2. Reading a Large File: Multiple Threads Reading the Same File
3. Wuxian's Summary
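1. Reading a Large File: Splitting the File
The core idea of this approach: the problem is that the file is too big, right? So why not split it into several smaller files and read them with multiple threads? The steps are:
(1) Split the file into several files.
(2) Have multiple threads work on the separate files, so that no two threads operate on the same file.
(3) Read each file line by line.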
1.1 Splitting the File
Both macOS and Linux provide a command for splitting files:

```shell
split -b 1024m test2.txt /data/tmp/my/test.txt.
```
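Explanation:
(1) split: the split command;
(2) -b 1024m: the number of bytes in each piece. Units such as m and k are supported. Here the 6.5 GB file is split into 1 GB pieces, which gives about 7 files.
(3) test2.txt: the file to split;
(4) /data/tmp/my/test.txt.: the prefix for the output files. split automatically appends a suffix (aa, ab, ac, ...) to it.
Other options:
(1) -l <lines>: put the given number of lines in each piece.
(2) -C <bytes>: like -b, but tries to keep lines intact when cutting. This is a GNU split option, and the BSD split shipped with macOS may not have it.
Note that -b cuts at exact byte offsets, so a line can end up split across two pieces. If every line must stay whole, use -l or -C.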
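Where the split command is not available (for example on Windows), you can reproduce the -l behavior in plain Java. The sketch below is a hypothetical helper and does not come from the original article. `SplitByLines.split` copies the input byte for byte and starts a new piece after every `linesPerFile` newline bytes, so no line is cut in half. It names the pieces with split's default two-letter suffixes, which limits it to 676 pieces.

```java
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

class SplitByLines {
    // Mimics `split -l linesPerFile src outDir/prefix`: bytes are copied unchanged and
    // pieces are named prefix + "aa", prefix + "ab", ... like split's default suffixes.
    static List<Path> split(Path src, Path outDir, String prefix, long linesPerFile) throws IOException {
        if (linesPerFile <= 0) {
            throw new IllegalArgumentException("linesPerFile must be positive");
        }
        List<Path> pieces = new ArrayList<>();
        try (InputStream in = new BufferedInputStream(Files.newInputStream(src))) {
            OutputStream out = null;
            long linesInPiece = 0;
            try {
                int b;
                while ((b = in.read()) != -1) {
                    if (out == null) {
                        // Open the next piece only when there is a byte to write,
                        // so no empty trailing piece is created.
                        int i = pieces.size();
                        if (i >= 26 * 26) {
                            throw new IOException("more than 676 pieces; a two-letter suffix is not enough");
                        }
                        Path piece = outDir.resolve(prefix + (char) ('a' + i / 26) + (char) ('a' + i % 26));
                        out = new BufferedOutputStream(Files.newOutputStream(piece));
                        pieces.add(piece);
                    }
                    out.write(b);
                    // A piece is complete after linesPerFile newline bytes, so lines are never cut.
                    if (b == '\n' && ++linesInPiece == linesPerFile) {
                        out.close();
                        out = null;
                        linesInPiece = 0;
                    }
                }
            } finally {
                if (out != null) {
                    out.close();
                }
            }
        }
        return pieces;
    }
}
```

Because the helper only counts `'\n'` bytes, it works for any encoding whose newline is the single byte 0x0A, such as ASCII or UTF-8.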
After splitting, the files look like this:
1.2 Reading the Split Files with Multiple Threads
We read the split files with multiple threads, starting one thread per file:
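```java
import java.io.File;
import java.util.ArrayList;
import java.util.List;

// Fragment of the TestReadFile class. readFileFileChannel(String) is the
// FileChannel-based reader from the previous article in this series (not shown here).
public void readFileBySplitFile(String pathname) {
    // pathname is a directory, not a single file, e.g. /data/tmp/my
    File dir = new File(pathname);
    File[] files = dir.listFiles();
    if (files == null) {
        return; // not a directory, or an I/O error occurred
    }
    List<MyThread> threads = new ArrayList<>();
    for (File f : files) {
        // one thread per split file
        MyThread thread = new MyThread(f.getPath());
        threads.add(thread);
        thread.start();
    }
    // wait until every thread has finished
    for (MyThread t : threads) {
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            break;
        }
    }
}

private class MyThread extends Thread {
    private final String pathname;

    public MyThread(String pathname) {
        this.pathname = pathname;
    }

    @Override
    public void run() {
        // each thread reads its own file, so no two threads share a file
        readFileFileChannel(pathname);
    }
}
```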
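Explanation:
(1) Get all the split files in the given directory.
(2) Iterate over the files and hand each path to its own thread. Each thread's run() reads its file with readFileFileChannel.
(3) join: the calling (main) thread waits until each worker thread has finished and then continues. If this is new to you, see the earlier article "Wuxian and Master Visit the Land of Women: Turning Parallel Threads into Serial, Thread You're Awesome" (《悟纤和师傅去女儿国「线程并行变为串行,Thread你好牛」》).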
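Test: 6.5 GB took 4 seconds.
In theory, the advantage of this multithreaded approach grows with the file size. Here the number of threads equals the number of files. Would you do that in practice? Certainly not: a huge file split into many pieces would start that many threads at once. You can probably see how to improve it, for example by running the files on a fixed-size thread pool, but we won't go into it here.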
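One possible improvement is to submit each file to a fixed-size pool from `java.util.concurrent` instead of starting one thread per file. The sketch below rests on a few assumptions. `PooledSplitFileReader` and `processFile` are hypothetical names. `processFile` only counts lines, as a stand-in for real processing. The pool size is a parameter, and `Runtime.getRuntime().availableProcessors()` is a reasonable default.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class PooledSplitFileReader {
    // Processes every regular file in dir on poolSize threads and sums the per-file results.
    static long readAll(Path dir, int poolSize)
            throws IOException, InterruptedException, ExecutionException {
        List<Path> files = new ArrayList<>();
        try (DirectoryStream<Path> entries = Files.newDirectoryStream(dir)) {
            for (Path p : entries) {
                if (Files.isRegularFile(p)) {
                    files.add(p);
                }
            }
        }
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        try {
            List<Future<Long>> results = new ArrayList<>();
            for (Path p : files) {
                results.add(pool.submit(() -> processFile(p)));
            }
            long total = 0;
            for (Future<Long> r : results) {
                total += r.get(); // blocks until that file is done, like join()
            }
            return total;
        } finally {
            pool.shutdown();
        }
    }

    // Hypothetical per-file work: count the lines.
    static long processFile(Path file) throws IOException {
        long lines = 0;
        try (BufferedReader reader = Files.newBufferedReader(file, StandardCharsets.UTF_8)) {
            while (reader.readLine() != null) {
                lines++;
            }
        }
        return lines;
    }
}
```

Here `Future.get()` does the job of `join()`, and it also returns each file's result so the totals can be combined. The number of running threads stays at `poolSize` however many pieces there are.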
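2. Reading a Large File: Multiple Threads Reading the Same File
2.1 Multithreading, Version 1.0
Next, let's look at having multiple threads read the same file. The idea is to divide the file into ranges and have each thread read from a different starting position. The class that supports this is RandomAccessFile, because its seek method lets you set the position to start reading from.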
```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Fragment of the TestReadFile class. BufferedRandomAccessFile is the class listed in
// section 2.2; MyThread2 is the worker thread shown below.
public void readFileByMutiThread(String pathname, int threadCount) {
    // Start and end offset of each segment: segment n is [beginIndexs[n], endIndexs[n]).
    long[] beginIndexs = new long[threadCount];
    long[] endIndexs = new long[threadCount];

    // try-with-resources closes the file once the boundaries are known.
    try (BufferedRandomAccessFile randomAccessFile = new BufferedRandomAccessFile(pathname, "r")) {
        // Get the file length and split it evenly; gap is the nominal size of each segment.
        long fileTotalLength = randomAccessFile.length();
        long gap = fileTotalLength / threadCount;
        // Start offset of the next segment.
        long nextStartIndex = 0;

        // Find the start and end of every segment.
        for (int n = 0; n < threadCount; n++) {
            beginIndexs[n] = nextStartIndex;
            // The last segment takes everything that is left.
            if (n + 1 == threadCount) {
                endIndexs[n] = fileTotalLength;
                break;
            }
            // (1) Move forward by gap from the previous boundary.
            nextStartIndex = Math.min(nextStartIndex + gap, fileTotalLength);

            // (2) That offset is usually in the middle of a line, so seek there and
            //     skip forward to just past the next '\n' (or to EOF).
            //     Stopping only at '\n' also handles "\r\n" line endings.
            randomAccessFile.seek(nextStartIndex);
            int c;
            do {
                c = randomAccessFile.read();
            } while (c != -1 && c != '\n');

            // The file pointer is now at the first byte of the next line (or at EOF).
            nextStartIndex = randomAccessFile.getFilePointer();
            endIndexs[n] = nextStartIndex;
        }
    } catch (IOException e) { // also covers FileNotFoundException
        e.printStackTrace();
        return;
    }

    // Start one thread per segment.
    List<MyThread2> threads = new ArrayList<>();
    for (int i = 0; i < threadCount; i++) {
        MyThread2 thread = new MyThread2(pathname, beginIndexs[i], endIndexs[i]);
        threads.add(thread);
        thread.start();
    }

    // Wait for all threads to finish.
    for (MyThread2 t : threads) {
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            break;
        }
    }
}
```
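Explanation: this method divides the file into byte ranges according to the number of threads, and each thread handles one range. Every cut is moved forward to the start of the next line, so no line is split between two threads.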
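The boundary search can also be written as a small standalone function, which makes it easy to test. This is a sketch, not the author's code. `LineAlignedSegments.boundaries` is a hypothetical name, and the sketch assumes `'\n'` or `"\r\n"` line endings. It looks at the byte just before each nominal cut, so a cut that already falls on a line start stays where it is. Calling `read()` one byte at a time is acceptable here because only one partial line is scanned per boundary.

```java
import java.io.IOException;
import java.io.RandomAccessFile;

class LineAlignedSegments {
    // Returns n + 1 offsets b[0..n]; segment k is [b[k], b[k + 1]) and always starts at a line start.
    static long[] boundaries(String path, int n) throws IOException {
        if (n <= 0) {
            throw new IllegalArgumentException("n must be positive");
        }
        try (RandomAccessFile raf = new RandomAccessFile(path, "r")) {
            long len = raf.length();
            long gap = len / n;
            long[] b = new long[n + 1];
            b[n] = len;
            for (int k = 1; k < n; k++) {
                long p = k * gap; // nominal cut
                long aligned;
                if (p <= 0) {
                    aligned = 0;
                } else {
                    // If the byte before p is '\n', p already starts a line.
                    // Otherwise skip forward to just past the next '\n' (or to EOF).
                    raf.seek(p - 1);
                    int c;
                    do {
                        c = raf.read();
                    } while (c != -1 && c != '\n');
                    aligned = raf.getFilePointer();
                }
                b[k] = Math.max(aligned, b[k - 1]); // keep offsets non-decreasing
            }
            return b;
        }
    }
}
```

For the 12-byte content `"aa\nbb\ncc\ndd\n"` with n = 3, the nominal cuts 4 and 8 move to 6 and 9. This gives the segments `"aa\nbb\n"`, `"cc\n"` and `"dd\n"`. On very small files some segments may come out empty, and a worker can simply skip those.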
Now let's look at what each thread does:
```java
import java.io.IOException;
import java.io.RandomAccessFile;

// Worker thread: processes the lines that start inside [begin, end).
private class MyThread2 extends Thread {
    private final long begin;
    private final long end;
    private final String pathname;

    public MyThread2(String pathname, long begin, long end) {
        this.pathname = pathname;
        this.begin = begin;
        this.end = end;
    }

    @Override
    public void run() {
        // try-with-resources closes the file (the original left this as a TODO).
        try (RandomAccessFile randomAccessFile = new RandomAccessFile(pathname, "r")) {
            // Set the position to start reading from.
            randomAccessFile.seek(begin);

            // Only this thread touches it, so StringBuilder is enough (no locking needed).
            StringBuilder buffer = new StringBuilder();
            String str;
            // Check the position before reading, so the thread never reads a line that
            // belongs to the next segment. getFilePointer() also counts "\r\n" correctly.
            while (randomAccessFile.getFilePointer() < end
                    && (str = randomAccessFile.readLine()) != null) {
                // Process the line without keeping every line in memory;
                // here we simulate work by keeping its first character (empty lines are skipped).
                if (!str.isEmpty()) {
                    buffer.append(str.charAt(0));
                }
            }
            System.out.println("buffer.length:" + buffer.length() + "--" + Thread.currentThread().getName());
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
```
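Explanation: this thread reads the data between its begin and end positions.
I ran it on the 6.5 GB file and it took so long that there was no telling when it would finish. I couldn't wait any longer, so I stopped it.
Why is it so slow? Wasn't this approach supposed to be great? Why wound my fragile little heart like this?
Let's analyze it. When we tested the earlier method readFileByRandomAccessFile, it was also very slow, so the threads are not what makes this slow. What is the cause, then?
Let's look at RandomAccessFile's readLine() method: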
```java
public final String readLine() throws IOException {
    StringBuffer input = new StringBuffer();
    int c = -1;
    boolean eol = false;

    while (!eol) {
        switch (c = read()) {
        case -1:
        case '\n':
            eol = true;
            break;
        case '\r':
            eol = true;
            long cur = getFilePointer();
            if ((read()) != '\n') {
                seek(cur);
            }
            break;
        default:
            input.append((char)c);
            break;
        }
    }

    if ((c == -1) && (input.length() == 0)) {
        return null;
    }
    return input.toString();
}
```
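How it works: a while loop keeps reading one character at a time. When it hits \n or \r (a \r followed by \n consumes both), readLine stops and returns the line. So the core method is read():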
```java
public int read() throws IOException {
    return read0();
}

private native int read0() throws IOException;
```
It calls a native method directly. What does that method do? The Javadoc tells us:
```java
/**
 * Reads a byte of data from this file. The byte is returned as an
 * integer in the range 0 to 255 ({@code 0x00-0x0ff}). This
 * method blocks if no input is yet available.
 */
```
In other words, read() reads a single byte from the file and returns it as an int in the range 0 to 255 (0x00-0xff). If no input is available yet, the method blocks.
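By now you can probably see why this is so slow. readLine() calls read() once per byte, and every read() is a native call that issues its own system call. A 6.5 GB file therefore means billions of system calls, and reading one byte at a time is bound to be slow.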
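The cure is to fetch a large block with one call and hand out single bytes from memory. That is the idea behind BufferedRandomAccessFile. The stripped-down sketch below shows only that idea. `BufferedLineReader` is a hypothetical class, not a JDK or library type. `read(byte[], int, int)` fills the buffer, single bytes are served from it, and `readLine()` uses the same `"\n"`, `"\r"`, `"\r\n"` rules as `RandomAccessFile.readLine()`. One assumption differs from the JDK method: this reader decodes each line as UTF-8, whereas `RandomAccessFile.readLine()` turns each byte into one char.

```java
import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.charset.StandardCharsets;

class BufferedLineReader implements Closeable {
    private final RandomAccessFile raf;
    private final byte[] buf;
    private int pos;   // index of the next byte to hand out
    private int limit; // number of valid bytes in buf

    BufferedLineReader(RandomAccessFile raf, int bufferSize) {
        if (bufferSize <= 0) {
            throw new IllegalArgumentException("bufferSize must be positive");
        }
        this.raf = raf;
        this.buf = new byte[bufferSize];
    }

    // Serves one byte from memory; goes to the file only when the buffer is used up.
    private int read() throws IOException {
        if (pos == limit) {
            int n = raf.read(buf, 0, buf.length); // one call fetches up to buf.length bytes
            if (n <= 0) {
                return -1;
            }
            pos = 0;
            limit = n;
        }
        return buf[pos++] & 0xFF;
    }

    // Logical file position: the underlying pointer minus the bytes still unread in the buffer.
    long getFilePointer() throws IOException {
        return raf.getFilePointer() - (limit - pos);
    }

    // Returns the next line without its terminator, or null at end of file.
    String readLine() throws IOException {
        int c = read();
        if (c == -1) {
            return null;
        }
        ByteArrayOutputStream line = new ByteArrayOutputStream();
        while (c != -1 && c != '\n') {
            if (c == '\r') {
                int next = read();
                if (next != '\n' && next != -1) {
                    pos--; // a lone '\r': push the following byte back into the buffer
                }
                break;
            }
            line.write(c);
            c = read();
        }
        return new String(line.toByteArray(), StandardCharsets.UTF_8);
    }

    @Override
    public void close() throws IOException {
        raf.close();
    }
}
```

In real use the buffer would be tens of kilobytes; BufferedRandomAccessFile uses 64 KB. Because `getFilePointer()` subtracts the unread part of the buffer, a worker thread can still stop exactly at its segment end.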
2.2 Multithreading, Version 2.0
So what can we do? There is a class called BufferedRandomAccessFile. It is not part of the JDK, so you have to find its source code yourself:
- package com.kfit.bloomfilter;
- /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
- import java.io.File;
- import java.io.FileNotFoundException;
- import java.io.IOException;
- import java.io.RandomAccessFile;
- import java.util.Arrays;
-
- /**
- * A <code>BufferedRandomAccessFile</code> is like a
- * <code>RandomAccessFile</code>, but it uses a private buffer so that most
- * operations do not require a disk access.
- * <P>
- *
- * Note: The operations on this class are unmonitored. Also, the correct
- * functioning of the <code>RandomAccessFile</code> methods that are not
- * overridden here relies on the implementation of those methods in the
- * superclass.
- */
-
- public final class BufferedRandomAccessFile extends RandomAccessFile
- {
- static final int LogBuffSz_ = 16; // 64K buffer
- public static final int BuffSz_ = (1 << LogBuffSz_);
- static final long BuffMask_ = ~(((long) BuffSz_) - 1L);
-
- private String path_;
-
- /*
- * This implementation is based on the buffer implementation in Modula-3's
- * "Rd", "Wr", "RdClass", and "WrClass" interfaces.
- */
- private boolean dirty_; // true iff unflushed bytes exist
- private boolean syncNeeded_; // dirty_ can be cleared by e.g. seek, so track sync separately
- private long curr_; // current position in file
- private long lo_, hi_; // bounds on characters in "buff"
- private byte[] buff_; // local buffer
- private long maxHi_; // this.lo + this.buff.length
- private boolean hitEOF_; // buffer contains last file block?
- private long diskPos_; // disk position
-
- /*
- * To describe the above fields, we introduce the following abstractions for
- * the file "f":
- *
- * len(f) the length of the file curr(f) the current position in the file
- * c(f) the abstract contents of the file disk(f) the contents of f's
- * backing disk file closed(f) true iff the file is closed
- *
- * "curr(f)" is an index in the closed interval [0, len(f)]. "c(f)" is a
- * character sequence of length "len(f)". "c(f)" and "disk(f)" may differ if
- * "c(f)" contains unflushed writes not reflected in "disk(f)". The flush
- * operation has the effect of making "disk(f)" identical to "c(f)".
- *
- * A file is said to be *valid* if the following conditions hold:
- *
- * V1. The "closed" and "curr" fields are correct:
- *
- * f.closed == closed(f) f.curr == curr(f)
- *
- * V2. The current position is either contained in the buffer, or just past
- * the buffer:
- *
- * f.lo <= f.curr <= f.hi
- *
- * V3. Any (possibly) unflushed characters are stored in "f.buff":
- *
- * (forall i in [f.lo, f.curr): c(f)[i] == f.buff[i - f.lo])
- *
- * V4. For all characters not covered by V3, c(f) and disk(f) agree:
- *
- * (forall i in [f.lo, len(f)): i not in [f.lo, f.curr) => c(f)[i] ==
- * disk(f)[i])
- *
- * V5. "f.dirty" is true iff the buffer contains bytes that should be
- * flushed to the file; by V3 and V4, only part of the buffer can be dirty.
- *
- * f.dirty == (exists i in [f.lo, f.curr): c(f)[i] != f.buff[i - f.lo])
- *
- * V6. this.maxHi == this.lo + this.buff.length
- *
- * Note that "f.buff" can be "null" in a valid file, since the range of
- * characters in V3 is empty when "f.lo == f.curr".
- *
- * A file is said to be *ready* if the buffer contains the current position,
- * i.e., when:
- *
- * R1. !f.closed && f.buff != null && f.lo <= f.curr && f.curr < f.hi
- *
- * When a file is ready, reading or writing a single byte can be performed
- * by reading or writing the in-memory buffer without performing a disk
- * operation.
- */
-
- /**
- * Open a new <code>BufferedRandomAccessFile</code> on <code>file</code>
- * in mode <code>mode</code>, which should be "r" for reading only, or
- * "rw" for reading and writing.
- */
- public BufferedRandomAccessFile(File file, String mode) throws IOException
- {
- this(file, mode, 0);
- }
-
- public BufferedRandomAccessFile(File file, String mode, int size) throws IOException
- {
- super(file, mode);
- path_ = file.getAbsolutePath();
- this.init(size);
- }
-
- /**
- * Open a new <code>BufferedRandomAccessFile</code> on the file named
- * <code>name</code> in mode <code>mode</code>, which should be "r" for
- * reading only, or "rw" for reading and writing.
- */
- public BufferedRandomAccessFile(String name, String mode) throws IOException
- {
- this(name, mode, 0);
- }
-
- public BufferedRandomAccessFile(String name, String mode, int size) throws FileNotFoundException
- {
- super(name, mode);
- path_ = name;
- this.init(size);
- }
-
- private void init(int size)
- {
- this.dirty_ = false;
- this.lo_ = this.curr_ = this.hi_ = 0;
- this.buff_ = (size > BuffSz_) ? new byte[size] : new byte[BuffSz_];
- this.maxHi_ = (long) BuffSz_;
- this.hitEOF_ = false;
- this.diskPos_ = 0L;
- }
-
- public String getPath()
- {
- return path_;
- }
-
- public void sync() throws IOException
- {
- if (syncNeeded_)
- {
- flush();
- getChannel().force(true);
- syncNeeded_ = false;
- }
- }
-
- // public boolean isEOF() throws IOException
- // {
- // assert getFilePointer() <= length();
- // return getFilePointer() == length();
- // }
-
- public void close() throws IOException
- {
- this.flush();
- this.buff_ = null;
- super.close();
- }
-
- /**
- * Flush any bytes in the file's buffer that have not yet been written to
- * disk. If the file was created read-only, this method is a no-op.
- */
- public void flush() throws IOException
- {
- this.flushBuffer();
- }
-
- /* Flush any dirty bytes in the buffer to disk. */
- private void flushBuffer() throws IOException
- {
- if (this.dirty_)
- {
- if (this.diskPos_ != this.lo_)
- super.seek(this.lo_);
- int len = (int) (this.curr_ - this.lo_);
- super.write(this.buff_, 0, len);
- this.diskPos_ = this.curr_;
- this.dirty_ = false;
- }
- }
-
- /*
- * Read at most "this.buff.length" bytes into "this.buff", returning the
- * number of bytes read. If the return result is less than
- * "this.buff.length", then EOF was read.
- */
- private int fillBuffer() throws IOException
- {
- int cnt = 0;
- int rem = this.buff_.length;
- while (rem > 0)
- {
- int n = super.read(this.buff_, cnt, rem);
- if (n < 0)
- break;
- cnt += n;
- rem -= n;
- }
- // the original condition tested (cnt < 0), which is never true, so hitEOF_ was never set
- this.hitEOF_ = (cnt < this.buff_.length);
- if (this.hitEOF_)
- {
- // make sure buffer that wasn't read is initialized with -1
- Arrays.fill(this.buff_, cnt, this.buff_.length, (byte) 0xff);
- }
- this.diskPos_ += cnt;
- return cnt;
- }
-
- /*
- * This method positions <code>this.curr</code> at position <code>pos</code>.
- * If <code>pos</code> does not fall in the current buffer, it flushes the
- * current buffer and loads the correct one.<p>
- *
- * On exit from this routine <code>this.curr == this.hi</code> iff <code>pos</code>
- * is at or past the end-of-file, which can only happen if the file was
- * opened in read-only mode.
- */
- public void seek(long pos) throws IOException
- {
- if (pos >= this.hi_ || pos < this.lo_)
- {
- // seeking outside of current buffer -- flush and read
- this.flushBuffer();
- this.lo_ = pos & BuffMask_; // start at BuffSz boundary
- this.maxHi_ = this.lo_ + (long) this.buff_.length;
- if (this.diskPos_ != this.lo_)
- {
- super.seek(this.lo_);
- this.diskPos_ = this.lo_;
- }
- int n = this.fillBuffer();
- this.hi_ = this.lo_ + (long) n;
- }
- else
- {
- // seeking inside current buffer -- no read required
- if (pos < this.curr_)
- {
- // if seeking backwards, we must flush to maintain V4
- this.flushBuffer();
- }
- }
- this.curr_ = pos;
- }
-
- public long getFilePointer()
- {
- return this.curr_;
- }
-
- public long length() throws IOException
- {
- // max accounts for the case where we have written past the old file length, but not yet flushed our buffer
- return Math.max(this.curr_, super.length());
- }
-
- public int read() throws IOException
- {
- if (this.curr_ >= this.hi_)
- {
- // test for EOF
- // if (this.hi < this.maxHi) return -1;
- if (this.hitEOF_)
- return -1;
-
- // slow path -- read another buffer
- this.seek(this.curr_);
- if (this.curr_ == this.hi_)
- return -1;
- }
- byte res = this.buff_[(int) (this.curr_ - this.lo_)];
- this.curr_++;
- return ((int) res) & 0xFF; // convert byte -> int
- }
-
- public int read(byte[] b) throws IOException
- {
- return this.read(b, 0, b.length);
- }
-
- public int read(byte[] b, int off, int len) throws IOException
- {
- if (this.curr_ >= this.hi_)
- {
- // test for EOF
- // if (this.hi < this.maxHi) return -1;
- if (this.hitEOF_)
- return -1;
-
- // slow path -- read another buffer
- this.seek(this.curr_);
- if (this.curr_ == this.hi_)
- return -1;
- }
- len = Math.min(len, (int) (this.hi_ - this.curr_));
- int buffOff = (int) (this.curr_ - this.lo_);
- System.arraycopy(this.buff_, buffOff, b, off, len);
- this.curr_ += len;
- return len;
- }
-
- public void write(int b) throws IOException
- {
- if (this.curr_ >= this.hi_)
- {
- if (this.hitEOF_ && this.hi_ < this.maxHi_)
- {
- // at EOF -- bump "hi"
- this.hi_++;
- }
- else
- {
- // slow path -- write current buffer; read next one
- this.seek(this.curr_);
- if (this.curr_ == this.hi_)
- {
- // appending to EOF -- bump "hi"
- this.hi_++;
- }
- }
- }
- this.buff_[(int) (this.curr_ - this.lo_)] = (byte) b;
- this.curr_++;
- this.dirty_ = true;
- syncNeeded_ = true;
- }
-
- public void write(byte[] b) throws IOException
- {
- this.write(b, 0, b.length);
- }
-
- public void write(byte[] b, int off, int len) throws IOException
- {
- while (len > 0)
- {
- int n = this.writeAtMost(b, off, len);
- off += n;
- len -= n;
- this.dirty_ = true;
- syncNeeded_ = true;
- }
- }
-
- /*
- * Write at most "len" bytes to "b" starting at position "off", and return
- * the number of bytes written.
- */
- private int writeAtMost(byte[] b, int off, int len) throws IOException
- {
- if (this.curr_ >= this.hi_)
- {
- if (this.hitEOF_ && this.hi_ < this.maxHi_)
- {
- // at EOF -- bump "hi"
- this.hi_ = this.maxHi_;
- }
- else
- {
- // slow path -- write current buffer; read next one
- this.seek(this.curr_);
- if (this.curr_ == this.hi_)
- {
- // appending to EOF -- bump "hi"
- this.hi_ = this.maxHi_;
- }
- }
- }
- len = Math.min(len, (int) (this.hi_ - this.curr_));
- int buffOff = (int) (this.curr_ - this.lo_);
- System.arraycopy(b, off, this.buff_, buffOff, len);
- this.curr_ += len;
- return len;
- }
- }
Then replace RandomAccessFile with BufferedRandomAccessFile in the code above.
Let's test it.
With the earlier single-threaded method:
TestReadFile.readFileByBufferedRandomAccessFile(pathname2);
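6.5 GB: 32 seconds.
That is much better than before, when it could not finish at all, but it is still slower than NIO.
Now the multithreaded version:
6.5 GB: 2 threads 20 s, 3 threads 16 s, 4 threads 14 s, 5 threads 11 s, 6 threads 8 s, 7 threads 8 s, 8 threads 9 s.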
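My Mac has a 6-core processor, so performance peaked at 6 threads. With more threads, the extra context switching most likely eats up the gain, so the time starts rising again. Even so, this still does not seem to match the earlier approaches.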
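Two notes on choosing the thread count.

- `Runtime.getRuntime().availableProcessors()` reports the processors the JVM can use. On CPUs with hyper-threading this can be double the physical core count, so treat it as a starting point for benchmarking rather than a guaranteed optimum.
- The worker threads above only print their results. They can hand them back instead.

The sketch below is hypothetical (`ParallelNewlineCount` is not from the article). It uses a fixed-size pool and `Future` to add up per-segment newline counts. For pure counting it simplifies the article's line-aligned segments to raw byte ranges. This works because every `'\n'` byte falls into exactly one range.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class ParallelNewlineCount {
    // Splits the file into `threads` raw byte ranges and counts '\n' bytes in each range in parallel.
    static long countNewlines(Path file, int threads)
            throws IOException, InterruptedException, ExecutionException {
        if (threads <= 0) {
            throw new IllegalArgumentException("threads must be positive");
        }
        try (FileChannel ch = FileChannel.open(file, StandardOpenOption.READ)) {
            long size = ch.size();
            long gap = (size + threads - 1) / threads; // ceiling division: ranges cover the whole file
            ExecutorService pool = Executors.newFixedThreadPool(threads);
            try {
                List<Future<Long>> parts = new ArrayList<>();
                for (int k = 0; k < threads; k++) {
                    long begin = Math.min(size, k * gap);
                    long end = Math.min(size, begin + gap);
                    parts.add(pool.submit(() -> countRange(ch, begin, end)));
                }
                long total = 0;
                for (Future<Long> part : parts) {
                    total += part.get(); // waits for the worker, like join(), and collects its result
                }
                return total;
            } finally {
                pool.shutdown();
            }
        }
    }

    // Positional reads (read(dst, position)) do not move the channel's own position,
    // so all workers can share one FileChannel.
    static long countRange(FileChannel ch, long begin, long end) throws IOException {
        ByteBuffer buf = ByteBuffer.allocate(64 * 1024);
        long pos = begin;
        long count = 0;
        while (pos < end) {
            buf.clear();
            buf.limit((int) Math.min(buf.capacity(), end - pos));
            int n = ch.read(buf, pos);
            if (n <= 0) {
                break;
            }
            for (int i = 0; i < n; i++) {
                if (buf.get(i) == '\n') {
                    count++;
                }
            }
            pos += n;
        }
        return count;
    }
}
```

A typical call would be `ParallelNewlineCount.countNewlines(path, Runtime.getRuntime().availableProcessors())`. The number of lines equals the newline count, plus one if the file is non-empty and does not end with `'\n'`.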
2.3 Multithreading, Version 3.0
Since JDK 1.4, NIO's memory-mapped files (MappedByteBuffer) have taken over most of what RandomAccessFile was used for. You can try it yourself; we won't go into detail here.
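If you want to try version 3.0, here is a minimal sketch that assumes the task is just counting newlines. `MappedNewlineCount` is a hypothetical class. `FileChannel.map` can map at most `Integer.MAX_VALUE` bytes at a time, so a 100 GB file has to be mapped window by window. The JDK also has no public API to unmap a `MappedByteBuffer`; a mapping is released only when the buffer is garbage-collected.

```java
import java.io.IOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

class MappedNewlineCount {
    // Maps the file in windows of windowSize bytes (a single mapping is limited to Integer.MAX_VALUE).
    static long countNewlines(Path file, long windowSize) throws IOException {
        if (windowSize <= 0 || windowSize > Integer.MAX_VALUE) {
            throw new IllegalArgumentException("windowSize must be in 1..Integer.MAX_VALUE");
        }
        long count = 0;
        try (FileChannel ch = FileChannel.open(file, StandardOpenOption.READ)) {
            long size = ch.size();
            for (long pos = 0; pos < size; pos += windowSize) {
                long len = Math.min(windowSize, size - pos);
                // The OS pages the data in on demand; get() reads straight from the mapping.
                MappedByteBuffer window = ch.map(FileChannel.MapMode.READ_ONLY, pos, len);
                while (window.hasRemaining()) {
                    if (window.get() == '\n') {
                        count++;
                    }
                }
            }
        }
        return count;
    }
}
```

To use several threads, each thread can map its own byte range of the same channel.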
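3. Wuxian's Summary
Master: Today's article was a bit tough, hard on the eyes and on the brain, so let me sum it up for you.
Apprentice: Master, this is so hard, I nearly fell asleep listening.
Master: File handling is inherently complex, and on a project not everyone ends up writing I/O stream code.
Here is a summary of the two main points: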
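(1) Reading a large file by splitting it, combined with NIO, is faster. The core idea is to use the split command on macOS/Linux to cut the large file into several smaller files, then read each small file in its own thread. 13.56 GB split into 6 files took 8 seconds, and 26 GB took 16 seconds. At that rate, reading 100 GB would take about a minute. The actual time, of course, depends heavily on how you process the data. If you print everything with System.out, for example, it will take far longer.
(2) Reading a large file with multiple threads. The core idea is to divide the file into n segments by length, start multiple threads, and have each one use RandomAccessFile's seek method to start reading directly at its own position. 13.56 GB with 6 threads took 23 seconds.
In addition, NIO's FileChannel is quite fast even with a single thread: 13.56 GB took 15 seconds. As mentioned earlier, Java supports large-file processing out of the box. That's Java: not only Write once, but also Write happy.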
One last note: a ByteBuffer read returns a block covering many lines, not one line at a time.
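Because one ByteBuffer read can end in the middle of a line, the unfinished tail has to be carried over into the next read. The sketch below shows that bookkeeping. `ChunkedLineReader.forEachLine` is a hypothetical helper, and the sketch assumes UTF-8 text with `'\n'` or `"\r\n"` line endings.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.function.Consumer;

class ChunkedLineReader {
    // Each read() fills the buffer with many lines, usually ending in a cut-off line.
    // Bytes of the current line accumulate in `line` across reads until its '\n' arrives.
    static void forEachLine(Path file, int bufferSize, Consumer<String> onLine) throws IOException {
        ByteBuffer buf = ByteBuffer.allocate(bufferSize);
        ByteArrayOutputStream line = new ByteArrayOutputStream();
        try (FileChannel ch = FileChannel.open(file, StandardOpenOption.READ)) {
            while (ch.read(buf) != -1) {
                buf.flip(); // switch from filling to draining
                while (buf.hasRemaining()) {
                    byte b = buf.get();
                    if (b == '\n') {
                        emit(line, onLine);
                    } else {
                        line.write(b);
                    }
                }
                buf.clear(); // ready for the next chunk
            }
        }
        if (line.size() > 0) {
            emit(line, onLine); // last line without a trailing '\n'
        }
    }

    // Splitting on the byte 0x0A is safe for UTF-8: it never occurs inside a multi-byte character.
    private static void emit(ByteArrayOutputStream line, Consumer<String> onLine) {
        byte[] bytes = line.toByteArray();
        int len = bytes.length;
        if (len > 0 && bytes[len - 1] == '\r') {
            len--; // treat "\r\n" as a line ending
        }
        onLine.accept(new String(bytes, 0, len, StandardCharsets.UTF_8));
        line.reset();
    }
}
```

A tiny `bufferSize` is handy for checking that lines crossing chunk boundaries come out whole. For real files you would use a much larger buffer.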
- I am who I am, a firework of a different color.
- I am who I am, a one-of-a-kind little apple.
To purchase the full video, visit: http://www.mark-to-win.com/TeacherV2.html?id=287