최근 적용한 WebSocket에 대해서 포스팅 해보려고 합니다.

프론트엔드 관련 내용은 기정님 이 작성해주셨습니다. :0

시작하며

최근 프로젝트 개발 및 유지보수 작업을 진행하면서, 알림 팝업창을 띄우는 방식에서 이슈가 있다는 것을 느꼈습니다. 그건 바로 폴링 방식으로 매번 프론트엔드 소스에서 신규 데이터가 있는지를 체크 하고 조회하는 형태로 구현이 되어 있었고, 바로 아래와 같이 신규 데이터가 있던 없던 간에 1초 마다 API를 call 하는 형태였습니다.

이런 형태의 폴링 방식은 부하를 가져올 것이라는 것을 예상했고, 이번 고도화 작업을 통해 웹 소켓(WebSocket) + STOMP를 적용해 신규 데이터가 있을 때만, 수신하고 있는 유저에게 팝업을 띄워주는 방식으로 적용하는 과정에 대해서 이야기 해보려고 합니다.

구성 단계

기본적으로 웹 소켓을 사용할 수 있도록 디펜던시를 pom.xml에 추가하고,

pom.xml
<!-- https://mvnrepository.com/artifact/org.springframework/spring-websocket -->
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-websocket</artifactId>
</dependency>

<!-- https://mvnrepository.com/artifact/org.springframework/spring-messaging -->
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-messaging</artifactId>
</dependency>

<!-- https://mvnrepository.com/artifact/org.eclipse.jetty.websocket/javax-websocket-client-impl -->
<dependency>
    <groupId>org.eclipse.jetty.websocket</groupId>
    <artifactId>javax-websocket-client-impl</artifactId>
</dependency>

WebSocket 구성을 위한 config 파일을 만들어 WebSocket을 사용할 수 있도록 구성했습니다.

WebSocketConfig.java
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    // https://www.baeldung.com/spring-security-websockets
    // https://supawer0728.github.io/2018/03/30/spring-websocket/

    /**
     * Configure message broker.
     *
     * @param config the config
     */
    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        // eg. To invoke @MessageMapping("/test"), send message's destination is /app/test
        config.setApplicationDestinationPrefixes("/app");

        // eg. Subscribe message's destination is /topic/{value} and use @SendTo annotation in controller
        // eg. Subscribe message's destination is /user/queue/{value} and use @SendToUser annotation in controller
        config.enableSimpleBroker("/topic", "/queue");
    }

    /**
     * Register stomp endpoints.
     *
     * @param registry the registry
     */
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/websocket").setAllowedOrigins("*").addInterceptors(new HttpHandshakeInterceptor());
        registry.addEndpoint("/websocket").setAllowedOrigins("*").addInterceptors(new HttpHandshakeInterceptor()).withSockJS();
    }

    /**
     * Configure web socket transport.
     *
     * @param registration the registration
     */
    @Override
    public void configureWebSocketTransport(WebSocketTransportRegistration registration) {
        registration.setMessageSizeLimit(160 * 64 * 1024);    // Max incoming message size, default : 64 * 1024
        registration.setSendTimeLimit(20 * 10000);            // default : 10 * 10000
        registration.setSendBufferSizeLimit(10 * 512 * 1024); // Max outgoing buffer size, default : 512 * 1024
    }
}

여기서 클라이언트에서 End point를 /websocket으로 해서 구독을 할 수 있도록 설정했고, 다들 아시겠지만 구독 중인 전체 사용자 대상으로 통신을 전달할 /topic과 단일로 연결된 대상에게 통신을 전달할 /queue를 구독할 수 있도록 설정했습니다. 또한, WebSecurityConfig에서 End Point 가 접근할 수 있도록 허용하도록 코드를 수정했습니다.

... (생략)
.antMatchers("/websocket/**").permitAll()
... (생략)


그리고 기본적으로 Interaction 프로세스가 어떠한 방법으로 진행될지에 대해 먼저 정리를 한번 해주는게 좋을 것 같아 UML(Unified Modeling language)을 이용해 그려보았습니다.


위와 같은 형태로 유저가 로그인을 통해 websocket 커넥션이 맺어지게 되면, 로그인 인증을 했던 토큰에 대한 검증을 진행 후, 해당 유저와의 통신을 위한 통로(destination)를 전달하고 유저는 해당 통로를 이용해 구독(subscribe)을 하게 됩니다. 그리고 이벤트가 발생하면 해당 통로를 통해 이벤트 데이터를 전달하게 됩니다.


기본 구성은 끝났으니, 웹 소켓이 연결이 되었을 때, 연결 요청에 대한 전처리 및 후처리를 담당하는 핸드셰이크 인터셉터를 만들어 요청의 세션 정보를 같이 넘겨주도록 했습니다.

HttpHandshakeInterceptor.java
public class HttpHandshakeInterceptor implements HandshakeInterceptor {

    /**
     * Before handshake boolean.
     *
     * @param request    the request
     * @param response   the response
     * @param wsHandler  the ws handler
     * @param attributes the attributes
     * @return the boolean
     * @throws Exception the exception
     */
    @Override
    public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler,
                                   Map<String, Object> attributes) throws Exception {
        if (request instanceof ServletServerHttpRequest) {
            ServletServerHttpRequest servletRequest = (ServletServerHttpRequest) request;
            HttpSession session = servletRequest.getServletRequest().getSession();

            attributes.put("sessionId", session.getId());
        }

        return true;
    }

    /**
     * After handshake.
     *
     * @param request   the request
     * @param response  the response
     * @param wsHandler the ws handler
     * @param exception the exception
     */
    @Override
    public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response,
                               WebSocketHandler wsHandler, Exception exception) {
		// nothing to do.
    }
}


그리고나서, 웹 소켓 연결이 이루어 지거나, 연결이 끊어지는 등의 이벤트가 발생했을 때의 처리 과정, 유저 세션 정보 등록, 세션 정보가 유효할 때의 정보를 조회 및 특정 사용자에게 메시지를 전달하기 위한 헤더를 만들어주는 메소드 등을 다음과 같이 리스너(Listener)를 통해서 구현을 진행했습니다.

EventListener.java
@Component
public class EventListener {

    private static final Logger logger = LoggerFactory.getLogger(EventListener.class);

    private static Map<String, BrowserSession> browserSessionMap = new ConcurrentHashMap<String, BrowserSession>();

    /**
     * Handle session connected events.
     *
     * @param event the event
     */
    @EventListener
    public void handleWebSocketConnectListener(SessionConnectedEvent event) {
        StompHeaderAccessor headerAccessor = StompHeaderAccessor.wrap(event.getMessage());

        //GenericMessage msg = (GenericMessage) headerAccessor.getMessageHeaders().get("simpConnectMessage");

        logger.info("Received a new web socket connection. Session ID : [{}]", headerAccessor.getSessionId());
    }

    /**
     * Handle session disconnected events.
     *
     * @param event the event
     */
    @EventListener
    public void handleWebSocketDisConnectListener(SessionDisconnectEvent event) {
        StompHeaderAccessor headerAccessor = StompHeaderAccessor.wrap(event.getMessage());

        String sessionId = findBrowserSessionId(headerAccessor.getSessionId());
        if(sessionId != null) {
            browserSessionMap.remove(headerAccessor.getSessionId());
        }

        logger.info("Web socket session closed. Message : [{}]", event.getMessage());
    }

    /**
     * Find session id by session id.
     *
     * @param sessionId
     * @return
     */
    public String findBrowserSessionId(String sessionId) {
        String session = null;

        for (Map.Entry<String, BrowserSession> entry : browserSessionMap.entrySet()) {
            if (entry.getKey().equals(sessionId)) {
                session = entry.getKey();
            }
        }

        return session;
    }

    /**
     * Register browser session.
     *
     * @param browserSession the browser session
     * @param sessionId      the session id
     */
    public synchronized void registerBrowserSession(BrowserSession browserSession, String sessionId) {
        browserSessionMap.put(sessionId, browserSession);
    }

    /**
     * Find session ids by user name list.
     *
     * @param username the member id
     * @return the list
     */
    public List<String> findSessionIdsByMemberId(String username) {
        List<String> sessionIdList = new ArrayList<String>();

        for (Map.Entry<String, BrowserSession> entry : browserSessionMap.entrySet()) {
            if (entry.getValue().getUserId().equals(username)) {
                sessionIdList.add(entry.getKey());
            }
        }

        return sessionIdList;
    }

    /**
     * Create headers message headers.
     *
     * @param sessionId the session id
     * @return the message headers
     */
    public MessageHeaders createHeaders(String sessionId) {
        SimpMessageHeaderAccessor headerAccessor = SimpMessageHeaderAccessor.create(SimpMessageType.MESSAGE);
        headerAccessor.setSessionId(sessionId);
        headerAccessor.setLeaveMutable(true);

        return headerAccessor.getMessageHeaders();
    }
}


클라이언트에서 연결이 이루어졌을 때, 세션에 대한 핸들링 처리를 하기 위해  Session Handler를 만들었으며 afterConnected를 구현해 연결이 이루어졌을 때 구독을 하도록 설정했습니다. 기본적으로 StompSessionHandlerAdaptor를 상속 받아서 afterConnected, handlerException, getPayloadType, handleFrame 등을 override 해야 됩니다.

SessionHandler.java
@Component
public class SessionHandler extends StompSessionHandlerAdapter {

    /**
     * The constant logger.
     */
    private static final Logger logger = LoggerFactory.getLogger(SessionHandler.class);

    /**
     * The Session.
     */
    private StompSession session;

    /**
     * After connected.
     *
     * @param session          the session
     * @param connectedHeaders the connected headers
     */
    @Override
    public void afterConnected(StompSession session, StompHeaders connectedHeaders) {
        logger.info("New session established : " + session.getSessionId());
        this.session = session;

        subscribe(WS_QUEUE_USER + WS_QUEUE_REPLY);
        subscribe(WS_QUEUE_USER + WS_QUEUE_ERROR);
    }

    /**
     * Handle exception.
     *
     * @param session   the session
     * @param command   the command
     * @param headers   the headers
     * @param payload   the payload
     * @param exception the exception
     */
    @Override
    public void handleException(StompSession session, StompCommand command, StompHeaders headers, byte[] payload, Throwable exception) {
        logger.error("Got an exception. Reason : {}", exception.getMessage());
        logger.error("StompHeaders : [{}], Payload : [{}]", headers, new String(payload));
    }

    /**
     * Gets payload type.
     *
     * @param headers the headers
     * @return the payload type
     */
    @Override
    public Type getPayloadType(StompHeaders headers) {
        return SimpleJsonResponse.class;
    }

    /**
     * Handle frame.
     *
     * @param headers the headers
     * @param payload the payload
     */
    @Override
    public void handleFrame(StompHeaders headers, Object payload) {
        SimpleJsonResponse msg = (SimpleJsonResponse) payload;
        logger.info("Received Message : [{}]", msg);
    }

    /**
     * Subscribe.
     *
     * @param destination the destination
     */
    public synchronized void subscribe(String destination) {
        session.subscribe(destination, this);
        logger.debug("[{}] Subscribed.", destination);
    }
}


마지막으로 웹 소켓 통신 처리를 위한 컨트롤러(Controller)를 만들어, 요청에 대한 처리를 할 수 있도록 구성 해줍니다. 로그인을 하게 되면 새로운 웹 소켓 통신을 할 수 있는 통로를 클라이언트로 반환해주면서 해당 통로로 연결된 유저는 응답 값을 받을 수 있도록 설정했습니다.

WebSocketController.java
@Controller
public class WebSocketController {

    /**
     * The constant logger.
     */
    private static final Logger logger = LoggerFactory.getLogger(WebSocketController.class);

    /**
     * The Session listener.
     */
    @Autowired
    private EcotrolEventListener sessionListener;

    @Autowired
    private MemberService memberService;

    @Autowired
    private JwtTokenUtil jwtTokenUtil;

    @MessageMapping("/test")
    public void test(SimpMessageHeaderAccessor headerAccessor, final SimpleJsonResponse message) {
        logger.info("Init - SessionID : [{}], Message : [{}]", headerAccessor.getSessionId(), message);
    }

    /**
     * <pre>
     * 로그인 한 사용자가 웹 소켓 통신을 할 수 있도록 url 정보를 리턴한다.
     * e.g) /user/queue/alarm/{uuid}
     * </pre>
     *
     * @param headerAccessor
     * @param message
     * @return
     */
    @MessageMapping("/login")
    @SendToUser(WS_QUEUE_ALARM)
    public SimpleJsonResponse login(SimpMessageHeaderAccessor headerAccessor, final SimpleJsonResponse message) {
        logger.info("Login - SessionID : [{}], Message : [{}]", headerAccessor.getSessionId(), message.toString());

        SimpleJsonResponse response = new SimpleJsonResponse(WS_CODE_LOGIN_RESPONSE);
        if (message.getResultCode() == WS_CODE_LOGIN) {
            try {
                String authToken = (String) message.getData();

                if ("REFRESH_TOKEN".equals(jwtTokenUtil.getAuthFromToken(authToken))) {
                    throw new Exception("token is not authorized.");
                }

                String username = jwtTokenUtil.getUsernameFromToken(authToken);

                MemberManagement member = memberService.getMember(username);

                if (member == null) {
                    throw new Exception("member does not exists.");
                }

                String uuid = headerAccessor.getSessionId();

                BrowserSession browserSession = new BrowserSession();
                browserSession.setSessionId(headerAccessor.getSessionId());
                browserSession.setUserId(member.getId());
                browserSession.setUuid(uuid);

                // register session
                sessionListener.registerBrowserSession(browserSession, headerAccessor.getSessionId());

                response.setStatus(true);
                response.setData(WS_QUEUE_USER + WS_QUEUE_ALARM + "/" + uuid);
            } catch (Exception e) {
                logger.error("Unhandled exception occurred while handle /app/login.", e);

                response.setStatus(false);
                response.setResultMessage(e.getMessage());
            }
        } else {
            response.setStatus(false);
            response.setResultMessage("Message code must be [" + WS_CODE_LOGIN + "] for /app/login.");
        }

        return response;
    }

    /**
     * Heartbeat.
     *
     * @param message the message
     * @throws Exception the exception
     */
    @MessageMapping("/heartbeat")
    public void heartbeat(SimpMessageHeaderAccessor headerAccessor, final SimpleJsonResponse message) throws Exception {
        logger.info("Heartbeat - SessionID : [{}], Message : [{}]", headerAccessor.getSessionId(), message);
    }

    /**
     * Handle exception string.
     *
     * @param exception
     * @return
     */
    @MessageExceptionHandler
    @SendToUser(WS_QUEUE_ERROR)
    public String handleException(Throwable exception) {
        return exception.getMessage();
    }
}


테스트

우선 백엔드 자바 코드 상으로 연결이 제대로 되는지 테스트를 진행하기 위해 간단한 메인 Method가 포함된 클래스를 다음과 같이 작성하고 서버 실행 후, 메인 Method를 실행해봤습니다. 기본적으로 로컬 상의 테스트이기 때문에 웹 소켓 연결 주소로는 " ws://localhost:8080/websocket " 으로 End Point 까지 구성한 대로 설정해주었습니다.

WebSocketClient.java
public class WebSocketClient {

    private static final String URL = "ws://localhost:8080/websocket";

    public static void main(String[] args) {
        org.springframework.web.socket.client.WebSocketClient client = new StandardWebSocketClient();
        WebSocketStompClient stompClient = new WebSocketStompClient(client);
        stompClient.setMessageConverter(new MappingJackson2MessageConverter());

        stompClient.connect(URL, new EcotrolSessionHandler());

        new Scanner(System.in).nextLine(); // Don't close immediately.
    }
}


그 결과, 다음과 같이 SessionHandler에서 로그로 찍어준 코드가 찍히면서, 새로운 세션 연결이 이루졌다는 것을 확인해보실 수 있습니다.

메시지가 정상적으로 전송이 되는지 정확히 테스트를 진행하기 위해 다음과 같이 SessionHandler의 afterConnected 메소드에 메시지 Object를 하나 만들고, 해당 메시지를 전송하도록 테스트 코드를 추가하고 테스트를 진행하면 아래와 같이 클라이언트에서 응답 값을 받아서 확인할 수 있습니다.

public void sendAlarmToMember(Member member) {
		List<String> sessionList = sessionListener.findSessionIdsByMemberId(member.getId());

		for (String sessionId : sessionList) {
			Message message = new Message("Today ME", "Hello world! :)");

			logger.info("Send Alarm To member : [{}], message : [{}]", member.getId(), message);

			// set websocket response message.
			SimpleJsonResponse response = new SimpleJsonResponse();
			response.setResultCode(WS_CODE_COMMAND_RESPONSE);
			response.setStatus(true);
			response.setData(message);

			// send websocket message
			simpMessagingTemplate.convertAndSendToUser(sessionId, WS_QUEUE_ALARM + "/" + sessionId,
					response, sessionListener.createHeaders(sessionId));
		}
}


프론트엔드

react에서 websocket을 연동하기 위해 stompjs( npmjs.com/package/stompjs ) 라이브러리를 사용하였습니다.

stompjs는 websoket 사용을 위한 STOMP 클라이언트를 제공합니다.

STOMP 는 Ruby, Python, Perl 과 같은 스크립트 언어를 위해 고안된 단순한 메시징 프로토콜이다. 그것은 메시징 프로토콜에서 일반적으로 사용되는 패턴들의 일부를 제공한다. 
STOMP 는 TCP 나 WebSocket 과 같은 신뢰성있는 양방향 streaming network protocol 상에 사용될 수 있다.

우선, 모듈 설치를 진행합니다.

npm install stompjs

모듈 설치 완료 후, 모듈을 사용할 수 있도록 import 합니다.

import stompjs from 'stompjs';

websocket 연결 및 구독을 위한 openSocket 메소드를 작성합니다.

//Subscribe destination
const WEBSOCKLOGIN = '/app/login';
const USERREPLY = '/user/queue/reply';
const HEARTBEAT = 'app/heartbeat';
const ERROR = '/user/queue/error'

openSocket = endpoint => {
    let { token } = this.props;
    const sock = stompjs.over(new WebSocket(endpoint));	//웹소켓을 통해 STOMP브로커에 연결

    const onMessage = e => {
      const body = JSON.parse(e.body);
      if (body.resultCode === 5) {
        this.sockDestination = body.data;
        sock.subscribe(this.sockDestination, onMessage);
      }else if(body.resultCode === 16){
        const item  = body.data;
        const { alarms, alarmHistory } = this.state;
        this.setState({
          alarms: [item, ...alarms],
          alarmHistory: [item, ...alarmHistory],
        });
      }
    };

    const errorOnMessage = e =>{
      const error =e.body;
      console.log('Alarm Websocket - Error: ',error)
      throw error;
    }

    const onError = err => console.info('Alarm Websocket - Error', err);

    const onConnection = () => {
      sock.subscribe(ERROR, errorOnMessage)	//구독, subscribe(destination, callback, headers)
      sock.subscribe(USERREPLY, onMessage);
      sock.subscribe(WEBSOCKLOGIN);
      sock.send(WEBSOCKLOGIN, {}, JSON.stringify({ data: token, resultCode: 3 }));	//send(destination, headers, body)
      this.sock = sock;
     
      this.timer = setInterval(() => {		//websocket 연결 유지를 위해 타이머로 주기적으로 send를 보낸다.
        if(this.sock.connected){		// 연결되어있다면, 메세지 전달
          this.sock.send(HEARTBEAT, {}, JSON.stringify({ data: 'Listening...' }));
        }else{		// 그렇지않다면, timer 종료 후 재연결
          window.clearInterval(this.timer);
          if(token){
            this.openSocket(this.endpoint);
          }
        } 
      }, 30000);
    };

    sock.connect({}, onConnection, onError);	//websocket 연결, connect(hearders, connectCallback, errorCallback)
    sock.debug = () => {};
  };

react Life cycle 중 componentDidMount()에서 openSocket을 호출하여 websocket 연결 및 구독을 합니다.

 componentDidMount() {
    this.endpoint = '';
    if (window.location.protocol === 'https:') {
      this.endpoint = apiUrl.replace('https:', 'wss:');
    } else if (window.location.protocol === 'http:') {
      this.endpoint = apiUrl.replace('http:', 'ws:');
    }
    this.endpoint += apiUrl[apiUrl.length-1] === '/' ? 'websocket' :'/websocket';

    this.openSocket(this.endpoint);
  }

websocket 연결유지를 위해 실행하던 타이머 종료와 websocket의 연결을 끊어 주기 위한 closeSocket 메소드를 작성합니다.

closeSocket = () => {
    window.clearInterval(this.timer);
    if(this.sock){  
      const subscriptions = this.sock.subscriptions;

      Object.keys(subscriptions).forEach(subscription=>{
       this.sock.unsubscribe(subscription)	// 구독해제, unsubscribe(id, headers)
      })
      this.sock.disconnect();		// 연결해제, disconnect(disconnectCallback, headers);
    }
  };


react Life cycle 중 componentWillUnmount()에서 위에서 작성한 closeSocket()을 호출하여 websocket의 구독취소와 연결해제를 합니다.

  componentWillUnmount() {
    this.closeSocket();
  }


결과

마치며

적용하는데까지 꽤나 시간이 걸린 것 같지만, 이번 기회를 통해서 어떠한 서비스 또는 제품을 개발 및 기능 개선을 진행하면서 성능, 부하 등에 대한 지속적인 고민과 개선해 나가야 한다는 좋은 경험을 해볼 수 있었으므로, 해당 내용이 의미 있는 작업이 되었다고 생각이 됩니다. 하나의 기술을 도입해 적용해 나가는 과정에서 많은 협업 과정이 필요했으며 이번 웹 소켓 적용에 도움을 주신 분들께 감사드립니다. 긴 글 읽어주셔서 감사합니다.

bje's profile image

bje

2019-12-23

Read more posts by this author