18
18
import io .dapr .client .domain .ConfigurationItem ;
19
19
import io .dapr .client .domain .GetConfigurationRequest ;
20
20
import io .dapr .client .domain .SubscribeConfigurationRequest ;
21
+ import io .dapr .client .domain .SubscribeConfigurationResponse ;
22
+ import io .dapr .client .domain .UnsubscribeConfigurationResponse ;
21
23
import reactor .core .Disposable ;
22
24
import reactor .core .publisher .Flux ;
23
25
import reactor .core .publisher .Mono ;
24
26
25
27
import java .io .IOException ;
26
28
import java .util .ArrayList ;
27
29
import java .util .Arrays ;
30
+ import java .util .Collections ;
28
31
import java .util .List ;
29
32
import java .util .Map ;
30
33
import java .util .concurrent .atomic .AtomicReference ;
@@ -43,41 +46,8 @@ public class ConfigurationClient {
43
46
public static void main (String [] args ) throws Exception {
44
47
try (DaprPreviewClient client = (new DaprClientBuilder ()).buildPreviewClient ()) {
45
48
System .out .println ("Using preview client..." );
46
- getConfigurationForaSingleKey (client );
47
- getConfigurationsUsingVarargs (client );
48
49
getConfigurations (client );
49
- subscribeConfigurationRequestWithSubscribe (client );
50
- }
51
- }
52
-
53
- /**
54
- * Gets configuration for a single key.
55
- *
56
- * @param client DaprPreviewClient object
57
- */
58
- public static void getConfigurationForaSingleKey (DaprPreviewClient client ) {
59
- System .out .println ("*******trying to retrieve configuration given a single key********" );
60
- try {
61
- Mono <ConfigurationItem > item = client .getConfiguration (CONFIG_STORE_NAME , keys .get (0 ));
62
- System .out .println ("Value ->" + item .block ().getValue () + " key ->" + item .block ().getKey ());
63
- } catch (Exception ex ) {
64
- System .out .println (ex .getMessage ());
65
- }
66
- }
67
-
68
- /**
69
- * Gets configurations for varibale no. of arguments.
70
- *
71
- * @param client DaprPreviewClient object
72
- */
73
- public static void getConfigurationsUsingVarargs (DaprPreviewClient client ) {
74
- System .out .println ("*******trying to retrieve configurations for a variable no. of keys********" );
75
- try {
76
- Mono <Map <String , ConfigurationItem >> items =
77
- client .getConfiguration (CONFIG_STORE_NAME , "myconfig1" , "myconfig3" );
78
- items .block ().forEach ((k ,v ) -> print (v , k ));
79
- } catch (Exception ex ) {
80
- System .out .println (ex .getMessage ());
50
+ subscribeConfigurationRequest (client );
81
51
}
82
52
}
83
53
@@ -106,36 +76,25 @@ public static void getConfigurations(DaprPreviewClient client) {
106
76
*
107
77
* @param client DaprPreviewClient object
108
78
*/
109
- public static void subscribeConfigurationRequestWithSubscribe (DaprPreviewClient client ) {
110
- System .out .println ("*****Subscribing to keys using subscribe method: " + keys .toString () + " *****" );
111
- AtomicReference <Disposable > disposableAtomicReference = new AtomicReference <>();
112
- SubscribeConfigurationRequest req = new SubscribeConfigurationRequest (CONFIG_STORE_NAME , keys );
79
+ public static void subscribeConfigurationRequest (DaprPreviewClient client ) {
80
+ System .out .println ("Subscribing to key: myconfig1" );
81
+ SubscribeConfigurationRequest req = new SubscribeConfigurationRequest (
82
+ CONFIG_STORE_NAME , Collections .singletonList ("myconfig1" ));
83
+ Flux <SubscribeConfigurationResponse > outFlux = client .subscribeConfiguration (req );
113
84
Runnable subscribeTask = () -> {
114
- Flux <Map <String , ConfigurationItem >> outFlux = client .subscribeToConfiguration (req );
115
- disposableAtomicReference .set (outFlux
116
- .subscribe (
117
- cis -> cis .forEach ((k ,v ) -> print (v , k ))
118
- ));
85
+ outFlux .subscribe (cis -> {
86
+ System .out .println ("subscription ID : " + cis .getSubscriptionId ());
87
+ System .out .println ("subscribing to key myconfig1 is successful" );
88
+ });
119
89
};
120
90
new Thread (subscribeTask ).start ();
91
+ // To ensure main thread does not die before outFlux subscribe gets called
92
+ inducingSleepTime (5000 );
93
+ }
94
+
95
+ private static void inducingSleepTime (int timeInMillis ) {
121
96
try {
122
- // To ensure that subscribeThread gets scheduled
123
- Thread .sleep (0 );
124
- } catch (InterruptedException e ) {
125
- e .printStackTrace ();
126
- }
127
- Runnable updateKeys = () -> {
128
- int i = 1 ;
129
- while (i <= 3 ) {
130
- executeDockerCommand (i );
131
- i ++;
132
- }
133
- };
134
- new Thread (updateKeys ).start ();
135
- try {
136
- // To ensure main thread does not die before outFlux subscribe gets called
137
- Thread .sleep (10000 );
138
- disposableAtomicReference .get ().dispose ();
97
+ Thread .sleep (timeInMillis );
139
98
} catch (InterruptedException e ) {
140
99
e .printStackTrace ();
141
100
}
@@ -144,22 +103,4 @@ public static void subscribeConfigurationRequestWithSubscribe(DaprPreviewClient
144
103
private static void print (ConfigurationItem item , String key ) {
145
104
System .out .println (item .getValue () + " : key ->" + key );
146
105
}
147
-
148
- private static void executeDockerCommand (int postfix ) {
149
- String [] command = new String [] {
150
- "docker" , "exec" , "dapr_redis" , "redis-cli" ,
151
- "SET" ,
152
- "myconfig" + postfix , "update_myconfigvalue" + postfix + "||2"
153
- };
154
- ProcessBuilder processBuilder = new ProcessBuilder (command );
155
- Process process = null ;
156
- try {
157
- process = processBuilder .start ();
158
- process .waitFor ();
159
- } catch (IOException e ) {
160
- e .printStackTrace ();
161
- } catch (InterruptedException e ) {
162
- e .printStackTrace ();
163
- }
164
- }
165
106
}
0 commit comments