From 9a3d9893f8853b9cda7eb4ce3a8cf29ab0cfa9e6 Mon Sep 17 00:00:00 2001 From: ibrahim-camelan Date: Sun, 31 Dec 2023 15:33:05 +0200 Subject: [PATCH 1/3] [CM-6211] * added errorListener to expose the error to the lib user * set pingInterval to small duration, so we can easily retry. --- publish.gradle | 186 +++++++++--------- .../java/io/getstream/cloud/CloudClient.java | 11 +- .../java/io/getstream/cloud/CloudFeed.java | 5 +- .../io/getstream/cloud/FeedSubscriber.java | 5 +- .../core/faye/client/FayeClient.java | 44 +++-- .../core/faye/client/FayeErrorListener.java | 7 + .../io/getstream/cloud/CloudFeedTest.java | 2 +- 7 files changed, 145 insertions(+), 115 deletions(-) create mode 100644 src/main/java/io/getstream/core/faye/client/FayeErrorListener.java diff --git a/publish.gradle b/publish.gradle index 1dc7b68c..44e490f8 100644 --- a/publish.gradle +++ b/publish.gradle @@ -1,93 +1,93 @@ -apply plugin: 'maven-publish' -apply plugin: 'signing' - -// Create variables with empty default values -ext["ossrhUsername"] = '' -ext["ossrhPassword"] = '' -ext["signing.keyId"] = '' -ext["signing.password"] = '' -ext["signing.secretKeyRingFile"] = '' -ext["sonatypeStagingProfileId"] = '' - -File secretPropsFile = project.rootProject.file('local.properties') -if (secretPropsFile.exists()) { - // Read local.properties file first if it exists - Properties p = new Properties() - new FileInputStream(secretPropsFile).withCloseable { is -> p.load(is) } - p.each { name, value -> ext[name] = value } -} else { - // Use system environment variables - ext["ossrhUsername"] = System.getenv('OSSRH_USERNAME') - ext["ossrhPassword"] = System.getenv('OSSRH_PASSWORD') - ext["signing.keyId"] = System.getenv('SIGNING_KEY_ID') - ext["signing.password"] = System.getenv('SIGNING_PASSWORD') - ext["signing.secretKeyRingFile"] = System.getenv('SIGNING_SECRET_KEY_RING_FILE') - ext["sonatypeStagingProfileId"] = System.getenv('SONATYPE_STAGING_PROFILE_ID') -} - -nexusPublishing { - repositories { - sonatype { - stagingProfileId = sonatypeStagingProfileId - username = ossrhUsername - password = ossrhPassword - } - } -} - -task javadocJar(type: Jar) { - archiveClassifier = 'javadoc' - from javadoc -} - -task sourcesJar(type: Jar) { - archiveClassifier = 'sources' - from sourceSets.main.allSource -} - -artifacts { - archives javadocJar, sourcesJar -} - -afterEvaluate { - publishing { - publications { - release(MavenPublication) { - from components.java - artifactId 'stream-java' - - artifact sourcesJar - artifact javadocJar - - pom { - name = "Stream Feeds official Java API Client" - description = "Stream Feeds Java Client for backend and android integrations" - url = 'https://github.com/getstream/stream-chat-java' - licenses { - license { - name = 'The 3-Clause BSD License' - url = 'https://opensource.org/licenses/BSD-3-Clause' - distribution = 'repo' - } - } - developers { - developer { - id = 'getstream-support' - name = 'Stream Support' - email = 'support@getstream.io' - } - } - scm { - connection = 'scm:git:github.com/getstream/stream-java.git' - developerConnection = 'scm:git:ssh://github.com/getstream/stream-java.git' - url = 'https://github.com/getstream/stream-java' - } - } - } - } - } -} - -signing { - sign publishing.publications -} +//apply plugin: 'maven-publish' +//apply plugin: 'signing' +// +//// Create variables with empty default values +//ext["ossrhUsername"] = '' +//ext["ossrhPassword"] = '' +//ext["signing.keyId"] = '' +//ext["signing.password"] = '' +//ext["signing.secretKeyRingFile"] = '' +//ext["sonatypeStagingProfileId"] = '' +// +//File secretPropsFile = project.rootProject.file('local.properties') +//if (secretPropsFile.exists()) { +// // Read local.properties file first if it exists +// Properties p = new Properties() +// new FileInputStream(secretPropsFile).withCloseable { is -> p.load(is) } +// p.each { name, value -> ext[name] = value } +//} else { +// // Use system environment variables +// ext["ossrhUsername"] = System.getenv('OSSRH_USERNAME') +// ext["ossrhPassword"] = System.getenv('OSSRH_PASSWORD') +// ext["signing.keyId"] = System.getenv('SIGNING_KEY_ID') +// ext["signing.password"] = System.getenv('SIGNING_PASSWORD') +// ext["signing.secretKeyRingFile"] = System.getenv('SIGNING_SECRET_KEY_RING_FILE') +// ext["sonatypeStagingProfileId"] = System.getenv('SONATYPE_STAGING_PROFILE_ID') +//} +// +//nexusPublishing { +// repositories { +// sonatype { +// stagingProfileId = sonatypeStagingProfileId +// username = ossrhUsername +// password = ossrhPassword +// } +// } +//} +// +//task javadocJar(type: Jar) { +// archiveClassifier = 'javadoc' +// from javadoc +//} +// +//task sourcesJar(type: Jar) { +// archiveClassifier = 'sources' +// from sourceSets.main.allSource +//} +// +//artifacts { +// archives javadocJar, sourcesJar +//} +// +//afterEvaluate { +// publishing { +// publications { +// release(MavenPublication) { +// from components.java +// artifactId 'stream-java' +// +// artifact sourcesJar +// artifact javadocJar +// +// pom { +// name = "Stream Feeds official Java API Client" +// description = "Stream Feeds Java Client for backend and android integrations" +// url = 'https://github.com/getstream/stream-chat-java' +// licenses { +// license { +// name = 'The 3-Clause BSD License' +// url = 'https://opensource.org/licenses/BSD-3-Clause' +// distribution = 'repo' +// } +// } +// developers { +// developer { +// id = 'getstream-support' +// name = 'Stream Support' +// email = 'support@getstream.io' +// } +// } +// scm { +// connection = 'scm:git:github.com/getstream/stream-java.git' +// developerConnection = 'scm:git:ssh://github.com/getstream/stream-java.git' +// url = 'https://github.com/getstream/stream-java' +// } +// } +// } +// } +// } +//} +// +//signing { +// sign publishing.publications +//} diff --git a/src/main/java/io/getstream/cloud/CloudClient.java b/src/main/java/io/getstream/cloud/CloudClient.java index e2417ec4..8f52422e 100644 --- a/src/main/java/io/getstream/cloud/CloudClient.java +++ b/src/main/java/io/getstream/cloud/CloudClient.java @@ -9,6 +9,7 @@ import io.getstream.core.faye.DefaultMessageTransformer; import io.getstream.core.faye.Message; import io.getstream.core.faye.client.FayeClient; +import io.getstream.core.faye.client.FayeErrorListener; import io.getstream.core.faye.subscription.ChannelSubscription; import io.getstream.core.http.HTTPClient; import io.getstream.core.http.OKHTTPClientAdapter; @@ -219,7 +220,7 @@ public CompletableFuture openGraph(URL url) throws StreamException { } private CompletableFuture feedSubscriber( - FeedID feedId, RealtimeMessageCallback messageCallback) { + FeedID feedId, RealtimeMessageCallback messageCallback, FayeErrorListener errorListener) { final CompletableFuture subscriberCompletion = new CompletableFuture<>(); try { checkNotNull(appID, "Missing app id, which is needed in order to subscribe feed"); @@ -239,10 +240,14 @@ private CompletableFuture feedSubscriber( Serialization.fromJSON(new String(payload), RealtimeMessage.class); messageCallback.onMessage(message); } catch (Exception e) { - e.printStackTrace(); + if (errorListener != null){ + errorListener.onError(e, null); + } } }, - () -> feedSubscriptions.remove("/" + notificationChannel)) + () -> feedSubscriptions.remove("/" + notificationChannel), + errorListener + ) .get(); subscription.channelSubscription = channelSubscription; diff --git a/src/main/java/io/getstream/cloud/CloudFeed.java b/src/main/java/io/getstream/cloud/CloudFeed.java index 07673c01..4299be97 100644 --- a/src/main/java/io/getstream/cloud/CloudFeed.java +++ b/src/main/java/io/getstream/cloud/CloudFeed.java @@ -6,6 +6,7 @@ import com.google.common.collect.Iterables; import io.getstream.core.exceptions.StreamException; +import io.getstream.core.faye.client.FayeErrorListener; import io.getstream.core.faye.subscription.ChannelSubscription; import io.getstream.core.http.Response; import io.getstream.core.models.Activity; @@ -52,9 +53,9 @@ protected final CloudClient getClient() { } public final CompletableFuture subscribe( - RealtimeMessageCallback messageCallback) { + RealtimeMessageCallback messageCallback, FayeErrorListener errorListener) { checkNotNull(subscriber, "A subscriber must be provided in order to start listening to a feed"); - return subscriber.subscribe(id, messageCallback); + return subscriber.subscribe(id, messageCallback, errorListener); } public final FeedID getID() { diff --git a/src/main/java/io/getstream/cloud/FeedSubscriber.java b/src/main/java/io/getstream/cloud/FeedSubscriber.java index 38872777..31585cb8 100644 --- a/src/main/java/io/getstream/cloud/FeedSubscriber.java +++ b/src/main/java/io/getstream/cloud/FeedSubscriber.java @@ -1,10 +1,11 @@ package io.getstream.cloud; +import io.getstream.core.faye.client.FayeErrorListener; import io.getstream.core.faye.subscription.ChannelSubscription; import io.getstream.core.models.FeedID; import java8.util.concurrent.CompletableFuture; public interface FeedSubscriber { - CompletableFuture subscribe( - FeedID feedID, RealtimeMessageCallback messageCallback); + CompletableFuture subscribe( + FeedID feedID, RealtimeMessageCallback messageCallback, FayeErrorListener errorListener); } diff --git a/src/main/java/io/getstream/core/faye/client/FayeClient.java b/src/main/java/io/getstream/core/faye/client/FayeClient.java index caf58572..286d7494 100644 --- a/src/main/java/io/getstream/core/faye/client/FayeClient.java +++ b/src/main/java/io/getstream/core/faye/client/FayeClient.java @@ -20,6 +20,8 @@ import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + import okhttp3.OkHttpClient; import okhttp3.Request; import okhttp3.Response; @@ -58,6 +60,8 @@ public FayeClient(URL baseURL) { private MessageTransformer messageTransformer = new DefaultMessageTransformer(); + private FayeErrorListener errorListener; + public void setMessageTransformer(MessageTransformer messageTransformer) { this.messageTransformer = messageTransformer; } @@ -76,7 +80,9 @@ public void setStateChangeListener(StateChangeListener stateChangeListener) { } private WebSocket webSocket; - private final OkHttpClient httpClient = new OkHttpClient(); + private final OkHttpClient httpClient = new OkHttpClient.Builder() + .pingInterval(5, TimeUnit.SECONDS) + .build(); private Timer timer = new Timer(); @@ -123,6 +129,10 @@ public void onFailure(WebSocket webSocket, Throwable t, Response response) { // 'Error occurred', error, stacktrace); closeWebSocket(); initWebSocket(); + + if (errorListener != null) { + errorListener.onError(t, response); + } } private boolean manuallyClosed = false; @@ -137,15 +147,19 @@ public void onClosed(WebSocket webSocket, int code, String reason) { } private void scheduleTimerTask(Callback callback, long duration) { - if (timer == null) timer = new Timer(); - timer.schedule( - new TimerTask() { - @Override - public void run() { - callback.call(); - } - }, - duration); + try { + if (timer == null) timer = new Timer(); + timer.schedule( + new TimerTask() { + @Override + public void run() { + callback.call(); + } + }, + duration); + } catch (Exception ignored) { + // We don't really care if the timer is cancelled, we create a new client anyway. + } } public void handshake() { @@ -251,23 +265,25 @@ private void subscribeChannels(String[] channels) { public CompletableFuture subscribe( String channel, ChannelDataCallback callback) { - return subscribe(channel, callback, null, null); + return subscribe(channel, callback, null,null, null); } private CompletableFuture subscribe(String channel, Boolean force) { - return subscribe(channel, null, null, force); + return subscribe(channel, null, null, null, force); } public CompletableFuture subscribe( - String channel, ChannelDataCallback callback, SubscriptionCancelledCallback onCancelled) { - return subscribe(channel, callback, onCancelled, null); + String channel, ChannelDataCallback callback, SubscriptionCancelledCallback onCancelled, FayeErrorListener errorListener) { + return subscribe(channel, callback, onCancelled, errorListener, null); } private CompletableFuture subscribe( String channel, ChannelDataCallback onData, SubscriptionCancelledCallback onCancelled, + FayeErrorListener errorListener, Boolean force) { + this.errorListener = errorListener; // default value if (force == null) force = false; diff --git a/src/main/java/io/getstream/core/faye/client/FayeErrorListener.java b/src/main/java/io/getstream/core/faye/client/FayeErrorListener.java new file mode 100644 index 00000000..3e74e413 --- /dev/null +++ b/src/main/java/io/getstream/core/faye/client/FayeErrorListener.java @@ -0,0 +1,7 @@ +package io.getstream.core.faye.client; + +import okhttp3.Response; + +public interface FayeErrorListener { + void onError(Throwable t, Response response); +} diff --git a/src/test/java/io/getstream/cloud/CloudFeedTest.java b/src/test/java/io/getstream/cloud/CloudFeedTest.java index 85512dfb..5bd1bbda 100644 --- a/src/test/java/io/getstream/cloud/CloudFeedTest.java +++ b/src/test/java/io/getstream/cloud/CloudFeedTest.java @@ -164,7 +164,7 @@ public void TestFaye() throws Exception { CloudClient client = CloudClient.builder(apiKey, token, userID, appId).build(); CloudFlatFeed feed = client.flatFeed("user", userID); CompletableFuture subscription = - feed.subscribe(message -> msg.set(message)); + feed.subscribe(message -> msg.set(message), null); feed.addActivity( Activity.builder() From 7275ea80efa49a4428213515267cd38bd692ec0e Mon Sep 17 00:00:00 2001 From: ibrahim-camelan Date: Sun, 31 Dec 2023 15:44:02 +0200 Subject: [PATCH 2/3] [CM-6211] minor refactoring --- .../java/io/getstream/core/faye/client/FayeErrorListener.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/io/getstream/core/faye/client/FayeErrorListener.java b/src/main/java/io/getstream/core/faye/client/FayeErrorListener.java index 3e74e413..e13e026a 100644 --- a/src/main/java/io/getstream/core/faye/client/FayeErrorListener.java +++ b/src/main/java/io/getstream/core/faye/client/FayeErrorListener.java @@ -3,5 +3,5 @@ import okhttp3.Response; public interface FayeErrorListener { - void onError(Throwable t, Response response); + void onError(Throwable throwable, Response response); } From fce80221c3b1ff3375b3e14c9c93a86621823d0f Mon Sep 17 00:00:00 2001 From: ibrahim-camelan Date: Sun, 3 Mar 2024 15:05:15 +0200 Subject: [PATCH 3/3] [CM-6437] remove pingInterval to reduce api calls to getstream --- src/main/java/io/getstream/core/faye/client/FayeClient.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/main/java/io/getstream/core/faye/client/FayeClient.java b/src/main/java/io/getstream/core/faye/client/FayeClient.java index 286d7494..2d56793a 100644 --- a/src/main/java/io/getstream/core/faye/client/FayeClient.java +++ b/src/main/java/io/getstream/core/faye/client/FayeClient.java @@ -80,9 +80,7 @@ public void setStateChangeListener(StateChangeListener stateChangeListener) { } private WebSocket webSocket; - private final OkHttpClient httpClient = new OkHttpClient.Builder() - .pingInterval(5, TimeUnit.SECONDS) - .build(); + private final OkHttpClient httpClient = new OkHttpClient(); private Timer timer = new Timer();