- 
                Notifications
    
You must be signed in to change notification settings  - Fork 54
 
Enable HttpPostRequestCallback to fail requests #124
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 3 commits
9e02c27
              29139e1
              93ba816
              084fd52
              00fdaf6
              c32d55b
              cb1ed41
              ae5f5fe
              c6f7b49
              File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | 
|---|---|---|
| 
          
            
          
           | 
    @@ -25,5 +25,5 @@ void call( | |
| RequestT requestEntry, | ||
| String endpointUrl, | ||
| Map<String, String> headerMap | ||
| ); | ||
| ) throws PostRequestCallbackException; | ||
                
       | 
||
| } | ||
| Original file line number | Diff line number | Diff line change | 
|---|---|---|
| @@ -0,0 +1,11 @@ | ||
| package com.getindata.connectors.http; | ||
| 
     | 
||
| public class PostRequestCallbackException extends Exception { | ||
| public PostRequestCallbackException(String message) { | ||
| super(message); | ||
| } | ||
| 
     | 
||
| public PostRequestCallbackException(String message, Throwable cause) { | ||
| super(message, cause); | ||
| } | ||
| } | 
| Original file line number | Diff line number | Diff line change | 
|---|---|---|
| 
          
            
          
           | 
    @@ -13,6 +13,7 @@ | |
| import org.apache.flink.annotation.VisibleForTesting; | ||
| 
     | 
||
| import com.getindata.connectors.http.HttpPostRequestCallback; | ||
| import com.getindata.connectors.http.PostRequestCallbackException; | ||
| import com.getindata.connectors.http.internal.HeaderPreprocessor; | ||
| import com.getindata.connectors.http.internal.SinkHttpClient; | ||
| import com.getindata.connectors.http.internal.SinkHttpClientResponse; | ||
| 
          
            
          
           | 
    @@ -98,13 +99,20 @@ private SinkHttpClientResponse prepareSinkHttpClientResponse( | |
| for (var response : responses) { | ||
| var sinkRequestEntry = response.getHttpRequest(); | ||
| var optResponse = response.getResponse(); | ||
| 
     | 
||
| httpPostRequestCallback.call( | ||
| optResponse.orElse(null), sinkRequestEntry, endpointUrl, headerMap); | ||
| var failedCallback = false; | ||
| 
     | 
||
| try { | ||
| httpPostRequestCallback.call( | ||
| optResponse.orElse(null), sinkRequestEntry, endpointUrl, headerMap); | ||
| } catch (PostRequestCallbackException e) { | ||
| failedCallback = true; | ||
| log.debug("request marked as failed due to callback exception", e); | ||
                
       | 
||
| } | ||
| 
     | 
||
| // TODO Add response processor here and orchestrate it with statusCodeChecker. | ||
| if (optResponse.isEmpty() || | ||
| statusCodeChecker.isErrorCode(optResponse.get().statusCode())) { | ||
| statusCodeChecker.isErrorCode(optResponse.get().statusCode()) || | ||
| failedCallback) { | ||
| failedResponses.add(sinkRequestEntry); | ||
| } else { | ||
| successfulResponses.add(sinkRequestEntry); | ||
| 
          
            
          
           | 
    ||
| Original file line number | Diff line number | Diff line change | 
|---|---|---|
| 
          
            
          
           | 
    @@ -14,6 +14,7 @@ | |
| import org.apache.flink.util.StringUtils; | ||
| 
     | 
||
| import com.getindata.connectors.http.HttpPostRequestCallback; | ||
| import com.getindata.connectors.http.PostRequestCallbackException; | ||
| import com.getindata.connectors.http.internal.PollingClient; | ||
| import com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants; | ||
| import com.getindata.connectors.http.internal.status.ComposeHttpStatusCodeChecker; | ||
| 
          
            
          
           | 
    @@ -89,7 +90,14 @@ private Optional<RowData> processHttpResponse( | |
| HttpResponse<String> response, | ||
| HttpLookupSourceRequestEntry request) throws IOException { | ||
| 
     | 
||
| this.httpPostRequestCallback.call(response, request, "endpoint", Collections.emptyMap()); | ||
| try { | ||
| this.httpPostRequestCallback.call( | ||
| response, request, "endpoint", Collections.emptyMap() | ||
| ); | ||
| } catch (PostRequestCallbackException e) { | ||
| log.debug("Error during post request callback.", e); | ||
| return Optional.empty(); | ||
| 
         There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why do we not throw an Exception here? There is already precedence as this method already throws throws IOException. Why is this not log.error - the text says Error? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. same reason as below, I don't think there's a need to bubble up the exception, just treat this request as failed. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The code has changed in this area. Prior to this change all failures resulted in the job ending. It is possible to specify in the table config the list of status codes to count as ignore, retry, failed or successful. If the logic in the callback is around status codes, then we do not need this extra code change. If your callback is doing more sophisticated checking of the response payload, then this would make sense, the exception should be thrown if you want to fail the call, continue-on-error true will determine if the job carries on or not. I do not think we should swallow the Exception as this is not in line with the design of this connector. WDYT?  | 
||
| } | ||
| 
     | 
||
| if (response == null) { | ||
| return Optional.empty(); | ||
| 
          
            
          
           | 
    ||
| Original file line number | Diff line number | Diff line change | 
|---|---|---|
| @@ -1,7 +1,9 @@ | ||
| package com.getindata.connectors.http.internal.sink.httpclient; | ||
| 
     | 
||
| import java.io.File; | ||
| import java.net.http.HttpResponse; | ||
| import java.util.Collections; | ||
| import java.util.Map; | ||
| import java.util.Properties; | ||
| 
     | 
||
| import com.github.tomakehurst.wiremock.WireMockServer; | ||
| 
        
          
        
         | 
    @@ -19,6 +21,8 @@ | |
| import static org.junit.jupiter.api.Assertions.assertAll; | ||
| import static org.junit.jupiter.api.Assertions.assertThrows; | ||
| 
     | 
||
| import com.getindata.connectors.http.HttpPostRequestCallback; | ||
| import com.getindata.connectors.http.PostRequestCallbackException; | ||
| import com.getindata.connectors.http.internal.HttpsConnectionTestBase; | ||
| import com.getindata.connectors.http.internal.SinkHttpClientResponse; | ||
| import com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants; | ||
| 
          
            
          
           | 
    @@ -62,6 +66,33 @@ public void testHttpConnection() { | |
| batchRequestSubmitterFactory); | ||
| } | ||
| 
     | 
||
| @Test | ||
| public void testHttpPostRequestCallbackWithException() { | ||
| wireMockServer = new WireMockServer(SERVER_PORT); | ||
| wireMockServer.start(); | ||
| mockEndPoint(wireMockServer); | ||
| 
     | 
||
| try { | ||
| JavaNetSinkHttpClient client = | ||
| new JavaNetSinkHttpClient( | ||
| properties, | ||
| new TestPostRequestCallbackWithException(), | ||
| headerPreprocessor, | ||
| perRequestSubmitterFactory); | ||
| HttpSinkRequestEntry requestEntry = new HttpSinkRequestEntry("GET", new byte[0]); | ||
| SinkHttpClientResponse response = | ||
| client.putRequests( | ||
| Collections.singletonList(requestEntry), | ||
| "https://localhost:" + HTTPS_SERVER_PORT + ENDPOINT | ||
| ).get(); | ||
| 
     | 
||
| assertThat(response.getSuccessfulRequests()).isEmpty(); | ||
| assertThat(response.getFailedRequests()).isNotEmpty(); | ||
| } catch (Exception e) { | ||
                
       | 
||
| throw new RuntimeException(e); | ||
| } | ||
| } | ||
| 
     | 
||
| @Test | ||
| public void testHttpsConnectionWithSelfSignedCert() { | ||
| 
     | 
||
| 
          
            
          
           | 
    @@ -366,4 +397,17 @@ private void mockEndPointWithBasicAuth(WireMockServer wireMockServer) { | |
| .withBody("{}")) | ||
| ); | ||
| } | ||
| 
     | 
||
| public static class TestPostRequestCallbackWithException | ||
| implements HttpPostRequestCallback<HttpRequest> { | ||
| @Override | ||
| public void call( | ||
| HttpResponse<String> response, | ||
| HttpRequest requestEntry, | ||
| String endpointUrl, | ||
| Map<String, String> headerMap | ||
| ) throws PostRequestCallbackException { | ||
| throw new PostRequestCallbackException("Test exception"); | ||
| } | ||
| } | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a reason why this is on on request call backs and not responses?
I find the words difficult to parse.
It is also possible to declare if a request should be considered failed from the
Can we say something like :
Throw HttpPostRequestCallbackException to indicate that a request should be considered as failed in your custom callback. The reason you might want to do this is ..... The side effects of doing this are .....
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated the description, lmk if this is easier to read