文件归并排序简单实现

如果要排序的数据太多,就不能一次性加载到内存中进行排序,只能分而治之:先把数据分块,各块分别排序后写入临时文件,最后再把这些有序的分块文件合并。这里参考归并排序的过程简单地实现了一下,封装得还不够好。另外,在合并多个文件时偷了懒:本来觉得需要自己实现平衡二叉树,但实现起来比较麻烦,就先用 C++ 标准库中 multiset 的有序特性了。STL 的 set、map、multiset 都是有序容器,通常基于红黑树实现,插入、查找、删除的复杂度都是 O(log n)。

mergesort.h

#ifndef __MERGESORT_H
#define __MERGESORT_H

namespace MyNamespace
{
	/// Optional three-way comparison callback for mergeSort.
	/// It receives pointers to two elements and returns a negative value,
	/// zero or a positive value when the first element orders before,
	/// equal to or after the second one.
	typedef int (*Comparer)(const void *a, const void *b);

	/// Default three-way comparison, built only on operator< of T.
	/// @return -1 if *a < *b, 1 if *b < *a, otherwise 0.
	template <typename T> 
	int defaultCompare(T *a, T *b)
	{
		if (*a < *b) return -1;
		if (*b < *a) return 1;
		return 0;
	}

	/// Copies the first n elements of b into a by assignment (back to front).
	/// Does nothing when n <= 0.
	template <typename T> 
	void arrayCopy(T *a, const T *b, int n)
	{
		while (--n >= 0) a[n] = b[n];
	}

	/// Returns the smaller of two indices/lengths.
	inline int minIndex(int a, int b)
	{
		return a < b ? a : b;
	}

	/// Owner of the scratch array used by mergeSort. The destructor releases
	/// the array, so it is freed on every exit path, including an exception
	/// thrown by a comparison or by an element assignment.
	template <typename T>
	class ScratchArray
	{
	public:
		ScratchArray() : data(0) {}
		~ScratchArray() { delete [] data; }

		T *data;

	private:
		// Not copyable: two owners would free the same array twice.
		ScratchArray(const ScratchArray &);
		ScratchArray &operator=(const ScratchArray &);
	};

	/// Sorts a[0..n) in ascending order with a bottom-up (iterative) merge sort.
	/// The sort is stable: elements that compare equal keep their input order.
	///
	/// @param a        array to sort in place; T must be default-constructible
	///                 and assignable, and must support operator< (the default
	///                 comparison is instantiated even when comparer is given).
	/// @param n        number of elements; values below 2 are a no-op.
	/// @param comparer three-way comparison, or a null pointer to use
	///                 defaultCompare (operator<).
	/// @return 0 on success, -1 if the scratch array of n elements could not
	///         be created.
	template <typename T>
	int mergeSort(T a[], int n, Comparer comparer)
	{
		if (n < 2)
			return 0;

		// operator new[] reports failure by throwing, never by returning null,
		// so translate the exception into the documented error code here.
		ScratchArray<T> scratch;
		try
		{
			scratch.data = new T[n];
		}
		catch (...)
		{
			return -1;
		}
		T *b = scratch.data;

		// len is the length of the already-sorted runs: 1, 2, 4, 8, ...
		int len = 1;
		while (len < n)
		{
			int t = 0;       // next free slot in b
			int start1 = 0;  // start of the left run of the current pair
			while (start1 < n)
			{
				// The two runs are [start1, end1) and [end1, end2). Both ends
				// are clamped to n using "n - x" so that no intermediate sum
				// can overflow int, even for n close to INT_MAX.
				const int end1 = start1 + minIndex(len, n - start1);
				const int end2 = end1 + minIndex(len, n - end1);

				int i = start1;
				int j = end1;
				while (i < end1 && j < end2)
				{
					const int cmpResult = (comparer != 0)
						? comparer(a + i, a + j)
						: defaultCompare(a + i, a + j);

					// "<=" takes the left element on ties, which keeps the sort stable.
					b[t++] = (cmpResult <= 0) ? a[i++] : a[j++];
				}

				// At most one of the two runs still has elements left.
				while (i < end1)
					b[t++] = a[i++];
				while (j < end2)
					b[t++] = a[j++];

				start1 = end2;
			}

			arrayCopy(a, b, n);

			// Double the run length without overflowing: len * 2 is only
			// evaluated when it is known to be <= n.
			len = (len > n - len) ? n : len * 2;
		}

		return 0;
	}

	/// Sorts a[0..n) in ascending order using operator< of T.
	/// @return 0 on success, -1 if the scratch array could not be created.
	template <typename T>
	int mergeSort(T a[], int n)
	{
		return mergeSort(a, n, static_cast<Comparer>(0));
	}

} // namespace MyNamespace

#endif //__MERGESORT_H

filemergesort.h

#ifndef __FILEMERGESORT_H
#define __FILEMERGESORT_H

#include "mergesort.h"
#include <fstream>
#include <iostream>
#include <map>
#include <set>
#include <sstream>
#include <string>
#include <vector>
using namespace std;


// Sorts the in-memory chunk `v` with MyNamespace::mergeSort and writes it to
// the next numbered part file, then empties `v`.
//
// The macro is not self-contained. At the point of expansion it needs:
//   - locals `ret`, `v`, `fileName` and `partCount` (which it increments),
//   - `strCompare`, `separator` and `ouputSortedPartFile` in scope.
// It also returns from the ENCLOSING function: -3 when the sort reports an
// error, -4 when the part file cannot be written.
// `v` must not be empty, because the expansion evaluates `&v[0]`.
#define SORT_AND_WRITE_TO_FILE	\
	ret = MyNamespace::mergeSort(&v[0], v.size(), (MyNamespace::Comparer)strCompare);\
				if (ret != 0)\
				{\
					/* the in-memory merge sort failed, possibly out of memory */\
					return -3;\
				}\
				ret = ouputSortedPartFile(fileName, ++partCount, v, separator);\
				\
				if (ret != 0)\
				{\
					/* failed to write the sorted chunk to its part file */\
					return -4;\
				}\
				v.clear();


/// One pending element of the k-way merge, tagged with its origin.
struct FileElement
{
	std::string elment;  ///< Text of the element (without the separator).
	int fileIndex;       ///< Index of the part-file stream the element was read from.
};


/// Default element ordering: compares only the first comma-separated field
/// of each element (the whole element when it contains no comma).
///
/// Declared inline because this function is defined in a header; without it,
/// including the header from two translation units breaks the link.
///
/// @param str1 first element, must not be null.
/// @param str2 second element, must not be null.
/// @return -1, 0 or 1 when the key of *str1 orders before, equal to or after
///         the key of *str2.
inline int defaultStrComparer(const std::string *str1, const std::string *str2)
{
	// find() yields npos when there is no comma; as a length, npos means
	// "up to the end of the string", so the whole element becomes the key.
	const std::string::size_type keyLength1 = str1->find(',');
	const std::string::size_type keyLength2 = str2->find(',');

	// Compare the two key ranges in place instead of copying them out.
	const int order = str1->compare(0, keyLength1, *str2, 0, keyLength2);
	if (order == 0) return 0;
	return order < 0 ? -1 : 1;
}

/// Strict-weak-ordering functor for ordered containers of FileElement:
/// e1 orders before e2 when defaultStrComparer ranks e1's text first.
class ElementCompareClass
{
public:
	// const-qualified: ordered containers invoke their comparator through a
	// const object, and some standard libraries reject a non-const operator().
	bool operator()(const FileElement &e1, const FileElement &e2) const
	{
		return defaultStrComparer(&e1.elment, &e2.elment) < 0;
	}
};

class FileMergeSort
{
public:
	
	typedef int (*StrComparer)(const string *a, const string *b);

	FileMergeSort()
		: separator(DEFAULT_SEPARATOR), maxMemorySize(DEFAULT_MAX_MEMORY_SIZE), strCompare(defaultStrComparer)
	{
	}

	FileMergeSort(const string separatorIn, int maxMemorySizeIn = 1024*1024*10)
		: separator(separatorIn), maxMemorySize(maxMemorySizeIn), strCompare(defaultStrComparer)
	{
		adjustMaxMemorySize();
	}


	void setMaxMemorySize(int bytes)
	{
		maxMemorySize = bytes;
		adjustMaxMemorySize();
	}
	int getMaxMemorySize(int bytes)
	{
		return maxMemorySize;
	}

	void setSeparator(const string s)
	{
		separator = s;
	}
	string getSeparator()
	{
		return separator;
	}

	int readFile(ifstream &inFile, string &sBuffer, int maxSize)
	{
		char ch;
		int size = 0;
		while (size < maxSize)
		{
			ch = inFile.get();
			if (inFile.good())
			{
				size++;
				sBuffer += ch;
			}
			else
			{
				break;
			}
		}
		return size;
	}


	int sort(const char *fileName)
	{
		int ret;
		int partCount = 0;
		int partFileSize = 0;
		int readSize, maxReadSize = 1024;
		string sBuffer, element;
		vector<string> v;

		ifstream inFile(fileName);
		if (!inFile.is_open())
		{
			//打开读文件失败
			return -1;
		}

		string::size_type index;
		while(inFile.good())
		{
			//从文件中最多读取maxReadSize个字符到sBuffer,返回实际读取个数
			readSize = readFile(inFile, sBuffer, maxReadSize);
			if (readSize <= 0)
				break;

			//根据分隔符把读取的字符串拆分成多个元素
			while (true)
			{
				index = sBuffer.find(separator);
				if (index != string::npos)
				{
					element = sBuffer.substr(0, index);
					v.push_back(element);
					sBuffer = sBuffer.substr(index + separator.length());
					partFileSize += element.size();
				}
				else
				{
					if (sBuffer.length() > maxMemorySize/2U)
					{
						//元素太长了(由separator分隔的,或许因为分隔符写错了)
						inFile.close();
						return -2;
					}
					break;
				}
			}

			/*
				如果分块容器v中的大小已经超过设置的大小了,就对这小块进行排序,并写入分块文件
				注意:这里的内存控制不那么准,可以能还差的远,跟vector的内存分分配有关;
				另外MergeSort还需要分配与v等量的内存;实际测试大概需要内存是maxMemorySize的四倍呢
			*/

			if (partFileSize >= maxMemorySize)
			{
				SORT_AND_WRITE_TO_FILE;
				partFileSize = 0;
			}
		}
		inFile.close();

		if (sBuffer.length() > 0)
		{
			v.push_back(sBuffer);
		}

		if (!v.empty())
		{
			SORT_AND_WRITE_TO_FILE;
		}

		//--------- 至此分割文件以及对分块排序完成,并且写入分块文件(filename.N.sortedpart)------------
		//--------- 下面对各个有序的分块文件进行合并 --------------------------------------------------

		
		ret = fileMerge(fileName, partCount);
		if (ret != 0)
		{
			//对分块文件进行归并出错。可能因为打开或创建文件失败、磁盘空间不足等
			return (ret - 100);
		}

		return 0;
	}

protected:	
	virtual void adjustMaxMemorySize()
	{
		if (maxMemorySize < 10240)
			maxMemorySize = 10240;
	}

	int maxMemorySize;
	int maxLineLength;
	const static int DEFAULT_MAX_MEMORY_SIZE;
	const static string DEFAULT_SEPARATOR;
	string separator;
	StrComparer strCompare;

	int fileMerge(const char *fileName, int totalPartCount)
	{
		char sPartNo[13] = {0};
		int partFileNameLength = strlen(fileName) + strlen("..sortedpart") + 13;
		char *partFileName = new char[partFileNameLength];
		
		vector<ifstream *> inFileVector;

		int i;
		for (i = 0; i < totalPartCount; i++)
		{
			itoa(i+1, sPartNo, 10);
			memset(partFileName, 0x00, partFileNameLength);
			sprintf(partFileName, "%s.%d.sortedpart", fileName, i+1);
			
			ifstream *fin = new ifstream(partFileName);
			if (fin == NULL)
			{
				return -1;
			}
			inFileVector.push_back(fin);
			if (!inFileVector[i]->is_open())
			{
				return -2;
			}
		}


		int outFileNameLength = strlen(fileName) + strlen(".sorted") + 1;
		char *outFileName = new char[outFileNameLength];
		if (outFileName == NULL)
		{
			//内存不足
			return -3;
		}

		memset(outFileName, 0x00, outFileNameLength);
		sprintf(outFileName, "%s.sorted", fileName);
		ofstream outFile(outFileName);
		if (!outFile.is_open())
		{
			//结果文件打开失败
			return -2;
		}


		int ret;
		FileElement fileElement;

		//重要:在对多个文件合并时,为了达到log(k)的插入和查找性能,所以使用stl中的multiset(红黑树数据结构),k表示文件的分块文件的个数
		multiset<FileElement, ElementCompareClass> dataSet;
		for (i = 0; i < totalPartCount; i++)
		{
			fileElement.fileIndex = i;
			ret = getElement(inFileVector, i, fileElement.elment);
			if (ret == -2)
			{
				//元素太长,或许分隔符不正确
				return -4;
			}
			if (ret != 0)
			{
				inFileVector[i]->close();
			}

			dataSet.insert(fileElement);
		}

		while (dataSet.size() > 0)
		{
			//把最小的数据写出去,并从这个元素对应的文件再读取一个元素放进去
			//直到所有的文件都空了
		
			fileElement = *(dataSet.begin());
			outFile.write(fileElement.elment.c_str(), fileElement.elment.length());
			outFile.write(separator.c_str(), separator.length());
			dataSet.erase(dataSet.begin());

			if (inFileVector[fileElement.fileIndex]->is_open())
			{
				ret = getElement(inFileVector, fileElement.fileIndex, fileElement.elment);
				if (ret == 0)
				{
					dataSet.insert(fileElement);
				}
				else if (ret == -2)
				{
					//元素太长,或许分隔符不正确
					return -4;
				}
				else
				{
					inFileVector[fileElement.fileIndex]->close();
					
					//TODO 此时可以删除此分块文件
				}
			}
		}

		outFile.close();
		return 0;
	}

	int getElement(vector<ifstream *> &inFileVector, int fileIndex, string &element)
	{
		static map<int, string> map_strBuffer;
		static map<int, int> map_bufferSize;
		int readSize = 0;
		int index;

		if (map_bufferSize.count(fileIndex) <= 0)
		{
			map_bufferSize[fileIndex] = 0;
		}

		while (true)
		{
			index = map_strBuffer[fileIndex].find(separator);
			if (index != string::npos)
			{
				element = map_strBuffer[fileIndex].substr(0, index);
				map_strBuffer[fileIndex] = map_strBuffer[fileIndex].substr(index + separator.length());
				map_bufferSize[fileIndex] -= index +separator.length();
				return 0;
			}

			readSize = readFile(*(inFileVector[fileIndex]), map_strBuffer[fileIndex], 1024);
			if (readSize <= 0)
			{
				//读到文件结尾了
				return -1;
			}

			map_bufferSize[fileIndex] += readSize;
			if (map_bufferSize[fileIndex] > maxMemorySize/2U)
			{
				//元素太长了(由separator分隔的,或许因为分隔符写错了)
				inFileVector[fileIndex]->close();
				return -2;
			}
		}
		return 0;
	}


	int ouputSortedPartFile(const char *fileName, int partCount, vector<string> v, const string &separator)
	{
		int partFileNameLength = strlen(fileName) + strlen("..sortedpart") + 13;
		char *partFileName = new char[partFileNameLength];

		memset(partFileName, 0x00, partFileNameLength);
		sprintf(partFileName, "%s.%d.sortedpart", fileName, partCount);

		ofstream outFile(partFileName);
		if (!outFile.is_open())
		{
			return -1;
		}

		int i, size = v.size();
		for (i = 0; i < size; i++)
		{
			outFile.write(v[i].c_str(), v[i].length());
			outFile.write(separator.c_str(), separator.length());
		}
		outFile.close();
		return 0;
	}

};

// Chunk budget used by the default constructor: 10 MiB, in bytes.
// NOTE(review): these are non-inline definitions in what the article presents
// as a header; including it from more than one translation unit would
// presumably produce duplicate-symbol errors - confirm how it is built.
const int FileMergeSort::DEFAULT_MAX_MEMORY_SIZE = 1024*1024*10;
// Element separator used by the default constructor: a single newline.
const string FileMergeSort::DEFAULT_SEPARATOR = "\n";

#endif //__FILEMERGESORT_H

main.cpp

#include <stdlib.h>
#include <string>
#include <iostream>
#include "mergesort.h"
#include "filemergesort.h"

using namespace std;


int main()
{
	FileMergeSort fileMergeSort("\n", 1024*1024*10);
	int ret = fileMergeSort.sort("Quote.csv");
	std::cout << ret << std::endl;

	system("pause");
	return 0;
}

Quote.csv

1373871611,SH600000,8.56,8.57,8.56,8.55,8.71,8.41,8.49,891040704.00,1038509.00
1373871665,SH600005,2.31,2.32,2.31,2.31,2.35,2.30,2.31,45215176.00,194893.00
1373871626,SH600009,12.81,12.82,12.81,12.86,13.07,12.73,12.80,39855712.00,30992.00
1373871665,SH600010,4.08,4.09,4.08,4.05,4.13,4.01,4.05,197604608.00,485515.00
1373871607,SH600011,5.46,5.47,5.47,5.45,5.56,5.43,5.50,123869376.00,226039.00
1373871626,SH600015,9.37,9.38,9.37,9.28,9.55,9.24,9.21,290867808.00,309105.00
1373871605,SH600016,9.15,9.16,9.16,9.16,9.41,9.00,9.11,1331242880.00,1451737.00
1373871607,SH600019,4.07,4.08,4.07,4.05,4.11,4.03,4.03,73100704.00,180051.00
1373871605,SH600022,1.63,1.64,1.64,1.62,1.65,1.61,1.63,19511916.00,119705.00
1373871600,SH600027,3.14,3.15,3.15,3.12,3.20,3.12,3.13,96782808.00,307247.00
1373871605,SH600028,4.55,4.56,4.56,4.59,4.62,4.52,4.59,283333152.00,619985.00
1373871611,SH600029,2.77,2.78,2.77,2.77,2.81,2.76,2.76,52826140.00,189341.00
1373871660,SH600030,10.96,10.97,10.96,10.80,11.29,10.65,10.54,1668629248.00,1518316.00
1373871603,SH600031,7.30,7.31,7.31,7.28,7.45,7.26,7.28,211298784.00,287873.00
1373871660,SH600036,11.66,11.67,11.65,11.74,11.91,11.55,11.62,789394304.00,674696.00
1373871603,SH600037,7.18,0.00,7.18,6.60,7.18,6.55,6.53,292799488.00,416500.00

……

Quote.csv.sorted

1373871600,SH600027,3.14,3.15,3.15,3.12,3.20,3.12,3.13,96782808.00,307247.00
1373871603,SH600031,7.30,7.31,7.31,7.28,7.45,7.26,7.28,211298784.00,287873.00
1373871603,SH600037,7.18,0.00,7.18,6.60,7.18,6.55,6.53,292799488.00,416500.00
1373871605,SH600016,9.15,9.16,9.16,9.16,9.41,9.00,9.11,1331242880.00,1451737.00
1373871605,SH600022,1.63,1.64,1.64,1.62,1.65,1.61,1.63,19511916.00,119705.00
1373871605,SH600028,4.55,4.56,4.56,4.59,4.62,4.52,4.59,283333152.00,619985.00
1373871607,SH600011,5.46,5.47,5.47,5.45,5.56,5.43,5.50,123869376.00,226039.00
1373871607,SH600019,4.07,4.08,4.07,4.05,4.11,4.03,4.03,73100704.00,180051.00
1373871611,SH600000,8.56,8.57,8.56,8.55,8.71,8.41,8.49,891040704.00,1038509.00
1373871611,SH600029,2.77,2.78,2.77,2.77,2.81,2.76,2.76,52826140.00,189341.00
1373871626,SH600009,12.81,12.82,12.81,12.86,13.07,12.73,12.80,39855712.00,30992.00
1373871626,SH600015,9.37,9.38,9.37,9.28,9.55,9.24,9.21,290867808.00,309105.00
1373871660,SH600030,10.96,10.97,10.96,10.80,11.29,10.65,10.54,1668629248.00,1518316.00
1373871660,SH600036,11.66,11.67,11.65,11.74,11.91,11.55,11.62,789394304.00,674696.00
1373871665,SH600005,2.31,2.32,2.31,2.31,2.35,2.30,2.31,45215176.00,194893.00
1373871665,SH600010,4.08,4.09,4.08,4.05,4.13,4.01,4.05,197604608.00,485515.00

点赞