7 #if !defined(NO_OS_DEPENDENCE) && defined(SOCKETS_AVAILABLE) 11 #define CRYPTOPP_TRACE_NETWORK 0 17 if (!m_maxBytesPerSecond)
20 const double curTime = GetCurTimeAndCleanUp();
21 CRYPTOPP_UNUSED(curTime);
24 for (OpQueue::size_type i=0; i!=m_ops.size(); ++i)
25 total += m_ops[i].second;
29 double LimitedBandwidth::TimeToNextTransceive()
31 if (!m_maxBytesPerSecond)
34 if (!m_nextTransceiveTime)
35 ComputeNextTransceiveTime();
40 void LimitedBandwidth::NoteTransceive(lword size)
42 if (m_maxBytesPerSecond)
44 double curTime = GetCurTimeAndCleanUp();
45 m_ops.push_back(std::make_pair(curTime, size));
46 m_nextTransceiveTime = 0;
50 void LimitedBandwidth::ComputeNextTransceiveTime()
52 double curTime = GetCurTimeAndCleanUp();
54 for (
unsigned int i=0; i!=m_ops.size(); ++i)
55 total += m_ops[i].second;
56 m_nextTransceiveTime =
57 (total < m_maxBytesPerSecond) ? curTime : m_ops.front().first + 1000;
60 double LimitedBandwidth::GetCurTimeAndCleanUp()
62 if (!m_maxBytesPerSecond)
65 double curTime = m_timer.ElapsedTimeAsDouble();
66 while (m_ops.size() && (m_ops.front().first + 1000 < curTime))
73 double nextTransceiveTime = TimeToNextTransceive();
74 if (nextTransceiveTime)
75 container.ScheduleEvent(nextTransceiveTime,
CallStack(
"LimitedBandwidth::GetWaitObjects()", &callStack));
81 lword& byteCount,
bool blockingOutput,
82 unsigned long maxTime,
bool checkDelimiter, byte delimiter)
84 m_blockedBySpeedLimit =
false;
86 if (!GetMaxBytesPerSecond())
88 size_t ret = DoPump(byteCount, blockingOutput, maxTime, checkDelimiter, delimiter);
89 m_doPumpBlocked = (ret != 0);
94 unsigned long timeToGo = maxTime;
95 Timer timer(Timer::MILLISECONDS, forever);
96 lword maxSize = byteCount;
103 lword curMaxSize =
UnsignedMin(ComputeCurrentTransceiveLimit(), maxSize - byteCount);
105 if (curMaxSize || m_doPumpBlocked)
108 size_t ret = DoPump(curMaxSize, blockingOutput, timeToGo, checkDelimiter, delimiter);
109 m_doPumpBlocked = (ret != 0);
112 NoteTransceive(curMaxSize);
113 byteCount += curMaxSize;
119 if (maxSize != ULONG_MAX && byteCount >= maxSize)
129 double waitTime = TimeToNextTransceive();
130 if (!forever && waitTime > timeToGo)
132 m_blockedBySpeedLimit =
true;
137 LimitedBandwidth::GetWaitObjects(container,
CallStack(
"NonblockingSource::GeneralPump2() - speed limit", 0));
138 container.Wait((
unsigned long)waitTime);
146 if (messageCount == 0)
153 byteCount = LWORD_MAX;
154 RETURN_IF_NONZERO(Pump2(byteCount, blocking));
155 }
while(byteCount == LWORD_MAX);
157 if (!m_messageEndSent && SourceExhausted())
159 RETURN_IF_NONZERO(AttachedTransformation()->Put2(NULL, 0, GetAutoSignalPropagation(),
true));
160 m_messageEndSent =
true;
168 m_blockedBySpeedLimit =
false;
170 size_t curBufSize = GetCurrentBufferSize();
171 if (curBufSize <= targetSize && (targetSize || !EofPending()))
174 if (!GetMaxBytesPerSecond())
175 return DoFlush(maxTime, targetSize);
178 unsigned long timeToGo = maxTime;
179 Timer timer(Timer::MILLISECONDS, forever);
180 lword totalFlushed = 0;
186 size_t flushSize =
UnsignedMin(curBufSize - targetSize, ComputeCurrentTransceiveLimit());
187 if (flushSize || EofPending())
190 size_t ret = (size_t)DoFlush(timeToGo, curBufSize - flushSize);
199 if (curBufSize <= targetSize && (targetSize || !EofPending()))
209 double waitTime = TimeToNextTransceive();
210 if (!forever && waitTime > timeToGo)
212 m_blockedBySpeedLimit =
true;
217 LimitedBandwidth::GetWaitObjects(container,
CallStack(
"NonblockingSink::TimedFlush() - speed limit", 0));
218 container.Wait((
unsigned long)waitTime);
227 return hardFlush && (!!GetCurrentBufferSize() || EofPending());
234 , m_putSize(0), m_dataBegin(0), m_dataEnd(0)
235 , m_waitingForResult(
false), m_outputBlocked(
false)
242 + GetReceiver().GetMaxWaitObjectCount()
243 + AttachedTransformation()->GetMaxWaitObjectCount();
248 if (BlockedBySpeedLimit())
249 LimitedBandwidth::GetWaitObjects(container,
CallStack(
"NetworkSource::GetWaitObjects() - speed limit", &callStack));
250 else if (!m_outputBlocked)
252 if (m_dataBegin == m_dataEnd)
253 AccessReceiver().GetWaitObjects(container,
CallStack(
"NetworkSource::GetWaitObjects() - no data", &callStack));
255 container.SetNoWait(
CallStack(
"NetworkSource::GetWaitObjects() - have data", &callStack));
258 AttachedTransformation()->GetWaitObjects(container,
CallStack(
"NetworkSource::GetWaitObjects() - attachment", &callStack));
261 size_t NetworkSource::DoPump(lword &byteCount,
bool blockingOutput,
unsigned long maxTime,
bool checkDelimiter, byte delimiter)
265 lword maxSize = byteCount;
268 Timer timer(Timer::MILLISECONDS, forever);
276 if (m_dataBegin == m_dataEnd)
278 if (receiver.EofReceived())
281 if (m_waitingForResult)
283 if (receiver.MustWaitForResult() &&
285 CallStack(
"NetworkSource::DoPump() - wait receive result", 0)))
288 unsigned int recvResult = receiver.GetReceiveResult();
289 #if CRYPTOPP_TRACE_NETWORK 290 OutputDebugString((
IntToString((
unsigned int)
this) +
": Received " +
IntToString(recvResult) +
" bytes\n").c_str());
292 m_dataEnd += recvResult;
293 m_waitingForResult =
false;
295 if (!receiver.MustWaitToReceive() && !receiver.EofReceived() && m_dataEnd != m_buf.size())
300 m_dataEnd = m_dataBegin = 0;
302 if (receiver.MustWaitToReceive())
305 CallStack(
"NetworkSource::DoPump() - wait receive", 0)))
308 receiver.
Receive(m_buf+m_dataEnd, m_buf.size()-m_dataEnd);
309 m_waitingForResult =
true;
314 m_waitingForResult =
true;
317 #if CRYPTOPP_TRACE_NETWORK 318 OutputDebugString((
IntToString((
unsigned int)
this) +
": Receiving " +
IntToString(m_buf.size()-m_dataEnd) +
" bytes\n").c_str());
320 while (receiver.
Receive(m_buf+m_dataEnd, m_buf.size()-m_dataEnd))
322 unsigned int recvResult = receiver.GetReceiveResult();
323 #if CRYPTOPP_TRACE_NETWORK 324 OutputDebugString((
IntToString((
unsigned int)
this) +
": Received " +
IntToString(recvResult) +
" bytes\n").c_str());
326 m_dataEnd += recvResult;
327 if (receiver.EofReceived() || m_dataEnd > m_buf.size() /2)
329 m_waitingForResult =
false;
338 m_putSize =
UnsignedMin(m_dataEnd - m_dataBegin, maxSize - byteCount);
341 m_putSize = std::find(m_buf+m_dataBegin, m_buf+m_dataBegin+m_putSize, delimiter) - (m_buf+m_dataBegin);
344 size_t result = t->
PutModifiable2(m_buf+m_dataBegin, m_putSize, 0, forever || blockingOutput);
348 CallStack(
"NetworkSource::DoPump() - wait attachment", 0)))
352 m_outputBlocked =
true;
356 m_outputBlocked =
false;
358 byteCount += m_putSize;
359 m_dataBegin += m_putSize;
360 if (checkDelimiter && m_dataBegin < m_dataEnd && m_buf[m_dataBegin] == delimiter)
362 if (maxSize != ULONG_MAX && byteCount == maxSize)
367 if (maxTime > 0 && timer.ElapsedTime() > maxTime)
377 NetworkSink::NetworkSink(
unsigned int maxBufferSize,
unsigned int autoFlushBound)
378 : m_maxBufferSize(maxBufferSize), m_autoFlushBound(autoFlushBound)
379 , m_needSendResult(
false), m_wasBlocked(
false), m_eofState(EOF_NONE)
380 , m_buffer(
STDMIN(16U*1024U+256, maxBufferSize)), m_skipBytes(0)
381 , m_speedTimer(Timer::MILLISECONDS), m_byteCountSinceLastTimerReset(0)
382 , m_currentSpeed(0), m_maxObservedSpeed(0)
388 if (m_speedTimer.ElapsedTime() > 1000)
390 m_currentSpeed = m_byteCountSinceLastTimerReset * 1000 / m_speedTimer.ElapsedTime();
391 m_maxObservedSpeed =
STDMAX(m_currentSpeed, m_maxObservedSpeed * 0.98f);
392 m_byteCountSinceLastTimerReset = 0;
393 m_speedTimer.StartTimer();
396 return m_currentSpeed;
401 lword m = GetMaxBytesPerSecond();
402 return m ?
STDMIN(m_maxObservedSpeed,
float(CRYPTOPP_VC6_INT64 m)) : m_maxObservedSpeed;
412 if (BlockedBySpeedLimit())
413 LimitedBandwidth::GetWaitObjects(container,
CallStack(
"NetworkSink::GetWaitObjects() - speed limit", &callStack));
414 else if (m_wasBlocked)
415 AccessSender().GetWaitObjects(container,
CallStack(
"NetworkSink::GetWaitObjects() - was blocked", &callStack));
416 else if (!m_buffer.IsEmpty())
417 AccessSender().GetWaitObjects(container,
CallStack(
"NetworkSink::GetWaitObjects() - buffer not empty", &callStack));
418 else if (EofPending())
419 AccessSender().GetWaitObjects(container,
CallStack(
"NetworkSink::GetWaitObjects() - EOF pending", &callStack));
424 if (m_eofState == EOF_DONE)
426 if (length || messageEnd)
432 if (m_eofState > EOF_NONE)
438 assert(length >= m_skipBytes);
439 inString += m_skipBytes;
440 length -= m_skipBytes;
443 m_buffer.Put(inString, length);
445 if (!blocking || m_buffer.CurrentSize() > m_autoFlushBound)
448 size_t targetSize = messageEnd ? 0 : m_maxBufferSize;
452 if (m_buffer.CurrentSize() > targetSize)
456 m_skipBytes += length;
457 size_t blockedBytes =
UnsignedMin(length, m_buffer.CurrentSize() - targetSize);
458 return STDMAX<size_t>(blockedBytes, 1);
461 m_wasBlocked =
false;
467 m_eofState = EOF_PENDING_SEND;
471 if (m_eofState != EOF_DONE)
478 lword NetworkSink::DoFlush(
unsigned long maxTime,
size_t targetSize)
483 Timer timer(Timer::MILLISECONDS, forever);
484 unsigned int totalFlushSize = 0;
488 if (m_buffer.CurrentSize() <= targetSize)
491 if (m_needSendResult)
493 if (sender.MustWaitForResult() &&
495 CallStack(
"NetworkSink::DoFlush() - wait send result", 0)))
498 unsigned int sendResult = sender.GetSendResult();
499 #if CRYPTOPP_TRACE_NETWORK 500 OutputDebugString((
IntToString((
unsigned int)
this) +
": Sent " +
IntToString(sendResult) +
" bytes\n").c_str());
502 m_buffer.Skip(sendResult);
503 totalFlushSize += sendResult;
504 m_needSendResult =
false;
506 if (!m_buffer.AnyRetrievable())
510 unsigned long timeOut = maxTime ?
SaturatingSubtract(maxTime, timer.ElapsedTime()) : 0;
511 if (sender.MustWaitToSend() && !sender.
Wait(timeOut,
CallStack(
"NetworkSink::DoFlush() - wait send", 0)))
514 size_t contiguousSize = 0;
515 const byte *block = m_buffer.Spy(contiguousSize);
517 #if CRYPTOPP_TRACE_NETWORK 518 OutputDebugString((
IntToString((
unsigned int)
this) +
": Sending " +
IntToString(contiguousSize) +
" bytes\n").c_str());
520 sender.Send(block, contiguousSize);
521 m_needSendResult =
true;
523 if (maxTime > 0 && timeOut == 0)
527 m_byteCountSinceLastTimerReset += totalFlushSize;
528 ComputeCurrentSpeed();
530 if (m_buffer.IsEmpty() && !m_needSendResult)
532 if (m_eofState == EOF_PENDING_SEND)
535 m_eofState = sender.MustWaitForEof() ? EOF_PENDING_DELIVERY : EOF_DONE;
538 while (m_eofState == EOF_PENDING_DELIVERY)
540 unsigned long timeOut = maxTime ?
SaturatingSubtract(maxTime, timer.ElapsedTime()) : 0;
541 if (!sender.
Wait(timeOut,
CallStack(
"NetworkSink::DoFlush() - wait EOF", 0)))
544 if (sender.EofSent())
545 m_eofState = EOF_DONE;
549 return totalFlushSize;
554 #endif // #ifdef SOCKETS_AVAILABLE Base class for all exceptions thrown by the library.
container of wait objects
float GetMaxObservedSpeed() const
get the maximum observed speed of this sink in bytes per second
lword TimedFlush(unsigned long maxTime, size_t targetSize=0)
flush to device for no more than maxTime milliseconds
Some other error occurred not belonging to other categories.
void GetWaitObjects(WaitObjectContainer &container, CallStack const &callStack)
Retrieves waitable objects.
size_t Put2(const byte *inString, size_t length, int messageEnd, bool blocking)
Input multiple bytes for processing.
size_t PumpMessages2(unsigned int &messageCount, bool blocking=true)
Pump messages to attached transformation.
unsigned int GetMaxWaitObjectCount() const
Retrieves the maximum number of waitable objects.
size_t GeneralPump2(lword &byteCount, bool blockingOutput=true, unsigned long maxTime=INFINITE_TIME, bool checkDelimiter=false, byte delimiter='\n')
pump up to maxSize bytes using at most maxTime milliseconds
float ComputeCurrentSpeed()
compute the current speed of this sink in bytes per second
T1 SaturatingSubtract(const T1 &a, const T2 &b)
Performs a saturating subtract clamped at 0.
const T1 UnsignedMin(const T1 &a, const T2 &b)
Safe comparison of values that could be neagtive and incorrectly promoted.
const T & STDMIN(const T &a, const T &b)
Replacement function for std::min.
const unsigned long INFINITE_TIME
Represents infinite time.
unsigned int GetMaxWaitObjectCount() const
bool Wait(unsigned long milliseconds, CallStack const &callStack)
Wait on this object.
std::string IntToString(T value, unsigned int base=10)
Converts a value to a string.
void GetWaitObjects(WaitObjectContainer &container, CallStack const &callStack)
Retrieves waitable objects.
const T & STDMAX(const T &a, const T &b)
Replacement function for std::max.
unsigned int GetMaxWaitObjectCount() const
Retrieves the maximum number of waitable objects.
virtual bool Receive(byte *buf, size_t bufLen)=0
receive data from network source, returns whether result is immediately available ...
Crypto++ library namespace.
bool IsolatedFlush(bool hardFlush, bool blocking)
Flushes data buffered by this object, without signal propagation.
a Source class that can pump from a device for a specified amount of time.