Files
2026-03-02 22:04:18 +03:00

1087 lines
36 KiB
C++
Executable File

#include "NativeFeatureIncludes.h"
#if _RAKNET_SUPPORT_FileListTransfer==1 && _RAKNET_SUPPORT_FileOperations==1
#include "FileListTransfer.h"
#include "DS_HuffmanEncodingTree.h"
#include "FileListTransferCBInterface.h"
#include "StringCompressor.h"
#include "FileList.h"
#include "DS_Queue.h"
#include "MessageIdentifiers.h"
#include "RakNetTypes.h"
#include "RakPeerInterface.h"
#include "RakNetStatistics.h"
#include "IncrementalReadInterface.h"
#include "RakAssert.h"
#include "RakAlloca.h"
#ifdef _MSC_VER
#pragma warning( push )
#endif
namespace RakNet
{
// Wraps the heap buffer that accumulates one file delivered through reference pushes.
// Instances are stored by value in FileListReceiver::pushedFiles, keyed by file index.
struct FLR_MemoryBlock
{
// Allocated with rakMalloc_Ex in OnReferencePush and released with rakFree_Ex.
// OnReferencePush sets it to 0 after freeing it when the handler opts out of automatic allocation.
char *flrMemoryBlock;
};
// Per-set receive state created by FileListTransfer::SetupReceive and stored in
// FileListTransfer::fileListReceivers under the set ID.
struct FileListReceiver
{
FileListReceiver();
// Frees every buffer still held in pushedFiles.
~FileListReceiver();
// Callback interface that receives OnFileProgress / OnFile / OnDownloadComplete / Update / OnDereference.
FileListTransferCBInterface *downloadHandler;
// Packets for this set are only accepted when they come from this address.
SystemAddress allowedSender;
// Set ID handed out by SetupReceive.
unsigned short setID;
// Number of files in the set, read from the set header.
unsigned setCount;
// Assigned the same value as setTotalFinalLength when the set header is decoded.
unsigned setTotalCompressedTransmissionLength;
// Total byte length of the set, read from the set header.
unsigned setTotalFinalLength;
// Running total of bytes received for this set.
unsigned setTotalDownloadedLength;
// True once a non-empty set header has been decoded.
bool gotSetHeader;
// When true, downloadHandler is deleted together with this receiver.
bool deleteDownloadHandler;
// NOTE(review): not referenced anywhere in the visible code.
bool isCompressed;
// Count of completely received files; compared against setCount to detect completion.
int filesReceived;
// Buffers for files that are being received chunk by chunk, keyed by file index.
DataStructures::Map<unsigned int, FLR_MemoryBlock> pushedFiles;
// Notifications
// Initialised to 1 by the constructor.
unsigned int partLength;
};
} // namespace RakNet
using namespace RakNet;
// Puts every scalar member into a defined state.
// DecodeSetHeader reports setCount and setTotalFinalLength for an empty set without
// ever assigning them, so they must not be left uninitialised here.
FileListReceiver::FileListReceiver()
{
	downloadHandler=0;
	setID=0;
	setCount=0;
	setTotalCompressedTransmissionLength=0;
	setTotalFinalLength=0;
	setTotalDownloadedLength=0;
	gotSetHeader=false;
	deleteDownloadHandler=false;
	isCompressed=false;
	filesReceived=0;
	// Starts at 1 rather than 0, as in the original constructor
	partLength=1;
	DataStructures::Map<unsigned int, FLR_MemoryBlock>::IMPLEMENT_DEFAULT_COMPARISON();
}
// Releases every partially received file buffer that is still registered in pushedFiles.
FileListReceiver::~FileListReceiver()
{
	const unsigned int blockCount=pushedFiles.Size();
	for (unsigned int blockIndex=0; blockIndex < blockCount; ++blockIndex)
	{
		rakFree_Ex(pushedFiles[blockIndex].flrMemoryBlock, _FILE_AND_LINE_ );
	}
}
// NOTE(review): macro is defined outside this file; presumably emits the static factory
// (instance create/destroy) functions for FileListTransfer - confirm against its definition.
STATIC_FACTORY_DEFINITIONS(FileListTransfer,FileListTransfer)
// Destroys every queued FileToPush and then this recipient object itself.
// The object must not be touched after this call returns.
void FileListTransfer::FileToPushRecipient::DeleteThis(void)
{
	unsigned int queueIndex=0;
	while (queueIndex < filesToPush.Size())
	{
		RakNet::OP_DELETE(filesToPush[queueIndex],_FILE_AND_LINE_);
		++queueIndex;
	}
	RakNet::OP_DELETE(this,_FILE_AND_LINE_);
}
// Takes one additional reference on this recipient, guarded by refCountMutex.
void FileListTransfer::FileToPushRecipient::AddRef(void)
{
	refCountMutex.Lock();
	refCount+=1;
	refCountMutex.Unlock();
}
// Drops one reference. When the count reaches zero the object deletes itself,
// after the mutex has been released (the mutex is a member of the object).
void FileListTransfer::FileToPushRecipient::Deref(void)
{
	refCountMutex.Lock();
	const bool wasLastReference = (--refCount==0);
	refCountMutex.Unlock();

	if (wasLastReference)
		DeleteThis();
}
// Installs the default key comparison for the receiver map and starts set IDs at zero.
FileListTransfer::FileListTransfer()
{
	DataStructures::Map<unsigned short, FileListReceiver*>::IMPLEMENT_DEFAULT_COMPARISON();
	setId=0;
}
// Stops the worker threads first, then releases all receivers and pending push recipients.
FileListTransfer::~FileListTransfer()
{
// Workers run SendIRIToAddressCB, which touches fileToPushRecipientList; stop them before Clear()
threadPool.StopThreads();
Clear();
}
// Starts numThreads worker threads in threadPool. Once started, SendIRIToAddress
// queues its work on the pool instead of running it inline.
// threadPriority is accepted for interface compatibility but is not used.
void FileListTransfer::StartIncrementalReadThreads(int numThreads, int threadPriority)
{
(void) threadPriority;
// NOTE(review): the second argument is a literal 0, not threadPriority - confirm its meaning in ThreadPool::StartThreads
threadPool.StartThreads(numThreads, 0);
}
// Registers a receiver for the next set ID and returns that ID.
// handler receives the callbacks for the set; deleteHandler makes this class delete it
// when the receiver is torn down; allowedSender is the only address accepted for the set.
// Returns (unsigned short)-1 when attached to a peer and allowedSender is not connected.
unsigned short FileListTransfer::SetupReceive(FileListTransferCBInterface *handler, bool deleteHandler, SystemAddress allowedSender)
{
	if (rakPeerInterface!=0)
	{
		if (rakPeerInterface->GetConnectionState(allowedSender)!=IS_CONNECTED)
			return (unsigned short)-1;
	}

	// The ID counter wraps around, so an old receiver may still occupy this slot: tear it down first
	if (fileListReceivers.Has(setId))
	{
		FileListReceiver *staleReceiver=fileListReceivers.Get(setId);
		staleReceiver->downloadHandler->OnDereference();
		if (staleReceiver->deleteDownloadHandler)
			RakNet::OP_DELETE(staleReceiver->downloadHandler, _FILE_AND_LINE_);
		RakNet::OP_DELETE(staleReceiver, _FILE_AND_LINE_);
		fileListReceivers.Delete(setId);
	}

	FileListReceiver *newReceiver = RakNet::OP_NEW<FileListReceiver>( _FILE_AND_LINE_ );
	RakAssert(handler);
	newReceiver->downloadHandler=handler;
	newReceiver->allowedSender=allowedSender;
	newReceiver->gotSetHeader=false;
	newReceiver->deleteDownloadHandler=deleteHandler;
	newReceiver->setID=setId;
	fileListReceivers.Set(setId, newReceiver);

	const unsigned short assignedId=setId;
	// Advance the counter, skipping the value that doubles as the failure return code
	++setId;
	if (setId==(unsigned short)-1)
		setId=0;
	return assignedId;
}
// Sends every file in fileList to recipient as set setID.
//
// A header (ID_FILE_LIST_TRANSFER_HEADER) is sent first. Files flagged isAReference are
// queued and streamed in chunks of _chunkSize bytes through _incrementalReadInterface
// (only when that interface is non-null); all other files are sent immediately from memory
// as ID_FILE_LIST_TRANSFER_FILE. The header goes through rakPeer when it is non-null,
// otherwise through SendUnified. Registered progress callbacks are added to fileList and
// are told OnFilePushesComplete when nothing had to be queued.
void FileListTransfer::Send(FileList *fileList, RakNet::RakPeerInterface *rakPeer, SystemAddress recipient, unsigned short setID, PacketPriority priority, char orderingChannel, IncrementalReadInterface *_incrementalReadInterface, unsigned int _chunkSize)
{
	for (unsigned int flpcIndex=0; flpcIndex < fileListProgressCallbacks.Size(); flpcIndex++)
		fileList->AddCallback(fileListProgressCallbacks[flpcIndex]);

	unsigned int i, totalLength;
	RakNet::BitStream outBitstream;
	bool sendReference;
	const char *dataBlocks[2];
	int lengths[2];

	totalLength=0;
	for (i=0; i < fileList->fileList.Size(); i++)
	{
		const FileListNode &fileListNode = fileList->fileList[i];
		totalLength+=fileListNode.fileLengthBytes;
	}

	// Write the set header: set ID, whether there is anything to send, and if so the
	// total number of files and the total number of bytes
	bool anythingToWrite;
	outBitstream.Write((MessageID)ID_FILE_LIST_TRANSFER_HEADER);
	outBitstream.Write(setID);
	anythingToWrite=fileList->fileList.Size()>0;
	outBitstream.Write(anythingToWrite);
	if (anythingToWrite)
	{
		outBitstream.WriteCompressed(fileList->fileList.Size());
		outBitstream.WriteCompressed(totalLength);

		if (rakPeer)
			rakPeer->Send(&outBitstream, priority, RELIABLE_ORDERED, orderingChannel, recipient, false);
		else
			SendUnified(&outBitstream, priority, RELIABLE_ORDERED, orderingChannel, recipient, false);

		DataStructures::Queue<FileToPush*> filesToPush;

		for (i=0; i < fileList->fileList.Size(); i++)
		{
			sendReference = fileList->fileList[i].isAReference && _incrementalReadInterface!=0;
			if (sendReference)
			{
				// Defer: the file is read and sent chunk by chunk in SendIRIToAddressCB
				FileToPush *fileToPush = RakNet::OP_NEW<FileToPush>(_FILE_AND_LINE_);
				fileToPush->fileListNode.context=fileList->fileList[i].context;
				fileToPush->setIndex=i;
				fileToPush->fileListNode.filename=fileList->fileList[i].filename;
				fileToPush->fileListNode.fullPathToFile=fileList->fileList[i].fullPathToFile;
				fileToPush->fileListNode.fileLengthBytes=fileList->fileList[i].fileLengthBytes;
				fileToPush->fileListNode.dataLengthBytes=fileList->fileList[i].dataLengthBytes;
				//	fileToPush->systemAddress=recipient;
				fileToPush->setID=setID;
				fileToPush->packetPriority=priority;
				fileToPush->orderingChannel=orderingChannel;
				fileToPush->currentOffset=0;
				fileToPush->incrementalReadInterface=_incrementalReadInterface;
				fileToPush->chunkSize=_chunkSize;
				filesToPush.Push(fileToPush,_FILE_AND_LINE_);
			}
			else
			{
				outBitstream.Reset();
				outBitstream.Write((MessageID)ID_FILE_LIST_TRANSFER_FILE);
				outBitstream << fileList->fileList[i].context;
				// outBitstream.Write(fileList->fileList[i].context);
				outBitstream.Write(setID);
				StringCompressor::Instance()->EncodeString(fileList->fileList[i].filename, 512, &outBitstream);

				outBitstream.WriteCompressed(i);
				outBitstream.WriteCompressed(fileList->fileList[i].dataLengthBytes); // Original length in bytes

				// The receiver aligns its read position before reading the file data
				outBitstream.AlignWriteToByteBoundary();

				dataBlocks[0]=(char*) outBitstream.GetData();
				lengths[0]=outBitstream.GetNumberOfBytesUsed();
				dataBlocks[1]=fileList->fileList[i].data;
				lengths[1]=fileList->fileList[i].dataLengthBytes;
				SendListUnified(dataBlocks,lengths,2,priority, RELIABLE_ORDERED, orderingChannel, recipient, false);
			}
		}

		if (filesToPush.IsEmpty()==false)
		{
			FileToPushRecipient *ftpr=0;

			// Find or create the recipient entry while holding the list mutex for the whole
			// operation, so the list is never modified unlocked and two callers cannot both
			// create an entry for the same address.
			fileToPushRecipientListMutex.Lock();
			for (unsigned int recipientIndex=0; recipientIndex < fileToPushRecipientList.Size(); recipientIndex++)
			{
				if (fileToPushRecipientList[recipientIndex]->systemAddress==recipient)
				{
					ftpr=fileToPushRecipientList[recipientIndex];
					ftpr->AddRef();
					break;
				}
			}
			if (ftpr==0)
			{
				ftpr = RakNet::OP_NEW<FileToPushRecipient>(_FILE_AND_LINE_);
				ftpr->systemAddress=recipient;
				ftpr->refCount=2; // Allocated and in the list
				fileToPushRecipientList.Push(ftpr, _FILE_AND_LINE_);
			}
			fileToPushRecipientListMutex.Unlock();

			// NOTE(review): ftpr->filesToPush is not protected by a lock of its own; a worker
			// thread may be reading this queue concurrently - confirm intended threading model.
			while (filesToPush.IsEmpty()==false)
			{
				ftpr->filesToPush.Push(filesToPush.Pop(), _FILE_AND_LINE_);
			}
			// ftpr out of scope
			ftpr->Deref();
			SendIRIToAddress(recipient);
			return;
		}
		else
		{
			for (unsigned int flpcIndex=0; flpcIndex < fileListProgressCallbacks.Size(); flpcIndex++)
				fileListProgressCallbacks[flpcIndex]->OnFilePushesComplete(recipient, setID);
		}
	}
	else
	{
		// Empty set: notify the callbacks, then send the bare header
		for (unsigned int flpcIndex=0; flpcIndex < fileListProgressCallbacks.Size(); flpcIndex++)
			fileListProgressCallbacks[flpcIndex]->OnFilePushesComplete(recipient, setID);

		if (rakPeer)
			rakPeer->Send(&outBitstream, priority, RELIABLE_ORDERED, orderingChannel, recipient, false);
		else
			SendUnified(&outBitstream, priority, RELIABLE_ORDERED, orderingChannel, recipient, false);
	}
}
// Decodes an ID_FILE_LIST_TRANSFER_HEADER packet.
//
// Returns true when the header was accepted: either a non-empty set whose file count and
// total length were read, or an empty set (in which case OnDownloadComplete is invoked and
// the receiver is destroyed if the handler returns false).
// Returns false for unknown set IDs, packets from a sender other than allowedSender, and
// truncated packets.
bool FileListTransfer::DecodeSetHeader(Packet *packet)
{
	bool anythingToWrite=false;
	unsigned short setID=0;
	RakNet::BitStream inBitStream(packet->data, packet->length, false);
	inBitStream.IgnoreBits(8);
	// The packet comes from the network: never act on a set ID that could not be read
	if (inBitStream.Read(setID)==false)
		return false;

	FileListReceiver *fileListReceiver;
	if (fileListReceivers.Has(setID)==false)
	{
		// If this assert hits you didn't call SetupReceive
#ifdef _DEBUG
		RakAssert(0);
#endif
		return false;
	}
	fileListReceiver=fileListReceivers.Get(setID);
	if (fileListReceiver->allowedSender!=packet->systemAddress)
	{
#ifdef _DEBUG
		RakAssert(0);
#endif
		return false;
	}

#ifdef _DEBUG
	RakAssert(fileListReceiver->gotSetHeader==false);
#endif

	// A truncated packet must not be mistaken for an empty set
	if (inBitStream.Read(anythingToWrite)==false)
		return false;

	if (anythingToWrite)
	{
		if (inBitStream.ReadCompressed(fileListReceiver->setCount) &&
			inBitStream.ReadCompressed(fileListReceiver->setTotalFinalLength))
		{
			fileListReceiver->setTotalCompressedTransmissionLength=fileListReceiver->setTotalFinalLength;
			fileListReceiver->gotSetHeader=true;
			return true;
		}
		return false;
	}

	// Empty set: report completion right away
	FileListTransferCBInterface::DownloadCompleteStruct dcs;
	dcs.setID=fileListReceiver->setID;
	dcs.numberOfFilesInThisSet=fileListReceiver->setCount;
	dcs.byteLengthOfThisSet=fileListReceiver->setTotalFinalLength;
	dcs.senderSystemAddress=packet->systemAddress;
	dcs.senderGuid=packet->guid;

	if (fileListReceiver->downloadHandler->OnDownloadComplete(&dcs)==false)
	{
		fileListReceiver->downloadHandler->OnDereference();
		fileListReceivers.Delete(setID);
		if (fileListReceiver->deleteDownloadHandler)
			RakNet::OP_DELETE(fileListReceiver->downloadHandler, _FILE_AND_LINE_);
		RakNet::OP_DELETE(fileListReceiver, _FILE_AND_LINE_);
	}
	return true;
}
// Decodes an ID_FILE_LIST_TRANSFER_FILE packet, or an ID_DOWNLOAD_PROGRESS notification
// that wraps one.
//
// isTheFileAndIsNotDownloadProgress==true: the packet carries the whole file. The data is
// copied into a new buffer, OnFileProgress and OnFile are invoked, and OnDownloadComplete
// follows when this was the last file of the set. The buffer is freed here when OnFile
// returns true.
// isTheFileAndIsNotDownloadProgress==false: only OnFileProgress is invoked, pointing into
// the packet.
//
// Returns false when the packet is truncated, refers to an unknown set, comes from a sender
// other than allowedSender, or declares more file data than it actually carries.
bool FileListTransfer::DecodeFile(Packet *packet, bool isTheFileAndIsNotDownloadProgress)
{
	FileListTransferCBInterface::OnFileStruct onFileStruct;
	RakNet::BitStream inBitStream(packet->data, packet->length, false);
	inBitStream.IgnoreBits(8);

	onFileStruct.senderSystemAddress=packet->systemAddress;
	onFileStruct.senderGuid=packet->guid;

	unsigned int partCount=0;
	unsigned int partTotal=0;
	unsigned int partLength=0;
	onFileStruct.fileData=0;
	if (isTheFileAndIsNotDownloadProgress==false)
	{
		// Disable endian swapping on reading this, as it's generated locally in ReliabilityLayer.cpp
		inBitStream.ReadBits( (unsigned char* ) &partCount, BYTES_TO_BITS(sizeof(partCount)), true );
		inBitStream.ReadBits( (unsigned char* ) &partTotal, BYTES_TO_BITS(sizeof(partTotal)), true );
		inBitStream.ReadBits( (unsigned char* ) &partLength, BYTES_TO_BITS(sizeof(partLength)), true );
		// Skip the message ID of the wrapped packet
		inBitStream.IgnoreBits(8);
		// The header is appended to every chunk, which we continue to read after this statement
	}
	inBitStream >> onFileStruct.context;
	// inBitStream.Read(onFileStruct.context);
	if (inBitStream.Read(onFileStruct.setID)==false)
		return false;

	FileListReceiver *fileListReceiver;
	if (fileListReceivers.Has(onFileStruct.setID)==false)
	{
		return false;
	}
	fileListReceiver=fileListReceivers.Get(onFileStruct.setID);
	if (fileListReceiver->allowedSender!=packet->systemAddress)
	{
#ifdef _DEBUG
		RakAssert(0);
#endif
		return false;
	}

#ifdef _DEBUG
	RakAssert(fileListReceiver->gotSetHeader==true);
#endif

	if (StringCompressor::Instance()->DecodeString(onFileStruct.fileName, 512, &inBitStream)==false)
	{
#ifdef _DEBUG
		RakAssert(0);
#endif
		return false;
	}

	if (inBitStream.ReadCompressed(onFileStruct.fileIndex)==false ||
		inBitStream.ReadCompressed(onFileStruct.byteLengthOfThisFile)==false)
	{
		return false;
	}

	onFileStruct.bytesDownloadedForThisFile=onFileStruct.byteLengthOfThisFile;

	if (isTheFileAndIsNotDownloadProgress)
	{
		// Support SendLists
		inBitStream.AlignReadToByteBoundary();

		// The declared length is remote input: it must fit in what the packet really
		// carries before it is used to size an allocation and a copy.
		if (onFileStruct.byteLengthOfThisFile > BITS_TO_BYTES(inBitStream.GetNumberOfUnreadBits()))
			return false;

		onFileStruct.fileData = (char*) rakMalloc_Ex( (size_t) onFileStruct.byteLengthOfThisFile, _FILE_AND_LINE_ );
		if (onFileStruct.byteLengthOfThisFile>0)
		{
			if (onFileStruct.fileData==0)
			{
				notifyOutOfMemory(_FILE_AND_LINE_);
				return false;
			}
			if (inBitStream.Read((char*)onFileStruct.fileData, onFileStruct.byteLengthOfThisFile)==false)
			{
				rakFree_Ex(onFileStruct.fileData, _FILE_AND_LINE_ );
				return false;
			}
		}

		fileListReceiver->setTotalDownloadedLength+=onFileStruct.byteLengthOfThisFile;
	}

	onFileStruct.numberOfFilesInThisSet=fileListReceiver->setCount;
	//	onFileStruct.setTotalCompressedTransmissionLength=fileListReceiver->setTotalCompressedTransmissionLength;
	onFileStruct.byteLengthOfThisSet=fileListReceiver->setTotalFinalLength;

	// User callback for this file.
	if (isTheFileAndIsNotDownloadProgress)
	{
		onFileStruct.bytesDownloadedForThisSet=fileListReceiver->setTotalDownloadedLength;

		FileListTransferCBInterface::FileProgressStruct fps;
		fps.onFileStruct=&onFileStruct;
		fps.partCount=1;
		fps.partTotal=1;
		fps.dataChunkLength=onFileStruct.byteLengthOfThisFile;
		fps.firstDataChunk=onFileStruct.fileData;
		fps.iriDataChunk=onFileStruct.fileData;
		fps.allocateIrIDataChunkAutomatically=true;
		fps.iriWriteOffset=0;
		fps.senderSystemAddress=packet->systemAddress;
		fps.senderGuid=packet->guid;
		fileListReceiver->downloadHandler->OnFileProgress(&fps);

		// Got a complete file
		// Either we are using IncrementalReadInterface and it was a small file or
		// We are not using IncrementalReadInterface
		// OnFile returning true means we free the buffer here; otherwise the handler keeps it
		if (fileListReceiver->downloadHandler->OnFile(&onFileStruct))
			rakFree_Ex(onFileStruct.fileData, _FILE_AND_LINE_ );

		fileListReceiver->filesReceived++;

		// If this set is done, free the memory for it.
		if ((int) fileListReceiver->setCount==fileListReceiver->filesReceived)
		{
			FileListTransferCBInterface::DownloadCompleteStruct dcs;
			dcs.setID=fileListReceiver->setID;
			dcs.numberOfFilesInThisSet=fileListReceiver->setCount;
			dcs.byteLengthOfThisSet=fileListReceiver->setTotalFinalLength;
			dcs.senderSystemAddress=packet->systemAddress;
			dcs.senderGuid=packet->guid;

			if (fileListReceiver->downloadHandler->OnDownloadComplete(&dcs)==false)
			{
				fileListReceiver->downloadHandler->OnDereference();
				if (fileListReceiver->deleteDownloadHandler)
					RakNet::OP_DELETE(fileListReceiver->downloadHandler, _FILE_AND_LINE_);
				fileListReceivers.Delete(onFileStruct.setID);
				RakNet::OP_DELETE(fileListReceiver, _FILE_AND_LINE_);
			}
		}
	}
	else
	{
		inBitStream.AlignReadToByteBoundary();

		char *firstDataChunk;
		unsigned int unreadBits = inBitStream.GetNumberOfUnreadBits();
		unsigned int unreadBytes = BITS_TO_BYTES(unreadBits);
		firstDataChunk=(char*) inBitStream.GetData()+BITS_TO_BYTES(inBitStream.GetReadOffset());

		onFileStruct.bytesDownloadedForThisSet=fileListReceiver->setTotalDownloadedLength+unreadBytes;
		onFileStruct.bytesDownloadedForThisFile=onFileStruct.byteLengthOfThisFile;

		FileListTransferCBInterface::FileProgressStruct fps;
		fps.onFileStruct=&onFileStruct;
		fps.partCount=partCount;
		fps.partTotal=partTotal;
		fps.dataChunkLength=unreadBytes;
		fps.firstDataChunk=firstDataChunk;
		fps.iriDataChunk=0;
		fps.allocateIrIDataChunkAutomatically=true;
		fps.iriWriteOffset=0;
		fps.senderSystemAddress=packet->systemAddress;
		fps.senderGuid=packet->guid;

		// Remote system is sending a complete file, but the file is large enough that we get ID_PROGRESS_NOTIFICATION from the transport layer
		fileListReceiver->downloadHandler->OnFileProgress(&fps);
	}

	return true;
}
// Dispatches incoming packets that belong to this plugin.
// Handled packets return RR_STOP_PROCESSING_AND_DEALLOCATE; anything else, including
// download progress notifications that wrap a foreign message, returns RR_CONTINUE_PROCESSING.
PluginReceiveResult FileListTransfer::OnReceive(Packet *packet)
{
	const unsigned char messageId = packet->data[0];

	if (messageId==ID_FILE_LIST_TRANSFER_HEADER)
	{
		DecodeSetHeader(packet);
		return RR_STOP_PROCESSING_AND_DEALLOCATE;
	}
	if (messageId==ID_FILE_LIST_TRANSFER_FILE)
	{
		DecodeFile(packet, true);
		return RR_STOP_PROCESSING_AND_DEALLOCATE;
	}
	if (messageId==ID_FILE_LIST_REFERENCE_PUSH)
	{
		OnReferencePush(packet, true);
		return RR_STOP_PROCESSING_AND_DEALLOCATE;
	}
	if (messageId==ID_FILE_LIST_REFERENCE_PUSH_ACK)
	{
		OnReferencePushAck(packet);
		return RR_STOP_PROCESSING_AND_DEALLOCATE;
	}
	if (messageId==ID_DOWNLOAD_PROGRESS)
	{
		// The wrapped message ID sits behind the progress ID and three unsigned ints
		const size_t wrappedIdOffset = sizeof(MessageID)+sizeof(unsigned int)*3;
		if (packet->length>wrappedIdOffset)
		{
			const unsigned char wrappedId = packet->data[wrappedIdOffset];
			if (wrappedId==ID_FILE_LIST_TRANSFER_FILE)
			{
				DecodeFile(packet, false);
				return RR_STOP_PROCESSING_AND_DEALLOCATE;
			}
			if (wrappedId==ID_FILE_LIST_REFERENCE_PUSH)
			{
				OnReferencePush(packet, false);
				return RR_STOP_PROCESSING_AND_DEALLOCATE;
			}
		}
	}

	return RR_CONTINUE_PROCESSING;
}
// Called when the attached peer shuts down: stops the workers, drops any queued
// thread-pool input, and releases all receivers and pending push recipients.
void FileListTransfer::OnRakPeerShutdown(void)
{
threadPool.StopThreads();
threadPool.ClearInput();
Clear();
}
void FileListTransfer::Clear(void)
{
unsigned i;
for (i=0; i < fileListReceivers.Size(); i++)
{
fileListReceivers[i]->downloadHandler->OnDereference();
if (fileListReceivers[i]->deleteDownloadHandler)
RakNet::OP_DELETE(fileListReceivers[i]->downloadHandler, _FILE_AND_LINE_);
RakNet::OP_DELETE(fileListReceivers[i], _FILE_AND_LINE_);
}
fileListReceivers.Clear();
fileToPushRecipientListMutex.Lock();
for (unsigned int i=0; i < fileToPushRecipientList.Size(); i++)
{
FileToPushRecipient *ftpr = fileToPushRecipientList[i];
// Taken out of the list
ftpr->Deref();
}
fileToPushRecipientList.Clear(false,_FILE_AND_LINE_);
fileToPushRecipientListMutex.Unlock();
//filesToPush.Clear(false, _FILE_AND_LINE_);
}
// A connection went away: drop all send and receive state tied to its address.
// Only the address is used; the GUID and the reason are ignored.
void FileListTransfer::OnClosedConnection(const SystemAddress &systemAddress, RakNetGUID rakNetGUID, PI2_LostConnectionReason lostConnectionReason )
{
	(void) rakNetGUID;
	(void) lostConnectionReason;

	RemoveReceiver(systemAddress);
}
void FileListTransfer::CancelReceive(unsigned short setId)
{
if (fileListReceivers.Has(setId)==false)
{
#ifdef _DEBUG
RakAssert(0);
#endif
return;
}
FileListReceiver *fileListReceiver=fileListReceivers.Get(setId);
fileListReceiver->downloadHandler->OnDereference();
if (fileListReceiver->deleteDownloadHandler)
RakNet::OP_DELETE(fileListReceiver->downloadHandler, _FILE_AND_LINE_);
RakNet::OP_DELETE(fileListReceiver, _FILE_AND_LINE_);
fileListReceivers.Delete(setId);
}
// Drops everything associated with systemAddress:
//  1. queued thread-pool jobs addressed to it,
//  2. every receiver that only accepts data from it,
//  3. the first pending push recipient for it (callbacks get OnSendAborted).
void FileListTransfer::RemoveReceiver(SystemAddress systemAddress)
{
	threadPool.LockInput();
	for (unsigned int inputIndex=0; inputIndex < threadPool.InputSize(); )
	{
		if (threadPool.GetInputAtIndex(inputIndex).systemAddress==systemAddress)
			threadPool.RemoveInputAtIndex(inputIndex); // next element slides into this index
		else
			++inputIndex;
	}
	threadPool.UnlockInput();

	for (unsigned int receiverIndex=0; receiverIndex < fileListReceivers.Size(); )
	{
		FileListReceiver *receiver=fileListReceivers[receiverIndex];
		if (receiver->allowedSender==systemAddress)
		{
			receiver->downloadHandler->OnDereference();
			if (receiver->deleteDownloadHandler)
				RakNet::OP_DELETE(receiver->downloadHandler, _FILE_AND_LINE_);
			RakNet::OP_DELETE(receiver, _FILE_AND_LINE_);
			fileListReceivers.RemoveAtIndex(receiverIndex);
		}
		else
			++receiverIndex;
	}

	fileToPushRecipientListMutex.Lock();
	for (unsigned int recipientIndex=0; recipientIndex < fileToPushRecipientList.Size(); ++recipientIndex)
	{
		FileToPushRecipient *ftpr = fileToPushRecipientList[recipientIndex];
		if (ftpr->systemAddress!=systemAddress)
			continue;

		// Tell the user that this recipient was lost
		for (unsigned int flpcIndex=0; flpcIndex < fileListProgressCallbacks.Size(); flpcIndex++)
			fileListProgressCallbacks[flpcIndex]->OnSendAborted(ftpr->systemAddress);

		fileToPushRecipientList.RemoveAtIndex(recipientIndex);
		// Taken out of the list
		ftpr->Deref();
		break;
	}
	fileToPushRecipientListMutex.Unlock();
}
// Returns true while a receiver is still registered under setId.
bool FileListTransfer::IsHandlerActive(unsigned short setId)
{
return fileListReceivers.Has(setId);
}
// Registers a progress callback. Null pointers and callbacks that are already
// registered are ignored.
void FileListTransfer::AddCallback(FileListProgress *cb)
{
	if (cb!=0 && fileListProgressCallbacks.GetIndexOf(cb)==(unsigned int) -1)
	{
		fileListProgressCallbacks.Push(cb, _FILE_AND_LINE_);
	}
}
// Unregisters a progress callback; does nothing when cb is not registered.
void FileListTransfer::RemoveCallback(FileListProgress *cb)
{
	const unsigned int position = fileListProgressCallbacks.GetIndexOf(cb);
	if (position==(unsigned int) -1)
		return;
	fileListProgressCallbacks.RemoveAtIndex(position);
}
// Unregisters all progress callbacks. The callback objects themselves are not deleted here.
void FileListTransfer::ClearCallbacks(void)
{
// NOTE(review): meaning of the first argument (true) is defined by DataStructures::List::Clear - confirm
fileListProgressCallbacks.Clear(true, _FILE_AND_LINE_);
}
// Copies the list of registered progress callbacks into callbacks, replacing its contents.
void FileListTransfer::GetCallbacks(DataStructures::List<FileListProgress*> &callbacks)
{
callbacks = fileListProgressCallbacks;
}
void FileListTransfer::Update(void)
{
unsigned i;
i=0;
while (i < fileListReceivers.Size())
{
if (fileListReceivers[i]->downloadHandler->Update()==false)
{
fileListReceivers[i]->downloadHandler->OnDereference();
if (fileListReceivers[i]->deleteDownloadHandler)
RakNet::OP_DELETE(fileListReceivers[i]->downloadHandler, _FILE_AND_LINE_);
RakNet::OP_DELETE(fileListReceivers[i], _FILE_AND_LINE_);
fileListReceivers.RemoveAtIndex(i);
}
else
i++;
}
}
// Handles one chunk of a file that the sender streams via ID_FILE_LIST_REFERENCE_PUSH.
//
// Download progress notifications (isTheFileAndIsNotDownloadProgress==false) are ignored.
// For a real chunk an ID_FILE_LIST_REFERENCE_PUSH_ACK is sent back first, which makes the
// sender transmit the next chunk. The chunk is then copied into the per-file buffer held in
// FileListReceiver::pushedFiles (unless the handler opted out of automatic allocation) and
// reported through OnFileProgress. The last chunk additionally triggers OnFile and, when
// the set is complete, OnDownloadComplete.
//
// Malformed packets (truncated header, chunk larger than the packet payload, or chunk range
// outside the declared file length) are dropped after the ack.
void FileListTransfer::OnReferencePush(Packet *packet, bool isTheFileAndIsNotDownloadProgress)
{
	// 12/23/09 Why do I care about ID_DOWNLOAD_PROGRESS for reference pushes?
	if (isTheFileAndIsNotDownloadProgress==false)
		return;

	// This is not a progress notification, it is actually the entire packet
	RakNet::BitStream refPushAck;
	refPushAck.Write((MessageID)ID_FILE_LIST_REFERENCE_PUSH_ACK);
	SendUnified(&refPushAck,HIGH_PRIORITY, RELIABLE, 0, packet->systemAddress, false);

	FileListTransferCBInterface::OnFileStruct onFileStruct;
	RakNet::BitStream inBitStream(packet->data, packet->length, false);
	inBitStream.IgnoreBits(8);

	onFileStruct.fileData=0;
	inBitStream >> onFileStruct.context;
	// inBitStream.Read(onFileStruct.context);
	if (inBitStream.Read(onFileStruct.setID)==false)
		return;

	FileListReceiver *fileListReceiver;
	if (fileListReceivers.Has(onFileStruct.setID)==false)
	{
		return;
	}
	fileListReceiver=fileListReceivers.Get(onFileStruct.setID);
	if (fileListReceiver->allowedSender!=packet->systemAddress)
	{
#ifdef _DEBUG
		RakAssert(0);
#endif
		return;
	}

#ifdef _DEBUG
	RakAssert(fileListReceiver->gotSetHeader==true);
#endif

	if (StringCompressor::Instance()->DecodeString(onFileStruct.fileName, 512, &inBitStream)==false)
	{
#ifdef _DEBUG
		RakAssert(0);
#endif
		return;
	}

	unsigned int offset=0;
	unsigned int chunkLength=0;
	bool lastChunk=false;
	if (inBitStream.ReadCompressed(onFileStruct.fileIndex)==false ||
		inBitStream.ReadCompressed(onFileStruct.byteLengthOfThisFile)==false ||
		inBitStream.ReadCompressed(offset)==false ||
		inBitStream.ReadCompressed(chunkLength)==false ||
		inBitStream.Read(lastChunk)==false)
	{
		return;
	}

	// The chunk data starts at the next byte boundary
	inBitStream.AlignReadToByteBoundary();

	// offset and chunkLength are remote input. Reject a chunk that claims more bytes than the
	// packet carries, or that does not lie inside the declared file length. The second test
	// is written so that it cannot overflow.
	const unsigned int payloadBytes = BITS_TO_BYTES(inBitStream.GetNumberOfUnreadBits());
	if (chunkLength > payloadBytes)
		return;
	if (offset > onFileStruct.byteLengthOfThisFile ||
		chunkLength > onFileStruct.byteLengthOfThisFile - offset)
		return;

	const bool finished = lastChunk;

	FLR_MemoryBlock mb;
	if (fileListReceiver->pushedFiles.Has(onFileStruct.fileIndex)==false)
	{
		// First chunk of this file: allocate the buffer for the whole file
		mb.flrMemoryBlock=(char*) rakMalloc_Ex(onFileStruct.byteLengthOfThisFile, _FILE_AND_LINE_);
		fileListReceiver->pushedFiles.SetNew(onFileStruct.fileIndex, mb);
	}
	else
	{
		// NOTE(review): the buffer was sized from the length declared by the first chunk and
		// that size is not recorded; a later chunk declaring a larger file length is not
		// detected by the checks above. Tracking the allocation size would close this gap.
		mb=fileListReceiver->pushedFiles.Get(onFileStruct.fileIndex);
	}

	const unsigned int amountToRead=chunkLength;
	char *chunkInPacket=(char*) inBitStream.GetData()+BITS_TO_BYTES(inBitStream.GetReadOffset());

	FileListTransferCBInterface::FileProgressStruct fps;

	if (mb.flrMemoryBlock)
	{
		// Either the very first block, or a subsequent block and allocateIrIDataChunkAutomatically was true for the first block
		memcpy(mb.flrMemoryBlock+offset, chunkInPacket, amountToRead);
		fps.iriDataChunk=mb.flrMemoryBlock+offset;
	}
	else
	{
		// In here mb.flrMemoryBlock is null
		// This means the first block explicitly deallocated the memory, and no blocks will be permanently held by RakNet
		fps.iriDataChunk=chunkInPacket;
	}

	onFileStruct.bytesDownloadedForThisFile=offset+amountToRead;
	// NOTE(review): setTotalDownloadedLength is not advanced for reference-push chunks, so
	// this value only reflects files received through DecodeFile - confirm whether intended.
	onFileStruct.bytesDownloadedForThisSet=fileListReceiver->setTotalDownloadedLength;
	onFileStruct.numberOfFilesInThisSet=fileListReceiver->setCount;
	//	onFileStruct.setTotalCompressedTransmissionLength=fileListReceiver->setTotalCompressedTransmissionLength;
	onFileStruct.byteLengthOfThisSet=fileListReceiver->setTotalFinalLength;
	// Note: mb.flrMemoryBlock may be null here
	onFileStruct.fileData=mb.flrMemoryBlock;
	onFileStruct.senderSystemAddress=packet->systemAddress;
	onFileStruct.senderGuid=packet->guid;

	unsigned int totalNotifications;
	unsigned int currentNotificationIndex;
	if (chunkLength==0 || chunkLength==onFileStruct.byteLengthOfThisFile)
		totalNotifications=1;
	else
		totalNotifications = onFileStruct.byteLengthOfThisFile / chunkLength + 1;

	if (chunkLength==0)
		currentNotificationIndex = 0;
	else
		currentNotificationIndex = offset / chunkLength;

	fps.onFileStruct=&onFileStruct;
	fps.partCount=currentNotificationIndex;
	fps.partTotal=totalNotifications;
	fps.dataChunkLength=amountToRead;
	fps.firstDataChunk=mb.flrMemoryBlock;
	fps.allocateIrIDataChunkAutomatically=true;
	fps.onFileStruct->fileData=mb.flrMemoryBlock;
	fps.iriWriteOffset=offset;
	fps.senderSystemAddress=packet->systemAddress;
	fps.senderGuid=packet->guid;

	if (finished)
	{
		// Remember the real allocation: fileData may be redirected to the chunk pointer below
		char *oldFileData=fps.onFileStruct->fileData;
		if (fps.partCount==0)
			fps.firstDataChunk=fps.iriDataChunk;
		if (fps.partTotal==1)
			fps.onFileStruct->fileData=fps.iriDataChunk;
		fileListReceiver->downloadHandler->OnFileProgress(&fps);

		// Incremental read interface sent us a file chunk
		// This is the last file chunk we were waiting for to consider the file done
		if (fileListReceiver->downloadHandler->OnFile(&onFileStruct))
			rakFree_Ex(oldFileData, _FILE_AND_LINE_ );
		fileListReceiver->pushedFiles.Delete(onFileStruct.fileIndex);

		fileListReceiver->filesReceived++;

		// If this set is done, free the memory for it.
		if ((int) fileListReceiver->setCount==fileListReceiver->filesReceived)
		{
			FileListTransferCBInterface::DownloadCompleteStruct dcs;
			dcs.setID=fileListReceiver->setID;
			dcs.numberOfFilesInThisSet=fileListReceiver->setCount;
			dcs.byteLengthOfThisSet=fileListReceiver->setTotalFinalLength;
			dcs.senderSystemAddress=packet->systemAddress;
			dcs.senderGuid=packet->guid;

			if (fileListReceiver->downloadHandler->OnDownloadComplete(&dcs)==false)
			{
				fileListReceiver->downloadHandler->OnDereference();
				fileListReceivers.Delete(onFileStruct.setID);
				if (fileListReceiver->deleteDownloadHandler)
					RakNet::OP_DELETE(fileListReceiver->downloadHandler, _FILE_AND_LINE_);
				RakNet::OP_DELETE(fileListReceiver, _FILE_AND_LINE_);
			}
		}
	}
	else
	{
		// 12/23/09 Don't use OnReferencePush anymore, just use OnFileProgress
		fileListReceiver->downloadHandler->OnFileProgress(&fps);

		// The handler may clear the flag to tell us not to keep the file in memory
		if (fps.allocateIrIDataChunkAutomatically==false)
		{
			rakFree_Ex(fileListReceiver->pushedFiles.Get(onFileStruct.fileIndex).flrMemoryBlock, _FILE_AND_LINE_ );
			fileListReceiver->pushedFiles.Get(onFileStruct.fileIndex).flrMemoryBlock=0;
		}
	}

	return;
}
namespace RakNet
{
// Thread-pool job (also callable inline) that reads and sends the next chunk of the file at
// the front of the push queue for threadData.systemAddress.
//
// Small files that fit in a single chunk are sent whole as ID_FILE_LIST_TRANSFER_FILE as
// long as more files follow; the chunk finally sent as ID_FILE_LIST_REFERENCE_PUSH is the
// one the receiver acknowledges, which schedules this job again. When the queue for the
// recipient becomes empty, the callbacks get OnFilePushesComplete and the recipient is
// removed from the list. Always returns 0 and sets *returnOutput to false.
int SendIRIToAddressCB(FileListTransfer::ThreadData threadData, bool *returnOutput, void* perThreadData)
{
	(void) perThreadData;

	FileListTransfer *fileListTransfer = threadData.fileListTransfer;
	SystemAddress systemAddress = threadData.systemAddress;
	*returnOutput=false;

	// Was previously using GetStatistics to get outgoing buffer size, but TCP with UnifiedSend doesn't have this
	unsigned int bytesRead;
	const char *dataBlocks[2];
	int lengths[2];
	unsigned int smallFileTotalSize=0;
	RakNet::BitStream outBitstream;
	unsigned int ftpIndex;

	fileListTransfer->fileToPushRecipientListMutex.Lock();
	for (ftpIndex=0; ftpIndex < fileListTransfer->fileToPushRecipientList.Size(); ftpIndex++)
	{
		FileListTransfer::FileToPushRecipient *ftpr = fileListTransfer->fileToPushRecipientList[ftpIndex];
		// Referenced by both ftpr and list
		ftpr->AddRef();

		fileListTransfer->fileToPushRecipientListMutex.Unlock();

		if (ftpr->systemAddress==systemAddress)
		{
			// Nothing queued (for example a stray or late ack): Peek() on an empty queue is invalid
			if (ftpr->filesToPush.Size()==0)
			{
				ftpr->Deref();
				return 0;
			}

			FileListTransfer::FileToPush *ftp = ftpr->filesToPush.Peek();

			// Read and send chunk. If done, delete at this index
			unsigned int buffCapacity = ftp->chunkSize;
			void *buff = rakMalloc_Ex(buffCapacity, _FILE_AND_LINE_);
			if (buff==0)
			{
				ftpr->Deref();
				notifyOutOfMemory(_FILE_AND_LINE_);
				return 0;
			}

			// Read the next file chunk
			bytesRead=ftp->incrementalReadInterface->GetFilePart(ftp->fileListNode.fullPathToFile, ftp->currentOffset, ftp->chunkSize, buff, ftp->fileListNode.context);

			bool done = ftp->fileListNode.dataLengthBytes == ftp->currentOffset+bytesRead;
			while (done && ftp->currentOffset==0 && ftpr->filesToPush.Size()>=2 && smallFileTotalSize<ftp->chunkSize)
			{
				// Send all small files at once, rather than wait for ID_FILE_LIST_REFERENCE_PUSH. But at least one ID_FILE_LIST_REFERENCE_PUSH must be sent
				outBitstream.Reset();
				outBitstream.Write((MessageID)ID_FILE_LIST_TRANSFER_FILE);
				// outBitstream.Write(ftp->fileListNode.context);
				outBitstream << ftp->fileListNode.context;
				outBitstream.Write(ftp->setID);
				StringCompressor::Instance()->EncodeString(ftp->fileListNode.filename, 512, &outBitstream);
				outBitstream.WriteCompressed(ftp->setIndex);
				outBitstream.WriteCompressed(ftp->fileListNode.dataLengthBytes); // Original length in bytes
				outBitstream.AlignWriteToByteBoundary();
				dataBlocks[0]=(char*) outBitstream.GetData();
				lengths[0]=outBitstream.GetNumberOfBytesUsed();
				dataBlocks[1]=(const char*) buff;
				lengths[1]=bytesRead;

				fileListTransfer->SendListUnified(dataBlocks,lengths,2,ftp->packetPriority, RELIABLE_ORDERED, ftp->orderingChannel, systemAddress, false);

				// LWS : fixed freed pointer reference
				//				unsigned int chunkSize = ftp->chunkSize;
				RakNet::OP_DELETE(ftp,_FILE_AND_LINE_);
				ftpr->filesToPush.Pop();
				smallFileTotalSize+=bytesRead;
				//done = bytesRead!=ftp->chunkSize;
				// Size() was >= 2 before the Pop(), so there is a next file to look at
				ftp = ftpr->filesToPush.Peek();

				// Files queued by different Send() calls may use different chunk sizes;
				// make sure the read buffer can hold a full chunk of the next file.
				if (ftp->chunkSize > buffCapacity)
				{
					rakFree_Ex(buff, _FILE_AND_LINE_ );
					buffCapacity = ftp->chunkSize;
					buff = rakMalloc_Ex(buffCapacity, _FILE_AND_LINE_);
					if (buff==0)
					{
						ftpr->Deref();
						notifyOutOfMemory(_FILE_AND_LINE_);
						return 0;
					}
				}

				bytesRead=ftp->incrementalReadInterface->GetFilePart(ftp->fileListNode.fullPathToFile, ftp->currentOffset, ftp->chunkSize, buff, ftp->fileListNode.context);
				done = ftp->fileListNode.dataLengthBytes == ftp->currentOffset+bytesRead;
			}

			outBitstream.Reset();
			outBitstream.Write((MessageID)ID_FILE_LIST_REFERENCE_PUSH);
			// outBitstream.Write(ftp->fileListNode.context);
			outBitstream << ftp->fileListNode.context;
			outBitstream.Write(ftp->setID);
			StringCompressor::Instance()->EncodeString(ftp->fileListNode.filename, 512, &outBitstream);
			outBitstream.WriteCompressed(ftp->setIndex);
			outBitstream.WriteCompressed(ftp->fileListNode.dataLengthBytes); // Original length in bytes
			outBitstream.WriteCompressed(ftp->currentOffset);
			ftp->currentOffset+=bytesRead;
			outBitstream.WriteCompressed(bytesRead);
			outBitstream.Write(done);

			for (unsigned int flpcIndex=0; flpcIndex < fileListTransfer->fileListProgressCallbacks.Size(); flpcIndex++)
				fileListTransfer->fileListProgressCallbacks[flpcIndex]->OnFilePush(ftp->fileListNode.filename, ftp->fileListNode.fileLengthBytes, ftp->currentOffset-bytesRead, bytesRead, done, systemAddress, ftp->setID);

			dataBlocks[0]=(char*) outBitstream.GetData();
			lengths[0]=outBitstream.GetNumberOfBytesUsed();
			dataBlocks[1]=(char*) buff;
			lengths[1]=bytesRead;
			//rakPeerInterface->SendList(dataBlocks,lengths,2,ftp->packetPriority, RELIABLE_ORDERED, ftp->orderingChannel, ftp->systemAddress, false);
			fileListTransfer->SendListUnified(dataBlocks,lengths,2,ftp->packetPriority, RELIABLE_ORDERED, ftp->orderingChannel, systemAddress, false);

			// Mutex state: FileToPushRecipient (ftpr) has AddRef. fileToPushRecipientListMutex not locked.
			if (done)
			{
				// Done
				unsigned short setId = ftp->setID;
				RakNet::OP_DELETE(ftp,_FILE_AND_LINE_);
				ftpr->filesToPush.Pop();

				if (ftpr->filesToPush.Size()==0)
				{
					for (unsigned int flpcIndex=0; flpcIndex < fileListTransfer->fileListProgressCallbacks.Size(); flpcIndex++)
						fileListTransfer->fileListProgressCallbacks[flpcIndex]->OnFilePushesComplete(systemAddress, setId);

					// Remove ftpr from fileToPushRecipientList
					fileListTransfer->RemoveFromList(ftpr);
				}
			}

			// ftpr out of scope
			ftpr->Deref();

			rakFree_Ex(buff, _FILE_AND_LINE_ );
			return 0;
		}
		else
		{
			ftpr->Deref();
			// Re-acquire the list lock before the loop condition reads the list again
			fileListTransfer->fileToPushRecipientListMutex.Lock();
		}
	}

	fileListTransfer->fileToPushRecipientListMutex.Unlock();

	return 0;
}
}
// Schedules sending of the next queued chunk to systemAddress: on the thread pool when it
// was started, otherwise inline on the calling thread.
void FileListTransfer::SendIRIToAddress(SystemAddress systemAddress)
{
	ThreadData job;
	job.fileListTransfer=this;
	job.systemAddress=systemAddress;

	if (threadPool.WasStarted()==false)
	{
		// No workers: run the job right here; its output flag is not needed
		bool unusedOutput;
		SendIRIToAddressCB(job, &unusedOutput, 0);
		return;
	}

	threadPool.AddInput(SendIRIToAddressCB, job);
}
// The remote system acknowledged a reference-push chunk: send it the next one.
void FileListTransfer::OnReferencePushAck(Packet *packet)
{
SendIRIToAddress(packet->systemAddress);
}
// Removes ftpr from fileToPushRecipientList, if present, and drops the reference the list
// held on it. The list mutex is held for the whole operation.
void FileListTransfer::RemoveFromList(FileToPushRecipient *ftpr)
{
	fileToPushRecipientListMutex.Lock();

	unsigned int position=0;
	while (position < fileToPushRecipientList.Size() && fileToPushRecipientList[position]!=ftpr)
		++position;

	if (position < fileToPushRecipientList.Size())
	{
		fileToPushRecipientList.RemoveAtIndex(position);
		// List no longer references
		ftpr->Deref();
	}

	fileToPushRecipientListMutex.Unlock();
}
// Returns how many files are still queued for incremental sending to recipient,
// or 0 when there is no entry for that address.
unsigned int FileListTransfer::GetPendingFilesToAddress(SystemAddress recipient)
{
	unsigned int pendingCount=0;

	fileToPushRecipientListMutex.Lock();
	for (unsigned int recipientIndex=0; recipientIndex < fileToPushRecipientList.Size(); ++recipientIndex)
	{
		if (fileToPushRecipientList[recipientIndex]->systemAddress==recipient)
		{
			pendingCount = fileToPushRecipientList[recipientIndex]->filesToPush.Size();
			break;
		}
	}
	fileToPushRecipientListMutex.Unlock();

	return pendingCount;
}
#ifdef _MSC_VER
#pragma warning( pop )
#endif
#endif // _RAKNET_SUPPORT_*