Skip to content
101 changes: 70 additions & 31 deletions internal/gensupport/resumable.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,34 +127,74 @@ func (rx *ResumableUpload) reportProgress(old, updated int64) {
}

// transferChunk performs a single HTTP request to upload a single chunk.
func (rx *ResumableUpload) transferChunk(ctx context.Context, chunk io.Reader, off, size int64, done bool) (*http.Response, error) {
// rCtx is derived from a context with a defined ChunkTransferTimeout with non-zero value.
// If a particular request exceeds this transfer time for getting response, the rCtx deadline will be exceeded,
// triggering a retry of the request.
var rCtx context.Context
var cancel context.CancelFunc

rCtx = ctx
// It uses a goroutine to perform the upload and a timer to enforce ChunkTransferTimeout or ChunkRetryDeadline.
func (rx *ResumableUpload) transferChunk(ctx context.Context, chunkRetryDeadline time.Duration, chunk io.Reader, off, size int64, done bool) (*http.Response, error) {
if rx.ChunkTransferTimeout != 0 {
rCtx, cancel = context.WithTimeout(ctx, rx.ChunkTransferTimeout)
defer cancel()
chunkRetryDeadline = rx.ChunkTransferTimeout
}

res, err := rx.doUploadRequest(rCtx, chunk, off, size, done)
if err != nil {
return res, err
}
// Start a timer for the configured duration.
timer := time.NewTimer(chunkRetryDeadline)

// We sent "X-GUploader-No-308: yes" (see comment elsewhere in
// this file), so we don't expect to get a 308.
if res.StatusCode == 308 {
return nil, errors.New("unexpected 308 response status code")
// A struct to hold the result from the goroutine.
type uploadResult struct {
res *http.Response
err error
}

if res.StatusCode == http.StatusOK {
rx.reportProgress(off, off+int64(size))
// A buffered channel to receive the result of the upload.
resultCh := make(chan uploadResult, 1)

// Create a cancellable context for the upload request. This allows us to
// abort the request if the timer fires first.
rCtx, cancel := context.WithCancel(ctx)
// NOTE: We do NOT use `defer cancel()` here. The context must remain valid
// for the caller to read the response body of a successful request.
// Cancellation is handled manually on timeout paths.

// Starting the chunk upload in parallel.
go func() {
res, err := rx.doUploadRequest(rCtx, chunk, off, size, done)
resultCh <- uploadResult{res: res, err: err}
}()

// Wait for timer to fire or result channel to have the uploadResult or ctx to be cancelled.
select {
case <-ctx.Done():
// Context is cancelled for the overall upload.
cancel()
return nil, ctx.Err()
case <-timer.C:
// timer fired first so we cancel the chunk upload and return
// deadline exceeded for this chunk upload.
cancel()
return nil, context.DeadlineExceeded

case result := <-resultCh:
// The upload finished before the timer fired. Stop the timer to prevent a resource leak.
if !timer.Stop() {
// If Stop returns false, the timer has already fired and its value
// has been sent on the channel. We drain the channel to avoid a memory leak.
<-timer.C
}

// Handle the result from the upload.
if result.err != nil {
return result.res, result.err
}

// We sent "X-GUploader-No-308: yes" (see comment elsewhere in
// this file), so we don't expect to get a 308.
if result.res.StatusCode == 308 {
return nil, errors.New("unexpected 308 response status code")
}

if result.res.StatusCode == http.StatusOK {
rx.reportProgress(off, off+int64(size))
}

return result.res, nil
}
return res, nil
}

// uploadChunkWithRetries attempts to upload a single chunk, with retries
Expand All @@ -164,14 +204,14 @@ func (rx *ResumableUpload) uploadChunkWithRetries(ctx context.Context, chunk io.
shouldRetry := rx.Retry.errorFunc()

// Configure single chunk retry deadline.
retryDeadline := defaultRetryDeadline
chunkRetryDeadline := defaultRetryDeadline
if rx.ChunkRetryDeadline != 0 {
retryDeadline = rx.ChunkRetryDeadline
chunkRetryDeadline = rx.ChunkRetryDeadline
}

// Each chunk gets its own initialized-at-zero backoff and invocation ID.
bo := rx.Retry.backoff()
quitAfterTimer := time.NewTimer(retryDeadline)
quitAfterTimer := time.NewTimer(chunkRetryDeadline)
defer quitAfterTimer.Stop()
rx.attempts = 1
rx.invocationID = uuid.New().String()
Expand All @@ -184,20 +224,20 @@ func (rx *ResumableUpload) uploadChunkWithRetries(ctx context.Context, chunk io.
for {
// Wait for the backoff period, unless the context is canceled or the
// retry deadline is hit.
pauseTimer := time.NewTimer(pause)
backoffPauseTimer := time.NewTimer(pause)
select {
case <-ctx.Done():
pauseTimer.Stop()
backoffPauseTimer.Stop()
if err == nil {
err = ctx.Err()
}
return resp, err
case <-pauseTimer.C:
case <-backoffPauseTimer.C:
case <-quitAfterTimer.C:
pauseTimer.Stop()
backoffPauseTimer.Stop()
return resp, err
}
pauseTimer.Stop()
backoffPauseTimer.Stop()

// Check for context cancellation or timeout once more. If more than one
// case in the select statement above was satisfied at the same time, Go
Expand Down Expand Up @@ -227,8 +267,7 @@ func (rx *ResumableUpload) uploadChunkWithRetries(ctx context.Context, chunk io.
io.Copy(io.Discard, resp.Body)
resp.Body.Close()
}

resp, err = rx.transferChunk(ctx, chunk, off, size, done)
resp, err = rx.transferChunk(ctx, chunkRetryDeadline, chunk, off, size, done)
status := 0
if resp != nil {
status = resp.StatusCode
Expand Down