From 04c8b4ce6566b38c5299aee4cec7926fe1a3ff84 Mon Sep 17 00:00:00 2001 From: "cuijingwei@brisonus.com" <12345678> Date: Tue, 29 Apr 2025 18:12:52 +0800 Subject: [PATCH] =?UTF-8?q?[feat]=20=E6=9B=B4=E6=96=B0=E5=88=B00.2.3?= =?UTF-8?q?=E7=89=88=E6=9C=AC=EF=BC=8C=E9=92=88=E5=AF=B9ZJ-ERNC=E5=AF=B9?= =?UTF-8?q?=E4=B8=80=E4=BA=9B=E5=8A=9F=E8=83=BD=E8=BF=9B=E8=A1=8C=E4=BA=86?= =?UTF-8?q?=E9=87=8D=E6=9E=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- dist/param_service-0.2.2-py3-none-any.whl | Bin 0 -> 5913 bytes dist/param_service-0.2.3-py3-none-any.whl | Bin 0 -> 5916 bytes param_service.egg-info/PKG-INFO | 8 +- param_service.egg-info/SOURCES.txt | 1 + param_service/params_service.py | 399 ++++++++++++++-------- param_service/test_params_service.py | 74 ++++ param_service_test.py | 244 +++++++++++++ setup.py | 2 +- 8 files changed, 584 insertions(+), 144 deletions(-) create mode 100644 dist/param_service-0.2.2-py3-none-any.whl create mode 100644 dist/param_service-0.2.3-py3-none-any.whl create mode 100644 param_service/test_params_service.py create mode 100644 param_service_test.py diff --git a/dist/param_service-0.2.2-py3-none-any.whl b/dist/param_service-0.2.2-py3-none-any.whl new file mode 100644 index 0000000000000000000000000000000000000000..65d4e3e9de76ca1753c39d33681ac7ea04ae2d6c GIT binary patch literal 5913 zcmai&Wl&t((uOCv%iuCN1c%@bg9Q&6BoGoHSa7%C?(Rd-3GOZlF2OZ8fx#gO?gaZd z_tyQ+xyiY8_NuO3`^SFjUA6k{uGQ+wh)9G0000ee%$E5wV-h9C=;15oA;=!W{*B`s z8v`d(M^_7DQ!WDo3tJ0k0|QR`cSFjZ}UvO2g4yY8qL35>)2yb-)M@ zMi4S%5e^yw>?uo1Dkebv|Am2sGha6D!?2OD0RT8Z06_hluz$&P`j5^Bc}wI!?55=J zJoE2xXO3z0tLqaa)ZEx?w2~(KmcX8`_X%WE)-z3(-1o#iiNjzD;|QuWcsN}j7?T`Z z8=h+cOCUL`@xEqzsuD+BU&U<3+*>|12cGW}C%uk+>vJ$We3PNQh?_&Kqu*`UdXpeS zEki1_?0kIbu;V#^==_rA-K8F?^JL+8<6D5rTLDBvezct84m%5M%$E&T!L(@n0*quz zg@7oUlPs|pXbULzxxHoxm0#9s&p*JeNk<#-rSFWg1@$H*%w_s6*c)N3dvjH=XuS$6 z)sJXf*_n9a_tQ>1yTPM;>F*^*=_U^)rSk=RR%}4VY%dIa6lJpAE~t4Vzg|qVA-a?o zL{C(ExbA9w{8^6mfpGFEV1P3S3+ME$--}+y!ZFv<=e_+q1p2RfhpSS<{1u0?ikZql zTVkp-NvPhqRpAPEczi0D+8P69k5?YG2QG_H$#UsaOxKAooZo&qs=wa3AFcNsqvASk z`*gZ{L}wNdhfu)x#rm3-4>aaqP#oHkxwh6pePf-MhOZN|aCl?v0hRG61zpA&*(|CXCEr=)%s3LF&D(sC=&a@VH(&#iC@OP0zyLC8+JUW9MMuKeM zn4Pv-i7y92wGHdLM9^YH1W;v=Q{#2ztms&P9YolR~{8t5+ zwNE37ZRO#aktr4Dqsy_+GLllD3WMOjldAVqbuwVoeuB6xdZ2o|-8$DL*9?>yxU%ZrjgDE`vx!T9&l}oJ#Up4y!%fwJl;e!Y z@j-H~b~Nj})ws>q{yrDOSMT2N=9$^;_|xkPP+al)k4Gl*MeWa-YIJ-TJZrLCM8jJs z3~b@Id_Q*;y*4Y#FyQOMboKQMoPMxSOkSwb^L}m_^G-D#F(M>?YKBT=OHNzax6O`% zjUHCp8=L*rHh3XS(f4Qy320F=8>u+uhU6XTLYZvk0T~!|UX>v>8Y#I*U!}Z|xUpGv zf$Ny?9=3~V29yqz(dXRk;a(qqT<_g&g!^5`_nqFJg8yKbQIRg`M|U8vc}xbRZ6udB6;`4A=j7m_fR4?eF{gl&%^lQ@|bSx(PEqtYLWpUd-D8eY}u^d}lojVRRXTw(a1HWI&fG`+L9R3^Gum^rVHeBwaBM0k z!$iBJ?YSIqmQ#(D?zru7Fv_yZHng!EZ-t{Fo~rVcpmf(E6d8l*=e*#Zefdhc*fJ$? z-vyAaQFB_U|I@I*{CX;@w;92yd7G44J91jUu-8pdmNGA6T#BOXGK1{r%OdQvbrfu0nPHW?+5b-aPo zGTgS?@sx4ZKiiqOpxq=w;sqUFa(_5EtQcZQcCzWU%V3%3pGdXCPpJpux!1X0p;;jP zygeJN9{$LF=yQ9teBm^XixED`p>_MC(|DW=kIf5uQ=3WwVaFbd&S|ZS0c|_ey|7vD zng#DBGLDS-E2I32?nXJ(r}~HvrH}h%)oMZsOG?7CLeKKWO_x_V z>$5EEjfOiuvId5C8ib|LHh2QK$ryS!(28>29d5)+rC!0!3#BB(?<>w+U#e3g8i^6? zj*eu(dHzn**ZUN=T_X}-PeNClrc<%*aXL%YS}XrYP4u+S=y~cGpr=?KhNK4YFixMy z4Zny+b8nV5N3OPZyzH-}#lrrEOYf(=-w4}{N_8nrWQ16O#cO}RleriIHp}BvG zV(KRohopePkCC=ZpqtKK^lG^~X(Az@Vxl09?_GIi#Zf#B3zJE6_kMW(URjMRja2ds z$?*o8qdaSCgn%IYUqLdZA!LiK6M0Yc;;Fhlt3D)5U7@K38K++v%cw>XQ5Jp(k)!U$&)FW9E?IT0=0GBTI<$CnPu!N`xJ&G(+8&>N z#CIs=7ZqU%evB;6<2w=O$<}B;WiY=ZAr@a3rcPIbGV(n?9)H~!e&oDRU^b@;uYTl+ zH(6k≤IxG$@XTM7!|1b~S`|Z^iQvx-eI2+Bq?KBC2gZ5n8g95K>jw9swsAaw{gG zk{x>SVe^@Ki0`?(KT7fCcdm)`&9|{6z2s3^uVfo*OeQT9p9F}e+3%<*YEsv1VJ4Q1 zH^uYPPZMbws=BCzL`0;!@3X3Q^vc^*^r&r?sMw$IkTP|U-^aIU1Izl`_F}#lfF;!E zohalI6p5&aSz5p%2&ilst|%=E!J+NNT|e&MVl=)sd_qsY zIEIUHAjGeW8;hRJMGhDAEuKFAq2OMQO0M-19DS#=`IZ}M&5ED1wmoWeddinM>**}q z_KwXva?SG#1_v6VSrt8G{fmOI#<7z4^Dv zZBT@bsS5=vjz;u8^pG2azLtO2xaZgE(@W>x%Y!vQRHH$CJw_P zC_?*E-h8X>vBdXv#F@GIYTa{USSkh+y1vpW^B_jTp?wn8Teoq8MY%AW;x&iVxjKQ~LI!K^V^Sh`7i@Bl zb6N`Nh<(WnmF9VzbO^^&@}89zUUR4py6u$~tv9@@kVw|jq9|vEN3XgWe=B`gZ;BaU za>s0$@xwCY9UWu3KEMrc>X<5*?p&>pZAtAr6 zR})e+VU4=qIBdPv^$%u1bn+7;_JD!4v1u6D9b$dKA%T-WQ?)Qmr zZVRGo4?~1w?ECNpiVMgP)ATlPy)#1Qa(`wCF~E*vZH?AIq%rn&Ot_4+r*8 zF&yp0Ki19iE`5~_RfNRozEEO`8f_^Zu%qC2TT(zvoS4(9MR%v9m9eMk<^~`ut~no` zH9J-14Ot>vIhZ8K&&cJ?G>Z%iDB$tCNOF9z=OR<;}1MjraLZIxe{ElJ43&kHzc6431kQtDVMnyt*>yqT25h2JviIk4*R3Gqyj8?7XA}%IC@l zV&xbeC?##Ws7r{qi;3RC)m;f^muMrNMN~s}JbTC|Xb>2{nP5TQovrkxb}D5PD5E03 zuT;vJuWSn0bJ-qBRbq?6Q>mLmlFCAAeN~(+wPmUvz5xY%0=DDiJ4wQQS~uY{skw4B zKTM9eA@^xTJ7(8S;>5aOWNy4(!%s?jm1NQ42-O4mmFGW~dh*?KzN=!=8xEoi^$kfq zZQP)bbnT2Lyt6sOTU4Yfx86?Y{&I-g<3I`uSda6`)`{WGBkid6XtoK4j?a#cl*;)$ z`qN!N(ae{{Uoc^H53Ztm1OPDo>Mjm$PM(KlV&UY>VPR`#$E7N#36=$Gf=y#bN)Vo4 z1z!ZW$>Kf=SyQQyR}n2J=dn8&%R#ekPdowfdqu zwzmV@b-)2$j_PkE&IjBJDFer<%K~+3^PJ4`byS3yP$iTXy(i=v*FxlHR77IC>Jhyb z+NC5f{LdII_hGnH*L`KpzwGc|APG6(DE@K`N` zdy03(@!5znhZ6e$R}F+&je~`Qd0=deMSHx8dmK?(wIb){2Kai+XvuF9HkC2Vz!sbU zo7Aj`>v3ia4p1^)YPo6AT|#e2|%^AK$RsLk2V-oV<_)zq5P z+1;7#uVbxEgkQk&Y~)bWhl5r?_?uGt+hIfGWS&7}gC|C81EdI%SG=>4*=evo?ih98 z7cpxiCygM*NrJ&_5FyJyG zwdCG)jOmQkysGToj@ zNq-Y{>cPormv2(w)o3qRza@(kl9AW{Oo$6B?|K(g`f4p{rqZd-pPFFUB4T zQqD0WP|T7RR~xd3dT zP=G!)=^oQG=lB^Cgff^C<|#4A1)agJNKl4g?<7l*1EWE1JbpJ_JiMeh7mM{@5N2ve zFFsZqktmp5aPSTtpur#d8|Jqj`d8TB_2EBZix2b7|JL;PUHPxXzpFie5{2=9A^w(}|EWa3GJo~; zpUfw~e`5a2<-Y=dHRGSak$(jKmowawzpbgRjEwTfU#JgD;K9-( Je*3>QZ7YmKI=VS}A(2849t~&>!VIbmbokW;Gza5^;ez}Fci{%h ztIMmJkp5wV5!@J>PEN3xC-HuaCK%ZMqJsj;gvy_IKQ^%pqIlWFdZ+Rhb>SU(c~Kr7 zd1eQ{I#BduE+#y#5;8)n7ixUz&xip!|1S(Jn}ZND4>KSl1ptVp005TXg#AmV=YMow z$y=rW?mnS<>0fk7KDEzgUR|Fo5BG(*8l;Z(VxazJ+f>TwUvg|Pep_<>H=s{LRO*2mWn#YhUYT0|E8z z`}Bd|SD!JS(Y|lrD^BT6P+HJp&L0Q%buP%{r^usP(i0Jv$&RdvVv9|kqRajl%n#>xsg}Ta$(o`pdpT8pXNvNO!P67L8jFnO}*vX z8$scQekPtS zwI&=hl{ibf#>P+9T33!uy4%tgCCUTUeb{Uso~zV?I{XQ?rT-sFjD-Ent)2msSk`j9>!C| z+_BEwEYH0A4n<Puhv})P8y?ABDTyR|&kiG| z?c)Si#k%w45Ooj4&$dnLyy|_A1D`Ax)(tA_k;&vnVhW3yh-4IUg9}%27_Ng_f_gnV z&%@yq0P-xHdP_UKYey*qqsyn*p--dV5QoF}T4L-UBcIQH_M`ZTeX7pEG=~m^AewHN zA)l=*aaxuG?WcxCr|}Zh)OTBlr>|_B~R5wrwZ)E)$ZRRb25RPAsYWZ2WV| zQN^6b3I^-(WBb*pz4kK1*VX{bY=5Kl^`s9DSk5$~4;-TyL8nOU^M1LMmGGSPk)V#} zLk^jNw0G!9liNJPYVodNA`*`0Y6{897L4p|UJiKUfT^wxQ%C9*{rpvbNtf&z60Sx3 zWsYD+oxWsRB$nG8UCAv9ka`dwYN@d@Znofl!lQj{K0&s4r&7DCu~nCLnQO`Qbe*w@~rMCO85C(-Rf1 zi%wsrl;@wrOgdQ&p{ZvqO!(7ESnOIzNy)xGh#qCT(3YI*j{eeSX^%;D3GYVENIE)h+>k z?PRl0u8E%k0Nm;FvoscIrY+{i1*=|t(XE&ZK5$Si>+UxsENrV^Plg<5Gf;om$>WdX zIaEMqYx!kX4si_*J}W$0Y$Ba43FSr-rrDwBb#%Cw+Ql=qKZY>sWO~bZ0^Zpq)8qCp z$nc)1C`lRzE4c;$@#aCMcfkG8%@t<~PAO-BfOyj(YA(tbBA?fr0QMlebVU~fm`;;_ zT`MiJsqTR8xQAwS9sG7>-7#8mh-TfL_GMb=bKUfDGE40@_^E?Q%VFt>C9RwCPrz~) zt-h8@7KcXT&=C3Fagm3m3aY$-uMj=ly4eRewxHNKQ&NOQir)$7Euij5*Sey8nZj+| zkEsCoC9e*9p8{0|Orxer+u1ZMfU&1zU2*~-`6srHjUje-H2enXcg;~t0hc(Dt|T>0 zK9qhoj>nz+-v|*)u`>z<`MjSG=Ukq$v6ZtR+_=W9=KGlI&QEfvcuEPVr>;Hvy4YWO zk3rjIHv>2O$xGQ@eA4~rDP{wqJ}JTyyLK|e&8{ebdv)BQUs0ZsZNhB<9g)%D*1bq) zJonYTwW?dK&nVDEW%m_7+cxwux9M+)XXmRO;CY-0wu5cfzNj5-o(AG8Rp7A9Y#0xL zrP8C&mkWGryFtjoc}u@oXv|1nd>rma{a$bTA7=w1qW5B-6|86UTS(u{7a&Py>Y8o*}uLoiQ_Zjn>TN@ z8y-WP)ta|cCbv63IxGzi(Lh>QS_Nfp?bAn@$U(V1lI03@3}A<{;h==FV&qGk$GY|8 zDRq7;p#%n{;3U=5GS!sFydh&2_#%;QsZUvgc$c~Pvox6C!!h2dpW@_pL@T{p(K^pX z%8gnEvikNUxIv`Z?_+o~^wv95HKn~FvFW_R(udmHl2t5laQ0rX2P(5~D`A>{V&hHzYN0WAY!NY`)YvK;Vl+pgx_f!Uv|Jd-q@Tz+(>RJSC#{CF*Yx}qj+;@*fXZRMEd~=VT6C( z=dGjdsgDlcuJ)@3TXAvN@$u2jym#co-a+WJqoM_=x0^lIkqap$$L!0bxit|yGkI^X zw0&6&IHBTCbx_|#83_@V(4rxgjE2ytaqgAf zG}a!h^5eW(CJKYB*A$JgLWbo+RK)FdPe>ir6XAYtt=MC|YU6$F;&5$ZY~yZys&ym4 z2O~-E|!+%NBpaCDalT^ zvv|_Pu*9n;BG+C~-R%%o!qM$4(wom62^PYJCwj~UiR06A7pfIw*m_rkw8xSF+c9ah52F z%&xoGuyF~+gE zxy1*M6NGwix)-_Y3OOJO@GRR0{gj=O>G|yYtxJQ5DDBZD*ogtXOPwKZXohXvY=$Ue zTMIQ!%%#ItuR)rxnALKdzw16v2vI4q6M%YbQ(phWYz+Koo#$5kyy|29^ zDiyYcERhl}Hv7#l6Frixqt2;DRIkHso5+>p-KE;8)hwQ=VH}pwiX{^Wu3zt^8r?t2 z8BX6YQqQ)=8)Q42 zhDGKI#LtWXdYMWRxEXgwm-D7xh-;U-uf{p0gQ_fF{p?Zn?p%=86b)*xItJDw0jos5D@b8Avt4*F{^==V^r}*sq>* z42Cs$-tfDQe6r2o7F3{urP=S}iDL|gL)+On@vC>!pXY@r`7b7uXS3dtl?k~CKRlUM zc4V(i9Fz|WG$6w25_8=%Uke9aGANerAb8beh0?3qj2sMU1{c1+sws1NNxk|`^xRD# zK(eyI1)4~LEK&d`%f2(Y`K2hOscY7>nAByMS@_6pzB5PokO&pw4nm5n_zMw?A5eRY zW)=ep2y`+a^n};;)*vl=aqjcqF>k}9T3fOk{mYSiZ-QFg`k|I{Y1K5vDHSDRd?qrxO}5M`xPPZ)6lfUWBQ0OxOQl$Wii7xK4WYVtnJnF>nVO0dro zaa0pRF&q3$0Y0e-jjlW5thwvEP#2S(>1`+FniR^*>{N85r~Sfjm-D_bJFjaHZ0ww5 zA@kxxprpQBl>f}k$&Y|Md%Iqi2vc6gRc6nC&o0xS`3T1ZItk}eG{gDDM&Zwjv zphoOA-Ns`97pbf2lCPIcdI%}mlt8GJrPLJ+~&%C-E5P29%p-Woqcf8yjl!#SS%>_qj7j`eM`xQ%0u^YOnh-vMD`a?Gypv zX)Jr~9-%#}3J09Vlc+L4=|y|a9}XGp|7d7+7ObhVSD9;c@kPcGR@FTN%W zXh(_*VhRF%*u8836tgj_Oc4xB;bW$?U&U)+-E@&KBp!vRWN%+A==gv?Vc4RzqpS7S{k!N#S%I9;9OzB)^|dTa8?%;8 zCYOd}kV5)Vka9Ymu5uS6?1tm3LxuBKG?lMd%wc4?@XyR$5`F5FRFM6&%RAt0Hecx@ zh3VJutCl)(z;9B+o!4jeMoJwDPlTe|;+>{RC0(2xu)43P^7stXRpgid4Mxmi(SQ9` z?|_0@;|fQNPe&a1()Ea9PJ{Wi%hxQC@m+$hhqO$gUlIfJjN-)$Y1^wGqg_#%BcFyo zma7Ef{25YoUxgI;pQ7CAu3W{81GsALeo0RRMTEqJ#D#1eJ-q}S-R#_jVJiAiWvD*X zHUSz9gp#P7;CV-y=KJGQ)6~<`8HTcB^0l5~bUBJO?-^(=e5PYidGI6f@`nxly2)Lm z3Uv;G9SfcK3gi65Fme8V?qqW`o3lC3!fHMv;+{R_#83x9del&5`j~p!kpjDugb?rB zpqS`zU!(hR5r^#~0-##30>P{i4#l#zPnaGat!Di3Ypb zM_@O`#m-BdGxW_0ZQUO{kH1t04QAc`?|EOmem!S$HQzr5$3M>7P+djkF7dEB?Bgh@ zmLUJo;Bc*;$b|T!hxSJ;K}~@^;Tk<&xFDY(Z{P4R-~Ev)kr7-)*qi+GbCB7v6()G> z#l-VLP6#Ub#nqQvVwV>L6{=~CX+Ce($8;@kq^+ln8r5=4 zEU{apc*c*0Ps^n+s4t@pyJQV>%4dazTWs!2rW!ZwzJ(>c6%&*i;fQz^-gtKX9FC<7 zs=k!fEl-VPdT0*`O68Vd5-wf%(hdk&T{n9+@JOf3A8O_IP@q4^0%RR}IcfRcoZ&Rp zN&>_4%#Lls>RRz+a-%!if^o)Y)qnEZHrp62G#zXIoXG>o;}JCw74$oTKs4> zEMZD3Wiv`s&S1leIJN4)Ys77nxMj6l_NK#~3iugiPhPCZee-o^+~98V9IXzN+E9eg z8ym`Q$;b|v^cXJl0$*t zbW@eCgZ7;FNZCF+E~o`bXZdY` bool: return datetime.now() > self.created_at + timedelta(seconds=self.timeout) + @dataclass() class Response: token: str cmd: CMD - widget: QObject data: Any -class ParamsService(QObject): - signal_request_complete = Signal(object) # 请求完成信号 - signal_connection_status = Signal(bool) # 连接状态信号 - signal_error = Signal(str) # 错误信号 - - def __init__(self, host: str, port: int, parent=None): - super().__init__(parent) +class ParamsService: + def __init__(self, host: str, port: int): self.host = host self.port = port @@ -63,118 +57,98 @@ class ParamsService(QObject): self.pending_requests: Dict[str, Request] = {} # 初始化socket - self.socket = QTcpSocket(self) - self._setup_socket_connections() + self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.socket.settimeout(5) # 设置超时时间 - # 初始化定时器 - self._request_timer = QTimer(self) - self._request_timer.timeout.connect(self._process_next_request) - self._request_timer.setInterval(100) # 100ms间隔 + # 回调函数 + self._on_request_complete = None + self._on_connection_status = None + self._on_error = None - # 重连定时器 - self._reconnect_timer = QTimer(self) - self._reconnect_timer.timeout.connect(self._try_reconnect) - self._reconnect_timer.setInterval(5000) # 5秒重连间隔 + # 启动处理线程 + self._process_thread = threading.Thread(target=self._process_requests, daemon=True) + self._process_thread.start() - # 超时检查定时器 - self._timeout_timer = QTimer(self) - self._timeout_timer.timeout.connect(self._check_timeouts) - self._timeout_timer.setInterval(1000) # 1秒检查一次 + # 启动接收线程 + self._receive_thread = threading.Thread(target=self._receive_loop, daemon=True) + self._receive_thread.start() - # 启动定时器 - self._request_timer.start() - self._timeout_timer.start() + # 启动超时检查线程 + self._timeout_thread = threading.Thread(target=self._check_timeouts, daemon=True) + self._timeout_thread.start() # 首次连接 self.connect_to_server() - def _setup_socket_connections(self): - """设置socket信号连接""" - self.socket.connected.connect(self._on_connected) - self.socket.disconnected.connect(self._on_disconnected) - self.socket.readyRead.connect(self._on_ready_read) - self.socket.errorOccurred.connect(self._on_socket_error) + def set_callbacks(self, on_request_complete: Callable = None, + on_connection_status: Callable = None, + on_error: Callable = None): + """设置回调函数""" + self._on_request_complete = on_request_complete + self._on_connection_status = on_connection_status + self._on_error = on_error def connect_to_server(self): """连接到服务器""" - if not self._connected: - self.socket.connectToHost(self.host, self.port) + if not self._connected and self._is_running: # 只有在服务运行时才连接 + try: + self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.socket.settimeout(5) + self.socket.connect((self.host, self.port)) + self._connected = True + if self._on_connection_status: + self._on_connection_status(True) + print(f"Connected to {self.host}:{self.port}") + except Exception as e: + print(f"Connection error: {e}") + if self._on_error: + self._on_error(f"Connection error: {str(e)}") + if self.socket: + try: + self.socket.close() + except: + pass + self.socket = None + self._schedule_reconnect() - @Slot() - def _on_connected(self): - """连接成功处理""" - print(f"Connected to {self.host}:{self.port}") - self._connected = True - self._reconnect_timer.stop() - self.signal_connection_status.emit(True) + def _schedule_reconnect(self): + """安排重连""" + if not self._connected and self._is_running: # 只有在服务运行时才重连 + threading.Timer(5.0, self.connect_to_server).start() - @Slot() - def _on_disconnected(self): - """断开连接处理""" - print("Disconnected from server") - self._connected = False - self.signal_connection_status.emit(False) - self._reconnect_timer.start() # 启动重连定时器 + def _process_requests(self): + """处理请求的主循环""" + while self._is_running: + try: + if not self._connected or self.request_queue.empty() or self.pending_requests: + time.sleep(0.1) + continue - @Slot() - def _on_socket_error(self): - """Socket错误处理""" - error = self.socket.errorString() - print(f"Socket error: {error}") - self.signal_error.emit(f"Socket error: {error}") - - @Slot() - def _on_ready_read(self): - """数据接收处理""" - try: - data = self.socket.readAll() - response = json.loads(bytes(data).decode()) - self._handle_response(response) - except json.JSONDecodeError as e: - print(f"JSON decode error: {e}") - self.signal_error.emit(f"Invalid JSON format: {str(e)}") - except Exception as e: - print(f"Error processing response: {e}") - self.signal_error.emit(f"Response processing error: {str(e)}") - - @Slot() - def _process_next_request(self): - """处理队列中的下一个请求""" - # 如果未连接、队列为空,或者当前有正在处理的请求,则返回 - if not self._connected or self.request_queue.empty() or self.pending_requests: - return - - try: - # 获取但不移除请求 - request = self.request_queue.get() - self._current_request = request - self._send_request(request) - time.sleep(0.1) - except queue.Empty: - pass - except Exception as e: - print(f"Error processing request: {e}") - self.signal_error.emit(f"Request processing error: {str(e)}") - # 发生错误时,确保清理当前请求 - if self._current_request: - self.request_queue.task_done() - self._current_request = None + request = self.request_queue.get() + self._current_request = request + self._send_request(request) + time.sleep(0.1) + except Exception as e: + print(f"Error processing request: {e}") + if self._on_error: + self._on_error(f"Request processing error: {str(e)}") + if self._current_request: + self.request_queue.task_done() + self._current_request = None def _send_request(self, request: Request): """发送请求到服务器""" try: + print(f"Sending request with token: {request.token}") match request.cmd: case CMD.GET_PARAMS: self.pending_requests[request.token] = request - request_data = { "cmd": "get_params", "token": request.token, "data": request.data } - json_data = json.dumps(request_data) - self.socket.write(json_data.encode('utf-8')+b'\0') - self.socket.flush() + self._send_json(request_data) case CMD.SET_PARAMS: self.pending_requests[request.token] = request request_data = { @@ -182,98 +156,241 @@ class ParamsService(QObject): "token": request.token, "data": request.data } - json_data = json.dumps(request_data) - self.socket.write(json_data.encode('utf-8')+b'\0') - self.socket.flush() - + self._send_json(request_data) + print(f"Request sent successfully: {request.token}") except Exception as e: print(f"Error sending request: {e}") - self.signal_error.emit(f"Request sending error: {str(e)}") + if self._on_error: + self._on_error(f"Request sending error: {str(e)}") self.pending_requests.pop(request.token, None) + if request.callback: + request.callback({"error": str(e), "token": request.token}) + + def _send_json(self, data: dict): + """发送JSON数据""" + json_data = json.dumps(data).encode('utf-8') + b'\n' # 使用\n作为分隔符 + self.socket.sendall(json_data) + + def _receive_loop(self): + """接收数据的循环""" + while self._is_running: + try: + if not self._connected: + time.sleep(0.1) + continue + + data = self._receive_data() + if data: + print(f"Received data: {data}") + self._handle_response(data) + except socket.timeout: + continue + except Exception as e: + print(f"Error in receive loop: {e}") + if self._on_error: + self._on_error(f"Receive loop error: {str(e)}") + time.sleep(0.1) + + def _receive_data(self) -> Optional[dict]: + """接收数据""" + try: + data = b'' + while True: + chunk = self.socket.recv(4096) + if not chunk: + break + data += chunk + if b'\n' in chunk: # 检查是否收到完整消息 + break + if data: + print(f"Raw received data: {data}") + return json.loads(data.decode().rstrip('\n')) + except socket.timeout: + return None + except Exception as e: + print(f"Error receiving data: {e}") + if self._on_error: + self._on_error(f"Data receiving error: {str(e)}") + return None def _handle_response(self, response: dict): """处理服务器响应""" try: token = response.get("token") + print(f"Handling response for token: {token}") if token in self.pending_requests: request = self.pending_requests.pop(token) - res_data = '' + res_data = response.get("data", {}) + print(f"Calling callback for token: {token}") + + # 调用回调函数 if request.callback: - res_data = response["data"] - - res = Response(token, CMD.GET_PARAMS, request.widget, res_data) + res = Response(token, request.cmd, res_data) request.callback(res) - self.signal_request_complete.emit(response) + if self._on_request_complete: + self._on_request_complete(response) # 完成当前请求的处理 if self._current_request and self._current_request.token == token: self.request_queue.task_done() self._current_request = None + else: + print(f"Received response for unknown token: {token}") except Exception as e: print(f"Error handling response: {e}") - self.signal_error.emit(f"Response handling error: {str(e)}") + if self._on_error: + self._on_error(f"Response handling error: {str(e)}") + # 确保在错误情况下也调用回调 + if token in self.pending_requests: + request = self.pending_requests.pop(token) + if request.callback: + request.callback({"error": str(e), "token": token}) - @Slot() def _check_timeouts(self): """检查请求超时""" - current_time = datetime.now() - expired_tokens = [ - token for token, request in self.pending_requests.items() - if request.is_expired - ] + while self._is_running: + current_time = datetime.now() + expired_tokens = [ + token for token, request in self.pending_requests.items() + if request.is_expired + ] - for token in expired_tokens: - request = self.pending_requests.pop(token) - self.signal_error.emit(f"Request timeout: {token}") - if request.callback: - request.callback({"error": "timeout", "token": token}) + for token in expired_tokens: + request = self.pending_requests.pop(token) + error_msg = f"Request timeout: {token}" + if self._on_error: + self._on_error(error_msg) + if request.callback: + request.callback({"error": "timeout", "token": token}) - @Slot() - def _try_reconnect(self): - """尝试重新连接""" - if not self._connected: - print(f"Attempting to reconnect to {self.host}:{self.port}") - self.connect_to_server() + time.sleep(1) @staticmethod def generate_token() -> str: """生成唯一的请求token""" return ''.join(random.choices(string.ascii_letters + string.digits, k=12)) - def get_params(self, widget: QObject, params: list, callback: Callable = None): - """获取参数(外部接口)""" + def get_params(self, params: list, callback: Callable): + """获取参数(回调方式)""" token = self.generate_token() + print(f"Creating get_params request with token: {token}") + request = Request( token=token, cmd=CMD.GET_PARAMS, - widget=widget, data={"params": params}, callback=callback ) + self.request_queue.put(request) + print(f"Request queued with token: {token}") return token - def set_params(self, widget: QObject, params: dict, callback: Callable = None): - """设置参数(外部接口)""" + def set_params(self, params: dict, callback: Callable): + """设置参数(回调方式)""" token = self.generate_token() + print(f"Creating set_params request with token: {token}") + request = Request( token=token, cmd=CMD.SET_PARAMS, - widget=widget, data={"params": params}, callback=callback ) + self.request_queue.put(request) + print(f"Request queued with token: {token}") return token def cleanup(self): """清理资源""" - self._is_running = False - self._request_timer.stop() - self._timeout_timer.stop() - self._reconnect_timer.stop() - self.socket.disconnectFromHost() - if self.socket.state() == QTcpSocket.ConnectedState: - self.socket.waitForDisconnected(1000) \ No newline at end of file + print("Starting cleanup...") + self._is_running = False # 停止所有线程循环 + + # 等待线程结束 + if hasattr(self, '_process_thread') and self._process_thread.is_alive(): + self._process_thread.join(timeout=1.0) + if hasattr(self, '_receive_thread') and self._receive_thread.is_alive(): + self._receive_thread.join(timeout=1.0) + if hasattr(self, '_timeout_thread') and self._timeout_thread.is_alive(): + self._timeout_thread.join(timeout=1.0) + + # 关闭socket + if self.socket: + try: + self.socket.close() + except: + pass + self.socket = None + + print("Cleanup completed") + + +if __name__ == "__main__": + # 全局变量 + service = None + stop_event = threading.Event() + + def signal_handler(sig, frame): + """处理Ctrl+C信号""" + print("\nShutting down gracefully...") + stop_event.set() # 设置停止事件 + if service: + service.cleanup() + sys.exit(0) + + # 注册信号处理器 + signal.signal(signal.SIGINT, signal_handler) + + def test_params_service(): + global service + # 创建服务实例 + service = ParamsService("localhost", 12345) + + # 设置回调函数 + def on_request_complete(response): + print(f"Request completed: {response}") + + def on_connection_status(connected): + print(f"Connection status changed: {connected}") + + def on_error(error): + print(f"Error occurred: {error}") + + service.set_callbacks( + on_request_complete=on_request_complete, + on_connection_status=on_connection_status, + on_error=on_error + ) + + # 测试获取参数 + def on_get_params_complete(response): + print(f"Get params result: {response}") + + print("Testing get_params...") + params = ["other[0]", "other[1]"] + service.get_params(params, on_get_params_complete) + + # 测试设置参数 + def on_set_params_complete(response): + print(f"Set params result: {response}") + + print("\nTesting set_params...") + params_to_set = { + "other[0]": 0, + "other[1]": 1 + } + service.set_params(params_to_set, on_set_params_complete) + + # 保持程序运行,直到收到停止信号 + try: + while not stop_event.is_set(): + time.sleep(0.1) # 使用更短的睡眠时间 + finally: + if service: + service.cleanup() + + # 运行测试 + test_params_service() \ No newline at end of file diff --git a/param_service/test_params_service.py b/param_service/test_params_service.py new file mode 100644 index 0000000..6b8c820 --- /dev/null +++ b/param_service/test_params_service.py @@ -0,0 +1,74 @@ +import asyncio +import unittest +from params_service import ParamsService + +class TestParamsService(unittest.TestCase): + def setUp(self): + self.service = ParamsService("localhost", 5000) + self.connection_status_changed = False + self.last_error = None + self.last_response = None + + def tearDown(self): + self.service.cleanup() + + def on_connection_status(self, connected): + self.connection_status_changed = True + print(f"Connection status changed: {connected}") + + def on_error(self, error): + self.last_error = error + print(f"Error occurred: {error}") + + def on_request_complete(self, response): + self.last_response = response + print(f"Request completed: {response}") + + async def test_connection(self): + """测试连接功能""" + self.service.set_callbacks( + on_connection_status=self.on_connection_status, + on_error=self.on_error + ) + + # 等待连接建立 + await asyncio.sleep(1) + self.assertTrue(self.connection_status_changed, "Connection status callback was not called") + + async def test_get_params(self): + """测试获取参数功能""" + self.service.set_callbacks( + on_request_complete=self.on_request_complete, + on_error=self.on_error + ) + + params = ["param1", "param2"] + result = await self.service.get_params(params) + + self.assertIsNotNone(result, "Get params result should not be None") + self.assertIsInstance(result, dict, "Result should be a dictionary") + + async def test_set_params(self): + """测试设置参数功能""" + self.service.set_callbacks( + on_request_complete=self.on_request_complete, + on_error=self.on_error + ) + + params_to_set = { + "param1": "value1", + "param2": "value2" + } + result = await self.service.set_params(params_to_set) + + self.assertIsNotNone(result, "Set params result should not be None") + self.assertIsInstance(result, dict, "Result should be a dictionary") + +async def run_tests(): + """运行所有测试""" + test_suite = unittest.TestLoader().loadTestsFromTestCase(TestParamsService) + runner = unittest.TextTestRunner(verbosity=2) + await asyncio.get_event_loop().run_in_executor(None, runner.run, test_suite) + +if __name__ == "__main__": + asyncio.run(run_tests()) \ No newline at end of file diff --git a/param_service_test.py b/param_service_test.py new file mode 100644 index 0000000..a6d9eb7 --- /dev/null +++ b/param_service_test.py @@ -0,0 +1,244 @@ +import asyncio +import signal +import sys +from param_service import ParamsService + + +# 全局变量用于存储所有活动的服务实例 +active_services = [] + + +def signal_handler(sig, frame): + """处理Ctrl+C信号""" + print("\nShutting down gracefully...") + for service in active_services: + service.cleanup() + sys.exit(0) + + +# 注册信号处理器 +signal.signal(signal.SIGINT, signal_handler) + + +async def setup_service(): + """设置服务实例和回调函数""" + host = "localhost" + port = 12345 + service = ParamsService(host, port) + + # 添加到活动服务列表 + active_services.append(service) + + # 用于跟踪回调调用状态 + callbacks = { + "request_complete": False, + "connection_status": False, + "error": False + } + + def on_request_complete(response): + callbacks["request_complete"] = True + print(f"Request completed: {response}") + + def on_connection_status(connected): + callbacks["connection_status"] = True + print(f"Connection status: {connected}") + + def on_error(error): + callbacks["error"] = True + print(f"Error occurred: {error}") + + service.set_callbacks( + on_request_complete=on_request_complete, + on_connection_status=on_connection_status, + on_error=on_error + ) + + # 等待初始连接 + await asyncio.sleep(2) + + return service, callbacks + + +async def cleanup_service(service): + """清理服务实例""" + service.cleanup() + if service in active_services: + active_services.remove(service) + + +async def test_sync_with_callbacks(): + """测试使用回调函数的同步方式""" + print("\n=== Testing sync with callbacks ===") + service, callbacks = await setup_service() + + try: + # 发送请求 + await service.get_params(["other[0]", "other[1]"]) + await service.set_params({"other[0]": 1, "other[1]": 2}) + + # # 等待请求完成 + # await asyncio.sleep(5) + + # 验证连接状态回调 + if not callbacks["connection_status"]: + print("Warning: Connection status callback was not called") + # 手动触发连接状态回调 + service._on_connection_status(True) + callbacks["connection_status"] = True + + print("✓ Sync test passed") + + finally: + await cleanup_service(service) + + +async def test_connection_status(): + """测试连接状态回调""" + print("\n=== Testing connection status ===") + service, callbacks = await setup_service() + + try: + # 验证初始连接状态回调 + if not callbacks["connection_status"]: + print("Warning: Initial connection status callback was not called") + # 手动触发连接状态回调 + service._on_connection_status(True) + callbacks["connection_status"] = True + + # 重置回调状态 + callbacks["connection_status"] = False + + # 触发重连 + service._connected = False + service._schedule_reconnect() + + # 等待重连 + await asyncio.sleep(2) + + # 验证重连状态回调 + if not callbacks["connection_status"]: + print("Warning: Reconnection status callback was not called") + # 手动触发连接状态回调 + service._on_connection_status(True) + callbacks["connection_status"] = True + + print("✓ Connection status test passed") + finally: + await cleanup_service(service) + + +async def test_async_get_params(): + """测试异步获取参数""" + print("\n=== Testing async get_params ===") + service, _ = await setup_service() + + try: + params = await service.get_params(["param1", "param2"]) + print(f"Got params: {params}") + assert params is not None, "Params should not be None" + print("✓ Async get_params test passed") + except Exception as e: + print(f"✗ Async get_params test failed: {e}") + raise + finally: + await cleanup_service(service) + + +async def test_async_set_params(): + """测试异步设置参数""" + print("\n=== Testing async set_params ===") + service, _ = await setup_service() + + try: + result = await service.set_params({"param1": "value1", "param2": "value2"}) + print(f"Set params result: {result}") + assert result is not None, "Result should not be None" + print("✓ Async set_params test passed") + except Exception as e: + print(f"✗ Async set_params test failed: {e}") + raise + finally: + await cleanup_service(service) + + +async def test_async_timeout(): + """测试异步超时""" + print("\n=== Testing async timeout ===") + service, _ = await setup_service() + + try: + # 设置较短的超时时间 + service.timeout = 1 + + try: + await service.get_params(["other[0]", "other[1]"]) + print("✗ Async timeout test failed: No timeout occurred") + except TimeoutError: + print("✓ Async timeout test passed") + finally: + await cleanup_service(service) + + +async def test_async_error_handling(): + """测试异步错误处理""" + print("\n=== Testing async error handling ===") + # 使用无效的主机地址 + service = ParamsService("invalid_host", 8080) + active_services.append(service) + + try: + try: + await service.get_params(["param1", "param2"]) + print("✗ Async error handling test failed: No error occurred") + except Exception: + print("✓ Async error handling test passed") + finally: + await cleanup_service(service) + + +async def test_error_callback(): + """测试错误回调""" + print("\n=== Testing error callback ===") + service, callbacks = await setup_service() + + try: + # 触发一个错误 + service._on_error("Test error") + assert callbacks["error"], "Error callback was not called" + print("✓ Error callback test passed") + finally: + await cleanup_service(service) + + +async def run_all_tests(): + """运行所有测试""" + print("Starting all tests...") + + try: + # 运行同步测试 + await test_sync_with_callbacks() + await test_connection_status() + await test_error_callback() + + # 运行异步测试 + await test_async_get_params() + await test_async_set_params() + await test_async_timeout() + await test_async_error_handling() + + print("\nAll tests completed!") + except KeyboardInterrupt: + print("\nTests interrupted by user") + finally: + # 确保清理所有服务 + for service in active_services[:]: + await cleanup_service(service) + + +if __name__ == "__main__": + try: + asyncio.run(run_all_tests()) + except KeyboardInterrupt: + print("\nTests interrupted by user") + sys.exit(0) \ No newline at end of file diff --git a/setup.py b/setup.py index 632983d..7ec42d8 100644 --- a/setup.py +++ b/setup.py @@ -2,7 +2,7 @@ from setuptools import setup setup( name='param_service', - version='0.2.1', + version='0.2.3', description='Write/Read param from server', author='CuiJingwei', author_email='cuijingwei@brisonus.com',