@@ -3465,6 +3465,156 @@ async def test_xtrim(self, r: redis.Redis):
34653465 # 1 message is trimmed
34663466 assert await r .xtrim (stream , 3 , approximate = False ) == 1
34673467
3468+ @skip_if_server_version_lt ("8.1.224" )
3469+ async def test_xdelex (self , r : redis .Redis ):
3470+ stream = "stream"
3471+
3472+ m1 = await r .xadd (stream , {"foo" : "bar" })
3473+ m2 = await r .xadd (stream , {"foo" : "bar" })
3474+ m3 = await r .xadd (stream , {"foo" : "bar" })
3475+ m4 = await r .xadd (stream , {"foo" : "bar" })
3476+
3477+ # Test XDELEX with default ref_policy (KEEPREF)
3478+ result = await r .xdelex (stream , m1 )
3479+ assert result == [1 ]
3480+
3481+ # Test XDELEX with explicit KEEPREF
3482+ result = await r .xdelex (stream , m2 , ref_policy = "KEEPREF" )
3483+ assert result == [1 ]
3484+
3485+ # Test XDELEX with DELREF
3486+ result = await r .xdelex (stream , m3 , ref_policy = "DELREF" )
3487+ assert result == [1 ]
3488+
3489+ # Test XDELEX with ACKED
3490+ result = await r .xdelex (stream , m4 , ref_policy = "ACKED" )
3491+ assert result == [1 ]
3492+
3493+ # Test with non-existent ID
3494+ result = await r .xdelex (stream , "999999-0" , ref_policy = "KEEPREF" )
3495+ assert result == [- 1 ]
3496+
3497+ # Test with multiple IDs
3498+ m5 = await r .xadd (stream , {"foo" : "bar" })
3499+ m6 = await r .xadd (stream , {"foo" : "bar" })
3500+ result = await r .xdelex (stream , m5 , m6 , ref_policy = "KEEPREF" )
3501+ assert result == [1 , 1 ]
3502+
3503+ # Test error cases
3504+ with pytest .raises (redis .DataError ):
3505+ await r .xdelex (stream , "123-0" , ref_policy = "INVALID" )
3506+
3507+ with pytest .raises (redis .DataError ):
3508+ await r .xdelex (stream ) # No IDs provided
3509+
3510+ @skip_if_server_version_lt ("8.1.224" )
3511+ async def test_xackdel (self , r : redis .Redis ):
3512+ stream = "stream"
3513+ group = "group"
3514+ consumer = "consumer"
3515+
3516+ m1 = await r .xadd (stream , {"foo" : "bar" })
3517+ m2 = await r .xadd (stream , {"foo" : "bar" })
3518+ m3 = await r .xadd (stream , {"foo" : "bar" })
3519+ m4 = await r .xadd (stream , {"foo" : "bar" })
3520+ await r .xgroup_create (stream , group , 0 )
3521+
3522+ await r .xreadgroup (group , consumer , streams = {stream : ">" })
3523+
3524+ # Test XACKDEL with default ref_policy (KEEPREF)
3525+ result = await r .xackdel (stream , group , m1 )
3526+ assert result == [1 ]
3527+
3528+ # Test XACKDEL with explicit KEEPREF
3529+ result = await r .xackdel (stream , group , m2 , ref_policy = "KEEPREF" )
3530+ assert result == [1 ]
3531+
3532+ # Test XACKDEL with DELREF
3533+ result = await r .xackdel (stream , group , m3 , ref_policy = "DELREF" )
3534+ assert result == [1 ]
3535+
3536+ # Test XACKDEL with ACKED
3537+ result = await r .xackdel (stream , group , m4 , ref_policy = "ACKED" )
3538+ assert result == [1 ]
3539+
3540+ # Test with non-existent ID
3541+ result = await r .xackdel (stream , group , "999999-0" , ref_policy = "KEEPREF" )
3542+ assert result == [- 1 ]
3543+
3544+ # Test error cases
3545+ with pytest .raises (redis .DataError ):
3546+ await r .xackdel (stream , group , m1 , ref_policy = "INVALID" )
3547+
3548+ with pytest .raises (redis .DataError ):
3549+ await r .xackdel (stream , group ) # No IDs provided
3550+
3551+ @skip_if_server_version_lt ("8.1.224" )
3552+ async def test_xtrim_with_options (self , r : redis .Redis ):
3553+ stream = "stream"
3554+
3555+ await r .xadd (stream , {"foo" : "bar" })
3556+ await r .xadd (stream , {"foo" : "bar" })
3557+ await r .xadd (stream , {"foo" : "bar" })
3558+ await r .xadd (stream , {"foo" : "bar" })
3559+
3560+ # Test XTRIM with KEEPREF ref_policy
3561+ assert (
3562+ await r .xtrim (stream , maxlen = 2 , approximate = False , ref_policy = "KEEPREF" )
3563+ == 2
3564+ )
3565+
3566+ await r .xadd (stream , {"foo" : "bar" })
3567+ await r .xadd (stream , {"foo" : "bar" })
3568+
3569+ # Test XTRIM with DELREF ref_policy
3570+ assert (
3571+ await r .xtrim (stream , maxlen = 2 , approximate = False , ref_policy = "DELREF" ) == 2
3572+ )
3573+
3574+ await r .xadd (stream , {"foo" : "bar" })
3575+ await r .xadd (stream , {"foo" : "bar" })
3576+
3577+ # Test XTRIM with ACKED ref_policy
3578+ assert (
3579+ await r .xtrim (stream , maxlen = 2 , approximate = False , ref_policy = "ACKED" ) == 2
3580+ )
3581+
3582+ # Test error case
3583+ with pytest .raises (redis .DataError ):
3584+ await r .xtrim (stream , maxlen = 2 , ref_policy = "INVALID" )
3585+
3586+ @skip_if_server_version_lt ("8.1.224" )
3587+ async def test_xadd_with_options (self , r : redis .Redis ):
3588+ stream = "stream"
3589+
3590+ # Test XADD with KEEPREF ref_policy
3591+ await r .xadd (
3592+ stream , {"foo" : "bar" }, maxlen = 2 , approximate = False , ref_policy = "KEEPREF"
3593+ )
3594+ await r .xadd (
3595+ stream , {"foo" : "bar" }, maxlen = 2 , approximate = False , ref_policy = "KEEPREF"
3596+ )
3597+ await r .xadd (
3598+ stream , {"foo" : "bar" }, maxlen = 2 , approximate = False , ref_policy = "KEEPREF"
3599+ )
3600+ assert await r .xlen (stream ) == 2
3601+
3602+ # Test XADD with DELREF ref_policy
3603+ await r .xadd (
3604+ stream , {"foo" : "bar" }, maxlen = 2 , approximate = False , ref_policy = "DELREF"
3605+ )
3606+ assert await r .xlen (stream ) == 2
3607+
3608+ # Test XADD with ACKED ref_policy
3609+ await r .xadd (
3610+ stream , {"foo" : "bar" }, maxlen = 2 , approximate = False , ref_policy = "ACKED"
3611+ )
3612+ assert await r .xlen (stream ) == 2
3613+
3614+ # Test error case
3615+ with pytest .raises (redis .DataError ):
3616+ await r .xadd (stream , {"foo" : "bar" }, ref_policy = "INVALID" )
3617+
34683618 @pytest .mark .onlynoncluster
34693619 async def test_bitfield_operations (self , r : redis .Redis ):
34703620 # comments show affected bits
0 commit comments