Android

[Android] Socket.io (feat. Channel)

김한토 2025. 1. 21. 16:21
반응형

글 보기 전 참고 자료

 

https://blog.naver.com/ghdalswl77/222401162545

 

API 와 Endpoint ? (둘 다 정확히 알고 있다면 안 봐도 되는 글)

API 와 Endpoint의 차이점? 둘 다 정확히 알고 있다면 안 봐도 되는 글. API 와 Endpoint의 차이 ...

blog.naver.com

 

https://velog.io/@rhdmstj17/%EC%86%8C%EC%BC%93%EA%B3%BC-%EC%9B%B9%EC%86%8C%EC%BC%93-%ED%95%9C-%EB%B2%88%EC%97%90-%EC%A0%95%EB%A6%AC-1

 

[소켓과 웹소켓] 한 번에 정리 (1) | 소켓이란?, 소켓 API의 실행 흐름, 클라이언트 소켓과 서버 소

넷응설 예습 스터디를 위해 소켓 프로그래밍에 대해 자료를 정리하던중 ... http와 TCP/IP소켓 웹소켓 등등 .. 비슷한듯 다른 개념들이 정리가 안되고 헷갈려가지고 한 번에 정리해보려한다. 이번

velog.io

 

https://www.peterkimzz.com/websocket-vs-socket-io

 

웹소켓과 socket.io

예전에 회사 프로젝트를 진행할 때, 지도에 실시간으로 사용자의 위치를 보여주는 기능이 필요해서 socket.io 를 사용해서 구현했던 적이 있습니다.

www.peterkimzz.com

 

https://kotlinlang.org/docs/channels.html#channel-basics

 

Channels | Kotlin

 

kotlinlang.org

 

https://medium.com/hongbeomi-dev/%EC%BD%94%ED%8B%80%EB%A6%B0%EC%9D%98-%EC%BD%94%EB%A3%A8%ED%8B%B4-6-channels-3c9ab42df14f

 

코틀린의 코루틴 — 6. Channels

코루틴의 Channels에 대해 알아봅니다.

medium.com

 

https://huisam.tistory.com/entry/coroutine-channel

 

[Kotlin] Coroutine - 5. 코루틴의 Channel 의 모든 것

안녕하세요~! Coroutine 에서 제공하는 다양한 Interfa ce 중에서 Queue 와 비슷한 개념을 가진 Channel 에 대해서 알아볼려고 해요 Channel 이란 Channel 은 쉽게 말씀드리면 데이터를 stream 처럼 전송하기 위

huisam.tistory.com

 

// Socket.IO 클라이언트
    implementation 'io.socket:socket.io-client:2.0.0'


class SocketUtils(
    private var context: Context,
    private val url: String,
    private val path: String,
    private val reconnect: Boolean,
    private val maxValue: Int,
    private val reconnectionDelay: Int
) {
    companion object {
        private const val TAG = "SocketUtil"
    }

    private val socket: Socket? by lazy {
        try {
            IO.socket(url, IO.Options().apply {
                this.reconnection = reconnect
                this.reconnectionAttempts = maxValue
                this.reconnectionDelay = reconnectionDelay.toLong()
                this.transports = arrayOf("websocket")
            })
        } catch (e: Exception) {
            Log.e(TAG, "Error initializing socket", e)
            null
        }
    }

    private lateinit var sharedPreferences: SharedPreferences

    private val dataChannel = Channel<String>(10)

    // Socket 연결을 위한 코루틴 Job
    private var socketJob: Job? = null

    /**
     * 이 스코프를 통해 비동기 작업을 실행하며, connect와 disconnect에서 관리됨.
     */
    private val coroutineScope = CoroutineScope(SupervisorJob() + Dispatchers.IO)

    /**
     * 소켓 연결을 시작합니다.
     * 연결에 성공하면 이벤트 리스너를 설정하고, 데이터 처리 채널을 활성화합니다.
     */
    fun connect() {
        if (socket?.connected() == true) {
            Log.d(TAG, "Socket already connected, skipping connection")
            return
        }

        socketJob = coroutineScope.launch {
            socket?.apply {
                if (!connected()) {
                    Log.d(TAG, "Attempting to connect socket")

                    setOnEvent()
                    connect()

                    on(Socket.EVENT_CONNECT) {
                        Log.d(TAG, "Socket connected")

                        ChannelUtils.startProcessingChannel(dataChannel, coroutineScope) { data ->
                            handleSocketData(data)
                        }
                    }
                }
            } ?: Log.e(TAG, "Socket is null")
        }
    }


    /**
     * 소켓 연결을 종료하고, 이벤트 리스너를 제거하며, 코루틴 스코프를 정리합니다.
     */
    fun disconnect() {
        Log.d(TAG, "Socket.IO disconnect")

        socket?.apply {
            disconnect()
            off() // 이벤트 리스너 해제
        }

        ChannelUtils.closeChannel(dataChannel)

        socketJob?.cancel() // 기존 코루틴 작업 취소
        socketJob = null
    }


    /**
     * 소켓이 연결되어 있는지 확인합니다.
     *
     * @return 연결 상태 (true: 연결됨, false: 연결되지 않음)
     */
    fun isConnected(): Boolean = socket?.connected() == true

    /**
     * 소켓 이벤트 리스너를 등록합니다.
     * 각 이벤트가 발생하면 데이터를 채널에 전송하거나 응답을 반환합니다.
     */
    private fun setOnEvent() {
        socket?.apply {
            off()

            onEvent(ex1) { args ->
                handleSocketEvent("ex1", "ex1_dataAck", "데이터 수신 성공", args)
            }

            onEvent(ex2) { args ->
                handleSocketEvent("ex2", "ex2_dataAck", "Success", args)
            }

            onEvent("duplication") { args ->
                Log.d(TAG, "duplication event received")
            }

        } ?: Log.e(TAG, "Socket is null. Cannot set events.")
    }

    private fun handleSocketEvent(
        eventName: String,
        ackEventName: String,
        successMessage: String,
        args: Array<Any>,
    ) {
        Log.d(TAG, "handleSocketEvent $eventName")


        val data = args.getJSONObjectOrNull(1)?.toString()

        Log.d(TAG, "handleSocketEvent $data")


        data?.let {
            ChannelUtils.sendItemToChannel(dataChannel, it)
            Log.d(TAG, "$eventName: Data sent to channel")
        }

        val response = createResponse("E000", successMessage)
        if (response != null) {
            sendResponse(ackEventName, response, args) { ackArgs ->
                Log.d(TAG, "$ackEventName response sent: ${ackArgs[0]}")
            }
        }

    }


    /**
     * 이벤트 인자 배열에서 지정된 인덱스의 JSON 객체를 안전하게 가져옵니다.
     *
     * @param index JSON 객체를 가져올 인덱스
     * @return JSONObject 또는 null
     */
    private fun Array<Any>.getJSONObjectOrNull(index: Int): JSONObject? {
        return try {
            this.getOrNull(index) as? JSONObject
        } catch (e: Exception) {
            Log.e(TAG, "Error parsing JSONObject at index $index", e)
            null
        }
    }


    /**
     * 특정 이벤트를 소켓에 등록합니다.
     *
     * @param event 이벤트 이름
     * @param callback 이벤트가 발생했을 때 실행될 리스너
     */
    private fun onEvent(event: String, callback: Emitter.Listener) {
        socket?.on(event, callback)
    }

    /**
     * 주어진 코드와 메시지를 포함하는 JSON 객체를 생성합니다.
     *
     * @param code 응답 코드
     * @param message 응답 메시지
     * @return 생성된 JSON 객체
     */
    private fun createResponse(code: String, message: String): JSONObject {
        Log.d(TAG, "createResponse")
        val response = JSONObject()
        try {
            response.put("code", code)
            response.put("message", message)
        } catch (e: java.lang.Exception) {
            Log.e(TAG, "JSONObject 생성 실패: " + e.message)
        }
        return response
    }


    /**
     * 이벤트에 대한 응답 데이터를 소켓으로 전송합니다.
     *
     * @param event 응답할 이벤트 이름
     * @param data 전송할 JSON 데이터
     * @param args 이벤트 인자 배열
     * @param callback 응답 전송 후 실행될 리스너
     */
    private fun sendResponse(
        event: String,
        data: JSONObject,
        args: Array<Any>,
        callback: Emitter.Listener,
    ) {
        socket?.emit(event, args.getOrNull(0), args.getOrNull(1), data, 0, callback)
        Log.d(TAG, "서버에 응답: " + args[0] as JSONObject)
    }


    private fun sendMessageWithTimeout(
        emitEvent: String,
        jsonString: String,
        responseEvent: String,
        onTimeout: () -> Unit,
        onResponseReceived: (response: Any) -> Unit,
    ) {
        if (socket != null && socket!!.connected()) {
            Log.d(TAG, "JSON Data sent : $jsonString")
            socket!!.emit(emitEvent, jsonString)

            // 타임아웃 핸들러 및 Runnable 설정
            val handler = Handler(Looper.getMainLooper())
            val timeoutRunnable = Runnable {
                Log.e(TAG, "Timeout occurred while waiting for response on $responseEvent")
                onTimeout()
            }

            handler.postDelayed(timeoutRunnable, TIMEOUT_MS.toLong())

            // 서버로부터 응답 받았을 때 처리
            socket!!.on(responseEvent) { args ->
                handler.removeCallbacks(timeoutRunnable)
                if (args.isNotEmpty()) {
                    Log.d(TAG, "Response received from $responseEvent: ${args[0]}")
                    onResponseReceived(args[0])
                } else {
                    Log.e(TAG, "No data received from server on $responseEvent")
                }
            }
        } else {
            Log.e(TAG, "Socket is not connected")
        }
    }

}

 

데이터 순차처리를 위해 channel을 사용


class MainApplication : Application() {

    private val TAG = "MainApplication"
    private var socketUtils: SocketUtils? = null

    private var sharedPreferences: SharedPreferences? = null

    override fun onCreate() {
        super.onCreate()
        instance = this

        sharedPreferences =
            PreferenceManager.getDefaultSharedPreferences(this.applicationContext) 

    }

    companion object {
        private var instance: MainApplication? = null

        fun getInstance(): MainApplication {
            if (instance == null) {
                throw IllegalStateException("MainApplication is not initialized!")
            }
            return instance!!
        }

    }

    /**
     * 소켓 Process 시작
     * POSBANK 소켓 초기 연결 세팅을 한다.
     */
    fun runSocketProcess() {
        
        // 소켓 초기화
        socketUtils = SocketUtils(
            applicationContext,
            "https://socket.ex.com",
            "/socket.io",
            true,
            Int.MAX_VALUE,
            1000
        )

        // 소켓 연결
        socketUtils?.connect()
    }



    fun getSocketUtils(): SocketUtils? {
        return socketUtils
    }

    fun offSocketProcess() {
        Log.d(TAG, "offSocketProcess()")

        if (socketUtils?.isConnected() == true) {
            // 앱 종료 시 소켓 종료
            try {
                socketUtils?.disconnect()
                socketUtils = null
            } catch (e: Exception) {
                Log.e(TAG, "Error disconnecting from socket", e)
            }
        }

    }

    override fun onTerminate() {
        Log.d(TAG, "onTerminate()")
        super.onTerminate()

        // 앱 종료 시 소켓 종료
        try {
            socketUtils?.disconnect()
            socketUtils = null
        } catch (e: Exception) {
            Log.e(TAG, "Error disconnecting from socket", e)
        }
    }


}

 

MainActivity 실행시,

MainApplication.getInstance().runSocketProcess()

 

을 통해 Socket과 연결

반응형