Skip to content
Snippets Groups Projects
Commit 338829f2 authored by Artem Zinnatullin's avatar Artem Zinnatullin Committed by Copybara-Service
Browse files

Fix retrying of SocketTimeoutExceptions in HttpConnector

As part of investigation of #8974 I found that recent change 982e0b83 broke retries of `SocketTimeoutException` in `HttpConnector`, intention of that change was good but lack of tests resulted in broken logic.

IntelliJ highlights the problem ? branch with `instanceof SocketTimeoutException` would have never been executed:

<img width="764" alt="Screen Shot 2019-07-29 at 3 26 11 PM" src="https://user-images.githubusercontent.com/967132/62089179-d7bfa400-b21c-11e9-882a-1c1fe1fcb683.png">

---

This PR adds missing tests and fixes the logic to still present `SocketTimeoutException` as `IOException` for upstream consumers while handling it properly internally in the `HttpConnector`.

Closes #9008.

PiperOrigin-RevId: 261675244
parent 299655ee
No related merge requests found
......@@ -185,11 +185,6 @@ class HttpConnector {
throw e;
} catch (IllegalArgumentException e) {
throw new UnrecoverableHttpException(e.getMessage());
} catch (SocketTimeoutException e) {
// SocketTimeoutExceptions are InterruptedExceptions; however they do not signify
// an external interruption, but simply a failed download due to some server timing
// out. So rethrow them as ordinary IOExceptions.
throw new IOException(e.getMessage(), e);
} catch (IOException e) {
if (connection != null) {
// If we got here, it means we might not have consumed the entire payload of the
......@@ -214,7 +209,12 @@ class HttpConnector {
throw e;
}
if (++retries == MAX_RETRIES) {
if (!(e instanceof SocketTimeoutException)) {
if (e instanceof SocketTimeoutException) {
// SocketTimeoutExceptions are InterruptedIOExceptions; however they do not signify
// an external interruption, but simply a failed download due to some server timing
// out. So rethrow them as ordinary IOExceptions.
e = new IOException(e.getMessage(), e);
} else {
eventHandler
.handle(Event.progress(format("Error connecting to %s: %s", url, e.getMessage())));
}
......
......@@ -41,6 +41,7 @@ import java.net.InetAddress;
import java.net.Proxy;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.net.URL;
import java.net.URLConnection;
import java.util.Locale;
......@@ -53,6 +54,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
......@@ -79,10 +81,13 @@ public class HttpConnectorTest {
private final ExecutorService executor = Executors.newFixedThreadPool(2);
private final ManualClock clock = new ManualClock();
private final ManualSleeper sleeper = new ManualSleeper(clock);
/** Scale timeouts down to make tests fast. */
private final float timeoutScaling = 0.05f;
private final EventHandler eventHandler = mock(EventHandler.class);
private final ProxyHelper proxyHelper = mock(ProxyHelper.class);
private final HttpConnector connector =
new HttpConnector(Locale.US, eventHandler, proxyHelper, sleeper);
new HttpConnector(Locale.US, eventHandler, proxyHelper, sleeper, timeoutScaling);
@Before
public void before() throws Exception {
......@@ -202,6 +207,159 @@ public class HttpConnectorTest {
}
}
@Test
public void connectionRefused_retries() throws Exception {
final int port;
// Start and immediately stop server socket to get a free port.
try (ServerSocket server = new ServerSocket(0, 1, InetAddress.getByName(null))) {
port = server.getLocalPort();
}
final AtomicReference<ServerSocket> server = new AtomicReference<>();
try {
// Schedule server socket to be started only after retry to simulate connection retry.
sleeper.scheduleRunnable(
() -> {
try {
server.set(new ServerSocket(port, 1, InetAddress.getByName(null)));
} catch (IOException e) {
throw new RuntimeException(e);
}
@SuppressWarnings("unused")
Future<?> possiblyIgnoredError =
executor.submit(
() -> {
while (!executor.isShutdown()) {
try (Socket socket = server.get().accept()) {
readHttpRequest(socket.getInputStream());
sendLines(
socket,
"HTTP/1.1 200 OK",
"Date: Fri, 31 Dec 1999 23:59:59 GMT",
"Connection: close",
"Content-Type: text/plain",
"Content-Length: 5",
"",
"hello");
}
}
return null;
});
},
1);
try (Reader payload =
new InputStreamReader(
connector
.connect(
new URL(String.format("http://localhost:%d", port)),
ImmutableMap.<String, String>of())
.getInputStream(),
ISO_8859_1)) {
assertThat(CharStreams.toString(payload)).isEqualTo("hello");
}
} finally {
ServerSocket serverSocket = server.get();
if (serverSocket != null) {
serverSocket.close();
}
}
}
@Test
public void socketTimeout_retries() throws Exception {
try (ServerSocket server = new ServerSocket(0, 1, InetAddress.getByName(null))) {
@SuppressWarnings("unused")
Future<?> possiblyIgnoredError =
executor.submit(
() -> {
try (Socket socket = server.accept()) {
// Do nothing to cause SocketTimeoutException on client side.
}
// Schedule proper HTTP response once client retries.
sleeper.scheduleRunnable(
() -> {
@SuppressWarnings("unused")
Future<?> possiblyIgnoredError2 =
executor.submit(
() -> {
while (!executor.isShutdown()) {
try (Socket socket = server.accept()) {
readHttpRequest(socket.getInputStream());
sendLines(
socket,
"HTTP/1.1 200 OK",
"Date: Fri, 31 Dec 1999 23:59:59 GMT",
"Connection: close",
"Content-Type: text/plain",
"Content-Length: 5",
"",
"hello");
} catch (IOException e) {
throw new RuntimeException(e);
}
}
});
},
1);
return null;
});
try (Reader payload =
new InputStreamReader(
connector
.connect(
new URL(String.format("http://localhost:%d", server.getLocalPort())),
ImmutableMap.<String, String>of())
.getInputStream(),
ISO_8859_1)) {
assertThat(CharStreams.toString(payload)).isEqualTo("hello");
assertThat(clock.currentTimeMillis()).isEqualTo(1);
}
}
}
/**
* It is important part of {@link HttpConnector} contract to not throw raw {@link
* SocketTimeoutException} because it extends {@link java.io.InterruptedIOException} and {@link
* HttpConnectorMultiplexer} relies on {@link java.io.InterruptedIOException} to only be thrown
* when actual interruption happened.
*/
@Test
public void socketTimeout_throwsIOExceptionInsteadOfSocketTimeoutException() throws Exception {
try (ServerSocket server = new ServerSocket(0, 1, InetAddress.getByName(null))) {
@SuppressWarnings("unused")
Future<?> possiblyIgnoredError =
executor.submit(
() -> {
try (Socket socket = server.accept()) {
// Do nothing to cause SocketTimeoutException on client side.
}
return null;
});
try (Reader payload =
new InputStreamReader(
connector
.connect(
new URL(String.format("http://localhost:%d", server.getLocalPort())),
ImmutableMap.<String, String>of())
.getInputStream(),
ISO_8859_1)) {
fail("Should have thrown");
} catch (IOException expected) {
assertThat(expected).hasCauseThat().isInstanceOf(SocketTimeoutException.class);
assertThat(expected).hasCauseThat().hasMessageThat().isEqualTo("connect timed out");
}
}
}
@Test
public void permanentError_doesNotRetryAndThrowsIOException() throws Exception {
try (ServerSocket server = new ServerSocket(0, 1, InetAddress.getByName(null))) {
......
......@@ -17,12 +17,17 @@ package com.google.devtools.build.lib.testutil;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import com.google.devtools.build.lib.util.Pair;
import com.google.devtools.build.lib.util.Sleeper;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
/** Fake sleeper for testing. */
public final class ManualSleeper implements Sleeper {
private final ManualClock clock;
private final List<Pair<Long, Runnable>> scheduledRunnables = new ArrayList<>(0);
public ManualSleeper(ManualClock clock) {
this.clock = checkNotNull(clock);
......@@ -31,6 +36,31 @@ public final class ManualSleeper implements Sleeper {
@Override
public void sleepMillis(long milliseconds) throws InterruptedException {
checkArgument(milliseconds >= 0, "sleeper can't time travel");
clock.advanceMillis(milliseconds);
final long resultedCurrentTimeMillis = clock.advanceMillis(milliseconds);
Iterator<Pair<Long, Runnable>> iterator = scheduledRunnables.iterator();
// Run those scheduled Runnables who's time has come.
while (iterator.hasNext()) {
Pair<Long, Runnable> scheduledRunnable = iterator.next();
if (resultedCurrentTimeMillis >= scheduledRunnable.first) {
iterator.remove();
scheduledRunnable.second.run();
}
}
}
/**
* Schedules a given {@link Runnable} to run when this Sleeper's clock has been adjusted with
* {@link #sleepMillis(long)} by {@code delayMilliseconds} or greater.
*
* @param runnable runnable to run, must not throw exceptions.
* @param delayMilliseconds delay in milliseconds from current value of {@link ManualClock} used
* by this {@link ManualSleeper}.
*/
public void scheduleRunnable(Runnable runnable, long delayMilliseconds) {
checkArgument(delayMilliseconds >= 0, "sleeper can't time travel");
scheduledRunnables.add(new Pair<>(clock.currentTimeMillis() + delayMilliseconds, runnable));
}
}
// Copyright 2019 The Bazel Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.devtools.build.lib.testutil;
import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.fail;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
/** Tests for {@code ManualSleeper}. */
@RunWith(JUnit4.class)
public class ManualSleeperTest {
private final ManualClock clock = new ManualClock();
private final ManualSleeper sleeper = new ManualSleeper(clock);
@Test
public void sleepMillis_0_ok() throws InterruptedException {
sleeper.sleepMillis(0);
assertThat(clock.currentTimeMillis()).isEqualTo(0);
}
@Test
public void sleepMillis_100_ok() throws InterruptedException {
sleeper.sleepMillis(100);
assertThat(clock.currentTimeMillis()).isEqualTo(100);
}
@Test
public void sleepMillis_minus1_throws() throws InterruptedException {
try {
sleeper.sleepMillis(-1);
fail("Should have thrown");
} catch (IllegalArgumentException expected) {
assertThat(expected).hasMessageThat().isEqualTo("sleeper can't time travel");
}
}
@Test
public void scheduleRunnable_0_doesNotRunItImmediately() {
AtomicInteger counter = new AtomicInteger();
sleeper.scheduleRunnable(counter::incrementAndGet, 0);
assertThat(counter.get()).isEqualTo(0);
}
@Test
public void scheduleRunnable_100_doesNotRunItImmediately() {
AtomicInteger counter = new AtomicInteger();
sleeper.scheduleRunnable(counter::incrementAndGet, 100);
assertThat(counter.get()).isEqualTo(0);
}
@Test
public void scheduleRunnable_minus1_throws() {
AtomicInteger counter = new AtomicInteger();
try {
sleeper.scheduleRunnable(counter::incrementAndGet, -1);
fail("Should have thrown");
} catch (IllegalArgumentException expected) {
assertThat(expected).hasMessageThat().isEqualTo("sleeper can't time travel");
assertThat(counter.get()).isEqualTo(0);
}
}
@Test
public void scheduleRunnable_0_runsAfterSleep0() throws InterruptedException {
AtomicInteger counter = new AtomicInteger();
sleeper.scheduleRunnable(counter::incrementAndGet, 0);
sleeper.sleepMillis(0);
assertThat(counter.get()).isEqualTo(1);
}
@Test
public void scheduleRunnable_0_runsAfterSleep0_doesNotRunSecondTime()
throws InterruptedException {
AtomicInteger counter = new AtomicInteger();
sleeper.scheduleRunnable(counter::incrementAndGet, 0);
sleeper.sleepMillis(0);
sleeper.sleepMillis(0);
assertThat(counter.get()).isEqualTo(1);
}
@Test
public void scheduleRunnable_100_runsAfterSleepExactly100() throws InterruptedException {
AtomicInteger counter = new AtomicInteger();
sleeper.scheduleRunnable(counter::incrementAndGet, 100);
sleeper.sleepMillis(50);
assertThat(counter.get()).isEqualTo(0);
sleeper.sleepMillis(50);
assertThat(counter.get()).isEqualTo(1);
}
@Test
public void scheduleRunnable_100_runsAfterSleepOver100() throws InterruptedException {
AtomicInteger counter = new AtomicInteger();
sleeper.scheduleRunnable(counter::incrementAndGet, 100);
sleeper.sleepMillis(50);
assertThat(counter.get()).isEqualTo(0);
sleeper.sleepMillis(150);
assertThat(counter.get()).isEqualTo(1);
}
@Test
public void scheduleRunnable_100_doesNotRunAgain() throws InterruptedException {
AtomicInteger counter = new AtomicInteger();
sleeper.scheduleRunnable(counter::incrementAndGet, 100);
sleeper.sleepMillis(150);
assertThat(counter.get()).isEqualTo(1);
sleeper.sleepMillis(100);
assertThat(counter.get()).isEqualTo(1);
}
}
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment