Merge branch 'rolling' into FIX/463_results_from_different_search_engines_get_cached_as_the_same_key

This commit is contained in:
alamin655 2024-01-11 16:47:15 +05:30 committed by GitHub
commit 326131aac4
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 432 additions and 15 deletions

View file

@ -2,8 +2,9 @@
//!
//! This module contains the main function which handles the logging of the application to the
//! stdout and handles the command line arguments provided and launches the `websurfx` server.
#[cfg(not(feature = "dhat-heap"))]
use mimalloc::MiMalloc;
use std::net::TcpListener;
use websurfx::{cache::cacher::create_cache, config::parser::Config, run};

279
src/cache/cacher.rs vendored
View file

@ -4,6 +4,7 @@
use error_stack::Report;
#[cfg(feature = "memory-cache")]
use mini_moka::sync::Cache as MokaCache;
#[cfg(feature = "memory-cache")]
use std::time::Duration;
use tokio::sync::Mutex;
@ -14,6 +15,9 @@ use super::error::CacheError;
#[cfg(feature = "redis-cache")]
use super::redis_cacher::RedisCache;
#[cfg(any(feature = "encrypt-cache-results", feature = "cec-cache-results"))]
use super::encryption::*;
/// Abstraction trait for common methods provided by a cache backend.
#[async_trait::async_trait]
pub trait Cacher: Send + Sync {
@ -69,6 +73,237 @@ pub trait Cacher: Send + Sync {
fn hash_url(&self, url: &str) -> String {
blake3::hash(url.as_bytes()).to_string()
}
/// A helper function that returns either encrypted or decrypted results.
/// Feature flags (**encrypt-cache-results or cec-cache-results**) are required for this to work.
///
/// # Arguments
///
/// * `bytes` - It takes a slice of bytes as an argument.
/// * `encrypt` - A boolean to choose whether to encrypt or decrypt the bytes
///
/// # Error
/// Returns either encrypted or decrypted bytes on success otherwise it returns a CacheError
/// on failure.
#[cfg(any(
// feature = "compress-cache-results",
feature = "encrypt-cache-results",
feature = "cec-cache-results"
))]
fn encrypt_or_decrypt_results(
&mut self,
mut bytes: Vec<u8>,
encrypt: bool,
) -> Result<Vec<u8>, Report<CacheError>> {
use chacha20poly1305::{
aead::{Aead, AeadCore, KeyInit, OsRng},
ChaCha20Poly1305,
};
let cipher = CIPHER.get_or_init(|| {
let key = ChaCha20Poly1305::generate_key(&mut OsRng);
ChaCha20Poly1305::new(&key)
});
let encryption_key = ENCRYPTION_KEY.get_or_init(
|| ChaCha20Poly1305::generate_nonce(&mut OsRng), // 96-bits; unique per message
);
bytes = if encrypt {
cipher
.encrypt(encryption_key, bytes.as_ref())
.map_err(|_| CacheError::EncryptionError)?
} else {
cipher
.decrypt(encryption_key, bytes.as_ref())
.map_err(|_| CacheError::EncryptionError)?
};
Ok(bytes)
}
/// A helper function that returns compressed results.
/// Feature flags (**compress-cache-results or cec-cache-results**) are required for this to work.
///
/// # Arguments
///
/// * `bytes` - It takes a slice of bytes as an argument.
///
/// # Error
/// Returns the compressed bytes on success otherwise it returns a CacheError
/// on failure.
#[cfg(any(feature = "compress-cache-results", feature = "cec-cache-results"))]
fn compress_results(&mut self, mut bytes: Vec<u8>) -> Result<Vec<u8>, Report<CacheError>> {
use std::io::Write;
let mut writer = brotli::CompressorWriter::new(Vec::new(), 4096, 11, 22);
writer
.write_all(&bytes)
.map_err(|_| CacheError::CompressionError)?;
bytes = writer.into_inner();
Ok(bytes)
}
/// A helper function that returns compressed-encrypted results.
/// Feature flag (**cec-cache-results**) is required for this to work.
///
/// # Arguments
///
/// * `bytes` - It takes a slice of bytes as an argument.
///
/// # Error
/// Returns the compressed and encrypted bytes on success otherwise it returns a CacheError
/// on failure.
#[cfg(feature = "cec-cache-results")]
fn compress_encrypt_compress_results(
&mut self,
mut bytes: Vec<u8>,
) -> Result<Vec<u8>, Report<CacheError>> {
// compress first
bytes = self.compress_results(bytes)?;
// encrypt
bytes = self.encrypt_or_decrypt_results(bytes, true)?;
// compress again;
bytes = self.compress_results(bytes)?;
Ok(bytes)
}
/// A helper function that returns compressed results.
/// Feature flags (**compress-cache-results or cec-cache-results**) are required for this to work.
/// If bytes where
/// # Arguments
///
/// * `bytes` - It takes a slice of bytes as an argument.
///
/// # Error
/// Returns the uncompressed bytes on success otherwise it returns a CacheError
/// on failure.
#[cfg(any(feature = "compress-cache-results", feature = "cec-cache-results"))]
fn decompress_results(&mut self, bytes: &[u8]) -> Result<Vec<u8>, Report<CacheError>> {
cfg_if::cfg_if! {
if #[cfg(feature = "compress-cache-results")]
{
decompress_util(bytes)
}
else if #[cfg(feature = "cec-cache-results")]
{
let decompressed = decompress_util(bytes)?;
let decrypted = self.encrypt_or_decrypt_results(decompressed, false)?;
decompress_util(&decrypted)
}
}
}
/// A helper function that compresses or encrypts search results before they're inserted into a cache store
/// # Arguments
///
/// * `search_results` - A reference to the search_Results to process.
///
///
/// # Error
/// Returns a Vec of compressed or encrypted bytes on success otherwise it returns a CacheError
/// on failure.
fn pre_process_search_results(
&mut self,
search_results: &SearchResults,
) -> Result<Vec<u8>, Report<CacheError>> {
#[allow(unused_mut)] // needs to be mutable when any of the features is enabled
let mut bytes: Vec<u8> = search_results.try_into()?;
#[cfg(feature = "compress-cache-results")]
{
let compressed = self.compress_results(bytes)?;
bytes = compressed;
}
#[cfg(feature = "encrypt-cache-results")]
{
let encrypted = self.encrypt_or_decrypt_results(bytes, true)?;
bytes = encrypted;
}
#[cfg(feature = "cec-cache-results")]
{
let compressed_encrypted_compressed = self.compress_encrypt_compress_results(bytes)?;
bytes = compressed_encrypted_compressed;
}
Ok(bytes)
}
/// A helper function that decompresses or decrypts search results after they're fetched from the cache-store
/// # Arguments
///
/// * `bytes` - A Vec of bytes stores in the cache.
///
///
/// # Error
/// Returns the SearchResults struct on success otherwise it returns a CacheError
/// on failure.
#[allow(unused_mut)] // needs to be mutable when any of the features is enabled
fn post_process_search_results(
&mut self,
mut bytes: Vec<u8>,
) -> Result<SearchResults, Report<CacheError>> {
#[cfg(feature = "compress-cache-results")]
{
let decompressed = self.decompress_results(&bytes)?;
bytes = decompressed
}
#[cfg(feature = "encrypt-cache-results")]
{
let decrypted = self.encrypt_or_decrypt_results(bytes, false)?;
bytes = decrypted
}
#[cfg(feature = "cec-cache-results")]
{
let decompressed_decrypted = self.decompress_results(&bytes)?;
bytes = decompressed_decrypted;
}
Ok(bytes.try_into()?)
}
}
/// A helper function that returns compressed results.
/// Feature flags (**compress-cache-results or cec-cache-results**) are required for this to work.
/// If bytes where
/// # Arguments
///
/// * `bytes` - It takes a slice of bytes as an argument.
///
/// # Error
/// Returns the uncompressed bytes on success otherwise it returns a CacheError
/// on failure.
#[cfg(any(feature = "compress-cache-results", feature = "cec-cache-results"))]
fn decompress_util(input: &[u8]) -> Result<Vec<u8>, Report<CacheError>> {
use std::io::Write;
let mut writer = brotli::DecompressorWriter::new(Vec::new(), 4096);
writer
.write_all(input)
.map_err(|_| CacheError::CompressionError)?;
let bytes = writer
.into_inner()
.map_err(|_| CacheError::CompressionError)?;
Ok(bytes)
}
#[cfg(feature = "redis-cache")]
@ -85,10 +320,14 @@ impl Cacher for RedisCache {
}
async fn cached_results(&mut self, url: &str) -> Result<SearchResults, Report<CacheError>> {
use base64::Engine;
let hashed_url_string: &str = &self.hash_url(url);
let json = self.cached_json(hashed_url_string).await?;
Ok(serde_json::from_str::<SearchResults>(&json)
.map_err(|_| CacheError::SerializationError)?)
let base64_string = self.cached_json(hashed_url_string).await?;
let bytes = base64::engine::general_purpose::STANDARD_NO_PAD
.decode(base64_string)
.map_err(|_| CacheError::Base64DecodingOrEncodingError)?;
self.post_process_search_results(bytes)
}
async fn cache_results(
@ -96,10 +335,29 @@ impl Cacher for RedisCache {
search_results: &SearchResults,
url: &str,
) -> Result<(), Report<CacheError>> {
let json =
serde_json::to_string(search_results).map_err(|_| CacheError::SerializationError)?;
use base64::Engine;
let bytes = self.pre_process_search_results(search_results)?;
let base64_string = base64::engine::general_purpose::STANDARD_NO_PAD.encode(bytes);
let hashed_url_string = self.hash_url(url);
self.cache_json(&json, &hashed_url_string).await
self.cache_json(&base64_string, &hashed_url_string).await
}
}
/// TryInto implementation for SearchResults from Vec<u8>
use std::convert::TryInto;
impl TryInto<SearchResults> for Vec<u8> {
type Error = CacheError;
fn try_into(self) -> Result<SearchResults, Self::Error> {
serde_json::from_slice(&self).map_err(|_| CacheError::SerializationError)
}
}
impl TryInto<Vec<u8>> for &SearchResults {
type Error = CacheError;
fn try_into(self) -> Result<Vec<u8>, Self::Error> {
serde_json::to_vec(self).map_err(|_| CacheError::SerializationError)
}
}
@ -107,7 +365,7 @@ impl Cacher for RedisCache {
#[cfg(feature = "memory-cache")]
pub struct InMemoryCache {
/// The backend cache which stores data.
cache: MokaCache<String, SearchResults>,
cache: MokaCache<String, Vec<u8>>,
}
#[cfg(feature = "memory-cache")]
@ -126,7 +384,7 @@ impl Cacher for InMemoryCache {
async fn cached_results(&mut self, url: &str) -> Result<SearchResults, Report<CacheError>> {
let hashed_url_string = self.hash_url(url);
match self.cache.get(&hashed_url_string) {
Some(res) => Ok(res),
Some(res) => self.post_process_search_results(res),
None => Err(Report::new(CacheError::MissingValue)),
}
}
@ -137,7 +395,8 @@ impl Cacher for InMemoryCache {
url: &str,
) -> Result<(), Report<CacheError>> {
let hashed_url_string = self.hash_url(url);
self.cache.insert(hashed_url_string, search_results.clone());
let bytes = self.pre_process_search_results(search_results)?;
self.cache.insert(hashed_url_string, bytes);
Ok(())
}
}
@ -282,3 +541,5 @@ pub async fn create_cache(config: &Config) -> impl Cacher {
#[cfg(not(any(feature = "memory-cache", feature = "redis-cache")))]
return DisabledCache::build(config).await;
}
//#[cfg(feature = "Compress-cache-results")]

25
src/cache/encryption.rs vendored Normal file
View file

@ -0,0 +1,25 @@
use chacha20poly1305::{
consts::{B0, B1},
ChaChaPoly1305,
};
use std::sync::OnceLock;
use chacha20::{
cipher::{
generic_array::GenericArray,
typenum::{UInt, UTerm},
StreamCipherCoreWrapper,
},
ChaChaCore,
};
/// The ChaCha20 core wrapped in a stream cipher for use in ChaCha20-Poly1305 authenticated encryption.
type StreamCipherCoreWrapperType =
StreamCipherCoreWrapper<ChaChaCore<UInt<UInt<UInt<UInt<UTerm, B1>, B0>, B1>, B0>>>;
/// Our ChaCha20-Poly1305 cipher instance, lazily initialized.
pub static CIPHER: OnceLock<ChaChaPoly1305<StreamCipherCoreWrapperType>> = OnceLock::new();
/// The type alias for our encryption key, a 32-byte array.
type GenericArrayType = GenericArray<u8, UInt<UInt<UInt<UInt<UTerm, B1>, B1>, B0>, B0>>;
/// Our encryption key, lazily initialized.
pub static ENCRYPTION_KEY: OnceLock<GenericArrayType> = OnceLock::new();

18
src/cache/error.rs vendored
View file

@ -18,6 +18,12 @@ pub enum CacheError {
SerializationError,
/// Returned when the value is missing.
MissingValue,
/// whenever encryption or decryption of cache results fails
EncryptionError,
/// Whenever compression of the cache results fails
CompressionError,
/// Whenever base64 decoding failed
Base64DecodingOrEncodingError,
}
impl fmt::Display for CacheError {
@ -43,6 +49,18 @@ impl fmt::Display for CacheError {
CacheError::SerializationError => {
write!(f, "Unable to serialize, deserialize from the cache")
}
CacheError::EncryptionError => {
write!(f, "Failed to encrypt or decrypt cache-results")
}
CacheError::CompressionError => {
write!(f, "failed to compress or uncompress cache results")
}
CacheError::Base64DecodingOrEncodingError => {
write!(f, "base64 encoding or decoding failed")
}
}
}
}

6
src/cache/mod.rs vendored
View file

@ -1,7 +1,11 @@
//! This module provides the modules which provide the functionality to cache the aggregated
//! results fetched and aggregated from the upstream search engines in a json format.
pub mod cacher;
#[cfg(any(feature = "encrypt-cache-results", feature = "cec-cache-results"))]
/// encryption module contains encryption utils such the cipher and key
pub mod encryption;
pub mod error;
#[cfg(feature = "redis-cache")]
pub mod redis_cacher;

View file

@ -44,7 +44,7 @@ impl RedisCache {
let mut tasks: Vec<_> = Vec::new();
for _ in 0..pool_size {
tasks.push(client.get_tokio_connection_manager());
tasks.push(client.get_connection_manager());
}
let redis_cache = RedisCache {