import os
from unittest.mock import MagicMock, Mock, patch

import pytest
from crewai import Agent, Crew, Task
from crewai.events.listeners.tracing.first_time_trace_handler import (
    FirstTimeTraceHandler,
)
from crewai.events.listeners.tracing.trace_batch_manager import (
    TraceBatch,
    TraceBatchManager,
)
from crewai.events.listeners.tracing.trace_listener import (
    TraceCollectionListener,
)
from crewai.events.listeners.tracing.types import TraceEvent
from crewai.flow.flow import Flow, start
from tests.utils import wait_for_event_handlers


class TestTraceListenerSetup:
    """Test TraceListener is properly setup and collecting events"""

    @pytest.fixture(autouse=True)
    def mock_user_data_file_io(self):
        """Mock user data file I/O to prevent file system pollution between tests"""
        with patch(
            "crewai.events.listeners.tracing.utils._load_user_data",
            return_value={},
        ):
            yield

    @pytest.fixture(autouse=True)
    def mock_auth_token(self):
        """Mock authentication token for all tests in this class"""
        # Need to patch all the places where get_auth_token is imported/used
        with (
            patch(
                "crewai.auth.token.get_auth_token",
                return_value="mock_token_12345",
            ),
            patch(
                "crewai.events.listeners.tracing.trace_listener.get_auth_token",
                return_value="mock_token_12345",
            ),
            patch(
                "crewai.events.listeners.tracing.trace_batch_manager.get_auth_token",
                return_value="mock_token_12345",
            ),
        ):
            yield

    @pytest.fixture(autouse=True)
    def reset_tracing_singletons(self):
        """Reset tracing singleton instances between tests"""
        from crewai.events.event_bus import crewai_event_bus
        from crewai.events.event_listener import EventListener
        from crewai.events.listeners.tracing.utils import _tracing_enabled

        # Reset the tracing enabled contextvar
        try:
            _tracing_enabled.set(None)
        except (LookupError, AttributeError):
            pass

        # Clear event bus handlers BEFORE creating any new singletons
        with crewai_event_bus._rwlock.w_locked():
            crewai_event_bus._sync_handlers = {}
            crewai_event_bus._async_handlers = {}
            crewai_event_bus._handler_dependencies = {}
            crewai_event_bus._execution_plan_cache = {}

        # Reset TraceCollectionListener singleton - must reset instance attributes too
        if TraceCollectionListener._instance is not None:
            # Reset instance attributes that shadow class attributes (only if they exist as instance attrs)
            instance_dict = TraceCollectionListener._instance.__dict__
            if "_initialized" in instance_dict:
                del TraceCollectionListener._instance._initialized
            if "_listeners_setup" in instance_dict:
                del TraceCollectionListener._instance._listeners_setup

        # Reset class attributes
        TraceCollectionListener._instance = None
        TraceCollectionListener._initialized = False
        TraceCollectionListener._listeners_setup = False

        # Reset EventListener singleton
        if hasattr(EventListener, "_instance"):
            EventListener._instance = None

        yield

        # Clean up after test
        with crewai_event_bus._rwlock.w_locked():
            crewai_event_bus._sync_handlers = {}
            crewai_event_bus._async_handlers = {}
            crewai_event_bus._handler_dependencies = {}
            crewai_event_bus._execution_plan_cache = {}

        # Reset TraceCollectionListener singleton - must reset instance attributes too
        if TraceCollectionListener._instance is not None:
            # Reset instance attributes that shadow class attributes (only if they exist as instance attrs)
            instance_dict = TraceCollectionListener._instance.__dict__
            if "_initialized" in instance_dict:
                del TraceCollectionListener._instance._initialized
            if "_listeners_setup" in instance_dict:
                del TraceCollectionListener._instance._listeners_setup

        # Reset class attributes
        TraceCollectionListener._instance = None
        TraceCollectionListener._initialized = False
        TraceCollectionListener._listeners_setup = False

        if hasattr(EventListener, "_instance"):
            EventListener._instance = None

    @pytest.fixture(autouse=True)
    def mock_plus_api_calls(self):
        """Mock all PlusAPI HTTP calls to avoid network requests"""
        with (
            patch("requests.post") as mock_post,
            patch("requests.get") as mock_get,
            patch("requests.put") as mock_put,
            patch("requests.delete") as mock_delete,
            patch.object(TraceBatchManager, "_cleanup_batch_data", return_value=True),
        ):
            mock_response = MagicMock()
            mock_response.status_code = 200
            mock_response.json.return_value = {
                "id": "mock_trace_batch_id",
                "status": "success",
                "message": "Batch created successfully",
            }
            mock_response.raise_for_status.return_value = None

            mock_post.return_value = mock_response
            mock_get.return_value = mock_response
            mock_put.return_value = mock_response
            mock_delete.return_value = mock_response

            mock_mark_failed = MagicMock()
            mock_mark_failed.return_value = mock_response

            yield {
                "post": mock_post,
                "get": mock_get,
                "put": mock_put,
                "delete": mock_delete,
                "mark_trace_batch_as_failed": mock_mark_failed,
            }

    @pytest.mark.vcr()
    def test_trace_listener_collects_crew_events(self):
        """Test that trace listener properly collects events from crew execution"""

        with patch.dict(
            os.environ,
            {
                "CREWAI_TRACING_ENABLED": "true",
                "CREWAI_DISABLE_TELEMETRY": "false",
                "CREWAI_DISABLE_TRACKING": "false",
                "OTEL_SDK_DISABLED": "false",
            },
        ):
            agent = Agent(
                role="Test Agent",
                goal="Test goal",
                backstory="Test backstory",
                llm="gpt-4o-mini",
            )
            task = Task(
                description="Say hello to the world",
                expected_output="hello world",
                agent=agent,
            )
            crew = Crew(agents=[agent], tasks=[task], verbose=True)

            from crewai.events.listeners.tracing.trace_listener import TraceCollectionListener
            trace_listener = TraceCollectionListener()

            crew.kickoff()

            initialized = trace_listener.batch_manager.wait_for_batch_initialization(timeout=5.0)

            assert initialized, "Batch should have been initialized"
            assert trace_listener.batch_manager.is_batch_initialized()
            assert trace_listener.batch_manager.current_batch is not None

    @pytest.mark.vcr()
    def test_batch_manager_finalizes_batch_clears_buffer(self):
        """Test that batch manager properly finalizes batch and clears buffer"""

        with patch.dict(
            os.environ,
            {
                "CREWAI_TRACING_ENABLED": "true",
                "CREWAI_DISABLE_TELEMETRY": "false",
                "CREWAI_DISABLE_TRACKING": "false",
                "OTEL_SDK_DISABLED": "false",
            },
        ):
            agent = Agent(
                role="Test Agent",
                goal="Test goal",
                backstory="Test backstory",
                llm="gpt-4o-mini",
            )

            task = Task(
                description="Say hello to the world",
                expected_output="hello world",
                agent=agent,
            )

            crew = Crew(agents=[agent], tasks=[task], verbose=True)

            from crewai.events.event_bus import crewai_event_bus

            trace_listener = None
            with crewai_event_bus._rwlock.r_locked():
                for handler_set in crewai_event_bus._sync_handlers.values():
                    for handler in handler_set:
                        if hasattr(handler, "__self__") and isinstance(
                            handler.__self__, TraceCollectionListener
                        ):
                            trace_listener = handler.__self__
                            break
                    if trace_listener:
                        break
                if not trace_listener:
                    for handler_set in crewai_event_bus._async_handlers.values():
                        for handler in handler_set:
                            if hasattr(handler, "__self__") and isinstance(
                                handler.__self__, TraceCollectionListener
                            ):
                                trace_listener = handler.__self__
                                break
                        if trace_listener:
                            break

            if not trace_listener:
                pytest.skip(
                    "No trace listener found - tracing may not be properly enabled"
                )

            with patch.object(
                trace_listener.batch_manager,
                "finalize_batch",
                wraps=trace_listener.batch_manager.finalize_batch,
            ) as finalize_mock:
                crew.kickoff()

                assert finalize_mock.call_count >= 1

    @pytest.mark.vcr()
    def test_events_collection_batch_manager(self, mock_plus_api_calls):
        """Test that trace listener properly collects events from crew execution"""

        with patch.dict(
            os.environ,
            {
                "CREWAI_TRACING_ENABLED": "true",
                "CREWAI_DISABLE_TELEMETRY": "false",
                "CREWAI_DISABLE_TRACKING": "false",
                "OTEL_SDK_DISABLED": "false",
            },
        ):
            agent = Agent(
                role="Test Agent",
                goal="Test goal",
                backstory="Test backstory",
                llm="gpt-4o-mini",
            )
            task = Task(
                description="Say hello to the world",
                expected_output="hello world",
                agent=agent,
            )
            crew = Crew(agents=[agent], tasks=[task], verbose=True)

            from crewai.events.event_bus import crewai_event_bus

            # Create and setup trace listener explicitly
            trace_listener = TraceCollectionListener()
            trace_listener.setup_listeners(crewai_event_bus)

            with patch.object(
                trace_listener.batch_manager,
                "add_event",
                wraps=trace_listener.batch_manager.add_event,
            ) as add_event_mock:
                crew.kickoff()
                wait_for_event_handlers()

                assert add_event_mock.call_count >= 2

                completion_events = [
                    call.args[0]
                    for call in add_event_mock.call_args_list
                    if call.args[0].type == "crew_kickoff_completed"
                ]
                assert len(completion_events) >= 1

                # Verify the first completion event has proper structure
                completion_event = completion_events[0]
                assert "crew_name" in completion_event.event_data
                assert completion_event.event_data["crew_name"] == "crew"

                # Verify all events have proper structure
                for call in add_event_mock.call_args_list:
                    event = call.args[0]
                    assert isinstance(event, TraceEvent)
                    assert hasattr(event, "event_data")
                    assert hasattr(event, "type")

    @pytest.mark.vcr()
    def test_trace_listener_disabled_when_env_false(self):
        """Test that trace listener doesn't make HTTP calls when tracing is disabled"""

        with patch.dict(os.environ, {"CREWAI_TRACING_ENABLED": "false"}):
            agent = Agent(
                role="Test Agent",
                goal="Test goal",
                backstory="Test backstory",
                llm="gpt-4o-mini",
            )
            task = Task(
                description="Say hello to the world",
                expected_output="hello world",
                agent=agent,
            )

            crew = Crew(agents=[agent], tasks=[task], verbose=True)
            result = crew.kickoff()
            assert result is not None

            from crewai.events.event_bus import crewai_event_bus

            trace_handlers = []
            with crewai_event_bus._rwlock.r_locked():
                for handlers in crewai_event_bus._sync_handlers.values():
                    for handler in handlers:
                        if hasattr(handler, "__self__") and isinstance(
                            handler.__self__, TraceCollectionListener
                        ):
                            trace_handlers.append(handler)
                for handlers in crewai_event_bus._async_handlers.values():
                    for handler in handlers:
                        if hasattr(handler, "__self__") and isinstance(
                            handler.__self__, TraceCollectionListener
                        ):
                            trace_handlers.append(handler)

            assert len(trace_handlers) == 0, (
                f"Found {len(trace_handlers)} TraceCollectionListener handlers when tracing should be disabled"
            )

    def test_trace_listener_setup_correctly_for_crew(self):
        """Test that trace listener is set up correctly when enabled"""

        with patch.dict(
            os.environ,
            {
                "CREWAI_TRACING_ENABLED": "true",
                "CREWAI_DISABLE_TELEMETRY": "false",
                "CREWAI_DISABLE_TRACKING": "false",
                "OTEL_SDK_DISABLED": "false",
            },
        ):
            agent = Agent(
                role="Test Agent",
                goal="Test goal",
                backstory="Test backstory",
                llm="gpt-4o-mini",
            )
            task = Task(
                description="Say hello to the world",
                expected_output="hello world",
                agent=agent,
            )
            with patch.object(
                TraceCollectionListener, "setup_listeners"
            ) as mock_listener_setup:
                Crew(agents=[agent], tasks=[task], verbose=True)
                assert mock_listener_setup.call_count >= 1

    @pytest.mark.vcr()
    def test_trace_listener_setup_correctly_for_flow(self):
        """Test that trace listener is set up correctly when enabled"""

        with patch.dict(
            os.environ,
            {
                "CREWAI_TRACING_ENABLED": "true",
                "CREWAI_DISABLE_TELEMETRY": "false",
                "CREWAI_DISABLE_TRACKING": "false",
                "OTEL_SDK_DISABLED": "false",
            },
        ):
            class FlowExample(Flow):
                @start()
                def start(self):
                    pass

            with patch.object(
                TraceCollectionListener, "setup_listeners"
            ) as mock_listener_setup:
                FlowExample()
                assert mock_listener_setup.call_count >= 1

    @pytest.mark.vcr()
    def test_trace_listener_ephemeral_batch(self):
        """Test that trace listener properly handles ephemeral batches"""
        with (
            patch.dict(
                os.environ,
                {
                    "CREWAI_TRACING_ENABLED": "true",
                    "CREWAI_DISABLE_TELEMETRY": "false",
                    "CREWAI_DISABLE_TRACKING": "false",
                    "OTEL_SDK_DISABLED": "false",
                },
            ),
            patch(
                "crewai.events.listeners.tracing.trace_listener.TraceCollectionListener._check_authenticated",
                return_value=False,
            ),
        ):
            agent = Agent(
                role="Test Agent",
                goal="Test goal",
                backstory="Test backstory",
                llm="gpt-4o-mini",
            )
            task = Task(
                description="Say hello to the world",
                expected_output="hello world",
                agent=agent,
            )
            crew = Crew(agents=[agent], tasks=[task], tracing=True)

            from crewai.events.listeners.tracing.trace_listener import TraceCollectionListener
            trace_listener = TraceCollectionListener()

            crew.kickoff()

            initialized = trace_listener.batch_manager.wait_for_batch_initialization(timeout=5.0)
            assert initialized, (
                "Batch should have been initialized for unauthenticated user"
            )

            wait_for_event_handlers()

    @pytest.mark.vcr()
    def test_trace_listener_with_authenticated_user(self):
        """Test that trace listener properly handles authenticated batches"""
        with patch.dict(
            os.environ,
            {
                "CREWAI_TRACING_ENABLED": "true",
                "CREWAI_DISABLE_TELEMETRY": "false",
                "CREWAI_DISABLE_TRACKING": "false",
                "OTEL_SDK_DISABLED": "false",
            },
        ):
            agent = Agent(
                role="Test Agent",
                goal="Test goal",
                backstory="Test backstory",
                llm="gpt-4o-mini",
            )
            task = Task(
                description="Say hello to the world",
                expected_output="hello world",
                agent=agent,
            )

            from crewai.events.listeners.tracing.trace_listener import TraceCollectionListener
            trace_listener = TraceCollectionListener()

            crew = Crew(agents=[agent], tasks=[task], tracing=True)
            crew.kickoff()

            initialized = trace_listener.batch_manager.wait_for_batch_initialization(timeout=5.0)
            assert initialized, (
                "Batch should have been initialized for authenticated user"
            )

            wait_for_event_handlers()

    # Helper method to ensure cleanup
    def teardown_method(self):
        """Cleanup after each test method"""
        from crewai.events.event_bus import crewai_event_bus
        from crewai.events.event_listener import EventListener

        with crewai_event_bus._rwlock.w_locked():
            crewai_event_bus._sync_handlers = {}
            crewai_event_bus._async_handlers = {}
            crewai_event_bus._handler_dependencies = {}
            crewai_event_bus._execution_plan_cache = {}

        # Reset EventListener singleton
        if hasattr(EventListener, "_instance"):
            EventListener._instance = None

    @classmethod
    def teardown_class(cls):
        """Final cleanup after all tests in this class"""
        from crewai.events.event_bus import crewai_event_bus
        from crewai.events.event_listener import EventListener

        with crewai_event_bus._rwlock.w_locked():
            crewai_event_bus._sync_handlers = {}
            crewai_event_bus._async_handlers = {}
            crewai_event_bus._handler_dependencies = {}
            crewai_event_bus._execution_plan_cache = {}

        # Reset EventListener singleton
        if hasattr(EventListener, "_instance"):
            EventListener._instance = None

    @pytest.mark.vcr()
    def test_first_time_user_trace_collection_with_timeout(self, mock_plus_api_calls):
        """Test first-time user trace collection logic with timeout behavior"""

        with (
            patch.dict(
                os.environ,
                {
                    "CREWAI_TRACING_ENABLED": "false",
                    "CREWAI_DISABLE_TELEMETRY": "false",
                    "CREWAI_DISABLE_TRACKING": "false",
                    "OTEL_SDK_DISABLED": "false",
                },
            ),
            patch(
                "crewai.events.listeners.tracing.utils._is_test_environment",
                return_value=False,
            ),
            patch(
                "crewai.events.listeners.tracing.utils.should_auto_collect_first_time_traces",
                return_value=True,
            ),
            patch(
                "crewai.events.listeners.tracing.utils.is_first_execution",
                return_value=True,
            ),
            patch(
                "crewai.events.listeners.tracing.first_time_trace_handler.prompt_user_for_trace_viewing",
                return_value=False,
            ) as mock_prompt,
            patch(
                "crewai.events.listeners.tracing.first_time_trace_handler.mark_first_execution_completed"
            ) as mock_mark_completed,
        ):
            agent = Agent(
                role="Test Agent",
                goal="Test goal",
                backstory="Test backstory",
                llm="gpt-4o-mini",
            )
            task = Task(
                description="Say hello to the world",
                expected_output="hello world",
                agent=agent,
            )
            crew = Crew(agents=[agent], tasks=[task], verbose=True)

            from crewai.events.event_bus import crewai_event_bus

            trace_listener = TraceCollectionListener()
            trace_listener.setup_listeners(crewai_event_bus)

            trace_listener.first_time_handler = FirstTimeTraceHandler()
            if trace_listener.first_time_handler.initialize_for_first_time_user():
                trace_listener.first_time_handler.set_batch_manager(trace_listener.batch_manager)

            assert trace_listener.first_time_handler.is_first_time is True
            assert trace_listener.first_time_handler.collected_events is False

            trace_listener.batch_manager.batch_owner_type = "crew"

            result = crew.kickoff()
            wait_for_event_handlers()
            assert result is not None

            assert trace_listener.first_time_handler.collected_events is True, (
                "Events should have been collected"
            )

            mock_prompt.assert_called_once()

            mock_mark_completed.assert_called_once()

    @pytest.mark.vcr()
    def test_first_time_user_trace_collection_user_accepts(self, mock_plus_api_calls):
        """Test first-time user trace collection when user accepts viewing traces"""

        with (
            patch.dict(
                os.environ,
                {
                    "CREWAI_TRACING_ENABLED": "false",
                    "CREWAI_DISABLE_TELEMETRY": "false",
                    "CREWAI_DISABLE_TRACKING": "false",
                    "OTEL_SDK_DISABLED": "false",
                },
            ),
            patch(
                "crewai.events.listeners.tracing.utils._is_test_environment",
                return_value=False,
            ),
            patch(
                "crewai.events.listeners.tracing.utils.should_auto_collect_first_time_traces",
                return_value=True,
            ),
            patch(
                "crewai.events.listeners.tracing.utils.is_first_execution",
                return_value=True,
            ),
            patch(
                "crewai.events.listeners.tracing.first_time_trace_handler.prompt_user_for_trace_viewing",
                return_value=True,
            ),
            patch(
                "crewai.events.listeners.tracing.first_time_trace_handler.mark_first_execution_completed"
            ) as mock_mark_completed,
        ):
            agent = Agent(
                role="Test Agent",
                goal="Test goal",
                backstory="Test backstory",
                llm="gpt-4o-mini",
            )
            task = Task(
                description="Say hello to the world",
                expected_output="hello world",
                agent=agent,
            )
            crew = Crew(agents=[agent], tasks=[task], verbose=True)

            from crewai.events.event_bus import crewai_event_bus

            trace_listener = TraceCollectionListener()
            trace_listener.setup_listeners(crewai_event_bus)

            # Re-initialize first-time handler after patches are applied to ensure clean state
            trace_listener.first_time_handler = FirstTimeTraceHandler()
            if trace_listener.first_time_handler.initialize_for_first_time_user():
                trace_listener.first_time_handler.set_batch_manager(trace_listener.batch_manager)

            trace_listener.batch_manager.ephemeral_trace_url = (
                "https://crewai.com/trace/mock-id"
            )

            assert trace_listener.first_time_handler.is_first_time is True

            trace_listener.first_time_handler.collected_events = True

            mock_batch_response = MagicMock()
            mock_batch_response.status_code = 201
            mock_batch_response.json.return_value = {
                "trace_id": "mock-trace-id",
                "ephemeral_trace_id": "mock-ephemeral-trace-id",
                "access_code": "TRACE-mock",
            }
            mock_events_response = MagicMock()
            mock_events_response.status_code = 200

            with (
                patch.object(
                    trace_listener.first_time_handler,
                    "_initialize_backend_and_send_events",
                    wraps=trace_listener.first_time_handler._initialize_backend_and_send_events,
                ) as mock_init_backend,
                patch.object(
                    trace_listener.first_time_handler, "_display_ephemeral_trace_link"
                ) as mock_display_link,
                patch.object(
                    trace_listener.batch_manager.plus_api,
                    "initialize_trace_batch",
                    return_value=mock_batch_response,
                ),
                patch.object(
                    trace_listener.batch_manager.plus_api,
                    "initialize_ephemeral_trace_batch",
                    return_value=mock_batch_response,
                ),
                patch.object(
                    trace_listener.batch_manager.plus_api,
                    "send_trace_events",
                    return_value=mock_events_response,
                ),
                patch.object(
                    trace_listener.batch_manager.plus_api,
                    "send_ephemeral_trace_events",
                    return_value=mock_events_response,
                ),
                patch.object(
                    trace_listener.batch_manager.plus_api,
                    "finalize_trace_batch",
                    return_value=mock_events_response,
                ),
                patch.object(
                    trace_listener.batch_manager.plus_api,
                    "finalize_ephemeral_trace_batch",
                    return_value=mock_events_response,
                ),
                patch.object(
                    trace_listener.batch_manager,
                    "_cleanup_batch_data",
                ),
            ):
                crew.kickoff()
                wait_for_event_handlers()

                mock_init_backend.assert_called_once()

                mock_display_link.assert_called_once()

            mock_mark_completed.assert_called_once()

    @pytest.mark.vcr()
    def test_first_time_user_trace_consolidation_logic(self, mock_plus_api_calls):
        """Test the consolidation logic for first-time users vs regular tracing"""
        with (
            patch.dict(
                os.environ,
                {
                    "CREWAI_TRACING_ENABLED": "",
                    "CREWAI_DISABLE_TELEMETRY": "false",
                    "CREWAI_DISABLE_TRACKING": "false",
                    "OTEL_SDK_DISABLED": "false",
                },
            ),
            patch(
                "crewai.events.listeners.tracing.utils._is_test_environment",
                return_value=False,
            ),
            patch(
                "crewai.events.listeners.tracing.utils.should_auto_collect_first_time_traces",
                return_value=True,
            ),
            patch(
                "crewai.events.listeners.tracing.utils.is_first_execution",
                return_value=True,
            ),
        ):
            from crewai.events.event_bus import crewai_event_bus

            with crewai_event_bus._rwlock.w_locked():
                crewai_event_bus._sync_handlers = {}
                crewai_event_bus._async_handlers = {}

            trace_listener = TraceCollectionListener()

            # Re-initialize first-time handler after patches are applied to ensure clean state
            # This is necessary because the singleton may have been created before patches were active
            trace_listener.first_time_handler = FirstTimeTraceHandler()
            if trace_listener.first_time_handler.initialize_for_first_time_user():
                trace_listener.first_time_handler.set_batch_manager(trace_listener.batch_manager)

            trace_listener.setup_listeners(crewai_event_bus)

            assert trace_listener.first_time_handler.is_first_time is True

            agent = Agent(
                role="Test Agent",
                goal="Test goal",
                backstory="Test backstory",
                llm="gpt-4o-mini",
            )
            task = Task(
                description="Test task", expected_output="test output", agent=agent
            )
            crew = Crew(agents=[agent], tasks=[task])

            result = crew.kickoff()

            wait_for_event_handlers()

            assert trace_listener.batch_manager.is_batch_initialized(), (
                "Batch should have been initialized for first-time user"
            )
            assert result is not None

    def test_first_time_handler_timeout_behavior(self):
        """Test the timeout behavior of the first-time trace prompt"""

        with (
            patch(
                "crewai.events.listeners.tracing.utils._is_test_environment",
                return_value=False,
            ),
            patch(
                "crewai.events.listeners.tracing.utils._is_interactive_terminal",
                return_value=True,
            ),
            patch("threading.Thread") as mock_thread,
        ):
            from crewai.events.listeners.tracing.utils import (
                prompt_user_for_trace_viewing,
            )

            mock_thread_instance = Mock()
            mock_thread_instance.is_alive.return_value = True
            mock_thread.return_value = mock_thread_instance

            result = prompt_user_for_trace_viewing(timeout_seconds=5)

            assert result is False
            mock_thread.assert_called_once()
            call_args = mock_thread.call_args
            assert call_args[1]["daemon"] is True

            mock_thread_instance.start.assert_called_once()
            mock_thread_instance.join.assert_called_once_with(timeout=5)
            mock_thread_instance.is_alive.assert_called_once()

    def test_first_time_handler_graceful_error_handling(self):
        """Test graceful error handling in first-time trace logic"""

        with (
            patch(
                "crewai.events.listeners.tracing.utils.should_auto_collect_first_time_traces",
                return_value=True,
            ),
            patch(
                "crewai.events.listeners.tracing.first_time_trace_handler.prompt_user_for_trace_viewing",
                side_effect=Exception("Prompt failed"),
            ),
            patch(
                "crewai.events.listeners.tracing.first_time_trace_handler.mark_first_execution_completed"
            ) as mock_mark_completed,
        ):
            handler = FirstTimeTraceHandler()
            handler.is_first_time = True
            handler.collected_events = True

            handler.handle_execution_completion()

            mock_mark_completed.assert_called_once()

    def test_trace_batch_marked_as_failed_on_finalize_error(self):
        """Test that trace batch is marked as failed when finalization returns non-200 status"""
        # Test the error handling logic directly in TraceBatchManager
        with patch("crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context", return_value=True):
            batch_manager = TraceBatchManager()

            # Initialize a batch
            batch_manager.current_batch = batch_manager.initialize_batch(
                user_context={"privacy_level": "standard"},
                execution_metadata={
                    "execution_type": "crew",
                    "crew_name": "test_crew",
                },
            )
            batch_manager.trace_batch_id = "test_batch_id_12345"
            batch_manager.backend_initialized = True

            # Mock the API responses
            with (
                patch.object(
                    batch_manager.plus_api,
                    "send_trace_events",
                    return_value=MagicMock(status_code=200),
                ),
                patch.object(
                    batch_manager.plus_api,
                    "finalize_trace_batch",
                    return_value=MagicMock(status_code=500, text="Internal Server Error"),
                ),
                patch.object(
                    batch_manager.plus_api,
                    "mark_trace_batch_as_failed",
                ) as mock_mark_failed,
            ):
                # Call finalize_batch directly
                batch_manager.finalize_batch()

                # Verify that mark_trace_batch_as_failed was called with the error message
                mock_mark_failed.assert_called_once_with(
                    "test_batch_id_12345", "Internal Server Error"
                )

    def test_ephemeral_batch_includes_anon_id(self):
        """Test that ephemeral batch initialization sends anon_id from get_user_id()"""
        fake_user_id = "abc123def456"

        with (
            patch(
                "crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
                return_value=True,
            ),
            patch(
                "crewai.events.listeners.tracing.trace_batch_manager.get_user_id",
                return_value=fake_user_id,
            ),
            patch(
                "crewai.events.listeners.tracing.trace_batch_manager.should_auto_collect_first_time_traces",
                return_value=False,
            ),
        ):
            batch_manager = TraceBatchManager()

            mock_response = MagicMock(
                status_code=201,
                json=MagicMock(return_value={
                    "ephemeral_trace_id": "test-trace-id",
                    "access_code": "TRACE-abc123",
                }),
            )

            with patch.object(
                batch_manager.plus_api,
                "initialize_ephemeral_trace_batch",
                return_value=mock_response,
            ) as mock_init:
                batch_manager.initialize_batch(
                    user_context={"privacy_level": "standard"},
                    execution_metadata={
                        "execution_type": "crew",
                        "crew_name": "test_crew",
                    },
                    use_ephemeral=True,
                )

                mock_init.assert_called_once()
                payload = mock_init.call_args[0][0]
                assert payload["user_identifier"] == fake_user_id
                assert "ephemeral_trace_id" in payload

    def test_non_ephemeral_batch_does_not_include_anon_id(self):
        """Test that non-ephemeral batch initialization does not send anon_id"""
        with (
            patch(
                "crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
                return_value=True,
            ),
            patch(
                "crewai.events.listeners.tracing.trace_batch_manager.should_auto_collect_first_time_traces",
                return_value=False,
            ),
        ):
            batch_manager = TraceBatchManager()

            mock_response = MagicMock(
                status_code=201,
                json=MagicMock(return_value={"trace_id": "test-trace-id"}),
            )

            with patch.object(
                batch_manager.plus_api,
                "initialize_trace_batch",
                return_value=mock_response,
            ) as mock_init:
                batch_manager.initialize_batch(
                    user_context={"privacy_level": "standard"},
                    execution_metadata={
                        "execution_type": "crew",
                        "crew_name": "test_crew",
                    },
                    use_ephemeral=False,
                )

                mock_init.assert_called_once()
                payload = mock_init.call_args[0][0]
                assert "user_identifier" not in payload


class TestTraceBatchIdClearedOnFailure:
    """Tests: trace_batch_id is cleared when _initialize_backend_batch fails."""

    def _make_batch_manager(self):
        """Create a TraceBatchManager with a pre-set trace_batch_id (simulating first-time user)."""
        with patch(
            "crewai.events.listeners.tracing.trace_batch_manager.get_auth_token",
            return_value="mock_token",
        ):
            bm = TraceBatchManager()
        bm.current_batch = TraceBatch(
            user_context={"privacy_level": "standard"},
            execution_metadata={"execution_type": "crew", "crew_name": "test"},
        )
        bm.trace_batch_id = bm.current_batch.batch_id  # simulate line 96
        bm.is_current_batch_ephemeral = True
        return bm

    def test_trace_batch_id_cleared_on_exception(self):
        """trace_batch_id must be None when the API call raises an exception."""
        bm = self._make_batch_manager()
        assert bm.trace_batch_id is not None

        with (
            patch(
                "crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
                return_value=True,
            ),
            patch.object(
                bm.plus_api,
                "initialize_ephemeral_trace_batch",
                side_effect=ConnectionError("network down"),
            ),
        ):
            bm._initialize_backend_batch(
                user_context={"privacy_level": "standard"},
                execution_metadata={"execution_type": "crew"},
                use_ephemeral=True,
            )

        assert bm.trace_batch_id is None

    def test_trace_batch_id_set_on_success(self):
        """trace_batch_id must be set from the server response on success."""
        bm = self._make_batch_manager()
        server_id = "server-ephemeral-trace-id-999"

        mock_response = MagicMock(
            status_code=201,
            json=MagicMock(return_value={"ephemeral_trace_id": server_id}),
        )

        with (
            patch(
                "crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
                return_value=True,
            ),
            patch.object(
                bm.plus_api,
                "initialize_ephemeral_trace_batch",
                return_value=mock_response,
            ),
        ):
            bm._initialize_backend_batch(
                user_context={"privacy_level": "standard"},
                execution_metadata={"execution_type": "crew"},
                use_ephemeral=True,
            )

        assert bm.trace_batch_id == server_id

    def test_send_events_skipped_when_trace_batch_id_none(self):
        """_send_events_to_backend must return early when trace_batch_id is None."""
        bm = self._make_batch_manager()
        bm.trace_batch_id = None
        bm.event_buffer = [MagicMock()]  # has events

        with patch.object(
            bm.plus_api, "send_ephemeral_trace_events"
        ) as mock_send:
            result = bm._send_events_to_backend()

        assert result == 500
        mock_send.assert_not_called()


class TestInitializeBackendBatchRetry:
    """Tests for retry logic in _initialize_backend_batch."""

    def _make_batch_manager(self):
        """Create a TraceBatchManager with a pre-set trace_batch_id."""
        with patch(
            "crewai.events.listeners.tracing.trace_batch_manager.get_auth_token",
            return_value="mock_token",
        ):
            bm = TraceBatchManager()
        bm.current_batch = TraceBatch(
            user_context={"privacy_level": "standard"},
            execution_metadata={"execution_type": "crew", "crew_name": "test"},
        )
        bm.trace_batch_id = bm.current_batch.batch_id
        bm.is_current_batch_ephemeral = True
        return bm

    def test_retries_on_none_response_then_succeeds(self):
        """Retries when API returns None, succeeds on second attempt."""
        bm = self._make_batch_manager()
        server_id = "server-id-after-retry"

        success_response = MagicMock(
            status_code=201,
            json=MagicMock(return_value={"ephemeral_trace_id": server_id}),
        )

        with (
            patch(
                "crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
                return_value=True,
            ),
            patch.object(
                bm.plus_api,
                "initialize_ephemeral_trace_batch",
                side_effect=[None, success_response],
            ) as mock_init,
            patch("crewai.events.listeners.tracing.trace_batch_manager.time.sleep") as mock_sleep,
        ):
            bm._initialize_backend_batch(
                user_context={"privacy_level": "standard"},
                execution_metadata={"execution_type": "crew"},
                use_ephemeral=True,
            )

        assert bm.trace_batch_id == server_id
        assert mock_init.call_count == 2
        mock_sleep.assert_called_once_with(0.2)

    def test_retries_on_5xx_then_succeeds(self):
        """Retries on 500 server error, succeeds on second attempt."""
        bm = self._make_batch_manager()
        server_id = "server-id-after-5xx"

        error_response = MagicMock(status_code=500, text="Internal Server Error")
        success_response = MagicMock(
            status_code=201,
            json=MagicMock(return_value={"ephemeral_trace_id": server_id}),
        )

        with (
            patch(
                "crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
                return_value=True,
            ),
            patch.object(
                bm.plus_api,
                "initialize_ephemeral_trace_batch",
                side_effect=[error_response, success_response],
            ) as mock_init,
            patch("crewai.events.listeners.tracing.trace_batch_manager.time.sleep"),
        ):
            bm._initialize_backend_batch(
                user_context={"privacy_level": "standard"},
                execution_metadata={"execution_type": "crew"},
                use_ephemeral=True,
            )

        assert bm.trace_batch_id == server_id
        assert mock_init.call_count == 2

    def test_no_retry_on_exception(self):
        """Exceptions (e.g. timeout, connection error) abort immediately without retry."""
        bm = self._make_batch_manager()

        with (
            patch(
                "crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
                return_value=True,
            ),
            patch.object(
                bm.plus_api,
                "initialize_ephemeral_trace_batch",
                side_effect=ConnectionError("network down"),
            ) as mock_init,
            patch("crewai.events.listeners.tracing.trace_batch_manager.time.sleep") as mock_sleep,
        ):
            bm._initialize_backend_batch(
                user_context={"privacy_level": "standard"},
                execution_metadata={"execution_type": "crew"},
                use_ephemeral=True,
            )

        assert bm.trace_batch_id is None
        assert mock_init.call_count == 1
        mock_sleep.assert_not_called()

    def test_no_retry_on_4xx(self):
        """Does NOT retry on 422 — client error is not transient."""
        bm = self._make_batch_manager()

        error_response = MagicMock(status_code=422, text="Unprocessable Entity")

        with (
            patch(
                "crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
                return_value=True,
            ),
            patch.object(
                bm.plus_api,
                "initialize_ephemeral_trace_batch",
                return_value=error_response,
            ) as mock_init,
            patch("crewai.events.listeners.tracing.trace_batch_manager.time.sleep") as mock_sleep,
        ):
            bm._initialize_backend_batch(
                user_context={"privacy_level": "standard"},
                execution_metadata={"execution_type": "crew"},
                use_ephemeral=True,
            )

        assert bm.trace_batch_id is None
        assert mock_init.call_count == 1
        mock_sleep.assert_not_called()

    def test_exhausts_retries_then_clears_batch_id(self):
        """After all retries fail, trace_batch_id is None."""
        bm = self._make_batch_manager()

        with (
            patch(
                "crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
                return_value=True,
            ),
            patch.object(
                bm.plus_api,
                "initialize_ephemeral_trace_batch",
                return_value=None,
            ) as mock_init,
            patch("crewai.events.listeners.tracing.trace_batch_manager.time.sleep"),
        ):
            bm._initialize_backend_batch(
                user_context={"privacy_level": "standard"},
                execution_metadata={"execution_type": "crew"},
                use_ephemeral=True,
            )

        assert bm.trace_batch_id is None
        assert mock_init.call_count == 2  # initial + 1 retry


class TestFirstTimeHandlerBackendInitGuard:
    """Tests: backend_initialized gated on actual batch creation success."""

    def _make_handler_with_manager(self):
        """Create a FirstTimeTraceHandler wired to a TraceBatchManager."""
        with patch(
            "crewai.events.listeners.tracing.trace_batch_manager.get_auth_token",
            return_value="mock_token",
        ):
            bm = TraceBatchManager()
        bm.current_batch = TraceBatch(
            user_context={"privacy_level": "standard"},
            execution_metadata={"execution_type": "crew", "crew_name": "test"},
        )
        bm.trace_batch_id = bm.current_batch.batch_id
        bm.is_current_batch_ephemeral = True

        handler = FirstTimeTraceHandler()
        handler.is_first_time = True
        handler.collected_events = True
        handler.batch_manager = bm
        return handler, bm

    def test_backend_initialized_true_on_success(self):
        """Events are sent when batch creation succeeds, then state is cleaned up."""
        handler, bm = self._make_handler_with_manager()
        server_id = "server-id-abc"

        mock_init_response = MagicMock(
            status_code=201,
            json=MagicMock(return_value={"ephemeral_trace_id": server_id}),
        )
        mock_send_response = MagicMock(status_code=200)

        trace_batch_id_during_send = None

        def capture_send(*args, **kwargs):
            nonlocal trace_batch_id_during_send
            trace_batch_id_during_send = bm.trace_batch_id
            return mock_send_response

        with (
            patch(
                "crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
                return_value=True,
            ),
            patch.object(
                bm.plus_api,
                "initialize_ephemeral_trace_batch",
                return_value=mock_init_response,
            ),
            patch.object(
                bm.plus_api,
                "send_ephemeral_trace_events",
                side_effect=capture_send,
            ),
            patch.object(bm, "_finalize_backend_batch"),
        ):
            bm.event_buffer = [MagicMock(to_dict=MagicMock(return_value={}))]
            handler._initialize_backend_and_send_events()

        # trace_batch_id was set correctly during send
        assert trace_batch_id_during_send == server_id
        # State cleaned up after completion (singleton reuse)
        assert bm.backend_initialized is False
        assert bm.trace_batch_id is None
        assert bm.current_batch is None

    def test_backend_initialized_false_on_failure(self):
        """backend_initialized stays False and events are NOT sent when batch creation fails."""
        handler, bm = self._make_handler_with_manager()

        with (
            patch(
                "crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
                return_value=True,
            ),
            patch.object(
                bm.plus_api,
                "initialize_ephemeral_trace_batch",
                return_value=None,  # server call fails
            ),
            patch.object(bm, "_send_events_to_backend") as mock_send,
            patch.object(bm, "_finalize_backend_batch") as mock_finalize,
            patch.object(handler, "_gracefully_fail") as mock_fail,
        ):
            bm.event_buffer = [MagicMock()]
            handler._initialize_backend_and_send_events()

        assert bm.backend_initialized is False
        assert bm.trace_batch_id is None
        mock_send.assert_not_called()
        mock_finalize.assert_not_called()
        mock_fail.assert_called_once()

    def test_backend_initialized_false_on_non_2xx(self):
        """backend_initialized stays False when server returns non-2xx."""
        handler, bm = self._make_handler_with_manager()

        mock_response = MagicMock(status_code=500, text="Internal Server Error")

        with (
            patch(
                "crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
                return_value=True,
            ),
            patch.object(
                bm.plus_api,
                "initialize_ephemeral_trace_batch",
                return_value=mock_response,
            ),
            patch.object(bm, "_send_events_to_backend") as mock_send,
            patch.object(bm, "_finalize_backend_batch") as mock_finalize,
            patch.object(handler, "_gracefully_fail") as mock_fail,
        ):
            bm.event_buffer = [MagicMock()]
            handler._initialize_backend_and_send_events()

        assert bm.backend_initialized is False
        assert bm.trace_batch_id is None
        mock_send.assert_not_called()
        mock_finalize.assert_not_called()
        mock_fail.assert_called_once()


class TestFirstTimeHandlerAlwaysEphemeral:
    """Tests that first-time handler always uses ephemeral with skip_context_check."""

    def _make_handler_with_manager(self):
        with patch(
            "crewai.events.listeners.tracing.trace_batch_manager.get_auth_token",
            return_value="mock_token",
        ):
            bm = TraceBatchManager()
        bm.current_batch = TraceBatch(
            user_context={"privacy_level": "standard"},
            execution_metadata={"execution_type": "crew", "crew_name": "test"},
        )
        bm.trace_batch_id = bm.current_batch.batch_id
        bm.is_current_batch_ephemeral = True

        handler = FirstTimeTraceHandler()
        handler.is_first_time = True
        handler.collected_events = True
        handler.batch_manager = bm
        return handler, bm

    def test_deferred_init_uses_ephemeral_and_skip_context_check(self):
        """Deferred backend init always uses ephemeral=True and skip_context_check=True."""
        handler, bm = self._make_handler_with_manager()

        with (
            patch.object(bm, "_initialize_backend_batch") as mock_init,
            patch.object(bm, "_send_events_to_backend"),
            patch.object(bm, "_finalize_backend_batch"),
        ):
            mock_init.side_effect = lambda **kwargs: None
            bm.event_buffer = [MagicMock()]
            handler._initialize_backend_and_send_events()

            mock_init.assert_called_once()
            assert mock_init.call_args.kwargs["use_ephemeral"] is True
            assert mock_init.call_args.kwargs["skip_context_check"] is True


class TestAuthFailbackToEphemeral:
    """Tests for ephemeral fallback when server rejects auth (401/403)."""

    def _make_batch_manager(self):
        """Create a TraceBatchManager with a pre-set trace_batch_id."""
        with patch(
            "crewai.events.listeners.tracing.trace_batch_manager.get_auth_token",
            return_value="mock_token",
        ):
            bm = TraceBatchManager()
        bm.current_batch = TraceBatch(
            user_context={"privacy_level": "standard"},
            execution_metadata={"execution_type": "crew", "crew_name": "test"},
        )
        bm.trace_batch_id = bm.current_batch.batch_id
        bm.is_current_batch_ephemeral = False  # authenticated path
        return bm

    def test_401_non_ephemeral_falls_back_to_ephemeral(self):
        """A 401 on the non-ephemeral endpoint should retry as ephemeral."""
        bm = self._make_batch_manager()
        server_id = "ephemeral-fallback-id"

        auth_rejected = MagicMock(status_code=401, text="Bad credentials")
        ephemeral_success = MagicMock(
            status_code=201,
            json=MagicMock(return_value={"ephemeral_trace_id": server_id}),
        )

        with (
            patch(
                "crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
                return_value=True,
            ),
            patch.object(
                bm.plus_api,
                "initialize_trace_batch",
                return_value=auth_rejected,
            ),
            patch.object(
                bm.plus_api,
                "initialize_ephemeral_trace_batch",
                return_value=ephemeral_success,
            ) as mock_ephemeral,
            patch("crewai.events.listeners.tracing.trace_batch_manager.time.sleep"),
        ):
            bm._initialize_backend_batch(
                user_context={"privacy_level": "standard"},
                execution_metadata={"execution_type": "crew"},
                use_ephemeral=False,
            )

        assert bm.trace_batch_id == server_id
        assert bm.is_current_batch_ephemeral is True
        mock_ephemeral.assert_called_once()

    def test_403_non_ephemeral_falls_back_to_ephemeral(self):
        """A 403 on the non-ephemeral endpoint should also fall back."""
        bm = self._make_batch_manager()
        server_id = "ephemeral-fallback-403"

        forbidden = MagicMock(status_code=403, text="Forbidden")
        ephemeral_success = MagicMock(
            status_code=201,
            json=MagicMock(return_value={"ephemeral_trace_id": server_id}),
        )

        with (
            patch(
                "crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
                return_value=True,
            ),
            patch.object(
                bm.plus_api,
                "initialize_trace_batch",
                return_value=forbidden,
            ),
            patch.object(
                bm.plus_api,
                "initialize_ephemeral_trace_batch",
                return_value=ephemeral_success,
            ),
            patch("crewai.events.listeners.tracing.trace_batch_manager.time.sleep"),
        ):
            bm._initialize_backend_batch(
                user_context={"privacy_level": "standard"},
                execution_metadata={"execution_type": "crew"},
                use_ephemeral=False,
            )

        assert bm.trace_batch_id == server_id
        assert bm.is_current_batch_ephemeral is True

    def test_401_on_ephemeral_does_not_recurse(self):
        """A 401 on the ephemeral endpoint should NOT try to fall back again."""
        bm = self._make_batch_manager()
        bm.is_current_batch_ephemeral = True

        auth_rejected = MagicMock(status_code=401, text="Bad credentials")

        with (
            patch(
                "crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
                return_value=True,
            ),
            patch.object(
                bm.plus_api,
                "initialize_ephemeral_trace_batch",
                return_value=auth_rejected,
            ) as mock_ephemeral,
            patch("crewai.events.listeners.tracing.trace_batch_manager.time.sleep"),
        ):
            bm._initialize_backend_batch(
                user_context={"privacy_level": "standard"},
                execution_metadata={"execution_type": "crew"},
                use_ephemeral=True,
            )

        assert bm.trace_batch_id is None
        # Called only once — no recursive fallback
        mock_ephemeral.assert_called()

    def test_401_fallback_ephemeral_also_fails(self):
        """If ephemeral fallback also fails, trace_batch_id is cleared."""
        bm = self._make_batch_manager()

        auth_rejected = MagicMock(status_code=401, text="Bad credentials")
        ephemeral_fail = MagicMock(status_code=422, text="Validation failed")

        with (
            patch(
                "crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
                return_value=True,
            ),
            patch.object(
                bm.plus_api,
                "initialize_trace_batch",
                return_value=auth_rejected,
            ),
            patch.object(
                bm.plus_api,
                "initialize_ephemeral_trace_batch",
                return_value=ephemeral_fail,
            ),
            patch("crewai.events.listeners.tracing.trace_batch_manager.time.sleep"),
        ):
            bm._initialize_backend_batch(
                user_context={"privacy_level": "standard"},
                execution_metadata={"execution_type": "crew"},
                use_ephemeral=False,
            )

        assert bm.trace_batch_id is None


class TestMarkBatchAsFailedRouting:
    """Tests: _mark_batch_as_failed routes to the correct endpoint."""

    def _make_batch_manager(self, ephemeral: bool = False):
        with patch(
            "crewai.events.listeners.tracing.trace_batch_manager.get_auth_token",
            return_value="mock_token",
        ):
            bm = TraceBatchManager()
        bm.is_current_batch_ephemeral = ephemeral
        return bm

    def test_routes_to_ephemeral_endpoint_when_ephemeral(self):
        """Ephemeral batches must use mark_ephemeral_trace_batch_as_failed."""
        bm = self._make_batch_manager(ephemeral=True)

        with patch.object(
            bm.plus_api, "mark_ephemeral_trace_batch_as_failed"
        ) as mock_ephemeral, patch.object(
            bm.plus_api, "mark_trace_batch_as_failed"
        ) as mock_non_ephemeral:
            bm._mark_batch_as_failed("batch-123", "some error")

        mock_ephemeral.assert_called_once_with("batch-123", "some error")
        mock_non_ephemeral.assert_not_called()

    def test_routes_to_non_ephemeral_endpoint_when_not_ephemeral(self):
        """Non-ephemeral batches must use mark_trace_batch_as_failed."""
        bm = self._make_batch_manager(ephemeral=False)

        with patch.object(
            bm.plus_api, "mark_ephemeral_trace_batch_as_failed"
        ) as mock_ephemeral, patch.object(
            bm.plus_api, "mark_trace_batch_as_failed"
        ) as mock_non_ephemeral:
            bm._mark_batch_as_failed("batch-456", "another error")

        mock_non_ephemeral.assert_called_once_with("batch-456", "another error")
        mock_ephemeral.assert_not_called()


class TestBackendInitializedGatedOnSuccess:
    """Tests: backend_initialized reflects actual init success on non-first-time path."""

    def test_backend_initialized_true_on_success(self):
        """backend_initialized is True when _initialize_backend_batch succeeds."""
        with (
            patch(
                "crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
                return_value=True,
            ),
            patch(
                "crewai.events.listeners.tracing.trace_batch_manager.should_auto_collect_first_time_traces",
                return_value=False,
            ),
            patch(
                "crewai.events.listeners.tracing.trace_batch_manager.get_auth_token",
                return_value="mock_token",
            ),
        ):
            bm = TraceBatchManager()
            mock_response = MagicMock(
                status_code=201,
                json=MagicMock(return_value={"trace_id": "server-id"}),
            )
            with patch.object(
                bm.plus_api, "initialize_trace_batch", return_value=mock_response
            ):
                bm.initialize_batch(
                    user_context={"privacy_level": "standard"},
                    execution_metadata={"execution_type": "crew"},
                )

        assert bm.backend_initialized is True
        assert bm.trace_batch_id == "server-id"

    def test_backend_initialized_false_on_failure(self):
        """backend_initialized is False when _initialize_backend_batch fails."""
        with (
            patch(
                "crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
                return_value=True,
            ),
            patch(
                "crewai.events.listeners.tracing.trace_batch_manager.should_auto_collect_first_time_traces",
                return_value=False,
            ),
            patch(
                "crewai.events.listeners.tracing.trace_batch_manager.get_auth_token",
                return_value="mock_token",
            ),
        ):
            bm = TraceBatchManager()
            with patch.object(
                bm.plus_api, "initialize_trace_batch", return_value=None
            ):
                bm.initialize_batch(
                    user_context={"privacy_level": "standard"},
                    execution_metadata={"execution_type": "crew"},
                )

        assert bm.backend_initialized is False
        assert bm.trace_batch_id is None


class TestTraceBatchManagerDuplicateInitMerge:
    """Second initialize_batch call merges execution_metadata (flow after lazy action)."""

    def test_duplicate_initialize_merges_execution_metadata(self):
        with (
            patch(
                "crewai.events.listeners.tracing.trace_batch_manager.should_auto_collect_first_time_traces",
                return_value=True,
            ),
            patch(
                "crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
                return_value=True,
            ),
        ):
            bm = TraceBatchManager()
            bm.initialize_batch(
                user_context={"privacy_level": "standard"},
                execution_metadata={
                    "crew_name": "Unknown Crew",
                    "crewai_version": "9.9.9",
                },
            )
            first_batch_id = bm.current_batch.batch_id
            bm.initialize_batch(
                user_context={"privacy_level": "standard"},
                execution_metadata={
                    "flow_name": "ResearchFlow",
                    "execution_type": "flow",
                    "crewai_version": "9.9.9",
                    "execution_start": "2026-01-01T00:00:00+00:00",
                },
            )

        assert bm.current_batch.batch_id == first_batch_id
        meta = bm.current_batch.execution_metadata
        assert meta.get("execution_type") == "flow"
        assert meta.get("flow_name") == "ResearchFlow"
        assert meta.get("crew_name") == "Unknown Crew"
