Skip to content

Commit

Permalink
Merge pull request #8 from shuiyisong/refactor/log
Browse files Browse the repository at this point in the history
refactor: minor update
  • Loading branch information
paomian authored Jun 5, 2024
2 parents 51df233 + 8b6a652 commit 6ca15ad
Show file tree
Hide file tree
Showing 14 changed files with 864 additions and 210 deletions.
26 changes: 0 additions & 26 deletions src/frontend/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -297,29 +297,6 @@ pub enum Error {
source: table::error::Error,
},

#[snafu(display("Failed to insert pipeline to pipeline table, name: {}", name))]
InsertPipeline {
name: String,
#[snafu(implicit)]
location: Location,
source: BoxedError,
},

#[snafu(display("Failed to parse pipeline"))]
ParsePipeline {
#[snafu(implicit)]
location: Location,
source: BoxedError,
},

#[snafu(display("Failed to get pipeline to pipeline table, name: {}", name))]
GetPipeline {
name: String,
#[snafu(implicit)]
location: Location,
source: BoxedError,
},

#[snafu(display("Unsupported format: {:?}", format))]
UnsupportedFormat {
#[snafu(implicit)]
Expand Down Expand Up @@ -406,9 +383,6 @@ impl ErrorExt for Error {
| Error::EmptyData { .. }
| Error::ColumnNoneDefaultValue { .. }
| Error::IncompleteGrpcRequest { .. }
| Error::InsertPipeline { .. }
| Error::ParsePipeline { .. }
| Error::GetPipeline { .. }
| Error::InvalidTlsConfig { .. } => StatusCode::InvalidArguments,

Error::NotSupported { .. } => StatusCode::Unsupported,
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ use meta_client::MetaClientOptions;
use operator::delete::DeleterRef;
use operator::insert::InserterRef;
use operator::statement::StatementExecutor;
use pipeline::pipeline_operator::PipelineOperator;
use prometheus::HistogramTimer;
use query::metrics::OnDone;
use query::parser::{PromQuery, QueryLanguageParser, QueryStatement};
Expand Down Expand Up @@ -89,7 +90,6 @@ use crate::error::{
};
use crate::frontend::FrontendOptions;
use crate::heartbeat::HeartbeatTask;
use crate::pipeline::PipelineOperator;
use crate::script::ScriptExecutor;

#[async_trait]
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/instance/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use operator::request::Requester;
use operator::statement::{StatementExecutor, StatementExecutorRef};
use operator::table::TableMutationOperator;
use partition::manager::PartitionRuleManager;
use pipeline::pipeline_operator::PipelineOperator;
use query::QueryEngineFactory;
use servers::server::ServerHandlers;
use snafu::OptionExt;
Expand All @@ -38,7 +39,6 @@ use crate::error::{self, Result};
use crate::heartbeat::HeartbeatTask;
use crate::instance::region_query::FrontendRegionQueryHandler;
use crate::instance::Instance;
use crate::pipeline::PipelineOperator;
use crate::script::ScriptExecutor;

/// The frontend [`Instance`] builder.
Expand Down
6 changes: 3 additions & 3 deletions src/frontend/src/instance/log_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
use client::Output;
use common_error::ext::BoxedError;
use pipeline::{GreptimeTransformer, Pipeline};
use servers::error::{AuthSnafu, ExecuteGrpcRequestSnafu, Result as ServerResult};
use servers::error::{AuthSnafu, ExecuteGrpcRequestSnafu, PipelineSnafu, Result as ServerResult};
use servers::query_handler::LogHandler;
use session::context::QueryContextRef;
use snafu::ResultExt;
Expand Down Expand Up @@ -49,8 +49,7 @@ impl LogHandler for Instance {
self.pipeline_operator
.get_pipeline(query_ctx, name)
.await
.map_err(BoxedError::new)
.context(servers::error::GetPipelineSnafu { name })
.context(PipelineSnafu)
}

async fn insert_pipeline(
Expand All @@ -63,6 +62,7 @@ impl LogHandler for Instance {
self.pipeline_operator
.insert_pipeline(name, content_type, pipeline, query_ctx)
.await
.context(PipelineSnafu)
}

async fn delete_pipeline(&self, _name: &str, _query_ctx: QueryContextRef) -> ServerResult<()> {
Expand Down
1 change: 0 additions & 1 deletion src/frontend/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ pub mod frontend;
pub mod heartbeat;
pub mod instance;
pub(crate) mod metrics;
mod pipeline;
mod script;
pub mod server;
pub mod service_config;
13 changes: 13 additions & 0 deletions src/pipeline/src/etl/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# pipeline

ETL capability

## processors

refer [elastic ingest processor][elastic-ingest-processor] for detail

### Example

Go to [pipeline](../../tests/pipeline.rs)

[elastic-ingest-processor]: https://www.elastic.co/guide/en/elasticsearch/reference/current/processors.html
Loading

0 comments on commit 6ca15ad

Please sign in to comment.