- DBMS_SCHEDULER (poller) 가 30초~5분마다 실행
- poller가
DBMS_CLOUD.LIST_OBJECTS로 버킷 목록을 가져옴 - 처리 이력 테이블을 보고 **"새 파일(또는 변경된 파일)"**만
doc_ingest_jobs에 INSERT - worker 스케줄러가 job을 가져와서 2단계로 처리
┌─────────────────────────────────────────────────────────────────────────┐
│ Status Flow │
│ │
│ PENDING ──► CHUNKING ──► CHUNKED ──► EMBEDDING ──► DONE │
│ │ │ │
│ ▼ ▼ │
│ CHUNK_ERROR EMBED_ERROR │
│ │ │ │
│ └── (retry) ──► └── (retry) ──► │
└─────────────────────────────────────────────────────────────────────────┘
| 단계 | Stage 1 (청킹) | Stage 2 (임베딩) |
|---|---|---|
| Worker | chunk_worker |
embed_worker |
| 프로시저 | ingest_stage1_chunk |
ingest_stage2_embed |
| 상태 변화 | PENDING → CHUNKING → CHUNKED | CHUNKED → EMBEDDING → DONE |
| 실패 상태 | CHUNK_ERROR | EMBED_ERROR |
| 처리 내용 | 다운로드 + 텍스트 추출 + 청킹 | 벡터 임베딩 생성 |
Stage 1 처리:
- Object Storage에서 다운로드 (
DBMS_CLOUD.GET_OBJECT) → BLOB 로드 - (PDF)
DBMS_VECTOR_CHAIN.UTL_TO_TEXT(blob)→ CLOB 텍스트 UTL_TO_CHUNKS(text)→ 청크doc_chunks에 저장 (embed_vector = NULL)- 상태:
CHUNKED
Stage 2 처리:
- embed_vector가 NULL인 청크 조회
- 임베딩:
DBMS_CLOUD_AI.GENERATE(..., profile_name => 'OCI_COHERE_EMBED', action => 'embedding') doc_chunks.embed_vector업데이트- 상태:
DONE
장점:
- 각 단계별 상태 추적 가능 (어디서 실패했는지 명확)
- Stage 2 실패 시 청크는 보존됨 (Stage 1부터 재시작 불필요)
- 임베딩 API rate limit 별도 관리 가능
핵심 포인트: 중복 방지(idempotency)를 위해 "파일 고유키(object_name + etag(or md5))" 를 저장해야 합니다.
그래야 같은 파일을 poller가 반복해서 job에 넣지 않습니다.
ADB에서 Object Storage를 조회/다운로드하려면 credential이 필요합니다.
BEGIN
DBMS_CLOUD.CREATE_CREDENTIAL(
credential_name => 'OBJ_STORE_CRED2',
username => '<oci-username-or-user-ocid>',
password => '<auth-token>'
);
END;
/UTL_TO_EMBEDDING으로 ocigenai를 쓰려면 OCI GenAI용 credential이 필요합니다.
-- OCI GenAI(ocigenai)용 credential. Object Storage credential과 별도로 생성.
BEGIN
DBMS_VECTOR_CHAIN.CREATE_CREDENTIAL(
credential_name => 'OCI_GENAI_CRED',
params => JSON('{
"user_ocid" : "<user-ocid>",
"tenancy_ocid" : "<tenancy-ocid>",
"compartment_ocid": "<compartment-ocid>",
"private_key" : "<private-key-string-without-BEGIN-END-lines>",
"fingerprint" : "<key-fingerprint>"
}')
);
END;
/Select AI 채팅이 GENAI_CRED로 동작한다면, 같은 credential로 임베딩도 하려면 DBMS_CLOUD_AI에 임베딩 전용 프로필을 하나 만듭니다.
BEGIN
DBMS_CLOUD_AI.CREATE_PROFILE(
profile_name => 'OCI_COHERE_EMBED',
attributes => '{
"provider": "oci",
"credential_name": "GENAI_CRED",
"oci_compartment_id": "ocid1.tenancy.oc1..aaaaaaaa...",
"oci_apiformat": "COHERE",
"embedding_model": "cohere.embed-v4.0"
}'
);
END;
/oci_compartment_id: 테넌시 OCID 또는 사용 중인 컴파트먼트 OCID.- 채팅용 프로필(
GENAI)과 별도로, 임베딩 전용 프로필 이름(OCI_COHERE_EMBED)만 구분해 두면 됩니다.
이게 있어야 폴링이 안정적으로 동작합니다.
CREATE TABLE obj_manifest (
bucket_name VARCHAR2(255) NOT NULL,
object_name VARCHAR2(1024) NOT NULL,
etag VARCHAR2(200),
size_bytes NUMBER,
last_modified TIMESTAMP,
first_seen_at TIMESTAMP DEFAULT SYSTIMESTAMP NOT NULL,
last_seen_at TIMESTAMP DEFAULT SYSTIMESTAMP NOT NULL,
processed_flag CHAR(1) DEFAULT 'N' NOT NULL, -- N/Y
CONSTRAINT obj_manifest_pk PRIMARY KEY (bucket_name, object_name, etag)
);
CREATE INDEX obj_manifest_i1 ON obj_manifest(processed_flag, last_seen_at);- etag는 보통 오브젝트 내용이 바뀌면 바뀌는 값이라 "버전 키"로 쓰기 좋습니다.
- 만약
LIST_OBJECTS결과에 etag가 없거나 쓰기 어렵다면:
(bucket_name, object_name, size_bytes, last_modified)조합을 유사 키로 사용 (덜 완벽)
CREATE TABLE doc_ingest_jobs (
job_id NUMBER GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
bucket_name VARCHAR2(255) NOT NULL,
object_name VARCHAR2(1024) NOT NULL,
object_uri VARCHAR2(2000),
content_type VARCHAR2(255),
size_bytes NUMBER,
etag VARCHAR2(200),
status VARCHAR2(30) DEFAULT 'PENDING' NOT NULL,
-- Status values (multi-stage pipeline):
-- PENDING : 새 job, Stage1 대기
-- CHUNKING : Stage1 실행 중 (다운로드 + 텍스트 추출 + 청킹)
-- CHUNKED : Stage1 완료, 청크 저장됨 (벡터 없음), Stage2 대기
-- CHUNK_ERROR : Stage1 실패
-- EMBEDDING : Stage2 실행 중 (임베딩 생성)
-- DONE : 모든 단계 완료
-- EMBED_ERROR : Stage2 실패 (청크는 있으나 벡터 미완료)
attempts NUMBER DEFAULT 0 NOT NULL,
created_at TIMESTAMP DEFAULT SYSTIMESTAMP NOT NULL,
started_at TIMESTAMP,
chunked_at TIMESTAMP, -- Stage1 완료 시각
finished_at TIMESTAMP,
error_msg CLOB
);
CREATE INDEX doc_ingest_jobs_i1 ON doc_ingest_jobs(status, created_at);
CREATE UNIQUE INDEX doc_ingest_jobs_u1 ON doc_ingest_jobs(bucket_name, object_name, etag);UNIQUE (bucket_name, object_name, etag)를 걸어두면 poller가 실수로 중복 INSERT 해도 DB가 막아줍니다.
기존 테이블에 컬럼 추가 (마이그레이션):
-- 기존 테이블에 chunked_at 컬럼 추가
ALTER TABLE doc_ingest_jobs ADD (chunked_at TIMESTAMP);CREATE TABLE doc_chunks (
job_id NUMBER NOT NULL,
chunk_id NUMBER NOT NULL,
chunk_offset NUMBER,
chunk_length NUMBER,
chunk_text CLOB,
embed_vector VECTOR,
meta_json JSON,
CONSTRAINT doc_chunks_pk PRIMARY KEY (job_id, chunk_id),
CONSTRAINT doc_chunks_fk1 FOREIGN KEY (job_id) REFERENCES doc_ingest_jobs(job_id)
);DBMS_CLOUD.LIST_OBJECTS는 Object Storage 버킷의 오브젝트 목록을 반환합니다.
(출력 컬럼은 ADB/버전/옵션에 따라 조금씩 다를 수 있어요. 가장 안전한 방식은 "JSON 출력"을 받아 JSON_TABLE로 파싱하는 것입니다.)
여기서는 "개념적으로" 아래 형태로 구현합니다.
중요: 한글/유니코드 파일명을 올바르게 처리하려면 UTL_URL.ESCAPE로 URL 인코딩해야 합니다.
또한 OCI 환경에 따라 URL 도메인이 다를 수 있습니다:
- 표준:
objectstorage.<region>.oraclecloud.com - 전용:
<namespace>.objectstorage.<region>.oci.customer-oci.com
CREATE OR REPLACE PROCEDURE poll_object_storage_to_jobs IS
v_namespace VARCHAR2(200) := '<your-namespace>';
v_bucket VARCHAR2(255) := '<your-bucket>';
v_region VARCHAR2(50) := '<region>'; -- ap-seoul-1 등
v_credential VARCHAR2(100) := 'GENAI_CRED'; -- 또는 'OBJ_STORE_CRED2'
v_uri_base VARCHAR2(2000);
v_list_uri VARCHAR2(2000);
BEGIN
-- URL 베이스 설정 (환경에 맞게 수정)
-- 옵션 1: 표준 OCI
-- v_uri_base := 'https://objectstorage.' || v_region || '.oraclecloud.com/n/' ||
-- v_namespace || '/b/' || v_bucket || '/o/';
-- 옵션 2: 전용 고객 도메인 (Dedicated Region 등)
v_uri_base := 'https://' || v_namespace || '.objectstorage.' || v_region ||
'.oci.customer-oci.com/n/' || v_namespace || '/b/' || v_bucket || '/o/';
v_list_uri := v_uri_base;
/* 1) obj_manifest upsert */
MERGE INTO obj_manifest m
USING (
SELECT
v_bucket AS bucket_name,
o.object_name,
o.bytes AS size_bytes,
o.checksum AS etag,
o.last_modified
FROM TABLE(
DBMS_CLOUD.LIST_OBJECTS(
credential_name => v_credential,
location_uri => v_list_uri
)
) o
) s
ON (m.bucket_name = s.bucket_name
AND m.object_name = s.object_name
AND m.etag = s.etag)
WHEN MATCHED THEN
UPDATE SET
m.last_seen_at = SYSTIMESTAMP,
m.size_bytes = s.size_bytes,
m.last_modified = s.last_modified
WHEN NOT MATCHED THEN
INSERT (bucket_name, object_name, etag, size_bytes, last_modified, processed_flag, last_seen_at)
VALUES (s.bucket_name, s.object_name, s.etag, s.size_bytes, s.last_modified, 'N', SYSTIMESTAMP);
/* 2) 신규(N) -> job queue (UTL_URL.ESCAPE로 유니코드 인코딩) */
MERGE INTO doc_ingest_jobs j
USING (
SELECT
m.bucket_name,
m.object_name,
m.etag,
m.size_bytes,
-- UTL_URL.ESCAPE: 유니코드/특수문자 URL 인코딩 (한글 파일명 지원)
v_uri_base || UTL_URL.ESCAPE(m.object_name, TRUE) AS object_uri
FROM obj_manifest m
WHERE m.bucket_name = v_bucket
AND m.processed_flag = 'N'
) s
ON (j.bucket_name = s.bucket_name AND j.object_name = s.object_name AND j.etag = s.etag)
WHEN NOT MATCHED THEN
INSERT (bucket_name, object_name, etag, size_bytes, object_uri, status)
VALUES (s.bucket_name, s.object_name, s.etag, s.size_bytes, s.object_uri, 'PENDING');
UPDATE obj_manifest
SET processed_flag = 'Y'
WHERE bucket_name = v_bucket
AND processed_flag = 'N';
COMMIT;
END;
/한글 파일명 트러블슈팅:
macOS에서 업로드된 파일은 NFD(분해형) 유니코드를 사용하고, Windows/Linux는 NFC(조합형)를 사용합니다.
UTL_URL.ESCAPE는 둘 다 올바르게 인코딩합니다.
-- 테스트: 유니코드 파일명 URL 인코딩 확인
SELECT object_name,
UTL_URL.ESCAPE(object_name, TRUE) AS encoded_name
FROM TABLE(
DBMS_CLOUD.LIST_OBJECTS(
credential_name => 'GENAI_CRED',
location_uri => 'https://<namespace>.objectstorage.<region>.oci.customer-oci.com/n/<namespace>/b/<bucket>/o/'
)
);이미 생성된 job의 object_uri가 잘못된 경우 아래 SQL로 일괄 수정합니다:
-- 환경 변수 설정 (실제 값으로 변경)
DEFINE v_namespace = 'apackrsct01'
DEFINE v_bucket = 'knowledge-base'
DEFINE v_region = 'ap-seoul-1'
-- 잘못된 URL을 가진 기존 job들의 object_uri 수정
UPDATE doc_ingest_jobs
SET object_uri = 'https://&v_namespace..objectstorage.&v_region..oci.customer-oci.com/n/&v_namespace./b/&v_bucket./o/'
|| UTL_URL.ESCAPE(object_name, TRUE),
status = CASE
WHEN status IN ('CHUNK_ERROR', 'ERROR') THEN 'PENDING'
WHEN status IN ('EMBED_ERROR') THEN 'CHUNKED'
ELSE status
END
WHERE object_uri NOT LIKE '%oci.customer-oci.com%'
OR object_uri NOT LIKE '%' || UTL_URL.ESCAPE(object_name, TRUE);
COMMIT;
-- 또는 PL/SQL 블록으로 실행
DECLARE
v_namespace VARCHAR2(200) := 'apackrsct01';
v_bucket VARCHAR2(255) := 'knowledge-base';
v_region VARCHAR2(50) := 'ap-seoul-1';
v_uri_base VARCHAR2(2000);
BEGIN
v_uri_base := 'https://' || v_namespace || '.objectstorage.' || v_region ||
'.oci.customer-oci.com/n/' || v_namespace || '/b/' || v_bucket || '/o/';
-- 모든 job의 object_uri 재생성
UPDATE doc_ingest_jobs
SET object_uri = v_uri_base || UTL_URL.ESCAPE(object_name, TRUE);
-- 실패한 job들 재시도 상태로 변경
UPDATE doc_ingest_jobs
SET status = 'PENDING'
WHERE status IN ('CHUNK_ERROR', 'ERROR');
UPDATE doc_ingest_jobs
SET status = 'CHUNKED'
WHERE status = 'EMBED_ERROR';
COMMIT;
DBMS_OUTPUT.PUT_LINE('Updated ' || SQL%ROWCOUNT || ' jobs');
END;
/
-- 수정 결과 확인
SELECT job_id, object_name, status,
SUBSTR(object_uri, 1, 80) || '...' AS uri_preview
FROM doc_ingest_jobs
ORDER BY job_id;현실 체크 (중요)
LIST_OBJECTS반환 형태(테이블/JSON)가 환경마다 달라질 수 있습니다.
→ 실제 ADB에서SELECT * FROM TABLE(DBMS_CLOUD.LIST_OBJECTS(...))같은 방식이 가능하면 그 방식이 더 간단합니다.- 위 코드는 "흐름"이 핵심이고, JSON 경로/필드명은 실제 반환값에 맞게 한번만 맞춰주면 계속 안정적으로 돌 수 있습니다.
파이프라인을 2단계로 분리하여 각 단계별 상태 추적과 재시도가 가능합니다.
[상태 흐름]
PENDING → CHUNKING → CHUNKED → EMBEDDING → DONE
↓ ↓
CHUNK_ERROR EMBED_ERROR
CREATE OR REPLACE PROCEDURE ingest_stage1_chunk(p_job_id IN NUMBER) IS
v_uri VARCHAR2(2000);
v_blob BLOB;
v_text CLOB;
v_error_msg VARCHAR2(4000);
BEGIN
-- Job claim: PENDING → CHUNKING
UPDATE doc_ingest_jobs
SET status = 'CHUNKING',
started_at = SYSTIMESTAMP,
attempts = attempts + 1
WHERE job_id = p_job_id
AND status IN ('PENDING', 'CHUNK_ERROR'); -- 재시도 지원
IF SQL%ROWCOUNT = 0 THEN RETURN; END IF;
-- Clean up any existing chunks from previous failed attempts
DELETE FROM doc_chunks WHERE job_id = p_job_id;
SELECT object_uri INTO v_uri
FROM doc_ingest_jobs WHERE job_id = p_job_id;
-- Download object into DATA_PUMP_DIR
DBMS_CLOUD.GET_OBJECT(
credential_name => 'GENAI_CRED',
object_uri => v_uri,
directory_name => 'DATA_PUMP_DIR',
file_name => 'ingest_' || p_job_id
);
-- Load as BLOB
SELECT TO_BLOB(BFILENAME('DATA_PUMP_DIR', 'ingest_' || p_job_id))
INTO v_blob FROM dual;
-- PDF/TXT → text
v_text := DBMS_VECTOR_CHAIN.UTL_TO_TEXT(v_blob);
-- Chunks insert (embed_vector = NULL, Stage2에서 처리)
FOR rec IN (
SELECT JSON_VALUE(TO_CLOB(c.column_value), '$.chunk_id' RETURNING NUMBER) AS chunk_id,
JSON_VALUE(TO_CLOB(c.column_value), '$.chunk_offset' RETURNING NUMBER) AS chunk_offset,
JSON_VALUE(TO_CLOB(c.column_value), '$.chunk_length' RETURNING NUMBER) AS chunk_length,
JSON_VALUE(TO_CLOB(c.column_value), '$.chunk_data') AS chunk_data
FROM TABLE(DBMS_VECTOR_CHAIN.UTL_TO_CHUNKS(v_text)) c
) LOOP
INSERT INTO doc_chunks(
job_id, chunk_id, chunk_offset, chunk_length,
chunk_text, embed_vector, meta_json
) VALUES (
p_job_id,
rec.chunk_id,
rec.chunk_offset,
rec.chunk_length,
rec.chunk_data,
NULL, -- embed_vector는 Stage2에서 채움
JSON_OBJECT('source_uri' VALUE v_uri)
);
END LOOP;
-- Stage 1 완료: CHUNKING → CHUNKED
UPDATE doc_ingest_jobs
SET status = 'CHUNKED',
chunked_at = SYSTIMESTAMP,
error_msg = NULL
WHERE job_id = p_job_id;
COMMIT;
EXCEPTION WHEN OTHERS THEN
v_error_msg := SQLERRM;
UPDATE doc_ingest_jobs
SET status = 'CHUNK_ERROR',
error_msg = v_error_msg
WHERE job_id = p_job_id;
COMMIT;
RAISE;
END;
/임베딩은 DBMS_CLOUD_AI.GENERATE + OCI_COHERE_EMBED 프로필을 사용합니다. Select AI와 동일한 GENAI_CRED credential이 사용되므로 ORA-20003을 피할 수 있습니다.
CREATE OR REPLACE PROCEDURE ingest_stage2_embed(p_job_id IN NUMBER) IS
v_uri VARCHAR2(2000);
v_error_msg VARCHAR2(4000);
v_embed_clob CLOB;
v_embedding VECTOR;
BEGIN
-- Job claim: CHUNKED → EMBEDDING
UPDATE doc_ingest_jobs
SET status = 'EMBEDDING',
attempts = attempts + 1
WHERE job_id = p_job_id
AND status IN ('CHUNKED', 'EMBED_ERROR'); -- 재시도 지원
IF SQL%ROWCOUNT = 0 THEN RETURN; END IF;
SELECT object_uri INTO v_uri
FROM doc_ingest_jobs WHERE job_id = p_job_id;
-- embed_vector가 NULL인 청크만 처리 (부분 재시도 가능)
FOR rec IN (
SELECT chunk_id, chunk_text
FROM doc_chunks
WHERE job_id = p_job_id
AND embed_vector IS NULL
ORDER BY chunk_id
) LOOP
-- Embedding via DBMS_CLOUD_AI (same credential as Select AI)
-- 응답이 이미 벡터 배열 형식: [-0.014, 0.023, ...]
v_embed_clob := DBMS_CLOUD_AI.GENERATE(
prompt => 'Text to embed: "' || rec.chunk_text || '"',
profile_name => 'OCI_COHERE_EMBED',
action => 'embedding'
);
-- 직접 변환 (JSON 파싱 불필요 - 응답이 이미 배열 형식)
v_embedding := TO_VECTOR(TRIM(v_embed_clob));
UPDATE doc_chunks
SET embed_vector = v_embedding
WHERE job_id = p_job_id
AND chunk_id = rec.chunk_id;
-- 청크 단위로 커밋하여 부분 진행 보존 (선택적)
-- COMMIT;
END LOOP;
-- Stage 2 완료: EMBEDDING → DONE
UPDATE doc_ingest_jobs
SET status = 'DONE',
finished_at = SYSTIMESTAMP,
error_msg = NULL
WHERE job_id = p_job_id;
COMMIT;
EXCEPTION WHEN OTHERS THEN
v_error_msg := SQLERRM;
UPDATE doc_ingest_jobs
SET status = 'EMBED_ERROR',
error_msg = v_error_msg
WHERE job_id = p_job_id;
COMMIT;
RAISE;
END;CREATE OR REPLACE PROCEDURE chunk_worker IS
v_job_id NUMBER;
BEGIN
-- PENDING 또는 CHUNK_ERROR(재시도) 1건 가져오기
SELECT job_id
INTO v_job_id
FROM doc_ingest_jobs
WHERE status IN ('PENDING', 'CHUNK_ERROR')
ORDER BY created_at
FETCH FIRST 1 ROWS ONLY;
ingest_stage1_chunk(v_job_id);
EXCEPTION
WHEN NO_DATA_FOUND THEN
NULL; -- 처리할 job 없음
END;
/CREATE OR REPLACE PROCEDURE embed_worker IS
v_job_id NUMBER;
BEGIN
-- CHUNKED 또는 EMBED_ERROR(재시도) 1건 가져오기
SELECT job_id
INTO v_job_id
FROM doc_ingest_jobs
WHERE status IN ('CHUNKED', 'EMBED_ERROR')
ORDER BY chunked_at NULLS LAST, created_at
FETCH FIRST 1 ROWS ONLY;
ingest_stage2_embed(v_job_id);
EXCEPTION
WHEN NO_DATA_FOUND THEN
NULL; -- 처리할 job 없음
END;
/기존 ingest_one_job 스타일로 한 번에 처리하려면:
CREATE OR REPLACE PROCEDURE ingest_one_job(p_job_id IN NUMBER) IS
BEGIN
-- Stage 1: 청킹
ingest_stage1_chunk(p_job_id);
-- Stage 1 성공 시에만 Stage 2 진행
DECLARE
v_status VARCHAR2(30);
BEGIN
SELECT status INTO v_status FROM doc_ingest_jobs WHERE job_id = p_job_id;
IF v_status = 'CHUNKED' THEN
ingest_stage2_embed(p_job_id);
END IF;
END;
END;
/CREATE OR REPLACE PROCEDURE ingest_worker IS
v_job_id NUMBER;
BEGIN
-- 먼저 PENDING job이 있으면 Stage 1 처리
BEGIN
SELECT job_id
INTO v_job_id
FROM doc_ingest_jobs
WHERE status IN ('PENDING', 'CHUNK_ERROR')
ORDER BY created_at
FETCH FIRST 1 ROWS ONLY;
ingest_stage1_chunk(v_job_id);
RETURN;
EXCEPTION
WHEN NO_DATA_FOUND THEN NULL;
END;
-- PENDING 없으면 CHUNKED job Stage 2 처리
BEGIN
SELECT job_id
INTO v_job_id
FROM doc_ingest_jobs
WHERE status IN ('CHUNKED', 'EMBED_ERROR')
ORDER BY chunked_at NULLS LAST, created_at
FETCH FIRST 1 ROWS ONLY;
ingest_stage2_embed(v_job_id);
EXCEPTION
WHEN NO_DATA_FOUND THEN NULL;
END;
END;
/BEGIN
DBMS_SCHEDULER.CREATE_JOB(
job_name => 'OBJ_POLL_JOB',
job_type => 'STORED_PROCEDURE',
job_action => 'POLL_OBJECT_STORAGE_TO_JOBS',
start_date => SYSTIMESTAMP,
repeat_interval => 'FREQ=MINUTELY;INTERVAL=1',
enabled => TRUE,
auto_drop => FALSE
);
END;
/옵션 A: 분리된 worker (권장) - Stage 1(청킹)과 Stage 2(임베딩)를 별도 스케줄러로 관리
BEGIN
-- Stage 1 Worker: PENDING → CHUNKED (청킹)
DBMS_SCHEDULER.CREATE_JOB(
job_name => 'CHUNK_WORKER_01',
job_type => 'STORED_PROCEDURE',
job_action => 'CHUNK_WORKER',
start_date => SYSTIMESTAMP,
repeat_interval => 'FREQ=SECONDLY;INTERVAL=30',
enabled => TRUE,
auto_drop => FALSE
);
-- Stage 2 Worker: CHUNKED → DONE (임베딩)
DBMS_SCHEDULER.CREATE_JOB(
job_name => 'EMBED_WORKER_01',
job_type => 'STORED_PROCEDURE',
job_action => 'EMBED_WORKER',
start_date => SYSTIMESTAMP,
repeat_interval => 'FREQ=SECONDLY;INTERVAL=30',
enabled => TRUE,
auto_drop => FALSE
);
-- 처리량 필요 시 워커 추가
DBMS_SCHEDULER.CREATE_JOB(
job_name => 'CHUNK_WORKER_02',
job_type => 'STORED_PROCEDURE',
job_action => 'CHUNK_WORKER',
start_date => SYSTIMESTAMP,
repeat_interval => 'FREQ=SECONDLY;INTERVAL=30',
enabled => TRUE,
auto_drop => FALSE
);
DBMS_SCHEDULER.CREATE_JOB(
job_name => 'EMBED_WORKER_02',
job_type => 'STORED_PROCEDURE',
job_action => 'EMBED_WORKER',
start_date => SYSTIMESTAMP,
repeat_interval => 'FREQ=SECONDLY;INTERVAL=30',
enabled => TRUE,
auto_drop => FALSE
);
END;
/옵션 B: 통합 worker - 기존 INGEST_WORKER 스타일 (양쪽 스테이지 모두 처리)
BEGIN
DBMS_SCHEDULER.CREATE_JOB(
job_name => 'INGEST_WORKER_01',
job_type => 'STORED_PROCEDURE',
job_action => 'INGEST_WORKER',
start_date => SYSTIMESTAMP,
repeat_interval => 'FREQ=SECONDLY;INTERVAL=30',
enabled => TRUE,
auto_drop => FALSE
);
DBMS_SCHEDULER.CREATE_JOB(
job_name => 'INGEST_WORKER_02',
job_type => 'STORED_PROCEDURE',
job_action => 'INGEST_WORKER',
start_date => SYSTIMESTAMP,
repeat_interval => 'FREQ=SECONDLY;INTERVAL=30',
enabled => TRUE,
auto_drop => FALSE
);
END;
/기존 스케줄러 job 삭제 후 재생성:
-- 기존 job 삭제
BEGIN
DBMS_SCHEDULER.DROP_JOB('INGEST_WORKER_01', force => TRUE);
DBMS_SCHEDULER.DROP_JOB('INGEST_WORKER_02', force => TRUE);
EXCEPTION WHEN OTHERS THEN NULL;
END;
/
-- 새 job 생성 (옵션 A 또는 B 선택)가능한 원인과 확인 방법:
| 원인 | 확인 / 조치 |
|---|---|
| Worker 스케줄러 job이 없거나 비활성 | SELECT job_name, enabled FROM user_scheduler_jobs WHERE job_name LIKE '%WORKER%'; 로 확인 후, 5-2에서 CHUNK_WORKER_01, EMBED_WORKER_01 생성·enabled => TRUE 적용. |
| job 상태 확인 | 2단계 파이프라인에서는 상태를 확인해야 함: SELECT job_id, status, error_msg FROM doc_ingest_jobs ORDER BY created_at DESC; |
| Stuck CHUNKING | Stage 1 도중 실패 시 CHUNKING으로 남음. 재시도: UPDATE doc_ingest_jobs SET status = 'PENDING' WHERE job_id = :id AND status = 'CHUNKING'; |
| Stuck EMBEDDING | Stage 2 도중 실패 시 EMBEDDING으로 남음. 재시도: UPDATE doc_ingest_jobs SET status = 'CHUNKED' WHERE job_id = :id AND status = 'EMBEDDING'; |
| CHUNK_ERROR 재시도 | chunk_worker가 자동으로 CHUNK_ERROR 상태도 처리함. 또는 수동: UPDATE doc_ingest_jobs SET status = 'PENDING' WHERE status = 'CHUNK_ERROR'; |
| EMBED_ERROR 재시도 | embed_worker가 자동으로 EMBED_ERROR 상태도 처리함. 또는 수동: UPDATE doc_ingest_jobs SET status = 'CHUNKED' WHERE status = 'EMBED_ERROR'; |
| 스케줄러 job 실행 실패 | SELECT job_name, run_count, failure_count, last_start_date FROM user_scheduler_job_run_details WHERE job_name LIKE '%WORKER%' ORDER BY actual_start_date DESC FETCH FIRST 10 ROWS ONLY; |
| FREQ=SECONDLY 미지원 | 일부 Oracle/ADB에서는 FREQ=SECONDLY;INTERVAL=30 이 동작하지 않을 수 있음. repeat_interval => 'FREQ=MINUTELY;INTERVAL=1' 로 변경. |
수동 테스트:
-- Stage 1 (청킹) 테스트
EXEC chunk_worker;
-- Stage 2 (임베딩) 테스트
EXEC embed_worker;
-- 또는 통합 worker
EXEC ingest_worker;상태별 job 수 확인:
SELECT status, COUNT(*) AS cnt
FROM doc_ingest_jobs
GROUP BY status
ORDER BY DECODE(status, 'PENDING',1, 'CHUNKING',2, 'CHUNKED',3,
'EMBEDDING',4, 'DONE',5, 'CHUNK_ERROR',6, 'EMBED_ERROR',7, 8);임베딩이 저장된 doc_chunks로 유사도 검색과 RAG(질문 → 유사 청크 → GENAI 답변) 를 테스트하려면, 벡터 인덱스를 만들고(없으면 생성) RAG 프로시저를 실행하면 됩니다.
Oracle 23ai에서 유사도 검색을 빠르게 하려면 doc_chunks.embed_vector에 vector index를 생성합니다. 아래는 인덱스가 없을 때만 생성하는 PL/SQL입니다.
-- Vector index 없으면 생성 (한 번만 실행 또는 필요 시 실행)
DECLARE
v_exists NUMBER;
BEGIN
SELECT COUNT(*) INTO v_exists
FROM user_indexes
WHERE index_name = 'DOC_CHUNKS_VEC_IDX';
IF v_exists = 0 THEN
EXECUTE IMMEDIATE q'[
CREATE VECTOR INDEX doc_chunks_vec_idx
ON doc_chunks (embed_vector)
ORGANIZATION INMEMORY NEIGHBOR GRAPH
DISTANCE COSINE
WITH TARGET ACCURACY 95
]';
DBMS_OUTPUT.PUT_LINE('Vector index DOC_CHUNKS_VEC_IDX created.');
ELSE
DBMS_OUTPUT.PUT_LINE('Vector index DOC_CHUNKS_VEC_IDX already exists.');
END IF;
END;
/- DISTANCE COSINE: Cohere 임베딩과 동일한 코사인 유사도 사용.
- 인덱스가 이미 있으면 스킵. 재생성이 필요하면
DROP INDEX doc_chunks_vec_idx;후 다시 실행.
파일: rag_test.sql
질문을 임베딩한 뒤 doc_chunks에서 유사한 청크를 찾고, 필요하면 GENAI 프로필로 답변까지 생성합니다.
-
Vector index
위 6-1 실행으로DOC_CHUNKS_VEC_IDX가 있으면 APPROX 검색이 인덱스를 사용합니다. -
RAG 프로시저
RAG_TOP_CHUNKS(질문, 상위 K건): 질문 임베딩 → 유사도 순 상위 K개 청크만 반환.RAG_ASK(질문, 상위 K건): 같은 검색 후, 상위 청크를 컨텍스트로 GENAI에 넘겨 한 문단 답변 생성.
-
실행 예시
EXEC RAG_TOP_CHUNKS('What is Oracle Autonomous Database?', 5);— 유사 청크만 출력.EXEC RAG_ASK('What is Oracle Autonomous Database?', 5);— RAG 답변 출력.
자세한 프로시저 정의와 사용법은 rag_test.sql 에 있습니다.
파일: knowledge_bot.py
문서 하나를 선택한 뒤 질문을 입력하면, 해당 문서 내에서만 유사 청크를 검색하고 OCI Cohere로 RAG 답변을 생성합니다.
- 문서 선택:
doc_ingest_jobs에서 청크가 있는 문서의 distinct object_name 목록을 드롭다운으로 표시. - 검색: 선택한 문서에 한정해
doc_chunks에서 벡터 유사도(코사인) 상위 K개 청크 검색. - RAG 답변: 검색된 청크를 컨텍스트로 OCI Cohere chat API를 호출해 답변 생성.
실행:
macOS에서 pip가 없거나 "externally-managed-environment" 오류가 나면 가상환경을 쓰세요:
# 가상환경 생성 (한 번만)
python3 -m venv .venv
source .venv/bin/activate # Windows: .venv\Scripts\activate
# 패키지 설치
pip install -r requirements.txt
# 앱 실행
streamlit run knowledge_bot.pypip가 PATH에 있으면 pip install -r requirements.txt 만 해도 됩니다.
환경 변수: .env에 DB_USER, DB_PASSWORD, DB_DSN, TNS_ADMIN(또는 WALLET_LOCATION) 설정. OCI 설정은 ~/.oci/config (또는 oci config로 생성한 프로필) 사용.