libzypp  17.31.7
rangedownloader_p.cc
Go to the documentation of this file.
1 /*---------------------------------------------------------------------\
2 | ____ _ __ __ ___ |
3 | |__ / \ / / . \ . \ |
4 | / / \ V /| _/ _/ |
5 | / /__ | | | | | | |
6 | /_____||_| |_| |_| |
7 | |
8 ----------------------------------------------------------------------*/
9 
14 #include <zypp-core/AutoDispose.h>
15 #include <zypp-core/fs/PathInfo.h>
16 
17 #include "rangedownloader_p.h"
18 
19 namespace zyppng {
20 
22  { }
23 
24  void RangeDownloaderBaseState::onRequestProgress( NetworkRequest &, off_t , off_t, off_t , off_t )
25  {
26  off_t dlnowMulti = _downloadedMultiByteCount;
27  for( const auto &req : _runningRequests ) {
28  dlnowMulti += req->downloadedByteCount();
29  }
30 
31  if ( !assertExpectedFilesize( dlnowMulti ) )
32  return;
33 
34  stateMachine()._sigProgress.emit( *stateMachine().z_func(), _fileSize, dlnowMulti );
35  }
36 
38  {
39  auto lck = stateMachine().z_func()->shared_from_this();
40  auto it = std::find_if( _runningRequests.begin(), _runningRequests.end(), [ &req ]( const std::shared_ptr<Request> &r ) {
41  return ( r.get() == &req );
42  });
43  if ( it == _runningRequests.end() )
44  return;
45 
46  auto reqLocked = *it;
47 
48  //remove from running
49  _runningRequests.erase( it );
50 
51  //feed the working URL back into the mirrors in case there are still running requests that might fail
52  // @TODO , finishing the transfer might never be called in case of cancelling the request, need a better way to track running transfers
53  if ( reqLocked->_myMirror )
54  reqLocked->_myMirror->finishTransfer( !err.isError() );
55 
56  if ( err.isError() ) {
57  return handleRequestError( reqLocked, err );
58  }
59 
62  return;
63  }
64 
65  MIL_MEDIA << "Request finished "<<std::endl;
66  const auto &rngs = reqLocked->requestedRanges();
67  std::for_each( rngs.begin(), rngs.end(), []( const auto &b ){ DBG << "-> Block " << b.start << " finished." << std::endl; } );
68 
69  auto restartReqWithBlock = [ this ]( std::shared_ptr<Request> &req, std::vector<Block> &&blocks ) {
70  MIL_MEDIA << "Reusing Request to download blocks:"<<std::endl;
71  if ( !addBlockRanges( req, std::move( blocks ) ) )
72  return false;
73 
74  //this is not a new request, only add to queues but do not connect signals again
75  addNewRequest( req, false );
76  return true;
77  };
78 
79  //check if we already have enqueued all blocks if not reuse the request
80  if ( _ranges.size() ) {
81  MIL_MEDIA << "Reusing to download blocks: "<<std::endl;
82  if ( !restartReqWithBlock( reqLocked, getNextBlocks( reqLocked->url().getScheme() ) ) ) {
83  return setFailed( "Failed to restart request with new blocks." );
84  }
85  return;
86 
87  } else {
88  //if we have failed blocks, try to download them with this mirror
89  if ( !_failedRanges.empty() ) {
90 
91  auto fblks = getNextFailedBlocks( reqLocked->url().getScheme() );
92  MIL_MEDIA << "Reusing to download failed blocks: "<<std::endl;
93  if ( !restartReqWithBlock( reqLocked, std::move(fblks) ) ) {
94  return setFailed( "Failed to restart request with previously failed blocks." );
95  }
96  return;
97  }
98  }
99 
100  //feed the working URL back into the mirrors in case there are still running requests that might fail
101  _fileMirrors.push_back( reqLocked->_originalUrl );
102 
103  // make sure downloads are running, at this point
105  }
106 
107  void RangeDownloaderBaseState::handleRequestError( std::shared_ptr<Request> req, const zyppng::NetworkRequestError &err )
108  {
109  bool retry = false;
110  auto &parent = stateMachine();
111 
112 
113  //Handle the auth errors explicitly, we need to give the user a way to put in new credentials
114  //if we get valid new credentials we can retry the request
116  retry = parent.handleRequestAuthError( req, err );
117  } else {
118 
119  //if a error happens during a multi download we try to use another mirror to download the failed block
120  MIL << "Request failed " << req->extendedErrorString() << "(" << req->url() << ")" << std::endl;
121 
122  NetworkRequestError dummyErr;
123 
124  const auto &fRanges = req->failedRanges();
125  try {
126  std::transform( fRanges.begin(), fRanges.end(), std::back_inserter(_failedRanges), [ &req ]( const auto &r ){
127  Block b = std::any_cast<Block>(r.userData);;
128  b._failedWithErr = req->error();
130  DBG_MEDIA << "Adding failed block to failed blocklist: " << b.start << " " << b.len << " (" << req->error().toString() << " [" << req->error().nativeErrorString()<< "])" << std::endl;
131  return b;
132  });
133 
134  // try to fill the open spot right away
136  return;
137 
138  } catch ( const zypp::Exception &ex ) {
139  //we just log the exception and fall back to a normal download
140  WAR << "Multipart download failed: " << ex.asString() << std::endl;
141  }
142  }
143 
144  //if rety is true we just enqueue the request again, usually this means authentication was updated
145  if ( retry ) {
146  //make sure this request will run asap
147  req->setPriority( parent._defaultSubRequestPriority );
148 
149  //this is not a new request, only add to queues but do not connect signals again
150  addNewRequest( req, false );
151  return;
152  }
153 
154  //we do not have more mirrors left we can try
155  cancelAll ( err );
156 
157  // not all hope is lost, maybe a normal download can work out?
158  // fall back to normal download
159  _sigFailed.emit();
160  }
161 
163  {
165  return;
166 
167  zypp::OnScopeExit clearFlag( [this]() {
169  });
170 
172 
173  //check if there is still work to do
174  while ( _ranges.size() || _failedRanges.size() ) {
175 
176  // download was already finished
177  if ( _error.isError() )
178  return;
179 
180  if ( _runningRequests.size() >= 10 )
181  break;
182 
183  // prepareNextMirror will automatically call mirrorReceived() once there is a mirror ready
184  const auto &res = prepareNextMirror();
185  // if mirrors are delayed we stop here, once the mirrors are ready we get called again
187  return;
188  else if ( res == MirrorHandlingStateBase::Failed ) {
189  failedToPrepare();
190  return;
191  }
192  }
193 
194  // check if we are done at this point
195  if ( _runningRequests.empty() ) {
196 
197  if ( _failedRanges.size() || _ranges.size() ) {
199  return;
200  }
201 
202  // seems we were successfull , transition to finished state
203  setFinished();
204  }
205  }
206 
208  {
209 
210  auto &parent = stateMachine();
211  Url myUrl;
212  TransferSettings settings;
213 
214  auto err = setupMirror( mirror, myUrl, settings );
215  if ( err.isError() ) {
216  WAR << "Failure to setup mirror " << myUrl << " with error " << err.toString() << "("<< err.nativeErrorString() << "), dropping it from the list of mirrors." << std::endl;
217  // if a mirror fails , we remove it from our list
218  _fileMirrors.erase( mirror.first );
219 
220  // make sure this is retried
222  return;
223  }
224 
225  auto blocks = getNextBlocks( myUrl.getScheme() );
226  if ( !blocks.size() )
227  blocks = getNextFailedBlocks( myUrl.getScheme() );
228 
229  if ( !blocks.size() ) {
230  // We have no blocks. In theory, that should never happen, but for safety, we error out here. It is better than
231  // getting stuck.
232  setFailed( NetworkRequestErrorPrivate::customError( NetworkRequestError::InternalError, "Mirror requested after all blocks were downloaded." ) );
233  return;
234  }
235 
236  const auto &spec = parent._spec;
237 
238  std::shared_ptr<Request> req = std::make_shared<Request>( ::internal::clearQueryString( myUrl ), spec.targetPath(), NetworkRequest::WriteShared );
239  req->_myMirror = mirror.second;
240  req->_originalUrl = myUrl;
241  req->setPriority( parent._defaultSubRequestPriority );
242  req->transferSettings() = settings;
243 
244  // if we download chunks we do not want to wait for too long on mirrors that have slow activity
245  // note: this sets the activity timeout, not the download timeout
246  req->transferSettings().setTimeout( 2 );
247 
248  DBG_MEDIA << "Creating Request to download blocks:"<<std::endl;
249  if ( !addBlockRanges( req, std::move(blocks) ) ) {
251  return;
252  }
253 
254  // we just use a mirror once per file, remove it from the list
255  _fileMirrors.erase( mirror.first );
256 
257  addNewRequest( req );
258 
259  // trigger next downloads
261  }
262 
264  {
265  // it was impossible to find a new mirror, check if we still have running requests we can wait for, if not
266  // we can only fail at this point
267  if ( !_runningRequests.size() ) {
269  }
270  }
271 
273  {
274  bool triggerResched = false;
275  for ( auto &req : _runningRequests ) {
276  if ( req->state() == NetworkRequest::Pending ) {
277  triggerResched = true;
278  req->setPriority( NetworkRequest::Critical, false );
279  }
280  }
281  if ( triggerResched )
282  stateMachine()._requestDispatcher->reschedule();
283  }
284 
285  void RangeDownloaderBaseState::addNewRequest(std::shared_ptr<Request> req , const bool connectSignals)
286  {
287  if ( connectSignals )
288  req->connectSignals( *this );
289 
290  _runningRequests.push_back( req );
291  stateMachine()._requestDispatcher->enqueue( req );
292 
293  if ( req->_myMirror )
294  req->_myMirror->startTransfer();
295  }
296 
298  {
299  const off_t expFSize = stateMachine()._spec.expectedFileSize();
300  if ( expFSize > 0 && expFSize < currentFilesize ) {
302  return false;
303  }
304  return true;
305  }
306 
310  bool RangeDownloaderBaseState::addBlockRanges ( std::shared_ptr<Request> req , std::vector<Block> &&blocks ) const
311  {
312  req->resetRequestRanges();
313  for ( const auto &block : blocks ) {
314  if ( block.chksumVec && block.chksumtype.size() ) {
315  std::shared_ptr<zypp::Digest> dig = std::make_shared<zypp::Digest>();
316  if ( !dig->create( block.chksumtype ) ) {
317  WAR_MEDIA << "Trying to create Digest with chksum type " << block.chksumtype << " failed " << std::endl;
318  return false;
319  }
320 
322  DBG_MEDIA << "Starting block " << block.start << " with checksum " << zypp::Digest::digestVectorToString( *block.chksumVec ) << "." << std::endl;
323  req->addRequestRange( block.start, block.len, dig, *block.chksumVec, std::any( block ), block.chksumCompareLen, block.chksumPad );
324  } else {
325 
327  DBG_MEDIA << "Starting block " << block.start << " without checksum." << std::endl;
328  req->addRequestRange( block.start, block.len, {}, {}, std::any( block ) );
329  }
330  }
331  return true;
332  }
333 
335  {
336  _error = std::move( err );
337  cancelAll( _error );
338  zypp::filesystem::unlink( stateMachine()._spec.targetPath() );
339  _sigFailed.emit();
340  }
341 
342  void RangeDownloaderBaseState::setFailed(std::string &&reason)
343  {
345  }
346 
348  {
350  _sigFinished.emit();
351  }
352 
354  {
355  while( _runningRequests.size() ) {
356  auto req = _runningRequests.back();
357  req->disconnectSignals();
358  _runningRequests.pop_back();
359  stateMachine()._requestDispatcher->cancel( *req, err );
360  if ( req->_myMirror )
361  req->_myMirror->cancelTransfer();
362  }
363  }
364 
365  std::vector<RangeDownloaderBaseState::Block> RangeDownloaderBaseState::getNextBlocks( const std::string &urlScheme )
366  {
367  std::vector<Block> blocks;
368  const auto &prefSize = std::max<zypp::ByteCount>( _preferredChunkSize, zypp::ByteCount(4, zypp::ByteCount::K) );
369  size_t accumulatedSize = 0;
370 
371  bool canDoRandomBlocks = ( zypp::str::hasPrefixCI( urlScheme, "http") );
372 
373  std::optional<size_t> lastBlockEnd;
374  while ( _ranges.size() && accumulatedSize < prefSize ) {
375  const auto &r = _ranges.front();
376 
377  if ( !canDoRandomBlocks && lastBlockEnd ) {
378  if ( static_cast<const size_t>(r.start) != (*lastBlockEnd)+1 )
379  break;
380  }
381 
382  lastBlockEnd = r.start + r.len - 1;
383  accumulatedSize += r.len;
384 
385  blocks.push_back( std::move( _ranges.front() ) );
386  _ranges.pop_front();
387 
388  }
389  DBG_MEDIA << "Accumulated " << blocks.size() << " blocks with accumulated size of: " << accumulatedSize << "." << std::endl;
390  return blocks;
391  }
392 
393  std::vector<RangeDownloaderBaseState::Block> RangeDownloaderBaseState::getNextFailedBlocks( const std::string &urlScheme )
394  {
395  const auto &prefSize = std::max<zypp::ByteCount>( _preferredChunkSize, zypp::ByteCount(4, zypp::ByteCount::K) );
396  // sort the failed requests by block number, this should make sure get them in offset order as well
397  _failedRanges.sort( []( const auto &a , const auto &b ){ return a.start < b.start; } );
398 
399  bool canDoRandomBlocks = ( zypp::str::hasPrefixCI( urlScheme, "http") );
400 
401  std::vector<Block> fblks;
402  std::optional<size_t> lastBlockEnd;
403  size_t accumulatedSize = 0;
404  while ( _failedRanges.size() ) {
405 
406  const auto &block =_failedRanges.front();
407 
408  //we need to check if we have consecutive blocks because only http mirrors support random request ranges
409  if ( !canDoRandomBlocks && lastBlockEnd ) {
410  if ( static_cast<const size_t>(block.start) != (*lastBlockEnd)+1 )
411  break;
412  }
413 
414  lastBlockEnd = block.start + block.len - 1;
415  accumulatedSize += block.len;
416 
417  fblks.push_back( std::move( _failedRanges.front() ));
418  _failedRanges.pop_front();
419 
420  fblks.back()._retryCount += 1;
421 
422  if ( accumulatedSize >= prefSize )
423  break;
424  }
425 
426  return fblks;
427  }
428 
430  {
431  // this case should never happen because we never start a multi download if we do not know the filesize beforehand
432  if ( filesize == 0 ) return 4 * 1024 * 1024;
433  else if ( filesize < 4*1024*1024 ) return filesize;
434  else if ( filesize < 8*1024*1024 ) return 4*1024*1024;
435  else if ( filesize < 16*1024*1024 ) return 8*1024*1024;
436  else if ( filesize < 256*1024*1024 ) return 10*1024*1024;
437  return 4*1024*1024;
438  }
439 }
bool assertExpectedFilesize(off_t currentFilesize)
bool isError() const
isError Will return true if this is a actual error
#define MIL
Definition: Logger.h:96
void onRequestFinished(NetworkRequest &req, const NetworkRequestError &err)
void onRequestProgress(NetworkRequest &, off_t, off_t, off_t, off_t)
#define DBG_MEDIA
Definition: mediadebug_p.h:28
void mirrorReceived(MirrorControl::MirrorPick mirror) override
unsigned short b
bool hasPrefixCI(const C_Str &str_r, const C_Str &prefix_r)
Definition: String.h:1030
Store and operate with byte count.
Definition: ByteCount.h:30
Holds transfer setting.
zypp::ByteCount downloadedByteCount() const
Returns the number of already downloaded bytes as reported by the backend.
Definition: request.cc:1402
Url clearQueryString(const Url &url)
Definition: curlhelper.cc:332
void setFailed(NetworkRequestError &&err)
static std::string digestVectorToString(const UByteArray &vec)
get hex string representation of the digest vector given as parameter
Definition: Digest.cc:184
static zypp::ByteCount makeBlksize(size_t filesize)
int unlink(const Pathname &path)
Like &#39;unlink&#39;.
Definition: PathInfo.cc:700
bool addBlockRanges(std::shared_ptr< Request > req, std::vector< Block > &&blocks) const
Just initialize the requests ranges from the internal blocklist.
std::string asString() const
Error message provided by dumpOn as string.
Definition: Exception.cc:75
void cancelAll(const NetworkRequestError &err)
void onRequestStarted(NetworkRequest &)
#define WAR
Definition: Logger.h:97
The NetworkRequestError class Represents a error that occured in.
std::vector< Block > getNextFailedBlocks(const std::string &urlScheme)
void setTimeout(long t)
set the transfer timeout
std::pair< std::vector< Url >::const_iterator, MirrorHandle > MirrorPick
NetworkRequestError setupMirror(const MirrorControl::MirrorPick &pick, Url &url, TransferSettings &set)
#define MIL_MEDIA
Definition: mediadebug_p.h:29
long ZYPP_MEDIA_CURL_DEBUG()
Long number for setting CURLOPT_DEBUGDATA.
Definition: curlhelper_p.h:35
#define WAR_MEDIA
Definition: mediadebug_p.h:30
void handleRequestError(std::shared_ptr< Request > req, const zyppng::NetworkRequestError &err)
Base class for Exception.
Definition: Exception.h:145
unsigned short a
static const Unit K
1024 Byte
Definition: ByteCount.h:45
std::vector< Block > getNextBlocks(const std::string &urlScheme)
Type type() const
type Returns the type of the error
void addNewRequest(std::shared_ptr< Request > req, const bool connectSignals=true)
std::vector< std::shared_ptr< Request > > _runningRequests
static zyppng::NetworkRequestError customError(NetworkRequestError::Type t, std::string &&errorMsg="", std::map< std::string, boost::any > &&extraInfo={})
#define DBG
Definition: Logger.h:95