@@ -78,6 +78,7 @@ impl ServerCertVerifier for Insecure {
7878#[ derive( Clone , Debug ) ]
7979enum Bucket {
8080 Mem ( ResourceOwn < store:: Bucket > ) ,
81+ Redis ( ResourceOwn < store:: Bucket > ) ,
8182 Nats (
8283 ResourceOwn < wrpc_wasi_keyvalue:: wasi:: keyvalue:: store:: Bucket > ,
8384 wrpc_transport_nats:: Client ,
@@ -101,10 +102,11 @@ enum Bucket {
101102 ) ,
102103}
103104
104- #[ derive( Clone , Debug , Default ) ]
105+ #[ derive( Clone , Default ) ]
105106struct Handler {
106107 buckets : Arc < RwLock < HashMap < Bytes , Bucket > > > ,
107108 mem : wrpc_wasi_keyvalue_mem:: Handler ,
109+ redis : wrpc_wasi_keyvalue_redis:: Handler ,
108110}
109111
110112impl Handler {
@@ -123,12 +125,12 @@ fn parse_addr(url: &Url, default_port: u16) -> Result<SocketAddr> {
123125 if url. port ( ) . is_some ( ) {
124126 match auth. parse ( ) . context ( "failed to parse IP address" ) {
125127 Ok ( addr) => Ok ( addr) ,
126- Err ( err) => return Err ( store:: Error :: Other ( format ! ( "{err:#}" ) ) ) ,
128+ Err ( err) => Err ( store:: Error :: Other ( format ! ( "{err:#}" ) ) ) ,
127129 }
128130 } else {
129131 match auth. parse ( ) . context ( "failed to parse socket address" ) {
130132 Ok ( ip) => Ok ( SocketAddr :: new ( ip, default_port) ) ,
131- Err ( err) => return Err ( store:: Error :: Other ( format ! ( "{err:#}" ) ) ) ,
133+ Err ( err) => Err ( store:: Error :: Other ( format ! ( "{err:#}" ) ) ) ,
132134 }
133135 }
134136}
@@ -144,7 +146,7 @@ fn client_tls_config() -> rustls::ClientConfig {
144146}
145147
146148impl < C : Send + Sync > store:: Handler < C > for Handler {
147- #[ instrument( level = "trace" , skip( cx) , ret( level = "trace" ) ) ]
149+ #[ instrument( level = "trace" , skip( self , cx) , ret( level = "trace" ) ) ]
148150 async fn open (
149151 & self ,
150152 cx : C ,
@@ -161,12 +163,18 @@ impl<C: Send + Sync> store::Handler<C> for Handler {
161163 buckets. insert ( id. clone ( ) , Bucket :: Mem ( bucket) ) ;
162164 return Ok ( Ok ( ResourceOwn :: from ( id) ) ) ;
163165 }
164- let ( url, identifier ) = identifier. split_once ( ';' ) . unwrap_or ( ( & identifier, "" ) ) ;
166+ let ( url, suffix ) = identifier. split_once ( ';' ) . unwrap_or ( ( & identifier, "" ) ) ;
165167 let mut url = match Url :: parse ( url) {
166168 Ok ( url) => url,
167169 Err ( err) => return Ok ( Err ( store:: Error :: Other ( err. to_string ( ) ) ) ) ,
168170 } ;
169171 let bucket = match url. scheme ( ) {
172+ "redis" | "rediss" | "redis+sentinel" | "rediss+sentinel" => {
173+ match self . redis . open ( cx, identifier) . await ? {
174+ Ok ( bucket) => Bucket :: Redis ( bucket) ,
175+ Err ( err) => return Ok ( Err ( err) ) ,
176+ }
177+ }
170178 "wrpc+nats" => {
171179 let nats = match async_nats:: connect_with_options (
172180 url. authority ( ) ,
@@ -188,9 +196,7 @@ impl<C: Send + Sync> store::Handler<C> for Handler {
188196 Ok ( wrpc) => wrpc,
189197 Err ( err) => return Ok ( Err ( store:: Error :: Other ( format ! ( "{err:#}" ) ) ) ) ,
190198 } ;
191- match wrpc_wasi_keyvalue:: wasi:: keyvalue:: store:: open ( & wrpc, None , & identifier)
192- . await ?
193- {
199+ match wrpc_wasi_keyvalue:: wasi:: keyvalue:: store:: open ( & wrpc, None , suffix) . await ? {
194200 Ok ( bucket) => Bucket :: Nats ( bucket, wrpc) ,
195201 Err ( err) => return Ok ( Err ( err. into ( ) ) ) ,
196202 }
@@ -225,9 +231,7 @@ impl<C: Send + Sync> store::Handler<C> for Handler {
225231 Err ( err) => return Ok ( Err ( store:: Error :: Other ( format ! ( "{err:#}" ) ) ) ) ,
226232 } ;
227233 let wrpc = wrpc_transport_quic:: Client :: from ( conn) ;
228- match wrpc_wasi_keyvalue:: wasi:: keyvalue:: store:: open ( & wrpc, ( ) , & identifier)
229- . await ?
230- {
234+ match wrpc_wasi_keyvalue:: wasi:: keyvalue:: store:: open ( & wrpc, ( ) , suffix) . await ? {
231235 Ok ( bucket) => Bucket :: Quic ( bucket, wrpc) ,
232236 Err ( err) => return Ok ( Err ( err. into ( ) ) ) ,
233237 }
@@ -238,9 +242,7 @@ impl<C: Send + Sync> store::Handler<C> for Handler {
238242 Err ( err) => return Ok ( Err ( err) ) ,
239243 } ;
240244 let wrpc = wrpc_transport:: frame:: tcp:: Client :: from ( addr) ;
241- match wrpc_wasi_keyvalue:: wasi:: keyvalue:: store:: open ( & wrpc, ( ) , & identifier)
242- . await ?
243- {
245+ match wrpc_wasi_keyvalue:: wasi:: keyvalue:: store:: open ( & wrpc, ( ) , suffix) . await ? {
244246 Ok ( bucket) => Bucket :: Tcp ( bucket, wrpc) ,
245247 Err ( err) => return Ok ( Err ( err. into ( ) ) ) ,
246248 }
@@ -256,9 +258,7 @@ impl<C: Send + Sync> store::Handler<C> for Handler {
256258 ) ) ) ;
257259 } ;
258260 let wrpc = wrpc_transport:: frame:: unix:: Client :: from ( path) ;
259- match wrpc_wasi_keyvalue:: wasi:: keyvalue:: store:: open ( & wrpc, ( ) , & identifier)
260- . await ?
261- {
261+ match wrpc_wasi_keyvalue:: wasi:: keyvalue:: store:: open ( & wrpc, ( ) , suffix) . await ? {
262262 Ok ( bucket) => Bucket :: Unix ( bucket, wrpc) ,
263263 Err ( err) => return Ok ( Err ( err. into ( ) ) ) ,
264264 }
@@ -288,9 +288,7 @@ impl<C: Send + Sync> store::Handler<C> for Handler {
288288 Err ( err) => return Ok ( Err ( store:: Error :: Other ( format ! ( "{err:#}" ) ) ) ) ,
289289 } ;
290290 let wrpc = wrpc_transport_web:: Client :: from ( conn) ;
291- match wrpc_wasi_keyvalue:: wasi:: keyvalue:: store:: open ( & wrpc, ( ) , & identifier)
292- . await ?
293- {
291+ match wrpc_wasi_keyvalue:: wasi:: keyvalue:: store:: open ( & wrpc, ( ) , suffix) . await ? {
294292 Ok ( bucket) => Bucket :: Web ( bucket, wrpc) ,
295293 Err ( err) => return Ok ( Err ( err. into ( ) ) ) ,
296294 }
@@ -310,7 +308,7 @@ impl<C: Send + Sync> store::Handler<C> for Handler {
310308}
311309
312310impl < C : Send + Sync > store:: HandlerBucket < C > for Handler {
313- #[ instrument( level = "trace" , skip( cx) , ret( level = "trace" ) ) ]
311+ #[ instrument( level = "trace" , skip( self , cx) , ret( level = "trace" ) ) ]
314312 async fn get (
315313 & self ,
316314 cx : C ,
@@ -319,6 +317,7 @@ impl<C: Send + Sync> store::HandlerBucket<C> for Handler {
319317 ) -> anyhow:: Result < Result < Option < Bytes > > > {
320318 let res = match self . bucket ( bucket) . await ? {
321319 Bucket :: Mem ( bucket) => return self . mem . get ( cx, bucket. as_borrow ( ) , key) . await ,
320+ Bucket :: Redis ( bucket) => return self . redis . get ( cx, bucket. as_borrow ( ) , key) . await ,
322321 Bucket :: Nats ( bucket, wrpc) => {
323322 wrpc_wasi_keyvalue:: wasi:: keyvalue:: store:: Bucket :: get (
324323 & wrpc,
@@ -372,7 +371,7 @@ impl<C: Send + Sync> store::HandlerBucket<C> for Handler {
372371 }
373372 }
374373
375- #[ instrument( level = "trace" , skip( cx) , ret( level = "trace" ) ) ]
374+ #[ instrument( level = "trace" , skip( self , cx) , ret( level = "trace" ) ) ]
376375 async fn set (
377376 & self ,
378377 cx : C ,
@@ -382,6 +381,9 @@ impl<C: Send + Sync> store::HandlerBucket<C> for Handler {
382381 ) -> anyhow:: Result < Result < ( ) > > {
383382 let res = match self . bucket ( bucket) . await ? {
384383 Bucket :: Mem ( bucket) => return self . mem . set ( cx, bucket. as_borrow ( ) , key, value) . await ,
384+ Bucket :: Redis ( bucket) => {
385+ return self . redis . set ( cx, bucket. as_borrow ( ) , key, value) . await
386+ }
385387 Bucket :: Nats ( bucket, wrpc) => {
386388 wrpc_wasi_keyvalue:: wasi:: keyvalue:: store:: Bucket :: set (
387389 & wrpc,
@@ -440,7 +442,7 @@ impl<C: Send + Sync> store::HandlerBucket<C> for Handler {
440442 }
441443 }
442444
443- #[ instrument( level = "trace" , skip( cx) , ret( level = "trace" ) ) ]
445+ #[ instrument( level = "trace" , skip( self , cx) , ret( level = "trace" ) ) ]
444446 async fn delete (
445447 & self ,
446448 cx : C ,
@@ -449,6 +451,7 @@ impl<C: Send + Sync> store::HandlerBucket<C> for Handler {
449451 ) -> anyhow:: Result < Result < ( ) > > {
450452 let res = match self . bucket ( bucket) . await ? {
451453 Bucket :: Mem ( bucket) => return self . mem . delete ( cx, bucket. as_borrow ( ) , key) . await ,
454+ Bucket :: Redis ( bucket) => return self . redis . delete ( cx, bucket. as_borrow ( ) , key) . await ,
452455 Bucket :: Nats ( bucket, wrpc) => {
453456 wrpc_wasi_keyvalue:: wasi:: keyvalue:: store:: Bucket :: delete (
454457 & wrpc,
@@ -502,7 +505,7 @@ impl<C: Send + Sync> store::HandlerBucket<C> for Handler {
502505 }
503506 }
504507
505- #[ instrument( level = "trace" , skip( cx) , ret( level = "trace" ) ) ]
508+ #[ instrument( level = "trace" , skip( self , cx) , ret( level = "trace" ) ) ]
506509 async fn exists (
507510 & self ,
508511 cx : C ,
@@ -511,6 +514,7 @@ impl<C: Send + Sync> store::HandlerBucket<C> for Handler {
511514 ) -> anyhow:: Result < Result < bool > > {
512515 let res = match self . bucket ( bucket) . await ? {
513516 Bucket :: Mem ( bucket) => return self . mem . exists ( cx, bucket. as_borrow ( ) , key) . await ,
517+ Bucket :: Redis ( bucket) => return self . redis . exists ( cx, bucket. as_borrow ( ) , key) . await ,
514518 Bucket :: Nats ( bucket, wrpc) => {
515519 wrpc_wasi_keyvalue:: wasi:: keyvalue:: store:: Bucket :: exists (
516520 & wrpc,
@@ -564,7 +568,7 @@ impl<C: Send + Sync> store::HandlerBucket<C> for Handler {
564568 }
565569 }
566570
567- #[ instrument( level = "trace" , skip( cx) , ret( level = "trace" ) ) ]
571+ #[ instrument( level = "trace" , skip( self , cx) , ret( level = "trace" ) ) ]
568572 async fn list_keys (
569573 & self ,
570574 cx : C ,
@@ -573,6 +577,9 @@ impl<C: Send + Sync> store::HandlerBucket<C> for Handler {
573577 ) -> anyhow:: Result < Result < store:: KeyResponse > > {
574578 let res = match self . bucket ( bucket) . await ? {
575579 Bucket :: Mem ( bucket) => return self . mem . list_keys ( cx, bucket. as_borrow ( ) , cursor) . await ,
580+ Bucket :: Redis ( bucket) => {
581+ return self . redis . list_keys ( cx, bucket. as_borrow ( ) , cursor) . await
582+ }
576583 Bucket :: Nats ( bucket, wrpc) => {
577584 wrpc_wasi_keyvalue:: wasi:: keyvalue:: store:: Bucket :: list_keys (
578585 & wrpc,
0 commit comments