[應用]輕探Web Flux

看板java作者 (kent)時間2年前 (2022/04/21 22:46), 編輯推噓0(000)
留言0則, 0人參與, 最新討論串1/1
網頁版在 https://kentyeh.blogspot.com/2022/04/webflux.html =================================== 本來標題是想寫輕探春日流轉的,想想還是算了。 程式碼 https://github.com/kentyeh/FluxWeb,您可以先git clone下來備用。 Spring 5 後來始導入non-blocking IO、reactive backpressure的Web開發方式;僅管 Spring官方稱WebFlux不會比Servlet快到哪去,但實際面臨到需要I/O的情況下,理論上 總是會快一點,像用reactor netty抓網頁的方式,我感覺就是比Apache HttpClient來的 快些。 轉到WebFlux首先要面臨的就是Servlet不再,沒了JSP,也沒了JSTL,一開始真的很難習 慣,忽然發現一堆Listener沒得用,也沒辦法 Wrap Servlet Request,但為了或許能快 那麼一點點,總是得付出些代價。 在學習的過程式,覺得困難點大概有三,分別是Web轉換、Secuity應用與WebSocket管控 ,我想就這幾點來說明如果克服(至於如何寫Reactive,不想在這裡多說, http://projectreactor.io 可以瞭解一下,網路也有一堆教學文件)。 首先要面臨的是Web撰寫方式的轉換: Spring boot 提供了一堆 spring-boot-starter-xxxx,可以很方便的開始一個專案,優 點是快速,缺點是引用了一堆可能用不到的Libraries,我並不打算以此為進入點。 WebFlux在少了Container的情況下,註定以應用程式的方式存在,而應用程式的方式就是 採用ApplicationContext去載入一些程式設定 package wf; public class Main { public static void main(String[] args) throws IOException { try (AbstractApplicationContext context = new AnnotationConfigApplicationContext(wf.config.AppConfig.class)) { context.registerShutdownHook(); context.getBean(DisposableServer.class).onDispose().block(); } .... 所以AppConfig.java就是設定的進入點,上述程式載入設定後,隨即就是啟動HttpServer 。 package wf.config; @Configuration @ImportResource("classpath:applicationContext.xml") @Import({WebConfig.class, PostgresR2dbConfig.class, H2R2dbConfig.class, SecConfig.class, WsConfig.class}) public class AppConfig { ... @Configuration不用多說,寫過Spring程式的人都應該知道 。 至於@ImportResource,嗯!我是念舊的人,習慣把設定放在XML內(從Ver 3開始養成的) ,applicationContext.xml包含了Component Scan 與 thymeleaf(取代JSP)的一些設定。 AppConfig.java依序載入了Web設定、資料庫設定、安全性設定與WebSocket設定。 WebConfig.java包含了WebFlux運作的基礎設定: 前面說了,沒了Servlet,WebFlux就必須找一些替代品,首先面臨的就是Session的問題 ,Spring Session提供了多種選擇,我想為了效能,您應該不會選用jDBC的選項,以前我 用過Hazelcast,好處是去中心化(不用多備一台主機,直接把函式庫綁入程式內),只要 還有一台Web存活(指的是Cluster架構),資料就不會丟失,但缺點也是去中心化,想要操 縱資料,除了自已寫管理程式加入其中,不然就得花錢錢找官方,所以這次採用了大多數 人會用的Redis,好處是有Cli界面可用,缺點是要多備一台機器,一旦機器掛點,程式就 全掛了。 package wf.config; @Configuration @Import(RedisCacheConfig.class) @EnableWebFlux @EnableRedisWebSession(maxInactiveIntervalInSeconds = 30 * 60) public class WebConfig implements WebFluxConfigurer, DisposableBean, ApplicationListener<ContextClosedEvent> { ... 設定檔必須繼承WebFluxConfigurer並標註@EnableWebFlux是基本要件, @EnableRedisWebSession則是說明以Redis做為Session資料戴體,理所當然,Redis可以 儲存Session資料,當然也可做為Cache所用,所以在此我 Import(RedisCacheConfig.class),並將連線Redis的程式放在RedisCacheConfig內。 WebConfig.java的重要責任就是建構httpServer()(也是Main.java程式啟動時主要載入的 標的),為了要在程式結束時優雅的結束httpServer,所以WebConfig也實做了 DisposableBean與ApplicationListener<ContextClosedEvent>,為了就在是程式終止時 順便關閉httpServer; 另外addResourceHandlers(...)是為了載入靜態目錄,當url指到了static時,首先從 Webjars先找(像我引用了JQuery與purecss的jar),找不到再找classpath裡的static目錄 。 @Override public void addResourceHandlers(ResourceHandlerRegistry registry) { registry.addResourceHandler("/static/**") .addResourceLocations("classpath:/META-INF/resources/webjars/") .addResourceLocations("classpath:/static/") .resourceChain(false); } 而localeContextResolver()則是指定使用url參數locale來變更語系(沒有多國語系,就 不用建構LocaleContextResolver)。 @Bean public LocaleContextResolver localeContextResolver() { return new LocaleContextResolver() { ... 至此再加寫一支帶有@Controller標記的程式(如wf.spring.RootController),就可以執 行Main.java讓WebFlux跑起來了。 在進入到下個主題之前,我必須提及spring.profiles.active這個系統屬性,Spring用這 個屬性來控制Profile,所以我決定當這個系統屬性為dev時,表示整個系統屬於開發模式 ,否則就是正式環境模式,所以您可能注意到AppConfig.java同時載入了 PostgresR2dbConfig.class(正式環境)與H2R2dbConfig.class(開發環境)。 package wf.config; @Configuration @Profile("dev") public class H2R2dbConfig extends R2dbConfig implements ApplicationListener<ContextClosedEvent> { ... 為此我在POM.xml設定的對應的兩個Profile,以便在開發模式下,可以引用不同的函式庫 並執行一些初始作業(如建構資料庫與啟動一個Redis Mock Server)。 <profiles> <profile> <id>dev</id> <properties> <spring.profiles.active>dev</spring.profiles.active> <http.port>8080</http.port> <spring.freemarker.checkTemplateLocation> false </spring.freemarker.checkTemplateLocation> </properties> <dependencies> ... </dependencies> </profile> <profile> <id>prod</id> <activation> <activeByDefault>true</activeByDefault> </activation> <properties> <spring.profiles.active>prod</spring.profiles.active> <http.port>80</http.port> </properties> 在開發時期,我只要執行 mvn -Pdev compile exec:java 就可以以測試環境的方式來執行程式。 當然,除了@Profile外,還有其它選擇,在RedisCacheConfig.java裡面有通往Redis Server的連線設定 package wf.config; Configuration @EnableCaching(mode = AdviceMode.PROXY) public class RedisCacheConfig extends CachingConfigurerSupport { @Bean("prod") @Conditional(LettuceConnFactoryCondition.class) public LettuceConnectionFactory redisProdConnectionFactory(GenericObjectPoolConfig gopc) { logger.info("採用正式環境:連到正式Redis主機"); RedisSocketConfiguration config = new RedisSocketConfiguration("/tmp/redis.sock"); ... LettucePoolingClientConfiguration poolConfig = LettucePoolingClientConfiguration.builder() .poolConfig(gopc).build(); return new LettuceConnectionFactory(config, poolConfig); } @Bean("dev") @Conditional(LettuceConnFactoryCondition.class) public LettuceConnectionFactory redisDevConnectionFactory(GenericObjectPoolConfig gopc) { logger.info("採用測試環境:連到測試Redis Mock"); RedisStandaloneConfiguration config = new RedisStandaloneConfiguration("localhost", context.getBean(RedisServer.class).getBindPort()); LettucePoolingClientConfiguration poolConfig = LettucePoolingClientConfiguration.builder() .poolConfig(gopc).build(); return new LettuceConnectionFactory(config, poolConfig); } 然後透過LettuceConnFactoryCondition.java來決定哪個Bean應該被建立 package wf.util; public class LettuceConnFactoryCondition implements Condition { @Override public boolean matches(ConditionContext context, AnnotatedTypeMetadata metadata) { String profile = context.getEnvironment() .getProperty("spring.profiles.active", "prod"); Map<String, Object> attributes = metadata.getAnnotationAttributes( Bean.class.getName()); String beanName = attributes == null ? "" : ((String[]) attributes.get("value"))[0]; return beanName.startsWith(profile); } } 只要BeanName與開頭與系統屬性spring.profiles.active(預設為"prod")一致時則建立, 所以得以依這個屬性來決定要如何連結到Redis Server。 另外我想要提一下的是Ractive JDBC,目前有R2DBC(https://r2dbc.io)可用,當然 Spring也有對應的專案(https://spring.io/projects/spring-data-r2dbc),現下只支援 幾個主流的資料庫,而且大都不是官方開發的,最大的困擾還是在於沒有JNDI可用,讓我 沒法利用像Atomikos這種工具來作Two Phase Commit,失去了跨資料庫的機會,當然也可 換種想法,要快就不要跨資料庫Commit。 為了介紹後續的功能,我必須先說明一下我資料庫的Schema:只有包含兩個Table,一個 是成員,另一個是成員的角色。 CREATE TABLE IF NOT EXISTS member( account varchar(10) primary key, username varchar(16) not null, passwd varchar(20) not null, enabled varchar(1) default 'Y' check(enabled='Y' or enabled='N'), birthday date default CURRENT_DATE ); CREATE TABLE IF NOT EXISTS authorities( aid SERIAL primary key, account varchar(10) references member(account) on update cascade on delete cascade, authority varchar(50) not null ); create unique index IF NOT EXISTS authorities_idx on authorities(account,authority); 主要對應的類別是wf.model.Member,特別需要關注的是isNew()這個Method,Spring Data主要透過這個方法來決定資料是要Insert還是Update;其它相關的物件有 wf.data.MemberDao(負責資料庫對應的查詢或更新)與wf.data.MemberManager(負責交易 的控制)。 說到這,我不得不提Spring的DataBinder, @ControllerAdvice public class ControlBinder { /*private MemberManager manager; @Autowired public void setManager(MemberManager manager) { this.manager = manager; }*/ @InitBinder public void initBinder(WebDataBinder binder) { binder.registerCustomEditor(Date.class, new DatePropertyEditor()); binder.registerCustomEditor(LocalDate.class, new LocalDatePropertyEditor()); binder.registerCustomEditor(Boolean.class, new BooleanPropertyEditor()); //binder.registerCustomEditor(Member.Mono.class, manager); } } 上面註冊了一些物件,來做為資料在String與Object之間的轉換,可以看到這些類別都實 做了java.beans.PropertyEditor,(MemberManager也不例外,沒有註冊的原因是因為我 實做並在WebConfig.java註冊了另一個物件MemberFormatter),這種轉換有什麼用呢?且 看下面例子: package wf.spring; @Controller public class RootController { @GetMapping("/hello/{member}") public String hello(@PathVariable("member") Member.Mono member, Model model) { model.addAttribute("user", member.get().switchIfEmpty(Mono.empty())); return "hello"; } } 只要在網址列打上Member的帳號,在叫用Method前,會先將PathVariable透過轉換器轉換 成對應的物件。 另一個常用的功能則是用@ModelAttribute來蒐集前端輸入 https://i.imgur.com/7eFAzHr.png
@Controller public class RootController { @PreAuthorize("hasRole('ADMIN')") @PostMapping("/modifyMember/{member}") public Mono<String> modifyMember( @PathVariable("member") Member.Mono oriMember, @ModelAttribute Member member, Model model) { //避免後面有值但第一個沒勾,導致null字串 Iterables.removeIf(member.getRoles(), Predicates.isNull()); model.addAttribute("member", memberManager .saveMember(member.setNew(false))); return Mono.just("member"); } } 前端輸入的資料,毌論是Master主體資料與Detail角色資料一併被蒐集並轉成member物件 (生日也因為註冊過LocalDatePropertyEditor也同樣被轉成LocalDate)。 在進入Security之前要提一下projectreactor.io的reactor.util.Logger,常常在Mono或 Flux加入log()方法來記錄除錯過程,其實它是叫用log(Logger),可惜這個Logger,其底 層是採用SLF4J所實作的非同步Log,但您可以注意到,我採用的是Apache Log4j2,雖然 Log4j2,有asyncLogger,但若Mono或Flux沒有對應的Logger可用,有點遺憾,所以我實 做了一個Loggers4j2,可以替代原本的Logger來對Mono或Flux除錯。Log4j2的 asyncLogger本質上是不希望記錄Log發生在何處,因為找出記錄發生在何處會使得效能大 大降低,所以稟持相同理念,您應該在Log時,讓訊息本身彰顯足以判斷出處,當然在開 發模式下,我還是會找出訊息記錄的發生處。 Spring Security的設定如下,可惜沒有辦法用XML進行設定 package wf.config; @EnableWebFluxSecurity @EnableReactiveMethodSecurity public class SecConfig { ... @EnableWebFluxSecurity是說明採用Spring Security,@EnableReactiveMethodSecurity 則說明會採用Method級別的安全設定, public class SecConfig { @Bean public SecurityWebFilterChain securitygWebFilterChain( ServerHttpSecurity http) { SecurityWebFilterChain build = http .authorizeExchange() .pathMatchers("/", "/index", "/hello/**", "/static/**", "/login", "/logout").permitAll() ... .formLogin((ServerHttpSecurity.FormLoginSpec flt) -> { flt.authenticationManager(new CaptchaUserDetailsReactiveAuthenticationManager( userDetailsService())); flt.loginPage("/login"); flt.authenticationFailureHandler(new RedirectFluxWebAuthenticationFailureHandler( "/login?error")); }) ... .and().build(); build.getWebFilters().subscribe(filter -> { if (filter instanceof AuthenticationWebFilter) { AuthenticationWebFilter awf = (AuthenticationWebFilter) filter; awf.setServerAuthenticationConverter(new CustomServerFormLoginAuthenticationConverter()); } }); return build; } SecurityWebFilterChain(http)是主要的設定主體,可以看出我想自訂登錄畫面(主要是 加入Captcha),因為Spring Security使用UsernamePasswordAuthenticationToken來存放 用戶的帳號/密碼,所以以wf.security.UsernamePasswordCaptchaAuthenticationToken 來對應存放資訊,也因為多了Captcha,所以必須自行進行授權檢查,所以在formLogin裡 指定了自訂的wf.model.CaptchaUserDetailsReactiveAuthenticationManager,但問題來 了,Webflux把蒐集token的過程隱藏起來以致於沒辦法讓包含Captcha的token被轉送給 AuthenticationManager來處理,所以我也只能用過濾Filters的方式,把 wf.security.CustomServerFormLoginAuthenticationConverter替換給 AuthenticationWebFilter。 https://i.imgur.com/ASFzbtc.png
這裡要備註一點小提醒:Capatch驗證一旦取用,必須立即清除,否則機器人只要取用一 次,就可以無限次try帳/密就失去了Captch的意義了。 理論上加進了Security,那麼我們就能在Request Method裡面加上 @AuthenticationPrincipal來取的User Principal public class RootController { @GetMapping("/whoami") public String whomai(@AuthenticationPrincipal Mono<UserDetails> principal, Model model) { model.addAttribute("user", principal); return "index"; } 所以寫下了測試程式: package wf.spring; @WebFluxTest(controllers = RootController.class, excludeAutoConfiguration = {ReactiveSecurityAutoConfiguration.class}) @TestExecutionListeners({ReactorContextTestExecutionListener.class ,WithSecurityContextTestExecutionListener.class}) @ContextConfiguration(classes = {TestContext.class}) public class TestRootController extends AbstractTestNGSpringContextTests { @Test @WithMockUser(username = "nobody", password = "nobody", authorities = "ROLE_USER") void testWhoAmi() { Member member = new Member("nobody", "嘸人識君"); member.setPasswd("nobody"); member.addRole("ROLE_USER"); webClient .mutateWith(mockUser(new MemberDetails(member))) //.mutateWith(mockUser("嘸人識君").roles("USER")) .get().uri("/whoami").header(HttpHeaders.CONTENT_TYPE, MediaType.TEXT_HTML_VALUE) .exchange().expectBody().consumeWith(response -> Assertions.assertThat(new String(response.getResponseBody() , StandardCharsets.UTF_8)).contains("嘸人識君")); } 發現@WithMockUser完全沒用,我猜是不是WithMockUserSecurityContextFactory裡的 createEmptyContext()的關係,所以不得不改用上述程式碼裡的 mutateWith(mockUser(…))方式來mock User。 這裡第一隻測試程式TestRootController必須先說明一下: 沒錯,TestCase是繼承AbstractTestNGSpringContextTests,為什麼是TestNG?那是因為 我第一次寫單元測試,JUnit沒有多執行緒測試,所以只能改用TestNG,另外的一個原因 則是TestNG產生的報表比較美觀。也不知道是不是因為TestNG的關係,導致一些行為不如 我的預期。 Spring的測試一般都會排除Security設定,讓測試的行為儘量單純,所以上述測試, WebFluxTest首先要排除Reactive Security的自動設定,而@TestExecutionListeners用 來處理事前準備作業(其中的WithSecurityContextTestExecutionListener可以從所有測 試程式中移除,雖然我照著官方說明(https://bit.ly/3xIRhK5)來作,但完全看不出有 什麼用)。 然後實測結果,Principal還無無法傳播到 whoami(),所以我猜應該是某某不知名的原因 ,導致測試環境沒有建立HandlerMethodArgumentResolver,所以我從boot抄來 wf.util.ConditionalOnMissingBean與wf.util.MissingBeanCondition,並在 package wf.config; public class TestContext implements WebTestClientConfigurer { @Bean("testAuthenticationPrincipalResolver") @ConditionalOnMissingBean(AuthenticationPrincipalArgumentResolver.class) public HandlerMethodArgumentResolver authenticationPrincipalArgumentResolver(BeanFactory beanFactory) { return new TestAuthenticationPrincipalResolver(beanFactory); } 當環境缺少AuthenticationPrincipalArgumentResolver時,自動建立一個 wf.config.TestAuthenticationPrincipalResolver(也是抄來的),自此測試才算圓滿成 功。 這裡也要特別提醒,每一支AbstractTestNGSpringContextTests都運行在一個獨立的 Context中(多隻Tests運行時,會看到Spring Boot的LOGO跑出來多次)。 相信很多人對WebSocket的第一印象就是那個著名的Chat聊天程式,Client端發起一個 WebSocket連線到Server,Server則記著所有連線,只要接收到Client End傳來的訊息, 立即把該訊息逐一傳送給其它所有連線。 其實細究Client到Server建立連線有一個過程,一開始Client是透過URL連線到Server, 完成HandShake後才建立一個雙向連線(雙方都可發訊息給另一方),直到有一方中斷連線 。 所以Securiy的應用,第一步就是開始的那個URL連線,SpringSecurity管控URL是天經地 義;當WebSocket連線建立後,即使用戶登出,只要雙方沒有一方切斷連線,其實這個 WebSocket並不會受到影響,畢竟兩者處在不同世界,所以Server必須記著這個 WebSocket 連線,當用戶登出後,立即由Server端切斷WebSeocket連線。 記得前面說過,Spring Session可以用來建造Web Cluster嗎?因為Session是存在獨立的 Redis Server,所以Client端連線進來,並不在意Cookie會被送到叢集中的哪一台。她們 是等價的;但是WebSocket連線是一個持續的連線,一旦建立,Client便會和最後 HandShake的這台WebServer建立一條持久穩固的連線。也就是說:同一瀏覽器可能開啟多 個視窗連線到N台WebServer以建立WebSocket。 假設一用戶(Nobody)用兩個裝置的瀏覽器,先後登錄到WebServer並各自開啟兩個頁面(假 設是聊天室,可能連到不同的兩臺WebServer)並建立WebSocket。所以我們先確立幾件事 ※四個WebSocket連線共登錄了兩次,所以有兩個不同的Session Id ※任何給Nobody的訊息,都應該送達這4個頁面 ※其中一個瀏覽器進行登出,只會影響同瀏覽器的頁面,另一個裝置的兩個 WebSocket連線仍然持續運作 首先設定以/ws做為進入點,這個進入點在之前的安全設定必須為登錄過用戶使用,URL會 取得靜態網頁chat.html,同時這也是WebSocket HandShake的point. public class RootController { @GetMapping("/ws") public String chat() { return "chat"; } 然後在WsConfig.java指定WebSocket HandShake所在 package wf.config; @Configuration @Import(RedisCacheConfig.class) public class WsConfig { @Bean public HandlerMapping handlerMapping(WebSocketHandler webSocketHandler) { String path = "/chat"; Map<String, WebSocketHandler> map = new HashMap<>(); map.put(path, webSocketHandler); return new SimpleUrlHandlerMapping(map, -1); } @Bean public HandlerAdapter wsHandlerAdapter() { return new WebSocketHandlerAdapter(webSocketService()); } @Bean public WebSocketService webSocketService() { return new wf.spring.HandshakeFluxWebSocketService(); } 介入HandShake過程(靠自訂HandshakeFluxWebSocketService) WebSocket只能在HandShake時取得Session相關資訊,這也是為什麼需要介入HandShake Service的原因,我們在HandShake的同時,將額外的資料設定給WebSocket package wf.spring; public class HandshakeFluxWebSocketService extends HandshakeWebSocketService implements InitializingBean { @Override public void afterPropertiesSet() throws Exception { setSessionAttributePredicate(s -> { logger.info("轉遞 ({}) 給 WebSoketHandler", s); return true; }); } @Override public Mono<Void> handleRequest(ServerWebExchange exchange, WebSocketHandler handler) { exchange.getSession().subscribe(ws -> { SecurityContext sc = ws.getAttribute( HttpSessionSecurityContextRepository.SPRING_SECURITY_CONTEXT_KEY); if (sc != null) { logger.info("HandshakeFluxWebSocketService-principal is [{}]{}" , sc.getAuthentication().getPrincipal().getClass().getName() , sc.getAuthentication().getPrincipal());} ws.getAttributes().put("JSESSIONID", ws.getId()); }); return super.handleRequest(exchange, handler); } 基本上,HandleShake會想要把Session裡面所有的東西設定給WebSocket,但會先問一下 ,基本上我是一律放行,所以setSessionAttributePredicate()都是回傳true; 之前也說過,瀏覽器登出時,要把相關的WebSocket全數關閉,所以我需要知道交互的對 象的Session ID,基本上Spring Session,所以在handleRequest(), 我取出後,直接放 到WebSocket的Attributes(Key值是JSESSIONID),您可以從執行的Log中看出端倪。 每個WebSocket連線後,SocketHandler都要建立一個WebSocketRedisListener物件(本身 會記住是屬於哪個JSESSIONID),物件的責任內容如下(this的部份不是很正確,勿怪): https://i.imgur.com/uyF09YL.png
wf.spring.FluxWebSocketHandler裡面有一個全域物件存放所有的 WebSocketRedisListener, public static final ListMultimap<String,websocketredislistener> userListenser = MultimapBuilder... WebSocketHandler一開始就是要建立一個WebSocketRedisListener,即使本身收到 Client End傳來的訊息,也要交由這個WebSocketRedisListener去廣播給所有WebSocket 連線。 @Component("serverLogoutSuccessHandler") public class FluxWebSocketHandler implements WebSocketHandler, ServerLogoutSuccessHandler { public Mono<Void> handle(WebSocketSession session) { Object jsessionId = session.getAttributes().get("JSESSIONID"); Object sectximp = session.getAttributes().get( HttpSessionSecurityContextRepository.SPRING_SECURITY_CONTEXT_KEY); if (sectximp != null && jsessionId != null && !jsessionId.toString().trim().isEmpty()) { SecurityContext sectx = (SecurityContextImpl) sectximp; Authentication auth = sectx.getAuthentication(); User user = auth == null ? null : ((User) auth.getPrincipal()); if (user != null) { ReactiveRedisMessageListenerContainer container = context.getBean( ReactiveRedisMessageListenerContainer.class); ReactiveRedisTemplate<String, JsonNode> redisTemplate = new ReactiveRedisTemplate<>(connectionFactory, serializationContext); UnaryOperator<JsonNode> processor = notify -> notify; WebSocketRedisListener<JsonNode> wsrl = context.getBean(WebSocketRedisListener.class ,session, container, user.getUsername(), jsessionId ,serializationContext, redisTemplate, processor); logger.info("put listener[{}] {}", jsessionId, userListenser.put(jsessionId.toString().trim(), wsrl)); return session.receive().flatMap(webSocketMessage -> { String payload = webSocketMessage.getPayloadAsText(); logger.debug("收到:{}", payload); //convert payload to JsonNode JsonNode notify = serializationContext .getValueSerializationPair() .read(ByteBuffer.wrap(payload.getBytes( StandardCharsets.UTF_8))); wsrl.getReactiveRedisTemplate().convertAndSend( user.getUsername(), notify).subscribe(); return Mono.empty(); }).doOnTerminate(() -> { if (userListenser.get( jsessionId.toString()).remove(wsrl)) { wsrl.destroy(); logger.info("移除監聽器"); } else { logger.error("移除監聽器失敗"); } }).doFinally(signal -> { ... 這個Handler也同時實做了ServerLogoutSuccessHandler,為的就是在用戶登出時,從 userListener清除同JSESSIONID的WebSocketRedisListener。 @Override public Mono<Void> onLogoutSuccess(WebFilterExchange exchange, Authentication authentication) { String username = authentication.getPrincipal() == null ? exchange.getExchange().getPrincipal() .map(p -> p.getName()).block() : User.class.isAssignableFrom( authentication.getPrincipal().getClass()) ? ((User) authentication.getPrincipal()).getUsername() : Principal.class.isAssignableFrom( authentication.getPrincipal().getClass()) ? ((Principal) authentication.getPrincipal()).getName() : null; logger.info(":{}登出", username); exchange.getExchange().getSession().subscribe(ws -> { logger.info("JSESSIONID:{}", ws.getId()); userListenser.removeAll(ws.getId()).forEach(wrl -> wrl.destroy()); }, t -> logger.error("登出排除WS時錯誤:" + t.getMessage(), t) ); ServerHttpResponse response = exchange.getExchange().getResponse(); response.setStatusCode(HttpStatus.FOUND); response.getCookies().remove("JSESSIONID"); response.getHeaders().setLocation(logoutSuccessUrl); return exchange.getExchange().getSession() .flatMap(WebSession::invalidate); } 至此,完成我對WebSocket的期待,我心目中的購物車,就是當商品被放入購物車的時後 ,資料打包丟給JMS去逐一處理(我用這種方式應付過9.8K-Google Analytics顯示人潮同 時開搶,可惜那時還不會應用WebSocket),後端處理完成後再透過WebSocket把購物車的 變動通知前端。 chat.html裡面有個放入購物車的按紐,就是透過Ajax通知後端放入購物車,然後把訊息 Publish給Redis,Listener收到後再透過WebSocket通知商品已放入購物車。 https://i.imgur.com/B1h1XFW.png
當您同時用不同裝置開啟蝦皮時,只要一個裝置放入商品,其它裝置的購物車也會同步更 新購物車就是我想要的效果。 再試試直接從Redis命令列直接發佈訊息 https://i.imgur.com/ewxrg7H.png
最後要考慮的是如何測試?前述的加入購物車,是個多步驟的過程,首先,需要有Redis的 環境,然後登錄,建立WebSocket連線,呼叫放入購物車,檢查回傳的訊息。近乎真實世 界的測試,此時已不能視為"單元"測試,而是應該視為"整合測試"。因為近乎真實,所以 會有CSRF、會有Captcha,為了測試的緣故,必須將CSRF與Catpch固定下來,所以Captcha 在有系統屬性"captcha"時,就會以此值做為預設值。也因為為了固定CSRF的值,所以測 試不會引入原本的wf.config.SecConfig,而是引入改寫過的TestSecConfig.java。 @EnableWebFluxSecurity @EnableReactiveMethodSecurity public class TestSecConfig extends SecConfig { SecurityWebFilterChain build = http .authorizeExchange() .pathMatchers("/", "/index", "/hello/**", "/static/**", "/login", "/logout").permitAll() .pathMatchers("/admin/**").hasAuthority("ROLE_ADMIN") .anyExchange().authenticated() .and().csrf(c -> c.csrfTokenRepository( new FixedCsrfTokenRepository(csrf))) ... 在測試WebSocket,我用了兩種方式,分別是透過Reactor Netty的方式與HtmlUnit的方式 ,ReactorNetty的方式比較低階(此時我比較想用這個,畢竟是在寫Reactive程式),必須 自行取Captcha圖檔,記住前次Cookies,發送FormData,HtmlUnit則比較高階,整個就是 個Headless Browser,除了看不到畫面,與一般的瀏覽器並無不同,但兩者都有同樣的問 題困擾我:就是無法真正掌握WebSocket建立連接的時刻(以前端的角度來看,就是無法掌 握WebSocket.onOpen的時機),逼得我沒辦法,只能以Thread.sleep(3000)應對,也許有 大神能夠開示一下。 在我學習WebFlux的過程,總感覺官方文件不是寫得很詳細,所以才想要記錄一下歷程, 也希望對後來者有點幫助。 -- ※ 發信站: 批踢踢實業坊(ptt.cc), 來自: 42.77.197.86 (臺灣) ※ 文章網址: https://www.ptt.cc/bbs/java/M.1650552417.A.F39.html
文章代碼(AID): #1YOMvXyv (java)
文章代碼(AID): #1YOMvXyv (java)