X Tutup
Skip to content

devmrko/autonomous_ai_database_vectorize_process

Repository files navigation

Autonomous DB 문서 벡터화 파이프라인

0) 전체 흐름

  • DBMS_SCHEDULER (poller) 가 30초~5분마다 실행
  • pollerDBMS_CLOUD.LIST_OBJECTS 로 버킷 목록을 가져옴
  • 처리 이력 테이블을 보고 **"새 파일(또는 변경된 파일)"**만 doc_ingest_jobs에 INSERT
  • worker 스케줄러가 job을 가져와서 2단계로 처리

2단계 파이프라인

┌─────────────────────────────────────────────────────────────────────────┐
│                          Status Flow                                     │
│                                                                         │
│  PENDING ──► CHUNKING ──► CHUNKED ──► EMBEDDING ──► DONE               │
│                 │                         │                             │
│                 ▼                         ▼                             │
│            CHUNK_ERROR              EMBED_ERROR                         │
│                 │                         │                             │
│                 └── (retry) ──►          └── (retry) ──►               │
└─────────────────────────────────────────────────────────────────────────┘
단계 Stage 1 (청킹) Stage 2 (임베딩)
Worker chunk_worker embed_worker
프로시저 ingest_stage1_chunk ingest_stage2_embed
상태 변화 PENDING → CHUNKING → CHUNKED CHUNKED → EMBEDDING → DONE
실패 상태 CHUNK_ERROR EMBED_ERROR
처리 내용 다운로드 + 텍스트 추출 + 청킹 벡터 임베딩 생성

Stage 1 처리:

  1. Object Storage에서 다운로드 (DBMS_CLOUD.GET_OBJECT) → BLOB 로드
  2. (PDF) DBMS_VECTOR_CHAIN.UTL_TO_TEXT(blob) → CLOB 텍스트
  3. UTL_TO_CHUNKS(text) → 청크
  4. doc_chunks에 저장 (embed_vector = NULL)
  5. 상태: CHUNKED

Stage 2 처리:

  1. embed_vector가 NULL인 청크 조회
  2. 임베딩: DBMS_CLOUD_AI.GENERATE(..., profile_name => 'OCI_COHERE_EMBED', action => 'embedding')
  3. doc_chunks.embed_vector 업데이트
  4. 상태: DONE

장점:

  • 각 단계별 상태 추적 가능 (어디서 실패했는지 명확)
  • Stage 2 실패 시 청크는 보존됨 (Stage 1부터 재시작 불필요)
  • 임베딩 API rate limit 별도 관리 가능

핵심 포인트: 중복 방지(idempotency)를 위해 "파일 고유키(object_name + etag(or md5))" 를 저장해야 합니다.
그래야 같은 파일을 poller가 반복해서 job에 넣지 않습니다.


1) 준비물 (사전 설정)

1-1. Object Storage 접근 Credential

ADB에서 Object Storage를 조회/다운로드하려면 credential이 필요합니다.

BEGIN
  DBMS_CLOUD.CREATE_CREDENTIAL(
    credential_name => 'OBJ_STORE_CRED2',
    username        => '<oci-username-or-user-ocid>',
    password        => '<auth-token>'
  );
END;
/

1-2. GenAI(임베딩) 호출용 Credential

UTL_TO_EMBEDDING으로 ocigenai를 쓰려면 OCI GenAI용 credential이 필요합니다.

-- OCI GenAI(ocigenai)용 credential. Object Storage credential과 별도로 생성.
BEGIN
  DBMS_VECTOR_CHAIN.CREATE_CREDENTIAL(
    credential_name => 'OCI_GENAI_CRED',
    params          => JSON('{
      "user_ocid"       : "<user-ocid>",
      "tenancy_ocid"    : "<tenancy-ocid>",
      "compartment_ocid": "<compartment-ocid>",
      "private_key"     : "<private-key-string-without-BEGIN-END-lines>",
      "fingerprint"     : "<key-fingerprint>"
    }')
  );
END;
/

1-3. 임베딩용 DBMS_CLOUD_AI 프로필 (권장: Select AI와 동일 credential 사용)

Select AI 채팅이 GENAI_CRED로 동작한다면, 같은 credential로 임베딩도 하려면 DBMS_CLOUD_AI임베딩 전용 프로필을 하나 만듭니다.

BEGIN
  DBMS_CLOUD_AI.CREATE_PROFILE(
    profile_name => 'OCI_COHERE_EMBED',
    attributes   => '{
      "provider": "oci",
      "credential_name": "GENAI_CRED",
      "oci_compartment_id": "ocid1.tenancy.oc1..aaaaaaaa...",
      "oci_apiformat": "COHERE",
      "embedding_model": "cohere.embed-v4.0"
    }'
  );
END;
/
  • oci_compartment_id: 테넌시 OCID 또는 사용 중인 컴파트먼트 OCID.
  • 채팅용 프로필(GENAI)과 별도로, 임베딩 전용 프로필 이름(OCI_COHERE_EMBED)만 구분해 두면 됩니다.

2) 테이블 설계 (Queue + Manifest + Output)

2-1. manifest 테이블: "이미 본 오브젝트" 기록

이게 있어야 폴링이 안정적으로 동작합니다.

CREATE TABLE obj_manifest (
  bucket_name    VARCHAR2(255) NOT NULL,
  object_name    VARCHAR2(1024) NOT NULL,
  etag           VARCHAR2(200),
  size_bytes     NUMBER,
  last_modified  TIMESTAMP,
  first_seen_at  TIMESTAMP DEFAULT SYSTIMESTAMP NOT NULL,
  last_seen_at   TIMESTAMP DEFAULT SYSTIMESTAMP NOT NULL,
  processed_flag CHAR(1) DEFAULT 'N' NOT NULL,  -- N/Y
  CONSTRAINT obj_manifest_pk PRIMARY KEY (bucket_name, object_name, etag)
);

CREATE INDEX obj_manifest_i1 ON obj_manifest(processed_flag, last_seen_at);
  • etag는 보통 오브젝트 내용이 바뀌면 바뀌는 값이라 "버전 키"로 쓰기 좋습니다.
  • 만약 LIST_OBJECTS 결과에 etag가 없거나 쓰기 어렵다면:
    (bucket_name, object_name, size_bytes, last_modified) 조합을 유사 키로 사용 (덜 완벽)

2-2. job queue 테이블

CREATE TABLE doc_ingest_jobs (
  job_id        NUMBER GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
  bucket_name   VARCHAR2(255) NOT NULL,
  object_name   VARCHAR2(1024) NOT NULL,
  object_uri    VARCHAR2(2000),
  content_type  VARCHAR2(255),
  size_bytes    NUMBER,
  etag          VARCHAR2(200),
  status        VARCHAR2(30) DEFAULT 'PENDING' NOT NULL,
  -- Status values (multi-stage pipeline):
  --   PENDING     : 새 job, Stage1 대기
  --   CHUNKING    : Stage1 실행 중 (다운로드 + 텍스트 추출 + 청킹)
  --   CHUNKED     : Stage1 완료, 청크 저장됨 (벡터 없음), Stage2 대기
  --   CHUNK_ERROR : Stage1 실패
  --   EMBEDDING   : Stage2 실행 중 (임베딩 생성)
  --   DONE        : 모든 단계 완료
  --   EMBED_ERROR : Stage2 실패 (청크는 있으나 벡터 미완료)
  attempts      NUMBER DEFAULT 0 NOT NULL,
  created_at    TIMESTAMP DEFAULT SYSTIMESTAMP NOT NULL,
  started_at    TIMESTAMP,
  chunked_at    TIMESTAMP,  -- Stage1 완료 시각
  finished_at   TIMESTAMP,
  error_msg     CLOB
);

CREATE INDEX doc_ingest_jobs_i1 ON doc_ingest_jobs(status, created_at);
CREATE UNIQUE INDEX doc_ingest_jobs_u1 ON doc_ingest_jobs(bucket_name, object_name, etag);

UNIQUE (bucket_name, object_name, etag)를 걸어두면 poller가 실수로 중복 INSERT 해도 DB가 막아줍니다.

기존 테이블에 컬럼 추가 (마이그레이션):

-- 기존 테이블에 chunked_at 컬럼 추가
ALTER TABLE doc_ingest_jobs ADD (chunked_at TIMESTAMP);

2-3. 청크/벡터 저장 테이블

CREATE TABLE doc_chunks (
  job_id        NUMBER NOT NULL,
  chunk_id      NUMBER NOT NULL,
  chunk_offset  NUMBER,
  chunk_length  NUMBER,
  chunk_text    CLOB,
  embed_vector  VECTOR,
  meta_json     JSON,
  CONSTRAINT doc_chunks_pk PRIMARY KEY (job_id, chunk_id),
  CONSTRAINT doc_chunks_fk1 FOREIGN KEY (job_id) REFERENCES doc_ingest_jobs(job_id)
);

3) poller 프로시저: 버킷을 조회해 신규 오브젝트를 Job으로 적재

3-1. LIST_OBJECTS로 목록 가져오기

DBMS_CLOUD.LIST_OBJECTS는 Object Storage 버킷의 오브젝트 목록을 반환합니다.
(출력 컬럼은 ADB/버전/옵션에 따라 조금씩 다를 수 있어요. 가장 안전한 방식은 "JSON 출력"을 받아 JSON_TABLE로 파싱하는 것입니다.)

여기서는 "개념적으로" 아래 형태로 구현합니다.

(A) poller 예시 (유니코드 파일명 지원)

중요: 한글/유니코드 파일명을 올바르게 처리하려면 UTL_URL.ESCAPE로 URL 인코딩해야 합니다. 또한 OCI 환경에 따라 URL 도메인이 다를 수 있습니다:

  • 표준: objectstorage.<region>.oraclecloud.com
  • 전용: <namespace>.objectstorage.<region>.oci.customer-oci.com
CREATE OR REPLACE PROCEDURE poll_object_storage_to_jobs IS
  v_namespace    VARCHAR2(200) := '<your-namespace>';
  v_bucket       VARCHAR2(255) := '<your-bucket>';
  v_region       VARCHAR2(50)  := '<region>'; -- ap-seoul-1 등
  v_credential   VARCHAR2(100) := 'GENAI_CRED';  -- 또는 'OBJ_STORE_CRED2'
  v_uri_base     VARCHAR2(2000);
  v_list_uri     VARCHAR2(2000);
BEGIN
  -- URL 베이스 설정 (환경에 맞게 수정)
  -- 옵션 1: 표준 OCI
  -- v_uri_base := 'https://objectstorage.' || v_region || '.oraclecloud.com/n/' ||
  --               v_namespace || '/b/' || v_bucket || '/o/';
  -- 옵션 2: 전용 고객 도메인 (Dedicated Region 등)
  v_uri_base := 'https://' || v_namespace || '.objectstorage.' || v_region || 
                '.oci.customer-oci.com/n/' || v_namespace || '/b/' || v_bucket || '/o/';
  
  v_list_uri := v_uri_base;

  /* 1) obj_manifest upsert */
  MERGE INTO obj_manifest m
  USING (
    SELECT
      v_bucket AS bucket_name,
      o.object_name,
      o.bytes       AS size_bytes,
      o.checksum    AS etag,
      o.last_modified
    FROM TABLE(
      DBMS_CLOUD.LIST_OBJECTS(
        credential_name => v_credential,
        location_uri    => v_list_uri
      )
    ) o
  ) s
  ON (m.bucket_name = s.bucket_name
      AND m.object_name = s.object_name
      AND m.etag = s.etag)
  WHEN MATCHED THEN
    UPDATE SET
      m.last_seen_at   = SYSTIMESTAMP,
      m.size_bytes     = s.size_bytes,
      m.last_modified  = s.last_modified
  WHEN NOT MATCHED THEN
    INSERT (bucket_name, object_name, etag, size_bytes, last_modified, processed_flag, last_seen_at)
    VALUES (s.bucket_name, s.object_name, s.etag, s.size_bytes, s.last_modified, 'N', SYSTIMESTAMP);

  /* 2) 신규(N) -> job queue (UTL_URL.ESCAPE로 유니코드 인코딩) */
  MERGE INTO doc_ingest_jobs j
  USING (
    SELECT
      m.bucket_name,
      m.object_name,
      m.etag,
      m.size_bytes,
      -- UTL_URL.ESCAPE: 유니코드/특수문자 URL 인코딩 (한글 파일명 지원)
      v_uri_base || UTL_URL.ESCAPE(m.object_name, TRUE) AS object_uri
    FROM obj_manifest m
    WHERE m.bucket_name = v_bucket
      AND m.processed_flag = 'N'
  ) s
  ON (j.bucket_name = s.bucket_name AND j.object_name = s.object_name AND j.etag = s.etag)
  WHEN NOT MATCHED THEN
    INSERT (bucket_name, object_name, etag, size_bytes, object_uri, status)
    VALUES (s.bucket_name, s.object_name, s.etag, s.size_bytes, s.object_uri, 'PENDING');

  UPDATE obj_manifest
     SET processed_flag = 'Y'
   WHERE bucket_name = v_bucket
     AND processed_flag = 'N';

  COMMIT;
END;
/

한글 파일명 트러블슈팅:

macOS에서 업로드된 파일은 NFD(분해형) 유니코드를 사용하고, Windows/Linux는 NFC(조합형)를 사용합니다. UTL_URL.ESCAPE는 둘 다 올바르게 인코딩합니다.

-- 테스트: 유니코드 파일명 URL 인코딩 확인
SELECT object_name, 
       UTL_URL.ESCAPE(object_name, TRUE) AS encoded_name
  FROM TABLE(
    DBMS_CLOUD.LIST_OBJECTS(
      credential_name => 'GENAI_CRED',
      location_uri    => 'https://<namespace>.objectstorage.<region>.oci.customer-oci.com/n/<namespace>/b/<bucket>/o/'
    )
  );

(B) 기존 job URL 수정 (마이그레이션)

이미 생성된 job의 object_uri가 잘못된 경우 아래 SQL로 일괄 수정합니다:

-- 환경 변수 설정 (실제 값으로 변경)
DEFINE v_namespace = 'apackrsct01'
DEFINE v_bucket = 'knowledge-base'
DEFINE v_region = 'ap-seoul-1'

-- 잘못된 URL을 가진 기존 job들의 object_uri 수정
UPDATE doc_ingest_jobs
   SET object_uri = 'https://&v_namespace..objectstorage.&v_region..oci.customer-oci.com/n/&v_namespace./b/&v_bucket./o/' 
                    || UTL_URL.ESCAPE(object_name, TRUE),
       status = CASE 
                  WHEN status IN ('CHUNK_ERROR', 'ERROR') THEN 'PENDING'
                  WHEN status IN ('EMBED_ERROR') THEN 'CHUNKED'
                  ELSE status 
                END
 WHERE object_uri NOT LIKE '%oci.customer-oci.com%'
    OR object_uri NOT LIKE '%' || UTL_URL.ESCAPE(object_name, TRUE);

COMMIT;

-- 또는 PL/SQL 블록으로 실행
DECLARE
  v_namespace VARCHAR2(200) := 'apackrsct01';
  v_bucket    VARCHAR2(255) := 'knowledge-base';
  v_region    VARCHAR2(50)  := 'ap-seoul-1';
  v_uri_base  VARCHAR2(2000);
BEGIN
  v_uri_base := 'https://' || v_namespace || '.objectstorage.' || v_region || 
                '.oci.customer-oci.com/n/' || v_namespace || '/b/' || v_bucket || '/o/';
  
  -- 모든 job의 object_uri 재생성
  UPDATE doc_ingest_jobs
     SET object_uri = v_uri_base || UTL_URL.ESCAPE(object_name, TRUE);
  
  -- 실패한 job들 재시도 상태로 변경
  UPDATE doc_ingest_jobs
     SET status = 'PENDING'
   WHERE status IN ('CHUNK_ERROR', 'ERROR');
  
  UPDATE doc_ingest_jobs
     SET status = 'CHUNKED'
   WHERE status = 'EMBED_ERROR';
  
  COMMIT;
  DBMS_OUTPUT.PUT_LINE('Updated ' || SQL%ROWCOUNT || ' jobs');
END;
/

-- 수정 결과 확인
SELECT job_id, object_name, status, 
       SUBSTR(object_uri, 1, 80) || '...' AS uri_preview
  FROM doc_ingest_jobs
 ORDER BY job_id;

현실 체크 (중요)

  • LIST_OBJECTS 반환 형태(테이블/JSON)가 환경마다 달라질 수 있습니다.
    → 실제 ADB에서 SELECT * FROM TABLE(DBMS_CLOUD.LIST_OBJECTS(...)) 같은 방식이 가능하면 그 방식이 더 간단합니다.
  • 위 코드는 "흐름"이 핵심이고, JSON 경로/필드명은 실제 반환값에 맞게 한번만 맞춰주면 계속 안정적으로 돌 수 있습니다.

4) worker 프로시저: 2단계 파이프라인 (청킹 → 임베딩)

파이프라인을 2단계로 분리하여 각 단계별 상태 추적과 재시도가 가능합니다.

[상태 흐름]
PENDING → CHUNKING → CHUNKED → EMBEDDING → DONE
              ↓           ↓
         CHUNK_ERROR  EMBED_ERROR

4-1. Stage 1: 다운로드 + 텍스트 추출 + 청킹 (임베딩 제외)

CREATE OR REPLACE PROCEDURE ingest_stage1_chunk(p_job_id IN NUMBER) IS
  v_uri         VARCHAR2(2000);
  v_blob        BLOB;
  v_text        CLOB;
  v_error_msg   VARCHAR2(4000);
BEGIN
  -- Job claim: PENDING → CHUNKING
  UPDATE doc_ingest_jobs
     SET status     = 'CHUNKING',
         started_at = SYSTIMESTAMP,
         attempts   = attempts + 1
   WHERE job_id = p_job_id
     AND status IN ('PENDING', 'CHUNK_ERROR');  -- 재시도 지원

  IF SQL%ROWCOUNT = 0 THEN RETURN; END IF;

  -- Clean up any existing chunks from previous failed attempts
  DELETE FROM doc_chunks WHERE job_id = p_job_id;

  SELECT object_uri INTO v_uri
    FROM doc_ingest_jobs WHERE job_id = p_job_id;

  -- Download object into DATA_PUMP_DIR
  DBMS_CLOUD.GET_OBJECT(
    credential_name => 'GENAI_CRED',
    object_uri      => v_uri,
    directory_name  => 'DATA_PUMP_DIR',
    file_name       => 'ingest_' || p_job_id
  );

  -- Load as BLOB
  SELECT TO_BLOB(BFILENAME('DATA_PUMP_DIR', 'ingest_' || p_job_id))
    INTO v_blob FROM dual;

  -- PDF/TXT → text
  v_text := DBMS_VECTOR_CHAIN.UTL_TO_TEXT(v_blob);

  -- Chunks insert (embed_vector = NULL, Stage2에서 처리)
  FOR rec IN (
    SELECT JSON_VALUE(TO_CLOB(c.column_value), '$.chunk_id' RETURNING NUMBER) AS chunk_id,
           JSON_VALUE(TO_CLOB(c.column_value), '$.chunk_offset' RETURNING NUMBER) AS chunk_offset,
           JSON_VALUE(TO_CLOB(c.column_value), '$.chunk_length' RETURNING NUMBER) AS chunk_length,
           JSON_VALUE(TO_CLOB(c.column_value), '$.chunk_data') AS chunk_data
      FROM TABLE(DBMS_VECTOR_CHAIN.UTL_TO_CHUNKS(v_text)) c
  ) LOOP
    INSERT INTO doc_chunks(
      job_id, chunk_id, chunk_offset, chunk_length,
      chunk_text, embed_vector, meta_json
    ) VALUES (
      p_job_id,
      rec.chunk_id,
      rec.chunk_offset,
      rec.chunk_length,
      rec.chunk_data,
      NULL,  -- embed_vector는 Stage2에서 채움
      JSON_OBJECT('source_uri' VALUE v_uri)
    );
  END LOOP;

  -- Stage 1 완료: CHUNKING → CHUNKED
  UPDATE doc_ingest_jobs
     SET status     = 'CHUNKED',
         chunked_at = SYSTIMESTAMP,
         error_msg  = NULL
   WHERE job_id = p_job_id;

  COMMIT;

EXCEPTION WHEN OTHERS THEN
  v_error_msg := SQLERRM;
  UPDATE doc_ingest_jobs
     SET status     = 'CHUNK_ERROR',
         error_msg  = v_error_msg
   WHERE job_id = p_job_id;
  COMMIT;
  RAISE;
END;
/

4-2. Stage 2: 임베딩 생성 + 벡터 저장

임베딩은 DBMS_CLOUD_AI.GENERATE + OCI_COHERE_EMBED 프로필을 사용합니다. Select AI와 동일한 GENAI_CRED credential이 사용되므로 ORA-20003을 피할 수 있습니다.

CREATE OR REPLACE PROCEDURE ingest_stage2_embed(p_job_id IN NUMBER) IS
  v_uri         VARCHAR2(2000);
  v_error_msg   VARCHAR2(4000);
  v_embed_clob  CLOB;
  v_embedding   VECTOR;
BEGIN
  -- Job claim: CHUNKED → EMBEDDING
  UPDATE doc_ingest_jobs
     SET status   = 'EMBEDDING',
         attempts = attempts + 1
   WHERE job_id = p_job_id
     AND status IN ('CHUNKED', 'EMBED_ERROR');  -- 재시도 지원

  IF SQL%ROWCOUNT = 0 THEN RETURN; END IF;

  SELECT object_uri INTO v_uri
    FROM doc_ingest_jobs WHERE job_id = p_job_id;

  -- embed_vector가 NULL인 청크만 처리 (부분 재시도 가능)
  FOR rec IN (
    SELECT chunk_id, chunk_text
      FROM doc_chunks
     WHERE job_id = p_job_id
       AND embed_vector IS NULL
     ORDER BY chunk_id
  ) LOOP
    -- Embedding via DBMS_CLOUD_AI (same credential as Select AI)
    -- 응답이 이미 벡터 배열 형식: [-0.014, 0.023, ...]
    v_embed_clob := DBMS_CLOUD_AI.GENERATE(
      prompt       => 'Text to embed: "' || rec.chunk_text || '"',
      profile_name => 'OCI_COHERE_EMBED',
      action       => 'embedding'
    );

    -- 직접 변환 (JSON 파싱 불필요 - 응답이 이미 배열 형식)
    v_embedding := TO_VECTOR(TRIM(v_embed_clob));

    UPDATE doc_chunks
       SET embed_vector = v_embedding
     WHERE job_id = p_job_id
       AND chunk_id = rec.chunk_id;

    -- 청크 단위로 커밋하여 부분 진행 보존 (선택적)
    -- COMMIT;
  END LOOP;

  -- Stage 2 완료: EMBEDDING → DONE
  UPDATE doc_ingest_jobs
     SET status      = 'DONE',
         finished_at = SYSTIMESTAMP,
         error_msg   = NULL
   WHERE job_id = p_job_id;

  COMMIT;

EXCEPTION WHEN OTHERS THEN
  v_error_msg := SQLERRM;
  UPDATE doc_ingest_jobs
     SET status    = 'EMBED_ERROR',
         error_msg = v_error_msg
   WHERE job_id = p_job_id;
  COMMIT;
  RAISE;
END;

4-3. Worker: Stage 1 (청킹) - PENDING job 처리

CREATE OR REPLACE PROCEDURE chunk_worker IS
  v_job_id NUMBER;
BEGIN
  -- PENDING 또는 CHUNK_ERROR(재시도) 1건 가져오기
  SELECT job_id
    INTO v_job_id
    FROM doc_ingest_jobs
   WHERE status IN ('PENDING', 'CHUNK_ERROR')
   ORDER BY created_at
   FETCH FIRST 1 ROWS ONLY;

  ingest_stage1_chunk(v_job_id);

EXCEPTION
  WHEN NO_DATA_FOUND THEN
    NULL;  -- 처리할 job 없음
END;
/

4-4. Worker: Stage 2 (임베딩) - CHUNKED job 처리

CREATE OR REPLACE PROCEDURE embed_worker IS
  v_job_id NUMBER;
BEGIN
  -- CHUNKED 또는 EMBED_ERROR(재시도) 1건 가져오기
  SELECT job_id
    INTO v_job_id
    FROM doc_ingest_jobs
   WHERE status IN ('CHUNKED', 'EMBED_ERROR')
   ORDER BY chunked_at NULLS LAST, created_at
   FETCH FIRST 1 ROWS ONLY;

  ingest_stage2_embed(v_job_id);

EXCEPTION
  WHEN NO_DATA_FOUND THEN
    NULL;  -- 처리할 job 없음
END;
/

4-5. 레거시 호환: 단일 프로시저로 전체 처리

기존 ingest_one_job 스타일로 한 번에 처리하려면:

CREATE OR REPLACE PROCEDURE ingest_one_job(p_job_id IN NUMBER) IS
BEGIN
  -- Stage 1: 청킹
  ingest_stage1_chunk(p_job_id);
  
  -- Stage 1 성공 시에만 Stage 2 진행
  DECLARE
    v_status VARCHAR2(30);
  BEGIN
    SELECT status INTO v_status FROM doc_ingest_jobs WHERE job_id = p_job_id;
    IF v_status = 'CHUNKED' THEN
      ingest_stage2_embed(p_job_id);
    END IF;
  END;
END;
/

4-6. (레거시) 단일 worker - 양쪽 스테이지 모두 처리

CREATE OR REPLACE PROCEDURE ingest_worker IS
  v_job_id NUMBER;
BEGIN
  -- 먼저 PENDING job이 있으면 Stage 1 처리
  BEGIN
    SELECT job_id
      INTO v_job_id
      FROM doc_ingest_jobs
     WHERE status IN ('PENDING', 'CHUNK_ERROR')
     ORDER BY created_at
     FETCH FIRST 1 ROWS ONLY;
    
    ingest_stage1_chunk(v_job_id);
    RETURN;
  EXCEPTION
    WHEN NO_DATA_FOUND THEN NULL;
  END;

  -- PENDING 없으면 CHUNKED job Stage 2 처리
  BEGIN
    SELECT job_id
      INTO v_job_id
      FROM doc_ingest_jobs
     WHERE status IN ('CHUNKED', 'EMBED_ERROR')
     ORDER BY chunked_at NULLS LAST, created_at
     FETCH FIRST 1 ROWS ONLY;
    
    ingest_stage2_embed(v_job_id);
  EXCEPTION
    WHEN NO_DATA_FOUND THEN NULL;
  END;
END;
/

5) DBMS_SCHEDULER 설정 (poller + worker)

5-1. poller: 1분마다

BEGIN
  DBMS_SCHEDULER.CREATE_JOB(
    job_name        => 'OBJ_POLL_JOB',
    job_type        => 'STORED_PROCEDURE',
    job_action      => 'POLL_OBJECT_STORAGE_TO_JOBS',
    start_date      => SYSTIMESTAMP,
    repeat_interval => 'FREQ=MINUTELY;INTERVAL=1',
    enabled         => TRUE,
    auto_drop       => FALSE
  );
END;
/

5-2. 2단계 파이프라인 worker 설정

옵션 A: 분리된 worker (권장) - Stage 1(청킹)과 Stage 2(임베딩)를 별도 스케줄러로 관리

BEGIN
  -- Stage 1 Worker: PENDING → CHUNKED (청킹)
  DBMS_SCHEDULER.CREATE_JOB(
    job_name        => 'CHUNK_WORKER_01',
    job_type        => 'STORED_PROCEDURE',
    job_action      => 'CHUNK_WORKER',
    start_date      => SYSTIMESTAMP,
    repeat_interval => 'FREQ=SECONDLY;INTERVAL=30',
    enabled         => TRUE,
    auto_drop       => FALSE
  );

  -- Stage 2 Worker: CHUNKED → DONE (임베딩)
  DBMS_SCHEDULER.CREATE_JOB(
    job_name        => 'EMBED_WORKER_01',
    job_type        => 'STORED_PROCEDURE',
    job_action      => 'EMBED_WORKER',
    start_date      => SYSTIMESTAMP,
    repeat_interval => 'FREQ=SECONDLY;INTERVAL=30',
    enabled         => TRUE,
    auto_drop       => FALSE
  );

  -- 처리량 필요 시 워커 추가
  DBMS_SCHEDULER.CREATE_JOB(
    job_name        => 'CHUNK_WORKER_02',
    job_type        => 'STORED_PROCEDURE',
    job_action      => 'CHUNK_WORKER',
    start_date      => SYSTIMESTAMP,
    repeat_interval => 'FREQ=SECONDLY;INTERVAL=30',
    enabled         => TRUE,
    auto_drop       => FALSE
  );

  DBMS_SCHEDULER.CREATE_JOB(
    job_name        => 'EMBED_WORKER_02',
    job_type        => 'STORED_PROCEDURE',
    job_action      => 'EMBED_WORKER',
    start_date      => SYSTIMESTAMP,
    repeat_interval => 'FREQ=SECONDLY;INTERVAL=30',
    enabled         => TRUE,
    auto_drop       => FALSE
  );
END;
/

옵션 B: 통합 worker - 기존 INGEST_WORKER 스타일 (양쪽 스테이지 모두 처리)

BEGIN
  DBMS_SCHEDULER.CREATE_JOB(
    job_name        => 'INGEST_WORKER_01',
    job_type        => 'STORED_PROCEDURE',
    job_action      => 'INGEST_WORKER',
    start_date      => SYSTIMESTAMP,
    repeat_interval => 'FREQ=SECONDLY;INTERVAL=30',
    enabled         => TRUE,
    auto_drop       => FALSE
  );

  DBMS_SCHEDULER.CREATE_JOB(
    job_name        => 'INGEST_WORKER_02',
    job_type        => 'STORED_PROCEDURE',
    job_action      => 'INGEST_WORKER',
    start_date      => SYSTIMESTAMP,
    repeat_interval => 'FREQ=SECONDLY;INTERVAL=30',
    enabled         => TRUE,
    auto_drop       => FALSE
  );
END;
/

기존 스케줄러 job 삭제 후 재생성:

-- 기존 job 삭제
BEGIN
  DBMS_SCHEDULER.DROP_JOB('INGEST_WORKER_01', force => TRUE);
  DBMS_SCHEDULER.DROP_JOB('INGEST_WORKER_02', force => TRUE);
EXCEPTION WHEN OTHERS THEN NULL;
END;
/

-- 새 job 생성 (옵션 A 또는 B 선택)

5-3. Worker가 동작하지 않을 때 (job은 등록되는데 처리 안 됨)

가능한 원인과 확인 방법:

원인 확인 / 조치
Worker 스케줄러 job이 없거나 비활성 SELECT job_name, enabled FROM user_scheduler_jobs WHERE job_name LIKE '%WORKER%'; 로 확인 후, 5-2에서 CHUNK_WORKER_01, EMBED_WORKER_01 생성·enabled => TRUE 적용.
job 상태 확인 2단계 파이프라인에서는 상태를 확인해야 함: SELECT job_id, status, error_msg FROM doc_ingest_jobs ORDER BY created_at DESC;
Stuck CHUNKING Stage 1 도중 실패 시 CHUNKING으로 남음. 재시도: UPDATE doc_ingest_jobs SET status = 'PENDING' WHERE job_id = :id AND status = 'CHUNKING';
Stuck EMBEDDING Stage 2 도중 실패 시 EMBEDDING으로 남음. 재시도: UPDATE doc_ingest_jobs SET status = 'CHUNKED' WHERE job_id = :id AND status = 'EMBEDDING';
CHUNK_ERROR 재시도 chunk_worker가 자동으로 CHUNK_ERROR 상태도 처리함. 또는 수동: UPDATE doc_ingest_jobs SET status = 'PENDING' WHERE status = 'CHUNK_ERROR';
EMBED_ERROR 재시도 embed_worker가 자동으로 EMBED_ERROR 상태도 처리함. 또는 수동: UPDATE doc_ingest_jobs SET status = 'CHUNKED' WHERE status = 'EMBED_ERROR';
스케줄러 job 실행 실패 SELECT job_name, run_count, failure_count, last_start_date FROM user_scheduler_job_run_details WHERE job_name LIKE '%WORKER%' ORDER BY actual_start_date DESC FETCH FIRST 10 ROWS ONLY;
FREQ=SECONDLY 미지원 일부 Oracle/ADB에서는 FREQ=SECONDLY;INTERVAL=30 이 동작하지 않을 수 있음. repeat_interval => 'FREQ=MINUTELY;INTERVAL=1' 로 변경.

수동 테스트:

-- Stage 1 (청킹) 테스트
EXEC chunk_worker;

-- Stage 2 (임베딩) 테스트
EXEC embed_worker;

-- 또는 통합 worker
EXEC ingest_worker;

상태별 job 수 확인:

SELECT status, COUNT(*) AS cnt
  FROM doc_ingest_jobs
 GROUP BY status
 ORDER BY DECODE(status, 'PENDING',1, 'CHUNKING',2, 'CHUNKED',3, 
                         'EMBEDDING',4, 'DONE',5, 'CHUNK_ERROR',6, 'EMBED_ERROR',7, 8);

6) Vector index 및 RAG 테스트

임베딩이 저장된 doc_chunks유사도 검색RAG(질문 → 유사 청크 → GENAI 답변) 를 테스트하려면, 벡터 인덱스를 만들고(없으면 생성) RAG 프로시저를 실행하면 됩니다.

6-1. Vector index 생성 (없으면 생성)

Oracle 23ai에서 유사도 검색을 빠르게 하려면 doc_chunks.embed_vectorvector index를 생성합니다. 아래는 인덱스가 없을 때만 생성하는 PL/SQL입니다.

-- Vector index 없으면 생성 (한 번만 실행 또는 필요 시 실행)
DECLARE
  v_exists NUMBER;
BEGIN
  SELECT COUNT(*) INTO v_exists
    FROM user_indexes
   WHERE index_name = 'DOC_CHUNKS_VEC_IDX';
  IF v_exists = 0 THEN
    EXECUTE IMMEDIATE q'[
      CREATE VECTOR INDEX doc_chunks_vec_idx
      ON doc_chunks (embed_vector)
      ORGANIZATION INMEMORY NEIGHBOR GRAPH
      DISTANCE COSINE
      WITH TARGET ACCURACY 95
    ]';
    DBMS_OUTPUT.PUT_LINE('Vector index DOC_CHUNKS_VEC_IDX created.');
  ELSE
    DBMS_OUTPUT.PUT_LINE('Vector index DOC_CHUNKS_VEC_IDX already exists.');
  END IF;
END;
/
  • DISTANCE COSINE: Cohere 임베딩과 동일한 코사인 유사도 사용.
  • 인덱스가 이미 있으면 스킵. 재생성이 필요하면 DROP INDEX doc_chunks_vec_idx; 후 다시 실행.

6-2. RAG 테스트 (유사 청크 검색 + 선택: GENAI 답변)

파일: rag_test.sql
질문을 임베딩한 뒤 doc_chunks에서 유사한 청크를 찾고, 필요하면 GENAI 프로필로 답변까지 생성합니다.

  1. Vector index
    위 6-1 실행으로 DOC_CHUNKS_VEC_IDX가 있으면 APPROX 검색이 인덱스를 사용합니다.

  2. RAG 프로시저

    • RAG_TOP_CHUNKS(질문, 상위 K건): 질문 임베딩 → 유사도 순 상위 K개 청크만 반환.
    • RAG_ASK(질문, 상위 K건): 같은 검색 후, 상위 청크를 컨텍스트로 GENAI에 넘겨 한 문단 답변 생성.
  3. 실행 예시

    • EXEC RAG_TOP_CHUNKS('What is Oracle Autonomous Database?', 5); — 유사 청크만 출력.
    • EXEC RAG_ASK('What is Oracle Autonomous Database?', 5); — RAG 답변 출력.

자세한 프로시저 정의와 사용법은 rag_test.sql 에 있습니다.

6-3. Streamlit Knowledge Bot (선택)

파일: knowledge_bot.py
문서 하나를 선택한 뒤 질문을 입력하면, 해당 문서 내에서만 유사 청크를 검색하고 OCI Cohere로 RAG 답변을 생성합니다.

  • 문서 선택: doc_ingest_jobs에서 청크가 있는 문서의 distinct object_name 목록을 드롭다운으로 표시.
  • 검색: 선택한 문서에 한정해 doc_chunks에서 벡터 유사도(코사인) 상위 K개 청크 검색.
  • RAG 답변: 검색된 청크를 컨텍스트로 OCI Cohere chat API를 호출해 답변 생성.

실행:

macOS에서 pip가 없거나 "externally-managed-environment" 오류가 나면 가상환경을 쓰세요:

# 가상환경 생성 (한 번만)
python3 -m venv .venv
source .venv/bin/activate   # Windows: .venv\Scripts\activate

# 패키지 설치
pip install -r requirements.txt

# 앱 실행
streamlit run knowledge_bot.py

pip가 PATH에 있으면 pip install -r requirements.txt 만 해도 됩니다.

환경 변수: .envDB_USER, DB_PASSWORD, DB_DSN, TNS_ADMIN(또는 WALLET_LOCATION) 설정. OCI 설정은 ~/.oci/config (또는 oci config로 생성한 프로필) 사용.


About

No description, website, or topics provided.

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

 
 
 

Contributors

Languages

X Tutup